拆分大文件

This commit is contained in:
2026-04-23 23:38:00 +08:00
parent 53a9cdd791
commit 8df502b2a7
506 changed files with 11312 additions and 13069 deletions

View File

@@ -55,6 +55,61 @@
- `cancel_ai_task_and_return`
18. `turn_in_quest` / `resolve_combat_action(Victory)` 与 `player_progression / chapter_progression` 的最小经验联动
### 2.0.1 `runtime` 域拆分进度
截至 `2026-04-23`,`runtime` 域已完成第一轮真实内容拆分,根入口不再保留该域的业务 helper 实现。
当前 `src/runtime/` 的实际落位如下:
1. `src/runtime/settings.rs`
- `runtime_setting`
- setting 读取 / upsert procedure 与快照 helper
2. `src/runtime/snapshots.rs`
- `runtime_snapshot`
- snapshot 读取 / upsert / delete helper
3. `src/runtime/browse_history.rs`
- `user_browse_history`
- 浏览历史 list / upsert / clear procedure 与行转换 helper
4. `src/runtime/profile.rs`
- `profile_dashboard_state`
- `profile_wallet_ledger`
- `profile_played_world`
- `profile_save_archive`
- profile dashboard / ledger / play stats / save archive 投影与同步 helper
`src/runtime/mod.rs` 当前只承担聚合职责:
1. 声明 `settings / snapshots / browse_history / profile`
2. 对外统一使用 `pub use xxx::*;` 重新导出
后续新增 runtime 相关 table / reducer / procedure / helper 时,必须直接落到上述二级文件,禁止回写到 `src/lib.rs`
### 2.0.2 `ai` 域拆分进度
截至 `2026-04-23`,`ai` 域也已完成第一轮真实内容拆分,根入口不再保留 `ai_task / ai_task_stage / ai_text_chunk / ai_result_reference` 的业务实现。
当前 `src/ai/` 的实际落位如下:
1. `src/ai/tasks.rs`
- `ai_task`
- task 创建、启动、完成、失败、取消的 reducer / procedure
- task 状态迁移与持久化 helper
2. `src/ai/stages.rs`
- `ai_task_stage`
- `ai_text_chunk`
- `ai_result_reference`
- stage 启动、chunk 追加、stage 完成、result reference 绑定的 procedure / helper
3. `src/ai/snapshots.rs`
- AI 任务、阶段、chunk、reference 的 row / snapshot 转换 helper
`src/ai/mod.rs` 当前只承担聚合职责:
1. 声明 `tasks / stages / snapshots`
2. 对外统一使用 `pub use xxx::*;`
3. 对内部共享的 row / snapshot helper 使用 `pub(crate) use snapshots::*;`
后续新增 AI 相关 table / reducer / procedure / helper 时,必须直接落到上述二级文件,禁止回写到 `src/lib.rs`
## 2.1 `src/lib.rs` 拆分路由规则
`2026-04-23` 起,`src/lib.rs` 不再允许继续承载具体业务域的 table / reducer / procedure / tx helper。

View File

