use std::{ collections::BTreeMap, convert::Infallible, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use axum::{ Json, extract::{Extension, Path, State, rejection::JsonRejection}, http::StatusCode, response::{ IntoResponse, Response, sse::{Event, Sse}, }, }; use module_assets::{ AssetObjectAccessPolicy, AssetObjectFieldError, build_asset_entity_binding_input, build_asset_object_upsert_input, generate_asset_binding_id, generate_asset_object_id, }; use platform_oss::{LegacyAssetPrefix, OssHeadObjectRequest, OssObjectAccess, OssPutObjectRequest}; use serde_json::{Map, Value, json}; use shared_contracts::big_fish::{ BigFishActionResponse, BigFishAgentMessageResponse, BigFishAnchorItemResponse, BigFishAnchorPackResponse, BigFishAssetCoverageResponse, BigFishAssetSlotResponse, BigFishBackgroundBlueprintResponse, BigFishGameDraftResponse, BigFishLevelBlueprintResponse, BigFishRunResponse, BigFishRuntimeEntityResponse, BigFishRuntimeParamsResponse, BigFishRuntimeSnapshotResponse, BigFishSessionResponse, BigFishSessionSnapshotResponse, BigFishVector2Response, CreateBigFishSessionRequest, ExecuteBigFishActionRequest, RecordBigFishPlayRequest, SendBigFishMessageRequest, SubmitBigFishInputRequest, }; use shared_contracts::big_fish_works::{BigFishWorkSummaryResponse, BigFishWorksResponse}; use shared_kernel::{build_prefixed_uuid_id, format_timestamp_micros}; use spacetime_client::{ BigFishAgentMessageRecord, BigFishAnchorItemRecord, BigFishAnchorPackRecord, BigFishAssetCoverageRecord, BigFishAssetGenerateRecordInput, BigFishAssetSlotRecord, BigFishBackgroundBlueprintRecord, BigFishDraftCompileRecordInput, BigFishGameDraftRecord, BigFishInputSubmitRecordInput, BigFishLevelBlueprintRecord, BigFishLikeReportRecordInput, BigFishMessageSubmitRecordInput, BigFishPlayReportRecordInput, BigFishRunStartRecordInput, BigFishRuntimeEntityRecord, BigFishRuntimeParamsRecord, BigFishRuntimeRunRecord, BigFishSessionCreateRecordInput, BigFishSessionRecord, BigFishVector2Record, 
BigFishWorkRemixRecordInput, BigFishWorkSummaryRecord, SpacetimeClientError, }; use tokio::time::sleep; use crate::big_fish_agent_turn::{ BigFishAgentTurnRequest, build_failed_finalize_record_input, build_finalize_record_input, run_big_fish_agent_turn, }; use crate::big_fish_draft_compiler::compile_big_fish_draft_with_fallback; use crate::prompt::big_fish::{ BIG_FISH_DEFAULT_NEGATIVE_PROMPT, BIG_FISH_TRANSPARENT_ASSET_NEGATIVE_PROMPT, build_big_fish_level_main_image_prompt, build_big_fish_level_motion_prompt, build_big_fish_stage_background_prompt, }; use crate::{ ai_generation_drafts::{ AiGenerationDraftContext, AiGenerationDraftSink, AiGenerationDraftWriter, }, api_response::json_success_body, asset_billing::execute_billable_asset_operation, auth::AuthenticatedAccessToken, character_visual_assets::try_apply_background_alpha_to_png, http_error::AppError, platform_errors::map_oss_error, request_context::RequestContext, state::AppState, work_author::resolve_work_author_by_user_id, work_play_tracking::{WorkPlayTrackingDraft, record_work_play_start_after_success}, }; pub async fn create_big_fish_session( State(state): State, Extension(request_context): Extension, Extension(authenticated): Extension, payload: Result, JsonRejection>, ) -> Result, Response> { let Json(payload) = payload.map_err(|error| { big_fish_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "big-fish", "message": error.body_text(), })), ) })?; let seed_text = payload.seed_text.unwrap_or_default().trim().to_string(); let session = state .spacetime_client() .create_big_fish_session(BigFishSessionCreateRecordInput { session_id: build_prefixed_uuid_id("big-fish-session-"), owner_user_id: authenticated.claims().user_id().to_string(), seed_text: seed_text.clone(), welcome_message_id: build_prefixed_uuid_id("big-fish-message-"), welcome_message_text: build_big_fish_welcome_text(&seed_text), created_at_micros: current_utc_micros(), }) .await 
.map_err(|error| { big_fish_error_response(&request_context, map_big_fish_client_error(error)) })?; Ok(json_success_body( Some(&request_context), BigFishSessionResponse { session: map_big_fish_session_response(session), }, )) } pub async fn get_big_fish_session( State(state): State, Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { ensure_non_empty(&request_context, &session_id, "sessionId")?; let session = load_big_fish_session_with_retry( &state, session_id, authenticated.claims().user_id().to_string(), ) .await .map_err(|error| big_fish_error_response(&request_context, map_big_fish_client_error(error)))?; Ok(json_success_body( Some(&request_context), BigFishSessionResponse { session: map_big_fish_session_response(session), }, )) } pub async fn get_big_fish_works( State(state): State, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { let items = state .spacetime_client() .list_big_fish_works(authenticated.claims().user_id().to_string()) .await .map_err(|error| { big_fish_error_response(&request_context, map_big_fish_client_error(error)) })?; Ok(json_success_body( Some(&request_context), BigFishWorksResponse { items: items .into_iter() .map(|item| map_big_fish_work_summary_response(&state, item)) .collect(), }, )) } pub async fn list_big_fish_gallery( State(state): State, Extension(request_context): Extension, ) -> Result, Response> { let items = match state.spacetime_client().list_big_fish_gallery().await { Ok(items) => items, Err(error) if should_soft_fallback_big_fish_gallery(&error) => { tracing::warn!( error = %error, "大鱼吃小鱼公开广场读取失败,已按空广场降级返回" ); Vec::new() } Err(error) => { return Err(big_fish_error_response( &request_context, map_big_fish_client_error(error), )); } }; Ok(json_success_body( Some(&request_context), BigFishWorksResponse { items: items .into_iter() .map(|item| map_big_fish_work_summary_response(&state, item)) .collect(), 
}, )) } pub async fn delete_big_fish_work( State(state): State, Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { ensure_non_empty(&request_context, &session_id, "sessionId")?; let items = state .spacetime_client() .delete_big_fish_work(session_id, authenticated.claims().user_id().to_string()) .await .map_err(|error| { big_fish_error_response(&request_context, map_big_fish_client_error(error)) })?; Ok(json_success_body( Some(&request_context), BigFishWorksResponse { items: items .into_iter() .map(|item| map_big_fish_work_summary_response(&state, item)) .collect(), }, )) } pub async fn record_big_fish_play( State(state): State, Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, payload: Result, JsonRejection>, ) -> Result, Response> { let Json(payload) = payload.map_err(|error| { big_fish_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "big-fish", "message": error.body_text(), })), ) })?; ensure_non_empty(&request_context, &session_id, "sessionId")?; let items = state .spacetime_client() .record_big_fish_play(BigFishPlayReportRecordInput { session_id: session_id.clone(), user_id: authenticated.claims().user_id().to_string(), elapsed_ms: payload.elapsed_ms.unwrap_or(0), reported_at_micros: current_utc_micros(), }) .await .map_err(|error| { big_fish_error_response(&request_context, map_big_fish_client_error(error)) })?; record_work_play_start_after_success( &state, &request_context, WorkPlayTrackingDraft::new( "big-fish", session_id.clone(), &authenticated, "/api/runtime/big-fish/sessions/{session_id}/play", ) .run_id(session_id.clone()), ) .await; Ok(json_success_body( Some(&request_context), BigFishWorksResponse { items: items .into_iter() .map(|item| map_big_fish_work_summary_response(&state, item)) .collect(), }, )) } pub async fn start_big_fish_run( State(state): State, 
Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { ensure_non_empty(&request_context, &session_id, "sessionId")?; let run = state .spacetime_client() .start_big_fish_run(BigFishRunStartRecordInput { run_id: build_prefixed_uuid_id("big-fish-run-"), session_id, owner_user_id: authenticated.claims().user_id().to_string(), started_at_micros: current_utc_micros(), }) .await .map_err(|error| { big_fish_error_response(&request_context, map_big_fish_client_error(error)) })?; Ok(json_success_body( Some(&request_context), BigFishRunResponse { run: map_big_fish_run_response(run), }, )) } pub async fn record_big_fish_gallery_like( State(state): State, Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { ensure_non_empty(&request_context, &session_id, "sessionId")?; let items = state .spacetime_client() .record_big_fish_like(BigFishLikeReportRecordInput { session_id, user_id: authenticated.claims().user_id().to_string(), liked_at_micros: current_utc_micros(), }) .await .map_err(|error| { big_fish_error_response(&request_context, map_big_fish_client_error(error)) })?; Ok(json_success_body( Some(&request_context), BigFishWorksResponse { items: items .into_iter() .map(|item| map_big_fish_work_summary_response(&state, item)) .collect(), }, )) } pub async fn get_big_fish_run( State(state): State, Path(run_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { ensure_non_empty(&request_context, &run_id, "runId")?; let run = state .spacetime_client() .get_big_fish_run(run_id, authenticated.claims().user_id().to_string()) .await .map_err(|error| { big_fish_error_response(&request_context, map_big_fish_client_error(error)) })?; Ok(json_success_body( Some(&request_context), BigFishRunResponse { run: map_big_fish_run_response(run), }, )) } pub async fn submit_big_fish_input( State(state): 
State, Path(run_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, payload: Result, JsonRejection>, ) -> Result, Response> { let Json(payload) = payload.map_err(|error| { big_fish_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "big-fish", "message": error.body_text(), })), ) })?; ensure_non_empty(&request_context, &run_id, "runId")?; if !payload.x.is_finite() || !payload.y.is_finite() { return Err(big_fish_bad_request(&request_context, "input is invalid")); } let run = state .spacetime_client() .submit_big_fish_input(BigFishInputSubmitRecordInput { run_id, owner_user_id: authenticated.claims().user_id().to_string(), x: payload.x, y: payload.y, submitted_at_micros: current_utc_micros(), }) .await .map_err(|error| { big_fish_error_response(&request_context, map_big_fish_client_error(error)) })?; Ok(json_success_body( Some(&request_context), BigFishRunResponse { run: map_big_fish_run_response(run), }, )) } pub async fn remix_big_fish_gallery_work( State(state): State, Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { ensure_non_empty(&request_context, &session_id, "sessionId")?; let session = state .spacetime_client() .remix_big_fish_work(BigFishWorkRemixRecordInput { source_session_id: session_id, target_session_id: build_prefixed_uuid_id("big-fish-session-"), target_owner_user_id: authenticated.claims().user_id().to_string(), welcome_message_id: build_prefixed_uuid_id("big-fish-message-"), remixed_at_micros: current_utc_micros(), }) .await .map_err(|error| { big_fish_error_response(&request_context, map_big_fish_client_error(error)) })?; Ok(json_success_body( Some(&request_context), BigFishSessionResponse { session: map_big_fish_session_response(session), }, )) } pub async fn submit_big_fish_message( State(state): State, Path(session_id): Path, Extension(request_context): Extension, 
Extension(authenticated): Extension, payload: Result, JsonRejection>, ) -> Result, Response> { let Json(payload) = payload.map_err(|error| { big_fish_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "big-fish", "message": error.body_text(), })), ) })?; ensure_non_empty(&request_context, &session_id, "sessionId")?; let client_message_id = payload.client_message_id.trim().to_string(); let message_text = payload.text.trim().to_string(); if client_message_id.is_empty() || message_text.is_empty() { return Err(big_fish_bad_request( &request_context, "clientMessageId and text are required", )); } let owner_user_id = authenticated.claims().user_id().to_string(); let submitted_session = state .spacetime_client() .submit_big_fish_message(BigFishMessageSubmitRecordInput { session_id: session_id.clone(), owner_user_id: owner_user_id.clone(), user_message_id: client_message_id, user_message_text: message_text, assistant_message_id: build_prefixed_uuid_id("big-fish-message-"), submitted_at_micros: current_utc_micros(), }) .await .map_err(|error| { big_fish_error_response(&request_context, map_big_fish_client_error(error)) })?; let mut draft_writer = AiGenerationDraftWriter::new(AiGenerationDraftContext::new( "big_fish", owner_user_id.as_str(), session_id.as_str(), payload.client_message_id.as_str(), "大鱼吃小鱼模板生成草稿", )); if let Err(error) = draft_writer.ensure_started(state.spacetime_client()).await { tracing::warn!(error = %error, "大鱼吃小鱼模板生成草稿任务启动失败,主生成流程继续执行"); } let draft_sink = AiGenerationDraftSink::new( AiGenerationDraftContext::new( "big_fish", owner_user_id.as_str(), session_id.as_str(), payload.client_message_id.as_str(), "大鱼吃小鱼模板生成草稿", ), state.spacetime_client().clone(), ); let turn_result = run_big_fish_agent_turn( BigFishAgentTurnRequest { llm_client: state.llm_client(), session: &submitted_session, quick_fill_requested: payload.quick_fill_requested.unwrap_or(false), enable_web_search: 
state.config.creation_agent_llm_web_search_enabled, }, move |text| { draft_sink.persist_visible_text_async(text); }, ) .await; if let Ok(result) = &turn_result { draft_writer .persist_visible_text( state.spacetime_client(), result.assistant_reply_text.as_str(), ) .await; } let finalize_input = match turn_result { Ok(turn_result) => build_finalize_record_input( session_id.clone(), owner_user_id.clone(), build_prefixed_uuid_id("big-fish-message-"), turn_result, current_utc_micros(), ), Err(error) => build_failed_finalize_record_input( session_id.clone(), owner_user_id.clone(), &submitted_session, error.to_string(), current_utc_micros(), ), }; let session = state .spacetime_client() .finalize_big_fish_agent_message(finalize_input) .await .map_err(|error| { big_fish_error_response(&request_context, map_big_fish_client_error(error)) })?; Ok(json_success_body( Some(&request_context), BigFishSessionResponse { session: map_big_fish_session_response(session), }, )) } pub async fn stream_big_fish_message( State(state): State, Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, payload: Result, JsonRejection>, ) -> Result { let Json(payload) = payload.map_err(|error| { big_fish_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "big-fish", "message": error.body_text(), })), ) })?; ensure_non_empty(&request_context, &session_id, "sessionId")?; let client_message_id = payload.client_message_id.trim().to_string(); let message_text = payload.text.trim().to_string(); if client_message_id.is_empty() || message_text.is_empty() { return Err(big_fish_bad_request( &request_context, "clientMessageId and text are required", )); } let owner_user_id = authenticated.claims().user_id().to_string(); let submitted_session = state .spacetime_client() .submit_big_fish_message(BigFishMessageSubmitRecordInput { session_id: session_id.clone(), owner_user_id: owner_user_id.clone(), 
user_message_id: client_message_id, user_message_text: message_text, assistant_message_id: build_prefixed_uuid_id("big-fish-message-"), submitted_at_micros: current_utc_micros(), }) .await .map_err(|error| { big_fish_error_response(&request_context, map_big_fish_client_error(error)) })?; let quick_fill_requested = payload.quick_fill_requested.unwrap_or(false); let state = state.clone(); let session_id_for_stream = session_id.clone(); let owner_user_id_for_stream = owner_user_id.clone(); let client_message_id_for_stream = payload.client_message_id.clone(); let stream = async_stream::stream! { let mut draft_writer = AiGenerationDraftWriter::new(AiGenerationDraftContext::new( "big_fish", owner_user_id_for_stream.as_str(), session_id_for_stream.as_str(), client_message_id_for_stream.as_str(), "大鱼吃小鱼模板生成草稿", )); if let Err(error) = draft_writer.ensure_started(state.spacetime_client()).await { tracing::warn!(error = %error, "大鱼吃小鱼模板生成草稿任务启动失败,主生成流程继续执行"); } let (reply_tx, mut reply_rx) = tokio::sync::mpsc::unbounded_channel::(); // 与 RPG/拼图 Agent 保持同一语义:回复先流式展示,session 真相仍等 finalize 后下发。 let turn_result = { let run_turn = run_big_fish_agent_turn( BigFishAgentTurnRequest { llm_client: state.llm_client(), session: &submitted_session, quick_fill_requested, enable_web_search: state.config.creation_agent_llm_web_search_enabled, }, move |text| { let _ = reply_tx.send(text.to_string()); }, ); tokio::pin!(run_turn); loop { // 每个 replyText 增量同时写草稿表并推给 SSE,避免前端等待完整模型响应。 tokio::select! 
{ result = &mut run_turn => break result, maybe_text = reply_rx.recv() => { if let Some(text) = maybe_text { draft_writer .persist_visible_text(state.spacetime_client(), text.as_str()) .await; yield Ok::(big_fish_sse_json_event_or_error( "reply_delta", json!({ "text": text }), )); } } } } }; while let Some(text) = reply_rx.recv().await { draft_writer .persist_visible_text(state.spacetime_client(), text.as_str()) .await; yield Ok::(big_fish_sse_json_event_or_error( "reply_delta", json!({ "text": text }), )); } let finalize_input = match turn_result { Ok(turn_result) => build_finalize_record_input( session_id_for_stream.clone(), owner_user_id_for_stream.clone(), build_prefixed_uuid_id("big-fish-message-"), turn_result, current_utc_micros(), ), Err(error) => build_failed_finalize_record_input( session_id_for_stream.clone(), owner_user_id_for_stream.clone(), &submitted_session, error.to_string(), current_utc_micros(), ), }; let session = match state .spacetime_client() .finalize_big_fish_agent_message(finalize_input) .await { Ok(session) => session, Err(error) => { yield Ok::(big_fish_sse_json_event_or_error( "error", json!({ "message": error.to_string() }), )); return; } }; let session_response = map_big_fish_session_response(session); yield Ok::(big_fish_sse_json_event_or_error( "session", json!({ "session": session_response }), )); yield Ok::(big_fish_sse_json_event_or_error( "done", json!({ "ok": true }), )); }; Ok(Sse::new(stream).into_response()) } pub async fn execute_big_fish_action( State(state): State, Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, payload: Result, JsonRejection>, ) -> Result, Response> { let Json(payload) = payload.map_err(|error| { big_fish_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "big-fish", "message": error.body_text(), })), ) })?; ensure_non_empty(&request_context, &session_id, "sessionId")?; let owner_user_id = 
authenticated.claims().user_id().to_string(); let now = current_utc_micros(); let action = payload.action.trim().to_string(); let billed_asset_kind = match action.as_str() { "big_fish_generate_level_main_image" => Some("big_fish_level_main_image"), "big_fish_generate_level_motion" => Some("big_fish_level_motion"), "big_fish_generate_stage_background" => Some("big_fish_stage_background"), "big_fish_publish_game" => Some("big_fish_publish_game"), _ => None, }; let billing_asset_id = format!("{session_id}:{now}"); let session_operation = async { match action.as_str() { "big_fish_compile_draft" => { compile_big_fish_draft_only(&state, session_id.clone(), owner_user_id.clone(), now) .await .map_err(map_big_fish_client_error) } "big_fish_generate_level_main_image" => { let asset_url = generate_big_fish_formal_asset( &state, &owner_user_id, &session_id, "level_main_image", payload.level, None, now, ) .await?; state .spacetime_client() .generate_big_fish_asset(BigFishAssetGenerateRecordInput { owner_user_id: owner_user_id.clone(), session_id: session_id.clone(), asset_kind: "level_main_image".to_string(), level: payload.level, motion_key: None, asset_url: Some(asset_url), generated_at_micros: now, }) .await .map_err(map_big_fish_client_error) } "big_fish_generate_level_motion" => { let asset_url = generate_big_fish_formal_asset( &state, &owner_user_id, &session_id, "level_motion", payload.level, payload.motion_key.as_deref(), now, ) .await?; state .spacetime_client() .generate_big_fish_asset(BigFishAssetGenerateRecordInput { owner_user_id: owner_user_id.clone(), session_id: session_id.clone(), asset_kind: "level_motion".to_string(), level: payload.level, motion_key: payload.motion_key, asset_url: Some(asset_url), generated_at_micros: now, }) .await .map_err(map_big_fish_client_error) } "big_fish_generate_stage_background" => { let asset_url = generate_big_fish_formal_asset( &state, &owner_user_id, &session_id, "stage_background", None, None, now, ) .await?; state 
.spacetime_client() .generate_big_fish_asset(BigFishAssetGenerateRecordInput { owner_user_id: owner_user_id.clone(), session_id: session_id.clone(), asset_kind: "stage_background".to_string(), level: None, motion_key: None, asset_url: Some(asset_url), generated_at_micros: now, }) .await .map_err(map_big_fish_client_error) } "big_fish_publish_game" => state .spacetime_client() .publish_big_fish_game(session_id, owner_user_id.clone(), now) .await .map_err(map_big_fish_client_error), other => Err( AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "big-fish", "message": format!("action `{other}` is not supported"), })), ), } }; let session_result = if let Some(asset_kind) = billed_asset_kind { execute_billable_asset_operation( &state, &owner_user_id, asset_kind, &billing_asset_id, session_operation, ) .await } else { session_operation.await }; let session = session_result.map_err(|error| big_fish_error_response(&request_context, error))?; Ok(json_success_body( Some(&request_context), BigFishActionResponse { session: map_big_fish_session_response(session), }, )) } fn map_big_fish_session_response(session: BigFishSessionRecord) -> BigFishSessionSnapshotResponse { BigFishSessionSnapshotResponse { session_id: session.session_id, current_turn: session.current_turn, progress_percent: session.progress_percent, stage: session.stage, anchor_pack: map_big_fish_anchor_pack_response(session.anchor_pack), draft: session.draft.map(map_big_fish_draft_response), asset_slots: session .asset_slots .into_iter() .map(map_big_fish_asset_slot_response) .collect(), asset_coverage: map_big_fish_asset_coverage_response(session.asset_coverage), messages: session .messages .into_iter() .map(map_big_fish_agent_message_response) .collect(), last_assistant_reply: session.last_assistant_reply, publish_ready: session.publish_ready, updated_at: session.updated_at, } } fn map_big_fish_anchor_pack_response( anchor_pack: BigFishAnchorPackRecord, ) -> 
BigFishAnchorPackResponse { BigFishAnchorPackResponse { gameplay_promise: map_big_fish_anchor_item_response(anchor_pack.gameplay_promise), ecology_visual_theme: map_big_fish_anchor_item_response(anchor_pack.ecology_visual_theme), growth_ladder: map_big_fish_anchor_item_response(anchor_pack.growth_ladder), risk_tempo: map_big_fish_anchor_item_response(anchor_pack.risk_tempo), } } fn map_big_fish_anchor_item_response(anchor: BigFishAnchorItemRecord) -> BigFishAnchorItemResponse { BigFishAnchorItemResponse { key: anchor.key, label: anchor.label, value: anchor.value, status: anchor.status, } } fn map_big_fish_draft_response(draft: BigFishGameDraftRecord) -> BigFishGameDraftResponse { BigFishGameDraftResponse { title: draft.title, subtitle: draft.subtitle, core_fun: draft.core_fun, ecology_theme: draft.ecology_theme, levels: draft .levels .into_iter() .map(map_big_fish_level_response) .collect(), background: map_big_fish_background_response(draft.background), runtime_params: map_big_fish_runtime_params_response(draft.runtime_params), } } fn map_big_fish_level_response( level: BigFishLevelBlueprintRecord, ) -> BigFishLevelBlueprintResponse { BigFishLevelBlueprintResponse { level: level.level, name: level.name, one_line_fantasy: level.one_line_fantasy, text_description: level.text_description, silhouette_direction: level.silhouette_direction, size_ratio: level.size_ratio, visual_description: level.visual_description, visual_prompt_seed: level.visual_prompt_seed, idle_motion_description: level.idle_motion_description, move_motion_description: level.move_motion_description, motion_prompt_seed: level.motion_prompt_seed, merge_source_level: level.merge_source_level, prey_window: level.prey_window, threat_window: level.threat_window, is_final_level: level.is_final_level, } } fn map_big_fish_background_response( background: BigFishBackgroundBlueprintRecord, ) -> BigFishBackgroundBlueprintResponse { BigFishBackgroundBlueprintResponse { theme: background.theme, color_mood: 
background.color_mood, foreground_hints: background.foreground_hints, midground_composition: background.midground_composition, background_depth: background.background_depth, safe_play_area_hint: background.safe_play_area_hint, spawn_edge_hint: background.spawn_edge_hint, background_prompt_seed: background.background_prompt_seed, } } fn map_big_fish_runtime_params_response( params: BigFishRuntimeParamsRecord, ) -> BigFishRuntimeParamsResponse { BigFishRuntimeParamsResponse { level_count: params.level_count, merge_count_per_upgrade: params.merge_count_per_upgrade, spawn_target_count: params.spawn_target_count, leader_move_speed: params.leader_move_speed, follower_catch_up_speed: params.follower_catch_up_speed, offscreen_cull_seconds: params.offscreen_cull_seconds, prey_spawn_delta_levels: params.prey_spawn_delta_levels, threat_spawn_delta_levels: params.threat_spawn_delta_levels, win_level: params.win_level, } } fn map_big_fish_asset_slot_response(slot: BigFishAssetSlotRecord) -> BigFishAssetSlotResponse { BigFishAssetSlotResponse { slot_id: slot.slot_id, asset_kind: slot.asset_kind, level: slot.level, motion_key: slot.motion_key, status: slot.status, asset_url: slot.asset_url, prompt_snapshot: slot.prompt_snapshot, updated_at: slot.updated_at, } } fn map_big_fish_asset_coverage_response( coverage: BigFishAssetCoverageRecord, ) -> BigFishAssetCoverageResponse { BigFishAssetCoverageResponse { level_main_image_ready_count: coverage.level_main_image_ready_count, level_motion_ready_count: coverage.level_motion_ready_count, background_ready: coverage.background_ready, required_level_count: coverage.required_level_count, publish_ready: coverage.publish_ready, blockers: coverage.blockers, } } fn map_big_fish_run_response(run: BigFishRuntimeRunRecord) -> BigFishRuntimeSnapshotResponse { BigFishRuntimeSnapshotResponse { run_id: run.run_id, session_id: run.session_id, status: run.status, tick: run.tick, player_level: run.player_level, win_level: run.win_level, 
leader_entity_id: run.leader_entity_id, owned_entities: run .owned_entities .into_iter() .map(map_big_fish_runtime_entity_response) .collect(), wild_entities: run .wild_entities .into_iter() .map(map_big_fish_runtime_entity_response) .collect(), camera_center: map_big_fish_vector2_response(run.camera_center), last_input: map_big_fish_vector2_response(run.last_input), event_log: run.event_log, updated_at: run.updated_at, } } fn map_big_fish_runtime_entity_response( entity: BigFishRuntimeEntityRecord, ) -> BigFishRuntimeEntityResponse { BigFishRuntimeEntityResponse { entity_id: entity.entity_id, level: entity.level, position: map_big_fish_vector2_response(entity.position), radius: entity.radius, offscreen_seconds: entity.offscreen_seconds, } } fn map_big_fish_vector2_response(vector: BigFishVector2Record) -> BigFishVector2Response { BigFishVector2Response { x: vector.x, y: vector.y, } } async fn compile_big_fish_draft_only( state: &AppState, session_id: String, owner_user_id: String, now: i64, ) -> Result { // 中文注释:大鱼吃小鱼草稿阶段只负责编译结果页草稿,不在这一步串行生成主图、动作或背景。 // 这些资产统一留在结果页工坊按需触发,避免 compile action 因长耗时资产任务卡在首步等待态。 let session = load_big_fish_session_with_retry(state, session_id.clone(), owner_user_id.clone()).await?; let anchor_pack = map_record_anchor_pack_to_domain(&session.anchor_pack); let compiled_draft = compile_big_fish_draft_with_fallback(state.llm_client(), &anchor_pack).await; let draft_json = serde_json::to_string(&compiled_draft).ok(); state .spacetime_client() .compile_big_fish_draft(BigFishDraftCompileRecordInput { session_id, owner_user_id, draft_json, compiled_at_micros: now, }) .await } async fn load_big_fish_session_with_retry( state: &AppState, session_id: String, owner_user_id: String, ) -> Result { let mut last_retryable_error = None; for attempt in 0..2 { match state .spacetime_client() .get_big_fish_session(session_id.clone(), owner_user_id.clone()) .await { Ok(session) => return Ok(session), Err(error @ SpacetimeClientError::Timeout) | Err(error @ 
SpacetimeClientError::ConnectDropped) => { last_retryable_error = Some(error); if attempt == 0 { sleep(Duration::from_millis(250)).await; continue; } } Err(error) => return Err(error), } } Err(last_retryable_error.unwrap_or(SpacetimeClientError::Timeout)) }

