#[spacetimedb::table( accessor = ai_task, index(accessor = by_ai_task_owner_user_id, btree(columns = [owner_user_id])), index(accessor = by_ai_task_status, btree(columns = [status])), index(accessor = by_ai_task_kind, btree(columns = [task_kind])) )] pub struct AiTask { #[primary_key] task_id: String, task_kind: AiTaskKind, owner_user_id: String, request_label: String, source_module: String, source_entity_id: Option, request_payload_json: Option, status: AiTaskStatus, failure_message: Option, latest_text_output: Option, latest_structured_payload_json: Option, version: u32, created_at: Timestamp, started_at: Option, completed_at: Option, updated_at: Timestamp, } #[spacetimedb::table( accessor = ai_task_stage, index(accessor = by_ai_task_stage_task_id, btree(columns = [task_id])), index(accessor = by_ai_task_stage_task_order, btree(columns = [task_id, stage_order])) )] pub struct AiTaskStage { #[primary_key] task_stage_id: String, task_id: String, stage_kind: AiTaskStageKind, label: String, detail: String, stage_order: u32, status: AiTaskStageStatus, text_output: Option, structured_payload_json: Option, warning_messages: Vec, started_at: Option, completed_at: Option, } #[spacetimedb::table( accessor = ai_text_chunk, index(accessor = by_ai_text_chunk_task_id, btree(columns = [task_id])), index( accessor = by_ai_text_chunk_task_stage_sequence, btree(columns = [task_id, stage_kind, sequence]) ) )] pub struct AiTextChunk { #[primary_key] text_chunk_row_id: String, chunk_id: String, task_id: String, stage_kind: AiTaskStageKind, sequence: u32, delta_text: String, created_at: Timestamp, } #[spacetimedb::table( accessor = ai_result_reference, index(accessor = by_ai_result_reference_task_id, btree(columns = [task_id])) )] pub struct AiResultReference { #[primary_key] result_reference_row_id: String, result_ref_id: String, task_id: String, reference_kind: AiResultReferenceKind, reference_id: String, label: Option, created_at: Timestamp, } // AI 任务当前先固定成 private 真相表,后续由 Axum / platform-llm 再往外包一层 HTTP 与 SSE 协议。 #[spacetimedb::reducer] pub fn create_ai_task(ctx: &ReducerContext, input: AiTaskCreateInput) -> Result<(), String> { create_ai_task_tx(ctx, input).map(|_| ()) } #[spacetimedb::procedure] pub fn create_ai_task_and_return( ctx: &mut ProcedureContext, input: AiTaskCreateInput, ) -> AiTaskProcedureResult { match ctx.try_with_tx(|tx| create_ai_task_tx(tx, input.clone())) { Ok(task) => AiTaskProcedureResult { ok: true, task: Some(task), text_chunk: None, error_message: None, }, Err(message) => AiTaskProcedureResult { ok: false, task: None, text_chunk: None, error_message: Some(message), }, } } #[spacetimedb::reducer] pub fn start_ai_task(ctx: &ReducerContext, input: AiTaskStartInput) -> Result<(), String> { start_ai_task_tx(ctx, input).map(|_| ()) } #[spacetimedb::reducer] pub fn start_ai_task_stage( ctx: &ReducerContext, input: AiTaskStageStartInput, ) -> Result<(), String> { start_ai_task_stage_tx(ctx, input).map(|_| ()) } // 流式增量写入需要同步返回 chunk 与聚合后的任务快照,方便后续 Axum facade 直接复用。 #[spacetimedb::procedure] pub fn append_ai_text_chunk_and_return( ctx: &mut ProcedureContext, input: AiTextChunkAppendInput, ) -> AiTaskProcedureResult { match ctx.try_with_tx(|tx| append_ai_text_chunk_tx(tx, input.clone())) { Ok((task, text_chunk)) => AiTaskProcedureResult { ok: true, task: Some(task), text_chunk: Some(text_chunk), error_message: None, }, Err(message) => AiTaskProcedureResult { ok: false, task: None, text_chunk: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn complete_ai_stage_and_return( ctx: &mut ProcedureContext, input: AiStageCompletionInput, ) -> AiTaskProcedureResult { match ctx.try_with_tx(|tx| complete_ai_stage_tx(tx, input.clone())) { Ok(task) => AiTaskProcedureResult { ok: true, task: Some(task), text_chunk: None, error_message: None, }, Err(message) => AiTaskProcedureResult { ok: false, task: None, text_chunk: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn attach_ai_result_reference_and_return( ctx: &mut ProcedureContext, input: AiResultReferenceInput, ) -> AiTaskProcedureResult { match ctx.try_with_tx(|tx| attach_ai_result_reference_tx(tx, input.clone())) { Ok(task) => AiTaskProcedureResult { ok: true, task: Some(task), text_chunk: None, error_message: None, }, Err(message) => AiTaskProcedureResult { ok: false, task: None, text_chunk: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn complete_ai_task_and_return( ctx: &mut ProcedureContext, input: AiTaskFinishInput, ) -> AiTaskProcedureResult { match ctx.try_with_tx(|tx| complete_ai_task_tx(tx, input.clone())) { Ok(task) => AiTaskProcedureResult { ok: true, task: Some(task), text_chunk: None, error_message: None, }, Err(message) => AiTaskProcedureResult { ok: false, task: None, text_chunk: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn fail_ai_task_and_return( ctx: &mut ProcedureContext, input: AiTaskFailureInput, ) -> AiTaskProcedureResult { match ctx.try_with_tx(|tx| fail_ai_task_tx(tx, input.clone())) { Ok(task) => AiTaskProcedureResult { ok: true, task: Some(task), text_chunk: None, error_message: None, }, Err(message) => AiTaskProcedureResult { ok: false, task: None, text_chunk: None, error_message: Some(message), }, } } #[spacetimedb::procedure] pub fn cancel_ai_task_and_return( ctx: &mut ProcedureContext, input: AiTaskCancelInput, ) -> AiTaskProcedureResult { match ctx.try_with_tx(|tx| cancel_ai_task_tx(tx, input.clone())) { Ok(task) => AiTaskProcedureResult { ok: true, task: Some(task), text_chunk: None, error_message: None, }, Err(message) => AiTaskProcedureResult { ok: false, task: None, text_chunk: None, error_message: Some(message), }, } } fn create_ai_task_tx( ctx: &ReducerContext, input: AiTaskCreateInput, ) -> Result { validate_task_create_input(&input).map_err(|error| error.to_string())?; if ctx.db.ai_task().task_id().find(&input.task_id).is_some() { return Err("ai_task.task_id 已存在".to_string()); } let task_snapshot = build_ai_task_snapshot_from_create_input(&input); ctx.db.ai_task().insert(build_ai_task_row(&task_snapshot)); replace_ai_task_stages(ctx, &task_snapshot.task_id, &task_snapshot.stages); get_ai_task_snapshot_tx(ctx, &task_snapshot.task_id) } fn start_ai_task_tx( ctx: &ReducerContext, input: AiTaskStartInput, ) -> Result { let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?; ensure_ai_task_can_transition(snapshot.status)?; snapshot.status = AiTaskStatus::Running; if snapshot.started_at_micros.is_none() { snapshot.started_at_micros = Some(input.started_at_micros); } snapshot.updated_at_micros = input.started_at_micros; snapshot.version += 1; persist_ai_task_snapshot(ctx, &snapshot)?; Ok(snapshot) } fn start_ai_task_stage_tx( ctx: &ReducerContext, input: AiTaskStageStartInput, ) -> Result { let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?; ensure_ai_task_can_transition(snapshot.status)?; let stage = snapshot .stages .iter_mut() .find(|stage| stage.stage_kind == input.stage_kind) .ok_or_else(|| "ai_task.stage 不存在".to_string())?; snapshot.status = AiTaskStatus::Running; if snapshot.started_at_micros.is_none() { snapshot.started_at_micros = Some(input.started_at_micros); } stage.status = AiTaskStageStatus::Running; if stage.started_at_micros.is_none() { stage.started_at_micros = Some(input.started_at_micros); } snapshot.updated_at_micros = input.started_at_micros; snapshot.version += 1; persist_ai_task_snapshot(ctx, &snapshot)?; Ok(snapshot) } fn append_ai_text_chunk_tx( ctx: &ReducerContext, input: AiTextChunkAppendInput, ) -> Result<(AiTaskSnapshot, AiTextChunkSnapshot), String> { if input.delta_text.trim().is_empty() { return Err("ai_text_chunk.delta_text 不能为空".to_string()); } if input.sequence == 0 { return Err("ai_text_chunk.sequence 必须大于 0".to_string()); } let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?; ensure_ai_task_can_transition(snapshot.status)?; let stage = snapshot .stages .iter_mut() .find(|stage| stage.stage_kind == input.stage_kind) .ok_or_else(|| "ai_task.stage 不存在".to_string())?; let chunk = AiTextChunkSnapshot { chunk_id: generate_ai_text_chunk_id(input.created_at_micros, input.sequence), task_id: input.task_id.trim().to_string(), stage_kind: input.stage_kind, sequence: input.sequence, delta_text: input.delta_text.trim().to_string(), created_at_micros: input.created_at_micros, }; ctx.db .ai_text_chunk() .insert(build_ai_text_chunk_row(&chunk)); let aggregated_text = collect_ai_stage_text_output(ctx, &chunk.task_id, chunk.stage_kind); snapshot.status = AiTaskStatus::Running; if snapshot.started_at_micros.is_none() { snapshot.started_at_micros = Some(input.created_at_micros); } stage.status = AiTaskStageStatus::Running; if stage.started_at_micros.is_none() { stage.started_at_micros = Some(input.created_at_micros); } stage.text_output = aggregated_text.clone(); snapshot.latest_text_output = aggregated_text; snapshot.updated_at_micros = input.created_at_micros; snapshot.version += 1; persist_ai_task_snapshot(ctx, &snapshot)?; Ok((snapshot, chunk)) } fn complete_ai_stage_tx( ctx: &ReducerContext, input: AiStageCompletionInput, ) -> Result { let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?; ensure_ai_task_can_transition(snapshot.status)?; let stage = snapshot .stages .iter_mut() .find(|stage| stage.stage_kind == input.stage_kind) .ok_or_else(|| "ai_task.stage 不存在".to_string())?; stage.status = AiTaskStageStatus::Completed; stage.completed_at_micros = Some(input.completed_at_micros); stage.text_output = normalize_optional_text(input.text_output.clone()); stage.structured_payload_json = normalize_optional_text(input.structured_payload_json.clone()); stage.warning_messages = normalize_string_list(input.warning_messages.clone()); snapshot.latest_text_output = stage.text_output.clone(); snapshot.latest_structured_payload_json = stage.structured_payload_json.clone(); snapshot.updated_at_micros = input.completed_at_micros; snapshot.version += 1; persist_ai_task_snapshot(ctx, &snapshot)?; Ok(snapshot) } fn attach_ai_result_reference_tx( ctx: &ReducerContext, input: AiResultReferenceInput, ) -> Result { let reference_id = input.reference_id.trim().to_string(); if reference_id.is_empty() { return Err("ai_result_reference.reference_id 不能为空".to_string()); } let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?; ensure_ai_task_can_transition(snapshot.status)?; let reference = AiResultReferenceSnapshot { result_ref_id: generate_ai_result_ref_id(input.created_at_micros), task_id: input.task_id.trim().to_string(), reference_kind: input.reference_kind, reference_id, label: normalize_optional_text(input.label), created_at_micros: input.created_at_micros, }; ctx.db .ai_result_reference() .insert(build_ai_result_reference_row(&reference)); snapshot.result_references.push(reference); snapshot.updated_at_micros = input.created_at_micros; snapshot.version += 1; persist_ai_task_snapshot(ctx, &snapshot)?; Ok(snapshot) } fn complete_ai_task_tx( ctx: &ReducerContext, input: AiTaskFinishInput, ) -> Result { let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?; ensure_ai_task_can_transition(snapshot.status)?; snapshot.status = AiTaskStatus::Completed; snapshot.completed_at_micros = Some(input.completed_at_micros); snapshot.updated_at_micros = input.completed_at_micros; snapshot.version += 1; persist_ai_task_snapshot(ctx, &snapshot)?; Ok(snapshot) } fn fail_ai_task_tx( ctx: &ReducerContext, input: AiTaskFailureInput, ) -> Result { let failure_message = input.failure_message.trim().to_string(); if failure_message.is_empty() { return Err("ai_task.failure_message 不能为空".to_string()); } let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?; ensure_ai_task_can_transition(snapshot.status)?; snapshot.status = AiTaskStatus::Failed; snapshot.failure_message = Some(failure_message); snapshot.completed_at_micros = Some(input.completed_at_micros); snapshot.updated_at_micros = input.completed_at_micros; snapshot.version += 1; persist_ai_task_snapshot(ctx, &snapshot)?; Ok(snapshot) } fn cancel_ai_task_tx( ctx: &ReducerContext, input: AiTaskCancelInput, ) -> Result { let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?; ensure_ai_task_can_transition(snapshot.status)?; snapshot.status = AiTaskStatus::Cancelled; snapshot.completed_at_micros = Some(input.completed_at_micros); snapshot.updated_at_micros = input.completed_at_micros; snapshot.version += 1; persist_ai_task_snapshot(ctx, &snapshot)?; Ok(snapshot) } fn get_ai_task_snapshot_tx(ctx: &ReducerContext, task_id: &str) -> Result { let row = ctx .db .ai_task() .task_id() .find(&task_id.trim().to_string()) .ok_or_else(|| "ai_task 不存在".to_string())?; Ok(build_ai_task_snapshot_from_row(ctx, &row)) } fn persist_ai_task_snapshot(ctx: &ReducerContext, snapshot: &AiTaskSnapshot) -> Result<(), String> { ctx.db.ai_task().task_id().delete(&snapshot.task_id); ctx.db.ai_task().insert(build_ai_task_row(snapshot)); replace_ai_task_stages(ctx, &snapshot.task_id, &snapshot.stages); Ok(()) } fn replace_ai_task_stages(ctx: &ReducerContext, task_id: &str, stages: &[AiTaskStageSnapshot]) { let stage_ids = ctx .db .ai_task_stage() .iter() .filter(|row| row.task_id == task_id) .map(|row| row.task_stage_id.clone()) .collect::>(); for stage_id in stage_ids { ctx.db.ai_task_stage().task_stage_id().delete(&stage_id); } for stage in stages { ctx.db .ai_task_stage() .insert(build_ai_task_stage_row(task_id, stage)); } } fn collect_ai_stage_text_output( ctx: &ReducerContext, task_id: &str, stage_kind: AiTaskStageKind, ) -> Option { let mut chunks = ctx .db .ai_text_chunk() .iter() .filter(|row| row.task_id == task_id && row.stage_kind == stage_kind) .map(|row| build_ai_text_chunk_snapshot_from_row(&row)) .collect::>(); chunks.sort_by_key(|chunk| chunk.sequence); let aggregated = chunks .into_iter() .map(|chunk| chunk.delta_text) .collect::>() .join(""); if aggregated.trim().is_empty() { None } else { Some(aggregated) } } fn ensure_ai_task_can_transition(status: AiTaskStatus) -> Result<(), String> { if matches!( status, AiTaskStatus::Completed | AiTaskStatus::Failed | AiTaskStatus::Cancelled ) { Err("当前 ai_task 状态不允许执行该操作".to_string()) } else { Ok(()) } } fn build_ai_task_snapshot_from_create_input(input: &AiTaskCreateInput) -> AiTaskSnapshot { AiTaskSnapshot { task_id: input.task_id.trim().to_string(), task_kind: input.task_kind, owner_user_id: input.owner_user_id.trim().to_string(), request_label: input.request_label.trim().to_string(), source_module: input.source_module.trim().to_string(), source_entity_id: normalize_optional_text(input.source_entity_id.clone()), request_payload_json: normalize_optional_text(input.request_payload_json.clone()), status: AiTaskStatus::Pending, failure_message: None, stages: input .stages .iter() .map(|stage| AiTaskStageSnapshot { stage_kind: stage.stage_kind, label: stage.label.trim().to_string(), detail: stage.detail.trim().to_string(), order: stage.order, status: AiTaskStageStatus::Pending, text_output: None, structured_payload_json: None, warning_messages: Vec::new(), started_at_micros: None, completed_at_micros: None, }) .collect(), result_references: Vec::new(), latest_text_output: None, latest_structured_payload_json: None, version: INITIAL_AI_TASK_VERSION, created_at_micros: input.created_at_micros, started_at_micros: None, completed_at_micros: None, updated_at_micros: input.created_at_micros, } } fn build_ai_task_row(snapshot: &AiTaskSnapshot) -> AiTask { AiTask { task_id: snapshot.task_id.clone(), task_kind: snapshot.task_kind, owner_user_id: snapshot.owner_user_id.clone(), request_label: snapshot.request_label.clone(), source_module: snapshot.source_module.clone(), source_entity_id: snapshot.source_entity_id.clone(), request_payload_json: snapshot.request_payload_json.clone(), status: snapshot.status, failure_message: snapshot.failure_message.clone(), latest_text_output: snapshot.latest_text_output.clone(), latest_structured_payload_json: snapshot.latest_structured_payload_json.clone(), version: snapshot.version, created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros), started_at: snapshot .started_at_micros .map(Timestamp::from_micros_since_unix_epoch), completed_at: snapshot .completed_at_micros .map(Timestamp::from_micros_since_unix_epoch), updated_at: Timestamp::from_micros_since_unix_epoch(snapshot.updated_at_micros), } } fn build_ai_task_snapshot_from_row(ctx: &ReducerContext, row: &AiTask) -> AiTaskSnapshot { let mut stages = ctx .db .ai_task_stage() .iter() .filter(|stage| stage.task_id == row.task_id) .map(|stage| build_ai_task_stage_snapshot_from_row(&stage)) .collect::>(); stages.sort_by_key(|stage| stage.order); let mut result_references = ctx .db .ai_result_reference() .iter() .filter(|reference| reference.task_id == row.task_id) .map(|reference| build_ai_result_reference_snapshot_from_row(&reference)) .collect::>(); result_references.sort_by_key(|reference| reference.created_at_micros); AiTaskSnapshot { task_id: row.task_id.clone(), task_kind: row.task_kind, owner_user_id: row.owner_user_id.clone(), request_label: row.request_label.clone(), source_module: row.source_module.clone(), source_entity_id: row.source_entity_id.clone(), request_payload_json: row.request_payload_json.clone(), status: row.status, failure_message: row.failure_message.clone(), stages, result_references, latest_text_output: row.latest_text_output.clone(), latest_structured_payload_json: row.latest_structured_payload_json.clone(), version: row.version, created_at_micros: row.created_at.to_micros_since_unix_epoch(), started_at_micros: row .started_at .map(|value| value.to_micros_since_unix_epoch()), completed_at_micros: row .completed_at .map(|value| value.to_micros_since_unix_epoch()), updated_at_micros: row.updated_at.to_micros_since_unix_epoch(), } } fn build_ai_task_stage_row(task_id: &str, snapshot: &AiTaskStageSnapshot) -> AiTaskStage { AiTaskStage { task_stage_id: generate_ai_task_stage_id(task_id, snapshot.stage_kind), task_id: task_id.to_string(), stage_kind: snapshot.stage_kind, label: snapshot.label.clone(), detail: snapshot.detail.clone(), stage_order: snapshot.order, status: snapshot.status, text_output: snapshot.text_output.clone(), structured_payload_json: snapshot.structured_payload_json.clone(), warning_messages: snapshot.warning_messages.clone(), started_at: snapshot .started_at_micros .map(Timestamp::from_micros_since_unix_epoch), completed_at: snapshot .completed_at_micros .map(Timestamp::from_micros_since_unix_epoch), } } fn build_ai_task_stage_snapshot_from_row(row: &AiTaskStage) -> AiTaskStageSnapshot { AiTaskStageSnapshot { stage_kind: row.stage_kind, label: row.label.clone(), detail: row.detail.clone(), order: row.stage_order, status: row.status, text_output: row.text_output.clone(), structured_payload_json: row.structured_payload_json.clone(), warning_messages: row.warning_messages.clone(), started_at_micros: row .started_at .map(|value| value.to_micros_since_unix_epoch()), completed_at_micros: row .completed_at .map(|value| value.to_micros_since_unix_epoch()), } } fn build_ai_text_chunk_row(snapshot: &AiTextChunkSnapshot) -> AiTextChunk { AiTextChunk { text_chunk_row_id: format!( "{}{}_{}_{}", AI_TEXT_CHUNK_ID_PREFIX, snapshot.task_id, snapshot.stage_kind.as_str(), snapshot.sequence ), chunk_id: snapshot.chunk_id.clone(), task_id: snapshot.task_id.clone(), stage_kind: snapshot.stage_kind, sequence: snapshot.sequence, delta_text: snapshot.delta_text.clone(), created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros), } } fn build_ai_text_chunk_snapshot_from_row(row: &AiTextChunk) -> AiTextChunkSnapshot { AiTextChunkSnapshot { chunk_id: row.chunk_id.clone(), task_id: row.task_id.clone(), stage_kind: row.stage_kind, sequence: row.sequence, delta_text: row.delta_text.clone(), created_at_micros: row.created_at.to_micros_since_unix_epoch(), } } fn build_ai_result_reference_row(snapshot: &AiResultReferenceSnapshot) -> AiResultReference { AiResultReference { result_reference_row_id: format!( "{}{}_{}", AI_RESULT_REF_ID_PREFIX, snapshot.task_id, snapshot.result_ref_id ), result_ref_id: snapshot.result_ref_id.clone(), task_id: snapshot.task_id.clone(), reference_kind: snapshot.reference_kind, reference_id: snapshot.reference_id.clone(), label: snapshot.label.clone(), created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros), } } fn build_ai_result_reference_snapshot_from_row( row: &AiResultReference, ) -> AiResultReferenceSnapshot { AiResultReferenceSnapshot { result_ref_id: row.result_ref_id.clone(), task_id: row.task_id.clone(), reference_kind: row.reference_kind, reference_id: row.reference_id.clone(), label: row.label.clone(), created_at_micros: row.created_at.to_micros_since_unix_epoch(), } }