This commit is contained in:
2026-04-23 00:29:52 +08:00
parent cd207dc237
commit 1223f597d2
4 changed files with 130 additions and 69 deletions

View File

@@ -177,3 +177,19 @@ SpacetimeDB reducer / procedure 必须保持确定性,因此:
已被真实更新。 已被真实更新。
5. 提示词正文未被改写。 5. 提示词正文未被改写。
## 9. 当前实现状态
截至 `2026-04-22`,本方案已在 `server-rs` 完成以下落地:
1. `submit_custom_world_agent_message` 已改为只提交 user message 与 running operation不再写死 assistant 回复。
2. `api-server` 的 Custom World Agent turn service 已恢复:
- 动态状态识别
- 正式单轮推理
- `replyText/progressPercent/nextAnchorContent` JSON 解析
- creator intent / readiness / clarification / suggested action 等最小派生
3. `/api/runtime/custom-world/agent/sessions/:sessionId/messages/stream` 已改为真实 SSE 流:
- 推理进行中实时输出累计 `reply_delta`
- finalize 后输出最新 `session`
- 最后输出 `done`
4. 普通提交接口在 finalize 完成后返回最终 operation 快照,而不是 submit 阶段的 running 快照。
5. `finalize_custom_world_agent_message_turn` 已负责把 assistant message、session 聚合字段与 operation 最终状态一次性回写到 SpacetimeDB。

View File