@@ -1,753 +1,7 @@
/// Truth-table row for an AI task.
///
/// Keyed by `task_id`; secondary btree indexes cover the owner / status /
/// kind lookup paths used by the reducers and procedures in this module.
#[spacetimedb::table(
    accessor = ai_task,
    index(accessor = by_ai_task_owner_user_id, btree(columns = [owner_user_id])),
    index(accessor = by_ai_task_status, btree(columns = [status])),
    index(accessor = by_ai_task_kind, btree(columns = [task_kind]))
)]
pub struct AiTask {
    #[primary_key]
    task_id: String,
    task_kind: AiTaskKind,
    owner_user_id: String,
    request_label: String,
    source_module: String,
    source_entity_id: Option<String>,
    request_payload_json: Option<String>,
    status: AiTaskStatus,
    failure_message: Option<String>,
    // Latest aggregated outputs, refreshed on chunk append / stage completion.
    latest_text_output: Option<String>,
    latest_structured_payload_json: Option<String>,
    // Incremented by 1 on every persisted state change (see the `*_tx` helpers).
    version: u32,
    created_at: Timestamp,
    started_at: Option<Timestamp>,
    completed_at: Option<Timestamp>,
    updated_at: Timestamp,
}
// Post-split submodule layout: tasks / stages / snapshots own the real
// implementations; this file only aggregates and re-exports them.
mod snapshots;
mod stages;
mod tasks;
/// One pipeline stage of an AI task (ordered by `stage_order` within a task).
#[spacetimedb::table(
    accessor = ai_task_stage,
    index(accessor = by_ai_task_stage_task_id, btree(columns = [task_id])),
    index(accessor = by_ai_task_stage_task_order, btree(columns = [task_id, stage_order]))
)]
pub struct AiTaskStage {
    #[primary_key]
    task_stage_id: String,
    task_id: String,
    stage_kind: AiTaskStageKind,
    label: String,
    detail: String,
    stage_order: u32,
    status: AiTaskStageStatus,
    // Aggregated text of all chunks for this stage, or the final completion text.
    text_output: Option<String>,
    structured_payload_json: Option<String>,
    warning_messages: Vec<String>,
    started_at: Option<Timestamp>,
    completed_at: Option<Timestamp>,
}
/// A single streamed text delta for one stage of a task; ordered per
/// (task, stage) by the monotonically increasing `sequence`.
#[spacetimedb::table(
    accessor = ai_text_chunk,
    index(accessor = by_ai_text_chunk_task_id, btree(columns = [task_id])),
    index(
        accessor = by_ai_text_chunk_task_stage_sequence,
        btree(columns = [task_id, stage_kind, sequence])
    )
)]
pub struct AiTextChunk {
    #[primary_key]
    text_chunk_row_id: String,
    chunk_id: String,
    task_id: String,
    stage_kind: AiTaskStageKind,
    sequence: u32,
    delta_text: String,
    created_at: Timestamp,
}
/// Link from a task to an entity produced by it (kind + external id + label).
#[spacetimedb::table(
    accessor = ai_result_reference,
    index(accessor = by_ai_result_reference_task_id, btree(columns = [task_id]))
)]
pub struct AiResultReference {
    #[primary_key]
    result_reference_row_id: String,
    result_ref_id: String,
    task_id: String,
    reference_kind: AiResultReferenceKind,
    reference_id: String,
    label: Option<String>,
    created_at: Timestamp,
}
// AI tasks are kept as a private truth table for now; Axum / platform-llm will
// later wrap an HTTP + SSE protocol layer around them.
/// Reducer: create a new AI task; the snapshot produced by the tx helper is
/// discarded because reducers return no payload.
#[spacetimedb::reducer]
pub fn create_ai_task(ctx: &ReducerContext, input: AiTaskCreateInput) -> Result<(), String> {
    create_ai_task_tx(ctx, input).map(|_| ())
}
/// Procedure twin of `create_ai_task`: runs `create_ai_task_tx` and folds the
/// `Result` into an `AiTaskProcedureResult` envelope for RPC callers.
#[spacetimedb::procedure]
pub fn create_ai_task_and_return(
    ctx: &mut ProcedureContext,
    input: AiTaskCreateInput,
) -> AiTaskProcedureResult {
    match ctx.try_with_tx(|tx| create_ai_task_tx(tx, input.clone())) {
        Ok(task) => AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: None,
            error_message: None,
        },
        Err(message) => AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        },
    }
}
/// Reducer: mark an AI task as started. The snapshot returned by the tx
/// helper is deliberately dropped — reducers carry no payload.
#[spacetimedb::reducer]
pub fn start_ai_task(ctx: &ReducerContext, input: AiTaskStartInput) -> Result<(), String> {
    let _snapshot = start_ai_task_tx(ctx, input)?;
    Ok(())
}
/// Reducer: mark one stage of a task as started (snapshot discarded).
#[spacetimedb::reducer]
pub fn start_ai_task_stage(
    ctx: &ReducerContext,
    input: AiTaskStageStartInput,
) -> Result<(), String> {
    start_ai_task_stage_tx(ctx, input).map(|_| ())
}
// Streaming appends must synchronously return both the chunk and the
// aggregated task snapshot so the later Axum facade can reuse them directly.
/// Procedure: append a streamed text delta and return (task, chunk) on success.
#[spacetimedb::procedure]
pub fn append_ai_text_chunk_and_return(
    ctx: &mut ProcedureContext,
    input: AiTextChunkAppendInput,
) -> AiTaskProcedureResult {
    match ctx.try_with_tx(|tx| append_ai_text_chunk_tx(tx, input.clone())) {
        Ok((task, text_chunk)) => AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: Some(text_chunk),
            error_message: None,
        },
        Err(message) => AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        },
    }
}
/// Procedure wrapper around `complete_ai_stage_tx`: fold the `Result` into
/// the `AiTaskProcedureResult` envelope.
#[spacetimedb::procedure]
pub fn complete_ai_stage_and_return(
    ctx: &mut ProcedureContext,
    input: AiStageCompletionInput,
) -> AiTaskProcedureResult {
    match ctx.try_with_tx(|tx| complete_ai_stage_tx(tx, input.clone())) {
        Ok(task) => AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: None,
            error_message: None,
        },
        Err(message) => AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        },
    }
}
/// Procedure wrapper around `attach_ai_result_reference_tx`: fold the
/// `Result` into the `AiTaskProcedureResult` envelope.
#[spacetimedb::procedure]
pub fn attach_ai_result_reference_and_return(
    ctx: &mut ProcedureContext,
    input: AiResultReferenceInput,
) -> AiTaskProcedureResult {
    match ctx.try_with_tx(|tx| attach_ai_result_reference_tx(tx, input.clone())) {
        Ok(task) => AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: None,
            error_message: None,
        },
        Err(message) => AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        },
    }
}
/// Procedure wrapper around `complete_ai_task_tx`: fold the `Result` into
/// the `AiTaskProcedureResult` envelope.
#[spacetimedb::procedure]
pub fn complete_ai_task_and_return(
    ctx: &mut ProcedureContext,
    input: AiTaskFinishInput,
) -> AiTaskProcedureResult {
    match ctx.try_with_tx(|tx| complete_ai_task_tx(tx, input.clone())) {
        Ok(task) => AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: None,
            error_message: None,
        },
        Err(message) => AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        },
    }
}
/// Procedure wrapper around `fail_ai_task_tx`: fold the `Result` into the
/// `AiTaskProcedureResult` envelope.
#[spacetimedb::procedure]
pub fn fail_ai_task_and_return(
    ctx: &mut ProcedureContext,
    input: AiTaskFailureInput,
) -> AiTaskProcedureResult {
    match ctx.try_with_tx(|tx| fail_ai_task_tx(tx, input.clone())) {
        Ok(task) => AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: None,
            error_message: None,
        },
        Err(message) => AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        },
    }
}
/// Procedure wrapper around `cancel_ai_task_tx`: fold the `Result` into the
/// `AiTaskProcedureResult` envelope.
#[spacetimedb::procedure]
pub fn cancel_ai_task_and_return(
    ctx: &mut ProcedureContext,
    input: AiTaskCancelInput,
) -> AiTaskProcedureResult {
    match ctx.try_with_tx(|tx| cancel_ai_task_tx(tx, input.clone())) {
        Ok(task) => AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: None,
            error_message: None,
        },
        Err(message) => AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        },
    }
}
/// Tx helper: validate the create input, reject duplicate ids, insert the
/// task row plus its stage rows, then re-read the persisted snapshot.
///
/// Fix: the duplicate check now uses the *trimmed* `task_id`. The snapshot
/// builder (`build_ai_task_snapshot_from_create_input`) stores the trimmed
/// id, so checking the raw id allowed a whitespace-padded duplicate to slip
/// past the guard and collide on insert.
/// (Assumes `validate_task_create_input` does not already canonicalize the
/// id — TODO confirm against its implementation.)
fn create_ai_task_tx(
    ctx: &ReducerContext,
    input: AiTaskCreateInput,
) -> Result<AiTaskSnapshot, String> {
    validate_task_create_input(&input).map_err(|error| error.to_string())?;
    // Look up with the same canonical form that will be persisted below.
    let task_id = input.task_id.trim().to_string();
    if ctx.db.ai_task().task_id().find(&task_id).is_some() {
        return Err("ai_task.task_id 已存在".to_string());
    }
    let task_snapshot = build_ai_task_snapshot_from_create_input(&input);
    ctx.db.ai_task().insert(build_ai_task_row(&task_snapshot));
    replace_ai_task_stages(ctx, &task_snapshot.task_id, &task_snapshot.stages);
    // Re-read so the returned snapshot reflects exactly what was persisted.
    get_ai_task_snapshot_tx(ctx, &task_snapshot.task_id)
}
/// Tx helper: transition a task to `Running`, stamping `started_at` only on
/// the first start, then bump version / updated_at and persist.
fn start_ai_task_tx(
    ctx: &ReducerContext,
    input: AiTaskStartInput,
) -> Result<AiTaskSnapshot, String> {
    let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    // Terminal tasks (completed/failed/cancelled) cannot be restarted.
    ensure_ai_task_can_transition(snapshot.status)?;
    snapshot.status = AiTaskStatus::Running;
    if snapshot.started_at_micros.is_none() {
        snapshot.started_at_micros = Some(input.started_at_micros);
    }
    snapshot.updated_at_micros = input.started_at_micros;
    snapshot.version += 1;
    persist_ai_task_snapshot(ctx, &snapshot)?;
    Ok(snapshot)
}
/// Tx helper: mark one stage `Running` (also forcing the whole task to
/// `Running`); first-start timestamps are only written once.
fn start_ai_task_stage_tx(
    ctx: &ReducerContext,
    input: AiTaskStageStartInput,
) -> Result<AiTaskSnapshot, String> {
    let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(snapshot.status)?;
    // Stages are addressed by kind, not by id, within a task.
    let stage = snapshot
        .stages
        .iter_mut()
        .find(|stage| stage.stage_kind == input.stage_kind)
        .ok_or_else(|| "ai_task.stage 不存在".to_string())?;
    snapshot.status = AiTaskStatus::Running;
    if snapshot.started_at_micros.is_none() {
        snapshot.started_at_micros = Some(input.started_at_micros);
    }
    stage.status = AiTaskStageStatus::Running;
    if stage.started_at_micros.is_none() {
        stage.started_at_micros = Some(input.started_at_micros);
    }
    snapshot.updated_at_micros = input.started_at_micros;
    snapshot.version += 1;
    persist_ai_task_snapshot(ctx, &snapshot)?;
    Ok(snapshot)
}
/// Tx helper: validate and insert one streamed text chunk, then refresh the
/// stage's aggregated text and the task's `latest_text_output`.
///
/// The chunk row is inserted *before* the aggregation query so that
/// `collect_ai_stage_text_output` sees the new delta — the statement order
/// here is load-bearing.
fn append_ai_text_chunk_tx(
    ctx: &ReducerContext,
    input: AiTextChunkAppendInput,
) -> Result<(AiTaskSnapshot, AiTextChunkSnapshot), String> {
    if input.delta_text.trim().is_empty() {
        return Err("ai_text_chunk.delta_text 不能为空".to_string());
    }
    // Sequences are 1-based; 0 is reserved as invalid.
    if input.sequence == 0 {
        return Err("ai_text_chunk.sequence 必须大于 0".to_string());
    }
    let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(snapshot.status)?;
    let stage = snapshot
        .stages
        .iter_mut()
        .find(|stage| stage.stage_kind == input.stage_kind)
        .ok_or_else(|| "ai_task.stage 不存在".to_string())?;
    let chunk = AiTextChunkSnapshot {
        chunk_id: generate_ai_text_chunk_id(input.created_at_micros, input.sequence),
        task_id: input.task_id.trim().to_string(),
        stage_kind: input.stage_kind,
        sequence: input.sequence,
        // Stored deltas are trimmed, matching the emptiness check above.
        delta_text: input.delta_text.trim().to_string(),
        created_at_micros: input.created_at_micros,
    };
    ctx.db
        .ai_text_chunk()
        .insert(build_ai_text_chunk_row(&chunk));
    let aggregated_text = collect_ai_stage_text_output(ctx, &chunk.task_id, chunk.stage_kind);
    // Receiving a chunk implies the task/stage are running; stamp first-start once.
    snapshot.status = AiTaskStatus::Running;
    if snapshot.started_at_micros.is_none() {
        snapshot.started_at_micros = Some(input.created_at_micros);
    }
    stage.status = AiTaskStageStatus::Running;
    if stage.started_at_micros.is_none() {
        stage.started_at_micros = Some(input.created_at_micros);
    }
    stage.text_output = aggregated_text.clone();
    snapshot.latest_text_output = aggregated_text;
    snapshot.updated_at_micros = input.created_at_micros;
    snapshot.version += 1;
    persist_ai_task_snapshot(ctx, &snapshot)?;
    Ok((snapshot, chunk))
}
/// Tx helper: mark one stage `Completed`, record its final outputs /
/// warnings, and mirror them into the task-level `latest_*` fields.
fn complete_ai_stage_tx(
    ctx: &ReducerContext,
    input: AiStageCompletionInput,
) -> Result<AiTaskSnapshot, String> {
    let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(snapshot.status)?;
    let stage = snapshot
        .stages
        .iter_mut()
        .find(|stage| stage.stage_kind == input.stage_kind)
        .ok_or_else(|| "ai_task.stage 不存在".to_string())?;
    stage.status = AiTaskStageStatus::Completed;
    stage.completed_at_micros = Some(input.completed_at_micros);
    // Completion outputs replace any chunk-aggregated interim text.
    stage.text_output = normalize_optional_text(input.text_output.clone());
    stage.structured_payload_json = normalize_optional_text(input.structured_payload_json.clone());
    stage.warning_messages = normalize_string_list(input.warning_messages.clone());
    snapshot.latest_text_output = stage.text_output.clone();
    snapshot.latest_structured_payload_json = stage.structured_payload_json.clone();
    snapshot.updated_at_micros = input.completed_at_micros;
    snapshot.version += 1;
    persist_ai_task_snapshot(ctx, &snapshot)?;
    Ok(snapshot)
}
/// Tx helper: attach a result-reference row to a still-active task and
/// append it to the snapshot's `result_references`.
fn attach_ai_result_reference_tx(
    ctx: &ReducerContext,
    input: AiResultReferenceInput,
) -> Result<AiTaskSnapshot, String> {
    let reference_id = input.reference_id.trim().to_string();
    if reference_id.is_empty() {
        return Err("ai_result_reference.reference_id 不能为空".to_string());
    }
    let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(snapshot.status)?;
    let reference = AiResultReferenceSnapshot {
        // Ref id is derived from the creation timestamp by the helper.
        result_ref_id: generate_ai_result_ref_id(input.created_at_micros),
        task_id: input.task_id.trim().to_string(),
        reference_kind: input.reference_kind,
        reference_id,
        label: normalize_optional_text(input.label),
        created_at_micros: input.created_at_micros,
    };
    ctx.db
        .ai_result_reference()
        .insert(build_ai_result_reference_row(&reference));
    snapshot.result_references.push(reference);
    snapshot.updated_at_micros = input.created_at_micros;
    snapshot.version += 1;
    persist_ai_task_snapshot(ctx, &snapshot)?;
    Ok(snapshot)
}
/// Tx helper: move a task to the terminal `Completed` state.
fn complete_ai_task_tx(
    ctx: &ReducerContext,
    input: AiTaskFinishInput,
) -> Result<AiTaskSnapshot, String> {
    let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    // Already-terminal tasks are rejected here.
    ensure_ai_task_can_transition(snapshot.status)?;
    snapshot.status = AiTaskStatus::Completed;
    snapshot.completed_at_micros = Some(input.completed_at_micros);
    snapshot.updated_at_micros = input.completed_at_micros;
    snapshot.version += 1;
    persist_ai_task_snapshot(ctx, &snapshot)?;
    Ok(snapshot)
}
/// Tx helper: move a task to the terminal `Failed` state; a non-empty
/// failure message is mandatory.
fn fail_ai_task_tx(
    ctx: &ReducerContext,
    input: AiTaskFailureInput,
) -> Result<AiTaskSnapshot, String> {
    let failure_message = input.failure_message.trim().to_string();
    if failure_message.is_empty() {
        return Err("ai_task.failure_message 不能为空".to_string());
    }
    let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(snapshot.status)?;
    snapshot.status = AiTaskStatus::Failed;
    snapshot.failure_message = Some(failure_message);
    snapshot.completed_at_micros = Some(input.completed_at_micros);
    snapshot.updated_at_micros = input.completed_at_micros;
    snapshot.version += 1;
    persist_ai_task_snapshot(ctx, &snapshot)?;
    Ok(snapshot)
}
/// Tx helper: move a task to the terminal `Cancelled` state.
fn cancel_ai_task_tx(
    ctx: &ReducerContext,
    input: AiTaskCancelInput,
) -> Result<AiTaskSnapshot, String> {
    let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(snapshot.status)?;
    snapshot.status = AiTaskStatus::Cancelled;
    snapshot.completed_at_micros = Some(input.completed_at_micros);
    snapshot.updated_at_micros = input.completed_at_micros;
    snapshot.version += 1;
    persist_ai_task_snapshot(ctx, &snapshot)?;
    Ok(snapshot)
}
/// Tx helper: look up a task row by (trimmed) id and expand it into a full
/// snapshot including stages and result references.
fn get_ai_task_snapshot_tx(ctx: &ReducerContext, task_id: &str) -> Result<AiTaskSnapshot, String> {
    let row = ctx
        .db
        .ai_task()
        .task_id()
        // Ids are stored trimmed, so the lookup key is trimmed too.
        .find(&task_id.trim().to_string())
        .ok_or_else(|| "ai_task 不存在".to_string())?;
    Ok(build_ai_task_snapshot_from_row(ctx, &row))
}
/// Tx helper: persist a snapshot by delete-then-insert of the task row and a
/// full replace of its stage rows. Always returns `Ok`; the `Result`
/// signature is kept for call-site uniformity with the other tx helpers.
fn persist_ai_task_snapshot(ctx: &ReducerContext, snapshot: &AiTaskSnapshot) -> Result<(), String> {
    ctx.db.ai_task().task_id().delete(&snapshot.task_id);
    ctx.db.ai_task().insert(build_ai_task_row(snapshot));
    replace_ai_task_stages(ctx, &snapshot.task_id, &snapshot.stages);
    Ok(())
}
/// Tx helper: delete every stage row belonging to `task_id`, then re-insert
/// the given stage snapshots. Ids are collected first so deletion does not
/// happen while iterating the table.
fn replace_ai_task_stages(ctx: &ReducerContext, task_id: &str, stages: &[AiTaskStageSnapshot]) {
    let stage_ids = ctx
        .db
        .ai_task_stage()
        .iter()
        .filter(|row| row.task_id == task_id)
        .map(|row| row.task_stage_id.clone())
        .collect::<Vec<_>>();
    for stage_id in stage_ids {
        ctx.db.ai_task_stage().task_stage_id().delete(&stage_id);
    }
    for stage in stages {
        ctx.db
            .ai_task_stage()
            .insert(build_ai_task_stage_row(task_id, stage));
    }
}
/// Gather every chunk for (task, stage), order them by `sequence`, and
/// concatenate their deltas. Returns `None` when the concatenation is blank.
fn collect_ai_stage_text_output(
    ctx: &ReducerContext,
    task_id: &str,
    stage_kind: AiTaskStageKind,
) -> Option<String> {
    let mut chunks = Vec::new();
    for row in ctx.db.ai_text_chunk().iter() {
        if row.task_id == task_id && row.stage_kind == stage_kind {
            chunks.push(build_ai_text_chunk_snapshot_from_row(&row));
        }
    }
    // Chunks may arrive out of order; sequence defines the text order.
    chunks.sort_by_key(|chunk| chunk.sequence);
    let mut aggregated = String::new();
    for chunk in chunks {
        aggregated.push_str(&chunk.delta_text);
    }
    if aggregated.trim().is_empty() {
        None
    } else {
        Some(aggregated)
    }
}
/// Guard: terminal statuses (completed / failed / cancelled) admit no
/// further transitions; every other status is allowed.
fn ensure_ai_task_can_transition(status: AiTaskStatus) -> Result<(), String> {
    match status {
        AiTaskStatus::Completed | AiTaskStatus::Failed | AiTaskStatus::Cancelled => {
            Err("当前 ai_task 状态不允许执行该操作".to_string())
        }
        // Non-terminal statuses (e.g. pending / running) may transition freely.
        _ => Ok(()),
    }
}
/// Build the initial snapshot for a freshly created task: trimmed fields,
/// `Pending` status everywhere, no outputs, version at its initial value.
fn build_ai_task_snapshot_from_create_input(input: &AiTaskCreateInput) -> AiTaskSnapshot {
    AiTaskSnapshot {
        task_id: input.task_id.trim().to_string(),
        task_kind: input.task_kind,
        owner_user_id: input.owner_user_id.trim().to_string(),
        request_label: input.request_label.trim().to_string(),
        source_module: input.source_module.trim().to_string(),
        source_entity_id: normalize_optional_text(input.source_entity_id.clone()),
        request_payload_json: normalize_optional_text(input.request_payload_json.clone()),
        status: AiTaskStatus::Pending,
        failure_message: None,
        // Every declared stage starts out pending with empty outputs.
        stages: input
            .stages
            .iter()
            .map(|stage| AiTaskStageSnapshot {
                stage_kind: stage.stage_kind,
                label: stage.label.trim().to_string(),
                detail: stage.detail.trim().to_string(),
                order: stage.order,
                status: AiTaskStageStatus::Pending,
                text_output: None,
                structured_payload_json: None,
                warning_messages: Vec::new(),
                started_at_micros: None,
                completed_at_micros: None,
            })
            .collect(),
        result_references: Vec::new(),
        latest_text_output: None,
        latest_structured_payload_json: None,
        version: INITIAL_AI_TASK_VERSION,
        created_at_micros: input.created_at_micros,
        started_at_micros: None,
        completed_at_micros: None,
        updated_at_micros: input.created_at_micros,
    }
}
/// Convert an `AiTaskSnapshot` into its persisted `AiTask` row
/// (micros-since-epoch fields become `Timestamp`s).
fn build_ai_task_row(snapshot: &AiTaskSnapshot) -> AiTask {
    AiTask {
        task_id: snapshot.task_id.clone(),
        task_kind: snapshot.task_kind,
        owner_user_id: snapshot.owner_user_id.clone(),
        request_label: snapshot.request_label.clone(),
        source_module: snapshot.source_module.clone(),
        source_entity_id: snapshot.source_entity_id.clone(),
        request_payload_json: snapshot.request_payload_json.clone(),
        status: snapshot.status,
        failure_message: snapshot.failure_message.clone(),
        latest_text_output: snapshot.latest_text_output.clone(),
        latest_structured_payload_json: snapshot.latest_structured_payload_json.clone(),
        version: snapshot.version,
        created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros),
        started_at: snapshot
            .started_at_micros
            .map(Timestamp::from_micros_since_unix_epoch),
        completed_at: snapshot
            .completed_at_micros
            .map(Timestamp::from_micros_since_unix_epoch),
        updated_at: Timestamp::from_micros_since_unix_epoch(snapshot.updated_at_micros),
    }
}
/// Rebuild a full `AiTaskSnapshot` from a task row plus its stage and
/// result-reference rows; stages are sorted by `order`, references by
/// creation time.
fn build_ai_task_snapshot_from_row(ctx: &ReducerContext, row: &AiTask) -> AiTaskSnapshot {
    let mut stages = ctx
        .db
        .ai_task_stage()
        .iter()
        .filter(|stage| stage.task_id == row.task_id)
        .map(|stage| build_ai_task_stage_snapshot_from_row(&stage))
        .collect::<Vec<_>>();
    stages.sort_by_key(|stage| stage.order);
    let mut result_references = ctx
        .db
        .ai_result_reference()
        .iter()
        .filter(|reference| reference.task_id == row.task_id)
        .map(|reference| build_ai_result_reference_snapshot_from_row(&reference))
        .collect::<Vec<_>>();
    result_references.sort_by_key(|reference| reference.created_at_micros);
    AiTaskSnapshot {
        task_id: row.task_id.clone(),
        task_kind: row.task_kind,
        owner_user_id: row.owner_user_id.clone(),
        request_label: row.request_label.clone(),
        source_module: row.source_module.clone(),
        source_entity_id: row.source_entity_id.clone(),
        request_payload_json: row.request_payload_json.clone(),
        status: row.status,
        failure_message: row.failure_message.clone(),
        stages,
        result_references,
        latest_text_output: row.latest_text_output.clone(),
        latest_structured_payload_json: row.latest_structured_payload_json.clone(),
        version: row.version,
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
        started_at_micros: row
            .started_at
            .map(|value| value.to_micros_since_unix_epoch()),
        completed_at_micros: row
            .completed_at
            .map(|value| value.to_micros_since_unix_epoch()),
        updated_at_micros: row.updated_at.to_micros_since_unix_epoch(),
    }
}
/// Convert a stage snapshot into an `AiTaskStage` row; the row id is derived
/// deterministically from the task id and stage kind.
fn build_ai_task_stage_row(task_id: &str, snapshot: &AiTaskStageSnapshot) -> AiTaskStage {
    AiTaskStage {
        task_stage_id: generate_ai_task_stage_id(task_id, snapshot.stage_kind),
        task_id: task_id.to_string(),
        stage_kind: snapshot.stage_kind,
        label: snapshot.label.clone(),
        detail: snapshot.detail.clone(),
        stage_order: snapshot.order,
        status: snapshot.status,
        text_output: snapshot.text_output.clone(),
        structured_payload_json: snapshot.structured_payload_json.clone(),
        warning_messages: snapshot.warning_messages.clone(),
        started_at: snapshot
            .started_at_micros
            .map(Timestamp::from_micros_since_unix_epoch),
        completed_at: snapshot
            .completed_at_micros
            .map(Timestamp::from_micros_since_unix_epoch),
    }
}
/// Inverse of `build_ai_task_stage_row`: row → snapshot, with `Timestamp`s
/// converted back to micros-since-epoch.
fn build_ai_task_stage_snapshot_from_row(row: &AiTaskStage) -> AiTaskStageSnapshot {
    AiTaskStageSnapshot {
        stage_kind: row.stage_kind,
        label: row.label.clone(),
        detail: row.detail.clone(),
        order: row.stage_order,
        status: row.status,
        text_output: row.text_output.clone(),
        structured_payload_json: row.structured_payload_json.clone(),
        warning_messages: row.warning_messages.clone(),
        started_at_micros: row
            .started_at
            .map(|value| value.to_micros_since_unix_epoch()),
        completed_at_micros: row
            .completed_at
            .map(|value| value.to_micros_since_unix_epoch()),
    }
}
/// Convert a chunk snapshot into its row; the primary key is composed as
/// prefix + task id + stage kind + sequence, so one row exists per
/// (task, stage, sequence) triple.
fn build_ai_text_chunk_row(snapshot: &AiTextChunkSnapshot) -> AiTextChunk {
    AiTextChunk {
        text_chunk_row_id: format!(
            "{}{}_{}_{}",
            AI_TEXT_CHUNK_ID_PREFIX,
            snapshot.task_id,
            snapshot.stage_kind.as_str(),
            snapshot.sequence
        ),
        chunk_id: snapshot.chunk_id.clone(),
        task_id: snapshot.task_id.clone(),
        stage_kind: snapshot.stage_kind,
        sequence: snapshot.sequence,
        delta_text: snapshot.delta_text.clone(),
        created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros),
    }
}
/// Inverse of `build_ai_text_chunk_row`: row → snapshot (the composed
/// `text_chunk_row_id` is dropped, only the public `chunk_id` is kept).
fn build_ai_text_chunk_snapshot_from_row(row: &AiTextChunk) -> AiTextChunkSnapshot {
    AiTextChunkSnapshot {
        chunk_id: row.chunk_id.clone(),
        task_id: row.task_id.clone(),
        stage_kind: row.stage_kind,
        sequence: row.sequence,
        delta_text: row.delta_text.clone(),
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
    }
}
/// Convert a result-reference snapshot into its row; the primary key is
/// composed as prefix + task id + result_ref_id.
fn build_ai_result_reference_row(snapshot: &AiResultReferenceSnapshot) -> AiResultReference {
    AiResultReference {
        result_reference_row_id: format!(
            "{}{}_{}",
            AI_RESULT_REF_ID_PREFIX, snapshot.task_id, snapshot.result_ref_id
        ),
        result_ref_id: snapshot.result_ref_id.clone(),
        task_id: snapshot.task_id.clone(),
        reference_kind: snapshot.reference_kind,
        reference_id: snapshot.reference_id.clone(),
        label: snapshot.label.clone(),
        created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros),
    }
}
/// Inverse of `build_ai_result_reference_row`: row → snapshot.
fn build_ai_result_reference_snapshot_from_row(
    row: &AiResultReference,
) -> AiResultReferenceSnapshot {
    AiResultReferenceSnapshot {
        result_ref_id: row.result_ref_id.clone(),
        task_id: row.task_id.clone(),
        reference_kind: row.reference_kind,
        reference_id: row.reference_id.clone(),
        label: row.label.clone(),
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
    }
}
// Re-export the split submodules so existing `crate::…` call sites keep
// working; the row/snapshot conversion helpers stay crate-private.
pub(crate) use snapshots::*;
pub use stages::*;
pub use tasks::*;