/// Converts a stored anchor pack record into the `module_big_fish` domain type
/// by mapping each of its four anchor items.
fn map_record_anchor_pack_to_domain(
    anchor_pack: &BigFishAnchorPackRecord,
) -> module_big_fish::BigFishAnchorPack {
    module_big_fish::BigFishAnchorPack {
        gameplay_promise: map_record_anchor_item_to_domain(&anchor_pack.gameplay_promise),
        ecology_visual_theme: map_record_anchor_item_to_domain(&anchor_pack.ecology_visual_theme),
        growth_ladder: map_record_anchor_item_to_domain(&anchor_pack.growth_ladder),
        risk_tempo: map_record_anchor_item_to_domain(&anchor_pack.risk_tempo),
    }
}

/// Converts a stored anchor item record into the domain type.
///
/// The textual status is parsed into `BigFishAnchorStatus`; any value other
/// than `confirmed`, `locked` or `inferred` becomes `Missing`.
fn map_record_anchor_item_to_domain(
    anchor_item: &BigFishAnchorItemRecord,
) -> module_big_fish::BigFishAnchorItem {
    module_big_fish::BigFishAnchorItem {
        key: anchor_item.key.clone(),
        label: anchor_item.label.clone(),
        value: anchor_item.value.clone(),
        status: match anchor_item.status.as_str() {
            "confirmed" => module_big_fish::BigFishAnchorStatus::Confirmed,
            "locked" => module_big_fish::BigFishAnchorStatus::Locked,
            "inferred" => module_big_fish::BigFishAnchorStatus::Inferred,
            _ => module_big_fish::BigFishAnchorStatus::Missing,
        },
    }
}

