// `module_bindings` 是 SpacetimeDB CLI 生成产物,禁止再被 rustfmt 二次改写。 #[rustfmt::skip] pub mod module_bindings; mod mapper; mod telemetry; use mapper::*; pub use mapper::{ AiResultReferenceRecord, AiTaskMutationRecord, AiTaskRecord, AiTaskStageRecord, AiTextChunkRecord, BarkBattleDraftConfigRecord, BarkBattleRunRecord, BarkBattleRuntimeConfigRecord, BattleStateRecord, BigFishAgentMessageRecord, BigFishAnchorItemRecord, BigFishAnchorPackRecord, BigFishAssetCoverageRecord, BigFishAssetGenerateRecordInput, BigFishAssetSlotRecord, BigFishBackgroundBlueprintRecord, BigFishDraftCompileRecordInput, BigFishGameDraftRecord, BigFishInputSubmitRecordInput, BigFishLevelBlueprintRecord, BigFishLikeReportRecordInput, BigFishMessageFinalizeRecordInput, BigFishMessageSubmitRecordInput, BigFishPlayReportRecordInput, BigFishRunStartRecordInput, BigFishRuntimeEntityRecord, BigFishRuntimeParamsRecord, BigFishRuntimeRunRecord, BigFishSessionCreateRecordInput, BigFishSessionRecord, BigFishVector2Record, BigFishWorkRemixRecordInput, BigFishWorkSummaryRecord, CreationEntryConfigRecord, CustomWorldAgentActionExecuteRecord, CustomWorldAgentActionExecuteRecordInput, CustomWorldAgentCheckpointRecord, CustomWorldAgentMessageFinalizeRecordInput, CustomWorldAgentMessageRecord, CustomWorldAgentMessageSubmitRecordInput, CustomWorldAgentOperationProgressRecordInput, CustomWorldAgentOperationRecord, CustomWorldAgentSessionCreateRecordInput, CustomWorldAgentSessionRecord, CustomWorldCheckpointRecord, CustomWorldDraftCardDetailRecord, CustomWorldDraftCardDetailSectionRecord, CustomWorldDraftCardRecord, CustomWorldGalleryEntryRecord, CustomWorldLibraryEntryRecord, CustomWorldLibraryMutationRecord, CustomWorldProfileLikeReportRecordInput, CustomWorldProfilePlayReportRecordInput, CustomWorldProfileRemixRecordInput, CustomWorldProfileUpsertRecordInput, CustomWorldPublishGateRecord, CustomWorldPublishWorldRecord, CustomWorldPublishWorldRecordInput, CustomWorldPublishedProfileCompileRecord, CustomWorldResultPreviewBlockerRecord, CustomWorldSupportedActionRecord, CustomWorldWorkSummaryRecord, Match3DAgentMessageFinalizeRecordInput, Match3DAgentMessageRecord, Match3DAgentMessageSubmitRecordInput, Match3DAgentSessionCreateRecordInput, Match3DAgentSessionRecord, Match3DAnchorItemRecord, Match3DAnchorPackRecord, Match3DClickConfirmationRecord, Match3DCompileDraftRecordInput, Match3DCreatorConfigRecord, Match3DItemSnapshotRecord, Match3DResultDraftRecord, Match3DRunClickRecordInput, Match3DRunRecord, Match3DRunRestartRecordInput, Match3DRunStartRecordInput, Match3DRunStopRecordInput, Match3DRunTimeUpRecordInput, Match3DTraySlotRecord, Match3DWorkProfileRecord, Match3DWorkUpdateRecordInput, JumpHopActionRequest, JumpHopActionResponse, JumpHopActionType, JumpHopCharacterAsset, JumpHopDifficulty, JumpHopDraftResponse, JumpHopGalleryCardResponse, JumpHopGalleryDetailResponse, JumpHopGalleryResponse, JumpHopGenerationStatus, JumpHopJumpRequest, JumpHopJumpResponse, JumpHopJumpResult, JumpHopLastJump, JumpHopPath, JumpHopPlatform, JumpHopRestartRunRequest, JumpHopRunResponse, JumpHopRunStatus, JumpHopRuntimeRunSnapshotResponse, JumpHopScoring, JumpHopSessionResponse, JumpHopSessionSnapshotResponse, JumpHopStartRunRequest, JumpHopStylePreset, JumpHopTileAsset, JumpHopTileType, JumpHopWorkDetailResponse, JumpHopWorkMutationResponse, JumpHopWorkProfileResponse, JumpHopWorkSummaryResponse, JumpHopWorksResponse, JumpHopWorkspaceCreateRequest, NpcBattleInteractionRecord, NpcInteractionRecord, NpcStateRecord, PuzzleAgentMessageFinalizeRecordInput, PuzzleAgentMessageRecord, PuzzleAgentMessageSubmitRecordInput, PuzzleAgentSessionCreateRecordInput, PuzzleAgentSessionRecord, PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord, PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBoardRecord, PuzzleCellPositionRecord, PuzzleCreatorIntentRecord, PuzzleDraftLevelRecord, PuzzleFormDraftRecord, PuzzleFormDraftSaveRecordInput, PuzzleGalleryCardRecord, PuzzleGeneratedImageCandidateRecord, PuzzleGeneratedImagesSaveRecordInput, PuzzleLeaderboardEntryRecord, PuzzleLeaderboardSubmitRecordInput, PuzzleMergedGroupRecord, PuzzlePieceStateRecord, PuzzlePublishRecordInput, PuzzleRecommendedNextWorkRecord, PuzzleResultDraftRecord, PuzzleResultPreviewBlockerRecord, PuzzleResultPreviewFindingRecord, PuzzleResultPreviewRecord, PuzzleRunDragRecordInput, PuzzleRunNextLevelRecordInput, PuzzleRunPauseRecordInput, PuzzleRunPropRecordInput, PuzzleRunRecord, PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput, PuzzleRuntimeLevelRecord, PuzzleSelectCoverImageRecordInput, PuzzleUiBackgroundSaveRecordInput, PuzzleWorkLikeReportRecordInput, PuzzleWorkPointIncentiveClaimRecordInput, PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput, PuzzleWorkUpsertRecordInput, ResolveCombatActionRecord, ResolveNpcBattleInteractionInput, SquareHoleAgentMessageFinalizeRecordInput, SquareHoleAgentMessageRecord, SquareHoleAgentMessageSubmitRecordInput, SquareHoleAgentSessionCreateRecordInput, SquareHoleAgentSessionRecord, SquareHoleAnchorItemRecord, SquareHoleAnchorPackRecord, SquareHoleCompileDraftRecordInput, SquareHoleCreatorConfigRecord, SquareHoleDropConfirmationRecord, SquareHoleDropFeedbackRecord, SquareHoleHoleOptionRecord, SquareHoleHoleSnapshotRecord, SquareHoleResultDraftRecord, SquareHoleRunDropRecordInput, SquareHoleRunRecord, SquareHoleRunRestartRecordInput, SquareHoleRunStartRecordInput, SquareHoleRunStopRecordInput, SquareHoleRunTimeUpRecordInput, SquareHoleShapeOptionRecord, SquareHoleShapeSnapshotRecord, SquareHoleWorkProfileRecord, SquareHoleWorkUpdateRecordInput, VisualNovelAgentMessageFinalizeRecordInput, VisualNovelAgentMessageRecord, VisualNovelAgentMessageSubmitRecordInput, VisualNovelAgentSessionCreateRecordInput, VisualNovelAgentSessionRecord, VisualNovelHistoryEntryRecord, VisualNovelHistoryEntryRecordInput, VisualNovelRunRecord, VisualNovelRunSnapshotRecordInput, VisualNovelRunStartRecordInput, VisualNovelRuntimeEventRecord, VisualNovelRuntimeEventRecordInput, VisualNovelWorkCompileRecordInput, VisualNovelWorkProfileRecord, VisualNovelWorkUpdateRecordInput, }; pub mod ai; pub mod assets; pub mod auth; pub mod bark_battle; pub use bark_battle::{ BarkBattleDraftConfigUpsertRecordInput, BarkBattleDraftCreateRecordInput, BarkBattleRunFinishRecordInput, BarkBattleRunStartRecordInput, BarkBattleWorkPublishRecordInput, }; pub mod big_fish; pub mod combat; pub mod custom_world; pub mod inventory; pub mod jump_hop; pub mod match3d; pub mod npc; pub mod puzzle; pub mod runtime; pub mod square_hole; pub mod story; pub mod story_runtime; pub mod visual_novel; use std::{ collections::HashMap, error::Error, fmt, sync::atomic::{AtomicBool, Ordering}, sync::{Arc, Mutex}, thread::JoinHandle, time::Duration, }; use module_ai::{ AiResultReferenceInput as DomainAiResultReferenceInput, AiResultReferenceKind as DomainAiResultReferenceKind, AiStageCompletionInput as DomainAiStageCompletionInput, AiTaskCancelInput as DomainAiTaskCancelInput, AiTaskCreateInput as DomainAiTaskCreateInput, AiTaskFailureInput as DomainAiTaskFailureInput, AiTaskFinishInput as DomainAiTaskFinishInput, AiTaskKind as DomainAiTaskKind, AiTaskStageBlueprint as DomainAiTaskStageBlueprint, AiTaskStageKind as DomainAiTaskStageKind, AiTaskStageStartInput as DomainAiTaskStageStartInput, AiTaskStartInput as DomainAiTaskStartInput, AiTextChunkAppendInput as DomainAiTextChunkAppendInput, }; use module_assets::{ AssetEntityBindingRecord, AssetHistoryEntryRecord, AssetObjectAccessPolicy, AssetObjectRecord, build_asset_entity_binding_record, build_asset_history_entry_record, build_asset_object_record, }; use module_combat::{ BattleMode as DomainBattleMode, BattleStateInput as DomainBattleStateInput, BattleStateQueryInput as DomainBattleStateQueryInput, BattleStateSnapshot as DomainBattleStateSnapshot, BattleStatus as DomainBattleStatus, CombatOutcome as DomainCombatOutcome, ResolveCombatActionInput as DomainResolveCombatActionInput, ResolveCombatActionResult as DomainResolveCombatActionResult, build_battle_state_query_input, validate_battle_state_input, validate_resolve_combat_action_input, }; use module_custom_world::CustomWorldThemeMode as DomainCustomWorldThemeMode; use module_inventory::{ RuntimeInventoryStateQueryInput as DomainRuntimeInventoryStateQueryInput, RuntimeInventoryStateRecord, RuntimeInventoryStateSnapshot as DomainRuntimeInventoryStateSnapshot, build_runtime_inventory_state_query_input, build_runtime_inventory_state_record, }; use module_npc::{ NpcInteractionBattleMode as DomainNpcInteractionBattleMode, NpcInteractionResult as DomainNpcInteractionResult, NpcInteractionStatus as DomainNpcInteractionStatus, NpcRelationStance as DomainNpcRelationStance, NpcRelationState as DomainNpcRelationState, NpcStanceProfile as DomainNpcStanceProfile, NpcStateSnapshot as DomainNpcStateSnapshot, ResolveNpcInteractionInput as DomainResolveNpcInteractionInput, }; use module_puzzle::{ PuzzleAgentMessageSnapshot as DomainPuzzleAgentMessageSnapshot, PuzzleAgentSessionSnapshot as DomainPuzzleAgentSessionSnapshot, PuzzleAgentSuggestedAction as DomainPuzzleAgentSuggestedAction, PuzzleAnchorItem as DomainPuzzleAnchorItem, PuzzleAnchorPack as DomainPuzzleAnchorPack, PuzzleBoardSnapshot as DomainPuzzleBoardSnapshot, PuzzleCellPosition as DomainPuzzleCellPosition, PuzzleCreatorIntent as DomainPuzzleCreatorIntent, PuzzleDraftLevel as DomainPuzzleDraftLevel, PuzzleGeneratedImageCandidate as DomainPuzzleGeneratedImageCandidate, PuzzleMergedGroupState as DomainPuzzleMergedGroupState, PuzzlePieceState as DomainPuzzlePieceState, PuzzleResultDraft as DomainPuzzleResultDraft, PuzzleResultPreviewBlocker as DomainPuzzleResultPreviewBlocker, PuzzleResultPreviewEnvelope as DomainPuzzleResultPreviewEnvelope, PuzzleResultPreviewFinding as DomainPuzzleResultPreviewFinding, PuzzleRunSnapshot as DomainPuzzleRunSnapshot, PuzzleRuntimeLevelSnapshot as DomainPuzzleRuntimeLevelSnapshot, PuzzleWorkProfile as DomainPuzzleWorkProfile, }; use module_runtime::{ AnalyticsMetricQueryResponse as DomainAnalyticsMetricQueryResponse, RuntimeBrowseHistoryRecord, RuntimePlatformTheme as DomainRuntimePlatformTheme, RuntimeProfileDashboardRecord, RuntimeProfileFeedbackSubmissionRecord, RuntimeProfileInviteCodeRecord, RuntimeProfilePlayStatsRecord, RuntimeProfileRechargeCenterRecord, RuntimeProfileRechargeOrderRecord, RuntimeProfileRechargeProductConfigRecord, RuntimeProfileRedeemCodeMode as DomainRuntimeProfileRedeemCodeMode, RuntimeProfileRedeemCodeRecord, RuntimeProfileRewardCodeRedeemRecord, RuntimeProfileSaveArchiveRecord, RuntimeProfileTaskCenterRecord, RuntimeProfileTaskClaimRecord, RuntimeProfileTaskConfigRecord, RuntimeProfileTaskCycle as DomainRuntimeProfileTaskCycle, RuntimeProfileTaskStatus as DomainRuntimeProfileTaskStatus, RuntimeProfileWalletLedgerEntryRecord, RuntimeReferralInviteCenterRecord, RuntimeReferralRedeemRecord, RuntimeSettingsRecord, RuntimeSnapshotRecord, RuntimeTrackingScopeKind as DomainRuntimeTrackingScopeKind, build_analytics_metric_query_input, build_runtime_browse_history_clear_input, build_runtime_browse_history_list_input, build_runtime_browse_history_record, build_runtime_browse_history_sync_input, build_runtime_profile_dashboard_get_input, build_runtime_profile_dashboard_record, build_runtime_profile_feedback_submission_input, build_runtime_profile_feedback_submission_record, build_runtime_profile_invite_code_admin_list_input, build_runtime_profile_invite_code_admin_upsert_input, build_runtime_profile_invite_code_record, build_runtime_profile_play_stats_get_input, build_runtime_profile_play_stats_record, build_runtime_profile_recharge_center_get_input, build_runtime_profile_recharge_center_record, build_runtime_profile_recharge_order_create_input, build_runtime_profile_recharge_order_get_input, build_runtime_profile_recharge_product_admin_list_input, build_runtime_profile_recharge_product_admin_upsert_input, build_runtime_profile_recharge_product_config_record, build_runtime_profile_redeem_code_admin_disable_input, build_runtime_profile_redeem_code_admin_list_input, build_runtime_profile_redeem_code_admin_upsert_input, build_runtime_profile_redeem_code_record, build_runtime_profile_reward_code_redeem_input, build_runtime_profile_reward_code_redeem_record, build_runtime_profile_save_archive_list_input, build_runtime_profile_save_archive_record, build_runtime_profile_save_archive_resume_input, build_runtime_profile_task_center_get_input, build_runtime_profile_task_center_record, build_runtime_profile_task_claim_input, build_runtime_profile_task_claim_record, build_runtime_profile_task_config_admin_disable_input, build_runtime_profile_task_config_admin_list_input, build_runtime_profile_task_config_admin_upsert_input, build_runtime_profile_task_config_record, build_runtime_profile_wallet_adjustment_input, build_runtime_profile_wallet_ledger_entry_record, build_runtime_profile_wallet_ledger_list_input, build_runtime_referral_invite_center_get_input, build_runtime_referral_invite_center_record, build_runtime_referral_redeem_input, build_runtime_referral_redeem_record, build_runtime_setting_get_input, build_runtime_setting_record, build_runtime_setting_upsert_input, build_runtime_snapshot_delete_input, build_runtime_snapshot_get_input, build_runtime_snapshot_record, build_runtime_snapshot_upsert_input, }; use module_runtime_item::{ RuntimeItemEquipmentSlot as DomainRuntimeItemEquipmentSlot, RuntimeItemRewardItemRarity as DomainRuntimeItemRewardItemRarity, RuntimeItemRewardItemSnapshot as DomainRuntimeItemRewardItemSnapshot, normalize_reward_item_snapshot, }; use module_story::{ StoryContinueInput as DomainStoryContinueInput, StoryEventKind as DomainStoryEventKind, StoryEventRecord, StorySessionInput as DomainStorySessionInput, StorySessionRecord, StorySessionResultRecord, StorySessionStateInput as DomainStorySessionStateInput, StorySessionStateRecord, StorySessionStatus as DomainStorySessionStatus, build_story_continue_input, build_story_session_input, build_story_session_state_input, }; use shared_kernel::format_timestamp_micros; use spacetimedb_sdk::{DbContext, Table}; use tokio::{ sync::{OwnedSemaphorePermit, RwLock, Semaphore, oneshot}, time::timeout, }; use crate::module_bindings::*; #[derive(Clone, Debug)] pub struct SpacetimeClientConfig { pub server_url: String, pub database: String, pub token: Option, pub pool_size: u32, pub procedure_timeout: Duration, } #[derive(Clone, Debug, PartialEq, Eq)] pub struct AuthStoreSnapshotRecord { pub snapshot_json: Option, pub updated_at_micros: Option, } #[derive(Clone, Debug, PartialEq, Eq)] pub struct AuthStoreSnapshotImportRecord { pub imported_user_count: u32, pub imported_identity_count: u32, pub imported_refresh_session_count: u32, } #[derive(Clone)] pub struct SpacetimeClient { config: SpacetimeClientConfig, pool: Arc, creation_entry_config_cache: Arc>>, custom_world_gallery_legacy_sync_attempted: Arc, } #[derive(Debug)] pub enum SpacetimeClientError { Build(String), ConnectDropped, Procedure(String), Runtime(String), Timeout, } const DEFAULT_PROCEDURE_TIMEOUT: Duration = Duration::from_secs(30); const PUBLIC_WORK_PLAY_DAY_MICROS: i64 = 86_400_000_000; const PUBLIC_WORK_RECENT_PLAY_WINDOW_DAYS: i64 = 7; type ProcedureResultSender = Arc>>>>; type ReducerResultSender = Arc>>>>; struct SpacetimeConnectionPool { slots: Vec>, permits: Arc, } struct PooledConnectionSlot { connection: Option, in_use: bool, } struct PooledConnection { connection: DbConnection, _read_model_subscriptions: Vec, runner: Option>, broken: Arc, } struct PooledConnectionLease { slot_index: usize, connection: Option, _permit: OwnedSemaphorePermit, } impl SpacetimeClient { pub fn new(config: SpacetimeClientConfig) -> Self { let pool_size = config.pool_size.max(1) as usize; let config = SpacetimeClientConfig { procedure_timeout: if config.procedure_timeout.is_zero() { DEFAULT_PROCEDURE_TIMEOUT } else { config.procedure_timeout }, ..config }; let slots = (0..pool_size) .map(|_| { tokio::sync::Mutex::new(PooledConnectionSlot { connection: None, in_use: false, }) }) .collect::>(); let pool = Arc::new(SpacetimeConnectionPool { slots, permits: Arc::new(Semaphore::new(pool_size)), }); Self { config, pool, creation_entry_config_cache: Arc::new(RwLock::new(None)), custom_world_gallery_legacy_sync_attempted: Arc::new(AtomicBool::new(false)), } } async fn call_after_connect( &self, procedure: &'static str, call: impl FnOnce(&DbConnection, ProcedureResultSender) + Send + 'static, ) -> Result where T: Send + 'static, { let metrics_guard = telemetry::begin_procedure(procedure); let (sender, receiver) = oneshot::channel(); let result_sender = Arc::new(Mutex::new(Some(sender))); let final_result = match self.acquire_connection().await { Ok(lease) => { let result = if let Some(connection) = lease.connection.as_ref() { call(&connection.connection, result_sender.clone()); match timeout(self.config.procedure_timeout, receiver).await { Ok(inner) => match inner { Ok(value) => value, Err(_) => Err(SpacetimeClientError::ConnectDropped), }, Err(_) => Err(Self::resolve_timeout_error(Some(connection))), } } else { Err(SpacetimeClientError::Runtime( "SpacetimeDB 连接租约缺少连接".to_string(), )) }; self.release_connection(lease).await; result } Err(error) => Err(error), }; metrics_guard.finish(&final_result); final_result } async fn call_reducer_after_connect( &self, procedure: &'static str, call: impl FnOnce(&DbConnection, ReducerResultSender) + Send + 'static, ) -> Result<(), SpacetimeClientError> { let metrics_guard = telemetry::begin_procedure(procedure); let (sender, receiver) = oneshot::channel(); let result_sender = Arc::new(Mutex::new(Some(sender))); let final_result = match self.acquire_connection().await { Ok(lease) => { let result = if let Some(connection) = lease.connection.as_ref() { call(&connection.connection, result_sender.clone()); match timeout(self.config.procedure_timeout, receiver).await { Ok(inner) => match inner { Ok(value) => value, Err(_) => Err(SpacetimeClientError::ConnectDropped), }, Err(_) => Err(Self::resolve_timeout_error(Some(connection))), } } else { Err(SpacetimeClientError::Runtime( "SpacetimeDB 连接租约缺少连接".to_string(), )) }; self.release_connection(lease).await; result } Err(error) => Err(error), }; metrics_guard.finish(&final_result); final_result } async fn read_after_connect( &self, read_name: &'static str, read: impl FnOnce(&DbConnection) -> Result + Send + 'static, ) -> Result where T: Send + 'static, { let metrics_guard = telemetry::begin_read(read_name); let lease = match self.acquire_connection().await { Ok(lease) => lease, Err(error) => { let final_result = Err(error); metrics_guard.finish(&final_result); return final_result; } }; let final_result = if let Some(connection) = lease.connection.as_ref() { read(&connection.connection) } else { Err(SpacetimeClientError::Runtime( "SpacetimeDB 连接租约缺少连接".to_string(), )) }; self.release_connection(lease).await; metrics_guard.finish(&final_result); final_result } async fn cache_creation_entry_config(&self, config: CreationEntryConfigRecord) { *self.creation_entry_config_cache.write().await = Some(config); } async fn read_cached_creation_entry_config(&self) -> Option { self.creation_entry_config_cache.read().await.clone() } async fn acquire_connection(&self) -> Result { let permit = timeout( self.config.procedure_timeout, self.pool.permits.clone().acquire_owned(), ) .await .map_err(|_| SpacetimeClientError::Timeout)? .map_err(|error| SpacetimeClientError::Runtime(error.to_string()))?; loop { for (slot_index, slot) in self.pool.slots.iter().enumerate() { if let Ok(mut slot_guard) = slot.try_lock() { if slot_guard.in_use { continue; } let reusable_connection = slot_guard .connection .take() .filter(|connection| !connection.is_broken()); slot_guard.in_use = true; drop(slot_guard); let connection = if let Some(connection) = reusable_connection { connection } else { match self.build_pooled_connection().await { Ok(connection) => connection, Err(error) => { let mut slot_guard = self.pool.slots[slot_index].lock().await; slot_guard.in_use = false; return Err(error); } } }; return Ok(PooledConnectionLease { slot_index, connection: Some(connection), _permit: permit, }); } } tokio::task::yield_now().await; } } async fn build_pooled_connection(&self) -> Result { let config = self.config.clone(); let broken = Arc::new(AtomicBool::new(false)); let (sender, receiver) = oneshot::channel::>(); let connect_sender = Arc::new(Mutex::new(Some(sender))); let broken_flag = broken.clone(); let disconnect_sender = connect_sender.clone(); let connection = timeout( self.config.procedure_timeout, tokio::task::spawn_blocking(move || { DbConnection::builder() .with_uri(config.server_url) .with_database_name(config.database) .with_token(config.token) .on_connect(move |_, _, _| { send_connect_once(&connect_sender, Ok(())); }) .on_disconnect(move |_, error| { broken_flag.store(true, Ordering::SeqCst); let message = error .map(|error| error.to_string()) .unwrap_or_else(|| "SpacetimeDB 连接已断开".to_string()); send_connect_once( &disconnect_sender, Err(SpacetimeClientError::Procedure(message)), ); }) .build() .map_err(|error| SpacetimeClientError::Build(error.to_string())) }), ) .await .map_err(|_| SpacetimeClientError::Timeout)? .map_err(|error| SpacetimeClientError::Runtime(error.to_string()))??; let runner = connection.run_threaded(); timeout(self.config.procedure_timeout, receiver) .await .map_err(|_| SpacetimeClientError::Timeout)? .map_err(|_| SpacetimeClientError::ConnectDropped)??; let read_model_subscriptions = self .subscribe_cached_read_models(&connection, broken.clone()) .await?; Ok(PooledConnection { connection, _read_model_subscriptions: read_model_subscriptions, runner: Some(runner), broken, }) } async fn subscribe_cached_read_models( &self, connection: &DbConnection, broken: Arc, ) -> Result, SpacetimeClientError> { let mut subscriptions = Vec::new(); for query in [ "SELECT * FROM puzzle_gallery_card_view", "SELECT * FROM jump_hop_gallery_card_view", "SELECT * FROM custom_world_gallery_entry", "SELECT * FROM match_3_d_gallery_view", "SELECT * FROM square_hole_gallery_view", "SELECT * FROM visual_novel_gallery_view", "SELECT * FROM big_fish_gallery_view", ] { let subscription = self .subscribe_cached_read_model_query(connection, broken.clone(), query, true) .await?; subscriptions.push(subscription); } for query in [ "SELECT * FROM public_work_play_daily_stat WHERE source_type = 'puzzle'", "SELECT * FROM public_work_play_daily_stat WHERE source_type = 'jump-hop'", "SELECT * FROM public_work_play_daily_stat WHERE source_type = 'custom-world'", "SELECT * FROM public_work_play_daily_stat WHERE source_type = 'match3d'", "SELECT * FROM public_work_play_daily_stat WHERE source_type = 'square-hole'", "SELECT * FROM public_work_play_daily_stat WHERE source_type = 'visual-novel'", "SELECT * FROM public_work_play_daily_stat WHERE source_type = 'big-fish'", "SELECT * FROM creation_entry_config", "SELECT * FROM creation_entry_type_config", ] { if let Ok(subscription) = self .subscribe_cached_read_model_query(connection, broken.clone(), query, false) .await { subscriptions.push(subscription); } } Ok(subscriptions) } async fn subscribe_cached_read_model_query( &self, connection: &DbConnection, broken: Arc, query: &'static str, mark_broken_on_error: bool, ) -> Result { let (sender, receiver) = oneshot::channel::>(); let applied_sender = Arc::new(Mutex::new(Some(sender))); let on_applied_sender = applied_sender.clone(); let on_error_sender = applied_sender.clone(); let broken_flag = broken.clone(); let subscription = connection .subscription_builder() .on_applied(move |_| { send_connect_once(&on_applied_sender, Ok(())); }) .on_error(move |_, error| { if mark_broken_on_error { broken_flag.store(true, Ordering::SeqCst); } send_connect_once( &on_error_sender, Err(SpacetimeClientError::Procedure(error.to_string())), ); }) .subscribe(query); timeout(self.config.procedure_timeout, receiver) .await .map_err(|_| SpacetimeClientError::Timeout)? .map_err(|_| SpacetimeClientError::ConnectDropped)??; Ok(subscription) } async fn release_connection(&self, mut lease: PooledConnectionLease) { let mut slot_guard = self.pool.slots[lease.slot_index].lock().await; slot_guard.in_use = false; let Some(connection) = lease.connection.take() else { slot_guard.connection = None; return; }; if connection.is_broken() { slot_guard.connection = None; } else { slot_guard.connection = Some(connection); } } // 超时后必须统一归还租约;若连接已先一步断开则回传断线,否则标记坏连接并回传超时。 fn resolve_timeout_error(connection: Option<&PooledConnection>) -> SpacetimeClientError { if let Some(connection) = connection { if connection.is_broken() { return SpacetimeClientError::ConnectDropped; } connection.mark_broken(); } SpacetimeClientError::Timeout } } fn current_unix_micros() -> i64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|duration| duration.as_micros() as i64) .unwrap_or(0) } fn current_public_work_day() -> i64 { current_unix_micros().div_euclid(PUBLIC_WORK_PLAY_DAY_MICROS) } fn public_work_recent_play_counts( connection: &DbConnection, source_type: &str, ) -> HashMap { let current_day = current_public_work_day(); let first_day = current_day - (PUBLIC_WORK_RECENT_PLAY_WINDOW_DAYS - 1); let mut counts = HashMap::new(); for row in connection.db().public_work_play_daily_stat().iter() { if row.source_type != source_type || row.played_day < first_day || row.played_day > current_day { continue; } let entry: &mut u32 = counts.entry(row.profile_id).or_insert(0); *entry = (*entry).saturating_add(row.play_count); } counts } impl SpacetimeClientError { pub(crate) fn from_sdk_error(error: impl fmt::Display) -> Self { Self::Procedure(error.to_string()) } pub(crate) fn validation_failed(error: impl fmt::Display) -> Self { Self::Runtime(error.to_string()) } pub(crate) fn reducer_failed(message: String) -> Self { Self::Runtime(message) } pub(crate) fn procedure_failed(message: Option) -> Self { Self::Procedure(message.unwrap_or_else(|| "SpacetimeDB procedure 返回未知错误".to_string())) } pub(crate) fn missing_snapshot(label: &'static str) -> Self { Self::Procedure(format!("SpacetimeDB procedure 未返回{label}")) } } impl PooledConnection { fn is_broken(&self) -> bool { self.broken.load(Ordering::SeqCst) } fn mark_broken(&self) { self.broken.store(true, Ordering::SeqCst); } } impl Drop for PooledConnection { fn drop(&mut self) { let _ = self.connection.disconnect(); if let Some(runner) = self.runner.take() { drop(runner); } } } impl fmt::Debug for SpacetimeClient { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SpacetimeClient") .field("config", &self.config) .field("pool_size", &self.pool.slots.len()) .finish() } } fn send_once(sender: &ProcedureResultSender, result: Result) { if let Some(sender) = sender .lock() .expect("spacetime result sender should not poison") .take() { let _ = sender.send(result); } } fn send_reducer_once(sender: &ReducerResultSender, result: Result<(), SpacetimeClientError>) { if let Some(sender) = sender .lock() .expect("spacetime reducer result sender should not poison") .take() { let _ = sender.send(result); } } fn send_connect_once( sender: &Arc>>>>, result: Result<(), SpacetimeClientError>, ) { if let Some(sender) = sender .lock() .expect("spacetime connect sender should not poison") .take() { let _ = sender.send(result); } } impl fmt::Display for SpacetimeClientError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Build(message) | Self::Procedure(message) | Self::Runtime(message) => { f.write_str(message) } Self::ConnectDropped => f.write_str("SpacetimeDB 连接在返回结果前已断开"), Self::Timeout => f.write_str("SpacetimeDB procedure 调用超时"), } } } impl Error for SpacetimeClientError {} #[cfg(test)] mod tests { use super::*; #[test] fn procedure_failed_keeps_server_message_or_default() { assert_eq!( SpacetimeClientError::procedure_failed(Some("领域错误".to_string())).to_string(), "领域错误" ); assert_eq!( SpacetimeClientError::procedure_failed(None).to_string(), "SpacetimeDB procedure 返回未知错误" ); } #[test] fn missing_snapshot_names_adapter_boundary() { assert_eq!( SpacetimeClientError::missing_snapshot("story session 快照").to_string(), "SpacetimeDB procedure 未返回story session 快照" ); } #[test] fn validation_and_reducer_failures_stay_runtime_classified() { assert!(matches!( SpacetimeClientError::validation_failed("字段缺失"), SpacetimeClientError::Runtime(_) )); assert!(matches!( SpacetimeClientError::reducer_failed("状态非法".to_string()), SpacetimeClientError::Runtime(_) )); } }