use crate::*; #[spacetimedb::table(accessor = profile_dashboard_state)] pub struct ProfileDashboardState { #[primary_key] pub(crate) user_id: String, pub(crate) wallet_balance: u64, pub(crate) total_play_time_ms: u64, pub(crate) created_at: Timestamp, pub(crate) updated_at: Timestamp, } #[spacetimedb::table( accessor = profile_wallet_ledger, index(accessor = by_profile_wallet_ledger_user_id, btree(columns = [user_id])), index( accessor = by_profile_wallet_ledger_user_created_at, btree(columns = [user_id, created_at]) ) )] pub struct ProfileWalletLedger { #[primary_key] pub(crate) wallet_ledger_id: String, pub(crate) user_id: String, pub(crate) amount_delta: i64, pub(crate) balance_after: u64, pub(crate) source_type: RuntimeProfileWalletLedgerSourceType, pub(crate) created_at: Timestamp, } #[spacetimedb::table(accessor = profile_redeem_code)] pub struct ProfileRedeemCode { #[primary_key] pub(crate) code: String, pub(crate) mode: RuntimeProfileRedeemCodeMode, pub(crate) reward_points: u64, pub(crate) max_uses: u32, pub(crate) global_used_count: u32, pub(crate) enabled: bool, pub(crate) allowed_user_ids: Vec, pub(crate) created_by: String, pub(crate) created_at: Timestamp, pub(crate) updated_at: Timestamp, } #[spacetimedb::table( accessor = profile_redeem_code_usage, index(accessor = by_profile_redeem_code_usage_code, btree(columns = [code])), index(accessor = by_profile_redeem_code_usage_user_id, btree(columns = [user_id])), index( accessor = by_profile_redeem_code_usage_code_user_id, btree(columns = [code, user_id]) ) )] pub struct ProfileRedeemCodeUsage { #[primary_key] pub(crate) usage_id: String, pub(crate) code: String, pub(crate) user_id: String, pub(crate) amount_granted: u64, pub(crate) created_at: Timestamp, } #[spacetimedb::table(accessor = profile_invite_code)] pub struct ProfileInviteCode { #[primary_key] pub(crate) user_id: String, #[unique] pub(crate) invite_code: String, pub(crate) created_at: Timestamp, pub(crate) updated_at: Timestamp, } 
#[spacetimedb::table( accessor = profile_referral_relation, index(accessor = by_profile_referral_inviter_user_id, btree(columns = [inviter_user_id])), index( accessor = by_profile_referral_inviter_bound_at, btree(columns = [inviter_user_id, bound_at]) ) )] pub struct ProfileReferralRelation { #[primary_key] pub(crate) invitee_user_id: String, pub(crate) inviter_user_id: String, pub(crate) invite_code: String, pub(crate) inviter_reward_granted: bool, pub(crate) invitee_reward_granted: bool, pub(crate) bound_at: Timestamp, } #[spacetimedb::table( accessor = profile_played_world, index(accessor = by_profile_played_world_user_id, btree(columns = [user_id])), index( accessor = by_profile_played_world_user_world_key, btree(columns = [user_id, world_key]) ), index( accessor = by_profile_played_world_user_last_played_at, btree(columns = [user_id, last_played_at]) ) )] pub struct ProfilePlayedWorld { #[primary_key] pub(crate) played_world_id: String, pub(crate) user_id: String, pub(crate) world_key: String, pub(crate) owner_user_id: Option, pub(crate) profile_id: Option, pub(crate) world_type: Option, pub(crate) world_title: String, pub(crate) world_subtitle: String, pub(crate) first_played_at: Timestamp, pub(crate) last_played_at: Timestamp, pub(crate) last_observed_play_time_ms: u64, } pub(crate) struct ProfilePlayedWorkUpsertInput { pub(crate) user_id: String, pub(crate) world_key: String, pub(crate) owner_user_id: Option, pub(crate) profile_id: Option, pub(crate) world_type: Option, pub(crate) world_title: String, pub(crate) world_subtitle: String, pub(crate) played_at_micros: i64, } #[spacetimedb::table(accessor = profile_membership)] pub struct ProfileMembership { #[primary_key] pub(crate) user_id: String, pub(crate) status: RuntimeProfileMembershipStatus, pub(crate) tier: RuntimeProfileMembershipTier, pub(crate) started_at: Timestamp, pub(crate) expires_at: Timestamp, pub(crate) updated_at: Timestamp, } #[spacetimedb::table( accessor = profile_recharge_order, 
index(accessor = by_profile_recharge_order_user_id, btree(columns = [user_id])), index( accessor = by_profile_recharge_order_user_created_at, btree(columns = [user_id, created_at]) ) )] pub struct ProfileRechargeOrder { #[primary_key] pub(crate) order_id: String, pub(crate) user_id: String, pub(crate) product_id: String, pub(crate) product_title: String, pub(crate) kind: RuntimeProfileRechargeProductKind, pub(crate) amount_cents: u64, pub(crate) status: RuntimeProfileRechargeOrderStatus, pub(crate) payment_channel: String, pub(crate) paid_at: Timestamp, pub(crate) created_at: Timestamp, pub(crate) points_delta: i64, pub(crate) membership_expires_at: Option, } #[spacetimedb::table( accessor = profile_save_archive, index(accessor = by_profile_save_archive_user_id, btree(columns = [user_id])), index( accessor = by_profile_save_archive_user_world_key, btree(columns = [user_id, world_key]) ), index( accessor = by_profile_save_archive_user_saved_at, btree(columns = [user_id, saved_at]) ) )] pub struct ProfileSaveArchive { #[primary_key] pub(crate) archive_id: String, pub(crate) user_id: String, pub(crate) world_key: String, pub(crate) owner_user_id: Option, pub(crate) profile_id: Option, pub(crate) world_type: Option, pub(crate) world_name: String, pub(crate) subtitle: String, pub(crate) summary_text: String, pub(crate) cover_image_src: Option, pub(crate) saved_at: Timestamp, pub(crate) bottom_tab: String, pub(crate) game_state_json: String, pub(crate) current_story_json: Option, pub(crate) created_at: Timestamp, pub(crate) updated_at: Timestamp, } // save archive 列表是按世界聚合后的最近一次快照视图,读取时只做排序,不再拼装默认值。 #[spacetimedb::procedure] pub fn list_profile_save_archives( ctx: &mut ProcedureContext, input: RuntimeProfileSaveArchiveListInput, ) -> RuntimeProfileSaveArchiveProcedureResult { match ctx.try_with_tx(|tx| list_profile_save_archive_rows(tx, input.clone())) { Ok(entries) => RuntimeProfileSaveArchiveProcedureResult { ok: true, entries, record: None, current_snapshot: None, 
error_message: None, }, Err(message) => RuntimeProfileSaveArchiveProcedureResult { ok: false, entries: Vec::new(), record: None, current_snapshot: None, error_message: Some(message), }, } } // resume 会把指定 archive 回填到当前 snapshot,并同步返回 entry + 当前 snapshot。 #[spacetimedb::procedure] pub fn resume_profile_save_archive_and_return( ctx: &mut ProcedureContext, input: RuntimeProfileSaveArchiveResumeInput, ) -> RuntimeProfileSaveArchiveProcedureResult { match ctx.try_with_tx(|tx| resume_profile_save_archive_record(tx, input.clone())) { Ok((record, current_snapshot)) => RuntimeProfileSaveArchiveProcedureResult { ok: true, entries: Vec::new(), record: Some(record), current_snapshot: Some(current_snapshot), error_message: None, }, Err(message) => RuntimeProfileSaveArchiveProcedureResult { ok: false, entries: Vec::new(), record: None, current_snapshot: None, error_message: Some(message), }, } } // profile dashboard 当前先作为 projection 读入口返回默认零值,等待 runtime_snapshot 写链补齐刷新。 #[spacetimedb::procedure] pub fn get_profile_dashboard( ctx: &mut ProcedureContext, input: RuntimeProfileDashboardGetInput, ) -> RuntimeProfileDashboardProcedureResult { match ctx.try_with_tx(|tx| get_profile_dashboard_snapshot(tx, input.clone())) { Ok(record) => RuntimeProfileDashboardProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileDashboardProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 钱包流水当前只暴露最近 50 条只读视图,排序与截断逻辑在 procedure 内统一收口。 #[spacetimedb::procedure] pub fn list_profile_wallet_ledger( ctx: &mut ProcedureContext, input: RuntimeProfileWalletLedgerListInput, ) -> RuntimeProfileWalletLedgerProcedureResult { match ctx.try_with_tx(|tx| list_profile_wallet_ledger_entries(tx, input.clone())) { Ok(entries) => RuntimeProfileWalletLedgerProcedureResult { ok: true, entries, error_message: None, }, Err(message) => RuntimeProfileWalletLedgerProcedureResult { ok: false, entries: Vec::new(), error_message: Some(message), }, } } 
// 资产生成由 Axum 调用外部模型,钱包扣费必须先在 SpacetimeDB 内原子落账。 #[spacetimedb::procedure] pub fn consume_profile_wallet_points_and_return( ctx: &mut ProcedureContext, input: RuntimeProfileWalletAdjustmentInput, ) -> RuntimeProfileWalletAdjustmentProcedureResult { match ctx.try_with_tx(|tx| { apply_profile_wallet_adjustment( tx, input.clone(), RuntimeProfileWalletLedgerSourceType::AssetOperationConsume, true, ) }) { Ok(record) => RuntimeProfileWalletAdjustmentProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileWalletAdjustmentProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 生成链路失败时由 Axum 调用退款,ledger_id 幂等保证重复补偿不会重复加钱。 #[spacetimedb::procedure] pub fn refund_profile_wallet_points_and_return( ctx: &mut ProcedureContext, input: RuntimeProfileWalletAdjustmentInput, ) -> RuntimeProfileWalletAdjustmentProcedureResult { match ctx.try_with_tx(|tx| { apply_profile_wallet_adjustment( tx, input.clone(), RuntimeProfileWalletLedgerSourceType::AssetOperationRefund, false, ) }) { Ok(record) => RuntimeProfileWalletAdjustmentProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileWalletAdjustmentProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // play stats 与 dashboard 共用 dashboard projection 的 total_play_time / updated_at,避免 Axum 侧拼装。 #[spacetimedb::procedure] pub fn get_profile_play_stats( ctx: &mut ProcedureContext, input: RuntimeProfilePlayStatsGetInput, ) -> RuntimeProfilePlayStatsProcedureResult { match ctx.try_with_tx(|tx| get_profile_play_stats_snapshot(tx, input.clone())) { Ok(record) => RuntimeProfilePlayStatsProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfilePlayStatsProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 账户充值中心只读快照,套餐和权益由后端返回,前端不保存业务价格表。 #[spacetimedb::procedure] pub fn get_profile_recharge_center( ctx: &mut ProcedureContext, 
input: RuntimeProfileRechargeCenterGetInput, ) -> RuntimeProfileRechargeCenterProcedureResult { match ctx.try_with_tx(|tx| get_profile_recharge_center_snapshot(tx, input.clone())) { Ok(record) => RuntimeProfileRechargeCenterProcedureResult { ok: true, record: Some(record), order: None, error_message: None, }, Err(message) => RuntimeProfileRechargeCenterProcedureResult { ok: false, record: None, order: None, error_message: Some(message), }, } } // 当前阶段没有真实支付网关,下单后在服务端模拟支付成功并立即写入权益。 #[spacetimedb::procedure] pub fn create_profile_recharge_order_and_return( ctx: &mut ProcedureContext, input: RuntimeProfileRechargeOrderCreateInput, ) -> RuntimeProfileRechargeCenterProcedureResult { match ctx.try_with_tx(|tx| create_profile_recharge_order_record(tx, input.clone())) { Ok((record, order)) => RuntimeProfileRechargeCenterProcedureResult { ok: true, record: Some(record), order: Some(order), error_message: None, }, Err(message) => RuntimeProfileRechargeCenterProcedureResult { ok: false, record: None, order: None, error_message: Some(message), }, } } // 邀请中心会在首次打开时为账号创建稳定邀请码,前端只展示这里返回的后端状态。 #[spacetimedb::procedure] pub fn get_profile_referral_invite_center( ctx: &mut ProcedureContext, input: RuntimeReferralInviteCenterGetInput, ) -> RuntimeReferralInviteCenterProcedureResult { match ctx.try_with_tx(|tx| get_profile_referral_invite_center_snapshot(tx, input.clone())) { Ok(record) => RuntimeReferralInviteCenterProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeReferralInviteCenterProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 填码绑定、每日邀请者奖励上限和双方叙世币发放都在同一事务内完成。 #[spacetimedb::procedure] pub fn redeem_profile_referral_invite_code( ctx: &mut ProcedureContext, input: RuntimeReferralRedeemInput, ) -> RuntimeReferralRedeemProcedureResult { match ctx.try_with_tx(|tx| redeem_profile_referral_invite_code_record(tx, input.clone())) { Ok(record) => RuntimeReferralRedeemProcedureResult { ok: true, record: 
Some(record), error_message: None, }, Err(message) => RuntimeReferralRedeemProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 兑换码奖励、usage 与钱包流水必须在同一事务内落库,避免到账和计次分离。 #[spacetimedb::procedure] pub fn redeem_profile_reward_code( ctx: &mut ProcedureContext, input: RuntimeProfileRewardCodeRedeemInput, ) -> RuntimeProfileRewardCodeRedeemProcedureResult { match ctx.try_with_tx(|tx| redeem_profile_reward_code_record(tx, input.clone())) { Ok(record) => RuntimeProfileRewardCodeRedeemProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileRewardCodeRedeemProcedureResult { ok: false, record: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn admin_upsert_profile_redeem_code( ctx: &mut ProcedureContext, input: RuntimeProfileRedeemCodeAdminUpsertInput, ) -> RuntimeProfileRedeemCodeAdminProcedureResult { match ctx.try_with_tx(|tx| admin_upsert_profile_redeem_code_record(tx, input.clone())) { Ok(record) => RuntimeProfileRedeemCodeAdminProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileRedeemCodeAdminProcedureResult { ok: false, record: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn admin_disable_profile_redeem_code( ctx: &mut ProcedureContext, input: RuntimeProfileRedeemCodeAdminDisableInput, ) -> RuntimeProfileRedeemCodeAdminProcedureResult { match ctx.try_with_tx(|tx| admin_disable_profile_redeem_code_record(tx, input.clone())) { Ok(record) => RuntimeProfileRedeemCodeAdminProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileRedeemCodeAdminProcedureResult { ok: false, record: None, error_message: Some(message), }, } } pub(crate) fn list_profile_save_archive_rows( ctx: &ReducerContext, input: RuntimeProfileSaveArchiveListInput, ) -> Result, String> { let validated_input = build_runtime_profile_save_archive_list_input(input.user_id) 
.map_err(|error| error.to_string())?; let mut entries = ctx .db .profile_save_archive() .iter() .filter(|row| row.user_id == validated_input.user_id) .map(|row| build_profile_save_archive_snapshot_from_row(&row)) .collect::>(); entries.sort_by(|left, right| { right .saved_at_micros .cmp(&left.saved_at_micros) .then_with(|| left.archive_id.cmp(&right.archive_id)) }); Ok(entries) } pub(crate) fn resume_profile_save_archive_record( ctx: &ReducerContext, input: RuntimeProfileSaveArchiveResumeInput, ) -> Result<(RuntimeProfileSaveArchiveSnapshot, RuntimeSnapshot), String> { let validated_input = build_runtime_profile_save_archive_resume_input(input.user_id, input.world_key) .map_err(|error| error.to_string())?; let archive = ctx .db .profile_save_archive() .iter() .find(|row| { row.user_id == validated_input.user_id && row.world_key == validated_input.world_key }) .ok_or_else(|| "profile_save_archive 对应 world_key 不存在".to_string())?; let existing_snapshot = ctx .db .runtime_snapshot() .user_id() .find(&validated_input.user_id); let created_at = existing_snapshot .as_ref() .map(|row| row.created_at) .unwrap_or(archive.saved_at); if let Some(existing) = existing_snapshot { ctx.db .runtime_snapshot() .user_id() .delete(&existing.user_id); } ctx.db.runtime_snapshot().insert(RuntimeSnapshotRow { user_id: archive.user_id.clone(), version: SAVE_SNAPSHOT_VERSION, saved_at: archive.saved_at, bottom_tab: archive.bottom_tab.clone(), game_state_json: archive.game_state_json.clone(), current_story_json: archive.current_story_json.clone(), created_at, updated_at: archive.saved_at, }); Ok(( build_profile_save_archive_snapshot_from_row(&archive), RuntimeSnapshot { user_id: archive.user_id.clone(), version: SAVE_SNAPSHOT_VERSION, saved_at_micros: archive.saved_at.to_micros_since_unix_epoch(), bottom_tab: archive.bottom_tab.clone(), game_state_json: archive.game_state_json.clone(), current_story_json: archive.current_story_json.clone(), created_at_micros: 
created_at.to_micros_since_unix_epoch(), updated_at_micros: archive.saved_at.to_micros_since_unix_epoch(), }, )) } pub(crate) fn sync_profile_projections_from_snapshot( ctx: &ReducerContext, snapshot: &RuntimeSnapshot, ) -> Result<(), String> { let game_state = parse_json_str(&snapshot.game_state_json)?; let game_state_object = game_state.as_object(); let saved_at = Timestamp::from_micros_since_unix_epoch(snapshot.saved_at_micros); if is_non_persistent_runtime_snapshot(&game_state) { return Ok(()); } sync_profile_dashboard_from_snapshot(ctx, snapshot, game_state_object, saved_at); sync_profile_save_archive_from_snapshot(ctx, snapshot, &game_state, saved_at)?; Ok(()) } pub(crate) fn upsert_profile_played_work( ctx: &ReducerContext, input: ProfilePlayedWorkUpsertInput, ) -> Result<(), String> { let user_id = input.user_id.trim(); let world_key = input.world_key.trim(); if user_id.is_empty() { return Err("profile_played_world.user_id 不能为空".to_string()); } if world_key.is_empty() { return Err("profile_played_world.world_key 不能为空".to_string()); } let played_at = Timestamp::from_micros_since_unix_epoch(input.played_at_micros); let played_world_id = format!("{user_id}:{world_key}"); let existing = ctx .db .profile_played_world() .played_world_id() .find(&played_world_id); if let Some(existing) = existing { ctx.db .profile_played_world() .played_world_id() .delete(&existing.played_world_id); ctx.db.profile_played_world().insert(ProfilePlayedWorld { played_world_id, user_id: user_id.to_string(), world_key: world_key.to_string(), owner_user_id: input.owner_user_id, profile_id: input.profile_id, world_type: input.world_type, world_title: input.world_title, world_subtitle: input.world_subtitle, first_played_at: existing.first_played_at, last_played_at: played_at, last_observed_play_time_ms: existing.last_observed_play_time_ms, }); } else { ctx.db.profile_played_world().insert(ProfilePlayedWorld { played_world_id, user_id: user_id.to_string(), world_key: world_key.to_string(), 
owner_user_id: input.owner_user_id, profile_id: input.profile_id, world_type: input.world_type, world_title: input.world_title, world_subtitle: input.world_subtitle, first_played_at: played_at, last_played_at: played_at, last_observed_play_time_ms: 0, }); } ensure_profile_dashboard_state(ctx, user_id, played_at); Ok(()) } pub(crate) fn add_profile_observed_play_time( ctx: &ReducerContext, user_id: &str, world_key: &str, elapsed_ms: u64, observed_at_micros: i64, ) -> Result<(), String> { let user_id = user_id.trim(); let world_key = world_key.trim(); if user_id.is_empty() || world_key.is_empty() || elapsed_ms == 0 { return Ok(()); } let observed_at = Timestamp::from_micros_since_unix_epoch(observed_at_micros); let played_world_id = format!("{user_id}:{world_key}"); if let Some(existing) = ctx .db .profile_played_world() .played_world_id() .find(&played_world_id) { ctx.db .profile_played_world() .played_world_id() .delete(&existing.played_world_id); ctx.db.profile_played_world().insert(ProfilePlayedWorld { played_world_id, user_id: existing.user_id, world_key: existing.world_key, owner_user_id: existing.owner_user_id, profile_id: existing.profile_id, world_type: existing.world_type, world_title: existing.world_title, world_subtitle: existing.world_subtitle, first_played_at: existing.first_played_at, last_played_at: observed_at, last_observed_play_time_ms: existing .last_observed_play_time_ms .saturating_add(elapsed_ms), }); } add_profile_dashboard_play_time(ctx, user_id, elapsed_ms, observed_at); Ok(()) } fn ensure_profile_dashboard_state(ctx: &ReducerContext, user_id: &str, updated_at: Timestamp) { if ctx .db .profile_dashboard_state() .user_id() .find(&user_id.to_string()) .is_some() { return; } ctx.db .profile_dashboard_state() .insert(ProfileDashboardState { user_id: user_id.to_string(), wallet_balance: 0, total_play_time_ms: 0, created_at: updated_at, updated_at, }); } fn add_profile_dashboard_play_time( ctx: &ReducerContext, user_id: &str, elapsed_ms: u64, 
updated_at: Timestamp, ) { let current = ctx .db .profile_dashboard_state() .user_id() .find(&user_id.to_string()); if let Some(existing) = current { ctx.db .profile_dashboard_state() .user_id() .delete(&existing.user_id); ctx.db .profile_dashboard_state() .insert(ProfileDashboardState { user_id: user_id.to_string(), wallet_balance: existing.wallet_balance, total_play_time_ms: existing.total_play_time_ms.saturating_add(elapsed_ms), created_at: existing.created_at, updated_at, }); } else { ctx.db .profile_dashboard_state() .insert(ProfileDashboardState { user_id: user_id.to_string(), wallet_balance: 0, total_play_time_ms: elapsed_ms, created_at: updated_at, updated_at, }); } } fn sync_profile_dashboard_from_snapshot( ctx: &ReducerContext, snapshot: &RuntimeSnapshot, game_state: Option<&serde_json::Map>, saved_at: Timestamp, ) { let current_state = ctx .db .profile_dashboard_state() .user_id() .find(&snapshot.user_id); let previous_wallet_balance = current_state .as_ref() .map(|row| row.wallet_balance) .unwrap_or(0); let previous_total_play_time_ms = current_state .as_ref() .map(|row| row.total_play_time_ms) .unwrap_or(0); let next_wallet_balance = read_non_negative_u64(game_state.and_then(|state| state.get("playerCurrency"))); let mut next_total_play_time_ms = previous_total_play_time_ms; if next_wallet_balance != previous_wallet_balance { ctx.db.profile_wallet_ledger().insert(ProfileWalletLedger { wallet_ledger_id: format!( "{}:{}:{}", snapshot.user_id, snapshot.saved_at_micros, next_wallet_balance ), user_id: snapshot.user_id.clone(), amount_delta: next_wallet_balance as i64 - previous_wallet_balance as i64, balance_after: next_wallet_balance, source_type: RuntimeProfileWalletLedgerSourceType::SnapshotSync, created_at: saved_at, }); } if let Some(world_meta) = resolve_profile_world_snapshot_meta(game_state) { let current_play_time_ms = read_non_negative_u64( game_state .and_then(|state| state.get("runtimeStats")) .and_then(JsonValue::as_object) .and_then(|stats| 
stats.get("playTimeMs")), ); let played_world_id = format!("{}:{}", snapshot.user_id, world_meta.world_key); let existing = ctx .db .profile_played_world() .played_world_id() .find(&played_world_id); let previous_observed_play_time_ms = existing .as_ref() .map(|row| row.last_observed_play_time_ms) .unwrap_or(0); let incremental_play_time_ms = current_play_time_ms.saturating_sub(previous_observed_play_time_ms); next_total_play_time_ms = next_total_play_time_ms.saturating_add(incremental_play_time_ms); if let Some(existing) = existing { ctx.db .profile_played_world() .played_world_id() .delete(&existing.played_world_id); ctx.db.profile_played_world().insert(ProfilePlayedWorld { played_world_id, user_id: snapshot.user_id.clone(), world_key: world_meta.world_key, owner_user_id: world_meta.owner_user_id, profile_id: world_meta.profile_id, world_type: world_meta.world_type, world_title: world_meta.world_title, world_subtitle: world_meta.world_subtitle, first_played_at: existing.first_played_at, last_played_at: saved_at, last_observed_play_time_ms: current_play_time_ms .max(existing.last_observed_play_time_ms), }); } else { ctx.db.profile_played_world().insert(ProfilePlayedWorld { played_world_id, user_id: snapshot.user_id.clone(), world_key: world_meta.world_key, owner_user_id: world_meta.owner_user_id, profile_id: world_meta.profile_id, world_type: world_meta.world_type, world_title: world_meta.world_title, world_subtitle: world_meta.world_subtitle, first_played_at: saved_at, last_played_at: saved_at, last_observed_play_time_ms: current_play_time_ms, }); } } if let Some(existing) = current_state { ctx.db .profile_dashboard_state() .user_id() .delete(&existing.user_id); ctx.db .profile_dashboard_state() .insert(ProfileDashboardState { user_id: snapshot.user_id.clone(), wallet_balance: next_wallet_balance, total_play_time_ms: next_total_play_time_ms, created_at: existing.created_at, updated_at: saved_at, }); } else { ctx.db .profile_dashboard_state() 
.insert(ProfileDashboardState { user_id: snapshot.user_id.clone(), wallet_balance: next_wallet_balance, total_play_time_ms: next_total_play_time_ms, created_at: saved_at, updated_at: saved_at, }); } } fn sync_profile_save_archive_from_snapshot( ctx: &ReducerContext, snapshot: &RuntimeSnapshot, game_state: &JsonValue, saved_at: Timestamp, ) -> Result<(), String> { let Some(archive_meta) = resolve_profile_save_archive_meta(game_state, snapshot.current_story_json.as_deref()) else { return Ok(()); }; let archive_id = format!("{}:{}", snapshot.user_id, archive_meta.world_key); let existing = ctx.db.profile_save_archive().archive_id().find(&archive_id); let created_at = existing .as_ref() .map(|row| row.created_at) .unwrap_or(saved_at); if let Some(existing) = existing { ctx.db .profile_save_archive() .archive_id() .delete(&existing.archive_id); } ctx.db.profile_save_archive().insert(ProfileSaveArchive { archive_id, user_id: snapshot.user_id.clone(), world_key: archive_meta.world_key, owner_user_id: archive_meta.owner_user_id, profile_id: archive_meta.profile_id, world_type: archive_meta.world_type, world_name: archive_meta.world_name, subtitle: archive_meta.subtitle, summary_text: archive_meta.summary_text, cover_image_src: archive_meta.cover_image_src, saved_at, bottom_tab: snapshot.bottom_tab.clone(), game_state_json: snapshot.game_state_json.clone(), current_story_json: snapshot.current_story_json.clone(), created_at, updated_at: saved_at, }); Ok(()) } #[derive(Clone, Debug)] struct ProfileWorldSnapshotMeta { world_key: String, owner_user_id: Option, profile_id: Option, world_type: Option, world_title: String, world_subtitle: String, } #[derive(Clone, Debug)] struct ProfileSaveArchiveMeta { world_key: String, owner_user_id: Option, profile_id: Option, world_type: Option, world_name: String, subtitle: String, summary_text: String, cover_image_src: Option, } pub(crate) fn build_profile_save_archive_snapshot_from_row( row: &ProfileSaveArchive, ) -> 
RuntimeProfileSaveArchiveSnapshot { RuntimeProfileSaveArchiveSnapshot { archive_id: row.archive_id.clone(), user_id: row.user_id.clone(), world_key: row.world_key.clone(), owner_user_id: row.owner_user_id.clone(), profile_id: row.profile_id.clone(), world_type: row.world_type.clone(), world_name: row.world_name.clone(), subtitle: row.subtitle.clone(), summary_text: row.summary_text.clone(), cover_image_src: row.cover_image_src.clone(), saved_at_micros: row.saved_at.to_micros_since_unix_epoch(), bottom_tab: row.bottom_tab.clone(), game_state_json: row.game_state_json.clone(), current_story_json: row.current_story_json.clone(), created_at_micros: row.created_at.to_micros_since_unix_epoch(), updated_at_micros: row.updated_at.to_micros_since_unix_epoch(), } } fn read_non_negative_u64(value: Option<&JsonValue>) -> u64 { match value { Some(JsonValue::Number(number)) => { if let Some(raw) = number.as_u64() { raw } else if let Some(raw) = number.as_i64() { raw.max(0) as u64 } else if let Some(raw) = number.as_f64() { if raw.is_finite() && raw > 0.0 { raw.floor() as u64 } else { 0 } } else { 0 } } Some(JsonValue::String(raw)) => raw.trim().parse::().ok().unwrap_or(0), _ => 0, } } fn read_string_from_json(value: Option<&JsonValue>) -> Option { value .and_then(JsonValue::as_str) .map(str::trim) .filter(|value| !value.is_empty()) .map(ToString::to_string) } fn resolve_profile_world_snapshot_meta( game_state: Option<&serde_json::Map>, ) -> Option { let game_state = game_state?; let custom_world_profile = game_state .get("customWorldProfile") .and_then(JsonValue::as_object); if let Some(custom_world_profile) = custom_world_profile { let profile_id = read_string_from_json(custom_world_profile.get("id")); let world_title = read_string_from_json(custom_world_profile.get("name")) .or_else(|| read_string_from_json(custom_world_profile.get("title"))); if profile_id.is_some() || world_title.is_some() { let world_title = world_title.unwrap_or_else(|| "自定义世界".to_string()); return 
Some(ProfileWorldSnapshotMeta { world_key: profile_id .as_ref() .map(|profile_id| format!("custom:{profile_id}")) .unwrap_or_else(|| format!("custom:{world_title}")), owner_user_id: None, profile_id, world_type: Some("CUSTOM".to_string()), world_title, world_subtitle: read_string_from_json(custom_world_profile.get("summary")) .or_else(|| read_string_from_json(custom_world_profile.get("settingText"))) .unwrap_or_default(), }); } } let world_type = read_string_from_json(game_state.get("worldType"))?; let current_scene_preset = game_state .get("currentScenePreset") .and_then(JsonValue::as_object); Some(ProfileWorldSnapshotMeta { world_key: format!("builtin:{world_type}"), owner_user_id: None, profile_id: None, world_type: Some(world_type.clone()), world_title: current_scene_preset .and_then(|preset| read_string_from_json(preset.get("name"))) .unwrap_or_else(|| build_builtin_world_title(&world_type)), world_subtitle: current_scene_preset .and_then(|preset| { read_string_from_json(preset.get("summary")) .or_else(|| read_string_from_json(preset.get("description"))) }) .unwrap_or_default(), }) } fn resolve_profile_save_archive_meta( game_state: &JsonValue, current_story_json: Option<&str>, ) -> Option { if is_non_persistent_runtime_snapshot(game_state) { return None; } let game_state_object = game_state.as_object(); let world_meta = resolve_profile_world_snapshot_meta(game_state_object)?; let story_engine_memory = game_state_object .and_then(|state| state.get("storyEngineMemory")) .and_then(JsonValue::as_object); let continue_game_digest = story_engine_memory .and_then(|memory| read_string_from_json(memory.get("continueGameDigest"))); let current_story_text = parse_optional_json_str(current_story_json) .ok() .flatten() .and_then(|story| story.as_object().cloned()) .and_then(|story| read_string_from_json(story.get("text"))); let custom_world_profile = game_state_object .and_then(|state| state.get("customWorldProfile")) .and_then(JsonValue::as_object); if let 
Some(custom_world_profile) = custom_world_profile {
        let world_name = read_string_from_json(custom_world_profile.get("name"))
            .or_else(|| read_string_from_json(custom_world_profile.get("title")))
            .unwrap_or_else(|| world_meta.world_title.clone());
        let subtitle = read_string_from_json(custom_world_profile.get("summary"))
            .or_else(|| read_string_from_json(custom_world_profile.get("settingText")))
            .unwrap_or_else(|| world_meta.world_subtitle.clone());
        let summary_text = continue_game_digest
            .or(current_story_text)
            .or_else(|| {
                if subtitle.is_empty() {
                    None
                } else {
                    Some(subtitle.clone())
                }
            })
            .unwrap_or_else(|| DEFAULT_SAVE_ARCHIVE_SUMMARY_TEXT.to_string());
        return Some(ProfileSaveArchiveMeta {
            world_key: world_meta.world_key,
            owner_user_id: world_meta.owner_user_id,
            profile_id: world_meta.profile_id,
            world_type: world_meta.world_type,
            world_name,
            subtitle,
            summary_text,
            cover_image_src: read_string_from_json(custom_world_profile.get("coverImageSrc")),
        });
    }
    let summary_text = continue_game_digest
        .or(current_story_text)
        .or_else(|| {
            if world_meta.world_subtitle.is_empty() {
                None
            } else {
                Some(world_meta.world_subtitle.clone())
            }
        })
        .unwrap_or_else(|| DEFAULT_SAVE_ARCHIVE_SUMMARY_TEXT.to_string());
    let current_scene_preset = game_state_object
        .and_then(|state| state.get("currentScenePreset"))
        .and_then(JsonValue::as_object);
    Some(ProfileSaveArchiveMeta {
        world_key: world_meta.world_key,
        owner_user_id: world_meta.owner_user_id,
        profile_id: world_meta.profile_id,
        world_type: world_meta.world_type,
        world_name: world_meta.world_title,
        subtitle: world_meta.world_subtitle.clone(),
        summary_text,
        cover_image_src: current_scene_preset
            .and_then(|preset| read_string_from_json(preset.get("imageSrc"))),
    })
}

