Files
Genarrative/server-rs/crates/spacetime-client/src/lib.rs

500 lines
20 KiB
Rust

// `module_bindings` is generated by the SpacetimeDB CLI; rustfmt must not reformat it.
#[rustfmt::skip]
pub mod module_bindings;
mod mapper;
pub(crate) use mapper::*;
pub use mapper::{
AiResultReferenceRecord, AiTaskMutationRecord, AiTaskRecord, AiTaskStageRecord,
AiTextChunkRecord, BattleStateRecord, BigFishAgentMessageRecord, BigFishAnchorItemRecord,
BigFishAnchorPackRecord, BigFishAssetCoverageRecord, BigFishAssetGenerateRecordInput,
BigFishAssetSlotRecord, BigFishBackgroundBlueprintRecord, BigFishGameDraftRecord,
BigFishLevelBlueprintRecord, BigFishMessageFinalizeRecordInput,
BigFishMessageSubmitRecordInput, BigFishRuntimeParamsRecord, BigFishSessionCreateRecordInput,
BigFishSessionRecord, BigFishWorkSummaryRecord, CustomWorldAgentActionExecuteRecord,
CustomWorldAgentActionExecuteRecordInput, CustomWorldAgentCheckpointRecord,
CustomWorldAgentMessageFinalizeRecordInput, CustomWorldAgentMessageRecord,
CustomWorldAgentMessageSubmitRecordInput, CustomWorldAgentOperationProgressRecordInput,
CustomWorldAgentOperationRecord, CustomWorldAgentSessionCreateRecordInput,
CustomWorldAgentSessionRecord, CustomWorldCheckpointRecord, CustomWorldDraftCardDetailRecord,
CustomWorldDraftCardDetailSectionRecord, CustomWorldDraftCardRecord,
CustomWorldGalleryEntryRecord, CustomWorldLibraryEntryRecord, CustomWorldLibraryMutationRecord,
CustomWorldProfileUpsertRecordInput, CustomWorldPublishGateRecord,
CustomWorldPublishWorldRecord, CustomWorldPublishWorldRecordInput,
CustomWorldPublishedProfileCompileRecord, CustomWorldResultPreviewBlockerRecord,
CustomWorldSupportedActionRecord, CustomWorldWorkSummaryRecord, NpcBattleInteractionRecord,
NpcInteractionRecord, NpcStateRecord, PuzzleAgentMessageFinalizeRecordInput,
PuzzleAgentMessageRecord, PuzzleAgentMessageSubmitRecordInput,
PuzzleAgentSessionCreateRecordInput, PuzzleAgentSessionRecord,
PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord, PuzzleAnchorPackRecord,
PuzzleBoardRecord, PuzzleCellPositionRecord, PuzzleCreatorIntentRecord,
PuzzleGeneratedImageCandidateRecord, PuzzleGeneratedImagesSaveRecordInput,
PuzzleLeaderboardEntryRecord, PuzzleLeaderboardSubmitRecordInput, PuzzleMergedGroupRecord,
PuzzlePieceStateRecord, PuzzlePublishRecordInput, PuzzleResultDraftRecord,
PuzzleResultPreviewBlockerRecord, PuzzleResultPreviewFindingRecord, PuzzleResultPreviewRecord,
PuzzleRunDragRecordInput, PuzzleRunNextLevelRecordInput, PuzzleRunRecord,
PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput, PuzzleRuntimeLevelRecord,
PuzzleSelectCoverImageRecordInput, PuzzleWorkProfileRecord, PuzzleWorkUpsertRecordInput,
ResolveCombatActionRecord, ResolveNpcBattleInteractionInput,
};
pub mod ai;
pub mod assets;
pub mod auth;
pub mod big_fish;
pub mod combat;
pub mod custom_world;
pub mod inventory;
pub mod npc;
pub mod puzzle;
pub mod runtime;
pub mod story;
use std::{
error::Error,
fmt,
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, Mutex},
thread::JoinHandle,
time::Duration,
};
use module_ai::{
AiResultReferenceInput as DomainAiResultReferenceInput,
AiResultReferenceKind as DomainAiResultReferenceKind,
AiStageCompletionInput as DomainAiStageCompletionInput,
AiTaskCancelInput as DomainAiTaskCancelInput, AiTaskCreateInput as DomainAiTaskCreateInput,
AiTaskFailureInput as DomainAiTaskFailureInput, AiTaskFinishInput as DomainAiTaskFinishInput,
AiTaskKind as DomainAiTaskKind, AiTaskStageBlueprint as DomainAiTaskStageBlueprint,
AiTaskStageKind as DomainAiTaskStageKind, AiTaskStageStartInput as DomainAiTaskStageStartInput,
AiTaskStartInput as DomainAiTaskStartInput,
AiTextChunkAppendInput as DomainAiTextChunkAppendInput,
};
use module_assets::{
AssetEntityBindingRecord, AssetHistoryEntryRecord, AssetObjectAccessPolicy, AssetObjectRecord,
build_asset_entity_binding_record, build_asset_history_entry_record, build_asset_object_record,
};
use module_combat::{
BattleMode as DomainBattleMode, BattleStateInput as DomainBattleStateInput,
BattleStateQueryInput as DomainBattleStateQueryInput,
BattleStateSnapshot as DomainBattleStateSnapshot, BattleStatus as DomainBattleStatus,
CombatOutcome as DomainCombatOutcome,
ResolveCombatActionInput as DomainResolveCombatActionInput,
ResolveCombatActionResult as DomainResolveCombatActionResult, build_battle_state_query_input,
validate_battle_state_input, validate_resolve_combat_action_input,
};
use module_custom_world::CustomWorldThemeMode as DomainCustomWorldThemeMode;
use module_inventory::{
RuntimeInventoryStateQueryInput as DomainRuntimeInventoryStateQueryInput,
RuntimeInventoryStateRecord,
RuntimeInventoryStateSnapshot as DomainRuntimeInventoryStateSnapshot,
build_runtime_inventory_state_query_input, build_runtime_inventory_state_record,
};
use module_npc::{
NpcInteractionBattleMode as DomainNpcInteractionBattleMode,
NpcInteractionResult as DomainNpcInteractionResult,
NpcInteractionStatus as DomainNpcInteractionStatus,
NpcRelationStance as DomainNpcRelationStance, NpcRelationState as DomainNpcRelationState,
NpcStanceProfile as DomainNpcStanceProfile, NpcStateSnapshot as DomainNpcStateSnapshot,
ResolveNpcInteractionInput as DomainResolveNpcInteractionInput,
};
use module_puzzle::{
PuzzleAgentMessageSnapshot as DomainPuzzleAgentMessageSnapshot,
PuzzleAgentSessionSnapshot as DomainPuzzleAgentSessionSnapshot,
PuzzleAgentSuggestedAction as DomainPuzzleAgentSuggestedAction,
PuzzleAnchorItem as DomainPuzzleAnchorItem, PuzzleAnchorPack as DomainPuzzleAnchorPack,
PuzzleBoardSnapshot as DomainPuzzleBoardSnapshot,
PuzzleCellPosition as DomainPuzzleCellPosition,
PuzzleCreatorIntent as DomainPuzzleCreatorIntent,
PuzzleGeneratedImageCandidate as DomainPuzzleGeneratedImageCandidate,
PuzzleMergedGroupState as DomainPuzzleMergedGroupState,
PuzzlePieceState as DomainPuzzlePieceState, PuzzleResultDraft as DomainPuzzleResultDraft,
PuzzleResultPreviewBlocker as DomainPuzzleResultPreviewBlocker,
PuzzleResultPreviewEnvelope as DomainPuzzleResultPreviewEnvelope,
PuzzleResultPreviewFinding as DomainPuzzleResultPreviewFinding,
PuzzleRunSnapshot as DomainPuzzleRunSnapshot,
PuzzleRuntimeLevelSnapshot as DomainPuzzleRuntimeLevelSnapshot,
PuzzleWorkProfile as DomainPuzzleWorkProfile,
};
use module_runtime::{
RuntimeBrowseHistoryRecord, RuntimePlatformTheme as DomainRuntimePlatformTheme,
RuntimeProfileDashboardRecord, RuntimeProfilePlayStatsRecord,
RuntimeProfileRechargeCenterRecord, RuntimeProfileRechargeOrderRecord,
RuntimeProfileRedeemCodeMode as DomainRuntimeProfileRedeemCodeMode,
RuntimeProfileRedeemCodeRecord, RuntimeProfileRewardCodeRedeemRecord,
RuntimeProfileSaveArchiveRecord, RuntimeProfileWalletLedgerEntryRecord,
RuntimeReferralInviteCenterRecord, RuntimeReferralRedeemRecord, RuntimeSettingsRecord,
RuntimeSnapshotRecord, build_runtime_browse_history_clear_input,
build_runtime_browse_history_list_input, build_runtime_browse_history_record,
build_runtime_browse_history_sync_input, build_runtime_profile_dashboard_get_input,
build_runtime_profile_dashboard_record, build_runtime_profile_play_stats_get_input,
build_runtime_profile_play_stats_record, build_runtime_profile_recharge_center_get_input,
build_runtime_profile_recharge_center_record,
build_runtime_profile_recharge_order_create_input,
build_runtime_profile_redeem_code_admin_disable_input,
build_runtime_profile_redeem_code_admin_upsert_input, build_runtime_profile_redeem_code_record,
build_runtime_profile_reward_code_redeem_input,
build_runtime_profile_reward_code_redeem_record, build_runtime_profile_save_archive_list_input,
build_runtime_profile_save_archive_record, build_runtime_profile_save_archive_resume_input,
build_runtime_profile_wallet_adjustment_input,
build_runtime_profile_wallet_ledger_entry_record,
build_runtime_profile_wallet_ledger_list_input, build_runtime_referral_invite_center_get_input,
build_runtime_referral_invite_center_record, build_runtime_referral_redeem_input,
build_runtime_referral_redeem_record, build_runtime_setting_get_input,
build_runtime_setting_record, build_runtime_setting_upsert_input,
build_runtime_snapshot_delete_input, build_runtime_snapshot_get_input,
build_runtime_snapshot_record, build_runtime_snapshot_upsert_input,
};
use module_runtime_item::{
RuntimeItemEquipmentSlot as DomainRuntimeItemEquipmentSlot,
RuntimeItemRewardItemRarity as DomainRuntimeItemRewardItemRarity,
RuntimeItemRewardItemSnapshot as DomainRuntimeItemRewardItemSnapshot,
normalize_reward_item_snapshot,
};
use module_story::{
StoryContinueInput as DomainStoryContinueInput, StoryEventKind as DomainStoryEventKind,
StoryEventRecord, StorySessionInput as DomainStorySessionInput, StorySessionRecord,
StorySessionResultRecord, StorySessionStateInput as DomainStorySessionStateInput,
StorySessionStateRecord, StorySessionStatus as DomainStorySessionStatus,
build_story_continue_input, build_story_session_input, build_story_session_state_input,
};
use shared_kernel::format_timestamp_micros;
use spacetimedb_sdk::DbContext;
use tokio::{
sync::{OwnedSemaphorePermit, Semaphore, oneshot},
time::timeout,
};
use crate::module_bindings::*;
/// Connection settings shared by every pooled SpacetimeDB connection.
///
/// `Debug` is implemented by hand (instead of derived) so that the
/// authentication token never ends up in logs or panic messages.
#[derive(Clone)]
pub struct SpacetimeClientConfig {
    /// URI handed to the connection builder's `with_uri`.
    pub server_url: String,
    /// Database name handed to the connection builder's `with_database_name`.
    pub database: String,
    /// Optional credential handed to the connection builder's `with_token`.
    pub token: Option<String>,
    /// Requested number of pool slots; the client raises values below 1 to 1.
    pub pool_size: u32,
}

