新增 Web Project runtime job、持久日志、lease、取消、expired、stale 和 active preview guard 状态机；接入 api-server Web Project runtime worker 与 TempDirBuildRuntime 构建执行链路；补齐 SpacetimeDB procedure、spacetime-client facade、shared contracts 和前端 web-project client 契约；更新 /editor/agent 的 runtime job 恢复、日志回填、SSE 重连、取消按钮和 active preview 刷新恢复；新增 P2 dev smoke 脚本，并让完整 npm run dev 默认以 all 角色启动 P2 worker；补充 P2 自动化测试、浏览器 smoke 验收记录、开发运维文档和 Hermes 踩坑记忆。
1254 lines
51 KiB
Rust
1254 lines
51 KiB
Rust
// `module_bindings` 是 SpacetimeDB CLI 生成产物,禁止再被 rustfmt 二次改写。
|
||
#[rustfmt::skip]
|
||
pub mod module_bindings;
|
||
|
||
mod mapper;
|
||
mod telemetry;
|
||
use mapper::*;
|
||
pub use mapper::{
|
||
AdminWorkVisibilityRecord, AiResultReferenceRecord, AiTaskMutationRecord, AiTaskRecord,
|
||
AiTaskStageRecord, AiTextChunkRecord, BarkBattleDraftConfigRecord, BarkBattleRunRecord,
|
||
BarkBattleRuntimeConfigRecord, BattleStateRecord, BigFishAgentMessageRecord,
|
||
BigFishAnchorItemRecord, BigFishAnchorPackRecord, BigFishAssetCoverageRecord,
|
||
BigFishAssetGenerateRecordInput, BigFishAssetSlotRecord, BigFishBackgroundBlueprintRecord,
|
||
BigFishDraftCompileRecordInput, BigFishGameDraftRecord, BigFishInputSubmitRecordInput,
|
||
BigFishLevelBlueprintRecord, BigFishLikeReportRecordInput, BigFishMessageFinalizeRecordInput,
|
||
BigFishMessageSubmitRecordInput, BigFishPlayReportRecordInput, BigFishRunStartRecordInput,
|
||
BigFishRuntimeEntityRecord, BigFishRuntimeParamsRecord, BigFishRuntimeRunRecord,
|
||
BigFishSessionCreateRecordInput, BigFishSessionRecord, BigFishVector2Record,
|
||
BigFishWorkRemixRecordInput, BigFishWorkSummaryRecord, CreationEntryConfigRecord,
|
||
CustomWorldAgentActionExecuteRecord, CustomWorldAgentActionExecuteRecordInput,
|
||
CustomWorldAgentCheckpointRecord, CustomWorldAgentMessageFinalizeRecordInput,
|
||
CustomWorldAgentMessageRecord, CustomWorldAgentMessageSubmitRecordInput,
|
||
CustomWorldAgentOperationProgressRecordInput, CustomWorldAgentOperationRecord,
|
||
CustomWorldAgentSessionCreateRecordInput, CustomWorldAgentSessionRecord,
|
||
CustomWorldCheckpointRecord, CustomWorldDraftCardDetailRecord,
|
||
CustomWorldDraftCardDetailSectionRecord, CustomWorldDraftCardRecord,
|
||
CustomWorldGalleryEntryRecord, CustomWorldLibraryEntryRecord, CustomWorldLibraryMutationRecord,
|
||
CustomWorldProfileLikeReportRecordInput, CustomWorldProfilePlayReportRecordInput,
|
||
CustomWorldProfileRemixRecordInput, CustomWorldProfileUpsertRecordInput,
|
||
CustomWorldPublishGateRecord, CustomWorldPublishWorldRecord,
|
||
CustomWorldPublishWorldRecordInput, CustomWorldPublishedProfileCompileRecord,
|
||
CustomWorldResultPreviewBlockerRecord, CustomWorldSupportedActionRecord,
|
||
CustomWorldWorkSummaryRecord, EditorAssetCreateRecordInput, EditorAssetDeleteRecordInput,
|
||
EditorAssetFolderCreateRecordInput, EditorAssetFolderDeleteRecordInput,
|
||
EditorAssetFolderRecord, EditorAssetFolderUpdateRecordInput, EditorAssetLibraryRecord,
|
||
EditorAssetRecord, EditorAssetUpdateRecordInput, EditorCanvasRecord,
|
||
EditorCanvasViewportRecord, EditorProjectCreateRecordInput, EditorProjectDeleteRecordInput,
|
||
EditorProjectGetRecordInput, EditorProjectLayoutSaveRecordInput, EditorProjectRecord,
|
||
EditorProjectRenameRecordInput, EditorProjectResourceCreateRecordInput,
|
||
EditorProjectResourceRecord, ExternalGenerationJobClaimRecordInput,
|
||
ExternalGenerationJobCompleteRecordInput, ExternalGenerationJobEnqueueRecordInput,
|
||
ExternalGenerationJobFailRecordInput, ExternalGenerationJobGetRecordInput,
|
||
ExternalGenerationJobRecord, ExternalGenerationJobRenewLeaseRecordInput,
|
||
ExternalGenerationQueueStatsRecord, JumpHopActionRequest, JumpHopActionResponse,
|
||
JumpHopActionType, JumpHopCharacterAsset, JumpHopDifficulty, JumpHopDraftResponse,
|
||
JumpHopGalleryCardResponse, JumpHopGalleryDetailResponse, JumpHopGalleryResponse,
|
||
JumpHopGenerationStatus, JumpHopJumpRequest, JumpHopJumpResponse, JumpHopJumpResult,
|
||
JumpHopLastJump, JumpHopPath, JumpHopPlatform, JumpHopRestartRunRequest, JumpHopRunResponse,
|
||
JumpHopRunStatus, JumpHopRuntimeRunSnapshotResponse, JumpHopScoring, JumpHopSessionResponse,
|
||
JumpHopSessionSnapshotResponse, JumpHopStartRunRequest, JumpHopStylePreset, JumpHopTileAsset,
|
||
JumpHopTileType, JumpHopWorkDetailResponse, JumpHopWorkMutationResponse,
|
||
JumpHopWorkProfileResponse, JumpHopWorkSummaryResponse, JumpHopWorksResponse,
|
||
JumpHopWorkspaceCreateRequest, Match3DAgentMessageFinalizeRecordInput,
|
||
Match3DAgentMessageRecord, Match3DAgentMessageSubmitRecordInput,
|
||
Match3DAgentSessionCreateRecordInput, Match3DAgentSessionRecord, Match3DAnchorItemRecord,
|
||
Match3DAnchorPackRecord, Match3DClickConfirmationRecord, Match3DCompileDraftRecordInput,
|
||
Match3DCreatorConfigRecord, Match3DItemSnapshotRecord, Match3DResultDraftRecord,
|
||
Match3DRunClickRecordInput, Match3DRunRecord, Match3DRunRestartRecordInput,
|
||
Match3DRunStartRecordInput, Match3DRunStopRecordInput, Match3DRunTimeUpRecordInput,
|
||
Match3DTraySlotRecord, Match3DWorkProfileRecord, Match3DWorkUpdateRecordInput,
|
||
NpcBattleInteractionRecord, NpcInteractionRecord, NpcStateRecord, PublicWorkDetailEntryRecord,
|
||
PublicWorkGalleryEntryRecord, PuzzleAgentMessageFinalizeRecordInput, PuzzleAgentMessageRecord,
|
||
PuzzleAgentMessageSubmitRecordInput, PuzzleAgentSessionCreateRecordInput,
|
||
PuzzleAgentSessionRecord, PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord,
|
||
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput,
|
||
PuzzleBackgroundCompileTaskReleaseRecordInput, PuzzleBoardRecord, PuzzleCellPositionRecord,
|
||
PuzzleClearActionRequest, PuzzleClearActionResponse, PuzzleClearActionType,
|
||
PuzzleClearBoardCell, PuzzleClearBoardSnapshot, PuzzleClearCardAsset, PuzzleClearDraftResponse,
|
||
PuzzleClearGenerationStatus, PuzzleClearImageAsset, PuzzleClearNextLevelRequest,
|
||
PuzzleClearPatternGroup, PuzzleClearRetryLevelRequest, PuzzleClearRunResponse,
|
||
PuzzleClearRunStatus, PuzzleClearRuntimeSnapshotResponse, PuzzleClearSessionResponse,
|
||
PuzzleClearSessionSnapshotResponse, PuzzleClearStartRunRequest, PuzzleClearSwapRequest,
|
||
PuzzleClearTimeUpRequest, PuzzleClearWorkDetailResponse, PuzzleClearWorkMutationResponse,
|
||
PuzzleClearWorkProfileResponse, PuzzleClearWorkSummaryResponse, PuzzleClearWorksResponse,
|
||
PuzzleClearWorkspaceCreateRequest, PuzzleCreatorIntentRecord,
|
||
PuzzleDraftCompileFailureRecordInput, PuzzleDraftLevelRecord, PuzzleFormDraftRecord,
|
||
PuzzleFormDraftSaveRecordInput, PuzzleGalleryCardRecord, PuzzleGeneratedImageCandidateRecord,
|
||
PuzzleGeneratedImagesSaveRecordInput, PuzzleLeaderboardEntryRecord,
|
||
PuzzleLeaderboardSubmitRecordInput, PuzzleLevelGenerationFailureRecordInput,
|
||
PuzzleMergedGroupRecord, PuzzlePieceStateRecord, PuzzlePublishRecordInput,
|
||
PuzzleRecommendedNextWorkRecord, PuzzleResultDraftRecord, PuzzleResultPreviewBlockerRecord,
|
||
PuzzleResultPreviewFindingRecord, PuzzleResultPreviewRecord, PuzzleRunDragRecordInput,
|
||
PuzzleRunNextLevelRecordInput, PuzzleRunPauseRecordInput, PuzzleRunPropRecordInput,
|
||
PuzzleRunRecord, PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput, PuzzleRuntimeLevelRecord,
|
||
PuzzleSelectCoverImageRecordInput, PuzzleUiBackgroundSaveRecordInput,
|
||
PuzzleWorkLikeReportRecordInput, PuzzleWorkPointIncentiveClaimRecordInput,
|
||
PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput, PuzzleWorkUpsertRecordInput,
|
||
ResolveCombatActionRecord, ResolveNpcBattleInteractionInput,
|
||
SquareHoleAgentMessageFinalizeRecordInput, SquareHoleAgentMessageRecord,
|
||
SquareHoleAgentMessageSubmitRecordInput, SquareHoleAgentSessionCreateRecordInput,
|
||
SquareHoleAgentSessionRecord, SquareHoleAnchorItemRecord, SquareHoleAnchorPackRecord,
|
||
SquareHoleCompileDraftRecordInput, SquareHoleCreatorConfigRecord,
|
||
SquareHoleDropConfirmationRecord, SquareHoleDropFeedbackRecord, SquareHoleHoleOptionRecord,
|
||
SquareHoleHoleSnapshotRecord, SquareHoleResultDraftRecord, SquareHoleRunDropRecordInput,
|
||
SquareHoleRunRecord, SquareHoleRunRestartRecordInput, SquareHoleRunStartRecordInput,
|
||
SquareHoleRunStopRecordInput, SquareHoleRunTimeUpRecordInput, SquareHoleShapeOptionRecord,
|
||
SquareHoleShapeSnapshotRecord, SquareHoleWorkProfileRecord, SquareHoleWorkUpdateRecordInput,
|
||
VisualNovelAgentMessageFinalizeRecordInput, VisualNovelAgentMessageRecord,
|
||
VisualNovelAgentMessageSubmitRecordInput, VisualNovelAgentSessionCreateRecordInput,
|
||
VisualNovelAgentSessionRecord, VisualNovelHistoryEntryRecord,
|
||
VisualNovelHistoryEntryRecordInput, VisualNovelRunRecord, VisualNovelRunSnapshotRecordInput,
|
||
VisualNovelRunStartRecordInput, VisualNovelRuntimeEventRecord,
|
||
VisualNovelRuntimeEventRecordInput, VisualNovelWorkCompileRecordInput,
|
||
VisualNovelWorkProfileRecord, VisualNovelWorkUpdateRecordInput, WebProjectCreateRecordInput,
|
||
WebProjectFileRecord, WebProjectGetRecordInput, WebProjectPreviewBuildCreateRecordInput,
|
||
WebProjectPreviewBuildGetRecordInput, WebProjectPreviewBuildMutationRecord,
|
||
WebProjectPreviewBuildRecord, WebProjectPreviewBuildTokenGetRecordInput,
|
||
WebProjectPreviewBuildUpdateRecordInput, WebProjectRecord,
|
||
WebProjectRuntimeJobAppendLogRecordInput, WebProjectRuntimeJobCancelRecordInput,
|
||
WebProjectRuntimeJobClaimRecordInput, WebProjectRuntimeJobCompletePreviewBuildRecordInput,
|
||
WebProjectRuntimeJobCompleteRecordInput, WebProjectRuntimeJobCreateRecordInput,
|
||
WebProjectRuntimeJobExpireRecordInput, WebProjectRuntimeJobFailRecordInput,
|
||
WebProjectRuntimeJobGetRecordInput, WebProjectRuntimeJobListLogsRecordInput,
|
||
WebProjectRuntimeJobListOpenRecordInput, WebProjectRuntimeJobLogRecord,
|
||
WebProjectRuntimeJobPreviewBuildMutationRecord, WebProjectRuntimeJobRecord,
|
||
WebProjectRuntimeJobRenewLeaseRecordInput, WebProjectRuntimeJobStaleRecordInput,
|
||
WebProjectSnapshotGetRecordInput, WebProjectSnapshotMutationRecord, WebProjectSnapshotRecord,
|
||
WebProjectSnapshotSaveRecordInput, WoodenFishActionRequest, WoodenFishActionResponse,
|
||
WoodenFishActionType, WoodenFishAudioAsset, WoodenFishCheckpointRunRequest,
|
||
WoodenFishDraftResponse, WoodenFishFinishRunRequest, WoodenFishGalleryCardResponse,
|
||
WoodenFishGalleryDetailResponse, WoodenFishGalleryResponse, WoodenFishGenerationStatus,
|
||
WoodenFishImageAsset, WoodenFishRunResponse, WoodenFishRunStatus,
|
||
WoodenFishRuntimeRunSnapshotResponse, WoodenFishSessionResponse,
|
||
WoodenFishSessionSnapshotResponse, WoodenFishStartRunRequest, WoodenFishWordCounter,
|
||
WoodenFishWorkDetailResponse, WoodenFishWorkMutationResponse, WoodenFishWorkProfileResponse,
|
||
WoodenFishWorkSummaryResponse, WoodenFishWorkspaceCreateRequest,
|
||
};
|
||
|
||
pub mod ai;
|
||
pub mod assets;
|
||
pub mod auth;
|
||
pub mod bark_battle;
|
||
pub use bark_battle::{
|
||
BarkBattleDraftConfigUpsertRecordInput, BarkBattleDraftCreateRecordInput,
|
||
BarkBattleRunFinishRecordInput, BarkBattleRunStartRecordInput, BarkBattleWorkDeleteRecordInput,
|
||
BarkBattleWorkPublishRecordInput,
|
||
};
|
||
pub mod big_fish;
|
||
pub mod combat;
|
||
pub mod custom_world;
|
||
pub mod editor_project;
|
||
pub mod external_generation;
|
||
|
||
pub mod inventory;
|
||
pub mod jump_hop;
|
||
pub mod match3d;
|
||
pub mod npc;
|
||
pub mod public_work;
|
||
pub mod puzzle;
|
||
pub mod puzzle_clear;
|
||
pub mod runtime;
|
||
pub mod square_hole;
|
||
pub mod story;
|
||
pub mod story_runtime;
|
||
pub mod visual_novel;
|
||
pub mod web_project;
|
||
pub mod wooden_fish;
|
||
|
||
use std::{
|
||
collections::HashMap,
|
||
error::Error,
|
||
fmt,
|
||
sync::atomic::{AtomicBool, Ordering},
|
||
sync::{Arc, Mutex},
|
||
thread::JoinHandle,
|
||
time::{Duration, Instant},
|
||
};
|
||
|
||
use module_ai::{
|
||
AiResultReferenceInput as DomainAiResultReferenceInput,
|
||
AiResultReferenceKind as DomainAiResultReferenceKind,
|
||
AiStageCompletionInput as DomainAiStageCompletionInput,
|
||
AiTaskCancelInput as DomainAiTaskCancelInput, AiTaskCreateInput as DomainAiTaskCreateInput,
|
||
AiTaskFailureInput as DomainAiTaskFailureInput, AiTaskFinishInput as DomainAiTaskFinishInput,
|
||
AiTaskKind as DomainAiTaskKind, AiTaskStageBlueprint as DomainAiTaskStageBlueprint,
|
||
AiTaskStageKind as DomainAiTaskStageKind, AiTaskStageStartInput as DomainAiTaskStageStartInput,
|
||
AiTaskStartInput as DomainAiTaskStartInput,
|
||
AiTextChunkAppendInput as DomainAiTextChunkAppendInput,
|
||
};
|
||
use module_assets::{
|
||
AssetEntityBindingRecord, AssetHistoryEntryRecord, AssetObjectAccessPolicy, AssetObjectRecord,
|
||
build_asset_entity_binding_record, build_asset_history_entry_record, build_asset_object_record,
|
||
};
|
||
use module_combat::{
|
||
BattleMode as DomainBattleMode, BattleStateInput as DomainBattleStateInput,
|
||
BattleStateQueryInput as DomainBattleStateQueryInput,
|
||
BattleStateSnapshot as DomainBattleStateSnapshot, BattleStatus as DomainBattleStatus,
|
||
CombatOutcome as DomainCombatOutcome,
|
||
ResolveCombatActionInput as DomainResolveCombatActionInput,
|
||
ResolveCombatActionResult as DomainResolveCombatActionResult, build_battle_state_query_input,
|
||
validate_battle_state_input, validate_resolve_combat_action_input,
|
||
};
|
||
use module_custom_world::CustomWorldThemeMode as DomainCustomWorldThemeMode;
|
||
use module_inventory::{
|
||
RuntimeInventoryStateQueryInput as DomainRuntimeInventoryStateQueryInput,
|
||
RuntimeInventoryStateRecord,
|
||
RuntimeInventoryStateSnapshot as DomainRuntimeInventoryStateSnapshot,
|
||
build_runtime_inventory_state_query_input, build_runtime_inventory_state_record,
|
||
};
|
||
use module_npc::{
|
||
NpcInteractionBattleMode as DomainNpcInteractionBattleMode,
|
||
NpcInteractionResult as DomainNpcInteractionResult,
|
||
NpcInteractionStatus as DomainNpcInteractionStatus,
|
||
NpcRelationStance as DomainNpcRelationStance, NpcRelationState as DomainNpcRelationState,
|
||
NpcStanceProfile as DomainNpcStanceProfile, NpcStateSnapshot as DomainNpcStateSnapshot,
|
||
ResolveNpcInteractionInput as DomainResolveNpcInteractionInput,
|
||
};
|
||
use module_runtime::{
|
||
AnalyticsMetricQueryResponse as DomainAnalyticsMetricQueryResponse, RuntimeBrowseHistoryRecord,
|
||
RuntimePlatformTheme as DomainRuntimePlatformTheme, RuntimeProfileDashboardRecord,
|
||
RuntimeProfileFeedbackSubmissionRecord, RuntimeProfileInviteCodeRecord,
|
||
RuntimeProfilePlayStatsRecord, RuntimeProfileRechargeCenterRecord,
|
||
RuntimeProfileRechargeOrderRecord, RuntimeProfileRechargeProductConfigRecord,
|
||
RuntimeProfileRedeemCodeMode as DomainRuntimeProfileRedeemCodeMode,
|
||
RuntimeProfileRedeemCodeRecord, RuntimeProfileRewardCodeRedeemRecord,
|
||
RuntimeProfileSaveArchiveRecord, RuntimeProfileTaskCenterRecord, RuntimeProfileTaskClaimRecord,
|
||
RuntimeProfileTaskConfigRecord, RuntimeProfileTaskCycle as DomainRuntimeProfileTaskCycle,
|
||
RuntimeProfileTaskStatus as DomainRuntimeProfileTaskStatus,
|
||
RuntimeProfileWalletLedgerEntryRecord, RuntimeReferralInviteCenterRecord,
|
||
RuntimeReferralRedeemRecord, RuntimeSettingsRecord, RuntimeSnapshotRecord,
|
||
RuntimeTrackingScopeKind as DomainRuntimeTrackingScopeKind, build_analytics_metric_query_input,
|
||
build_runtime_browse_history_clear_input, build_runtime_browse_history_list_input,
|
||
build_runtime_browse_history_record, build_runtime_browse_history_sync_input,
|
||
build_runtime_profile_dashboard_get_input, build_runtime_profile_dashboard_record,
|
||
build_runtime_profile_feedback_submission_input,
|
||
build_runtime_profile_feedback_submission_record,
|
||
build_runtime_profile_invite_code_admin_list_input,
|
||
build_runtime_profile_invite_code_admin_upsert_input, build_runtime_profile_invite_code_record,
|
||
build_runtime_profile_play_stats_get_input, build_runtime_profile_play_stats_record,
|
||
build_runtime_profile_recharge_center_get_input, build_runtime_profile_recharge_center_record,
|
||
build_runtime_profile_recharge_order_create_input,
|
||
build_runtime_profile_recharge_order_get_input,
|
||
build_runtime_profile_recharge_product_admin_list_input,
|
||
build_runtime_profile_recharge_product_admin_upsert_input,
|
||
build_runtime_profile_recharge_product_config_record,
|
||
build_runtime_profile_redeem_code_admin_disable_input,
|
||
build_runtime_profile_redeem_code_admin_list_input,
|
||
build_runtime_profile_redeem_code_admin_upsert_input, build_runtime_profile_redeem_code_record,
|
||
build_runtime_profile_reward_code_redeem_input,
|
||
build_runtime_profile_reward_code_redeem_record, build_runtime_profile_save_archive_list_input,
|
||
build_runtime_profile_save_archive_record, build_runtime_profile_save_archive_resume_input,
|
||
build_runtime_profile_task_center_get_input, build_runtime_profile_task_center_record,
|
||
build_runtime_profile_task_claim_input, build_runtime_profile_task_claim_record,
|
||
build_runtime_profile_task_config_admin_disable_input,
|
||
build_runtime_profile_task_config_admin_list_input,
|
||
build_runtime_profile_task_config_admin_upsert_input, build_runtime_profile_task_config_record,
|
||
build_runtime_profile_wallet_adjustment_input,
|
||
build_runtime_profile_wallet_ledger_entry_record,
|
||
build_runtime_profile_wallet_ledger_list_input, build_runtime_referral_invite_center_get_input,
|
||
build_runtime_referral_invite_center_record, build_runtime_referral_redeem_input,
|
||
build_runtime_referral_redeem_record, build_runtime_setting_get_input,
|
||
build_runtime_setting_record, build_runtime_setting_upsert_input,
|
||
build_runtime_snapshot_delete_input, build_runtime_snapshot_get_input,
|
||
build_runtime_snapshot_record, build_runtime_snapshot_upsert_input,
|
||
};
|
||
use module_runtime_item::{
|
||
RuntimeItemEquipmentSlot as DomainRuntimeItemEquipmentSlot,
|
||
RuntimeItemRewardItemRarity as DomainRuntimeItemRewardItemRarity,
|
||
RuntimeItemRewardItemSnapshot as DomainRuntimeItemRewardItemSnapshot,
|
||
normalize_reward_item_snapshot,
|
||
};
|
||
use module_story::{
|
||
StoryContinueInput as DomainStoryContinueInput, StoryEventKind as DomainStoryEventKind,
|
||
StoryEventRecord, StorySessionInput as DomainStorySessionInput, StorySessionRecord,
|
||
StorySessionResultRecord, StorySessionStateInput as DomainStorySessionStateInput,
|
||
StorySessionStateRecord, StorySessionStatus as DomainStorySessionStatus,
|
||
build_story_continue_input, build_story_session_input, build_story_session_state_input,
|
||
};
|
||
use shared_kernel::format_timestamp_micros;
|
||
use spacetimedb_sdk::{DbContext, Table};
|
||
use tokio::{
|
||
sync::{OwnedSemaphorePermit, RwLock, Semaphore, oneshot},
|
||
time::timeout,
|
||
};
|
||
use tracing::warn;
|
||
|
||
use crate::module_bindings::*;
|
||
|
||
/// Connection settings consumed by `SpacetimeClient::new`.
#[derive(Clone, Debug)]
pub struct SpacetimeClientConfig {
    /// URI handed to the SpacetimeDB connection builder.
    pub server_url: String,
    /// Name of the database the connection builder targets.
    pub database: String,
    /// Optional auth token handed to the connection builder.
    pub token: Option<String>,
    /// Requested number of pooled connections; `SpacetimeClient::new` treats 0 as 1.
    pub pool_size: u32,
    /// Timeout applied to each stage of a call (pool acquire, result wait).
    /// A zero duration is replaced by the 30-second default in `SpacetimeClient::new`.
    pub procedure_timeout: Duration,
}
|
||
|
||
/// Stage of a client operation, attached to health snapshots and failure logs
/// so a failure can be attributed to a specific step.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SpacetimeClientStage {
    /// No failure: reported by a successful health check.
    Ready,
    /// Waiting for a pool permit or claiming a free slot.
    PoolAcquire,
    /// Presumably building the connection object; the code that reports it is outside this section.
    ConnectBuild,
    /// Presumably waiting for the connect callback; the code that reports it is outside this section.
    ConnectHandshake,
    /// Presumably subscribing the read-model tables; the code that reports it is outside this section.
    ReadModelSubscribe,
    /// Waiting for a procedure result.
    ProcedureResult,
    /// Waiting for a reducer result.
    ReducerResult,
    /// Running a read closure against the connection.
    ReadCache,
}