/// Reports whether a runtime snapshot is flagged as non-persistent.
///
/// Returns `true` when the JSON object carries `runtimePersistenceDisabled: true`
/// or when its `runtimeMode` string is `"preview"` or `"test"`. Any value that is
/// not a JSON object yields `false`.
fn is_non_persistent_runtime_snapshot(game_state: &JsonValue) -> bool {
    let Some(game_state) = game_state.as_object() else {
        return false;
    };
    if game_state
        .get("runtimePersistenceDisabled")
        .and_then(JsonValue::as_bool)
        .unwrap_or(false)
    {
        return true;
    }
    matches!(
        read_string_from_json(game_state.get("runtimeMode")).as_deref(),
        Some("preview") | Some("test")
    )
}

/// Maps a built-in world type code to its display title.
///
/// `"WUXIA"` and `"XIANXIA"` have dedicated titles; every other code falls back
/// to the generic narrative-world title.
fn build_builtin_world_title(world_type: &str) -> String {
    match world_type {
        "WUXIA" => "武侠世界".to_string(),
        "XIANXIA" => "仙侠世界".to_string(),
        _ => "叙事世界".to_string(),
    }
}

/// Builds the dashboard snapshot (wallet balance, play time, played-world count)
/// for one user.
///
/// # Errors
/// Returns the stringified validation error when `input.user_id` is rejected by
/// `build_runtime_profile_dashboard_get_input`.
fn get_profile_dashboard_snapshot(
    ctx: &ReducerContext,
    input: RuntimeProfileDashboardGetInput,
) -> Result<RuntimeProfileDashboardSnapshot, String> {
    let validated_input = build_runtime_profile_dashboard_get_input(input.user_id)
        .map_err(|error| error.to_string())?;
    let state = ctx
        .db
        .profile_dashboard_state()
        .user_id()
        .find(&validated_input.user_id);
    // Use the per-user btree index rather than scanning the whole table.
    let played_world_count = ctx
        .db
        .profile_played_world()
        .by_profile_played_world_user_id()
        .filter(&validated_input.user_id)
        .count() as u32;
    Ok(match state {
        Some(existing) => RuntimeProfileDashboardSnapshot {
            user_id: existing.user_id,
            wallet_balance: existing.wallet_balance,
            total_play_time_ms: existing.total_play_time_ms,
            played_world_count,
            updated_at_micros: Some(existing.updated_at.to_micros_since_unix_epoch()),
        },
        // No dashboard row yet: report zeroed counters and no update time.
        None => RuntimeProfileDashboardSnapshot {
            user_id: validated_input.user_id,
            wallet_balance: 0,
            total_play_time_ms: 0,
            played_world_count,
            updated_at_micros: None,
        },
    })
}