View File

@@ -1 +1,174 @@
// Home of the AI snapshot / row conversion helpers.
use crate::*;
use module_ai::{AI_RESULT_REF_ID_PREFIX, AI_TEXT_CHUNK_ID_PREFIX};
/// Convert an `AiTaskSnapshot` into its persisted `AiTask` row
/// (micros-since-epoch fields become `Timestamp`s).
pub(crate) fn build_ai_task_row(snapshot: &AiTaskSnapshot) -> AiTask {
    AiTask {
        task_id: snapshot.task_id.clone(),
        task_kind: snapshot.task_kind,
        owner_user_id: snapshot.owner_user_id.clone(),
        request_label: snapshot.request_label.clone(),
        source_module: snapshot.source_module.clone(),
        source_entity_id: snapshot.source_entity_id.clone(),
        request_payload_json: snapshot.request_payload_json.clone(),
        status: snapshot.status,
        failure_message: snapshot.failure_message.clone(),
        latest_text_output: snapshot.latest_text_output.clone(),
        latest_structured_payload_json: snapshot.latest_structured_payload_json.clone(),
        version: snapshot.version,
        created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros),
        started_at: snapshot
            .started_at_micros
            .map(Timestamp::from_micros_since_unix_epoch),
        completed_at: snapshot
            .completed_at_micros
            .map(Timestamp::from_micros_since_unix_epoch),
        updated_at: Timestamp::from_micros_since_unix_epoch(snapshot.updated_at_micros),
    }
}
/// Rebuild a full `AiTaskSnapshot` from a task row plus its stage and
/// result-reference rows; stages are sorted by `order`, references by
/// creation time.
pub(crate) fn build_ai_task_snapshot_from_row(
    ctx: &ReducerContext,
    row: &AiTask,
) -> AiTaskSnapshot {
    let mut stages = ctx
        .db
        .ai_task_stage()
        .iter()
        .filter(|stage| stage.task_id == row.task_id)
        .map(|stage| build_ai_task_stage_snapshot_from_row(&stage))
        .collect::<Vec<_>>();
    stages.sort_by_key(|stage| stage.order);
    let mut result_references = ctx
        .db
        .ai_result_reference()
        .iter()
        .filter(|reference| reference.task_id == row.task_id)
        .map(|reference| build_ai_result_reference_snapshot_from_row(&reference))
        .collect::<Vec<_>>();
    result_references.sort_by_key(|reference| reference.created_at_micros);
    AiTaskSnapshot {
        task_id: row.task_id.clone(),
        task_kind: row.task_kind,
        owner_user_id: row.owner_user_id.clone(),
        request_label: row.request_label.clone(),
        source_module: row.source_module.clone(),
        source_entity_id: row.source_entity_id.clone(),
        request_payload_json: row.request_payload_json.clone(),
        status: row.status,
        failure_message: row.failure_message.clone(),
        stages,
        result_references,
        latest_text_output: row.latest_text_output.clone(),
        latest_structured_payload_json: row.latest_structured_payload_json.clone(),
        version: row.version,
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
        started_at_micros: row
            .started_at
            .map(|value| value.to_micros_since_unix_epoch()),
        completed_at_micros: row
            .completed_at
            .map(|value| value.to_micros_since_unix_epoch()),
        updated_at_micros: row.updated_at.to_micros_since_unix_epoch(),
    }
}
/// Convert a stage snapshot into an `AiTaskStage` row; the row id is derived
/// deterministically from the task id and stage kind.
pub(crate) fn build_ai_task_stage_row(task_id: &str, snapshot: &AiTaskStageSnapshot) -> AiTaskStage {
    AiTaskStage {
        task_stage_id: generate_ai_task_stage_id(task_id, snapshot.stage_kind),
        task_id: task_id.to_string(),
        stage_kind: snapshot.stage_kind,
        label: snapshot.label.clone(),
        detail: snapshot.detail.clone(),
        stage_order: snapshot.order,
        status: snapshot.status,
        text_output: snapshot.text_output.clone(),
        structured_payload_json: snapshot.structured_payload_json.clone(),
        warning_messages: snapshot.warning_messages.clone(),
        started_at: snapshot
            .started_at_micros
            .map(Timestamp::from_micros_since_unix_epoch),
        completed_at: snapshot
            .completed_at_micros
            .map(Timestamp::from_micros_since_unix_epoch),
    }
}
/// Inverse of `build_ai_task_stage_row`: row → snapshot, with `Timestamp`s
/// converted back to micros-since-epoch.
pub(crate) fn build_ai_task_stage_snapshot_from_row(row: &AiTaskStage) -> AiTaskStageSnapshot {
    AiTaskStageSnapshot {
        stage_kind: row.stage_kind,
        label: row.label.clone(),
        detail: row.detail.clone(),
        order: row.stage_order,
        status: row.status,
        text_output: row.text_output.clone(),
        structured_payload_json: row.structured_payload_json.clone(),
        warning_messages: row.warning_messages.clone(),
        started_at_micros: row
            .started_at
            .map(|value| value.to_micros_since_unix_epoch()),
        completed_at_micros: row
            .completed_at
            .map(|value| value.to_micros_since_unix_epoch()),
    }
}
/// Convert a chunk snapshot into its row; the primary key is composed as
/// prefix + task id + stage kind + sequence.
pub(crate) fn build_ai_text_chunk_row(snapshot: &AiTextChunkSnapshot) -> AiTextChunk {
    AiTextChunk {
        text_chunk_row_id: format!(
            "{}{}_{}_{}",
            AI_TEXT_CHUNK_ID_PREFIX,
            snapshot.task_id,
            snapshot.stage_kind.as_str(),
            snapshot.sequence
        ),
        chunk_id: snapshot.chunk_id.clone(),
        task_id: snapshot.task_id.clone(),
        stage_kind: snapshot.stage_kind,
        sequence: snapshot.sequence,
        delta_text: snapshot.delta_text.clone(),
        created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros),
    }
}
/// Inverse of `build_ai_text_chunk_row`: row → snapshot (the composed row id
/// is dropped, only the public `chunk_id` is kept).
pub(crate) fn build_ai_text_chunk_snapshot_from_row(row: &AiTextChunk) -> AiTextChunkSnapshot {
    AiTextChunkSnapshot {
        chunk_id: row.chunk_id.clone(),
        task_id: row.task_id.clone(),
        stage_kind: row.stage_kind,
        sequence: row.sequence,
        delta_text: row.delta_text.clone(),
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
    }
}
/// Convert a result-reference snapshot into its row; the primary key is
/// composed as prefix + task id + result_ref_id.
pub(crate) fn build_ai_result_reference_row(
    snapshot: &AiResultReferenceSnapshot,
) -> AiResultReference {
    AiResultReference {
        result_reference_row_id: format!(
            "{}{}_{}",
            AI_RESULT_REF_ID_PREFIX, snapshot.task_id, snapshot.result_ref_id
        ),
        result_ref_id: snapshot.result_ref_id.clone(),
        task_id: snapshot.task_id.clone(),
        reference_kind: snapshot.reference_kind,
        reference_id: snapshot.reference_id.clone(),
        label: snapshot.label.clone(),
        created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros),
    }
}
/// Inverse of `build_ai_result_reference_row`: row → snapshot.
pub(crate) fn build_ai_result_reference_snapshot_from_row(
    row: &AiResultReference,
) -> AiResultReferenceSnapshot {
    AiResultReferenceSnapshot {
        result_ref_id: row.result_ref_id.clone(),
        task_id: row.task_id.clone(),
        reference_kind: row.reference_kind,
        reference_id: row.reference_id.clone(),
        label: row.label.clone(),
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
    }
}

View File

