optimize puzzle gallery access
This commit is contained in:
@@ -142,24 +142,6 @@ use module_npc::{
|
||||
NpcStanceProfile as DomainNpcStanceProfile, NpcStateSnapshot as DomainNpcStateSnapshot,
|
||||
ResolveNpcInteractionInput as DomainResolveNpcInteractionInput,
|
||||
};
|
||||
use module_puzzle::{
|
||||
PuzzleAgentMessageSnapshot as DomainPuzzleAgentMessageSnapshot,
|
||||
PuzzleAgentSessionSnapshot as DomainPuzzleAgentSessionSnapshot,
|
||||
PuzzleAgentSuggestedAction as DomainPuzzleAgentSuggestedAction,
|
||||
PuzzleAnchorItem as DomainPuzzleAnchorItem, PuzzleAnchorPack as DomainPuzzleAnchorPack,
|
||||
PuzzleBoardSnapshot as DomainPuzzleBoardSnapshot,
|
||||
PuzzleCellPosition as DomainPuzzleCellPosition,
|
||||
PuzzleCreatorIntent as DomainPuzzleCreatorIntent, PuzzleDraftLevel as DomainPuzzleDraftLevel,
|
||||
PuzzleGeneratedImageCandidate as DomainPuzzleGeneratedImageCandidate,
|
||||
PuzzleMergedGroupState as DomainPuzzleMergedGroupState,
|
||||
PuzzlePieceState as DomainPuzzlePieceState, PuzzleResultDraft as DomainPuzzleResultDraft,
|
||||
PuzzleResultPreviewBlocker as DomainPuzzleResultPreviewBlocker,
|
||||
PuzzleResultPreviewEnvelope as DomainPuzzleResultPreviewEnvelope,
|
||||
PuzzleResultPreviewFinding as DomainPuzzleResultPreviewFinding,
|
||||
PuzzleRunSnapshot as DomainPuzzleRunSnapshot,
|
||||
PuzzleRuntimeLevelSnapshot as DomainPuzzleRuntimeLevelSnapshot,
|
||||
PuzzleWorkProfile as DomainPuzzleWorkProfile,
|
||||
};
|
||||
use module_runtime::{
|
||||
AnalyticsMetricQueryResponse as DomainAnalyticsMetricQueryResponse, RuntimeBrowseHistoryRecord,
|
||||
RuntimePlatformTheme as DomainRuntimePlatformTheme, RuntimeProfileDashboardRecord,
|
||||
@@ -222,7 +204,7 @@ use module_story::{
|
||||
build_story_continue_input, build_story_session_input, build_story_session_state_input,
|
||||
};
|
||||
use shared_kernel::format_timestamp_micros;
|
||||
use spacetimedb_sdk::DbContext;
|
||||
use spacetimedb_sdk::{DbContext, Table};
|
||||
use tokio::{
|
||||
sync::{OwnedSemaphorePermit, Semaphore, oneshot},
|
||||
time::timeout,
|
||||
@@ -285,6 +267,7 @@ struct PooledConnectionSlot {
|
||||
|
||||
struct PooledConnection {
|
||||
connection: DbConnection,
|
||||
_gallery_subscription: Vec<SubscriptionHandle>,
|
||||
runner: Option<JoinHandle<()>>,
|
||||
broken: Arc<AtomicBool>,
|
||||
}
|
||||
@@ -377,6 +360,26 @@ impl SpacetimeClient {
|
||||
final_result
|
||||
}
|
||||
|
||||
async fn read_after_connect<T>(
|
||||
&self,
|
||||
read: impl FnOnce(&DbConnection) -> Result<T, SpacetimeClientError> + Send + 'static,
|
||||
) -> Result<T, SpacetimeClientError>
|
||||
where
|
||||
T: Send + 'static,
|
||||
{
|
||||
let lease = self.acquire_connection().await?;
|
||||
let final_result = if let Some(connection) = lease.connection.as_ref() {
|
||||
read(&connection.connection)
|
||||
} else {
|
||||
Err(SpacetimeClientError::Runtime(
|
||||
"SpacetimeDB 连接租约缺少连接".to_string(),
|
||||
))
|
||||
};
|
||||
self.release_connection(lease).await;
|
||||
|
||||
final_result
|
||||
}
|
||||
|
||||
async fn acquire_connection(&self) -> Result<PooledConnectionLease, SpacetimeClientError> {
|
||||
let permit = timeout(
|
||||
self.config.procedure_timeout,
|
||||
@@ -465,13 +468,58 @@ impl SpacetimeClient {
|
||||
.map_err(|_| SpacetimeClientError::Timeout)?
|
||||
.map_err(|_| SpacetimeClientError::ConnectDropped)??;
|
||||
|
||||
let gallery_subscription = self
|
||||
.subscribe_puzzle_gallery_views(&connection, broken.clone())
|
||||
.await?;
|
||||
|
||||
Ok(PooledConnection {
|
||||
connection,
|
||||
_gallery_subscription: gallery_subscription,
|
||||
runner: Some(runner),
|
||||
broken,
|
||||
})
|
||||
}
|
||||
|
||||
async fn subscribe_puzzle_gallery_views(
|
||||
&self,
|
||||
connection: &DbConnection,
|
||||
broken: Arc<AtomicBool>,
|
||||
) -> Result<Vec<SubscriptionHandle>, SpacetimeClientError> {
|
||||
let mut subscriptions = Vec::new();
|
||||
for query in [
|
||||
"SELECT * FROM puzzle_gallery_view",
|
||||
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'puzzle'",
|
||||
] {
|
||||
let (sender, receiver) = oneshot::channel::<Result<(), SpacetimeClientError>>();
|
||||
let applied_sender = Arc::new(Mutex::new(Some(sender)));
|
||||
let on_applied_sender = applied_sender.clone();
|
||||
let on_error_sender = applied_sender.clone();
|
||||
let broken_flag = broken.clone();
|
||||
let subscription = connection
|
||||
.subscription_builder()
|
||||
.on_applied(move |_| {
|
||||
send_connect_once(&on_applied_sender, Ok(()));
|
||||
})
|
||||
.on_error(move |_, error| {
|
||||
broken_flag.store(true, Ordering::SeqCst);
|
||||
send_connect_once(
|
||||
&on_error_sender,
|
||||
Err(SpacetimeClientError::Procedure(error.to_string())),
|
||||
);
|
||||
})
|
||||
.subscribe(query);
|
||||
|
||||
timeout(self.config.procedure_timeout, receiver)
|
||||
.await
|
||||
.map_err(|_| SpacetimeClientError::Timeout)?
|
||||
.map_err(|_| SpacetimeClientError::ConnectDropped)??;
|
||||
|
||||
subscriptions.push(subscription);
|
||||
}
|
||||
|
||||
Ok(subscriptions)
|
||||
}
|
||||
|
||||
async fn release_connection(&self, mut lease: PooledConnectionLease) {
|
||||
let mut slot_guard = self.pool.slots[lease.slot_index].lock().await;
|
||||
slot_guard.in_use = false;
|
||||
|
||||
Reference in New Issue
Block a user