impl SpacetimeClientStage {
    /// Returns the stable snake_case label for this stage.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Ready => "ready",
            Self::PoolAcquire => "pool_acquire",
            Self::ConnectBuild => "connect_build",
            Self::ConnectHandshake => "connect_handshake",
            Self::ReadModelSubscribe => "read_model_subscribe",
            Self::ProcedureResult => "procedure_result",
            Self::ReducerResult => "reducer_result",
            Self::ReadCache => "read_cache",
        }
    }
}
|
||
|
||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||
pub struct SpacetimeClientHealthSnapshot {
|
||
pub ok: bool,
|
||
pub stage: SpacetimeClientStage,
|
||
pub checked_at_micros: i64,
|
||
pub elapsed_ms: u64,
|
||
pub timeout_ms: u64,
|
||
pub error: Option<String>,
|
||
pub last_success_at_micros: Option<i64>,
|
||
pub last_error: Option<String>,
|
||
}
|
||
|
||
impl SpacetimeClientHealthSnapshot {
|
||
pub fn healthy_for_test() -> Self {
|
||
Self {
|
||
ok: true,
|
||
stage: SpacetimeClientStage::Ready,
|
||
checked_at_micros: current_unix_micros(),
|
||
elapsed_ms: 0,
|
||
timeout_ms: 0,
|
||
error: None,
|
||
last_success_at_micros: Some(current_unix_micros()),
|
||
last_error: None,
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Auth store snapshot as exposed by this client.
///
/// NOTE(review): both fields are presumably `None` when no snapshot has been
/// stored yet; confirm against the producer, which is outside this section.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AuthStoreSnapshotRecord {
    /// Serialized snapshot payload (JSON text, going by the field name).
    pub snapshot_json: Option<String>,
    /// Last update time of the snapshot, in microseconds.
    pub updated_at_micros: Option<i64>,
}
|
||
|
||
/// Counters describing what an auth store snapshot import brought in.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AuthStoreSnapshotImportRecord {
    /// Number of imported users.
    pub imported_user_count: u32,
    /// Number of imported identities.
    pub imported_identity_count: u32,
    /// Number of imported refresh sessions.
    pub imported_refresh_session_count: u32,
}
|
||
|
||
#[derive(Clone)]
|
||
pub struct SpacetimeClient {
|
||
config: SpacetimeClientConfig,
|
||
pool: Arc<SpacetimeConnectionPool>,
|
||
health_state: Arc<RwLock<SpacetimeClientHealthState>>,
|
||
creation_entry_config_cache: Arc<RwLock<Option<CreationEntryConfigRecord>>>,
|
||
custom_world_gallery_legacy_sync_attempted: Arc<AtomicBool>,
|
||
}
|
||
|
||
/// Errors surfaced by `SpacetimeClient` operations.
#[derive(Debug)]
pub enum SpacetimeClientError {
    /// Presumably a failure while building a connection; produced outside this section.
    Build(String),
    /// The result channel closed before a result was delivered.
    ConnectDropped,
    /// Presumably an error reported by the procedure or reducer itself; produced outside this section.
    Procedure(String),
    /// Internal inconsistency or runtime failure, e.g. a lease without a
    /// connection or a closed pool semaphore.
    Runtime(String),
    /// A stage did not finish within its timeout.
    Timeout,
}
|
||
|
||
/// Timeout used whenever a caller supplies a zero duration.
const DEFAULT_PROCEDURE_TIMEOUT: Duration = Duration::from_secs(30);

/// One day expressed in microseconds.
const PUBLIC_WORK_PLAY_DAY_MICROS: i64 = 24 * 60 * 60 * 1_000_000;

/// Length, in days, of the "recent play" window; its use is outside this section.
const PUBLIC_WORK_RECENT_PLAY_WINDOW_DAYS: i64 = 7;
|
||
|
||
type ProcedureResultSender<T> =
|
||
Arc<Mutex<Option<oneshot::Sender<Result<T, SpacetimeClientError>>>>>;
|
||
type ReducerResultSender = Arc<Mutex<Option<oneshot::Sender<Result<(), SpacetimeClientError>>>>>;
|
||
|
||
struct SpacetimeConnectionPool {
|
||
slots: Vec<PooledConnectionSlot>,
|
||
permits: Arc<Semaphore>,
|
||
}
|
||
|
||
/// Health history carried between `health_check` calls.
#[derive(Debug, Default)]
struct SpacetimeClientHealthState {
    /// Start timestamp (microseconds) of the most recent successful probe.
    last_success_at_micros: Option<i64>,
    /// Message of the most recent failed probe; cleared on success.
    last_error: Option<String>,
}
|
||
|
||
#[derive(Debug)]
|
||
struct SpacetimeStageError {
|
||
stage: SpacetimeClientStage,
|
||
error: SpacetimeClientError,
|
||
}
|
||
|
||
impl SpacetimeStageError {
|
||
fn new(stage: SpacetimeClientStage, error: SpacetimeClientError) -> Self {
|
||
Self { stage, error }
|
||
}
|
||
}
|
||
|
||
struct PooledConnectionSlot {
|
||
// 槽位占用标记独立成原子量:抢占/复位不依赖锁,租约 Drop 兜底可以同步完成。
|
||
in_use: AtomicBool,
|
||
// in_use=true 的持有者独占本槽连接,正常情况下锁上不会有竞争。
|
||
connection: tokio::sync::Mutex<Option<PooledConnection>>,
|
||
}
|
||
|
||
struct PooledConnection {
|
||
connection: DbConnection,
|
||
_read_model_subscriptions: Vec<SubscriptionHandle>,
|
||
runner: Option<JoinHandle<()>>,
|
||
broken: Arc<AtomicBool>,
|
||
}
|
||
|
||
struct PooledConnectionLease {
|
||
slot_index: usize,
|
||
connection: Option<PooledConnection>,
|
||
pool: Arc<SpacetimeConnectionPool>,
|
||
_permit: OwnedSemaphorePermit,
|
||
}
|
||
|
||
impl Drop for PooledConnectionLease {
|
||
// 租约 Drop 兜底:请求 future 被取消(如客户端断开导致 handler 被丢弃)时,
|
||
// 也必须归还连接并复位槽位,否则槽位会永久停留在 in_use 状态、连接池逐渐耗尽。
|
||
fn drop(&mut self) {
|
||
let slot = &self.pool.slots[self.slot_index];
|
||
if let Some(connection) = self.connection.take() {
|
||
if !connection.is_broken() {
|
||
if let Ok(mut slot_connection) = slot.connection.try_lock() {
|
||
*slot_connection = Some(connection);
|
||
}
|
||
// try_lock 理论上不会失败(in_use 持有者独占);万一失败只丢弃连接,不丢槽位。
|
||
}
|
||
}
|
||
slot.in_use.store(false, Ordering::Release);
|
||
// _permit 随 Drop 自动归还信号量。
|
||
}
|
||
}
|
||
|
||
impl SpacetimeClient {
|
||
pub fn new(config: SpacetimeClientConfig) -> Self {
|
||
let pool_size = config.pool_size.max(1) as usize;
|
||
let config = SpacetimeClientConfig {
|
||
procedure_timeout: if config.procedure_timeout.is_zero() {
|
||
DEFAULT_PROCEDURE_TIMEOUT
|
||
} else {
|
||
config.procedure_timeout
|
||
},
|
||
..config
|
||
};
|
||
let slots = (0..pool_size)
|
||
.map(|_| PooledConnectionSlot {
|
||
in_use: AtomicBool::new(false),
|
||
connection: tokio::sync::Mutex::new(None),
|
||
})
|
||
.collect::<Vec<_>>();
|
||
let pool = Arc::new(SpacetimeConnectionPool {
|
||
slots,
|
||
permits: Arc::new(Semaphore::new(pool_size)),
|
||
});
|
||
|
||
Self {
|
||
config,
|
||
pool,
|
||
health_state: Arc::new(RwLock::new(SpacetimeClientHealthState::default())),
|
||
creation_entry_config_cache: Arc::new(RwLock::new(None)),
|
||
custom_world_gallery_legacy_sync_attempted: Arc::new(AtomicBool::new(false)),
|
||
}
|
||
}
|
||
|
||
async fn call_after_connect<T>(
|
||
&self,
|
||
procedure: &'static str,
|
||
call: impl FnOnce(&DbConnection, ProcedureResultSender<T>) + Send + 'static,
|
||
) -> Result<T, SpacetimeClientError>
|
||
where
|
||
T: Send + 'static,
|
||
{
|
||
let started_at = Instant::now();
|
||
let metrics_guard = telemetry::begin_procedure(procedure);
|
||
let (sender, receiver) = oneshot::channel();
|
||
let result_sender = Arc::new(Mutex::new(Some(sender)));
|
||
let final_result = match self
|
||
.acquire_connection_with_timeout(self.config.procedure_timeout)
|
||
.await
|
||
{
|
||
Ok(lease) => {
|
||
let (result, failed_stage) = if let Some(connection) = lease.connection.as_ref() {
|
||
call(&connection.connection, result_sender.clone());
|
||
let stage = SpacetimeClientStage::ProcedureResult;
|
||
(
|
||
match timeout(self.config.procedure_timeout, receiver).await {
|
||
Ok(inner) => match inner {
|
||
Ok(value) => value,
|
||
Err(_) => Err(SpacetimeClientError::ConnectDropped),
|
||
},
|
||
Err(_) => Err(Self::resolve_timeout_error(Some(connection), stage)),
|
||
},
|
||
stage,
|
||
)
|
||
} else {
|
||
(
|
||
Err(SpacetimeClientError::Runtime(
|
||
"SpacetimeDB 连接租约缺少连接".to_string(),
|
||
)),
|
||
SpacetimeClientStage::ProcedureResult,
|
||
)
|
||
};
|
||
self.release_connection(lease).await;
|
||
if let Err(error) = &result {
|
||
log_spacetime_client_failure(
|
||
"procedure",
|
||
procedure,
|
||
failed_stage,
|
||
started_at,
|
||
error,
|
||
);
|
||
}
|
||
result
|
||
}
|
||
Err(error) => {
|
||
log_spacetime_client_failure(
|
||
"procedure",
|
||
procedure,
|
||
error.stage,
|
||
started_at,
|
||
&error.error,
|
||
);
|
||
Err(error.error)
|
||
}
|
||
};
|
||
|
||
metrics_guard.finish(&final_result);
|
||
final_result
|
||
}
|
||
|
||
async fn call_reducer_after_connect(
|
||
&self,
|
||
procedure: &'static str,
|
||
call: impl FnOnce(&DbConnection, ReducerResultSender) + Send + 'static,
|
||
) -> Result<(), SpacetimeClientError> {
|
||
let started_at = Instant::now();
|
||
let metrics_guard = telemetry::begin_procedure(procedure);
|
||
let (sender, receiver) = oneshot::channel();
|
||
let result_sender = Arc::new(Mutex::new(Some(sender)));
|
||
let final_result = match self
|
||
.acquire_connection_with_timeout(self.config.procedure_timeout)
|
||
.await
|
||
{
|
||
Ok(lease) => {
|
||
let (result, failed_stage) = if let Some(connection) = lease.connection.as_ref() {
|
||
call(&connection.connection, result_sender.clone());
|
||
let stage = SpacetimeClientStage::ReducerResult;
|
||
(
|
||
match timeout(self.config.procedure_timeout, receiver).await {
|
||
Ok(inner) => match inner {
|
||
Ok(value) => value,
|
||
Err(_) => Err(SpacetimeClientError::ConnectDropped),
|
||
},
|
||
Err(_) => Err(Self::resolve_timeout_error(Some(connection), stage)),
|
||
},
|
||
stage,
|
||
)
|
||
} else {
|
||
(
|
||
Err(SpacetimeClientError::Runtime(
|
||
"SpacetimeDB 连接租约缺少连接".to_string(),
|
||
)),
|
||
SpacetimeClientStage::ReducerResult,
|
||
)
|
||
};
|
||
self.release_connection(lease).await;
|
||
if let Err(error) = &result {
|
||
log_spacetime_client_failure(
|
||
"reducer",
|
||
procedure,
|
||
failed_stage,
|
||
started_at,
|
||
error,
|
||
);
|
||
}
|
||
result
|
||
}
|
||
Err(error) => {
|
||
log_spacetime_client_failure(
|
||
"reducer",
|
||
procedure,
|
||
error.stage,
|
||
started_at,
|
||
&error.error,
|
||
);
|
||
Err(error.error)
|
||
}
|
||
};
|
||
|
||
metrics_guard.finish(&final_result);
|
||
final_result
|
||
}
|
||
|
||
async fn read_after_connect<T>(
|
||
&self,
|
||
read_name: &'static str,
|
||
read: impl FnOnce(&DbConnection) -> Result<T, SpacetimeClientError> + Send + 'static,
|
||
) -> Result<T, SpacetimeClientError>
|
||
where
|
||
T: Send + 'static,
|
||
{
|
||
let started_at = Instant::now();
|
||
let metrics_guard = telemetry::begin_read(read_name);
|
||
let lease = match self
|
||
.acquire_connection_with_timeout(self.config.procedure_timeout)
|
||
.await
|
||
{
|
||
Ok(lease) => lease,
|
||
Err(error) => {
|
||
log_spacetime_client_failure(
|
||
"read",
|
||
read_name,
|
||
error.stage,
|
||
started_at,
|
||
&error.error,
|
||
);
|
||
let final_result = Err(error.error);
|
||
metrics_guard.finish(&final_result);
|
||
return final_result;
|
||
}
|
||
};
|
||
let final_result = if let Some(connection) = lease.connection.as_ref() {
|
||
read(&connection.connection)
|
||
} else {
|
||
Err(SpacetimeClientError::Runtime(
|
||
"SpacetimeDB 连接租约缺少连接".to_string(),
|
||
))
|
||
};
|
||
self.release_connection(lease).await;
|
||
|
||
if let Err(error) = &final_result {
|
||
log_spacetime_client_failure(
|
||
"read",
|
||
read_name,
|
||
SpacetimeClientStage::ReadCache,
|
||
started_at,
|
||
error,
|
||
);
|
||
}
|
||
metrics_guard.finish(&final_result);
|
||
final_result
|
||
}
|
||
|
||
/// Replaces the cached creation entry config with `config`.
async fn cache_creation_entry_config(&self, config: CreationEntryConfigRecord) {
    *self.creation_entry_config_cache.write().await = Some(config);
}
|
||
|
||
/// Returns a clone of the cached creation entry config, or `None` when
/// nothing has been cached yet.
async fn read_cached_creation_entry_config(&self) -> Option<CreationEntryConfigRecord> {
    self.creation_entry_config_cache.read().await.clone()
}
|
||
|
||
pub async fn health_check(&self, probe_timeout: Duration) -> SpacetimeClientHealthSnapshot {
|
||
let timeout = if probe_timeout.is_zero() {
|
||
DEFAULT_PROCEDURE_TIMEOUT
|
||
} else {
|
||
probe_timeout
|
||
};
|
||
let started_at = Instant::now();
|
||
let checked_at_micros = current_unix_micros();
|
||
let result = self.acquire_connection_with_timeout(timeout).await;
|
||
match result {
|
||
Ok(lease) => {
|
||
self.release_connection(lease).await;
|
||
let mut health_state = self.health_state.write().await;
|
||
health_state.last_success_at_micros = Some(checked_at_micros);
|
||
health_state.last_error = None;
|
||
SpacetimeClientHealthSnapshot {
|
||
ok: true,
|
||
stage: SpacetimeClientStage::Ready,
|
||
checked_at_micros,
|
||
elapsed_ms: duration_millis_u64(started_at.elapsed()),
|
||
timeout_ms: duration_millis_u64(timeout),
|
||
error: None,
|
||
last_success_at_micros: health_state.last_success_at_micros,
|
||
last_error: health_state.last_error.clone(),
|
||
}
|
||
}
|
||
Err(error) => {
|
||
log_spacetime_client_failure(
|
||
"health_check",
|
||
"spacetime_connection",
|
||
error.stage,
|
||
started_at,
|
||
&error.error,
|
||
);
|
||
let mut health_state = self.health_state.write().await;
|
||
let error_message = error.error.to_string();
|
||
health_state.last_error = Some(error_message.clone());
|
||
SpacetimeClientHealthSnapshot {
|
||
ok: false,
|
||
stage: error.stage,
|
||
checked_at_micros,
|
||
elapsed_ms: duration_millis_u64(started_at.elapsed()),
|
||
timeout_ms: duration_millis_u64(timeout),
|
||
error: Some(error_message),
|
||
last_success_at_micros: health_state.last_success_at_micros,
|
||
last_error: health_state.last_error.clone(),
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
async fn acquire_connection_with_timeout(
|
||
&self,
|
||
operation_timeout: Duration,
|
||
) -> Result<PooledConnectionLease, SpacetimeStageError> {
|
||
let permit = timeout(operation_timeout, self.pool.permits.clone().acquire_owned())
|
||
.await
|
||
.map_err(|_| {
|
||
SpacetimeStageError::new(
|
||
SpacetimeClientStage::PoolAcquire,
|
||
SpacetimeClientError::Timeout,
|
||
)
|
||
})?
|
||
.map_err(|error| {
|
||
SpacetimeStageError::new(
|
||
SpacetimeClientStage::PoolAcquire,
|
||
SpacetimeClientError::Runtime(error.to_string()),
|
||
)
|
||
})?;
|
||
|
||
// 持有 permit 即保证最多 pool_size 个并发持有者,必然能抢到一个空闲槽位;
|
||
// CAS 抢占后立即构造租约,后续任何失败/取消都由租约 Drop 兜底复位槽位。
|
||
let slot_index = self
|
||
.pool
|
||
.slots
|
||
.iter()
|
||
.position(|slot| {
|
||
slot.in_use
|
||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
|
||
.is_ok()
|
||
})
|
||
.ok_or_else(|| {
|
||
SpacetimeStageError::new(
|
||
SpacetimeClientStage::PoolAcquire,
|
||
SpacetimeClientError::Runtime(
|
||
"SpacetimeDB 连接池 permit 与槽位状态不一致".to_string(),
|
||
),
|
||
)
|
||
})?;
|
||
|
||
let mut lease = PooledConnectionLease {
|
||
slot_index,
|
||
connection: None,
|
||
pool: self.pool.clone(),
|
||
_permit: permit,
|
||
};
|
||
|
||
let reusable_connection = self.pool.slots[slot_index]
|
||
.connection
|
||
.lock()
|
||
.await
|
||
.take()
|
||
.filter(|connection| !connection.is_broken());
|
||
|
||
let connection = if let Some(connection) = reusable_connection {
|
||
connection
|
||
} else {
|
||
// 建连失败时直接返回错误,槽位与 permit 由 lease Drop 自动归还。
|
||
self.build_pooled_connection(operation_timeout).await?
|
||
};
|
||
|
||
lease.connection = Some(connection);
|
||
Ok(lease)
|
||
}
|
||
|
||
async fn build_pooled_connection(
|
||
&self,
|
||
operation_timeout: Duration,
|
||
) -> Result<PooledConnection, SpacetimeStageError> {
|
||
let config = self.config.clone();
|
||
let broken = Arc::new(AtomicBool::new(false));
|
||
let (sender, receiver) = oneshot::channel::<Result<(), SpacetimeClientError>>();
|
||
let connect_sender = Arc::new(Mutex::new(Some(sender)));
|
||
let broken_flag = broken.clone();
|
||
let disconnect_sender = connect_sender.clone();
|
||
let connection = timeout(
|
||
operation_timeout,
|
||
tokio::task::spawn_blocking(move || {
|
||
DbConnection::builder()
|
||
.with_uri(config.server_url)
|
||
.with_database_name(config.database)
|
||
.with_token(config.token)
|
||
.on_connect(move |_, _, _| {
|
||
send_connect_once(&connect_sender, Ok(()));
|
||
})
|
||
.on_disconnect(move |_, error| {
|
||
broken_flag.store(true, Ordering::SeqCst);
|
||
let message = error
|
||
.map(|error| error.to_string())
|
||
.unwrap_or_else(|| "SpacetimeDB 连接已断开".to_string());
|
||
send_connect_once(
|
||
&disconnect_sender,
|
||
Err(SpacetimeClientError::Procedure(message)),
|
||
);
|
||
})
|
||
.build()
|
||
.map_err(|error| SpacetimeClientError::Build(error.to_string()))
|
||
}),
|
||
)
|
||
.await
|
||
.map_err(|_| {
|
||
SpacetimeStageError::new(
|
||
SpacetimeClientStage::ConnectBuild,
|
||
SpacetimeClientError::Timeout,
|
||
)
|
||
})?
|
||
.map_err(|error| {
|
||
SpacetimeStageError::new(
|
||
SpacetimeClientStage::ConnectBuild,
|
||
SpacetimeClientError::Runtime(error.to_string()),
|
||
)
|
||
})?
|
||
.map_err(|error| SpacetimeStageError::new(SpacetimeClientStage::ConnectBuild, error))?;
|
||
|
||
let runner = connection.run_threaded();
|
||
timeout(operation_timeout, receiver)
|
||
.await
|
||
.map_err(|_| {
|
||
SpacetimeStageError::new(
|
||
SpacetimeClientStage::ConnectHandshake,
|
||
SpacetimeClientError::Timeout,
|
||
)
|
||
})?
|
||
.map_err(|_| {
|
||
SpacetimeStageError::new(
|
||
SpacetimeClientStage::ConnectHandshake,
|
||
SpacetimeClientError::ConnectDropped,
|
||
)
|
||
})?
|
||
.map_err(|error| {
|
||
SpacetimeStageError::new(SpacetimeClientStage::ConnectHandshake, error)
|
||
})?;
|
||
|
||
let read_model_subscriptions = self
|
||
.subscribe_cached_read_models(&connection, broken.clone(), operation_timeout)
|
||
.await?;
|
||
|
||
Ok(PooledConnection {
|
||
connection,
|
||
_read_model_subscriptions: read_model_subscriptions,
|
||
runner: Some(runner),
|
||
broken,
|
||
})
|
||
}
|
||
|
||
async fn subscribe_cached_read_models(
|
||
&self,
|
||
connection: &DbConnection,
|
||
broken: Arc<AtomicBool>,
|
||
operation_timeout: Duration,
|
||
) -> Result<Vec<SubscriptionHandle>, SpacetimeStageError> {
|
||
let mut subscriptions = Vec::new();
|
||
for query in [
|
||
"SELECT * FROM public_work_gallery_entry",
|
||
"SELECT * FROM public_work_detail_entry",
|
||
"SELECT * FROM bark_battle_gallery_view",
|
||
"SELECT * FROM puzzle_gallery_card_view",
|
||
"SELECT * FROM puzzle_clear_gallery_card_view",
|
||
"SELECT * FROM jump_hop_gallery_card_view",
|
||
"SELECT * FROM wooden_fish_gallery_card_view",
|
||
"SELECT * FROM custom_world_gallery_entry",
|
||
"SELECT * FROM match_3_d_gallery_view",
|
||
"SELECT * FROM square_hole_gallery_view",
|
||
"SELECT * FROM visual_novel_gallery_view",
|
||
"SELECT * FROM big_fish_gallery_view",
|
||
] {
|
||
let subscription = self
|
||
.subscribe_cached_read_model_query(
|
||
connection,
|
||
broken.clone(),
|
||
query,
|
||
true,
|
||
operation_timeout,
|
||
)
|
||
.await?;
|
||
subscriptions.push(subscription);
|
||
}
|
||
|
||
for query in [
|
||
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'puzzle'",
|
||
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'puzzle-clear'",
|
||
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'jump-hop'",
|
||
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'wooden-fish'",
|
||
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'custom-world'",
|
||
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'match3d'",
|
||
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'square-hole'",
|
||
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'visual-novel'",
|
||
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'big-fish'",
|
||
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'bark-battle'",
|
||
"SELECT * FROM creation_entry_config",
|
||
"SELECT * FROM creation_entry_type_config",
|
||
"SELECT * FROM asset_object",
|
||
] {
|
||
if let Ok(subscription) = self
|
||
.subscribe_cached_read_model_query(
|
||
connection,
|
||
broken.clone(),
|
||
query,
|
||
false,
|
||
operation_timeout,
|
||
)
|
||
.await
|
||
{
|
||
subscriptions.push(subscription);
|
||
}
|
||
}
|
||
|
||
Ok(subscriptions)
|
||
}
|
||
|
||
async fn subscribe_cached_read_model_query(
|
||
&self,
|
||
connection: &DbConnection,
|
||
broken: Arc<AtomicBool>,
|
||
query: &'static str,
|
||
mark_broken_on_error: bool,
|
||
operation_timeout: Duration,
|
||
) -> Result<SubscriptionHandle, SpacetimeStageError> {
|
||
let (sender, receiver) = oneshot::channel::<Result<(), SpacetimeClientError>>();
|
||
let applied_sender = Arc::new(Mutex::new(Some(sender)));
|
||
let on_applied_sender = applied_sender.clone();
|
||
let on_error_sender = applied_sender.clone();
|
||
let broken_flag = broken.clone();
|
||
let subscription = connection
|
||
.subscription_builder()
|
||
.on_applied(move |_| {
|
||
send_connect_once(&on_applied_sender, Ok(()));
|
||
})
|
||
.on_error(move |_, error| {
|
||
if mark_broken_on_error {
|
||
broken_flag.store(true, Ordering::SeqCst);
|
||
}
|
||
send_connect_once(
|
||
&on_error_sender,
|
||
Err(SpacetimeClientError::Procedure(error.to_string())),
|
||
);
|
||
})
|
||
.subscribe(query);
|
||
|
||
timeout(operation_timeout, receiver)
|
||
.await
|
||
.map_err(|_| {
|
||
SpacetimeStageError::new(
|
||
SpacetimeClientStage::ReadModelSubscribe,
|
||
SpacetimeClientError::Timeout,
|
||
)
|
||
})?
|
||
.map_err(|_| {
|
||
SpacetimeStageError::new(
|
||
SpacetimeClientStage::ReadModelSubscribe,
|
||
SpacetimeClientError::ConnectDropped,
|
||
)
|
||
})?
|
||
.map_err(|error| {
|
||
SpacetimeStageError::new(SpacetimeClientStage::ReadModelSubscribe, error)
|
||
})?;
|
||
|
||
Ok(subscription)
|
||
}
|
||
|
||
/// Returns a lease to the pool by dropping it.
async fn release_connection(&self, lease: PooledConnectionLease) {
    // Explicit release and the implicit release of a cancelled request share the lease's
    // Drop logic, so the slot and the permit are reset on every path and the pool cannot
    // slowly leak until it is full.
    let _released = lease;
}
|
||
|
||
// 超时后必须统一归还租约;若连接已先一步断开则回传断线,否则标记坏连接并回传超时。
|
||
fn resolve_timeout_error(
|
||
connection: Option<&PooledConnection>,
|
||
_stage: SpacetimeClientStage,
|
||
) -> SpacetimeClientError {
|
||
if let Some(connection) = connection {
|
||
if connection.is_broken() {
|
||
return SpacetimeClientError::ConnectDropped;
|
||
}
|
||
connection.mark_broken();
|
||
}
|
||
|
||
SpacetimeClientError::Timeout
|
||
}
|
||
}
|
||
|
||
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn current_unix_micros() -> i64 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as i64,
        Err(_) => 0,
    }
}
|
||
|
||
fn current_public_work_day() -> i64 {
|
||
current_unix_micros().div_euclid(PUBLIC_WORK_PLAY_DAY_MICROS)
|
||
}
|
||
|
||
/// Converts a duration to whole milliseconds, saturating at `u64::MAX`.
fn duration_millis_u64(duration: Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    u64::try_from(millis).unwrap_or(u64::MAX)
}
|
||
|
||
fn log_spacetime_client_failure(
|
||
operation_kind: &'static str,
|
||
operation_name: &'static str,
|
||
stage: SpacetimeClientStage,
|
||
started_at: Instant,
|
||
error: &SpacetimeClientError,
|
||
) {
|
||
warn!(
|
||
operation_kind,
|
||
operation_name,
|
||
spacetime_stage = stage.as_str(),
|
||
elapsed_ms = duration_millis_u64(started_at.elapsed()),
|
||
error = %error,
|
||
"SpacetimeDB client operation failed"
|
||
);
|
||
}
|
||
|
||
fn public_work_recent_play_counts(
|
||
connection: &DbConnection,
|
||
source_type: &str,
|
||
) -> HashMap<String, u32> {
|
||
let current_day = current_public_work_day();
|
||
let first_day = current_day - (PUBLIC_WORK_RECENT_PLAY_WINDOW_DAYS - 1);
|
||
let mut counts = HashMap::new();
|
||
|
||
for row in connection.db().public_work_play_daily_stat().iter() {
|
||
if row.source_type != source_type
|
||
|| row.played_day < first_day
|
||
|| row.played_day > current_day
|
||
{
|
||
continue;
|
||
}
|
||
let entry: &mut u32 = counts.entry(row.profile_id).or_insert(0);
|
||
*entry = (*entry).saturating_add(row.play_count);
|
||
}
|
||
|
||
counts
|
||
}
|
||
|
||
impl SpacetimeClientError {
|
||
pub(crate) fn from_sdk_error(error: impl fmt::Display) -> Self {
|
||
Self::Procedure(error.to_string())
|
||
}
|
||
|
||
pub(crate) fn validation_failed(error: impl fmt::Display) -> Self {
|
||
Self::Runtime(error.to_string())
|
||
}
|
||
|
||
pub(crate) fn reducer_failed(message: String) -> Self {
|
||
Self::Runtime(message)
|
||
}
|
||
|
||
pub(crate) fn procedure_failed(message: Option<String>) -> Self {
|
||
Self::Procedure(message.unwrap_or_else(|| "SpacetimeDB procedure 返回未知错误".to_string()))
|
||
}
|
||
|
||
pub(crate) fn missing_snapshot(label: &'static str) -> Self {
|
||
Self::Procedure(format!("SpacetimeDB procedure 未返回{label}"))
|
||
}
|
||
}
|
||
|
||
impl PooledConnection {
|
||
fn is_broken(&self) -> bool {
|
||
self.broken.load(Ordering::SeqCst)
|
||
}
|
||
|
||
fn mark_broken(&self) {
|
||
self.broken.store(true, Ordering::SeqCst);
|
||
}
|
||
}
|
||
|
||
impl Drop for PooledConnection {
|
||
fn drop(&mut self) {
|
||
let _ = self.connection.disconnect();
|
||
if let Some(runner) = self.runner.take() {
|
||
drop(runner);
|
||
}
|
||
}
|
||
}
|
||
|
||
impl fmt::Debug for SpacetimeClient {
|
||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||
f.debug_struct("SpacetimeClient")
|
||
.field("config", &self.config)
|
||
.field("pool_size", &self.pool.slots.len())
|
||
.finish()
|
||
}
|
||
}
|
||
|
||
/// Delivers `result` through the shared sender at most once.
///
/// The sender is taken out of its slot, so later calls find it empty and do nothing.
/// A failed send is ignored.
///
/// # Panics
/// Panics if the sender's lock is poisoned.
fn send_once<T>(sender: &ProcedureResultSender<T>, result: Result<T, SpacetimeClientError>) {
    let pending = sender
        .lock()
        .expect("spacetime result sender should not poison")
        .take();
    if let Some(channel) = pending {
        let _ = channel.send(result);
    }
}
|
||
|
||
/// Delivers a reducer `result` through the shared sender at most once.
///
/// The sender is taken out of its slot, so later calls find it empty and do nothing.
/// A failed send is ignored.
///
/// # Panics
/// Panics if the sender's lock is poisoned.
fn send_reducer_once(sender: &ReducerResultSender, result: Result<(), SpacetimeClientError>) {
    let pending = sender
        .lock()
        .expect("spacetime reducer result sender should not poison")
        .take();
    if let Some(channel) = pending {
        let _ = channel.send(result);
    }
}
|
||
|
||
fn send_connect_once(
|
||
sender: &Arc<Mutex<Option<oneshot::Sender<Result<(), SpacetimeClientError>>>>>,
|
||
result: Result<(), SpacetimeClientError>,
|
||
) {
|
||
if let Some(sender) = sender
|
||
.lock()
|
||
.expect("spacetime connect sender should not poison")
|
||
.take()
|
||
{
|
||
let _ = sender.send(result);
|
||
}
|
||
}
|
||
|
||
impl fmt::Display for SpacetimeClientError {
|
||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||
match self {
|
||
Self::Build(message) | Self::Procedure(message) | Self::Runtime(message) => {
|
||
f.write_str(message)
|
||
}
|
||
Self::ConnectDropped => f.write_str("SpacetimeDB 连接在返回结果前已断开"),
|
||
Self::Timeout => f.write_str("SpacetimeDB procedure 调用超时"),
|
||
}
|
||
}
|
||
}
|
||
|
||
// Marks `SpacetimeClientError` as a standard error type; the impl body is empty, so only the trait's default methods apply.
impl Error for SpacetimeClientError {}
|
||
|
||
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn procedure_failed_keeps_server_message_or_default() {
        let with_message = SpacetimeClientError::procedure_failed(Some("领域错误".to_string()));
        assert_eq!(with_message.to_string(), "领域错误");

        let without_message = SpacetimeClientError::procedure_failed(None);
        assert_eq!(
            without_message.to_string(),
            "SpacetimeDB procedure 返回未知错误"
        );
    }

