use std::{ collections::BTreeMap, convert::Infallible, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use axum::{ Json, extract::{Extension, Path, State, rejection::JsonRejection}, http::StatusCode, response::{ IntoResponse, Response, sse::{Event, Sse}, }, }; use module_assets::{ AssetObjectAccessPolicy, AssetObjectFieldError, build_asset_entity_binding_input, build_asset_object_upsert_input, generate_asset_binding_id, generate_asset_object_id, }; use platform_oss::{LegacyAssetPrefix, OssHeadObjectRequest, OssObjectAccess, OssPutObjectRequest}; use serde_json::{Map, Value, json}; use shared_contracts::big_fish::{ BigFishActionResponse, BigFishAgentMessageResponse, BigFishAnchorItemResponse, BigFishAnchorPackResponse, BigFishAssetCoverageResponse, BigFishAssetSlotResponse, BigFishBackgroundBlueprintResponse, BigFishGameDraftResponse, BigFishLevelBlueprintResponse, BigFishRunResponse, BigFishRuntimeEntityResponse, BigFishRuntimeParamsResponse, BigFishRuntimeSnapshotResponse, BigFishSessionResponse, BigFishSessionSnapshotResponse, BigFishVector2Response, CreateBigFishSessionRequest, ExecuteBigFishActionRequest, SendBigFishMessageRequest, SubmitBigFishInputRequest, }; use shared_contracts::big_fish_works::{BigFishWorkSummaryResponse, BigFishWorksResponse}; use shared_kernel::{build_prefixed_uuid_id, format_timestamp_micros}; use spacetime_client::{ BigFishAgentMessageRecord, BigFishAnchorItemRecord, BigFishAnchorPackRecord, BigFishAssetCoverageRecord, BigFishAssetGenerateRecordInput, BigFishAssetSlotRecord, BigFishBackgroundBlueprintRecord, BigFishGameDraftRecord, BigFishLevelBlueprintRecord, BigFishMessageSubmitRecordInput, BigFishRunInputSubmitRecordInput, BigFishRunStartRecordInput, BigFishRuntimeEntityRecord, BigFishRuntimeParamsRecord, BigFishRuntimeRecord, BigFishSessionCreateRecordInput, BigFishSessionRecord, BigFishVector2Record, BigFishWorkSummaryRecord, SpacetimeClientError, }; use tokio::time::sleep; use crate::big_fish_agent_turn::{ 
BigFishAgentTurnRequest, build_failed_finalize_record_input, build_finalize_record_input,
    run_big_fish_agent_turn,
};
use crate::{
    ai_generation_drafts::{
        AiGenerationDraftContext, AiGenerationDraftSink, AiGenerationDraftWriter,
    },
    api_response::json_success_body,
    auth::AuthenticatedAccessToken,
    http_error::AppError,
    request_context::RequestContext,
    state::AppState,
};

/// Creates a Big Fish session owned by the authenticated user.
///
/// The optional seed text from the body is trimmed, stored on the session, and used to pick
/// the welcome message recorded together with the session.
///
/// # Errors
///
/// Returns the response built by `big_fish_error_response` when the JSON body is rejected
/// (status `400 Bad Request`) or when the SpacetimeDB call fails.
pub async fn create_big_fish_session(
    State(state): State,
    Extension(request_context): Extension,
    Extension(authenticated): Extension,
    payload: Result, JsonRejection>,
) -> Result, Response> {
    let Json(payload) = match payload {
        Ok(body) => body,
        Err(rejection) => {
            return Err(big_fish_error_response(
                &request_context,
                AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                    "provider": "big-fish",
                    "message": rejection.body_text(),
                })),
            ));
        }
    };

    let seed_text = payload.seed_text.unwrap_or_default().trim().to_string();
    let welcome_message_text = build_big_fish_welcome_text(&seed_text);
    let create_input = BigFishSessionCreateRecordInput {
        session_id: build_prefixed_uuid_id("big-fish-session-"),
        owner_user_id: authenticated.claims().user_id().to_string(),
        seed_text,
        welcome_message_id: build_prefixed_uuid_id("big-fish-message-"),
        welcome_message_text,
        created_at_micros: current_utc_micros(),
    };

    let created = match state
        .spacetime_client()
        .create_big_fish_session(create_input)
        .await
    {
        Ok(record) => record,
        Err(client_error) => {
            return Err(big_fish_error_response(
                &request_context,
                map_big_fish_client_error(client_error),
            ));
        }
    };

    let body = BigFishSessionResponse {
        session: map_big_fish_session_response(created),
    };
    Ok(json_success_body(Some(&request_context), body))
}

/// Loads one Big Fish session for the authenticated user.
///
/// # Errors
///
/// Fails when `session_id` does not pass `ensure_non_empty` or when the SpacetimeDB lookup
/// returns an error.
pub async fn get_big_fish_session(
    State(state): State,
    Path(session_id): Path,
    Extension(request_context): Extension,
    Extension(authenticated): Extension,
) -> Result, Response> {
    ensure_non_empty(&request_context, &session_id, "sessionId")?;

    let owner_user_id = authenticated.claims().user_id().to_string();
    let loaded = match state
        .spacetime_client()
        .get_big_fish_session(session_id, owner_user_id)
        .await
    {
        Ok(record) => record,
        Err(client_error) => {
            return Err(big_fish_error_response(
                &request_context,
                map_big_fish_client_error(client_error),
            ));
        }
    };

    let body = BigFishSessionResponse {
        session: map_big_fish_session_response(loaded),
    };
    Ok(json_success_body(Some(&request_context), body))
}

/// Lists the Big Fish work summaries of the authenticated user.
///
/// # Errors
///
/// Fails with the mapped client error when the SpacetimeDB call fails.
pub async fn get_big_fish_works(
    State(state): State,
    Extension(request_context): Extension,
    Extension(authenticated): Extension,
) -> Result, Response> {
    let owner_user_id = authenticated.claims().user_id().to_string();
    let records = match state
        .spacetime_client()
        .list_big_fish_works(owner_user_id)
        .await
    {
        Ok(records) => records,
        Err(client_error) => {
            return Err(big_fish_error_response(
                &request_context,
                map_big_fish_client_error(client_error),
            ));
        }
    };

    let items = records
        .into_iter()
        .map(map_big_fish_work_summary_response)
        .collect();
    Ok(json_success_body(
        Some(&request_context),
        BigFishWorksResponse { items },
    ))
}

/// Deletes the work backed by `session_id` and responds with the work summaries returned by
/// the delete call.
///
/// # Errors
///
/// Fails when `session_id` does not pass `ensure_non_empty` or when the SpacetimeDB call
/// returns an error.
pub async fn delete_big_fish_work(
    State(state): State,
    Path(session_id): Path,
    Extension(request_context): Extension,
    Extension(authenticated): Extension,
) -> Result, Response> {
    ensure_non_empty(&request_context, &session_id, "sessionId")?;

    let owner_user_id = authenticated.claims().user_id().to_string();
    let records = match state
        .spacetime_client()
        .delete_big_fish_work(session_id, owner_user_id)
        .await
    {
        Ok(records) => records,
        Err(client_error) => {
            return Err(big_fish_error_response(
                &request_context,
                map_big_fish_client_error(client_error),
            ));
        }
    };

    let items = records
        .into_iter()
        .map(map_big_fish_work_summary_response)
        .collect();
    Ok(json_success_body(
        Some(&request_context),
        BigFishWorksResponse { items },
    ))
}

