Add skill for gameplay entry type workflows

This commit is contained in:
2026-05-04 02:32:38 +08:00
parent 49aad7311c
commit 34aecdddf1
77 changed files with 5997 additions and 110 deletions

View File

@@ -161,6 +161,11 @@ macro_rules! migration_tables {
user_browse_history,
profile_dashboard_state,
profile_wallet_ledger,
tracking_event,
tracking_daily_stat,
profile_task_config,
profile_task_progress,
profile_task_reward_claim,
profile_redeem_code,
profile_redeem_code_usage,
profile_invite_code,

View File

@@ -4,6 +4,10 @@ const PUBLIC_WORK_PLAY_DAY_MICROS: i64 = 86_400_000_000;
const PUBLIC_WORK_RECENT_PLAY_WINDOW_DAYS: i64 = 7;
const PROFILE_REFERRAL_INVITED_USERS_LIMIT: usize = 20;
const PROFILE_NEW_USER_REGISTRATION_LEDGER_PREFIX: &str = "new-user-registration";
const PROFILE_TASK_SYSTEM_USER_ID: &str = "system:profile-task";
const PROFILE_TASK_LOGIN_EVENT_ID_PREFIX: &str = "daily-login";
const PROFILE_TRACKING_SITE_SCOPE_ID: &str = "site";
const PROFILE_TRACKING_PROFILE_MODULE_KEY: &str = "profile";
#[spacetimedb::table(accessor = profile_dashboard_state)]
pub struct ProfileDashboardState {
@@ -33,6 +37,115 @@ pub struct ProfileWalletLedger {
pub(crate) created_at: Timestamp,
}
#[spacetimedb::table(
accessor = tracking_event,
index(accessor = by_tracking_event_event_key, btree(columns = [event_key])),
index(
accessor = by_tracking_event_scope,
btree(columns = [scope_kind, scope_id])
),
index(
accessor = by_tracking_event_user,
btree(columns = [user_id, occurred_at])
)
)]
pub struct TrackingEvent {
#[primary_key]
pub(crate) event_id: String,
pub(crate) event_key: String,
pub(crate) scope_kind: RuntimeTrackingScopeKind,
pub(crate) scope_id: String,
pub(crate) day_key: i64,
pub(crate) user_id: Option<String>,
pub(crate) owner_user_id: Option<String>,
pub(crate) profile_id: Option<String>,
pub(crate) module_key: Option<String>,
pub(crate) metadata_json: String,
pub(crate) occurred_at: Timestamp,
}
#[spacetimedb::table(
accessor = tracking_daily_stat,
index(
accessor = by_tracking_daily_stat_event_day,
btree(columns = [event_key, day_key])
),
index(
accessor = by_tracking_daily_stat_scope_day,
btree(columns = [scope_kind, scope_id, day_key])
)
)]
pub struct TrackingDailyStat {
#[primary_key]
pub(crate) stat_id: String,
pub(crate) event_key: String,
pub(crate) scope_kind: RuntimeTrackingScopeKind,
pub(crate) scope_id: String,
pub(crate) day_key: i64,
pub(crate) count: u32,
pub(crate) first_occurred_at: Timestamp,
pub(crate) last_occurred_at: Timestamp,
pub(crate) updated_at: Timestamp,
}
#[spacetimedb::table(accessor = profile_task_config)]
pub struct ProfileTaskConfig {
#[primary_key]
pub(crate) task_id: String,
pub(crate) title: String,
pub(crate) description: String,
pub(crate) event_key: String,
pub(crate) cycle: RuntimeProfileTaskCycle,
pub(crate) scope_kind: RuntimeTrackingScopeKind,
pub(crate) threshold: u32,
pub(crate) reward_points: u64,
pub(crate) enabled: bool,
pub(crate) sort_order: i32,
pub(crate) created_by: String,
pub(crate) created_at: Timestamp,
pub(crate) updated_by: String,
pub(crate) updated_at: Timestamp,
}
#[spacetimedb::table(
accessor = profile_task_progress,
index(accessor = by_profile_task_progress_user, btree(columns = [user_id])),
index(
accessor = by_profile_task_progress_user_task,
btree(columns = [user_id, task_id])
)
)]
pub struct ProfileTaskProgress {
#[primary_key]
pub(crate) progress_id: String,
pub(crate) user_id: String,
pub(crate) task_id: String,
pub(crate) day_key: i64,
pub(crate) progress_count: u32,
pub(crate) threshold: u32,
pub(crate) status: RuntimeProfileTaskStatus,
pub(crate) updated_at: Timestamp,
}
#[spacetimedb::table(
accessor = profile_task_reward_claim,
index(accessor = by_profile_task_claim_user, btree(columns = [user_id])),
index(
accessor = by_profile_task_claim_user_task,
btree(columns = [user_id, task_id])
)
)]
pub struct ProfileTaskRewardClaim {
#[primary_key]
pub(crate) claim_id: String,
pub(crate) user_id: String,
pub(crate) task_id: String,
pub(crate) day_key: i64,
pub(crate) reward_points: u64,
pub(crate) wallet_ledger_id: String,
pub(crate) claimed_at: Timestamp,
}
#[spacetimedb::table(accessor = profile_redeem_code)]
pub struct ProfileRedeemCode {
#[primary_key]
@@ -355,6 +468,103 @@ pub fn list_profile_wallet_ledger(
}
}
// 任务中心读取会顺手记录当日登录埋点,确保“每日登录”只依赖后端事实。
#[spacetimedb::procedure]
pub fn get_profile_task_center(
ctx: &mut ProcedureContext,
input: RuntimeProfileTaskCenterGetInput,
) -> RuntimeProfileTaskCenterProcedureResult {
match ctx.try_with_tx(|tx| get_profile_task_center_snapshot(tx, input.clone(), true)) {
Ok(record) => RuntimeProfileTaskCenterProcedureResult {
ok: true,
record: Some(record),
error_message: None,
},
Err(message) => RuntimeProfileTaskCenterProcedureResult {
ok: false,
record: None,
error_message: Some(message),
},
}
}
// 领奖记录与光点流水在同一事务内写入,避免任务状态和钱包余额漂移。
#[spacetimedb::procedure]
pub fn claim_profile_task_reward_and_return(
ctx: &mut ProcedureContext,
input: RuntimeProfileTaskClaimInput,
) -> RuntimeProfileTaskClaimProcedureResult {
match ctx.try_with_tx(|tx| claim_profile_task_reward_record(tx, input.clone())) {
Ok(record) => RuntimeProfileTaskClaimProcedureResult {
ok: true,
record: Some(record),
error_message: None,
},
Err(message) => RuntimeProfileTaskClaimProcedureResult {
ok: false,
record: None,
error_message: Some(message),
},
}
}
#[spacetimedb::procedure]
pub fn admin_list_profile_task_configs(
ctx: &mut ProcedureContext,
input: RuntimeProfileTaskConfigAdminListInput,
) -> RuntimeProfileTaskConfigAdminListProcedureResult {
match ctx.try_with_tx(|tx| list_profile_task_config_snapshots(tx, input.clone())) {
Ok(entries) => RuntimeProfileTaskConfigAdminListProcedureResult {
ok: true,
entries,
error_message: None,
},
Err(message) => RuntimeProfileTaskConfigAdminListProcedureResult {
ok: false,
entries: Vec::new(),
error_message: Some(message),
},
}
}
#[spacetimedb::procedure]
pub fn admin_upsert_profile_task_config(
ctx: &mut ProcedureContext,
input: RuntimeProfileTaskConfigAdminUpsertInput,
) -> RuntimeProfileTaskConfigAdminProcedureResult {
match ctx.try_with_tx(|tx| upsert_profile_task_config_record(tx, input.clone())) {
Ok(record) => RuntimeProfileTaskConfigAdminProcedureResult {
ok: true,
record: Some(record),
error_message: None,
},
Err(message) => RuntimeProfileTaskConfigAdminProcedureResult {
ok: false,
record: None,
error_message: Some(message),
},
}
}
#[spacetimedb::procedure]
pub fn admin_disable_profile_task_config(
ctx: &mut ProcedureContext,
input: RuntimeProfileTaskConfigAdminDisableInput,
) -> RuntimeProfileTaskConfigAdminProcedureResult {
match ctx.try_with_tx(|tx| disable_profile_task_config_record(tx, input.clone())) {
Ok(record) => RuntimeProfileTaskConfigAdminProcedureResult {
ok: true,
record: Some(record),
error_message: None,
},
Err(message) => RuntimeProfileTaskConfigAdminProcedureResult {
ok: false,
record: None,
error_message: Some(message),
},
}
}
// 新用户注册赠送由后端注册链路调用;流水 ID 固定,保证重试不重复发放。
#[spacetimedb::procedure]
pub fn grant_new_user_registration_wallet_reward(
@@ -591,6 +801,25 @@ pub fn admin_disable_profile_redeem_code(
}
}
#[spacetimedb::procedure]
pub fn admin_list_profile_redeem_codes(
ctx: &mut ProcedureContext,
input: RuntimeProfileRedeemCodeAdminListInput,
) -> RuntimeProfileRedeemCodeAdminListProcedureResult {
match ctx.try_with_tx(|tx| admin_list_profile_redeem_code_records(tx, input.clone())) {
Ok(entries) => RuntimeProfileRedeemCodeAdminListProcedureResult {
ok: true,
entries,
error_message: None,
},
Err(message) => RuntimeProfileRedeemCodeAdminListProcedureResult {
ok: false,
entries: Vec::new(),
error_message: Some(message),
},
}
}
#[spacetimedb::procedure]
pub fn admin_upsert_profile_invite_code(
ctx: &mut ProcedureContext,
@@ -610,6 +839,25 @@ pub fn admin_upsert_profile_invite_code(
}
}
#[spacetimedb::procedure]
pub fn admin_list_profile_invite_codes(
ctx: &mut ProcedureContext,
input: RuntimeProfileInviteCodeAdminListInput,
) -> RuntimeProfileInviteCodeAdminListProcedureResult {
match ctx.try_with_tx(|tx| admin_list_profile_invite_code_records(tx, input.clone())) {
Ok(entries) => RuntimeProfileInviteCodeAdminListProcedureResult {
ok: true,
entries,
error_message: None,
},
Err(message) => RuntimeProfileInviteCodeAdminListProcedureResult {
ok: false,
entries: Vec::new(),
error_message: Some(message),
},
}
}
pub(crate) fn list_profile_save_archive_rows(
ctx: &ReducerContext,
input: RuntimeProfileSaveArchiveListInput,
@@ -2136,6 +2384,533 @@ fn build_profile_recharge_center_snapshot(
}
}
fn get_profile_task_center_snapshot(
ctx: &ReducerContext,
input: RuntimeProfileTaskCenterGetInput,
record_login_event: bool,
) -> Result<RuntimeProfileTaskCenterSnapshot, String> {
let validated_input = build_runtime_profile_task_center_get_input(input.user_id)
.map_err(|error| error.to_string())?;
ensure_default_profile_task_config(ctx);
if record_login_event {
record_daily_login_tracking_event(ctx, &validated_input.user_id)?;
}
Ok(build_profile_task_center_snapshot(
ctx,
&validated_input.user_id,
ctx.timestamp,
))
}
fn claim_profile_task_reward_record(
ctx: &ReducerContext,
input: RuntimeProfileTaskClaimInput,
) -> Result<RuntimeProfileTaskClaimSnapshot, String> {
let validated_input = build_runtime_profile_task_claim_input(input.user_id, input.task_id)
.map_err(|error| error.to_string())?;
ensure_default_profile_task_config(ctx);
let config = ctx
.db
.profile_task_config()
.task_id()
.find(&validated_input.task_id)
.ok_or_else(|| RuntimeProfileFieldError::MissingTaskId.to_string())?;
if !config.enabled {
return Err(RuntimeProfileFieldError::TaskDisabled.to_string());
}
if is_daily_login_task_config(&config) {
record_daily_login_tracking_event(ctx, &validated_input.user_id)?;
}
let day_key = runtime_profile_beijing_day_key(ctx.timestamp.to_micros_since_unix_epoch());
let claim_id =
build_runtime_profile_task_claim_id(&validated_input.user_id, &config.task_id, day_key);
if ctx
.db
.profile_task_reward_claim()
.claim_id()
.find(&claim_id)
.is_some()
{
return Err(RuntimeProfileFieldError::TaskAlreadyClaimed.to_string());
}
let progress_count = profile_task_progress_count(ctx, &validated_input.user_id, &config);
if progress_count < config.threshold {
return Err(RuntimeProfileFieldError::TaskNotClaimable.to_string());
}
let ledger_id = build_runtime_profile_task_reward_ledger_id(
&validated_input.user_id,
&config.task_id,
day_key,
);
let wallet_balance = grant_profile_wallet_points(
ctx,
&validated_input.user_id,
config.reward_points,
RuntimeProfileWalletLedgerSourceType::DailyTaskReward,
&ledger_id,
ctx.timestamp,
)?;
let claim = ctx
.db
.profile_task_reward_claim()
.insert(ProfileTaskRewardClaim {
claim_id: claim_id.clone(),
user_id: validated_input.user_id.clone(),
task_id: config.task_id.clone(),
day_key,
reward_points: config.reward_points,
wallet_ledger_id: ledger_id.clone(),
claimed_at: ctx.timestamp,
});
refresh_profile_task_progress(ctx, &validated_input.user_id, &config, day_key);
let ledger_entry = ctx
.db
.profile_wallet_ledger()
.wallet_ledger_id()
.find(&ledger_id)
.ok_or_else(|| "任务奖励钱包流水写入失败".to_string())?;
Ok(RuntimeProfileTaskClaimSnapshot {
user_id: validated_input.user_id.clone(),
task_id: config.task_id.clone(),
day_key,
reward_points: claim.reward_points,
wallet_balance,
ledger_entry: build_profile_wallet_ledger_snapshot_from_row(&ledger_entry),
center: build_profile_task_center_snapshot(ctx, &validated_input.user_id, ctx.timestamp),
})
}
fn list_profile_task_config_snapshots(
ctx: &ReducerContext,
input: RuntimeProfileTaskConfigAdminListInput,
) -> Result<Vec<RuntimeProfileTaskConfigSnapshot>, String> {
let _validated_input = build_runtime_profile_task_config_admin_list_input(input.admin_user_id)
.map_err(|error| error.to_string())?;
ensure_default_profile_task_config(ctx);
let mut entries = ctx
.db
.profile_task_config()
.iter()
.map(|row| build_profile_task_config_snapshot_from_row(&row))
.collect::<Vec<_>>();
entries.sort_by(|left, right| {
left.sort_order
.cmp(&right.sort_order)
.then_with(|| left.task_id.cmp(&right.task_id))
});
Ok(entries)
}
fn admin_list_profile_redeem_code_records(
ctx: &ReducerContext,
input: RuntimeProfileRedeemCodeAdminListInput,
) -> Result<Vec<RuntimeProfileRedeemCodeSnapshot>, String> {
let _validated_input = build_runtime_profile_redeem_code_admin_list_input(input.admin_user_id)
.map_err(|error| error.to_string())?;
let mut entries = ctx
.db
.profile_redeem_code()
.iter()
.map(|row| build_profile_redeem_code_snapshot_from_row(&row))
.collect::<Vec<_>>();
entries.sort_by(|left, right| {
right
.updated_at_micros
.cmp(&left.updated_at_micros)
.then_with(|| left.code.cmp(&right.code))
});
Ok(entries)
}
fn admin_list_profile_invite_code_records(
ctx: &ReducerContext,
input: RuntimeProfileInviteCodeAdminListInput,
) -> Result<Vec<RuntimeProfileInviteCodeSnapshot>, String> {
let _validated_input = build_runtime_profile_invite_code_admin_list_input(input.admin_user_id)
.map_err(|error| error.to_string())?;
let mut entries = ctx
.db
.profile_invite_code()
.iter()
.filter(|row| is_admin_profile_invite_code_user_id(&row.user_id))
.map(|row| build_profile_invite_code_snapshot_from_row(&row))
.collect::<Vec<_>>();
entries.sort_by(|left, right| {
right
.updated_at_micros
.cmp(&left.updated_at_micros)
.then_with(|| left.invite_code.cmp(&right.invite_code))
});
Ok(entries)
}
fn upsert_profile_task_config_record(
ctx: &ReducerContext,
input: RuntimeProfileTaskConfigAdminUpsertInput,
) -> Result<RuntimeProfileTaskConfigSnapshot, String> {
let validated_input = build_runtime_profile_task_config_admin_upsert_input(
input.admin_user_id,
input.task_id,
input.title,
input.description,
input.event_key,
input.cycle,
input.scope_kind,
input.threshold,
input.reward_points,
input.enabled,
input.sort_order,
input.updated_at_micros,
)
.map_err(|error| error.to_string())?;
let updated_at = Timestamp::from_micros_since_unix_epoch(validated_input.updated_at_micros);
let existing = ctx
.db
.profile_task_config()
.task_id()
.find(&validated_input.task_id);
if let Some(row) = existing.as_ref() {
ctx.db.profile_task_config().task_id().delete(&row.task_id);
}
let inserted = ctx.db.profile_task_config().insert(ProfileTaskConfig {
task_id: validated_input.task_id,
title: validated_input.title,
description: validated_input.description,
event_key: validated_input.event_key,
cycle: validated_input.cycle,
scope_kind: validated_input.scope_kind,
threshold: validated_input.threshold,
reward_points: validated_input.reward_points,
enabled: validated_input.enabled,
sort_order: validated_input.sort_order,
created_by: existing
.as_ref()
.map(|row| row.created_by.clone())
.unwrap_or_else(|| validated_input.admin_user_id.clone()),
created_at: existing
.as_ref()
.map(|row| row.created_at)
.unwrap_or(updated_at),
updated_by: validated_input.admin_user_id,
updated_at,
});
Ok(build_profile_task_config_snapshot_from_row(&inserted))
}
fn disable_profile_task_config_record(
ctx: &ReducerContext,
input: RuntimeProfileTaskConfigAdminDisableInput,
) -> Result<RuntimeProfileTaskConfigSnapshot, String> {
let validated_input = build_runtime_profile_task_config_admin_disable_input(
input.admin_user_id,
input.task_id,
input.updated_at_micros,
)
.map_err(|error| error.to_string())?;
let row = ctx
.db
.profile_task_config()
.task_id()
.find(&validated_input.task_id)
.ok_or_else(|| RuntimeProfileFieldError::MissingTaskId.to_string())?;
let updated_at = Timestamp::from_micros_since_unix_epoch(validated_input.updated_at_micros);
ctx.db.profile_task_config().task_id().delete(&row.task_id);
let inserted = ctx.db.profile_task_config().insert(ProfileTaskConfig {
enabled: false,
updated_by: validated_input.admin_user_id,
updated_at,
..row
});
Ok(build_profile_task_config_snapshot_from_row(&inserted))
}
fn build_profile_task_center_snapshot(
ctx: &ReducerContext,
user_id: &str,
updated_at: Timestamp,
) -> RuntimeProfileTaskCenterSnapshot {
ensure_default_profile_task_config(ctx);
let day_key = runtime_profile_beijing_day_key(updated_at.to_micros_since_unix_epoch());
let mut configs = ctx.db.profile_task_config().iter().collect::<Vec<_>>();
configs.sort_by(|left, right| {
left.sort_order
.cmp(&right.sort_order)
.then_with(|| left.task_id.cmp(&right.task_id))
});
let tasks = configs
.into_iter()
.map(|config| {
let progress_count = profile_task_progress_count(ctx, user_id, &config);
refresh_profile_task_progress(ctx, user_id, &config, day_key);
let claim = ctx.db.profile_task_reward_claim().claim_id().find(
&build_runtime_profile_task_claim_id(user_id, &config.task_id, day_key),
);
RuntimeProfileTaskItemSnapshot {
task_id: config.task_id,
title: config.title,
description: config.description,
event_key: config.event_key,
cycle: config.cycle,
threshold: config.threshold,
progress_count,
reward_points: config.reward_points,
status: resolve_runtime_profile_task_status(
config.enabled,
progress_count,
config.threshold,
claim.is_some(),
),
day_key,
claimed_at_micros: claim.map(|row| row.claimed_at.to_micros_since_unix_epoch()),
updated_at_micros: updated_at.to_micros_since_unix_epoch(),
}
})
.collect();
RuntimeProfileTaskCenterSnapshot {
user_id: user_id.to_string(),
day_key,
wallet_balance: profile_wallet_balance(ctx, user_id),
tasks,
updated_at_micros: updated_at.to_micros_since_unix_epoch(),
}
}
fn refresh_profile_task_progress(
ctx: &ReducerContext,
user_id: &str,
config: &ProfileTaskConfig,
day_key: i64,
) -> ProfileTaskProgress {
let progress_id = build_runtime_profile_task_progress_id(user_id, &config.task_id, day_key);
if let Some(existing) = ctx
.db
.profile_task_progress()
.progress_id()
.find(&progress_id)
{
ctx.db
.profile_task_progress()
.progress_id()
.delete(&existing.progress_id);
}
let progress_count = profile_task_progress_count(ctx, user_id, config);
let claimed = ctx
.db
.profile_task_reward_claim()
.claim_id()
.find(&build_runtime_profile_task_claim_id(
user_id,
&config.task_id,
day_key,
))
.is_some();
ctx.db.profile_task_progress().insert(ProfileTaskProgress {
progress_id,
user_id: user_id.to_string(),
task_id: config.task_id.clone(),
day_key,
progress_count,
threshold: config.threshold,
status: resolve_runtime_profile_task_status(
config.enabled,
progress_count,
config.threshold,
claimed,
),
updated_at: ctx.timestamp,
})
}
fn profile_task_progress_count(
ctx: &ReducerContext,
user_id: &str,
config: &ProfileTaskConfig,
) -> u32 {
let day_key = runtime_profile_beijing_day_key(ctx.timestamp.to_micros_since_unix_epoch());
let scope_id = profile_task_tracking_scope_id(user_id, config);
ctx.db
.tracking_daily_stat()
.stat_id()
.find(&build_runtime_tracking_daily_stat_id(
&config.event_key,
config.scope_kind,
&scope_id,
day_key,
))
.map(|row| row.count)
.unwrap_or(0)
}
fn profile_task_tracking_scope_id(user_id: &str, config: &ProfileTaskConfig) -> String {
match config.scope_kind {
RuntimeTrackingScopeKind::Site => PROFILE_TRACKING_SITE_SCOPE_ID.to_string(),
RuntimeTrackingScopeKind::Module => PROFILE_TRACKING_PROFILE_MODULE_KEY.to_string(),
RuntimeTrackingScopeKind::User => user_id.to_string(),
RuntimeTrackingScopeKind::Work => user_id.to_string(),
}
}
fn is_daily_login_task_config(config: &ProfileTaskConfig) -> bool {
config.task_id == PROFILE_TASK_ID_DAILY_LOGIN
&& config.event_key == PROFILE_TASK_EVENT_KEY_DAILY_LOGIN
&& config.scope_kind == RuntimeTrackingScopeKind::User
}
fn record_daily_login_tracking_event(ctx: &ReducerContext, user_id: &str) -> Result<(), String> {
let day_key = runtime_profile_beijing_day_key(ctx.timestamp.to_micros_since_unix_epoch());
let event_id = format!(
"{}:{}:{}",
PROFILE_TASK_LOGIN_EVENT_ID_PREFIX,
user_id.trim(),
day_key
);
if ctx.db.tracking_event().event_id().find(&event_id).is_some() {
return Ok(());
}
record_tracking_event(
ctx,
RuntimeTrackingEventInput {
event_id,
event_key: PROFILE_TASK_EVENT_KEY_DAILY_LOGIN.to_string(),
scope_kind: RuntimeTrackingScopeKind::User,
scope_id: user_id.to_string(),
user_id: Some(user_id.to_string()),
owner_user_id: None,
profile_id: None,
module_key: Some(PROFILE_TRACKING_PROFILE_MODULE_KEY.to_string()),
metadata_json: PROFILE_INVITE_CODE_METADATA_DEFAULT_JSON.to_string(),
occurred_at_micros: ctx.timestamp.to_micros_since_unix_epoch(),
},
)
}
fn record_tracking_event(
ctx: &ReducerContext,
input: RuntimeTrackingEventInput,
) -> Result<(), String> {
let validated_input = build_runtime_tracking_event_input(
input.event_id,
input.event_key,
input.scope_kind,
input.scope_id,
input.user_id,
input.owner_user_id,
input.profile_id,
input.module_key,
input.metadata_json,
input.occurred_at_micros,
)
.map_err(|error| error.to_string())?;
let occurred_at = Timestamp::from_micros_since_unix_epoch(validated_input.occurred_at_micros);
let day_key = runtime_profile_beijing_day_key(validated_input.occurred_at_micros);
ctx.db.tracking_event().insert(TrackingEvent {
event_id: validated_input.event_id,
event_key: validated_input.event_key.clone(),
scope_kind: validated_input.scope_kind,
scope_id: validated_input.scope_id.clone(),
day_key,
user_id: validated_input.user_id,
owner_user_id: validated_input.owner_user_id,
profile_id: validated_input.profile_id,
module_key: validated_input.module_key,
metadata_json: validated_input.metadata_json,
occurred_at,
});
upsert_tracking_daily_stat(
ctx,
&validated_input.event_key,
validated_input.scope_kind,
&validated_input.scope_id,
day_key,
occurred_at,
);
Ok(())
}
fn upsert_tracking_daily_stat(
ctx: &ReducerContext,
event_key: &str,
scope_kind: RuntimeTrackingScopeKind,
scope_id: &str,
day_key: i64,
occurred_at: Timestamp,
) {
let stat_id = build_runtime_tracking_daily_stat_id(event_key, scope_kind, scope_id, day_key);
let existing = ctx.db.tracking_daily_stat().stat_id().find(&stat_id);
if let Some(row) = existing {
ctx.db.tracking_daily_stat().stat_id().delete(&row.stat_id);
ctx.db.tracking_daily_stat().insert(TrackingDailyStat {
stat_id,
event_key: row.event_key,
scope_kind: row.scope_kind,
scope_id: row.scope_id,
day_key: row.day_key,
count: row.count.saturating_add(1),
first_occurred_at: row.first_occurred_at,
last_occurred_at: occurred_at,
updated_at: occurred_at,
});
} else {
ctx.db.tracking_daily_stat().insert(TrackingDailyStat {
stat_id,
event_key: event_key.to_string(),
scope_kind,
scope_id: scope_id.to_string(),
day_key,
count: 1,
first_occurred_at: occurred_at,
last_occurred_at: occurred_at,
updated_at: occurred_at,
});
}
}
fn ensure_default_profile_task_config(ctx: &ReducerContext) -> ProfileTaskConfig {
if let Some(row) = ctx
.db
.profile_task_config()
.task_id()
.find(&PROFILE_TASK_ID_DAILY_LOGIN.to_string())
{
return row;
}
let default_config = build_default_runtime_profile_task_config(
ctx.timestamp.to_micros_since_unix_epoch(),
PROFILE_TASK_SYSTEM_USER_ID.to_string(),
);
ctx.db.profile_task_config().insert(ProfileTaskConfig {
task_id: default_config.task_id,
title: default_config.title,
description: default_config.description,
event_key: default_config.event_key,
cycle: default_config.cycle,
scope_kind: default_config.scope_kind,
threshold: default_config.threshold,
reward_points: default_config.reward_points,
enabled: default_config.enabled,
sort_order: default_config.sort_order,
created_by: default_config.created_by,
created_at: ctx.timestamp,
updated_by: default_config.updated_by,
updated_at: ctx.timestamp,
})
}
fn build_profile_membership_snapshot(
ctx: &ReducerContext,
user_id: &str,
@@ -2485,6 +3260,27 @@ fn build_profile_wallet_ledger_snapshot_from_row(
}
}
fn build_profile_task_config_snapshot_from_row(
row: &ProfileTaskConfig,
) -> RuntimeProfileTaskConfigSnapshot {
RuntimeProfileTaskConfigSnapshot {
task_id: row.task_id.clone(),
title: row.title.clone(),
description: row.description.clone(),
event_key: row.event_key.clone(),
cycle: row.cycle,
scope_kind: row.scope_kind,
threshold: row.threshold,
reward_points: row.reward_points,
enabled: row.enabled,
sort_order: row.sort_order,
created_by: row.created_by.clone(),
created_at_micros: row.created_at.to_micros_since_unix_epoch(),
updated_by: row.updated_by.clone(),
updated_at_micros: row.updated_at.to_micros_since_unix_epoch(),
}
}
fn build_profile_recharge_order_snapshot_from_row(
row: &ProfileRechargeOrder,
) -> RuntimeProfileRechargeOrderSnapshot {