@@ -1 +1,315 @@
// Home of the AI stage / chunk / reference tables and stage-level helpers.
use crate::*;
use module_ai::{generate_ai_result_ref_id, generate_ai_text_chunk_id, normalize_optional_text, normalize_string_list};
/// One pipeline stage of an AI task (ordered by `stage_order` within a task).
/// Fields are `pub(crate)` so sibling submodules can build/read rows.
#[spacetimedb::table(
    accessor = ai_task_stage,
    index(accessor = by_ai_task_stage_task_id, btree(columns = [task_id])),
    index(accessor = by_ai_task_stage_task_order, btree(columns = [task_id, stage_order]))
)]
pub struct AiTaskStage {
    #[primary_key]
    pub(crate) task_stage_id: String,
    pub(crate) task_id: String,
    pub(crate) stage_kind: AiTaskStageKind,
    pub(crate) label: String,
    pub(crate) detail: String,
    pub(crate) stage_order: u32,
    pub(crate) status: AiTaskStageStatus,
    pub(crate) text_output: Option<String>,
    pub(crate) structured_payload_json: Option<String>,
    pub(crate) warning_messages: Vec<String>,
    pub(crate) started_at: Option<Timestamp>,
    pub(crate) completed_at: Option<Timestamp>,
}
/// A single streamed text delta for one stage of a task; ordered per
/// (task, stage) by the monotonically increasing `sequence`.
#[spacetimedb::table(
    accessor = ai_text_chunk,
    index(accessor = by_ai_text_chunk_task_id, btree(columns = [task_id])),
    index(
        accessor = by_ai_text_chunk_task_stage_sequence,
        btree(columns = [task_id, stage_kind, sequence])
    )
)]
pub struct AiTextChunk {
    #[primary_key]
    pub(crate) text_chunk_row_id: String,
    pub(crate) chunk_id: String,
    pub(crate) task_id: String,
    pub(crate) stage_kind: AiTaskStageKind,
    pub(crate) sequence: u32,
    pub(crate) delta_text: String,
    pub(crate) created_at: Timestamp,
}
/// Link from a task to an entity produced by it (kind + external id + label).
#[spacetimedb::table(
    accessor = ai_result_reference,
    index(accessor = by_ai_result_reference_task_id, btree(columns = [task_id]))
)]
pub struct AiResultReference {
    #[primary_key]
    pub(crate) result_reference_row_id: String,
    pub(crate) result_ref_id: String,
    pub(crate) task_id: String,
    pub(crate) reference_kind: AiResultReferenceKind,
    pub(crate) reference_id: String,
    pub(crate) label: Option<String>,
    pub(crate) created_at: Timestamp,
}
/// Reducer: mark one stage of a task as started (snapshot discarded).
#[spacetimedb::reducer]
pub fn start_ai_task_stage(
    ctx: &ReducerContext,
    input: AiTaskStageStartInput,
) -> Result<(), String> {
    start_ai_task_stage_tx(ctx, input).map(|_| ())
}
// Streaming appends must synchronously return both the chunk and the
// aggregated task snapshot so the later Axum facade can reuse them directly.
/// Procedure: append a streamed text delta and return (task, chunk) on success.
#[spacetimedb::procedure]
pub fn append_ai_text_chunk_and_return(
    ctx: &mut ProcedureContext,
    input: AiTextChunkAppendInput,
) -> AiTaskProcedureResult {
    match ctx.try_with_tx(|tx| append_ai_text_chunk_tx(tx, input.clone())) {
        Ok((task, text_chunk)) => AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: Some(text_chunk),
            error_message: None,
        },
        Err(message) => AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        },
    }
}
/// Procedure wrapper around `complete_ai_stage_tx`: fold the `Result` into
/// the `AiTaskProcedureResult` envelope.
#[spacetimedb::procedure]
pub fn complete_ai_stage_and_return(
    ctx: &mut ProcedureContext,
    input: AiStageCompletionInput,
) -> AiTaskProcedureResult {
    match ctx.try_with_tx(|tx| complete_ai_stage_tx(tx, input.clone())) {
        Ok(task) => AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: None,
            error_message: None,
        },
        Err(message) => AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        },
    }
}
/// Procedure wrapper around `attach_ai_result_reference_tx`: fold the
/// `Result` into the `AiTaskProcedureResult` envelope.
#[spacetimedb::procedure]
pub fn attach_ai_result_reference_and_return(
    ctx: &mut ProcedureContext,
    input: AiResultReferenceInput,
) -> AiTaskProcedureResult {
    match ctx.try_with_tx(|tx| attach_ai_result_reference_tx(tx, input.clone())) {
        Ok(task) => AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: None,
            error_message: None,
        },
        Err(message) => AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        },
    }
}
/// Tx helper: mark one stage `Running` (also forcing the whole task to
/// `Running`); first-start timestamps are only written once.
pub(crate) fn start_ai_task_stage_tx(
    ctx: &ReducerContext,
    input: AiTaskStageStartInput,
) -> Result<AiTaskSnapshot, String> {
    let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(snapshot.status)?;
    // Stages are addressed by kind, not by id, within a task.
    let stage = snapshot
        .stages
        .iter_mut()
        .find(|stage| stage.stage_kind == input.stage_kind)
        .ok_or_else(|| "ai_task.stage 不存在".to_string())?;
    snapshot.status = AiTaskStatus::Running;
    if snapshot.started_at_micros.is_none() {
        snapshot.started_at_micros = Some(input.started_at_micros);
    }
    stage.status = AiTaskStageStatus::Running;
    if stage.started_at_micros.is_none() {
        stage.started_at_micros = Some(input.started_at_micros);
    }
    snapshot.updated_at_micros = input.started_at_micros;
    snapshot.version += 1;
    persist_ai_task_snapshot(ctx, &snapshot)?;
    Ok(snapshot)
}
// Append one streamed text delta to a stage and refresh the aggregate view.
//
// Inserts the chunk row, re-aggregates the stage's full text from storage,
// forces task + stage into Running (keeping the first start timestamp), and
// persists the bumped snapshot. Returns the snapshot together with the new
// chunk so the caller can relay both to the client.
pub(crate) fn append_ai_text_chunk_tx(
    ctx: &ReducerContext,
    input: AiTextChunkAppendInput,
) -> Result<(AiTaskSnapshot, AiTextChunkSnapshot), String> {
    // Validate the delta before touching storage; empty deltas and a zero
    // sequence are rejected outright.
    if input.delta_text.trim().is_empty() {
        return Err("ai_text_chunk.delta_text 不能为空".to_string());
    }
    if input.sequence == 0 {
        return Err("ai_text_chunk.sequence 必须大于 0".to_string());
    }
    let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(snapshot.status)?;
    let stage = snapshot
        .stages
        .iter_mut()
        .find(|stage| stage.stage_kind == input.stage_kind)
        .ok_or_else(|| "ai_task.stage 不存在".to_string())?;
    // NOTE(review): sequence uniqueness per (task, stage) is not enforced
    // here — presumably guaranteed by the caller; confirm upstream.
    let chunk = AiTextChunkSnapshot {
        chunk_id: generate_ai_text_chunk_id(input.created_at_micros, input.sequence),
        task_id: input.task_id.trim().to_string(),
        stage_kind: input.stage_kind,
        sequence: input.sequence,
        delta_text: input.delta_text.trim().to_string(),
        created_at_micros: input.created_at_micros,
    };
    // The chunk row must be inserted before aggregation so the new delta is
    // included in the stitched stage text.
    ctx.db.ai_text_chunk().insert(build_ai_text_chunk_row(&chunk));
    let aggregated_text = collect_ai_stage_text_output(ctx, &chunk.task_id, chunk.stage_kind);
    // A first chunk implicitly starts the task/stage; existing start
    // timestamps are preserved.
    snapshot.status = AiTaskStatus::Running;
    if snapshot.started_at_micros.is_none() {
        snapshot.started_at_micros = Some(input.created_at_micros);
    }
    stage.status = AiTaskStageStatus::Running;
    if stage.started_at_micros.is_none() {
        stage.started_at_micros = Some(input.created_at_micros);
    }
    // Mirror the aggregated text onto both the stage and the task-level
    // "latest" field so readers never have to scan chunk rows.
    stage.text_output = aggregated_text.clone();
    snapshot.latest_text_output = aggregated_text;
    snapshot.updated_at_micros = input.created_at_micros;
    snapshot.version += 1;
    persist_ai_task_snapshot(ctx, &snapshot)?;
    Ok((snapshot, chunk))
}
// Finalize one stage's outputs and mirror them onto the task-level
// "latest" fields so readers do not need to scan stages.
pub(crate) fn complete_ai_stage_tx(
    ctx: &ReducerContext,
    input: AiStageCompletionInput,
) -> Result<AiTaskSnapshot, String> {
    // Normalize the incoming payloads up front.
    let text_output = normalize_optional_text(input.text_output.clone());
    let structured_payload = normalize_optional_text(input.structured_payload_json.clone());
    let warnings = normalize_string_list(input.warning_messages.clone());
    let mut task = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(task.status)?;
    let target_stage = task
        .stages
        .iter_mut()
        .find(|candidate| candidate.stage_kind == input.stage_kind)
        .ok_or_else(|| "ai_task.stage 不存在".to_string())?;
    target_stage.status = AiTaskStageStatus::Completed;
    target_stage.completed_at_micros = Some(input.completed_at_micros);
    target_stage.text_output = text_output.clone();
    target_stage.structured_payload_json = structured_payload.clone();
    target_stage.warning_messages = warnings;
    task.latest_text_output = text_output;
    task.latest_structured_payload_json = structured_payload;
    task.updated_at_micros = input.completed_at_micros;
    task.version += 1;
    persist_ai_task_snapshot(ctx, &task)?;
    Ok(task)
}
// Persist a new result-reference row and append it to the task snapshot.
// Rejects blank reference ids and terminal tasks.
pub(crate) fn attach_ai_result_reference_tx(
    ctx: &ReducerContext,
    input: AiResultReferenceInput,
) -> Result<AiTaskSnapshot, String> {
    let normalized_reference_id = input.reference_id.trim().to_string();
    if normalized_reference_id.is_empty() {
        return Err("ai_result_reference.reference_id 不能为空".to_string());
    }
    let mut task = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(task.status)?;
    let new_reference = AiResultReferenceSnapshot {
        result_ref_id: generate_ai_result_ref_id(input.created_at_micros),
        task_id: input.task_id.trim().to_string(),
        reference_kind: input.reference_kind,
        reference_id: normalized_reference_id,
        label: normalize_optional_text(input.label),
        created_at_micros: input.created_at_micros,
    };
    ctx.db
        .ai_result_reference()
        .insert(build_ai_result_reference_row(&new_reference));
    task.result_references.push(new_reference);
    task.updated_at_micros = input.created_at_micros;
    task.version += 1;
    persist_ai_task_snapshot(ctx, &task)?;
    Ok(task)
}
// Rebuild the stage rows of a task so they exactly mirror the snapshot:
// delete every existing row for the task, then insert the new set.
pub(crate) fn replace_ai_task_stages(
    ctx: &ReducerContext,
    task_id: &str,
    stages: &[AiTaskStageSnapshot],
) {
    // Ids are collected first because rows cannot be deleted while iterating.
    let stale_ids: Vec<String> = ctx
        .db
        .ai_task_stage()
        .iter()
        .filter(|row| row.task_id == task_id)
        .map(|row| row.task_stage_id.clone())
        .collect();
    for stale_id in &stale_ids {
        ctx.db.ai_task_stage().task_stage_id().delete(stale_id);
    }
    for stage in stages.iter() {
        ctx.db
            .ai_task_stage()
            .insert(build_ai_task_stage_row(task_id, stage));
    }
}
pub(crate) fn collect_ai_stage_text_output(
ctx: &ReducerContext,
task_id: &str,
stage_kind: AiTaskStageKind,
) -> Option<String> {
let mut chunks = ctx
.db
.ai_text_chunk()
.iter()
.filter(|row| row.task_id == task_id && row.stage_kind == stage_kind)
.map(|row| build_ai_text_chunk_snapshot_from_row(&row))
.collect::<Vec<_>>();
chunks.sort_by_key(|chunk| chunk.sequence);
let aggregated = chunks
.into_iter()
.map(|chunk| chunk.delta_text)
.collect::<Vec<_>>()
.join("");
if aggregated.trim().is_empty() {
None
} else {
Some(aggregated)
}
}

View File

@@ -1 +1,285 @@
// AI task reducer / procedure 与任务状态迁移落位点。
use crate::*;
use module_ai::{normalize_optional_text, validate_task_create_input, INITIAL_AI_TASK_VERSION};
#[spacetimedb::table(
    accessor = ai_task,
    index(accessor = by_ai_task_owner_user_id, btree(columns = [owner_user_id])),
    index(accessor = by_ai_task_status, btree(columns = [status])),
    index(accessor = by_ai_task_kind, btree(columns = [task_kind]))
)]
// Truth table for one AI task: lifecycle status plus the latest outputs.
// Rows are upserted via delete + insert by persist_ai_task_snapshot; stage
// detail lives in the separate ai_task_stage table.
pub struct AiTask {
    #[primary_key]
    pub(crate) task_id: String,
    pub(crate) task_kind: AiTaskKind,
    pub(crate) owner_user_id: String,
    pub(crate) request_label: String,
    // Origin of the request (module name plus optional entity id).
    pub(crate) source_module: String,
    pub(crate) source_entity_id: Option<String>,
    pub(crate) request_payload_json: Option<String>,
    // Lifecycle: Pending -> Running -> Completed/Failed/Cancelled (terminal
    // states reject further transitions, see ensure_ai_task_can_transition).
    pub(crate) status: AiTaskStatus,
    // Set only when status is Failed.
    pub(crate) failure_message: Option<String>,
    // Mirrors of the most recently completed/streamed stage output so
    // readers do not have to scan stage or chunk rows.
    pub(crate) latest_text_output: Option<String>,
    pub(crate) latest_structured_payload_json: Option<String>,
    // Bumped on every persisted mutation.
    pub(crate) version: u32,
    pub(crate) created_at: Timestamp,
    pub(crate) started_at: Option<Timestamp>,
    pub(crate) completed_at: Option<Timestamp>,
    pub(crate) updated_at: Timestamp,
}
// AI tasks stay a private truth table for now; Axum / platform-llm layer the
// HTTP and SSE protocol on top later.
#[spacetimedb::reducer]
pub fn create_ai_task(ctx: &ReducerContext, input: AiTaskCreateInput) -> Result<(), String> {
    // Fire-and-forget wrapper over the create transaction.
    create_ai_task_tx(ctx, input)?;
    Ok(())
}
// Procedure wrapper: create the task in a transaction and synchronously hand
// the fresh snapshot (or the error) back to the caller.
#[spacetimedb::procedure]
pub fn create_ai_task_and_return(
    ctx: &mut ProcedureContext,
    input: AiTaskCreateInput,
) -> AiTaskProcedureResult {
    ctx.try_with_tx(|tx| create_ai_task_tx(tx, input.clone()))
        .map(|task| AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: None,
            error_message: None,
        })
        .unwrap_or_else(|message| AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        })
}
#[spacetimedb::reducer]
pub fn start_ai_task(ctx: &ReducerContext, input: AiTaskStartInput) -> Result<(), String> {
    // Fire-and-forget wrapper over the start transaction.
    start_ai_task_tx(ctx, input)?;
    Ok(())
}
// Procedure wrapper: mark the task Completed and return the final snapshot.
#[spacetimedb::procedure]
pub fn complete_ai_task_and_return(
    ctx: &mut ProcedureContext,
    input: AiTaskFinishInput,
) -> AiTaskProcedureResult {
    ctx.try_with_tx(|tx| complete_ai_task_tx(tx, input.clone()))
        .map(|task| AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: None,
            error_message: None,
        })
        .unwrap_or_else(|message| AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        })
}
// Procedure wrapper: mark the task Failed (with a message) and return the
// resulting snapshot.
#[spacetimedb::procedure]
pub fn fail_ai_task_and_return(
    ctx: &mut ProcedureContext,
    input: AiTaskFailureInput,
) -> AiTaskProcedureResult {
    ctx.try_with_tx(|tx| fail_ai_task_tx(tx, input.clone()))
        .map(|task| AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: None,
            error_message: None,
        })
        .unwrap_or_else(|message| AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        })
}
// Procedure wrapper: mark the task Cancelled and return the resulting
// snapshot.
#[spacetimedb::procedure]
pub fn cancel_ai_task_and_return(
    ctx: &mut ProcedureContext,
    input: AiTaskCancelInput,
) -> AiTaskProcedureResult {
    ctx.try_with_tx(|tx| cancel_ai_task_tx(tx, input.clone()))
        .map(|task| AiTaskProcedureResult {
            ok: true,
            task: Some(task),
            text_chunk: None,
            error_message: None,
        })
        .unwrap_or_else(|message| AiTaskProcedureResult {
            ok: false,
            task: None,
            text_chunk: None,
            error_message: Some(message),
        })
}
// Validate the create request, reject duplicate ids, then materialize the
// task row plus its stage rows and re-read the canonical snapshot.
fn create_ai_task_tx(
    ctx: &ReducerContext,
    input: AiTaskCreateInput,
) -> Result<AiTaskSnapshot, String> {
    validate_task_create_input(&input).map_err(|error| error.to_string())?;
    let already_exists = ctx.db.ai_task().task_id().find(&input.task_id).is_some();
    if already_exists {
        return Err("ai_task.task_id 已存在".to_string());
    }
    let snapshot = build_ai_task_snapshot_from_create_input(&input);
    ctx.db.ai_task().insert(build_ai_task_row(&snapshot));
    replace_ai_task_stages(ctx, &snapshot.task_id, &snapshot.stages);
    // Re-read so the caller gets exactly what was persisted.
    get_ai_task_snapshot_tx(ctx, &snapshot.task_id)
}
// Move a non-terminal task into Running; the first observed start timestamp
// is kept on retries.
fn start_ai_task_tx(
    ctx: &ReducerContext,
    input: AiTaskStartInput,
) -> Result<AiTaskSnapshot, String> {
    let mut task = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(task.status)?;
    task.status = AiTaskStatus::Running;
    task.started_at_micros.get_or_insert(input.started_at_micros);
    task.updated_at_micros = input.started_at_micros;
    task.version += 1;
    persist_ai_task_snapshot(ctx, &task)?;
    Ok(task)
}
// Move a non-terminal task into the terminal Completed state.
fn complete_ai_task_tx(
    ctx: &ReducerContext,
    input: AiTaskFinishInput,
) -> Result<AiTaskSnapshot, String> {
    let mut task = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(task.status)?;
    let finished_at = input.completed_at_micros;
    task.status = AiTaskStatus::Completed;
    task.completed_at_micros = Some(finished_at);
    task.updated_at_micros = finished_at;
    task.version += 1;
    persist_ai_task_snapshot(ctx, &task)?;
    Ok(task)
}
// Move a non-terminal task into the terminal Failed state, recording a
// non-empty failure message.
fn fail_ai_task_tx(
    ctx: &ReducerContext,
    input: AiTaskFailureInput,
) -> Result<AiTaskSnapshot, String> {
    let trimmed_message = input.failure_message.trim();
    if trimmed_message.is_empty() {
        return Err("ai_task.failure_message 不能为空".to_string());
    }
    let mut task = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(task.status)?;
    let failed_at = input.completed_at_micros;
    task.status = AiTaskStatus::Failed;
    task.failure_message = Some(trimmed_message.to_string());
    task.completed_at_micros = Some(failed_at);
    task.updated_at_micros = failed_at;
    task.version += 1;
    persist_ai_task_snapshot(ctx, &task)?;
    Ok(task)
}
// Move a non-terminal task into the terminal Cancelled state.
fn cancel_ai_task_tx(
    ctx: &ReducerContext,
    input: AiTaskCancelInput,
) -> Result<AiTaskSnapshot, String> {
    let mut task = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
    ensure_ai_task_can_transition(task.status)?;
    let cancelled_at = input.completed_at_micros;
    task.status = AiTaskStatus::Cancelled;
    task.completed_at_micros = Some(cancelled_at);
    task.updated_at_micros = cancelled_at;
    task.version += 1;
    persist_ai_task_snapshot(ctx, &task)?;
    Ok(task)
}
// Look a task up by its trimmed id and expand the row into the full
// aggregated snapshot (stages, references, outputs).
pub(crate) fn get_ai_task_snapshot_tx(
    ctx: &ReducerContext,
    task_id: &str,
) -> Result<AiTaskSnapshot, String> {
    let normalized_id = task_id.trim().to_string();
    match ctx.db.ai_task().task_id().find(&normalized_id) {
        Some(row) => Ok(build_ai_task_snapshot_from_row(ctx, &row)),
        None => Err("ai_task 不存在".to_string()),
    }
}
// Write one task snapshot back to storage.
//
// The ai_task row is replaced via delete + insert (the upsert pattern used
// throughout this module), then the stage rows are rebuilt to mirror
// `snapshot.stages`. Always returns Ok(()); the Result shape keeps call
// sites uniform with the other tx helpers.
pub(crate) fn persist_ai_task_snapshot(
    ctx: &ReducerContext,
    snapshot: &AiTaskSnapshot,
) -> Result<(), String> {
    ctx.db.ai_task().task_id().delete(&snapshot.task_id);
    ctx.db.ai_task().insert(build_ai_task_row(snapshot));
    replace_ai_task_stages(ctx, &snapshot.task_id, &snapshot.stages);
    Ok(())
}
// Guard helper: terminal tasks (Completed / Failed / Cancelled) refuse any
// further state transition.
pub(crate) fn ensure_ai_task_can_transition(status: AiTaskStatus) -> Result<(), String> {
    match status {
        AiTaskStatus::Completed | AiTaskStatus::Failed | AiTaskStatus::Cancelled => {
            Err("当前 ai_task 状态不允许执行该操作".to_string())
        }
        _ => Ok(()),
    }
}
fn build_ai_task_snapshot_from_create_input(input: &AiTaskCreateInput) -> AiTaskSnapshot {
AiTaskSnapshot {
task_id: input.task_id.trim().to_string(),
task_kind: input.task_kind,
owner_user_id: input.owner_user_id.trim().to_string(),
request_label: input.request_label.trim().to_string(),
source_module: input.source_module.trim().to_string(),
source_entity_id: normalize_optional_text(input.source_entity_id.clone()),
request_payload_json: normalize_optional_text(input.request_payload_json.clone()),
status: AiTaskStatus::Pending,
failure_message: None,
stages: input
.stages
.iter()
.map(|stage| AiTaskStageSnapshot {
stage_kind: stage.stage_kind,
label: stage.label.trim().to_string(),
detail: stage.detail.trim().to_string(),
order: stage.order,
status: AiTaskStageStatus::Pending,
text_output: None,
structured_payload_json: None,
warning_messages: Vec::new(),
started_at_micros: None,
completed_at_micros: None,
})
.collect(),
result_references: Vec::new(),
latest_text_output: None,
latest_structured_payload_json: None,
version: INITIAL_AI_TASK_VERSION,
created_at_micros: input.created_at_micros,
started_at_micros: None,
completed_at_micros: None,
updated_at_micros: input.created_at_micros,
}
}