impl fmt::Debug for SpacetimeClientConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only reveal whether a token is configured, never its value.
        let redacted_token = self.token.as_ref().map(|_| "<redacted>");
        f.debug_struct("SpacetimeClientConfig")
            .field("server_url", &self.server_url)
            .field("database", &self.database)
            .field("token", &redacted_token)
            .field("pool_size", &self.pool_size)
            .finish()
    }
}
/// Auth-store snapshot payload together with its last-update timestamp.
///
/// NOTE(review): the code that produces this record is outside this chunk
/// (presumably the `auth` module); both fields are optional, which presumably
/// means "no snapshot stored yet" — confirm against the producer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AuthStoreSnapshotRecord {
/// Snapshot serialized as JSON, if one exists.
pub snapshot_json: Option<String>,
/// Update time in microseconds (epoch presumably Unix — TODO confirm).
pub updated_at_micros: Option<i64>,
}
/// Counters describing an auth-store snapshot import.
///
/// NOTE(review): populated outside this chunk; the field names suggest one
/// counter per imported entity kind — confirm against the producer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AuthStoreSnapshotImportRecord {
/// Number of imported users.
pub imported_user_count: u32,
/// Number of imported identities.
pub imported_identity_count: u32,
/// Number of imported refresh sessions.
pub imported_refresh_session_count: u32,
}
/// Handle to a pool of SpacetimeDB connections.
///
/// Cloning copies the configuration and bumps the `Arc` on the pool, so all
/// clones lease connections from the same set of slots.
#[derive(Clone)]
pub struct SpacetimeClient {
// Settings used whenever a new pooled connection has to be built.
config: SpacetimeClientConfig,
// Connection slots plus the semaphore that bounds concurrent leases.
pool: Arc<SpacetimeConnectionPool>,
}
/// Errors surfaced by [`SpacetimeClient`] calls.
#[derive(Debug)]
pub enum SpacetimeClientError {
/// The connection builder's `build()` failed; carries the error text.
Build(String),
/// The result channel closed before a result arrived, or a timed-out call
/// found its connection already flagged as broken.
ConnectDropped,
/// Carries a server-side/connection message. In this file it is produced
/// when the disconnect callback fires while a connection is being set up;
/// presumably procedure callbacks elsewhere use it too — confirm.
Procedure(String),
/// Local runtime failure: blocking task join error, closed semaphore, or a
/// lease that unexpectedly holds no connection.
Runtime(String),
/// A wait exceeded `CONFIRM_ASSET_OBJECT_TIMEOUT`.
Timeout,
}
// Upper bound (10 s) applied separately to each wait in this file: acquiring a
// pool permit, the connect handshake, and the result of a procedure/reducer
// call. The name is narrower than its actual use.
const CONFIRM_ASSET_OBJECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Shared, take-once handle to the oneshot sender that delivers a call's
/// result. The `Option` is emptied by the first sender so later callbacks
/// become no-ops.
type ProcedureResultSender<T> =
    Arc<Mutex<Option<oneshot::Sender<Result<T, SpacetimeClientError>>>>>;