/// Maps an agent message record onto its API response shape
/// (`message_id` is exposed as `id`).
fn map_big_fish_agent_message_response(
    message: BigFishAgentMessageRecord,
) -> BigFishAgentMessageResponse {
    BigFishAgentMessageResponse {
        id: message.message_id,
        role: message.role,
        kind: message.kind,
        text: message.text,
        created_at: message.created_at,
    }
}

/// Maps a work summary record onto its API response shape.
///
/// The author display name is looked up through
/// `resolve_work_author_by_user_id`, and the microsecond timestamps are
/// rendered with `current_timestamp_micros_to_string`.
fn map_big_fish_work_summary_response(
    state: &AppState,
    item: BigFishWorkSummaryRecord,
) -> BigFishWorkSummaryResponse {
    // Resolve the author first: `item.owner_user_id` is moved into the response below.
    let author = resolve_work_author_by_user_id(state, &item.owner_user_id, None, None);
    BigFishWorkSummaryResponse {
        work_id: item.work_id,
        source_session_id: item.source_session_id,
        owner_user_id: item.owner_user_id,
        author_display_name: author.display_name,
        title: item.title,
        subtitle: item.subtitle,
        summary: item.summary,
        cover_image_src: item.cover_image_src,
        status: item.status,
        updated_at: current_timestamp_micros_to_string(item.updated_at_micros),
        published_at: item
            .published_at_micros
            .map(current_timestamp_micros_to_string),
        publish_ready: item.publish_ready,
        level_count: item.level_count,
        level_main_image_ready_count: item.level_main_image_ready_count,
        level_motion_ready_count: item.level_motion_ready_count,
        background_ready: item.background_ready,
        play_count: item.play_count,
        remix_count: item.remix_count,
        like_count: item.like_count,
        recent_play_count_7d: item.recent_play_count_7d,
    }
}