View File

@@ -1,5 +1,5 @@
use crate::*;
use crate::big_fish::tables::{big_fish_asset_slot, big_fish_creation_session};
use crate::*;
#[spacetimedb::procedure]
pub fn generate_big_fish_asset(

View File

@@ -1,5 +1,5 @@
use crate::*;
use crate::big_fish::tables::{big_fish_creation_session, big_fish_runtime_run};
use crate::*;
#[spacetimedb::procedure]
pub fn start_big_fish_run(

View File

@@ -1,5 +1,5 @@
use crate::*;
use crate::big_fish::tables::{big_fish_agent_message, big_fish_creation_session};
use crate::*;
#[spacetimedb::procedure]
pub fn create_big_fish_session(

File diff suppressed because it is too large Load Diff

View File

@@ -1 +1,220 @@
// Browse history 相关表、procedure 与 helper 落位点。
use crate::*;
#[spacetimedb::table(
    accessor = user_browse_history,
    index(accessor = by_browse_history_user_id, btree(columns = [user_id])),
    index(
        accessor = by_browse_history_user_owner_profile,
        btree(columns = [user_id, owner_user_id, profile_id])
    )
)]
// One browse-history entry: a world the user visited, with display metadata.
// Listed newest-first by visited_at (see list_platform_browse_history_rows).
pub struct UserBrowseHistory {
    #[primary_key]
    pub(crate) browse_history_id: String,
    // The viewer who browsed.
    pub(crate) user_id: String,
    // The creator / profile that owns the browsed world.
    pub(crate) owner_user_id: String,
    pub(crate) profile_id: String,
    pub(crate) world_name: String,
    pub(crate) subtitle: String,
    pub(crate) summary_text: String,
    pub(crate) cover_image_src: Option<String>,
    pub(crate) theme_mode: RuntimeBrowseHistoryThemeMode,
    pub(crate) author_display_name: String,
    // Sort key for the history list (descending).
    pub(crate) visited_at: Timestamp,
    // created_at survives upserts; updated_at tracks the last sync.
    pub(crate) created_at: Timestamp,
    pub(crate) updated_at: Timestamp,
}
// Procedure for Axum to synchronously pull browse history, keeping the old
// Node service's visitedAt-descending output semantics.
#[spacetimedb::procedure]
pub fn list_platform_browse_history(
    ctx: &mut ProcedureContext,
    input: RuntimeBrowseHistoryListInput,
) -> RuntimeBrowseHistoryProcedureResult {
    ctx.try_with_tx(|tx| list_platform_browse_history_rows(tx, input.clone()))
        .map(|entries| RuntimeBrowseHistoryProcedureResult {
            ok: true,
            entries,
            error_message: None,
        })
        .unwrap_or_else(|message| RuntimeBrowseHistoryProcedureResult {
            ok: false,
            entries: Vec::new(),
            error_message: Some(message),
        })
}
// Procedure for Axum handling single/batch browse-history POSTs; responds
// with the user's complete, freshly-sorted list.
#[spacetimedb::procedure]
pub fn upsert_platform_browse_history_and_return(
    ctx: &mut ProcedureContext,
    input: RuntimeBrowseHistorySyncInput,
) -> RuntimeBrowseHistoryProcedureResult {
    ctx.try_with_tx(|tx| upsert_platform_browse_history_rows(tx, input.clone()))
        .map(|entries| RuntimeBrowseHistoryProcedureResult {
            ok: true,
            entries,
            error_message: None,
        })
        .unwrap_or_else(|message| RuntimeBrowseHistoryProcedureResult {
            ok: false,
            entries: Vec::new(),
            error_message: Some(message),
        })
}
// Procedure for Axum to wipe the current user's browse history; the success
// response carries an empty list by contract.
#[spacetimedb::procedure]
pub fn clear_platform_browse_history_and_return(
    ctx: &mut ProcedureContext,
    input: RuntimeBrowseHistoryClearInput,
) -> RuntimeBrowseHistoryProcedureResult {
    ctx.try_with_tx(|tx| clear_platform_browse_history_rows(tx, input.clone()))
        .map(|entries| RuntimeBrowseHistoryProcedureResult {
            ok: true,
            entries,
            error_message: None,
        })
        .unwrap_or_else(|message| RuntimeBrowseHistoryProcedureResult {
            ok: false,
            entries: Vec::new(),
            error_message: Some(message),
        })
}
// Validate the user id, project each matching row into a snapshot, then
// order by visited_at descending with browse_history_id ascending as a
// deterministic tiebreaker.
fn list_platform_browse_history_rows(
    ctx: &ReducerContext,
    input: RuntimeBrowseHistoryListInput,
) -> Result<Vec<RuntimeBrowseHistorySnapshot>, String> {
    let validated = build_runtime_browse_history_list_input(input.user_id)
        .map_err(|error| error.to_string())?;
    let mut snapshots: Vec<RuntimeBrowseHistorySnapshot> = ctx
        .db
        .user_browse_history()
        .iter()
        .filter(|row| row.user_id == validated.user_id)
        .map(|row| build_runtime_browse_history_snapshot_from_row(&row))
        .collect();
    snapshots.sort_by(|a, b| {
        b.visited_at_micros
            .cmp(&a.visited_at_micros)
            .then_with(|| a.browse_history_id.cmp(&b.browse_history_id))
    });
    Ok(snapshots)
}
// Upsert a batch of prepared browse-history entries, then return the user's
// freshly-sorted full list.
//
// For each entry the original created_at is preserved across the
// delete + insert upsert; a brand-new row uses the entry's updated_at as
// its creation time.
fn upsert_platform_browse_history_rows(
    ctx: &ReducerContext,
    input: RuntimeBrowseHistorySyncInput,
) -> Result<Vec<RuntimeBrowseHistorySnapshot>, String> {
    // Keep the user id for the final list query; `input` is consumed below.
    let user_id = input.user_id.clone();
    let prepared_entries =
        prepare_runtime_browse_history_entries(input).map_err(|error| error.to_string())?;
    for prepared in prepared_entries {
        let existing = ctx
            .db
            .user_browse_history()
            .browse_history_id()
            .find(&prepared.browse_history_id);
        // created_at must be captured before the old row is deleted.
        let created_at = existing
            .as_ref()
            .map(|row| row.created_at)
            .unwrap_or_else(|| Timestamp::from_micros_since_unix_epoch(prepared.updated_at_micros));
        if let Some(existing) = existing {
            ctx.db
                .user_browse_history()
                .browse_history_id()
                .delete(&existing.browse_history_id);
        }
        ctx.db.user_browse_history().insert(UserBrowseHistory {
            browse_history_id: prepared.browse_history_id,
            user_id: prepared.user_id,
            owner_user_id: prepared.owner_user_id,
            profile_id: prepared.profile_id,
            world_name: prepared.world_name,
            subtitle: prepared.subtitle,
            summary_text: prepared.summary_text,
            cover_image_src: prepared.cover_image_src,
            theme_mode: prepared.theme_mode,
            author_display_name: prepared.author_display_name,
            visited_at: Timestamp::from_micros_since_unix_epoch(prepared.visited_at_micros),
            created_at,
            updated_at: Timestamp::from_micros_since_unix_epoch(prepared.updated_at_micros),
        });
    }
    list_platform_browse_history_rows(ctx, RuntimeBrowseHistoryListInput { user_id })
}
// Delete every browse-history row belonging to the user. The ids are
// collected first (rows cannot be deleted mid-iteration) and the procedure
// contract is to answer with an empty list afterwards.
fn clear_platform_browse_history_rows(
    ctx: &ReducerContext,
    input: RuntimeBrowseHistoryClearInput,
) -> Result<Vec<RuntimeBrowseHistorySnapshot>, String> {
    let validated = build_runtime_browse_history_clear_input(input.user_id)
        .map_err(|error| error.to_string())?;
    let doomed_ids: Vec<String> = ctx
        .db
        .user_browse_history()
        .iter()
        .filter(|row| row.user_id == validated.user_id)
        .map(|row| row.browse_history_id.clone())
        .collect();
    for doomed_id in &doomed_ids {
        ctx.db
            .user_browse_history()
            .browse_history_id()
            .delete(doomed_id);
    }
    Ok(Vec::new())
}
// Project a table row into the wire snapshot: a one-to-one field copy with
// timestamps flattened to micros-since-epoch integers.
fn build_runtime_browse_history_snapshot_from_row(
    row: &UserBrowseHistory,
) -> RuntimeBrowseHistorySnapshot {
    RuntimeBrowseHistorySnapshot {
        browse_history_id: row.browse_history_id.clone(),
        user_id: row.user_id.clone(),
        owner_user_id: row.owner_user_id.clone(),
        profile_id: row.profile_id.clone(),
        world_name: row.world_name.clone(),
        subtitle: row.subtitle.clone(),
        summary_text: row.summary_text.clone(),
        cover_image_src: row.cover_image_src.clone(),
        theme_mode: row.theme_mode,
        author_display_name: row.author_display_name.clone(),
        visited_at_micros: row.visited_at.to_micros_since_unix_epoch(),
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
        updated_at_micros: row.updated_at.to_micros_since_unix_epoch(),
    }
}
// Inverse of build_runtime_browse_history_snapshot_from_row: rebuild a table
// row from a snapshot. Currently unused (kept as the symmetric counterpart),
// hence the dead_code allowance.
#[allow(dead_code)]
fn build_runtime_browse_history_row(snapshot: RuntimeBrowseHistorySnapshot) -> UserBrowseHistory {
    UserBrowseHistory {
        browse_history_id: snapshot.browse_history_id,
        user_id: snapshot.user_id,
        owner_user_id: snapshot.owner_user_id,
        profile_id: snapshot.profile_id,
        world_name: snapshot.world_name,
        subtitle: snapshot.subtitle,
        summary_text: snapshot.summary_text,
        cover_image_src: snapshot.cover_image_src,
        theme_mode: snapshot.theme_mode,
        author_display_name: snapshot.author_display_name,
        visited_at: Timestamp::from_micros_since_unix_epoch(snapshot.visited_at_micros),
        created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros),
        updated_at: Timestamp::from_micros_since_unix_epoch(snapshot.updated_at_micros),
    }
}

File diff suppressed because it is too large Load Diff

View File

