Merge remote-tracking branch 'origin/master' into codex/editor-asset-library

# Conflicts:
#	server-rs/crates/spacetime-client/src/lib.rs
#	server-rs/crates/spacetime-client/src/mapper.rs
#	server-rs/crates/spacetime-client/src/module_bindings.rs
#	src/components/platform-entry/PlatformEntryFlowShellImpl.tsx
This commit is contained in:
2026-06-13 16:52:03 +08:00
464 changed files with 51434 additions and 13822 deletions

View File

@@ -56,7 +56,7 @@ shared-kernel = { workspace = true }
shared-logging = { workspace = true }
socket2 = { workspace = true }
spacetime-client = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time", "sync", "fs", "io-util", "signal"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time", "sync", "fs", "io-util", "signal", "process"] }
tokio-stream = { workspace = true }
futures-util = { workspace = true }
time = { workspace = true, features = ["formatting"] }

View File

@@ -45,6 +45,7 @@ pub fn build_router(state: AppState) -> Router {
.merge(modules::assets::router(state.clone()))
.merge(modules::editor_project::router(state.clone()))
.merge(modules::platform::router(state.clone()))
.merge(modules::external_generation::router(state.clone()))
.merge(modules::play_flow::router(state.clone()))
.route(
"/api/profile/recharge/wechat/notify",

View File

@@ -52,7 +52,7 @@ where
match operation.await {
Ok(value) => Ok(value),
Err(error) => {
if points_consumed {
if points_consumed && should_refund_asset_operation_error(&error) {
refund_asset_operation_points(
state,
owner_user_id,
@@ -67,6 +67,20 @@ where
}
}
pub(crate) fn should_refund_asset_operation_error(error: &AppError) -> bool {
let message = error.body_text();
// 中文注释worker lease guard 拒绝表示当前进程已失去队列写权限;
// 这类 stale worker 失败不能补偿退款,否则可能冲掉后续合法 worker 的同一账本扣费。
!(message.contains("external_generation_job")
&& (message.contains("lease")
|| message.contains("worker")
|| message.contains("job_kind")
|| message.contains("source_")
|| message.contains("owner_user_id")
|| message.contains("不存在")
|| message.contains("不是 running 状态")))
}
/// 资产操作统一预扣泥点;扣费流水 ID 由业务资源 ID 参与构造,保证重试幂等。
async fn consume_asset_operation_points(
state: &AppState,
@@ -249,4 +263,31 @@ mod tests {
&SpacetimeClientError::Procedure("泥点余额不足".to_string()),
));
}
#[test]
fn asset_operation_billing_does_not_refund_stale_worker_lease_errors() {
let stale_error = AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "spacetimedb",
"message": "external_generation_job lease 已过期",
}));
let completed_job_error =
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "spacetimedb",
"message": "external_generation_job 当前不是 running 状态",
}));
let missing_job_error =
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "spacetimedb",
"message": "external_generation_job 不存在",
}));
let ordinary_error = AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "vector-engine",
"message": "图片生成失败",
}));
assert!(!should_refund_asset_operation_error(&stale_error));
assert!(!should_refund_asset_operation_error(&completed_job_error));
assert!(!should_refund_asset_operation_error(&missing_job_error));
assert!(should_refund_asset_operation_error(&ordinary_error));
}
}

View File