/// Handles a non-streaming user message: validates the body, records the user message, runs
/// one agent turn, and finalizes the assistant message before returning the session.
pub async fn submit_big_fish_message(
    State(state): State,
    Path(session_id): Path,
    Extension(request_context): Extension,
    Extension(authenticated): Extension,
    payload: Result, JsonRejection>,
) -> Result, Response> {
    let Json(payload) = payload.map_err(|error| {
        big_fish_error_response(
            &request_context,
            AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                "provider": "big-fish",
                "message": error.body_text(),
            })),
        )
    })?;
    ensure_non_empty(&request_context, &session_id, "sessionId")?;
    let client_message_id = payload.client_message_id.trim().to_string();
    let message_text = payload.text.trim().to_string();
    if client_message_id.is_empty() || message_text.is_empty() {
        return Err(big_fish_bad_request(
&request_context,
            "clientMessageId and text are required",
        ));
    }
    let owner_user_id = authenticated.claims().user_id().to_string();
    let submitted_session = state
        .spacetime_client()
        .submit_big_fish_message(BigFishMessageSubmitRecordInput {
            session_id: session_id.clone(),
            owner_user_id: owner_user_id.clone(),
            // Cloned so the same trimmed id can key the draft contexts below; the draft must
            // reference the id that was actually persisted for the user message.
            user_message_id: client_message_id.clone(),
            user_message_text: message_text,
            assistant_message_id: build_prefixed_uuid_id("big-fish-message-"),
            submitted_at_micros: current_utc_micros(),
        })
        .await
        .map_err(|error| {
            big_fish_error_response(&request_context, map_big_fish_client_error(error))
        })?;
    let mut draft_writer = AiGenerationDraftWriter::new(AiGenerationDraftContext::new(
        "big_fish",
        owner_user_id.as_str(),
        session_id.as_str(),
        client_message_id.as_str(),
        "大鱼吃小鱼模板生成草稿",
    ));
    // Draft persistence is best-effort: a failure to start it is logged and the turn goes on.
    if let Err(error) = draft_writer.ensure_started(state.spacetime_client()).await {
        tracing::warn!(error = %error, "大鱼吃小鱼模板生成草稿任务启动失败,主生成流程继续执行");
    }
    // The sink is moved into the turn callback, so it gets its own context and client handle.
    let draft_sink = AiGenerationDraftSink::new(
        AiGenerationDraftContext::new(
            "big_fish",
            owner_user_id.as_str(),
            session_id.as_str(),
            client_message_id.as_str(),
            "大鱼吃小鱼模板生成草稿",
        ),
        state.spacetime_client().clone(),
    );
    let turn_result = run_big_fish_agent_turn(
        BigFishAgentTurnRequest {
            llm_client: state.llm_client(),
            session: &submitted_session,
            quick_fill_requested: payload.quick_fill_requested.unwrap_or(false),
            enable_web_search: state.config.creation_agent_llm_web_search_enabled,
        },
        move |text| {
            draft_sink.persist_visible_text_async(text);
        },
    )
    .await;
    // On success, write the complete assistant reply to the draft once more.
    if let Ok(result) = &turn_result {
        draft_writer
            .persist_visible_text(
                state.spacetime_client(),
                result.assistant_reply_text.as_str(),
            )
            .await;
    }
    // A failed turn is still finalized, using the failure variant of the finalize input.
    let finalize_input = match turn_result {
        Ok(turn_result) => build_finalize_record_input(
            session_id.clone(),
            owner_user_id.clone(),
            build_prefixed_uuid_id("big-fish-message-"),
            turn_result,
            current_utc_micros(),
        ),
        Err(error) => build_failed_finalize_record_input(
            session_id.clone(),
            owner_user_id.clone(),
            &submitted_session,
            error.to_string(),
            current_utc_micros(),
        ),
    };
    let session = state
        .spacetime_client()
        .finalize_big_fish_agent_message(finalize_input)
        .await
        .map_err(|error| {
            big_fish_error_response(&request_context, map_big_fish_client_error(error))
        })?;
    Ok(json_success_body(
        Some(&request_context),
        BigFishSessionResponse {
            session: map_big_fish_session_response(session),
        },
    ))
}

/// Streaming variant of the message handler.
///
/// Validation and the initial message submission happen before the stream is created, so
/// those failures are returned as a regular error response. The returned SSE stream then
/// emits `reply_delta` events while the agent turn runs, followed by `session` and `done`,
/// or a single `error` event when finalizing the assistant message fails.
pub async fn stream_big_fish_message(
    State(state): State,
    Path(session_id): Path,
    Extension(request_context): Extension,
    Extension(authenticated): Extension,
    payload: Result, JsonRejection>,
) -> Result {
    let Json(payload) = payload.map_err(|error| {
        big_fish_error_response(
            &request_context,
            AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                "provider": "big-fish",
                "message": error.body_text(),
            })),
        )
    })?;
    ensure_non_empty(&request_context, &session_id, "sessionId")?;
    let client_message_id = payload.client_message_id.trim().to_string();
    let message_text = payload.text.trim().to_string();
    if client_message_id.is_empty() || message_text.is_empty() {
        return Err(big_fish_bad_request(
            &request_context,
            "clientMessageId and text are required",
        ));
    }
    let owner_user_id = authenticated.claims().user_id().to_string();
    let submitted_session = state
        .spacetime_client()
        .submit_big_fish_message(BigFishMessageSubmitRecordInput {
            session_id: session_id.clone(),
            owner_user_id: owner_user_id.clone(),
            // Cloned so the stream can key its draft by the same trimmed id.
            user_message_id: client_message_id.clone(),
            user_message_text: message_text,
            assistant_message_id: build_prefixed_uuid_id("big-fish-message-"),
            submitted_at_micros: current_utc_micros(),
        })
        .await
        .map_err(|error| {
            big_fish_error_response(&request_context, map_big_fish_client_error(error))
        })?;
    let quick_fill_requested = payload.quick_fill_requested.unwrap_or(false);
    // Owned copies for the stream body, which is returned from this function.
    let state = state.clone();
    let session_id_for_stream = session_id.clone();
    let owner_user_id_for_stream = owner_user_id.clone();
    let client_message_id_for_stream = client_message_id;
    let stream = async_stream::stream!
{
        let mut draft_writer = AiGenerationDraftWriter::new(AiGenerationDraftContext::new(
            "big_fish",
            owner_user_id_for_stream.as_str(),
            session_id_for_stream.as_str(),
            client_message_id_for_stream.as_str(),
            "大鱼吃小鱼模板生成草稿",
        ));
        // Draft persistence is best-effort: a failure to start it is logged and the turn goes on.
        if let Err(error) = draft_writer.ensure_started(state.spacetime_client()).await {
            tracing::warn!(error = %error, "大鱼吃小鱼模板生成草稿任务启动失败,主生成流程继续执行");
        }
        let (reply_tx, mut reply_rx) = tokio::sync::mpsc::unbounded_channel::();
        // Same semantics as the RPG / puzzle agents: the reply is streamed for display first,
        // while the authoritative session is only sent after finalize.
        let turn_result = {
            let run_turn = run_big_fish_agent_turn(
                BigFishAgentTurnRequest {
                    llm_client: state.llm_client(),
                    session: &submitted_session,
                    quick_fill_requested,
                    enable_web_search: state.config.creation_agent_llm_web_search_enabled,
                },
                move |text| {
                    let _ = reply_tx.send(text.to_string());
                },
            );
            tokio::pin!(run_turn);
            loop {
                // Each replyText delta is written to the draft table and pushed over SSE so the
                // frontend does not have to wait for the complete model response.
                tokio::select! {
                    result = &mut run_turn => break result,
                    // The `Some(..)` pattern disables this branch once the channel is closed.
                    // Otherwise a sender dropped before the turn resolves would make `recv()`
                    // return `None` immediately on every iteration and spin this loop.
                    Some(text) = reply_rx.recv() => {
                        draft_writer
                            .persist_visible_text(state.spacetime_client(), text.as_str())
                            .await;
                        yield Ok::(big_fish_sse_json_event_or_error(
                            "reply_delta",
                            json!({ "text": text }),
                        ));
                    }
                }
            }
        };
        // Flush deltas that were queued but not yet forwarded when the turn finished. The
        // sender lived inside `run_turn`, which is dropped above, so this loop terminates.
        while let Some(text) = reply_rx.recv().await {
            draft_writer
                .persist_visible_text(state.spacetime_client(), text.as_str())
                .await;
            yield Ok::(big_fish_sse_json_event_or_error(
                "reply_delta",
                json!({ "text": text }),
            ));
        }
        // A failed turn is still finalized, using the failure variant of the finalize input.
        let finalize_input = match turn_result {
            Ok(turn_result) => build_finalize_record_input(
                session_id_for_stream.clone(),
                owner_user_id_for_stream.clone(),
                build_prefixed_uuid_id("big-fish-message-"),
                turn_result,
                current_utc_micros(),
            ),
            Err(error) => build_failed_finalize_record_input(
                session_id_for_stream.clone(),
                owner_user_id_for_stream.clone(),
                &submitted_session,
                error.to_string(),
                current_utc_micros(),
            ),
        };
        let session = match state
            .spacetime_client()
            .finalize_big_fish_agent_message(finalize_input)
            .await
        {
            Ok(session) => session,
            Err(error) => {
                // The HTTP response has already started, so the failure is reported in-band.
                yield Ok::(big_fish_sse_json_event_or_error(
                    "error",
                    json!({ "message": error.to_string() }),
                ));
                return;
            }
        };
        let session_response = map_big_fish_session_response(session);
        yield Ok::(big_fish_sse_json_event_or_error(
            "session",
            json!({ "session": session_response }),
        ));
        yield Ok::(big_fish_sse_json_event_or_error(
            "done",
            json!({ "ok": true }),
        ));
    };
    Ok(Sse::new(stream).into_response())
}