/// Lists a user's wallet ledger entries, newest first.
///
/// Entries are ordered by `created_at_micros` descending, ties broken by
/// `wallet_ledger_id` ascending, and truncated to
/// `PROFILE_WALLET_LEDGER_LIST_LIMIT`.
///
/// # Errors
/// Returns the stringified validation error for an invalid `user_id`.
fn list_profile_wallet_ledger_entries(
    ctx: &ReducerContext,
    input: RuntimeProfileWalletLedgerListInput,
) -> Result<Vec<RuntimeProfileWalletLedgerEntrySnapshot>, String> {
    let validated_input = build_runtime_profile_wallet_ledger_list_input(input.user_id)
        .map_err(|error| error.to_string())?;
    let mut entries = ctx
        .db
        .profile_wallet_ledger()
        .by_profile_wallet_ledger_user_id()
        .filter(&validated_input.user_id)
        .map(|row| build_profile_wallet_ledger_snapshot_from_row(&row))
        .collect::<Vec<_>>();
    entries.sort_by(|left, right| {
        right
            .created_at_micros
            .cmp(&left.created_at_micros)
            .then_with(|| left.wallet_ledger_id.cmp(&right.wallet_ledger_id))
    });
    entries.truncate(PROFILE_WALLET_LEDGER_LIST_LIMIT);
    Ok(entries)
}