/// Returns the welcome message for a new session: a generic prompt when the
/// seed text is blank, otherwise an acknowledgement that the seed was received.
fn build_big_fish_welcome_text(seed_text: &str) -> String {
    if seed_text.trim().is_empty() {
        return "我会先帮你确定大鱼吃小鱼的核心锚点。可以从主题生态、成长阶梯或风险节奏开始。"
            .to_string();
    }
    "我已经收到你的玩法起点,会先把它整理成锚点并准备结果页草稿。".to_string()
}

/// Validated DashScope connection settings.
struct BigFishDashScopeSettings {
    /// Base URL with surrounding whitespace and trailing `/` removed.
    base_url: String,
    /// Trimmed, non-empty API key.
    api_key: String,
    /// Used both as the per-request HTTP timeout and as the total budget for
    /// polling an image generation task.
    request_timeout_ms: u64,
}

/// Outcome of a successful text-to-image task.
struct BigFishGeneratedImage {
    image_url: String,
    task_id: String,
}

/// A downloaded image together with its normalized MIME type and file extension.
struct BigFishDownloadedImage {
    mime_type: String,
    extension: String,
    bytes: Vec<u8>,
}

/// Everything needed to generate and store one formal asset of a session.
struct BigFishFormalAssetContext {
    entity_id: String,
    prompt: String,
    negative_prompt: String,
    size: String,
    asset_object_kind: String,
    binding_slot: String,
    path_segments: Vec<String>,
    apply_transparent_background_post_process: bool,
}

