This commit is contained in:
2026-05-02 17:56:42 +08:00
parent 2311edb2e6
commit acc55d0e13
40 changed files with 2582 additions and 931 deletions

View File

@@ -19,11 +19,45 @@ pub(crate) async fn execute_billable_asset_operation<T, Fut>(
where
Fut: Future<Output = Result<T, AppError>>,
{
consume_asset_operation_points(state, owner_user_id, asset_kind, asset_id).await?;
execute_billable_asset_operation_with_cost(
state,
owner_user_id,
asset_kind,
asset_id,
ASSET_OPERATION_POINTS_COST,
operation,
)
.await
}
/// 生图等特殊操作可声明独立光点成本,避免修改全局资产操作默认价格。
pub(crate) async fn execute_billable_asset_operation_with_cost<T, Fut>(
state: &AppState,
owner_user_id: &str,
asset_kind: &str,
asset_id: &str,
points_cost: u64,
operation: Fut,
) -> Result<T, AppError>
where
Fut: Future<Output = Result<T, AppError>>,
{
let points_consumed =
consume_asset_operation_points(state, owner_user_id, asset_kind, asset_id, points_cost)
.await?;
match operation.await {
Ok(value) => Ok(value),
Err(error) => {
refund_asset_operation_points(state, owner_user_id, asset_kind, asset_id).await;
if points_consumed {
refund_asset_operation_points(
state,
owner_user_id,
asset_kind,
asset_id,
points_cost,
)
.await;
}
Err(error)
}
}
@@ -35,22 +69,36 @@ async fn consume_asset_operation_points(
owner_user_id: &str,
asset_kind: &str,
asset_id: &str,
) -> Result<(), AppError> {
points_cost: u64,
) -> Result<bool, AppError> {
let ledger_id = format!(
"asset_operation_consume:{}:{}:{}",
owner_user_id, asset_kind, asset_id
);
state
match state
.spacetime_client()
.consume_profile_wallet_points(
owner_user_id.to_string(),
ASSET_OPERATION_POINTS_COST,
points_cost,
ledger_id,
current_utc_micros(),
)
.await
.map(|_| ())
.map_err(map_asset_operation_wallet_error)
{
Ok(_) => Ok(true),
Err(error) if should_skip_asset_operation_billing_for_connectivity(&error) => {
// 中文注释:外部生图不应被 Maincloud 钱包短暂 503 阻断;此时跳过扣费,让业务链路继续,避免用户重复点击。
tracing::warn!(
owner_user_id,
asset_kind,
asset_id,
error = %error,
"资产操作光点预扣因 SpacetimeDB 连接不可用而降级跳过"
);
Ok(false)
}
Err(error) => Err(map_asset_operation_wallet_error(error)),
}
}
/// 外部生成或发布 mutation 失败后补偿退款;退款失败只记日志,避免覆盖原始业务错误。
@@ -59,6 +107,7 @@ async fn refund_asset_operation_points(
owner_user_id: &str,
asset_kind: &str,
asset_id: &str,
points_cost: u64,
) {
let ledger_id = format!(
"asset_operation_refund:{}:{}:{}",
@@ -68,7 +117,7 @@ async fn refund_asset_operation_points(
.spacetime_client()
.refund_profile_wallet_points(
owner_user_id.to_string(),
ASSET_OPERATION_POINTS_COST,
points_cost,
ledger_id,
current_utc_micros(),
)
@@ -104,6 +153,45 @@ pub(crate) fn map_asset_operation_wallet_error(error: SpacetimeClientError) -> A
}))
}
pub(crate) fn should_skip_asset_operation_billing_for_connectivity(
error: &SpacetimeClientError,
) -> bool {
match error {
SpacetimeClientError::ConnectDropped | SpacetimeClientError::Timeout => true,
SpacetimeClientError::Build(message)
| SpacetimeClientError::Procedure(message)
| SpacetimeClientError::Runtime(message) => {
message.contains("503")
|| message.contains("Service Unavailable")
|| message.contains("Failed to connect")
|| message.contains("WebSocket")
|| message.contains("连接已断开")
|| message.contains("连接在返回结果前已断开")
}
}
}
fn current_utc_micros() -> i64 {
time::OffsetDateTime::now_utc().unix_timestamp_nanos() as i64 / 1_000
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn asset_operation_billing_skips_spacetime_connectivity_errors() {
assert_eq!(ASSET_OPERATION_POINTS_COST, 1);
assert!(should_skip_asset_operation_billing_for_connectivity(
&SpacetimeClientError::ConnectDropped
));
assert!(should_skip_asset_operation_billing_for_connectivity(
&SpacetimeClientError::Runtime(
"Failed to connect: HTTP error: 503 Service Unavailable".to_string(),
),
));
assert!(!should_skip_asset_operation_billing_for_connectivity(
&SpacetimeClientError::Procedure("光点余额不足".to_string()),
));
}
}

View File

@@ -185,17 +185,22 @@ pub async fn generate_custom_world_profile(
);
// 中文注释profile 生成需要外部 LLM必须留在 Axum/api-serverSpacetimeDB reducer 只接收确定结果。
let result = generate_custom_world_foundation_draft(llm_client, &session, |_| {})
.await
.map_err(|message| {
custom_world_error_response(
&request_context,
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "custom-world-profile",
"message": message,
})),
)
})?;
let result = generate_custom_world_foundation_draft(
llm_client,
&session,
state.config.creation_agent_llm_web_search_enabled,
|_| {},
)
.await
.map_err(|message| {
custom_world_error_response(
&request_context,
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "custom-world-profile",
"message": message,
})),
)
})?;
let mut profile =
serde_json::from_str::<Value>(&result.draft_profile_json).map_err(|error| {
custom_world_error_response(
@@ -1775,26 +1780,31 @@ fn spawn_custom_world_draft_foundation_job(
Err(error) => Err(format!("已保存底稿序列化失败:{error}")),
}
} else {
generate_custom_world_foundation_draft(&llm_client, &session, move |progress| {
let progress_state = progress_state.clone();
let session_id = progress_session_id.clone();
let owner_user_id = progress_owner_user_id.clone();
let operation_id = progress_operation_id.clone();
tokio::spawn(async move {
let _ = upsert_custom_world_draft_foundation_progress(
&progress_state,
&session_id,
&owner_user_id,
&operation_id,
"running",
progress.phase_label.as_str(),
progress.phase_detail.as_str(),
progress.progress,
None,
)
.await;
});
})
generate_custom_world_foundation_draft(
&llm_client,
&session,
state.config.creation_agent_llm_web_search_enabled,
move |progress| {
let progress_state = progress_state.clone();
let session_id = progress_session_id.clone();
let owner_user_id = progress_owner_user_id.clone();
let operation_id = progress_operation_id.clone();
tokio::spawn(async move {
let _ = upsert_custom_world_draft_foundation_progress(
&progress_state,
&session_id,
&owner_user_id,
&operation_id,
"running",
progress.phase_label.as_str(),
progress.phase_detail.as_str(),
progress.progress,
None,
)
.await;
});
},
)
.await
};

View File

@@ -10,6 +10,7 @@ use platform_llm::{LlmClient, LlmMessage, LlmTextRequest};
use serde_json::{Map as JsonMap, Value as JsonValue, json};
use shared_contracts::runtime::ExecuteCustomWorldAgentActionRequest;
use spacetime_client::CustomWorldAgentSessionRecord;
use tracing::warn;
use crate::llm_model_routing::CREATION_TEMPLATE_LLM_MODEL;
@@ -35,6 +36,7 @@ pub struct CustomWorldFoundationDraftProgress {
pub async fn generate_custom_world_foundation_draft(
llm_client: &LlmClient,
session: &CustomWorldAgentSessionRecord,
enable_web_search: bool,
mut on_progress: impl FnMut(CustomWorldFoundationDraftProgress) + Send,
) -> Result<CustomWorldFoundationDraftResult, String> {
let setting_text = build_foundation_generation_seed_text(session);
@@ -51,6 +53,7 @@ pub async fn generate_custom_world_foundation_draft(
|response_text| build_custom_world_framework_json_repair_prompt(response_text),
"agent-foundation-framework-json-repair",
"世界框架阶段没有返回有效内容。",
enable_web_search,
)
.await?;
normalize_framework_shape(&mut framework, setting_text.as_str());
@@ -61,6 +64,7 @@ pub async fn generate_custom_world_foundation_draft(
"playable",
FOUNDATION_DRAFT_PLAYABLE_COUNT,
(16, 30),
enable_web_search,
&mut on_progress,
)
.await?;
@@ -72,6 +76,7 @@ pub async fn generate_custom_world_foundation_draft(
"story",
FOUNDATION_DRAFT_STORY_COUNT,
(30, 44),
enable_web_search,
&mut on_progress,
)
.await?;
@@ -82,6 +87,7 @@ pub async fn generate_custom_world_foundation_draft(
&framework,
FOUNDATION_DRAFT_LANDMARK_COUNT,
(44, 66),
enable_web_search,
&mut on_progress,
)
.await?;
@@ -94,6 +100,7 @@ pub async fn generate_custom_world_foundation_draft(
&playable_outlines,
"narrative",
(66, 76),
enable_web_search,
&mut on_progress,
)
.await?;
@@ -104,6 +111,7 @@ pub async fn generate_custom_world_foundation_draft(
&playable_narrative,
"dossier",
(76, 84),
enable_web_search,
&mut on_progress,
)
.await?;
@@ -114,6 +122,7 @@ pub async fn generate_custom_world_foundation_draft(
&story_outlines,
"narrative",
(84, 92),
enable_web_search,
&mut on_progress,
)
.await?;
@@ -124,6 +133,7 @@ pub async fn generate_custom_world_foundation_draft(
&story_narrative,
"dossier",
(92, 96),
enable_web_search,
&mut on_progress,
)
.await?;
@@ -171,22 +181,19 @@ async fn request_foundation_json_stage<F>(
repair_prompt_builder: F,
repair_debug_label: &str,
empty_response_message: &str,
enable_web_search: bool,
) -> Result<JsonValue, String>
where
F: Fn(&str) -> String,
{
let response = llm_client
.request_text(
LlmTextRequest::new(vec![
LlmMessage::system(FOUNDATION_JSON_ONLY_SYSTEM_PROMPT),
LlmMessage::user(user_prompt),
])
.with_model(CREATION_TEMPLATE_LLM_MODEL)
.with_responses_api()
.with_web_search(true),
)
.await
.map_err(|error| format!("{debug_label} LLM 请求失败:{error}"))?;
let response = request_foundation_text_with_optional_search_fallback(
llm_client,
FOUNDATION_JSON_ONLY_SYSTEM_PROMPT,
user_prompt.as_str(),
debug_label,
enable_web_search,
)
.await?;
let text = response.content.trim();
if text.is_empty() {
return Err(empty_response_message.to_string());
@@ -211,12 +218,69 @@ where
}
}
async fn request_foundation_text_with_optional_search_fallback(
llm_client: &LlmClient,
system_prompt: &str,
user_prompt: &str,
debug_label: &str,
enable_web_search: bool,
) -> Result<platform_llm::LlmTextResponse, String> {
match request_foundation_text(llm_client, system_prompt, user_prompt, enable_web_search).await {
Ok(response) => Ok(response),
Err(error) if enable_web_search && should_retry_foundation_without_web_search(&error) => {
warn!(
error = %error,
debug_label,
"foundation draft 联网搜索增强不可用或超时,自动降级为无联网搜索重试"
);
request_foundation_text(llm_client, system_prompt, user_prompt, false)
.await
.map_err(|retry_error| format!("{debug_label} LLM 请求失败:{retry_error}"))
}
Err(error) => Err(format!("{debug_label} LLM 请求失败:{error}")),
}
}
async fn request_foundation_text(
llm_client: &LlmClient,
system_prompt: &str,
user_prompt: &str,
enable_web_search: bool,
) -> Result<platform_llm::LlmTextResponse, platform_llm::LlmError> {
llm_client
.request_text(
LlmTextRequest::new(vec![
LlmMessage::system(system_prompt),
LlmMessage::user(user_prompt),
])
.with_model(CREATION_TEMPLATE_LLM_MODEL)
.with_responses_api()
.with_web_search(enable_web_search),
)
.await
}
fn should_retry_foundation_without_web_search(error: &platform_llm::LlmError) -> bool {
match error {
platform_llm::LlmError::Timeout { .. } | platform_llm::LlmError::Connectivity { .. } => {
true
}
platform_llm::LlmError::Upstream { message, .. } => {
message.contains("ToolNotOpen")
|| message.contains("has not activated web search")
|| message.contains("未开通")
}
_ => false,
}
}
async fn generate_foundation_role_outline_entries(
llm_client: &LlmClient,
framework: &JsonValue,
role_type: &str,
total_count: usize,
progress_range: (u32, u32),
enable_web_search: bool,
on_progress: &mut (impl FnMut(CustomWorldFoundationDraftProgress) + Send),
) -> Result<Vec<JsonValue>, String> {
let mut merged_entries = Vec::new();
@@ -275,6 +339,7 @@ async fn generate_foundation_role_outline_entries(
)
.as_str(),
"角色框架名单阶段没有返回有效内容。",
enable_web_search,
)
.await?;
let key = role_key(role_type);
@@ -305,6 +370,7 @@ async fn generate_foundation_landmark_seed_entries(
framework: &JsonValue,
total_count: usize,
progress_range: (u32, u32),
enable_web_search: bool,
on_progress: &mut (impl FnMut(CustomWorldFoundationDraftProgress) + Send),
) -> Result<Vec<JsonValue>, String> {
let mut merged_entries = Vec::new();
@@ -352,6 +418,7 @@ async fn generate_foundation_landmark_seed_entries(
)
.as_str(),
"地点框架名单阶段没有返回有效内容。",
enable_web_search,
)
.await?;
merged_entries.extend(array_field(&raw, "landmarks").into_iter().take(batch_count));
@@ -486,6 +553,7 @@ async fn expand_foundation_role_entries(
base_entries: &[JsonValue],
stage: &str,
progress_range: (u32, u32),
enable_web_search: bool,
on_progress: &mut (impl FnMut(CustomWorldFoundationDraftProgress) + Send),
) -> Result<Vec<JsonValue>, String> {
let mut merged_entries = Vec::new();
@@ -540,6 +608,7 @@ async fn expand_foundation_role_entries(
)
.as_str(),
"角色档案补全阶段没有返回有效内容。",
enable_web_search,
)
.await?;
merged_entries.extend(array_field(&raw, role_key(role_type)));
@@ -2047,7 +2116,7 @@ mod tests {
net::TcpListener,
sync::{Arc, Mutex},
thread,
time::Duration as StdDuration,
time::{Duration as StdDuration, SystemTime, UNIX_EPOCH},
};
use platform_llm::{DEFAULT_REQUEST_TIMEOUT_MS, LlmConfig, LlmProvider};
@@ -2383,6 +2452,80 @@ mod tests {
);
}
#[test]
fn foundation_search_fallback_handles_tool_unavailable_and_timeout() {
let tool_error = platform_llm::LlmError::Upstream {
status_code: 404,
message: "Your account has not activated web search. code=ToolNotOpen".to_string(),
};
let timeout_error = platform_llm::LlmError::Timeout { attempts: 2 };
assert!(should_retry_foundation_without_web_search(&tool_error));
assert!(should_retry_foundation_without_web_search(&timeout_error));
assert!(!should_retry_foundation_without_web_search(
&platform_llm::LlmError::EmptyResponse
));
}
#[tokio::test]
async fn foundation_json_stage_retries_without_web_search_when_tool_unavailable() {
let log_dir = std::env::temp_dir().join(format!(
"api-server-foundation-raw-log-test-{}-{}",
std::process::id(),
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after epoch")
.as_nanos()
));
unsafe {
std::env::set_var("LLM_RAW_LOG_DIR", &log_dir);
}
let request_capture = Arc::new(Mutex::new(Vec::new()));
let server_url = spawn_mock_server_with_statuses(
request_capture.clone(),
vec![
MockHttpResponse {
status_code: 404,
body: r#"{"error":{"code":"ToolNotOpen","message":"Your account has not activated web search."}}"#.to_string(),
},
MockHttpResponse {
status_code: 200,
body: llm_response(r#"{"name":"无搜索底稿"}"#),
},
],
);
let llm_client = build_test_llm_client(server_url);
let parsed = request_foundation_json_stage(
&llm_client,
"请生成 JSON".to_string(),
"agent-foundation-test",
|_| "修复 JSON".to_string(),
"agent-foundation-test-json-repair",
"空响应",
true,
)
.await
.expect("web search fallback should succeed");
assert_eq!(parsed.get("name"), Some(&json!("无搜索底稿")));
let requests = request_capture
.lock()
.expect("request capture should lock")
.clone();
assert_eq!(requests.len(), 2);
assert!(requests[0].contains("\"tools\""));
assert!(requests[0].contains("\"web_search\""));
assert!(!requests[1].contains("\"tools\""));
unsafe {
std::env::remove_var("LLM_RAW_LOG_DIR");
}
if log_dir.exists() {
std::fs::remove_dir_all(log_dir).expect("temporary LLM raw log dir should be removed");
}
}
#[tokio::test]
async fn role_outline_missing_asset_fields_are_filled_locally_before_details() {
let request_capture = Arc::new(Mutex::new(Vec::<String>::new()));
@@ -2471,7 +2614,7 @@ mod tests {
let llm_client = build_test_llm_client(server_url);
let session = build_test_session();
let result = generate_custom_world_foundation_draft(&llm_client, &session, |_| {})
let result = generate_custom_world_foundation_draft(&llm_client, &session, false, |_| {})
.await
.expect("draft generation should succeed");
let draft_profile = serde_json::from_str::<JsonValue>(&result.draft_profile_json)
@@ -2739,13 +2882,9 @@ mod tests {
fn llm_response(content: &str) -> String {
json!({
"id": "resp_01",
"choices": [
{
"message": {
"content": content,
}
}
]
"model": CREATION_TEMPLATE_LLM_MODEL,
"output_text": content,
"status": "completed"
})
.to_string()
}
@@ -2814,6 +2953,27 @@ mod tests {
fn spawn_mock_server(
request_capture: Arc<Mutex<Vec<String>>>,
response_bodies: Vec<String>,
) -> String {
spawn_mock_server_with_statuses(
request_capture,
response_bodies
.into_iter()
.map(|body| MockHttpResponse {
status_code: 200,
body,
})
.collect(),
)
}
struct MockHttpResponse {
status_code: u16,
body: String,
}
fn spawn_mock_server_with_statuses(
request_capture: Arc<Mutex<Vec<String>>>,
responses: Vec<MockHttpResponse>,
) -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener
@@ -2821,10 +2981,13 @@ mod tests {
.expect("listener should expose address");
thread::spawn(move || {
let mut response_queue = VecDeque::from(response_bodies);
let mut response_queue = VecDeque::from(responses);
for _ in 0..32 {
let response_body = response_queue.pop_front().unwrap_or_else(|| {
llm_response(r#"{"storyNpcs":[{"name":"议长甲","backstory":"长期维持群岛议会体面并遮掩沉船旧案。","personality":"冷硬周密","motivation":"压住旧案","combatStyle":"以权令封锁线索","backstoryReveal":{"publicSummary":"议会遮掩者。","chapters":[{"affinityRequired":15,"title":"议会","summary":"议会出面。"},{"affinityRequired":30,"title":"封锁","summary":"封锁港口。"},{"affinityRequired":60,"title":"旧案","summary":"旧案松动。"},{"affinityRequired":90,"title":"对质","summary":"灯塔对质。"}]},"skills":[{"name":"封港令","summary":"调动巡海封锁","style":"压制"}],"initialItems":[{"name":"议会印信","category":"道具","quantity":1,"rarity":"rare","description":"可调动巡海队。","tags":["权力"]}]}]}"#)
let response = response_queue.pop_front().unwrap_or_else(|| {
MockHttpResponse {
status_code: 200,
body: llm_response(r#"{"storyNpcs":[{"name":"议长甲","backstory":"长期维持群岛议会体面并遮掩沉船旧案。","personality":"冷硬周密","motivation":"压住旧案","combatStyle":"以权令封锁线索","backstoryReveal":{"publicSummary":"议会遮掩者。","chapters":[{"affinityRequired":15,"title":"议会","summary":"议会出面。"},{"affinityRequired":30,"title":"封锁","summary":"封锁港口。"},{"affinityRequired":60,"title":"旧案","summary":"旧案松动。"},{"affinityRequired":90,"title":"对质","summary":"灯塔对质。"}]},"skills":[{"name":"封港令","summary":"调动巡海封锁","style":"压制"}],"initialItems":[{"name":"议会印信","category":"道具","quantity":1,"rarity":"rare","description":"可调动巡海队。","tags":["权力"]}]}]}"#),
}
});
let (mut stream, _) = listener.accept().expect("request should connect");
let request_text = read_request(&mut stream);
@@ -2832,7 +2995,7 @@ mod tests {
.lock()
.expect("request capture should lock")
.push(request_text);
write_response(&mut stream, response_body);
write_response(&mut stream, response);
}
});
@@ -2880,11 +3043,18 @@ mod tests {
String::from_utf8(buffer).expect("request should be utf-8")
}
fn write_response(stream: &mut std::net::TcpStream, body: String) {
fn write_response(stream: &mut std::net::TcpStream, response: MockHttpResponse) {
let status_text = if response.status_code == 200 {
"OK"
} else {
"ERROR"
};
let raw_response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json; charset=utf-8\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
body.len(),
body
"HTTP/1.1 {} {}\r\nContent-Type: application/json; charset=utf-8\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
response.status_code,
status_text,
response.body.len(),
response.body
);
stream
.write_all(raw_response.as_bytes())

View File

@@ -34,6 +34,10 @@ impl AppError {
self.code
}
pub fn status_code(&self) -> StatusCode {
self.status_code
}
pub fn message(&self) -> &str {
&self.message
}

View File

@@ -76,9 +76,10 @@ use crate::{app::build_router, config::AppConfig, state::AppState};
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
// 运行本地开发与联调时,优先从仓库根目录的 .env / .env.local 加载变量,避免手工逐项导出 OSS 配置。
// 运行本地开发与联调时,优先从仓库根目录加载本地变量,避免手工逐项导出 OSS / APIMart 配置。
let _ = dotenvy::from_filename(".env");
let _ = dotenvy::from_filename(".env.local");
let _ = dotenvy::from_filename(".env.secrets.local");
// 统一先从配置对象读取监听地址,避免后续把环境变量读取散落到入口和路由层。
let config = AppConfig::from_env();

File diff suppressed because it is too large Load Diff