/// Same handle for reducer calls, which carry no payload on success.
type ReducerResultSender = ProcedureResultSender<()>;
// Fixed-size set of connection slots shared by all clones of a client.
struct SpacetimeConnectionPool {
// One entry per pooled connection; each slot is guarded by its own async mutex.
slots: Vec<tokio::sync::Mutex<PooledConnectionSlot>>,
// Created with one permit per slot, so at most `slots.len()` leases exist at once.
permits: Arc<Semaphore>,
}
// State of a single pool slot.
struct PooledConnectionSlot {
// Idle connection parked in this slot. `None` until a lease first builds one,
// while the connection is leased out, and after a broken connection is discarded.
connection: Option<PooledConnection>,
// True from the moment a lease claims the slot until the lease is released.
in_use: bool,
}
// A live SpacetimeDB connection plus the bookkeeping needed to pool it.
struct PooledConnection {
// The SDK connection handle that calls are issued on.
connection: DbConnection,
// Handle of the thread returned by `run_threaded`; taken and dropped in `Drop`.
runner: Option<JoinHandle<()>>,
// Set by the disconnect callback or after a call timeout; broken connections
// are never reused.
broken: Arc<AtomicBool>,
}
// Exclusive, temporary ownership of one pool slot and its connection.
struct PooledConnectionLease {
// Index of the claimed slot in `SpacetimeConnectionPool::slots`.
slot_index: usize,
// The leased connection; taken out again when the lease is released.
connection: Option<PooledConnection>,
// Semaphore permit held for the lifetime of the lease; dropping it frees capacity.
_permit: OwnedSemaphorePermit,
}
impl SpacetimeClient {
    /// Creates a client whose pool has `config.pool_size` slots (at least one).
    ///
    /// No connection is opened here: every slot starts empty and a connection
    /// is built the first time the slot is leased.
    pub fn new(config: SpacetimeClientConfig) -> Self {
        let pool_size = config.pool_size.max(1) as usize;
        let slots = (0..pool_size)
            .map(|_| {
                tokio::sync::Mutex::new(PooledConnectionSlot {
                    connection: None,
                    in_use: false,
                })
            })
            .collect::<Vec<_>>();
        let pool = Arc::new(SpacetimeConnectionPool {
            slots,
            permits: Arc::new(Semaphore::new(pool_size)),
        });
        Self { config, pool }
    }