@@ -22,6 +22,19 @@ pub struct AppConfig {
pub bind_port: u16,
pub listen_backlog: i32,
pub worker_threads: Option<usize>,
pub process_role: ProcessRole,
pub external_generation_mode: ExternalGenerationMode,
pub external_generation_worker_id: String,
pub external_generation_worker_concurrency: usize,
pub external_generation_worker_poll_interval: Duration,
pub external_generation_worker_lease: Duration,
pub external_generation_controller_min_workers: usize,
pub external_generation_controller_max_workers: usize,
pub external_generation_controller_target_jobs_per_worker: usize,
pub external_generation_controller_poll_interval: Duration,
pub external_generation_controller_scale_down_idle_rounds: u32,
pub external_generation_controller_service_template: String,
pub external_generation_controller_dry_run: bool,
pub max_concurrent_requests: Option<usize>,
pub gallery_max_concurrent_requests: Option<usize>,
pub detail_max_concurrent_requests: Option<usize>,
@@ -171,6 +184,56 @@ pub struct AppConfig {
pub slow_request_threshold_ms: u64,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProcessRole {
Api,
ExternalGenerationWorker,
ExternalGenerationController,
All,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExternalGenerationMode {
Inline,
Queue,
}
impl ExternalGenerationMode {
pub fn as_str(self) -> &'static str {
match self {
Self::Inline => "inline",
Self::Queue => "queue",
}
}
pub fn is_inline(self) -> bool {
matches!(self, Self::Inline)
}
}
impl ProcessRole {
pub fn as_str(self) -> &'static str {
match self {
Self::Api => "api",
Self::ExternalGenerationWorker => "external-generation-worker",
Self::ExternalGenerationController => "external-generation-controller",
Self::All => "all",
}
}
pub fn runs_http(self) -> bool {
matches!(self, Self::Api | Self::All)
}
pub fn runs_external_generation_worker(self) -> bool {
matches!(self, Self::ExternalGenerationWorker | Self::All)
}
pub fn runs_external_generation_controller(self) -> bool {
matches!(self, Self::ExternalGenerationController)
}
}
impl Default for AppConfig {
fn default() -> Self {
Self {
@@ -178,6 +241,20 @@ impl Default for AppConfig {
bind_port: 3000,
listen_backlog: 1024,
worker_threads: None,
process_role: ProcessRole::Api,
external_generation_mode: ExternalGenerationMode::Queue,
external_generation_worker_id: default_external_generation_worker_id(),
external_generation_worker_concurrency: 2,
external_generation_worker_poll_interval: Duration::from_millis(2_000),
external_generation_worker_lease: Duration::from_secs(3_600),
external_generation_controller_min_workers: 1,
external_generation_controller_max_workers: 8,
external_generation_controller_target_jobs_per_worker: 2,
external_generation_controller_poll_interval: Duration::from_millis(10_000),
external_generation_controller_scale_down_idle_rounds: 6,
external_generation_controller_service_template:
"genarrative-external-generation-worker@{}.service".to_string(),
external_generation_controller_dry_run: false,
max_concurrent_requests: None,
gallery_max_concurrent_requests: None,
detail_max_concurrent_requests: None,
@@ -374,6 +451,78 @@ impl AppConfig {
if let Some(worker_threads) = read_first_usize_env(&["GENARRATIVE_API_WORKER_THREADS"]) {
config.worker_threads = Some(worker_threads);
}
if let Some(process_role) = read_first_process_role_env(&["GENARRATIVE_PROCESS_ROLE"]) {
config.process_role = process_role;
}
if let Some(external_generation_mode) =
read_first_external_generation_mode_env(&["GENARRATIVE_EXTERNAL_GENERATION_MODE"])
{
config.external_generation_mode = external_generation_mode;
}
if let Some(worker_id) =
read_first_non_empty_env(&["GENARRATIVE_EXTERNAL_GENERATION_WORKER_ID"])
{
config.external_generation_worker_id = worker_id;
}
if let Some(concurrency) =
read_first_usize_env(&["GENARRATIVE_EXTERNAL_GENERATION_WORKER_CONCURRENCY"])
{
config.external_generation_worker_concurrency = concurrency.max(1);
}
if let Some(poll_interval_ms) = read_first_positive_u64_env(&[
"GENARRATIVE_EXTERNAL_GENERATION_WORKER_POLL_INTERVAL_MS",
]) {
config.external_generation_worker_poll_interval =
Duration::from_millis(poll_interval_ms);
}
if let Some(lease_seconds) = read_first_duration_seconds_env(&[
"GENARRATIVE_EXTERNAL_GENERATION_WORKER_LEASE_SECONDS",
]) {
config.external_generation_worker_lease = Duration::from_secs(lease_seconds.max(1));
}
if let Some(min_workers) =
read_first_usize_env(&["GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_MIN_WORKERS"])
{
config.external_generation_controller_min_workers = min_workers;
}
if let Some(max_workers) =
read_first_usize_env(&["GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_MAX_WORKERS"])
{
config.external_generation_controller_max_workers = max_workers;
}
if config.external_generation_controller_max_workers
< config.external_generation_controller_min_workers
{
config.external_generation_controller_max_workers =
config.external_generation_controller_min_workers;
}
if let Some(target_jobs_per_worker) = read_first_usize_env(&[
"GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_TARGET_JOBS_PER_WORKER",
]) {
config.external_generation_controller_target_jobs_per_worker =
target_jobs_per_worker.max(1);
}
if let Some(poll_interval_ms) = read_first_positive_u64_env(&[
"GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_POLL_INTERVAL_MS",
]) {
config.external_generation_controller_poll_interval =
Duration::from_millis(poll_interval_ms);
}
if let Some(idle_rounds) = read_first_u32_env(&[
"GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_SCALE_DOWN_IDLE_ROUNDS",
]) {
config.external_generation_controller_scale_down_idle_rounds = idle_rounds;
}
if let Some(service_template) = read_first_non_empty_env(&[
"GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_SERVICE_TEMPLATE",
]) {
config.external_generation_controller_service_template = service_template;
}
if let Some(dry_run) =
read_first_bool_env(&["GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_DRY_RUN"])
{
config.external_generation_controller_dry_run = dry_run;
}
if let Some(max_concurrent_requests) =
read_first_usize_env(&["GENARRATIVE_API_MAX_CONCURRENT_REQUESTS"])
{
@@ -1053,6 +1202,22 @@ fn read_first_llm_provider_env(keys: &[&str]) -> Option<LlmProvider> {
})
}
fn read_first_process_role_env(keys: &[&str]) -> Option<ProcessRole> {
keys.iter().find_map(|key| {
env::var(key)
.ok()
.and_then(|value| parse_process_role(&value))
})
}
fn read_first_external_generation_mode_env(keys: &[&str]) -> Option<ExternalGenerationMode> {
keys.iter().find_map(|key| {
env::var(key)
.ok()
.and_then(|value| parse_external_generation_mode(&value))
})
}
fn read_first_positive_u32_env(keys: &[&str]) -> Option<u32> {
keys.iter().find_map(|key| {
env::var(key)
@@ -1100,6 +1265,49 @@ fn read_first_u8_env(keys: &[&str]) -> Option<u8> {
.find_map(|key| env::var(key).ok().and_then(|value| parse_u8(&value)))
}
fn default_external_generation_worker_id() -> String {
let host = env::var("HOSTNAME")
.or_else(|_| env::var("COMPUTERNAME"))
.unwrap_or_else(|_| "local".to_string());
format!("{}-{}", host.trim(), std::process::id())
}
fn parse_process_role(value: &str) -> Option<ProcessRole> {
match trim_quoted_env_value(value).to_ascii_lowercase().as_str() {
"api" => Some(ProcessRole::Api),
"external-generation-worker" | "external_generation_worker" | "worker" => {
Some(ProcessRole::ExternalGenerationWorker)
}
"external-generation-controller" | "external_generation_controller" | "controller" => {
Some(ProcessRole::ExternalGenerationController)
}
"all" => Some(ProcessRole::All),
_ => None,
}
}
fn parse_external_generation_mode(value: &str) -> Option<ExternalGenerationMode> {
match trim_quoted_env_value(value).to_ascii_lowercase().as_str() {
"inline" | "sync" | "synchronous" => Some(ExternalGenerationMode::Inline),
"queue" | "queued" | "worker" | "async" | "asynchronous" => {
Some(ExternalGenerationMode::Queue)
}
_ => None,
}
}
fn trim_quoted_env_value(raw: &str) -> &str {
let raw = raw.trim();
raw.strip_prefix('"')
.and_then(|value| value.strip_suffix('"'))
.or_else(|| {
raw.strip_prefix('\'')
.and_then(|value| value.strip_suffix('\''))
})
.unwrap_or(raw)
.trim()
}
fn read_first_positive_u16_env(keys: &[&str]) -> Option<u16> {
keys.iter().find_map(|key| {
env::var(key)
@@ -1220,7 +1428,8 @@ fn parse_positive_u16(raw: &str) -> Option<u16> {
#[cfg(test)]
mod tests {
use super::{
AppConfig, DEFAULT_VECTOR_ENGINE_IMAGE_REQUEST_TIMEOUT_MS, LlmProvider, parse_bool,
AppConfig, DEFAULT_VECTOR_ENGINE_IMAGE_REQUEST_TIMEOUT_MS, ExternalGenerationMode,
LlmProvider, ProcessRole, parse_bool, parse_external_generation_mode, parse_process_role,
};
use std::sync::{Mutex, OnceLock};
@@ -1262,6 +1471,91 @@ mod tests {
assert_eq!(parse_bool("'off'"), Some(false));
}
#[test]
fn process_role_controls_http_and_external_generation_worker_roles() {
assert_eq!(parse_process_role("api"), Some(ProcessRole::Api));
assert_eq!(
parse_process_role("\"external-generation-worker\""),
Some(ProcessRole::ExternalGenerationWorker)
);
assert_eq!(
parse_process_role("'external_generation_worker'"),
Some(ProcessRole::ExternalGenerationWorker)
);
assert_eq!(
parse_process_role("worker"),
Some(ProcessRole::ExternalGenerationWorker)
);
assert_eq!(
parse_process_role("controller"),
Some(ProcessRole::ExternalGenerationController)
);
assert_eq!(
parse_process_role("'external_generation_controller'"),
Some(ProcessRole::ExternalGenerationController)
);
assert_eq!(parse_process_role("all"), Some(ProcessRole::All));
assert_eq!(parse_process_role("unknown"), None);
assert!(ProcessRole::Api.runs_http());
assert!(!ProcessRole::Api.runs_external_generation_worker());
assert!(!ProcessRole::Api.runs_external_generation_controller());
assert!(!ProcessRole::ExternalGenerationWorker.runs_http());
assert!(ProcessRole::ExternalGenerationWorker.runs_external_generation_worker());
assert!(!ProcessRole::ExternalGenerationWorker.runs_external_generation_controller());
assert!(!ProcessRole::ExternalGenerationController.runs_http());
assert!(!ProcessRole::ExternalGenerationController.runs_external_generation_worker());
assert!(ProcessRole::ExternalGenerationController.runs_external_generation_controller());
assert!(ProcessRole::All.runs_http());
assert!(ProcessRole::All.runs_external_generation_worker());
assert!(!ProcessRole::All.runs_external_generation_controller());
}
#[test]
fn external_generation_mode_parses_inline_and_queue_aliases() {
assert_eq!(
parse_external_generation_mode("inline"),
Some(ExternalGenerationMode::Inline)
);
assert_eq!(
parse_external_generation_mode("'sync'"),
Some(ExternalGenerationMode::Inline)
);
assert_eq!(
parse_external_generation_mode("\"queue\""),
Some(ExternalGenerationMode::Queue)
);
assert_eq!(
parse_external_generation_mode("worker"),
Some(ExternalGenerationMode::Queue)
);
assert_eq!(parse_external_generation_mode("unknown"), None);
assert!(ExternalGenerationMode::Inline.is_inline());
assert!(!ExternalGenerationMode::Queue.is_inline());
}
#[test]
fn from_env_reads_external_generation_mode() {
let _guard = ENV_LOCK
.get_or_init(|| Mutex::new(()))
.lock()
.expect("env lock");
unsafe {
std::env::set_var("GENARRATIVE_EXTERNAL_GENERATION_MODE", "inline");
}
let config = AppConfig::from_env();
assert_eq!(
config.external_generation_mode,
ExternalGenerationMode::Inline
);
unsafe {
std::env::remove_var("GENARRATIVE_EXTERNAL_GENERATION_MODE");
}
}
#[test]
fn from_env_reads_sms_enabled_when_shell_value_keeps_quotes() {
let _guard = ENV_LOCK

View File

@@ -0,0 +1,108 @@
use axum::{
Json,
extract::{Extension, Path, State},
http::StatusCode,
response::Response,
};
use serde_json::json;
use shared_contracts::external_generation::{
ExternalGenerationJobStatus, ExternalGenerationJobStatusRecord,
ExternalGenerationJobStatusResponse, ExternalGenerationQueueOverview,
ExternalGenerationQueueOverviewResponse,
};
use spacetime_client::{
ExternalGenerationJobGetRecordInput, ExternalGenerationJobRecord,
ExternalGenerationQueueStatsRecord, SpacetimeClientError,
};
use crate::{
api_response::json_success_body, auth::AuthenticatedAccessToken, http_error::AppError,
request_context::RequestContext, state::AppState,
};
const EXTERNAL_GENERATION_PROVIDER: &str = "external_generation";
pub async fn get_external_generation_queue_overview(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
) -> Result<Json<serde_json::Value>, Response> {
let stats = state
.spacetime_client()
.get_external_generation_queue_stats()
.await
.map_err(|error| external_generation_error_response(&request_context, error))?;
Ok(json_success_body(
Some(&request_context),
ExternalGenerationQueueOverviewResponse {
overview: map_external_generation_queue_overview(stats),
},
))
}
pub async fn get_external_generation_job_status(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Path(job_id): Path<String>,
) -> Result<Json<serde_json::Value>, Response> {
let owner_user_id = authenticated.claims().user_id().to_string();
let job = state
.spacetime_client()
.get_external_generation_job(ExternalGenerationJobGetRecordInput {
job_id,
owner_user_id,
})
.await
.map_err(|error| external_generation_error_response(&request_context, error))?;
Ok(json_success_body(
Some(&request_context),
ExternalGenerationJobStatusResponse {
job: map_external_generation_job_status(job),
},
))
}
fn map_external_generation_queue_overview(
stats: ExternalGenerationQueueStatsRecord,
) -> ExternalGenerationQueueOverview {
ExternalGenerationQueueOverview {
pending_count: stats.pending_count,
running_count: stats.running_active_count,
updated_at_micros: stats.now_micros,
}
}
fn map_external_generation_job_status(
job: ExternalGenerationJobRecord,
) -> ExternalGenerationJobStatusRecord {
let (status, phase_detail, progress) = match job.status.as_str() {
"completed" => (ExternalGenerationJobStatus::Completed, "生成已完成。", 100),
"running" => (ExternalGenerationJobStatus::Running, "正在生成。", 35),
"failed" => (ExternalGenerationJobStatus::Failed, "生成失败。", 0),
_ => (ExternalGenerationJobStatus::Queued, "排队中。", 8),
};
ExternalGenerationJobStatusRecord {
operation_id: job.job_id,
status,
phase_label: job.request_label,
phase_detail: phase_detail.to_string(),
progress,
error: job.last_error_message,
updated_at_micros: job.updated_at_micros,
}
}
fn external_generation_error_response(
request_context: &RequestContext,
error: SpacetimeClientError,
) -> Response {
AppError::from_status(StatusCode::BAD_GATEWAY)
.with_details(json!({
"provider": EXTERNAL_GENERATION_PROVIDER,
"message": error.to_string(),
}))
.into_response_with_context(Some(request_context))
}

View File

@@ -0,0 +1,750 @@
use std::{future::Future, io, pin::Pin, time::Duration};
use axum::extract::FromRef;
use serde_json::json;
use shared_kernel::offset_datetime_to_unix_micros;
use spacetime_client::{
ExternalGenerationJobClaimRecordInput, ExternalGenerationJobCompleteRecordInput,
ExternalGenerationJobFailRecordInput, ExternalGenerationJobRecord,
ExternalGenerationJobRenewLeaseRecordInput,
};
use tokio::{
task::JoinSet,
time::{Instant, sleep},
};
use tracing::{error, info, warn};
use crate::{
jump_hop::{
JUMP_HOP_COMPILE_DRAFT_JOB_KIND, JumpHopCompileDraftWorkerPayload,
execute_jump_hop_compile_draft_worker_job,
},
puzzle::{
ExternalGenerationWriteLeaseGuard, PuzzleCompileDraftWorkerPayload,
PuzzleGenerateImagesWorkerPayload, PuzzleGenerateUiBackgroundWorkerPayload,
execute_puzzle_compile_draft_worker_job, execute_puzzle_generate_images_worker_job,
execute_puzzle_generate_ui_background_worker_job, release_puzzle_compile_background_claim,
},
puzzle_clear::{
PUZZLE_CLEAR_COMPILE_DRAFT_JOB_KIND, PuzzleClearCompileDraftWorkerPayload,
execute_puzzle_clear_compile_draft_worker_job,
},
request_context::RequestContext,
state::{AppState, PuzzleApiState},
wooden_fish::{
WOODEN_FISH_GENERATE_IMAGE_ASSETS_JOB_KIND, WoodenFishGenerateImageAssetsWorkerPayload,
execute_wooden_fish_generate_image_assets_worker_job,
},
};
pub(crate) const PUZZLE_COMPILE_DRAFT_JOB_KIND: &str = "puzzle_compile_draft";
pub(crate) const PUZZLE_GENERATE_IMAGES_JOB_KIND: &str = "puzzle_generate_images";
pub(crate) const PUZZLE_GENERATE_UI_BACKGROUND_JOB_KIND: &str = "puzzle_generate_ui_background";
pub(crate) async fn run_external_generation_worker(state: AppState) -> Result<(), io::Error> {
let worker_id = state.config.external_generation_worker_id.clone();
let concurrency = state.config.external_generation_worker_concurrency.max(1);
let poll_interval = state.config.external_generation_worker_poll_interval;
let lease = state.config.external_generation_worker_lease;
let mut tasks = JoinSet::new();
let mut shutdown = external_generation_worker_shutdown_signal();
info!(
worker_id,
concurrency,
poll_interval_ms = poll_interval.as_millis(),
lease_seconds = lease.as_secs(),
"external generation worker 已启动"
);
loop {
while tasks.len() >= concurrency {
if await_worker_task_or_shutdown(&mut tasks, &mut shutdown).await {
drain_external_generation_worker_tasks(&mut tasks).await;
return Ok(());
}
}
let available = concurrency.saturating_sub(tasks.len()).max(1);
let now_micros = current_utc_micros();
let lease_expires_at_micros = now_micros.saturating_add(duration_micros_i64(lease));
let claim_jobs = state.spacetime_client().claim_external_generation_jobs(
ExternalGenerationJobClaimRecordInput {
worker_id: worker_id.clone(),
limit: available.min(u32::MAX as usize) as u32,
lease_expires_at_micros,
claimed_at_micros: now_micros,
},
);
tokio::pin!(claim_jobs);
let jobs = match tokio::select! {
_ = shutdown.as_mut() => {
drain_external_generation_worker_tasks(&mut tasks).await;
return Ok(());
}
result = &mut claim_jobs => result
} {
Ok(jobs) => jobs,
Err(error) => {
error!(error = %error, "领取外部生成任务失败,等待下一轮重试");
if await_one_task_or_sleep_or_shutdown(
&mut tasks,
sleep(poll_interval),
&mut shutdown,
)
.await
{
drain_external_generation_worker_tasks(&mut tasks).await;
return Ok(());
}
continue;
}
};
if jobs.is_empty() {
if await_one_task_or_sleep_or_shutdown(&mut tasks, sleep(poll_interval), &mut shutdown)
.await
{
drain_external_generation_worker_tasks(&mut tasks).await;
return Ok(());
}
continue;
}
for job in jobs {
let state = state.clone();
let worker_id = worker_id.clone();
tasks.spawn(async move {
if let Err(error) =
process_external_generation_job(state, worker_id, lease, job).await
{
error!(error = %error, "external generation worker 执行任务失败");
}
});
}
}
}
type ExternalGenerationShutdownSignal = Pin<Box<dyn Future<Output = ()> + Send>>;
fn external_generation_worker_shutdown_signal() -> ExternalGenerationShutdownSignal {
Box::pin(async {
wait_for_external_generation_worker_shutdown_signal().await;
})
}
#[cfg(unix)]
async fn wait_for_external_generation_worker_shutdown_signal() {
use tokio::signal::unix::{SignalKind, signal};
let mut sigterm = signal(SignalKind::terminate()).ok();
tokio::select! {
result = tokio::signal::ctrl_c() => {
if let Err(error) = result {
warn!(error = %error, "external generation worker 监听 SIGINT 失败");
}
}
_ = async {
if let Some(sigterm) = sigterm.as_mut() {
sigterm.recv().await;
} else {
std::future::pending::<()>().await;
}
} => {}
}
}
#[cfg(not(unix))]
async fn wait_for_external_generation_worker_shutdown_signal() {
if let Err(error) = tokio::signal::ctrl_c().await {
warn!(error = %error, "external generation worker 监听 Ctrl-C 失败");
}
}
async fn await_worker_task(tasks: &mut JoinSet<()>) {
if let Some(result) = tasks.join_next().await
&& let Err(error) = result
{
error!(error = %error, "external generation worker 子任务 panic");
}
}
async fn await_worker_task_or_shutdown(
tasks: &mut JoinSet<()>,
shutdown: &mut ExternalGenerationShutdownSignal,
) -> bool {
tokio::select! {
_ = shutdown.as_mut() => true,
_ = await_worker_task(tasks) => false,
}
}
async fn await_one_task_or_sleep_or_shutdown(
tasks: &mut JoinSet<()>,
sleeper: impl Future<Output = ()>,
shutdown: &mut ExternalGenerationShutdownSignal,
) -> bool {
tokio::pin!(sleeper);
if tasks.is_empty() {
tokio::select! {
_ = shutdown.as_mut() => true,
_ = &mut sleeper => false,
}
} else {
tokio::select! {
_ = shutdown.as_mut() => true,
_ = &mut sleeper => false,
result = tasks.join_next() => {
if let Some(Err(error)) = result {
error!(error = %error, "external generation worker 子任务 panic");
}
false
}
}
}
}
async fn drain_external_generation_worker_tasks(tasks: &mut JoinSet<()>) {
info!(
in_flight_jobs = tasks.len(),
"external generation worker 收到停机信号,停止领取新任务并等待当前任务完成"
);
while !tasks.is_empty() {
await_worker_task(tasks).await;
}
info!("external generation worker 已完成优雅停机");
}
async fn process_external_generation_job(
state: AppState,
worker_id: String,
lease: Duration,
job: ExternalGenerationJobRecord,
) -> Result<(), String> {
let heartbeat_interval = external_generation_worker_heartbeat_interval(lease);
let work = process_external_generation_job_once(state.clone(), worker_id.clone(), job.clone());
tokio::pin!(work);
let heartbeat = sleep(heartbeat_interval);
tokio::pin!(heartbeat);
loop {
tokio::select! {
biased;
result = &mut work => return result,
_ = &mut heartbeat => {
renew_job_lease(&state, &worker_id, &job, lease).await?;
heartbeat.as_mut().reset(Instant::now() + heartbeat_interval);
}
}
}
}
async fn process_external_generation_job_once(
state: AppState,
worker_id: String,
job: ExternalGenerationJobRecord,
) -> Result<(), String> {
match job.job_kind.as_str() {
PUZZLE_COMPILE_DRAFT_JOB_KIND => {
let payload = match serde_json::from_str::<PuzzleCompileDraftWorkerPayload>(
job.request_payload_json.as_str(),
) {
Ok(payload) => payload,
Err(error) => {
let message = format!("拼图生成任务参数解析失败:{error}");
fail_job(&state, &worker_id, &job, message.clone()).await?;
return Err(message);
}
};
let request_context = RequestContext::new(
format!("external-generation-worker-{}", job.job_id),
format!("external-generation-worker {}", job.job_kind),
std::time::Duration::ZERO,
false,
);
let puzzle_state = PuzzleApiState::from_ref(&state);
let write_guard = build_external_generation_write_lease_guard(&worker_id, &job)?;
match execute_puzzle_compile_draft_worker_job(
&puzzle_state,
&request_context,
payload.clone(),
write_guard,
)
.await
{
Ok(session) => {
let result = complete_job(
&state,
&worker_id,
&job,
Some(
json!({
"sessionId": session.session_id,
"progressPercent": session.progress_percent,
})
.to_string(),
),
)
.await;
if result.is_ok() {
release_puzzle_compile_background_claim(&puzzle_state, &payload);
}
result
}
Err(error) => {
let message = error.body_text();
let should_release_claim = error.should_fail_queue_job();
let result = fail_queue_job_after_worker_error(
&state, &worker_id, &job, &error, &message,
)
.await;
if result.is_ok() && should_release_claim {
release_puzzle_compile_background_claim(&puzzle_state, &payload);
}
result?;
Err(message)
}
}
}
PUZZLE_GENERATE_IMAGES_JOB_KIND => {
let payload = match serde_json::from_str::<PuzzleGenerateImagesWorkerPayload>(
job.request_payload_json.as_str(),
) {
Ok(payload) => payload,
Err(error) => {
let message = format!("拼图关卡图片生成任务参数解析失败:{error}");
fail_job(&state, &worker_id, &job, message.clone()).await?;
return Err(message);
}
};
let request_context = RequestContext::new(
format!("external-generation-worker-{}", job.job_id),
format!("external-generation-worker {}", job.job_kind),
std::time::Duration::ZERO,
false,
);
let puzzle_state = PuzzleApiState::from_ref(&state);
let write_guard = build_external_generation_write_lease_guard(&worker_id, &job)?;
match execute_puzzle_generate_images_worker_job(
&puzzle_state,
&request_context,
payload,
write_guard,
)
.await
{
Ok(session) => {
complete_job(
&state,
&worker_id,
&job,
Some(
json!({
"sessionId": session.session_id,
"progressPercent": session.progress_percent,
})
.to_string(),
),
)
.await
}
Err(error) => {
let message = error.body_text();
fail_queue_job_after_worker_error(&state, &worker_id, &job, &error, &message)
.await?;
Err(message)
}
}
}
PUZZLE_GENERATE_UI_BACKGROUND_JOB_KIND => {
let payload = match serde_json::from_str::<PuzzleGenerateUiBackgroundWorkerPayload>(
job.request_payload_json.as_str(),
) {
Ok(payload) => payload,
Err(error) => {
let message = format!("拼图 UI 背景图生成任务参数解析失败:{error}");
fail_job(&state, &worker_id, &job, message.clone()).await?;
return Err(message);
}
};
let request_context = RequestContext::new(
format!("external-generation-worker-{}", job.job_id),
format!("external-generation-worker {}", job.job_kind),
std::time::Duration::ZERO,
false,
);
let puzzle_state = PuzzleApiState::from_ref(&state);
let write_guard = build_external_generation_write_lease_guard(&worker_id, &job)?;
match execute_puzzle_generate_ui_background_worker_job(
&puzzle_state,
&request_context,
payload,
write_guard,
)
.await
{
Ok(session) => {
complete_job(
&state,
&worker_id,
&job,
Some(
json!({
"sessionId": session.session_id,
"progressPercent": session.progress_percent,
})
.to_string(),
),
)
.await
}
Err(error) => {
let message = error.body_text();
fail_queue_job_after_worker_error(&state, &worker_id, &job, &error, &message)
.await?;
Err(message)
}
}
}
JUMP_HOP_COMPILE_DRAFT_JOB_KIND => {
let payload = match serde_json::from_str::<JumpHopCompileDraftWorkerPayload>(
job.request_payload_json.as_str(),
) {
Ok(payload) => payload,
Err(error) => {
let message = format!("跳一跳生成任务参数解析失败:{error}");
fail_job(&state, &worker_id, &job, message.clone()).await?;
return Err(message);
}
};
let request_context = RequestContext::new(
format!("external-generation-worker-{}", job.job_id),
format!("external-generation-worker {}", job.job_kind),
std::time::Duration::ZERO,
false,
);
match execute_jump_hop_compile_draft_worker_job(&state, &request_context, payload).await
{
Ok(session) => {
complete_job(
&state,
&worker_id,
&job,
Some(
json!({
"sessionId": session.session_id,
"status": session.status,
})
.to_string(),
),
)
.await
}
Err(response) => {
let message = response_error_message(response).await;
fail_job(&state, &worker_id, &job, message.clone()).await?;
Err(message)
}
}
}
PUZZLE_CLEAR_COMPILE_DRAFT_JOB_KIND => {
let payload = match serde_json::from_str::<PuzzleClearCompileDraftWorkerPayload>(
job.request_payload_json.as_str(),
) {
Ok(payload) => payload,
Err(error) => {
let message = format!("拼消消生成任务参数解析失败:{error}");
fail_job(&state, &worker_id, &job, message.clone()).await?;
return Err(message);
}
};
let request_context = RequestContext::new(
format!("external-generation-worker-{}", job.job_id),
format!("external-generation-worker {}", job.job_kind),
std::time::Duration::ZERO,
false,
);
match execute_puzzle_clear_compile_draft_worker_job(&state, &request_context, payload)
.await
{
Ok(session) => {
complete_job(
&state,
&worker_id,
&job,
Some(
json!({
"sessionId": session.session_id,
"status": session.status,
})
.to_string(),
),
)
.await
}
Err(response) => {
let message = response_error_message(response).await;
fail_job(&state, &worker_id, &job, message.clone()).await?;
Err(message)
}
}
}
WOODEN_FISH_GENERATE_IMAGE_ASSETS_JOB_KIND => {
let payload = match serde_json::from_str::<WoodenFishGenerateImageAssetsWorkerPayload>(
job.request_payload_json.as_str(),
) {
Ok(payload) => payload,
Err(error) => {
let message = format!("敲木鱼图片生成任务参数解析失败:{error}");
fail_job(&state, &worker_id, &job, message.clone()).await?;
return Err(message);
}
};
let request_context = RequestContext::new(
format!("external-generation-worker-{}", job.job_id),
format!("external-generation-worker {}", job.job_kind),
std::time::Duration::ZERO,
false,
);
match execute_wooden_fish_generate_image_assets_worker_job(
&state,
&request_context,
payload,
)
.await
{
Ok(session) => {
complete_job(
&state,
&worker_id,
&job,
Some(
json!({
"sessionId": session.session_id,
"status": session.status,
})
.to_string(),
),
)
.await
}
Err(response) => {
let message = response_error_message(response).await;
fail_job(&state, &worker_id, &job, message.clone()).await?;
Err(message)
}
}
}
unknown => {
warn!(
job_id = job.job_id,
job_kind = unknown,
"external generation worker 收到暂不支持的任务类型"
);
fail_job(
&state,
&worker_id,
&job,
format!("暂不支持的外部生成任务类型:{unknown}"),
)
.await
}
}
}
async fn response_error_message(response: axum::response::Response) -> String {
use axum::body::to_bytes;
let status = response.status();
let body_bytes = match to_bytes(response.into_body(), 64 * 1024).await {
Ok(bytes) => bytes,
Err(error) => {
return format!("外部生成任务失败:{status},响应读取失败:{error}");
}
};
let body_text = String::from_utf8_lossy(&body_bytes).trim().to_string();
if body_text.is_empty() {
return format!("外部生成任务失败:{status}");
}
if let Ok(body_json) = serde_json::from_str::<serde_json::Value>(&body_text)
&& let Some(message) = body_json
.get("error")
.and_then(|error| error.get("message"))
.and_then(serde_json::Value::as_str)
.map(str::trim)
.filter(|message| !message.is_empty())
{
return message.to_string();
}
body_text
}
async fn fail_queue_job_after_worker_error(
state: &AppState,
worker_id: &str,
job: &ExternalGenerationJobRecord,
error: &crate::puzzle::PuzzleExternalGenerationWorkerError,
message: &str,
) -> Result<(), String> {
if error.should_fail_queue_job() {
return fail_job(state, worker_id, job, message.to_string()).await;
}
warn!(
job_id = job.job_id,
job_kind = job.job_kind,
"external generation worker 业务失败态尚未写回,保留任务租约等待后续重试"
);
Ok(())
}
async fn complete_job(
state: &AppState,
worker_id: &str,
job: &ExternalGenerationJobRecord,
result_payload_json: Option<String>,
) -> Result<(), String> {
state
.spacetime_client()
.complete_external_generation_job(ExternalGenerationJobCompleteRecordInput {
job_id: job.job_id.clone(),
worker_id: worker_id.to_string(),
lease_token: require_job_lease_token(job)?,
result_payload_json,
completed_at_micros: current_utc_micros(),
})
.await
.map(|_| ())
.map_err(|error| error.to_string())
}
async fn fail_job(
state: &AppState,
worker_id: &str,
job: &ExternalGenerationJobRecord,
error_message: String,
) -> Result<(), String> {
let now_micros = current_utc_micros();
state
.spacetime_client()
.fail_external_generation_job(ExternalGenerationJobFailRecordInput {
job_id: job.job_id.clone(),
worker_id: worker_id.to_string(),
lease_token: require_job_lease_token(job)?,
error_message,
retry_after_micros: now_micros.saturating_add(60_000_000),
failed_at_micros: now_micros,
})
.await
.map(|_| ())
.map_err(|error| error.to_string())
}
async fn renew_job_lease(
state: &AppState,
worker_id: &str,
job: &ExternalGenerationJobRecord,
lease: Duration,
) -> Result<(), String> {
let now_micros = current_utc_micros();
state
.spacetime_client()
.renew_external_generation_job_lease(ExternalGenerationJobRenewLeaseRecordInput {
job_id: job.job_id.clone(),
worker_id: worker_id.to_string(),
lease_token: require_job_lease_token(job)?,
lease_expires_at_micros: now_micros.saturating_add(duration_micros_i64(lease)),
renewed_at_micros: now_micros,
})
.await
.map(|_| ())
.map_err(|error| error.to_string())
}
fn require_job_lease_token(job: &ExternalGenerationJobRecord) -> Result<String, String> {
job.lease_token
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToOwned::to_owned)
.ok_or_else(|| format!("external_generation_job {} 缺少 lease token", job.job_id))
}
fn build_external_generation_write_lease_guard(
worker_id: &str,
job: &ExternalGenerationJobRecord,
) -> Result<ExternalGenerationWriteLeaseGuard, String> {
Ok(ExternalGenerationWriteLeaseGuard::from_claimed_job(
job.job_id.clone(),
worker_id.to_string(),
require_job_lease_token(job)?,
))
}
fn duration_micros_i64(duration: Duration) -> i64 {
duration.as_micros().min(i64::MAX as u128) as i64
}
fn external_generation_worker_heartbeat_interval(lease: Duration) -> Duration {
let heartbeat_millis = (lease.as_millis() / 3).clamp(250, 30_000) as u64;
Duration::from_millis(heartbeat_millis)
}
fn current_utc_micros() -> i64 {
offset_datetime_to_unix_micros(time::OffsetDateTime::now_utc())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn worker_write_guard_uses_claimed_job_lease_token() {
let job = external_generation_job_record_fixture(Some("lease-1"));
let guard = build_external_generation_write_lease_guard("worker-a", &job)
.expect("guard should build");
assert_eq!(guard.job_id.as_deref(), Some("extgen-1"));
assert_eq!(guard.worker_id.as_deref(), Some("worker-a"));
assert_eq!(guard.lease_token.as_deref(), Some("lease-1"));
}
#[test]
fn worker_write_guard_requires_claimed_job_lease_token() {
let job = external_generation_job_record_fixture(None);
let error = build_external_generation_write_lease_guard("worker-a", &job)
.expect_err("missing token should fail");
assert!(error.contains("缺少 lease token"));
}
fn external_generation_job_record_fixture(
lease_token: Option<&str>,
) -> ExternalGenerationJobRecord {
ExternalGenerationJobRecord {
job_id: "extgen-1".to_string(),
dedupe_key: "puzzle:generate_puzzle_images:session-1:extgen-1".to_string(),
job_kind: PUZZLE_GENERATE_IMAGES_JOB_KIND.to_string(),
owner_user_id: "user-1".to_string(),
source_module: "puzzle".to_string(),
source_entity_id: "session-1:puzzle-level-1".to_string(),
request_label: "拼图关卡图片生成".to_string(),
request_payload_json: "{}".to_string(),
status: "running".to_string(),
attempt: 1,
max_attempts: 1,
last_error_message: None,
worker_id: Some("worker-a".to_string()),
lease_expires_at: Some("2026-06-03T00:00:00Z".to_string()),
available_at: "2026-06-03T00:00:00Z".to_string(),
result_payload_json: None,
created_at: "2026-06-03T00:00:00Z".to_string(),
started_at: Some("2026-06-03T00:00:00Z".to_string()),
completed_at: None,
updated_at: "2026-06-03T00:00:00Z".to_string(),
lease_token: lease_token.map(ToOwned::to_owned),
}
}
}

View File

@@ -0,0 +1,465 @@
use std::{collections::BTreeSet, future::Future, io, pin::Pin, process::Stdio, time::Duration};
use spacetime_client::ExternalGenerationQueueStatsRecord;
use tokio::{
process::Command,
time::{Instant, sleep},
};
use tracing::{error, info, warn};
use crate::state::AppState;
#[derive(Clone, Debug)]
struct ExternalGenerationWorkerControllerConfig {
min_workers: usize,
max_workers: usize,
target_jobs_per_worker: usize,
poll_interval: Duration,
scale_down_idle_rounds: u32,
service_template: String,
dry_run: bool,
}
#[derive(Clone, Debug, Eq, PartialEq)]
struct ExternalGenerationWorkerControllerDecision {
desired_workers: usize,
should_scale_down: bool,
idle_rounds: u32,
}
#[derive(Debug, Default)]
struct ExternalGenerationWorkerControllerState {
idle_rounds: u32,
}
pub(crate) async fn run_external_generation_worker_controller(
state: AppState,
) -> Result<(), io::Error> {
let config = ExternalGenerationWorkerControllerConfig::from_state(&state);
let mut controller_state = ExternalGenerationWorkerControllerState::default();
let mut shutdown = external_generation_controller_shutdown_signal();
info!(
min_workers = config.min_workers,
max_workers = config.max_workers,
target_jobs_per_worker = config.target_jobs_per_worker,
poll_interval_ms = config.poll_interval.as_millis(),
scale_down_idle_rounds = config.scale_down_idle_rounds,
service_template = config.service_template,
dry_run = config.dry_run,
"external generation worker controller 已启动"
);
loop {
let tick = run_external_generation_controller_tick(&state, &config, &mut controller_state);
tokio::select! {
_ = shutdown.as_mut() => {
info!("external generation worker controller 收到停机信号");
return Ok(());
}
result = tick => {
if let Err(error) = result {
error!(error = %error, "external generation worker controller 本轮扩缩容失败");
}
}
}
let next_tick = sleep(config.poll_interval);
tokio::pin!(next_tick);
tokio::select! {
_ = shutdown.as_mut() => {
info!("external generation worker controller 收到停机信号");
return Ok(());
}
_ = &mut next_tick => {}
}
}
}
async fn run_external_generation_controller_tick(
state: &AppState,
config: &ExternalGenerationWorkerControllerConfig,
controller_state: &mut ExternalGenerationWorkerControllerState,
) -> Result<(), String> {
let stats = state
.spacetime_client()
.get_external_generation_queue_stats()
.await
.map_err(|error| format!("读取 external_generation_job 队列统计失败:{error}"))?;
let active_instances = list_active_external_generation_worker_instances(config).await?;
let current_workers = active_instances.len();
let decision = decide_external_generation_worker_target(
&stats,
current_workers,
controller_state.idle_rounds,
config,
);
controller_state.idle_rounds = decision.idle_rounds;
info!(
pending = stats.pending_count,
delayed_pending = stats.delayed_pending_count,
claimable = stats.claimable_count,
running_active = stats.running_active_count,
expired_running = stats.expired_running_count,
oldest_claimable_age_ms = stats.oldest_claimable_age_micros.unwrap_or(0) / 1_000,
current_workers,
desired_workers = decision.desired_workers,
idle_rounds = decision.idle_rounds,
"external generation worker controller 完成队列评估"
);
reconcile_external_generation_worker_instances(config, &active_instances, &decision).await
}
fn decide_external_generation_worker_target(
stats: &ExternalGenerationQueueStatsRecord,
current_workers: usize,
previous_idle_rounds: u32,
config: &ExternalGenerationWorkerControllerConfig,
) -> ExternalGenerationWorkerControllerDecision {
let pressure = stats
.claimable_pending_count
.saturating_add(stats.running_active_count)
.saturating_add(stats.expired_running_count);
let desired_from_pressure =
ceil_div_usize(pressure as usize, config.target_jobs_per_worker.max(1));
let desired_workers = desired_from_pressure.clamp(config.min_workers, config.max_workers);
let is_idle = stats.claimable_count == 0
&& stats.expired_running_count == 0
&& stats.running_active_count == 0
&& desired_workers <= config.min_workers;
let idle_rounds = if is_idle {
previous_idle_rounds.saturating_add(1)
} else {
0
};
let should_scale_down = current_workers > desired_workers
&& idle_rounds >= config.scale_down_idle_rounds
&& config.scale_down_idle_rounds > 0;
ExternalGenerationWorkerControllerDecision {
desired_workers,
should_scale_down,
idle_rounds,
}
}
async fn reconcile_external_generation_worker_instances(
config: &ExternalGenerationWorkerControllerConfig,
active_instances: &BTreeSet<usize>,
decision: &ExternalGenerationWorkerControllerDecision,
) -> Result<(), String> {
let current_workers = active_instances.len();
let mut started = 0usize;
for instance in 1..=config.max_workers {
if current_workers.saturating_add(started) >= decision.desired_workers {
break;
}
if !active_instances.contains(&instance) {
systemctl_worker_instance(config, "start", instance).await?;
started = started.saturating_add(1);
}
}
if decision.desired_workers > current_workers && started == 0 {
warn!(
current_workers,
desired_workers = decision.desired_workers,
"external generation worker controller 未找到可启动的缺口实例"
);
}
if started > 0 {
return Ok(());
}
if decision.should_scale_down && decision.desired_workers < current_workers {
if let Some(instance) = active_instances
.iter()
.rev()
.copied()
.find(|instance| *instance > config.min_workers.max(1))
{
systemctl_worker_instance(config, "stop", instance).await?;
}
}
Ok(())
}
async fn list_active_external_generation_worker_instances(
config: &ExternalGenerationWorkerControllerConfig,
) -> Result<BTreeSet<usize>, String> {
let mut active_instances = BTreeSet::new();
for instance in 1..=config.max_workers {
if is_external_generation_worker_instance_active(config, instance).await? {
active_instances.insert(instance);
}
}
Ok(active_instances)
}
async fn is_external_generation_worker_instance_active(
config: &ExternalGenerationWorkerControllerConfig,
instance: usize,
) -> Result<bool, String> {
let service = format_worker_service_name(&config.service_template, instance)?;
if config.dry_run {
return Ok(instance <= config.min_workers);
}
let output = Command::new("systemctl")
.arg("is-active")
.arg("--quiet")
.arg(&service)
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.output()
.await
.map_err(|error| format!("执行 systemctl is-active {service} 失败:{error}"))?;
Ok(output.status.success())
}
async fn systemctl_worker_instance(
config: &ExternalGenerationWorkerControllerConfig,
action: &str,
instance: usize,
) -> Result<(), String> {
let service = format_worker_service_name(&config.service_template, instance)?;
if config.dry_run {
info!(
action,
service, "external generation worker controller dry-run 跳过 systemctl"
);
return Ok(());
}
let started_at = Instant::now();
let output = Command::new("systemctl")
.arg(action)
.arg(&service)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.output()
.await
.map_err(|error| format!("执行 systemctl {action} {service} 失败:{error}"))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!(
"systemctl {action} {service} 返回失败 status={} stderr={}",
output.status, stderr
));
}
info!(
action,
service,
elapsed_ms = started_at.elapsed().as_millis(),
"external generation worker controller 已执行 systemctl"
);
Ok(())
}
fn format_worker_service_name(template: &str, instance: usize) -> Result<String, String> {
let instance = instance.to_string();
if template.contains("{}") {
return Ok(template.replacen("{}", &instance, 1));
}
if template.contains("%i") {
return Ok(template.replacen("%i", &instance, 1));
}
Err("external generation controller service template 必须包含 {} 或 %i".to_string())
}
fn ceil_div_usize(value: usize, divisor: usize) -> usize {
if value == 0 {
0
} else {
value.saturating_add(divisor.saturating_sub(1)) / divisor.max(1)
}
}
impl ExternalGenerationWorkerControllerConfig {
fn from_state(state: &AppState) -> Self {
let min_workers = state.config.external_generation_controller_min_workers;
let max_workers = state
.config
.external_generation_controller_max_workers
.max(min_workers);
Self {
min_workers,
max_workers,
target_jobs_per_worker: state
.config
.external_generation_controller_target_jobs_per_worker
.max(1),
poll_interval: state.config.external_generation_controller_poll_interval,
scale_down_idle_rounds: state
.config
.external_generation_controller_scale_down_idle_rounds,
service_template: state
.config
.external_generation_controller_service_template
.clone(),
dry_run: state.config.external_generation_controller_dry_run,
}
}
}
type ExternalGenerationControllerShutdownSignal = Pin<Box<dyn Future<Output = ()> + Send>>;
fn external_generation_controller_shutdown_signal() -> ExternalGenerationControllerShutdownSignal {
Box::pin(async {
wait_for_external_generation_controller_shutdown_signal().await;
})
}
#[cfg(unix)]
async fn wait_for_external_generation_controller_shutdown_signal() {
use tokio::signal::unix::{SignalKind, signal};
let mut sigterm = signal(SignalKind::terminate()).ok();
tokio::select! {
result = tokio::signal::ctrl_c() => {
if let Err(error) = result {
warn!(error = %error, "external generation worker controller 监听 SIGINT 失败");
}
}
_ = async {
if let Some(sigterm) = sigterm.as_mut() {
sigterm.recv().await;
} else {
std::future::pending::<()>().await;
}
} => {}
}
}
#[cfg(not(unix))]
async fn wait_for_external_generation_controller_shutdown_signal() {
if let Err(error) = tokio::signal::ctrl_c().await {
warn!(error = %error, "external generation worker controller 监听 Ctrl-C 失败");
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn scales_up_to_max_when_queue_pressure_is_high() {
let config = controller_config_fixture();
let stats = stats_fixture(120, 0, 8);
let decision = decide_external_generation_worker_target(&stats, 1, 0, &config);
assert_eq!(decision.desired_workers, 8);
assert!(!decision.should_scale_down);
assert_eq!(decision.idle_rounds, 0);
}
#[test]
fn scale_down_requires_consecutive_idle_rounds() {
let config = controller_config_fixture();
let stats = stats_fixture(0, 0, 0);
let first = decide_external_generation_worker_target(&stats, 5, 0, &config);
let ready = decide_external_generation_worker_target(
&stats,
5,
config.scale_down_idle_rounds.saturating_sub(1),
&config,
);
assert_eq!(first.desired_workers, config.min_workers);
assert!(!first.should_scale_down);
assert!(ready.should_scale_down);
}
#[test]
fn running_jobs_hold_capacity_before_scale_down() {
let config = controller_config_fixture();
let stats = stats_fixture(0, 6, 0);
let decision = decide_external_generation_worker_target(&stats, 5, 5, &config);
assert_eq!(decision.desired_workers, 3);
assert!(!decision.should_scale_down);
assert_eq!(decision.idle_rounds, 0);
}
#[test]
fn expired_running_jobs_are_not_counted_twice_as_claimable_pressure() {
let config = controller_config_fixture();
let stats = stats_fixture(0, 0, 3);
let decision = decide_external_generation_worker_target(&stats, 1, 0, &config);
assert_eq!(decision.desired_workers, 2);
assert!(!decision.should_scale_down);
}
#[test]
fn formats_worker_service_name_with_supported_templates() {
assert_eq!(
format_worker_service_name("genarrative-external-generation-worker@{}.service", 3)
.expect("format"),
"genarrative-external-generation-worker@3.service"
);
assert_eq!(
format_worker_service_name("worker@%i.service", 7).expect("format"),
"worker@7.service"
);
assert!(format_worker_service_name("worker.service", 1).is_err());
}
#[tokio::test]
async fn dry_run_reconcile_does_not_start_low_number_gaps_when_capacity_is_enough() {
let config = controller_config_fixture();
let active_instances = BTreeSet::from([3usize, 4usize]);
let decision = ExternalGenerationWorkerControllerDecision {
desired_workers: 2,
should_scale_down: false,
idle_rounds: 0,
};
let result =
reconcile_external_generation_worker_instances(&config, &active_instances, &decision)
.await;
assert!(result.is_ok());
}
fn controller_config_fixture() -> ExternalGenerationWorkerControllerConfig {
ExternalGenerationWorkerControllerConfig {
min_workers: 1,
max_workers: 8,
target_jobs_per_worker: 2,
poll_interval: Duration::from_secs(10),
scale_down_idle_rounds: 3,
service_template: "genarrative-external-generation-worker@{}.service".to_string(),
dry_run: true,
}
}
fn stats_fixture(
claimable_pending_count: u32,
running_active_count: u32,
expired_running_count: u32,
) -> ExternalGenerationQueueStatsRecord {
let claimable_count = claimable_pending_count.saturating_add(expired_running_count);
ExternalGenerationQueueStatsRecord {
pending_count: claimable_pending_count,
delayed_pending_count: 0,
claimable_pending_count,
running_active_count,
expired_running_count,
terminal_count: 0,
claimable_count,
oldest_claimable_age_micros: None,
now_micros: 0,
}
}
}

View File

@@ -9,7 +9,11 @@ use module_assets::{
generate_asset_binding_id, generate_asset_object_id,
};
use platform_oss::{LegacyAssetPrefix, OssHeadObjectRequest, OssObjectAccess};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use shared_contracts::external_generation::{
ExternalGenerationJobStatus, ExternalGenerationJobStatusRecord,
};
use shared_contracts::jump_hop::{
JumpHopActionRequest, JumpHopActionType, JumpHopCharacterAsset, JumpHopDraftResponse,
JumpHopGalleryDetailResponse, JumpHopGenerationStatus, JumpHopJumpRequest, JumpHopJumpResponse,
@@ -20,7 +24,9 @@ use shared_contracts::jump_hop::{
JumpHopWorksResponse, JumpHopWorkspaceCreateRequest,
};
use shared_kernel::{build_prefixed_uuid_id, format_timestamp_micros};
use spacetime_client::SpacetimeClientError;
use spacetime_client::{
ExternalGenerationJobEnqueueRecordInput, ExternalGenerationJobRecord, SpacetimeClientError,
};
use std::{
collections::BTreeMap,
time::{SystemTime, UNIX_EPOCH},
@@ -49,6 +55,7 @@ use crate::{
};
const JUMP_HOP_TILE_ITEM_COUNT: usize = 18;
pub(crate) const JUMP_HOP_COMPILE_DRAFT_JOB_KIND: &str = "jump_hop_compile_draft";
const JUMP_HOP_PROVIDER: &str = "jump-hop";
const JUMP_HOP_CREATION_PROVIDER: &str = "jump-hop-creation";
@@ -72,6 +79,14 @@ const JUMP_HOP_BACK_BUTTON_IMAGE_SIZE: &str = "1024*1024";
const JUMP_HOP_BACK_BUTTON_IMAGE_WIDTH: u32 = 1024;
const JUMP_HOP_BACK_BUTTON_IMAGE_HEIGHT: u32 = 1024;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct JumpHopCompileDraftWorkerPayload {
pub session_id: String,
pub owner_user_id: String,
pub payload: JumpHopActionRequest,
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct JumpHopTileAtlasSlice {
tile_type: JumpHopTileType,
@@ -174,6 +189,37 @@ pub async fn execute_jump_hop_action(
let owner_user_id = authenticated.claims().user_id().to_string();
let mut payload = payload;
let is_compile_draft = matches!(payload.action_type, JumpHopActionType::CompileDraft);
let should_queue_generation = matches!(
payload.action_type,
JumpHopActionType::CompileDraft | JumpHopActionType::RegenerateTiles
) && !state.config.external_generation_mode.is_inline();
if should_queue_generation {
let mut queued_response = state
.spacetime_client()
.mark_jump_hop_generation_queued(
session_id.clone(),
owner_user_id.clone(),
payload.clone(),
)
.await
.map_err(|error| {
jump_hop_error_response(
&request_context,
JUMP_HOP_CREATION_PROVIDER,
map_jump_hop_client_error(error),
)
})?;
let queue_job = enqueue_jump_hop_compile_draft_job(
&state,
&request_context,
&session_id,
owner_user_id.as_str(),
payload,
)
.await?;
queued_response.queue_state = Some(map_jump_hop_queue_job_status(queue_job));
return Ok(json_success_body(Some(&request_context), queued_response));
}
let generation_points_cost = if is_compile_draft {
resolve_jump_hop_generation_points_cost(&state).await
} else {
@@ -246,6 +292,99 @@ pub async fn execute_jump_hop_action(
}
}
async fn enqueue_jump_hop_compile_draft_job(
state: &AppState,
request_context: &RequestContext,
session_id: &str,
owner_user_id: &str,
payload: JumpHopActionRequest,
) -> Result<ExternalGenerationJobRecord, Response> {
let job_id = build_prefixed_uuid_id("extgen-");
let now_micros = current_utc_micros();
let request_payload_json = serde_json::to_string(&JumpHopCompileDraftWorkerPayload {
session_id: session_id.to_string(),
owner_user_id: owner_user_id.to_string(),
payload,
})
.map_err(|error| {
jump_hop_error_response(
request_context,
JUMP_HOP_CREATION_PROVIDER,
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({
"message": format!("跳一跳 worker 任务参数序列化失败:{error}"),
})),
)
})?;
state
.spacetime_client()
.enqueue_external_generation_job(ExternalGenerationJobEnqueueRecordInput {
dedupe_key: format!("jump-hop:compile-draft:{session_id}:{job_id}"),
job_id,
job_kind: JUMP_HOP_COMPILE_DRAFT_JOB_KIND.to_string(),
owner_user_id: owner_user_id.to_string(),
source_module: "jump-hop".to_string(),
source_entity_id: session_id.to_string(),
request_label: "跳一跳草稿生成".to_string(),
request_payload_json,
max_attempts: 1,
available_at_micros: now_micros,
created_at_micros: now_micros,
})
.await
.map_err(|error| {
jump_hop_error_response(
request_context,
JUMP_HOP_CREATION_PROVIDER,
map_jump_hop_client_error(error),
)
})
}
fn map_jump_hop_queue_job_status(
job: ExternalGenerationJobRecord,
) -> ExternalGenerationJobStatusRecord {
ExternalGenerationJobStatusRecord {
operation_id: job.job_id,
status: ExternalGenerationJobStatus::Queued,
phase_label: job.request_label,
phase_detail: "排队中。".to_string(),
progress: 8,
error: job.last_error_message,
updated_at_micros: job.updated_at_micros,
}
}
pub(crate) async fn execute_jump_hop_compile_draft_worker_job(
state: &AppState,
request_context: &RequestContext,
mut worker_payload: JumpHopCompileDraftWorkerPayload,
) -> Result<JumpHopSessionSnapshotResponse, Response> {
maybe_generate_jump_hop_assets(
state,
request_context,
worker_payload.session_id.as_str(),
worker_payload.owner_user_id.as_str(),
&mut worker_payload.payload,
)
.await?;
let response = state
.spacetime_client()
.execute_jump_hop_action(
worker_payload.session_id,
worker_payload.owner_user_id,
worker_payload.payload,
)
.await
.map_err(|error| {
jump_hop_error_response(
request_context,
JUMP_HOP_CREATION_PROVIDER,
map_jump_hop_client_error(error),
)
})?;
Ok(response.session)
}
async fn resolve_jump_hop_generation_points_cost(state: &AppState) -> u64 {
crate::creation_entry_config::resolve_creation_entry_mud_point_cost(
state,
@@ -1005,15 +1144,8 @@ fn slice_jump_hop_tile_atlas(
let y1 = (row.saturating_add(1)).saturating_mul(height) / JUMP_HOP_TILE_ATLAS_ROWS;
let tile_width = x1.saturating_sub(x0).max(1);
let tile_height = y1.saturating_sub(y0).max(1);
let faces = slice_jump_hop_tile_uv_faces(
&source,
x0,
y0,
tile_width,
tile_height,
row,
col,
)?;
let faces =
slice_jump_hop_tile_uv_faces(&source, x0, y0, tile_width, tile_height, row, col)?;
slices.push(JumpHopTileAtlasSlice {
tile_type: jump_hop_tile_type_by_index(index),
source_atlas_cell: format!("row-{}-col-{}", row + 1, col + 1),
@@ -1043,22 +1175,70 @@ fn slice_jump_hop_tile_uv_faces(
Ok(JumpHopTileFaceSlices {
top: slice_jump_hop_tile_uv_face(
source, uv_x, uv_y, face_side, atlas_row, atlas_col, JumpHopTileFaceKey::Top, 1, 0,
source,
uv_x,
uv_y,
face_side,
atlas_row,
atlas_col,
JumpHopTileFaceKey::Top,
1,
0,
)?,
front: slice_jump_hop_tile_uv_face(
source, uv_x, uv_y, face_side, atlas_row, atlas_col, JumpHopTileFaceKey::Front, 1, 1,
source,
uv_x,
uv_y,
face_side,
atlas_row,
atlas_col,
JumpHopTileFaceKey::Front,
1,
1,
)?,
right: slice_jump_hop_tile_uv_face(
source, uv_x, uv_y, face_side, atlas_row, atlas_col, JumpHopTileFaceKey::Right, 2, 1,
source,
uv_x,
uv_y,
face_side,
atlas_row,
atlas_col,
JumpHopTileFaceKey::Right,
2,
1,
)?,
back: slice_jump_hop_tile_uv_face(
source, uv_x, uv_y, face_side, atlas_row, atlas_col, JumpHopTileFaceKey::Back, 3, 1,
source,
uv_x,
uv_y,
face_side,
atlas_row,
atlas_col,
JumpHopTileFaceKey::Back,
3,
1,
)?,
left: slice_jump_hop_tile_uv_face(
source, uv_x, uv_y, face_side, atlas_row, atlas_col, JumpHopTileFaceKey::Left, 0, 1,
source,
uv_x,
uv_y,
face_side,
atlas_row,
atlas_col,
JumpHopTileFaceKey::Left,
0,
1,
)?,
bottom: slice_jump_hop_tile_uv_face(
source, uv_x, uv_y, face_side, atlas_row, atlas_col, JumpHopTileFaceKey::Bottom, 1, 2,
source,
uv_x,
uv_y,
face_side,
atlas_row,
atlas_col,
JumpHopTileFaceKey::Bottom,
1,
2,
)?,
})
}
@@ -1095,12 +1275,7 @@ fn slice_jump_hop_tile_uv_face(
Ok(JumpHopTileFaceSlice {
face,
source_atlas_cell: format!(
"row-{}-col-{}/{}",
atlas_row + 1,
atlas_col + 1,
face_label
),
source_atlas_cell: format!("row-{}-col-{}/{}", atlas_row + 1, atlas_col + 1, face_label),
bytes: cursor.into_inner(),
})
}
@@ -1827,7 +2002,9 @@ mod tests {
assert!(prompt.contains("18个用于跳一跳地板的立方体主题物体 UV 展开包装图"));
assert!(prompt.contains("按三列六行均匀排布"));
assert!(prompt.contains("每个大单元格代表一个完整的 1x1x1 立方体方块物体"));
assert!(prompt.contains("该单元内的六张面贴图精确贴到 Three.js 标准极小倒角立方体的六个面上"));
assert!(
prompt.contains("该单元内的六张面贴图精确贴到 Three.js 标准极小倒角立方体的六个面上")
);
assert!(prompt.contains("cube object UV unwrap atlas / 立方体主题物体六面展开图集"));
assert!(prompt.contains("不是单纯平铺材质、不是抽象纹理、不是只把主题颜色铺满"));
assert!(prompt.contains("游戏界面或图标集页面"));
@@ -1850,7 +2027,9 @@ mod tests {
assert!(prompt.contains("full-bleed opaque square face texture"));
assert!(prompt.contains("四角、边缘和中心都要有可识别内容"));
assert!(prompt.contains("不留透明、不留空白、不留实底背景"));
assert!(prompt.contains("允许大面积水果切面、果柄叶片、剥皮条带、籽点、条纹和轮廓图案作为包装身份锚点"));
assert!(prompt.contains(
"允许大面积水果切面、果柄叶片、剥皮条带、籽点、条纹和轮廓图案作为包装身份锚点"
));
assert!(prompt.contains("不要把一个小水果、小叶片、小石头或小物体放在面中央"));
assert!(prompt.contains("这不是透视渲染图"));
assert!(prompt.contains("不要画摄像机视角、透视块、已烘焙侧壁"));
@@ -1868,14 +2047,18 @@ mod tests {
assert!(prompt.contains("小贴纸图标、小物体居中、纯果皮材质、纯果肉纹理"));
assert!(prompt.contains("English guardrail"));
assert!(prompt.contains("one vertical 1024x1536 image"));
assert!(prompt.contains("exactly 18 cube object UV unwraps in a 3 columns by 6 rows atlas"));
assert!(
prompt.contains("exactly 18 cube object UV unwraps in a 3 columns by 6 rows atlas")
);
assert!(prompt.contains("row1 col2 top"));
assert!(prompt.contains("row2 col1 left"));
assert!(prompt.contains("row2 col2 front"));
assert!(prompt.contains("row2 col3 right"));
assert!(prompt.contains("row2 col4 back"));
assert!(prompt.contains("row3 col2 bottom"));
assert!(prompt.contains("six different face textures that stitch into one recognizable cubified theme object"));
assert!(prompt.contains(
"six different face textures that stitch into one recognizable cubified theme object"
));
assert!(prompt.contains("no generic flat material"));
assert!(prompt.contains("no small centered stickers"));
assert!(prompt.contains("every face is full-bleed opaque square texture"));
@@ -2022,7 +2205,9 @@ mod tests {
"科幻芯片主题的俯视角清爽游戏化立体感平台素材",
);
assert!(prompt.contains("画面内容是科幻芯片主题的正交平面清爽游戏化立方体主题身份方块包装贴图"));
assert!(
prompt.contains("画面内容是科幻芯片主题的正交平面清爽游戏化立方体主题身份方块包装贴图")
);
assert!(!prompt.contains("画面内容是科幻芯片主题的俯视角清爽游戏化立体感平台素材"));
assert!(!prompt.contains("画面内容是科幻芯片主题的俯视角"));
@@ -2118,12 +2303,10 @@ mod tests {
.max(1);
let tile_x = atlas_col.saturating_mul(cell_width);
let tile_y = atlas_row.saturating_mul(cell_height);
let uv_x = tile_x.saturating_add(
cell_width.saturating_sub(face_side * JUMP_HOP_TILE_UV_FACE_COLS) / 2,
);
let uv_y = tile_y.saturating_add(
cell_height.saturating_sub(face_side * JUMP_HOP_TILE_UV_FACE_ROWS) / 2,
);
let uv_x = tile_x
.saturating_add(cell_width.saturating_sub(face_side * JUMP_HOP_TILE_UV_FACE_COLS) / 2);
let uv_y = tile_y
.saturating_add(cell_height.saturating_sub(face_side * JUMP_HOP_TILE_UV_FACE_ROWS) / 2);
for y in uv_y + face_row * face_side..uv_y + (face_row + 1) * face_side {
for x in uv_x + face_col * face_side..uv_x + (face_col + 1) * face_side {
atlas.put_pixel(x, y, color);
@@ -2159,14 +2342,8 @@ mod tests {
),
"{message}"
);
assert!(
decoded.pixels().any(|pixel| pixel.0 == color),
"{message}"
);
assert!(
decoded.pixels().all(|pixel| pixel.0[3] == 255),
"{message}"
);
assert!(decoded.pixels().any(|pixel| pixel.0 == color), "{message}");
assert!(decoded.pixels().all(|pixel| pixel.0[3] == 255), "{message}");
}
#[test]

View File

@@ -41,6 +41,9 @@ mod edutainment_baby_drawing;
mod edutainment_baby_object;
mod error_middleware;
mod external_api_audit;
mod external_generation;
mod external_generation_worker;
mod external_generation_worker_controller;
pub(crate) mod generated_asset_sheets;
mod generated_image_assets;
mod health;
@@ -115,6 +118,8 @@ use tracing::{error, info, warn};
use crate::{
app::{build_router, build_spacetime_unavailable_router},
config::AppConfig,
external_generation_worker::run_external_generation_worker,
external_generation_worker_controller::run_external_generation_worker_controller,
state::{AppState, AppStateInitError},
tracking_outbox::TrackingOutbox,
wallet_refund_outbox::WalletRefundOutbox,
@@ -168,24 +173,57 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
process_metrics::register_process_metrics();
telemetry::register_http_runtime_metrics();
if !config.process_role.runs_http() {
return run_worker_only(config).await;
}
run_http_role(config).await
}
async fn run_worker_only(config: AppConfig) -> Result<(), io::Error> {
let process_role = config.process_role;
let state = restore_app_state_for_startup(config)
.await
.map_err(|error| {
io::Error::other(format!(
"初始化 external generation worker 状态失败:{error}"
))
})?;
spawn_app_state_background_workers(&state);
info!(
process_role = process_role.as_str(),
"api-server 以非 HTTP 角色启动"
);
if process_role.runs_external_generation_worker() {
run_external_generation_worker(state).await
} else if process_role.runs_external_generation_controller() {
run_external_generation_worker_controller(state).await
} else {
Err(io::Error::other(format!(
"不支持的非 HTTP 进程角色:{}",
process_role.as_str()
)))
}
}
async fn run_http_role(config: AppConfig) -> Result<(), io::Error> {
let bind_address = config.bind_socket_addr();
let listen_backlog = config.listen_backlog;
let worker_threads = config.worker_threads;
let otel_enabled = config.otel_enabled;
let process_role = config.process_role;
let outbox_flush_timeout = config.shutdown_outbox_flush_timeout;
let listener = build_tcp_listener(bind_address, listen_backlog)?;
let (router, shutdown_context) = match restore_app_state_for_startup(config).await {
let (router, shutdown_context, worker_state) = match restore_app_state_for_startup(config).await
{
Ok(state) => {
state.puzzle_gallery_cache().spawn_cleanup_task();
spawn_app_state_background_workers(&state);
let tracking_outbox = state.tracking_outbox();
if let Some(outbox) = tracking_outbox.clone() {
outbox.spawn_worker();
}
let wallet_refund_outbox = state.wallet_refund_outbox();
if let Some(outbox) = wallet_refund_outbox.clone() {
outbox.spawn_worker();
}
let worker_state = process_role
.runs_external_generation_worker()
.then(|| state.clone());
(
build_router(state.clone()),
ShutdownContext {
@@ -194,6 +232,7 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
wallet_refund_outbox,
outbox_flush_timeout,
},
worker_state,
)
}
Err(AppStateInitError::DependencyUnavailable(message)) => (
@@ -204,6 +243,7 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
wallet_refund_outbox: None,
outbox_flush_timeout,
},
None,
),
Err(error) => {
return Err(std::io::Error::other(format!(
@@ -217,12 +257,20 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
listen_backlog,
worker_threads = worker_threads.unwrap_or(0),
otel_enabled,
process_role = process_role.as_str(),
"api-server 已完成 tracing 初始化并开始监听"
);
let result = axum::serve(listener, router)
.with_graceful_shutdown(shutdown_signal(shutdown_context.clone()))
.await;
let http_server = axum::serve(listener, router)
.with_graceful_shutdown(shutdown_signal(shutdown_context.clone()));
let result = if let Some(worker_state) = worker_state {
tokio::select! {
result = http_server => result,
result = run_external_generation_worker(worker_state) => result,
}
} else {
http_server.await
};
finalize_shutdown(shutdown_context).await;
result
}
@@ -333,6 +381,16 @@ async fn finalize_shutdown(context: ShutdownContext) {
}
}
fn spawn_app_state_background_workers(state: &AppState) {
state.puzzle_gallery_cache().spawn_cleanup_task();
if let Some(outbox) = state.tracking_outbox() {
outbox.spawn_worker();
}
if let Some(outbox) = state.wallet_refund_outbox() {
outbox.spawn_worker();
}
}
fn build_tcp_listener(
bind_address: SocketAddr,
listen_backlog: i32,

View File

@@ -6,6 +6,7 @@ pub mod big_fish;
pub mod custom_world;
pub mod editor_project;
pub mod edutainment;
pub mod external_generation;
pub mod health;
pub mod internal;
pub mod jump_hop;

View File

@@ -0,0 +1,26 @@
use axum::{Router, middleware, routing::get};
use crate::{
auth::require_bearer_auth,
external_generation::{
get_external_generation_job_status, get_external_generation_queue_overview,
},
state::AppState,
};
pub fn router(state: AppState) -> Router<AppState> {
Router::new()
.route(
"/api/runtime/external-generation/queue-overview",
get(get_external_generation_queue_overview).route_layer(
middleware::from_fn_with_state(state.clone(), require_bearer_auth),
),
)
.route(
"/api/runtime/external-generation/jobs/{job_id}",
get(get_external_generation_job_status).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
)),
)
}

View File

@@ -52,22 +52,22 @@ use shared_contracts::{
};
use shared_kernel::{build_prefixed_uuid_id, format_timestamp_micros};
use spacetime_client::{
PuzzleAgentMessageRecord, PuzzleAgentMessageSubmitRecordInput,
PuzzleAgentSessionCreateRecordInput, PuzzleAgentSessionRecord,
PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord, PuzzleAnchorPackRecord,
PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput,
ExternalGenerationJobEnqueueRecordInput, PuzzleAgentMessageRecord,
PuzzleAgentMessageSubmitRecordInput, PuzzleAgentSessionCreateRecordInput,
PuzzleAgentSessionRecord, PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord,
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput,
PuzzleBackgroundCompileTaskReleaseRecordInput, PuzzleCreatorIntentRecord,
PuzzleDraftCompileFailureRecordInput, PuzzleDraftLevelRecord, PuzzleFormDraftRecord,
PuzzleFormDraftSaveRecordInput, PuzzleGeneratedImageCandidateRecord,
PuzzleGeneratedImagesSaveRecordInput, PuzzleLeaderboardEntryRecord,
PuzzleLeaderboardSubmitRecordInput, PuzzlePublishRecordInput, PuzzleRecommendedNextWorkRecord,
PuzzleResultDraftRecord, PuzzleResultPreviewBlockerRecord, PuzzleResultPreviewFindingRecord,
PuzzleResultPreviewRecord, PuzzleRunDragRecordInput, PuzzleRunPauseRecordInput,
PuzzleRunPropRecordInput, PuzzleRunRecord, PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput,
PuzzleSelectCoverImageRecordInput, PuzzleUiBackgroundSaveRecordInput,
PuzzleWorkLikeReportRecordInput, PuzzleWorkPointIncentiveClaimRecordInput,
PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput, PuzzleWorkUpsertRecordInput,
SpacetimeClientError,
PuzzleLeaderboardSubmitRecordInput, PuzzleLevelGenerationFailureRecordInput,
PuzzlePublishRecordInput, PuzzleRecommendedNextWorkRecord, PuzzleResultDraftRecord,
PuzzleResultPreviewBlockerRecord, PuzzleResultPreviewFindingRecord, PuzzleResultPreviewRecord,
PuzzleRunDragRecordInput, PuzzleRunPauseRecordInput, PuzzleRunPropRecordInput, PuzzleRunRecord,
PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput, PuzzleSelectCoverImageRecordInput,
PuzzleUiBackgroundSaveRecordInput, PuzzleWorkLikeReportRecordInput,
PuzzleWorkPointIncentiveClaimRecordInput, PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput,
PuzzleWorkUpsertRecordInput, SpacetimeClientError,
};
use std::convert::Infallible;
@@ -79,6 +79,10 @@ use crate::{
should_skip_asset_operation_billing_for_connectivity,
},
auth::{AuthenticatedAccessToken, RuntimePrincipal},
external_generation_worker::{
PUZZLE_COMPILE_DRAFT_JOB_KIND, PUZZLE_GENERATE_IMAGES_JOB_KIND,
PUZZLE_GENERATE_UI_BACKGROUND_JOB_KIND,
},
generated_asset_sheets::apply_generated_asset_sheet_green_screen_alpha,
http_error::AppError,
llm_model_routing::{CREATION_TEMPLATE_LLM_MODEL, PUZZLE_LEVEL_NAME_VISION_LLM_MODEL},
@@ -185,6 +189,25 @@ async fn release_claimed_puzzle_background_compile_task(
}
}
pub(crate) fn spawn_release_claimed_puzzle_background_compile_task(
state: PuzzleApiState,
task_id: String,
claim_id: String,
session_id: String,
owner_user_id: String,
) {
tokio::spawn(async move {
release_claimed_puzzle_background_compile_task(
&state,
&task_id,
&claim_id,
&session_id,
&owner_user_id,
)
.await;
});
}
fn has_puzzle_cover_image_src(value: &Option<String>) -> bool {
value
.as_deref()
@@ -215,6 +238,65 @@ fn mark_puzzle_initial_generation_started_snapshot(
session
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct ExternalGenerationWriteLeaseGuard {
pub(crate) job_id: Option<String>,
pub(crate) worker_id: Option<String>,
pub(crate) lease_token: Option<String>,
}
impl ExternalGenerationWriteLeaseGuard {
pub(crate) fn inline() -> Self {
Self {
job_id: None,
worker_id: None,
lease_token: None,
}
}
pub(crate) fn from_claimed_job(job_id: String, worker_id: String, lease_token: String) -> Self {
Self {
job_id: Some(job_id),
worker_id: Some(worker_id),
lease_token: Some(lease_token),
}
}
}
#[derive(Debug)]
pub(crate) struct PuzzleExternalGenerationWorkerError {
error: AppError,
should_fail_queue_job: bool,
}
impl PuzzleExternalGenerationWorkerError {
pub(crate) fn with_failure_state_written(error: AppError) -> Self {
Self {
error,
should_fail_queue_job: true,
}
}
pub(crate) fn with_failure_state_pending(error: AppError) -> Self {
Self {
error,
should_fail_queue_job: false,
}
}
pub(crate) fn body_text(&self) -> String {
self.error.body_text()
}
pub(crate) fn into_app_error(self) -> AppError {
self.error
}
pub(crate) fn should_fail_queue_job(&self) -> bool {
self.should_fail_queue_job
}
}
pub(crate) fn format_puzzle_reference_image_upload_bytes(bytes: usize) -> String {
format!("{:.1}MB", bytes as f64 / 1024.0 / 1024.0)
}
@@ -237,7 +319,7 @@ mod mappers;
use self::mappers::*;
mod draft;
use self::draft::*;
pub(crate) use self::draft::*;
mod tags;
@@ -246,7 +328,7 @@ use self::tags::*;
mod generation;
mod vector_engine;
use self::generation::*;
pub(crate) use self::generation::*;
use self::vector_engine::*;
#[cfg(test)]

View File

@@ -137,6 +137,213 @@ pub(crate) async fn create_seeded_puzzle_session_when_form_save_missing(
Ok(replacement.session_id)
}
fn default_puzzle_image_generation_points_cost() -> u64 {
PUZZLE_IMAGE_GENERATION_POINTS_COST
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct PuzzleCompileDraftWorkerPayload {
pub session_id: String,
pub owner_user_id: String,
pub billing_asset_id: String,
pub ai_redraw: bool,
#[serde(default = "default_puzzle_image_generation_points_cost")]
pub billing_points_cost: u64,
#[serde(default)]
pub prompt_text: Option<String>,
#[serde(default)]
pub reference_image_src: Option<String>,
#[serde(default)]
pub image_model: Option<String>,
pub requested_at_micros: i64,
#[serde(default)]
pub background_task_id: Option<String>,
#[serde(default)]
pub background_claim_id: Option<String>,
}
pub(crate) async fn execute_puzzle_compile_draft_worker_job(
state: &PuzzleApiState,
request_context: &RequestContext,
payload: PuzzleCompileDraftWorkerPayload,
external_generation_guard: ExternalGenerationWriteLeaseGuard,
) -> Result<PuzzleAgentSessionRecord, PuzzleExternalGenerationWorkerError> {
let now = current_utc_micros();
let session = if payload.ai_redraw {
execute_billable_asset_operation_with_cost(
state.root_state(),
&payload.owner_user_id,
"puzzle_initial_image",
&payload.billing_asset_id,
payload.billing_points_cost,
async {
compile_puzzle_draft_with_initial_cover(
state,
request_context,
payload.session_id.clone(),
payload.owner_user_id.clone(),
payload.prompt_text.as_deref(),
payload.reference_image_src.as_deref(),
payload.image_model.as_deref(),
now,
&external_generation_guard,
)
.await
},
)
.await
} else {
compile_puzzle_draft_with_uploaded_cover(
state,
request_context,
payload.session_id.clone(),
payload.owner_user_id.clone(),
payload.prompt_text.as_deref(),
payload.reference_image_src.as_deref(),
now,
&external_generation_guard,
)
.await
};
match session {
Ok(session) => {
if session
.draft
.as_ref()
.is_some_and(|draft| draft.generation_status == "ready")
{
send_generation_result_subscribe_message_after_completion(
state.root_state(),
GenerationResultSubscribeMessage {
owner_user_id: payload.owner_user_id.clone(),
task_name: Some("拼图".to_string()),
work_name: session.draft.as_ref().map(|draft| draft.work_title.clone()),
status: GenerationResultSubscribeMessageStatus::Succeeded,
consumed_points: if payload.ai_redraw {
payload.billing_points_cost
} else {
0
},
completed_at_micros: current_utc_micros(),
page: Some("/pages/web-view/index".to_string()),
},
)
.await;
}
release_inline_puzzle_compile_background_claim(
state,
&payload,
&external_generation_guard,
);
Ok(session)
}
Err(error) => {
match mark_puzzle_compile_failure_for_worker(
state,
&payload.session_id,
&payload.owner_user_id,
error.body_text(),
now,
&external_generation_guard,
)
.await
{
Ok(()) => {
send_generation_result_subscribe_message_after_completion(
state.root_state(),
GenerationResultSubscribeMessage {
owner_user_id: payload.owner_user_id.clone(),
task_name: Some("拼图".to_string()),
work_name: None,
status: GenerationResultSubscribeMessageStatus::Failed,
consumed_points: 0,
completed_at_micros: now,
page: Some("/pages/web-view/index".to_string()),
},
)
.await;
release_inline_puzzle_compile_background_claim(
state,
&payload,
&external_generation_guard,
);
Err(PuzzleExternalGenerationWorkerError::with_failure_state_written(error))
}
Err(mark_error) => {
Err(PuzzleExternalGenerationWorkerError::with_failure_state_pending(mark_error))
}
}
}
}
}
fn release_inline_puzzle_compile_background_claim(
state: &PuzzleApiState,
payload: &PuzzleCompileDraftWorkerPayload,
external_generation_guard: &ExternalGenerationWriteLeaseGuard,
) {
if external_generation_guard.job_id.is_some() {
return;
}
release_puzzle_compile_background_claim(state, payload);
}
pub(crate) fn release_puzzle_compile_background_claim(
state: &PuzzleApiState,
payload: &PuzzleCompileDraftWorkerPayload,
) {
let (Some(task_id), Some(claim_id)) = (
payload.background_task_id.as_ref(),
payload.background_claim_id.as_ref(),
) else {
return;
};
spawn_release_claimed_puzzle_background_compile_task(
state.clone(),
task_id.clone(),
claim_id.clone(),
payload.session_id.clone(),
payload.owner_user_id.clone(),
);
}
pub(crate) async fn mark_puzzle_compile_failure_for_worker(
state: &PuzzleApiState,
session_id: &str,
owner_user_id: &str,
error_message: String,
failed_at_micros: i64,
external_generation_guard: &ExternalGenerationWriteLeaseGuard,
) -> Result<(), AppError> {
let result = state
.spacetime_client()
.mark_puzzle_draft_generation_failed(PuzzleDraftCompileFailureRecordInput {
session_id: session_id.to_string(),
owner_user_id: owner_user_id.to_string(),
error_message,
failed_at_micros,
external_generation_job_id: external_generation_guard.job_id.clone(),
external_generation_worker_id: external_generation_guard.worker_id.clone(),
external_generation_lease_token: external_generation_guard.lease_token.clone(),
})
.await;
if let Err(error) = result {
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id,
owner_user_id,
message = %error,
"拼图 worker 草稿失败态回写失败"
);
return Err(map_puzzle_client_error(error));
}
Ok(())
}
pub(crate) fn select_puzzle_level_for_api(
draft: &PuzzleResultDraftRecord,
level_id: Option<&str>,
@@ -1163,6 +1370,44 @@ pub(crate) fn find_puzzle_level_for_initial_asset_check<'a>(
.or_else(|| levels.first())
}
pub(crate) async fn compile_puzzle_draft_with_initial_cover(
state: &PuzzleApiState,
request_context: &RequestContext,
session_id: String,
owner_user_id: String,
prompt_text: Option<&str>,
reference_image_src: Option<&str>,
image_model: Option<&str>,
now: i64,
external_generation_guard: &ExternalGenerationWriteLeaseGuard,
) -> Result<PuzzleAgentSessionRecord, AppError> {
let compiled_session = state
.spacetime_client()
.compile_puzzle_agent_draft_with_external_generation_guard(
session_id,
owner_user_id.clone(),
now,
external_generation_guard.job_id.clone(),
external_generation_guard.worker_id.clone(),
external_generation_guard.lease_token.clone(),
)
.await
.map_err(map_puzzle_compile_error)?;
generate_puzzle_initial_cover_from_compiled_session(
state,
request_context,
compiled_session,
owner_user_id,
prompt_text,
reference_image_src,
image_model,
now,
external_generation_guard,
)
.await
}
pub(crate) async fn generate_puzzle_initial_cover_from_compiled_session(
state: &PuzzleApiState,
request_context: &RequestContext,
@@ -1172,6 +1417,7 @@ pub(crate) async fn generate_puzzle_initial_cover_from_compiled_session(
reference_image_src: Option<&str>,
image_model: Option<&str>,
now: i64,
external_generation_guard: &ExternalGenerationWriteLeaseGuard,
) -> Result<PuzzleAgentSessionRecord, AppError> {
let draft = compiled_session.draft.clone().ok_or_else(|| {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
@@ -1322,6 +1568,9 @@ pub(crate) async fn generate_puzzle_initial_cover_from_compiled_session(
levels_json: levels_json_with_generated_name.clone(),
candidates_json,
saved_at_micros: current_utc_micros(),
external_generation_job_id: external_generation_guard.job_id.clone(),
external_generation_worker_id: external_generation_guard.worker_id.clone(),
external_generation_lease_token: external_generation_guard.lease_token.clone(),
})
.await
.map_err(map_puzzle_client_error)
@@ -1435,6 +1684,7 @@ pub(crate) async fn compile_puzzle_draft_with_uploaded_cover(
prompt_text: Option<&str>,
reference_image_src: Option<&str>,
now: i64,
external_generation_guard: &ExternalGenerationWriteLeaseGuard,
) -> Result<PuzzleAgentSessionRecord, AppError> {
let uploaded_image_src = reference_image_src
.map(str::trim)
@@ -1469,7 +1719,14 @@ pub(crate) async fn compile_puzzle_draft_with_uploaded_cover(
})?;
let compiled_session = state
.spacetime_client()
.compile_puzzle_agent_draft(session_id.clone(), owner_user_id.clone(), now)
.compile_puzzle_agent_draft_with_external_generation_guard(
session_id.clone(),
owner_user_id.clone(),
now,
external_generation_guard.job_id.clone(),
external_generation_guard.worker_id.clone(),
external_generation_guard.lease_token.clone(),
)
.await
.map_err(map_puzzle_compile_error)?;
let draft = compiled_session.draft.clone().ok_or_else(|| {
@@ -1618,6 +1875,9 @@ pub(crate) async fn compile_puzzle_draft_with_uploaded_cover(
levels_json: levels_json_with_generated_name.clone(),
candidates_json,
saved_at_micros: current_utc_micros(),
external_generation_job_id: external_generation_guard.job_id.clone(),
external_generation_worker_id: external_generation_guard.worker_id.clone(),
external_generation_lease_token: external_generation_guard.lease_token.clone(),
})
.await
.map_err(map_puzzle_client_error)

View File

@@ -22,6 +22,510 @@ pub(crate) fn should_use_uploaded_puzzle_image_directly(
.is_some_and(|value| !value.is_empty())
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct PuzzleGenerateImagesWorkerPayload {
pub session_id: String,
pub owner_user_id: String,
pub billing_asset_id: String,
#[serde(default)]
pub level_id: Option<String>,
#[serde(default)]
pub prompt_text: Option<String>,
#[serde(default)]
pub reference_image_src: Option<String>,
#[serde(default)]
pub reference_image_srcs: Vec<String>,
#[serde(default)]
pub reference_image_asset_object_id: Option<String>,
#[serde(default)]
pub reference_image_asset_object_ids: Vec<String>,
#[serde(default)]
pub image_model: Option<String>,
#[serde(default)]
pub ai_redraw: Option<bool>,
#[serde(default)]
pub should_auto_name_level: Option<bool>,
#[serde(default)]
pub work_title: Option<String>,
#[serde(default)]
pub work_description: Option<String>,
#[serde(default)]
pub picture_description: Option<String>,
#[serde(default)]
pub summary: Option<String>,
#[serde(default)]
pub theme_tags: Option<Vec<String>>,
#[serde(default)]
pub levels_json: Option<String>,
pub requested_at_micros: i64,
}
impl PuzzleGenerateImagesWorkerPayload {
fn to_action_request(&self) -> ExecutePuzzleAgentActionRequest {
ExecutePuzzleAgentActionRequest {
action: "generate_puzzle_images".to_string(),
prompt_text: self.prompt_text.clone(),
reference_image_src: self.reference_image_src.clone(),
reference_image_srcs: self.reference_image_srcs.clone(),
reference_image_asset_object_id: self.reference_image_asset_object_id.clone(),
reference_image_asset_object_ids: self.reference_image_asset_object_ids.clone(),
image_model: self.image_model.clone(),
ai_redraw: self.ai_redraw,
candidate_count: Some(1),
should_auto_name_level: self.should_auto_name_level,
candidate_id: None,
level_id: self.level_id.clone(),
work_title: self.work_title.clone(),
work_description: self.work_description.clone(),
picture_description: self.picture_description.clone(),
level_name: None,
summary: self.summary.clone(),
theme_tags: self.theme_tags.clone(),
levels_json: self.levels_json.clone(),
}
}
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct PuzzleGenerateUiBackgroundWorkerPayload {
pub session_id: String,
pub owner_user_id: String,
pub billing_asset_id: String,
#[serde(default)]
pub level_id: Option<String>,
#[serde(default)]
pub prompt_text: Option<String>,
#[serde(default)]
pub levels_json: Option<String>,
pub requested_at_micros: i64,
}
impl PuzzleGenerateUiBackgroundWorkerPayload {
fn to_action_request(&self) -> ExecutePuzzleAgentActionRequest {
ExecutePuzzleAgentActionRequest {
action: "generate_puzzle_ui_background".to_string(),
prompt_text: self.prompt_text.clone(),
reference_image_src: None,
reference_image_srcs: Vec::new(),
reference_image_asset_object_id: None,
reference_image_asset_object_ids: Vec::new(),
image_model: None,
ai_redraw: None,
candidate_count: None,
should_auto_name_level: None,
candidate_id: None,
level_id: self.level_id.clone(),
work_title: None,
work_description: None,
picture_description: None,
level_name: None,
summary: None,
theme_tags: None,
levels_json: self.levels_json.clone(),
}
}
}
pub(crate) async fn execute_puzzle_generate_images_worker_job(
state: &PuzzleApiState,
request_context: &RequestContext,
payload: PuzzleGenerateImagesWorkerPayload,
external_generation_guard: ExternalGenerationWriteLeaseGuard,
) -> Result<PuzzleAgentSessionRecord, PuzzleExternalGenerationWorkerError> {
let now = current_utc_micros();
let session = execute_billable_asset_operation_with_cost(
state.root_state(),
&payload.owner_user_id,
"puzzle_generated_image",
&payload.billing_asset_id,
PUZZLE_IMAGE_GENERATION_POINTS_COST,
async {
execute_puzzle_generate_images_worker_job_inner(
state,
request_context,
&payload,
now,
&external_generation_guard,
)
.await
},
)
.await;
match session {
Ok(session) => Ok(session),
Err(error) => {
match mark_puzzle_level_generation_failure_for_worker(
state,
&payload,
error.body_text(),
now,
&external_generation_guard,
)
.await
{
Ok(()) => {
Err(PuzzleExternalGenerationWorkerError::with_failure_state_written(error))
}
Err(mark_error) => {
Err(PuzzleExternalGenerationWorkerError::with_failure_state_pending(mark_error))
}
}
}
}
}
pub(crate) async fn execute_puzzle_generate_ui_background_worker_job(
state: &PuzzleApiState,
request_context: &RequestContext,
payload: PuzzleGenerateUiBackgroundWorkerPayload,
external_generation_guard: ExternalGenerationWriteLeaseGuard,
) -> Result<PuzzleAgentSessionRecord, PuzzleExternalGenerationWorkerError> {
let now = current_utc_micros();
let session = execute_billable_asset_operation_with_cost(
state.root_state(),
&payload.owner_user_id,
"puzzle_ui_background_image",
&payload.billing_asset_id,
PUZZLE_IMAGE_GENERATION_POINTS_COST,
async {
execute_puzzle_generate_ui_background_worker_job_inner(
state,
request_context,
&payload,
now,
&external_generation_guard,
)
.await
},
)
.await;
match session {
Ok(session) => Ok(session),
Err(error) => {
match mark_puzzle_level_generation_failure_for_external_generation(
state,
&payload.session_id,
&payload.owner_user_id,
payload.level_id.clone(),
payload.levels_json.clone(),
error.body_text(),
now,
&external_generation_guard,
)
.await
{
Ok(()) => {
Err(PuzzleExternalGenerationWorkerError::with_failure_state_written(error))
}
Err(mark_error) => {
Err(PuzzleExternalGenerationWorkerError::with_failure_state_pending(mark_error))
}
}
}
}
}
async fn execute_puzzle_generate_images_worker_job_inner(
state: &PuzzleApiState,
request_context: &RequestContext,
payload: &PuzzleGenerateImagesWorkerPayload,
now: i64,
external_generation_guard: &ExternalGenerationWriteLeaseGuard,
) -> Result<PuzzleAgentSessionRecord, AppError> {
let action_payload = payload.to_action_request();
let target_level_id = payload.level_id.clone();
let levels_json = payload.levels_json.clone();
let session = get_puzzle_session_for_image_generation(
state,
payload.session_id.clone(),
payload.owner_user_id.clone(),
&action_payload,
levels_json.as_deref(),
now,
)
.await?;
let mut draft = session.draft.clone().ok_or_else(|| {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": PUZZLE_AGENT_API_BASE_PROVIDER,
"message": "拼图结果页草稿尚未生成",
}))
})?;
if let Some(levels_json) = levels_json.as_ref() {
draft.levels = parse_puzzle_level_records_from_module_json(levels_json)?;
}
let mut target_level = select_puzzle_level_for_api(&draft, target_level_id.as_deref())?;
let prompt = resolve_puzzle_level_image_prompt(
payload.prompt_text.as_deref(),
&target_level.picture_description,
&draft.summary,
);
let should_auto_name_level = payload
.should_auto_name_level
.unwrap_or_else(|| target_level.level_name.trim().is_empty());
if should_auto_name_level {
let naming =
generate_puzzle_first_level_name(state, target_level.picture_description.as_str())
.await;
target_level.level_name = naming.level_name.clone();
target_level.ui_background_prompt = naming.ui_background_prompt.clone();
}
let reference_image_sources = collect_puzzle_reference_image_sources(
payload.reference_image_src.as_deref(),
payload.reference_image_srcs.as_slice(),
payload.reference_image_asset_object_id.as_deref(),
payload.reference_image_asset_object_ids.as_slice(),
);
let primary_reference_image_src = reference_image_sources.first().map(String::as_str);
// 中文注释:拼图结果页从多候选抽卡收口为单图替换,前端传入的旧 candidateCount 只做兼容忽略。
let candidate_start_index = target_level.candidates.len();
let ai_redraw = payload.ai_redraw.unwrap_or(true);
let mut candidates =
if should_use_uploaded_puzzle_image_directly(primary_reference_image_src, ai_redraw) {
vec![
create_uploaded_puzzle_image_candidate(
state,
payload.owner_user_id.as_str(),
&session.session_id,
&target_level.level_name,
&prompt,
primary_reference_image_src.expect("checked reference image"),
candidate_start_index,
)
.await?,
]
} else {
let (_, profile_id) = build_stable_puzzle_work_ids(&session.session_id);
generate_puzzle_image_candidates(
state,
payload.owner_user_id.as_str(),
Some(profile_id.as_str()),
&session.session_id,
&target_level.level_name,
&prompt,
primary_reference_image_src,
ai_redraw,
payload.image_model.as_deref(),
1,
candidate_start_index,
)
.await
.map_err(map_puzzle_generation_endpoint_error)?
};
if candidates.is_empty() {
return Err(
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": PUZZLE_AGENT_API_BASE_PROVIDER,
"message": "拼图候选图生成结果为空",
})),
);
}
if let Some(refined_naming) = generate_puzzle_first_level_name_from_image(
state,
target_level.picture_description.as_str(),
&candidates[0].downloaded_image,
)
.await
.filter(|_| should_auto_name_level)
{
target_level.level_name = refined_naming.level_name.clone();
if refined_naming.ui_background_prompt.is_some() {
target_level.ui_background_prompt = refined_naming.ui_background_prompt.clone();
}
}
let mut updated_levels =
build_puzzle_levels_with_primary_update(&draft, &target_level, primary_reference_image_src);
for candidate in &mut candidates {
candidate.record.prompt = prompt.clone();
}
let selected_candidate = candidates
.iter()
.find(|candidate| candidate.record.selected)
.or_else(|| candidates.first())
.ok_or_else(|| {
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": PUZZLE_AGENT_API_BASE_PROVIDER,
"message": "拼图候选图生成结果为空",
}))
})?;
let asset_bundle = generate_puzzle_level_asset_bundle_required(
state,
request_context,
payload.owner_user_id.as_str(),
&session.session_id,
&target_level,
&selected_candidate.downloaded_image,
)
.await?;
attach_puzzle_level_asset_bundle(
&mut updated_levels,
target_level.level_id.as_str(),
asset_bundle,
);
attach_selected_puzzle_candidate_to_levels(
&mut updated_levels,
target_level.level_id.as_str(),
&selected_candidate.record,
);
let levels_json_with_generated_name =
Some(serialize_puzzle_level_records_for_module(&updated_levels)?);
let candidates_json = serde_json::to_string(
&candidates
.iter()
.map(|candidate| to_puzzle_generated_image_candidate(&candidate.record))
.collect::<Vec<_>>(),
)
.map_err(|error| {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": PUZZLE_AGENT_API_BASE_PROVIDER,
"message": format!("拼图候选图序列化失败:{error}"),
}))
})?;
state
.spacetime_client()
.save_puzzle_generated_images(PuzzleGeneratedImagesSaveRecordInput {
session_id: session.session_id.clone(),
owner_user_id: payload.owner_user_id.clone(),
level_id: Some(target_level.level_id.clone()),
levels_json: levels_json_with_generated_name,
candidates_json,
saved_at_micros: now,
external_generation_job_id: external_generation_guard.job_id.clone(),
external_generation_worker_id: external_generation_guard.worker_id.clone(),
external_generation_lease_token: external_generation_guard.lease_token.clone(),
})
.await
.map_err(map_puzzle_client_error)
}
async fn execute_puzzle_generate_ui_background_worker_job_inner(
state: &PuzzleApiState,
request_context: &RequestContext,
payload: &PuzzleGenerateUiBackgroundWorkerPayload,
now: i64,
external_generation_guard: &ExternalGenerationWriteLeaseGuard,
) -> Result<PuzzleAgentSessionRecord, AppError> {
let action_payload = payload.to_action_request();
let target_level_id = payload.level_id.clone();
let levels_json = payload.levels_json.clone();
let session = get_puzzle_session_for_image_generation(
state,
payload.session_id.clone(),
payload.owner_user_id.clone(),
&action_payload,
levels_json.as_deref(),
now,
)
.await?;
let mut draft = session.draft.clone().ok_or_else(|| {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": PUZZLE_AGENT_API_BASE_PROVIDER,
"message": "拼图结果页草稿尚未生成",
}))
})?;
if let Some(levels_json) = levels_json.as_ref() {
draft.levels = parse_puzzle_level_records_from_module_json(levels_json)?;
}
let target_level = select_puzzle_level_for_api(&draft, target_level_id.as_deref())?;
let raw_prompt = payload
.prompt_text
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or_default()
.to_string();
let resolved_prompt =
normalize_puzzle_ui_background_prompt(raw_prompt.as_str(), &draft, &target_level);
let generated = generate_puzzle_ui_background_image(
state,
request_context,
payload.owner_user_id.as_str(),
&session.session_id,
&target_level.level_name,
resolved_prompt.as_str(),
)
.await
.map_err(map_puzzle_generation_endpoint_error)?;
state
.spacetime_client()
.save_puzzle_ui_background(PuzzleUiBackgroundSaveRecordInput {
session_id: session.session_id.clone(),
owner_user_id: payload.owner_user_id.clone(),
level_id: Some(target_level.level_id.clone()),
levels_json,
prompt: resolved_prompt.clone(),
image_src: generated.image_src.clone(),
image_object_key: Some(generated.object_key.clone()),
saved_at_micros: now,
external_generation_job_id: external_generation_guard.job_id.clone(),
external_generation_worker_id: external_generation_guard.worker_id.clone(),
external_generation_lease_token: external_generation_guard.lease_token.clone(),
})
.await
.map_err(map_puzzle_client_error)
}
pub(crate) async fn mark_puzzle_level_generation_failure_for_worker(
state: &PuzzleApiState,
payload: &PuzzleGenerateImagesWorkerPayload,
error_message: String,
failed_at_micros: i64,
external_generation_guard: &ExternalGenerationWriteLeaseGuard,
) -> Result<(), AppError> {
mark_puzzle_level_generation_failure_for_external_generation(
state,
&payload.session_id,
&payload.owner_user_id,
payload.level_id.clone(),
payload.levels_json.clone(),
error_message,
failed_at_micros,
external_generation_guard,
)
.await
}
async fn mark_puzzle_level_generation_failure_for_external_generation(
state: &PuzzleApiState,
session_id: &str,
owner_user_id: &str,
level_id: Option<String>,
levels_json: Option<String>,
error_message: String,
failed_at_micros: i64,
external_generation_guard: &ExternalGenerationWriteLeaseGuard,
) -> Result<(), AppError> {
let result = state
.spacetime_client()
.mark_puzzle_level_generation_failed(PuzzleLevelGenerationFailureRecordInput {
session_id: session_id.to_string(),
owner_user_id: owner_user_id.to_string(),
level_id,
levels_json,
error_message,
failed_at_micros,
external_generation_job_id: external_generation_guard.job_id.clone(),
external_generation_worker_id: external_generation_guard.worker_id.clone(),
external_generation_lease_token: external_generation_guard.lease_token.clone(),
})
.await;
if let Err(error) = result {
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %session_id,
owner_user_id = %owner_user_id,
message = %error,
"拼图 worker 关卡生图失败态回写失败"
);
return Err(map_puzzle_client_error(error));
}
Ok(())
}
pub(crate) async fn create_uploaded_puzzle_image_candidate(
state: &PuzzleApiState,
owner_user_id: &str,

File diff suppressed because it is too large Load Diff

View File

@@ -535,6 +535,108 @@ fn puzzle_image_generation_fallback_session_ready_when_asset_pack_complete() {
assert_eq!(session.stage, "ready_to_publish");
}
#[test]
fn puzzle_generate_images_worker_payload_keeps_action_snapshot() {
let raw_levels_json = serde_json::to_string(&vec![json!({
"levelId": "puzzle-level-2",
"levelName": "",
"pictureDescription": "新关卡里有一座发光钟楼。",
"candidates": [],
"selectedCandidateId": null,
"coverImageSrc": null,
"coverAssetId": null,
"generationStatus": "generating",
})])
.expect("levels json");
let levels_json = normalize_puzzle_levels_json_for_module(Some(raw_levels_json.as_str()))
.expect("levels should normalize")
.expect("levels json should exist");
let payload = PuzzleGenerateImagesWorkerPayload {
session_id: "puzzle-session-1".to_string(),
owner_user_id: "user-1".to_string(),
billing_asset_id: "puzzle-session-1:123".to_string(),
level_id: Some("puzzle-level-2".to_string()),
prompt_text: Some("发光钟楼".to_string()),
reference_image_src: None,
reference_image_srcs: vec!["data:image/png;base64,abc".to_string()],
reference_image_asset_object_id: Some("asset-object-1".to_string()),
reference_image_asset_object_ids: vec!["asset-object-2".to_string()],
image_model: Some(PUZZLE_IMAGE_MODEL_GPT_IMAGE_2.to_string()),
ai_redraw: Some(true),
should_auto_name_level: Some(true),
work_title: Some("暖灯猫街作品".to_string()),
work_description: Some("一套雨夜猫街主题拼图。".to_string()),
picture_description: None,
summary: Some("一套雨夜猫街主题拼图。".to_string()),
theme_tags: Some(vec!["猫咪".to_string(), "雨夜".to_string()]),
levels_json: Some(levels_json.clone()),
requested_at_micros: 123,
};
let encoded = serde_json::to_string(&payload).expect("payload should serialize");
let decoded: PuzzleGenerateImagesWorkerPayload =
serde_json::from_str(encoded.as_str()).expect("payload should deserialize");
assert_eq!(decoded.level_id.as_deref(), Some("puzzle-level-2"));
assert_eq!(decoded.reference_image_srcs.len(), 1);
assert_eq!(
decoded.reference_image_asset_object_ids,
vec!["asset-object-2".to_string()]
);
assert_eq!(decoded.should_auto_name_level, Some(true));
let records = parse_puzzle_level_records_from_module_json(
decoded.levels_json.as_deref().expect("levels json"),
)
.expect("levels should parse as module json");
assert_eq!(records[0].level_id, "puzzle-level-2");
assert_eq!(records[0].generation_status, "generating");
}
#[test]
fn puzzle_generate_ui_background_worker_payload_keeps_action_snapshot() {
let raw_levels_json = serde_json::to_string(&vec![json!({
"levelId": "puzzle-level-3",
"levelName": "钟楼回廊",
"pictureDescription": "新关卡里有一座发光钟楼。",
"uiBackgroundPrompt": "发光钟楼延展成竖屏回廊,远处有暖色窗光。",
"candidates": [],
"selectedCandidateId": null,
"coverImageSrc": null,
"coverAssetId": null,
"generationStatus": "generating",
})])
.expect("levels json");
let levels_json = normalize_puzzle_levels_json_for_module(Some(raw_levels_json.as_str()))
.expect("levels should normalize")
.expect("levels json should exist");
let payload = PuzzleGenerateUiBackgroundWorkerPayload {
session_id: "puzzle-session-1".to_string(),
owner_user_id: "user-1".to_string(),
billing_asset_id: "puzzle-session-1:456".to_string(),
level_id: Some("puzzle-level-3".to_string()),
prompt_text: Some("发光钟楼延展成竖屏回廊".to_string()),
levels_json: Some(levels_json.clone()),
requested_at_micros: 456,
};
let encoded = serde_json::to_string(&payload).expect("payload should serialize");
let decoded: PuzzleGenerateUiBackgroundWorkerPayload =
serde_json::from_str(encoded.as_str()).expect("payload should deserialize");
assert_eq!(decoded.level_id.as_deref(), Some("puzzle-level-3"));
assert_eq!(
decoded.prompt_text.as_deref(),
Some("发光钟楼延展成竖屏回廊")
);
assert_eq!(decoded.requested_at_micros, 456);
let records = parse_puzzle_level_records_from_module_json(
decoded.levels_json.as_deref().expect("levels json"),
)
.expect("levels should parse as module json");
assert_eq!(records[0].level_id, "puzzle-level-3");
assert_eq!(records[0].generation_status, "generating");
}
#[test]
fn puzzle_first_level_name_parser_accepts_json_and_normalizes_text() {
assert_eq!(

View File

@@ -11,7 +11,11 @@ use module_assets::{
generate_asset_binding_id, generate_asset_object_id,
};
use platform_oss::{LegacyAssetPrefix, OssHeadObjectRequest, OssObjectAccess};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use shared_contracts::external_generation::{
ExternalGenerationJobStatus, ExternalGenerationJobStatusRecord,
};
use shared_contracts::puzzle_clear::{
PuzzleClearActionRequest, PuzzleClearActionType, PuzzleClearCardAsset,
PuzzleClearDraftResponse, PuzzleClearGenerationStatus, PuzzleClearImageAsset,
@@ -22,7 +26,9 @@ use shared_contracts::puzzle_clear::{
PuzzleClearWorkspaceCreateRequest,
};
use shared_kernel::{build_prefixed_uuid_id, format_timestamp_micros};
use spacetime_client::SpacetimeClientError;
use spacetime_client::{
ExternalGenerationJobEnqueueRecordInput, ExternalGenerationJobRecord, SpacetimeClientError,
};
use std::{
collections::BTreeMap,
time::{SystemTime, UNIX_EPOCH},
@@ -51,6 +57,7 @@ const PUZZLE_CLEAR_CREATION_PROVIDER: &str = "puzzle-clear-creation";
const PUZZLE_CLEAR_RUNTIME_PROVIDER: &str = "puzzle-clear-runtime";
const PUZZLE_CLEAR_TEMPLATE_ID: &str = "puzzle-clear";
const PUZZLE_CLEAR_TEMPLATE_NAME: &str = "拼消消";
pub(crate) const PUZZLE_CLEAR_COMPILE_DRAFT_JOB_KIND: &str = "puzzle_clear_compile_draft";
const PUZZLE_CLEAR_RUNTIME_RUNS_ROUTE: &str = "/api/runtime/puzzle-clear/runs";
const PUZZLE_CLEAR_ATLAS_CELL_SIZE: u32 = 256;
const PUZZLE_CLEAR_SHEET_COLUMNS: u32 = 4;
@@ -76,6 +83,15 @@ const PUZZLE_CLEAR_SHEET_INTERNAL_SEAM_SIDE_CONTRAST_THRESHOLD: f32 = 145.0;
const PUZZLE_CLEAR_SHEET_INTERNAL_SEAM_SIDE_TEXTURE_MAX: f32 = 36.0;
const PUZZLE_CLEAR_ATLAS_NEGATIVE_PROMPT: &str = "文字、Logo、水印、按钮、UI 字、网格线、编号、标签、边框、外轮廓框、白色描边、白色贴纸边、圆角框、阴影框、分隔线、裁切参考线、单格内部拼接线、内部竖切、内部横切、照片拼贴、相册拼贴、多场景拼贴、双联图、三联图、画中画、单格双图、单格多图、低清晰度、纯色背景、空白背景、白底商品图、孤立主体、单体素材、素材表、图标、贴纸、同品种重复、同一物体多角度、重复同款小图、主体跨格、主体贴边、拼贴、重影、不同图案互相穿插";
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct PuzzleClearCompileDraftWorkerPayload {
pub session_id: String,
pub owner_user_id: String,
pub author_display_name: String,
pub payload: PuzzleClearActionRequest,
}
pub async fn create_puzzle_clear_session(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
@@ -160,6 +176,39 @@ pub async fn execute_puzzle_clear_action(
.unwrap_or("拼消消玩家")
.to_string();
let mut payload = payload;
let should_queue_generation = matches!(
payload.action_type,
PuzzleClearActionType::CompileDraft | PuzzleClearActionType::RegenerateAtlas
) && !state.config.external_generation_mode.is_inline();
if should_queue_generation {
let mut queued_response = state
.spacetime_client()
.mark_puzzle_clear_generation_queued(
session_id.clone(),
owner_user_id.clone(),
author_display_name.clone(),
payload.clone(),
)
.await
.map_err(|error| {
puzzle_clear_error_response(
&request_context,
PUZZLE_CLEAR_CREATION_PROVIDER,
map_puzzle_clear_client_error(error),
)
})?;
let queue_job = enqueue_puzzle_clear_compile_draft_job(
&state,
&request_context,
&session_id,
owner_user_id.as_str(),
author_display_name.as_str(),
payload,
)
.await?;
queued_response.queue_state = Some(map_puzzle_clear_queue_job_status(queue_job));
return Ok(json_success_body(Some(&request_context), queued_response));
}
if let Err(response) = maybe_prepare_puzzle_clear_assets_inner(
&state,
&request_context,
@@ -210,6 +259,129 @@ pub async fn execute_puzzle_clear_action(
Ok(json_success_body(Some(&request_context), response))
}
async fn enqueue_puzzle_clear_compile_draft_job(
state: &AppState,
request_context: &RequestContext,
session_id: &str,
owner_user_id: &str,
author_display_name: &str,
payload: PuzzleClearActionRequest,
) -> Result<ExternalGenerationJobRecord, Response> {
let job_id = build_prefixed_uuid_id("extgen-");
let now_micros = current_utc_micros();
let request_payload_json = serde_json::to_string(&PuzzleClearCompileDraftWorkerPayload {
session_id: session_id.to_string(),
owner_user_id: owner_user_id.to_string(),
author_display_name: author_display_name.to_string(),
payload,
})
.map_err(|error| {
puzzle_clear_error_response(
request_context,
PUZZLE_CLEAR_CREATION_PROVIDER,
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({
"message": format!("拼消消 worker 任务参数序列化失败:{error}"),
})),
)
})?;
state
.spacetime_client()
.enqueue_external_generation_job(ExternalGenerationJobEnqueueRecordInput {
dedupe_key: format!("puzzle-clear:compile-draft:{session_id}:{job_id}"),
job_id,
job_kind: PUZZLE_CLEAR_COMPILE_DRAFT_JOB_KIND.to_string(),
owner_user_id: owner_user_id.to_string(),
source_module: "puzzle-clear".to_string(),
source_entity_id: session_id.to_string(),
request_label: "拼消消草稿生成".to_string(),
request_payload_json,
max_attempts: 1,
available_at_micros: now_micros,
created_at_micros: now_micros,
})
.await
.map_err(|error| {
puzzle_clear_error_response(
request_context,
PUZZLE_CLEAR_CREATION_PROVIDER,
map_puzzle_clear_client_error(error),
)
})
}
fn map_puzzle_clear_queue_job_status(
job: ExternalGenerationJobRecord,
) -> ExternalGenerationJobStatusRecord {
ExternalGenerationJobStatusRecord {
operation_id: job.job_id,
status: ExternalGenerationJobStatus::Queued,
phase_label: job.request_label,
phase_detail: "排队中。".to_string(),
progress: 8,
error: job.last_error_message,
updated_at_micros: job.updated_at_micros,
}
}
pub(crate) async fn execute_puzzle_clear_compile_draft_worker_job(
state: &AppState,
request_context: &RequestContext,
mut worker_payload: PuzzleClearCompileDraftWorkerPayload,
) -> Result<PuzzleClearSessionSnapshotResponse, Response> {
if let Err(response) = maybe_prepare_puzzle_clear_assets_inner(
state,
request_context,
worker_payload.session_id.as_str(),
worker_payload.owner_user_id.as_str(),
&mut worker_payload.payload,
)
.await
{
let (error_message, response) = extract_puzzle_clear_response_error_message(response).await;
tracing::warn!(
provider = PUZZLE_CLEAR_CREATION_PROVIDER,
session_id = worker_payload.session_id,
error = %error_message,
"拼消消 worker 素材生成失败,准备回写 failed 状态"
);
if let Err(writeback_error) = state
.spacetime_client()
.mark_puzzle_clear_generation_failed(
worker_payload.session_id.clone(),
worker_payload.owner_user_id.clone(),
worker_payload.author_display_name.clone(),
worker_payload.payload.clone(),
)
.await
{
tracing::warn!(
provider = PUZZLE_CLEAR_CREATION_PROVIDER,
session_id = worker_payload.session_id,
error = %writeback_error,
"拼消消 worker 失败状态回写失败"
);
}
return Err(response);
}
let response = state
.spacetime_client()
.execute_puzzle_clear_action(
worker_payload.session_id,
worker_payload.owner_user_id,
worker_payload.author_display_name,
worker_payload.payload,
)
.await
.map_err(|error| {
puzzle_clear_error_response(
request_context,
PUZZLE_CLEAR_CREATION_PROVIDER,
map_puzzle_clear_client_error(error),
)
})?;
Ok(response.session)
}
pub async fn list_puzzle_clear_works(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,

View File

@@ -14,7 +14,11 @@ use module_assets::{
build_asset_object_upsert_input, generate_asset_binding_id, generate_asset_object_id,
};
use platform_oss::{LegacyAssetPrefix, OssHeadObjectRequest, OssObjectAccess};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use shared_contracts::external_generation::{
ExternalGenerationJobStatus, ExternalGenerationJobStatusRecord,
};
use shared_contracts::wooden_fish::{
WoodenFishActionRequest, WoodenFishAudioAsset, WoodenFishCheckpointRunRequest,
WoodenFishDraftResponse, WoodenFishFinishRunRequest, WoodenFishGalleryDetailResponse,
@@ -24,7 +28,9 @@ use shared_contracts::wooden_fish::{
WoodenFishWorkspaceCreateRequest,
};
use shared_kernel::{build_prefixed_uuid_id, format_timestamp_micros};
use spacetime_client::SpacetimeClientError;
use spacetime_client::{
ExternalGenerationJobEnqueueRecordInput, ExternalGenerationJobRecord, SpacetimeClientError,
};
use crate::generated_image_assets::{
GeneratedImageAssetAdapter, GeneratedImageAssetDataUrl,
@@ -54,6 +60,8 @@ const WOODEN_FISH_CREATION_PROVIDER: &str = "wooden-fish-creation";
const WOODEN_FISH_RUNTIME_PROVIDER: &str = "wooden-fish-runtime";
const WOODEN_FISH_TEMPLATE_ID: &str = "wooden-fish";
const WOODEN_FISH_TEMPLATE_NAME: &str = "敲木鱼";
pub(crate) const WOODEN_FISH_GENERATE_IMAGE_ASSETS_JOB_KIND: &str =
"wooden_fish_generate_image_assets";
const DEFAULT_HIT_OBJECT_PROMPT: &str = "默认敲击物图案,圆润木质质感,透明背景";
const DEFAULT_HIT_OBJECT_ASSET_ID: &str = "wooden-fish-default-hit-object";
const DEFAULT_HIT_OBJECT_IMAGE_SRC: &str = "/wooden-fish/default-hit-object.png";
@@ -73,6 +81,15 @@ const DEFAULT_HIT_OBJECT_REFERENCE_BYTES: &[u8] = include_bytes!(concat!(
));
const WOODEN_FISH_AUTHOR_FALLBACK_DISPLAY_NAME: &str = "玩家";
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct WoodenFishGenerateImageAssetsWorkerPayload {
pub session_id: String,
pub owner_user_id: String,
pub author_display_name: String,
pub payload: WoodenFishActionRequest,
}
pub async fn create_wooden_fish_session(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
@@ -155,6 +172,40 @@ pub async fn execute_wooden_fish_action(
payload.action_type,
shared_contracts::wooden_fish::WoodenFishActionType::CompileDraft
);
let should_queue_generation = matches!(
payload.action_type,
shared_contracts::wooden_fish::WoodenFishActionType::CompileDraft
| shared_contracts::wooden_fish::WoodenFishActionType::RegenerateHitObject
) && !state.config.external_generation_mode.is_inline();
if should_queue_generation {
let mut queued_response = state
.spacetime_client()
.mark_wooden_fish_generation_queued(
session_id.clone(),
owner_user_id.clone(),
author_display_name.clone(),
payload.clone(),
)
.await
.map_err(|error| {
wooden_fish_error_response(
&request_context,
WOODEN_FISH_CREATION_PROVIDER,
map_wooden_fish_client_error(error),
)
})?;
let queue_job = enqueue_wooden_fish_generate_image_assets_job(
&state,
&request_context,
&session_id,
owner_user_id.as_str(),
author_display_name.as_str(),
payload,
)
.await?;
queued_response.queue_state = Some(map_wooden_fish_queue_job_status(queue_job));
return Ok(json_success_body(Some(&request_context), queued_response));
}
let generation_points_cost = if is_compile_draft {
resolve_wooden_fish_generation_points_cost(&state).await
} else {
@@ -226,6 +277,70 @@ pub async fn execute_wooden_fish_action(
Ok(json_success_body(Some(&request_context), response))
}
async fn enqueue_wooden_fish_generate_image_assets_job(
state: &AppState,
request_context: &RequestContext,
session_id: &str,
owner_user_id: &str,
author_display_name: &str,
payload: WoodenFishActionRequest,
) -> Result<ExternalGenerationJobRecord, Response> {
let job_id = build_prefixed_uuid_id("extgen-");
let now_micros = current_utc_micros();
let request_payload_json = serde_json::to_string(&WoodenFishGenerateImageAssetsWorkerPayload {
session_id: session_id.to_string(),
owner_user_id: owner_user_id.to_string(),
author_display_name: author_display_name.to_string(),
payload,
})
.map_err(|error| {
wooden_fish_error_response(
request_context,
WOODEN_FISH_CREATION_PROVIDER,
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({
"message": format!("敲木鱼 worker 任务参数序列化失败:{error}"),
})),
)
})?;
state
.spacetime_client()
.enqueue_external_generation_job(ExternalGenerationJobEnqueueRecordInput {
dedupe_key: format!("wooden-fish:generate-image-assets:{session_id}:{job_id}"),
job_id,
job_kind: WOODEN_FISH_GENERATE_IMAGE_ASSETS_JOB_KIND.to_string(),
owner_user_id: owner_user_id.to_string(),
source_module: "wooden-fish".to_string(),
source_entity_id: session_id.to_string(),
request_label: "敲木鱼图片素材生成".to_string(),
request_payload_json,
max_attempts: 1,
available_at_micros: now_micros,
created_at_micros: now_micros,
})
.await
.map_err(|error| {
wooden_fish_error_response(
request_context,
WOODEN_FISH_CREATION_PROVIDER,
map_wooden_fish_client_error(error),
)
})
}
fn map_wooden_fish_queue_job_status(
job: ExternalGenerationJobRecord,
) -> ExternalGenerationJobStatusRecord {
ExternalGenerationJobStatusRecord {
operation_id: job.job_id,
status: ExternalGenerationJobStatus::Queued,
phase_label: job.request_label,
phase_detail: "排队中。".to_string(),
progress: 8,
error: job.last_error_message,
updated_at_micros: job.updated_at_micros,
}
}
pub async fn publish_wooden_fish_work(
State(state): State<AppState>,
Path(profile_id): Path<String>,
@@ -635,6 +750,40 @@ async fn execute_wooden_fish_action_with_generated_assets(
})
}
pub(crate) async fn execute_wooden_fish_generate_image_assets_worker_job(
state: &AppState,
request_context: &RequestContext,
mut worker_payload: WoodenFishGenerateImageAssetsWorkerPayload,
) -> Result<WoodenFishSessionSnapshotResponse, Response> {
let result = execute_wooden_fish_action_with_generated_assets(
state,
request_context,
worker_payload.session_id.as_str(),
worker_payload.owner_user_id.as_str(),
worker_payload.author_display_name.as_str(),
&mut worker_payload.payload,
)
.await;
if result.as_ref().err().is_some_and(|response| {
response.status().is_server_error()
&& matches!(
worker_payload.payload.action_type,
shared_contracts::wooden_fish::WoodenFishActionType::CompileDraft
)
}) {
mark_wooden_fish_generation_failed(
state,
request_context,
worker_payload.session_id.as_str(),
worker_payload.owner_user_id.as_str(),
worker_payload.author_display_name.as_str(),
)
.await;
}
let response = result?;
Ok(response.session)
}
async fn resolve_wooden_fish_generation_points_cost(state: &AppState) -> u64 {
crate::creation_entry_config::resolve_creation_entry_mud_point_cost(
state,