@@ -566,7 +566,7 @@ pub async fn submit_custom_world_agent_message(
|_| {}, |_| {},
) )
.await; .await;
state let finalized_operation = state
.spacetime_client() .spacetime_client()
.finalize_custom_world_agent_message(build_finalize_record_input( .finalize_custom_world_agent_message(build_finalize_record_input(
session_id, session_id,
@@ -584,7 +584,7 @@ pub async fn submit_custom_world_agent_message(
Ok(json_success_body( Ok(json_success_body(
Some(&request_context), Some(&request_context),
json!({ json!({
"operation": map_custom_world_agent_operation_response(operation), "operation": map_custom_world_agent_operation_response(finalized_operation),
}), }),
)) ))
} }
@@ -651,57 +651,100 @@ pub async fn stream_custom_world_agent_message(
.map_err(|error| { .map_err(|error| {
custom_world_error_response(&request_context, map_custom_world_client_error(error)) custom_world_error_response(&request_context, map_custom_world_client_error(error))
})?; })?;
let mut reply_updates = Vec::<String>::new(); let quick_fill_requested = payload.quick_fill_requested.unwrap_or(false);
let turn_result = run_custom_world_agent_turn( let focus_card_id = payload.focus_card_id.clone();
CustomWorldAgentTurnRequest { let state = state.clone();
llm_client: state.llm_client(), let session_id_for_stream = session_id.clone();
session: &session, let owner_user_id_for_stream = owner_user_id.clone();
quick_fill_requested: payload.quick_fill_requested.unwrap_or(false), let operation_id = operation.operation_id.clone();
focus_card_id: payload.focus_card_id.clone(), let stream = async_stream::stream! {
}, let (reply_tx, mut reply_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|text| { let turn_result = {
reply_updates.push(text.to_string()); let run_turn = run_custom_world_agent_turn(
}, CustomWorldAgentTurnRequest {
) llm_client: state.llm_client(),
.await; session: &session,
state quick_fill_requested,
.spacetime_client() focus_card_id,
.finalize_custom_world_agent_message(build_finalize_record_input( },
session_id.clone(), move |text| {
owner_user_id.clone(), let _ = reply_tx.send(text.to_string());
operation.operation_id.clone(), },
format!("assistant-{}", operation.operation_id), );
turn_result, tokio::pin!(run_turn);
current_utc_micros(),
))
.await
.map_err(|error| {
custom_world_error_response(&request_context, map_custom_world_client_error(error))
})?;
let final_session = state
.spacetime_client()
.get_custom_world_agent_session(session_id, owner_user_id)
.await
.map_err(|error| {
custom_world_error_response(&request_context, map_custom_world_client_error(error))
})?;
let session_response = map_custom_world_agent_session_response(final_session);
let mut events = reply_updates loop {
.into_iter() tokio::select! {
.map(|text| custom_world_sse_json_event("reply_delta", json!({ "text": text }))) result = &mut run_turn => break result,
.collect::<Result<Vec<_>, _>>() maybe_text = reply_rx.recv() => {
.map_err(|error| custom_world_error_response(&request_context, error))?; if let Some(text) = maybe_text {
events.push( yield Ok::<Event, Infallible>(custom_world_sse_json_event_or_error(
custom_world_sse_json_event("session", json!({ "session": session_response })) "reply_delta",
.map_err(|error| custom_world_error_response(&request_context, error))?, json!({ "text": text }),
); ));
events.push( }
custom_world_sse_json_event("done", json!({ "ok": true })) }
.map_err(|error| custom_world_error_response(&request_context, error))?, }
); }
};
let stream = tokio_stream::iter(events.into_iter().map(|event| Ok::<Event, Infallible>(event))); while let Some(text) = reply_rx.recv().await {
yield Ok::<Event, Infallible>(custom_world_sse_json_event_or_error(
"reply_delta",
json!({ "text": text }),
));
}
let finalize_result = state
.spacetime_client()
.finalize_custom_world_agent_message(build_finalize_record_input(
session_id_for_stream.clone(),
owner_user_id_for_stream.clone(),
operation_id.clone(),
format!("assistant-{operation_id}"),
turn_result,
current_utc_micros(),
))
.await;
let _finalized_operation = match finalize_result {
Ok(operation) => operation,
Err(error) => {
yield Ok::<Event, Infallible>(custom_world_sse_json_event_or_error(
"error",
json!({ "message": error.to_string() }),
));
return;
}
};
let final_session = match state
.spacetime_client()
.get_custom_world_agent_session(
session_id_for_stream,
owner_user_id_for_stream,
)
.await
{
Ok(session) => session,
Err(error) => {
yield Ok::<Event, Infallible>(custom_world_sse_json_event_or_error(
"error",
json!({ "message": error.to_string() }),
));
return;
}
};
let session_response = map_custom_world_agent_session_response(final_session);
yield Ok::<Event, Infallible>(custom_world_sse_json_event_or_error(
"session",
json!({ "session": session_response }),
));
yield Ok::<Event, Infallible>(custom_world_sse_json_event_or_error(
"done",
json!({ "ok": true }),
));
};
Ok(Sse::new(stream).into_response()) Ok(Sse::new(stream).into_response())
} }
@@ -1053,21 +1096,6 @@ fn map_custom_world_result_preview_blocker_response(
} }
} }
fn resolve_stream_reply_text(session: &CustomWorldAgentSessionSnapshotResponse) -> String {
session
.last_assistant_reply
.clone()
.or_else(|| {
session
.messages
.iter()
.rev()
.find(|message| message.role == "assistant")
.map(|message| message.text.clone())
})
.unwrap_or_default()
}
fn map_custom_world_client_error(error: SpacetimeClientError) -> AppError { fn map_custom_world_client_error(error: SpacetimeClientError) -> AppError {
let status = match &error { let status = match &error {
SpacetimeClientError::Procedure(message) SpacetimeClientError::Procedure(message)
@@ -1111,6 +1139,22 @@ fn custom_world_sse_json_event(event_name: &str, payload: Value) -> Result<Event
}) })
} }
fn custom_world_sse_json_event_or_error(event_name: &str, payload: Value) -> Event {
match custom_world_sse_json_event(event_name, payload) {
Ok(event) => event,
Err(_) => custom_world_sse_error_event_message("SSE payload 序列化失败".to_string()),
}
}
fn custom_world_sse_error_event_message(message: String) -> Event {
let payload = format!(
"{{\"message\":{}}}",
serde_json::to_string(&message)
.unwrap_or_else(|_| "\"SSE 错误事件序列化失败\"".to_string())
);
Event::default().event("error").data(payload)
}
fn resolve_author_display_name(_authenticated: &AuthenticatedAccessToken) -> String { fn resolve_author_display_name(_authenticated: &AuthenticatedAccessToken) -> String {
"玩家".to_string() "玩家".to_string()
} }

View File

@@ -68,6 +68,7 @@ enum PromptConversationMode {
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
#[allow(dead_code)]
struct PromptDynamicState { struct PromptDynamicState {
current_turn: u32, current_turn: u32,
progress_percent: u32, progress_percent: u32,
@@ -923,8 +924,8 @@ fn build_prompt_dynamic_state_inference_prompt(
} }
fn build_eight_anchor_single_turn_prompt( fn build_eight_anchor_single_turn_prompt(
current_turn: u32, _current_turn: u32,
progress_percent: u32, _progress_percent: u32,
quick_fill_requested: bool, quick_fill_requested: bool,
current_anchor_content: &EightAnchorContent, current_anchor_content: &EightAnchorContent,
chat_history: &[JsonValue], chat_history: &[JsonValue],

View File

@@ -2508,7 +2508,7 @@ fn submit_custom_world_agent_message_tx(
return Err("forced failure".to_string()); return Err("forced failure".to_string());
} }
let session = ctx let _session = ctx
.db .db
.custom_world_agent_session() .custom_world_agent_session()
.session_id() .session_id()