    /// Leases a connection, runs `call` on it and waits for the result that
    /// `call` (or a callback it registers) delivers through the provided sender.
    ///
    /// # Errors
    /// * any error from acquiring the lease,
    /// * the error delivered through the sender,
    /// * [`SpacetimeClientError::ConnectDropped`] if the sender is dropped
    ///   without sending, or if the wait times out on a connection already
    ///   flagged broken,
    /// * [`SpacetimeClientError::Timeout`] if no result arrives within
    ///   `CONFIRM_ASSET_OBJECT_TIMEOUT` (the connection is then marked broken).
    ///
    /// NOTE(review): if this future is dropped between acquiring and releasing
    /// the lease, `release_connection` never runs and the slot's `in_use` flag
    /// stays set (unless `PooledConnectionLease` has a `Drop` impl outside this
    /// chunk). A leaked slot would make later acquirers spin; consider moving
    /// the slot hand-back into a `Drop` guard.
    async fn call_after_connect<T>(
        &self,
        call: impl FnOnce(&DbConnection, ProcedureResultSender<T>) + Send + 'static,
    ) -> Result<T, SpacetimeClientError>
    where
        T: Send + 'static,
    {
        let (sender, receiver) = oneshot::channel();
        let result_sender = Arc::new(Mutex::new(Some(sender)));
        let lease = self.acquire_connection().await?;
        let final_result = match lease.connection.as_ref() {
            Some(connection) => {
                // `call` gets a clone; the local handle stays alive until this
                // function returns.
                call(&connection.connection, result_sender.clone());
                match timeout(CONFIRM_ASSET_OBJECT_TIMEOUT, receiver).await {
                    Ok(Ok(outcome)) => outcome,
                    Ok(Err(_)) => Err(SpacetimeClientError::ConnectDropped),
                    Err(_) => Err(Self::resolve_timeout_error(Some(connection))),
                }
            }
            None => Err(SpacetimeClientError::Runtime(
                "SpacetimeDB 连接租约缺少连接".to_string(),
            )),
        };
        // Always hand the slot back, whatever the outcome was.
        self.release_connection(lease).await;
        final_result
    }