/// Executes one named authoring action on a session and returns the updated session.
///
/// Supported `action` values are `big_fish_compile_draft`,
/// `big_fish_generate_level_main_image`, `big_fish_generate_level_motion`,
/// `big_fish_generate_stage_background` and `big_fish_publish_game`; any other value is
/// rejected through `big_fish_bad_request`.
pub async fn execute_big_fish_action(
    State(state): State,
    Path(session_id): Path,
    Extension(request_context): Extension,
    Extension(authenticated): Extension,
    payload: Result, JsonRejection>,
) -> Result, Response> {
    let Json(payload) = payload.map_err(|error| {
        big_fish_error_response(
            &request_context,
            AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                "provider": "big-fish",
                "message": error.body_text(),
            })),
        )
    })?;
    ensure_non_empty(&request_context, &session_id, "sessionId")?;
    let owner_user_id = authenticated.claims().user_id().to_string();
    let now = current_utc_micros();
    let session = match payload.action.trim() {
        "big_fish_compile_draft" => {
            compile_big_fish_draft_with_all_assets(&state, session_id, owner_user_id, now).await
        }
        "big_fish_generate_level_main_image" => {
            let asset_url = generate_big_fish_formal_asset(
                &state,
                &owner_user_id,
                &session_id,
                "level_main_image",
                payload.level,
                None,
                now,
            )
            .await
            .map_err(|error| big_fish_error_response(&request_context, error))?;
            state
                .spacetime_client()
                .generate_big_fish_asset(BigFishAssetGenerateRecordInput {
                    session_id,
                    owner_user_id,
                    asset_kind: "level_main_image".to_string(),
                    level: payload.level,
                    motion_key: None,
                    asset_url: Some(asset_url),
                    generated_at_micros: now,
                })
                .await
        }
        "big_fish_generate_level_motion" => {
            let asset_url = generate_big_fish_formal_asset(
                &state,
                &owner_user_id,
                &session_id,
                "level_motion",
                payload.level,
                payload.motion_key.as_deref(),
                now,
            )
            .await
            .map_err(|error| big_fish_error_response(&request_context, error))?;
            state
                .spacetime_client()
.generate_big_fish_asset(BigFishAssetGenerateRecordInput {
                    session_id,
                    owner_user_id,
                    asset_kind: "level_motion".to_string(),
                    level: payload.level,
                    motion_key: payload.motion_key,
                    asset_url: Some(asset_url),
                    generated_at_micros: now,
                })
                .await
        }
        "big_fish_generate_stage_background" => {
            let asset_url = generate_big_fish_formal_asset(
                &state,
                &owner_user_id,
                &session_id,
                "stage_background",
                None,
                None,
                now,
            )
            .await
            .map_err(|error| big_fish_error_response(&request_context, error))?;
            state
                .spacetime_client()
                .generate_big_fish_asset(BigFishAssetGenerateRecordInput {
                    session_id,
                    owner_user_id,
                    asset_kind: "stage_background".to_string(),
                    level: None,
                    motion_key: None,
                    asset_url: Some(asset_url),
                    generated_at_micros: now,
                })
                .await
        }
        "big_fish_publish_game" => {
            state
                .spacetime_client()
                .publish_big_fish_game(session_id, owner_user_id, now)
                .await
        }
        other => {
            return Err(big_fish_bad_request(
                &request_context,
                format!("action `{other}` is not supported").as_str(),
            ));
        }
    }
    .map_err(|error| big_fish_error_response(&request_context, map_big_fish_client_error(error)))?;
    Ok(json_success_body(
        Some(&request_context),
        BigFishActionResponse {
            session: map_big_fish_session_response(session),
        },
    ))
}

/// Starts a new run of the session's game for the authenticated user.
///
/// # Errors
///
/// Fails when `session_id` does not pass `ensure_non_empty` or when the SpacetimeDB call
/// returns an error.
pub async fn start_big_fish_run(
    State(state): State,
    Path(session_id): Path,
    Extension(request_context): Extension,
    Extension(authenticated): Extension,
) -> Result, Response> {
    ensure_non_empty(&request_context, &session_id, "sessionId")?;

    let start_input = BigFishRunStartRecordInput {
        run_id: build_prefixed_uuid_id("big-fish-run-"),
        session_id,
        owner_user_id: authenticated.claims().user_id().to_string(),
        started_at_micros: current_utc_micros(),
    };
    let started = match state
        .spacetime_client()
        .start_big_fish_run(start_input)
        .await
    {
        Ok(record) => record,
        Err(client_error) => {
            return Err(big_fish_error_response(
                &request_context,
                map_big_fish_client_error(client_error),
            ));
        }
    };

    let body = BigFishRunResponse {
        run: map_big_fish_runtime_response(started),
    };
    Ok(json_success_body(Some(&request_context), body))
}

/// Loads the current snapshot of a run for the authenticated user.
///
/// # Errors
///
/// Fails when `run_id` does not pass `ensure_non_empty` or when the SpacetimeDB lookup
/// returns an error.
pub async fn get_big_fish_run(
    State(state): State,
    Path(run_id): Path,
    Extension(request_context): Extension,
    Extension(authenticated): Extension,
) -> Result, Response> {
    ensure_non_empty(&request_context, &run_id, "runId")?;

    let owner_user_id = authenticated.claims().user_id().to_string();
    let loaded = match state
        .spacetime_client()
        .get_big_fish_run(run_id, owner_user_id)
        .await
    {
        Ok(record) => record,
        Err(client_error) => {
            return Err(big_fish_error_response(
                &request_context,
                map_big_fish_client_error(client_error),
            ));
        }
    };

    let body = BigFishRunResponse {
        run: map_big_fish_runtime_response(loaded),
    };
    Ok(json_success_body(Some(&request_context), body))
}