const BIG_FISH_TEXT_TO_IMAGE_MODEL: &str = "wan2.2-t2i-flash";
const BIG_FISH_ENTITY_KIND: &str = "big_fish_session";

/// Generates one formal image asset for a session and stores it.
///
/// Steps: load the session, require a compiled draft, build the asset context,
/// run the DashScope text-to-image task, download the resulting image and
/// persist it through `persist_big_fish_formal_asset`, whose result is returned.
///
/// # Errors
/// Returns `400` when the session has no draft or the asset request is
/// invalid; other failures come from the helpers called here.
///
/// NOTE(review): the generic arguments of this signature were not recoverable
/// from the file; `Option<u32>` and `Result<String, AppError>` are assumed —
/// confirm against `BigFishLevelBlueprintRecord::level` and
/// `persist_big_fish_formal_asset`.
async fn generate_big_fish_formal_asset(
    state: &AppState,
    owner_user_id: &str,
    session_id: &str,
    asset_kind: &str,
    level: Option<u32>,
    motion_key: Option<&str>,
    generated_at_micros: i64,
) -> Result<String, AppError> {
    let session = state
        .spacetime_client()
        .get_big_fish_session(session_id.to_string(), owner_user_id.to_string())
        .await
        .map_err(map_big_fish_client_error)?;
    let draft = session.draft.as_ref().ok_or_else(|| {
        AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
            "provider": "big-fish",
            "message": "玩法草稿尚未编译,不能生成正式图片。",
        }))
    })?;
    let context = build_big_fish_formal_asset_context(
        &session,
        draft,
        asset_kind,
        level,
        motion_key,
        generated_at_micros,
    )?;
    let settings = require_big_fish_dashscope_settings(state)?;
    let http_client = build_big_fish_dashscope_http_client(&settings)?;
    let generated = create_big_fish_text_to_image_generation(
        &http_client,
        &settings,
        context.prompt.as_str(),
        context.negative_prompt.as_str(),
        context.size.as_str(),
    )
    .await?;
    let downloaded = download_big_fish_remote_image(
        &http_client,
        generated.image_url.as_str(),
        "下载 Big Fish 正式图片失败",
        context.apply_transparent_background_post_process,
    )
    .await?;
    persist_big_fish_formal_asset(
        state,
        owner_user_id,
        &context,
        generated,
        downloaded,
        generated_at_micros,
    )
    .await
}