@@ -1 +1,806 @@
// Profile dashboard、wallet 与 played world 投影落位点。
use crate::*;
#[spacetimedb::table(accessor = profile_dashboard_state)]
// Per-user dashboard projection: latest wallet balance plus accumulated play
// time, refreshed from runtime snapshots by the sync helpers in this file.
pub struct ProfileDashboardState {
    #[primary_key]
    pub(crate) user_id: String,
    // Mirrors the game state's `playerCurrency` value at last sync.
    pub(crate) wallet_balance: u64,
    // Monotonically accumulated from per-world play-time increments.
    pub(crate) total_play_time_ms: u64,
    pub(crate) created_at: Timestamp,
    pub(crate) updated_at: Timestamp,
}
#[spacetimedb::table(
    accessor = profile_wallet_ledger,
    index(accessor = by_profile_wallet_ledger_user_id, btree(columns = [user_id])),
    index(
        accessor = by_profile_wallet_ledger_user_created_at,
        btree(columns = [user_id, created_at])
    )
)]
// Append-only wallet movement log; a row is written whenever a snapshot sync
// observes a balance change.
pub struct ProfileWalletLedger {
    #[primary_key]
    pub(crate) wallet_ledger_id: String,
    pub(crate) user_id: String,
    // Signed change relative to the previous observed balance.
    pub(crate) amount_delta: i64,
    pub(crate) balance_after: u64,
    pub(crate) source_type: RuntimeProfileWalletLedgerSourceType,
    pub(crate) created_at: Timestamp,
}
#[spacetimedb::table(
    accessor = profile_played_world,
    index(accessor = by_profile_played_world_user_id, btree(columns = [user_id])),
    index(
        accessor = by_profile_played_world_user_world_key,
        btree(columns = [user_id, world_key])
    ),
    index(
        accessor = by_profile_played_world_user_last_played_at,
        btree(columns = [user_id, last_played_at])
    )
)]
// One row per (user, world) pair tracking first/last play times and the last
// observed per-world play-time counter, used to derive incremental play time.
pub struct ProfilePlayedWorld {
    #[primary_key]
    // Composed as "{user_id}:{world_key}" by the sync helper.
    pub(crate) played_world_id: String,
    pub(crate) user_id: String,
    pub(crate) world_key: String,
    pub(crate) owner_user_id: Option<String>,
    pub(crate) profile_id: Option<String>,
    pub(crate) world_type: Option<String>,
    pub(crate) world_title: String,
    pub(crate) world_subtitle: String,
    pub(crate) first_played_at: Timestamp,
    pub(crate) last_played_at: Timestamp,
    // High-water mark of the world's playTimeMs counter; deltas against it
    // feed total_play_time_ms on the dashboard.
    pub(crate) last_observed_play_time_ms: u64,
}
#[spacetimedb::table(
    accessor = profile_save_archive,
    index(accessor = by_profile_save_archive_user_id, btree(columns = [user_id])),
    index(
        accessor = by_profile_save_archive_user_world_key,
        btree(columns = [user_id, world_key])
    ),
    index(
        accessor = by_profile_save_archive_user_saved_at,
        btree(columns = [user_id, saved_at])
    )
)]
// Latest save snapshot per (user, world): display metadata plus the raw
// game-state / story payloads needed to resume into runtime_snapshot.
pub struct ProfileSaveArchive {
    #[primary_key]
    // Composed as "{user_id}:{world_key}" by the sync helper.
    pub(crate) archive_id: String,
    pub(crate) user_id: String,
    pub(crate) world_key: String,
    pub(crate) owner_user_id: Option<String>,
    pub(crate) profile_id: Option<String>,
    pub(crate) world_type: Option<String>,
    pub(crate) world_name: String,
    pub(crate) subtitle: String,
    pub(crate) summary_text: String,
    pub(crate) cover_image_src: Option<String>,
    pub(crate) saved_at: Timestamp,
    pub(crate) bottom_tab: String,
    // Raw payloads copied verbatim from the runtime snapshot.
    pub(crate) game_state_json: String,
    pub(crate) current_story_json: Option<String>,
    pub(crate) created_at: Timestamp,
    pub(crate) updated_at: Timestamp,
}
// The archive list is a per-world latest-snapshot view; reading only sorts —
// no default-value stitching happens at this layer.
#[spacetimedb::procedure]
pub fn list_profile_save_archives(
    ctx: &mut ProcedureContext,
    input: RuntimeProfileSaveArchiveListInput,
) -> RuntimeProfileSaveArchiveProcedureResult {
    ctx.try_with_tx(|tx| list_profile_save_archive_rows(tx, input.clone()))
        .map(|entries| RuntimeProfileSaveArchiveProcedureResult {
            ok: true,
            entries,
            record: None,
            current_snapshot: None,
            error_message: None,
        })
        .unwrap_or_else(|message| RuntimeProfileSaveArchiveProcedureResult {
            ok: false,
            entries: Vec::new(),
            record: None,
            current_snapshot: None,
            error_message: Some(message),
        })
}
// Resume copies the chosen archive back into the current runtime snapshot
// and synchronously returns both the archive entry and that snapshot.
#[spacetimedb::procedure]
pub fn resume_profile_save_archive_and_return(
    ctx: &mut ProcedureContext,
    input: RuntimeProfileSaveArchiveResumeInput,
) -> RuntimeProfileSaveArchiveProcedureResult {
    ctx.try_with_tx(|tx| resume_profile_save_archive_record(tx, input.clone()))
        .map(|(record, current_snapshot)| RuntimeProfileSaveArchiveProcedureResult {
            ok: true,
            entries: Vec::new(),
            record: Some(record),
            current_snapshot: Some(current_snapshot),
            error_message: None,
        })
        .unwrap_or_else(|message| RuntimeProfileSaveArchiveProcedureResult {
            ok: false,
            entries: Vec::new(),
            record: None,
            current_snapshot: None,
            error_message: Some(message),
        })
}
// Dashboard read entry point over the projection; defaults to zeroed values
// until the runtime_snapshot write path refreshes it.
#[spacetimedb::procedure]
pub fn get_profile_dashboard(
    ctx: &mut ProcedureContext,
    input: RuntimeProfileDashboardGetInput,
) -> RuntimeProfileDashboardProcedureResult {
    ctx.try_with_tx(|tx| get_profile_dashboard_snapshot(tx, input.clone()))
        .map(|record| RuntimeProfileDashboardProcedureResult {
            ok: true,
            record: Some(record),
            error_message: None,
        })
        .unwrap_or_else(|message| RuntimeProfileDashboardProcedureResult {
            ok: false,
            record: None,
            error_message: Some(message),
        })
}
// Read-only wallet ledger view (most recent 50 entries); sorting and
// truncation are centralized in the tx helper.
#[spacetimedb::procedure]
pub fn list_profile_wallet_ledger(
    ctx: &mut ProcedureContext,
    input: RuntimeProfileWalletLedgerListInput,
) -> RuntimeProfileWalletLedgerProcedureResult {
    ctx.try_with_tx(|tx| list_profile_wallet_ledger_entries(tx, input.clone()))
        .map(|entries| RuntimeProfileWalletLedgerProcedureResult {
            ok: true,
            entries,
            error_message: None,
        })
        .unwrap_or_else(|message| RuntimeProfileWalletLedgerProcedureResult {
            ok: false,
            entries: Vec::new(),
            error_message: Some(message),
        })
}
// Play stats reuse the dashboard projection's total_play_time / updated_at so
// the Axum layer never stitches values itself.
#[spacetimedb::procedure]
pub fn get_profile_play_stats(
    ctx: &mut ProcedureContext,
    input: RuntimeProfilePlayStatsGetInput,
) -> RuntimeProfilePlayStatsProcedureResult {
    ctx.try_with_tx(|tx| get_profile_play_stats_snapshot(tx, input.clone()))
        .map(|record| RuntimeProfilePlayStatsProcedureResult {
            ok: true,
            record: Some(record),
            error_message: None,
        })
        .unwrap_or_else(|message| RuntimeProfilePlayStatsProcedureResult {
            ok: false,
            record: None,
            error_message: Some(message),
        })
}
// Validate the user id, project each matching archive row, then order by
// saved_at descending with archive_id ascending as a deterministic tiebreaker.
pub(crate) fn list_profile_save_archive_rows(
    ctx: &ReducerContext,
    input: RuntimeProfileSaveArchiveListInput,
) -> Result<Vec<RuntimeProfileSaveArchiveSnapshot>, String> {
    let validated = build_runtime_profile_save_archive_list_input(input.user_id)
        .map_err(|error| error.to_string())?;
    let mut snapshots: Vec<RuntimeProfileSaveArchiveSnapshot> = ctx
        .db
        .profile_save_archive()
        .iter()
        .filter(|row| row.user_id == validated.user_id)
        .map(|row| build_profile_save_archive_snapshot_from_row(&row))
        .collect();
    snapshots.sort_by(|a, b| {
        b.saved_at_micros
            .cmp(&a.saved_at_micros)
            .then_with(|| a.archive_id.cmp(&b.archive_id))
    });
    Ok(snapshots)
}
// Resume a saved world: copy the (user, world_key) archive back into the
// user's single runtime_snapshot row and return both views.
//
// The existing snapshot's created_at is preserved across the delete + insert
// upsert; when the user has no snapshot yet, the archive's saved_at doubles
// as creation time. Errors if validation fails or no archive matches.
pub(crate) fn resume_profile_save_archive_record(
    ctx: &ReducerContext,
    input: RuntimeProfileSaveArchiveResumeInput,
) -> Result<(RuntimeProfileSaveArchiveSnapshot, RuntimeSnapshot), String> {
    let validated_input = build_runtime_profile_save_archive_resume_input(input.user_id, input.world_key)
        .map_err(|error| error.to_string())?;
    // archive_id is keyed "{user_id}:{world_key}", so this scan matches at
    // most one row.
    let archive = ctx
        .db
        .profile_save_archive()
        .iter()
        .find(|row| {
            row.user_id == validated_input.user_id && row.world_key == validated_input.world_key
        })
        .ok_or_else(|| "profile_save_archive 对应 world_key 不存在".to_string())?;
    let existing_snapshot = ctx
        .db
        .runtime_snapshot()
        .user_id()
        .find(&validated_input.user_id);
    // Capture created_at before deleting the old row.
    let created_at = existing_snapshot
        .as_ref()
        .map(|row| row.created_at)
        .unwrap_or(archive.saved_at);
    if let Some(existing) = existing_snapshot {
        ctx.db
            .runtime_snapshot()
            .user_id()
            .delete(&existing.user_id);
    }
    ctx.db.runtime_snapshot().insert(RuntimeSnapshotRow {
        user_id: archive.user_id.clone(),
        version: SAVE_SNAPSHOT_VERSION,
        saved_at: archive.saved_at,
        bottom_tab: archive.bottom_tab.clone(),
        game_state_json: archive.game_state_json.clone(),
        current_story_json: archive.current_story_json.clone(),
        created_at,
        updated_at: archive.saved_at,
    });
    // Return the archive projection plus the snapshot exactly as persisted.
    Ok((
        build_profile_save_archive_snapshot_from_row(&archive),
        RuntimeSnapshot {
            user_id: archive.user_id.clone(),
            version: SAVE_SNAPSHOT_VERSION,
            saved_at_micros: archive.saved_at.to_micros_since_unix_epoch(),
            bottom_tab: archive.bottom_tab.clone(),
            game_state_json: archive.game_state_json.clone(),
            current_story_json: archive.current_story_json.clone(),
            created_at_micros: created_at.to_micros_since_unix_epoch(),
            updated_at_micros: archive.saved_at.to_micros_since_unix_epoch(),
        },
    ))
}
// Fan one freshly-saved runtime snapshot out into the profile projections.
//
// Parses game_state_json once, then refreshes the dashboard / wallet /
// played-world projections and the per-world save archive. Fails only when
// the game-state JSON cannot be parsed or the archive sync reports an error.
pub(crate) fn sync_profile_projections_from_snapshot(
    ctx: &ReducerContext,
    snapshot: &RuntimeSnapshot,
) -> Result<(), String> {
    let game_state = parse_json_str(&snapshot.game_state_json)?;
    let game_state_object = game_state.as_object();
    let saved_at = Timestamp::from_micros_since_unix_epoch(snapshot.saved_at_micros);
    sync_profile_dashboard_from_snapshot(ctx, snapshot, game_state_object, saved_at);
    sync_profile_save_archive_from_snapshot(ctx, snapshot, &game_state, saved_at)?;
    Ok(())
}
// Refresh the dashboard, wallet-ledger and played-world projections from one
// runtime snapshot.
//
// Reads `playerCurrency` as the wallet balance, appends a ledger row when it
// changed, and — when world metadata is resolvable — folds the world's
// playTimeMs delta into the user's total play time. Finally the dashboard
// row is upserted (delete + insert), preserving its created_at.
fn sync_profile_dashboard_from_snapshot(
    ctx: &ReducerContext,
    snapshot: &RuntimeSnapshot,
    game_state: Option<&serde_json::Map<String, JsonValue>>,
    saved_at: Timestamp,
) {
    let current_state = ctx
        .db
        .profile_dashboard_state()
        .user_id()
        .find(&snapshot.user_id);
    // Previous projection values default to 0 for first-time users.
    let previous_wallet_balance = current_state
        .as_ref()
        .map(|row| row.wallet_balance)
        .unwrap_or(0);
    let previous_total_play_time_ms = current_state
        .as_ref()
        .map(|row| row.total_play_time_ms)
        .unwrap_or(0);
    let next_wallet_balance =
        read_non_negative_u64(game_state.and_then(|state| state.get("playerCurrency")));
    let mut next_total_play_time_ms = previous_total_play_time_ms;
    // Only balance *changes* produce a ledger row.
    // NOTE(review): the ledger id is "{user}:{saved_at_micros}:{balance}" —
    // two snapshots in the same microsecond landing on the same balance would
    // collide on the primary key; confirm upstream guarantees distinct
    // saved_at values.
    if next_wallet_balance != previous_wallet_balance {
        ctx.db.profile_wallet_ledger().insert(ProfileWalletLedger {
            wallet_ledger_id: format!(
                "{}:{}:{}",
                snapshot.user_id, snapshot.saved_at_micros, next_wallet_balance
            ),
            user_id: snapshot.user_id.clone(),
            amount_delta: next_wallet_balance as i64 - previous_wallet_balance as i64,
            balance_after: next_wallet_balance,
            source_type: RuntimeProfileWalletLedgerSourceType::SnapshotSync,
            created_at: saved_at,
        });
    }
    // Play time is only accumulated when the snapshot identifies a world.
    if let Some(world_meta) = resolve_profile_world_snapshot_meta(game_state) {
        let current_play_time_ms = read_non_negative_u64(
            game_state
                .and_then(|state| state.get("runtimeStats"))
                .and_then(JsonValue::as_object)
                .and_then(|stats| stats.get("playTimeMs")),
        );
        let played_world_id = format!("{}:{}", snapshot.user_id, world_meta.world_key);
        let existing = ctx
            .db
            .profile_played_world()
            .played_world_id()
            .find(&played_world_id);
        let previous_observed_play_time_ms = existing
            .as_ref()
            .map(|row| row.last_observed_play_time_ms)
            .unwrap_or(0);
        // saturating_sub guards against the world's counter resetting
        // backwards; only forward progress is credited.
        let incremental_play_time_ms =
            current_play_time_ms.saturating_sub(previous_observed_play_time_ms);
        next_total_play_time_ms = next_total_play_time_ms.saturating_add(incremental_play_time_ms);
        if let Some(existing) = existing {
            // Upsert: keep first_played_at, advance last_played_at and the
            // high-water play-time mark.
            ctx.db
                .profile_played_world()
                .played_world_id()
                .delete(&existing.played_world_id);
            ctx.db.profile_played_world().insert(ProfilePlayedWorld {
                played_world_id,
                user_id: snapshot.user_id.clone(),
                world_key: world_meta.world_key,
                owner_user_id: world_meta.owner_user_id,
                profile_id: world_meta.profile_id,
                world_type: world_meta.world_type,
                world_title: world_meta.world_title,
                world_subtitle: world_meta.world_subtitle,
                first_played_at: existing.first_played_at,
                last_played_at: saved_at,
                last_observed_play_time_ms: current_play_time_ms
                    .max(existing.last_observed_play_time_ms),
            });
        } else {
            // First sighting of this world for the user.
            ctx.db.profile_played_world().insert(ProfilePlayedWorld {
                played_world_id,
                user_id: snapshot.user_id.clone(),
                world_key: world_meta.world_key,
                owner_user_id: world_meta.owner_user_id,
                profile_id: world_meta.profile_id,
                world_type: world_meta.world_type,
                world_title: world_meta.world_title,
                world_subtitle: world_meta.world_subtitle,
                first_played_at: saved_at,
                last_played_at: saved_at,
                last_observed_play_time_ms: current_play_time_ms,
            });
        }
    }
    // Upsert the dashboard row, carrying created_at forward when it exists.
    if let Some(existing) = current_state {
        ctx.db
            .profile_dashboard_state()
            .user_id()
            .delete(&existing.user_id);
        ctx.db
            .profile_dashboard_state()
            .insert(ProfileDashboardState {
                user_id: snapshot.user_id.clone(),
                wallet_balance: next_wallet_balance,
                total_play_time_ms: next_total_play_time_ms,
                created_at: existing.created_at,
                updated_at: saved_at,
            });
    } else {
        ctx.db
            .profile_dashboard_state()
            .insert(ProfileDashboardState {
                user_id: snapshot.user_id.clone(),
                wallet_balance: next_wallet_balance,
                total_play_time_ms: next_total_play_time_ms,
                created_at: saved_at,
                updated_at: saved_at,
            });
    }
}
/// Projects the user's current snapshot into the `profile_save_archive` table.
///
/// When no archive metadata can be resolved from the game state the call is a
/// no-op. Otherwise the row keyed by `user_id:world_key` is replaced
/// (delete-then-insert), preserving the original `created_at` of an existing
/// row.
fn sync_profile_save_archive_from_snapshot(
    ctx: &ReducerContext,
    snapshot: &RuntimeSnapshot,
    game_state: &JsonValue,
    saved_at: Timestamp,
) -> Result<(), String> {
    // Nothing to project when the game state lacks resolvable world metadata.
    let archive_meta = match resolve_profile_save_archive_meta(
        game_state,
        snapshot.current_story_json.as_deref(),
    ) {
        Some(meta) => meta,
        None => return Ok(()),
    };
    let archive_id = format!("{}:{}", snapshot.user_id, archive_meta.world_key);
    // Upsert semantics: keep the first creation time, then replace the row.
    let mut created_at = saved_at;
    if let Some(existing) = ctx.db.profile_save_archive().archive_id().find(&archive_id) {
        created_at = existing.created_at;
        ctx.db
            .profile_save_archive()
            .archive_id()
            .delete(&existing.archive_id);
    }
    ctx.db.profile_save_archive().insert(ProfileSaveArchive {
        archive_id,
        user_id: snapshot.user_id.clone(),
        world_key: archive_meta.world_key,
        owner_user_id: archive_meta.owner_user_id,
        profile_id: archive_meta.profile_id,
        world_type: archive_meta.world_type,
        world_name: archive_meta.world_name,
        subtitle: archive_meta.subtitle,
        summary_text: archive_meta.summary_text,
        cover_image_src: archive_meta.cover_image_src,
        saved_at,
        bottom_tab: snapshot.bottom_tab.clone(),
        game_state_json: snapshot.game_state_json.clone(),
        current_story_json: snapshot.current_story_json.clone(),
        created_at,
        updated_at: saved_at,
    });
    Ok(())
}
/// World identity metadata resolved from a snapshot's game state, used to key
/// and label `profile_played_world` projection rows.
#[derive(Clone, Debug)]
struct ProfileWorldSnapshotMeta {
    // Stable key: "custom:<profile id or title>" or "builtin:<worldType>".
    world_key: String,
    owner_user_id: Option<String>,
    profile_id: Option<String>,
    world_type: Option<String>,
    world_title: String,
    // May be empty when no summary/description text is available.
    world_subtitle: String,
}
/// Display/storage metadata for a `profile_save_archive` row, resolved from
/// the game state plus the optional current-story payload.
#[derive(Clone, Debug)]
struct ProfileSaveArchiveMeta {
    world_key: String,
    owner_user_id: Option<String>,
    profile_id: Option<String>,
    world_type: Option<String>,
    world_name: String,
    subtitle: String,
    // Digest shown in archive lists; resolution falls back to
    // DEFAULT_SAVE_ARCHIVE_SUMMARY_TEXT when no better text exists.
    summary_text: String,
    cover_image_src: Option<String>,
}
/// Projects a persisted save-archive row into its wire-format snapshot,
/// converting all timestamps to microseconds since the Unix epoch.
pub(crate) fn build_profile_save_archive_snapshot_from_row(
    row: &ProfileSaveArchive,
) -> RuntimeProfileSaveArchiveSnapshot {
    RuntimeProfileSaveArchiveSnapshot {
        archive_id: row.archive_id.clone(),
        user_id: row.user_id.clone(),
        world_key: row.world_key.clone(),
        owner_user_id: row.owner_user_id.clone(),
        profile_id: row.profile_id.clone(),
        world_type: row.world_type.clone(),
        world_name: row.world_name.clone(),
        subtitle: row.subtitle.clone(),
        summary_text: row.summary_text.clone(),
        cover_image_src: row.cover_image_src.clone(),
        saved_at_micros: row.saved_at.to_micros_since_unix_epoch(),
        bottom_tab: row.bottom_tab.clone(),
        game_state_json: row.game_state_json.clone(),
        current_story_json: row.current_story_json.clone(),
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
        updated_at_micros: row.updated_at.to_micros_since_unix_epoch(),
    }
}
/// Reads a JSON value as a non-negative `u64`, tolerating both number and
/// string encodings.
///
/// Negative integers clamp to zero; only finite, strictly positive floats
/// contribute (fractional parts are floored away); unparsable strings and
/// non-numeric values collapse to 0 so callers can treat the result as a
/// counter.
fn read_non_negative_u64(value: Option<&JsonValue>) -> u64 {
    match value {
        Some(JsonValue::Number(number)) => {
            if let Some(raw) = number.as_u64() {
                raw
            } else if let Some(raw) = number.as_i64() {
                // Negative integers clamp to zero before the cast.
                raw.max(0) as u64
            } else if let Some(raw) = number.as_f64() {
                // NaN / infinities and non-positive floats are rejected.
                if raw.is_finite() && raw > 0.0 {
                    raw.floor() as u64
                } else {
                    0
                }
            } else {
                0
            }
        }
        // `Result::unwrap_or` handles the parse failure directly; no `.ok()`
        // round-trip through Option is needed.
        Some(JsonValue::String(raw)) => raw.trim().parse::<u64>().unwrap_or(0),
        _ => 0,
    }
}
fn read_string_from_json(value: Option<&JsonValue>) -> Option<String> {
value
.and_then(JsonValue::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToString::to_string)
}
/// Derives world identity metadata (key, titles, type) from a saved game
/// state.
///
/// A `customWorldProfile` object carrying an id or name takes precedence;
/// otherwise the built-in `worldType` plus the optional `currentScenePreset`
/// is used. Returns `None` when neither source yields enough data.
fn resolve_profile_world_snapshot_meta(
    game_state: Option<&serde_json::Map<String, JsonValue>>,
) -> Option<ProfileWorldSnapshotMeta> {
    let game_state = game_state?;
    let custom_world_profile = game_state
        .get("customWorldProfile")
        .and_then(JsonValue::as_object);
    if let Some(custom_world_profile) = custom_world_profile {
        let profile_id = read_string_from_json(custom_world_profile.get("id"));
        // Title preference: "name" first, then "title".
        let world_title = read_string_from_json(custom_world_profile.get("name"))
            .or_else(|| read_string_from_json(custom_world_profile.get("title")));
        if profile_id.is_some() || world_title.is_some() {
            let world_title = world_title.unwrap_or_else(|| "自定义世界".to_string());
            return Some(ProfileWorldSnapshotMeta {
                // Prefer a stable id-based key; fall back to the title when
                // the profile carries no id.
                world_key: profile_id
                    .as_ref()
                    .map(|profile_id| format!("custom:{profile_id}"))
                    .unwrap_or_else(|| format!("custom:{world_title}")),
                owner_user_id: None,
                profile_id,
                world_type: Some("CUSTOM".to_string()),
                world_title,
                world_subtitle: read_string_from_json(custom_world_profile.get("summary"))
                    .or_else(|| read_string_from_json(custom_world_profile.get("settingText")))
                    .unwrap_or_default(),
            });
        }
    }
    // Built-in world path: worldType is mandatory, preset fields are optional.
    let world_type = read_string_from_json(game_state.get("worldType"))?;
    let current_scene_preset = game_state
        .get("currentScenePreset")
        .and_then(JsonValue::as_object);
    Some(ProfileWorldSnapshotMeta {
        world_key: format!("builtin:{world_type}"),
        owner_user_id: None,
        profile_id: None,
        world_type: Some(world_type.clone()),
        world_title: current_scene_preset
            .and_then(|preset| read_string_from_json(preset.get("name")))
            .unwrap_or_else(|| build_builtin_world_title(&world_type)),
        world_subtitle: current_scene_preset
            .and_then(|preset| {
                read_string_from_json(preset.get("summary"))
                    .or_else(|| read_string_from_json(preset.get("description")))
            })
            .unwrap_or_default(),
    })
}
/// Resolves display metadata for the user's save archive from the game state
/// and the optional current-story payload.
///
/// Summary text preference order: the story engine's `continueGameDigest`,
/// then the current story's `text`, then a non-empty subtitle, and finally
/// `DEFAULT_SAVE_ARCHIVE_SUMMARY_TEXT`. Returns `None` when no world metadata
/// can be resolved at all.
fn resolve_profile_save_archive_meta(
    game_state: &JsonValue,
    current_story_json: Option<&str>,
) -> Option<ProfileSaveArchiveMeta> {
    let game_state_object = game_state.as_object();
    let world_meta = resolve_profile_world_snapshot_meta(game_state_object)?;
    let story_engine_memory = game_state_object
        .and_then(|state| state.get("storyEngineMemory"))
        .and_then(JsonValue::as_object);
    let continue_game_digest = story_engine_memory
        .and_then(|memory| read_string_from_json(memory.get("continueGameDigest")));
    // Parse errors in the story payload are deliberately swallowed (`.ok()`);
    // the summary simply falls back to the next candidate.
    let current_story_text = parse_optional_json_str(current_story_json)
        .ok()
        .flatten()
        .and_then(|story| story.as_object().cloned())
        .and_then(|story| read_string_from_json(story.get("text")));
    let custom_world_profile = game_state_object
        .and_then(|state| state.get("customWorldProfile"))
        .and_then(JsonValue::as_object);
    if let Some(custom_world_profile) = custom_world_profile {
        // Custom worlds read name/subtitle/cover straight from the profile,
        // falling back to the already-resolved world metadata.
        let world_name = read_string_from_json(custom_world_profile.get("name"))
            .or_else(|| read_string_from_json(custom_world_profile.get("title")))
            .unwrap_or_else(|| world_meta.world_title.clone());
        let subtitle = read_string_from_json(custom_world_profile.get("summary"))
            .or_else(|| read_string_from_json(custom_world_profile.get("settingText")))
            .unwrap_or_else(|| world_meta.world_subtitle.clone());
        let summary_text = continue_game_digest
            .or(current_story_text)
            .or_else(|| {
                if subtitle.is_empty() {
                    None
                } else {
                    Some(subtitle.clone())
                }
            })
            .unwrap_or_else(|| DEFAULT_SAVE_ARCHIVE_SUMMARY_TEXT.to_string());
        return Some(ProfileSaveArchiveMeta {
            world_key: world_meta.world_key,
            owner_user_id: world_meta.owner_user_id,
            profile_id: world_meta.profile_id,
            world_type: world_meta.world_type,
            world_name,
            subtitle,
            summary_text,
            cover_image_src: read_string_from_json(custom_world_profile.get("coverImageSrc")),
        });
    }
    // Built-in worlds: same summary fallback chain; the cover comes from the
    // current scene preset's imageSrc.
    let summary_text = continue_game_digest
        .or(current_story_text)
        .or_else(|| {
            if world_meta.world_subtitle.is_empty() {
                None
            } else {
                Some(world_meta.world_subtitle.clone())
            }
        })
        .unwrap_or_else(|| DEFAULT_SAVE_ARCHIVE_SUMMARY_TEXT.to_string());
    let current_scene_preset = game_state_object
        .and_then(|state| state.get("currentScenePreset"))
        .and_then(JsonValue::as_object);
    Some(ProfileSaveArchiveMeta {
        world_key: world_meta.world_key,
        owner_user_id: world_meta.owner_user_id,
        profile_id: world_meta.profile_id,
        world_type: world_meta.world_type,
        world_name: world_meta.world_title,
        subtitle: world_meta.world_subtitle.clone(),
        summary_text,
        cover_image_src: current_scene_preset
            .and_then(|preset| read_string_from_json(preset.get("imageSrc"))),
    })
}
/// Maps a built-in world type code to its display title; unknown codes fall
/// back to the generic narrative-world title.
fn build_builtin_world_title(world_type: &str) -> String {
    let title = match world_type {
        "WUXIA" => "武侠世界",
        "XIANXIA" => "仙侠世界",
        _ => "叙事世界",
    };
    title.to_string()
}
/// Builds the profile dashboard snapshot for a user.
///
/// The played-world count is always computed from the projection table; the
/// wallet balance and total play time come from `profile_dashboard_state`
/// when a row exists, otherwise zeroed defaults are reported (no write).
fn get_profile_dashboard_snapshot(
    ctx: &ReducerContext,
    input: RuntimeProfileDashboardGetInput,
) -> Result<RuntimeProfileDashboardSnapshot, String> {
    let validated_input = build_runtime_profile_dashboard_get_input(input.user_id)
        .map_err(|error| error.to_string())?;
    // Count projection rows owned by this user.
    let played_world_count = ctx
        .db
        .profile_played_world()
        .iter()
        .filter(|row| row.user_id == validated_input.user_id)
        .count() as u32;
    if let Some(existing) = ctx
        .db
        .profile_dashboard_state()
        .user_id()
        .find(&validated_input.user_id)
    {
        return Ok(RuntimeProfileDashboardSnapshot {
            user_id: existing.user_id,
            wallet_balance: existing.wallet_balance,
            total_play_time_ms: existing.total_play_time_ms,
            played_world_count,
            updated_at_micros: Some(existing.updated_at.to_micros_since_unix_epoch()),
        });
    }
    // No persisted dashboard state yet: report zeroed defaults.
    Ok(RuntimeProfileDashboardSnapshot {
        user_id: validated_input.user_id,
        wallet_balance: 0,
        total_play_time_ms: 0,
        played_world_count,
        updated_at_micros: None,
    })
}
/// Lists the user's wallet ledger entries, newest first, capped at
/// `PROFILE_WALLET_LEDGER_LIST_LIMIT` rows.
fn list_profile_wallet_ledger_entries(
    ctx: &ReducerContext,
    input: RuntimeProfileWalletLedgerListInput,
) -> Result<Vec<RuntimeProfileWalletLedgerEntrySnapshot>, String> {
    let validated_input = build_runtime_profile_wallet_ledger_list_input(input.user_id)
        .map_err(|error| error.to_string())?;
    let mut snapshots: Vec<_> = ctx
        .db
        .profile_wallet_ledger()
        .iter()
        .filter(|row| row.user_id == validated_input.user_id)
        .map(|row| build_profile_wallet_ledger_snapshot_from_row(&row))
        .collect();
    // Newest first; the ledger id breaks ties deterministically.
    snapshots.sort_by(|a, b| {
        b.created_at_micros
            .cmp(&a.created_at_micros)
            .then_with(|| a.wallet_ledger_id.cmp(&b.wallet_ledger_id))
    });
    snapshots.truncate(PROFILE_WALLET_LEDGER_LIST_LIMIT);
    Ok(snapshots)
}
/// Builds the play-stats snapshot: total play time from the dashboard state
/// (0 when absent) plus the user's played worlds ordered most-recent first.
fn get_profile_play_stats_snapshot(
    ctx: &ReducerContext,
    input: RuntimeProfilePlayStatsGetInput,
) -> Result<RuntimeProfilePlayStatsSnapshot, String> {
    let validated_input = build_runtime_profile_play_stats_get_input(input.user_id)
        .map_err(|error| error.to_string())?;
    let mut played_works: Vec<_> = ctx
        .db
        .profile_played_world()
        .iter()
        .filter(|row| row.user_id == validated_input.user_id)
        .map(|row| build_profile_played_world_snapshot_from_row(&row))
        .collect();
    // Most recently played first, with a stable id tie-break.
    played_works.sort_by(|a, b| {
        b.last_played_at_micros
            .cmp(&a.last_played_at_micros)
            .then_with(|| a.played_world_id.cmp(&b.played_world_id))
    });
    let dashboard_state = ctx
        .db
        .profile_dashboard_state()
        .user_id()
        .find(&validated_input.user_id);
    let total_play_time_ms = dashboard_state
        .as_ref()
        .map_or(0, |row| row.total_play_time_ms);
    let updated_at_micros = dashboard_state
        .as_ref()
        .map(|row| row.updated_at.to_micros_since_unix_epoch());
    Ok(RuntimeProfilePlayStatsSnapshot {
        user_id: validated_input.user_id,
        total_play_time_ms,
        played_works,
        updated_at_micros,
    })
}
/// Projects a wallet ledger row into its wire-format snapshot.
fn build_profile_wallet_ledger_snapshot_from_row(
    row: &ProfileWalletLedger,
) -> RuntimeProfileWalletLedgerEntrySnapshot {
    RuntimeProfileWalletLedgerEntrySnapshot {
        wallet_ledger_id: row.wallet_ledger_id.clone(),
        user_id: row.user_id.clone(),
        amount_delta: row.amount_delta,
        balance_after: row.balance_after,
        source_type: row.source_type,
        // Timestamps cross the wire as microseconds since the Unix epoch.
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
    }
}
/// Projects a played-world row into its wire-format snapshot, converting
/// timestamps to microseconds since the Unix epoch.
fn build_profile_played_world_snapshot_from_row(
    row: &ProfilePlayedWorld,
) -> RuntimeProfilePlayedWorldSnapshot {
    RuntimeProfilePlayedWorldSnapshot {
        played_world_id: row.played_world_id.clone(),
        user_id: row.user_id.clone(),
        world_key: row.world_key.clone(),
        owner_user_id: row.owner_user_id.clone(),
        profile_id: row.profile_id.clone(),
        world_type: row.world_type.clone(),
        world_title: row.world_title.clone(),
        world_subtitle: row.world_subtitle.clone(),
        first_played_at_micros: row.first_played_at.to_micros_since_unix_epoch(),
        last_played_at_micros: row.last_played_at.to_micros_since_unix_epoch(),
        last_observed_play_time_ms: row.last_observed_play_time_ms,
    }
}

