From 9d25a47b2327427f047a535b5e7cf679f7680990 Mon Sep 17 00:00:00 2001 From: kdletters Date: Thu, 23 Apr 2026 14:33:15 +0800 Subject: [PATCH] feat: pool spacetime client connections --- .env.example | 1 + ...CETIME_CLIENT_POOLING_DESIGN_2026-04-23.md | 141 +++++++++ server-rs/crates/api-server/src/config.rs | 55 ++++ server-rs/crates/api-server/src/state.rs | 1 + server-rs/crates/spacetime-client/src/lib.rs | 269 ++++++++++++++---- 5 files changed, 415 insertions(+), 52 deletions(-) create mode 100644 docs/technical/API_SERVER_SPACETIME_CLIENT_POOLING_DESIGN_2026-04-23.md diff --git a/.env.example b/.env.example index 03b8c665..904f5a5b 100644 --- a/.env.example +++ b/.env.example @@ -31,6 +31,7 @@ GENARRATIVE_API_TARGET="http://127.0.0.1:3100" GENARRATIVE_INTERNAL_API_SECRET="CHANGE_ME_FOR_PRODUCTION" GENARRATIVE_SPACETIME_SERVER_URL="http://127.0.0.1:3001" GENARRATIVE_SPACETIME_DATABASE="genarrative-dev" +GENARRATIVE_SPACETIME_POOL_SIZE="4" # Local Caddy upstream target used for dist-based testing. CADDY_API_UPSTREAM="http://127.0.0.1:8081" diff --git a/docs/technical/API_SERVER_SPACETIME_CLIENT_POOLING_DESIGN_2026-04-23.md b/docs/technical/API_SERVER_SPACETIME_CLIENT_POOLING_DESIGN_2026-04-23.md new file mode 100644 index 00000000..fce73d8d --- /dev/null +++ b/docs/technical/API_SERVER_SPACETIME_CLIENT_POOLING_DESIGN_2026-04-23.md @@ -0,0 +1,141 @@ +# api-server SpacetimeClient 连接池化设计 2026-04-23 + +更新时间:`2026-04-23` + +## 1. 背景 + +当前 `api-server` 虽然在 `AppState` 中只持有一个 `SpacetimeClient` 实例,但 `spacetime-client` 内部仍然是: + +1. 每次 procedure / reducer 调用都执行一次 `DbConnection::builder().build()` +2. 建连后立即 `run_threaded()` +3. 拿到结果后立刻 `disconnect()` + +也就是说,当前问题不是 `api-server` 每次请求都 new 一个 client,而是: + +**每次 client 调用都新建并销毁一条 SpacetimeDB 连接。** + +## 2. 本轮目标 + +本轮不继续维持“每次 HTTP 请求一条短连接”的阶段性策略。 + +本轮目标改为: + +1. `api-server` 进程内预热并持有一组可复用的 SpacetimeDB 连接 +2. 每次 HTTP 请求只从池里借一个可用连接执行 procedure / reducer +3. 请求完成后归还连接,不主动断开 +4. 连接失效时自动剔除并按需重建 + +## 3. 
为什么不直接引第三方池库 + +当前仓库使用的是 `spacetimedb-sdk` 生成的 `DbConnection`,不是传统 SQL client。 + +它的连接模型包含: + +1. `on_connect` +2. `on_disconnect` +3. `run_threaded` +4. reducer / procedure callback + +这类对象不是标准的 `bb8` / `deadpool` 资源接口。 + +当前仓库也没有已经接入的通用资源池库,因此本轮优先在 `spacetime-client` 内实现最小可控池化层,而不是强行套第三方 SQL 风格池库。 + +## 4. 池化设计 + +### 4.1 结构 + +`SpacetimeClient` 内新增一个共享池状态: + +1. `pool_size` +2. `Semaphore` +3. `Vec<Mutex<Option<PooledConnection>>>` + +其中 `PooledConnection` 持有: + +1. `DbConnection` +2. `run_threaded` 返回的后台线程句柄 +3. 连接唯一 id + +### 4.2 借还模型 + +每次调用 procedure / reducer 时: + +1. 先获取 `Semaphore permit` +2. 选取一个空闲槽位 +3. 若槽位已有健康连接,则直接复用 +4. 若槽位为空或连接已坏,则现场重建 +5. 调用完成后归还槽位,但不主动断开连接 + +### 4.3 健康判断 + +当前阶段不做复杂心跳表。 + +最小健康策略如下: + +1. procedure / reducer callback 正常完成:连接保持在池中 +2. 连接在调用期间触发 `on_disconnect`:标记该槽位失效 +3. 下次借用该槽位时重建连接 + +### 4.4 并发策略 + +不共享同一个 `DbConnection` 给多个并发请求同时发 procedure。 + +原因: + +1. SDK callback 是异步回调模型 +2. 当前仓库调用层没有 request id 级别的统一 dispatcher +3. 多请求共用一条连接容易把回调和调用方绑定关系搞乱 + +所以本轮采取: + +**一个池槽位同一时刻只服务一个请求。** + +这本质上是“连接池”,不是“多路复用单连接”。 + +## 5. 默认规模 + +默认池大小取小值,避免本地开发和轻量部署浪费连接: + +1. 默认 `4` +2. 允许通过环境变量覆盖,例如 `GENARRATIVE_SPACETIME_POOL_SIZE` + +## 6. 错误与超时策略 + +沿用现有 `SpacetimeClientError` 口径: + +1. 建连失败:`Build` / `Runtime` +2. 连接在返回前断开:`ConnectDropped` 或 `Procedure` +3. 调用超时:`Timeout` + +新增规则: + +1. 借用池槽位超时,也映射为 `Timeout` +2. 某槽位一旦确认断线,必须在池中清空,不能继续复用脏连接 +3. procedure / reducer 等待结果无论成功、失败还是超时,都必须先归还租约再向上层返回,避免槽位泄漏把池卡死 +4. 调用期间若连接先收到 `on_disconnect`,当前阶段只标记坏连接;若业务回调未及时返回,则最终由调用超时路径统一清槽并回传错误 + +## 7. 与现有文档的关系 + +之前 [`AXUM_TO_SPACETIMEDB_ASSET_OBJECT_CONFIRM_CALL_DESIGN_2026-04-21.md`](./AXUM_TO_SPACETIMEDB_ASSET_OBJECT_CONFIRM_CALL_DESIGN_2026-04-21.md) +中写明“当前阶段每次 HTTP 请求可以建立一条短连接,待真实链路验证稳定后再评估连接池或长连接复用”。 + +本轮就是进入这个“下一阶段”: + +1. 保留 `on_connect` 后再发请求的约束 +2. 去掉“请求完成立即断开”的短连接策略 +3. 改成 `spacetime-client` 进程内连接池 + +## 8. 验收标准 + +落地后至少满足: + +1. `api-server` 启动后,`SpacetimeClient` 不再为每次调用单独建连 +2. 同一进程内连续多个 API 请求可以复用池中连接 +3. 单个连接断开后不会污染后续请求 +4. 
`api-server` 调用侧无需修改业务 handler + +## 9. 一句话结论 + +本轮不引第三方 SQL 风格池库,而是在 `spacetime-client` 内实现一层: + +**面向 `DbConnection` 的最小连接池,让 `api-server` 复用长活连接,而不是每次调用都单独建连。** diff --git a/server-rs/crates/api-server/src/config.rs b/server-rs/crates/api-server/src/config.rs index c7e8dee7..744acfe2 100644 --- a/server-rs/crates/api-server/src/config.rs +++ b/server-rs/crates/api-server/src/config.rs @@ -67,6 +67,7 @@ pub struct AppConfig { pub spacetime_server_url: String, pub spacetime_database: String, pub spacetime_token: Option, + pub spacetime_pool_size: u32, pub llm_provider: LlmProvider, pub llm_base_url: String, pub llm_api_key: Option, @@ -139,6 +140,7 @@ impl Default for AppConfig { spacetime_server_url: "http://127.0.0.1:3000".to_string(), spacetime_database: "genarrative-dev".to_string(), spacetime_token: None, + spacetime_pool_size: 4, llm_provider: LlmProvider::Ark, llm_base_url: DEFAULT_ARK_BASE_URL.to_string(), llm_api_key: None, @@ -370,6 +372,11 @@ impl AppConfig { } config.spacetime_token = read_first_non_empty_env(&["GENARRATIVE_SPACETIME_TOKEN"]); + if let Some(spacetime_pool_size) = + read_first_positive_u32_env(&["GENARRATIVE_SPACETIME_POOL_SIZE"]) + { + config.spacetime_pool_size = spacetime_pool_size; + } if let Some(llm_provider) = read_first_llm_provider_env(&["GENARRATIVE_LLM_PROVIDER", "LLM_PROVIDER"]) @@ -620,3 +627,51 @@ fn parse_positive_u16(raw: &str) -> Option { Some(value) } + +#[cfg(test)] +mod tests { + use super::AppConfig; + use std::sync::{Mutex, OnceLock}; + + static ENV_LOCK: OnceLock> = OnceLock::new(); + + #[test] + fn from_env_reads_spacetime_pool_size() { + let _guard = ENV_LOCK + .get_or_init(|| Mutex::new(())) + .lock() + .expect("env lock should not poison"); + + unsafe { + std::env::remove_var("GENARRATIVE_SPACETIME_POOL_SIZE"); + std::env::set_var("GENARRATIVE_SPACETIME_POOL_SIZE", "7"); + } + + let config = AppConfig::from_env(); + assert_eq!(config.spacetime_pool_size, 7); + + unsafe { + 
std::env::remove_var("GENARRATIVE_SPACETIME_POOL_SIZE"); + } + } + + #[test] + fn from_env_ignores_zero_spacetime_pool_size() { + let _guard = ENV_LOCK + .get_or_init(|| Mutex::new(())) + .lock() + .expect("env lock should not poison"); + + unsafe { + std::env::remove_var("GENARRATIVE_SPACETIME_POOL_SIZE"); + std::env::set_var("GENARRATIVE_SPACETIME_POOL_SIZE", "0"); + } + + let config = AppConfig::from_env(); + assert_eq!(config.spacetime_pool_size, 4); + + unsafe { + std::env::remove_var("GENARRATIVE_SPACETIME_POOL_SIZE"); + } + } +} diff --git a/server-rs/crates/api-server/src/state.rs b/server-rs/crates/api-server/src/state.rs index dc630bcc..e602b43b 100644 --- a/server-rs/crates/api-server/src/state.rs +++ b/server-rs/crates/api-server/src/state.rs @@ -116,6 +116,7 @@ impl AppState { server_url: config.spacetime_server_url.clone(), database: config.spacetime_database.clone(), token: config.spacetime_token.clone(), + pool_size: config.spacetime_pool_size, }); let llm_client = build_llm_client(&config)?; diff --git a/server-rs/crates/spacetime-client/src/lib.rs b/server-rs/crates/spacetime-client/src/lib.rs index 707eed26..fd769a1a 100644 --- a/server-rs/crates/spacetime-client/src/lib.rs +++ b/server-rs/crates/spacetime-client/src/lib.rs @@ -3,7 +3,9 @@ pub mod module_bindings; use std::{ error::Error, fmt, + sync::atomic::{AtomicBool, Ordering}, sync::{Arc, Mutex}, + thread::JoinHandle, time::Duration, }; @@ -95,7 +97,10 @@ use module_story::{ }; use shared_kernel::format_timestamp_micros; use spacetimedb_sdk::DbContext; -use tokio::{sync::oneshot, time::timeout}; +use tokio::{ + sync::{OwnedSemaphorePermit, Semaphore, oneshot}, + time::timeout, +}; use crate::module_bindings::{ AiResultReferenceInput as BindingAiResultReferenceInput, @@ -350,11 +355,13 @@ pub struct SpacetimeClientConfig { pub server_url: String, pub database: String, pub token: Option, + pub pool_size: u32, } -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct SpacetimeClient { config: 
SpacetimeClientConfig, + pool: Arc, } #[derive(Debug)] @@ -372,9 +379,45 @@ type ProcedureResultSender = Arc>>>>; type ReducerResultSender = Arc>>>>; +struct SpacetimeConnectionPool { + slots: Vec>, + permits: Arc, +} + +struct PooledConnectionSlot { + connection: Option, + in_use: bool, +} + +struct PooledConnection { + connection: DbConnection, + runner: Option>, + broken: Arc, +} + +struct PooledConnectionLease { + slot_index: usize, + connection: Option, + _permit: OwnedSemaphorePermit, +} + impl SpacetimeClient { pub fn new(config: SpacetimeClientConfig) -> Self { - Self { config } + let pool_size = config.pool_size.max(1) as usize; + let slots = (0..pool_size) + .map(|_| { + tokio::sync::Mutex::new(PooledConnectionSlot { + connection: None, + in_use: false, + }) + }) + .collect::>(); + let pool = Arc::new(SpacetimeConnectionPool { + slots, + permits: Arc::new(Semaphore::new(pool_size)), + }); + + Self { config, pool } } pub async fn create_ai_task( @@ -2213,70 +2256,122 @@ impl SpacetimeClient { where T: Send + 'static, { - let config = self.config.clone(); let (sender, receiver) = oneshot::channel(); let result_sender = Arc::new(Mutex::new(Some(sender))); - let connect_sender = result_sender.clone(); - let disconnect_sender = result_sender.clone(); + let lease = self.acquire_connection().await?; + let final_result = if let Some(connection) = lease.connection.as_ref() { + call(&connection.connection, result_sender.clone()); + match timeout(CONFIRM_ASSET_OBJECT_TIMEOUT, receiver).await { + Ok(inner) => match inner { + Ok(value) => value, + Err(_) => Err(SpacetimeClientError::ConnectDropped), + }, + Err(_) => Err(Self::resolve_timeout_error(Some(connection))), + } + } else { + Err(SpacetimeClientError::Runtime( + "SpacetimeDB 连接租约缺少连接".to_string(), + )) + }; + self.release_connection(lease).await; - let connection = tokio::task::spawn_blocking(move || { - DbConnection::builder() - .with_uri(config.server_url) - .with_database_name(config.database) - 
.with_token(config.token) - .on_connect(move |connection, _, _| { - // SDK 收到 IdentityToken 后才调用 procedure,避免 WebSocket 已建好但身份握手未完成时丢请求。 - call(connection, connect_sender); - }) - .on_disconnect(move |_, error| { - let message = error - .map(|error| error.to_string()) - .unwrap_or_else(|| "SpacetimeDB 连接在 procedure 返回前断开".to_string()); - send_once( - &disconnect_sender, - Err(SpacetimeClientError::Procedure(message)), - ); - }) - .build() - .map_err(|error| SpacetimeClientError::Build(error.to_string())) - }) - .await - .map_err(|error| SpacetimeClientError::Runtime(error.to_string()))??; - - let runner = connection.run_threaded(); - let result = timeout(CONFIRM_ASSET_OBJECT_TIMEOUT, receiver).await; - let _ = connection.disconnect(); - // SDK 线程会在断开消息被处理后自行退出;HTTP 请求不能同步等待该线程,否则 Windows 本地联调可能卡在收尾阶段。 - drop(runner); - - result - .map_err(|_| SpacetimeClientError::Timeout)? - .map_err(|_| SpacetimeClientError::ConnectDropped)? + final_result } async fn call_reducer_after_connect( &self, call: impl FnOnce(&DbConnection, ReducerResultSender) + Send + 'static, ) -> Result<(), SpacetimeClientError> { - let config = self.config.clone(); let (sender, receiver) = oneshot::channel(); let result_sender = Arc::new(Mutex::new(Some(sender))); - let connect_sender = result_sender.clone(); - let disconnect_sender = result_sender.clone(); + let lease = self.acquire_connection().await?; + let final_result = if let Some(connection) = lease.connection.as_ref() { + call(&connection.connection, result_sender.clone()); + match timeout(CONFIRM_ASSET_OBJECT_TIMEOUT, receiver).await { + Ok(inner) => match inner { + Ok(value) => value, + Err(_) => Err(SpacetimeClientError::ConnectDropped), + }, + Err(_) => Err(Self::resolve_timeout_error(Some(connection))), + } + } else { + Err(SpacetimeClientError::Runtime( + "SpacetimeDB 连接租约缺少连接".to_string(), + )) + }; + self.release_connection(lease).await; + final_result + } + + async fn acquire_connection(&self) -> Result { + let permit = timeout( + 
CONFIRM_ASSET_OBJECT_TIMEOUT, + self.pool.permits.clone().acquire_owned(), + ) + .await + .map_err(|_| SpacetimeClientError::Timeout)? + .map_err(|error| SpacetimeClientError::Runtime(error.to_string()))?; + + loop { + for (slot_index, slot) in self.pool.slots.iter().enumerate() { + if let Ok(mut slot_guard) = slot.try_lock() { + if slot_guard.in_use { + continue; + } + let reusable_connection = slot_guard + .connection + .take() + .filter(|connection| !connection.is_broken()); + slot_guard.in_use = true; + drop(slot_guard); + + let connection = if let Some(connection) = reusable_connection { + connection + } else { + match self.build_pooled_connection().await { + Ok(connection) => connection, + Err(error) => { + let mut slot_guard = self.pool.slots[slot_index].lock().await; + slot_guard.in_use = false; + return Err(error); + } + } + }; + + return Ok(PooledConnectionLease { + slot_index, + connection: Some(connection), + _permit: permit, + }); + } + } + + tokio::task::yield_now().await; + } + } + + async fn build_pooled_connection(&self) -> Result { + let config = self.config.clone(); + let broken = Arc::new(AtomicBool::new(false)); + let (sender, receiver) = oneshot::channel::>(); + let connect_sender = Arc::new(Mutex::new(Some(sender))); + let broken_flag = broken.clone(); + let disconnect_sender = connect_sender.clone(); let connection = tokio::task::spawn_blocking(move || { DbConnection::builder() .with_uri(config.server_url) .with_database_name(config.database) .with_token(config.token) - .on_connect(move |connection, _, _| { - call(connection, connect_sender); + .on_connect(move |_, _, _| { + send_connect_once(&connect_sender, Ok(())); }) .on_disconnect(move |_, error| { + broken_flag.store(true, Ordering::SeqCst); let message = error .map(|error| error.to_string()) - .unwrap_or_else(|| "SpacetimeDB 连接在 reducer 返回前断开".to_string()); - send_reducer_once( + .unwrap_or_else(|| "SpacetimeDB 连接已断开".to_string()); + send_connect_once( &disconnect_sender, 
Err(SpacetimeClientError::Procedure(message)), ); @@ -2288,13 +2383,70 @@ impl SpacetimeClient { .map_err(|error| SpacetimeClientError::Runtime(error.to_string()))??; let runner = connection.run_threaded(); - let result = timeout(CONFIRM_ASSET_OBJECT_TIMEOUT, receiver).await; - let _ = connection.disconnect(); - drop(runner); - - result + timeout(CONFIRM_ASSET_OBJECT_TIMEOUT, receiver) + .await .map_err(|_| SpacetimeClientError::Timeout)? - .map_err(|_| SpacetimeClientError::ConnectDropped)? + .map_err(|_| SpacetimeClientError::ConnectDropped)??; + + Ok(PooledConnection { + connection, + runner: Some(runner), + broken, + }) + } + + async fn release_connection(&self, mut lease: PooledConnectionLease) { + let mut slot_guard = self.pool.slots[lease.slot_index].lock().await; + slot_guard.in_use = false; + let Some(connection) = lease.connection.take() else { + slot_guard.connection = None; + return; + }; + if connection.is_broken() { + slot_guard.connection = None; + } else { + slot_guard.connection = Some(connection); + } + } + + // 超时后必须统一归还租约;若连接已先一步断开则回传断线,否则标记坏连接并回传超时。 + fn resolve_timeout_error(connection: Option<&PooledConnection>) -> SpacetimeClientError { + if let Some(connection) = connection { + if connection.is_broken() { + return SpacetimeClientError::ConnectDropped; + } + connection.mark_broken(); + } + + SpacetimeClientError::Timeout + } +} + +impl PooledConnection { + fn is_broken(&self) -> bool { + self.broken.load(Ordering::SeqCst) + } + + fn mark_broken(&self) { + self.broken.store(true, Ordering::SeqCst); + } +} + +impl Drop for PooledConnection { + fn drop(&mut self) { + let _ = self.connection.disconnect(); + if let Some(runner) = self.runner.take() { + drop(runner); + } + } +} + +impl fmt::Debug for SpacetimeClient { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SpacetimeClient") + .field("config", &self.config) + .field("pool_size", &self.pool.slots.len()) + .finish() } } @@ -2318,6 +2470,19 @@ fn 
send_reducer_once(sender: &ReducerResultSender, result: Result<(), SpacetimeC } } +fn send_connect_once( + sender: &Arc>>>>, + result: Result<(), SpacetimeClientError>, +) { + if let Some(sender) = sender + .lock() + .expect("spacetime connect sender should not poison") + .take() + { + let _ = sender.send(result); + } +} + fn map_entity_binding_input( input: module_assets::AssetEntityBindingInput, ) -> BindingAssetEntityBindingInput {