use crate::*; const PUBLIC_WORK_PLAY_DAY_MICROS: i64 = 86_400_000_000; const PUBLIC_WORK_RECENT_PLAY_WINDOW_DAYS: i64 = 7; const PROFILE_REFERRAL_INVITED_USERS_LIMIT: usize = 20; const PROFILE_NEW_USER_REGISTRATION_LEDGER_PREFIX: &str = "new-user-registration"; const PROFILE_TASK_SYSTEM_USER_ID: &str = "system:profile-task"; const PROFILE_TASK_LOGIN_EVENT_ID_PREFIX: &str = "daily-login"; const PROFILE_TRACKING_PROFILE_MODULE_KEY: &str = "profile"; #[spacetimedb::table(accessor = profile_dashboard_state)] pub struct ProfileDashboardState { #[primary_key] pub(crate) user_id: String, pub(crate) wallet_balance: u64, pub(crate) total_play_time_ms: u64, pub(crate) created_at: Timestamp, pub(crate) updated_at: Timestamp, } #[spacetimedb::table( accessor = profile_wallet_ledger, index(accessor = by_profile_wallet_ledger_user_id, btree(columns = [user_id])), index( accessor = by_profile_wallet_ledger_user_created_at, btree(columns = [user_id, created_at]) ) )] pub struct ProfileWalletLedger { #[primary_key] pub(crate) wallet_ledger_id: String, pub(crate) user_id: String, pub(crate) amount_delta: i64, pub(crate) balance_after: u64, pub(crate) source_type: RuntimeProfileWalletLedgerSourceType, pub(crate) created_at: Timestamp, } #[spacetimedb::table( accessor = tracking_event, index(accessor = by_tracking_event_event_key, btree(columns = [event_key])), index( accessor = by_tracking_event_scope, btree(columns = [scope_kind, scope_id]) ), index( accessor = by_tracking_event_user, btree(columns = [user_id, occurred_at]) ) )] pub struct TrackingEvent { #[primary_key] pub(crate) event_id: String, pub(crate) event_key: String, pub(crate) scope_kind: RuntimeTrackingScopeKind, pub(crate) scope_id: String, pub(crate) day_key: i64, pub(crate) user_id: Option, pub(crate) owner_user_id: Option, pub(crate) profile_id: Option, pub(crate) module_key: Option, pub(crate) metadata_json: String, pub(crate) occurred_at: Timestamp, } #[spacetimedb::table( accessor = tracking_daily_stat, index( accessor = by_tracking_daily_stat_event_day, btree(columns = [event_key, day_key]) ), index( accessor = by_tracking_daily_stat_scope_day, btree(columns = [scope_kind, scope_id, day_key]) ) )] pub struct TrackingDailyStat { #[primary_key] pub(crate) stat_id: String, pub(crate) event_key: String, pub(crate) scope_kind: RuntimeTrackingScopeKind, pub(crate) scope_id: String, pub(crate) day_key: i64, pub(crate) count: u32, pub(crate) first_occurred_at: Timestamp, pub(crate) last_occurred_at: Timestamp, pub(crate) updated_at: Timestamp, } #[spacetimedb::table(accessor = profile_task_config)] pub struct ProfileTaskConfig { #[primary_key] pub(crate) task_id: String, pub(crate) title: String, pub(crate) description: String, pub(crate) event_key: String, pub(crate) cycle: RuntimeProfileTaskCycle, pub(crate) scope_kind: RuntimeTrackingScopeKind, pub(crate) threshold: u32, pub(crate) reward_points: u64, pub(crate) enabled: bool, pub(crate) sort_order: i32, pub(crate) created_by: String, pub(crate) created_at: Timestamp, pub(crate) updated_by: String, pub(crate) updated_at: Timestamp, } #[spacetimedb::table( accessor = profile_task_progress, index(accessor = by_profile_task_progress_user, btree(columns = [user_id])), index( accessor = by_profile_task_progress_user_task, btree(columns = [user_id, task_id]) ) )] pub struct ProfileTaskProgress { #[primary_key] pub(crate) progress_id: String, pub(crate) user_id: String, pub(crate) task_id: String, pub(crate) day_key: i64, pub(crate) progress_count: u32, pub(crate) threshold: u32, pub(crate) status: RuntimeProfileTaskStatus, pub(crate) updated_at: Timestamp, } #[spacetimedb::table( accessor = profile_task_reward_claim, index(accessor = by_profile_task_claim_user, btree(columns = [user_id])), index( accessor = by_profile_task_claim_user_task, btree(columns = [user_id, task_id]) ) )] pub struct ProfileTaskRewardClaim { #[primary_key] pub(crate) claim_id: String, pub(crate) user_id: String, pub(crate) task_id: String, pub(crate) day_key: i64, pub(crate) reward_points: u64, pub(crate) wallet_ledger_id: String, pub(crate) claimed_at: Timestamp, } #[spacetimedb::table(accessor = profile_redeem_code)] pub struct ProfileRedeemCode { #[primary_key] pub(crate) code: String, pub(crate) mode: RuntimeProfileRedeemCodeMode, pub(crate) reward_points: u64, pub(crate) max_uses: u32, pub(crate) global_used_count: u32, pub(crate) enabled: bool, pub(crate) allowed_user_ids: Vec, pub(crate) created_by: String, pub(crate) created_at: Timestamp, pub(crate) updated_at: Timestamp, } #[spacetimedb::table( accessor = profile_redeem_code_usage, index(accessor = by_profile_redeem_code_usage_code, btree(columns = [code])), index(accessor = by_profile_redeem_code_usage_user_id, btree(columns = [user_id])), index( accessor = by_profile_redeem_code_usage_code_user_id, btree(columns = [code, user_id]) ) )] pub struct ProfileRedeemCodeUsage { #[primary_key] pub(crate) usage_id: String, pub(crate) code: String, pub(crate) user_id: String, pub(crate) amount_granted: u64, pub(crate) created_at: Timestamp, } #[spacetimedb::table(accessor = profile_invite_code)] pub struct ProfileInviteCode { #[primary_key] pub(crate) user_id: String, #[unique] pub(crate) invite_code: String, pub(crate) metadata_json: String, pub(crate) created_at: Timestamp, pub(crate) updated_at: Timestamp, #[default(None::)] pub(crate) starts_at: Option, #[default(None::)] pub(crate) expires_at: Option, } #[spacetimedb::table( accessor = profile_referral_relation, index(accessor = by_profile_referral_inviter_user_id, btree(columns = [inviter_user_id])), index( accessor = by_profile_referral_inviter_bound_at, btree(columns = [inviter_user_id, bound_at]) ) )] pub struct ProfileReferralRelation { #[primary_key] pub(crate) invitee_user_id: String, pub(crate) inviter_user_id: String, pub(crate) invite_code: String, pub(crate) inviter_reward_granted: bool, pub(crate) invitee_reward_granted: bool, pub(crate) bound_at: Timestamp, } #[spacetimedb::table( accessor = profile_played_world, index(accessor = by_profile_played_world_user_id, btree(columns = [user_id])), index( accessor = by_profile_played_world_user_world_key, btree(columns = [user_id, world_key]) ), index( accessor = by_profile_played_world_user_last_played_at, btree(columns = [user_id, last_played_at]) ) )] pub struct ProfilePlayedWorld { #[primary_key] pub(crate) played_world_id: String, pub(crate) user_id: String, pub(crate) world_key: String, pub(crate) owner_user_id: Option, pub(crate) profile_id: Option, pub(crate) world_type: Option, pub(crate) world_title: String, pub(crate) world_subtitle: String, pub(crate) first_played_at: Timestamp, pub(crate) last_played_at: Timestamp, pub(crate) last_observed_play_time_ms: u64, } #[spacetimedb::table( accessor = public_work_play_daily_stat, index( accessor = by_public_work_play_daily_stat_work_day, btree(columns = [source_type, profile_id, played_day]) ) )] pub struct PublicWorkPlayDailyStat { #[primary_key] pub(crate) stat_id: String, // 中文注释:source_type 区分 custom-world / puzzle / big-fish,避免不同玩法 profile_id 撞桶。 pub(crate) source_type: String, pub(crate) owner_user_id: String, pub(crate) profile_id: String, // 中文注释:UTC 自 Unix 纪元起的自然日桶,用于快速聚合近 7 日新增游玩次数。 pub(crate) played_day: i64, pub(crate) play_count: u32, pub(crate) updated_at: Timestamp, } #[spacetimedb::table( accessor = public_work_like, index(accessor = by_public_work_like_work, btree(columns = [source_type, profile_id])), index(accessor = by_public_work_like_user, btree(columns = [user_id])) )] pub struct PublicWorkLike { #[primary_key] pub(crate) like_id: String, // 中文注释:source_type 与 play 统计保持同一套作品类型命名,确保跨玩法 profile_id 不会互相冲突。 pub(crate) source_type: String, pub(crate) owner_user_id: String, pub(crate) profile_id: String, pub(crate) user_id: String, pub(crate) liked_at: Timestamp, } pub(crate) struct ProfilePlayedWorkUpsertInput { pub(crate) user_id: String, pub(crate) world_key: String, pub(crate) owner_user_id: Option, pub(crate) profile_id: Option, pub(crate) world_type: Option, pub(crate) world_title: String, pub(crate) world_subtitle: String, pub(crate) played_at_micros: i64, } pub(crate) struct PublicWorkPlayRecordInput { pub(crate) source_type: String, pub(crate) owner_user_id: String, pub(crate) profile_id: String, pub(crate) played_at_micros: i64, } pub(crate) struct PublicWorkLikeRecordInput { pub(crate) source_type: String, pub(crate) owner_user_id: String, pub(crate) profile_id: String, pub(crate) user_id: String, pub(crate) liked_at_micros: i64, } pub(crate) struct ProfileSaveArchiveUpsertInput { pub(crate) user_id: String, pub(crate) world_key: String, pub(crate) owner_user_id: Option, pub(crate) profile_id: Option, pub(crate) world_type: Option, pub(crate) world_name: String, pub(crate) subtitle: String, pub(crate) summary_text: String, pub(crate) cover_image_src: Option, pub(crate) bottom_tab: String, pub(crate) game_state_json: String, pub(crate) current_story_json: Option, pub(crate) saved_at_micros: i64, } #[spacetimedb::table(accessor = profile_membership)] pub struct ProfileMembership { #[primary_key] pub(crate) user_id: String, pub(crate) status: RuntimeProfileMembershipStatus, pub(crate) tier: RuntimeProfileMembershipTier, pub(crate) started_at: Timestamp, pub(crate) expires_at: Timestamp, pub(crate) updated_at: Timestamp, } #[spacetimedb::table( accessor = profile_recharge_order, index(accessor = by_profile_recharge_order_user_id, btree(columns = [user_id])), index( accessor = by_profile_recharge_order_user_created_at, btree(columns = [user_id, created_at]) ) )] pub struct ProfileRechargeOrder { #[primary_key] pub(crate) order_id: String, pub(crate) user_id: String, pub(crate) product_id: String, pub(crate) product_title: String, pub(crate) kind: RuntimeProfileRechargeProductKind, pub(crate) amount_cents: u64, pub(crate) status: RuntimeProfileRechargeOrderStatus, pub(crate) payment_channel: String, pub(crate) paid_at: Timestamp, pub(crate) created_at: Timestamp, pub(crate) points_delta: i64, pub(crate) membership_expires_at: Option, } #[spacetimedb::table( accessor = profile_feedback_submission, index(accessor = by_profile_feedback_user_id, btree(columns = [user_id])), index( accessor = by_profile_feedback_user_created_at, btree(columns = [user_id, created_at]) ) )] pub struct ProfileFeedbackSubmission { #[primary_key] pub(crate) feedback_id: String, pub(crate) user_id: String, pub(crate) description: String, pub(crate) contact_phone: Option, // 中文注释:首版凭证以 Data URL 写入私有表,HTTP 回包只返回元数据,后续迁 OSS 不改变外部契约。 pub(crate) evidence_json: String, pub(crate) status: RuntimeProfileFeedbackStatus, pub(crate) created_at: Timestamp, pub(crate) updated_at: Timestamp, } #[spacetimedb::table( accessor = profile_save_archive, index(accessor = by_profile_save_archive_user_id, btree(columns = [user_id])), index( accessor = by_profile_save_archive_user_world_key, btree(columns = [user_id, world_key]) ), index( accessor = by_profile_save_archive_user_saved_at, btree(columns = [user_id, saved_at]) ) )] pub struct ProfileSaveArchive { #[primary_key] pub(crate) archive_id: String, pub(crate) user_id: String, pub(crate) world_key: String, pub(crate) owner_user_id: Option, pub(crate) profile_id: Option, pub(crate) world_type: Option, pub(crate) world_name: String, pub(crate) subtitle: String, pub(crate) summary_text: String, pub(crate) cover_image_src: Option, pub(crate) saved_at: Timestamp, pub(crate) bottom_tab: String, pub(crate) game_state_json: String, pub(crate) current_story_json: Option, pub(crate) created_at: Timestamp, pub(crate) updated_at: Timestamp, } // save archive 列表是按世界聚合后的最近一次快照视图,读取时只做排序,不再拼装默认值。 #[spacetimedb::procedure] pub fn list_profile_save_archives( ctx: &mut ProcedureContext, input: RuntimeProfileSaveArchiveListInput, ) -> RuntimeProfileSaveArchiveProcedureResult { match ctx.try_with_tx(|tx| list_profile_save_archive_rows(tx, input.clone())) { Ok(entries) => RuntimeProfileSaveArchiveProcedureResult { ok: true, entries, record: None, current_snapshot: None, error_message: None, }, Err(message) => RuntimeProfileSaveArchiveProcedureResult { ok: false, entries: Vec::new(), record: None, current_snapshot: None, error_message: Some(message), }, } } // resume 会把指定 archive 回填到当前 snapshot,并同步返回 entry + 当前 snapshot。 #[spacetimedb::procedure] pub fn resume_profile_save_archive_and_return( ctx: &mut ProcedureContext, input: RuntimeProfileSaveArchiveResumeInput, ) -> RuntimeProfileSaveArchiveProcedureResult { match ctx.try_with_tx(|tx| resume_profile_save_archive_record(tx, input.clone())) { Ok((record, current_snapshot)) => RuntimeProfileSaveArchiveProcedureResult { ok: true, entries: Vec::new(), record: Some(record), current_snapshot: Some(current_snapshot), error_message: None, }, Err(message) => RuntimeProfileSaveArchiveProcedureResult { ok: false, entries: Vec::new(), record: None, current_snapshot: None, error_message: Some(message), }, } } // profile dashboard 当前先作为 projection 读入口返回默认零值,等待 runtime_snapshot 写链补齐刷新。 #[spacetimedb::procedure] pub fn get_profile_dashboard( ctx: &mut ProcedureContext, input: RuntimeProfileDashboardGetInput, ) -> RuntimeProfileDashboardProcedureResult { match ctx.try_with_tx(|tx| get_profile_dashboard_snapshot(tx, input.clone())) { Ok(record) => RuntimeProfileDashboardProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileDashboardProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 钱包流水当前只暴露最近 50 条只读视图,排序与截断逻辑在 procedure 内统一收口。 #[spacetimedb::procedure] pub fn list_profile_wallet_ledger( ctx: &mut ProcedureContext, input: RuntimeProfileWalletLedgerListInput, ) -> RuntimeProfileWalletLedgerProcedureResult { match ctx.try_with_tx(|tx| list_profile_wallet_ledger_entries(tx, input.clone())) { Ok(entries) => RuntimeProfileWalletLedgerProcedureResult { ok: true, entries, error_message: None, }, Err(message) => RuntimeProfileWalletLedgerProcedureResult { ok: false, entries: Vec::new(), error_message: Some(message), }, } } // analytics metric 查询直接聚合 tracking_daily_stat,避免 API 层订阅全量表后自行汇总。 #[spacetimedb::procedure] pub fn query_analytics_metric( ctx: &mut ProcedureContext, input: AnalyticsMetricQueryInput, ) -> AnalyticsMetricQueryProcedureResult { match ctx.try_with_tx(|tx| query_analytics_metric_buckets(tx, input.clone())) { Ok(buckets) => AnalyticsMetricQueryProcedureResult { ok: true, buckets, error_message: None, }, Err(message) => AnalyticsMetricQueryProcedureResult { ok: false, buckets: Vec::new(), error_message: Some(message), }, } } // 登录成功埋点由认证链路主动调用;任务中心只负责读取和刷新任务进度。 #[spacetimedb::procedure] pub fn record_daily_login_tracking_event_and_return( ctx: &mut ProcedureContext, input: RuntimeProfileTaskCenterGetInput, ) -> RuntimeTrackingEventProcedureResult { match ctx.try_with_tx(|tx| { let validated_input = build_runtime_profile_task_center_get_input(input.user_id.clone()) .map_err(|error| error.to_string())?; ensure_default_profile_task_config(tx); record_daily_login_tracking_event(tx, &validated_input.user_id) }) { Ok(()) => RuntimeTrackingEventProcedureResult { ok: true, error_message: None, }, Err(message) => RuntimeTrackingEventProcedureResult { ok: false, error_message: Some(message), }, } } // 任务中心读取会刷新进度;每日登录埋点应由登录成功链路提前记录。 #[spacetimedb::procedure] pub fn get_profile_task_center( ctx: &mut ProcedureContext, input: RuntimeProfileTaskCenterGetInput, ) -> RuntimeProfileTaskCenterProcedureResult { match ctx.try_with_tx(|tx| get_profile_task_center_snapshot(tx, input.clone(), false)) { Ok(record) => RuntimeProfileTaskCenterProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileTaskCenterProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 领奖记录与光点流水在同一事务内写入,避免任务状态和钱包余额漂移。 #[spacetimedb::procedure] pub fn claim_profile_task_reward_and_return( ctx: &mut ProcedureContext, input: RuntimeProfileTaskClaimInput, ) -> RuntimeProfileTaskClaimProcedureResult { match ctx.try_with_tx(|tx| claim_profile_task_reward_record(tx, input.clone())) { Ok(record) => RuntimeProfileTaskClaimProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileTaskClaimProcedureResult { ok: false, record: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn admin_list_profile_task_configs( ctx: &mut ProcedureContext, input: RuntimeProfileTaskConfigAdminListInput, ) -> RuntimeProfileTaskConfigAdminListProcedureResult { match ctx.try_with_tx(|tx| list_profile_task_config_snapshots(tx, input.clone())) { Ok(entries) => RuntimeProfileTaskConfigAdminListProcedureResult { ok: true, entries, error_message: None, }, Err(message) => RuntimeProfileTaskConfigAdminListProcedureResult { ok: false, entries: Vec::new(), error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn admin_upsert_profile_task_config( ctx: &mut ProcedureContext, input: RuntimeProfileTaskConfigAdminUpsertInput, ) -> RuntimeProfileTaskConfigAdminProcedureResult { match ctx.try_with_tx(|tx| upsert_profile_task_config_record(tx, input.clone())) { Ok(record) => RuntimeProfileTaskConfigAdminProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileTaskConfigAdminProcedureResult { ok: false, record: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn admin_disable_profile_task_config( ctx: &mut ProcedureContext, input: RuntimeProfileTaskConfigAdminDisableInput, ) -> RuntimeProfileTaskConfigAdminProcedureResult { match ctx.try_with_tx(|tx| disable_profile_task_config_record(tx, input.clone())) { Ok(record) => RuntimeProfileTaskConfigAdminProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileTaskConfigAdminProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 新用户注册赠送由后端注册链路调用;流水 ID 固定,保证重试不重复发放。 #[spacetimedb::procedure] pub fn grant_new_user_registration_wallet_reward( ctx: &mut ProcedureContext, input: RuntimeProfileDashboardGetInput, ) -> RuntimeProfileWalletAdjustmentProcedureResult { match ctx.try_with_tx(|tx| grant_new_user_registration_wallet_reward_tx(tx, input.clone())) { Ok(record) => RuntimeProfileWalletAdjustmentProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileWalletAdjustmentProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 资产生成由 Axum 调用外部模型,钱包扣费必须先在 SpacetimeDB 内原子落账。 #[spacetimedb::procedure] pub fn consume_profile_wallet_points_and_return( ctx: &mut ProcedureContext, input: RuntimeProfileWalletAdjustmentInput, ) -> RuntimeProfileWalletAdjustmentProcedureResult { match ctx.try_with_tx(|tx| { apply_profile_wallet_adjustment( tx, input.clone(), RuntimeProfileWalletLedgerSourceType::AssetOperationConsume, true, ) }) { Ok(record) => RuntimeProfileWalletAdjustmentProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileWalletAdjustmentProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 生成链路失败时由 Axum 调用退款,ledger_id 幂等保证重复补偿不会重复加钱。 #[spacetimedb::procedure] pub fn refund_profile_wallet_points_and_return( ctx: &mut ProcedureContext, input: RuntimeProfileWalletAdjustmentInput, ) -> RuntimeProfileWalletAdjustmentProcedureResult { match ctx.try_with_tx(|tx| { apply_profile_wallet_adjustment( tx, input.clone(), RuntimeProfileWalletLedgerSourceType::AssetOperationRefund, false, ) }) { Ok(record) => RuntimeProfileWalletAdjustmentProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileWalletAdjustmentProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // play stats 与 dashboard 共用 dashboard projection 的 total_play_time / updated_at,避免 Axum 侧拼装。 #[spacetimedb::procedure] pub fn get_profile_play_stats( ctx: &mut ProcedureContext, input: RuntimeProfilePlayStatsGetInput, ) -> RuntimeProfilePlayStatsProcedureResult { match ctx.try_with_tx(|tx| get_profile_play_stats_snapshot(tx, input.clone())) { Ok(record) => RuntimeProfilePlayStatsProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfilePlayStatsProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 账户充值中心只读快照,套餐和权益由后端返回,前端不保存业务价格表。 #[spacetimedb::procedure] pub fn get_profile_recharge_center( ctx: &mut ProcedureContext, input: RuntimeProfileRechargeCenterGetInput, ) -> RuntimeProfileRechargeCenterProcedureResult { match ctx.try_with_tx(|tx| get_profile_recharge_center_snapshot(tx, input.clone())) { Ok(record) => RuntimeProfileRechargeCenterProcedureResult { ok: true, record: Some(record), order: None, error_message: None, }, Err(message) => RuntimeProfileRechargeCenterProcedureResult { ok: false, record: None, order: None, error_message: Some(message), }, } } // 当前阶段没有真实支付网关,下单后在服务端模拟支付成功并立即写入权益。 #[spacetimedb::procedure] pub fn create_profile_recharge_order_and_return( ctx: &mut ProcedureContext, input: RuntimeProfileRechargeOrderCreateInput, ) -> RuntimeProfileRechargeCenterProcedureResult { match ctx.try_with_tx(|tx| create_profile_recharge_order_record(tx, input.clone())) { Ok((record, order)) => RuntimeProfileRechargeCenterProcedureResult { ok: true, record: Some(record), order: Some(order), error_message: None, }, Err(message) => RuntimeProfileRechargeCenterProcedureResult { ok: false, record: None, order: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn submit_profile_feedback_and_return( ctx: &mut ProcedureContext, input: RuntimeProfileFeedbackSubmissionInput, ) -> RuntimeProfileFeedbackSubmissionProcedureResult { match ctx.try_with_tx(|tx| submit_profile_feedback_record(tx, input.clone())) { Ok(record) => RuntimeProfileFeedbackSubmissionProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileFeedbackSubmissionProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 邀请中心会在首次打开时为账号创建稳定邀请码,前端只展示这里返回的后端状态。 #[spacetimedb::procedure] pub fn get_profile_referral_invite_center( ctx: &mut ProcedureContext, input: RuntimeReferralInviteCenterGetInput, ) -> RuntimeReferralInviteCenterProcedureResult { match ctx.try_with_tx(|tx| get_profile_referral_invite_center_snapshot(tx, input.clone())) { Ok(record) => RuntimeReferralInviteCenterProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeReferralInviteCenterProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 填码绑定、每日邀请者奖励上限和双方光点发放都在同一事务内完成。 #[spacetimedb::procedure] pub fn redeem_profile_referral_invite_code( ctx: &mut ProcedureContext, input: RuntimeReferralRedeemInput, ) -> RuntimeReferralRedeemProcedureResult { match ctx.try_with_tx(|tx| redeem_profile_referral_invite_code_record(tx, input.clone())) { Ok(record) => RuntimeReferralRedeemProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeReferralRedeemProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // 兑换码奖励、usage 与钱包流水必须在同一事务内落库,避免到账和计次分离。 #[spacetimedb::procedure] pub fn redeem_profile_reward_code( ctx: &mut ProcedureContext, input: RuntimeProfileRewardCodeRedeemInput, ) -> RuntimeProfileRewardCodeRedeemProcedureResult { match ctx.try_with_tx(|tx| redeem_profile_reward_code_record(tx, input.clone())) { Ok(record) => RuntimeProfileRewardCodeRedeemProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileRewardCodeRedeemProcedureResult { ok: false, record: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn admin_upsert_profile_redeem_code( ctx: &mut ProcedureContext, input: RuntimeProfileRedeemCodeAdminUpsertInput, ) -> RuntimeProfileRedeemCodeAdminProcedureResult { match ctx.try_with_tx(|tx| admin_upsert_profile_redeem_code_record(tx, input.clone())) { Ok(record) => RuntimeProfileRedeemCodeAdminProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileRedeemCodeAdminProcedureResult { ok: false, record: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn admin_disable_profile_redeem_code( ctx: &mut ProcedureContext, input: RuntimeProfileRedeemCodeAdminDisableInput, ) -> RuntimeProfileRedeemCodeAdminProcedureResult { match ctx.try_with_tx(|tx| admin_disable_profile_redeem_code_record(tx, input.clone())) { Ok(record) => RuntimeProfileRedeemCodeAdminProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileRedeemCodeAdminProcedureResult { ok: false, record: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn admin_list_profile_redeem_codes( ctx: &mut ProcedureContext, input: RuntimeProfileRedeemCodeAdminListInput, ) -> RuntimeProfileRedeemCodeAdminListProcedureResult { match ctx.try_with_tx(|tx| admin_list_profile_redeem_code_records(tx, input.clone())) { Ok(entries) => RuntimeProfileRedeemCodeAdminListProcedureResult { ok: true, entries, error_message: None, }, Err(message) => RuntimeProfileRedeemCodeAdminListProcedureResult { ok: false, entries: Vec::new(), error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn admin_upsert_profile_invite_code( ctx: &mut ProcedureContext, input: RuntimeProfileInviteCodeAdminUpsertInput, ) -> RuntimeProfileInviteCodeAdminProcedureResult { match ctx.try_with_tx(|tx| admin_upsert_profile_invite_code_record(tx, input.clone())) { Ok(record) => RuntimeProfileInviteCodeAdminProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => RuntimeProfileInviteCodeAdminProcedureResult { ok: false, record: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn admin_list_profile_invite_codes( ctx: &mut ProcedureContext, input: RuntimeProfileInviteCodeAdminListInput, ) -> RuntimeProfileInviteCodeAdminListProcedureResult { match ctx.try_with_tx(|tx| admin_list_profile_invite_code_records(tx, input.clone())) { Ok(entries) => RuntimeProfileInviteCodeAdminListProcedureResult { ok: true, entries, error_message: None, }, Err(message) => RuntimeProfileInviteCodeAdminListProcedureResult { ok: false, entries: Vec::new(), error_message: Some(message), }, } } pub(crate) fn list_profile_save_archive_rows( ctx: &ReducerContext, input: RuntimeProfileSaveArchiveListInput, ) -> Result, String> { let validated_input = build_runtime_profile_save_archive_list_input(input.user_id) .map_err(|error| error.to_string())?; let mut entries = ctx .db .profile_save_archive() .iter() .filter(|row| row.user_id == validated_input.user_id) .map(|row| build_profile_save_archive_snapshot_from_row(&row)) .collect::>(); entries.sort_by(|left, right| { right .saved_at_micros .cmp(&left.saved_at_micros) .then_with(|| left.archive_id.cmp(&right.archive_id)) }); Ok(entries) } pub(crate) fn resume_profile_save_archive_record( ctx: &ReducerContext, input: RuntimeProfileSaveArchiveResumeInput, ) -> Result<(RuntimeProfileSaveArchiveSnapshot, RuntimeSnapshot), String> { let validated_input = build_runtime_profile_save_archive_resume_input(input.user_id, input.world_key) .map_err(|error| error.to_string())?; let archive = ctx .db .profile_save_archive() .iter() .find(|row| { row.user_id == validated_input.user_id && row.world_key == validated_input.world_key }) .ok_or_else(|| "profile_save_archive 对应 world_key 不存在".to_string())?; let existing_snapshot = ctx .db .runtime_snapshot() .user_id() .find(&validated_input.user_id); let created_at = existing_snapshot .as_ref() .map(|row| row.created_at) .unwrap_or(archive.saved_at); if let Some(existing) = existing_snapshot { ctx.db .runtime_snapshot() .user_id() .delete(&existing.user_id); } ctx.db.runtime_snapshot().insert(RuntimeSnapshotRow { user_id: archive.user_id.clone(), version: SAVE_SNAPSHOT_VERSION, saved_at: archive.saved_at, bottom_tab: archive.bottom_tab.clone(), game_state_json: archive.game_state_json.clone(), current_story_json: archive.current_story_json.clone(), created_at, updated_at: archive.saved_at, }); Ok(( build_profile_save_archive_snapshot_from_row(&archive), RuntimeSnapshot { user_id: archive.user_id.clone(), version: SAVE_SNAPSHOT_VERSION, saved_at_micros: archive.saved_at.to_micros_since_unix_epoch(), bottom_tab: archive.bottom_tab.clone(), game_state_json: archive.game_state_json.clone(), current_story_json: archive.current_story_json.clone(), created_at_micros: created_at.to_micros_since_unix_epoch(), updated_at_micros: archive.saved_at.to_micros_since_unix_epoch(), }, )) } pub(crate) fn sync_profile_projections_from_snapshot( ctx: &ReducerContext, snapshot: &RuntimeSnapshot, ) -> Result<(), String> { let game_state = parse_json_str(&snapshot.game_state_json)?; let game_state_object = game_state.as_object(); let saved_at = Timestamp::from_micros_since_unix_epoch(snapshot.saved_at_micros); if module_runtime::is_non_persistent_runtime_snapshot(&game_state) { return Ok(()); } sync_profile_dashboard_from_snapshot(ctx, snapshot, game_state_object, saved_at); sync_profile_save_archive_from_snapshot(ctx, snapshot, &game_state, saved_at)?; Ok(()) } pub(crate) fn upsert_profile_played_work( ctx: &ReducerContext, input: ProfilePlayedWorkUpsertInput, ) -> Result<(), String> { let user_id = input.user_id.trim(); let world_key = input.world_key.trim(); if user_id.is_empty() { return Err("profile_played_world.user_id 不能为空".to_string()); } if world_key.is_empty() { return Err("profile_played_world.world_key 不能为空".to_string()); } let played_at = Timestamp::from_micros_since_unix_epoch(input.played_at_micros); let played_world_id = build_runtime_profile_played_world_id(user_id, world_key); let existing = ctx .db .profile_played_world() .played_world_id() .find(&played_world_id); if let Some(existing) = existing { ctx.db .profile_played_world() .played_world_id() .delete(&existing.played_world_id); ctx.db.profile_played_world().insert(ProfilePlayedWorld { played_world_id, user_id: user_id.to_string(), world_key: world_key.to_string(), owner_user_id: input.owner_user_id, profile_id: input.profile_id, world_type: input.world_type, world_title: input.world_title, world_subtitle: input.world_subtitle, first_played_at: existing.first_played_at, last_played_at: played_at, last_observed_play_time_ms: existing.last_observed_play_time_ms, }); } else { ctx.db.profile_played_world().insert(ProfilePlayedWorld { played_world_id, user_id: user_id.to_string(), world_key: world_key.to_string(), owner_user_id: input.owner_user_id, profile_id: input.profile_id, world_type: input.world_type, world_title: input.world_title, world_subtitle: input.world_subtitle, first_played_at: played_at, last_played_at: played_at, last_observed_play_time_ms: 0, }); } ensure_profile_dashboard_state(ctx, user_id, played_at); Ok(()) } pub(crate) fn add_profile_observed_play_time( ctx: &ReducerContext, user_id: &str, world_key: &str, elapsed_ms: u64, observed_at_micros: i64, ) -> Result<(), String> { let user_id = user_id.trim(); let world_key = world_key.trim(); if user_id.is_empty() || world_key.is_empty() || elapsed_ms == 0 { return Ok(()); } let observed_at = Timestamp::from_micros_since_unix_epoch(observed_at_micros); let played_world_id = build_runtime_profile_played_world_id(user_id, world_key); if let Some(existing) = ctx .db .profile_played_world() .played_world_id() .find(&played_world_id) { ctx.db .profile_played_world() .played_world_id() .delete(&existing.played_world_id); ctx.db.profile_played_world().insert(ProfilePlayedWorld { played_world_id, user_id: existing.user_id, world_key: existing.world_key, owner_user_id: existing.owner_user_id, profile_id: existing.profile_id, world_type: existing.world_type, world_title: existing.world_title, world_subtitle: existing.world_subtitle, first_played_at: existing.first_played_at, last_played_at: observed_at, last_observed_play_time_ms: existing .last_observed_play_time_ms .saturating_add(elapsed_ms), }); } add_profile_dashboard_play_time(ctx, user_id, elapsed_ms, observed_at); Ok(()) } pub(crate) fn upsert_profile_save_archive( ctx: &ReducerContext, input: ProfileSaveArchiveUpsertInput, ) -> Result<(), String> { let user_id = input.user_id.trim(); let world_key = input.world_key.trim(); if user_id.is_empty() || world_key.is_empty() { return Err("profile_save_archive 参数不能为空".to_string()); } let saved_at = Timestamp::from_micros_since_unix_epoch(input.saved_at_micros); let archive_id = format!("{user_id}:{world_key}"); let existing = ctx.db.profile_save_archive().archive_id().find(&archive_id); let created_at = existing .as_ref() .map(|row| row.created_at) .unwrap_or(saved_at); if let Some(existing) = existing { ctx.db .profile_save_archive() .archive_id() .delete(&existing.archive_id); } ctx.db.profile_save_archive().insert(ProfileSaveArchive { archive_id, user_id: user_id.to_string(), world_key: world_key.to_string(), owner_user_id: input.owner_user_id, profile_id: input.profile_id, world_type: input.world_type, world_name: input.world_name, subtitle: input.subtitle, summary_text: input.summary_text, cover_image_src: input.cover_image_src, saved_at, bottom_tab: input.bottom_tab, game_state_json: input.game_state_json, current_story_json: input.current_story_json, created_at, updated_at: saved_at, }); Ok(()) } pub(crate) fn record_public_work_play( ctx: &ReducerContext, input: PublicWorkPlayRecordInput, ) -> Result<(), String> { let source_type = input.source_type.trim(); let owner_user_id = input.owner_user_id.trim(); let profile_id = input.profile_id.trim(); if source_type.is_empty() || owner_user_id.is_empty() || profile_id.is_empty() { return Err("public_work_play_daily_stat 参数不能为空".to_string()); } let played_day = public_work_play_day_from_micros(input.played_at_micros); let stat_id = build_public_work_play_daily_stat_id(source_type, profile_id, played_day); let updated_at = Timestamp::from_micros_since_unix_epoch(input.played_at_micros); let next_count = ctx .db .public_work_play_daily_stat() .stat_id() .find(&stat_id) .map(|existing| { ctx.db .public_work_play_daily_stat() .stat_id() .delete(&existing.stat_id); existing.play_count.saturating_add(1) }) .unwrap_or(1); ctx.db .public_work_play_daily_stat() .insert(PublicWorkPlayDailyStat { stat_id, source_type: source_type.to_string(), owner_user_id: owner_user_id.to_string(), profile_id: profile_id.to_string(), played_day, play_count: next_count, updated_at, }); Ok(()) } pub(crate) fn record_public_work_like( ctx: &ReducerContext, input: PublicWorkLikeRecordInput, ) -> Result { let source_type = input.source_type.trim(); let owner_user_id = input.owner_user_id.trim(); let profile_id = input.profile_id.trim(); let user_id = input.user_id.trim(); if source_type.is_empty() || owner_user_id.is_empty() || profile_id.is_empty() || user_id.is_empty() { return Err("public_work_like 参数不能为空".to_string()); } let like_id = build_public_work_like_id(source_type, profile_id, user_id); if ctx.db.public_work_like().like_id().find(&like_id).is_some() { return Ok(false); } ctx.db.public_work_like().insert(PublicWorkLike { like_id, source_type: source_type.to_string(), owner_user_id: owner_user_id.to_string(), profile_id: profile_id.to_string(), user_id: user_id.to_string(), liked_at: Timestamp::from_micros_since_unix_epoch(input.liked_at_micros), }); Ok(true) } pub(crate) fn count_recent_public_work_plays( ctx: &ReducerContext, source_type: &str, profile_id: &str, now_micros: i64, ) -> u32 { let source_type = source_type.trim(); let profile_id = profile_id.trim(); if source_type.is_empty() || profile_id.is_empty() { return 0; } let current_day = public_work_play_day_from_micros(now_micros); let first_day = current_day.saturating_sub(PUBLIC_WORK_RECENT_PLAY_WINDOW_DAYS - 1); ctx.db .public_work_play_daily_stat() .iter() .filter(|row| { row.source_type == source_type && row.profile_id == profile_id && row.played_day >= first_day && row.played_day <= current_day }) .fold(0u32, |total, row| total.saturating_add(row.play_count)) } fn public_work_play_day_from_micros(value: i64) -> i64 { value.div_euclid(PUBLIC_WORK_PLAY_DAY_MICROS) } fn build_public_work_play_daily_stat_id( source_type: &str, profile_id: &str, played_day: i64, ) -> String { format!("{source_type}:{profile_id}:{played_day}") } fn build_public_work_like_id(source_type: &str, profile_id: &str, user_id: &str) -> String { format!("{source_type}:{profile_id}:{user_id}") } fn ensure_profile_dashboard_state(ctx: &ReducerContext, user_id: &str, updated_at: Timestamp) { if ctx .db .profile_dashboard_state() .user_id() .find(&user_id.to_string()) .is_some() { return; } ctx.db .profile_dashboard_state() .insert(ProfileDashboardState { user_id: user_id.to_string(), wallet_balance: 0, total_play_time_ms: 0, created_at: updated_at, updated_at, }); } fn add_profile_dashboard_play_time( ctx: &ReducerContext, user_id: &str, elapsed_ms: u64, updated_at: Timestamp, ) { let current = ctx .db .profile_dashboard_state() .user_id() .find(&user_id.to_string()); if let Some(existing) = current { ctx.db .profile_dashboard_state() .user_id() .delete(&existing.user_id); ctx.db .profile_dashboard_state() .insert(ProfileDashboardState { user_id: user_id.to_string(), wallet_balance: existing.wallet_balance, total_play_time_ms: existing.total_play_time_ms.saturating_add(elapsed_ms), created_at: existing.created_at, updated_at, }); } else { ctx.db .profile_dashboard_state() .insert(ProfileDashboardState { user_id: user_id.to_string(), wallet_balance: 0, total_play_time_ms: elapsed_ms, created_at: updated_at, updated_at, }); } } fn sync_profile_dashboard_from_snapshot( ctx: &ReducerContext, snapshot: &RuntimeSnapshot, game_state: Option<&serde_json::Map>, saved_at: Timestamp, ) { let current_state = ctx .db .profile_dashboard_state() .user_id() .find(&snapshot.user_id); let previous_wallet_balance = current_state .as_ref() .map(|row| row.wallet_balance) .unwrap_or(0); let previous_total_play_time_ms = current_state .as_ref() .map(|row| row.total_play_time_ms) .unwrap_or(0); let has_business_wallet_ledger = has_profile_business_wallet_ledger(ctx, &snapshot.user_id); let synced_wallet_balance = if has_business_wallet_ledger { None } else { game_state .and_then(|state| state.get("playerCurrency")) .map(|value| module_runtime::read_runtime_json_non_negative_u64(Some(value))) }; let next_wallet_balance = synced_wallet_balance.unwrap_or(previous_wallet_balance); let mut next_total_play_time_ms = previous_total_play_time_ms; if let Some(next_wallet_balance) = synced_wallet_balance && next_wallet_balance != previous_wallet_balance { ctx.db.profile_wallet_ledger().insert(ProfileWalletLedger { wallet_ledger_id: build_runtime_profile_snapshot_wallet_ledger_id( &snapshot.user_id, snapshot.saved_at_micros, next_wallet_balance, ), user_id: snapshot.user_id.clone(), amount_delta: next_wallet_balance as i64 - previous_wallet_balance as i64, balance_after: next_wallet_balance, source_type: RuntimeProfileWalletLedgerSourceType::SnapshotSync, created_at: saved_at, }); } if let Some(world_meta) = module_runtime::resolve_runtime_profile_world_snapshot_meta(game_state) { let current_play_time_ms = module_runtime::read_runtime_json_non_negative_u64( game_state .and_then(|state| state.get("runtimeStats")) .and_then(JsonValue::as_object) .and_then(|stats| stats.get("playTimeMs")), ); let played_world_id = build_runtime_profile_played_world_id(&snapshot.user_id, &world_meta.world_key); let existing = ctx .db .profile_played_world() .played_world_id() .find(&played_world_id); let previous_observed_play_time_ms = existing .as_ref() .map(|row| row.last_observed_play_time_ms) .unwrap_or(0); let incremental_play_time_ms = current_play_time_ms.saturating_sub(previous_observed_play_time_ms); next_total_play_time_ms = next_total_play_time_ms.saturating_add(incremental_play_time_ms); if let Some(existing) = existing { ctx.db .profile_played_world() .played_world_id() .delete(&existing.played_world_id); ctx.db.profile_played_world().insert(ProfilePlayedWorld { played_world_id, user_id: snapshot.user_id.clone(), world_key: world_meta.world_key, owner_user_id: world_meta.owner_user_id, profile_id: world_meta.profile_id, world_type: world_meta.world_type, world_title: world_meta.world_title, world_subtitle: world_meta.world_subtitle, first_played_at: existing.first_played_at, last_played_at: saved_at, last_observed_play_time_ms: current_play_time_ms .max(existing.last_observed_play_time_ms), }); } else { ctx.db.profile_played_world().insert(ProfilePlayedWorld { played_world_id, user_id: snapshot.user_id.clone(), world_key: world_meta.world_key, owner_user_id: world_meta.owner_user_id, profile_id: world_meta.profile_id, world_type: world_meta.world_type, world_title: world_meta.world_title, world_subtitle: world_meta.world_subtitle, first_played_at: saved_at, last_played_at: saved_at, last_observed_play_time_ms: current_play_time_ms, }); } } if let Some(existing) = current_state { ctx.db .profile_dashboard_state() .user_id() .delete(&existing.user_id); ctx.db .profile_dashboard_state() .insert(ProfileDashboardState { user_id: snapshot.user_id.clone(), wallet_balance: next_wallet_balance, total_play_time_ms: next_total_play_time_ms, created_at: existing.created_at, updated_at: saved_at, }); } else { ctx.db .profile_dashboard_state() .insert(ProfileDashboardState { user_id: snapshot.user_id.clone(), wallet_balance: next_wallet_balance, total_play_time_ms: next_total_play_time_ms, created_at: saved_at, updated_at: saved_at, }); } } fn sync_profile_save_archive_from_snapshot( ctx: &ReducerContext, snapshot: &RuntimeSnapshot, game_state: &JsonValue, saved_at: Timestamp, ) -> Result<(), String> { let Some(archive_meta) = module_runtime::resolve_runtime_profile_save_archive_meta( game_state, snapshot.current_story_json.as_deref(), ) else { return Ok(()); }; let archive_id = build_runtime_profile_save_archive_id(&snapshot.user_id, &archive_meta.world_key); let existing = ctx.db.profile_save_archive().archive_id().find(&archive_id); let created_at = existing .as_ref() .map(|row| row.created_at) .unwrap_or(saved_at); if let Some(existing) = existing { ctx.db .profile_save_archive() .archive_id() .delete(&existing.archive_id); } ctx.db.profile_save_archive().insert(ProfileSaveArchive { archive_id, user_id: snapshot.user_id.clone(), world_key: archive_meta.world_key, owner_user_id: archive_meta.owner_user_id, profile_id: archive_meta.profile_id, world_type: archive_meta.world_type, world_name: archive_meta.world_name, subtitle: archive_meta.subtitle, summary_text: archive_meta.summary_text, cover_image_src: archive_meta.cover_image_src, saved_at, bottom_tab: snapshot.bottom_tab.clone(), game_state_json: snapshot.game_state_json.clone(), current_story_json: snapshot.current_story_json.clone(), created_at, updated_at: saved_at, }); Ok(()) } pub(crate) fn build_profile_save_archive_snapshot_from_row( row: &ProfileSaveArchive, ) -> RuntimeProfileSaveArchiveSnapshot { RuntimeProfileSaveArchiveSnapshot { archive_id: row.archive_id.clone(), user_id: row.user_id.clone(), world_key: row.world_key.clone(), owner_user_id: row.owner_user_id.clone(), profile_id: row.profile_id.clone(), world_type: row.world_type.clone(), world_name: row.world_name.clone(), subtitle: row.subtitle.clone(), summary_text: row.summary_text.clone(), cover_image_src: row.cover_image_src.clone(), saved_at_micros: row.saved_at.to_micros_since_unix_epoch(), bottom_tab: row.bottom_tab.clone(), game_state_json: row.game_state_json.clone(), current_story_json: row.current_story_json.clone(), created_at_micros: row.created_at.to_micros_since_unix_epoch(), updated_at_micros: row.updated_at.to_micros_since_unix_epoch(), } } fn read_string_from_json(value: Option<&JsonValue>) -> Option { value .and_then(JsonValue::as_str) .map(str::trim) .filter(|value| !value.is_empty()) .map(ToString::to_string) } fn resolve_profile_world_snapshot_meta( game_state: Option<&serde_json::Map>, ) -> Option { let game_state = game_state?; let custom_world_profile = game_state .get("customWorldProfile") .and_then(JsonValue::as_object); if let Some(custom_world_profile) = custom_world_profile { let profile_id = read_string_from_json(custom_world_profile.get("id")); let world_title = read_string_from_json(custom_world_profile.get("name")) .or_else(|| read_string_from_json(custom_world_profile.get("title"))); if profile_id.is_some() || world_title.is_some() { let world_title = world_title.unwrap_or_else(|| "自定义世界".to_string()); return Some(RuntimeProfileWorldSnapshotMeta { world_key: profile_id .as_ref() .map(|profile_id| format!("custom:{profile_id}")) .unwrap_or_else(|| format!("custom:{world_title}")), owner_user_id: None, profile_id, world_type: Some("CUSTOM".to_string()), world_title, world_subtitle: read_string_from_json(custom_world_profile.get("summary")) .or_else(|| read_string_from_json(custom_world_profile.get("settingText"))) .unwrap_or_default(), }); } } let world_type = read_string_from_json(game_state.get("worldType"))?; let current_scene_preset = game_state .get("currentScenePreset") .and_then(JsonValue::as_object); Some(RuntimeProfileWorldSnapshotMeta { world_key: format!("builtin:{world_type}"), owner_user_id: None, profile_id: None, world_type: Some(world_type.clone()), world_title: current_scene_preset .and_then(|preset| read_string_from_json(preset.get("name"))) .unwrap_or_else(|| build_builtin_world_title(&world_type)), world_subtitle: current_scene_preset .and_then(|preset| { read_string_from_json(preset.get("summary")) .or_else(|| read_string_from_json(preset.get("description"))) }) .unwrap_or_default(), }) } fn resolve_profile_save_archive_meta( game_state: &JsonValue, current_story_json: Option<&str>, ) -> Option { if is_non_persistent_runtime_snapshot(game_state) { return None; } let game_state_object = game_state.as_object(); let world_meta = resolve_profile_world_snapshot_meta(game_state_object)?; let story_engine_memory = game_state_object .and_then(|state| state.get("storyEngineMemory")) .and_then(JsonValue::as_object); let continue_game_digest = story_engine_memory .and_then(|memory| read_string_from_json(memory.get("continueGameDigest"))); let current_story_text = parse_optional_json_str(current_story_json) .ok() .flatten() .and_then(|story| story.as_object().cloned()) .and_then(|story| read_string_from_json(story.get("text"))); let custom_world_profile = game_state_object .and_then(|state| state.get("customWorldProfile")) .and_then(JsonValue::as_object); if let Some(custom_world_profile) = custom_world_profile { let world_name = read_string_from_json(custom_world_profile.get("name")) .or_else(|| read_string_from_json(custom_world_profile.get("title"))) .unwrap_or_else(|| world_meta.world_title.clone()); let subtitle = read_string_from_json(custom_world_profile.get("summary")) .or_else(|| read_string_from_json(custom_world_profile.get("settingText"))) .unwrap_or_else(|| world_meta.world_subtitle.clone()); let summary_text = continue_game_digest .or(current_story_text) .or_else(|| { if subtitle.is_empty() { None } else { Some(subtitle.clone()) } }) .unwrap_or_else(|| DEFAULT_SAVE_ARCHIVE_SUMMARY_TEXT.to_string()); return Some(RuntimeProfileSaveArchiveMeta { world_key: world_meta.world_key, owner_user_id: world_meta.owner_user_id, profile_id: world_meta.profile_id, world_type: world_meta.world_type, world_name, subtitle, summary_text, cover_image_src: read_string_from_json(custom_world_profile.get("coverImageSrc")), }); } let summary_text = continue_game_digest .or(current_story_text) .or_else(|| { if world_meta.world_subtitle.is_empty() { None } else { Some(world_meta.world_subtitle.clone()) } }) .unwrap_or_else(|| DEFAULT_SAVE_ARCHIVE_SUMMARY_TEXT.to_string()); let current_scene_preset = game_state_object .and_then(|state| state.get("currentScenePreset")) .and_then(JsonValue::as_object); Some(RuntimeProfileSaveArchiveMeta { world_key: world_meta.world_key, owner_user_id: world_meta.owner_user_id, profile_id: world_meta.profile_id, world_type: world_meta.world_type, world_name: world_meta.world_title, subtitle: world_meta.world_subtitle.clone(), summary_text, cover_image_src: current_scene_preset .and_then(|preset| read_string_from_json(preset.get("imageSrc"))), }) } fn is_non_persistent_runtime_snapshot(game_state: &JsonValue) -> bool { let Some(game_state) = game_state.as_object() else { return false; }; if game_state .get("runtimePersistenceDisabled") .and_then(JsonValue::as_bool) .unwrap_or(false) { return true; } matches!( read_string_from_json(game_state.get("runtimeMode")).as_deref(), Some("preview") | Some("test") ) } fn build_builtin_world_title(world_type: &str) -> String { match world_type { "WUXIA" => "武侠世界".to_string(), "XIANXIA" => "仙侠世界".to_string(), _ => "叙事世界".to_string(), } } fn get_profile_dashboard_snapshot( ctx: &ReducerContext, input: RuntimeProfileDashboardGetInput, ) -> Result { let validated_input = build_runtime_profile_dashboard_get_input(input.user_id) .map_err(|error| error.to_string())?; let state = ctx .db .profile_dashboard_state() .user_id() .find(&validated_input.user_id); let played_world_count = ctx .db .profile_played_world() .iter() .filter(|row| row.user_id == validated_input.user_id) .count() as u32; Ok(match state { Some(existing) => RuntimeProfileDashboardSnapshot { user_id: existing.user_id, wallet_balance: existing.wallet_balance, total_play_time_ms: existing.total_play_time_ms, played_world_count, updated_at_micros: Some(existing.updated_at.to_micros_since_unix_epoch()), }, None => RuntimeProfileDashboardSnapshot { user_id: validated_input.user_id, wallet_balance: 0, total_play_time_ms: 0, played_world_count, updated_at_micros: None, }, }) } fn list_profile_wallet_ledger_entries( ctx: &ReducerContext, input: RuntimeProfileWalletLedgerListInput, ) -> Result, String> { let validated_input = build_runtime_profile_wallet_ledger_list_input(input.user_id) .map_err(|error| error.to_string())?; let mut entries = ctx .db .profile_wallet_ledger() .iter() .filter(|row| row.user_id == validated_input.user_id) .map(|row| build_profile_wallet_ledger_snapshot_from_row(&row)) .collect::>(); entries.sort_by(|left, right| { right .created_at_micros .cmp(&left.created_at_micros) .then_with(|| left.wallet_ledger_id.cmp(&right.wallet_ledger_id)) }); entries.truncate(PROFILE_WALLET_LEDGER_LIST_LIMIT); Ok(entries) } fn get_profile_play_stats_snapshot( ctx: &ReducerContext, input: RuntimeProfilePlayStatsGetInput, ) -> Result { let validated_input = build_runtime_profile_play_stats_get_input(input.user_id) .map_err(|error| error.to_string())?; let dashboard_state = ctx .db .profile_dashboard_state() .user_id() .find(&validated_input.user_id); let mut played_works = ctx .db .profile_played_world() .iter() .filter(|row| row.user_id == validated_input.user_id) .map(|row| build_profile_played_world_snapshot_from_row(&row)) .collect::>(); played_works.sort_by(|left, right| { right .last_played_at_micros .cmp(&left.last_played_at_micros) .then_with(|| left.played_world_id.cmp(&right.played_world_id)) }); Ok(RuntimeProfilePlayStatsSnapshot { user_id: validated_input.user_id, total_play_time_ms: dashboard_state .as_ref() .map(|row| row.total_play_time_ms) .unwrap_or(0), played_works, updated_at_micros: dashboard_state .as_ref() .map(|row| row.updated_at.to_micros_since_unix_epoch()), }) } fn get_profile_recharge_center_snapshot( ctx: &ReducerContext, input: RuntimeProfileRechargeCenterGetInput, ) -> Result { let validated_input = build_runtime_profile_recharge_center_get_input(input.user_id) .map_err(|error| error.to_string())?; Ok(build_profile_recharge_center_snapshot( ctx, &validated_input.user_id, )) } fn create_profile_recharge_order_record( ctx: &ReducerContext, input: RuntimeProfileRechargeOrderCreateInput, ) -> Result< ( RuntimeProfileRechargeCenterSnapshot, RuntimeProfileRechargeOrderSnapshot, ), String, > { let validated_input = build_runtime_profile_recharge_order_create_input( input.user_id, input.product_id, input.payment_channel, input.created_at_micros, ) .map_err(|error| error.to_string())?; let product = runtime_profile_recharge_product_by_id(&validated_input.product_id) .ok_or_else(|| "recharge.product_id 不存在".to_string())?; let created_at = Timestamp::from_micros_since_unix_epoch(validated_input.created_at_micros); let (points_delta, membership_expires_at) = match product.kind { RuntimeProfileRechargeProductKind::Points => { let has_recharged = has_profile_points_recharged(ctx, &validated_input.user_id); let points_delta = resolve_runtime_profile_points_recharge_delta(&product, has_recharged); apply_profile_wallet_delta( ctx, &validated_input.user_id, points_delta, RuntimeProfileWalletLedgerSourceType::PointsRecharge, &build_runtime_profile_recharge_wallet_ledger_id( &validated_input.user_id, validated_input.created_at_micros, &product.product_id, ), created_at, )?; (points_delta as i64, None) } RuntimeProfileRechargeProductKind::Membership => { let expires_at = apply_profile_membership_purchase( ctx, &validated_input.user_id, product.tier, product.duration_days, created_at, ); (0, Some(expires_at)) } }; let order = ProfileRechargeOrder { order_id: build_runtime_profile_recharge_order_id( &validated_input.user_id, validated_input.created_at_micros, &product.product_id, ), user_id: validated_input.user_id.clone(), product_id: product.product_id.clone(), product_title: product.title.clone(), kind: product.kind, amount_cents: product.price_cents, status: RuntimeProfileRechargeOrderStatus::Paid, payment_channel: validated_input.payment_channel, paid_at: created_at, created_at, points_delta, membership_expires_at, }; ctx.db.profile_recharge_order().insert(order); let latest_order = latest_profile_recharge_order(ctx, &validated_input.user_id) .ok_or_else(|| "profile_recharge_order 写入后未能读取".to_string())?; Ok(( build_profile_recharge_center_snapshot(ctx, &validated_input.user_id), build_profile_recharge_order_snapshot_from_row(&latest_order), )) } fn submit_profile_feedback_record( ctx: &ReducerContext, input: RuntimeProfileFeedbackSubmissionInput, ) -> Result { let validated_input = build_runtime_profile_feedback_submission_input( input.user_id, input.description, input.contact_phone, input.evidence_items, input.created_at_micros, ) .map_err(|error| error.to_string())?; let created_at = Timestamp::from_micros_since_unix_epoch(validated_input.created_at_micros); let feedback_id = build_runtime_profile_feedback_submission_id( &validated_input.user_id, validated_input.created_at_micros, ); let evidence_json = serde_json::to_string(&validated_input.evidence_items) .map_err(|error| format!("反馈凭证序列化失败: {error}"))?; let row = ProfileFeedbackSubmission { feedback_id: feedback_id.clone(), user_id: validated_input.user_id, description: validated_input.description, contact_phone: validated_input.contact_phone, evidence_json, status: RuntimeProfileFeedbackStatus::Open, created_at, updated_at: created_at, }; ctx.db.profile_feedback_submission().insert(row); let latest = ctx .db .profile_feedback_submission() .feedback_id() .find(&feedback_id) .ok_or_else(|| "profile_feedback_submission 写入后未能读取".to_string())?; Ok(build_profile_feedback_submission_snapshot_from_row(&latest)) } fn get_profile_referral_invite_center_snapshot( ctx: &ReducerContext, input: RuntimeReferralInviteCenterGetInput, ) -> Result { let validated_input = build_runtime_referral_invite_center_get_input(input.user_id) .map_err(|error| error.to_string())?; Ok(build_profile_referral_invite_center_snapshot( ctx, &validated_input.user_id, )) } fn redeem_profile_referral_invite_code_record( ctx: &ReducerContext, input: RuntimeReferralRedeemInput, ) -> Result { let validated_input = build_runtime_referral_redeem_input( input.user_id, input.invite_code, input.updated_at_micros, ) .map_err(|error| error.to_string())?; let bound_at = Timestamp::from_micros_since_unix_epoch(validated_input.updated_at_micros); let invitee_user_id = validated_input.user_id; let invite_code = validated_input.invite_code; if ctx .db .profile_referral_relation() .invitee_user_id() .find(&invitee_user_id) .is_some() { return Err("每个用户最多只能填写一个邀请码".to_string()); } let inviter_code = ctx .db .profile_invite_code() .invite_code() .find(&invite_code) .ok_or_else(|| "邀请码不存在".to_string())?; validate_profile_invite_code_redeem_time(&inviter_code, validated_input.updated_at_micros)?; if inviter_code.user_id == invitee_user_id { return Err("不能填写自己的邀请码".to_string()); } let invitee_balance_after = apply_profile_wallet_delta( ctx, &invitee_user_id, PROFILE_REFERRAL_REWARD_POINTS, RuntimeProfileWalletLedgerSourceType::InviteInviteeReward, &build_runtime_profile_referral_invitee_ledger_id( &invitee_user_id, validated_input.updated_at_micros, ), bound_at, )?; let is_admin_invite_code = is_admin_profile_invite_code_user_id(&inviter_code.user_id); let today_inviter_reward_count = if is_admin_invite_code { 0 } else { count_today_profile_referral_inviter_rewards(ctx, &inviter_code.user_id, bound_at) }; let inviter_reward_granted = !is_admin_invite_code && module_runtime::should_grant_runtime_profile_inviter_reward(today_inviter_reward_count); let inviter_balance_after = if inviter_reward_granted { apply_profile_wallet_delta( ctx, &inviter_code.user_id, PROFILE_REFERRAL_REWARD_POINTS, RuntimeProfileWalletLedgerSourceType::InviteInviterReward, &build_runtime_profile_referral_inviter_ledger_id( &inviter_code.user_id, validated_input.updated_at_micros, ), bound_at, )? } else { profile_wallet_balance(ctx, &inviter_code.user_id) }; ctx.db .profile_referral_relation() .insert(ProfileReferralRelation { invitee_user_id: invitee_user_id.clone(), inviter_user_id: inviter_code.user_id, invite_code, inviter_reward_granted, invitee_reward_granted: true, bound_at, }); Ok(RuntimeReferralRedeemSnapshot { center: build_profile_referral_invite_center_snapshot(ctx, &invitee_user_id), invitee_reward_granted: true, inviter_reward_granted, invitee_balance_after, inviter_balance_after, }) } fn redeem_profile_reward_code_record( ctx: &ReducerContext, input: RuntimeProfileRewardCodeRedeemInput, ) -> Result { let validated_input = build_runtime_profile_reward_code_redeem_input( input.user_id, input.code, input.redeemed_at_micros, ) .map_err(|error| error.to_string())?; let redeemed_at = Timestamp::from_micros_since_unix_epoch(validated_input.redeemed_at_micros); let user_id = validated_input.user_id; let code = validated_input.code; let redeem_code = ctx .db .profile_redeem_code() .code() .find(&code) .ok_or_else(|| "兑换码不存在".to_string())?; let user_used_count = count_profile_redeem_code_user_usage(ctx, &code, &user_id); validate_runtime_profile_redeem_code_usage( &build_profile_redeem_code_snapshot_from_row(&redeem_code), &user_id, user_used_count, ) .map_err(|error| error.to_string())?; let usage_id = build_runtime_profile_redeem_code_usage_id( &code, &user_id, validated_input.redeemed_at_micros, user_used_count, ); let wallet_ledger_id = build_runtime_profile_redeem_code_ledger_id(&usage_id); let wallet_balance = apply_profile_wallet_delta( ctx, &user_id, redeem_code.reward_points, RuntimeProfileWalletLedgerSourceType::RedeemCodeReward, &wallet_ledger_id, redeemed_at, )?; ctx.db .profile_redeem_code_usage() .insert(ProfileRedeemCodeUsage { usage_id, code: code.clone(), user_id, amount_granted: redeem_code.reward_points, created_at: redeemed_at, }); let next_code = ProfileRedeemCode { global_used_count: redeem_code.global_used_count.saturating_add(1), updated_at: redeemed_at, ..redeem_code }; ctx.db.profile_redeem_code().code().delete(&code); ctx.db.profile_redeem_code().insert(next_code); let ledger_entry = ctx .db .profile_wallet_ledger() .wallet_ledger_id() .find(&wallet_ledger_id) .ok_or_else(|| "兑换码钱包流水写入失败".to_string())?; Ok(RuntimeProfileRewardCodeRedeemSnapshot { wallet_balance, amount_granted: ledger_entry.amount_delta.max(0) as u64, ledger_entry: build_profile_wallet_ledger_snapshot_from_row(&ledger_entry), }) } fn admin_upsert_profile_redeem_code_record( ctx: &ReducerContext, input: RuntimeProfileRedeemCodeAdminUpsertInput, ) -> Result { let validated_input = build_runtime_profile_redeem_code_admin_upsert_input( input.admin_user_id, input.code, input.mode, input.reward_points, input.max_uses, input.enabled, input.allowed_user_ids, input.allowed_public_user_codes, input.updated_at_micros, ) .map_err(|error| error.to_string())?; let updated_at = Timestamp::from_micros_since_unix_epoch(validated_input.updated_at_micros); let allowed_user_ids = resolve_profile_redeem_code_allowed_user_ids(ctx, &validated_input)?; let existing = ctx .db .profile_redeem_code() .code() .find(&validated_input.code); let created_at = existing .as_ref() .map(|row| row.created_at) .unwrap_or(updated_at); let global_used_count = existing .as_ref() .map(|row| row.global_used_count) .unwrap_or(0); if let Some(existing) = existing { ctx.db.profile_redeem_code().code().delete(&existing.code); } let row = ProfileRedeemCode { code: validated_input.code, mode: validated_input.mode, reward_points: validated_input.reward_points, max_uses: validated_input.max_uses, global_used_count, enabled: validated_input.enabled, allowed_user_ids, created_by: validated_input.admin_user_id, created_at, updated_at, }; let inserted = ctx.db.profile_redeem_code().insert(row); Ok(build_profile_redeem_code_snapshot_from_row(&inserted)) } fn admin_disable_profile_redeem_code_record( ctx: &ReducerContext, input: RuntimeProfileRedeemCodeAdminDisableInput, ) -> Result { let validated_input = build_runtime_profile_redeem_code_admin_disable_input( input.admin_user_id, input.code, input.updated_at_micros, ) .map_err(|error| error.to_string())?; let updated_at = Timestamp::from_micros_since_unix_epoch(validated_input.updated_at_micros); let existing = ctx .db .profile_redeem_code() .code() .find(&validated_input.code) .ok_or_else(|| "兑换码不存在".to_string())?; ctx.db.profile_redeem_code().code().delete(&existing.code); let inserted = ctx.db.profile_redeem_code().insert(ProfileRedeemCode { enabled: false, updated_at, ..existing }); Ok(build_profile_redeem_code_snapshot_from_row(&inserted)) } fn admin_upsert_profile_invite_code_record( ctx: &ReducerContext, input: RuntimeProfileInviteCodeAdminUpsertInput, ) -> Result { let validated_input = build_runtime_profile_invite_code_admin_upsert_input( input.admin_user_id, input.invite_code, input.metadata_json, input.starts_at_micros, input.expires_at_micros, input.updated_at_micros, ) .map_err(|error| error.to_string())?; let updated_at = Timestamp::from_micros_since_unix_epoch(validated_input.updated_at_micros); let user_id = build_admin_profile_invite_code_user_id( &validated_input.admin_user_id, &validated_input.invite_code, ); if let Some(existing) = ctx .db .profile_invite_code() .invite_code() .find(&validated_input.invite_code) { if existing.user_id != user_id { return Err("邀请码已被其他用户占用".to_string()); } ctx.db .profile_invite_code() .user_id() .delete(&existing.user_id); let inserted = ctx.db.profile_invite_code().insert(ProfileInviteCode { user_id, invite_code: validated_input.invite_code, metadata_json: validated_input.metadata_json, created_at: existing.created_at, updated_at, starts_at: validated_input .starts_at_micros .map(Timestamp::from_micros_since_unix_epoch), expires_at: validated_input .expires_at_micros .map(Timestamp::from_micros_since_unix_epoch), }); return Ok(build_profile_invite_code_snapshot_from_row(&inserted)); } let inserted = ctx.db.profile_invite_code().insert(ProfileInviteCode { user_id, invite_code: validated_input.invite_code, metadata_json: validated_input.metadata_json, created_at: updated_at, updated_at, starts_at: validated_input .starts_at_micros .map(Timestamp::from_micros_since_unix_epoch), expires_at: validated_input .expires_at_micros .map(Timestamp::from_micros_since_unix_epoch), }); Ok(build_profile_invite_code_snapshot_from_row(&inserted)) } fn build_profile_referral_invite_center_snapshot( ctx: &ReducerContext, user_id: &str, ) -> RuntimeReferralInviteCenterSnapshot { let code = ensure_profile_invite_code(ctx, user_id); let today_inviter_reward_count = count_today_profile_referral_inviter_rewards(ctx, user_id, ctx.timestamp); let invited_count = ctx .db .profile_referral_relation() .iter() .filter(|row| row.inviter_user_id == user_id) .count() as u32; let rewarded_invite_count = ctx .db .profile_referral_relation() .iter() .filter(|row| row.inviter_user_id == user_id && row.inviter_reward_granted) .count() as u32; let bound_relation = ctx .db .profile_referral_relation() .invitee_user_id() .find(&user_id.to_string()); RuntimeReferralInviteCenterSnapshot { user_id: user_id.to_string(), invite_code: code.invite_code.clone(), invite_link_path: build_runtime_profile_invite_link_path(&code.invite_code), invited_count, rewarded_invite_count, today_inviter_reward_count, today_inviter_reward_remaining: PROFILE_REFERRAL_DAILY_INVITER_REWARD_LIMIT .saturating_sub(today_inviter_reward_count), reward_points: PROFILE_REFERRAL_REWARD_POINTS, invited_users: list_profile_referral_invited_users(ctx, user_id), has_redeemed_code: bound_relation.is_some(), bound_inviter_user_id: bound_relation .as_ref() .map(|relation| relation.inviter_user_id.clone()), bound_at_micros: bound_relation .as_ref() .map(|relation| relation.bound_at.to_micros_since_unix_epoch()), updated_at_micros: code.updated_at.to_micros_since_unix_epoch(), } } fn list_profile_referral_invited_users( ctx: &ReducerContext, inviter_user_id: &str, ) -> Vec { // 中文注释:邀请面板只展示最近成功邀请用户,完整统计仍由计数字段承担。 let inviter_user_id = inviter_user_id.to_string(); let mut relations = ctx .db .profile_referral_relation() .by_profile_referral_inviter_user_id() .filter(&inviter_user_id) .collect::>(); relations.sort_by(|left, right| { right .bound_at .to_micros_since_unix_epoch() .cmp(&left.bound_at.to_micros_since_unix_epoch()) }); relations .into_iter() .take(PROFILE_REFERRAL_INVITED_USERS_LIMIT) .map(|relation| { let account = ctx .db .user_account() .user_id() .find(&relation.invitee_user_id); RuntimeReferralInvitedUserSnapshot { user_id: relation.invitee_user_id, display_name: account .as_ref() .map(|user| user.display_name.trim()) .filter(|name| !name.is_empty()) .unwrap_or("玩家") .to_string(), avatar_url: account.and_then(|user| user.avatar_url), bound_at_micros: relation.bound_at.to_micros_since_unix_epoch(), } }) .collect() } fn ensure_profile_invite_code(ctx: &ReducerContext, user_id: &str) -> ProfileInviteCode { if let Some(row) = ctx .db .profile_invite_code() .user_id() .find(&user_id.to_string()) { return row; } let mut invite_code = build_runtime_profile_invite_code(user_id, 0); let mut salt = 1; while ctx .db .profile_invite_code() .invite_code() .find(&invite_code) .is_some() { invite_code = build_runtime_profile_invite_code(user_id, salt); salt += 1; } ctx.db.profile_invite_code().insert(ProfileInviteCode { user_id: user_id.to_string(), invite_code, metadata_json: PROFILE_INVITE_CODE_METADATA_DEFAULT_JSON.to_string(), created_at: ctx.timestamp, updated_at: ctx.timestamp, starts_at: None, expires_at: None, }) } fn validate_profile_invite_code_redeem_time( invite_code: &ProfileInviteCode, now_micros: i64, ) -> Result<(), String> { if invite_code .starts_at .map(|starts_at| now_micros < starts_at.to_micros_since_unix_epoch()) .unwrap_or(false) { return Err("邀请码未生效".to_string()); } if invite_code .expires_at .map(|expires_at| now_micros >= expires_at.to_micros_since_unix_epoch()) .unwrap_or(false) { return Err("邀请码已过期".to_string()); } Ok(()) } fn count_today_profile_referral_inviter_rewards( ctx: &ReducerContext, user_id: &str, now: Timestamp, ) -> u32 { let day_start_micros = runtime_profile_day_start_micros(now.to_micros_since_unix_epoch()); ctx.db .profile_wallet_ledger() .iter() .filter(|row| { row.user_id == user_id && row.source_type == RuntimeProfileWalletLedgerSourceType::InviteInviterReward && row.created_at.to_micros_since_unix_epoch() >= day_start_micros }) .count() as u32 } fn is_admin_profile_invite_code_user_id(user_id: &str) -> bool { user_id.starts_with("admin:") } fn build_admin_profile_invite_code_user_id(admin_user_id: &str, invite_code: &str) -> String { format!("admin:{}:{}", admin_user_id, invite_code) } fn profile_wallet_balance(ctx: &ReducerContext, user_id: &str) -> u64 { ctx.db .profile_dashboard_state() .user_id() .find(&user_id.to_string()) .map(|row| row.wallet_balance) .unwrap_or(0) } fn build_new_user_registration_wallet_ledger_id(user_id: &str) -> String { format!("{PROFILE_NEW_USER_REGISTRATION_LEDGER_PREFIX}:{user_id}") } fn grant_new_user_registration_wallet_reward_tx( ctx: &ReducerContext, input: RuntimeProfileDashboardGetInput, ) -> Result { let validated_input = build_runtime_profile_dashboard_get_input(input.user_id) .map_err(|error| error.to_string())?; let ledger_id = build_new_user_registration_wallet_ledger_id(&validated_input.user_id); if ctx .db .profile_wallet_ledger() .wallet_ledger_id() .find(&ledger_id) .is_none() { apply_profile_wallet_delta( ctx, &validated_input.user_id, PROFILE_NEW_USER_INITIAL_WALLET_POINTS, RuntimeProfileWalletLedgerSourceType::NewUserRegistrationReward, &ledger_id, ctx.timestamp, )?; } get_profile_dashboard_snapshot( ctx, RuntimeProfileDashboardGetInput { user_id: validated_input.user_id, }, ) } fn build_profile_recharge_center_snapshot( ctx: &ReducerContext, user_id: &str, ) -> RuntimeProfileRechargeCenterSnapshot { let wallet_balance = ctx .db .profile_dashboard_state() .user_id() .find(&user_id.to_string()) .map(|row| row.wallet_balance) .unwrap_or(0); RuntimeProfileRechargeCenterSnapshot { user_id: user_id.to_string(), wallet_balance, membership: build_profile_membership_snapshot(ctx, user_id), point_products: runtime_profile_recharge_point_products(), membership_products: runtime_profile_recharge_membership_products(), benefits: runtime_profile_membership_benefits(), latest_order: latest_profile_recharge_order(ctx, user_id) .map(|row| build_profile_recharge_order_snapshot_from_row(&row)), has_points_recharged: has_profile_points_recharged(ctx, user_id), } } fn get_profile_task_center_snapshot( ctx: &ReducerContext, input: RuntimeProfileTaskCenterGetInput, record_login_event: bool, ) -> Result { let validated_input = build_runtime_profile_task_center_get_input(input.user_id) .map_err(|error| error.to_string())?; ensure_default_profile_task_config(ctx); if record_login_event { record_daily_login_tracking_event(ctx, &validated_input.user_id)?; } build_profile_task_center_snapshot(ctx, &validated_input.user_id, ctx.timestamp) } fn claim_profile_task_reward_record( ctx: &ReducerContext, input: RuntimeProfileTaskClaimInput, ) -> Result { let validated_input = build_runtime_profile_task_claim_input(input.user_id, input.task_id) .map_err(|error| error.to_string())?; ensure_default_profile_task_config(ctx); let config = ctx .db .profile_task_config() .task_id() .find(&validated_input.task_id) .ok_or_else(|| RuntimeProfileFieldError::MissingTaskId.to_string())?; if !config.enabled { return Err(RuntimeProfileFieldError::TaskDisabled.to_string()); } if is_daily_login_task_config(&config) { record_daily_login_tracking_event(ctx, &validated_input.user_id)?; } let day_key = runtime_profile_beijing_day_key(ctx.timestamp.to_micros_since_unix_epoch()); let claim_id = build_runtime_profile_task_claim_id(&validated_input.user_id, &config.task_id, day_key); if ctx .db .profile_task_reward_claim() .claim_id() .find(&claim_id) .is_some() { return Err(RuntimeProfileFieldError::TaskAlreadyClaimed.to_string()); } let progress_count = profile_task_progress_count(ctx, &validated_input.user_id, &config)?; if progress_count < config.threshold { return Err(RuntimeProfileFieldError::TaskNotClaimable.to_string()); } let ledger_id = build_runtime_profile_task_reward_ledger_id( &validated_input.user_id, &config.task_id, day_key, ); let wallet_balance = grant_profile_wallet_points( ctx, &validated_input.user_id, config.reward_points, RuntimeProfileWalletLedgerSourceType::DailyTaskReward, &ledger_id, ctx.timestamp, )?; let claim = ctx .db .profile_task_reward_claim() .insert(ProfileTaskRewardClaim { claim_id: claim_id.clone(), user_id: validated_input.user_id.clone(), task_id: config.task_id.clone(), day_key, reward_points: config.reward_points, wallet_ledger_id: ledger_id.clone(), claimed_at: ctx.timestamp, }); refresh_profile_task_progress(ctx, &validated_input.user_id, &config, day_key)?; let ledger_entry = ctx .db .profile_wallet_ledger() .wallet_ledger_id() .find(&ledger_id) .ok_or_else(|| "任务奖励钱包流水写入失败".to_string())?; Ok(RuntimeProfileTaskClaimSnapshot { user_id: validated_input.user_id.clone(), task_id: config.task_id.clone(), day_key, reward_points: claim.reward_points, wallet_balance, ledger_entry: build_profile_wallet_ledger_snapshot_from_row(&ledger_entry), center: build_profile_task_center_snapshot(ctx, &validated_input.user_id, ctx.timestamp)?, }) } fn list_profile_task_config_snapshots( ctx: &ReducerContext, input: RuntimeProfileTaskConfigAdminListInput, ) -> Result, String> { let _validated_input = build_runtime_profile_task_config_admin_list_input(input.admin_user_id) .map_err(|error| error.to_string())?; ensure_default_profile_task_config(ctx); let mut entries = ctx .db .profile_task_config() .iter() .map(|row| build_profile_task_config_snapshot_from_row(&row)) .collect::>(); entries.sort_by(|left, right| { left.sort_order .cmp(&right.sort_order) .then_with(|| left.task_id.cmp(&right.task_id)) }); Ok(entries) } fn admin_list_profile_redeem_code_records( ctx: &ReducerContext, input: RuntimeProfileRedeemCodeAdminListInput, ) -> Result, String> { let _validated_input = build_runtime_profile_redeem_code_admin_list_input(input.admin_user_id) .map_err(|error| error.to_string())?; let mut entries = ctx .db .profile_redeem_code() .iter() .map(|row| build_profile_redeem_code_snapshot_from_row(&row)) .collect::>(); entries.sort_by(|left, right| { right .updated_at_micros .cmp(&left.updated_at_micros) .then_with(|| left.code.cmp(&right.code)) }); Ok(entries) } fn admin_list_profile_invite_code_records( ctx: &ReducerContext, input: RuntimeProfileInviteCodeAdminListInput, ) -> Result, String> { let _validated_input = build_runtime_profile_invite_code_admin_list_input(input.admin_user_id) .map_err(|error| error.to_string())?; let mut entries = ctx .db .profile_invite_code() .iter() .filter(|row| is_admin_profile_invite_code_user_id(&row.user_id)) .map(|row| build_profile_invite_code_snapshot_from_row(&row)) .collect::>(); entries.sort_by(|left, right| { right .updated_at_micros .cmp(&left.updated_at_micros) .then_with(|| left.invite_code.cmp(&right.invite_code)) }); Ok(entries) } fn upsert_profile_task_config_record( ctx: &ReducerContext, input: RuntimeProfileTaskConfigAdminUpsertInput, ) -> Result { let validated_input = build_runtime_profile_task_config_admin_upsert_input( input.admin_user_id, input.task_id, input.title, input.description, input.event_key, input.cycle, input.scope_kind, input.threshold, input.reward_points, input.enabled, input.sort_order, input.updated_at_micros, ) .map_err(|error| error.to_string())?; let updated_at = Timestamp::from_micros_since_unix_epoch(validated_input.updated_at_micros); let existing = ctx .db .profile_task_config() .task_id() .find(&validated_input.task_id); if let Some(row) = existing.as_ref() { ctx.db.profile_task_config().task_id().delete(&row.task_id); } let inserted = ctx.db.profile_task_config().insert(ProfileTaskConfig { task_id: validated_input.task_id, title: validated_input.title, description: validated_input.description, event_key: validated_input.event_key, cycle: validated_input.cycle, scope_kind: validated_input.scope_kind, threshold: validated_input.threshold, reward_points: validated_input.reward_points, enabled: validated_input.enabled, sort_order: validated_input.sort_order, created_by: existing .as_ref() .map(|row| row.created_by.clone()) .unwrap_or_else(|| validated_input.admin_user_id.clone()), created_at: existing .as_ref() .map(|row| row.created_at) .unwrap_or(updated_at), updated_by: validated_input.admin_user_id, updated_at, }); Ok(build_profile_task_config_snapshot_from_row(&inserted)) } fn disable_profile_task_config_record( ctx: &ReducerContext, input: RuntimeProfileTaskConfigAdminDisableInput, ) -> Result { let validated_input = build_runtime_profile_task_config_admin_disable_input( input.admin_user_id, input.task_id, input.updated_at_micros, ) .map_err(|error| error.to_string())?; let row = ctx .db .profile_task_config() .task_id() .find(&validated_input.task_id) .ok_or_else(|| RuntimeProfileFieldError::MissingTaskId.to_string())?; let updated_at = Timestamp::from_micros_since_unix_epoch(validated_input.updated_at_micros); ctx.db.profile_task_config().task_id().delete(&row.task_id); let inserted = ctx.db.profile_task_config().insert(ProfileTaskConfig { enabled: false, updated_by: validated_input.admin_user_id, updated_at, ..row }); Ok(build_profile_task_config_snapshot_from_row(&inserted)) } fn build_profile_task_center_snapshot( ctx: &ReducerContext, user_id: &str, updated_at: Timestamp, ) -> Result { ensure_default_profile_task_config(ctx); let day_key = runtime_profile_beijing_day_key(updated_at.to_micros_since_unix_epoch()); let mut configs = ctx.db.profile_task_config().iter().collect::>(); configs.sort_by(|left, right| { left.sort_order .cmp(&right.sort_order) .then_with(|| left.task_id.cmp(&right.task_id)) }); let mut tasks = Vec::with_capacity(configs.len()); for config in configs { validate_profile_task_user_scope(&config)?; let progress_count = profile_task_progress_count(ctx, user_id, &config)?; refresh_profile_task_progress(ctx, user_id, &config, day_key)?; let claim = ctx.db.profile_task_reward_claim().claim_id().find( &build_runtime_profile_task_claim_id(user_id, &config.task_id, day_key), ); tasks.push(RuntimeProfileTaskItemSnapshot { task_id: config.task_id, title: config.title, description: config.description, event_key: config.event_key, cycle: config.cycle, threshold: config.threshold, progress_count, reward_points: config.reward_points, status: resolve_runtime_profile_task_status( config.enabled, progress_count, config.threshold, claim.is_some(), ), day_key, claimed_at_micros: claim.map(|row| row.claimed_at.to_micros_since_unix_epoch()), updated_at_micros: updated_at.to_micros_since_unix_epoch(), }); } Ok(RuntimeProfileTaskCenterSnapshot { user_id: user_id.to_string(), day_key, wallet_balance: profile_wallet_balance(ctx, user_id), tasks, updated_at_micros: updated_at.to_micros_since_unix_epoch(), }) } fn query_analytics_metric_buckets( ctx: &ReducerContext, input: AnalyticsMetricQueryInput, ) -> Result, String> { let validated_input = build_analytics_metric_query_input( input.event_key, input.scope_kind, input.scope_id, input.granularity, ) .map_err(|error| error.to_string())?; let stats = ctx .db .tracking_daily_stat() .iter() .filter(|row| { row.event_key.trim() == validated_input.event_key && row.scope_kind == validated_input.scope_kind && row.scope_id.trim() == validated_input.scope_id }) .map(|row| RuntimeAnalyticsDailyStatSnapshot { event_key: row.event_key, scope_kind: row.scope_kind, scope_id: row.scope_id, day_key: row.day_key, count: row.count, }) .collect::>(); Ok(aggregate_runtime_tracking_daily_stats( stats, &validated_input.event_key, validated_input.scope_kind, &validated_input.scope_id, validated_input.granularity, )) } fn refresh_profile_task_progress( ctx: &ReducerContext, user_id: &str, config: &ProfileTaskConfig, day_key: i64, ) -> Result { let progress_id = build_runtime_profile_task_progress_id(user_id, &config.task_id, day_key); if let Some(existing) = ctx .db .profile_task_progress() .progress_id() .find(&progress_id) { ctx.db .profile_task_progress() .progress_id() .delete(&existing.progress_id); } let progress_count = profile_task_progress_count(ctx, user_id, config)?; let claimed = ctx .db .profile_task_reward_claim() .claim_id() .find(&build_runtime_profile_task_claim_id( user_id, &config.task_id, day_key, )) .is_some(); Ok(ctx.db.profile_task_progress().insert(ProfileTaskProgress { progress_id, user_id: user_id.to_string(), task_id: config.task_id.clone(), day_key, progress_count, threshold: config.threshold, status: resolve_runtime_profile_task_status( config.enabled, progress_count, config.threshold, claimed, ), updated_at: ctx.timestamp, })) } fn profile_task_progress_count( ctx: &ReducerContext, user_id: &str, config: &ProfileTaskConfig, ) -> Result { validate_profile_task_user_scope(config)?; let day_key = runtime_profile_beijing_day_key(ctx.timestamp.to_micros_since_unix_epoch()); let scope_id = profile_task_tracking_scope_id(user_id, config)?; Ok(ctx .db .tracking_daily_stat() .stat_id() .find(&build_runtime_tracking_daily_stat_id( &config.event_key, config.scope_kind, &scope_id, day_key, )) .map(|row| row.count) .unwrap_or(0)) } fn profile_task_tracking_scope_id( user_id: &str, config: &ProfileTaskConfig, ) -> Result { validate_profile_task_user_scope(config)?; Ok(user_id.to_string()) } fn validate_profile_task_user_scope(config: &ProfileTaskConfig) -> Result<(), String> { if config.scope_kind == RuntimeTrackingScopeKind::User { Ok(()) } else { Err(format!( "个人任务 scope_kind 首版仅支持 user,当前 task_id={} scope_kind={}", config.task_id, config.scope_kind.as_str() )) } } fn is_daily_login_task_config(config: &ProfileTaskConfig) -> bool { config.task_id == PROFILE_TASK_ID_DAILY_LOGIN && config.event_key == PROFILE_TASK_EVENT_KEY_DAILY_LOGIN && config.scope_kind == RuntimeTrackingScopeKind::User } fn record_daily_login_tracking_event(ctx: &ReducerContext, user_id: &str) -> Result<(), String> { let day_key = runtime_profile_beijing_day_key(ctx.timestamp.to_micros_since_unix_epoch()); let event_id = format!( "{}:{}:{}", PROFILE_TASK_LOGIN_EVENT_ID_PREFIX, user_id.trim(), day_key ); if ctx.db.tracking_event().event_id().find(&event_id).is_some() { return Ok(()); } record_tracking_event( ctx, RuntimeTrackingEventInput { event_id, event_key: PROFILE_TASK_EVENT_KEY_DAILY_LOGIN.to_string(), scope_kind: RuntimeTrackingScopeKind::User, scope_id: user_id.to_string(), user_id: Some(user_id.to_string()), owner_user_id: None, profile_id: None, module_key: Some(PROFILE_TRACKING_PROFILE_MODULE_KEY.to_string()), metadata_json: PROFILE_INVITE_CODE_METADATA_DEFAULT_JSON.to_string(), occurred_at_micros: ctx.timestamp.to_micros_since_unix_epoch(), }, ) } fn record_tracking_event( ctx: &ReducerContext, input: RuntimeTrackingEventInput, ) -> Result<(), String> { let validated_input = build_runtime_tracking_event_input( input.event_id, input.event_key, input.scope_kind, input.scope_id, input.user_id, input.owner_user_id, input.profile_id, input.module_key, input.metadata_json, input.occurred_at_micros, ) .map_err(|error| error.to_string())?; let occurred_at = Timestamp::from_micros_since_unix_epoch(validated_input.occurred_at_micros); let day_key = runtime_profile_beijing_day_key(validated_input.occurred_at_micros); ctx.db.tracking_event().insert(TrackingEvent { event_id: validated_input.event_id, event_key: validated_input.event_key.clone(), scope_kind: validated_input.scope_kind, scope_id: validated_input.scope_id.clone(), day_key, user_id: validated_input.user_id, owner_user_id: validated_input.owner_user_id, profile_id: validated_input.profile_id, module_key: validated_input.module_key, metadata_json: validated_input.metadata_json, occurred_at, }); upsert_tracking_daily_stat( ctx, &validated_input.event_key, validated_input.scope_kind, &validated_input.scope_id, day_key, occurred_at, ); Ok(()) } fn upsert_tracking_daily_stat( ctx: &ReducerContext, event_key: &str, scope_kind: RuntimeTrackingScopeKind, scope_id: &str, day_key: i64, occurred_at: Timestamp, ) { let stat_id = build_runtime_tracking_daily_stat_id(event_key, scope_kind, scope_id, day_key); let existing = ctx.db.tracking_daily_stat().stat_id().find(&stat_id); if let Some(row) = existing { ctx.db.tracking_daily_stat().stat_id().delete(&row.stat_id); ctx.db.tracking_daily_stat().insert(TrackingDailyStat { stat_id, event_key: row.event_key, scope_kind: row.scope_kind, scope_id: row.scope_id, day_key: row.day_key, count: row.count.saturating_add(1), first_occurred_at: row.first_occurred_at, last_occurred_at: occurred_at, updated_at: occurred_at, }); } else { ctx.db.tracking_daily_stat().insert(TrackingDailyStat { stat_id, event_key: event_key.to_string(), scope_kind, scope_id: scope_id.to_string(), day_key, count: 1, first_occurred_at: occurred_at, last_occurred_at: occurred_at, updated_at: occurred_at, }); } } fn ensure_default_profile_task_config(ctx: &ReducerContext) -> ProfileTaskConfig { if let Some(row) = ctx .db .profile_task_config() .task_id() .find(&PROFILE_TASK_ID_DAILY_LOGIN.to_string()) { return row; } let default_config = build_default_runtime_profile_task_config( ctx.timestamp.to_micros_since_unix_epoch(), PROFILE_TASK_SYSTEM_USER_ID.to_string(), ); ctx.db.profile_task_config().insert(ProfileTaskConfig { task_id: default_config.task_id, title: default_config.title, description: default_config.description, event_key: default_config.event_key, cycle: default_config.cycle, scope_kind: default_config.scope_kind, threshold: default_config.threshold, reward_points: default_config.reward_points, enabled: default_config.enabled, sort_order: default_config.sort_order, created_by: default_config.created_by, created_at: ctx.timestamp, updated_by: default_config.updated_by, updated_at: ctx.timestamp, }) } fn build_profile_membership_snapshot( ctx: &ReducerContext, user_id: &str, ) -> RuntimeProfileMembershipSnapshot { let now_micros = ctx.timestamp.to_micros_since_unix_epoch(); let membership = ctx .db .profile_membership() .user_id() .find(&user_id.to_string()); match membership { Some(row) if row.expires_at.to_micros_since_unix_epoch() > now_micros => { RuntimeProfileMembershipSnapshot { user_id: row.user_id, status: row.status, tier: row.tier, started_at_micros: Some(row.started_at.to_micros_since_unix_epoch()), expires_at_micros: Some(row.expires_at.to_micros_since_unix_epoch()), updated_at_micros: Some(row.updated_at.to_micros_since_unix_epoch()), } } Some(row) => RuntimeProfileMembershipSnapshot { user_id: row.user_id, status: RuntimeProfileMembershipStatus::Normal, tier: RuntimeProfileMembershipTier::Normal, started_at_micros: Some(row.started_at.to_micros_since_unix_epoch()), expires_at_micros: Some(row.expires_at.to_micros_since_unix_epoch()), updated_at_micros: Some(row.updated_at.to_micros_since_unix_epoch()), }, None => RuntimeProfileMembershipSnapshot { user_id: user_id.to_string(), status: RuntimeProfileMembershipStatus::Normal, tier: RuntimeProfileMembershipTier::Normal, started_at_micros: None, expires_at_micros: None, updated_at_micros: None, }, } } fn apply_profile_membership_purchase( ctx: &ReducerContext, user_id: &str, tier: RuntimeProfileMembershipTier, duration_days: u32, purchased_at: Timestamp, ) -> Timestamp { let current = ctx .db .profile_membership() .user_id() .find(&user_id.to_string()); let purchased_at_micros = purchased_at.to_micros_since_unix_epoch(); let purchase_update = resolve_runtime_profile_membership_purchase_update( current .as_ref() .map(|row| row.started_at.to_micros_since_unix_epoch()), current .as_ref() .map(|row| row.expires_at.to_micros_since_unix_epoch()), purchased_at_micros, duration_days, ); let expires_at = Timestamp::from_micros_since_unix_epoch(purchase_update.expires_at_micros); let created_at = Timestamp::from_micros_since_unix_epoch(purchase_update.started_at_micros); let current = current.map(|row| row.user_id); if let Some(existing_user_id) = current { ctx.db .profile_membership() .user_id() .delete(&existing_user_id); } ctx.db.profile_membership().insert(ProfileMembership { user_id: user_id.to_string(), status: RuntimeProfileMembershipStatus::Active, tier, started_at: created_at, expires_at, updated_at: purchased_at, }); expires_at } fn apply_profile_wallet_delta( ctx: &ReducerContext, user_id: &str, amount_delta: u64, source_type: RuntimeProfileWalletLedgerSourceType, ledger_id: &str, created_at: Timestamp, ) -> Result { let amount_delta = convert_runtime_profile_wallet_unsigned_delta(amount_delta) .map_err(|error| error.to_string())?; apply_profile_wallet_signed_delta( ctx, user_id, amount_delta, source_type, ledger_id, created_at, false, ) } pub(crate) fn grant_profile_wallet_points( ctx: &ReducerContext, user_id: &str, amount_delta: u64, source_type: RuntimeProfileWalletLedgerSourceType, ledger_id: &str, created_at: Timestamp, ) -> Result { apply_profile_wallet_delta( ctx, user_id, amount_delta, source_type, ledger_id, created_at, ) } fn apply_profile_wallet_adjustment( ctx: &ReducerContext, input: RuntimeProfileWalletAdjustmentInput, source_type: RuntimeProfileWalletLedgerSourceType, consume: bool, ) -> Result { let validated_input = build_runtime_profile_wallet_adjustment_input( input.user_id, input.amount, input.ledger_id, input.created_at_micros, ) .map_err(|error| error.to_string())?; let created_at = Timestamp::from_micros_since_unix_epoch(validated_input.created_at_micros); let unsigned_delta = convert_runtime_profile_wallet_unsigned_delta(validated_input.amount) .map_err(|error| error.to_string())?; let amount_delta = if consume { -unsigned_delta } else { unsigned_delta }; apply_profile_wallet_signed_delta( ctx, &validated_input.user_id, amount_delta, source_type, &validated_input.ledger_id, created_at, true, )?; get_profile_dashboard_snapshot( ctx, RuntimeProfileDashboardGetInput { user_id: validated_input.user_id, }, ) } fn apply_profile_wallet_signed_delta( ctx: &ReducerContext, user_id: &str, amount_delta: i64, source_type: RuntimeProfileWalletLedgerSourceType, ledger_id: &str, created_at: Timestamp, idempotent: bool, ) -> Result { if idempotent && ctx .db .profile_wallet_ledger() .wallet_ledger_id() .find(&ledger_id.to_string()) .is_some() { return Ok(profile_wallet_balance(ctx, user_id)); } let current = ctx .db .profile_dashboard_state() .user_id() .find(&user_id.to_string()); let previous_balance = current.as_ref().map(|row| row.wallet_balance).unwrap_or(0); let next_balance = calculate_runtime_profile_wallet_balance(previous_balance, amount_delta) .map_err(|error| error.to_string())?; let created_state_at = current .as_ref() .map(|row| row.created_at) .unwrap_or(created_at); if let Some(existing) = current { ctx.db .profile_dashboard_state() .user_id() .delete(&existing.user_id); ctx.db .profile_dashboard_state() .insert(ProfileDashboardState { user_id: user_id.to_string(), wallet_balance: next_balance, total_play_time_ms: existing.total_play_time_ms, created_at: existing.created_at, updated_at: created_at, }); } else { ctx.db .profile_dashboard_state() .insert(ProfileDashboardState { user_id: user_id.to_string(), wallet_balance: next_balance, total_play_time_ms: 0, created_at: created_state_at, updated_at: created_at, }); } ctx.db.profile_wallet_ledger().insert(ProfileWalletLedger { wallet_ledger_id: ledger_id.to_string(), user_id: user_id.to_string(), amount_delta, balance_after: next_balance, source_type, created_at, }); Ok(next_balance) } fn has_profile_points_recharged(ctx: &ReducerContext, user_id: &str) -> bool { ctx.db.profile_wallet_ledger().iter().any(|row| { row.user_id == user_id && row.source_type == RuntimeProfileWalletLedgerSourceType::PointsRecharge }) } fn has_profile_business_wallet_ledger(ctx: &ReducerContext, user_id: &str) -> bool { ctx.db.profile_wallet_ledger().iter().any(|row| { row.user_id == user_id && row.source_type != RuntimeProfileWalletLedgerSourceType::SnapshotSync }) } fn latest_profile_recharge_order( ctx: &ReducerContext, user_id: &str, ) -> Option { let mut orders = ctx .db .profile_recharge_order() .iter() .filter(|row| row.user_id == user_id) .collect::>(); orders.sort_by(|left, right| { right .created_at .to_micros_since_unix_epoch() .cmp(&left.created_at.to_micros_since_unix_epoch()) .then_with(|| left.order_id.cmp(&right.order_id)) }); orders.into_iter().next() } fn count_profile_redeem_code_user_usage(ctx: &ReducerContext, code: &str, user_id: &str) -> u32 { ctx.db .profile_redeem_code_usage() .by_profile_redeem_code_usage_code_user_id() .filter((code, user_id)) .count() as u32 } fn resolve_profile_redeem_code_allowed_user_ids( ctx: &ReducerContext, input: &RuntimeProfileRedeemCodeAdminUpsertInput, ) -> Result, String> { if input.mode != RuntimeProfileRedeemCodeMode::Private { return Ok(Vec::new()); } let mut allowed_user_ids = input.allowed_user_ids.clone(); for public_user_code in &input.allowed_public_user_codes { if let Some(account) = ctx .db .user_account() .by_user_account_public_code() .filter(public_user_code) .next() { allowed_user_ids.push(account.user_id); } } allowed_user_ids.sort(); allowed_user_ids.dedup(); if allowed_user_ids.is_empty() { return Err("私有兑换码必须指定可兑换用户".to_string()); } Ok(allowed_user_ids) } fn build_profile_redeem_code_snapshot_from_row( row: &ProfileRedeemCode, ) -> RuntimeProfileRedeemCodeSnapshot { RuntimeProfileRedeemCodeSnapshot { code: row.code.clone(), mode: row.mode, reward_points: row.reward_points, max_uses: row.max_uses, global_used_count: row.global_used_count, enabled: row.enabled, allowed_user_ids: row.allowed_user_ids.clone(), created_by: row.created_by.clone(), created_at_micros: row.created_at.to_micros_since_unix_epoch(), updated_at_micros: row.updated_at.to_micros_since_unix_epoch(), } } fn build_profile_invite_code_snapshot_from_row( row: &ProfileInviteCode, ) -> RuntimeProfileInviteCodeSnapshot { RuntimeProfileInviteCodeSnapshot { user_id: row.user_id.clone(), invite_code: row.invite_code.clone(), metadata_json: row.metadata_json.clone(), starts_at_micros: row .starts_at .map(|value| value.to_micros_since_unix_epoch()), expires_at_micros: row .expires_at .map(|value| value.to_micros_since_unix_epoch()), created_at_micros: row.created_at.to_micros_since_unix_epoch(), updated_at_micros: row.updated_at.to_micros_since_unix_epoch(), } } fn build_profile_wallet_ledger_snapshot_from_row( row: &ProfileWalletLedger, ) -> RuntimeProfileWalletLedgerEntrySnapshot { RuntimeProfileWalletLedgerEntrySnapshot { wallet_ledger_id: row.wallet_ledger_id.clone(), user_id: row.user_id.clone(), amount_delta: row.amount_delta, balance_after: row.balance_after, source_type: row.source_type, created_at_micros: row.created_at.to_micros_since_unix_epoch(), } } fn build_profile_task_config_snapshot_from_row( row: &ProfileTaskConfig, ) -> RuntimeProfileTaskConfigSnapshot { RuntimeProfileTaskConfigSnapshot { task_id: row.task_id.clone(), title: row.title.clone(), description: row.description.clone(), event_key: row.event_key.clone(), cycle: row.cycle, scope_kind: row.scope_kind, threshold: row.threshold, reward_points: row.reward_points, enabled: row.enabled, sort_order: row.sort_order, created_by: row.created_by.clone(), created_at_micros: row.created_at.to_micros_since_unix_epoch(), updated_by: row.updated_by.clone(), updated_at_micros: row.updated_at.to_micros_since_unix_epoch(), } } fn build_profile_recharge_order_snapshot_from_row( row: &ProfileRechargeOrder, ) -> RuntimeProfileRechargeOrderSnapshot { RuntimeProfileRechargeOrderSnapshot { order_id: row.order_id.clone(), user_id: row.user_id.clone(), product_id: row.product_id.clone(), product_title: row.product_title.clone(), kind: row.kind, amount_cents: row.amount_cents, status: row.status, payment_channel: row.payment_channel.clone(), paid_at_micros: row.paid_at.to_micros_since_unix_epoch(), created_at_micros: row.created_at.to_micros_since_unix_epoch(), points_delta: row.points_delta, membership_expires_at_micros: row .membership_expires_at .map(|value| value.to_micros_since_unix_epoch()), } } fn build_profile_feedback_submission_snapshot_from_row( row: &ProfileFeedbackSubmission, ) -> RuntimeProfileFeedbackSubmissionSnapshot { RuntimeProfileFeedbackSubmissionSnapshot { feedback_id: row.feedback_id.clone(), user_id: row.user_id.clone(), description: row.description.clone(), contact_phone: row.contact_phone.clone(), evidence_json: row.evidence_json.clone(), status: row.status, created_at_micros: row.created_at.to_micros_since_unix_epoch(), updated_at_micros: row.updated_at.to_micros_since_unix_epoch(), } } fn build_profile_played_world_snapshot_from_row( row: &ProfilePlayedWorld, ) -> RuntimeProfilePlayedWorldSnapshot { RuntimeProfilePlayedWorldSnapshot { played_world_id: row.played_world_id.clone(), user_id: row.user_id.clone(), world_key: row.world_key.clone(), owner_user_id: row.owner_user_id.clone(), profile_id: row.profile_id.clone(), world_type: row.world_type.clone(), world_title: row.world_title.clone(), world_subtitle: row.world_subtitle.clone(), first_played_at_micros: row.first_played_at.to_micros_since_unix_epoch(), last_played_at_micros: row.last_played_at.to_micros_since_unix_epoch(), last_observed_play_time_ms: row.last_observed_play_time_ms, } }