From 1223f597d28b4416ca7299d0a91843ad1af08c31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=89=A9?= <253518756@qq.com> Date: Thu, 23 Apr 2026 00:29:52 +0800 Subject: [PATCH] 1 --- ...ORLD_AGENT_LLM_REPLY_RESTORE_2026-04-22.md | 16 ++ .../crates/api-server/src/custom_world.rs | 176 +++++++++++------- .../api-server/src/custom_world_agent_turn.rs | 5 +- server-rs/crates/spacetime-module/src/lib.rs | 2 +- 4 files changed, 130 insertions(+), 69 deletions(-) diff --git a/docs/technical/CUSTOM_WORLD_AGENT_LLM_REPLY_RESTORE_2026-04-22.md b/docs/technical/CUSTOM_WORLD_AGENT_LLM_REPLY_RESTORE_2026-04-22.md index 8c8a36a3..f26f8d11 100644 --- a/docs/technical/CUSTOM_WORLD_AGENT_LLM_REPLY_RESTORE_2026-04-22.md +++ b/docs/technical/CUSTOM_WORLD_AGENT_LLM_REPLY_RESTORE_2026-04-22.md @@ -177,3 +177,19 @@ SpacetimeDB reducer / procedure 必须保持确定性,因此: 已被真实更新。 5. 提示词正文未被改写。 +## 9. 当前实现状态 + +截至 `2026-04-22`,本方案已在 `server-rs` 完成以下落地: + +1. `submit_custom_world_agent_message` 已改为只提交 user message 与 running operation,不再写死 assistant 回复。 +2. `api-server` 的 Custom World Agent turn service 已恢复: + - 动态状态识别 + - 正式单轮推理 + - `replyText/progressPercent/nextAnchorContent` JSON 解析 + - creator intent / readiness / clarification / suggested action 等最小派生 +3. `/api/runtime/custom-world/agent/sessions/:sessionId/messages/stream` 已改为真实 SSE 流: + - 推理进行中实时输出累计 `reply_delta` + - finalize 后输出最新 `session` + - 最后输出 `done` +4. 普通提交接口在 finalize 完成后返回最终 operation 快照,而不是 submit 阶段的 running 快照。 +5. `finalize_custom_world_agent_message_turn` 已负责把 assistant message、session 聚合字段与 operation 最终状态一次性回写到 SpacetimeDB。 diff --git a/server-rs/crates/api-server/src/custom_world.rs b/server-rs/crates/api-server/src/custom_world.rs index dd8cbb92..f817623d 100644 --- a/server-rs/crates/api-server/src/custom_world.rs +++ b/server-rs/crates/api-server/src/custom_world.rs @@ -566,7 +566,7 @@ pub async fn submit_custom_world_agent_message( |_| {}, ) .await; - state + let finalized_operation = state .spacetime_client() .finalize_custom_world_agent_message(build_finalize_record_input( session_id, @@ -584,7 +584,7 @@ pub async fn submit_custom_world_agent_message( Ok(json_success_body( Some(&request_context), json!({ - "operation": map_custom_world_agent_operation_response(operation), + "operation": map_custom_world_agent_operation_response(finalized_operation), }), )) } @@ -651,57 +651,100 @@ pub async fn stream_custom_world_agent_message( .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; - let mut reply_updates = Vec::::new(); - let turn_result = run_custom_world_agent_turn( - CustomWorldAgentTurnRequest { - llm_client: state.llm_client(), - session: &session, - quick_fill_requested: payload.quick_fill_requested.unwrap_or(false), - focus_card_id: payload.focus_card_id.clone(), - }, - |text| { - reply_updates.push(text.to_string()); - }, - ) - .await; - state - .spacetime_client() - .finalize_custom_world_agent_message(build_finalize_record_input( - session_id.clone(), - owner_user_id.clone(), - operation.operation_id.clone(), - format!("assistant-{}", operation.operation_id), - turn_result, - current_utc_micros(), - )) - .await - .map_err(|error| { - custom_world_error_response(&request_context, map_custom_world_client_error(error)) - })?; - let final_session = state - .spacetime_client() - .get_custom_world_agent_session(session_id, owner_user_id) - .await - .map_err(|error| { - custom_world_error_response(&request_context, map_custom_world_client_error(error)) - })?; - let session_response = map_custom_world_agent_session_response(final_session); + let quick_fill_requested = payload.quick_fill_requested.unwrap_or(false); + let focus_card_id = payload.focus_card_id.clone(); + let state = state.clone(); + let session_id_for_stream = session_id.clone(); + let owner_user_id_for_stream = owner_user_id.clone(); + let operation_id = operation.operation_id.clone(); + let stream = async_stream::stream! { + let (reply_tx, mut reply_rx) = tokio::sync::mpsc::unbounded_channel::(); + let turn_result = { + let run_turn = run_custom_world_agent_turn( + CustomWorldAgentTurnRequest { + llm_client: state.llm_client(), + session: &session, + quick_fill_requested, + focus_card_id, + }, + move |text| { + let _ = reply_tx.send(text.to_string()); + }, + ); + tokio::pin!(run_turn); - let mut events = reply_updates - .into_iter() - .map(|text| custom_world_sse_json_event("reply_delta", json!({ "text": text }))) - .collect::, _>>() - .map_err(|error| custom_world_error_response(&request_context, error))?; - events.push( - custom_world_sse_json_event("session", json!({ "session": session_response })) - .map_err(|error| custom_world_error_response(&request_context, error))?, - ); - events.push( - custom_world_sse_json_event("done", json!({ "ok": true })) - .map_err(|error| custom_world_error_response(&request_context, error))?, - ); + loop { + tokio::select! { + result = &mut run_turn => break result, + maybe_text = reply_rx.recv() => { + if let Some(text) = maybe_text { + yield Ok::(custom_world_sse_json_event_or_error( + "reply_delta", + json!({ "text": text }), + )); + } + } + } + } + }; - let stream = tokio_stream::iter(events.into_iter().map(|event| Ok::(event))); + while let Some(text) = reply_rx.recv().await { + yield Ok::(custom_world_sse_json_event_or_error( + "reply_delta", + json!({ "text": text }), + )); + } + + let finalize_result = state + .spacetime_client() + .finalize_custom_world_agent_message(build_finalize_record_input( + session_id_for_stream.clone(), + owner_user_id_for_stream.clone(), + operation_id.clone(), + format!("assistant-{operation_id}"), + turn_result, + current_utc_micros(), + )) + .await; + let _finalized_operation = match finalize_result { + Ok(operation) => operation, + Err(error) => { + yield Ok::(custom_world_sse_json_event_or_error( + "error", + json!({ "message": error.to_string() }), + )); + return; + } + }; + + let final_session = match state + .spacetime_client() + .get_custom_world_agent_session( + session_id_for_stream, + owner_user_id_for_stream, + ) + .await + { + Ok(session) => session, + Err(error) => { + yield Ok::(custom_world_sse_json_event_or_error( + "error", + json!({ "message": error.to_string() }), + )); + return; + } + }; + let session_response = map_custom_world_agent_session_response(final_session); + + yield Ok::(custom_world_sse_json_event_or_error( + "session", + json!({ "session": session_response }), + )); + yield Ok::(custom_world_sse_json_event_or_error( + "done", + json!({ "ok": true }), + )); + }; Ok(Sse::new(stream).into_response()) } @@ -1053,21 +1096,6 @@ fn map_custom_world_result_preview_blocker_response( } } -fn resolve_stream_reply_text(session: &CustomWorldAgentSessionSnapshotResponse) -> String { - session - .last_assistant_reply - .clone() - .or_else(|| { - session - .messages - .iter() - .rev() - .find(|message| message.role == "assistant") - .map(|message| message.text.clone()) - }) - .unwrap_or_default() -} - fn map_custom_world_client_error(error: SpacetimeClientError) -> AppError { let status = match &error { SpacetimeClientError::Procedure(message) @@ -1111,6 +1139,22 @@ fn custom_world_sse_json_event(event_name: &str, payload: Value) -> Result Event { + match custom_world_sse_json_event(event_name, payload) { + Ok(event) => event, + Err(_) => custom_world_sse_error_event_message("SSE payload 序列化失败".to_string()), + } +} + +fn custom_world_sse_error_event_message(message: String) -> Event { + let payload = format!( + "{{\"message\":{}}}", + serde_json::to_string(&message) + .unwrap_or_else(|_| "\"SSE 错误事件序列化失败\"".to_string()) + ); + Event::default().event("error").data(payload) +} + fn resolve_author_display_name(_authenticated: &AuthenticatedAccessToken) -> String { "玩家".to_string() } diff --git a/server-rs/crates/api-server/src/custom_world_agent_turn.rs b/server-rs/crates/api-server/src/custom_world_agent_turn.rs index d593edf7..fd423c33 100644 --- a/server-rs/crates/api-server/src/custom_world_agent_turn.rs +++ b/server-rs/crates/api-server/src/custom_world_agent_turn.rs @@ -68,6 +68,7 @@ enum PromptConversationMode { } #[derive(Clone, Debug)] +#[allow(dead_code)] struct PromptDynamicState { current_turn: u32, progress_percent: u32, @@ -923,8 +924,8 @@ fn build_prompt_dynamic_state_inference_prompt( } fn build_eight_anchor_single_turn_prompt( - current_turn: u32, - progress_percent: u32, + _current_turn: u32, + _progress_percent: u32, quick_fill_requested: bool, current_anchor_content: &EightAnchorContent, chat_history: &[JsonValue], diff --git a/server-rs/crates/spacetime-module/src/lib.rs b/server-rs/crates/spacetime-module/src/lib.rs index 8e5e7ad6..2224236d 100644 --- a/server-rs/crates/spacetime-module/src/lib.rs +++ b/server-rs/crates/spacetime-module/src/lib.rs @@ -2508,7 +2508,7 @@ fn submit_custom_world_agent_message_tx( return Err("forced failure".to_string()); } - let session = ctx + let _session = ctx .db .custom_world_agent_session() .session_id()