/// Builds the play-statistics snapshot for one user.
///
/// Played worlds are ordered by `last_played_at_micros` descending, ties broken
/// by `played_world_id` ascending. Total play time and `updated_at_micros` come
/// from the dashboard row and default to `0` / `None` when it does not exist.
///
/// # Errors
/// Returns the stringified validation error for an invalid `user_id`.
fn get_profile_play_stats_snapshot(
    ctx: &ReducerContext,
    input: RuntimeProfilePlayStatsGetInput,
) -> Result<RuntimeProfilePlayStatsSnapshot, String> {
    let validated_input = build_runtime_profile_play_stats_get_input(input.user_id)
        .map_err(|error| error.to_string())?;
    let dashboard_state = ctx
        .db
        .profile_dashboard_state()
        .user_id()
        .find(&validated_input.user_id);
    let mut played_works = ctx
        .db
        .profile_played_world()
        .by_profile_played_world_user_id()
        .filter(&validated_input.user_id)
        .map(|row| build_profile_played_world_snapshot_from_row(&row))
        .collect::<Vec<_>>();
    played_works.sort_by(|left, right| {
        right
            .last_played_at_micros
            .cmp(&left.last_played_at_micros)
            .then_with(|| left.played_world_id.cmp(&right.played_world_id))
    });
    Ok(RuntimeProfilePlayStatsSnapshot {
        user_id: validated_input.user_id,
        total_play_time_ms: dashboard_state
            .as_ref()
            .map(|row| row.total_play_time_ms)
            .unwrap_or(0),
        played_works,
        updated_at_micros: dashboard_state
            .as_ref()
            .map(|row| row.updated_at.to_micros_since_unix_epoch()),
    })
}

