use shared_kernel::normalize_required_string; use crate::commands::validate_task_create_input; use crate::{ AiResultReferenceKind, AiResultReferenceSnapshot, AiStageCompletionInput, AiTaskCreateInput, AiTaskFieldError, AiTaskServiceError, AiTaskSnapshot, AiTaskStageKind, AiTaskStageSnapshot, AiTaskStageStatus, AiTaskStatus, AiTextChunkSnapshot, INITIAL_AI_TASK_VERSION, generate_ai_result_ref_id, generate_ai_text_chunk_id, normalize_optional_text, normalize_string_list, }; use super::{InMemoryAiTaskStore, ensure_task_is_not_terminal}; #[derive(Clone, Debug)] pub struct AiTaskService { store: InMemoryAiTaskStore, } impl AiTaskService { pub fn new(store: InMemoryAiTaskStore) -> Self { Self { store } } pub fn create_task( &self, input: AiTaskCreateInput, ) -> Result { validate_task_create_input(&input).map_err(AiTaskServiceError::Field)?; let snapshot = AiTaskSnapshot { task_id: input.task_id.clone(), task_kind: input.task_kind, owner_user_id: normalize_required_string(input.owner_user_id).unwrap_or_default(), request_label: normalize_required_string(input.request_label).unwrap_or_default(), source_module: normalize_required_string(input.source_module).unwrap_or_default(), source_entity_id: normalize_optional_text(input.source_entity_id), request_payload_json: normalize_optional_text(input.request_payload_json), status: AiTaskStatus::Pending, failure_message: None, stages: input .stages .into_iter() .map(|stage| AiTaskStageSnapshot { stage_kind: stage.stage_kind, label: normalize_required_string(stage.label).unwrap_or_default(), detail: normalize_required_string(stage.detail).unwrap_or_default(), order: stage.order, status: AiTaskStageStatus::Pending, text_output: None, structured_payload_json: None, warning_messages: Vec::new(), started_at_micros: None, completed_at_micros: None, }) .collect(), result_references: Vec::new(), latest_text_output: None, latest_structured_payload_json: None, version: INITIAL_AI_TASK_VERSION, created_at_micros: input.created_at_micros, started_at_micros: None, completed_at_micros: None, updated_at_micros: input.created_at_micros, }; self.store.insert_task(snapshot) } pub fn start_task( &self, task_id: &str, started_at_micros: i64, ) -> Result { self.store.update_task(task_id, |task| { ensure_task_is_not_terminal(task.status)?; task.status = AiTaskStatus::Running; task.started_at_micros.get_or_insert(started_at_micros); task.updated_at_micros = started_at_micros; task.version += 1; Ok(()) }) } pub fn start_stage( &self, task_id: &str, stage_kind: AiTaskStageKind, started_at_micros: i64, ) -> Result { self.store.update_task(task_id, |task| { ensure_task_is_not_terminal(task.status)?; task.status = AiTaskStatus::Running; task.started_at_micros.get_or_insert(started_at_micros); let stage = task .stages .iter_mut() .find(|stage| stage.stage_kind == stage_kind) .ok_or(AiTaskServiceError::StageNotFound)?; stage.status = AiTaskStageStatus::Running; stage.started_at_micros.get_or_insert(started_at_micros); task.updated_at_micros = started_at_micros; task.version += 1; Ok(()) }) } pub fn append_text_chunk( &self, task_id: &str, stage_kind: AiTaskStageKind, sequence: u32, delta_text: String, created_at_micros: i64, ) -> Result<(AiTaskSnapshot, AiTextChunkSnapshot), AiTaskServiceError> { if delta_text.trim().is_empty() { return Err(AiTaskServiceError::Field( AiTaskFieldError::MissingChunkText, )); } if sequence == 0 { return Err(AiTaskServiceError::Field(AiTaskFieldError::InvalidSequence)); } let chunk = AiTextChunkSnapshot { chunk_id: generate_ai_text_chunk_id(created_at_micros, sequence), task_id: normalize_required_string(task_id).unwrap_or_default(), stage_kind, sequence, delta_text: normalize_required_string(delta_text).unwrap_or_default(), created_at_micros, }; let task = self.store.append_text_chunk(chunk.clone())?; Ok((task, chunk)) } pub fn complete_stage( &self, input: AiStageCompletionInput, ) -> Result { self.store.update_task(&input.task_id, |task| { ensure_task_is_not_terminal(task.status)?; let stage = task .stages .iter_mut() .find(|stage| stage.stage_kind == input.stage_kind) .ok_or(AiTaskServiceError::StageNotFound)?; stage.status = AiTaskStageStatus::Completed; stage.completed_at_micros = Some(input.completed_at_micros); stage.text_output = normalize_optional_text(input.text_output.clone()); stage.structured_payload_json = normalize_optional_text(input.structured_payload_json.clone()); stage.warning_messages = normalize_string_list(input.warning_messages.clone()); task.latest_text_output = stage.text_output.clone(); task.latest_structured_payload_json = stage.structured_payload_json.clone(); task.updated_at_micros = input.completed_at_micros; task.version += 1; Ok(()) }) } pub fn attach_result_reference( &self, task_id: &str, reference_kind: AiResultReferenceKind, reference_id: String, label: Option, created_at_micros: i64, ) -> Result { let Some(reference_id) = normalize_required_string(reference_id) else { return Err(AiTaskServiceError::Field( AiTaskFieldError::MissingReferenceId, )); }; self.store.update_task(task_id, |task| { ensure_task_is_not_terminal(task.status)?; task.result_references.push(AiResultReferenceSnapshot { result_ref_id: generate_ai_result_ref_id(created_at_micros), task_id: task.task_id.clone(), reference_kind, reference_id: reference_id.clone(), label: normalize_optional_text(label.clone()), created_at_micros, }); task.updated_at_micros = created_at_micros; task.version += 1; Ok(()) }) } pub fn complete_task( &self, task_id: &str, completed_at_micros: i64, ) -> Result { self.store.update_task(task_id, |task| { ensure_task_is_not_terminal(task.status)?; task.status = AiTaskStatus::Completed; task.completed_at_micros = Some(completed_at_micros); task.updated_at_micros = completed_at_micros; task.version += 1; Ok(()) }) } pub fn fail_task( &self, task_id: &str, failure_message: String, completed_at_micros: i64, ) -> Result { let Some(failure_message) = normalize_required_string(failure_message) else { return Err(AiTaskServiceError::Field( AiTaskFieldError::MissingFailureMessage, )); }; self.store.update_task(task_id, |task| { ensure_task_is_not_terminal(task.status)?; task.status = AiTaskStatus::Failed; task.failure_message = Some(failure_message.clone()); task.completed_at_micros = Some(completed_at_micros); task.updated_at_micros = completed_at_micros; task.version += 1; Ok(()) }) } pub fn cancel_task( &self, task_id: &str, completed_at_micros: i64, ) -> Result { self.store.update_task(task_id, |task| { ensure_task_is_not_terminal(task.status)?; task.status = AiTaskStatus::Cancelled; task.completed_at_micros = Some(completed_at_micros); task.updated_at_micros = completed_at_micros; task.version += 1; Ok(()) }) } pub fn get_task(&self, task_id: &str) -> Result { self.store.get_task(task_id) } }