Merge branch 'master' into codex/puzzle-clear-template-runtime-fixes

This commit is contained in:
kdletters
2026-06-06 20:01:52 +08:00
425 changed files with 16451 additions and 6022 deletions

View File

@@ -55,7 +55,7 @@ shared-kernel = { workspace = true }
shared-logging = { workspace = true }
socket2 = { workspace = true }
spacetime-client = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time", "sync", "fs", "io-util"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time", "sync", "fs", "io-util", "signal"] }
tokio-stream = { workspace = true }
futures-util = { workspace = true }
time = { workspace = true, features = ["formatting"] }

View File

@@ -878,6 +878,46 @@ mod tests {
);
}
#[tokio::test]
async fn readyz_reports_readiness_and_draining_state() {
let state = AppState::new(AppConfig::default()).expect("state should build");
let app = build_router(state.clone());
let ready_response = app
.clone()
.oneshot(
Request::builder()
.uri("/readyz")
.header("x-request-id", "req-ready")
.body(Body::empty())
.expect("readyz request should build"),
)
.await
.expect("readyz request should succeed");
assert_eq!(ready_response.status(), StatusCode::OK);
let ready_body = read_json_response(ready_response).await;
assert_eq!(ready_body["ok"], Value::Bool(true));
assert_eq!(ready_body["ready"], Value::Bool(true));
state.mark_not_ready();
let draining_response = app
.oneshot(
Request::builder()
.uri("/readyz")
.header("x-request-id", "req-draining")
.body(Body::empty())
.expect("readyz request should build"),
)
.await
.expect("readyz request should succeed");
assert_eq!(draining_response.status(), StatusCode::SERVICE_UNAVAILABLE);
let draining_body = read_json_response(draining_response).await;
assert_eq!(
draining_body["error"]["details"]["reason"],
"api_server_draining"
);
}
#[tokio::test]
async fn creative_agent_draft_edit_rejects_unconfirmed_template_session() {
let app = build_internal_creative_agent_app();
@@ -2658,6 +2698,18 @@ mod tests {
bind_payload["user"]["phoneNumberMasked"],
Value::String("138****8000".to_string())
);
assert_eq!(
bind_payload["user"]["phoneNumber"],
Value::String("+8613800138000".to_string())
);
assert_eq!(
bind_payload["user"]["wechatAccount"],
Value::String("wx-mini-code-bind-001".to_string())
);
assert_eq!(
bind_payload["user"]["wechatDisplayName"],
Value::String("微信旅人".to_string())
);
assert!(
bind_payload["token"]
.as_str()
@@ -3345,6 +3397,10 @@ mod tests {
serde_json::from_slice(&body).expect("response body should be valid json");
assert_eq!(payload["user"]["id"], Value::String(seed_user.id));
assert_eq!(
payload["user"]["phoneNumber"],
Value::String("+8613800138016".to_string())
);
assert_eq!(
payload["availableLoginMethods"],
serde_json::json!(["phone", "password", "wechat"])

View File

@@ -7,10 +7,13 @@ pub fn map_auth_user_payload(user: AuthUser) -> AuthUserPayload {
public_user_code: user.public_user_code,
display_name: user.display_name,
avatar_url: user.avatar_url,
phone_number: user.phone_number,
phone_number_masked: user.phone_number_masked,
login_method: user.login_method.as_str().to_string(),
binding_status: user.binding_status.as_str().to_string(),
wechat_bound: user.wechat_bound,
wechat_display_name: user.wechat_display_name,
wechat_account: user.wechat_account,
}
}

View File

@@ -102,7 +102,7 @@ fn reject_overloaded_request(request: &Request<Body>) -> Response {
}
fn should_bypass_backpressure(request: &Request<Body>) -> bool {
request.uri().path() == "/healthz"
matches!(request.uri().path(), "/healthz" | "/readyz")
}
fn classify_request_permit_pool(path: &str) -> HttpRequestPermitPoolKind {
@@ -200,6 +200,7 @@ mod tests {
.route("/held", get(held_request))
.route("/fast", get(fast_request))
.route("/healthz", get(fast_request))
.route("/readyz", get(fast_request))
.layer(middleware::from_fn_with_state(
backpressure_state,
limit_concurrent_requests,
@@ -297,6 +298,13 @@ mod tests {
.expect("healthz request should complete");
assert_eq!(health_response.status(), StatusCode::OK);
let ready_response = app
.clone()
.oneshot(test_request("/readyz"))
.await
.expect("readyz request should complete");
assert_eq!(ready_response.status(), StatusCode::OK);
gate.release.notify_one();
let completed_response = held_response
.await

View File

@@ -30,7 +30,7 @@ use shared_kernel::{
use spacetime_client::{
BarkBattleDraftConfigUpsertRecordInput, BarkBattleDraftCreateRecordInput,
BarkBattleRunFinishRecordInput, BarkBattleRunRecord, BarkBattleRunStartRecordInput,
BarkBattleWorkPublishRecordInput, SpacetimeClientError,
BarkBattleWorkDeleteRecordInput, BarkBattleWorkPublishRecordInput, SpacetimeClientError,
};
use time::{Duration as TimeDuration, OffsetDateTime};
@@ -406,6 +406,38 @@ pub async fn list_bark_battle_works(
))
}
pub async fn delete_bark_battle_work(
State(state): State<AppState>,
Path(work_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, &work_id, "workId")?;
let items = state
.spacetime_client()
.delete_bark_battle_work(BarkBattleWorkDeleteRecordInput {
work_id,
owner_user_id: authenticated.claims().user_id().to_string(),
})
.await
.map_err(|error| {
bark_battle_error_response(&request_context, map_bark_battle_client_error(error))
})?;
let items = items
.into_iter()
.map(|item| {
let author_display_name =
resolve_bark_battle_author_display_name_for_record(&state, &item);
map_work_summary_record(item, &request_context, author_display_name)
})
.collect::<Result<Vec<_>, _>>()?;
Ok(json_success_body(
Some(&request_context),
BarkBattleWorksResponse { items },
))
}
pub async fn list_bark_battle_gallery(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,

View File

@@ -25,6 +25,7 @@ pub struct AppConfig {
pub gallery_max_concurrent_requests: Option<usize>,
pub detail_max_concurrent_requests: Option<usize>,
pub admin_max_concurrent_requests: Option<usize>,
pub shutdown_outbox_flush_timeout: Duration,
pub tracking_outbox_enabled: bool,
pub tracking_outbox_dir: PathBuf,
pub tracking_outbox_batch_size: usize,
@@ -169,6 +170,7 @@ impl Default for AppConfig {
gallery_max_concurrent_requests: None,
detail_max_concurrent_requests: None,
admin_max_concurrent_requests: None,
shutdown_outbox_flush_timeout: Duration::from_millis(5_000),
tracking_outbox_enabled: true,
tracking_outbox_dir: PathBuf::from("server-rs/.data/tracking-outbox"),
tracking_outbox_batch_size: 500,
@@ -365,6 +367,11 @@ impl AppConfig {
{
config.admin_max_concurrent_requests = Some(max_concurrent_requests);
}
if let Some(timeout_ms) =
read_first_positive_u64_env(&["GENARRATIVE_API_SHUTDOWN_OUTBOX_FLUSH_TIMEOUT_MS"])
{
config.shutdown_outbox_flush_timeout = Duration::from_millis(timeout_ms);
}
if let Some(enabled) = read_first_bool_env(&["GENARRATIVE_TRACKING_OUTBOX_ENABLED"]) {
config.tracking_outbox_enabled = enabled;
}
@@ -1324,6 +1331,7 @@ mod tests {
std::env::remove_var("GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_API_SHUTDOWN_OUTBOX_FLUSH_TIMEOUT_MS");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_ENABLED");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_DIR");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE");
@@ -1336,6 +1344,7 @@ mod tests {
std::env::set_var("GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS", "64");
std::env::set_var("GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS", "32");
std::env::set_var("GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS", "16");
std::env::set_var("GENARRATIVE_API_SHUTDOWN_OUTBOX_FLUSH_TIMEOUT_MS", "3000");
std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_ENABLED", "false");
std::env::set_var(
"GENARRATIVE_TRACKING_OUTBOX_DIR",
@@ -1354,6 +1363,10 @@ mod tests {
assert_eq!(config.gallery_max_concurrent_requests, Some(64));
assert_eq!(config.detail_max_concurrent_requests, Some(32));
assert_eq!(config.admin_max_concurrent_requests, Some(16));
assert_eq!(
config.shutdown_outbox_flush_timeout,
std::time::Duration::from_millis(3_000)
);
assert!(!config.tracking_outbox_enabled);
assert_eq!(
config.tracking_outbox_dir,
@@ -1374,6 +1387,7 @@ mod tests {
std::env::remove_var("GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_API_SHUTDOWN_OUTBOX_FLUSH_TIMEOUT_MS");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_ENABLED");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_DIR");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE");

View File

@@ -277,6 +277,29 @@ mod tests {
);
}
#[test]
fn test_creation_entry_config_response_updates_jump_hop_metadata() {
let config = test_creation_entry_config_response();
let jump_hop = config
.creation_types
.iter()
.find(|item| item.id == "jump-hop")
.expect("test creation entry config should include jump-hop");
assert_eq!(jump_hop.title, "\u{8df3}\u{4e00}\u{8df3}");
assert!(jump_hop.visible);
assert!(jump_hop.open);
assert_eq!(jump_hop.badge, "\u{53ef}\u{521b}\u{5efa}");
assert_eq!(
jump_hop.subtitle,
"\u{4e3b}\u{9898}\u{9a71}\u{52a8}\u{5e73}\u{53f0}\u{8df3}\u{8dc3}"
);
assert_eq!(
jump_hop.image_src,
"/creation-type-references/jump-hop.webp"
);
}
#[test]
fn test_creation_entry_config_response_keeps_baby_object_match_visible() {
let config = test_creation_entry_config_response();

View File

@@ -1,4 +1,4 @@
use axum::http::StatusCode;
use axum::http::StatusCode;
use platform_image::generated_asset_sheets as generated_asset_sheets_impl;
use crate::{
@@ -8,9 +8,12 @@ use crate::{
#[allow(unused_imports)]
pub(crate) use generated_asset_sheets_impl::{
GeneratedAssetSheetError, GeneratedAssetSheetPersistInput, GeneratedAssetSheetPersistPrompt,
GeneratedAssetSheetAlphaOptions, GeneratedAssetSheetError, GeneratedAssetSheetKeyColor,
GeneratedAssetSheetPersistInput, GeneratedAssetSheetPersistPrompt,
GeneratedAssetSheetPromptInput, GeneratedAssetSheetSliceImage, GeneratedAssetSheetUpload,
apply_generated_asset_sheet_green_screen_alpha, crop_generated_asset_sheet_view_edge_matte,
apply_generated_asset_sheet_alpha_with_options, apply_generated_asset_sheet_green_screen_alpha,
crop_generated_asset_sheet_view_edge_matte,
crop_generated_asset_sheet_view_edge_matte_with_options,
};
pub(crate) fn build_generated_asset_sheet_prompt(

View File

@@ -1,7 +1,15 @@
use axum::{Json, extract::Extension};
use axum::{
Json,
extract::{Extension, State},
http::StatusCode,
response::{IntoResponse, Response},
};
use serde_json::{Value, json};
use crate::{api_response::json_success_body, request_context::RequestContext};
use crate::{
api_response::json_success_body, http_error::AppError, request_context::RequestContext,
state::AppState,
};
pub async fn health_check(Extension(request_context): Extension<RequestContext>) -> Json<Value> {
json_success_body(
@@ -12,3 +20,28 @@ pub async fn health_check(Extension(request_context): Extension<RequestContext>)
}),
)
}
pub async fn readiness_check(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
) -> Response {
if state.is_ready() {
return json_success_body(
Some(&request_context),
json!({
"ok": true,
"ready": true,
"service": "genarrative-api-server",
}),
)
.into_response();
}
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE)
.with_message("api-server 正在退出,不再接收新流量")
.with_details(json!({
"reason": "api_server_draining",
"ready": false,
}))
.into_response_with_context(Some(&request_context))
}

File diff suppressed because one or more lines are too long

View File

@@ -100,25 +100,35 @@ use shared_logging::{OtelConfig, init_tracing};
use socket2::{Domain, Protocol, Socket, Type};
use std::{
collections::HashSet,
env, fs, io,
env, fs, future, io,
net::{SocketAddr, TcpListener as StdTcpListener},
panic, thread,
panic,
sync::Arc,
thread,
time::Duration,
};
use tokio::net::TcpListener;
use tokio::runtime::Builder as TokioRuntimeBuilder;
use tokio::time::timeout;
use tracing::{error, info};
use tracing::{error, info, warn};
use crate::{
app::{build_router, build_spacetime_unavailable_router},
config::AppConfig,
state::{AppState, AppStateInitError},
tracking_outbox::TrackingOutbox,
};
const API_SERVER_STARTUP_STACK_SIZE_BYTES: usize = 32 * 1024 * 1024;
const AUTH_STORE_STARTUP_RESTORE_TIMEOUT: Duration = Duration::from_secs(8);
#[derive(Clone)]
struct ShutdownContext {
app_state: Option<AppState>,
tracking_outbox: Option<Arc<TrackingOutbox>>,
outbox_flush_timeout: Duration,
}
fn main() -> Result<(), io::Error> {
// Windows 本地调试下 Axum 路由树和启动恢复链较重,显式放大启动线程栈,避免 debug 构建在进入监听前栈溢出。
let server_thread = thread::Builder::new()
@@ -159,19 +169,33 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
let listen_backlog = config.listen_backlog;
let worker_threads = config.worker_threads;
let otel_enabled = config.otel_enabled;
let outbox_flush_timeout = config.shutdown_outbox_flush_timeout;
let listener = build_tcp_listener(bind_address, listen_backlog)?;
let router = match restore_app_state_for_startup(config).await {
let (router, shutdown_context) = match restore_app_state_for_startup(config).await {
Ok(state) => {
state.puzzle_gallery_cache().spawn_cleanup_task();
if let Some(outbox) = state.tracking_outbox() {
let tracking_outbox = state.tracking_outbox();
if let Some(outbox) = tracking_outbox.clone() {
outbox.spawn_worker();
}
build_router(state)
}
Err(AppStateInitError::DependencyUnavailable(message)) => {
build_spacetime_unavailable_router(message)
(
build_router(state.clone()),
ShutdownContext {
app_state: Some(state),
tracking_outbox,
outbox_flush_timeout,
},
)
}
Err(AppStateInitError::DependencyUnavailable(message)) => (
build_spacetime_unavailable_router(message),
ShutdownContext {
app_state: None,
tracking_outbox: None,
outbox_flush_timeout,
},
),
Err(error) => {
return Err(std::io::Error::other(format!(
"初始化应用状态失败:{error}"
@@ -187,7 +211,98 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
"api-server 已完成 tracing 初始化并开始监听"
);
axum::serve(listener, router).await
let result = axum::serve(listener, router)
.with_graceful_shutdown(shutdown_signal(shutdown_context.clone()))
.await;
finalize_shutdown(shutdown_context).await;
result
}
async fn shutdown_signal(context: ShutdownContext) {
let signal = wait_for_shutdown_signal().await;
if let Some(state) = context.app_state.as_ref() {
state.mark_not_ready();
}
info!(
signal,
"api-server 收到退出信号,已标记 readiness 不可用并开始排空 HTTP 请求"
);
}
async fn wait_for_shutdown_signal() -> &'static str {
#[cfg(unix)]
{
tokio::select! {
signal = wait_for_ctrl_c_signal() => signal,
signal = wait_for_sigterm_signal() => signal,
}
}
#[cfg(not(unix))]
{
wait_for_ctrl_c_signal().await
}
}
async fn wait_for_ctrl_c_signal() -> &'static str {
if let Err(error) = tokio::signal::ctrl_c().await {
error!(error = %error, "监听 SIGINT 失败,无法通过 Ctrl-C 触发优雅退出");
future::pending::<()>().await;
}
"sigint"
}
#[cfg(unix)]
async fn wait_for_sigterm_signal() -> &'static str {
let mut signal = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
{
Ok(signal) => signal,
Err(error) => {
error!(error = %error, "监听 SIGTERM 失败,无法通过 systemd terminate 触发优雅退出");
future::pending::<()>().await;
unreachable!("pending future never returns");
}
};
signal.recv().await;
"sigterm"
}
async fn finalize_shutdown(context: ShutdownContext) {
if let Some(state) = context.app_state.as_ref() {
state.mark_not_ready();
}
let Some(outbox) = context.tracking_outbox else {
return;
};
if context.outbox_flush_timeout.is_zero() {
warn!("api-server 退出时 tracking outbox flush timeout 为 0跳过主动 flush");
return;
}
let timeout_ms = context
.outbox_flush_timeout
.as_millis()
.min(u128::from(u64::MAX)) as u64;
info!(timeout_ms, "api-server 退出前封存并 flush tracking outbox");
match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await {
Ok(Ok(())) => {
info!("api-server 退出前 tracking outbox flush 完成");
}
Ok(Err(error)) => {
warn!(
error = %error,
"api-server 退出前 tracking outbox flush 未完成,已保留本地文件等待下次启动重试"
);
}
Err(_) => {
warn!(
timeout_ms,
"api-server 退出前 tracking outbox flush 超时,已保留本地文件等待下次启动重试"
);
}
}
}
fn build_tcp_listener(

View File

@@ -1,15 +1,15 @@
use axum::{
Router, middleware,
routing::{get, post},
routing::{delete, get, post},
};
use crate::{
auth::require_bearer_auth,
bark_battle::{
create_bark_battle_draft, finish_bark_battle_run, generate_bark_battle_image_asset,
get_bark_battle_run, get_bark_battle_runtime_config, list_bark_battle_gallery,
list_bark_battle_works, publish_bark_battle_work, start_bark_battle_run,
update_bark_battle_draft_config,
create_bark_battle_draft, delete_bark_battle_work, finish_bark_battle_run,
generate_bark_battle_image_asset, get_bark_battle_run, get_bark_battle_runtime_config,
list_bark_battle_gallery, list_bark_battle_works, publish_bark_battle_work,
start_bark_battle_run, update_bark_battle_draft_config,
},
state::AppState,
};
@@ -51,6 +51,13 @@ pub fn router(state: AppState) -> Router<AppState> {
require_bearer_auth,
)),
)
.route(
"/api/runtime/bark-battle/works/{work_id}",
delete(delete_bark_battle_work).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
)),
)
.route(
"/api/runtime/bark-battle/gallery",
get(list_bark_battle_gallery),

View File

@@ -1,7 +1,12 @@
use axum::{Router, routing::get};
use crate::{health::health_check, state::AppState};
use crate::{
health::{health_check, readiness_check},
state::AppState,
};
pub fn router(_state: AppState) -> Router<AppState> {
Router::new().route("/healthz", get(health_check))
Router::new()
.route("/healthz", get(health_check))
.route("/readyz", get(readiness_check))
}

View File

@@ -1,14 +1,16 @@
use axum::{
Router, middleware,
routing::{get, post},
middleware,
routing::{delete, get, post},
Router,
};
use crate::{
auth::{require_bearer_auth, require_runtime_principal_auth},
jump_hop::{
create_jump_hop_session, execute_jump_hop_action, get_jump_hop_gallery_detail,
get_jump_hop_runtime_work, get_jump_hop_session, jump_hop_run_jump, list_jump_hop_gallery,
list_jump_hop_works, publish_jump_hop_work, restart_jump_hop_run, start_jump_hop_run,
create_jump_hop_session, delete_jump_hop_work, execute_jump_hop_action,
get_jump_hop_gallery_detail, get_jump_hop_leaderboard, get_jump_hop_runtime_work,
get_jump_hop_session, jump_hop_run_jump, list_jump_hop_gallery, list_jump_hop_works,
publish_jump_hop_work, restart_jump_hop_run, start_jump_hop_run,
},
state::AppState,
};
@@ -43,6 +45,13 @@ pub fn router(state: AppState) -> Router<AppState> {
require_bearer_auth,
)),
)
.route(
"/api/creation/jump-hop/works/{profile_id}",
delete(delete_jump_hop_work).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
)),
)
.route(
"/api/creation/jump-hop/works/{profile_id}/publish",
post(publish_jump_hop_work).route_layer(middleware::from_fn_with_state(
@@ -54,6 +63,13 @@ pub fn router(state: AppState) -> Router<AppState> {
"/api/runtime/jump-hop/works/{profile_id}",
get(get_jump_hop_runtime_work),
)
.route(
"/api/runtime/jump-hop/works/{profile_id}/leaderboard",
get(get_jump_hop_leaderboard).route_layer(middleware::from_fn_with_state(
state.clone(),
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/jump-hop/runs",
post(start_jump_hop_run).route_layer(middleware::from_fn_with_state(

View File

@@ -1,16 +1,16 @@
use axum::{
Router, middleware,
routing::{get, post},
routing::{delete, get, post},
};
use crate::{
auth::{require_bearer_auth, require_runtime_principal_auth},
state::AppState,
wooden_fish::{
checkpoint_wooden_fish_run, create_wooden_fish_session, execute_wooden_fish_action,
finish_wooden_fish_run, get_wooden_fish_gallery_detail, get_wooden_fish_runtime_work,
get_wooden_fish_session, list_wooden_fish_gallery, list_wooden_fish_works,
publish_wooden_fish_work, start_wooden_fish_run,
checkpoint_wooden_fish_run, create_wooden_fish_session, delete_wooden_fish_work,
execute_wooden_fish_action, finish_wooden_fish_run, get_wooden_fish_gallery_detail,
get_wooden_fish_runtime_work, get_wooden_fish_session, list_wooden_fish_gallery,
list_wooden_fish_works, publish_wooden_fish_work, start_wooden_fish_run,
},
};
@@ -44,6 +44,13 @@ pub fn router(state: AppState) -> Router<AppState> {
require_bearer_auth,
)),
)
.route(
"/api/creation/wooden-fish/works/{profile_id}",
delete(delete_wooden_fish_work).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
)),
)
.route(
"/api/creation/wooden-fish/works/{profile_id}/publish",
post(publish_wooden_fish_work).route_layer(middleware::from_fn_with_state(

View File

@@ -1,5 +1,6 @@
use std::{
collections::BTreeMap,
collections::{BTreeMap, HashSet},
sync::{Mutex, OnceLock},
time::{Instant, SystemTime, UNIX_EPOCH},
};
@@ -130,6 +131,73 @@ const PUZZLE_UI_BACKGROUND_PROMPT_FALLBACK_MARKER: &str =
const PUZZLE_VECTOR_ENGINE_SQUARE_IMAGE_SIZE: &str = "1024x1024";
const PUZZLE_VECTOR_ENGINE_PORTRAIT_IMAGE_SIZE: &str = "1024x1536";
static PUZZLE_BACKGROUND_COMPILE_TASKS: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
fn puzzle_background_compile_tasks() -> &'static Mutex<HashSet<String>> {
PUZZLE_BACKGROUND_COMPILE_TASKS.get_or_init(|| Mutex::new(HashSet::new()))
}
fn try_register_puzzle_background_compile_task(session_id: &str) -> bool {
match puzzle_background_compile_tasks().lock() {
Ok(mut tasks) => tasks.insert(session_id.to_string()),
Err(error) => {
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id,
error = %error,
"拼图后台生成任务注册表锁已损坏,允许本次任务继续"
);
true
}
}
}
fn unregister_puzzle_background_compile_task(session_id: &str) {
match puzzle_background_compile_tasks().lock() {
Ok(mut tasks) => {
tasks.remove(session_id);
}
Err(error) => {
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id,
error = %error,
"拼图后台生成任务注册表解锁失败,忽略清理"
);
}
}
}
fn has_puzzle_cover_image_src(value: &Option<String>) -> bool {
value
.as_deref()
.map(str::trim)
.is_some_and(|value| !value.is_empty())
}
fn mark_puzzle_initial_generation_started_snapshot(
mut session: PuzzleAgentSessionRecord,
) -> PuzzleAgentSessionRecord {
session.stage = "image_refining".to_string();
session.progress_percent = session.progress_percent.max(88);
if let Some(draft) = session.draft.as_mut() {
let draft_needs_cover = !has_puzzle_cover_image_src(&draft.cover_image_src);
if let Some(primary_level) = draft.levels.first_mut() {
if !has_puzzle_cover_image_src(&primary_level.cover_image_src) {
primary_level.generation_status = "generating".to_string();
}
draft.generation_status = primary_level.generation_status.clone();
draft.candidates = primary_level.candidates.clone();
draft.selected_candidate_id = primary_level.selected_candidate_id.clone();
draft.cover_image_src = primary_level.cover_image_src.clone();
draft.cover_asset_id = primary_level.cover_asset_id.clone();
} else if draft_needs_cover {
draft.generation_status = "generating".to_string();
}
}
session
}
pub(crate) fn format_puzzle_reference_image_upload_bytes(bytes: usize) -> String {
format!("{:.1}MB", bytes as f64 / 1024.0 / 1024.0)
}

View File

@@ -1177,21 +1177,16 @@ pub(crate) fn find_puzzle_level_for_initial_asset_check<'a>(
.or_else(|| levels.first())
}
pub(crate) async fn compile_puzzle_draft_with_initial_cover(
pub(crate) async fn generate_puzzle_initial_cover_from_compiled_session(
state: &PuzzleApiState,
request_context: &RequestContext,
session_id: String,
compiled_session: PuzzleAgentSessionRecord,
owner_user_id: String,
prompt_text: Option<&str>,
reference_image_src: Option<&str>,
image_model: Option<&str>,
now: i64,
) -> Result<PuzzleAgentSessionRecord, AppError> {
let compiled_session = state
.spacetime_client()
.compile_puzzle_agent_draft(session_id.clone(), owner_user_id.clone(), now)
.await
.map_err(map_puzzle_compile_error)?;
let draft = compiled_session.draft.clone().ok_or_else(|| {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": PUZZLE_AGENT_API_BASE_PROVIDER,
@@ -1419,7 +1414,7 @@ pub(crate) async fn compile_puzzle_draft_with_initial_cover(
match state
.spacetime_client()
.select_puzzle_cover_image(PuzzleSelectCoverImageRecordInput {
session_id,
session_id: compiled_session.session_id.clone(),
owner_user_id,
level_id: Some(target_level.level_id),
candidate_id: selected_candidate_id,

View File

@@ -623,7 +623,7 @@ pub async fn execute_puzzle_agent_action(
session_id,
owner_user_id,
error_message,
failed_at_micros: now,
failed_at_micros: current_utc_micros(),
})
.await;
if let Err(error) = result {
@@ -668,27 +668,128 @@ pub async fn execute_puzzle_agent_action(
Err(response) => return Err(response),
};
let session = if ai_redraw {
execute_billable_asset_operation_with_cost(
state.root_state(),
&owner_user_id,
"puzzle_initial_image",
&billing_asset_id,
PUZZLE_IMAGE_GENERATION_POINTS_COST,
async {
compile_puzzle_draft_with_initial_cover(
&state,
&request_context,
if !try_register_puzzle_background_compile_task(&compile_session_id) {
tracing::info!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %compile_session_id,
owner_user_id = %owner_user_id,
"拼图首图后台生成任务已存在,本次 action 直接返回生成中会话"
);
state
.spacetime_client()
.get_puzzle_agent_session(
compile_session_id.clone(),
owner_user_id.clone(),
)
.await
.map(mark_puzzle_initial_generation_started_snapshot)
.map_err(map_puzzle_client_error)
} else {
let compiled_session = state
.spacetime_client()
.compile_puzzle_agent_draft(
compile_session_id.clone(),
owner_user_id.clone(),
prompt_text,
primary_reference_image_src,
payload.image_model.as_deref(),
now,
)
.await
},
)
.await
.map_err(map_puzzle_compile_error);
match compiled_session {
Ok(compiled_session) => {
let response_session =
mark_puzzle_initial_generation_started_snapshot(
compiled_session.clone(),
);
let background_state = state.clone();
let background_request_context = request_context.clone();
let background_session_id = compile_session_id.clone();
let background_owner_user_id = owner_user_id.clone();
let background_prompt_text = prompt_text.map(str::to_string);
let background_reference_image_src =
primary_reference_image_src.map(str::to_string);
let background_image_model = payload.image_model.clone();
let background_billing_asset_id =
format!("{background_session_id}:compile_puzzle_draft");
tokio::spawn(async move {
let operation_owner_user_id =
background_owner_user_id.clone();
let background_root_state =
background_state.root_state().clone();
let operation_state = background_state.clone();
let result = execute_billable_asset_operation_with_cost(
&background_root_state,
&background_owner_user_id,
"puzzle_initial_image",
&background_billing_asset_id,
PUZZLE_IMAGE_GENERATION_POINTS_COST,
async move {
generate_puzzle_initial_cover_from_compiled_session(
&operation_state,
&background_request_context,
compiled_session,
operation_owner_user_id,
background_prompt_text.as_deref(),
background_reference_image_src.as_deref(),
background_image_model.as_deref(),
current_utc_micros(),
)
.await
},
)
.await;
match result {
Ok(session) => {
tracing::info!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %session.session_id,
owner_user_id = %background_owner_user_id,
"拼图首图后台生成任务完成"
);
}
Err(error) => {
let error_message = error.body_text();
let failure_result = background_state
.spacetime_client()
.mark_puzzle_draft_generation_failed(
PuzzleDraftCompileFailureRecordInput {
session_id: background_session_id.clone(),
owner_user_id: background_owner_user_id
.clone(),
error_message: error_message.clone(),
failed_at_micros: current_utc_micros(),
},
)
.await;
if let Err(mark_error) = failure_result {
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %background_session_id,
owner_user_id = %background_owner_user_id,
message = %mark_error,
"拼图首图后台生成失败态回写失败"
);
}
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %background_session_id,
owner_user_id = %background_owner_user_id,
message = %error_message,
"拼图首图后台生成任务失败"
);
}
}
unregister_puzzle_background_compile_task(
&background_session_id,
);
});
Ok(response_session)
}
Err(error) => {
unregister_puzzle_background_compile_task(&compile_session_id);
Err(error)
}
}
}
} else {
compile_puzzle_draft_with_uploaded_cover(
&state,
@@ -716,7 +817,7 @@ pub async fn execute_puzzle_agent_action(
"compile_puzzle_draft",
"首关拼图草稿",
if ai_redraw {
"已编译首关草稿、并行生成首关画面和 UI 背景并写入正式草稿"
"已编译首关草稿,并启动首关画面和 UI 资产后台生成"
} else {
"已编译首关草稿,并直接应用上传图片、生成 UI 背景为第一关图片。"
},

View File

@@ -980,6 +980,41 @@ fn puzzle_work_summary_response_keeps_levels_for_shelf_cover() {
);
}
#[test]
fn puzzle_compile_started_snapshot_marks_primary_level_generating() {
let mut session = PuzzleAgentSessionRecord {
session_id: "puzzle-session-1".to_string(),
seed_text: "画面描述:一只猫在雨夜灯牌下回头。".to_string(),
current_turn: 1,
progress_percent: 88,
stage: "draft_ready".to_string(),
anchor_pack: test_puzzle_anchor_pack_record(),
draft: Some(test_puzzle_draft_record()),
messages: Vec::new(),
last_assistant_reply: None,
published_profile_id: None,
suggested_actions: Vec::new(),
result_preview: None,
updated_at: "2024-01-01T00:00:00Z".to_string(),
};
{
let draft = session.draft.as_mut().expect("draft");
draft.generation_status = "idle".to_string();
draft.levels[0].generation_status = "idle".to_string();
draft.levels[0].cover_image_src = None;
draft.levels[0].cover_asset_id = None;
}
let session = mark_puzzle_initial_generation_started_snapshot(session);
let draft = session.draft.expect("draft");
assert_eq!(session.stage, "image_refining");
assert_eq!(draft.generation_status, "generating");
assert_eq!(draft.levels[0].generation_status, "generating");
assert!(draft.cover_image_src.is_none());
assert!(draft.levels[0].cover_image_src.is_none());
}
#[test]
fn puzzle_ui_background_prompt_keeps_generated_slots_out_of_background() {
let prompt = build_puzzle_ui_background_request_prompt_for_test("雨夜猫街", "雨夜猫街主题背景");

View File

@@ -2,7 +2,10 @@ use std::{
collections::HashMap,
error::Error,
fmt,
sync::{Arc, Mutex},
sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
},
};
use axum::extract::FromRef;
@@ -229,6 +232,7 @@ pub struct AppStateInner {
// 配置会在后续中间件、路由和平台适配接入时逐步消费。
#[allow(dead_code)]
pub config: AppConfig,
ready: AtomicBool,
http_request_permit_pools: HttpRequestPermitPools,
auth_jwt_config: JwtConfig,
admin_runtime: Option<AdminRuntime>,
@@ -399,6 +403,7 @@ impl AppState {
Ok(Self(Arc::new(AppStateInner {
config,
ready: AtomicBool::new(true),
http_request_permit_pools,
auth_jwt_config,
admin_runtime,
@@ -447,6 +452,14 @@ impl AppState {
self.http_request_permit_pools.clone()
}
pub fn is_ready(&self) -> bool {
self.ready.load(Ordering::Acquire)
}
pub fn mark_not_ready(&self) {
self.ready.store(false, Ordering::Release);
}
pub async fn upsert_creation_entry_type_config(
&self,
input: module_runtime::CreationEntryTypeAdminUpsertInput,

View File

@@ -159,6 +159,16 @@ impl TrackingOutbox {
});
}
pub async fn flush_for_shutdown(&self) -> Result<(), TrackingOutboxError> {
{
let mut inner = self.inner.lock().await;
self.ensure_initialized_locked(&mut inner).await?;
self.seal_active_locked(&mut inner, "shutdown").await?;
}
self.flush_sealed_files_once().await
}
async fn seal_active_if_due(&self) -> Result<(), TrackingOutboxError> {
let mut inner = self.inner.lock().await;
self.ensure_initialized_locked(&mut inner).await?;
@@ -176,7 +186,11 @@ impl TrackingOutbox {
crate::telemetry::update_tracking_outbox_pending_files(sealed_files.len());
for path in sealed_files {
let started_at = Instant::now();
let metadata = fs::metadata(&path).await?;
let metadata = match fs::metadata(&path).await {
Ok(metadata) => metadata,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue,
Err(error) => return Err(error.into()),
};
let file_bytes = metadata.len();
let events = match read_outbox_events(&path).await {
Ok(events) => events,
@@ -203,7 +217,11 @@ impl TrackingOutbox {
match self.spacetime_client.record_tracking_events(events).await {
Ok(accepted_count) => {
fs::remove_file(&path).await?;
match fs::remove_file(&path).await {
Ok(()) => {}
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
Err(error) => return Err(error.into()),
}
self.subtract_total_bytes(file_bytes).await;
crate::telemetry::record_tracking_outbox_flush(
started_at.elapsed(),
@@ -596,6 +614,34 @@ mod tests {
let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn shutdown_flush_seals_active_file_for_later_retry() {
let dir = test_dir("shutdown");
let outbox = test_outbox(dir.clone(), 500, 1024 * 1024);
outbox.enqueue(sample_event("event-1")).await.unwrap();
let result = outbox.flush_for_shutdown().await;
assert!(
matches!(result, Err(TrackingOutboxError::Spacetime(_))),
"missing test SpacetimeDB should keep sealed file for retry"
);
assert!(!dir.join(ACTIVE_FILE_NAME).exists());
let sealed_count = std::fs::read_dir(&dir)
.unwrap()
.filter_map(Result::ok)
.filter(|entry| {
entry
.file_name()
.to_str()
.is_some_and(|name| name.starts_with(SEALED_FILE_PREFIX))
})
.count();
assert_eq!(sealed_count, 1);
let _ = std::fs::remove_dir_all(dir);
}
#[test]
fn directory_size_excludes_quarantined_corrupt_files() {
let dir = test_dir("directory-size");

View File

@@ -229,6 +229,33 @@ pub async fn list_wooden_fish_works(
))
}
pub async fn delete_wooden_fish_work(
State(state): State<AppState>,
Path(profile_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, &profile_id, "profileId")?;
let works = state
.spacetime_client()
.delete_wooden_fish_work(profile_id, authenticated.claims().user_id().to_string())
.await
.map_err(|error| {
wooden_fish_error_response(
&request_context,
WOODEN_FISH_CREATION_PROVIDER,
map_wooden_fish_client_error(error),
)
})?;
Ok(json_success_body(
Some(&request_context),
WoodenFishWorksResponse {
items: works.into_iter().map(|work| work.summary).collect(),
},
))
}
pub async fn get_wooden_fish_runtime_work(
State(state): State<AppState>,
Path(profile_id): Path<String>,