/// Returns the recharge-center snapshot for one user after validating the input.
///
/// # Errors
/// Returns the stringified validation error for an invalid `user_id`.
fn get_profile_recharge_center_snapshot(
    ctx: &ReducerContext,
    input: RuntimeProfileRechargeCenterGetInput,
) -> Result<RuntimeProfileRechargeCenterSnapshot, String> {
    let validated_input = build_runtime_profile_recharge_center_get_input(input.user_id)
        .map_err(|error| error.to_string())?;
    Ok(build_profile_recharge_center_snapshot(
        ctx,
        &validated_input.user_id,
    ))
}

/// Records a paid recharge order and applies its effect.
///
/// A points product credits the wallet; the product's bonus points are added
/// only when the user has no earlier `PointsRecharge` ledger entry. A membership
/// product extends the membership. The order row is inserted with status `Paid`.
///
/// Returns the refreshed recharge-center snapshot together with the snapshot of
/// the order that was just inserted.
///
/// # Errors
/// Returns a message when validation fails, the product id is unknown, or the
/// wallet credit is rejected.
fn create_profile_recharge_order_record(
    ctx: &ReducerContext,
    input: RuntimeProfileRechargeOrderCreateInput,
) -> Result<
    (
        RuntimeProfileRechargeCenterSnapshot,
        RuntimeProfileRechargeOrderSnapshot,
    ),
    String,
> {
    let validated_input = build_runtime_profile_recharge_order_create_input(
        input.user_id,
        input.product_id,
        input.payment_channel,
        input.created_at_micros,
    )
    .map_err(|error| error.to_string())?;
    let product = runtime_profile_recharge_product_by_id(&validated_input.product_id)
        .ok_or_else(|| "recharge.product_id 不存在".to_string())?;
    let created_at = Timestamp::from_micros_since_unix_epoch(validated_input.created_at_micros);
    let (points_delta, membership_expires_at) = match product.kind {
        RuntimeProfileRechargeProductKind::Points => {
            // The bonus is granted on the first points recharge only.
            let has_recharged = has_profile_points_recharged(ctx, &validated_input.user_id);
            let bonus_points = if has_recharged {
                0
            } else {
                product.bonus_points
            };
            let points_delta = product.points_amount.saturating_add(bonus_points);
            apply_profile_wallet_delta(
                ctx,
                &validated_input.user_id,
                points_delta,
                RuntimeProfileWalletLedgerSourceType::PointsRecharge,
                &format!(
                    "{}:{}:{}",
                    validated_input.user_id, validated_input.created_at_micros, product.product_id
                ),
                created_at,
            )?;
            // Lossless: apply_profile_wallet_delta already rejected values above i64::MAX.
            (points_delta as i64, None)
        }
        RuntimeProfileRechargeProductKind::Membership => {
            let expires_at = apply_profile_membership_purchase(
                ctx,
                &validated_input.user_id,
                product.tier,
                product.duration_days,
                created_at,
            );
            (0, Some(expires_at))
        }
    };
    let order = ProfileRechargeOrder {
        order_id: format!(
            "recharge:{}:{}:{}",
            validated_input.user_id, validated_input.created_at_micros, product.product_id
        ),
        user_id: validated_input.user_id.clone(),
        product_id: product.product_id.clone(),
        product_title: product.title.clone(),
        kind: product.kind,
        amount_cents: product.price_cents,
        status: RuntimeProfileRechargeOrderStatus::Paid,
        payment_channel: validated_input.payment_channel,
        paid_at: created_at,
        created_at,
        points_delta,
        membership_expires_at,
    };
    // Report the row we inserted. Re-querying the "latest" order would return a
    // different order whenever `created_at_micros` is older than an existing one.
    let inserted_order = ctx.db.profile_recharge_order().insert(order);
    Ok((
        build_profile_recharge_center_snapshot(ctx, &validated_input.user_id),
        build_profile_recharge_order_snapshot_from_row(&inserted_order),
    ))
}

/// Returns the referral invite-center snapshot for one user after validating
/// the input.
///
/// # Errors
/// Returns the stringified validation error for an invalid `user_id`.
fn get_profile_referral_invite_center_snapshot(
    ctx: &ReducerContext,
    input: RuntimeReferralInviteCenterGetInput,
) -> Result<RuntimeReferralInviteCenterSnapshot, String> {
    let validated_input = build_runtime_referral_invite_center_get_input(input.user_id)
        .map_err(|error| error.to_string())?;
    Ok(build_profile_referral_invite_center_snapshot(
        ctx,
        &validated_input.user_id,
    ))
}

/// Binds an invitee to an inviter through an invite code and grants rewards.
///
/// The invitee always receives `PROFILE_REFERRAL_REWARD_POINTS`. The inviter
/// receives the same amount only while their count of inviter rewards for the
/// day of `updated_at_micros` is below
/// `PROFILE_REFERRAL_DAILY_INVITER_REWARD_LIMIT`.
///
/// # Errors
/// Returns a message when validation fails, the invitee is already bound, the
/// code does not exist, the code belongs to the invitee, or a wallet credit is
/// rejected.
fn redeem_profile_referral_invite_code_record(
    ctx: &ReducerContext,
    input: RuntimeReferralRedeemInput,
) -> Result<RuntimeReferralRedeemSnapshot, String> {
    let validated_input = build_runtime_referral_redeem_input(
        input.user_id,
        input.invite_code,
        input.updated_at_micros,
    )
    .map_err(|error| error.to_string())?;
    let bound_at = Timestamp::from_micros_since_unix_epoch(validated_input.updated_at_micros);
    let invitee_user_id = validated_input.user_id;
    let invite_code = validated_input.invite_code;
    if ctx
        .db
        .profile_referral_relation()
        .invitee_user_id()
        .find(&invitee_user_id)
        .is_some()
    {
        return Err("每个用户最多只能填写一个邀请码".to_string());
    }
    let inviter_code = ctx
        .db
        .profile_invite_code()
        .invite_code()
        .find(&invite_code)
        .ok_or_else(|| "邀请码不存在".to_string())?;
    if inviter_code.user_id == invitee_user_id {
        return Err("不能填写自己的邀请码".to_string());
    }
    let invitee_balance_after = apply_profile_wallet_delta(
        ctx,
        &invitee_user_id,
        PROFILE_REFERRAL_REWARD_POINTS,
        RuntimeProfileWalletLedgerSourceType::InviteInviteeReward,
        &format!(
            "invitee:{}:{}",
            invitee_user_id, validated_input.updated_at_micros
        ),
        bound_at,
    )?;
    let today_inviter_reward_count =
        count_today_profile_referral_inviter_rewards(ctx, &inviter_code.user_id, bound_at);
    let inviter_reward_granted =
        today_inviter_reward_count < PROFILE_REFERRAL_DAILY_INVITER_REWARD_LIMIT;
    let inviter_balance_after = if inviter_reward_granted {
        apply_profile_wallet_delta(
            ctx,
            &inviter_code.user_id,
            PROFILE_REFERRAL_REWARD_POINTS,
            RuntimeProfileWalletLedgerSourceType::InviteInviterReward,
            &format!(
                "inviter:{}:{}",
                inviter_code.user_id, validated_input.updated_at_micros
            ),
            bound_at,
        )?
    } else {
        // Daily limit reached: report the inviter's unchanged balance.
        profile_wallet_balance(ctx, &inviter_code.user_id)
    };
    ctx.db
        .profile_referral_relation()
        .insert(ProfileReferralRelation {
            invitee_user_id: invitee_user_id.clone(),
            inviter_user_id: inviter_code.user_id,
            invite_code,
            inviter_reward_granted,
            invitee_reward_granted: true,
            bound_at,
        });
    Ok(RuntimeReferralRedeemSnapshot {
        center: build_profile_referral_invite_center_snapshot(ctx, &invitee_user_id),
        invitee_reward_granted: true,
        inviter_reward_granted,
        invitee_balance_after,
        inviter_balance_after,
    })
}