View File

@@ -1 +1,141 @@
// Runtime settings 相关表、procedure 与 helper 落位点。
use crate::*;
/// Per-user runtime preference row (music volume + platform theme).
/// The primary key guarantees at most one row per user.
#[spacetimedb::table(accessor = runtime_setting)]
pub struct RuntimeSetting {
    #[primary_key]
    pub(crate) user_id: String,
    pub(crate) music_volume: f32,
    pub(crate) platform_theme: RuntimePlatformTheme,
    pub(crate) created_at: Timestamp,
    pub(crate) updated_at: Timestamp,
}
// Read-side procedure for Axum: returns the stored setting, or a default
// snapshot when nothing has been persisted yet — without issuing any write.
#[spacetimedb::procedure]
pub fn get_runtime_setting_or_default(
    ctx: &mut ProcedureContext,
    input: RuntimeSettingGetInput,
) -> RuntimeSettingProcedureResult {
    let outcome = ctx.try_with_tx(|tx| get_runtime_setting_snapshot(tx, input.clone()));
    let (record, error_message) = match outcome {
        Ok(snapshot) => (Some(snapshot), None),
        Err(message) => (None, Some(message)),
    };
    RuntimeSettingProcedureResult {
        ok: error_message.is_none(),
        record,
        error_message,
    }
}
// Write-side procedure for Axum: persists the setting and echoes back the
// normalized snapshot that was actually stored.
#[spacetimedb::procedure]
pub fn upsert_runtime_setting_and_return(
    ctx: &mut ProcedureContext,
    input: RuntimeSettingUpsertInput,
) -> RuntimeSettingProcedureResult {
    let outcome = ctx.try_with_tx(|tx| upsert_runtime_setting(tx, input.clone()));
    let (record, error_message) = match outcome {
        Ok(snapshot) => (Some(snapshot), None),
        Err(message) => (None, Some(message)),
    };
    RuntimeSettingProcedureResult {
        ok: error_message.is_none(),
        record,
        error_message,
    }
}
/// Reads the user's setting snapshot; when no row exists, defaults are
/// returned with zeroed timestamps and nothing is written.
fn get_runtime_setting_snapshot(
    ctx: &ReducerContext,
    input: RuntimeSettingGetInput,
) -> Result<RuntimeSettingSnapshot, String> {
    let validated_input =
        build_runtime_setting_get_input(input.user_id).map_err(|error| error.to_string())?;
    let stored = ctx
        .db
        .runtime_setting()
        .user_id()
        .find(&validated_input.user_id);
    let snapshot = match stored {
        Some(existing) => RuntimeSettingSnapshot {
            user_id: existing.user_id,
            music_volume: existing.music_volume,
            platform_theme: existing.platform_theme,
            created_at_micros: existing.created_at.to_micros_since_unix_epoch(),
            updated_at_micros: existing.updated_at.to_micros_since_unix_epoch(),
        },
        // No persisted row yet: surface the default values.
        None => RuntimeSettingSnapshot {
            user_id: validated_input.user_id,
            music_volume: DEFAULT_MUSIC_VOLUME,
            platform_theme: DEFAULT_PLATFORM_THEME,
            created_at_micros: 0,
            updated_at_micros: 0,
        },
    };
    Ok(snapshot)
}
/// Persists a runtime setting for a user and returns the stored snapshot.
///
/// Upsert semantics: an existing row keeps its original `created_at`; a new
/// row uses the caller-provided `updated_at` for both timestamps. The row is
/// replaced via delete-then-insert since the table is keyed by `user_id`.
fn upsert_runtime_setting(
    ctx: &ReducerContext,
    input: RuntimeSettingUpsertInput,
) -> Result<RuntimeSettingSnapshot, String> {
    // Input validation is delegated to the shared builder.
    let validated_input = build_runtime_setting_upsert_input(
        input.user_id,
        input.music_volume,
        input.platform_theme,
        input.updated_at_micros,
    )
    .map_err(|error| error.to_string())?;
    let updated_at = Timestamp::from_micros_since_unix_epoch(validated_input.updated_at_micros);
    let snapshot = match ctx
        .db
        .runtime_setting()
        .user_id()
        .find(&validated_input.user_id)
    {
        Some(existing) => {
            // Replace the row, preserving the original creation time.
            ctx.db.runtime_setting().user_id().delete(&existing.user_id);
            ctx.db.runtime_setting().insert(RuntimeSetting {
                user_id: existing.user_id.clone(),
                music_volume: validated_input.music_volume,
                platform_theme: validated_input.platform_theme,
                created_at: existing.created_at,
                updated_at,
            });
            RuntimeSettingSnapshot {
                user_id: existing.user_id,
                music_volume: validated_input.music_volume,
                platform_theme: validated_input.platform_theme,
                created_at_micros: existing.created_at.to_micros_since_unix_epoch(),
                updated_at_micros: validated_input.updated_at_micros,
            }
        }
        None => {
            // First write for this user: created_at mirrors updated_at.
            ctx.db.runtime_setting().insert(RuntimeSetting {
                user_id: validated_input.user_id.clone(),
                music_volume: validated_input.music_volume,
                platform_theme: validated_input.platform_theme,
                created_at: updated_at,
                updated_at,
            });
            RuntimeSettingSnapshot {
                user_id: validated_input.user_id,
                music_volume: validated_input.music_volume,
                platform_theme: validated_input.platform_theme,
                created_at_micros: validated_input.updated_at_micros,
                updated_at_micros: validated_input.updated_at_micros,
            }
        }
    };
    Ok(snapshot)
}