/// Submits one player input vector (`x`, `y`) to a run and returns the updated snapshot.
///
/// # Errors
///
/// Fails when the JSON body is rejected (status `400 Bad Request`), when `run_id` does not
/// pass `ensure_non_empty`, or when the SpacetimeDB call returns an error.
pub async fn submit_big_fish_input(
    State(state): State,
    Path(run_id): Path,
    Extension(request_context): Extension,
    Extension(authenticated): Extension,
    payload: Result, JsonRejection>,
) -> Result, Response> {
    let Json(payload) = match payload {
        Ok(body) => body,
        Err(rejection) => {
            return Err(big_fish_error_response(
                &request_context,
                AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                    "provider": "big-fish",
                    "message": rejection.body_text(),
                })),
            ));
        }
    };
    ensure_non_empty(&request_context, &run_id, "runId")?;

    let input_record = BigFishRunInputSubmitRecordInput {
        run_id,
        owner_user_id: authenticated.claims().user_id().to_string(),
        input_x: payload.x,
        input_y: payload.y,
        submitted_at_micros: current_utc_micros(),
    };
    let updated = match state
        .spacetime_client()
        .submit_big_fish_input(input_record)
        .await
    {
        Ok(record) => record,
        Err(client_error) => {
            return Err(big_fish_error_response(
                &request_context,
                map_big_fish_client_error(client_error),
            ));
        }
    };

    let body = BigFishRunResponse {
        run: map_big_fish_runtime_response(updated),
    };
    Ok(json_success_body(Some(&request_context), body))
}

/// Converts a stored session record into the session snapshot sent to clients.
fn map_big_fish_session_response(session: BigFishSessionRecord) -> BigFishSessionSnapshotResponse {
    BigFishSessionSnapshotResponse {
        session_id: session.session_id,
        current_turn: session.current_turn,
        progress_percent: session.progress_percent,
        stage: session.stage,
        anchor_pack: map_big_fish_anchor_pack_response(session.anchor_pack),
        draft: session.draft.map(map_big_fish_draft_response),
        asset_slots: session
            .asset_slots
            .into_iter()
            .map(map_big_fish_asset_slot_response)
            .collect(),
        asset_coverage:
map_big_fish_asset_coverage_response(session.asset_coverage),
        messages: session
            .messages
            .into_iter()
            .map(map_big_fish_agent_message_response)
            .collect(),
        last_assistant_reply: session.last_assistant_reply,
        publish_ready: session.publish_ready,
        updated_at: session.updated_at,
    }
}

/// Converts the four anchors of an anchor pack record into their response form.
fn map_big_fish_anchor_pack_response(
    anchor_pack: BigFishAnchorPackRecord,
) -> BigFishAnchorPackResponse {
    let BigFishAnchorPackRecord {
        gameplay_promise,
        ecology_visual_theme,
        growth_ladder,
        risk_tempo,
        ..
    } = anchor_pack;

    BigFishAnchorPackResponse {
        gameplay_promise: map_big_fish_anchor_item_response(gameplay_promise),
        ecology_visual_theme: map_big_fish_anchor_item_response(ecology_visual_theme),
        growth_ladder: map_big_fish_anchor_item_response(growth_ladder),
        risk_tempo: map_big_fish_anchor_item_response(risk_tempo),
    }
}

/// Converts a single anchor record into its response form, field by field.
fn map_big_fish_anchor_item_response(anchor: BigFishAnchorItemRecord) -> BigFishAnchorItemResponse {
    let BigFishAnchorItemRecord {
        key,
        label,
        value,
        status,
        ..
    } = anchor;

    BigFishAnchorItemResponse {
        key,
        label,
        value,
        status,
    }
}

/// Converts a game draft record, including its levels, background and runtime parameters.
fn map_big_fish_draft_response(draft: BigFishGameDraftRecord) -> BigFishGameDraftResponse {
    let BigFishGameDraftRecord {
        title,
        subtitle,
        core_fun,
        ecology_theme,
        levels,
        background,
        runtime_params,
        ..
    } = draft;

    let levels = levels
        .into_iter()
        .map(map_big_fish_level_response)
        .collect();

    BigFishGameDraftResponse {
        title,
        subtitle,
        core_fun,
        ecology_theme,
        levels,
        background: map_big_fish_background_response(background),
        runtime_params: map_big_fish_runtime_params_response(runtime_params),
    }
}

/// Converts one level blueprint record into its response form, field by field.
fn map_big_fish_level_response(
    level: BigFishLevelBlueprintRecord,
) -> BigFishLevelBlueprintResponse {
    let BigFishLevelBlueprintRecord {
        level: level_number,
        name,
        one_line_fantasy,
        silhouette_direction,
        size_ratio,
        visual_prompt_seed,
        motion_prompt_seed,
        merge_source_level,
        prey_window,
        threat_window,
        is_final_level,
        ..
    } = level;

    BigFishLevelBlueprintResponse {
        level: level_number,
        name,
        one_line_fantasy,
        silhouette_direction,
        size_ratio,
        visual_prompt_seed,
        motion_prompt_seed,
        merge_source_level,
        prey_window,
        threat_window,
        is_final_level,
    }
}

/// Converts the background blueprint record into its response form, field by field.
fn map_big_fish_background_response(
    background: BigFishBackgroundBlueprintRecord,
) -> BigFishBackgroundBlueprintResponse {
    let BigFishBackgroundBlueprintRecord {
        theme,
        color_mood,
        foreground_hints,
        midground_composition,
        background_depth,
        safe_play_area_hint,
        spawn_edge_hint,
        background_prompt_seed,
        ..
    } = background;

    BigFishBackgroundBlueprintResponse {
        theme,
        color_mood,
        foreground_hints,
        midground_composition,
        background_depth,
        safe_play_area_hint,
        spawn_edge_hint,
        background_prompt_seed,
    }
}

/// Converts the runtime parameter record into its response form, field by field.
fn map_big_fish_runtime_params_response(
    params: BigFishRuntimeParamsRecord,
) -> BigFishRuntimeParamsResponse {
    let BigFishRuntimeParamsRecord {
        level_count,
        merge_count_per_upgrade,
        spawn_target_count,
        leader_move_speed,
        follower_catch_up_speed,
        offscreen_cull_seconds,
        prey_spawn_delta_levels,
        threat_spawn_delta_levels,
        win_level,
        ..
    } = params;

    BigFishRuntimeParamsResponse {
        level_count,
        merge_count_per_upgrade,
        spawn_target_count,
        leader_move_speed,
        follower_catch_up_speed,
        offscreen_cull_seconds,
        prey_spawn_delta_levels,
        threat_spawn_delta_levels,
        win_level,
    }
}

/// Converts one asset slot record into its response form, field by field.
fn map_big_fish_asset_slot_response(slot: BigFishAssetSlotRecord) -> BigFishAssetSlotResponse {
    let BigFishAssetSlotRecord {
        slot_id,
        asset_kind,
        level,
        motion_key,
        status,
        asset_url,
        prompt_snapshot,
        updated_at,
        ..
    } = slot;

    BigFishAssetSlotResponse {
        slot_id,
        asset_kind,
        level,
        motion_key,
        status,
        asset_url,
        prompt_snapshot,
        updated_at,
    }
}

/// Converts the asset coverage record into its response form, field by field.
fn map_big_fish_asset_coverage_response(
    coverage: BigFishAssetCoverageRecord,
) -> BigFishAssetCoverageResponse {
    let BigFishAssetCoverageRecord {
        level_main_image_ready_count,
        level_motion_ready_count,
        background_ready,
        required_level_count,
        publish_ready,
        blockers,
        ..
    } = coverage;

    BigFishAssetCoverageResponse {
        level_main_image_ready_count,
        level_motion_ready_count,
        background_ready,
        required_level_count,
        publish_ready,
        blockers,
    }
}

