This commit is contained in:
2026-04-24 12:21:33 +08:00
parent 3528980645
commit 70b5a7cf73
515 changed files with 14971 additions and 6831 deletions

View File

@@ -37,6 +37,10 @@ use spacetime_client::{
};
use tokio::time::sleep;
use crate::big_fish_agent_turn::{
BigFishAgentTurnRequest, build_failed_finalize_record_input, build_finalize_record_input,
run_big_fish_agent_turn,
};
use crate::{
api_response::json_success_body, auth::AuthenticatedAccessToken, http_error::AppError,
request_context::RequestContext, state::AppState,
@@ -157,11 +161,12 @@ pub async fn submit_big_fish_message(
));
}
let session = state
let owner_user_id = authenticated.claims().user_id().to_string();
let submitted_session = state
.spacetime_client()
.submit_big_fish_message(BigFishMessageSubmitRecordInput {
session_id,
owner_user_id: authenticated.claims().user_id().to_string(),
session_id: session_id.clone(),
owner_user_id: owner_user_id.clone(),
user_message_id: client_message_id,
user_message_text: message_text,
assistant_message_id: build_prefixed_uuid_id("big-fish-message-"),
@@ -171,6 +176,37 @@ pub async fn submit_big_fish_message(
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
let turn_result = run_big_fish_agent_turn(
BigFishAgentTurnRequest {
llm_client: state.llm_client(),
session: &submitted_session,
},
|_| {},
)
.await;
let finalize_input = match turn_result {
Ok(turn_result) => build_finalize_record_input(
session_id.clone(),
owner_user_id.clone(),
build_prefixed_uuid_id("big-fish-message-"),
turn_result,
current_utc_micros(),
),
Err(error) => build_failed_finalize_record_input(
session_id.clone(),
owner_user_id.clone(),
&submitted_session,
error.to_string(),
current_utc_micros(),
),
};
let session = state
.spacetime_client()
.finalize_big_fish_agent_message(finalize_input)
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
Ok(json_success_body(
Some(&request_context),
@@ -198,14 +234,23 @@ pub async fn stream_big_fish_message(
})?;
ensure_non_empty(&request_context, &session_id, "sessionId")?;
let client_message_id = payload.client_message_id.trim().to_string();
let message_text = payload.text.trim().to_string();
if client_message_id.is_empty() || message_text.is_empty() {
return Err(big_fish_bad_request(
&request_context,
"clientMessageId and text are required",
));
}
let owner_user_id = authenticated.claims().user_id().to_string();
let session = state
let submitted_session = state
.spacetime_client()
.submit_big_fish_message(BigFishMessageSubmitRecordInput {
session_id,
owner_user_id,
user_message_id: payload.client_message_id.trim().to_string(),
user_message_text: payload.text.trim().to_string(),
session_id: session_id.clone(),
owner_user_id: owner_user_id.clone(),
user_message_id: client_message_id,
user_message_text: message_text,
assistant_message_id: build_prefixed_uuid_id("big-fish-message-"),
submitted_at_micros: current_utc_micros(),
})
@@ -213,18 +258,52 @@ pub async fn stream_big_fish_message(
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
let mut streamed_reply_text = String::new();
let turn_result = run_big_fish_agent_turn(
BigFishAgentTurnRequest {
llm_client: state.llm_client(),
session: &submitted_session,
},
|text| {
streamed_reply_text = text.to_string();
},
)
.await;
let reply_text = match &turn_result {
Ok(result) => result.assistant_reply_text.clone(),
Err(error) => error.to_string(),
};
let finalize_input = match turn_result {
Ok(turn_result) => build_finalize_record_input(
session_id.clone(),
owner_user_id.clone(),
build_prefixed_uuid_id("big-fish-message-"),
turn_result,
current_utc_micros(),
),
Err(error) => build_failed_finalize_record_input(
session_id.clone(),
owner_user_id.clone(),
&submitted_session,
error.to_string(),
current_utc_micros(),
),
};
let session = state
.spacetime_client()
.finalize_big_fish_agent_message(finalize_input)
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
let session_response = map_big_fish_session_response(session);
let reply_text = session_response
.last_assistant_reply
.clone()
.unwrap_or_else(|| "锚点已更新。".to_string());
let mut sse_body = String::new();
append_sse_event(
&request_context,
&mut sse_body,
"reply_delta",
&json!({ "text": reply_text }),
&json!({ "text": if streamed_reply_text.is_empty() { reply_text } else { streamed_reply_text } }),
)?;
append_sse_event(
&request_context,