View File

@@ -1 +1,215 @@
// Runtime snapshot 与 save archive 相关表、procedure 与 helper 落位点。
use crate::*;
/// Per-user "current save" snapshot row; the primary key guarantees at most
/// one row per user.
#[spacetimedb::table(accessor = runtime_snapshot)]
pub struct RuntimeSnapshotRow {
    #[primary_key]
    pub(crate) user_id: String,
    // Schema version; pinned to SAVE_SNAPSHOT_VERSION on every write.
    pub(crate) version: u32,
    pub(crate) saved_at: Timestamp,
    pub(crate) bottom_tab: String,
    // JSON payloads are stored as raw strings; parsing happens at the edges.
    pub(crate) game_state_json: String,
    pub(crate) current_story_json: Option<String>,
    pub(crate) created_at: Timestamp,
    pub(crate) updated_at: Timestamp,
}
// Snapshot reads keep the legacy Node semantics: a missing snapshot yields
// ok=true with record=None, not a default empty object.
#[spacetimedb::procedure]
pub fn get_runtime_snapshot(
    ctx: &mut ProcedureContext,
    input: RuntimeSnapshotGetInput,
) -> RuntimeSnapshotProcedureResult {
    let outcome = ctx.try_with_tx(|tx| get_runtime_snapshot_record(tx, input.clone()));
    let (record, error_message) = match outcome {
        Ok(record) => (record, None),
        Err(message) => (None, Some(message)),
    };
    RuntimeSnapshotProcedureResult {
        ok: error_message.is_none(),
        record,
        error_message,
    }
}
// The PUT-snapshot main path also refreshes the dashboard / wallet /
// played_world / save_archive projections inside the same transaction.
#[spacetimedb::procedure]
pub fn upsert_runtime_snapshot_and_return(
    ctx: &mut ProcedureContext,
    input: RuntimeSnapshotUpsertInput,
) -> RuntimeSnapshotProcedureResult {
    let outcome = ctx.try_with_tx(|tx| upsert_runtime_snapshot_record(tx, input.clone()));
    let (record, error_message) = match outcome {
        Ok(record) => (Some(record), None),
        Err(message) => (None, Some(message)),
    };
    RuntimeSnapshotProcedureResult {
        ok: error_message.is_none(),
        record,
        error_message,
    }
}
// Deleting the current snapshot only touches the runtime_snapshot table; it
// does not cascade-clean the profile projections.
#[spacetimedb::procedure]
pub fn delete_runtime_snapshot_and_return(
    ctx: &mut ProcedureContext,
    input: RuntimeSnapshotDeleteInput,
) -> RuntimeSnapshotProcedureResult {
    let outcome = ctx.try_with_tx(|tx| delete_runtime_snapshot_record(tx, input.clone()));
    let (record, error_message) = match outcome {
        Ok(record) => (record, None),
        Err(message) => (None, Some(message)),
    };
    RuntimeSnapshotProcedureResult {
        ok: error_message.is_none(),
        record,
        error_message,
    }
}
/// Looks up the current snapshot for a validated user id; `None` when the
/// user has never saved.
pub(crate) fn get_runtime_snapshot_record(
    ctx: &ReducerContext,
    input: RuntimeSnapshotGetInput,
) -> Result<Option<RuntimeSnapshot>, String> {
    let validated_input =
        build_runtime_snapshot_get_input(input.user_id).map_err(|error| error.to_string())?;
    let found = ctx
        .db
        .runtime_snapshot()
        .user_id()
        .find(&validated_input.user_id);
    Ok(found.map(|row| build_runtime_snapshot_from_row(&row)))
}
/// Upserts the user's current snapshot row and refreshes the profile
/// projections derived from it.
///
/// Both JSON payloads are parsed up front so malformed input fails before any
/// row is touched. An existing row keeps its `created_at`; `version` is
/// always pinned to `SAVE_SNAPSHOT_VERSION`. After the row is written,
/// `sync_profile_projections_from_snapshot` updates the derived tables.
pub(crate) fn upsert_runtime_snapshot_record(
    ctx: &ReducerContext,
    input: RuntimeSnapshotUpsertInput,
) -> Result<RuntimeSnapshot, String> {
    let current_story_value = parse_optional_json_str(input.current_story_json.as_deref())?;
    let game_state = parse_json_str(&input.game_state_json)?;
    // The shared builder validates the raw input fields.
    let prepared = build_runtime_snapshot_upsert_input(
        input.user_id,
        input.saved_at_micros,
        input.bottom_tab,
        game_state.clone(),
        current_story_value.clone(),
        input.updated_at_micros,
    )
    .map_err(|error| error.to_string())?;
    let updated_at = Timestamp::from_micros_since_unix_epoch(prepared.updated_at_micros);
    let saved_at = Timestamp::from_micros_since_unix_epoch(prepared.saved_at_micros);
    let snapshot = match ctx.db.runtime_snapshot().user_id().find(&prepared.user_id) {
        Some(existing) => {
            // Replace-by-primary-key: delete then insert, preserving created_at.
            ctx.db
                .runtime_snapshot()
                .user_id()
                .delete(&existing.user_id);
            ctx.db.runtime_snapshot().insert(RuntimeSnapshotRow {
                user_id: existing.user_id.clone(),
                version: SAVE_SNAPSHOT_VERSION,
                saved_at,
                bottom_tab: prepared.bottom_tab.clone(),
                game_state_json: prepared.game_state_json.clone(),
                current_story_json: prepared.current_story_json.clone(),
                created_at: existing.created_at,
                updated_at,
            });
            RuntimeSnapshot {
                user_id: existing.user_id,
                version: SAVE_SNAPSHOT_VERSION,
                saved_at_micros: prepared.saved_at_micros,
                bottom_tab: prepared.bottom_tab,
                game_state_json: prepared.game_state_json,
                current_story_json: prepared.current_story_json,
                created_at_micros: existing.created_at.to_micros_since_unix_epoch(),
                updated_at_micros: prepared.updated_at_micros,
            }
        }
        None => {
            // First save for this user: created_at mirrors updated_at.
            ctx.db.runtime_snapshot().insert(RuntimeSnapshotRow {
                user_id: prepared.user_id.clone(),
                version: SAVE_SNAPSHOT_VERSION,
                saved_at,
                bottom_tab: prepared.bottom_tab.clone(),
                game_state_json: prepared.game_state_json.clone(),
                current_story_json: prepared.current_story_json.clone(),
                created_at: updated_at,
                updated_at,
            });
            RuntimeSnapshot {
                user_id: prepared.user_id,
                version: SAVE_SNAPSHOT_VERSION,
                saved_at_micros: prepared.saved_at_micros,
                bottom_tab: prepared.bottom_tab,
                game_state_json: prepared.game_state_json,
                current_story_json: prepared.current_story_json,
                created_at_micros: prepared.updated_at_micros,
                updated_at_micros: prepared.updated_at_micros,
            }
        }
    };
    // Keep the profile projections consistent with the snapshot just written.
    sync_profile_projections_from_snapshot(ctx, &snapshot)?;
    Ok(snapshot)
}
/// Deletes the user's current snapshot row, returning the removed snapshot,
/// or `None` when no row existed.
pub(crate) fn delete_runtime_snapshot_record(
    ctx: &ReducerContext,
    input: RuntimeSnapshotDeleteInput,
) -> Result<Option<RuntimeSnapshot>, String> {
    let validated_input =
        build_runtime_snapshot_delete_input(input.user_id).map_err(|error| error.to_string())?;
    let Some(existing) = ctx
        .db
        .runtime_snapshot()
        .user_id()
        .find(&validated_input.user_id)
    else {
        return Ok(None);
    };
    // Capture the snapshot before removal so the caller sees what was deleted.
    let snapshot = build_runtime_snapshot_from_row(&existing);
    ctx.db
        .runtime_snapshot()
        .user_id()
        .delete(&existing.user_id);
    Ok(Some(snapshot))
}
/// Converts a persisted snapshot row into the wire-format `RuntimeSnapshot`,
/// with all timestamps expressed as microseconds since the Unix epoch.
pub(crate) fn build_runtime_snapshot_from_row(row: &RuntimeSnapshotRow) -> RuntimeSnapshot {
    RuntimeSnapshot {
        user_id: row.user_id.clone(),
        version: row.version,
        saved_at_micros: row.saved_at.to_micros_since_unix_epoch(),
        bottom_tab: row.bottom_tab.clone(),
        game_state_json: row.game_state_json.clone(),
        current_story_json: row.current_story_json.clone(),
        created_at_micros: row.created_at.to_micros_since_unix_epoch(),
        updated_at_micros: row.updated_at.to_micros_since_unix_epoch(),
    }
}
/// Parses the required game-state JSON payload; parse failures surface as the
/// error string used by the snapshot write path.
pub(crate) fn parse_json_str(raw: &str) -> Result<JsonValue, String> {
    match serde_json::from_str::<JsonValue>(raw) {
        Ok(value) => Ok(value),
        Err(error) => Err(format!("game_state_json 解析失败: {error}")),
    }
}
/// Parses an optional current-story JSON payload.
///
/// `None`, empty, and whitespace-only inputs all yield `Ok(None)`; anything
/// else must be valid JSON or an error string is returned.
pub(crate) fn parse_optional_json_str(raw: Option<&str>) -> Result<Option<JsonValue>, String> {
    let Some(trimmed) = raw.map(str::trim).filter(|value| !value.is_empty()) else {
        return Ok(None);
    };
    serde_json::from_str::<JsonValue>(trimmed)
        .map(Some)
        .map_err(|error| format!("current_story_json 解析失败: {error}"))
}