use std::collections::BTreeMap; use axum::{ Json, extract::{Extension, Path, State, rejection::JsonRejection}, http::StatusCode, response::{ IntoResponse, Response, sse::{Event, Sse}, }, }; use module_custom_world::{ CustomWorldThemeMode, empty_agent_anchor_content_json, empty_agent_asset_coverage_json, empty_agent_creator_intent_readiness_json, empty_json_array, empty_json_object, }; use serde_json::{Map, Value, json}; use shared_contracts::runtime::{ CreateCustomWorldAgentSessionRequest, CustomWorldAgentCardDetailResponse, CustomWorldAgentCheckpointResponse, CustomWorldAgentMessageResponse, CustomWorldAgentOperationResponse, CustomWorldAgentSessionResponse, CustomWorldAgentSessionSnapshotResponse, CustomWorldDraftCardDetailResponse, CustomWorldDraftCardDetailSectionResponse, CustomWorldDraftCardSummaryResponse, CustomWorldGalleryCardResponse, CustomWorldGalleryDetailResponse, CustomWorldGalleryResponse, CustomWorldLibraryEntryResponse, CustomWorldLibraryMutationResponse, CustomWorldLibraryResponse, CustomWorldProfileUpsertRequest, CustomWorldPublishGateResponse, CustomWorldResultPreviewBlockerResponse, CustomWorldSupportedActionResponse, CustomWorldWorkSummaryResponse, CustomWorldWorksResponse, ExecuteCustomWorldAgentActionRequest, SendCustomWorldAgentMessageRequest, }; use shared_kernel::build_prefixed_uuid_id; use spacetime_client::{ CustomWorldAgentActionExecuteRecordInput, CustomWorldAgentCheckpointRecord, CustomWorldAgentMessageFinalizeRecordInput, CustomWorldAgentMessageRecord, CustomWorldAgentMessageSubmitRecordInput, CustomWorldAgentOperationProgressRecordInput, CustomWorldAgentOperationRecord, CustomWorldAgentSessionCreateRecordInput, CustomWorldAgentSessionRecord, CustomWorldDraftCardDetailRecord, CustomWorldDraftCardDetailSectionRecord, CustomWorldDraftCardRecord, CustomWorldGalleryEntryRecord, CustomWorldLibraryEntryRecord, CustomWorldProfileUpsertRecordInput, CustomWorldPublishGateRecord, CustomWorldResultPreviewBlockerRecord, CustomWorldSupportedActionRecord, CustomWorldWorkSummaryRecord, SpacetimeClientError, }; use std::{collections::BTreeSet, convert::Infallible, sync::Arc, time::Instant}; use tokio::sync::Semaphore; use tokio::task::JoinSet; use tracing::info; use crate::{ ai_generation_drafts::{ AiGenerationDraftContext, AiGenerationDraftSink, AiGenerationDraftWriter, }, api_response::json_success_body, auth::AuthenticatedAccessToken, character_visual_assets::generate_character_primary_visual_for_profile, custom_world_agent_entities::generate_custom_world_agent_entities, custom_world_agent_turn::{ CustomWorldAgentTurnRequest, build_failed_finalize_record_input, build_finalize_record_input, run_custom_world_agent_turn, }, custom_world_ai::generate_custom_world_scene_image_for_profile, custom_world_foundation_draft::{ DraftFoundationPayloadError, build_draft_foundation_action_payload_json, generate_custom_world_foundation_draft, }, http_error::AppError, request_context::RequestContext, state::AppState, }; const DRAFT_ASSET_GENERATION_MAX_ATTEMPTS: u32 = 3; pub async fn get_custom_world_library( State(state): State, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { let owner_user_id = authenticated.claims().user_id().to_string(); let entries = state .spacetime_client() .list_custom_world_profiles(owner_user_id) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldLibraryResponse { entries: entries .into_iter() .map(map_custom_world_library_entry_response) .collect(), }, )) } pub async fn get_custom_world_library_detail( State(state): State, Extension(request_context): Extension, Extension(authenticated): Extension, Path(profile_id): Path, ) -> Result, Response> { if profile_id.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-library", "message": "profileId is required", })), )); } let detail = state .spacetime_client() .get_custom_world_library_detail(authenticated.claims().user_id().to_string(), profile_id) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldGalleryDetailResponse { entry: map_custom_world_library_entry_response(detail.entry), }, )) } pub async fn put_custom_world_library_profile( State(state): State, Extension(request_context): Extension, Extension(authenticated): Extension, Path(profile_id): Path, payload: Result, JsonRejection>, ) -> Result, Response> { let Json(payload) = payload.map_err(|error| { custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-library", "message": error.body_text(), })), ) })?; let owner_user_id = authenticated.claims().user_id().to_string(); if profile_id.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-library", "message": "profileId is required", })), )); } let metadata = extract_custom_world_metadata(&payload.profile).map_err(|error| { custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-library", "message": error, })), ) })?; let author_display_name = resolve_author_display_name(&state, &authenticated); let author_public_user_code = resolve_author_public_user_code(&state, &authenticated, &request_context)?; let mutation = state .spacetime_client() .upsert_custom_world_profile(CustomWorldProfileUpsertRecordInput { profile_id: profile_id.clone(), owner_user_id: owner_user_id.clone(), public_work_code: None, author_public_user_code: Some(author_public_user_code), source_agent_session_id: payload.source_agent_session_id.clone(), world_name: metadata.world_name, subtitle: metadata.subtitle, summary_text: metadata.summary_text, theme_mode: metadata.theme_mode, cover_image_src: metadata.cover_image_src, profile_payload_json: serde_json::to_string(&payload.profile).map_err(|error| { custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-library", "message": format!("profile JSON 序列化失败:{error}"), })), ) })?, playable_npc_count: metadata.playable_npc_count, landmark_count: metadata.landmark_count, author_display_name, updated_at_micros: current_utc_micros(), }) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldLibraryMutationResponse { entry: map_custom_world_library_entry_response(mutation.entry.clone()), entries: vec![map_custom_world_library_entry_response(mutation.entry)], }, )) } pub async fn delete_custom_world_library_profile( State(state): State, Extension(request_context): Extension, Extension(authenticated): Extension, Path(profile_id): Path, ) -> Result, Response> { let owner_user_id = authenticated.claims().user_id().to_string(); if profile_id.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-library", "message": "profileId is required", })), )); } let entries = state .spacetime_client() .delete_custom_world_profile(profile_id, owner_user_id, current_utc_micros()) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldLibraryResponse { entries: entries .into_iter() .map(map_custom_world_library_entry_response) .collect(), }, )) } pub async fn publish_custom_world_library_profile( State(state): State, Extension(request_context): Extension, Extension(authenticated): Extension, Path(profile_id): Path, ) -> Result, Response> { let owner_user_id = authenticated.claims().user_id().to_string(); if profile_id.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-library", "message": "profileId is required", })), )); } let mutation = state .spacetime_client() .publish_custom_world_profile( profile_id, owner_user_id, None, resolve_author_public_user_code(&state, &authenticated, &request_context)?, resolve_author_display_name(&state, &authenticated), current_utc_micros(), ) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldLibraryMutationResponse { entry: map_custom_world_library_entry_response(mutation.entry.clone()), entries: vec![map_custom_world_library_entry_response(mutation.entry)], }, )) } pub async fn unpublish_custom_world_library_profile( State(state): State, Extension(request_context): Extension, Extension(authenticated): Extension, Path(profile_id): Path, ) -> Result, Response> { let owner_user_id = authenticated.claims().user_id().to_string(); if profile_id.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-library", "message": "profileId is required", })), )); } let mutation = state .spacetime_client() .unpublish_custom_world_profile( profile_id, owner_user_id, resolve_author_display_name(&state, &authenticated), current_utc_micros(), ) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldLibraryMutationResponse { entry: map_custom_world_library_entry_response(mutation.entry.clone()), entries: vec![map_custom_world_library_entry_response(mutation.entry)], }, )) } pub async fn list_custom_world_gallery( State(state): State, Extension(request_context): Extension, ) -> Result, Response> { let entries = state .spacetime_client() .list_custom_world_gallery_entries() .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldGalleryResponse { entries: entries .into_iter() .map(map_custom_world_gallery_card_response) .collect(), }, )) } pub async fn get_custom_world_gallery_detail( State(state): State, Path((owner_user_id, profile_id)): Path<(String, String)>, Extension(request_context): Extension, ) -> Result, Response> { if owner_user_id.trim().is_empty() || profile_id.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-gallery", "message": "ownerUserId and profileId are required", })), )); } let detail = state .spacetime_client() .get_custom_world_gallery_detail(owner_user_id, profile_id) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldGalleryDetailResponse { entry: map_custom_world_library_entry_response(detail.entry), }, )) } pub async fn get_custom_world_gallery_detail_by_code( State(state): State, Path(code): Path, Extension(request_context): Extension, ) -> Result, Response> { if code.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-gallery", "message": "code is required", })), )); } let detail = state .spacetime_client() .get_custom_world_gallery_detail_by_code(code) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldGalleryDetailResponse { entry: map_custom_world_library_entry_response(detail.entry), }, )) } pub async fn create_custom_world_agent_session( State(state): State, Extension(request_context): Extension, Extension(authenticated): Extension, payload: Result, JsonRejection>, ) -> Result, Response> { let Json(payload) = payload.map_err(|error| { custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": error.body_text(), })), ) })?; let seed_text = payload.seed_text.unwrap_or_default().trim().to_string(); let welcome_message_text = build_custom_world_agent_welcome_text(&seed_text); let session = state .spacetime_client() .create_custom_world_agent_session(CustomWorldAgentSessionCreateRecordInput { session_id: build_prefixed_uuid_id("custom-world-agent-session-"), owner_user_id: authenticated.claims().user_id().to_string(), seed_text, welcome_message_id: build_prefixed_uuid_id("message-"), welcome_message_text, anchor_content_json: empty_agent_anchor_content_json(), creator_intent_json: Some(empty_json_object()), creator_intent_readiness_json: empty_agent_creator_intent_readiness_json(), anchor_pack_json: Some(empty_json_object()), lock_state_json: Some(empty_json_object()), draft_profile_json: Some(empty_json_object()), pending_clarifications_json: empty_json_array(), suggested_actions_json: empty_json_array(), recommended_replies_json: empty_json_array(), quality_findings_json: empty_json_array(), asset_coverage_json: empty_agent_asset_coverage_json(), checkpoints_json: empty_json_array(), created_at_micros: current_utc_micros(), }) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldAgentSessionResponse { session: map_custom_world_agent_session_response(session), }, )) } pub async fn get_custom_world_agent_session( State(state): State, Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { if session_id.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": "sessionId is required", })), )); } let session = state .spacetime_client() .get_custom_world_agent_session(session_id, authenticated.claims().user_id().to_string()) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; log_custom_world_publish_gate_diagnostics("get_session", &session); Ok(json_success_body( Some(&request_context), map_custom_world_agent_session_response(session), )) } pub async fn get_custom_world_works( State(state): State, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { let items = state .spacetime_client() .list_custom_world_works(authenticated.claims().user_id().to_string()) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldWorksResponse { items: items .into_iter() .map(map_custom_world_work_summary_response) .collect(), }, )) } pub async fn delete_custom_world_agent_session( State(state): State, Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { ensure_non_empty(&request_context, &session_id, "sessionId")?; let items = state .spacetime_client() .delete_custom_world_agent_session(session_id, authenticated.claims().user_id().to_string()) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldWorksResponse { items: items .into_iter() .map(map_custom_world_work_summary_response) .collect(), }, )) } pub async fn get_custom_world_agent_card_detail( State(state): State, Path((session_id, card_id)): Path<(String, String)>, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { if session_id.trim().is_empty() || card_id.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": "sessionId and cardId are required", })), )); } let card = state .spacetime_client() .get_custom_world_agent_card_detail( session_id, authenticated.claims().user_id().to_string(), card_id, ) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), CustomWorldAgentCardDetailResponse { card: map_custom_world_draft_card_detail_response(card), }, )) } pub async fn submit_custom_world_agent_message( State(state): State, Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, payload: Result, JsonRejection>, ) -> Result, Response> { let Json(payload) = payload.map_err(|error| { custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": error.body_text(), })), ) })?; if session_id.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": "sessionId is required", })), )); } let client_message_id = payload.client_message_id.trim().to_string(); let message_text = payload.text.trim().to_string(); if client_message_id.is_empty() || message_text.is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": "clientMessageId and text are required", })), )); } let owner_user_id = authenticated.claims().user_id().to_string(); let operation_id = build_prefixed_uuid_id("operation-"); let submitted_at_micros = current_utc_micros(); let operation = state .spacetime_client() .submit_custom_world_agent_message(CustomWorldAgentMessageSubmitRecordInput { session_id: session_id.clone(), owner_user_id: owner_user_id.clone(), user_message_id: client_message_id, user_message_text: message_text, operation_id: operation_id.clone(), submitted_at_micros, }) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; let session = state .spacetime_client() .get_custom_world_agent_session(session_id.clone(), owner_user_id.clone()) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; let mut draft_writer = AiGenerationDraftWriter::new(AiGenerationDraftContext::new( "custom_world", owner_user_id.as_str(), session_id.as_str(), operation_id.as_str(), "自定义世界模板生成草稿", )); if let Err(error) = draft_writer.ensure_started(state.spacetime_client()).await { tracing::warn!(error = %error, "自定义世界模板生成草稿任务启动失败,主生成流程继续执行"); } let draft_sink = AiGenerationDraftSink::new( AiGenerationDraftContext::new( "custom_world", owner_user_id.as_str(), session_id.as_str(), operation_id.as_str(), "自定义世界模板生成草稿", ), state.spacetime_client().clone(), ); let turn_result = run_custom_world_agent_turn( CustomWorldAgentTurnRequest { llm_client: state.llm_client(), session: &session, quick_fill_requested: payload.quick_fill_requested.unwrap_or(false), focus_card_id: payload.focus_card_id.clone(), }, move |text| { draft_sink.persist_visible_text_async(text); }, ) .await; if let Ok(result) = &turn_result { draft_writer .persist_visible_text( state.spacetime_client(), result.assistant_reply_text.as_str(), ) .await; } let finalize_input = match turn_result { Ok(turn_result) => build_finalize_record_input( session_id.clone(), owner_user_id.clone(), operation_id.clone(), format!("assistant-{}", operation.operation_id), turn_result, current_utc_micros(), ), Err(error) => build_failed_finalize_record_input( session_id.clone(), owner_user_id.clone(), operation_id.clone(), &session, error.to_string(), current_utc_micros(), ), }; let finalized_operation = state .spacetime_client() .finalize_custom_world_agent_message(finalize_input) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; if finalized_operation.status == "failed" { let message = finalized_operation .error_message .clone() .unwrap_or_else(|| "消息处理失败".to_string()); return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "custom-world-agent", "message": message, "operationId": finalized_operation.operation_id, })), )); } Ok(json_success_body( Some(&request_context), json!({ "operation": map_custom_world_agent_operation_response(finalized_operation), }), )) } pub async fn stream_custom_world_agent_message( State(state): State, Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, payload: Result, JsonRejection>, ) -> Result { let Json(payload) = payload.map_err(|error| { custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": error.body_text(), })), ) })?; if session_id.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": "sessionId is required", })), )); } let client_message_id = payload.client_message_id.trim().to_string(); let message_text = payload.text.trim().to_string(); if client_message_id.is_empty() || message_text.is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": "clientMessageId and text are required", })), )); } let owner_user_id = authenticated.claims().user_id().to_string(); let operation = state .spacetime_client() .submit_custom_world_agent_message(CustomWorldAgentMessageSubmitRecordInput { session_id: session_id.clone(), owner_user_id: owner_user_id.clone(), user_message_id: client_message_id, user_message_text: message_text, operation_id: build_prefixed_uuid_id("operation-"), submitted_at_micros: current_utc_micros(), }) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; let session = state .spacetime_client() .get_custom_world_agent_session(session_id.clone(), owner_user_id.clone()) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; let quick_fill_requested = payload.quick_fill_requested.unwrap_or(false); let focus_card_id = payload.focus_card_id.clone(); let state = state.clone(); let session_id_for_stream = session_id.clone(); let owner_user_id_for_stream = owner_user_id.clone(); let operation_id = operation.operation_id.clone(); let stream = async_stream::stream! { let mut draft_writer = AiGenerationDraftWriter::new(AiGenerationDraftContext::new( "custom_world", owner_user_id_for_stream.as_str(), session_id_for_stream.as_str(), operation_id.as_str(), "自定义世界模板生成草稿", )); if let Err(error) = draft_writer.ensure_started(state.spacetime_client()).await { tracing::warn!(error = %error, "自定义世界模板生成草稿任务启动失败,主生成流程继续执行"); } let draft_sink = AiGenerationDraftSink::new( AiGenerationDraftContext::new( "custom_world", owner_user_id_for_stream.as_str(), session_id_for_stream.as_str(), operation_id.as_str(), "自定义世界模板生成草稿", ), state.spacetime_client().clone(), ); // 聊天回复必须等本轮模型解析、进度与会话快照全部落库后, // 再随最终 session 一次性返回,避免玩家先看到回复而进度仍停在旧状态。 let turn_result = run_custom_world_agent_turn( CustomWorldAgentTurnRequest { llm_client: state.llm_client(), session: &session, quick_fill_requested, focus_card_id, }, move |text| { draft_sink.persist_visible_text_async(text); }, ) .await; if let Ok(result) = &turn_result { draft_writer .persist_visible_text(state.spacetime_client(), result.assistant_reply_text.as_str()) .await; } let finalize_input = match turn_result { Ok(turn_result) => build_finalize_record_input( session_id_for_stream.clone(), owner_user_id_for_stream.clone(), operation_id.clone(), format!("assistant-{operation_id}"), turn_result, current_utc_micros(), ), Err(error) => build_failed_finalize_record_input( session_id_for_stream.clone(), owner_user_id_for_stream.clone(), operation_id.clone(), &session, error.to_string(), current_utc_micros(), ), }; let finalize_result = state .spacetime_client() .finalize_custom_world_agent_message(finalize_input) .await; let _finalized_operation = match finalize_result { Ok(operation) => operation, Err(error) => { yield Ok::(custom_world_sse_json_event_or_error( "error", json!({ "message": error.to_string() }), )); return; } }; if _finalized_operation.status == "failed" { yield Ok::(custom_world_sse_json_event_or_error( "error", json!({ "message": _finalized_operation .error_message .clone() .unwrap_or_else(|| "消息处理失败".to_string()) }), )); return; } let final_session = match state .spacetime_client() .get_custom_world_agent_session( session_id_for_stream, owner_user_id_for_stream, ) .await { Ok(session) => session, Err(error) => { yield Ok::(custom_world_sse_json_event_or_error( "error", json!({ "message": error.to_string() }), )); return; } }; let session_response = map_custom_world_agent_session_response(final_session); yield Ok::(custom_world_sse_json_event_or_error( "session", json!({ "session": session_response }), )); yield Ok::(custom_world_sse_json_event_or_error( "done", json!({ "ok": true }), )); }; Ok(Sse::new(stream).into_response()) } pub async fn get_custom_world_agent_operation( State(state): State, Path((session_id, operation_id)): Path<(String, String)>, Extension(request_context): Extension, Extension(authenticated): Extension, ) -> Result, Response> { if session_id.trim().is_empty() || operation_id.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": "sessionId and operationId are required", })), )); } let operation = state .spacetime_client() .get_custom_world_agent_operation( session_id, authenticated.claims().user_id().to_string(), operation_id, ) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; Ok(json_success_body( Some(&request_context), map_custom_world_agent_operation_response(operation), )) } pub async fn execute_custom_world_agent_action( State(state): State, Path(session_id): Path, Extension(request_context): Extension, Extension(authenticated): Extension, payload: Result, JsonRejection>, ) -> Result, Response> { let Json(payload) = payload.map_err(|error| { custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": error.body_text(), })), ) })?; if session_id.trim().is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": "sessionId is required", })), )); } let action = payload.action.trim().to_string(); if action.is_empty() { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": "action is required", })), )); } let owner_user_id = authenticated.claims().user_id().to_string(); let submitted_at_micros = current_utc_micros(); let payload_json = if matches!( action.as_str(), "draft_foundation" | "generate_characters" | "generate_landmarks" ) { let session = state .spacetime_client() .get_custom_world_agent_session(session_id.clone(), owner_user_id.clone()) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; let llm_client = state.llm_client().ok_or_else(|| { custom_world_error_response( &request_context, AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_details(json!({ "provider": "custom-world-agent", "message": "服务端尚未配置可用的 LLM API Key", })), ) })?; if action == "draft_foundation" { if session.progress_percent < 100 { return Err(custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": "draft_foundation requires progressPercent >= 100", })), )); } let operation_id = build_prefixed_uuid_id("operation-"); let operation = state .spacetime_client() .upsert_custom_world_agent_operation_progress( CustomWorldAgentOperationProgressRecordInput { session_id: session_id.clone(), owner_user_id: owner_user_id.clone(), operation_id: operation_id.clone(), operation_type: "draft_foundation".to_string(), operation_status: "running".to_string(), phase_label: "整理世界骨架".to_string(), phase_detail: "正在校验已确认锚点,并准备第一版世界框架生成链路。" .to_string(), operation_progress: 12, error_message: None, updated_at_micros: submitted_at_micros, }, ) .await .map_err(|error| { custom_world_error_response( &request_context, map_custom_world_client_error(error), ) })?; spawn_custom_world_draft_foundation_job( state.clone(), session, owner_user_id, operation_id, payload, ); return Ok(json_success_body( Some(&request_context), json!({ "operation": map_custom_world_agent_operation_response(operation), }), )); } else { let generation_result = generate_custom_world_agent_entities(llm_client, &session, &payload) .await .map_err(|message| { custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({ "provider": "custom-world-agent", "message": message, })), ) })?; generation_result.payload_json } } else { serde_json::to_string(&payload).map_err(|error| { custom_world_error_response( &request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world-agent", "message": format!("action payload JSON 序列化失败:{error}"), })), ) })? }; let result = state .spacetime_client() .execute_custom_world_agent_action(CustomWorldAgentActionExecuteRecordInput { session_id: session_id.clone(), owner_user_id: owner_user_id.clone(), operation_id: build_prefixed_uuid_id("operation-"), action: action.clone(), payload_json: Some(payload_json), submitted_at_micros, }) .await .map_err(|error| { custom_world_error_response(&request_context, map_custom_world_client_error(error)) })?; if matches!( action.as_str(), "sync_result_profile" | "publish_world" | "draft_foundation" ) { if let Ok(session) = state .spacetime_client() .get_custom_world_agent_session(session_id.clone(), owner_user_id.clone()) .await { log_custom_world_publish_gate_diagnostics(action.as_str(), &session); } } Ok(json_success_body( Some(&request_context), json!({ "operation": map_custom_world_agent_operation_response(result.operation), }), )) } fn spawn_custom_world_draft_foundation_job( state: AppState, session: CustomWorldAgentSessionRecord, owner_user_id: String, operation_id: String, payload: ExecuteCustomWorldAgentActionRequest, ) { tokio::spawn(async move { let Some(llm_client) = state.llm_client().cloned() else { let _ = upsert_custom_world_draft_foundation_progress( &state, &session.session_id, &owner_user_id, &operation_id, "failed", "底稿生成失败", "服务端尚未配置可用的 LLM API Key", 100, Some("服务端尚未配置可用的 LLM API Key".to_string()), ) .await; return; }; let progress_state = state.clone(); let progress_session_id = session.session_id.clone(); let progress_owner_user_id = owner_user_id.clone(); let progress_operation_id = operation_id.clone(); let draft_result = generate_custom_world_foundation_draft(&llm_client, &session, move |progress| { let progress_state = progress_state.clone(); let session_id = progress_session_id.clone(); let owner_user_id = progress_owner_user_id.clone(); let operation_id = progress_operation_id.clone(); tokio::spawn(async move { let _ = upsert_custom_world_draft_foundation_progress( &progress_state, &session_id, &owner_user_id, &operation_id, "running", progress.phase_label.as_str(), progress.phase_detail.as_str(), progress.progress, None, ) .await; }); }) .await; let draft_result = match draft_result { Ok(result) => result, Err(message) => { let _ = upsert_custom_world_draft_foundation_progress( &state, &session.session_id, &owner_user_id, &operation_id, "failed", "底稿生成失败", message.clone().as_str(), 100, Some(message.clone()), ) .await; return; } }; let mut draft_profile_json = draft_result.draft_profile_json; let mut draft_profile_value = match serde_json::from_str::(&draft_profile_json) { Ok(Value::Object(object)) => Value::Object(object), Ok(_) => { let message = "foundation draft JSON 必须是 object".to_string(); let _ = upsert_custom_world_draft_foundation_progress( &state, &session.session_id, &owner_user_id, &operation_id, "failed", "底稿素材生成失败", message.as_str(), 100, Some(message.clone()), ) .await; return; } Err(error) => { let message = format!("foundation draft JSON 非法:{error}"); let _ = upsert_custom_world_draft_foundation_progress( &state, &session.session_id, &owner_user_id, &operation_id, "failed", "底稿素材生成失败", message.as_str(), 100, Some(message.clone()), ) .await; return; } }; let image_generation_limiter = Arc::new(Semaphore::new( state.config.draft_asset_generation_max_concurrent_requests, )); let role_visual_profile_input = draft_profile_value.clone(); let act_background_profile_input = draft_profile_value.clone(); // 角色主形象与幕背景图互不依赖,必须并行发起;上游生图请求统一限流,避免同批草稿瞬时打满供应商接口。 let (role_visual_result, act_background_result) = tokio::join!( async { let mut profile = role_visual_profile_input; generate_draft_foundation_role_visuals( &state, &session, &owner_user_id, &operation_id, &mut profile, image_generation_limiter.clone(), ) .await .map(|_| profile) }, async { let mut profile = act_background_profile_input; generate_draft_foundation_act_backgrounds( &state, &session, &owner_user_id, &operation_id, &mut profile, image_generation_limiter.clone(), ) .await .map(|_| profile) } ); let mut draft_profile_with_assets = draft_profile_value.clone(); let mut asset_generation_errors = Vec::new(); match role_visual_result { Ok(profile) => draft_profile_with_assets = profile, Err(message) => asset_generation_errors.push(("生成角色主形象失败", message)), } match act_background_result { Ok(profile) => { merge_generated_act_backgrounds(&mut draft_profile_with_assets, &profile) } Err(message) => asset_generation_errors.push(("生成幕背景图失败", message)), } draft_profile_value = draft_profile_with_assets; if !asset_generation_errors.is_empty() { let message = asset_generation_errors .iter() .map(|(_, message)| message.as_str()) .collect::>() .join(";"); let phase_label = asset_generation_errors .first() .map(|(label, _)| *label) .unwrap_or("素材生成失败"); persist_partial_draft_foundation_after_asset_failure( &state, &session, &owner_user_id, &operation_id, &draft_profile_value, phase_label, message.as_str(), ) .await; return; } draft_profile_json = match serde_json::to_string(&draft_profile_value) { Ok(value) => value, Err(error) => { let message = format!("带素材的 foundation draft JSON 序列化失败:{error}"); let _ = upsert_custom_world_draft_foundation_progress( &state, &session.session_id, &owner_user_id, &operation_id, "failed", "底稿素材写回失败", message.as_str(), 100, Some(message.clone()), ) .await; return; } }; let _ = upsert_custom_world_draft_foundation_progress( &state, &session.session_id, &owner_user_id, &operation_id, "running", "编译草稿卡", "正在把世界底稿整理成可浏览的卡片摘要和详情结构。", 98, None, ) .await; let payload_json = match build_draft_foundation_action_payload_json(&payload, &draft_profile_json) { Ok(value) => value, Err(error) => { let message = match error { DraftFoundationPayloadError::SerializePayload(message) => message, DraftFoundationPayloadError::InvalidPayloadShape => { "action payload 必须是 object".to_string() } DraftFoundationPayloadError::InvalidGeneratedDraft(message) => message, }; let _ = upsert_custom_world_draft_foundation_progress( &state, &session.session_id, &owner_user_id, &operation_id, "failed", "底稿写入失败", message.clone().as_str(), 100, Some(message), ) .await; return; } }; if let Err(error) = state .spacetime_client() .execute_custom_world_agent_action(CustomWorldAgentActionExecuteRecordInput { session_id: session.session_id.clone(), owner_user_id: owner_user_id.clone(), operation_id: operation_id.clone(), action: "draft_foundation".to_string(), payload_json: Some(payload_json), submitted_at_micros: current_utc_micros(), }) .await { let message = error.to_string(); let _ = upsert_custom_world_draft_foundation_progress( &state, &session.session_id, &owner_user_id, &operation_id, "failed", "底稿写入失败", message.clone().as_str(), 100, Some(message), ) .await; } }); } async fn generate_draft_foundation_role_visuals( state: &AppState, session: &CustomWorldAgentSessionRecord, owner_user_id: &str, operation_id: &str, draft_profile: &mut Value, image_generation_limiter: Arc, ) -> Result<(), String> { let Some(profile_object) = draft_profile.as_object_mut() else { return Err("foundation draft JSON 必须是 object".to_string()); }; let mut role_refs = Vec::new(); for key in ["playableNpcs", "storyNpcs"] { if let Some(roles) = profile_object.get(key).and_then(Value::as_array) { for index in 0..roles.len() { role_refs.push((key.to_string(), index)); } } } let mut role_generation_refs = Vec::new(); for (key, index) in role_refs { let role = profile_object .get(key.as_str()) .and_then(Value::as_array) .and_then(|roles| roles.get(index)) .cloned() .unwrap_or(Value::Null); let name = json_text_from_value(&role, "name").unwrap_or_else(|| format!("角色{}", index + 1)); let role_id = json_text_from_value(&role, "id").unwrap_or_else(|| format!("{key}-{index}")); let visual_prompt = json_text_from_value(&role, "visualDescription").ok_or_else(|| { format!("角色「{name}」缺少 visualDescription,不能在角色形象设定文本生成前直接生图。") })?; role_generation_refs.push(RoleVisualGenerationRef { key, index, role_id, name, prompt: visual_prompt, }); } upsert_custom_world_draft_foundation_progress( state, &session.session_id, owner_user_id, operation_id, "running", "并行生成角色主形象", format!("正在同时生成 {} 张角色主形象。", role_generation_refs.len()).as_str(), 97, None, ) .await .map_err(|error| error.to_string())?; let mut generation_tasks = JoinSet::new(); for role_ref in role_generation_refs { let task_state = (*state).clone(); let task_owner_user_id = owner_user_id.to_string(); let task_limiter = image_generation_limiter.clone(); generation_tasks.spawn(async move { let mut last_error = None; for attempt in 1..=DRAFT_ASSET_GENERATION_MAX_ATTEMPTS { let generation_result = { let _permit = task_limiter .acquire() .await .map_err(|error| format!("图片生成并发控制失效:{error}"))?; generate_character_primary_visual_for_profile( &task_state, task_owner_user_id.as_str(), role_ref.role_id.as_str(), role_ref.prompt.as_str(), Some(role_ref.name.as_str()), ) .await }; match generation_result { Ok(generated) => { return Ok::<_, String>((role_ref.key, role_ref.index, generated)); } Err(error) => { last_error = Some(error.body_text()); if attempt < DRAFT_ASSET_GENERATION_MAX_ATTEMPTS { tokio::time::sleep(std::time::Duration::from_millis( 300 * u64::from(attempt), )) .await; } } } } Err(format!( "角色「{}」主形象连续生成 {} 次失败:{}", role_ref.name, DRAFT_ASSET_GENERATION_MAX_ATTEMPTS, last_error.unwrap_or_else(|| "未知错误".to_string()) )) }); } let mut errors = Vec::new(); while let Some(result) = generation_tasks.join_next().await { let task_result = result.map_err(|error| error.to_string())?; let (key, index, generated) = match task_result { Ok(value) => value, Err(message) => { errors.push(message); continue; } }; if let Some(role_object) = profile_object .get_mut(key.as_str()) .and_then(Value::as_array_mut) .and_then(|roles| roles.get_mut(index)) .and_then(Value::as_object_mut) { role_object.insert("imageSrc".to_string(), Value::String(generated.image_src)); role_object.insert( "generatedVisualAssetId".to_string(), Value::String(generated.asset_id), ); } } if !errors.is_empty() { return Err(join_unique_error_messages(errors)); } Ok(()) } async fn generate_draft_foundation_act_backgrounds( state: &AppState, session: &CustomWorldAgentSessionRecord, owner_user_id: &str, operation_id: &str, draft_profile: &mut Value, image_generation_limiter: Arc, ) -> Result<(), String> { let world_name = json_text_from_value(draft_profile, "name").unwrap_or_else(|| "未命名世界".to_string()); let profile_id = json_text_from_value(draft_profile, "id"); let scene_image_profile_input = draft_profile.clone(); let act_refs = collect_scene_act_refs(draft_profile); validate_scene_act_background_prompts(&act_refs)?; tracing::info!( operation_id, session_id = %session.session_id, act_count = act_refs.len(), max_concurrent_requests = state.config.draft_asset_generation_max_concurrent_requests, "开始并行生成草稿幕背景图" ); upsert_custom_world_draft_foundation_progress( state, &session.session_id, owner_user_id, operation_id, "running", "并行生成幕背景图", format!("正在同时生成 {} 张幕背景图。", act_refs.len()).as_str(), 98, None, ) .await .map_err(|error| error.to_string())?; let mut generation_tasks = JoinSet::new(); for act_ref in act_refs { let task_state = (*state).clone(); let task_owner_user_id = owner_user_id.to_string(); let task_profile_id = profile_id.clone(); let task_world_name = world_name.clone(); let task_profile = scene_image_profile_input.clone(); let task_limiter = image_generation_limiter.clone(); let task_operation_id = operation_id.to_string(); let task_session_id = session.session_id.clone(); generation_tasks.spawn(async move { let mut last_error = None; for attempt in 1..=DRAFT_ASSET_GENERATION_MAX_ATTEMPTS { let attempt_started_at = Instant::now(); tracing::info!( operation_id = %task_operation_id, session_id = %task_session_id, chapter_index = act_ref.chapter_index, act_index = act_ref.act_index, scene_id = %act_ref.scene_id, scene_name = %act_ref.scene_name, attempt, "开始生成单幕背景图" ); let generation_result = { let _permit = task_limiter.acquire().await.map_err(|error| { ( act_ref.chapter_index, act_ref.act_index, format!("图片生成并发控制失效:{error}"), ) })?; generate_custom_world_scene_image_for_profile( &task_state, task_owner_user_id.as_str(), &task_profile, task_profile_id.as_deref(), task_world_name.as_str(), act_ref.scene_id.as_str(), act_ref.scene_name.as_str(), act_ref.scene_description.as_str(), act_ref.prompt.as_str(), ) .await }; match generation_result { Ok(generated) => { tracing::info!( operation_id = %task_operation_id, session_id = %task_session_id, chapter_index = act_ref.chapter_index, act_index = act_ref.act_index, scene_id = %act_ref.scene_id, scene_name = %act_ref.scene_name, attempt, elapsed_ms = attempt_started_at.elapsed().as_millis(), "单幕背景图生成成功" ); return Ok::<_, (usize, usize, String)>(( act_ref.chapter_index, act_ref.act_index, generated, )); } Err(error) => { let error_message = error.body_text(); tracing::warn!( operation_id = %task_operation_id, session_id = %task_session_id, chapter_index = act_ref.chapter_index, act_index = act_ref.act_index, scene_id = %act_ref.scene_id, scene_name = %act_ref.scene_name, attempt, elapsed_ms = attempt_started_at.elapsed().as_millis(), error_message = %error_message, "单幕背景图生成失败" ); last_error = Some(error_message); if attempt < DRAFT_ASSET_GENERATION_MAX_ATTEMPTS { tokio::time::sleep(std::time::Duration::from_millis( 300 * u64::from(attempt), )) .await; } } } } Err(( act_ref.chapter_index, act_ref.act_index, format!( "第{}章第{}幕「{}」背景图连续生成 {} 次失败:{}", act_ref.chapter_index + 1, act_ref.act_index + 1, act_ref.scene_name, DRAFT_ASSET_GENERATION_MAX_ATTEMPTS, last_error.unwrap_or_else(|| "未知错误".to_string()) ), )) }); } let mut errors = Vec::new(); let mut generated_count = 0usize; while let Some(result) = generation_tasks.join_next().await { let task_result = result.map_err(|error| error.to_string())?; let (chapter_index, act_index, generated) = match task_result { Ok(value) => value, Err((chapter_index, act_index, message)) => { mark_scene_act_background_generation_error( draft_profile, chapter_index, act_index, &message, ); errors.push(message); continue; } }; if let Some(act_object) = draft_profile .get_mut("sceneChapterBlueprints") .and_then(Value::as_array_mut) .and_then(|chapters| chapters.get_mut(chapter_index)) .and_then(|chapter| chapter.get_mut("acts")) .and_then(Value::as_array_mut) .and_then(|acts| acts.get_mut(act_index)) .and_then(Value::as_object_mut) { act_object.insert( "backgroundImageSrc".to_string(), Value::String(generated.image_src), ); act_object.insert( "backgroundAssetId".to_string(), Value::String(generated.asset_id), ); act_object.insert( "generatedScenePrompt".to_string(), Value::String(generated.prompt), ); act_object.insert( "generatedSceneModel".to_string(), Value::String(generated.model), ); generated_count += 1; } } if !errors.is_empty() { if generated_count > 0 { // 自动草稿生成和手动生成用的是同一套生图与资产入库能力;这里不能因为批量中的个别幕失败, // 把已经写入 profile 分支的 backgroundImageSrc 一起丢掉,否则前端就看不到已经生成好的图。 tracing::warn!( generated_count, failed_count = errors.len(), error_message = %join_unique_error_messages(errors), "部分幕背景图生成失败,已保留成功生成的幕图" ); return Ok(()); } return Err(join_unique_error_messages(errors)); } Ok(()) } fn mark_scene_act_background_generation_error( draft_profile: &mut Value, chapter_index: usize, act_index: usize, message: &str, ) { if let Some(act_object) = draft_profile .get_mut("sceneChapterBlueprints") .and_then(Value::as_array_mut) .and_then(|chapters| chapters.get_mut(chapter_index)) .and_then(|chapter| chapter.get_mut("acts")) .and_then(Value::as_array_mut) .and_then(|acts| acts.get_mut(act_index)) .and_then(Value::as_object_mut) { act_object.insert( "backgroundGenerationError".to_string(), Value::String(message.trim().to_string()), ); } } fn join_unique_error_messages(messages: Vec) -> String { // 并行图片任务可能从同一个上游故障返回完全相同的业务错误;用户侧只需要看到去重后的失败项。 messages .into_iter() .map(|message| message.trim().to_string()) .filter(|message| !message.is_empty()) .collect::>() .into_iter() .collect::>() .join(";") } struct RoleVisualGenerationRef { key: String, index: usize, role_id: String, name: String, prompt: String, } struct SceneActGenerationRef { chapter_index: usize, act_index: usize, scene_id: String, scene_name: String, scene_description: String, prompt: String, } fn collect_scene_act_refs(draft_profile: &Value) -> Vec { let scene_context_by_id = collect_scene_context_by_id(draft_profile); draft_profile .get("sceneChapterBlueprints") .and_then(Value::as_array) .into_iter() .flatten() .enumerate() .flat_map(|(chapter_index, chapter)| { let chapter_scene_id = json_text_from_value(chapter, "sceneId") .or_else(|| json_text_from_value(chapter, "id")) .unwrap_or_else(|| format!("chapter-{chapter_index}")); let chapter_scene_name = json_first_text_from_value( chapter, &["sceneName", "landmarkName", "name", "title"], ) .unwrap_or_else(|| chapter_scene_id.clone()); let chapter_scene_context = scene_context_by_id .get(&chapter_scene_id) .cloned() .unwrap_or_else(|| SceneImageContext { id: chapter_scene_id.clone(), name: chapter_scene_name.clone(), description: json_text_from_value(chapter, "description") .or_else(|| json_text_from_value(chapter, "summary")) .unwrap_or_default(), danger_level: json_text_from_value(chapter, "dangerLevel").unwrap_or_default(), }); let scene_contexts = scene_context_by_id.clone(); chapter .get("acts") .and_then(Value::as_array) .into_iter() .flatten() .enumerate() .map(move |(act_index, act)| { let prompt = json_first_text_from_value( act, &[ "backgroundPromptText", "scenePromptText", "visualPromptText", "promptText", "imagePromptText", "backgroundPrompt", "visualPrompt", ], ) .unwrap_or_default(); let scene_name = json_first_text_from_value( act, &["sceneName", "landmarkName", "locationName"], ) .unwrap_or_else(|| chapter_scene_context.name.clone()); let act_scene_id = json_text_from_value(act, "sceneId") .unwrap_or_else(|| chapter_scene_context.id.clone()); let scene_context = scene_contexts .get(&act_scene_id) .cloned() .unwrap_or_else(|| SceneImageContext { id: act_scene_id.clone(), name: scene_name, description: chapter_scene_context.description.clone(), danger_level: chapter_scene_context.danger_level.clone(), }); SceneActGenerationRef { chapter_index, act_index, scene_id: act_scene_id, scene_name: scene_context.name, scene_description: scene_context.description, prompt: prompt.clone(), } }) }) .collect() } #[derive(Clone, Debug)] struct SceneImageContext { id: String, name: String, description: String, danger_level: String, } fn collect_scene_context_by_id(draft_profile: &Value) -> BTreeMap { let mut contexts = BTreeMap::new(); if let Some(camp) = draft_profile.get("camp").and_then(Value::as_object) { if let Some(context) = scene_context_from_object(camp, "camp") { contexts.insert(context.id.clone(), context); } } if let Some(landmarks) = draft_profile.get("landmarks").and_then(Value::as_array) { for landmark in landmarks.iter().filter_map(Value::as_object) { if let Some(context) = scene_context_from_object(landmark, "landmark") { contexts.insert(context.id.clone(), context); } } } contexts } fn scene_context_from_object( object: &Map, fallback_id: &str, ) -> Option { let id = read_string_field(object, "id") .or_else(|| read_string_field(object, "sceneId")) .unwrap_or_else(|| fallback_id.to_string()); let name = read_string_field(object, "name") .or_else(|| read_string_field(object, "sceneName")) .unwrap_or_else(|| id.clone()); Some(SceneImageContext { id, name, description: read_string_field(object, "description") .or_else(|| read_string_field(object, "visualDescription")) .unwrap_or_default(), danger_level: read_string_field(object, "dangerLevel").unwrap_or_default(), }) } fn validate_scene_act_background_prompts(act_refs: &[SceneActGenerationRef]) -> Result<(), String> { if let Some(act_ref) = act_refs.iter().find(|act_ref| act_ref.prompt.is_empty()) { return Err(format!( "第{}章第{}幕「{}」缺少 backgroundPromptText,不能在幕背景图描述文本生成前直接生图。", act_ref.chapter_index + 1, act_ref.act_index + 1, act_ref.scene_name )); } Ok(()) } fn json_first_text_from_value(value: &Value, keys: &[&str]) -> Option { keys.iter().find_map(|key| json_text_from_value(value, key)) } fn merge_generated_act_backgrounds(target_profile: &mut Value, background_profile: &Value) { let Some(target_chapters) = target_profile .get_mut("sceneChapterBlueprints") .and_then(Value::as_array_mut) else { return; }; let Some(background_chapters) = background_profile .get("sceneChapterBlueprints") .and_then(Value::as_array) else { return; }; for (chapter_index, background_chapter) in background_chapters.iter().enumerate() { let Some(target_acts) = target_chapters .get_mut(chapter_index) .and_then(|chapter| chapter.get_mut("acts")) .and_then(Value::as_array_mut) else { continue; }; let Some(background_acts) = background_chapter.get("acts").and_then(Value::as_array) else { continue; }; for (act_index, background_act) in background_acts.iter().enumerate() { let Some(target_act_object) = target_acts .get_mut(act_index) .and_then(Value::as_object_mut) else { continue; }; let Some(background_act_object) = background_act.as_object() else { continue; }; // 只合并图片生成产物字段,避免并行分支把其他草稿内容互相覆盖。 for key in [ "backgroundImageSrc", "backgroundAssetId", "generatedScenePrompt", "generatedSceneModel", ] { if let Some(value) = background_act_object.get(key) { target_act_object.insert(key.to_string(), value.clone()); } } } } } fn json_text_from_value(value: &Value, key: &str) -> Option { value .get(key) .and_then(Value::as_str) .map(str::trim) .filter(|value| !value.is_empty()) .map(ToOwned::to_owned) } async fn persist_partial_draft_foundation_after_asset_failure( state: &AppState, session: &CustomWorldAgentSessionRecord, owner_user_id: &str, operation_id: &str, draft_profile: &Value, phase_label: &str, error_message: &str, ) { let draft_profile_json = match serde_json::to_string(draft_profile) { Ok(value) => Some(value), Err(error) => { tracing::warn!(error = %error, "素材失败后的部分底稿序列化失败"); None } }; let finalize_result = state .spacetime_client() .finalize_custom_world_agent_message(CustomWorldAgentMessageFinalizeRecordInput { session_id: session.session_id.clone(), owner_user_id: owner_user_id.to_string(), operation_id: operation_id.to_string(), assistant_message_id: None, assistant_reply_text: None, phase_label: phase_label.to_string(), phase_detail: format!("已保存成功生成的素材,失败项超过 {DRAFT_ASSET_GENERATION_MAX_ATTEMPTS} 次重试:{error_message}"), operation_status: "failed".to_string(), operation_progress: 100, stage: session.stage.clone(), progress_percent: session.progress_percent, focus_card_id: session.focus_card_id.clone(), anchor_content_json: session.anchor_content.to_string(), creator_intent_json: Some(session.creator_intent.to_string()), creator_intent_readiness_json: session.creator_intent_readiness.to_string(), anchor_pack_json: Some(session.anchor_pack.to_string()), draft_profile_json, pending_clarifications_json: Value::Array(session.pending_clarifications.clone()).to_string(), suggested_actions_json: Value::Array(session.suggested_actions.clone()).to_string(), recommended_replies_json: json!(session.recommended_replies).to_string(), quality_findings_json: Value::Array(session.quality_findings.clone()).to_string(), asset_coverage_json: session.asset_coverage.to_string(), error_message: Some(error_message.to_string()), updated_at_micros: current_utc_micros(), }) .await; if let Err(error) = finalize_result { tracing::warn!(error = %error, "素材失败后的部分底稿持久化失败"); let _ = upsert_custom_world_draft_foundation_progress( state, &session.session_id, owner_user_id, operation_id, "failed", phase_label, error_message, 100, Some(error_message.to_string()), ) .await; } } async fn upsert_custom_world_draft_foundation_progress( state: &AppState, session_id: &str, owner_user_id: &str, operation_id: &str, status: &str, phase_label: &str, phase_detail: &str, progress: u32, error_message: Option, ) -> Result { state .spacetime_client() .upsert_custom_world_agent_operation_progress( CustomWorldAgentOperationProgressRecordInput { session_id: session_id.to_string(), owner_user_id: owner_user_id.to_string(), operation_id: operation_id.to_string(), operation_type: "draft_foundation".to_string(), operation_status: status.to_string(), phase_label: phase_label.to_string(), phase_detail: phase_detail.to_string(), operation_progress: progress.min(100), error_message, updated_at_micros: current_utc_micros(), }, ) .await } fn map_custom_world_library_entry_response( entry: CustomWorldLibraryEntryRecord, ) -> CustomWorldLibraryEntryResponse { CustomWorldLibraryEntryResponse { owner_user_id: entry.owner_user_id, profile_id: entry.profile_id, public_work_code: entry.public_work_code, author_public_user_code: entry.author_public_user_code, profile: entry.profile, visibility: entry.visibility, published_at: entry.published_at, updated_at: entry.updated_at, author_display_name: entry.author_display_name, world_name: entry.world_name, subtitle: entry.subtitle, summary_text: entry.summary_text, cover_image_src: entry.cover_image_src, theme_mode: entry.theme_mode, playable_npc_count: entry.playable_npc_count, landmark_count: entry.landmark_count, } } fn map_custom_world_gallery_card_response( entry: CustomWorldGalleryEntryRecord, ) -> CustomWorldGalleryCardResponse { CustomWorldGalleryCardResponse { owner_user_id: entry.owner_user_id, profile_id: entry.profile_id, public_work_code: entry.public_work_code, author_public_user_code: entry.author_public_user_code, visibility: entry.visibility, published_at: entry.published_at, updated_at: entry.updated_at, author_display_name: entry.author_display_name, world_name: entry.world_name, subtitle: entry.subtitle, summary_text: entry.summary_text, cover_image_src: entry.cover_image_src, theme_mode: entry.theme_mode, playable_npc_count: entry.playable_npc_count, landmark_count: entry.landmark_count, } } fn map_custom_world_work_summary_response( item: CustomWorldWorkSummaryRecord, ) -> CustomWorldWorkSummaryResponse { CustomWorldWorkSummaryResponse { work_id: item.work_id, source_type: item.source_type, status: item.status, title: item.title, subtitle: item.subtitle, summary: item.summary, cover_image_src: item.cover_image_src, cover_render_mode: item.cover_render_mode, cover_character_image_srcs: item.cover_character_image_srcs, updated_at: item.updated_at, published_at: item.published_at, stage: item.stage, stage_label: item.stage_label, playable_npc_count: item.playable_npc_count, landmark_count: item.landmark_count, role_visual_ready_count: item.role_visual_ready_count, role_animation_ready_count: item.role_animation_ready_count, role_asset_summary_label: item.role_asset_summary_label, session_id: item.session_id, profile_id: item.profile_id, can_resume: item.can_resume, can_enter_world: item.can_enter_world, blocker_count: item.blocker_count, publish_ready: item.publish_ready, } } fn map_custom_world_agent_session_response( session: CustomWorldAgentSessionRecord, ) -> CustomWorldAgentSessionSnapshotResponse { CustomWorldAgentSessionSnapshotResponse { session_id: session.session_id, current_turn: session.current_turn, anchor_content: session.anchor_content, progress_percent: session.progress_percent, last_assistant_reply: session.last_assistant_reply, stage: session.stage, focus_card_id: session.focus_card_id, creator_intent: session.creator_intent, creator_intent_readiness: session.creator_intent_readiness, anchor_pack: session.anchor_pack, lock_state: session.lock_state, draft_profile: session.draft_profile, messages: session .messages .into_iter() .map(map_custom_world_agent_message_response) .collect(), draft_cards: session .draft_cards .into_iter() .map(map_custom_world_draft_card_response) .collect(), pending_clarifications: session.pending_clarifications, suggested_actions: session.suggested_actions, recommended_replies: session.recommended_replies, quality_findings: session.quality_findings, asset_coverage: session.asset_coverage, checkpoints: session .checkpoints .into_iter() .map(map_custom_world_agent_checkpoint_response) .collect(), supported_actions: session .supported_actions .into_iter() .map(map_custom_world_supported_action_response) .collect(), publish_gate: session .publish_gate .map(map_custom_world_publish_gate_response), result_preview: session.result_preview, updated_at: session.updated_at, } } fn log_custom_world_publish_gate_diagnostics( source: &str, session: &CustomWorldAgentSessionRecord, ) { let draft_profile = session.draft_profile.as_object(); let preview_profile = session .result_preview .as_ref() .and_then(|preview| preview.get("preview")) .and_then(Value::as_object); let profile = preview_profile.or(draft_profile); let blocker_codes = session .publish_gate .as_ref() .map(|gate| { gate.blockers .iter() .map(|blocker| blocker.code.as_str()) .collect::>() .join(",") }) .unwrap_or_default(); info!( target: "custom_world.publish_gate", source, session_id = %session.session_id, stage = %session.stage, publish_ready = session.publish_gate.as_ref().map(|gate| gate.publish_ready).unwrap_or(false), blocker_count = session.publish_gate.as_ref().map(|gate| gate.blocker_count).unwrap_or(0), blocker_codes = %blocker_codes, has_draft_profile = session.draft_profile.as_object().map(|value| !value.is_empty()).unwrap_or(false), has_result_preview = session.result_preview.is_some(), preview_source = session.result_preview.as_ref().and_then(|value| value.get("source")).and_then(serde_json::Value::as_str).unwrap_or(""), has_world_hook = has_custom_world_publish_text(profile, &["worldHook", "creatorIntent.worldHook", "anchorContent.worldPromise.hook", "settingText"]), has_player_premise = has_custom_world_publish_text(profile, &["playerPremise", "creatorIntent.playerPremise", "anchorContent.playerEntryPoint.openingIdentity", "anchorContent.playerEntryPoint.openingProblem", "anchorContent.playerEntryPoint.entryMotivation"]), has_core_conflicts = has_custom_world_non_empty_text_array(profile, "coreConflicts"), has_main_chapter = has_custom_world_array(profile, "chapters") || has_custom_world_array(profile, "sceneChapterBlueprints") || has_custom_world_array(profile, "sceneChapters"), has_scene_act = has_custom_world_scene_act(profile), "custom world publish gate diagnostics" ); } fn has_custom_world_publish_text(profile: Option<&Map>, paths: &[&str]) -> bool { paths.iter().any(|path| { let mut segments = path.split('.'); let Some(first_segment) = segments.next() else { return false; }; let mut current = profile.and_then(|value| value.get(first_segment)); for segment in segments { current = current.and_then(|value| value.get(segment)); } current .and_then(Value::as_str) .map(str::trim) .map(|value| !value.is_empty()) .unwrap_or(false) }) } fn has_custom_world_non_empty_text_array(profile: Option<&Map>, key: &str) -> bool { profile .and_then(|value| value.get(key)) .and_then(Value::as_array) .map(|items| { items.iter().any(|item| { item.as_str() .map(str::trim) .is_some_and(|value| !value.is_empty()) }) }) .unwrap_or(false) } fn has_custom_world_array(profile: Option<&Map>, key: &str) -> bool { profile .and_then(|value| value.get(key)) .and_then(Value::as_array) .map(|items| !items.is_empty()) .unwrap_or(false) } fn has_custom_world_scene_act(profile: Option<&Map>) -> bool { profile .and_then(|value| { value .get("sceneChapterBlueprints") .or_else(|| value.get("sceneChapters")) }) .and_then(Value::as_array) .map(|chapters| { chapters.iter().any(|chapter| { chapter .get("acts") .and_then(Value::as_array) .map(|acts| !acts.is_empty()) .unwrap_or(false) }) }) .unwrap_or(false) } fn map_custom_world_publish_gate_response( gate: CustomWorldPublishGateRecord, ) -> CustomWorldPublishGateResponse { CustomWorldPublishGateResponse { profile_id: gate.profile_id, blockers: gate .blockers .into_iter() .map(map_custom_world_result_preview_blocker_response) .collect(), blocker_count: gate.blocker_count, publish_ready: gate.publish_ready, can_enter_world: gate.can_enter_world, } } fn map_custom_world_agent_message_response( message: CustomWorldAgentMessageRecord, ) -> CustomWorldAgentMessageResponse { CustomWorldAgentMessageResponse { id: message.message_id, role: message.role, kind: message.kind, text: message.text, created_at: message.created_at, related_operation_id: message.related_operation_id, } } fn map_custom_world_agent_operation_response( operation: CustomWorldAgentOperationRecord, ) -> CustomWorldAgentOperationResponse { CustomWorldAgentOperationResponse { operation_id: operation.operation_id, operation_type: operation.operation_type, status: operation.status, phase_label: operation.phase_label, phase_detail: operation.phase_detail, progress: operation.progress, error: operation.error_message, } } fn map_custom_world_draft_card_response( card: CustomWorldDraftCardRecord, ) -> CustomWorldDraftCardSummaryResponse { CustomWorldDraftCardSummaryResponse { id: card.card_id, kind: card.kind, title: card.title, subtitle: card.subtitle, summary: card.summary, status: card.status, linked_ids: card.linked_ids, warning_count: card.warning_count, asset_status: card.asset_status, asset_status_label: card.asset_status_label, } } fn map_custom_world_draft_card_detail_response( card: CustomWorldDraftCardDetailRecord, ) -> CustomWorldDraftCardDetailResponse { CustomWorldDraftCardDetailResponse { id: card.card_id, kind: card.kind, title: card.title, sections: card .sections .into_iter() .map(map_custom_world_draft_card_detail_section_response) .collect(), linked_ids: card.linked_ids, locked: card.locked, editable: card.editable, editable_section_ids: card.editable_section_ids, warning_messages: card.warning_messages, asset_status: card.asset_status, asset_status_label: card.asset_status_label, } } fn map_custom_world_draft_card_detail_section_response( section: CustomWorldDraftCardDetailSectionRecord, ) -> CustomWorldDraftCardDetailSectionResponse { CustomWorldDraftCardDetailSectionResponse { id: section.section_id, label: section.label, value: section.value, } } fn map_custom_world_agent_checkpoint_response( checkpoint: CustomWorldAgentCheckpointRecord, ) -> CustomWorldAgentCheckpointResponse { CustomWorldAgentCheckpointResponse { checkpoint_id: checkpoint.checkpoint_id, created_at: checkpoint.created_at, label: checkpoint.label, } } fn map_custom_world_supported_action_response( action: CustomWorldSupportedActionRecord, ) -> CustomWorldSupportedActionResponse { CustomWorldSupportedActionResponse { action: action.action, enabled: action.enabled, reason: action.reason, } } fn map_custom_world_result_preview_blocker_response( blocker: CustomWorldResultPreviewBlockerRecord, ) -> CustomWorldResultPreviewBlockerResponse { CustomWorldResultPreviewBlockerResponse { id: blocker.id, code: blocker.code, message: blocker.message, } } fn map_custom_world_client_error(error: SpacetimeClientError) -> AppError { let status = match &error { SpacetimeClientError::Procedure(message) if message.contains("custom_world_profile 不存在") => { StatusCode::NOT_FOUND } SpacetimeClientError::Procedure(message) if message.contains("custom_world_agent_session 不存在") => { StatusCode::NOT_FOUND } SpacetimeClientError::Procedure(message) if message.contains("custom_world_agent_operation 不存在") => { StatusCode::NOT_FOUND } SpacetimeClientError::Procedure(message) if message.contains("当前模型不可用") || message.contains("设定生成失败") || message.contains("解析失败") || message.contains("缺少有效回复") => { StatusCode::BAD_GATEWAY } SpacetimeClientError::Runtime(_) => StatusCode::BAD_REQUEST, _ => StatusCode::BAD_GATEWAY, }; AppError::from_status(status).with_details(json!({ "provider": "spacetimedb", "message": error.to_string(), })) } fn custom_world_error_response(request_context: &RequestContext, error: AppError) -> Response { error.into_response_with_context(Some(request_context)) } fn ensure_non_empty( request_context: &RequestContext, value: &str, field_name: &str, ) -> Result<(), Response> { if value.trim().is_empty() { return Err(custom_world_error_response( request_context, AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({ "provider": "custom-world", "message": format!("{field_name} is required"), })), )); } Ok(()) } fn custom_world_sse_json_event(event_name: &str, payload: Value) -> Result { Event::default() .event(event_name) .json_data(payload) .map_err(|error| { AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({ "provider": "sse", "message": format!("SSE payload 序列化失败:{error}"), })) }) } fn custom_world_sse_json_event_or_error(event_name: &str, payload: Value) -> Event { match custom_world_sse_json_event(event_name, payload) { Ok(event) => event, Err(_) => custom_world_sse_error_event_message("SSE payload 序列化失败".to_string()), } } fn custom_world_sse_error_event_message(message: String) -> Event { let payload = format!( "{{\"message\":{}}}", serde_json::to_string(&message) .unwrap_or_else(|_| "\"SSE 错误事件序列化失败\"".to_string()) ); Event::default().event("error").data(payload) } fn resolve_author_display_name( state: &AppState, authenticated: &AuthenticatedAccessToken, ) -> String { state .auth_user_service() .get_user_by_id(authenticated.claims().user_id()) .ok() .flatten() .map(|user| user.display_name) .filter(|value| !value.trim().is_empty()) .unwrap_or_else(|| "玩家".to_string()) } fn resolve_author_public_user_code( state: &AppState, authenticated: &AuthenticatedAccessToken, request_context: &RequestContext, ) -> Result { state .auth_user_service() .get_user_by_id(authenticated.claims().user_id()) .map_err(|error| { custom_world_error_response( request_context, AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({ "provider": "custom-world-library", "message": format!("作者叙世号读取失败:{error}"), })), ) })? .map(|user| user.public_user_code) .filter(|value| !value.trim().is_empty()) .ok_or_else(|| { custom_world_error_response( request_context, AppError::from_status(StatusCode::UNAUTHORIZED).with_details(json!({ "provider": "custom-world-library", "message": "当前登录用户缺少叙世号", })), ) }) } fn build_custom_world_agent_welcome_text(seed_text: &str) -> String { if seed_text.trim().is_empty() { return "我会先帮你把世界的核心锚点整理出来。你可以从世界钩子、玩家身份、主题氛围、核心冲突、关键关系或标志性元素开始。" .to_string(); } "我已经收到你的世界起点,会先把它整理成创作锚点。你可以继续补充玩家身份、核心冲突、关键关系或标志性元素。".to_string() } struct CustomWorldProfileMetadata { world_name: String, subtitle: String, summary_text: String, cover_image_src: Option, theme_mode: CustomWorldThemeMode, playable_npc_count: u32, landmark_count: u32, } fn extract_custom_world_metadata(profile: &Value) -> Result { let object = profile .as_object() .ok_or_else(|| "profile 必须是 JSON object".to_string())?; let world_name = read_string_field(object, "name").unwrap_or_else(|| "未命名世界".to_string()); let subtitle = read_string_field(object, "subtitle").unwrap_or_default(); let summary_text = read_string_field(object, "summary").unwrap_or_default(); let cover_image_src = resolve_cover_image_src(object); let theme_mode = read_string_field(object, "themeMode") .and_then(|value| CustomWorldThemeMode::from_client_str(&value)) .unwrap_or(CustomWorldThemeMode::Mythic); let playable_npc_count = count_profile_roles(object); let landmark_count = object .get("landmarks") .and_then(Value::as_array) .map(|entries| entries.len() as u32) .unwrap_or(0); Ok(CustomWorldProfileMetadata { world_name, subtitle, summary_text, cover_image_src, theme_mode, playable_npc_count, landmark_count, }) } fn read_string_field(object: &Map, key: &str) -> Option { object .get(key) .and_then(Value::as_str) .map(str::trim) .filter(|value| !value.is_empty()) .map(ToOwned::to_owned) } fn resolve_cover_image_src(object: &Map) -> Option { object .get("cover") .and_then(Value::as_object) .and_then(|cover| read_string_field(cover, "imageSrc")) .or_else(|| { object .get("camp") .and_then(Value::as_object) .and_then(|camp| read_string_field(camp, "imageSrc")) }) .or_else(|| { object .get("landmarks") .and_then(Value::as_array) .and_then(|entries| entries.first()) .and_then(Value::as_object) .and_then(|landmark| read_string_field(landmark, "imageSrc")) }) } fn count_profile_roles(object: &Map) -> u32 { let playable = object .get("playableNpcs") .and_then(Value::as_array) .map(|entries| entries.len() as u32) .unwrap_or(0); let story = object .get("storyNpcs") .and_then(Value::as_array) .map(|entries| entries.len() as u32) .unwrap_or(0); playable.saturating_add(story) } fn current_utc_micros() -> i64 { use std::time::{SystemTime, UNIX_EPOCH}; let duration = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("system clock should be after unix epoch"); i64::try_from(duration.as_micros()).expect("current unix micros should fit in i64") } #[cfg(test)] mod tests { use super::*; #[test] fn collect_scene_act_refs_accepts_scene_prompt_text_alias() { let draft_profile = json!({ "name": "雾港纪元", "tone": "潮湿、悬疑、低照度", "landmarks": [ { "id": "scene-office", "name": "旧港办公室", "description": "旧港边缘的玻璃办公室,窗外能看到潮湿码头。", "dangerLevel": "low" } ], "sceneChapterBlueprints": [ { "sceneId": "scene-office", "sceneName": "旧港办公室", "acts": [ { "title": "深夜工位", "summary": "团队在凌晨三点继续赶版本。", "actGoal": "找到丢失的部署钥匙", "transitionHook": "电梯门在无人操作时打开", "primaryRoleName": "林澈", "supportRoleNames": ["阿岚"], "scenePromptText": "现代创业公司办公室,凌晨灯光,紧张忙碌" } ] } ] }); let act_refs = collect_scene_act_refs(&draft_profile); assert_eq!(act_refs.len(), 1); assert_eq!(act_refs[0].prompt, "现代创业公司办公室,凌晨灯光,紧张忙碌"); assert_eq!(act_refs[0].scene_id, "scene-office"); assert_eq!(act_refs[0].scene_name, "旧港办公室"); assert_eq!( act_refs[0].scene_description, "旧港边缘的玻璃办公室,窗外能看到潮湿码头。" ); assert!(validate_scene_act_background_prompts(&act_refs).is_ok()); } }