/// Builds the prompt, size, storage path and binding slot for one asset kind.
///
/// Supported kinds are `level_main_image`, `level_motion` (requires
/// `motion_key` to be `idle_float` or `move_swim`) and `stage_background`.
/// Only the two level kinds request the transparent-background post-process.
///
/// # Errors
/// Returns `400` for an unsupported kind, a missing or unknown level, or an
/// invalid motion key.
///
/// NOTE(review): `Option<u32>` for `level` is assumed — confirm.
fn build_big_fish_formal_asset_context(
    session: &BigFishSessionRecord,
    draft: &BigFishGameDraftRecord,
    asset_kind: &str,
    level: Option<u32>,
    motion_key: Option<&str>,
    generated_at_micros: i64,
) -> Result<BigFishFormalAssetContext, AppError> {
    let asset_id = format!("asset-{generated_at_micros}");
    match asset_kind {
        "level_main_image" => {
            let level = find_big_fish_level_blueprint(draft, level)?;
            let level_part = build_big_fish_level_part(Some(level.level));
            Ok(BigFishFormalAssetContext {
                entity_id: session.session_id.clone(),
                prompt: build_big_fish_level_main_image_prompt(draft, level),
                negative_prompt: BIG_FISH_TRANSPARENT_ASSET_NEGATIVE_PROMPT.to_string(),
                size: "1024*1024".to_string(),
                asset_object_kind: "big_fish_level_main_image".to_string(),
                binding_slot: format!("level_main_image:{level_part}"),
                path_segments: vec![
                    sanitize_big_fish_path_segment(session.session_id.as_str(), "session"),
                    "level-main-image".to_string(),
                    level_part,
                    asset_id,
                ],
                apply_transparent_background_post_process: true,
            })
        }
        "level_motion" => {
            let level = find_big_fish_level_blueprint(draft, level)?;
            let motion_key = motion_key
                .map(str::trim)
                .filter(|value| matches!(*value, "idle_float" | "move_swim"))
                .ok_or_else(|| {
                    AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                        "provider": "big-fish",
                        "message": "motionKey 必须是 idle_float 或 move_swim。",
                    }))
                })?;
            let level_part = build_big_fish_level_part(Some(level.level));
            Ok(BigFishFormalAssetContext {
                entity_id: session.session_id.clone(),
                prompt: build_big_fish_level_motion_prompt(draft, level, motion_key),
                negative_prompt: BIG_FISH_TRANSPARENT_ASSET_NEGATIVE_PROMPT.to_string(),
                size: "1024*1024".to_string(),
                asset_object_kind: "big_fish_level_motion".to_string(),
                binding_slot: format!("level_motion:{level_part}:{motion_key}"),
                path_segments: vec![
                    sanitize_big_fish_path_segment(session.session_id.as_str(), "session"),
                    "level-motion".to_string(),
                    level_part,
                    sanitize_big_fish_path_segment(motion_key, "motion"),
                    asset_id,
                ],
                apply_transparent_background_post_process: true,
            })
        }
        "stage_background" => Ok(BigFishFormalAssetContext {
            entity_id: session.session_id.clone(),
            prompt: build_big_fish_stage_background_prompt(draft),
            negative_prompt: BIG_FISH_DEFAULT_NEGATIVE_PROMPT.to_string(),
            size: "720*1280".to_string(),
            asset_object_kind: "big_fish_stage_background".to_string(),
            binding_slot: "stage_background".to_string(),
            path_segments: vec![
                sanitize_big_fish_path_segment(session.session_id.as_str(), "session"),
                "stage-background".to_string(),
                asset_id,
            ],
            apply_transparent_background_post_process: false,
        }),
        _ => Err(
            AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                "provider": "big-fish",
                "message": format!("assetKind `{asset_kind}` 不支持正式图片生成。"),
            })),
        ),
    }
}

/// Looks up the blueprint whose `level` equals the requested level.
///
/// # Errors
/// Returns `400` when `level` is `None` or no blueprint of the draft matches.
///
/// NOTE(review): `Option<u32>` is assumed; the type must match
/// `BigFishLevelBlueprintRecord::level` — confirm.
fn find_big_fish_level_blueprint(
    draft: &BigFishGameDraftRecord,
    level: Option<u32>,
) -> Result<&BigFishLevelBlueprintRecord, AppError> {
    let level = level.ok_or_else(|| {
        AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
            "provider": "big-fish",
            "message": "level 是等级资产生成的必填项。",
        }))
    })?;
    draft
        .levels
        .iter()
        .find(|blueprint| blueprint.level == level)
        .ok_or_else(|| {
            AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                "provider": "big-fish",
                "message": format!("level `{level}` 不存在于当前 Big Fish 草稿。"),
            }))
        })
}

/// Reads the DashScope settings from the application config.
///
/// The base URL is trimmed and stripped of trailing `/`; the API key is
/// trimmed; the timeout is clamped to at least 1 ms.
///
/// # Errors
/// Returns `503` when the base URL or the API key is empty or missing.
fn require_big_fish_dashscope_settings(
    state: &AppState,
) -> Result<BigFishDashScopeSettings, AppError> {
    let base_url = state.config.dashscope_base_url.trim().trim_end_matches('/');
    if base_url.is_empty() {
        return Err(
            AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_details(json!({
                "provider": "dashscope",
                "reason": "DASHSCOPE_BASE_URL 未配置",
            })),
        );
    }
    let api_key = state
        .config
        .dashscope_api_key
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .ok_or_else(|| {
            AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_details(json!({
                "provider": "dashscope",
                "reason": "DASHSCOPE_API_KEY 未配置",
            }))
        })?;
    Ok(BigFishDashScopeSettings {
        base_url: base_url.to_string(),
        api_key: api_key.to_string(),
        request_timeout_ms: state.config.dashscope_image_request_timeout_ms.max(1),
    })
}

/// Builds a `reqwest` client whose request timeout is
/// `settings.request_timeout_ms`.
///
/// # Errors
/// Returns `500` when the client cannot be constructed.
fn build_big_fish_dashscope_http_client(
    settings: &BigFishDashScopeSettings,
) -> Result<reqwest::Client, AppError> {
    reqwest::Client::builder()
        .timeout(Duration::from_millis(settings.request_timeout_ms))
        .build()
        .map_err(|error| {
            AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({
                "provider": "dashscope",
                "message": format!("构造 DashScope HTTP 客户端失败:{error}"),
            }))
        })
}

/// Submits an asynchronous DashScope text-to-image task and polls it until it
/// reaches a terminal state or `settings.request_timeout_ms` has elapsed.
///
/// A blank `negative_prompt` is omitted from the request parameters.
///
/// # Errors
/// Returns `502` when a request fails, the upstream answers with a non-success
/// status or an unparsable body, the task ends as `FAILED`, `CANCELED` or
/// `UNKNOWN`, the task succeeds without an image URL, or the polling budget
/// runs out.
async fn create_big_fish_text_to_image_generation(
    http_client: &reqwest::Client,
    settings: &BigFishDashScopeSettings,
    prompt: &str,
    negative_prompt: &str,
    size: &str,
) -> Result<BigFishGeneratedImage, AppError> {
    let mut parameters = Map::from_iter([
        ("n".to_string(), json!(1)),
        ("size".to_string(), Value::String(size.to_string())),
        ("prompt_extend".to_string(), Value::Bool(true)),
        ("watermark".to_string(), Value::Bool(false)),
    ]);
    let negative_prompt = negative_prompt.trim();
    if !negative_prompt.is_empty() {
        parameters.insert(
            "negative_prompt".to_string(),
            Value::String(negative_prompt.to_string()),
        );
    }

    // The same bearer header is sent with the submit call and every poll.
    let authorization = format!("Bearer {}", settings.api_key);

    let response = http_client
        .post(format!(
            "{}/services/aigc/text2image/image-synthesis",
            settings.base_url
        ))
        .header(reqwest::header::AUTHORIZATION, authorization.as_str())
        .header(reqwest::header::CONTENT_TYPE, "application/json")
        .header("X-DashScope-Async", "enable")
        .json(&json!({
            "model": BIG_FISH_TEXT_TO_IMAGE_MODEL,
            "input": {
                "prompt": prompt,
            },
            "parameters": parameters,
        }))
        .send()
        .await
        .map_err(|error| {
            map_big_fish_dashscope_request_error(format!("创建 Big Fish 图片生成任务失败:{error}"))
        })?;
    let status = response.status();
    let response_text = response.text().await.map_err(|error| {
        map_big_fish_dashscope_request_error(format!("读取 Big Fish 图片生成响应失败:{error}"))
    })?;
    if !status.is_success() {
        return Err(map_big_fish_dashscope_upstream_error(
            response_text.as_str(),
            "创建 Big Fish 图片生成任务失败",
        ));
    }
    let payload =
        parse_big_fish_json_payload(response_text.as_str(), "解析 Big Fish 图片生成响应失败")?;
    let task_id = extract_big_fish_task_id(&payload).ok_or_else(|| {
        AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
            "provider": "dashscope",
            "message": "Big Fish 图片生成任务未返回 task_id",
        }))
    })?;

    let poll_url = format!("{}/tasks/{}", settings.base_url, task_id);
    let poll_interval = Duration::from_secs(2);
    let deadline = Instant::now() + Duration::from_millis(settings.request_timeout_ms);
    while Instant::now() < deadline {
        let poll_response = http_client
            .get(poll_url.as_str())
            .header(reqwest::header::AUTHORIZATION, authorization.as_str())
            .send()
            .await
            .map_err(|error| {
                map_big_fish_dashscope_request_error(format!(
                    "查询 Big Fish 图片生成任务失败:{error}"
                ))
            })?;
        let poll_status = poll_response.status();
        let poll_text = poll_response.text().await.map_err(|error| {
            map_big_fish_dashscope_request_error(format!(
                "读取 Big Fish 图片生成任务响应失败:{error}"
            ))
        })?;
        if !poll_status.is_success() {
            return Err(map_big_fish_dashscope_upstream_error(
                poll_text.as_str(),
                "查询 Big Fish 图片生成任务失败",
            ));
        }
        let poll_payload =
            parse_big_fish_json_payload(poll_text.as_str(), "解析 Big Fish 图片生成任务响应失败")?;
        let task_status = find_first_big_fish_string_by_key(&poll_payload, "task_status")
            .unwrap_or_default()
            .trim()
            .to_string();
        if task_status == "SUCCEEDED" {
            let image_url = extract_big_fish_image_urls(&poll_payload)
                .into_iter()
                .next()
                .ok_or_else(|| {
                    AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
                        "provider": "dashscope",
                        "message": "Big Fish 图片生成成功但未返回图片地址",
                    }))
                })?;
            return Ok(BigFishGeneratedImage { image_url, task_id });
        }
        // A cancelled task will never succeed either, so stop polling instead
        // of waiting for the deadline.
        if matches!(task_status.as_str(), "FAILED" | "CANCELED" | "UNKNOWN") {
            return Err(map_big_fish_dashscope_upstream_error(
                poll_text.as_str(),
                "Big Fish 图片生成任务失败",
            ));
        }
        // Never sleep past the deadline: wait the poll interval or whatever
        // is left of the budget, whichever is shorter.
        let remaining = deadline.saturating_duration_since(Instant::now());
        sleep(remaining.min(poll_interval)).await;
    }
    Err(
        AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
            "provider": "dashscope",
            "message": "Big Fish 图片生成超时或未返回图片地址",
        })),
    )
}