/// Redeems a reward code for a user and credits the wallet.
///
/// Usage limits depend on the code's mode:
/// - `Public`: the user's own usage count must be below `max_uses`.
/// - `Unique`: the global usage count must be below `max_uses`.
/// - `Private`: the user must be in `allowed_user_ids` and the global usage
///   count must be below `max_uses`.
///
/// On success a usage row is written, the code's `global_used_count` is
/// incremented, and the new ledger entry is returned in the snapshot.
///
/// # Errors
/// Returns a message when validation fails, the code is missing, disabled, has
/// a zero reward, is exhausted, is not allowed for this user, or the wallet
/// credit is rejected.
fn redeem_profile_reward_code_record(
    ctx: &ReducerContext,
    input: RuntimeProfileRewardCodeRedeemInput,
) -> Result<RuntimeProfileRewardCodeRedeemSnapshot, String> {
    let validated_input = build_runtime_profile_reward_code_redeem_input(
        input.user_id,
        input.code,
        input.redeemed_at_micros,
    )
    .map_err(|error| error.to_string())?;
    let redeemed_at = Timestamp::from_micros_since_unix_epoch(validated_input.redeemed_at_micros);
    let user_id = validated_input.user_id;
    let code = validated_input.code;
    let redeem_code = ctx
        .db
        .profile_redeem_code()
        .code()
        .find(&code)
        .ok_or_else(|| "兑换码不存在".to_string())?;
    if !redeem_code.enabled {
        return Err("兑换码已停用".to_string());
    }
    if redeem_code.reward_points == 0 {
        return Err("兑换码奖励无效".to_string());
    }
    let user_used_count = count_profile_redeem_code_user_usage(ctx, &code, &user_id);
    match redeem_code.mode {
        RuntimeProfileRedeemCodeMode::Public if user_used_count >= redeem_code.max_uses => {
            return Err("兑换次数已用完".to_string());
        }
        RuntimeProfileRedeemCodeMode::Unique
            if redeem_code.global_used_count >= redeem_code.max_uses =>
        {
            return Err("兑换次数已用完".to_string());
        }
        RuntimeProfileRedeemCodeMode::Private => {
            if !redeem_code
                .allowed_user_ids
                .iter()
                .any(|item| item == &user_id)
            {
                return Err("该兑换码不适用于当前账号".to_string());
            }
            if redeem_code.global_used_count >= redeem_code.max_uses {
                return Err("兑换次数已用完".to_string());
            }
        }
        // Public / Unique codes that are still within their limits.
        _ => {}
    }
    let usage_id = build_profile_redeem_code_usage_id(
        ctx,
        &code,
        &user_id,
        validated_input.redeemed_at_micros,
    );
    let wallet_ledger_id = format!("{}:ledger", usage_id);
    let wallet_balance = apply_profile_wallet_delta(
        ctx,
        &user_id,
        redeem_code.reward_points,
        RuntimeProfileWalletLedgerSourceType::RedeemCodeReward,
        &wallet_ledger_id,
        redeemed_at,
    )?;
    ctx.db
        .profile_redeem_code_usage()
        .insert(ProfileRedeemCodeUsage {
            usage_id,
            code: code.clone(),
            user_id,
            amount_granted: redeem_code.reward_points,
            created_at: redeemed_at,
        });
    let next_code = ProfileRedeemCode {
        global_used_count: redeem_code.global_used_count.saturating_add(1),
        updated_at: redeemed_at,
        ..redeem_code
    };
    // Replace the code row with its updated usage counter.
    ctx.db.profile_redeem_code().code().delete(&code);
    ctx.db.profile_redeem_code().insert(next_code);
    let ledger_entry = ctx
        .db
        .profile_wallet_ledger()
        .wallet_ledger_id()
        .find(&wallet_ledger_id)
        .ok_or_else(|| "兑换码钱包流水写入失败".to_string())?;
    Ok(RuntimeProfileRewardCodeRedeemSnapshot {
        wallet_balance,
        amount_granted: ledger_entry.amount_delta.max(0) as u64,
        ledger_entry: build_profile_wallet_ledger_snapshot_from_row(&ledger_entry),
    })
}

/// Creates or replaces a redeem code on behalf of an admin.
///
/// When the code already exists its `created_at` and `global_used_count` are
/// carried over; all other fields come from the validated input.
///
/// # Errors
/// Returns a message when validation fails or a private code resolves to an
/// empty allow-list.
fn admin_upsert_profile_redeem_code_record(
    ctx: &ReducerContext,
    input: RuntimeProfileRedeemCodeAdminUpsertInput,
) -> Result<RuntimeProfileRedeemCodeSnapshot, String> {
    let validated_input = build_runtime_profile_redeem_code_admin_upsert_input(
        input.admin_user_id,
        input.code,
        input.mode,
        input.reward_points,
        input.max_uses,
        input.enabled,
        input.allowed_user_ids,
        input.allowed_public_user_codes,
        input.updated_at_micros,
    )
    .map_err(|error| error.to_string())?;
    let updated_at = Timestamp::from_micros_since_unix_epoch(validated_input.updated_at_micros);
    let allowed_user_ids = resolve_profile_redeem_code_allowed_user_ids(ctx, &validated_input)?;
    let existing = ctx
        .db
        .profile_redeem_code()
        .code()
        .find(&validated_input.code);
    let created_at = existing
        .as_ref()
        .map(|row| row.created_at)
        .unwrap_or(updated_at);
    let global_used_count = existing
        .as_ref()
        .map(|row| row.global_used_count)
        .unwrap_or(0);
    if let Some(existing) = existing {
        ctx.db.profile_redeem_code().code().delete(&existing.code);
    }
    let row = ProfileRedeemCode {
        code: validated_input.code,
        mode: validated_input.mode,
        reward_points: validated_input.reward_points,
        max_uses: validated_input.max_uses,
        global_used_count,
        enabled: validated_input.enabled,
        allowed_user_ids,
        created_by: validated_input.admin_user_id,
        created_at,
        updated_at,
    };
    let inserted = ctx.db.profile_redeem_code().insert(row);
    Ok(build_profile_redeem_code_snapshot_from_row(&inserted))
}

/// Disables an existing redeem code, leaving every other field untouched apart
/// from `updated_at`.
///
/// # Errors
/// Returns a message when validation fails or the code does not exist.
fn admin_disable_profile_redeem_code_record(
    ctx: &ReducerContext,
    input: RuntimeProfileRedeemCodeAdminDisableInput,
) -> Result<RuntimeProfileRedeemCodeSnapshot, String> {
    let validated_input = build_runtime_profile_redeem_code_admin_disable_input(
        input.admin_user_id,
        input.code,
        input.updated_at_micros,
    )
    .map_err(|error| error.to_string())?;
    let updated_at = Timestamp::from_micros_since_unix_epoch(validated_input.updated_at_micros);
    let existing = ctx
        .db
        .profile_redeem_code()
        .code()
        .find(&validated_input.code)
        .ok_or_else(|| "兑换码不存在".to_string())?;
    ctx.db.profile_redeem_code().code().delete(&existing.code);
    let inserted = ctx.db.profile_redeem_code().insert(ProfileRedeemCode {
        enabled: false,
        updated_at,
        ..existing
    });
    Ok(build_profile_redeem_code_snapshot_from_row(&inserted))
}

/// Assembles the referral invite-center snapshot for `user_id`.
///
/// Creates the user's invite code if none exists yet, counts the relations in
/// which the user is the inviter (total and rewarded), and reports the relation
/// in which the user is the invitee, if any.
fn build_profile_referral_invite_center_snapshot(
    ctx: &ReducerContext,
    user_id: &str,
) -> RuntimeReferralInviteCenterSnapshot {
    let code = ensure_profile_invite_code(ctx, user_id);
    let today_inviter_reward_count =
        count_today_profile_referral_inviter_rewards(ctx, user_id, ctx.timestamp);
    // One pass over the inviter index yields both counters.
    let (invited_count, rewarded_invite_count) = ctx
        .db
        .profile_referral_relation()
        .by_profile_referral_inviter_user_id()
        .filter(user_id)
        .fold((0u32, 0u32), |(invited, rewarded), row| {
            (
                invited + 1,
                rewarded + u32::from(row.inviter_reward_granted),
            )
        });
    let bound_relation = ctx
        .db
        .profile_referral_relation()
        .invitee_user_id()
        .find(&user_id.to_string());
    RuntimeReferralInviteCenterSnapshot {
        user_id: user_id.to_string(),
        invite_code: code.invite_code.clone(),
        invite_link_path: format!("/?inviteCode={}", code.invite_code),
        invited_count,
        rewarded_invite_count,
        today_inviter_reward_count,
        today_inviter_reward_remaining: PROFILE_REFERRAL_DAILY_INVITER_REWARD_LIMIT
            .saturating_sub(today_inviter_reward_count),
        reward_points: PROFILE_REFERRAL_REWARD_POINTS,
        has_redeemed_code: bound_relation.is_some(),
        bound_inviter_user_id: bound_relation
            .as_ref()
            .map(|relation| relation.inviter_user_id.clone()),
        bound_at_micros: bound_relation
            .as_ref()
            .map(|relation| relation.bound_at.to_micros_since_unix_epoch()),
        updated_at_micros: code.updated_at.to_micros_since_unix_epoch(),
    }
}

