1007 lines
36 KiB
Rust
1007 lines
36 KiB
Rust
use std::{
|
||
collections::BTreeMap,
|
||
convert::Infallible,
|
||
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
|
||
};
|
||
|
||
use axum::{
|
||
Json,
|
||
extract::{Extension, Path, State, rejection::JsonRejection},
|
||
http::StatusCode,
|
||
response::{
|
||
IntoResponse, Response,
|
||
sse::{Event, Sse},
|
||
},
|
||
};
|
||
use module_assets::{
|
||
AssetObjectAccessPolicy, AssetObjectFieldError, build_asset_entity_binding_input,
|
||
build_asset_object_upsert_input, generate_asset_binding_id, generate_asset_object_id,
|
||
};
|
||
use platform_oss::{LegacyAssetPrefix, OssHeadObjectRequest, OssObjectAccess};
|
||
use serde_json::{Map, Value, json};
|
||
use shared_contracts::big_fish::{
|
||
BigFishActionResponse, BigFishAgentMessageResponse, BigFishAnchorItemResponse,
|
||
BigFishAnchorPackResponse, BigFishAssetCoverageResponse, BigFishAssetSlotResponse,
|
||
BigFishBackgroundBlueprintResponse, BigFishGameDraftResponse, BigFishLevelBlueprintResponse,
|
||
BigFishRunResponse, BigFishRuntimeEntityResponse, BigFishRuntimeParamsResponse,
|
||
BigFishRuntimeSnapshotResponse, BigFishSessionResponse, BigFishSessionSnapshotResponse,
|
||
BigFishVector2Response, CreateBigFishSessionRequest, ExecuteBigFishActionRequest,
|
||
RecordBigFishPlayRequest, SendBigFishMessageRequest, SubmitBigFishInputRequest,
|
||
};
|
||
use shared_contracts::big_fish_works::{BigFishWorkSummaryResponse, BigFishWorksResponse};
|
||
use shared_kernel::{build_prefixed_uuid_id, format_timestamp_micros};
|
||
use spacetime_client::{
|
||
BigFishAgentMessageRecord, BigFishAnchorItemRecord, BigFishAnchorPackRecord,
|
||
BigFishAssetCoverageRecord, BigFishAssetGenerateRecordInput, BigFishAssetSlotRecord,
|
||
BigFishBackgroundBlueprintRecord, BigFishDraftCompileRecordInput, BigFishGameDraftRecord,
|
||
BigFishInputSubmitRecordInput, BigFishLevelBlueprintRecord, BigFishLikeReportRecordInput,
|
||
BigFishMessageSubmitRecordInput, BigFishPlayReportRecordInput, BigFishRunStartRecordInput,
|
||
BigFishRuntimeEntityRecord, BigFishRuntimeParamsRecord, BigFishRuntimeRunRecord,
|
||
BigFishSessionCreateRecordInput, BigFishSessionRecord, BigFishVector2Record,
|
||
BigFishWorkRemixRecordInput, BigFishWorkSummaryRecord, SpacetimeClientError,
|
||
};
|
||
use tokio::time::sleep;
|
||
|
||
use crate::big_fish_agent_turn::{
|
||
BigFishAgentTurnRequest, build_failed_finalize_record_input, build_finalize_record_input,
|
||
run_big_fish_agent_turn,
|
||
};
|
||
use crate::big_fish_draft_compiler::compile_big_fish_draft_with_fallback;
|
||
use crate::generated_image_assets::{
|
||
GeneratedImageAssetAdapter, GeneratedImageAssetDataUrl,
|
||
adapter::GeneratedImageAssetAdapterMetadata, adapter::GeneratedImageAssetPersistInput,
|
||
normalize_generated_image_asset_mime,
|
||
};
|
||
use crate::prompt::big_fish::{
|
||
BIG_FISH_DEFAULT_NEGATIVE_PROMPT, BIG_FISH_TRANSPARENT_ASSET_NEGATIVE_PROMPT,
|
||
build_big_fish_level_main_image_prompt, build_big_fish_level_motion_prompt,
|
||
build_big_fish_stage_background_prompt,
|
||
};
|
||
use crate::{
|
||
ai_generation_drafts::{
|
||
AiGenerationDraftContext, AiGenerationDraftSink, AiGenerationDraftWriter,
|
||
},
|
||
api_response::json_success_body,
|
||
asset_billing::execute_billable_asset_operation,
|
||
auth::AuthenticatedAccessToken,
|
||
character_visual_assets::try_apply_background_alpha_to_png,
|
||
http_error::AppError,
|
||
platform_errors::map_oss_error,
|
||
request_context::RequestContext,
|
||
state::AppState,
|
||
work_author::resolve_work_author_by_user_id,
|
||
work_play_tracking::{WorkPlayTrackingDraft, record_work_play_start_after_success},
|
||
};
|
||
|
||
pub async fn create_big_fish_session(
|
||
State(state): State<AppState>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
payload: Result<Json<CreateBigFishSessionRequest>, JsonRejection>,
|
||
) -> Result<Json<Value>, Response> {
|
||
let Json(payload) = payload.map_err(|error| {
|
||
big_fish_error_response(
|
||
&request_context,
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
|
||
"provider": "big-fish",
|
||
"message": error.body_text(),
|
||
})),
|
||
)
|
||
})?;
|
||
|
||
let seed_text = payload.seed_text.unwrap_or_default().trim().to_string();
|
||
let session = state
|
||
.spacetime_client()
|
||
.create_big_fish_session(BigFishSessionCreateRecordInput {
|
||
session_id: build_prefixed_uuid_id("big-fish-session-"),
|
||
owner_user_id: authenticated.claims().user_id().to_string(),
|
||
seed_text: seed_text.clone(),
|
||
welcome_message_id: build_prefixed_uuid_id("big-fish-message-"),
|
||
welcome_message_text: build_big_fish_welcome_text(&seed_text),
|
||
created_at_micros: current_utc_micros(),
|
||
})
|
||
.await
|
||
.map_err(|error| {
|
||
big_fish_error_response(&request_context, map_big_fish_client_error(error))
|
||
})?;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishSessionResponse {
|
||
session: map_big_fish_session_response(session),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn get_big_fish_session(
|
||
State(state): State<AppState>,
|
||
Path(session_id): Path<String>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
) -> Result<Json<Value>, Response> {
|
||
ensure_non_empty(&request_context, &session_id, "sessionId")?;
|
||
|
||
let session = load_big_fish_session_with_retry(
|
||
&state,
|
||
session_id,
|
||
authenticated.claims().user_id().to_string(),
|
||
)
|
||
.await
|
||
.map_err(|error| big_fish_error_response(&request_context, map_big_fish_client_error(error)))?;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishSessionResponse {
|
||
session: map_big_fish_session_response(session),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn get_big_fish_works(
|
||
State(state): State<AppState>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
) -> Result<Json<Value>, Response> {
|
||
let items = state
|
||
.spacetime_client()
|
||
.list_big_fish_works(authenticated.claims().user_id().to_string())
|
||
.await
|
||
.map_err(|error| {
|
||
big_fish_error_response(&request_context, map_big_fish_client_error(error))
|
||
})?;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishWorksResponse {
|
||
items: items
|
||
.into_iter()
|
||
.map(|item| map_big_fish_work_summary_response(&state, item))
|
||
.collect(),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn list_big_fish_gallery(
|
||
State(state): State<AppState>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
) -> Result<Json<Value>, Response> {
|
||
let items = match state.spacetime_client().list_big_fish_gallery().await {
|
||
Ok(items) => items,
|
||
Err(error) if should_soft_fallback_big_fish_gallery(&error) => {
|
||
tracing::warn!(
|
||
error = %error,
|
||
"大鱼吃小鱼公开广场读取失败,已按空广场降级返回"
|
||
);
|
||
Vec::new()
|
||
}
|
||
Err(error) => {
|
||
return Err(big_fish_error_response(
|
||
&request_context,
|
||
map_big_fish_client_error(error),
|
||
));
|
||
}
|
||
};
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishWorksResponse {
|
||
items: items
|
||
.into_iter()
|
||
.map(|item| map_big_fish_work_summary_response(&state, item))
|
||
.collect(),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn delete_big_fish_work(
|
||
State(state): State<AppState>,
|
||
Path(session_id): Path<String>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
) -> Result<Json<Value>, Response> {
|
||
ensure_non_empty(&request_context, &session_id, "sessionId")?;
|
||
|
||
let items = state
|
||
.spacetime_client()
|
||
.delete_big_fish_work(session_id, authenticated.claims().user_id().to_string())
|
||
.await
|
||
.map_err(|error| {
|
||
big_fish_error_response(&request_context, map_big_fish_client_error(error))
|
||
})?;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishWorksResponse {
|
||
items: items
|
||
.into_iter()
|
||
.map(|item| map_big_fish_work_summary_response(&state, item))
|
||
.collect(),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn record_big_fish_play(
|
||
State(state): State<AppState>,
|
||
Path(session_id): Path<String>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
payload: Result<Json<RecordBigFishPlayRequest>, JsonRejection>,
|
||
) -> Result<Json<Value>, Response> {
|
||
let Json(payload) = payload.map_err(|error| {
|
||
big_fish_error_response(
|
||
&request_context,
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
|
||
"provider": "big-fish",
|
||
"message": error.body_text(),
|
||
})),
|
||
)
|
||
})?;
|
||
ensure_non_empty(&request_context, &session_id, "sessionId")?;
|
||
|
||
let items = state
|
||
.spacetime_client()
|
||
.record_big_fish_play(BigFishPlayReportRecordInput {
|
||
session_id: session_id.clone(),
|
||
user_id: authenticated.claims().user_id().to_string(),
|
||
elapsed_ms: payload.elapsed_ms.unwrap_or(0),
|
||
reported_at_micros: current_utc_micros(),
|
||
})
|
||
.await
|
||
.map_err(|error| {
|
||
big_fish_error_response(&request_context, map_big_fish_client_error(error))
|
||
})?;
|
||
|
||
record_work_play_start_after_success(
|
||
&state,
|
||
&request_context,
|
||
WorkPlayTrackingDraft::new(
|
||
"big-fish",
|
||
session_id.clone(),
|
||
&authenticated,
|
||
"/api/runtime/big-fish/sessions/{session_id}/play",
|
||
)
|
||
.run_id(session_id.clone()),
|
||
)
|
||
.await;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishWorksResponse {
|
||
items: items
|
||
.into_iter()
|
||
.map(|item| map_big_fish_work_summary_response(&state, item))
|
||
.collect(),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn start_big_fish_run(
|
||
State(state): State<AppState>,
|
||
Path(session_id): Path<String>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
) -> Result<Json<Value>, Response> {
|
||
ensure_non_empty(&request_context, &session_id, "sessionId")?;
|
||
|
||
let run = state
|
||
.spacetime_client()
|
||
.start_big_fish_run(BigFishRunStartRecordInput {
|
||
run_id: build_prefixed_uuid_id("big-fish-run-"),
|
||
session_id,
|
||
owner_user_id: authenticated.claims().user_id().to_string(),
|
||
started_at_micros: current_utc_micros(),
|
||
})
|
||
.await
|
||
.map_err(|error| {
|
||
big_fish_error_response(&request_context, map_big_fish_client_error(error))
|
||
})?;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishRunResponse {
|
||
run: map_big_fish_run_response(run),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn record_big_fish_gallery_like(
|
||
State(state): State<AppState>,
|
||
Path(session_id): Path<String>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
) -> Result<Json<Value>, Response> {
|
||
ensure_non_empty(&request_context, &session_id, "sessionId")?;
|
||
|
||
let items = state
|
||
.spacetime_client()
|
||
.record_big_fish_like(BigFishLikeReportRecordInput {
|
||
session_id,
|
||
user_id: authenticated.claims().user_id().to_string(),
|
||
liked_at_micros: current_utc_micros(),
|
||
})
|
||
.await
|
||
.map_err(|error| {
|
||
big_fish_error_response(&request_context, map_big_fish_client_error(error))
|
||
})?;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishWorksResponse {
|
||
items: items
|
||
.into_iter()
|
||
.map(|item| map_big_fish_work_summary_response(&state, item))
|
||
.collect(),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn get_big_fish_run(
|
||
State(state): State<AppState>,
|
||
Path(run_id): Path<String>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
) -> Result<Json<Value>, Response> {
|
||
ensure_non_empty(&request_context, &run_id, "runId")?;
|
||
|
||
let run = state
|
||
.spacetime_client()
|
||
.get_big_fish_run(run_id, authenticated.claims().user_id().to_string())
|
||
.await
|
||
.map_err(|error| {
|
||
big_fish_error_response(&request_context, map_big_fish_client_error(error))
|
||
})?;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishRunResponse {
|
||
run: map_big_fish_run_response(run),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn submit_big_fish_input(
|
||
State(state): State<AppState>,
|
||
Path(run_id): Path<String>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
payload: Result<Json<SubmitBigFishInputRequest>, JsonRejection>,
|
||
) -> Result<Json<Value>, Response> {
|
||
let Json(payload) = payload.map_err(|error| {
|
||
big_fish_error_response(
|
||
&request_context,
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
|
||
"provider": "big-fish",
|
||
"message": error.body_text(),
|
||
})),
|
||
)
|
||
})?;
|
||
ensure_non_empty(&request_context, &run_id, "runId")?;
|
||
if !payload.x.is_finite() || !payload.y.is_finite() {
|
||
return Err(big_fish_bad_request(&request_context, "input is invalid"));
|
||
}
|
||
|
||
let run = state
|
||
.spacetime_client()
|
||
.submit_big_fish_input(BigFishInputSubmitRecordInput {
|
||
run_id,
|
||
owner_user_id: authenticated.claims().user_id().to_string(),
|
||
x: payload.x,
|
||
y: payload.y,
|
||
submitted_at_micros: current_utc_micros(),
|
||
})
|
||
.await
|
||
.map_err(|error| {
|
||
big_fish_error_response(&request_context, map_big_fish_client_error(error))
|
||
})?;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishRunResponse {
|
||
run: map_big_fish_run_response(run),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn remix_big_fish_gallery_work(
|
||
State(state): State<AppState>,
|
||
Path(session_id): Path<String>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
) -> Result<Json<Value>, Response> {
|
||
ensure_non_empty(&request_context, &session_id, "sessionId")?;
|
||
|
||
let session = state
|
||
.spacetime_client()
|
||
.remix_big_fish_work(BigFishWorkRemixRecordInput {
|
||
source_session_id: session_id,
|
||
target_session_id: build_prefixed_uuid_id("big-fish-session-"),
|
||
target_owner_user_id: authenticated.claims().user_id().to_string(),
|
||
welcome_message_id: build_prefixed_uuid_id("big-fish-message-"),
|
||
remixed_at_micros: current_utc_micros(),
|
||
})
|
||
.await
|
||
.map_err(|error| {
|
||
big_fish_error_response(&request_context, map_big_fish_client_error(error))
|
||
})?;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishSessionResponse {
|
||
session: map_big_fish_session_response(session),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn submit_big_fish_message(
|
||
State(state): State<AppState>,
|
||
Path(session_id): Path<String>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
payload: Result<Json<SendBigFishMessageRequest>, JsonRejection>,
|
||
) -> Result<Json<Value>, Response> {
|
||
let Json(payload) = payload.map_err(|error| {
|
||
big_fish_error_response(
|
||
&request_context,
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
|
||
"provider": "big-fish",
|
||
"message": error.body_text(),
|
||
})),
|
||
)
|
||
})?;
|
||
ensure_non_empty(&request_context, &session_id, "sessionId")?;
|
||
|
||
let client_message_id = payload.client_message_id.trim().to_string();
|
||
let message_text = payload.text.trim().to_string();
|
||
if client_message_id.is_empty() || message_text.is_empty() {
|
||
return Err(big_fish_bad_request(
|
||
&request_context,
|
||
"clientMessageId and text are required",
|
||
));
|
||
}
|
||
|
||
let owner_user_id = authenticated.claims().user_id().to_string();
|
||
let submitted_session = state
|
||
.spacetime_client()
|
||
.submit_big_fish_message(BigFishMessageSubmitRecordInput {
|
||
session_id: session_id.clone(),
|
||
owner_user_id: owner_user_id.clone(),
|
||
user_message_id: client_message_id,
|
||
user_message_text: message_text,
|
||
assistant_message_id: build_prefixed_uuid_id("big-fish-message-"),
|
||
submitted_at_micros: current_utc_micros(),
|
||
})
|
||
.await
|
||
.map_err(|error| {
|
||
big_fish_error_response(&request_context, map_big_fish_client_error(error))
|
||
})?;
|
||
let mut draft_writer = AiGenerationDraftWriter::new(AiGenerationDraftContext::new(
|
||
"big_fish",
|
||
owner_user_id.as_str(),
|
||
session_id.as_str(),
|
||
payload.client_message_id.as_str(),
|
||
"大鱼吃小鱼模板生成草稿",
|
||
));
|
||
if let Err(error) = draft_writer.ensure_started(state.spacetime_client()).await {
|
||
tracing::warn!(error = %error, "大鱼吃小鱼模板生成草稿任务启动失败,主生成流程继续执行");
|
||
}
|
||
let draft_sink = AiGenerationDraftSink::new(
|
||
AiGenerationDraftContext::new(
|
||
"big_fish",
|
||
owner_user_id.as_str(),
|
||
session_id.as_str(),
|
||
payload.client_message_id.as_str(),
|
||
"大鱼吃小鱼模板生成草稿",
|
||
),
|
||
state.spacetime_client().clone(),
|
||
);
|
||
let turn_result = run_big_fish_agent_turn(
|
||
BigFishAgentTurnRequest {
|
||
llm_client: state.llm_client(),
|
||
session: &submitted_session,
|
||
quick_fill_requested: payload.quick_fill_requested.unwrap_or(false),
|
||
enable_web_search: state.config.creation_agent_llm_web_search_enabled,
|
||
},
|
||
move |text| {
|
||
draft_sink.persist_visible_text_async(text);
|
||
},
|
||
)
|
||
.await;
|
||
if let Ok(result) = &turn_result {
|
||
draft_writer
|
||
.persist_visible_text(
|
||
state.spacetime_client(),
|
||
result.assistant_reply_text.as_str(),
|
||
)
|
||
.await;
|
||
}
|
||
let finalize_input = match turn_result {
|
||
Ok(turn_result) => build_finalize_record_input(
|
||
session_id.clone(),
|
||
owner_user_id.clone(),
|
||
build_prefixed_uuid_id("big-fish-message-"),
|
||
turn_result,
|
||
current_utc_micros(),
|
||
),
|
||
Err(error) => build_failed_finalize_record_input(
|
||
session_id.clone(),
|
||
owner_user_id.clone(),
|
||
&submitted_session,
|
||
error.to_string(),
|
||
current_utc_micros(),
|
||
),
|
||
};
|
||
let session = state
|
||
.spacetime_client()
|
||
.finalize_big_fish_agent_message(finalize_input)
|
||
.await
|
||
.map_err(|error| {
|
||
big_fish_error_response(&request_context, map_big_fish_client_error(error))
|
||
})?;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishSessionResponse {
|
||
session: map_big_fish_session_response(session),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn stream_big_fish_message(
|
||
State(state): State<AppState>,
|
||
Path(session_id): Path<String>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
payload: Result<Json<SendBigFishMessageRequest>, JsonRejection>,
|
||
) -> Result<Response, Response> {
|
||
let Json(payload) = payload.map_err(|error| {
|
||
big_fish_error_response(
|
||
&request_context,
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
|
||
"provider": "big-fish",
|
||
"message": error.body_text(),
|
||
})),
|
||
)
|
||
})?;
|
||
ensure_non_empty(&request_context, &session_id, "sessionId")?;
|
||
|
||
let client_message_id = payload.client_message_id.trim().to_string();
|
||
let message_text = payload.text.trim().to_string();
|
||
if client_message_id.is_empty() || message_text.is_empty() {
|
||
return Err(big_fish_bad_request(
|
||
&request_context,
|
||
"clientMessageId and text are required",
|
||
));
|
||
}
|
||
|
||
let owner_user_id = authenticated.claims().user_id().to_string();
|
||
let submitted_session = state
|
||
.spacetime_client()
|
||
.submit_big_fish_message(BigFishMessageSubmitRecordInput {
|
||
session_id: session_id.clone(),
|
||
owner_user_id: owner_user_id.clone(),
|
||
user_message_id: client_message_id,
|
||
user_message_text: message_text,
|
||
assistant_message_id: build_prefixed_uuid_id("big-fish-message-"),
|
||
submitted_at_micros: current_utc_micros(),
|
||
})
|
||
.await
|
||
.map_err(|error| {
|
||
big_fish_error_response(&request_context, map_big_fish_client_error(error))
|
||
})?;
|
||
let quick_fill_requested = payload.quick_fill_requested.unwrap_or(false);
|
||
let state = state.clone();
|
||
let session_id_for_stream = session_id.clone();
|
||
let owner_user_id_for_stream = owner_user_id.clone();
|
||
let client_message_id_for_stream = payload.client_message_id.clone();
|
||
let stream = async_stream::stream! {
|
||
let mut draft_writer = AiGenerationDraftWriter::new(AiGenerationDraftContext::new(
|
||
"big_fish",
|
||
owner_user_id_for_stream.as_str(),
|
||
session_id_for_stream.as_str(),
|
||
client_message_id_for_stream.as_str(),
|
||
"大鱼吃小鱼模板生成草稿",
|
||
));
|
||
if let Err(error) = draft_writer.ensure_started(state.spacetime_client()).await {
|
||
tracing::warn!(error = %error, "大鱼吃小鱼模板生成草稿任务启动失败,主生成流程继续执行");
|
||
}
|
||
let (reply_tx, mut reply_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||
// 与 RPG/拼图 Agent 保持同一语义:回复先流式展示,session 真相仍等 finalize 后下发。
|
||
let turn_result = {
|
||
let run_turn = run_big_fish_agent_turn(
|
||
BigFishAgentTurnRequest {
|
||
llm_client: state.llm_client(),
|
||
session: &submitted_session,
|
||
quick_fill_requested,
|
||
enable_web_search: state.config.creation_agent_llm_web_search_enabled,
|
||
},
|
||
move |text| {
|
||
let _ = reply_tx.send(text.to_string());
|
||
},
|
||
);
|
||
tokio::pin!(run_turn);
|
||
|
||
loop {
|
||
// 每个 replyText 增量同时写草稿表并推给 SSE,避免前端等待完整模型响应。
|
||
tokio::select! {
|
||
result = &mut run_turn => break result,
|
||
maybe_text = reply_rx.recv() => {
|
||
if let Some(text) = maybe_text {
|
||
draft_writer
|
||
.persist_visible_text(state.spacetime_client(), text.as_str())
|
||
.await;
|
||
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
|
||
"reply_delta",
|
||
json!({ "text": text }),
|
||
));
|
||
}
|
||
}
|
||
}
|
||
}
|
||
};
|
||
|
||
while let Some(text) = reply_rx.recv().await {
|
||
draft_writer
|
||
.persist_visible_text(state.spacetime_client(), text.as_str())
|
||
.await;
|
||
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
|
||
"reply_delta",
|
||
json!({ "text": text }),
|
||
));
|
||
}
|
||
|
||
let finalize_input = match turn_result {
|
||
Ok(turn_result) => build_finalize_record_input(
|
||
session_id_for_stream.clone(),
|
||
owner_user_id_for_stream.clone(),
|
||
build_prefixed_uuid_id("big-fish-message-"),
|
||
turn_result,
|
||
current_utc_micros(),
|
||
),
|
||
Err(error) => build_failed_finalize_record_input(
|
||
session_id_for_stream.clone(),
|
||
owner_user_id_for_stream.clone(),
|
||
&submitted_session,
|
||
error.to_string(),
|
||
current_utc_micros(),
|
||
),
|
||
};
|
||
let session = match state
|
||
.spacetime_client()
|
||
.finalize_big_fish_agent_message(finalize_input)
|
||
.await
|
||
{
|
||
Ok(session) => session,
|
||
Err(error) => {
|
||
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
|
||
"error",
|
||
json!({ "message": error.to_string() }),
|
||
));
|
||
return;
|
||
}
|
||
};
|
||
|
||
let session_response = map_big_fish_session_response(session);
|
||
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
|
||
"session",
|
||
json!({ "session": session_response }),
|
||
));
|
||
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
|
||
"done",
|
||
json!({ "ok": true }),
|
||
));
|
||
};
|
||
Ok(Sse::new(stream).into_response())
|
||
}
|
||
|
||
pub async fn execute_big_fish_action(
|
||
State(state): State<AppState>,
|
||
Path(session_id): Path<String>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(authenticated): Extension<AuthenticatedAccessToken>,
|
||
payload: Result<Json<ExecuteBigFishActionRequest>, JsonRejection>,
|
||
) -> Result<Json<Value>, Response> {
|
||
let Json(payload) = payload.map_err(|error| {
|
||
big_fish_error_response(
|
||
&request_context,
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
|
||
"provider": "big-fish",
|
||
"message": error.body_text(),
|
||
})),
|
||
)
|
||
})?;
|
||
ensure_non_empty(&request_context, &session_id, "sessionId")?;
|
||
|
||
let owner_user_id = authenticated.claims().user_id().to_string();
|
||
let now = current_utc_micros();
|
||
let action = payload.action.trim().to_string();
|
||
let billed_asset_kind = match action.as_str() {
|
||
"big_fish_generate_level_main_image" => Some("big_fish_level_main_image"),
|
||
"big_fish_generate_level_motion" => Some("big_fish_level_motion"),
|
||
"big_fish_generate_stage_background" => Some("big_fish_stage_background"),
|
||
"big_fish_publish_game" => Some("big_fish_publish_game"),
|
||
_ => None,
|
||
};
|
||
let billing_asset_id = format!("{session_id}:{now}");
|
||
let session_operation = async {
|
||
match action.as_str() {
|
||
"big_fish_compile_draft" => {
|
||
compile_big_fish_draft_only(&state, session_id.clone(), owner_user_id.clone(), now)
|
||
.await
|
||
.map_err(map_big_fish_client_error)
|
||
}
|
||
"big_fish_generate_level_main_image" => {
|
||
let asset_url = generate_big_fish_formal_asset(
|
||
&state,
|
||
&owner_user_id,
|
||
&session_id,
|
||
"level_main_image",
|
||
payload.level,
|
||
None,
|
||
now,
|
||
)
|
||
.await?;
|
||
state
|
||
.spacetime_client()
|
||
.generate_big_fish_asset(BigFishAssetGenerateRecordInput {
|
||
owner_user_id: owner_user_id.clone(),
|
||
session_id: session_id.clone(),
|
||
asset_kind: "level_main_image".to_string(),
|
||
level: payload.level,
|
||
motion_key: None,
|
||
asset_url: Some(asset_url),
|
||
generated_at_micros: now,
|
||
})
|
||
.await
|
||
.map_err(map_big_fish_client_error)
|
||
}
|
||
"big_fish_generate_level_motion" => {
|
||
let asset_url = generate_big_fish_formal_asset(
|
||
&state,
|
||
&owner_user_id,
|
||
&session_id,
|
||
"level_motion",
|
||
payload.level,
|
||
payload.motion_key.as_deref(),
|
||
now,
|
||
)
|
||
.await?;
|
||
state
|
||
.spacetime_client()
|
||
.generate_big_fish_asset(BigFishAssetGenerateRecordInput {
|
||
owner_user_id: owner_user_id.clone(),
|
||
session_id: session_id.clone(),
|
||
asset_kind: "level_motion".to_string(),
|
||
level: payload.level,
|
||
motion_key: payload.motion_key,
|
||
asset_url: Some(asset_url),
|
||
generated_at_micros: now,
|
||
})
|
||
.await
|
||
.map_err(map_big_fish_client_error)
|
||
}
|
||
"big_fish_generate_stage_background" => {
|
||
let asset_url = generate_big_fish_formal_asset(
|
||
&state,
|
||
&owner_user_id,
|
||
&session_id,
|
||
"stage_background",
|
||
None,
|
||
None,
|
||
now,
|
||
)
|
||
.await?;
|
||
state
|
||
.spacetime_client()
|
||
.generate_big_fish_asset(BigFishAssetGenerateRecordInput {
|
||
owner_user_id: owner_user_id.clone(),
|
||
session_id: session_id.clone(),
|
||
asset_kind: "stage_background".to_string(),
|
||
level: None,
|
||
motion_key: None,
|
||
asset_url: Some(asset_url),
|
||
generated_at_micros: now,
|
||
})
|
||
.await
|
||
.map_err(map_big_fish_client_error)
|
||
}
|
||
"big_fish_publish_game" => state
|
||
.spacetime_client()
|
||
.publish_big_fish_game(session_id, owner_user_id.clone(), now)
|
||
.await
|
||
.map_err(map_big_fish_client_error),
|
||
other => Err(
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
|
||
"provider": "big-fish",
|
||
"message": format!("action `{other}` is not supported"),
|
||
})),
|
||
),
|
||
}
|
||
};
|
||
let session_result = if let Some(asset_kind) = billed_asset_kind {
|
||
execute_billable_asset_operation(
|
||
&state,
|
||
&owner_user_id,
|
||
asset_kind,
|
||
&billing_asset_id,
|
||
session_operation,
|
||
)
|
||
.await
|
||
} else {
|
||
session_operation.await
|
||
};
|
||
let session =
|
||
session_result.map_err(|error| big_fish_error_response(&request_context, error))?;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
BigFishActionResponse {
|
||
session: map_big_fish_session_response(session),
|
||
},
|
||
))
|
||
}
|
||
|
||
mod mappers;
|
||
|
||
use mappers::*;
|
||
|
||
mod formal_assets;
|
||
|
||
use formal_assets::generate_big_fish_formal_asset;
|
||
|
||
fn sanitize_big_fish_path_segment(value: &str, fallback: &str) -> String {
|
||
let sanitized = value
|
||
.trim()
|
||
.chars()
|
||
.map(|ch| {
|
||
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
|
||
ch
|
||
} else {
|
||
'-'
|
||
}
|
||
})
|
||
.collect::<String>()
|
||
.trim_matches('-')
|
||
.to_string();
|
||
if sanitized.is_empty() {
|
||
fallback.to_string()
|
||
} else {
|
||
sanitized
|
||
}
|
||
}
|
||
|
||
fn ensure_non_empty(
|
||
request_context: &RequestContext,
|
||
value: &str,
|
||
field_name: &str,
|
||
) -> Result<(), Response> {
|
||
if value.trim().is_empty() {
|
||
return Err(big_fish_bad_request(
|
||
request_context,
|
||
format!("{field_name} is required").as_str(),
|
||
));
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
fn big_fish_bad_request(request_context: &RequestContext, message: &str) -> Response {
|
||
big_fish_error_response(
|
||
request_context,
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
|
||
"provider": "big-fish",
|
||
"message": message,
|
||
})),
|
||
)
|
||
}
|
||
|
||
fn big_fish_sse_json_event_or_error(event_name: &str, payload: Value) -> Event {
|
||
match serde_json::to_string(&payload) {
|
||
Ok(payload_text) => Event::default().event(event_name).data(payload_text),
|
||
Err(_) => big_fish_sse_error_event_message("SSE payload 序列化失败".to_string()),
|
||
}
|
||
}
|
||
|
||
fn big_fish_sse_error_event_message(message: String) -> Event {
|
||
let payload = format!(
|
||
"{{\"message\":{}}}",
|
||
serde_json::to_string(&message)
|
||
.unwrap_or_else(|_| "\"SSE 错误事件序列化失败\"".to_string())
|
||
);
|
||
Event::default().event("error").data(payload)
|
||
}
|
||
|
||
fn map_big_fish_client_error(error: SpacetimeClientError) -> AppError {
|
||
let status = match &error {
|
||
SpacetimeClientError::Procedure(message)
|
||
if message.contains("big_fish_creation_session 不存在") =>
|
||
{
|
||
StatusCode::NOT_FOUND
|
||
}
|
||
SpacetimeClientError::Procedure(message)
|
||
if message.contains("big_fish_runtime_run 不存在") =>
|
||
{
|
||
StatusCode::NOT_FOUND
|
||
}
|
||
SpacetimeClientError::Procedure(message) if message.contains("无权访问") => {
|
||
StatusCode::FORBIDDEN
|
||
}
|
||
SpacetimeClientError::Procedure(message)
|
||
if message.contains("不能为空")
|
||
|| message.contains("尚未编译")
|
||
|| message.contains("不允许")
|
||
|| message.contains("非法")
|
||
|| message.contains("缺少") =>
|
||
{
|
||
StatusCode::BAD_REQUEST
|
||
}
|
||
SpacetimeClientError::Runtime(_) => StatusCode::BAD_REQUEST,
|
||
SpacetimeClientError::Timeout => StatusCode::GATEWAY_TIMEOUT,
|
||
_ => StatusCode::BAD_GATEWAY,
|
||
};
|
||
|
||
let message = match &error {
|
||
SpacetimeClientError::Timeout => "SpacetimeDB 会话读取超时,请稍后重试。".to_string(),
|
||
SpacetimeClientError::ConnectDropped => {
|
||
"SpacetimeDB 会话连接已断开,请稍后重试。".to_string()
|
||
}
|
||
_ => error.to_string(),
|
||
};
|
||
|
||
AppError::from_status(status).with_details(json!({
|
||
"provider": "spacetimedb",
|
||
"message": message,
|
||
}))
|
||
}
|
||
|
||
fn should_soft_fallback_big_fish_gallery(error: &SpacetimeClientError) -> bool {
|
||
match error {
|
||
// 公开广场是首页可选数据,SpacetimeDB procedure 运行态短暂失败时不应阻断平台首屏。
|
||
SpacetimeClientError::Runtime(_) | SpacetimeClientError::ConnectDropped => true,
|
||
SpacetimeClientError::Procedure(message) => {
|
||
message.contains("list_big_fish_works")
|
||
|| message.contains("procedure")
|
||
|| message.contains("No such procedure")
|
||
}
|
||
_ => false,
|
||
}
|
||
}
|
||
|
||
fn big_fish_error_response(request_context: &RequestContext, error: AppError) -> Response {
|
||
error.into_response_with_context(Some(request_context))
|
||
}
|
||
|
||
fn current_utc_micros() -> i64 {
|
||
let duration = SystemTime::now()
|
||
.duration_since(UNIX_EPOCH)
|
||
.expect("system clock should be after unix epoch");
|
||
i64::try_from(duration.as_micros()).expect("current unix micros should fit in i64")
|
||
}
|
||
|
||
fn current_timestamp_micros_to_string(value: i64) -> String {
|
||
format_timestamp_micros(value)
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod tests {
|
||
use super::*;
|
||
|
||
#[test]
|
||
fn big_fish_gallery_soft_fallbacks_for_runtime_errors() {
|
||
assert!(should_soft_fallback_big_fish_gallery(
|
||
&SpacetimeClientError::Runtime("procedure runtime error".to_string())
|
||
));
|
||
assert!(should_soft_fallback_big_fish_gallery(
|
||
&SpacetimeClientError::ConnectDropped
|
||
));
|
||
assert!(should_soft_fallback_big_fish_gallery(
|
||
&SpacetimeClientError::Procedure("No such procedure: list_big_fish_works".to_string(),)
|
||
));
|
||
}
|
||
|
||
#[test]
|
||
fn big_fish_gallery_keeps_timeout_errors_visible() {
|
||
assert!(!should_soft_fallback_big_fish_gallery(
|
||
&SpacetimeClientError::Timeout
|
||
));
|
||
}
|
||
}
|