    /// Reducer flavour of [`Self::call_after_connect`].
    ///
    /// `ReducerResultSender` is the same type as `ProcedureResultSender<()>`,
    /// so this simply delegates instead of duplicating the lease/wait/release
    /// sequence.
    async fn call_reducer_after_connect(
        &self,
        call: impl FnOnce(&DbConnection, ReducerResultSender) + Send + 'static,
    ) -> Result<(), SpacetimeClientError> {
        self.call_after_connect(call).await
    }

    /// Claims a free slot and returns a lease holding a usable connection.
    ///
    /// Waits up to `CONFIRM_ASSET_OBJECT_TIMEOUT` for a semaphore permit, then
    /// scans the slots with `try_lock` until one that is not in use is found.
    /// A parked, non-broken connection is reused; otherwise a new one is built.
    ///
    /// # Errors
    /// * [`SpacetimeClientError::Timeout`] if no permit becomes available in time,
    /// * [`SpacetimeClientError::Runtime`] if the semaphore is closed,
    /// * any error from `build_pooled_connection` (the slot is freed first).
    async fn acquire_connection(&self) -> Result<PooledConnectionLease, SpacetimeClientError> {
        let permit = timeout(
            CONFIRM_ASSET_OBJECT_TIMEOUT,
            self.pool.permits.clone().acquire_owned(),
        )
        .await
        .map_err(|_| SpacetimeClientError::Timeout)?
        .map_err(|error| SpacetimeClientError::Runtime(error.to_string()))?;
        loop {
            for (slot_index, slot) in self.pool.slots.iter().enumerate() {
                // Skip slots that another task is inspecting right now.
                let Ok(mut slot_guard) = slot.try_lock() else {
                    continue;
                };
                if slot_guard.in_use {
                    continue;
                }
                // A parked connection that broke while idle is dropped here.
                let reusable_connection = slot_guard
                    .connection
                    .take()
                    .filter(|connection| !connection.is_broken());
                slot_guard.in_use = true;
                // Release the slot lock before the potentially slow connect.
                drop(slot_guard);
                let connection = match reusable_connection {
                    Some(connection) => connection,
                    None => match self.build_pooled_connection().await {
                        Ok(connection) => connection,
                        Err(error) => {
                            // Give the slot back so it is not lost on failure.
                            self.pool.slots[slot_index].lock().await.in_use = false;
                            return Err(error);
                        }
                    },
                };
                return Ok(PooledConnectionLease {
                    slot_index,
                    connection: Some(connection),
                    _permit: permit,
                });
            }
            // Every slot was locked or in use on this pass; let other tasks
            // run before scanning again.
            tokio::task::yield_now().await;
        }
    }

