1
Some checks failed
CI / verify (push) Has been cancelled

This commit is contained in:
2026-04-26 21:07:55 +08:00
609 changed files with 4601 additions and 14501 deletions

View File

@@ -1,13 +1,17 @@
use std::{
collections::BTreeMap,
convert::Infallible,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use axum::{
Json,
extract::{Extension, Path, State, rejection::JsonRejection},
http::{HeaderName, StatusCode, header},
response::{IntoResponse, Response},
http::StatusCode,
response::{
IntoResponse, Response,
sse::{Event, Sse},
},
};
use module_assets::{
AssetObjectAccessPolicy, AssetObjectFieldError, build_asset_entity_binding_input,
@@ -323,93 +327,107 @@ pub async fn stream_big_fish_message(
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
let quick_fill_requested = payload.quick_fill_requested.unwrap_or(false);
let mut draft_writer = AiGenerationDraftWriter::new(AiGenerationDraftContext::new(
"big_fish",
owner_user_id.as_str(),
session_id.as_str(),
payload.client_message_id.as_str(),
"大鱼吃小鱼模板生成草稿",
));
if let Err(error) = draft_writer.ensure_started(state.spacetime_client()).await {
tracing::warn!(error = %error, "大鱼吃小鱼模板生成草稿任务启动失败,主生成流程继续执行");
}
let draft_sink = AiGenerationDraftSink::new(
AiGenerationDraftContext::new(
let state = state.clone();
let session_id_for_stream = session_id.clone();
let owner_user_id_for_stream = owner_user_id.clone();
let client_message_id_for_stream = payload.client_message_id.clone();
let stream = async_stream::stream! {
let mut draft_writer = AiGenerationDraftWriter::new(AiGenerationDraftContext::new(
"big_fish",
owner_user_id.as_str(),
session_id.as_str(),
payload.client_message_id.as_str(),
owner_user_id_for_stream.as_str(),
session_id_for_stream.as_str(),
client_message_id_for_stream.as_str(),
"大鱼吃小鱼模板生成草稿",
),
state.spacetime_client().clone(),
);
let mut streamed_reply_text = String::new();
let turn_result = run_big_fish_agent_turn(
BigFishAgentTurnRequest {
llm_client: state.llm_client(),
session: &submitted_session,
quick_fill_requested,
},
|text| {
draft_sink.persist_visible_text_async(text);
streamed_reply_text = text.to_string();
},
)
.await;
if !streamed_reply_text.is_empty() {
draft_writer
.persist_visible_text(state.spacetime_client(), streamed_reply_text.as_str())
.await;
}
let reply_text = match &turn_result {
Ok(result) => result.assistant_reply_text.clone(),
Err(error) => error.to_string(),
};
let finalize_input = match turn_result {
Ok(turn_result) => build_finalize_record_input(
session_id.clone(),
owner_user_id.clone(),
build_prefixed_uuid_id("big-fish-message-"),
turn_result,
current_utc_micros(),
),
Err(error) => build_failed_finalize_record_input(
session_id.clone(),
owner_user_id.clone(),
&submitted_session,
error.to_string(),
current_utc_micros(),
),
};
let session = state
.spacetime_client()
.finalize_big_fish_agent_message(finalize_input)
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
));
if let Err(error) = draft_writer.ensure_started(state.spacetime_client()).await {
tracing::warn!(error = %error, "大鱼吃小鱼模板生成草稿任务启动失败,主生成流程继续执行");
}
let (reply_tx, mut reply_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
// 与 RPG/拼图 Agent 保持同一语义回复先流式展示session 真相仍等 finalize 后下发。
let turn_result = {
let run_turn = run_big_fish_agent_turn(
BigFishAgentTurnRequest {
llm_client: state.llm_client(),
session: &submitted_session,
quick_fill_requested,
},
move |text| {
let _ = reply_tx.send(text.to_string());
},
);
tokio::pin!(run_turn);
let session_response = map_big_fish_session_response(session);
let mut sse_body = String::new();
append_sse_event(
&request_context,
&mut sse_body,
"reply_delta",
&json!({ "text": if streamed_reply_text.is_empty() { reply_text } else { streamed_reply_text } }),
)?;
append_sse_event(
&request_context,
&mut sse_body,
"session",
&json!({ "session": session_response }),
)?;
append_sse_event(
&request_context,
&mut sse_body,
"done",
&json!({ "ok": true }),
)?;
Ok(build_event_stream_response(sse_body))
loop {
// 每个 replyText 增量同时写草稿表并推给 SSE避免前端等待完整模型响应。
tokio::select! {
result = &mut run_turn => break result,
maybe_text = reply_rx.recv() => {
if let Some(text) = maybe_text {
draft_writer
.persist_visible_text(state.spacetime_client(), text.as_str())
.await;
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
"reply_delta",
json!({ "text": text }),
));
}
}
}
}
};
while let Some(text) = reply_rx.recv().await {
draft_writer
.persist_visible_text(state.spacetime_client(), text.as_str())
.await;
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
"reply_delta",
json!({ "text": text }),
));
}
let finalize_input = match turn_result {
Ok(turn_result) => build_finalize_record_input(
session_id_for_stream.clone(),
owner_user_id_for_stream.clone(),
build_prefixed_uuid_id("big-fish-message-"),
turn_result,
current_utc_micros(),
),
Err(error) => build_failed_finalize_record_input(
session_id_for_stream.clone(),
owner_user_id_for_stream.clone(),
&submitted_session,
error.to_string(),
current_utc_micros(),
),
};
let session = match state
.spacetime_client()
.finalize_big_fish_agent_message(finalize_input)
.await
{
Ok(session) => session,
Err(error) => {
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
"error",
json!({ "message": error.to_string() }),
));
return;
}
};
let session_response = map_big_fish_session_response(session);
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
"session",
json!({ "session": session_response }),
));
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
"done",
json!({ "ok": true }),
));
};
Ok(Sse::new(stream).into_response())
}
pub async fn execute_big_fish_action(
@@ -1706,40 +1724,20 @@ fn big_fish_bad_request(request_context: &RequestContext, message: &str) -> Resp
)
}
fn append_sse_event(
request_context: &RequestContext,
body: &mut String,
event: &str,
payload: &Value,
) -> Result<(), Response> {
let payload_text = serde_json::to_string(payload).map_err(|error| {
big_fish_error_response(
request_context,
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({
"provider": "big-fish",
"message": format!("SSE payload 序列化失败:{error}"),
})),
)
})?;
body.push_str("event: ");
body.push_str(event);
body.push('\n');
body.push_str("data: ");
body.push_str(&payload_text);
body.push_str("\n\n");
Ok(())
fn big_fish_sse_json_event_or_error(event_name: &str, payload: Value) -> Event {
match serde_json::to_string(&payload) {
Ok(payload_text) => Event::default().event(event_name).data(payload_text),
Err(_) => big_fish_sse_error_event_message("SSE payload 序列化失败".to_string()),
}
}
fn build_event_stream_response(body: String) -> Response {
(
[
(header::CONTENT_TYPE, "text/event-stream; charset=utf-8"),
(header::CACHE_CONTROL, "no-cache"),
(HeaderName::from_static("x-accel-buffering"), "no"),
],
body,
)
.into_response()
fn big_fish_sse_error_event_message(message: String) -> Event {
let payload = format!(
"{{\"message\":{}}}",
serde_json::to_string(&message)
.unwrap_or_else(|_| "\"SSE 错误事件序列化失败\"".to_string())
);
Event::default().event("error").data(payload)
}
fn map_big_fish_client_error(error: SpacetimeClientError) -> AppError {