/// Returns the user's invite-code row, creating it on first use.
///
/// A new code is derived from `user_id`; while the candidate collides with an
/// existing code, the salt is incremented and the code is rebuilt.
fn ensure_profile_invite_code(ctx: &ReducerContext, user_id: &str) -> ProfileInviteCode {
    if let Some(row) = ctx
        .db
        .profile_invite_code()
        .user_id()
        .find(&user_id.to_string())
    {
        return row;
    }
    let mut invite_code = build_profile_invite_code(user_id, 0);
    let mut salt = 1;
    while ctx
        .db
        .profile_invite_code()
        .invite_code()
        .find(&invite_code)
        .is_some()
    {
        invite_code = build_profile_invite_code(user_id, salt);
        salt += 1;
    }
    ctx.db.profile_invite_code().insert(ProfileInviteCode {
        user_id: user_id.to_string(),
        invite_code,
        created_at: ctx.timestamp,
        updated_at: ctx.timestamp,
    })
}

/// Derives a deterministic invite code from `user_id` and `salt`.
///
/// Runs a 64-bit FNV-1a style hash (xor, then wrapping multiply) over the user
/// id bytes followed by the little-endian salt bytes, and formats the low 32
/// bits as `SY` plus eight upper-case hex digits.
fn build_profile_invite_code(user_id: &str, salt: u32) -> String {
    let mut hash = 14_695_981_039_346_656_037u64;
    for byte in user_id.as_bytes().iter().copied().chain(salt.to_le_bytes()) {
        hash ^= byte as u64;
        hash = hash.wrapping_mul(1_099_511_628_211);
    }
    format!("SY{:08X}", hash as u32)
}

/// Counts the `InviteInviterReward` ledger entries `user_id` has received since
/// the start of the day that contains `now`.
///
/// Day boundaries are multiples of 86,400,000,000 microseconds since the Unix
/// epoch.
fn count_today_profile_referral_inviter_rewards(
    ctx: &ReducerContext,
    user_id: &str,
    now: Timestamp,
) -> u32 {
    // div_euclid floors, so the boundary is never later than `now`, even for
    // timestamps before the epoch.
    let day_start_micros =
        now.to_micros_since_unix_epoch().div_euclid(86_400_000_000) * 86_400_000_000;
    ctx.db
        .profile_wallet_ledger()
        .by_profile_wallet_ledger_user_id()
        .filter(user_id)
        .filter(|row| {
            row.source_type == RuntimeProfileWalletLedgerSourceType::InviteInviterReward
                && row.created_at.to_micros_since_unix_epoch() >= day_start_micros
        })
        .count() as u32
}

/// Returns the user's wallet balance, or `0` when no dashboard row exists.
fn profile_wallet_balance(ctx: &ReducerContext, user_id: &str) -> u64 {
    ctx.db
        .profile_dashboard_state()
        .user_id()
        .find(&user_id.to_string())
        .map(|row| row.wallet_balance)
        .unwrap_or(0)
}

/// Assembles the recharge-center snapshot: wallet balance, membership state,
/// product catalogues, benefits, the latest order, and whether the user has
/// ever recharged points.
fn build_profile_recharge_center_snapshot(
    ctx: &ReducerContext,
    user_id: &str,
) -> RuntimeProfileRechargeCenterSnapshot {
    RuntimeProfileRechargeCenterSnapshot {
        user_id: user_id.to_string(),
        wallet_balance: profile_wallet_balance(ctx, user_id),
        membership: build_profile_membership_snapshot(ctx, user_id),
        point_products: runtime_profile_recharge_point_products(),
        membership_products: runtime_profile_recharge_membership_products(),
        benefits: runtime_profile_membership_benefits(),
        latest_order: latest_profile_recharge_order(ctx, user_id)
            .map(|row| build_profile_recharge_order_snapshot_from_row(&row)),
        has_points_recharged: has_profile_points_recharged(ctx, user_id),
    }
}

/// Builds the membership snapshot for `user_id` as of `ctx.timestamp`.
///
/// A row whose `expires_at` is still in the future is reported with its stored
/// status and tier. An expired row keeps its timestamps but is reported as
/// `Normal`. With no row, a `Normal` snapshot with no timestamps is returned.
fn build_profile_membership_snapshot(
    ctx: &ReducerContext,
    user_id: &str,
) -> RuntimeProfileMembershipSnapshot {
    let now_micros = ctx.timestamp.to_micros_since_unix_epoch();
    let membership = ctx
        .db
        .profile_membership()
        .user_id()
        .find(&user_id.to_string());
    match membership {
        Some(row) if row.expires_at.to_micros_since_unix_epoch() > now_micros => {
            RuntimeProfileMembershipSnapshot {
                user_id: row.user_id,
                status: row.status,
                tier: row.tier,
                started_at_micros: Some(row.started_at.to_micros_since_unix_epoch()),
                expires_at_micros: Some(row.expires_at.to_micros_since_unix_epoch()),
                updated_at_micros: Some(row.updated_at.to_micros_since_unix_epoch()),
            }
        }
        Some(row) => RuntimeProfileMembershipSnapshot {
            user_id: row.user_id,
            status: RuntimeProfileMembershipStatus::Normal,
            tier: RuntimeProfileMembershipTier::Normal,
            started_at_micros: Some(row.started_at.to_micros_since_unix_epoch()),
            expires_at_micros: Some(row.expires_at.to_micros_since_unix_epoch()),
            updated_at_micros: Some(row.updated_at.to_micros_since_unix_epoch()),
        },
        None => RuntimeProfileMembershipSnapshot {
            user_id: user_id.to_string(),
            status: RuntimeProfileMembershipStatus::Normal,
            tier: RuntimeProfileMembershipTier::Normal,
            started_at_micros: None,
            expires_at_micros: None,
            updated_at_micros: None,
        },
    }
}

/// Applies a membership purchase and returns the new expiry time.
///
/// When the current membership has not yet expired at `purchased_at`, the new
/// period is appended to the existing expiry; otherwise it starts at
/// `purchased_at`. The original `started_at` is kept when a row already exists.
fn apply_profile_membership_purchase(
    ctx: &ReducerContext,
    user_id: &str,
    tier: RuntimeProfileMembershipTier,
    duration_days: u32,
    purchased_at: Timestamp,
) -> Timestamp {
    let current = ctx
        .db
        .profile_membership()
        .user_id()
        .find(&user_id.to_string());
    let purchased_at_micros = purchased_at.to_micros_since_unix_epoch();
    let start_at_micros = current
        .as_ref()
        .map(|row| row.expires_at.to_micros_since_unix_epoch())
        .filter(|expires_at_micros| *expires_at_micros > purchased_at_micros)
        .unwrap_or(purchased_at_micros);
    // Saturate rather than overflow for very large day counts.
    let duration_micros = i64::from(duration_days).saturating_mul(86_400_000_000);
    let expires_at =
        Timestamp::from_micros_since_unix_epoch(start_at_micros.saturating_add(duration_micros));
    let created_at = current
        .as_ref()
        .map(|row| row.started_at)
        .unwrap_or(purchased_at);
    if let Some(existing) = current {
        ctx.db
            .profile_membership()
            .user_id()
            .delete(&existing.user_id);
    }
    ctx.db.profile_membership().insert(ProfileMembership {
        user_id: user_id.to_string(),
        status: RuntimeProfileMembershipStatus::Active,
        tier,
        started_at: created_at,
        expires_at,
        updated_at: purchased_at,
    });
    expires_at
}

/// Credits `amount_delta` to the user's wallet (non-idempotent) and returns the
/// new balance.
///
/// # Errors
/// Returns a message when `amount_delta` exceeds `i64::MAX` or the resulting
/// balance would overflow.
fn apply_profile_wallet_delta(
    ctx: &ReducerContext,
    user_id: &str,
    amount_delta: u64,
    source_type: RuntimeProfileWalletLedgerSourceType,
    ledger_id: &str,
    created_at: Timestamp,
) -> Result<u64, String> {
    let amount_delta =
        i64::try_from(amount_delta).map_err(|_| "profile.wallet_amount 超出上限".to_string())?;
    apply_profile_wallet_signed_delta(
        ctx,
        user_id,
        amount_delta,
        source_type,
        ledger_id,
        created_at,
        false,
    )
}

/// Applies an idempotent wallet adjustment and returns the refreshed dashboard.
///
/// With `consume == true` the amount is deducted, otherwise it is credited. A
/// repeated `ledger_id` leaves the balance unchanged.
///
/// # Errors
/// Returns a message when validation fails, the amount exceeds `i64::MAX`, the
/// balance is insufficient for a deduction, or the balance would overflow.
fn apply_profile_wallet_adjustment(
    ctx: &ReducerContext,
    input: RuntimeProfileWalletAdjustmentInput,
    source_type: RuntimeProfileWalletLedgerSourceType,
    consume: bool,
) -> Result<RuntimeProfileDashboardSnapshot, String> {
    let validated_input = build_runtime_profile_wallet_adjustment_input(
        input.user_id,
        input.amount,
        input.ledger_id,
        input.created_at_micros,
    )
    .map_err(|error| error.to_string())?;
    let created_at = Timestamp::from_micros_since_unix_epoch(validated_input.created_at_micros);
    // Checked conversion: an `as i64` cast would wrap amounts above i64::MAX to a
    // negative value and silently flip a deduction into a credit (or vice versa).
    let amount = i64::try_from(validated_input.amount)
        .map_err(|_| "profile.wallet_amount 超出上限".to_string())?;
    let amount_delta = if consume { -amount } else { amount };
    apply_profile_wallet_signed_delta(
        ctx,
        &validated_input.user_id,
        amount_delta,
        source_type,
        &validated_input.ledger_id,
        created_at,
        true,
    )?;
    get_profile_dashboard_snapshot(
        ctx,
        RuntimeProfileDashboardGetInput {
            user_id: validated_input.user_id,
        },
    )
}