/// Downloads a generated image and normalizes its MIME type and extension.
///
/// The response `Content-Type` (defaulting to `image/jpeg` when absent or not
/// valid text) is normalized with
/// `normalize_big_fish_downloaded_image_mime_type`. When
/// `apply_transparent_background_post_process` is set and the image is a PNG,
/// the bytes are replaced by the result of
/// `try_apply_background_alpha_to_png` if that returns one.
///
/// # Errors
/// Returns `502` when the request fails, the status is not a success (checked
/// before the body is read), or the body cannot be read.
async fn download_big_fish_remote_image(
    http_client: &reqwest::Client,
    image_url: &str,
    fallback_message: &str,
    apply_transparent_background_post_process: bool,
) -> Result<BigFishDownloadedImage, AppError> {
    let response = http_client.get(image_url).send().await.map_err(|error| {
        map_big_fish_dashscope_request_error(format!("{fallback_message}:{error}"))
    })?;
    let status = response.status();
    // Reject error responses up front so an error page is never downloaded.
    if !status.is_success() {
        return Err(
            AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
                "provider": "dashscope",
                "message": fallback_message,
                "status": status.as_u16(),
            })),
        );
    }
    let content_type = response
        .headers()
        .get(reqwest::header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .unwrap_or("image/jpeg")
        .to_string();
    let bytes = response.bytes().await.map_err(|error| {
        map_big_fish_dashscope_request_error(format!("{fallback_message}:{error}"))
    })?;

    let mime_type = normalize_big_fish_downloaded_image_mime_type(content_type.as_str());
    let extension = big_fish_mime_to_extension(mime_type.as_str()).to_string();
    let mut image_bytes = bytes.to_vec();

    // Big Fish level main images and motion key frames must follow the same
    // post-processing convention as the RPG character main images. So when the
    // upstream already returned a PNG, always run the transparent-background
    // alpha cleanup instead of relying on the prompt alone, which can leave a
    // residual background colour.
    if apply_transparent_background_post_process
        && mime_type == "image/png"
        && let Some(optimized) = try_apply_background_alpha_to_png(image_bytes.as_slice())
    {
        // MIME type and extension are already PNG here; only the bytes change.
        image_bytes = optimized;
    }

    Ok(BigFishDownloadedImage {
        extension,
        mime_type,
        bytes: image_bytes,
    })
}
async fn persist_big_fish_formal_asset( state: &AppState, owner_user_id: &str, context: &BigFishFormalAssetContext, generated: BigFishGeneratedImage, downloaded: BigFishDownloadedImage, generated_at_micros: i64, ) -> Result { let oss_client = state.oss_client().ok_or_else(|| { AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_details(json!({ "provider": "aliyun-oss", "reason": "OSS 未完成环境变量配置", })) })?; let http_client = reqwest::Client::new(); let put_result = oss_client .put_object( &http_client, OssPutObjectRequest { prefix: LegacyAssetPrefix::BigFishAssets, path_segments: context.path_segments.clone(), file_name: format!("image.{}", downloaded.extension), content_type: Some(downloaded.mime_type.clone()), access: OssObjectAccess::Private, metadata: build_big_fish_asset_metadata( context.asset_object_kind.as_str(), owner_user_id, BIG_FISH_ENTITY_KIND, context.entity_id.as_str(), context.binding_slot.as_str(), ), body: downloaded.bytes, }, ) .await .map_err(map_big_fish_asset_oss_error)?; let head = oss_client .head_object( &http_client, OssHeadObjectRequest { object_key: put_result.object_key.clone(), }, ) .await .map_err(map_big_fish_asset_oss_error)?; let asset_object = state .spacetime_client() .confirm_asset_object( build_asset_object_upsert_input( generate_asset_object_id(generated_at_micros), head.bucket, head.object_key, AssetObjectAccessPolicy::Private, head.content_type.or(Some(downloaded.mime_type)), head.content_length, head.etag, context.asset_object_kind.clone(), Some(generated.task_id), Some(owner_user_id.to_string()), None, Some(context.entity_id.clone()), generated_at_micros, ) .map_err(map_big_fish_asset_object_prepare_error)?, ) .await .map_err(map_big_fish_asset_spacetime_error)?; state .spacetime_client() .bind_asset_object_to_entity( build_asset_entity_binding_input( generate_asset_binding_id(generated_at_micros), asset_object.asset_object_id, BIG_FISH_ENTITY_KIND.to_string(), context.entity_id.clone(), context.binding_slot.clone(), 
context.asset_object_kind.clone(), Some(owner_user_id.to_string()), None, generated_at_micros, ) .map_err(map_big_fish_asset_binding_prepare_error)?, ) .await .map_err(map_big_fish_asset_spacetime_error)?; Ok(put_result.legacy_public_path) } fn build_big_fish_asset_metadata( asset_kind: &str, owner_user_id: &str, entity_kind: &str, entity_id: &str, slot: &str, ) -> BTreeMap { BTreeMap::from([ ("asset_kind".to_string(), asset_kind.to_string()), ("owner_user_id".to_string(), owner_user_id.to_string()), ("entity_kind".to_string(), entity_kind.to_string()), ("entity_id".to_string(), entity_id.to_string()), ("slot".to_string(), slot.to_string()), ]) } fn parse_big_fish_json_payload(raw_text: &str, fallback_message: &str) -> Result { serde_json::from_str::(raw_text).map_err(|error| { AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "dashscope", "message": format!("{fallback_message}:{error}"), })) }) } fn extract_big_fish_task_id(payload: &Value) -> Option { find_first_big_fish_string_by_key(payload, "task_id") } fn extract_big_fish_image_urls(payload: &Value) -> Vec { let mut urls = Vec::new(); collect_big_fish_strings_by_key(payload, "image", &mut urls); collect_big_fish_strings_by_key(payload, "url", &mut urls); let mut deduped = Vec::new(); for url in urls { if !deduped.contains(&url) { deduped.push(url); } } deduped } fn find_first_big_fish_string_by_key(payload: &Value, target_key: &str) -> Option { let mut results = Vec::new(); collect_big_fish_strings_by_key(payload, target_key, &mut results); results.into_iter().next() } fn collect_big_fish_strings_by_key(payload: &Value, target_key: &str, results: &mut Vec) { match payload { Value::Array(entries) => { for entry in entries { collect_big_fish_strings_by_key(entry, target_key, results); } } Value::Object(object) => { for (key, value) in object { if key == target_key && let Some(text) = value.as_str() { results.push(text.to_string()); } collect_big_fish_strings_by_key(value, 
target_key, results); } } _ => {} } } fn normalize_big_fish_downloaded_image_mime_type(content_type: &str) -> String { let mime_type = content_type .split(';') .next() .map(str::trim) .unwrap_or("image/jpeg"); match mime_type { "image/png" | "image/webp" | "image/jpeg" | "image/jpg" | "image/gif" => { mime_type.to_string() } _ => "image/jpeg".to_string(), } } fn big_fish_mime_to_extension(mime_type: &str) -> &str { match mime_type { "image/png" => "png", "image/webp" => "webp", "image/gif" => "gif", _ => "jpg", } } fn map_big_fish_dashscope_request_error(message: String) -> AppError { AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "dashscope", "message": message, })) } fn map_big_fish_dashscope_upstream_error(raw_text: &str, fallback_message: &str) -> AppError { AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "dashscope", "message": parse_big_fish_api_error_message(raw_text, fallback_message), })) } fn parse_big_fish_api_error_message(raw_text: &str, fallback_message: &str) -> String { let trimmed = raw_text.trim(); if trimmed.is_empty() { return fallback_message.to_string(); } if let Ok(payload) = serde_json::from_str::(trimmed) && let Some(message) = find_first_big_fish_string_by_key(&payload, "message") .or_else(|| find_first_big_fish_string_by_key(&payload, "code")) { return message; } let excerpt = trimmed.chars().take(240).collect::(); format!("{fallback_message}:{excerpt}") } fn map_big_fish_asset_object_prepare_error(error: AssetObjectFieldError) -> AppError { AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "asset-object", "message": error.to_string(), })) } fn map_big_fish_asset_binding_prepare_error(error: AssetObjectFieldError) -> AppError { AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "asset-entity-binding", "message": error.to_string(), })) } fn map_big_fish_asset_spacetime_error(error: SpacetimeClientError) -> AppError { 
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "spacetimedb", "message": error.to_string(), })) } fn map_big_fish_asset_oss_error(error: platform_oss::OssError) -> AppError { map_oss_error(error, "aliyun-oss") } fn build_big_fish_level_part(level: Option) -> String { level .map(|value| format!("level-{value}")) .unwrap_or_else(|| "stage".to_string()) } fn sanitize_big_fish_path_segment(value: &str, fallback: &str) -> String { let sanitized = value .trim() .chars() .map(|ch| { if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' { ch } else { '-' } }) .collect::() .trim_matches('-') .to_string(); if sanitized.is_empty() { fallback.to_string() } else { sanitized } } fn ensure_non_empty( request_context: &RequestContext, value: &str, field_name: &str, ) -> Result<(), Response> { if value.trim().is_empty() { return Err(big_fish_bad_request( request_context, format!("{field_name} is required").as_str(), )); } Ok(()) } fn big_fish_bad_request(request_context: &RequestContext, message: &str) -> Response { big_fish_error_response( request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "big-fish", "message": message, })), ) } fn big_fish_sse_json_event_or_error(event_name: &str, payload: Value) -> Event { match serde_json::to_string(&payload) { Ok(payload_text) => Event::default().event(event_name).data(payload_text), Err(_) => big_fish_sse_error_event_message("SSE payload 序列化失败".to_string()), } } fn big_fish_sse_error_event_message(message: String) -> Event { let payload = format!( "{{\"message\":{}}}", serde_json::to_string(&message) .unwrap_or_else(|_| "\"SSE 错误事件序列化失败\"".to_string()) ); Event::default().event("error").data(payload) } fn map_big_fish_client_error(error: SpacetimeClientError) -> AppError { let status = match &error { SpacetimeClientError::Procedure(message) if message.contains("big_fish_creation_session 不存在") => { StatusCode::NOT_FOUND } SpacetimeClientError::Procedure(message) 
if message.contains("big_fish_runtime_run 不存在") => { StatusCode::NOT_FOUND } SpacetimeClientError::Procedure(message) if message.contains("无权访问") => { StatusCode::FORBIDDEN } SpacetimeClientError::Procedure(message) if message.contains("不能为空") || message.contains("尚未编译") || message.contains("不允许") || message.contains("非法") || message.contains("缺少") => { StatusCode::BAD_REQUEST } SpacetimeClientError::Runtime(_) => StatusCode::BAD_REQUEST, SpacetimeClientError::Timeout => StatusCode::GATEWAY_TIMEOUT, _ => StatusCode::BAD_GATEWAY, }; let message = match &error { SpacetimeClientError::Timeout => "SpacetimeDB 会话读取超时,请稍后重试。".to_string(), SpacetimeClientError::ConnectDropped => { "SpacetimeDB 会话连接已断开,请稍后重试。".to_string() } _ => error.to_string(), }; AppError::from_status(status).with_details(json!({ "provider": "spacetimedb", "message": message, })) } fn should_soft_fallback_big_fish_gallery(error: &SpacetimeClientError) -> bool { match error { // 公开广场是首页可选数据,SpacetimeDB procedure 运行态短暂失败时不应阻断平台首屏。 SpacetimeClientError::Runtime(_) | SpacetimeClientError::ConnectDropped => true, SpacetimeClientError::Procedure(message) => { message.contains("list_big_fish_works") || message.contains("procedure") || message.contains("No such procedure") } _ => false, } } fn big_fish_error_response(request_context: &RequestContext, error: AppError) -> Response { error.into_response_with_context(Some(request_context)) } fn current_utc_micros() -> i64 { let duration = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("system clock should be after unix epoch"); i64::try_from(duration.as_micros()).expect("current unix micros should fit in i64") } fn current_timestamp_micros_to_string(value: i64) -> String { format_timestamp_micros(value) } #[cfg(test)] mod tests { use super::*; #[test] fn big_fish_gallery_soft_fallbacks_for_runtime_errors() { assert!(should_soft_fallback_big_fish_gallery( &SpacetimeClientError::Runtime("procedure runtime error".to_string()) )); 
assert!(should_soft_fallback_big_fish_gallery( &SpacetimeClientError::ConnectDropped )); assert!(should_soft_fallback_big_fish_gallery( &SpacetimeClientError::Procedure("No such procedure: list_big_fish_works".to_string(),) )); } #[test] fn big_fish_gallery_keeps_timeout_errors_visible() { assert!(!should_soft_fallback_big_fish_gallery( &SpacetimeClientError::Timeout )); } }