/// Compiles the session draft and then generates every first-version asset for it: one main
/// image per level, the `idle_float` and `move_swim` motion images per level, and the stage
/// background. Returns the session produced by the last asset registration.
async fn compile_big_fish_draft_with_all_assets(
    state: &AppState,
    session_id: String,
    owner_user_id: String,
    now: i64,
) ->
Result { let session = state .spacetime_client() .compile_big_fish_draft(session_id.clone(), owner_user_id.clone(), now) .await?; let draft = session .draft .clone() .ok_or_else(|| SpacetimeClientError::Runtime("大鱼吃小鱼玩法草稿尚未生成".to_string()))?; // 点击生成草稿时一次性生成所有首版玩法资产,前端只负责展示进度和最终 session。 for level in &draft.levels { let asset_url = generate_big_fish_formal_asset( state, &owner_user_id, &session_id, "level_main_image", Some(level.level), None, current_utc_micros(), ) .await .map_err(|error| SpacetimeClientError::Runtime(error.message().to_string()))?; state .spacetime_client() .generate_big_fish_asset(BigFishAssetGenerateRecordInput { session_id: session_id.clone(), owner_user_id: owner_user_id.clone(), asset_kind: "level_main_image".to_string(), level: Some(level.level), motion_key: None, asset_url: Some(asset_url), generated_at_micros: current_utc_micros(), }) .await?; } for level in &draft.levels { for motion_key in ["idle_float", "move_swim"] { let asset_url = generate_big_fish_formal_asset( state, &owner_user_id, &session_id, "level_motion", Some(level.level), Some(motion_key), current_utc_micros(), ) .await .map_err(|error| SpacetimeClientError::Runtime(error.message().to_string()))?; state .spacetime_client() .generate_big_fish_asset(BigFishAssetGenerateRecordInput { session_id: session_id.clone(), owner_user_id: owner_user_id.clone(), asset_kind: "level_motion".to_string(), level: Some(level.level), motion_key: Some(motion_key.to_string()), asset_url: Some(asset_url), generated_at_micros: current_utc_micros(), }) .await?; } } let asset_url = generate_big_fish_formal_asset( state, &owner_user_id, &session_id, "stage_background", None, None, current_utc_micros(), ) .await .map_err(|error| SpacetimeClientError::Runtime(error.message().to_string()))?; state .spacetime_client() .generate_big_fish_asset(BigFishAssetGenerateRecordInput { session_id, owner_user_id, asset_kind: "stage_background".to_string(), level: None, motion_key: None, asset_url: Some(asset_url), 
generated_at_micros: current_utc_micros(), }) .await } fn map_big_fish_agent_message_response( message: BigFishAgentMessageRecord, ) -> BigFishAgentMessageResponse { BigFishAgentMessageResponse { id: message.message_id, role: message.role, kind: message.kind, text: message.text, created_at: message.created_at, } } fn map_big_fish_runtime_response(run: BigFishRuntimeRecord) -> BigFishRuntimeSnapshotResponse { BigFishRuntimeSnapshotResponse { run_id: run.run_id, session_id: run.session_id, status: run.status, tick: run.tick, player_level: run.player_level, win_level: run.win_level, leader_entity_id: run.leader_entity_id, owned_entities: run .owned_entities .into_iter() .map(map_big_fish_entity_response) .collect(), wild_entities: run .wild_entities .into_iter() .map(map_big_fish_entity_response) .collect(), camera_center: map_big_fish_vector_response(run.camera_center), last_input: map_big_fish_vector_response(run.last_input), event_log: run.event_log, updated_at: run.updated_at, } } fn map_big_fish_work_summary_response( item: BigFishWorkSummaryRecord, ) -> BigFishWorkSummaryResponse { BigFishWorkSummaryResponse { work_id: item.work_id, source_session_id: item.source_session_id, title: item.title, subtitle: item.subtitle, summary: item.summary, cover_image_src: item.cover_image_src, status: item.status, updated_at: current_timestamp_micros_to_string(item.updated_at_micros), publish_ready: item.publish_ready, level_count: item.level_count, level_main_image_ready_count: item.level_main_image_ready_count, level_motion_ready_count: item.level_motion_ready_count, background_ready: item.background_ready, } } fn map_big_fish_entity_response( entity: BigFishRuntimeEntityRecord, ) -> BigFishRuntimeEntityResponse { BigFishRuntimeEntityResponse { entity_id: entity.entity_id, level: entity.level, position: map_big_fish_vector_response(entity.position), radius: entity.radius, offscreen_seconds: entity.offscreen_seconds, } } fn map_big_fish_vector_response(vector: 
BigFishVector2Record) -> BigFishVector2Response {
    BigFishVector2Response {
        x: vector.x,
        y: vector.y,
    }
}

/// Picks the welcome message for a new session: one text when the seed is blank after
/// trimming, another when the user supplied a starting idea.
fn build_big_fish_welcome_text(seed_text: &str) -> String {
    let welcome = if seed_text.trim().is_empty() {
        "我会先帮你确定大鱼吃小鱼的核心锚点。可以从主题生态、成长阶梯或风险节奏开始。"
    } else {
        "我已经收到你的玩法起点,会先把它整理成锚点并准备结果页草稿。"
    };
    welcome.to_string()
}

/// DashScope connection settings resolved from the application config.
struct BigFishDashScopeSettings {
    base_url: String,
    api_key: String,
    request_timeout_ms: u64,
}

/// Result of a finished text-to-image task: the image location and the task that produced it.
struct BigFishGeneratedImage {
    image_url: String,
    task_id: String,
}

/// A downloaded image together with its MIME type and file extension.
struct BigFishDownloadedImage {
    mime_type: String,
    extension: String,
    bytes: Vec,
}

/// Everything needed to generate and store one formal asset: prompts, image size, the asset
/// object kind, the binding slot and the storage path segments.
struct BigFishFormalAssetContext {
    entity_id: String,
    prompt: String,
    negative_prompt: String,
    size: String,
    asset_object_kind: String,
    binding_slot: String,
    path_segments: Vec,
}

const BIG_FISH_TEXT_TO_IMAGE_MODEL: &str = "wan2.2-t2i-flash";
const BIG_FISH_ENTITY_KIND: &str = "big_fish_session";
const BIG_FISH_DEFAULT_NEGATIVE_PROMPT: &str = "文字,水印,logo,UI界面,对话框,边框,多余肢体,畸形鱼体,低清晰度,模糊,压缩噪点,现代摄影棚,写实照片背景";

/// Generates one formal image asset for a session and persists it.
///
/// Loads the session, requires a compiled draft, builds the prompt context for `asset_kind`,
/// runs the DashScope text-to-image task, downloads the resulting image and hands it to
/// `persist_big_fish_formal_asset`, whose result is returned.
///
/// # Errors
///
/// Returns an `AppError` when the session cannot be loaded, the draft is missing (status
/// `400 Bad Request`), the context or DashScope settings cannot be built, or any of the
/// generation, download or persistence steps fails.
async fn generate_big_fish_formal_asset(
    state: &AppState,
    owner_user_id: &str,
    session_id: &str,
    asset_kind: &str,
    level: Option,
    motion_key: Option<&str>,
    generated_at_micros: i64,
) -> Result {
    let session = match state
        .spacetime_client()
        .get_big_fish_session(session_id.to_string(), owner_user_id.to_string())
        .await
    {
        Ok(record) => record,
        Err(client_error) => return Err(map_big_fish_client_error(client_error)),
    };
    let draft = match session.draft.as_ref() {
        Some(draft) => draft,
        None => {
            return Err(
                AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                    "provider": "big-fish",
                    "message": "玩法草稿尚未编译,不能生成正式图片。",
                })),
            );
        }
    };

    let context = build_big_fish_formal_asset_context(
        &session,
        draft,
        asset_kind,
        level,
        motion_key,
        generated_at_micros,
    )?;
    let settings = require_big_fish_dashscope_settings(state)?;
    let http_client = build_big_fish_dashscope_http_client(&settings)?;

    let prompt = context.prompt.as_str();
    let negative_prompt = context.negative_prompt.as_str();
    let size = context.size.as_str();
    let generated = create_big_fish_text_to_image_generation(
        &http_client,
        &settings,
        prompt,
        negative_prompt,
        size,
    )
    .await?;

    let downloaded = download_big_fish_remote_image(
        &http_client,
        generated.image_url.as_str(),
        "下载 Big Fish 正式图片失败",
    )
    .await?;

    persist_big_fish_formal_asset(
        state,
        owner_user_id,
        &context,
        generated,
        downloaded,
        generated_at_micros,
    )
    .await
}