/// Applies a signed balance change, rewrites the dashboard row, and appends a
/// ledger entry. Returns the balance after the change.
///
/// When `idempotent` is set and a ledger row with `ledger_id` already exists,
/// nothing is written and the current balance is returned.
///
/// # Errors
/// Returns a message when a credit would overflow the balance or a deduction
/// exceeds it.
fn apply_profile_wallet_signed_delta(
    ctx: &ReducerContext,
    user_id: &str,
    amount_delta: i64,
    source_type: RuntimeProfileWalletLedgerSourceType,
    ledger_id: &str,
    created_at: Timestamp,
    idempotent: bool,
) -> Result<u64, String> {
    if idempotent
        && ctx
            .db
            .profile_wallet_ledger()
            .wallet_ledger_id()
            .find(&ledger_id.to_string())
            .is_some()
    {
        return Ok(profile_wallet_balance(ctx, user_id));
    }
    let current = ctx
        .db
        .profile_dashboard_state()
        .user_id()
        .find(&user_id.to_string());
    let previous_balance = current.as_ref().map(|row| row.wallet_balance).unwrap_or(0);
    let next_balance = if amount_delta >= 0 {
        previous_balance
            .checked_add(amount_delta as u64)
            .ok_or_else(|| "profile.wallet_balance 超出上限".to_string())?
    } else {
        previous_balance
            .checked_sub(amount_delta.unsigned_abs())
            .ok_or_else(|| "叙世币余额不足".to_string())?
    };
    // Carry over play time and creation time from the old row; a first-time
    // wallet starts with zero play time and is created at `created_at`.
    let (total_play_time_ms, state_created_at) = match current {
        Some(existing) => {
            ctx.db
                .profile_dashboard_state()
                .user_id()
                .delete(&existing.user_id);
            (existing.total_play_time_ms, existing.created_at)
        }
        None => (0, created_at),
    };
    ctx.db
        .profile_dashboard_state()
        .insert(ProfileDashboardState {
            user_id: user_id.to_string(),
            wallet_balance: next_balance,
            total_play_time_ms,
            created_at: state_created_at,
            updated_at: created_at,
        });
    ctx.db.profile_wallet_ledger().insert(ProfileWalletLedger {
        wallet_ledger_id: ledger_id.to_string(),
        user_id: user_id.to_string(),
        amount_delta,
        balance_after: next_balance,
        source_type,
        created_at,
    });
    Ok(next_balance)
}

/// Returns `true` when the user has at least one `PointsRecharge` ledger entry.
fn has_profile_points_recharged(ctx: &ReducerContext, user_id: &str) -> bool {
    ctx.db
        .profile_wallet_ledger()
        .by_profile_wallet_ledger_user_id()
        .filter(user_id)
        .any(|row| row.source_type == RuntimeProfileWalletLedgerSourceType::PointsRecharge)
}

/// Returns the user's most recent recharge order: greatest `created_at`, ties
/// broken by the smallest `order_id`.
fn latest_profile_recharge_order(
    ctx: &ReducerContext,
    user_id: &str,
) -> Option<ProfileRechargeOrder> {
    ctx.db
        .profile_recharge_order()
        .iter()
        .filter(|row| row.user_id == user_id)
        // The minimum under "newest first, then order_id ascending" is the row a
        // full sort would place first.
        .min_by(|left, right| {
            right
                .created_at
                .to_micros_since_unix_epoch()
                .cmp(&left.created_at.to_micros_since_unix_epoch())
                .then_with(|| left.order_id.cmp(&right.order_id))
        })
}

/// Counts how many times `user_id` has redeemed `code`, via the
/// `(code, user_id)` index.
fn count_profile_redeem_code_user_usage(ctx: &ReducerContext, code: &str, user_id: &str) -> u32 {
    ctx.db
        .profile_redeem_code_usage()
        .by_profile_redeem_code_usage_code_user_id()
        .filter((code, user_id))
        .count() as u32
}

/// Builds a usage id from the code, the user, the redemption time, and the
/// user's current usage count for that code.
fn build_profile_redeem_code_usage_id(
    ctx: &ReducerContext,
    code: &str,
    user_id: &str,
    redeemed_at_micros: i64,
) -> String {
    let sequence = count_profile_redeem_code_user_usage(ctx, code, user_id);
    format!(
        "redeem:{}:{}:{}:{}",
        code, user_id, redeemed_at_micros, sequence
    )
}

/// Resolves the allow-list for a redeem code.
///
/// Non-private codes get an empty list. For private codes the explicit user ids
/// are merged with the accounts found for each public user code (codes with no
/// matching account are skipped), then sorted and de-duplicated.
///
/// # Errors
/// Returns a message when a private code ends up with an empty allow-list.
fn resolve_profile_redeem_code_allowed_user_ids(
    ctx: &ReducerContext,
    input: &RuntimeProfileRedeemCodeAdminUpsertInput,
) -> Result<Vec<String>, String> {
    if input.mode != RuntimeProfileRedeemCodeMode::Private {
        return Ok(Vec::new());
    }
    let mut allowed_user_ids = input.allowed_user_ids.clone();
    allowed_user_ids.extend(
        input
            .allowed_public_user_codes
            .iter()
            .filter_map(|public_user_code| {
                ctx.db
                    .user_account()
                    .by_user_account_public_code()
                    .filter(public_user_code)
                    .next()
                    .map(|account| account.user_id)
            }),
    );
    allowed_user_ids.sort();
    allowed_user_ids.dedup();
    if allowed_user_ids.is_empty() {
        return Err("私有兑换码必须指定可兑换用户".to_string());
    }
    Ok(allowed_user_ids)
}

/// Converts a redeem-code row into its snapshot, with timestamps as
/// microseconds since the Unix epoch.
fn build_profile_redeem_code_snapshot_from_row(
    row: &ProfileRedeemCode,
) -> RuntimeProfileRedeemCodeSnapshot {
    RuntimeProfileRedeemCodeSnapshot {
        code: row.code.clone(),
        mode: row.mode,
        reward_points: row.reward_points,
        max_uses: row.max_uses,
        global_used_count: row.global_used_count,
        enabled: row.enabled,
        allowed_user_ids: row.allowed_user_ids.clone(),
        created_by: row.created_by.clone(),
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
        updated_at_micros: row.updated_at.to_micros_since_unix_epoch(),
    }
}

/// Converts a wallet-ledger row into its snapshot, with the timestamp as
/// microseconds since the Unix epoch.
fn build_profile_wallet_ledger_snapshot_from_row(
    row: &ProfileWalletLedger,
) -> RuntimeProfileWalletLedgerEntrySnapshot {
    RuntimeProfileWalletLedgerEntrySnapshot {
        wallet_ledger_id: row.wallet_ledger_id.clone(),
        user_id: row.user_id.clone(),
        amount_delta: row.amount_delta,
        balance_after: row.balance_after,
        source_type: row.source_type,
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
    }
}

/// Converts a recharge-order row into its snapshot, with timestamps as
/// microseconds since the Unix epoch.
fn build_profile_recharge_order_snapshot_from_row(
    row: &ProfileRechargeOrder,
) -> RuntimeProfileRechargeOrderSnapshot {
    RuntimeProfileRechargeOrderSnapshot {
        order_id: row.order_id.clone(),
        user_id: row.user_id.clone(),
        product_id: row.product_id.clone(),
        product_title: row.product_title.clone(),
        kind: row.kind,
        amount_cents: row.amount_cents,
        status: row.status,
        payment_channel: row.payment_channel.clone(),
        paid_at_micros: row.paid_at.to_micros_since_unix_epoch(),
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
        points_delta: row.points_delta,
        membership_expires_at_micros: row
            .membership_expires_at
            .map(|value| value.to_micros_since_unix_epoch()),
    }
}

/// Converts a played-world row into its snapshot, with timestamps as
/// microseconds since the Unix epoch.
fn build_profile_played_world_snapshot_from_row(
    row: &ProfilePlayedWorld,
) -> RuntimeProfilePlayedWorldSnapshot {
    RuntimeProfilePlayedWorldSnapshot {
        played_world_id: row.played_world_id.clone(),
        user_id: row.user_id.clone(),
        world_key: row.world_key.clone(),
        owner_user_id: row.owner_user_id.clone(),
        profile_id: row.profile_id.clone(),
        world_type: row.world_type.clone(),
        world_title: row.world_title.clone(),
        world_subtitle: row.world_subtitle.clone(),
        first_played_at_micros: row.first_played_at.to_micros_since_unix_epoch(),
        last_played_at_micros: row.last_played_at.to_micros_since_unix_epoch(),
        last_observed_play_time_ms: row.last_observed_play_time_ms,
    }
}