perf: read gallery hot paths from spacetime cache

This commit is contained in:
kdletters
2026-05-17 00:03:07 +08:00
parent 99f539a601
commit d9c8473504
10 changed files with 347 additions and 113 deletions

View File

@@ -96,6 +96,7 @@ pub mod story_runtime;
pub mod visual_novel;
use std::{
collections::HashMap,
error::Error,
fmt,
sync::atomic::{AtomicBool, Ordering},
@@ -225,7 +226,7 @@ use module_story::{
use shared_kernel::format_timestamp_micros;
use spacetimedb_sdk::{DbContext, Table};
use tokio::{
sync::{OwnedSemaphorePermit, Semaphore, oneshot},
sync::{OwnedSemaphorePermit, RwLock, Semaphore, oneshot},
time::timeout,
};
@@ -257,6 +258,8 @@ pub struct AuthStoreSnapshotImportRecord {
pub struct SpacetimeClient {
config: SpacetimeClientConfig,
pool: Arc<SpacetimeConnectionPool>,
creation_entry_config_cache: Arc<RwLock<Option<CreationEntryConfigRecord>>>,
custom_world_gallery_legacy_sync_attempted: Arc<AtomicBool>,
}
#[derive(Debug)]
@@ -269,6 +272,8 @@ pub enum SpacetimeClientError {
}
const DEFAULT_PROCEDURE_TIMEOUT: Duration = Duration::from_secs(30);
const PUBLIC_WORK_PLAY_DAY_MICROS: i64 = 86_400_000_000;
const PUBLIC_WORK_RECENT_PLAY_WINDOW_DAYS: i64 = 7;
type ProcedureResultSender<T> =
Arc<Mutex<Option<oneshot::Sender<Result<T, SpacetimeClientError>>>>>;
@@ -286,7 +291,7 @@ struct PooledConnectionSlot {
struct PooledConnection {
connection: DbConnection,
_gallery_subscription: Vec<SubscriptionHandle>,
_read_model_subscriptions: Vec<SubscriptionHandle>,
runner: Option<JoinHandle<()>>,
broken: Arc<AtomicBool>,
}
@@ -321,7 +326,12 @@ impl SpacetimeClient {
permits: Arc::new(Semaphore::new(pool_size)),
});
Self { config, pool }
Self {
config,
pool,
creation_entry_config_cache: Arc::new(RwLock::new(None)),
custom_world_gallery_legacy_sync_attempted: Arc::new(AtomicBool::new(false)),
}
}
async fn call_after_connect<T>(
@@ -415,6 +425,14 @@ impl SpacetimeClient {
final_result
}
async fn cache_creation_entry_config(&self, config: CreationEntryConfigRecord) {
*self.creation_entry_config_cache.write().await = Some(config);
}
async fn read_cached_creation_entry_config(&self) -> Option<CreationEntryConfigRecord> {
self.creation_entry_config_cache.read().await.clone()
}
async fn acquire_connection(&self) -> Result<PooledConnectionLease, SpacetimeClientError> {
let permit = timeout(
self.config.procedure_timeout,
@@ -503,19 +521,19 @@ impl SpacetimeClient {
.map_err(|_| SpacetimeClientError::Timeout)?
.map_err(|_| SpacetimeClientError::ConnectDropped)??;
let gallery_subscription = self
.subscribe_puzzle_gallery_views(&connection, broken.clone())
let read_model_subscriptions = self
.subscribe_cached_read_models(&connection, broken.clone())
.await?;
Ok(PooledConnection {
connection,
_gallery_subscription: gallery_subscription,
_read_model_subscriptions: read_model_subscriptions,
runner: Some(runner),
broken,
})
}
async fn subscribe_puzzle_gallery_views(
async fn subscribe_cached_read_models(
&self,
connection: &DbConnection,
broken: Arc<AtomicBool>,
@@ -523,37 +541,67 @@ impl SpacetimeClient {
let mut subscriptions = Vec::new();
for query in [
"SELECT * FROM puzzle_gallery_view",
"SELECT * FROM custom_world_gallery_entry",
] {
let (sender, receiver) = oneshot::channel::<Result<(), SpacetimeClientError>>();
let applied_sender = Arc::new(Mutex::new(Some(sender)));
let on_applied_sender = applied_sender.clone();
let on_error_sender = applied_sender.clone();
let broken_flag = broken.clone();
let subscription = connection
.subscription_builder()
.on_applied(move |_| {
send_connect_once(&on_applied_sender, Ok(()));
})
.on_error(move |_, error| {
broken_flag.store(true, Ordering::SeqCst);
send_connect_once(
&on_error_sender,
Err(SpacetimeClientError::Procedure(error.to_string())),
);
})
.subscribe(query);
timeout(self.config.procedure_timeout, receiver)
.await
.map_err(|_| SpacetimeClientError::Timeout)?
.map_err(|_| SpacetimeClientError::ConnectDropped)??;
let subscription = self
.subscribe_cached_read_model_query(connection, broken.clone(), query, true)
.await?;
subscriptions.push(subscription);
}
for query in [
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'puzzle'",
"SELECT * FROM public_work_play_daily_stat WHERE source_type = 'custom-world'",
"SELECT * FROM creation_entry_config",
"SELECT * FROM creation_entry_type_config",
] {
if let Ok(subscription) = self
.subscribe_cached_read_model_query(connection, broken.clone(), query, false)
.await
{
subscriptions.push(subscription);
}
}
Ok(subscriptions)
}
async fn subscribe_cached_read_model_query(
&self,
connection: &DbConnection,
broken: Arc<AtomicBool>,
query: &'static str,
mark_broken_on_error: bool,
) -> Result<SubscriptionHandle, SpacetimeClientError> {
let (sender, receiver) = oneshot::channel::<Result<(), SpacetimeClientError>>();
let applied_sender = Arc::new(Mutex::new(Some(sender)));
let on_applied_sender = applied_sender.clone();
let on_error_sender = applied_sender.clone();
let broken_flag = broken.clone();
let subscription = connection
.subscription_builder()
.on_applied(move |_| {
send_connect_once(&on_applied_sender, Ok(()));
})
.on_error(move |_, error| {
if mark_broken_on_error {
broken_flag.store(true, Ordering::SeqCst);
}
send_connect_once(
&on_error_sender,
Err(SpacetimeClientError::Procedure(error.to_string())),
);
})
.subscribe(query);
timeout(self.config.procedure_timeout, receiver)
.await
.map_err(|_| SpacetimeClientError::Timeout)?
.map_err(|_| SpacetimeClientError::ConnectDropped)??;
Ok(subscription)
}
async fn release_connection(&self, mut lease: PooledConnectionLease) {
let mut slot_guard = self.pool.slots[lease.slot_index].lock().await;
slot_guard.in_use = false;
@@ -581,6 +629,39 @@ impl SpacetimeClient {
}
}
fn current_unix_micros() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|duration| duration.as_micros() as i64)
.unwrap_or(0)
}
fn current_public_work_day() -> i64 {
current_unix_micros().div_euclid(PUBLIC_WORK_PLAY_DAY_MICROS)
}
fn public_work_recent_play_counts(
connection: &DbConnection,
source_type: &str,
) -> HashMap<String, u32> {
let current_day = current_public_work_day();
let first_day = current_day - (PUBLIC_WORK_RECENT_PLAY_WINDOW_DAYS - 1);
let mut counts = HashMap::new();
for row in connection.db().public_work_play_daily_stat().iter() {
if row.source_type != source_type
|| row.played_day < first_day
|| row.played_day > current_day
{
continue;
}
let entry: &mut u32 = counts.entry(row.profile_id).or_insert(0);
*entry = (*entry).saturating_add(row.play_count);
}
counts
}
impl SpacetimeClientError {
pub(crate) fn from_sdk_error(error: impl fmt::Display) -> Self {
Self::Procedure(error.to_string())