/// Builds the generation context (prompt, size, binding slot, storage path) for one asset.
///
/// `asset_kind` must be `level_main_image`, `level_motion` or `stage_background`; level assets
/// require `level`, and motion assets additionally require a `motion_key` of `idle_float` or
/// `move_swim`. Anything else yields a `400 Bad Request` error.
fn build_big_fish_formal_asset_context(
    session: &BigFishSessionRecord,
    draft: &BigFishGameDraftRecord,
    asset_kind: &str,
    level: Option,
    motion_key: Option<&str>,
    generated_at_micros: i64,
) -> Result {
    let asset_id = format!("asset-{generated_at_micros}");
    match asset_kind {
        "level_main_image" => {
            let level = find_big_fish_level_blueprint(draft, level)?;
            let level_part = build_big_fish_level_part(Some(level.level));
            Ok(BigFishFormalAssetContext {
                entity_id: session.session_id.clone(),
                prompt: build_big_fish_level_main_image_prompt(draft, level),
                negative_prompt: BIG_FISH_DEFAULT_NEGATIVE_PROMPT.to_string(),
                size: "1024*1024".to_string(),
                asset_object_kind: "big_fish_level_main_image".to_string(),
                binding_slot: format!("level_main_image:{level_part}"),
                path_segments: vec![
                    sanitize_big_fish_path_segment(session.session_id.as_str(), "session"),
                    "level-main-image".to_string(),
                    level_part,
                    asset_id,
                ],
            })
        }
        "level_motion" => {
            let level = find_big_fish_level_blueprint(draft, level)?;
            let motion_key = motion_key
                .map(str::trim)
                .filter(|value| matches!(*value, "idle_float" | "move_swim"))
                .ok_or_else(|| {
                    AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                        "provider": "big-fish",
                        "message": "motionKey 必须是 idle_float 或 move_swim。",
                    }))
                })?;
            let level_part = build_big_fish_level_part(Some(level.level));
            Ok(BigFishFormalAssetContext {
                entity_id: session.session_id.clone(),
                prompt: build_big_fish_level_motion_prompt(draft, level, motion_key),
                negative_prompt: BIG_FISH_DEFAULT_NEGATIVE_PROMPT.to_string(),
                size: "1024*1024".to_string(),
                asset_object_kind: "big_fish_level_motion".to_string(),
                binding_slot: format!("level_motion:{level_part}:{motion_key}"),
                path_segments: vec![
                    sanitize_big_fish_path_segment(session.session_id.as_str(), "session"),
                    "level-motion".to_string(),
level_part,
                    sanitize_big_fish_path_segment(motion_key, "motion"),
                    asset_id,
                ],
            })
        }
        "stage_background" => Ok(BigFishFormalAssetContext {
            entity_id: session.session_id.clone(),
            prompt: build_big_fish_stage_background_prompt(draft),
            negative_prompt: BIG_FISH_DEFAULT_NEGATIVE_PROMPT.to_string(),
            size: "720*1280".to_string(),
            asset_object_kind: "big_fish_stage_background".to_string(),
            binding_slot: "stage_background".to_string(),
            path_segments: vec![
                sanitize_big_fish_path_segment(session.session_id.as_str(), "session"),
                "stage-background".to_string(),
                asset_id,
            ],
        }),
        _ => Err(
            AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                "provider": "big-fish",
                "message": format!("assetKind `{asset_kind}` 不支持正式图片生成。"),
            })),
        ),
    }
}

/// Looks up the blueprint of `level` in the draft.
///
/// # Errors
///
/// Returns a `400 Bad Request` error when `level` is `None` or when the draft has no
/// blueprint with that level number.
fn find_big_fish_level_blueprint(
    draft: &BigFishGameDraftRecord,
    level: Option,
) -> Result<&BigFishLevelBlueprintRecord, AppError> {
    let level = match level {
        Some(level) => level,
        None => {
            return Err(
                AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                    "provider": "big-fish",
                    "message": "level 是等级资产生成的必填项。",
                })),
            );
        }
    };

    match draft
        .levels
        .iter()
        .find(|candidate| candidate.level == level)
    {
        Some(blueprint) => Ok(blueprint),
        None => Err(
            AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
                "provider": "big-fish",
                "message": format!("level `{level}` 不存在于当前 Big Fish 草稿。"),
            })),
        ),
    }
}

/// Builds the text-to-image prompt for a level's main creature image by concatenating the
/// draft-level and level-specific sentences with the fixed composition requirements.
fn build_big_fish_level_main_image_prompt(
    draft: &BigFishGameDraftRecord,
    level: &BigFishLevelBlueprintRecord,
) -> String {
    let sentences = [
        format!(
            "为竖屏移动游戏《{}》生成一张等级生物主图。",
            draft.title
        ),
        format!(
            "生态主题:{}。核心乐趣:{}。",
            draft.ecology_theme, draft.core_fun
        ),
        format!(
            "等级:Lv.{},名称:{},幻想描述:{}。",
            level.level, level.name, level.one_line_fantasy
        ),
        format!("轮廓方向:{}。", level.silhouette_direction),
        format!("视觉提示词种子:{}。", level.visual_prompt_seed),
        "画面要求:单体游戏生物完整入镜,轮廓清晰,适合作为大鱼吃小鱼等级角色主图,2D 高完成度游戏插画,深海发光质感,中央构图。".to_string(),
        "不要出现 UI、文字、logo、水印、对话框或边框;背景保持干净的深海渐变或透明感,不要出现多只主体。".to_string(),
    ];
    sentences.concat()
}

/// Builds the text-to-image prompt for a level's motion keyframe. `move_swim` gets the
/// swimming description; every other key gets the idle-float description.
fn build_big_fish_level_motion_prompt(
    draft: &BigFishGameDraftRecord,
    level: &BigFishLevelBlueprintRecord,
    motion_key: &str,
) -> String {
    let motion_text = if motion_key == "move_swim" {
        "向右游动的关键帧预览,身体与尾鳍有清晰推进姿态,带轻微水流拖尾。"
    } else {
        "待机漂浮的关键帧预览,身体轻微摆动,姿态稳定,适合作为 idle 状态。"
    };

    let sentences = [
        format!(
            "为竖屏移动游戏《{}》生成一张等级生物动作关键帧静态预览图。",
            draft.title
        ),
        format!("生态主题:{}。", draft.ecology_theme),
        format!(
            "等级:Lv.{},名称:{},幻想描述:{}。",
            level.level, level.name, level.one_line_fantasy
        ),
        format!("动作提示词种子:{}。", level.motion_prompt_seed),
        format!("动作要求:{motion_text}"),
        "画面要求:单体生物完整入镜,轮廓清晰,动作方向明确,2D 高完成度游戏插画,适合作为 Big Fish 动作槽位的静态 keyframe。".to_string(),
        "不要出现 UI、文字、logo、水印、对话框或边框;不要生成序列帧拼图,不要出现多只主体。".to_string(),
    ];
    sentences.concat()
}

/// Builds the text-to-image prompt for the stage background from the draft's background
/// blueprint, followed by the fixed composition requirements.
fn build_big_fish_stage_background_prompt(draft: &BigFishGameDraftRecord) -> String {
    let blueprint = &draft.background;
    let sentences = [
        format!(
            "为竖屏移动游戏《{}》生成一张 9:16 全屏活动区域背景。",
            draft.title
        ),
        format!("生态主题:{}。", draft.ecology_theme),
        format!("背景主题:{}。色彩氛围:{}。", blueprint.theme, blueprint.color_mood),
        format!("前景提示:{}。", blueprint.foreground_hints),
        format!("中景构图:{}。", blueprint.midground_composition),
        format!("背景纵深:{}。", blueprint.background_depth),
        format!("安全操作区:{}。", blueprint.safe_play_area_hint),
        format!("出生边缘:{}。", blueprint.spawn_edge_hint),
        format!("背景提示词种子:{}。", blueprint.background_prompt_seed),
        "画面要求:竖屏 9:16,中央 70% 保持清爽可读,边缘有深海生态层次和微弱生物光,适合作为大鱼吃小鱼运行态背景。".to_string(),
        "不要出现 UI、文字、logo、水印、对话框、边框或巨大主体遮挡;不要把中央操作区画得过暗或过复杂。".to_string(),
    ];
    sentences.concat()
}

