Files
Genarrative/server-rs/crates/api-server/src/big_fish.rs
kdletters 615d828add
Some checks failed
CI / verify (push) Has been cancelled
fix: show published big fish works in gallery
2026-04-27 00:10:15 +08:00

1808 lines
65 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::{
collections::BTreeMap,
convert::Infallible,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use axum::{
Json,
extract::{Extension, Path, State, rejection::JsonRejection},
http::StatusCode,
response::{
IntoResponse, Response,
sse::{Event, Sse},
},
};
use module_assets::{
AssetObjectAccessPolicy, AssetObjectFieldError, build_asset_entity_binding_input,
build_asset_object_upsert_input, generate_asset_binding_id, generate_asset_object_id,
};
use platform_oss::{LegacyAssetPrefix, OssHeadObjectRequest, OssObjectAccess, OssPutObjectRequest};
use serde_json::{Map, Value, json};
use shared_contracts::big_fish::{
BigFishActionResponse, BigFishAgentMessageResponse, BigFishAnchorItemResponse,
BigFishAnchorPackResponse, BigFishAssetCoverageResponse, BigFishAssetSlotResponse,
BigFishBackgroundBlueprintResponse, BigFishGameDraftResponse, BigFishLevelBlueprintResponse,
BigFishRunResponse, BigFishRuntimeEntityResponse, BigFishRuntimeParamsResponse,
BigFishRuntimeSnapshotResponse, BigFishSessionResponse, BigFishSessionSnapshotResponse,
BigFishVector2Response, CreateBigFishSessionRequest, ExecuteBigFishActionRequest,
SendBigFishMessageRequest, SubmitBigFishInputRequest,
};
use shared_contracts::big_fish_works::{BigFishWorkSummaryResponse, BigFishWorksResponse};
use shared_kernel::{build_prefixed_uuid_id, format_timestamp_micros};
use spacetime_client::{
BigFishAgentMessageRecord, BigFishAnchorItemRecord, BigFishAnchorPackRecord,
BigFishAssetCoverageRecord, BigFishAssetGenerateRecordInput, BigFishAssetSlotRecord,
BigFishBackgroundBlueprintRecord, BigFishGameDraftRecord, BigFishLevelBlueprintRecord,
BigFishMessageSubmitRecordInput, BigFishRunInputSubmitRecordInput, BigFishRunStartRecordInput,
BigFishRuntimeEntityRecord, BigFishRuntimeParamsRecord, BigFishRuntimeRecord,
BigFishSessionCreateRecordInput, BigFishSessionRecord, BigFishVector2Record,
BigFishWorkSummaryRecord, SpacetimeClientError,
};
use tokio::time::sleep;
use crate::big_fish_agent_turn::{
BigFishAgentTurnRequest, build_failed_finalize_record_input, build_finalize_record_input,
run_big_fish_agent_turn,
};
use crate::{
ai_generation_drafts::{
AiGenerationDraftContext, AiGenerationDraftSink, AiGenerationDraftWriter,
},
api_response::json_success_body,
auth::AuthenticatedAccessToken,
http_error::AppError,
request_context::RequestContext,
state::AppState,
};
pub async fn create_big_fish_session(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
payload: Result<Json<CreateBigFishSessionRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let Json(payload) = payload.map_err(|error| {
big_fish_error_response(
&request_context,
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "big-fish",
"message": error.body_text(),
})),
)
})?;
let seed_text = payload.seed_text.unwrap_or_default().trim().to_string();
let session = state
.spacetime_client()
.create_big_fish_session(BigFishSessionCreateRecordInput {
session_id: build_prefixed_uuid_id("big-fish-session-"),
owner_user_id: authenticated.claims().user_id().to_string(),
seed_text: seed_text.clone(),
welcome_message_id: build_prefixed_uuid_id("big-fish-message-"),
welcome_message_text: build_big_fish_welcome_text(&seed_text),
created_at_micros: current_utc_micros(),
})
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
Ok(json_success_body(
Some(&request_context),
BigFishSessionResponse {
session: map_big_fish_session_response(session),
},
))
}
pub async fn get_big_fish_session(
State(state): State<AppState>,
Path(session_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, &session_id, "sessionId")?;
let session = state
.spacetime_client()
.get_big_fish_session(session_id, authenticated.claims().user_id().to_string())
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
Ok(json_success_body(
Some(&request_context),
BigFishSessionResponse {
session: map_big_fish_session_response(session),
},
))
}
pub async fn get_big_fish_works(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
) -> Result<Json<Value>, Response> {
let items = state
.spacetime_client()
.list_big_fish_works(authenticated.claims().user_id().to_string())
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
Ok(json_success_body(
Some(&request_context),
BigFishWorksResponse {
items: items
.into_iter()
.map(map_big_fish_work_summary_response)
.collect(),
},
))
}
pub async fn list_big_fish_gallery(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
) -> Result<Json<Value>, Response> {
let items = state
.spacetime_client()
.list_big_fish_gallery()
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
Ok(json_success_body(
Some(&request_context),
BigFishWorksResponse {
items: items
.into_iter()
.map(map_big_fish_work_summary_response)
.collect(),
},
))
}
pub async fn delete_big_fish_work(
State(state): State<AppState>,
Path(session_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, &session_id, "sessionId")?;
let items = state
.spacetime_client()
.delete_big_fish_work(session_id, authenticated.claims().user_id().to_string())
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
Ok(json_success_body(
Some(&request_context),
BigFishWorksResponse {
items: items
.into_iter()
.map(map_big_fish_work_summary_response)
.collect(),
},
))
}
pub async fn submit_big_fish_message(
State(state): State<AppState>,
Path(session_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
payload: Result<Json<SendBigFishMessageRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let Json(payload) = payload.map_err(|error| {
big_fish_error_response(
&request_context,
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "big-fish",
"message": error.body_text(),
})),
)
})?;
ensure_non_empty(&request_context, &session_id, "sessionId")?;
let client_message_id = payload.client_message_id.trim().to_string();
let message_text = payload.text.trim().to_string();
if client_message_id.is_empty() || message_text.is_empty() {
return Err(big_fish_bad_request(
&request_context,
"clientMessageId and text are required",
));
}
let owner_user_id = authenticated.claims().user_id().to_string();
let submitted_session = state
.spacetime_client()
.submit_big_fish_message(BigFishMessageSubmitRecordInput {
session_id: session_id.clone(),
owner_user_id: owner_user_id.clone(),
user_message_id: client_message_id,
user_message_text: message_text,
assistant_message_id: build_prefixed_uuid_id("big-fish-message-"),
submitted_at_micros: current_utc_micros(),
})
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
let mut draft_writer = AiGenerationDraftWriter::new(AiGenerationDraftContext::new(
"big_fish",
owner_user_id.as_str(),
session_id.as_str(),
payload.client_message_id.as_str(),
"大鱼吃小鱼模板生成草稿",
));
if let Err(error) = draft_writer.ensure_started(state.spacetime_client()).await {
tracing::warn!(error = %error, "大鱼吃小鱼模板生成草稿任务启动失败,主生成流程继续执行");
}
let draft_sink = AiGenerationDraftSink::new(
AiGenerationDraftContext::new(
"big_fish",
owner_user_id.as_str(),
session_id.as_str(),
payload.client_message_id.as_str(),
"大鱼吃小鱼模板生成草稿",
),
state.spacetime_client().clone(),
);
let turn_result = run_big_fish_agent_turn(
BigFishAgentTurnRequest {
llm_client: state.llm_client(),
session: &submitted_session,
quick_fill_requested: payload.quick_fill_requested.unwrap_or(false),
},
move |text| {
draft_sink.persist_visible_text_async(text);
},
)
.await;
if let Ok(result) = &turn_result {
draft_writer
.persist_visible_text(
state.spacetime_client(),
result.assistant_reply_text.as_str(),
)
.await;
}
let finalize_input = match turn_result {
Ok(turn_result) => build_finalize_record_input(
session_id.clone(),
owner_user_id.clone(),
build_prefixed_uuid_id("big-fish-message-"),
turn_result,
current_utc_micros(),
),
Err(error) => build_failed_finalize_record_input(
session_id.clone(),
owner_user_id.clone(),
&submitted_session,
error.to_string(),
current_utc_micros(),
),
};
let session = state
.spacetime_client()
.finalize_big_fish_agent_message(finalize_input)
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
Ok(json_success_body(
Some(&request_context),
BigFishSessionResponse {
session: map_big_fish_session_response(session),
},
))
}
pub async fn stream_big_fish_message(
State(state): State<AppState>,
Path(session_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
payload: Result<Json<SendBigFishMessageRequest>, JsonRejection>,
) -> Result<Response, Response> {
let Json(payload) = payload.map_err(|error| {
big_fish_error_response(
&request_context,
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "big-fish",
"message": error.body_text(),
})),
)
})?;
ensure_non_empty(&request_context, &session_id, "sessionId")?;
let client_message_id = payload.client_message_id.trim().to_string();
let message_text = payload.text.trim().to_string();
if client_message_id.is_empty() || message_text.is_empty() {
return Err(big_fish_bad_request(
&request_context,
"clientMessageId and text are required",
));
}
let owner_user_id = authenticated.claims().user_id().to_string();
let submitted_session = state
.spacetime_client()
.submit_big_fish_message(BigFishMessageSubmitRecordInput {
session_id: session_id.clone(),
owner_user_id: owner_user_id.clone(),
user_message_id: client_message_id,
user_message_text: message_text,
assistant_message_id: build_prefixed_uuid_id("big-fish-message-"),
submitted_at_micros: current_utc_micros(),
})
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
let quick_fill_requested = payload.quick_fill_requested.unwrap_or(false);
let state = state.clone();
let session_id_for_stream = session_id.clone();
let owner_user_id_for_stream = owner_user_id.clone();
let client_message_id_for_stream = payload.client_message_id.clone();
let stream = async_stream::stream! {
let mut draft_writer = AiGenerationDraftWriter::new(AiGenerationDraftContext::new(
"big_fish",
owner_user_id_for_stream.as_str(),
session_id_for_stream.as_str(),
client_message_id_for_stream.as_str(),
"大鱼吃小鱼模板生成草稿",
));
if let Err(error) = draft_writer.ensure_started(state.spacetime_client()).await {
tracing::warn!(error = %error, "大鱼吃小鱼模板生成草稿任务启动失败,主生成流程继续执行");
}
let (reply_tx, mut reply_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
// 与 RPG/拼图 Agent 保持同一语义回复先流式展示session 真相仍等 finalize 后下发。
let turn_result = {
let run_turn = run_big_fish_agent_turn(
BigFishAgentTurnRequest {
llm_client: state.llm_client(),
session: &submitted_session,
quick_fill_requested,
},
move |text| {
let _ = reply_tx.send(text.to_string());
},
);
tokio::pin!(run_turn);
loop {
// 每个 replyText 增量同时写草稿表并推给 SSE避免前端等待完整模型响应。
tokio::select! {
result = &mut run_turn => break result,
maybe_text = reply_rx.recv() => {
if let Some(text) = maybe_text {
draft_writer
.persist_visible_text(state.spacetime_client(), text.as_str())
.await;
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
"reply_delta",
json!({ "text": text }),
));
}
}
}
}
};
while let Some(text) = reply_rx.recv().await {
draft_writer
.persist_visible_text(state.spacetime_client(), text.as_str())
.await;
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
"reply_delta",
json!({ "text": text }),
));
}
let finalize_input = match turn_result {
Ok(turn_result) => build_finalize_record_input(
session_id_for_stream.clone(),
owner_user_id_for_stream.clone(),
build_prefixed_uuid_id("big-fish-message-"),
turn_result,
current_utc_micros(),
),
Err(error) => build_failed_finalize_record_input(
session_id_for_stream.clone(),
owner_user_id_for_stream.clone(),
&submitted_session,
error.to_string(),
current_utc_micros(),
),
};
let session = match state
.spacetime_client()
.finalize_big_fish_agent_message(finalize_input)
.await
{
Ok(session) => session,
Err(error) => {
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
"error",
json!({ "message": error.to_string() }),
));
return;
}
};
let session_response = map_big_fish_session_response(session);
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
"session",
json!({ "session": session_response }),
));
yield Ok::<Event, Infallible>(big_fish_sse_json_event_or_error(
"done",
json!({ "ok": true }),
));
};
Ok(Sse::new(stream).into_response())
}
pub async fn execute_big_fish_action(
State(state): State<AppState>,
Path(session_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
payload: Result<Json<ExecuteBigFishActionRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let Json(payload) = payload.map_err(|error| {
big_fish_error_response(
&request_context,
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "big-fish",
"message": error.body_text(),
})),
)
})?;
ensure_non_empty(&request_context, &session_id, "sessionId")?;
let owner_user_id = authenticated.claims().user_id().to_string();
let now = current_utc_micros();
let session = match payload.action.trim() {
"big_fish_compile_draft" => {
compile_big_fish_draft_with_all_assets(&state, session_id, owner_user_id, now).await
}
"big_fish_generate_level_main_image" => {
let asset_url = generate_big_fish_formal_asset(
&state,
&owner_user_id,
&session_id,
"level_main_image",
payload.level,
None,
now,
)
.await
.map_err(|error| big_fish_error_response(&request_context, error))?;
state
.spacetime_client()
.generate_big_fish_asset(BigFishAssetGenerateRecordInput {
session_id,
owner_user_id,
asset_kind: "level_main_image".to_string(),
level: payload.level,
motion_key: None,
asset_url: Some(asset_url),
generated_at_micros: now,
})
.await
}
"big_fish_generate_level_motion" => {
let asset_url = generate_big_fish_formal_asset(
&state,
&owner_user_id,
&session_id,
"level_motion",
payload.level,
payload.motion_key.as_deref(),
now,
)
.await
.map_err(|error| big_fish_error_response(&request_context, error))?;
state
.spacetime_client()
.generate_big_fish_asset(BigFishAssetGenerateRecordInput {
session_id,
owner_user_id,
asset_kind: "level_motion".to_string(),
level: payload.level,
motion_key: payload.motion_key,
asset_url: Some(asset_url),
generated_at_micros: now,
})
.await
}
"big_fish_generate_stage_background" => {
let asset_url = generate_big_fish_formal_asset(
&state,
&owner_user_id,
&session_id,
"stage_background",
None,
None,
now,
)
.await
.map_err(|error| big_fish_error_response(&request_context, error))?;
state
.spacetime_client()
.generate_big_fish_asset(BigFishAssetGenerateRecordInput {
session_id,
owner_user_id,
asset_kind: "stage_background".to_string(),
level: None,
motion_key: None,
asset_url: Some(asset_url),
generated_at_micros: now,
})
.await
}
"big_fish_publish_game" => {
state
.spacetime_client()
.publish_big_fish_game(session_id, owner_user_id, now)
.await
}
other => {
return Err(big_fish_bad_request(
&request_context,
format!("action `{other}` is not supported").as_str(),
));
}
}
.map_err(|error| big_fish_error_response(&request_context, map_big_fish_client_error(error)))?;
Ok(json_success_body(
Some(&request_context),
BigFishActionResponse {
session: map_big_fish_session_response(session),
},
))
}
pub async fn start_big_fish_run(
State(state): State<AppState>,
Path(session_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, &session_id, "sessionId")?;
let run = state
.spacetime_client()
.start_big_fish_run(BigFishRunStartRecordInput {
run_id: build_prefixed_uuid_id("big-fish-run-"),
session_id,
owner_user_id: authenticated.claims().user_id().to_string(),
started_at_micros: current_utc_micros(),
})
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
Ok(json_success_body(
Some(&request_context),
BigFishRunResponse {
run: map_big_fish_runtime_response(run),
},
))
}
pub async fn get_big_fish_run(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, &run_id, "runId")?;
let run = state
.spacetime_client()
.get_big_fish_run(run_id, authenticated.claims().user_id().to_string())
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
Ok(json_success_body(
Some(&request_context),
BigFishRunResponse {
run: map_big_fish_runtime_response(run),
},
))
}
pub async fn submit_big_fish_input(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
payload: Result<Json<SubmitBigFishInputRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let Json(payload) = payload.map_err(|error| {
big_fish_error_response(
&request_context,
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "big-fish",
"message": error.body_text(),
})),
)
})?;
ensure_non_empty(&request_context, &run_id, "runId")?;
let run = state
.spacetime_client()
.submit_big_fish_input(BigFishRunInputSubmitRecordInput {
run_id,
owner_user_id: authenticated.claims().user_id().to_string(),
input_x: payload.x,
input_y: payload.y,
submitted_at_micros: current_utc_micros(),
})
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
})?;
Ok(json_success_body(
Some(&request_context),
BigFishRunResponse {
run: map_big_fish_runtime_response(run),
},
))
}
fn map_big_fish_session_response(session: BigFishSessionRecord) -> BigFishSessionSnapshotResponse {
BigFishSessionSnapshotResponse {
session_id: session.session_id,
current_turn: session.current_turn,
progress_percent: session.progress_percent,
stage: session.stage,
anchor_pack: map_big_fish_anchor_pack_response(session.anchor_pack),
draft: session.draft.map(map_big_fish_draft_response),
asset_slots: session
.asset_slots
.into_iter()
.map(map_big_fish_asset_slot_response)
.collect(),
asset_coverage: map_big_fish_asset_coverage_response(session.asset_coverage),
messages: session
.messages
.into_iter()
.map(map_big_fish_agent_message_response)
.collect(),
last_assistant_reply: session.last_assistant_reply,
publish_ready: session.publish_ready,
updated_at: session.updated_at,
}
}
fn map_big_fish_anchor_pack_response(
anchor_pack: BigFishAnchorPackRecord,
) -> BigFishAnchorPackResponse {
BigFishAnchorPackResponse {
gameplay_promise: map_big_fish_anchor_item_response(anchor_pack.gameplay_promise),
ecology_visual_theme: map_big_fish_anchor_item_response(anchor_pack.ecology_visual_theme),
growth_ladder: map_big_fish_anchor_item_response(anchor_pack.growth_ladder),
risk_tempo: map_big_fish_anchor_item_response(anchor_pack.risk_tempo),
}
}
fn map_big_fish_anchor_item_response(anchor: BigFishAnchorItemRecord) -> BigFishAnchorItemResponse {
BigFishAnchorItemResponse {
key: anchor.key,
label: anchor.label,
value: anchor.value,
status: anchor.status,
}
}
fn map_big_fish_draft_response(draft: BigFishGameDraftRecord) -> BigFishGameDraftResponse {
BigFishGameDraftResponse {
title: draft.title,
subtitle: draft.subtitle,
core_fun: draft.core_fun,
ecology_theme: draft.ecology_theme,
levels: draft
.levels
.into_iter()
.map(map_big_fish_level_response)
.collect(),
background: map_big_fish_background_response(draft.background),
runtime_params: map_big_fish_runtime_params_response(draft.runtime_params),
}
}
fn map_big_fish_level_response(
level: BigFishLevelBlueprintRecord,
) -> BigFishLevelBlueprintResponse {
BigFishLevelBlueprintResponse {
level: level.level,
name: level.name,
one_line_fantasy: level.one_line_fantasy,
silhouette_direction: level.silhouette_direction,
size_ratio: level.size_ratio,
visual_prompt_seed: level.visual_prompt_seed,
motion_prompt_seed: level.motion_prompt_seed,
merge_source_level: level.merge_source_level,
prey_window: level.prey_window,
threat_window: level.threat_window,
is_final_level: level.is_final_level,
}
}
fn map_big_fish_background_response(
background: BigFishBackgroundBlueprintRecord,
) -> BigFishBackgroundBlueprintResponse {
BigFishBackgroundBlueprintResponse {
theme: background.theme,
color_mood: background.color_mood,
foreground_hints: background.foreground_hints,
midground_composition: background.midground_composition,
background_depth: background.background_depth,
safe_play_area_hint: background.safe_play_area_hint,
spawn_edge_hint: background.spawn_edge_hint,
background_prompt_seed: background.background_prompt_seed,
}
}
fn map_big_fish_runtime_params_response(
params: BigFishRuntimeParamsRecord,
) -> BigFishRuntimeParamsResponse {
BigFishRuntimeParamsResponse {
level_count: params.level_count,
merge_count_per_upgrade: params.merge_count_per_upgrade,
spawn_target_count: params.spawn_target_count,
leader_move_speed: params.leader_move_speed,
follower_catch_up_speed: params.follower_catch_up_speed,
offscreen_cull_seconds: params.offscreen_cull_seconds,
prey_spawn_delta_levels: params.prey_spawn_delta_levels,
threat_spawn_delta_levels: params.threat_spawn_delta_levels,
win_level: params.win_level,
}
}
fn map_big_fish_asset_slot_response(slot: BigFishAssetSlotRecord) -> BigFishAssetSlotResponse {
BigFishAssetSlotResponse {
slot_id: slot.slot_id,
asset_kind: slot.asset_kind,
level: slot.level,
motion_key: slot.motion_key,
status: slot.status,
asset_url: slot.asset_url,
prompt_snapshot: slot.prompt_snapshot,
updated_at: slot.updated_at,
}
}
fn map_big_fish_asset_coverage_response(
coverage: BigFishAssetCoverageRecord,
) -> BigFishAssetCoverageResponse {
BigFishAssetCoverageResponse {
level_main_image_ready_count: coverage.level_main_image_ready_count,
level_motion_ready_count: coverage.level_motion_ready_count,
background_ready: coverage.background_ready,
required_level_count: coverage.required_level_count,
publish_ready: coverage.publish_ready,
blockers: coverage.blockers,
}
}
async fn compile_big_fish_draft_with_all_assets(
state: &AppState,
session_id: String,
owner_user_id: String,
now: i64,
) -> Result<BigFishSessionRecord, SpacetimeClientError> {
let session = state
.spacetime_client()
.compile_big_fish_draft(session_id.clone(), owner_user_id.clone(), now)
.await?;
let draft = session
.draft
.clone()
.ok_or_else(|| SpacetimeClientError::Runtime("大鱼吃小鱼玩法草稿尚未生成".to_string()))?;
// 点击生成草稿时一次性生成所有首版玩法资产,前端只负责展示进度和最终 session。
for level in &draft.levels {
let asset_url = generate_big_fish_formal_asset(
state,
&owner_user_id,
&session_id,
"level_main_image",
Some(level.level),
None,
current_utc_micros(),
)
.await
.map_err(|error| SpacetimeClientError::Runtime(error.message().to_string()))?;
state
.spacetime_client()
.generate_big_fish_asset(BigFishAssetGenerateRecordInput {
session_id: session_id.clone(),
owner_user_id: owner_user_id.clone(),
asset_kind: "level_main_image".to_string(),
level: Some(level.level),
motion_key: None,
asset_url: Some(asset_url),
generated_at_micros: current_utc_micros(),
})
.await?;
}
for level in &draft.levels {
for motion_key in ["idle_float", "move_swim"] {
let asset_url = generate_big_fish_formal_asset(
state,
&owner_user_id,
&session_id,
"level_motion",
Some(level.level),
Some(motion_key),
current_utc_micros(),
)
.await
.map_err(|error| SpacetimeClientError::Runtime(error.message().to_string()))?;
state
.spacetime_client()
.generate_big_fish_asset(BigFishAssetGenerateRecordInput {
session_id: session_id.clone(),
owner_user_id: owner_user_id.clone(),
asset_kind: "level_motion".to_string(),
level: Some(level.level),
motion_key: Some(motion_key.to_string()),
asset_url: Some(asset_url),
generated_at_micros: current_utc_micros(),
})
.await?;
}
}
let asset_url = generate_big_fish_formal_asset(
state,
&owner_user_id,
&session_id,
"stage_background",
None,
None,
current_utc_micros(),
)
.await
.map_err(|error| SpacetimeClientError::Runtime(error.message().to_string()))?;
state
.spacetime_client()
.generate_big_fish_asset(BigFishAssetGenerateRecordInput {
session_id,
owner_user_id,
asset_kind: "stage_background".to_string(),
level: None,
motion_key: None,
asset_url: Some(asset_url),
generated_at_micros: current_utc_micros(),
})
.await
}
fn map_big_fish_agent_message_response(
message: BigFishAgentMessageRecord,
) -> BigFishAgentMessageResponse {
BigFishAgentMessageResponse {
id: message.message_id,
role: message.role,
kind: message.kind,
text: message.text,
created_at: message.created_at,
}
}
fn map_big_fish_runtime_response(run: BigFishRuntimeRecord) -> BigFishRuntimeSnapshotResponse {
BigFishRuntimeSnapshotResponse {
run_id: run.run_id,
session_id: run.session_id,
status: run.status,
tick: run.tick,
player_level: run.player_level,
win_level: run.win_level,
leader_entity_id: run.leader_entity_id,
owned_entities: run
.owned_entities
.into_iter()
.map(map_big_fish_entity_response)
.collect(),
wild_entities: run
.wild_entities
.into_iter()
.map(map_big_fish_entity_response)
.collect(),
camera_center: map_big_fish_vector_response(run.camera_center),
last_input: map_big_fish_vector_response(run.last_input),
event_log: run.event_log,
updated_at: run.updated_at,
}
}
fn map_big_fish_work_summary_response(
item: BigFishWorkSummaryRecord,
) -> BigFishWorkSummaryResponse {
BigFishWorkSummaryResponse {
work_id: item.work_id,
source_session_id: item.source_session_id,
owner_user_id: item.owner_user_id,
title: item.title,
subtitle: item.subtitle,
summary: item.summary,
cover_image_src: item.cover_image_src,
status: item.status,
updated_at: current_timestamp_micros_to_string(item.updated_at_micros),
publish_ready: item.publish_ready,
level_count: item.level_count,
level_main_image_ready_count: item.level_main_image_ready_count,
level_motion_ready_count: item.level_motion_ready_count,
background_ready: item.background_ready,
}
}
fn map_big_fish_entity_response(
entity: BigFishRuntimeEntityRecord,
) -> BigFishRuntimeEntityResponse {
BigFishRuntimeEntityResponse {
entity_id: entity.entity_id,
level: entity.level,
position: map_big_fish_vector_response(entity.position),
radius: entity.radius,
offscreen_seconds: entity.offscreen_seconds,
}
}
fn map_big_fish_vector_response(vector: BigFishVector2Record) -> BigFishVector2Response {
BigFishVector2Response {
x: vector.x,
y: vector.y,
}
}
fn build_big_fish_welcome_text(seed_text: &str) -> String {
if seed_text.trim().is_empty() {
return "我会先帮你确定大鱼吃小鱼的核心锚点。可以从主题生态、成长阶梯或风险节奏开始。"
.to_string();
}
"我已经收到你的玩法起点,会先把它整理成锚点并准备结果页草稿。".to_string()
}
struct BigFishDashScopeSettings {
base_url: String,
api_key: String,
request_timeout_ms: u64,
}
struct BigFishGeneratedImage {
image_url: String,
task_id: String,
}
struct BigFishDownloadedImage {
mime_type: String,
extension: String,
bytes: Vec<u8>,
}
struct BigFishFormalAssetContext {
entity_id: String,
prompt: String,
negative_prompt: String,
size: String,
asset_object_kind: String,
binding_slot: String,
path_segments: Vec<String>,
}
const BIG_FISH_TEXT_TO_IMAGE_MODEL: &str = "wan2.2-t2i-flash";
const BIG_FISH_ENTITY_KIND: &str = "big_fish_session";
const BIG_FISH_DEFAULT_NEGATIVE_PROMPT: &str = "文字水印logoUI界面对话框边框多余肢体畸形鱼体低清晰度模糊压缩噪点现代摄影棚写实照片背景";
async fn generate_big_fish_formal_asset(
state: &AppState,
owner_user_id: &str,
session_id: &str,
asset_kind: &str,
level: Option<u32>,
motion_key: Option<&str>,
generated_at_micros: i64,
) -> Result<String, AppError> {
let session = state
.spacetime_client()
.get_big_fish_session(session_id.to_string(), owner_user_id.to_string())
.await
.map_err(map_big_fish_client_error)?;
let draft = session.draft.as_ref().ok_or_else(|| {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "big-fish",
"message": "玩法草稿尚未编译,不能生成正式图片。",
}))
})?;
let context = build_big_fish_formal_asset_context(
&session,
draft,
asset_kind,
level,
motion_key,
generated_at_micros,
)?;
let settings = require_big_fish_dashscope_settings(state)?;
let http_client = build_big_fish_dashscope_http_client(&settings)?;
let generated = create_big_fish_text_to_image_generation(
&http_client,
&settings,
context.prompt.as_str(),
context.negative_prompt.as_str(),
context.size.as_str(),
)
.await?;
let downloaded = download_big_fish_remote_image(
&http_client,
generated.image_url.as_str(),
"下载 Big Fish 正式图片失败",
)
.await?;
persist_big_fish_formal_asset(
state,
owner_user_id,
&context,
generated,
downloaded,
generated_at_micros,
)
.await
}
fn build_big_fish_formal_asset_context(
session: &BigFishSessionRecord,
draft: &BigFishGameDraftRecord,
asset_kind: &str,
level: Option<u32>,
motion_key: Option<&str>,
generated_at_micros: i64,
) -> Result<BigFishFormalAssetContext, AppError> {
let asset_id = format!("asset-{generated_at_micros}");
match asset_kind {
"level_main_image" => {
let level = find_big_fish_level_blueprint(draft, level)?;
let level_part = build_big_fish_level_part(Some(level.level));
Ok(BigFishFormalAssetContext {
entity_id: session.session_id.clone(),
prompt: build_big_fish_level_main_image_prompt(draft, level),
negative_prompt: BIG_FISH_DEFAULT_NEGATIVE_PROMPT.to_string(),
size: "1024*1024".to_string(),
asset_object_kind: "big_fish_level_main_image".to_string(),
binding_slot: format!("level_main_image:{level_part}"),
path_segments: vec![
sanitize_big_fish_path_segment(session.session_id.as_str(), "session"),
"level-main-image".to_string(),
level_part,
asset_id,
],
})
}
"level_motion" => {
let level = find_big_fish_level_blueprint(draft, level)?;
let motion_key = motion_key
.map(str::trim)
.filter(|value| matches!(*value, "idle_float" | "move_swim"))
.ok_or_else(|| {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "big-fish",
"message": "motionKey 必须是 idle_float 或 move_swim。",
}))
})?;
let level_part = build_big_fish_level_part(Some(level.level));
Ok(BigFishFormalAssetContext {
entity_id: session.session_id.clone(),
prompt: build_big_fish_level_motion_prompt(draft, level, motion_key),
negative_prompt: BIG_FISH_DEFAULT_NEGATIVE_PROMPT.to_string(),
size: "1024*1024".to_string(),
asset_object_kind: "big_fish_level_motion".to_string(),
binding_slot: format!("level_motion:{level_part}:{motion_key}"),
path_segments: vec![
sanitize_big_fish_path_segment(session.session_id.as_str(), "session"),
"level-motion".to_string(),
level_part,
sanitize_big_fish_path_segment(motion_key, "motion"),
asset_id,
],
})
}
"stage_background" => Ok(BigFishFormalAssetContext {
entity_id: session.session_id.clone(),
prompt: build_big_fish_stage_background_prompt(draft),
negative_prompt: BIG_FISH_DEFAULT_NEGATIVE_PROMPT.to_string(),
size: "720*1280".to_string(),
asset_object_kind: "big_fish_stage_background".to_string(),
binding_slot: "stage_background".to_string(),
path_segments: vec![
sanitize_big_fish_path_segment(session.session_id.as_str(), "session"),
"stage-background".to_string(),
asset_id,
],
}),
_ => Err(
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "big-fish",
"message": format!("assetKind `{asset_kind}` 不支持正式图片生成。"),
})),
),
}
}
fn find_big_fish_level_blueprint(
draft: &BigFishGameDraftRecord,
level: Option<u32>,
) -> Result<&BigFishLevelBlueprintRecord, AppError> {
let level = level.ok_or_else(|| {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "big-fish",
"message": "level 是等级资产生成的必填项。",
}))
})?;
draft
.levels
.iter()
.find(|blueprint| blueprint.level == level)
.ok_or_else(|| {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "big-fish",
"message": format!("level `{level}` 不存在于当前 Big Fish 草稿。"),
}))
})
}
fn build_big_fish_level_main_image_prompt(
draft: &BigFishGameDraftRecord,
level: &BigFishLevelBlueprintRecord,
) -> String {
vec![
format!(
"为竖屏移动游戏《{}》生成一张等级生物主图。",
draft.title
),
format!(
"生态主题:{}。核心乐趣:{}",
draft.ecology_theme, draft.core_fun
),
format!(
"等级Lv.{},名称:{},幻想描述:{}",
level.level, level.name, level.one_line_fantasy
),
format!("轮廓方向:{}", level.silhouette_direction),
format!("视觉提示词种子:{}", level.visual_prompt_seed),
"画面要求单体游戏生物完整入镜轮廓清晰适合作为大鱼吃小鱼等级角色主图2D 高完成度游戏插画,深海发光质感,中央构图。".to_string(),
"不要出现 UI、文字、logo、水印、对话框或边框背景保持干净的深海渐变或透明感不要出现多只主体。".to_string(),
]
.join("")
}
fn build_big_fish_level_motion_prompt(
draft: &BigFishGameDraftRecord,
level: &BigFishLevelBlueprintRecord,
motion_key: &str,
) -> String {
let motion_text = match motion_key {
"move_swim" => "向右游动的关键帧预览,身体与尾鳍有清晰推进姿态,带轻微水流拖尾。",
_ => "待机漂浮的关键帧预览,身体轻微摆动,姿态稳定,适合作为 idle 状态。",
};
vec![
format!(
"为竖屏移动游戏《{}》生成一张等级生物动作关键帧静态预览图。",
draft.title
),
format!("生态主题:{}", draft.ecology_theme),
format!(
"等级Lv.{},名称:{},幻想描述:{}",
level.level, level.name, level.one_line_fantasy
),
format!("动作提示词种子:{}", level.motion_prompt_seed),
format!("动作要求:{motion_text}"),
"画面要求单体生物完整入镜轮廓清晰动作方向明确2D 高完成度游戏插画,适合作为 Big Fish 动作槽位的静态 keyframe。".to_string(),
"不要出现 UI、文字、logo、水印、对话框或边框不要生成序列帧拼图不要出现多只主体。".to_string(),
]
.join("")
}
fn build_big_fish_stage_background_prompt(draft: &BigFishGameDraftRecord) -> String {
let background = &draft.background;
vec![
format!(
"为竖屏移动游戏《{}》生成一张 9:16 全屏活动区域背景。",
draft.title
),
format!("生态主题:{}", draft.ecology_theme),
format!("背景主题:{}。色彩氛围:{}", background.theme, background.color_mood),
format!("前景提示:{}", background.foreground_hints),
format!("中景构图:{}", background.midground_composition),
format!("背景纵深:{}", background.background_depth),
format!("安全操作区:{}", background.safe_play_area_hint),
format!("出生边缘:{}", background.spawn_edge_hint),
format!("背景提示词种子:{}", background.background_prompt_seed),
"画面要求:竖屏 9:16中央 70% 保持清爽可读,边缘有深海生态层次和微弱生物光,适合作为大鱼吃小鱼运行态背景。".to_string(),
"不要出现 UI、文字、logo、水印、对话框、边框或巨大主体遮挡不要把中央操作区画得过暗或过复杂。".to_string(),
]
.join("")
}
fn require_big_fish_dashscope_settings(
state: &AppState,
) -> Result<BigFishDashScopeSettings, AppError> {
let base_url = state.config.dashscope_base_url.trim().trim_end_matches('/');
if base_url.is_empty() {
return Err(
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_details(json!({
"provider": "dashscope",
"reason": "DASHSCOPE_BASE_URL 未配置",
})),
);
}
let api_key = state
.config
.dashscope_api_key
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.ok_or_else(|| {
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_details(json!({
"provider": "dashscope",
"reason": "DASHSCOPE_API_KEY 未配置",
}))
})?;
Ok(BigFishDashScopeSettings {
base_url: base_url.to_string(),
api_key: api_key.to_string(),
request_timeout_ms: state.config.dashscope_image_request_timeout_ms.max(1),
})
}
fn build_big_fish_dashscope_http_client(
settings: &BigFishDashScopeSettings,
) -> Result<reqwest::Client, AppError> {
reqwest::Client::builder()
.timeout(Duration::from_millis(settings.request_timeout_ms))
.build()
.map_err(|error| {
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({
"provider": "dashscope",
"message": format!("构造 DashScope HTTP 客户端失败:{error}"),
}))
})
}
async fn create_big_fish_text_to_image_generation(
http_client: &reqwest::Client,
settings: &BigFishDashScopeSettings,
prompt: &str,
negative_prompt: &str,
size: &str,
) -> Result<BigFishGeneratedImage, AppError> {
let mut parameters = Map::from_iter([
("n".to_string(), json!(1)),
("size".to_string(), Value::String(size.to_string())),
("prompt_extend".to_string(), Value::Bool(true)),
("watermark".to_string(), Value::Bool(false)),
]);
if !negative_prompt.trim().is_empty() {
parameters.insert(
"negative_prompt".to_string(),
Value::String(negative_prompt.trim().to_string()),
);
}
let response = http_client
.post(format!(
"{}/services/aigc/text2image/image-synthesis",
settings.base_url
))
.header(
reqwest::header::AUTHORIZATION,
format!("Bearer {}", settings.api_key),
)
.header(reqwest::header::CONTENT_TYPE, "application/json")
.header("X-DashScope-Async", "enable")
.json(&json!({
"model": BIG_FISH_TEXT_TO_IMAGE_MODEL,
"input": {
"prompt": prompt,
},
"parameters": parameters,
}))
.send()
.await
.map_err(|error| {
map_big_fish_dashscope_request_error(format!("创建 Big Fish 图片生成任务失败:{error}"))
})?;
let status = response.status();
let response_text = response.text().await.map_err(|error| {
map_big_fish_dashscope_request_error(format!("读取 Big Fish 图片生成响应失败:{error}"))
})?;
if !status.is_success() {
return Err(map_big_fish_dashscope_upstream_error(
response_text.as_str(),
"创建 Big Fish 图片生成任务失败",
));
}
let payload =
parse_big_fish_json_payload(response_text.as_str(), "解析 Big Fish 图片生成响应失败")?;
let task_id = extract_big_fish_task_id(&payload).ok_or_else(|| {
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "dashscope",
"message": "Big Fish 图片生成任务未返回 task_id",
}))
})?;
let deadline = Instant::now() + Duration::from_millis(settings.request_timeout_ms);
while Instant::now() < deadline {
let poll_response = http_client
.get(format!("{}/tasks/{}", settings.base_url, task_id))
.header(
reqwest::header::AUTHORIZATION,
format!("Bearer {}", settings.api_key),
)
.send()
.await
.map_err(|error| {
map_big_fish_dashscope_request_error(format!(
"查询 Big Fish 图片生成任务失败:{error}"
))
})?;
let poll_status = poll_response.status();
let poll_text = poll_response.text().await.map_err(|error| {
map_big_fish_dashscope_request_error(format!(
"读取 Big Fish 图片生成任务响应失败:{error}"
))
})?;
if !poll_status.is_success() {
return Err(map_big_fish_dashscope_upstream_error(
poll_text.as_str(),
"查询 Big Fish 图片生成任务失败",
));
}
let poll_payload =
parse_big_fish_json_payload(poll_text.as_str(), "解析 Big Fish 图片生成任务响应失败")?;
let task_status = find_first_big_fish_string_by_key(&poll_payload, "task_status")
.unwrap_or_default()
.trim()
.to_string();
if task_status == "SUCCEEDED" {
let image_url = extract_big_fish_image_urls(&poll_payload)
.into_iter()
.next()
.ok_or_else(|| {
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "dashscope",
"message": "Big Fish 图片生成成功但未返回图片地址",
}))
})?;
return Ok(BigFishGeneratedImage { image_url, task_id });
}
if matches!(task_status.as_str(), "FAILED" | "UNKNOWN") {
return Err(map_big_fish_dashscope_upstream_error(
poll_text.as_str(),
"Big Fish 图片生成任务失败",
));
}
sleep(Duration::from_secs(2)).await;
}
Err(
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "dashscope",
"message": "Big Fish 图片生成超时或未返回图片地址",
})),
)
}
async fn download_big_fish_remote_image(
http_client: &reqwest::Client,
image_url: &str,
fallback_message: &str,
) -> Result<BigFishDownloadedImage, AppError> {
let response = http_client.get(image_url).send().await.map_err(|error| {
map_big_fish_dashscope_request_error(format!("{fallback_message}{error}"))
})?;
let status = response.status();
let content_type = response
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.unwrap_or("image/jpeg")
.to_string();
let bytes = response.bytes().await.map_err(|error| {
map_big_fish_dashscope_request_error(format!("{fallback_message}{error}"))
})?;
if !status.is_success() {
return Err(
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "dashscope",
"message": fallback_message,
"status": status.as_u16(),
})),
);
}
let mime_type = normalize_big_fish_downloaded_image_mime_type(content_type.as_str());
Ok(BigFishDownloadedImage {
extension: big_fish_mime_to_extension(mime_type.as_str()).to_string(),
mime_type,
bytes: bytes.to_vec(),
})
}
async fn persist_big_fish_formal_asset(
state: &AppState,
owner_user_id: &str,
context: &BigFishFormalAssetContext,
generated: BigFishGeneratedImage,
downloaded: BigFishDownloadedImage,
generated_at_micros: i64,
) -> Result<String, AppError> {
let oss_client = state.oss_client().ok_or_else(|| {
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_details(json!({
"provider": "aliyun-oss",
"reason": "OSS 未完成环境变量配置",
}))
})?;
let http_client = reqwest::Client::new();
let put_result = oss_client
.put_object(
&http_client,
OssPutObjectRequest {
prefix: LegacyAssetPrefix::BigFishAssets,
path_segments: context.path_segments.clone(),
file_name: format!("image.{}", downloaded.extension),
content_type: Some(downloaded.mime_type.clone()),
access: OssObjectAccess::Private,
metadata: build_big_fish_asset_metadata(
context.asset_object_kind.as_str(),
owner_user_id,
BIG_FISH_ENTITY_KIND,
context.entity_id.as_str(),
context.binding_slot.as_str(),
),
body: downloaded.bytes,
},
)
.await
.map_err(map_big_fish_asset_oss_error)?;
let head = oss_client
.head_object(
&http_client,
OssHeadObjectRequest {
object_key: put_result.object_key.clone(),
},
)
.await
.map_err(map_big_fish_asset_oss_error)?;
let asset_object = state
.spacetime_client()
.confirm_asset_object(
build_asset_object_upsert_input(
generate_asset_object_id(generated_at_micros),
head.bucket,
head.object_key,
AssetObjectAccessPolicy::Private,
head.content_type.or(Some(downloaded.mime_type)),
head.content_length,
head.etag,
context.asset_object_kind.clone(),
Some(generated.task_id),
Some(owner_user_id.to_string()),
None,
Some(context.entity_id.clone()),
generated_at_micros,
)
.map_err(map_big_fish_asset_object_prepare_error)?,
)
.await
.map_err(map_big_fish_asset_spacetime_error)?;
state
.spacetime_client()
.bind_asset_object_to_entity(
build_asset_entity_binding_input(
generate_asset_binding_id(generated_at_micros),
asset_object.asset_object_id,
BIG_FISH_ENTITY_KIND.to_string(),
context.entity_id.clone(),
context.binding_slot.clone(),
context.asset_object_kind.clone(),
Some(owner_user_id.to_string()),
None,
generated_at_micros,
)
.map_err(map_big_fish_asset_binding_prepare_error)?,
)
.await
.map_err(map_big_fish_asset_spacetime_error)?;
Ok(put_result.legacy_public_path)
}
fn build_big_fish_asset_metadata(
asset_kind: &str,
owner_user_id: &str,
entity_kind: &str,
entity_id: &str,
slot: &str,
) -> BTreeMap<String, String> {
BTreeMap::from([
("asset_kind".to_string(), asset_kind.to_string()),
("owner_user_id".to_string(), owner_user_id.to_string()),
("entity_kind".to_string(), entity_kind.to_string()),
("entity_id".to_string(), entity_id.to_string()),
("slot".to_string(), slot.to_string()),
])
}
fn parse_big_fish_json_payload(raw_text: &str, fallback_message: &str) -> Result<Value, AppError> {
serde_json::from_str::<Value>(raw_text).map_err(|error| {
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "dashscope",
"message": format!("{fallback_message}{error}"),
}))
})
}
fn extract_big_fish_task_id(payload: &Value) -> Option<String> {
find_first_big_fish_string_by_key(payload, "task_id")
}
fn extract_big_fish_image_urls(payload: &Value) -> Vec<String> {
let mut urls = Vec::new();
collect_big_fish_strings_by_key(payload, "image", &mut urls);
collect_big_fish_strings_by_key(payload, "url", &mut urls);
let mut deduped = Vec::new();
for url in urls {
if !deduped.contains(&url) {
deduped.push(url);
}
}
deduped
}
fn find_first_big_fish_string_by_key(payload: &Value, target_key: &str) -> Option<String> {
let mut results = Vec::new();
collect_big_fish_strings_by_key(payload, target_key, &mut results);
results.into_iter().next()
}
fn collect_big_fish_strings_by_key(payload: &Value, target_key: &str, results: &mut Vec<String>) {
match payload {
Value::Array(entries) => {
for entry in entries {
collect_big_fish_strings_by_key(entry, target_key, results);
}
}
Value::Object(object) => {
for (key, value) in object {
if key == target_key
&& let Some(text) = value.as_str()
{
results.push(text.to_string());
}
collect_big_fish_strings_by_key(value, target_key, results);
}
}
_ => {}
}
}
fn normalize_big_fish_downloaded_image_mime_type(content_type: &str) -> String {
let mime_type = content_type
.split(';')
.next()
.map(str::trim)
.unwrap_or("image/jpeg");
match mime_type {
"image/png" | "image/webp" | "image/jpeg" | "image/jpg" | "image/gif" => {
mime_type.to_string()
}
_ => "image/jpeg".to_string(),
}
}
fn big_fish_mime_to_extension(mime_type: &str) -> &str {
match mime_type {
"image/png" => "png",
"image/webp" => "webp",
"image/gif" => "gif",
_ => "jpg",
}
}
fn map_big_fish_dashscope_request_error(message: String) -> AppError {
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "dashscope",
"message": message,
}))
}
fn map_big_fish_dashscope_upstream_error(raw_text: &str, fallback_message: &str) -> AppError {
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "dashscope",
"message": parse_big_fish_api_error_message(raw_text, fallback_message),
}))
}
fn parse_big_fish_api_error_message(raw_text: &str, fallback_message: &str) -> String {
let trimmed = raw_text.trim();
if trimmed.is_empty() {
return fallback_message.to_string();
}
if let Ok(payload) = serde_json::from_str::<Value>(trimmed)
&& let Some(message) = find_first_big_fish_string_by_key(&payload, "message")
.or_else(|| find_first_big_fish_string_by_key(&payload, "code"))
{
return message;
}
let excerpt = trimmed.chars().take(240).collect::<String>();
format!("{fallback_message}{excerpt}")
}
fn map_big_fish_asset_object_prepare_error(error: AssetObjectFieldError) -> AppError {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "asset-object",
"message": error.to_string(),
}))
}
fn map_big_fish_asset_binding_prepare_error(error: AssetObjectFieldError) -> AppError {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "asset-entity-binding",
"message": error.to_string(),
}))
}
fn map_big_fish_asset_spacetime_error(error: SpacetimeClientError) -> AppError {
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "spacetimedb",
"message": error.to_string(),
}))
}
fn map_big_fish_asset_oss_error(error: platform_oss::OssError) -> AppError {
let status = match error {
platform_oss::OssError::InvalidConfig(_) | platform_oss::OssError::InvalidRequest(_) => {
StatusCode::BAD_REQUEST
}
platform_oss::OssError::ObjectNotFound(_) => StatusCode::NOT_FOUND,
platform_oss::OssError::Request(_)
| platform_oss::OssError::SerializePolicy(_)
| platform_oss::OssError::Sign(_) => StatusCode::BAD_GATEWAY,
};
AppError::from_status(status).with_details(json!({
"provider": "aliyun-oss",
"message": error.to_string(),
}))
}
fn build_big_fish_level_part(level: Option<u32>) -> String {
level
.map(|value| format!("level-{value}"))
.unwrap_or_else(|| "stage".to_string())
}
fn sanitize_big_fish_path_segment(value: &str, fallback: &str) -> String {
let sanitized = value
.trim()
.chars()
.map(|ch| {
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
ch
} else {
'-'
}
})
.collect::<String>()
.trim_matches('-')
.to_string();
if sanitized.is_empty() {
fallback.to_string()
} else {
sanitized
}
}
fn ensure_non_empty(
request_context: &RequestContext,
value: &str,
field_name: &str,
) -> Result<(), Response> {
if value.trim().is_empty() {
return Err(big_fish_bad_request(
request_context,
format!("{field_name} is required").as_str(),
));
}
Ok(())
}
fn big_fish_bad_request(request_context: &RequestContext, message: &str) -> Response {
big_fish_error_response(
request_context,
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "big-fish",
"message": message,
})),
)
}
fn big_fish_sse_json_event_or_error(event_name: &str, payload: Value) -> Event {
match serde_json::to_string(&payload) {
Ok(payload_text) => Event::default().event(event_name).data(payload_text),
Err(_) => big_fish_sse_error_event_message("SSE payload 序列化失败".to_string()),
}
}
fn big_fish_sse_error_event_message(message: String) -> Event {
let payload = format!(
"{{\"message\":{}}}",
serde_json::to_string(&message)
.unwrap_or_else(|_| "\"SSE 错误事件序列化失败\"".to_string())
);
Event::default().event("error").data(payload)
}
fn map_big_fish_client_error(error: SpacetimeClientError) -> AppError {
let status = match &error {
SpacetimeClientError::Procedure(message)
if message.contains("big_fish_creation_session 不存在")
|| message.contains("big_fish_runtime_run 不存在") =>
{
StatusCode::NOT_FOUND
}
SpacetimeClientError::Procedure(message)
if message.contains("不能为空")
|| message.contains("尚未编译")
|| message.contains("不允许")
|| message.contains("非法")
|| message.contains("缺少") =>
{
StatusCode::BAD_REQUEST
}
SpacetimeClientError::Runtime(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::BAD_GATEWAY,
};
AppError::from_status(status).with_details(json!({
"provider": "spacetimedb",
"message": error.to_string(),
}))
}
fn big_fish_error_response(request_context: &RequestContext, error: AppError) -> Response {
error.into_response_with_context(Some(request_context))
}
fn current_utc_micros() -> i64 {
let duration = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system clock should be after unix epoch");
i64::try_from(duration.as_micros()).expect("current unix micros should fit in i64")
}
fn current_timestamp_micros_to_string(value: i64) -> String {
format_timestamp_micros(value)
}