修复 SpacetimeDB 连接池请求取消后槽位泄漏导致池耗尽
- spacetime-client:PooledConnectionLease 增加 Drop 兜底,请求 future 被取消时也复位槽位并归还连接与 permit
- spacetime-client:槽位改为 AtomicBool 占用标记 + Mutex 连接存放,acquire 改为 CAS 抢占,删除可能永久空转的扫描循环
- spacetime-client:release_connection 与取消路径统一走租约 Drop 归还逻辑
- spacetime-client:新增 dropped_lease_releases_slot_and_permit 等单元测试复现并锁定该故障机制
- docs:新增 SpacetimeDB 连接池租约 Drop 兜底与取消安全文档记录根因、复现与验收
This commit is contained in:
@@ -348,7 +348,7 @@ type ProcedureResultSender<T> =
|
||||
type ReducerResultSender = Arc<Mutex<Option<oneshot::Sender<Result<(), SpacetimeClientError>>>>>;
|
||||
|
||||
struct SpacetimeConnectionPool {
|
||||
slots: Vec<tokio::sync::Mutex<PooledConnectionSlot>>,
|
||||
slots: Vec<PooledConnectionSlot>,
|
||||
permits: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
@@ -371,8 +371,10 @@ impl SpacetimeStageError {
|
||||
}
|
||||
|
||||
struct PooledConnectionSlot {
|
||||
connection: Option<PooledConnection>,
|
||||
in_use: bool,
|
||||
// 槽位占用标记独立成原子量:抢占/复位不依赖锁,租约 Drop 兜底可以同步完成。
|
||||
in_use: AtomicBool,
|
||||
// in_use=true 的持有者独占本槽连接,正常情况下锁上不会有竞争。
|
||||
connection: tokio::sync::Mutex<Option<PooledConnection>>,
|
||||
}
|
||||
|
||||
struct PooledConnection {
|
||||
@@ -385,9 +387,28 @@ struct PooledConnection {
|
||||
struct PooledConnectionLease {
|
||||
slot_index: usize,
|
||||
connection: Option<PooledConnection>,
|
||||
pool: Arc<SpacetimeConnectionPool>,
|
||||
_permit: OwnedSemaphorePermit,
|
||||
}
|
||||
|
||||
impl Drop for PooledConnectionLease {
|
||||
// Drop safety net for the lease: even when the request future is cancelled (for example the
// client disconnects and the handler is dropped),
|
||||
// the connection must be handed back and the slot reset; otherwise the slot would stay in_use
// forever and the connection pool would gradually be exhausted.
|
||||
fn drop(&mut self) {
|
||||
let slot = &self.pool.slots[self.slot_index];
|
||||
if let Some(connection) = self.connection.take() {
|
||||
// Only a connection that is not broken is stored back into the slot; a broken one is
// simply dropped here.
if !connection.is_broken() {
|
||||
if let Ok(mut slot_connection) = slot.connection.try_lock() {
|
||||
*slot_connection = Some(connection);
|
||||
}
|
||||
// try_lock should never fail in theory (the in_use holder has exclusive access); if it
// ever does, only the connection is discarded, never the slot.
|
||||
}
|
||||
}
|
||||
// Reset the occupancy flag regardless of whether a connection was stored back.
slot.in_use.store(false, Ordering::Release);
|
||||
// _permit is returned to the semaphore automatically as part of this Drop.
|
||||
}
|
||||
}
|
||||
|
||||
impl SpacetimeClient {
|
||||
pub fn new(config: SpacetimeClientConfig) -> Self {
|
||||
let pool_size = config.pool_size.max(1) as usize;
|
||||
@@ -400,11 +421,9 @@ impl SpacetimeClient {
|
||||
..config
|
||||
};
|
||||
let slots = (0..pool_size)
|
||||
.map(|_| {
|
||||
tokio::sync::Mutex::new(PooledConnectionSlot {
|
||||
connection: None,
|
||||
in_use: false,
|
||||
})
|
||||
.map(|_| PooledConnectionSlot {
|
||||
in_use: AtomicBool::new(false),
|
||||
connection: tokio::sync::Mutex::new(None),
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let pool = Arc::new(SpacetimeConnectionPool {
|
||||
@@ -678,42 +697,49 @@ impl SpacetimeClient {
|
||||
)
|
||||
})?;
|
||||
|
||||
loop {
|
||||
for (slot_index, slot) in self.pool.slots.iter().enumerate() {
|
||||
if let Ok(mut slot_guard) = slot.try_lock() {
|
||||
if slot_guard.in_use {
|
||||
continue;
|
||||
}
|
||||
let reusable_connection = slot_guard
|
||||
.connection
|
||||
.take()
|
||||
.filter(|connection| !connection.is_broken());
|
||||
slot_guard.in_use = true;
|
||||
drop(slot_guard);
|
||||
// 持有 permit 即保证最多 pool_size 个并发持有者,必然能抢到一个空闲槽位;
|
||||
// CAS 抢占后立即构造租约,后续任何失败/取消都由租约 Drop 兜底复位槽位。
|
||||
let slot_index = self
|
||||
.pool
|
||||
.slots
|
||||
.iter()
|
||||
.position(|slot| {
|
||||
slot.in_use
|
||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
|
||||
.is_ok()
|
||||
})
|
||||
.ok_or_else(|| {
|
||||
SpacetimeStageError::new(
|
||||
SpacetimeClientStage::PoolAcquire,
|
||||
SpacetimeClientError::Runtime(
|
||||
"SpacetimeDB 连接池 permit 与槽位状态不一致".to_string(),
|
||||
),
|
||||
)
|
||||
})?;
|
||||
|
||||
let connection = if let Some(connection) = reusable_connection {
|
||||
connection
|
||||
} else {
|
||||
match self.build_pooled_connection(operation_timeout).await {
|
||||
Ok(connection) => connection,
|
||||
Err(error) => {
|
||||
let mut slot_guard = self.pool.slots[slot_index].lock().await;
|
||||
slot_guard.in_use = false;
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
};
|
||||
let mut lease = PooledConnectionLease {
|
||||
slot_index,
|
||||
connection: None,
|
||||
pool: self.pool.clone(),
|
||||
_permit: permit,
|
||||
};
|
||||
|
||||
return Ok(PooledConnectionLease {
|
||||
slot_index,
|
||||
connection: Some(connection),
|
||||
_permit: permit,
|
||||
});
|
||||
}
|
||||
}
|
||||
let reusable_connection = self.pool.slots[slot_index]
|
||||
.connection
|
||||
.lock()
|
||||
.await
|
||||
.take()
|
||||
.filter(|connection| !connection.is_broken());
|
||||
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
let connection = if let Some(connection) = reusable_connection {
|
||||
connection
|
||||
} else {
|
||||
// 建连失败时直接返回错误,槽位与 permit 由 lease Drop 自动归还。
|
||||
self.build_pooled_connection(operation_timeout).await?
|
||||
};
|
||||
|
||||
lease.connection = Some(connection);
|
||||
Ok(lease)
|
||||
}
|
||||
|
||||
async fn build_pooled_connection(
|
||||
@@ -911,18 +937,10 @@ impl SpacetimeClient {
|
||||
Ok(subscription)
|
||||
}
|
||||
|
||||
async fn release_connection(&self, mut lease: PooledConnectionLease) {
|
||||
let mut slot_guard = self.pool.slots[lease.slot_index].lock().await;
|
||||
slot_guard.in_use = false;
|
||||
let Some(connection) = lease.connection.take() else {
|
||||
slot_guard.connection = None;
|
||||
return;
|
||||
};
|
||||
if connection.is_broken() {
|
||||
slot_guard.connection = None;
|
||||
} else {
|
||||
slot_guard.connection = Some(connection);
|
||||
}
|
||||
async fn release_connection(&self, lease: PooledConnectionLease) {
|
||||
// 显式归还与“请求被取消”的隐式归还共用同一套租约 Drop 兜底逻辑,
|
||||
// 保证任何路径下槽位与 permit 都会复位,连接池不会被慢慢泄漏占满。
|
||||
drop(lease);
|
||||
}
|
||||
|
||||
// 超时后必须统一归还租约;若连接已先一步断开则回传断线,否则标记坏连接并回传超时。
|
||||
@@ -1127,4 +1145,78 @@ mod tests {
|
||||
SpacetimeClientError::Runtime(_)
|
||||
));
|
||||
}
|
||||
|
||||
// Builds a SpacetimeClient for the pool tests from the given pool size and procedure timeout;
// all other config fields are fixed test values.
fn test_client(pool_size: u32, procedure_timeout: Duration) -> SpacetimeClient {
|
||||
SpacetimeClient::new(SpacetimeClientConfig {
|
||||
// Points at an unreachable local port: the tests only exercise pool behavior and do not
// need a real SpacetimeDB.
|
||||
server_url: "http://127.0.0.1:9".to_string(),
|
||||
database: "pool-test".to_string(),
|
||||
token: None,
|
||||
pool_size,
|
||||
procedure_timeout,
|
||||
})
|
||||
}
|
||||
|
||||
/// Reproduces the production failure mechanism: before the fix, a lease was not returned when
/// the request future was cancelled, so the slot stayed in in_use forever,
|
||||
/// and later acquires would spin forever while holding a permit. After the fix, dropping the
/// lease must reset both the slot and the permit.
|
||||
#[tokio::test]
|
||||
async fn dropped_lease_releases_slot_and_permit() {
|
||||
let client = test_client(1, Duration::from_millis(200));
|
||||
// Take the pool's permit by hand so the lease below can own it.
let permit = client
|
||||
.pool
|
||||
.permits
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.expect("permit should acquire");
|
||||
// Mark slot 0 as occupied by hand, mimicking a lease that is currently in flight.
client.pool.slots[0].in_use.store(true, Ordering::SeqCst);
|
||||
assert_eq!(client.pool.permits.available_permits(), 0);
|
||||
|
||||
// Simulate a cancelled request: the lease is dropped directly without going through
// release_connection.
|
||||
let lease = PooledConnectionLease {
|
||||
slot_index: 0,
|
||||
connection: None,
|
||||
pool: client.pool.clone(),
|
||||
_permit: permit,
|
||||
};
|
||||
drop(lease);
|
||||
|
||||
assert!(
|
||||
!client.pool.slots[0].in_use.load(Ordering::SeqCst),
|
||||
"租约 Drop 后槽位必须复位,否则连接池会被泄漏占满"
|
||||
);
|
||||
assert_eq!(
|
||||
client.pool.permits.available_permits(),
|
||||
1,
|
||||
"租约 Drop 后 permit 必须归还"
|
||||
);
|
||||
}
|
||||
|
||||
/// When every permit in the pool is taken (requests continuously in flight), acquire must
/// return
|
||||
/// a pool_acquire timeout within the timeout window instead of waiting forever.
|
||||
#[tokio::test]
|
||||
async fn acquire_times_out_at_pool_acquire_when_pool_is_busy() {
|
||||
let client = test_client(1, Duration::from_millis(200));
|
||||
// Hold the permit for the whole test so the acquire below cannot obtain one.
let _held_permit = client
|
||||
.pool
|
||||
.permits
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.expect("permit should acquire");
|
||||
|
||||
// The outer 5 s timeout only guards the test itself against hanging; the 200 ms value is
// the duration handed to acquire_connection_with_timeout.
let result = tokio::time::timeout(
|
||||
Duration::from_secs(5),
|
||||
client.acquire_connection_with_timeout(Duration::from_millis(200)),
|
||||
)
|
||||
.await
|
||||
.expect("acquire 必须在超时窗口内返回,而不是空转挂死");
|
||||
|
||||
let error = match result {
|
||||
Ok(_) => panic!("池占满时应返回 pool_acquire 超时"),
|
||||
Err(error) => error,
|
||||
};
|
||||
// The failure must be attributed to the PoolAcquire stage and be a Timeout error.
assert_eq!(error.stage, SpacetimeClientStage::PoolAcquire);
|
||||
assert!(matches!(error.error, SpacetimeClientError::Timeout));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user