/// Resolves the DashScope settings from the application config.
///
/// The base URL is trimmed and stripped of trailing slashes, and the timeout is clamped to at
/// least 1 ms.
///
/// # Errors
///
/// Returns a `503 Service Unavailable` error when the base URL or the API key is blank or
/// missing.
fn require_big_fish_dashscope_settings(
    state: &AppState,
) -> Result {
    let base_url = state.config.dashscope_base_url.trim().trim_end_matches('/');
    if base_url.is_empty() {
        return Err(
            AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_details(json!({
                "provider": "dashscope",
                "reason": "DASHSCOPE_BASE_URL 未配置",
            })),
        );
    }
    let api_key = state
        .config
        .dashscope_api_key
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .ok_or_else(|| {
            AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_details(json!({
                "provider": "dashscope",
                "reason": "DASHSCOPE_API_KEY 未配置",
            }))
        })?;
Ok(BigFishDashScopeSettings { base_url: base_url.to_string(), api_key: api_key.to_string(), request_timeout_ms: state.config.dashscope_image_request_timeout_ms.max(1), }) } fn build_big_fish_dashscope_http_client( settings: &BigFishDashScopeSettings, ) -> Result { reqwest::Client::builder() .timeout(Duration::from_millis(settings.request_timeout_ms)) .build() .map_err(|error| { AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({ "provider": "dashscope", "message": format!("构造 DashScope HTTP 客户端失败:{error}"), })) }) } async fn create_big_fish_text_to_image_generation( http_client: &reqwest::Client, settings: &BigFishDashScopeSettings, prompt: &str, negative_prompt: &str, size: &str, ) -> Result { let mut parameters = Map::from_iter([ ("n".to_string(), json!(1)), ("size".to_string(), Value::String(size.to_string())), ("prompt_extend".to_string(), Value::Bool(true)), ("watermark".to_string(), Value::Bool(false)), ]); if !negative_prompt.trim().is_empty() { parameters.insert( "negative_prompt".to_string(), Value::String(negative_prompt.trim().to_string()), ); } let response = http_client .post(format!( "{}/services/aigc/text2image/image-synthesis", settings.base_url )) .header( reqwest::header::AUTHORIZATION, format!("Bearer {}", settings.api_key), ) .header(reqwest::header::CONTENT_TYPE, "application/json") .header("X-DashScope-Async", "enable") .json(&json!({ "model": BIG_FISH_TEXT_TO_IMAGE_MODEL, "input": { "prompt": prompt, }, "parameters": parameters, })) .send() .await .map_err(|error| { map_big_fish_dashscope_request_error(format!("创建 Big Fish 图片生成任务失败:{error}")) })?; let status = response.status(); let response_text = response.text().await.map_err(|error| { map_big_fish_dashscope_request_error(format!("读取 Big Fish 图片生成响应失败:{error}")) })?; if !status.is_success() { return Err(map_big_fish_dashscope_upstream_error( response_text.as_str(), "创建 Big Fish 图片生成任务失败", )); } let payload = parse_big_fish_json_payload(response_text.as_str(), "解析 
Big Fish 图片生成响应失败")?; let task_id = extract_big_fish_task_id(&payload).ok_or_else(|| { AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "dashscope", "message": "Big Fish 图片生成任务未返回 task_id", })) })?; let deadline = Instant::now() + Duration::from_millis(settings.request_timeout_ms); while Instant::now() < deadline { let poll_response = http_client .get(format!("{}/tasks/{}", settings.base_url, task_id)) .header( reqwest::header::AUTHORIZATION, format!("Bearer {}", settings.api_key), ) .send() .await .map_err(|error| { map_big_fish_dashscope_request_error(format!( "查询 Big Fish 图片生成任务失败:{error}" )) })?; let poll_status = poll_response.status(); let poll_text = poll_response.text().await.map_err(|error| { map_big_fish_dashscope_request_error(format!( "读取 Big Fish 图片生成任务响应失败:{error}" )) })?; if !poll_status.is_success() { return Err(map_big_fish_dashscope_upstream_error( poll_text.as_str(), "查询 Big Fish 图片生成任务失败", )); } let poll_payload = parse_big_fish_json_payload(poll_text.as_str(), "解析 Big Fish 图片生成任务响应失败")?; let task_status = find_first_big_fish_string_by_key(&poll_payload, "task_status") .unwrap_or_default() .trim() .to_string(); if task_status == "SUCCEEDED" { let image_url = extract_big_fish_image_urls(&poll_payload) .into_iter() .next() .ok_or_else(|| { AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "dashscope", "message": "Big Fish 图片生成成功但未返回图片地址", })) })?; return Ok(BigFishGeneratedImage { image_url, task_id }); } if matches!(task_status.as_str(), "FAILED" | "UNKNOWN") { return Err(map_big_fish_dashscope_upstream_error( poll_text.as_str(), "Big Fish 图片生成任务失败", )); } sleep(Duration::from_secs(2)).await; } Err( AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "dashscope", "message": "Big Fish 图片生成超时或未返回图片地址", })), ) } async fn download_big_fish_remote_image( http_client: &reqwest::Client, image_url: &str, fallback_message: &str, ) -> Result { let response = 
http_client.get(image_url).send().await.map_err(|error| { map_big_fish_dashscope_request_error(format!("{fallback_message}:{error}")) })?; let status = response.status(); let content_type = response .headers() .get(reqwest::header::CONTENT_TYPE) .and_then(|value| value.to_str().ok()) .unwrap_or("image/jpeg") .to_string(); let bytes = response.bytes().await.map_err(|error| { map_big_fish_dashscope_request_error(format!("{fallback_message}:{error}")) })?; if !status.is_success() { return Err( AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "dashscope", "message": fallback_message, "status": status.as_u16(), })), ); } let mime_type = normalize_big_fish_downloaded_image_mime_type(content_type.as_str()); Ok(BigFishDownloadedImage { extension: big_fish_mime_to_extension(mime_type.as_str()).to_string(), mime_type, bytes: bytes.to_vec(), }) } async fn persist_big_fish_formal_asset( state: &AppState, owner_user_id: &str, context: &BigFishFormalAssetContext, generated: BigFishGeneratedImage, downloaded: BigFishDownloadedImage, generated_at_micros: i64, ) -> Result { let oss_client = state.oss_client().ok_or_else(|| { AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_details(json!({ "provider": "aliyun-oss", "reason": "OSS 未完成环境变量配置", })) })?; let http_client = reqwest::Client::new(); let put_result = oss_client .put_object( &http_client, OssPutObjectRequest { prefix: LegacyAssetPrefix::BigFishAssets, path_segments: context.path_segments.clone(), file_name: format!("image.{}", downloaded.extension), content_type: Some(downloaded.mime_type.clone()), access: OssObjectAccess::Private, metadata: build_big_fish_asset_metadata( context.asset_object_kind.as_str(), owner_user_id, BIG_FISH_ENTITY_KIND, context.entity_id.as_str(), context.binding_slot.as_str(), ), body: downloaded.bytes, }, ) .await .map_err(map_big_fish_asset_oss_error)?; let head = oss_client .head_object( &http_client, OssHeadObjectRequest { object_key: 
put_result.object_key.clone(), }, ) .await .map_err(map_big_fish_asset_oss_error)?; let asset_object = state .spacetime_client() .confirm_asset_object( build_asset_object_upsert_input( generate_asset_object_id(generated_at_micros), head.bucket, head.object_key, AssetObjectAccessPolicy::Private, head.content_type.or(Some(downloaded.mime_type)), head.content_length, head.etag, context.asset_object_kind.clone(), Some(generated.task_id), Some(owner_user_id.to_string()), None, Some(context.entity_id.clone()), generated_at_micros, ) .map_err(map_big_fish_asset_object_prepare_error)?, ) .await .map_err(map_big_fish_asset_spacetime_error)?; state .spacetime_client() .bind_asset_object_to_entity( build_asset_entity_binding_input( generate_asset_binding_id(generated_at_micros), asset_object.asset_object_id, BIG_FISH_ENTITY_KIND.to_string(), context.entity_id.clone(), context.binding_slot.clone(), context.asset_object_kind.clone(), Some(owner_user_id.to_string()), None, generated_at_micros, ) .map_err(map_big_fish_asset_binding_prepare_error)?, ) .await .map_err(map_big_fish_asset_spacetime_error)?; Ok(put_result.legacy_public_path) } fn build_big_fish_asset_metadata( asset_kind: &str, owner_user_id: &str, entity_kind: &str, entity_id: &str, slot: &str, ) -> BTreeMap { BTreeMap::from([ ("asset_kind".to_string(), asset_kind.to_string()), ("owner_user_id".to_string(), owner_user_id.to_string()), ("entity_kind".to_string(), entity_kind.to_string()), ("entity_id".to_string(), entity_id.to_string()), ("slot".to_string(), slot.to_string()), ]) } fn parse_big_fish_json_payload(raw_text: &str, fallback_message: &str) -> Result { serde_json::from_str::(raw_text).map_err(|error| { AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "dashscope", "message": format!("{fallback_message}:{error}"), })) }) } fn extract_big_fish_task_id(payload: &Value) -> Option { find_first_big_fish_string_by_key(payload, "task_id") } fn extract_big_fish_image_urls(payload: 
&Value) -> Vec { let mut urls = Vec::new(); collect_big_fish_strings_by_key(payload, "image", &mut urls); collect_big_fish_strings_by_key(payload, "url", &mut urls); let mut deduped = Vec::new(); for url in urls { if !deduped.contains(&url) { deduped.push(url); } } deduped } fn find_first_big_fish_string_by_key(payload: &Value, target_key: &str) -> Option { let mut results = Vec::new(); collect_big_fish_strings_by_key(payload, target_key, &mut results); results.into_iter().next() } fn collect_big_fish_strings_by_key(payload: &Value, target_key: &str, results: &mut Vec) { match payload { Value::Array(entries) => { for entry in entries { collect_big_fish_strings_by_key(entry, target_key, results); } } Value::Object(object) => { for (key, value) in object { if key == target_key && let Some(text) = value.as_str() { results.push(text.to_string()); } collect_big_fish_strings_by_key(value, target_key, results); } } _ => {} } } fn normalize_big_fish_downloaded_image_mime_type(content_type: &str) -> String { let mime_type = content_type .split(';') .next() .map(str::trim) .unwrap_or("image/jpeg"); match mime_type { "image/png" | "image/webp" | "image/jpeg" | "image/jpg" | "image/gif" => { mime_type.to_string() } _ => "image/jpeg".to_string(), } } fn big_fish_mime_to_extension(mime_type: &str) -> &str { match mime_type { "image/png" => "png", "image/webp" => "webp", "image/gif" => "gif", _ => "jpg", } } fn map_big_fish_dashscope_request_error(message: String) -> AppError { AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "dashscope", "message": message, })) } fn map_big_fish_dashscope_upstream_error(raw_text: &str, fallback_message: &str) -> AppError { AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "dashscope", "message": parse_big_fish_api_error_message(raw_text, fallback_message), })) } fn parse_big_fish_api_error_message(raw_text: &str, fallback_message: &str) -> String { let trimmed = raw_text.trim(); if 
trimmed.is_empty() { return fallback_message.to_string(); } if let Ok(payload) = serde_json::from_str::(trimmed) && let Some(message) = find_first_big_fish_string_by_key(&payload, "message") .or_else(|| find_first_big_fish_string_by_key(&payload, "code")) { return message; } let excerpt = trimmed.chars().take(240).collect::(); format!("{fallback_message}:{excerpt}") } fn map_big_fish_asset_object_prepare_error(error: AssetObjectFieldError) -> AppError { AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "asset-object", "message": error.to_string(), })) } fn map_big_fish_asset_binding_prepare_error(error: AssetObjectFieldError) -> AppError { AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "asset-entity-binding", "message": error.to_string(), })) } fn map_big_fish_asset_spacetime_error(error: SpacetimeClientError) -> AppError { AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "spacetimedb", "message": error.to_string(), })) } fn map_big_fish_asset_oss_error(error: platform_oss::OssError) -> AppError { let status = match error { platform_oss::OssError::InvalidConfig(_) | platform_oss::OssError::InvalidRequest(_) => { StatusCode::BAD_REQUEST } platform_oss::OssError::ObjectNotFound(_) => StatusCode::NOT_FOUND, platform_oss::OssError::Request(_) | platform_oss::OssError::SerializePolicy(_) | platform_oss::OssError::Sign(_) => StatusCode::BAD_GATEWAY, }; AppError::from_status(status).with_details(json!({ "provider": "aliyun-oss", "message": error.to_string(), })) } fn build_big_fish_level_part(level: Option) -> String { level .map(|value| format!("level-{value}")) .unwrap_or_else(|| "stage".to_string()) } fn sanitize_big_fish_path_segment(value: &str, fallback: &str) -> String { let sanitized = value .trim() .chars() .map(|ch| { if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' { ch } else { '-' } }) .collect::() .trim_matches('-') .to_string(); if sanitized.is_empty() { 
fallback.to_string() } else { sanitized } } fn ensure_non_empty( request_context: &RequestContext, value: &str, field_name: &str, ) -> Result<(), Response> { if value.trim().is_empty() { return Err(big_fish_bad_request( request_context, format!("{field_name} is required").as_str(), )); } Ok(()) } fn big_fish_bad_request(request_context: &RequestContext, message: &str) -> Response { big_fish_error_response( request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "big-fish", "message": message, })), ) } fn big_fish_sse_json_event_or_error(event_name: &str, payload: Value) -> Event { match serde_json::to_string(&payload) { Ok(payload_text) => Event::default().event(event_name).data(payload_text), Err(_) => big_fish_sse_error_event_message("SSE payload 序列化失败".to_string()), } } fn big_fish_sse_error_event_message(message: String) -> Event { let payload = format!( "{{\"message\":{}}}", serde_json::to_string(&message) .unwrap_or_else(|_| "\"SSE 错误事件序列化失败\"".to_string()) ); Event::default().event("error").data(payload) } fn map_big_fish_client_error(error: SpacetimeClientError) -> AppError { let status = match &error { SpacetimeClientError::Procedure(message) if message.contains("big_fish_creation_session 不存在") || message.contains("big_fish_runtime_run 不存在") => { StatusCode::NOT_FOUND } SpacetimeClientError::Procedure(message) if message.contains("不能为空") || message.contains("尚未编译") || message.contains("不允许") || message.contains("非法") || message.contains("缺少") => { StatusCode::BAD_REQUEST } SpacetimeClientError::Runtime(_) => StatusCode::BAD_REQUEST, _ => StatusCode::BAD_GATEWAY, }; AppError::from_status(status).with_details(json!({ "provider": "spacetimedb", "message": error.to_string(), })) } fn big_fish_error_response(request_context: &RequestContext, error: AppError) -> Response { error.into_response_with_context(Some(request_context)) } fn current_utc_micros() -> i64 { let duration = SystemTime::now() .duration_since(UNIX_EPOCH) 
.expect("system clock should be after unix epoch"); i64::try_from(duration.as_micros()).expect("current unix micros should fit in i64") } fn current_timestamp_micros_to_string(value: i64) -> String { format_timestamp_micros(value) }