推进 server-rs DDD 分层与新接口接线

This commit is contained in:
Codex
2026-04-29 15:46:16 +08:00
parent 9d3fcfae77
commit f82775b852
89 changed files with 3657 additions and 9636 deletions

View File

@@ -1,4 +1,400 @@
//! AI 应用编排过渡落位。
//!
//! 这里仅返回纯应用结果或领域事件;真实 LLM 调用继续留在 `platform-llm`
//! 与 `api-server` 编排层。
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use shared_kernel::normalize_required_string;
use crate::commands::validate_task_create_input;
use crate::{
AiResultReferenceKind, AiResultReferenceSnapshot, AiStageCompletionInput, AiTaskCreateInput,
AiTaskFieldError, AiTaskServiceError, AiTaskSnapshot, AiTaskStageSnapshot, AiTaskStageStatus,
AiTaskStatus, AiTextChunkSnapshot, INITIAL_AI_TASK_VERSION, generate_ai_result_ref_id,
generate_ai_text_chunk_id, normalize_optional_text, normalize_string_list,
};
use serde::{Deserialize, Serialize};
#[cfg(feature = "spacetime-types")]
use spacetimedb::SpacetimeType;
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskProcedureResult {
pub ok: bool,
pub task: Option<AiTaskSnapshot>,
pub text_chunk: Option<AiTextChunkSnapshot>,
pub error_message: Option<String>,
}
#[derive(Clone, Debug, Default)]
pub struct InMemoryAiTaskStore {
inner: Arc<Mutex<InMemoryAiTaskStoreState>>,
}
#[derive(Debug, Default)]
struct InMemoryAiTaskStoreState {
tasks: HashMap<String, AiTaskSnapshot>,
text_chunks: HashMap<String, Vec<AiTextChunkSnapshot>>,
}
#[derive(Clone, Debug)]
pub struct AiTaskService {
store: InMemoryAiTaskStore,
}
impl AiTaskService {
pub fn new(store: InMemoryAiTaskStore) -> Self {
Self { store }
}
pub fn create_task(
&self,
input: AiTaskCreateInput,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
validate_task_create_input(&input).map_err(AiTaskServiceError::Field)?;
let snapshot = AiTaskSnapshot {
task_id: input.task_id.clone(),
task_kind: input.task_kind,
owner_user_id: normalize_required_string(input.owner_user_id).unwrap_or_default(),
request_label: normalize_required_string(input.request_label).unwrap_or_default(),
source_module: normalize_required_string(input.source_module).unwrap_or_default(),
source_entity_id: normalize_optional_text(input.source_entity_id),
request_payload_json: normalize_optional_text(input.request_payload_json),
status: AiTaskStatus::Pending,
failure_message: None,
stages: input
.stages
.into_iter()
.map(|stage| AiTaskStageSnapshot {
stage_kind: stage.stage_kind,
label: normalize_required_string(stage.label).unwrap_or_default(),
detail: normalize_required_string(stage.detail).unwrap_or_default(),
order: stage.order,
status: AiTaskStageStatus::Pending,
text_output: None,
structured_payload_json: None,
warning_messages: Vec::new(),
started_at_micros: None,
completed_at_micros: None,
})
.collect(),
result_references: Vec::new(),
latest_text_output: None,
latest_structured_payload_json: None,
version: INITIAL_AI_TASK_VERSION,
created_at_micros: input.created_at_micros,
started_at_micros: None,
completed_at_micros: None,
updated_at_micros: input.created_at_micros,
};
self.store.insert_task(snapshot)
}
pub fn start_task(
&self,
task_id: &str,
started_at_micros: i64,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
self.store.update_task(task_id, |task| {
ensure_task_is_not_terminal(task.status)?;
task.status = AiTaskStatus::Running;
task.started_at_micros.get_or_insert(started_at_micros);
task.updated_at_micros = started_at_micros;
task.version += 1;
Ok(())
})
}
pub fn start_stage(
&self,
task_id: &str,
stage_kind: crate::AiTaskStageKind,
started_at_micros: i64,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
self.store.update_task(task_id, |task| {
ensure_task_is_not_terminal(task.status)?;
task.status = AiTaskStatus::Running;
task.started_at_micros.get_or_insert(started_at_micros);
let stage = task
.stages
.iter_mut()
.find(|stage| stage.stage_kind == stage_kind)
.ok_or(AiTaskServiceError::StageNotFound)?;
stage.status = AiTaskStageStatus::Running;
stage.started_at_micros.get_or_insert(started_at_micros);
task.updated_at_micros = started_at_micros;
task.version += 1;
Ok(())
})
}
pub fn append_text_chunk(
&self,
task_id: &str,
stage_kind: crate::AiTaskStageKind,
sequence: u32,
delta_text: String,
created_at_micros: i64,
) -> Result<(AiTaskSnapshot, AiTextChunkSnapshot), AiTaskServiceError> {
if delta_text.trim().is_empty() {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::MissingChunkText,
));
}
if sequence == 0 {
return Err(AiTaskServiceError::Field(AiTaskFieldError::InvalidSequence));
}
let chunk = AiTextChunkSnapshot {
chunk_id: generate_ai_text_chunk_id(created_at_micros, sequence),
task_id: normalize_required_string(task_id).unwrap_or_default(),
stage_kind,
sequence,
delta_text: normalize_required_string(delta_text).unwrap_or_default(),
created_at_micros,
};
let task = self.store.append_text_chunk(chunk.clone())?;
Ok((task, chunk))
}
pub fn complete_stage(
&self,
input: AiStageCompletionInput,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
self.store.update_task(&input.task_id, |task| {
ensure_task_is_not_terminal(task.status)?;
let stage = task
.stages
.iter_mut()
.find(|stage| stage.stage_kind == input.stage_kind)
.ok_or(AiTaskServiceError::StageNotFound)?;
stage.status = AiTaskStageStatus::Completed;
stage.completed_at_micros = Some(input.completed_at_micros);
stage.text_output = normalize_optional_text(input.text_output.clone());
stage.structured_payload_json =
normalize_optional_text(input.structured_payload_json.clone());
stage.warning_messages = normalize_string_list(input.warning_messages.clone());
task.latest_text_output = stage.text_output.clone();
task.latest_structured_payload_json = stage.structured_payload_json.clone();
task.updated_at_micros = input.completed_at_micros;
task.version += 1;
Ok(())
})
}
pub fn attach_result_reference(
&self,
task_id: &str,
reference_kind: AiResultReferenceKind,
reference_id: String,
label: Option<String>,
created_at_micros: i64,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
let Some(reference_id) = normalize_required_string(reference_id) else {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::MissingReferenceId,
));
};
self.store.update_task(task_id, |task| {
ensure_task_is_not_terminal(task.status)?;
task.result_references.push(AiResultReferenceSnapshot {
result_ref_id: generate_ai_result_ref_id(created_at_micros),
task_id: task.task_id.clone(),
reference_kind,
reference_id: reference_id.clone(),
label: normalize_optional_text(label.clone()),
created_at_micros,
});
task.updated_at_micros = created_at_micros;
task.version += 1;
Ok(())
})
}
pub fn complete_task(
&self,
task_id: &str,
completed_at_micros: i64,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
self.store.update_task(task_id, |task| {
ensure_task_is_not_terminal(task.status)?;
task.status = AiTaskStatus::Completed;
task.completed_at_micros = Some(completed_at_micros);
task.updated_at_micros = completed_at_micros;
task.version += 1;
Ok(())
})
}
pub fn fail_task(
&self,
task_id: &str,
failure_message: String,
completed_at_micros: i64,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
let Some(failure_message) = normalize_required_string(failure_message) else {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::MissingFailureMessage,
));
};
self.store.update_task(task_id, |task| {
ensure_task_is_not_terminal(task.status)?;
task.status = AiTaskStatus::Failed;
task.failure_message = Some(failure_message.clone());
task.completed_at_micros = Some(completed_at_micros);
task.updated_at_micros = completed_at_micros;
task.version += 1;
Ok(())
})
}
pub fn cancel_task(
&self,
task_id: &str,
completed_at_micros: i64,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
self.store.update_task(task_id, |task| {
ensure_task_is_not_terminal(task.status)?;
task.status = AiTaskStatus::Cancelled;
task.completed_at_micros = Some(completed_at_micros);
task.updated_at_micros = completed_at_micros;
task.version += 1;
Ok(())
})
}
pub fn get_task(&self, task_id: &str) -> Result<AiTaskSnapshot, AiTaskServiceError> {
self.store.get_task(task_id)
}
}
impl InMemoryAiTaskStore {
fn insert_task(&self, task: AiTaskSnapshot) -> Result<AiTaskSnapshot, AiTaskServiceError> {
let mut state = self
.inner
.lock()
.map_err(|_| AiTaskServiceError::Store("AI 任务仓储锁已中毒".to_string()))?;
if state.tasks.contains_key(&task.task_id) {
return Err(AiTaskServiceError::TaskAlreadyExists);
}
state.text_chunks.insert(task.task_id.clone(), Vec::new());
state.tasks.insert(task.task_id.clone(), task.clone());
Ok(task)
}
fn update_task<F>(
&self,
task_id: &str,
mut apply: F,
) -> Result<AiTaskSnapshot, AiTaskServiceError>
where
F: FnMut(&mut AiTaskSnapshot) -> Result<(), AiTaskServiceError>,
{
let mut state = self
.inner
.lock()
.map_err(|_| AiTaskServiceError::Store("AI 任务仓储锁已中毒".to_string()))?;
let task = state
.tasks
.get_mut(task_id.trim())
.ok_or(AiTaskServiceError::TaskNotFound)?;
apply(task)?;
Ok(task.clone())
}
fn append_text_chunk(
&self,
chunk: AiTextChunkSnapshot,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
let mut state = self
.inner
.lock()
.map_err(|_| AiTaskServiceError::Store("AI 任务仓储锁已中毒".to_string()))?;
{
let task = state
.tasks
.get_mut(&chunk.task_id)
.ok_or(AiTaskServiceError::TaskNotFound)?;
ensure_task_is_not_terminal(task.status)?;
let stage = task
.stages
.iter_mut()
.find(|stage| stage.stage_kind == chunk.stage_kind)
.ok_or(AiTaskServiceError::StageNotFound)?;
if stage.status == AiTaskStageStatus::Pending {
stage.status = AiTaskStageStatus::Running;
stage.started_at_micros = Some(chunk.created_at_micros);
}
task.status = AiTaskStatus::Running;
task.started_at_micros
.get_or_insert(chunk.created_at_micros);
}
let chunks = state
.text_chunks
.get_mut(&chunk.task_id)
.ok_or(AiTaskServiceError::TaskNotFound)?;
chunks.push(chunk.clone());
chunks.sort_by_key(|value| value.sequence);
let aggregated_text = chunks
.iter()
.filter(|value| value.stage_kind == chunk.stage_kind)
.map(|value| value.delta_text.as_str())
.collect::<Vec<_>>()
.join("");
let normalized_output = if aggregated_text.trim().is_empty() {
None
} else {
Some(aggregated_text)
};
let task = state
.tasks
.get_mut(&chunk.task_id)
.ok_or(AiTaskServiceError::TaskNotFound)?;
let stage = task
.stages
.iter_mut()
.find(|stage| stage.stage_kind == chunk.stage_kind)
.ok_or(AiTaskServiceError::StageNotFound)?;
stage.text_output = normalized_output.clone();
task.latest_text_output = normalized_output;
task.updated_at_micros = chunk.created_at_micros;
task.version += 1;
Ok(task.clone())
}
fn get_task(&self, task_id: &str) -> Result<AiTaskSnapshot, AiTaskServiceError> {
let state = self
.inner
.lock()
.map_err(|_| AiTaskServiceError::Store("AI 任务仓储锁已中毒".to_string()))?;
state
.tasks
.get(task_id.trim())
.cloned()
.ok_or(AiTaskServiceError::TaskNotFound)
}
}
fn ensure_task_is_not_terminal(status: AiTaskStatus) -> Result<(), AiTaskServiceError> {
if status.is_terminal() {
Err(AiTaskServiceError::Field(
AiTaskFieldError::InvalidTaskState,
))
} else {
Ok(())
}
}