    /// Builds a connection, starts its runner thread and waits for the
    /// connect handshake.
    ///
    /// The `PooledConnection` wrapper is created *before* waiting for the
    /// handshake so that every early exit (timeout, disconnect during connect,
    /// or this future being dropped) runs its `Drop`, which disconnects the
    /// half-open connection instead of leaking it together with its detached
    /// runner thread.
    ///
    /// # Errors
    /// * [`SpacetimeClientError::Runtime`] if the blocking build task fails to join,
    /// * [`SpacetimeClientError::Build`] if the builder's `build()` fails,
    /// * [`SpacetimeClientError::Timeout`] if the handshake does not finish
    ///   within `CONFIRM_ASSET_OBJECT_TIMEOUT`,
    /// * [`SpacetimeClientError::ConnectDropped`] if the handshake channel closes,
    /// * [`SpacetimeClientError::Procedure`] if the disconnect callback fires first.
    async fn build_pooled_connection(&self) -> Result<PooledConnection, SpacetimeClientError> {
        let config = self.config.clone();
        let broken = Arc::new(AtomicBool::new(false));
        let (sender, receiver) = oneshot::channel::<Result<(), SpacetimeClientError>>();
        // Both callbacks share one take-once sender: whichever fires first
        // decides the handshake outcome.
        let connect_sender = Arc::new(Mutex::new(Some(sender)));
        let broken_flag = broken.clone();
        let disconnect_sender = connect_sender.clone();
        // `build()` runs on the blocking pool so it cannot stall the async runtime.
        let connection = tokio::task::spawn_blocking(move || {
            DbConnection::builder()
                .with_uri(config.server_url)
                .with_database_name(config.database)
                .with_token(config.token)
                .on_connect(move |_, _, _| {
                    send_connect_once(&connect_sender, Ok(()));
                })
                .on_disconnect(move |_, error| {
                    broken_flag.store(true, Ordering::SeqCst);
                    let message = error
                        .map(|error| error.to_string())
                        .unwrap_or_else(|| "SpacetimeDB 连接已断开".to_string());
                    send_connect_once(
                        &disconnect_sender,
                        Err(SpacetimeClientError::Procedure(message)),
                    );
                })
                .build()
                .map_err(|error| SpacetimeClientError::Build(error.to_string()))
        })
        .await
        .map_err(|error| SpacetimeClientError::Runtime(error.to_string()))??;
        let runner = connection.run_threaded();
        let pooled = PooledConnection {
            connection,
            runner: Some(runner),
            broken,
        };
        // Any `?` below drops `pooled`, which disconnects the connection.
        timeout(CONFIRM_ASSET_OBJECT_TIMEOUT, receiver)
            .await
            .map_err(|_| SpacetimeClientError::Timeout)?
            .map_err(|_| SpacetimeClientError::ConnectDropped)??;
        Ok(pooled)
    }

