From 077b139e80610cb0ce9e45408b2c1cec897bcb8e Mon Sep 17 00:00:00 2001 From: kdletters <61648117+kdletters@users.noreply.github.com> Date: Thu, 11 Jun 2026 13:52:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20SpacetimeDB=20=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=B1=A0=E8=AF=B7=E6=B1=82=E5=8F=96=E6=B6=88=E5=90=8E?= =?UTF-8?q?=E6=A7=BD=E4=BD=8D=E6=B3=84=E6=BC=8F=E5=AF=BC=E8=87=B4=E6=B1=A0?= =?UTF-8?q?=E8=80=97=E5=B0=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - spacetime-client:PooledConnectionLease 增加 Drop 兜底,请求 future 被取消时也复位槽位并归还连接与 permit - spacetime-client:槽位改为 AtomicBool 占用标记 + Mutex 连接存放,acquire 改为 CAS 抢占,删除可能永久空转的扫描循环 - spacetime-client:release_connection 与取消路径统一走租约 Drop 归还逻辑 - spacetime-client:新增 dropped_lease_releases_slot_and_permit 等单元测试复现并锁定该故障机制 - docs:新增 SpacetimeDB 连接池租约 Drop 兜底与取消安全文档记录根因、复现与验收 --- ...timeDB连接池租约Drop兜底与取消安全-2026-06-11.md | 40 ++++ server-rs/crates/spacetime-client/src/lib.rs | 198 +++++++++++++----- 2 files changed, 185 insertions(+), 53 deletions(-) create mode 100644 docs/【后端架构】SpacetimeDB连接池租约Drop兜底与取消安全-2026-06-11.md diff --git a/docs/【后端架构】SpacetimeDB连接池租约Drop兜底与取消安全-2026-06-11.md b/docs/【后端架构】SpacetimeDB连接池租约Drop兜底与取消安全-2026-06-11.md new file mode 100644 index 00000000..e72bec65 --- /dev/null +++ b/docs/【后端架构】SpacetimeDB连接池租约Drop兜底与取消安全-2026-06-11.md @@ -0,0 +1,40 @@ +# SpacetimeDB 连接池租约 Drop 兜底与取消安全 + +- 日期:2026-06-11 +- 关联故障:release 环境 api-server 周期性全量 `spacetime_stage="pool_acquire" elapsed_ms=45000` 超时,`/readyz` 503(`reason=spacetime_unhealthy, stage=pool_acquire`),重启后临时恢复。 +- 涉及代码:`server-rs/crates/spacetime-client/src/lib.rs` + +## 故障根因 + +修复前的连接池存在两个叠加缺陷: + +1. **租约没有 Drop 兜底**。`PooledConnectionLease` 只能通过显式 `release_connection` 归还。当 HTTP 请求方在等待 StDB 回包期间断开(前端超时、用户刷新、Nginx 截断),axum/hyper 会直接丢弃 handler future,租约被 Drop:permit 因 `OwnedSemaphorePermit` 自动归还,但槽位的 `in_use` 标记永远不会复位。 +2. **acquire 在槽位泄漏后永久空转**。后续请求拿到 permit 后进入 `loop { 扫描槽位; yield_now }`,找不到空闲槽位就无限自旋,且这段自旋不受 `procedure_timeout` 约束,自旋期间 permit 不归还。 + +叠加效果:StDB 一旦变慢(请求占用连接接近 45 秒),客户端取消请求的概率大增,每次取消泄漏一个槽位并连带吞掉一个 permit;泄漏数量达到 `pool_size`(release 为 8)后,所有业务请求与健康检查全部在 `pool_acquire` 阶段 45 秒超时,服务表现为"连不上 StDB",只有重启能恢复。 + +## 本地复现 + +不需要真实 SpacetimeDB,单元测试即可复现机制(位于 `spacetime-client` tests 模块): + +- 修复前:将一个槽位置为 `in_use=true` 后调用 `acquire_connection_with_timeout(200ms)`,acquire 在 5 秒守护窗口内不返回(永久自旋),测试红。 +- `dropped_lease_releases_slot_and_permit`:模拟"请求被取消、租约未经 release 直接 Drop",断言槽位与 permit 都被复位归还。 +- `acquire_times_out_at_pool_acquire_when_pool_is_busy`:池内 permit 全部被占用时,acquire 必须在超时窗口内返回 `PoolAcquire + Timeout`,不允许无限等待。 + +## 修复方案 + +1. `PooledConnectionSlot` 改为 `in_use: AtomicBool + connection: Mutex>`,槽位占用标记不再依赖异步锁。 +2. `PooledConnectionLease` 持有 `Arc` 并实现 `Drop`:无论显式归还还是 future 被取消,统一在 Drop 中复位槽位、按 broken 状态决定连接是否回池,permit 随后自动归还。Drop 体先复位 `in_use` 再释放 permit(字段在 Drop 体之后析构),保证新请求拿到 permit 时必有空闲槽位。 +3. acquire 改为 CAS 抢占槽位:持有 permit 即保证并发持有者不超过 `pool_size`,扫描一轮必然命中空闲槽位,彻底删除自旋循环;建连失败直接返回错误,槽位由租约 Drop 复位。 +4. `release_connection` 退化为 `drop(lease)`,显式与隐式归还共用同一条兜底路径。 + +## 验收 + +- `cargo test -p spacetime-client --manifest-path server-rs/Cargo.toml --lib`(35 通过,含上述新测试) +- `cargo test -p api-server --manifest-path server-rs/Cargo.toml readyz`(2 通过) +- `cargo check -p api-server --manifest-path server-rs/Cargo.toml` + +## 运维提示 + +- 此修复解决的是"取消导致的永久泄漏"。StDB 真慢时仍会出现成批 45 秒超时(连接被在途请求合法占用),那是容量/上游问题,应结合 `GENARRATIVE_SPACETIME_POOL_SIZE` 与 StDB 负载排查,不要再怀疑池泄漏。 +- 健康检查 `/readyz` 在池被在途请求占满时仍可能短暂 503(stage=pool_acquire),恢复后自动转好,无需重启。 diff --git a/server-rs/crates/spacetime-client/src/lib.rs b/server-rs/crates/spacetime-client/src/lib.rs index 6b43db21..626b34d3 100644 --- a/server-rs/crates/spacetime-client/src/lib.rs +++ b/server-rs/crates/spacetime-client/src/lib.rs @@ -348,7 +348,7 @@ type ProcedureResultSender = type ReducerResultSender = Arc>>>>; struct SpacetimeConnectionPool { - slots: Vec>, + slots: Vec, permits: Arc, } @@ -371,8 +371,10 @@ impl SpacetimeStageError { } struct PooledConnectionSlot { - connection: Option, - in_use: bool, + // 槽位占用标记独立成原子量:抢占/复位不依赖锁,租约 Drop 兜底可以同步完成。 + in_use: AtomicBool, + // in_use=true 的持有者独占本槽连接,正常情况下锁上不会有竞争。 + connection: tokio::sync::Mutex>, } struct PooledConnection { @@ -385,9 +387,28 @@ struct PooledConnection { struct PooledConnectionLease { slot_index: usize, connection: Option, + pool: Arc, _permit: OwnedSemaphorePermit, } +impl Drop for PooledConnectionLease { + // 租约 Drop 兜底:请求 future 被取消(如客户端断开导致 handler 被丢弃)时, + // 也必须归还连接并复位槽位,否则槽位会永久停留在 in_use 状态、连接池逐渐耗尽。 + fn drop(&mut self) { + let slot = &self.pool.slots[self.slot_index]; + if let Some(connection) = self.connection.take() { + if !connection.is_broken() { + if let Ok(mut slot_connection) = slot.connection.try_lock() { + *slot_connection = Some(connection); + } + // try_lock 理论上不会失败(in_use 持有者独占);万一失败只丢弃连接,不丢槽位。 + } + } + slot.in_use.store(false, Ordering::Release); + // _permit 随 Drop 自动归还信号量。 + } +} + impl SpacetimeClient { pub fn new(config: SpacetimeClientConfig) -> Self { let pool_size = config.pool_size.max(1) as usize; @@ -400,11 +421,9 @@ impl SpacetimeClient { ..config }; let slots = (0..pool_size) - .map(|_| { - tokio::sync::Mutex::new(PooledConnectionSlot { - connection: None, - in_use: false, - }) + .map(|_| PooledConnectionSlot { + in_use: AtomicBool::new(false), + connection: tokio::sync::Mutex::new(None), }) .collect::>(); let pool = Arc::new(SpacetimeConnectionPool { @@ -678,42 +697,49 @@ impl SpacetimeClient { ) })?; - loop { - for (slot_index, slot) in self.pool.slots.iter().enumerate() { - if let Ok(mut slot_guard) = slot.try_lock() { - if slot_guard.in_use { - continue; - } - let reusable_connection = slot_guard - .connection - .take() - .filter(|connection| !connection.is_broken()); - slot_guard.in_use = true; - drop(slot_guard); + // 持有 permit 即保证最多 pool_size 个并发持有者,必然能抢到一个空闲槽位; + // CAS 抢占后立即构造租约,后续任何失败/取消都由租约 Drop 兜底复位槽位。 + let slot_index = self + .pool + .slots + .iter() + .position(|slot| { + slot.in_use + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + }) + .ok_or_else(|| { + SpacetimeStageError::new( + SpacetimeClientStage::PoolAcquire, + SpacetimeClientError::Runtime( + "SpacetimeDB 连接池 permit 与槽位状态不一致".to_string(), + ), + ) + })?; - let connection = if let Some(connection) = reusable_connection { - connection - } else { - match self.build_pooled_connection(operation_timeout).await { - Ok(connection) => connection, - Err(error) => { - let mut slot_guard = self.pool.slots[slot_index].lock().await; - slot_guard.in_use = false; - return Err(error); - } - } - }; + let mut lease = PooledConnectionLease { + slot_index, + connection: None, + pool: self.pool.clone(), + _permit: permit, + }; - return Ok(PooledConnectionLease { - slot_index, - connection: Some(connection), - _permit: permit, - }); - } - } + let reusable_connection = self.pool.slots[slot_index] + .connection + .lock() + .await + .take() + .filter(|connection| !connection.is_broken()); - tokio::task::yield_now().await; - } + let connection = if let Some(connection) = reusable_connection { + connection + } else { + // 建连失败时直接返回错误,槽位与 permit 由 lease Drop 自动归还。 + self.build_pooled_connection(operation_timeout).await? + }; + + lease.connection = Some(connection); + Ok(lease) } async fn build_pooled_connection( @@ -911,18 +937,10 @@ impl SpacetimeClient { Ok(subscription) } - async fn release_connection(&self, mut lease: PooledConnectionLease) { - let mut slot_guard = self.pool.slots[lease.slot_index].lock().await; - slot_guard.in_use = false; - let Some(connection) = lease.connection.take() else { - slot_guard.connection = None; - return; - }; - if connection.is_broken() { - slot_guard.connection = None; - } else { - slot_guard.connection = Some(connection); - } + async fn release_connection(&self, lease: PooledConnectionLease) { + // 显式归还与“请求被取消”的隐式归还共用同一套租约 Drop 兜底逻辑, + // 保证任何路径下槽位与 permit 都会复位,连接池不会被慢慢泄漏占满。 + drop(lease); } // 超时后必须统一归还租约;若连接已先一步断开则回传断线,否则标记坏连接并回传超时。 @@ -1127,4 +1145,78 @@ mod tests { SpacetimeClientError::Runtime(_) )); } + + fn test_client(pool_size: u32, procedure_timeout: Duration) -> SpacetimeClient { + SpacetimeClient::new(SpacetimeClientConfig { + // 指向本机不可达端口:测试只验证连接池行为,不需要真实 SpacetimeDB。 + server_url: "http://127.0.0.1:9".to_string(), + database: "pool-test".to_string(), + token: None, + pool_size, + procedure_timeout, + }) + } + + /// 复现线上故障机制:修复前请求 future 被取消时租约不会归还,槽位永久停留在 in_use, + /// 后续 acquire 拿着 permit 空转挂死。修复后租约 Drop 必须同时复位槽位与 permit。 + #[tokio::test] + async fn dropped_lease_releases_slot_and_permit() { + let client = test_client(1, Duration::from_millis(200)); + let permit = client + .pool + .permits + .clone() + .acquire_owned() + .await + .expect("permit should acquire"); + client.pool.slots[0].in_use.store(true, Ordering::SeqCst); + assert_eq!(client.pool.permits.available_permits(), 0); + + // 模拟请求被取消:租约未经过 release_connection 直接被 Drop。 + let lease = PooledConnectionLease { + slot_index: 0, + connection: None, + pool: client.pool.clone(), + _permit: permit, + }; + drop(lease); + + assert!( + !client.pool.slots[0].in_use.load(Ordering::SeqCst), + "租约 Drop 后槽位必须复位,否则连接池会被泄漏占满" + ); + assert_eq!( + client.pool.permits.available_permits(), + 1, + "租约 Drop 后 permit 必须归还" + ); + } + + /// 池内 permit 全部被占用(持续在途请求)时,acquire 必须在超时窗口内返回 + /// pool_acquire 超时,而不是无限等待。 + #[tokio::test] + async fn acquire_times_out_at_pool_acquire_when_pool_is_busy() { + let client = test_client(1, Duration::from_millis(200)); + let _held_permit = client + .pool + .permits + .clone() + .acquire_owned() + .await + .expect("permit should acquire"); + + let result = tokio::time::timeout( + Duration::from_secs(5), + client.acquire_connection_with_timeout(Duration::from_millis(200)), + ) + .await + .expect("acquire 必须在超时窗口内返回,而不是空转挂死"); + + let error = match result { + Ok(_) => panic!("池占满时应返回 pool_acquire 超时"), + Err(error) => error, + }; + assert_eq!(error.stage, SpacetimeClientStage::PoolAcquire); + assert!(matches!(error.error, SpacetimeClientError::Timeout)); + } }