View File

@@ -1,4 +1,125 @@
//! AI 写入命令过渡落位。
//!
//! 只描述创建任务、推进阶段、追加文本片段和挂接结果引用等用例输入,
//! 不承载外部模型请求或持久化细节。
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use shared_kernel::normalize_required_string;
#[cfg(feature = "spacetime-types")]
use spacetimedb::SpacetimeType;
use crate::{
AiResultReferenceKind, AiTaskFieldError, AiTaskKind, AiTaskStageBlueprint, AiTaskStageKind,
};
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskCreateInput {
pub task_id: String,
pub task_kind: AiTaskKind,
pub owner_user_id: String,
pub request_label: String,
pub source_module: String,
pub source_entity_id: Option<String>,
pub request_payload_json: Option<String>,
pub stages: Vec<AiTaskStageBlueprint>,
pub created_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskStartInput {
pub task_id: String,
pub started_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskStageStartInput {
pub task_id: String,
pub stage_kind: AiTaskStageKind,
pub started_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTextChunkAppendInput {
pub task_id: String,
pub stage_kind: AiTaskStageKind,
pub sequence: u32,
pub delta_text: String,
pub created_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiStageCompletionInput {
pub task_id: String,
pub stage_kind: AiTaskStageKind,
pub text_output: Option<String>,
pub structured_payload_json: Option<String>,
pub warning_messages: Vec<String>,
pub completed_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiResultReferenceInput {
pub task_id: String,
pub reference_kind: AiResultReferenceKind,
pub reference_id: String,
pub label: Option<String>,
pub created_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskFinishInput {
pub task_id: String,
pub completed_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskCancelInput {
pub task_id: String,
pub completed_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskFailureInput {
pub task_id: String,
pub failure_message: String,
pub completed_at_micros: i64,
}
pub fn validate_task_create_input(input: &AiTaskCreateInput) -> Result<(), AiTaskFieldError> {
if normalize_required_string(&input.task_id).is_none() {
return Err(AiTaskFieldError::MissingTaskId);
}
if normalize_required_string(&input.owner_user_id).is_none() {
return Err(AiTaskFieldError::MissingOwnerUserId);
}
if normalize_required_string(&input.request_label).is_none() {
return Err(AiTaskFieldError::MissingRequestLabel);
}
if normalize_required_string(&input.source_module).is_none() {
return Err(AiTaskFieldError::MissingSourceModule);
}
if input.stages.is_empty() {
return Err(AiTaskFieldError::MissingStageBlueprints);
}
let mut seen = HashMap::new();
for stage in &input.stages {
if normalize_required_string(&stage.label).is_none()
|| normalize_required_string(&stage.detail).is_none()
{
return Err(AiTaskFieldError::MissingStageBlueprints);
}
if seen.insert(stage.stage_kind, true).is_some() {
return Err(AiTaskFieldError::DuplicateStageBlueprint);
}
}
Ok(())
}

View File

@@ -1,4 +1,239 @@
//! AI 领域模型过渡落位。
//!
//! 当前历史实现仍在 `lib.rs`。后续迁移 `AiTask`、阶段、流式片段和结果引用时,
//! 只能放入纯领域类型与状态迁移,不能引入 LLM、HTTP 或 SpacetimeDB adapter。
use serde::{Deserialize, Serialize};
use shared_kernel::{
build_prefixed_seed_id, normalize_optional_string as normalize_shared_optional_string,
normalize_string_list as normalize_shared_string_list,
};
#[cfg(feature = "spacetime-types")]
use spacetimedb::SpacetimeType;
pub const AI_TASK_ID_PREFIX: &str = "aitask_";
pub const AI_TASK_STAGE_ID_PREFIX: &str = "aistage_";
pub const AI_RESULT_REF_ID_PREFIX: &str = "aires_";
pub const AI_TEXT_CHUNK_ID_PREFIX: &str = "aichunk_";
pub const INITIAL_AI_TASK_VERSION: u32 = 1;
// AI 编排类型与当前正式运行时主链保持一致,具体 prompt 策略留给上层模块。
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AiTaskKind {
StoryGeneration,
CharacterChat,
NpcChat,
CustomWorldGeneration,
QuestIntent,
RuntimeItemIntent,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AiTaskStatus {
Pending,
Running,
Completed,
Failed,
Cancelled,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum AiTaskStageKind {
PreparePrompt,
RequestModel,
RepairResponse,
NormalizeResult,
PersistResult,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AiTaskStageStatus {
Pending,
Running,
Completed,
Skipped,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AiResultReferenceKind {
StorySession,
StoryEvent,
CustomWorldProfile,
QuestRecord,
RuntimeItemRecord,
AssetObject,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskStageBlueprint {
pub stage_kind: AiTaskStageKind,
pub label: String,
pub detail: String,
pub order: u32,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskStageSnapshot {
pub stage_kind: AiTaskStageKind,
pub label: String,
pub detail: String,
pub order: u32,
pub status: AiTaskStageStatus,
pub text_output: Option<String>,
pub structured_payload_json: Option<String>,
pub warning_messages: Vec<String>,
pub started_at_micros: Option<i64>,
pub completed_at_micros: Option<i64>,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskSnapshot {
pub task_id: String,
pub task_kind: AiTaskKind,
pub owner_user_id: String,
pub request_label: String,
pub source_module: String,
pub source_entity_id: Option<String>,
pub request_payload_json: Option<String>,
pub status: AiTaskStatus,
pub failure_message: Option<String>,
pub stages: Vec<AiTaskStageSnapshot>,
pub result_references: Vec<AiResultReferenceSnapshot>,
pub latest_text_output: Option<String>,
pub latest_structured_payload_json: Option<String>,
pub version: u32,
pub created_at_micros: i64,
pub started_at_micros: Option<i64>,
pub completed_at_micros: Option<i64>,
pub updated_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTextChunkSnapshot {
pub chunk_id: String,
pub task_id: String,
pub stage_kind: AiTaskStageKind,
pub sequence: u32,
pub delta_text: String,
pub created_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiResultReferenceSnapshot {
pub result_ref_id: String,
pub task_id: String,
pub reference_kind: AiResultReferenceKind,
pub reference_id: String,
pub label: Option<String>,
pub created_at_micros: i64,
}
impl AiTaskKind {
pub fn default_stage_blueprints(self) -> Vec<AiTaskStageBlueprint> {
let ordered_kinds = match self {
Self::StoryGeneration => vec![
AiTaskStageKind::PreparePrompt,
AiTaskStageKind::RequestModel,
AiTaskStageKind::RepairResponse,
AiTaskStageKind::NormalizeResult,
],
Self::CharacterChat | Self::NpcChat | Self::QuestIntent | Self::RuntimeItemIntent => {
vec![
AiTaskStageKind::PreparePrompt,
AiTaskStageKind::RequestModel,
AiTaskStageKind::NormalizeResult,
]
}
Self::CustomWorldGeneration => vec![
AiTaskStageKind::PreparePrompt,
AiTaskStageKind::RequestModel,
AiTaskStageKind::RepairResponse,
AiTaskStageKind::NormalizeResult,
AiTaskStageKind::PersistResult,
],
};
ordered_kinds
.into_iter()
.enumerate()
.map(|(index, stage_kind)| AiTaskStageBlueprint {
stage_kind,
label: stage_kind.default_label().to_string(),
detail: stage_kind.default_detail().to_string(),
order: index as u32,
})
.collect()
}
}
impl AiTaskStageKind {
pub fn as_str(self) -> &'static str {
match self {
Self::PreparePrompt => "prepare_prompt",
Self::RequestModel => "request_model",
Self::RepairResponse => "repair_response",
Self::NormalizeResult => "normalize_result",
Self::PersistResult => "persist_result",
}
}
pub fn default_label(self) -> &'static str {
match self {
Self::PreparePrompt => "整理提示词",
Self::RequestModel => "请求模型",
Self::RepairResponse => "修复响应",
Self::NormalizeResult => "归一结果",
Self::PersistResult => "回写结果",
}
}
pub fn default_detail(self) -> &'static str {
match self {
Self::PreparePrompt => "整理输入上下文并构建本轮提示词。",
Self::RequestModel => "向上游模型发起正式推理请求。",
Self::RepairResponse => "对非严格输出做补救修复或二次编排。",
Self::NormalizeResult => "把模型输出归一成模块可消费结构。",
Self::PersistResult => "把结果引用或聚合状态回写到下游模块。",
}
}
}
impl AiTaskStatus {
pub fn is_terminal(self) -> bool {
matches!(self, Self::Completed | Self::Failed | Self::Cancelled)
}
}
pub fn generate_ai_task_id(seed_micros: i64) -> String {
build_prefixed_seed_id(AI_TASK_ID_PREFIX, seed_micros)
}
pub fn generate_ai_task_stage_id(task_id: &str, stage_kind: AiTaskStageKind) -> String {
format!(
"{}{}_{}",
AI_TASK_STAGE_ID_PREFIX,
task_id.trim(),
stage_kind.as_str()
)
}
pub fn generate_ai_result_ref_id(seed_micros: i64) -> String {
build_prefixed_seed_id(AI_RESULT_REF_ID_PREFIX, seed_micros)
}
pub fn generate_ai_text_chunk_id(seed_micros: i64, sequence: u32) -> String {
format!("{}{seed_micros:x}_{sequence:x}", AI_TEXT_CHUNK_ID_PREFIX)
}
pub fn normalize_optional_text(value: Option<String>) -> Option<String> {
normalize_shared_optional_string(value)
}
pub fn normalize_string_list(values: Vec<String>) -> Vec<String> {
normalize_shared_string_list(values)
}

View File

@@ -1,3 +1,61 @@
//! AI 领域错误过渡落位。
//!
//! 错误必须可被 HTTP adapter 和 SpacetimeDB adapter 显式映射,不能直接绑定状态码。
use std::{error::Error, fmt};
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AiTaskFieldError {
MissingTaskId,
MissingOwnerUserId,
MissingRequestLabel,
MissingSourceModule,
MissingStageBlueprints,
DuplicateStageBlueprint,
MissingReferenceId,
MissingChunkText,
InvalidSequence,
MissingFailureMessage,
MissingStage,
InvalidTaskState,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AiTaskServiceError {
Field(AiTaskFieldError),
TaskAlreadyExists,
TaskNotFound,
StageNotFound,
Store(String),
}
impl fmt::Display for AiTaskFieldError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::MissingTaskId => f.write_str("ai_task.task_id 不能为空"),
Self::MissingOwnerUserId => f.write_str("ai_task.owner_user_id 不能为空"),
Self::MissingRequestLabel => f.write_str("ai_task.request_label 不能为空"),
Self::MissingSourceModule => f.write_str("ai_task.source_module 不能为空"),
Self::MissingStageBlueprints => f.write_str("ai_task.stages 至少需要一个有效阶段"),
Self::DuplicateStageBlueprint => f.write_str("ai_task.stages 不能包含重复阶段"),
Self::MissingReferenceId => f.write_str("ai_result_reference.reference_id 不能为空"),
Self::MissingChunkText => f.write_str("ai_text_chunk.delta_text 不能为空"),
Self::InvalidSequence => f.write_str("ai_text_chunk.sequence 必须大于 0"),
Self::MissingFailureMessage => f.write_str("ai_task.failure_message 不能为空"),
Self::MissingStage => f.write_str("ai_task.stage 不存在"),
Self::InvalidTaskState => f.write_str("当前 ai_task 状态不允许执行该操作"),
}
}
}
impl Error for AiTaskFieldError {}
impl fmt::Display for AiTaskServiceError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Field(error) => write!(f, "{error}"),
Self::TaskAlreadyExists => f.write_str("ai_task 已存在,不能重复创建"),
Self::TaskNotFound => f.write_str("ai_task 不存在"),
Self::StageNotFound => f.write_str("ai_task.stage 不存在"),
Self::Store(message) => f.write_str(message),
}
}
}
impl Error for AiTaskServiceError {}

View File

@@ -1,3 +1,32 @@
//! AI 领域事件过渡落位。
//!
//! 用于表达任务开始、阶段完成、任务失败和结果引用挂接等跨上下文事实。
use crate::{
AiResultReferenceKind, AiTaskKind, AiTaskStageKind, AiTaskStatus, AiTextChunkSnapshot,
};
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AiTaskDomainEvent {
TaskCreated {
task_id: String,
task_kind: AiTaskKind,
owner_user_id: String,
},
TaskStatusChanged {
task_id: String,
status: AiTaskStatus,
},
StageStarted {
task_id: String,
stage_kind: AiTaskStageKind,
},
StageCompleted {
task_id: String,
stage_kind: AiTaskStageKind,
},
TextChunkAppended {
chunk: AiTextChunkSnapshot,
},
ResultReferenceAttached {
task_id: String,
reference_kind: AiResultReferenceKind,
reference_id: String,
},
}

View File

@@ -4,832 +4,22 @@ mod domain;
mod errors;
mod events;
use std::{
collections::HashMap,
error::Error,
fmt,
sync::{Arc, Mutex},
pub use application::{AiTaskProcedureResult, AiTaskService, InMemoryAiTaskStore};
pub use commands::{
AiResultReferenceInput, AiStageCompletionInput, AiTaskCancelInput, AiTaskCreateInput,
AiTaskFailureInput, AiTaskFinishInput, AiTaskStageStartInput, AiTaskStartInput,
AiTextChunkAppendInput, validate_task_create_input,
};
use serde::{Deserialize, Serialize};
use shared_kernel::{
build_prefixed_seed_id, normalize_optional_string as normalize_shared_optional_string,
normalize_required_string, normalize_string_list as normalize_shared_string_list,
pub use domain::{
AI_RESULT_REF_ID_PREFIX, AI_TASK_ID_PREFIX, AI_TASK_STAGE_ID_PREFIX, AI_TEXT_CHUNK_ID_PREFIX,
AiResultReferenceKind, AiResultReferenceSnapshot, AiTaskKind, AiTaskSnapshot,
AiTaskStageBlueprint, AiTaskStageKind, AiTaskStageSnapshot, AiTaskStageStatus, AiTaskStatus,
AiTextChunkSnapshot, INITIAL_AI_TASK_VERSION, generate_ai_result_ref_id, generate_ai_task_id,
generate_ai_task_stage_id, generate_ai_text_chunk_id, normalize_optional_text,
normalize_string_list,
};
#[cfg(feature = "spacetime-types")]
use spacetimedb::SpacetimeType;
pub const AI_TASK_ID_PREFIX: &str = "aitask_";
pub const AI_TASK_STAGE_ID_PREFIX: &str = "aistage_";
pub const AI_RESULT_REF_ID_PREFIX: &str = "aires_";
pub const AI_TEXT_CHUNK_ID_PREFIX: &str = "aichunk_";
pub const INITIAL_AI_TASK_VERSION: u32 = 1;
// AI 编排类型与当前 Node 正式运行时主链保持一致,避免后续接线时重新发明命名。
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AiTaskKind {
StoryGeneration,
CharacterChat,
NpcChat,
CustomWorldGeneration,
QuestIntent,
RuntimeItemIntent,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AiTaskStatus {
Pending,
Running,
Completed,
Failed,
Cancelled,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum AiTaskStageKind {
PreparePrompt,
RequestModel,
RepairResponse,
NormalizeResult,
PersistResult,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AiTaskStageStatus {
Pending,
Running,
Completed,
Skipped,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AiResultReferenceKind {
StorySession,
StoryEvent,
CustomWorldProfile,
QuestRecord,
RuntimeItemRecord,
AssetObject,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskStageBlueprint {
pub stage_kind: AiTaskStageKind,
pub label: String,
pub detail: String,
pub order: u32,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskStageSnapshot {
pub stage_kind: AiTaskStageKind,
pub label: String,
pub detail: String,
pub order: u32,
pub status: AiTaskStageStatus,
pub text_output: Option<String>,
pub structured_payload_json: Option<String>,
pub warning_messages: Vec<String>,
pub started_at_micros: Option<i64>,
pub completed_at_micros: Option<i64>,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskCreateInput {
pub task_id: String,
pub task_kind: AiTaskKind,
pub owner_user_id: String,
pub request_label: String,
pub source_module: String,
pub source_entity_id: Option<String>,
pub request_payload_json: Option<String>,
pub stages: Vec<AiTaskStageBlueprint>,
pub created_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskStartInput {
pub task_id: String,
pub started_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskStageStartInput {
pub task_id: String,
pub stage_kind: AiTaskStageKind,
pub started_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskSnapshot {
pub task_id: String,
pub task_kind: AiTaskKind,
pub owner_user_id: String,
pub request_label: String,
pub source_module: String,
pub source_entity_id: Option<String>,
pub request_payload_json: Option<String>,
pub status: AiTaskStatus,
pub failure_message: Option<String>,
pub stages: Vec<AiTaskStageSnapshot>,
pub result_references: Vec<AiResultReferenceSnapshot>,
pub latest_text_output: Option<String>,
pub latest_structured_payload_json: Option<String>,
pub version: u32,
pub created_at_micros: i64,
pub started_at_micros: Option<i64>,
pub completed_at_micros: Option<i64>,
pub updated_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTextChunkSnapshot {
pub chunk_id: String,
pub task_id: String,
pub stage_kind: AiTaskStageKind,
pub sequence: u32,
pub delta_text: String,
pub created_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTextChunkAppendInput {
pub task_id: String,
pub stage_kind: AiTaskStageKind,
pub sequence: u32,
pub delta_text: String,
pub created_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiStageCompletionInput {
pub task_id: String,
pub stage_kind: AiTaskStageKind,
pub text_output: Option<String>,
pub structured_payload_json: Option<String>,
pub warning_messages: Vec<String>,
pub completed_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiResultReferenceInput {
pub task_id: String,
pub reference_kind: AiResultReferenceKind,
pub reference_id: String,
pub label: Option<String>,
pub created_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiResultReferenceSnapshot {
pub result_ref_id: String,
pub task_id: String,
pub reference_kind: AiResultReferenceKind,
pub reference_id: String,
pub label: Option<String>,
pub created_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskFinishInput {
pub task_id: String,
pub completed_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskCancelInput {
pub task_id: String,
pub completed_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskFailureInput {
pub task_id: String,
pub failure_message: String,
pub completed_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AiTaskProcedureResult {
pub ok: bool,
pub task: Option<AiTaskSnapshot>,
pub text_chunk: Option<AiTextChunkSnapshot>,
pub error_message: Option<String>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AiTaskFieldError {
MissingTaskId,
MissingOwnerUserId,
MissingRequestLabel,
MissingSourceModule,
MissingStageBlueprints,
DuplicateStageBlueprint,
MissingReferenceId,
MissingChunkText,
InvalidSequence,
MissingFailureMessage,
MissingStage,
InvalidTaskState,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AiTaskServiceError {
Field(AiTaskFieldError),
TaskAlreadyExists,
TaskNotFound,
StageNotFound,
Store(String),
}
#[derive(Clone, Debug, Default)]
pub struct InMemoryAiTaskStore {
inner: Arc<Mutex<InMemoryAiTaskStoreState>>,
}
#[derive(Debug, Default)]
struct InMemoryAiTaskStoreState {
tasks: HashMap<String, AiTaskSnapshot>,
text_chunks: HashMap<String, Vec<AiTextChunkSnapshot>>,
}
#[derive(Clone, Debug)]
pub struct AiTaskService {
store: InMemoryAiTaskStore,
}
impl AiTaskKind {
// 默认阶段蓝图只冻结通用语义,具体 prompt 内容与供应商策略仍由上层模块决定。
pub fn default_stage_blueprints(self) -> Vec<AiTaskStageBlueprint> {
let ordered_kinds = match self {
Self::StoryGeneration => vec![
AiTaskStageKind::PreparePrompt,
AiTaskStageKind::RequestModel,
AiTaskStageKind::RepairResponse,
AiTaskStageKind::NormalizeResult,
],
Self::CharacterChat | Self::NpcChat | Self::QuestIntent | Self::RuntimeItemIntent => {
vec![
AiTaskStageKind::PreparePrompt,
AiTaskStageKind::RequestModel,
AiTaskStageKind::NormalizeResult,
]
}
Self::CustomWorldGeneration => vec![
AiTaskStageKind::PreparePrompt,
AiTaskStageKind::RequestModel,
AiTaskStageKind::RepairResponse,
AiTaskStageKind::NormalizeResult,
AiTaskStageKind::PersistResult,
],
};
ordered_kinds
.into_iter()
.enumerate()
.map(|(index, stage_kind)| AiTaskStageBlueprint {
stage_kind,
label: stage_kind.default_label().to_string(),
detail: stage_kind.default_detail().to_string(),
order: index as u32,
})
.collect()
}
}
impl AiTaskStageKind {
pub fn as_str(self) -> &'static str {
match self {
Self::PreparePrompt => "prepare_prompt",
Self::RequestModel => "request_model",
Self::RepairResponse => "repair_response",
Self::NormalizeResult => "normalize_result",
Self::PersistResult => "persist_result",
}
}
pub fn default_label(self) -> &'static str {
match self {
Self::PreparePrompt => "整理提示词",
Self::RequestModel => "请求模型",
Self::RepairResponse => "修复响应",
Self::NormalizeResult => "归一结果",
Self::PersistResult => "回写结果",
}
}
pub fn default_detail(self) -> &'static str {
match self {
Self::PreparePrompt => "整理输入上下文并构建本轮提示词。",
Self::RequestModel => "向上游模型发起正式推理请求。",
Self::RepairResponse => "对非严格输出做补救修复或二次编排。",
Self::NormalizeResult => "把模型输出归一成模块可消费结构。",
Self::PersistResult => "把结果引用或聚合状态回写到下游模块。",
}
}
}
impl AiTaskStatus {
fn is_terminal(self) -> bool {
matches!(self, Self::Completed | Self::Failed | Self::Cancelled)
}
}
impl AiTaskService {
pub fn new(store: InMemoryAiTaskStore) -> Self {
Self { store }
}
pub fn create_task(
&self,
input: AiTaskCreateInput,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
validate_task_create_input(&input).map_err(AiTaskServiceError::Field)?;
let snapshot = AiTaskSnapshot {
task_id: input.task_id.clone(),
task_kind: input.task_kind,
owner_user_id: normalize_required_string(input.owner_user_id).unwrap_or_default(),
request_label: normalize_required_string(input.request_label).unwrap_or_default(),
source_module: normalize_required_string(input.source_module).unwrap_or_default(),
source_entity_id: normalize_optional_text(input.source_entity_id),
request_payload_json: normalize_optional_text(input.request_payload_json),
status: AiTaskStatus::Pending,
failure_message: None,
stages: input
.stages
.into_iter()
.map(|stage| AiTaskStageSnapshot {
stage_kind: stage.stage_kind,
label: normalize_required_string(stage.label).unwrap_or_default(),
detail: normalize_required_string(stage.detail).unwrap_or_default(),
order: stage.order,
status: AiTaskStageStatus::Pending,
text_output: None,
structured_payload_json: None,
warning_messages: Vec::new(),
started_at_micros: None,
completed_at_micros: None,
})
.collect(),
result_references: Vec::new(),
latest_text_output: None,
latest_structured_payload_json: None,
version: INITIAL_AI_TASK_VERSION,
created_at_micros: input.created_at_micros,
started_at_micros: None,
completed_at_micros: None,
updated_at_micros: input.created_at_micros,
};
self.store.insert_task(snapshot)
}
pub fn start_task(
&self,
task_id: &str,
started_at_micros: i64,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
self.store.update_task(task_id, |task| {
if task.status.is_terminal() {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::InvalidTaskState,
));
}
task.status = AiTaskStatus::Running;
task.started_at_micros.get_or_insert(started_at_micros);
task.updated_at_micros = started_at_micros;
task.version += 1;
Ok(())
})
}
pub fn start_stage(
&self,
task_id: &str,
stage_kind: AiTaskStageKind,
started_at_micros: i64,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
self.store.update_task(task_id, |task| {
if task.status.is_terminal() {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::InvalidTaskState,
));
}
task.status = AiTaskStatus::Running;
task.started_at_micros.get_or_insert(started_at_micros);
let stage = task
.stages
.iter_mut()
.find(|stage| stage.stage_kind == stage_kind)
.ok_or(AiTaskServiceError::StageNotFound)?;
stage.status = AiTaskStageStatus::Running;
stage.started_at_micros.get_or_insert(started_at_micros);
task.updated_at_micros = started_at_micros;
task.version += 1;
Ok(())
})
}
pub fn append_text_chunk(
&self,
task_id: &str,
stage_kind: AiTaskStageKind,
sequence: u32,
delta_text: String,
created_at_micros: i64,
) -> Result<(AiTaskSnapshot, AiTextChunkSnapshot), AiTaskServiceError> {
if delta_text.trim().is_empty() {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::MissingChunkText,
));
}
if sequence == 0 {
return Err(AiTaskServiceError::Field(AiTaskFieldError::InvalidSequence));
}
let chunk = AiTextChunkSnapshot {
chunk_id: generate_ai_text_chunk_id(created_at_micros, sequence),
task_id: normalize_required_string(task_id).unwrap_or_default(),
stage_kind,
sequence,
delta_text: normalize_required_string(delta_text).unwrap_or_default(),
created_at_micros,
};
let task = self.store.append_text_chunk(chunk.clone())?;
Ok((task, chunk))
}
pub fn complete_stage(
&self,
input: AiStageCompletionInput,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
self.store.update_task(&input.task_id, |task| {
if task.status.is_terminal() {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::InvalidTaskState,
));
}
let stage = task
.stages
.iter_mut()
.find(|stage| stage.stage_kind == input.stage_kind)
.ok_or(AiTaskServiceError::StageNotFound)?;
stage.status = AiTaskStageStatus::Completed;
stage.completed_at_micros = Some(input.completed_at_micros);
stage.text_output = normalize_optional_text(input.text_output.clone());
stage.structured_payload_json =
normalize_optional_text(input.structured_payload_json.clone());
stage.warning_messages = normalize_string_list(input.warning_messages.clone());
task.latest_text_output = stage.text_output.clone();
task.latest_structured_payload_json = stage.structured_payload_json.clone();
task.updated_at_micros = input.completed_at_micros;
task.version += 1;
Ok(())
})
}
pub fn attach_result_reference(
&self,
task_id: &str,
reference_kind: AiResultReferenceKind,
reference_id: String,
label: Option<String>,
created_at_micros: i64,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
let Some(reference_id) = normalize_required_string(reference_id) else {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::MissingReferenceId,
));
};
self.store.update_task(task_id, |task| {
task.result_references.push(AiResultReferenceSnapshot {
result_ref_id: generate_ai_result_ref_id(created_at_micros),
task_id: task.task_id.clone(),
reference_kind,
reference_id: reference_id.clone(),
label: normalize_optional_text(label.clone()),
created_at_micros,
});
task.updated_at_micros = created_at_micros;
task.version += 1;
Ok(())
})
}
pub fn complete_task(
&self,
task_id: &str,
completed_at_micros: i64,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
self.store.update_task(task_id, |task| {
if task.status.is_terminal() {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::InvalidTaskState,
));
}
task.status = AiTaskStatus::Completed;
task.completed_at_micros = Some(completed_at_micros);
task.updated_at_micros = completed_at_micros;
task.version += 1;
Ok(())
})
}
pub fn fail_task(
&self,
task_id: &str,
failure_message: String,
completed_at_micros: i64,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
let Some(failure_message) = normalize_required_string(failure_message) else {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::MissingFailureMessage,
));
};
self.store.update_task(task_id, |task| {
if task.status.is_terminal() {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::InvalidTaskState,
));
}
task.status = AiTaskStatus::Failed;
task.failure_message = Some(failure_message.clone());
task.completed_at_micros = Some(completed_at_micros);
task.updated_at_micros = completed_at_micros;
task.version += 1;
Ok(())
})
}
pub fn cancel_task(
&self,
task_id: &str,
completed_at_micros: i64,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
self.store.update_task(task_id, |task| {
if task.status.is_terminal() {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::InvalidTaskState,
));
}
task.status = AiTaskStatus::Cancelled;
task.completed_at_micros = Some(completed_at_micros);
task.updated_at_micros = completed_at_micros;
task.version += 1;
Ok(())
})
}
pub fn get_task(&self, task_id: &str) -> Result<AiTaskSnapshot, AiTaskServiceError> {
self.store.get_task(task_id)
}
}
impl InMemoryAiTaskStore {
fn insert_task(&self, task: AiTaskSnapshot) -> Result<AiTaskSnapshot, AiTaskServiceError> {
let mut state = self
.inner
.lock()
.map_err(|_| AiTaskServiceError::Store("AI 任务仓储锁已中毒".to_string()))?;
if state.tasks.contains_key(&task.task_id) {
return Err(AiTaskServiceError::TaskAlreadyExists);
}
state.text_chunks.insert(task.task_id.clone(), Vec::new());
state.tasks.insert(task.task_id.clone(), task.clone());
Ok(task)
}
fn update_task<F>(
&self,
task_id: &str,
mut apply: F,
) -> Result<AiTaskSnapshot, AiTaskServiceError>
where
F: FnMut(&mut AiTaskSnapshot) -> Result<(), AiTaskServiceError>,
{
let mut state = self
.inner
.lock()
.map_err(|_| AiTaskServiceError::Store("AI 任务仓储锁已中毒".to_string()))?;
let task = state
.tasks
.get_mut(task_id.trim())
.ok_or(AiTaskServiceError::TaskNotFound)?;
apply(task)?;
Ok(task.clone())
}
fn append_text_chunk(
&self,
chunk: AiTextChunkSnapshot,
) -> Result<AiTaskSnapshot, AiTaskServiceError> {
let mut state = self
.inner
.lock()
.map_err(|_| AiTaskServiceError::Store("AI 任务仓储锁已中毒".to_string()))?;
{
let task = state
.tasks
.get_mut(&chunk.task_id)
.ok_or(AiTaskServiceError::TaskNotFound)?;
if task.status.is_terminal() {
return Err(AiTaskServiceError::Field(
AiTaskFieldError::InvalidTaskState,
));
}
let stage = task
.stages
.iter_mut()
.find(|stage| stage.stage_kind == chunk.stage_kind)
.ok_or(AiTaskServiceError::StageNotFound)?;
if stage.status == AiTaskStageStatus::Pending {
stage.status = AiTaskStageStatus::Running;
stage.started_at_micros = Some(chunk.created_at_micros);
}
task.status = AiTaskStatus::Running;
task.started_at_micros
.get_or_insert(chunk.created_at_micros);
}
let chunks = state
.text_chunks
.get_mut(&chunk.task_id)
.ok_or(AiTaskServiceError::TaskNotFound)?;
chunks.push(chunk.clone());
chunks.sort_by_key(|value| value.sequence);
let aggregated_text = chunks
.iter()
.filter(|value| value.stage_kind == chunk.stage_kind)
.map(|value| value.delta_text.as_str())
.collect::<Vec<_>>()
.join("");
let normalized_output = if aggregated_text.trim().is_empty() {
None
} else {
Some(aggregated_text)
};
let task = state
.tasks
.get_mut(&chunk.task_id)
.ok_or(AiTaskServiceError::TaskNotFound)?;
let stage = task
.stages
.iter_mut()
.find(|stage| stage.stage_kind == chunk.stage_kind)
.ok_or(AiTaskServiceError::StageNotFound)?;
stage.text_output = normalized_output.clone();
task.latest_text_output = normalized_output;
task.updated_at_micros = chunk.created_at_micros;
task.version += 1;
Ok(task.clone())
}
fn get_task(&self, task_id: &str) -> Result<AiTaskSnapshot, AiTaskServiceError> {
let state = self
.inner
.lock()
.map_err(|_| AiTaskServiceError::Store("AI 任务仓储锁已中毒".to_string()))?;
state
.tasks
.get(task_id.trim())
.cloned()
.ok_or(AiTaskServiceError::TaskNotFound)
}
}
pub fn validate_task_create_input(input: &AiTaskCreateInput) -> Result<(), AiTaskFieldError> {
if normalize_required_string(&input.task_id).is_none() {
return Err(AiTaskFieldError::MissingTaskId);
}
if normalize_required_string(&input.owner_user_id).is_none() {
return Err(AiTaskFieldError::MissingOwnerUserId);
}
if normalize_required_string(&input.request_label).is_none() {
return Err(AiTaskFieldError::MissingRequestLabel);
}
if normalize_required_string(&input.source_module).is_none() {
return Err(AiTaskFieldError::MissingSourceModule);
}
if input.stages.is_empty() {
return Err(AiTaskFieldError::MissingStageBlueprints);
}
let mut seen = HashMap::new();
for stage in &input.stages {
if normalize_required_string(&stage.label).is_none()
|| normalize_required_string(&stage.detail).is_none()
{
return Err(AiTaskFieldError::MissingStageBlueprints);
}
if seen.insert(stage.stage_kind, true).is_some() {
return Err(AiTaskFieldError::DuplicateStageBlueprint);
}
}
Ok(())
}
pub fn generate_ai_task_id(seed_micros: i64) -> String {
build_prefixed_seed_id(AI_TASK_ID_PREFIX, seed_micros)
}
pub fn generate_ai_task_stage_id(task_id: &str, stage_kind: AiTaskStageKind) -> String {
format!(
"{}{}_{}",
AI_TASK_STAGE_ID_PREFIX,
task_id.trim(),
stage_kind.as_str()
)
}
pub fn generate_ai_result_ref_id(seed_micros: i64) -> String {
build_prefixed_seed_id(AI_RESULT_REF_ID_PREFIX, seed_micros)
}
pub fn generate_ai_text_chunk_id(seed_micros: i64, sequence: u32) -> String {
format!("{}{seed_micros:x}_{sequence:x}", AI_TEXT_CHUNK_ID_PREFIX)
}
pub fn normalize_optional_text(value: Option<String>) -> Option<String> {
normalize_shared_optional_string(value)
}
pub fn normalize_string_list(values: Vec<String>) -> Vec<String> {
normalize_shared_string_list(values)
}
impl fmt::Display for AiTaskFieldError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::MissingTaskId => f.write_str("ai_task.task_id 不能为空"),
Self::MissingOwnerUserId => f.write_str("ai_task.owner_user_id 不能为空"),
Self::MissingRequestLabel => f.write_str("ai_task.request_label 不能为空"),
Self::MissingSourceModule => f.write_str("ai_task.source_module 不能为空"),
Self::MissingStageBlueprints => f.write_str("ai_task.stages 至少需要一个有效阶段"),
Self::DuplicateStageBlueprint => f.write_str("ai_task.stages 不能包含重复阶段"),
Self::MissingReferenceId => f.write_str("ai_result_reference.reference_id 不能为空"),
Self::MissingChunkText => f.write_str("ai_text_chunk.delta_text 不能为空"),
Self::InvalidSequence => f.write_str("ai_text_chunk.sequence 必须大于 0"),
Self::MissingFailureMessage => f.write_str("ai_task.failure_message 不能为空"),
Self::MissingStage => f.write_str("ai_task.stage 不存在"),
Self::InvalidTaskState => f.write_str("当前 ai_task 状态不允许执行该操作"),
}
}
}
impl Error for AiTaskFieldError {}
impl fmt::Display for AiTaskServiceError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Field(error) => write!(f, "{error}"),
Self::TaskAlreadyExists => f.write_str("ai_task 已存在,不能重复创建"),
Self::TaskNotFound => f.write_str("ai_task 不存在"),
Self::StageNotFound => f.write_str("ai_task.stage 不存在"),
Self::Store(message) => f.write_str(message),
}
}
}
impl Error for AiTaskServiceError {}
pub use errors::{AiTaskFieldError, AiTaskServiceError};
pub use events::AiTaskDomainEvent;
#[cfg(test)]
mod tests {