    /// Returns a lease to the pool.
    ///
    /// The slot is marked free; the connection is parked in it for reuse unless
    /// it is missing or flagged broken, in which case the slot is left empty.
    async fn release_connection(&self, mut lease: PooledConnectionLease) {
        let mut slot_guard = self.pool.slots[lease.slot_index].lock().await;
        slot_guard.in_use = false;
        // A broken connection is dropped (and thereby disconnected) right here.
        slot_guard.connection = lease
            .connection
            .take()
            .filter(|connection| !connection.is_broken());
    }

    /// Chooses the error to report after a call timed out.
    ///
    /// The lease must still be returned through the normal path afterwards.
    /// If the connection had already dropped, the disconnect is reported;
    /// otherwise the connection is marked broken (so it will not be reused)
    /// and a timeout is reported.
    fn resolve_timeout_error(connection: Option<&PooledConnection>) -> SpacetimeClientError {
        if let Some(connection) = connection {
            if connection.is_broken() {
                return SpacetimeClientError::ConnectDropped;
            }
            connection.mark_broken();
        }
        SpacetimeClientError::Timeout
    }
}
impl PooledConnection {
// Reports whether the connection has been flagged as unusable.
fn is_broken(&self) -> bool {
self.broken.load(Ordering::SeqCst)
}
// Flags the connection as unusable so it is discarded instead of reused.
fn mark_broken(&self) {
self.broken.store(true, Ordering::SeqCst);
}
}
impl Drop for PooledConnection {
    /// Disconnects on a best-effort basis and lets go of the runner handle.
    fn drop(&mut self) {
        // The outcome of `disconnect` is deliberately ignored.
        let _ = self.connection.disconnect();
        // Dropping the `JoinHandle` detaches the runner thread; it is not joined.
        drop(self.runner.take());
    }
}
impl fmt::Debug for SpacetimeClient {
    /// Prints the configuration and the number of pool slots; the pool's
    /// internal state is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let slot_count = self.pool.slots.len();
        let mut view = f.debug_struct("SpacetimeClient");
        view.field("config", &self.config);
        view.field("pool_size", &slot_count);
        view.finish()
    }
}
/// Delivers `result` through the shared sender unless a result was already
/// delivered.
///
/// The sender is taken out of the shared slot, so only the first call sends;
/// a closed receiver is ignored.
///
/// # Panics
/// Panics if the mutex guarding the sender is poisoned.
fn send_once<T>(sender: &ProcedureResultSender<T>, result: Result<T, SpacetimeClientError>) {
    let pending = sender
        .lock()
        .expect("spacetime result sender should not poison")
        .take();
    if let Some(channel) = pending {
        let _ = channel.send(result);
    }
}
/// Delivers a reducer outcome through the shared sender unless one was
/// already delivered.
///
/// Only the first call sends; a closed receiver is ignored.
///
/// # Panics
/// Panics if the mutex guarding the sender is poisoned.
fn send_reducer_once(sender: &ReducerResultSender, result: Result<(), SpacetimeClientError>) {
    let pending = sender
        .lock()
        .expect("spacetime reducer result sender should not poison")
        .take();
    if let Some(channel) = pending {
        let _ = channel.send(result);
    }
}
/// Reports the connect handshake outcome through the shared sender unless an
/// outcome was already reported.
///
/// Used by both the connect and the disconnect callback, so whichever fires
/// first wins; a closed receiver is ignored.
///
/// # Panics
/// Panics if the mutex guarding the sender is poisoned.
fn send_connect_once(
    sender: &Arc<Mutex<Option<oneshot::Sender<Result<(), SpacetimeClientError>>>>>,
    result: Result<(), SpacetimeClientError>,
) {
    let pending = sender
        .lock()
        .expect("spacetime connect sender should not poison")
        .take();
    if let Some(channel) = pending {
        let _ = channel.send(result);
    }
}
impl fmt::Display for SpacetimeClientError {
    /// Writes the carried message for the string-bearing variants and a fixed
    /// text for `ConnectDropped` and `Timeout`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text: &str = match self {
            Self::Build(message) | Self::Procedure(message) | Self::Runtime(message) => {
                message.as_str()
            }
            Self::ConnectDropped => "SpacetimeDB 连接在返回结果前已断开",
            Self::Timeout => "SpacetimeDB procedure 调用超时",
        };
        f.write_str(text)
    }
}
// Marker impl so the error works with `?` and `Box<dyn Error>`; no source error is exposed.
impl Error for SpacetimeClientError {}