    #[test]
    fn missing_snapshot_names_adapter_boundary() {
        let error = SpacetimeClientError::missing_snapshot("story session 快照");
        assert_eq!(
            error.to_string(),
            "SpacetimeDB procedure 未返回story session 快照"
        );
    }

    #[test]
    fn validation_and_reducer_failures_stay_runtime_classified() {
        let validation = SpacetimeClientError::validation_failed("字段缺失");
        assert!(matches!(validation, SpacetimeClientError::Runtime(_)));

        let reducer = SpacetimeClientError::reducer_failed("状态非法".to_string());
        assert!(matches!(reducer, SpacetimeClientError::Runtime(_)));
    }

    /// Builds a client for pool-behaviour tests.
    fn test_client(pool_size: u32, procedure_timeout: Duration) -> SpacetimeClient {
        let config = SpacetimeClientConfig {
            // Points at a local port expected to be unreachable: these tests only check
            // pool behaviour and need no real SpacetimeDB.
            server_url: "http://127.0.0.1:9".to_string(),
            database: "pool-test".to_string(),
            token: None,
            pool_size,
            procedure_timeout,
        };
        SpacetimeClient::new(config)
    }

    /// Reproduces the production failure: before the fix, a lease was never returned when
    /// the request future was cancelled, so the slot stayed `in_use` forever and later
    /// acquires spun with a permit in hand. After the fix, dropping the lease must reset
    /// both the slot and the permit.
    #[tokio::test]
    async fn dropped_lease_releases_slot_and_permit() {
        let client = test_client(1, Duration::from_millis(200));
        let held_permit = client
            .pool
            .permits
            .clone()
            .acquire_owned()
            .await
            .expect("permit should acquire");
        client.pool.slots[0].in_use.store(true, Ordering::SeqCst);
        assert_eq!(client.pool.permits.available_permits(), 0);

        // Simulate a cancelled request: the lease is dropped without going through
        // release_connection.
        let cancelled_lease = PooledConnectionLease {
            slot_index: 0,
            connection: None,
            pool: client.pool.clone(),
            _permit: held_permit,
        };
        drop(cancelled_lease);

        let slot_still_in_use = client.pool.slots[0].in_use.load(Ordering::SeqCst);
        assert!(
            !slot_still_in_use,
            "租约 Drop 后槽位必须复位,否则连接池会被泄漏占满"
        );
        assert_eq!(
            client.pool.permits.available_permits(),
            1,
            "租约 Drop 后 permit 必须归还"
        );
    }

    /// When every permit is taken (requests continuously in flight), acquire must return
    /// a pool_acquire timeout within the timeout window instead of waiting forever.
    #[tokio::test]
    async fn acquire_times_out_at_pool_acquire_when_pool_is_busy() {
        let client = test_client(1, Duration::from_millis(200));
        let _held_permit = client
            .pool
            .permits
            .clone()
            .acquire_owned()
            .await
            .expect("permit should acquire");

        let acquire = client.acquire_connection_with_timeout(Duration::from_millis(200));
        let outcome = tokio::time::timeout(Duration::from_secs(5), acquire)
            .await
            .expect("acquire 必须在超时窗口内返回,而不是空转挂死");

        let error = outcome
            .err()
            .expect("池占满时应返回 pool_acquire 超时");
        assert_eq!(error.stage, SpacetimeClientStage::PoolAcquire);
        assert!(matches!(error.error, SpacetimeClientError::Timeout));
    }
}
|