feat: pool spacetime client connections

This commit is contained in:
2026-04-23 14:33:15 +08:00
parent da7c1ff0c5
commit 9d25a47b23
5 changed files with 415 additions and 52 deletions

View File

@@ -3,7 +3,9 @@ pub mod module_bindings;
use std::{
error::Error,
fmt,
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, Mutex},
thread::JoinHandle,
time::Duration,
};
@@ -95,7 +97,10 @@ use module_story::{
};
use shared_kernel::format_timestamp_micros;
use spacetimedb_sdk::DbContext;
use tokio::{sync::oneshot, time::timeout};
use tokio::{
sync::{OwnedSemaphorePermit, Semaphore, oneshot},
time::timeout,
};
use crate::module_bindings::{
AiResultReferenceInput as BindingAiResultReferenceInput,
@@ -350,11 +355,13 @@ pub struct SpacetimeClientConfig {
pub server_url: String,
pub database: String,
pub token: Option<String>,
pub pool_size: u32,
}
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct SpacetimeClient {
config: SpacetimeClientConfig,
pool: Arc<SpacetimeConnectionPool>,
}
#[derive(Debug)]
@@ -372,9 +379,45 @@ type ProcedureResultSender<T> =
Arc<Mutex<Option<oneshot::Sender<Result<T, SpacetimeClientError>>>>>;
type ReducerResultSender = Arc<Mutex<Option<oneshot::Sender<Result<(), SpacetimeClientError>>>>>;
struct SpacetimeConnectionPool {
slots: Vec<tokio::sync::Mutex<PooledConnectionSlot>>,
permits: Arc<Semaphore>,
}
struct PooledConnectionSlot {
connection: Option<PooledConnection>,
in_use: bool,
}
struct PooledConnection {
connection: DbConnection,
runner: Option<JoinHandle<()>>,
broken: Arc<AtomicBool>,
}
struct PooledConnectionLease {
slot_index: usize,
connection: Option<PooledConnection>,
_permit: OwnedSemaphorePermit,
}
impl SpacetimeClient {
pub fn new(config: SpacetimeClientConfig) -> Self {
Self { config }
let pool_size = config.pool_size.max(1) as usize;
let slots = (0..pool_size)
.map(|_| {
tokio::sync::Mutex::new(PooledConnectionSlot {
connection: None,
in_use: false,
})
})
.collect::<Vec<_>>();
let pool = Arc::new(SpacetimeConnectionPool {
slots,
permits: Arc::new(Semaphore::new(pool_size)),
});
Self { config, pool }
}
pub async fn create_ai_task(
@@ -2213,70 +2256,122 @@ impl SpacetimeClient {
where
T: Send + 'static,
{
let config = self.config.clone();
let (sender, receiver) = oneshot::channel();
let result_sender = Arc::new(Mutex::new(Some(sender)));
let connect_sender = result_sender.clone();
let disconnect_sender = result_sender.clone();
let lease = self.acquire_connection().await?;
let final_result = if let Some(connection) = lease.connection.as_ref() {
call(&connection.connection, result_sender.clone());
match timeout(CONFIRM_ASSET_OBJECT_TIMEOUT, receiver).await {
Ok(inner) => match inner {
Ok(value) => value,
Err(_) => Err(SpacetimeClientError::ConnectDropped),
},
Err(_) => Err(Self::resolve_timeout_error(Some(connection))),
}
} else {
Err(SpacetimeClientError::Runtime(
"SpacetimeDB 连接租约缺少连接".to_string(),
))
};
self.release_connection(lease).await;
let connection = tokio::task::spawn_blocking(move || {
DbConnection::builder()
.with_uri(config.server_url)
.with_database_name(config.database)
.with_token(config.token)
.on_connect(move |connection, _, _| {
// SDK 收到 IdentityToken 后才调用 procedure避免 WebSocket 已建好但身份握手未完成时丢请求。
call(connection, connect_sender);
})
.on_disconnect(move |_, error| {
let message = error
.map(|error| error.to_string())
.unwrap_or_else(|| "SpacetimeDB 连接在 procedure 返回前断开".to_string());
send_once(
&disconnect_sender,
Err(SpacetimeClientError::Procedure(message)),
);
})
.build()
.map_err(|error| SpacetimeClientError::Build(error.to_string()))
})
.await
.map_err(|error| SpacetimeClientError::Runtime(error.to_string()))??;
let runner = connection.run_threaded();
let result = timeout(CONFIRM_ASSET_OBJECT_TIMEOUT, receiver).await;
let _ = connection.disconnect();
// SDK 线程会在断开消息被处理后自行退出HTTP 请求不能同步等待该线程,否则 Windows 本地联调可能卡在收尾阶段。
drop(runner);
result
.map_err(|_| SpacetimeClientError::Timeout)?
.map_err(|_| SpacetimeClientError::ConnectDropped)?
final_result
}
async fn call_reducer_after_connect(
&self,
call: impl FnOnce(&DbConnection, ReducerResultSender) + Send + 'static,
) -> Result<(), SpacetimeClientError> {
let config = self.config.clone();
let (sender, receiver) = oneshot::channel();
let result_sender = Arc::new(Mutex::new(Some(sender)));
let connect_sender = result_sender.clone();
let disconnect_sender = result_sender.clone();
let lease = self.acquire_connection().await?;
let final_result = if let Some(connection) = lease.connection.as_ref() {
call(&connection.connection, result_sender.clone());
match timeout(CONFIRM_ASSET_OBJECT_TIMEOUT, receiver).await {
Ok(inner) => match inner {
Ok(value) => value,
Err(_) => Err(SpacetimeClientError::ConnectDropped),
},
Err(_) => Err(Self::resolve_timeout_error(Some(connection))),
}
} else {
Err(SpacetimeClientError::Runtime(
"SpacetimeDB 连接租约缺少连接".to_string(),
))
};
self.release_connection(lease).await;
final_result
}
async fn acquire_connection(&self) -> Result<PooledConnectionLease, SpacetimeClientError> {
let permit = timeout(
CONFIRM_ASSET_OBJECT_TIMEOUT,
self.pool.permits.clone().acquire_owned(),
)
.await
.map_err(|_| SpacetimeClientError::Timeout)?
.map_err(|error| SpacetimeClientError::Runtime(error.to_string()))?;
loop {
for (slot_index, slot) in self.pool.slots.iter().enumerate() {
if let Ok(mut slot_guard) = slot.try_lock() {
if slot_guard.in_use {
continue;
}
let reusable_connection = slot_guard
.connection
.take()
.filter(|connection| !connection.is_broken());
slot_guard.in_use = true;
drop(slot_guard);
let connection = if let Some(connection) = reusable_connection {
connection
} else {
match self.build_pooled_connection().await {
Ok(connection) => connection,
Err(error) => {
let mut slot_guard = self.pool.slots[slot_index].lock().await;
slot_guard.in_use = false;
return Err(error);
}
}
};
return Ok(PooledConnectionLease {
slot_index,
connection: Some(connection),
_permit: permit,
});
}
}
tokio::task::yield_now().await;
}
}
async fn build_pooled_connection(&self) -> Result<PooledConnection, SpacetimeClientError> {
let config = self.config.clone();
let broken = Arc::new(AtomicBool::new(false));
let (sender, receiver) = oneshot::channel::<Result<(), SpacetimeClientError>>();
let connect_sender = Arc::new(Mutex::new(Some(sender)));
let broken_flag = broken.clone();
let disconnect_sender = connect_sender.clone();
let connection = tokio::task::spawn_blocking(move || {
DbConnection::builder()
.with_uri(config.server_url)
.with_database_name(config.database)
.with_token(config.token)
.on_connect(move |connection, _, _| {
call(connection, connect_sender);
.on_connect(move |_, _, _| {
send_connect_once(&connect_sender, Ok(()));
})
.on_disconnect(move |_, error| {
broken_flag.store(true, Ordering::SeqCst);
let message = error
.map(|error| error.to_string())
.unwrap_or_else(|| "SpacetimeDB 连接在 reducer 返回前断开".to_string());
send_reducer_once(
.unwrap_or_else(|| "SpacetimeDB 连接断开".to_string());
send_connect_once(
&disconnect_sender,
Err(SpacetimeClientError::Procedure(message)),
);
@@ -2288,13 +2383,70 @@ impl SpacetimeClient {
.map_err(|error| SpacetimeClientError::Runtime(error.to_string()))??;
let runner = connection.run_threaded();
let result = timeout(CONFIRM_ASSET_OBJECT_TIMEOUT, receiver).await;
let _ = connection.disconnect();
drop(runner);
result
timeout(CONFIRM_ASSET_OBJECT_TIMEOUT, receiver)
.await
.map_err(|_| SpacetimeClientError::Timeout)?
.map_err(|_| SpacetimeClientError::ConnectDropped)?
.map_err(|_| SpacetimeClientError::ConnectDropped)??;
Ok(PooledConnection {
connection,
runner: Some(runner),
broken,
})
}
async fn release_connection(&self, mut lease: PooledConnectionLease) {
let mut slot_guard = self.pool.slots[lease.slot_index].lock().await;
slot_guard.in_use = false;
let Some(connection) = lease.connection.take() else {
slot_guard.connection = None;
return;
};
if connection.is_broken() {
slot_guard.connection = None;
} else {
slot_guard.connection = Some(connection);
}
}
// 超时后必须统一归还租约;若连接已先一步断开则回传断线,否则标记坏连接并回传超时。
fn resolve_timeout_error(connection: Option<&PooledConnection>) -> SpacetimeClientError {
if let Some(connection) = connection {
if connection.is_broken() {
return SpacetimeClientError::ConnectDropped;
}
connection.mark_broken();
}
SpacetimeClientError::Timeout
}
}
impl PooledConnection {
fn is_broken(&self) -> bool {
self.broken.load(Ordering::SeqCst)
}
fn mark_broken(&self) {
self.broken.store(true, Ordering::SeqCst);
}
}
impl Drop for PooledConnection {
fn drop(&mut self) {
let _ = self.connection.disconnect();
if let Some(runner) = self.runner.take() {
drop(runner);
}
}
}
impl fmt::Debug for SpacetimeClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SpacetimeClient")
.field("config", &self.config)
.field("pool_size", &self.pool.slots.len())
.finish()
}
}
@@ -2318,6 +2470,19 @@ fn send_reducer_once(sender: &ReducerResultSender, result: Result<(), SpacetimeC
}
}
fn send_connect_once(
sender: &Arc<Mutex<Option<oneshot::Sender<Result<(), SpacetimeClientError>>>>>,
result: Result<(), SpacetimeClientError>,
) {
if let Some(sender) = sender
.lock()
.expect("spacetime connect sender should not poison")
.take()
{
let _ = sender.send(result);
}
}
fn map_entity_binding_input(
input: module_assets::AssetEntityBindingInput,
) -> BindingAssetEntityBindingInput {