621 lines
26 KiB
Rust
621 lines
26 KiB
Rust
// `module_bindings` 是 SpacetimeDB CLI 生成产物,禁止再被 rustfmt 二次改写。
|
|
#[rustfmt::skip]
|
|
pub mod module_bindings;
|
|
|
|
mod mapper;
|
|
use mapper::*;
|
|
pub use mapper::{
|
|
AiResultReferenceRecord, AiTaskMutationRecord, AiTaskRecord, AiTaskStageRecord,
|
|
AiTextChunkRecord, BattleStateRecord, BigFishAgentMessageRecord, BigFishAnchorItemRecord,
|
|
BigFishAnchorPackRecord, BigFishAssetCoverageRecord, BigFishAssetGenerateRecordInput,
|
|
BigFishAssetSlotRecord, BigFishBackgroundBlueprintRecord, BigFishDraftCompileRecordInput,
|
|
BigFishGameDraftRecord, BigFishInputSubmitRecordInput, BigFishLevelBlueprintRecord,
|
|
BigFishLikeReportRecordInput, BigFishMessageFinalizeRecordInput,
|
|
BigFishMessageSubmitRecordInput, BigFishPlayReportRecordInput, BigFishRunStartRecordInput,
|
|
BigFishRuntimeEntityRecord, BigFishRuntimeParamsRecord, BigFishRuntimeRunRecord,
|
|
BigFishSessionCreateRecordInput, BigFishSessionRecord, BigFishVector2Record,
|
|
BigFishWorkRemixRecordInput, BigFishWorkSummaryRecord, CreationEntryConfigRecord,
|
|
CustomWorldAgentActionExecuteRecord,
|
|
CustomWorldAgentActionExecuteRecordInput, CustomWorldAgentCheckpointRecord,
|
|
CustomWorldAgentMessageFinalizeRecordInput, CustomWorldAgentMessageRecord,
|
|
CustomWorldAgentMessageSubmitRecordInput, CustomWorldAgentOperationProgressRecordInput,
|
|
CustomWorldAgentOperationRecord, CustomWorldAgentSessionCreateRecordInput,
|
|
CustomWorldAgentSessionRecord, CustomWorldCheckpointRecord, CustomWorldDraftCardDetailRecord,
|
|
CustomWorldDraftCardDetailSectionRecord, CustomWorldDraftCardRecord,
|
|
CustomWorldGalleryEntryRecord, CustomWorldLibraryEntryRecord, CustomWorldLibraryMutationRecord,
|
|
CustomWorldProfileLikeReportRecordInput, CustomWorldProfilePlayReportRecordInput,
|
|
CustomWorldProfileRemixRecordInput, CustomWorldProfileUpsertRecordInput,
|
|
CustomWorldPublishGateRecord, CustomWorldPublishWorldRecord,
|
|
CustomWorldPublishWorldRecordInput, CustomWorldPublishedProfileCompileRecord,
|
|
CustomWorldResultPreviewBlockerRecord, CustomWorldSupportedActionRecord,
|
|
CustomWorldWorkSummaryRecord, Match3DAgentMessageFinalizeRecordInput,
|
|
Match3DAgentMessageRecord, Match3DAgentMessageSubmitRecordInput,
|
|
Match3DAgentSessionCreateRecordInput, Match3DAgentSessionRecord, Match3DAnchorItemRecord,
|
|
Match3DAnchorPackRecord, Match3DClickConfirmationRecord, Match3DCompileDraftRecordInput,
|
|
Match3DCreatorConfigRecord, Match3DItemSnapshotRecord, Match3DResultDraftRecord,
|
|
Match3DRunClickRecordInput, Match3DRunRecord, Match3DRunRestartRecordInput,
|
|
Match3DRunStartRecordInput, Match3DRunStopRecordInput, Match3DRunTimeUpRecordInput,
|
|
Match3DTraySlotRecord, Match3DWorkProfileRecord, Match3DWorkUpdateRecordInput,
|
|
NpcBattleInteractionRecord, NpcInteractionRecord, NpcStateRecord,
|
|
PuzzleAgentMessageFinalizeRecordInput, PuzzleAgentMessageRecord,
|
|
PuzzleAgentMessageSubmitRecordInput, PuzzleAgentSessionCreateRecordInput,
|
|
PuzzleAgentSessionRecord, PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord,
|
|
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBoardRecord, PuzzleCellPositionRecord,
|
|
PuzzleCreatorIntentRecord, PuzzleDraftLevelRecord, PuzzleFormDraftRecord,
|
|
PuzzleFormDraftSaveRecordInput,
|
|
PuzzleGeneratedImageCandidateRecord, PuzzleGeneratedImagesSaveRecordInput,
|
|
PuzzleLeaderboardEntryRecord, PuzzleLeaderboardSubmitRecordInput, PuzzleMergedGroupRecord,
|
|
PuzzlePieceStateRecord, PuzzlePublishRecordInput, PuzzleRecommendedNextWorkRecord,
|
|
PuzzleResultDraftRecord, PuzzleResultPreviewBlockerRecord, PuzzleResultPreviewFindingRecord,
|
|
PuzzleResultPreviewRecord, PuzzleRunDragRecordInput, PuzzleRunNextLevelRecordInput,
|
|
PuzzleRunPauseRecordInput, PuzzleRunPropRecordInput, PuzzleRunRecord,
|
|
PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput, PuzzleRuntimeLevelRecord,
|
|
PuzzleSelectCoverImageRecordInput, PuzzleUiBackgroundSaveRecordInput,
|
|
PuzzleWorkLikeReportRecordInput, PuzzleWorkPointIncentiveClaimRecordInput,
|
|
PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput,
|
|
PuzzleWorkUpsertRecordInput, ResolveCombatActionRecord, ResolveNpcBattleInteractionInput,
|
|
SquareHoleAgentMessageFinalizeRecordInput, SquareHoleAgentMessageRecord,
|
|
SquareHoleAgentMessageSubmitRecordInput, SquareHoleAgentSessionCreateRecordInput,
|
|
SquareHoleAgentSessionRecord, SquareHoleAnchorItemRecord, SquareHoleAnchorPackRecord,
|
|
SquareHoleCompileDraftRecordInput, SquareHoleCreatorConfigRecord,
|
|
SquareHoleDropConfirmationRecord, SquareHoleDropFeedbackRecord, SquareHoleHoleOptionRecord,
|
|
SquareHoleHoleSnapshotRecord, SquareHoleResultDraftRecord, SquareHoleRunDropRecordInput,
|
|
SquareHoleRunRecord, SquareHoleRunRestartRecordInput, SquareHoleRunStartRecordInput,
|
|
SquareHoleRunStopRecordInput, SquareHoleRunTimeUpRecordInput, SquareHoleShapeOptionRecord,
|
|
SquareHoleShapeSnapshotRecord, SquareHoleWorkProfileRecord, SquareHoleWorkUpdateRecordInput,
|
|
VisualNovelAgentMessageFinalizeRecordInput, VisualNovelAgentMessageRecord,
|
|
VisualNovelAgentMessageSubmitRecordInput, VisualNovelAgentSessionCreateRecordInput,
|
|
VisualNovelAgentSessionRecord, VisualNovelHistoryEntryRecord,
|
|
VisualNovelHistoryEntryRecordInput, VisualNovelRunRecord, VisualNovelRunSnapshotRecordInput,
|
|
VisualNovelRunStartRecordInput, VisualNovelRuntimeEventRecord,
|
|
VisualNovelRuntimeEventRecordInput, VisualNovelWorkCompileRecordInput,
|
|
VisualNovelWorkProfileRecord, VisualNovelWorkUpdateRecordInput,
|
|
};
|
|
|
|
pub mod ai;
|
|
pub mod assets;
|
|
pub mod auth;
|
|
pub mod big_fish;
|
|
pub mod combat;
|
|
pub mod custom_world;
|
|
pub mod inventory;
|
|
pub mod match3d;
|
|
pub mod npc;
|
|
pub mod puzzle;
|
|
pub mod runtime;
|
|
pub mod square_hole;
|
|
pub mod story;
|
|
pub mod story_runtime;
|
|
pub mod visual_novel;
|
|
|
|
use std::{
|
|
error::Error,
|
|
fmt,
|
|
sync::atomic::{AtomicBool, Ordering},
|
|
sync::{Arc, Mutex},
|
|
thread::JoinHandle,
|
|
time::Duration,
|
|
};
|
|
|
|
use module_ai::{
|
|
AiResultReferenceInput as DomainAiResultReferenceInput,
|
|
AiResultReferenceKind as DomainAiResultReferenceKind,
|
|
AiStageCompletionInput as DomainAiStageCompletionInput,
|
|
AiTaskCancelInput as DomainAiTaskCancelInput, AiTaskCreateInput as DomainAiTaskCreateInput,
|
|
AiTaskFailureInput as DomainAiTaskFailureInput, AiTaskFinishInput as DomainAiTaskFinishInput,
|
|
AiTaskKind as DomainAiTaskKind, AiTaskStageBlueprint as DomainAiTaskStageBlueprint,
|
|
AiTaskStageKind as DomainAiTaskStageKind, AiTaskStageStartInput as DomainAiTaskStageStartInput,
|
|
AiTaskStartInput as DomainAiTaskStartInput,
|
|
AiTextChunkAppendInput as DomainAiTextChunkAppendInput,
|
|
};
|
|
use module_assets::{
|
|
AssetEntityBindingRecord, AssetHistoryEntryRecord, AssetObjectAccessPolicy, AssetObjectRecord,
|
|
build_asset_entity_binding_record, build_asset_history_entry_record, build_asset_object_record,
|
|
};
|
|
use module_combat::{
|
|
BattleMode as DomainBattleMode, BattleStateInput as DomainBattleStateInput,
|
|
BattleStateQueryInput as DomainBattleStateQueryInput,
|
|
BattleStateSnapshot as DomainBattleStateSnapshot, BattleStatus as DomainBattleStatus,
|
|
CombatOutcome as DomainCombatOutcome,
|
|
ResolveCombatActionInput as DomainResolveCombatActionInput,
|
|
ResolveCombatActionResult as DomainResolveCombatActionResult, build_battle_state_query_input,
|
|
validate_battle_state_input, validate_resolve_combat_action_input,
|
|
};
|
|
use module_custom_world::CustomWorldThemeMode as DomainCustomWorldThemeMode;
|
|
use module_inventory::{
|
|
RuntimeInventoryStateQueryInput as DomainRuntimeInventoryStateQueryInput,
|
|
RuntimeInventoryStateRecord,
|
|
RuntimeInventoryStateSnapshot as DomainRuntimeInventoryStateSnapshot,
|
|
build_runtime_inventory_state_query_input, build_runtime_inventory_state_record,
|
|
};
|
|
use module_npc::{
|
|
NpcInteractionBattleMode as DomainNpcInteractionBattleMode,
|
|
NpcInteractionResult as DomainNpcInteractionResult,
|
|
NpcInteractionStatus as DomainNpcInteractionStatus,
|
|
NpcRelationStance as DomainNpcRelationStance, NpcRelationState as DomainNpcRelationState,
|
|
NpcStanceProfile as DomainNpcStanceProfile, NpcStateSnapshot as DomainNpcStateSnapshot,
|
|
ResolveNpcInteractionInput as DomainResolveNpcInteractionInput,
|
|
};
|
|
use module_puzzle::{
|
|
PuzzleAgentMessageSnapshot as DomainPuzzleAgentMessageSnapshot,
|
|
PuzzleAgentSessionSnapshot as DomainPuzzleAgentSessionSnapshot,
|
|
PuzzleAgentSuggestedAction as DomainPuzzleAgentSuggestedAction,
|
|
PuzzleAnchorItem as DomainPuzzleAnchorItem, PuzzleAnchorPack as DomainPuzzleAnchorPack,
|
|
PuzzleBoardSnapshot as DomainPuzzleBoardSnapshot,
|
|
PuzzleCellPosition as DomainPuzzleCellPosition,
|
|
PuzzleCreatorIntent as DomainPuzzleCreatorIntent, PuzzleDraftLevel as DomainPuzzleDraftLevel,
|
|
PuzzleGeneratedImageCandidate as DomainPuzzleGeneratedImageCandidate,
|
|
PuzzleMergedGroupState as DomainPuzzleMergedGroupState,
|
|
PuzzlePieceState as DomainPuzzlePieceState, PuzzleResultDraft as DomainPuzzleResultDraft,
|
|
PuzzleResultPreviewBlocker as DomainPuzzleResultPreviewBlocker,
|
|
PuzzleResultPreviewEnvelope as DomainPuzzleResultPreviewEnvelope,
|
|
PuzzleResultPreviewFinding as DomainPuzzleResultPreviewFinding,
|
|
PuzzleRunSnapshot as DomainPuzzleRunSnapshot,
|
|
PuzzleRuntimeLevelSnapshot as DomainPuzzleRuntimeLevelSnapshot,
|
|
PuzzleWorkProfile as DomainPuzzleWorkProfile,
|
|
};
|
|
use module_runtime::{
|
|
AnalyticsMetricQueryResponse as DomainAnalyticsMetricQueryResponse, RuntimeBrowseHistoryRecord,
|
|
RuntimePlatformTheme as DomainRuntimePlatformTheme, RuntimeProfileDashboardRecord,
|
|
RuntimeProfileFeedbackSubmissionRecord, RuntimeProfileInviteCodeRecord,
|
|
RuntimeProfilePlayStatsRecord, RuntimeProfileRechargeCenterRecord,
|
|
RuntimeProfileRechargeOrderRecord,
|
|
RuntimeProfileRedeemCodeMode as DomainRuntimeProfileRedeemCodeMode,
|
|
RuntimeProfileRedeemCodeRecord, RuntimeProfileRewardCodeRedeemRecord,
|
|
RuntimeProfileSaveArchiveRecord, RuntimeProfileTaskCenterRecord, RuntimeProfileTaskClaimRecord,
|
|
RuntimeProfileTaskConfigRecord, RuntimeProfileTaskCycle as DomainRuntimeProfileTaskCycle,
|
|
RuntimeProfileTaskStatus as DomainRuntimeProfileTaskStatus,
|
|
RuntimeProfileWalletLedgerEntryRecord, RuntimeReferralInviteCenterRecord,
|
|
RuntimeReferralRedeemRecord, RuntimeSettingsRecord, RuntimeSnapshotRecord,
|
|
RuntimeTrackingScopeKind as DomainRuntimeTrackingScopeKind, build_analytics_metric_query_input,
|
|
build_runtime_browse_history_clear_input, build_runtime_browse_history_list_input,
|
|
build_runtime_browse_history_record, build_runtime_browse_history_sync_input,
|
|
build_runtime_profile_dashboard_get_input, build_runtime_profile_dashboard_record,
|
|
build_runtime_profile_feedback_submission_input,
|
|
build_runtime_profile_feedback_submission_record,
|
|
build_runtime_profile_invite_code_admin_list_input,
|
|
build_runtime_profile_invite_code_admin_upsert_input, build_runtime_profile_invite_code_record,
|
|
build_runtime_profile_play_stats_get_input, build_runtime_profile_play_stats_record,
|
|
build_runtime_profile_recharge_center_get_input, build_runtime_profile_recharge_center_record,
|
|
build_runtime_profile_recharge_order_create_input,
|
|
build_runtime_profile_redeem_code_admin_disable_input,
|
|
build_runtime_profile_redeem_code_admin_list_input,
|
|
build_runtime_profile_redeem_code_admin_upsert_input, build_runtime_profile_redeem_code_record,
|
|
build_runtime_profile_reward_code_redeem_input,
|
|
build_runtime_profile_reward_code_redeem_record, build_runtime_profile_save_archive_list_input,
|
|
build_runtime_profile_save_archive_record, build_runtime_profile_save_archive_resume_input,
|
|
build_runtime_profile_task_center_get_input, build_runtime_profile_task_center_record,
|
|
build_runtime_profile_task_claim_input, build_runtime_profile_task_claim_record,
|
|
build_runtime_profile_task_config_admin_disable_input,
|
|
build_runtime_profile_task_config_admin_list_input,
|
|
build_runtime_profile_task_config_admin_upsert_input, build_runtime_profile_task_config_record,
|
|
build_runtime_profile_wallet_adjustment_input,
|
|
build_runtime_profile_wallet_ledger_entry_record,
|
|
build_runtime_profile_wallet_ledger_list_input, build_runtime_referral_invite_center_get_input,
|
|
build_runtime_referral_invite_center_record, build_runtime_referral_redeem_input,
|
|
build_runtime_referral_redeem_record, build_runtime_setting_get_input,
|
|
build_runtime_setting_record, build_runtime_setting_upsert_input,
|
|
build_runtime_snapshot_delete_input, build_runtime_snapshot_get_input,
|
|
build_runtime_snapshot_record, build_runtime_snapshot_upsert_input,
|
|
};
|
|
use module_runtime_item::{
|
|
RuntimeItemEquipmentSlot as DomainRuntimeItemEquipmentSlot,
|
|
RuntimeItemRewardItemRarity as DomainRuntimeItemRewardItemRarity,
|
|
RuntimeItemRewardItemSnapshot as DomainRuntimeItemRewardItemSnapshot,
|
|
normalize_reward_item_snapshot,
|
|
};
|
|
use module_story::{
|
|
StoryContinueInput as DomainStoryContinueInput, StoryEventKind as DomainStoryEventKind,
|
|
StoryEventRecord, StorySessionInput as DomainStorySessionInput, StorySessionRecord,
|
|
StorySessionResultRecord, StorySessionStateInput as DomainStorySessionStateInput,
|
|
StorySessionStateRecord, StorySessionStatus as DomainStorySessionStatus,
|
|
build_story_continue_input, build_story_session_input, build_story_session_state_input,
|
|
};
|
|
use shared_kernel::format_timestamp_micros;
|
|
use spacetimedb_sdk::DbContext;
|
|
use tokio::{
|
|
sync::{OwnedSemaphorePermit, Semaphore, oneshot},
|
|
time::timeout,
|
|
};
|
|
|
|
use crate::module_bindings::*;
|
|
|
|
/// Connection settings consumed by `SpacetimeClient`.
#[derive(Clone, Debug)]
pub struct SpacetimeClientConfig {
    /// URI handed to the connection builder via `with_uri`.
    pub server_url: String,
    /// Database name handed to the connection builder via `with_database_name`.
    pub database: String,
    /// Optional token handed to the connection builder via `with_token`.
    pub token: Option<String>,
    /// Requested number of pool slots; `SpacetimeClient::new` treats zero as one.
    pub pool_size: u32,
    /// Upper bound used when waiting for a pool permit, for the connect handshake and for
    /// each call result; `SpacetimeClient::new` replaces a zero value with the default.
    pub procedure_timeout: Duration,
}
|
|
|
|
/// Auth-store snapshot payload with its last-update time.
///
/// NOTE(review): nothing in this part of the file constructs this record; presumably the
/// `auth` submodule does — confirm there what `None` means for each field.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AuthStoreSnapshotRecord {
    /// Snapshot serialized as JSON text, when one is present.
    pub snapshot_json: Option<String>,
    /// Update time in microseconds (epoch not shown here — TODO confirm), when present.
    pub updated_at_micros: Option<i64>,
}
|
|
|
|
/// Counters describing the outcome of an auth-store snapshot import.
///
/// NOTE(review): the producer is not visible in this part of the file; field meanings are
/// taken from their names — confirm against the `auth` submodule.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AuthStoreSnapshotImportRecord {
    /// Number of users imported.
    pub imported_user_count: u32,
    /// Number of identities imported.
    pub imported_identity_count: u32,
    /// Number of refresh sessions imported.
    pub imported_refresh_session_count: u32,
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct SpacetimeClient {
|
|
config: SpacetimeClientConfig,
|
|
pool: Arc<SpacetimeConnectionPool>,
|
|
}
|
|
|
|
/// Failures surfaced by `SpacetimeClient`.
#[derive(Debug)]
pub enum SpacetimeClientError {
    /// The connection builder rejected the configuration; carries the builder's message.
    Build(String),
    /// The connection went away before a result was delivered.
    ConnectDropped,
    /// A procedure/SDK-level failure, or a disconnect reported while connecting.
    Procedure(String),
    /// A local failure: validation, reducer rejection, task join or pool bookkeeping.
    Runtime(String),
    /// A pool permit, the connect handshake or a call result did not arrive in time.
    Timeout,
}
|
|
|
|
/// Fallback used by `SpacetimeClient::new` when the configured `procedure_timeout` is zero.
const DEFAULT_PROCEDURE_TIMEOUT: Duration = Duration::from_secs(30);
|
|
|
|
type ProcedureResultSender<T> =
|
|
Arc<Mutex<Option<oneshot::Sender<Result<T, SpacetimeClientError>>>>>;
|
|
type ReducerResultSender = Arc<Mutex<Option<oneshot::Sender<Result<(), SpacetimeClientError>>>>>;
|
|
|
|
struct SpacetimeConnectionPool {
|
|
slots: Vec<tokio::sync::Mutex<PooledConnectionSlot>>,
|
|
permits: Arc<Semaphore>,
|
|
}
|
|
|
|
struct PooledConnectionSlot {
|
|
connection: Option<PooledConnection>,
|
|
in_use: bool,
|
|
}
|
|
|
|
struct PooledConnection {
|
|
connection: DbConnection,
|
|
runner: Option<JoinHandle<()>>,
|
|
broken: Arc<AtomicBool>,
|
|
}
|
|
|
|
struct PooledConnectionLease {
|
|
slot_index: usize,
|
|
connection: Option<PooledConnection>,
|
|
_permit: OwnedSemaphorePermit,
|
|
}
|
|
|
|
impl SpacetimeClient {
|
|
pub fn new(config: SpacetimeClientConfig) -> Self {
|
|
let pool_size = config.pool_size.max(1) as usize;
|
|
let config = SpacetimeClientConfig {
|
|
procedure_timeout: if config.procedure_timeout.is_zero() {
|
|
DEFAULT_PROCEDURE_TIMEOUT
|
|
} else {
|
|
config.procedure_timeout
|
|
},
|
|
..config
|
|
};
|
|
let slots = (0..pool_size)
|
|
.map(|_| {
|
|
tokio::sync::Mutex::new(PooledConnectionSlot {
|
|
connection: None,
|
|
in_use: false,
|
|
})
|
|
})
|
|
.collect::<Vec<_>>();
|
|
let pool = Arc::new(SpacetimeConnectionPool {
|
|
slots,
|
|
permits: Arc::new(Semaphore::new(pool_size)),
|
|
});
|
|
|
|
Self { config, pool }
|
|
}
|
|
|
|
async fn call_after_connect<T>(
|
|
&self,
|
|
call: impl FnOnce(&DbConnection, ProcedureResultSender<T>) + Send + 'static,
|
|
) -> Result<T, SpacetimeClientError>
|
|
where
|
|
T: Send + 'static,
|
|
{
|
|
let (sender, receiver) = oneshot::channel();
|
|
let result_sender = Arc::new(Mutex::new(Some(sender)));
|
|
let lease = self.acquire_connection().await?;
|
|
let final_result = if let Some(connection) = lease.connection.as_ref() {
|
|
call(&connection.connection, result_sender.clone());
|
|
match timeout(self.config.procedure_timeout, receiver).await {
|
|
Ok(inner) => match inner {
|
|
Ok(value) => value,
|
|
Err(_) => Err(SpacetimeClientError::ConnectDropped),
|
|
},
|
|
Err(_) => Err(Self::resolve_timeout_error(Some(connection))),
|
|
}
|
|
} else {
|
|
Err(SpacetimeClientError::Runtime(
|
|
"SpacetimeDB 连接租约缺少连接".to_string(),
|
|
))
|
|
};
|
|
self.release_connection(lease).await;
|
|
|
|
final_result
|
|
}
|
|
|
|
async fn call_reducer_after_connect(
|
|
&self,
|
|
call: impl FnOnce(&DbConnection, ReducerResultSender) + Send + 'static,
|
|
) -> Result<(), SpacetimeClientError> {
|
|
let (sender, receiver) = oneshot::channel();
|
|
let result_sender = Arc::new(Mutex::new(Some(sender)));
|
|
let lease = self.acquire_connection().await?;
|
|
let final_result = if let Some(connection) = lease.connection.as_ref() {
|
|
call(&connection.connection, result_sender.clone());
|
|
match timeout(self.config.procedure_timeout, receiver).await {
|
|
Ok(inner) => match inner {
|
|
Ok(value) => value,
|
|
Err(_) => Err(SpacetimeClientError::ConnectDropped),
|
|
},
|
|
Err(_) => Err(Self::resolve_timeout_error(Some(connection))),
|
|
}
|
|
} else {
|
|
Err(SpacetimeClientError::Runtime(
|
|
"SpacetimeDB 连接租约缺少连接".to_string(),
|
|
))
|
|
};
|
|
self.release_connection(lease).await;
|
|
|
|
final_result
|
|
}
|
|
|
|
async fn acquire_connection(&self) -> Result<PooledConnectionLease, SpacetimeClientError> {
|
|
let permit = timeout(
|
|
self.config.procedure_timeout,
|
|
self.pool.permits.clone().acquire_owned(),
|
|
)
|
|
.await
|
|
.map_err(|_| SpacetimeClientError::Timeout)?
|
|
.map_err(|error| SpacetimeClientError::Runtime(error.to_string()))?;
|
|
|
|
loop {
|
|
for (slot_index, slot) in self.pool.slots.iter().enumerate() {
|
|
if let Ok(mut slot_guard) = slot.try_lock() {
|
|
if slot_guard.in_use {
|
|
continue;
|
|
}
|
|
let reusable_connection = slot_guard
|
|
.connection
|
|
.take()
|
|
.filter(|connection| !connection.is_broken());
|
|
slot_guard.in_use = true;
|
|
drop(slot_guard);
|
|
|
|
let connection = if let Some(connection) = reusable_connection {
|
|
connection
|
|
} else {
|
|
match self.build_pooled_connection().await {
|
|
Ok(connection) => connection,
|
|
Err(error) => {
|
|
let mut slot_guard = self.pool.slots[slot_index].lock().await;
|
|
slot_guard.in_use = false;
|
|
return Err(error);
|
|
}
|
|
}
|
|
};
|
|
|
|
return Ok(PooledConnectionLease {
|
|
slot_index,
|
|
connection: Some(connection),
|
|
_permit: permit,
|
|
});
|
|
}
|
|
}
|
|
|
|
tokio::task::yield_now().await;
|
|
}
|
|
}
|
|
|
|
async fn build_pooled_connection(&self) -> Result<PooledConnection, SpacetimeClientError> {
|
|
let config = self.config.clone();
|
|
let broken = Arc::new(AtomicBool::new(false));
|
|
let (sender, receiver) = oneshot::channel::<Result<(), SpacetimeClientError>>();
|
|
let connect_sender = Arc::new(Mutex::new(Some(sender)));
|
|
let broken_flag = broken.clone();
|
|
let disconnect_sender = connect_sender.clone();
|
|
let connection = tokio::task::spawn_blocking(move || {
|
|
DbConnection::builder()
|
|
.with_uri(config.server_url)
|
|
.with_database_name(config.database)
|
|
.with_token(config.token)
|
|
.on_connect(move |_, _, _| {
|
|
send_connect_once(&connect_sender, Ok(()));
|
|
})
|
|
.on_disconnect(move |_, error| {
|
|
broken_flag.store(true, Ordering::SeqCst);
|
|
let message = error
|
|
.map(|error| error.to_string())
|
|
.unwrap_or_else(|| "SpacetimeDB 连接已断开".to_string());
|
|
send_connect_once(
|
|
&disconnect_sender,
|
|
Err(SpacetimeClientError::Procedure(message)),
|
|
);
|
|
})
|
|
.build()
|
|
.map_err(|error| SpacetimeClientError::Build(error.to_string()))
|
|
})
|
|
.await
|
|
.map_err(|error| SpacetimeClientError::Runtime(error.to_string()))??;
|
|
|
|
let runner = connection.run_threaded();
|
|
timeout(self.config.procedure_timeout, receiver)
|
|
.await
|
|
.map_err(|_| SpacetimeClientError::Timeout)?
|
|
.map_err(|_| SpacetimeClientError::ConnectDropped)??;
|
|
|
|
Ok(PooledConnection {
|
|
connection,
|
|
runner: Some(runner),
|
|
broken,
|
|
})
|
|
}
|
|
|
|
async fn release_connection(&self, mut lease: PooledConnectionLease) {
|
|
let mut slot_guard = self.pool.slots[lease.slot_index].lock().await;
|
|
slot_guard.in_use = false;
|
|
let Some(connection) = lease.connection.take() else {
|
|
slot_guard.connection = None;
|
|
return;
|
|
};
|
|
if connection.is_broken() {
|
|
slot_guard.connection = None;
|
|
} else {
|
|
slot_guard.connection = Some(connection);
|
|
}
|
|
}
|
|
|
|
// 超时后必须统一归还租约;若连接已先一步断开则回传断线,否则标记坏连接并回传超时。
|
|
fn resolve_timeout_error(connection: Option<&PooledConnection>) -> SpacetimeClientError {
|
|
if let Some(connection) = connection {
|
|
if connection.is_broken() {
|
|
return SpacetimeClientError::ConnectDropped;
|
|
}
|
|
connection.mark_broken();
|
|
}
|
|
|
|
SpacetimeClientError::Timeout
|
|
}
|
|
}
|
|
|
|
impl SpacetimeClientError {
|
|
pub(crate) fn from_sdk_error(error: impl fmt::Display) -> Self {
|
|
Self::Procedure(error.to_string())
|
|
}
|
|
|
|
pub(crate) fn validation_failed(error: impl fmt::Display) -> Self {
|
|
Self::Runtime(error.to_string())
|
|
}
|
|
|
|
pub(crate) fn reducer_failed(message: String) -> Self {
|
|
Self::Runtime(message)
|
|
}
|
|
|
|
pub(crate) fn procedure_failed(message: Option<String>) -> Self {
|
|
Self::Procedure(message.unwrap_or_else(|| "SpacetimeDB procedure 返回未知错误".to_string()))
|
|
}
|
|
|
|
pub(crate) fn missing_snapshot(label: &'static str) -> Self {
|
|
Self::Procedure(format!("SpacetimeDB procedure 未返回{label}"))
|
|
}
|
|
}
|
|
|
|
impl PooledConnection {
|
|
fn is_broken(&self) -> bool {
|
|
self.broken.load(Ordering::SeqCst)
|
|
}
|
|
|
|
fn mark_broken(&self) {
|
|
self.broken.store(true, Ordering::SeqCst);
|
|
}
|
|
}
|
|
|
|
impl Drop for PooledConnection {
|
|
fn drop(&mut self) {
|
|
let _ = self.connection.disconnect();
|
|
if let Some(runner) = self.runner.take() {
|
|
drop(runner);
|
|
}
|
|
}
|
|
}
|
|
|
|
impl fmt::Debug for SpacetimeClient {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.debug_struct("SpacetimeClient")
|
|
.field("config", &self.config)
|
|
.field("pool_size", &self.pool.slots.len())
|
|
.finish()
|
|
}
|
|
}
|
|
|
|
/// Delivers `result` through the shared one-shot sender, at most once.
///
/// The first caller takes the sender out of the slot; later callers find it empty and do
/// nothing. A failed send (receiver already gone) is ignored.
///
/// # Panics
/// Panics if the sender mutex is poisoned.
fn send_once<T>(sender: &ProcedureResultSender<T>, result: Result<T, SpacetimeClientError>) {
    let pending = sender
        .lock()
        .expect("spacetime result sender should not poison")
        .take();
    if let Some(channel) = pending {
        let _ = channel.send(result);
    }
}
|
|
|
|
/// Delivers a reducer outcome through the shared one-shot sender, at most once.
///
/// Does nothing if the sender was already taken; a failed send is ignored.
///
/// # Panics
/// Panics if the sender mutex is poisoned.
fn send_reducer_once(sender: &ReducerResultSender, result: Result<(), SpacetimeClientError>) {
    let mut slot = sender
        .lock()
        .expect("spacetime reducer result sender should not poison");
    let Some(channel) = slot.take() else {
        return;
    };
    let _ = channel.send(result);
}
|
|
|
|
fn send_connect_once(
|
|
sender: &Arc<Mutex<Option<oneshot::Sender<Result<(), SpacetimeClientError>>>>>,
|
|
result: Result<(), SpacetimeClientError>,
|
|
) {
|
|
if let Some(sender) = sender
|
|
.lock()
|
|
.expect("spacetime connect sender should not poison")
|
|
.take()
|
|
{
|
|
let _ = sender.send(result);
|
|
}
|
|
}
|
|
|
|
impl fmt::Display for SpacetimeClientError {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
match self {
|
|
Self::Build(message) | Self::Procedure(message) | Self::Runtime(message) => {
|
|
f.write_str(message)
|
|
}
|
|
Self::ConnectDropped => f.write_str("SpacetimeDB 连接在返回结果前已断开"),
|
|
Self::Timeout => f.write_str("SpacetimeDB procedure 调用超时"),
|
|
}
|
|
}
|
|
}
|
|
|
|
// Empty impl: the trait's default methods are used; the message comes from `Display`.
impl Error for SpacetimeClientError {}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// A server-supplied message is kept; a missing one falls back to the generic text.
    #[test]
    fn procedure_failed_keeps_server_message_or_default() {
        let with_message = SpacetimeClientError::procedure_failed(Some("领域错误".to_string()));
        let without_message = SpacetimeClientError::procedure_failed(None);

        assert_eq!(with_message.to_string(), "领域错误");
        assert_eq!(
            without_message.to_string(),
            "SpacetimeDB procedure 返回未知错误"
        );
    }

    /// The label is appended directly to the fixed "not returned" prefix.
    #[test]
    fn missing_snapshot_names_adapter_boundary() {
        let error = SpacetimeClientError::missing_snapshot("story session 快照");

        assert_eq!(
            error.to_string(),
            "SpacetimeDB procedure 未返回story session 快照"
        );
    }

    /// Validation and reducer failures both map to the `Runtime` variant.
    #[test]
    fn validation_and_reducer_failures_stay_runtime_classified() {
        let validation = SpacetimeClientError::validation_failed("字段缺失");
        let reducer = SpacetimeClientError::reducer_failed("状态非法".to_string());

        assert!(matches!(validation, SpacetimeClientError::Runtime(_)));
        assert!(matches!(reducer, SpacetimeClientError::Runtime(_)));
    }
}
|