use std::{future::Future, io, pin::Pin, time::Duration}; use axum::extract::FromRef; use serde_json::json; use shared_kernel::offset_datetime_to_unix_micros; use spacetime_client::{ ExternalGenerationJobClaimRecordInput, ExternalGenerationJobCompleteRecordInput, ExternalGenerationJobFailRecordInput, ExternalGenerationJobRecord, ExternalGenerationJobRenewLeaseRecordInput, }; use tokio::{ task::JoinSet, time::{Instant, sleep}, }; use tracing::{error, info, warn}; use crate::{ jump_hop::{ JUMP_HOP_COMPILE_DRAFT_JOB_KIND, JumpHopCompileDraftWorkerPayload, execute_jump_hop_compile_draft_worker_job, }, puzzle::{ ExternalGenerationWriteLeaseGuard, PuzzleCompileDraftWorkerPayload, PuzzleGenerateImagesWorkerPayload, PuzzleGenerateUiBackgroundWorkerPayload, execute_puzzle_compile_draft_worker_job, execute_puzzle_generate_images_worker_job, execute_puzzle_generate_ui_background_worker_job, release_puzzle_compile_background_claim, }, puzzle_clear::{ PUZZLE_CLEAR_COMPILE_DRAFT_JOB_KIND, PuzzleClearCompileDraftWorkerPayload, execute_puzzle_clear_compile_draft_worker_job, }, request_context::RequestContext, state::{AppState, PuzzleApiState}, wooden_fish::{ WOODEN_FISH_GENERATE_IMAGE_ASSETS_JOB_KIND, WoodenFishGenerateImageAssetsWorkerPayload, execute_wooden_fish_generate_image_assets_worker_job, }, }; pub(crate) const PUZZLE_COMPILE_DRAFT_JOB_KIND: &str = "puzzle_compile_draft"; pub(crate) const PUZZLE_GENERATE_IMAGES_JOB_KIND: &str = "puzzle_generate_images"; pub(crate) const PUZZLE_GENERATE_UI_BACKGROUND_JOB_KIND: &str = "puzzle_generate_ui_background"; pub(crate) async fn run_external_generation_worker(state: AppState) -> Result<(), io::Error> { let worker_id = state.config.external_generation_worker_id.clone(); let concurrency = state.config.external_generation_worker_concurrency.max(1); let poll_interval = state.config.external_generation_worker_poll_interval; let lease = state.config.external_generation_worker_lease; let mut tasks = JoinSet::new(); let mut shutdown = external_generation_worker_shutdown_signal(); info!( worker_id, concurrency, poll_interval_ms = poll_interval.as_millis(), lease_seconds = lease.as_secs(), "external generation worker 已启动" ); loop { while tasks.len() >= concurrency { if await_worker_task_or_shutdown(&mut tasks, &mut shutdown).await { drain_external_generation_worker_tasks(&mut tasks).await; return Ok(()); } } let available = concurrency.saturating_sub(tasks.len()).max(1); let now_micros = current_utc_micros(); let lease_expires_at_micros = now_micros.saturating_add(duration_micros_i64(lease)); let claim_jobs = state.spacetime_client().claim_external_generation_jobs( ExternalGenerationJobClaimRecordInput { worker_id: worker_id.clone(), limit: available.min(u32::MAX as usize) as u32, lease_expires_at_micros, claimed_at_micros: now_micros, }, ); tokio::pin!(claim_jobs); let jobs = match tokio::select! { _ = shutdown.as_mut() => { drain_external_generation_worker_tasks(&mut tasks).await; return Ok(()); } result = &mut claim_jobs => result } { Ok(jobs) => jobs, Err(error) => { error!(error = %error, "领取外部生成任务失败,等待下一轮重试"); if await_one_task_or_sleep_or_shutdown( &mut tasks, sleep(poll_interval), &mut shutdown, ) .await { drain_external_generation_worker_tasks(&mut tasks).await; return Ok(()); } continue; } }; if jobs.is_empty() { if await_one_task_or_sleep_or_shutdown(&mut tasks, sleep(poll_interval), &mut shutdown) .await { drain_external_generation_worker_tasks(&mut tasks).await; return Ok(()); } continue; } for job in jobs { let state = state.clone(); let worker_id = worker_id.clone(); tasks.spawn(async move { if let Err(error) = process_external_generation_job(state, worker_id, lease, job).await { error!(error = %error, "external generation worker 执行任务失败"); } }); } } } type ExternalGenerationShutdownSignal = Pin + Send>>; fn external_generation_worker_shutdown_signal() -> ExternalGenerationShutdownSignal { Box::pin(async { wait_for_external_generation_worker_shutdown_signal().await; }) } #[cfg(unix)] async fn wait_for_external_generation_worker_shutdown_signal() { use tokio::signal::unix::{SignalKind, signal}; let mut sigterm = signal(SignalKind::terminate()).ok(); tokio::select! { result = tokio::signal::ctrl_c() => { if let Err(error) = result { warn!(error = %error, "external generation worker 监听 SIGINT 失败"); } } _ = async { if let Some(sigterm) = sigterm.as_mut() { sigterm.recv().await; } else { std::future::pending::<()>().await; } } => {} } } #[cfg(not(unix))] async fn wait_for_external_generation_worker_shutdown_signal() { if let Err(error) = tokio::signal::ctrl_c().await { warn!(error = %error, "external generation worker 监听 Ctrl-C 失败"); } } async fn await_worker_task(tasks: &mut JoinSet<()>) { if let Some(result) = tasks.join_next().await && let Err(error) = result { error!(error = %error, "external generation worker 子任务 panic"); } } async fn await_worker_task_or_shutdown( tasks: &mut JoinSet<()>, shutdown: &mut ExternalGenerationShutdownSignal, ) -> bool { tokio::select! { _ = shutdown.as_mut() => true, _ = await_worker_task(tasks) => false, } } async fn await_one_task_or_sleep_or_shutdown( tasks: &mut JoinSet<()>, sleeper: impl Future, shutdown: &mut ExternalGenerationShutdownSignal, ) -> bool { tokio::pin!(sleeper); if tasks.is_empty() { tokio::select! { _ = shutdown.as_mut() => true, _ = &mut sleeper => false, } } else { tokio::select! { _ = shutdown.as_mut() => true, _ = &mut sleeper => false, result = tasks.join_next() => { if let Some(Err(error)) = result { error!(error = %error, "external generation worker 子任务 panic"); } false } } } } async fn drain_external_generation_worker_tasks(tasks: &mut JoinSet<()>) { info!( in_flight_jobs = tasks.len(), "external generation worker 收到停机信号,停止领取新任务并等待当前任务完成" ); while !tasks.is_empty() { await_worker_task(tasks).await; } info!("external generation worker 已完成优雅停机"); } async fn process_external_generation_job( state: AppState, worker_id: String, lease: Duration, job: ExternalGenerationJobRecord, ) -> Result<(), String> { let heartbeat_interval = external_generation_worker_heartbeat_interval(lease); let work = process_external_generation_job_once(state.clone(), worker_id.clone(), job.clone()); tokio::pin!(work); let heartbeat = sleep(heartbeat_interval); tokio::pin!(heartbeat); loop { tokio::select! { biased; result = &mut work => return result, _ = &mut heartbeat => { renew_job_lease(&state, &worker_id, &job, lease).await?; heartbeat.as_mut().reset(Instant::now() + heartbeat_interval); } } } } async fn process_external_generation_job_once( state: AppState, worker_id: String, job: ExternalGenerationJobRecord, ) -> Result<(), String> { match job.job_kind.as_str() { PUZZLE_COMPILE_DRAFT_JOB_KIND => { let payload = match serde_json::from_str::( job.request_payload_json.as_str(), ) { Ok(payload) => payload, Err(error) => { let message = format!("拼图生成任务参数解析失败:{error}"); fail_job(&state, &worker_id, &job, message.clone()).await?; return Err(message); } }; let request_context = RequestContext::new( format!("external-generation-worker-{}", job.job_id), format!("external-generation-worker {}", job.job_kind), std::time::Duration::ZERO, false, ); let puzzle_state = PuzzleApiState::from_ref(&state); let write_guard = build_external_generation_write_lease_guard(&worker_id, &job)?; match execute_puzzle_compile_draft_worker_job( &puzzle_state, &request_context, payload.clone(), write_guard, ) .await { Ok(session) => { let result = complete_job( &state, &worker_id, &job, Some( json!({ "sessionId": session.session_id, "progressPercent": session.progress_percent, }) .to_string(), ), ) .await; if result.is_ok() { release_puzzle_compile_background_claim(&puzzle_state, &payload); } result } Err(error) => { let message = error.body_text(); let should_release_claim = error.should_fail_queue_job(); let result = fail_queue_job_after_worker_error( &state, &worker_id, &job, &error, &message, ) .await; if result.is_ok() && should_release_claim { release_puzzle_compile_background_claim(&puzzle_state, &payload); } result?; Err(message) } } } PUZZLE_GENERATE_IMAGES_JOB_KIND => { let payload = match serde_json::from_str::( job.request_payload_json.as_str(), ) { Ok(payload) => payload, Err(error) => { let message = format!("拼图关卡图片生成任务参数解析失败:{error}"); fail_job(&state, &worker_id, &job, message.clone()).await?; return Err(message); } }; let request_context = RequestContext::new( format!("external-generation-worker-{}", job.job_id), format!("external-generation-worker {}", job.job_kind), std::time::Duration::ZERO, false, ); let puzzle_state = PuzzleApiState::from_ref(&state); let write_guard = build_external_generation_write_lease_guard(&worker_id, &job)?; match execute_puzzle_generate_images_worker_job( &puzzle_state, &request_context, payload, write_guard, ) .await { Ok(session) => { complete_job( &state, &worker_id, &job, Some( json!({ "sessionId": session.session_id, "progressPercent": session.progress_percent, }) .to_string(), ), ) .await } Err(error) => { let message = error.body_text(); fail_queue_job_after_worker_error(&state, &worker_id, &job, &error, &message) .await?; Err(message) } } } PUZZLE_GENERATE_UI_BACKGROUND_JOB_KIND => { let payload = match serde_json::from_str::( job.request_payload_json.as_str(), ) { Ok(payload) => payload, Err(error) => { let message = format!("拼图 UI 背景图生成任务参数解析失败:{error}"); fail_job(&state, &worker_id, &job, message.clone()).await?; return Err(message); } }; let request_context = RequestContext::new( format!("external-generation-worker-{}", job.job_id), format!("external-generation-worker {}", job.job_kind), std::time::Duration::ZERO, false, ); let puzzle_state = PuzzleApiState::from_ref(&state); let write_guard = build_external_generation_write_lease_guard(&worker_id, &job)?; match execute_puzzle_generate_ui_background_worker_job( &puzzle_state, &request_context, payload, write_guard, ) .await { Ok(session) => { complete_job( &state, &worker_id, &job, Some( json!({ "sessionId": session.session_id, "progressPercent": session.progress_percent, }) .to_string(), ), ) .await } Err(error) => { let message = error.body_text(); fail_queue_job_after_worker_error(&state, &worker_id, &job, &error, &message) .await?; Err(message) } } } JUMP_HOP_COMPILE_DRAFT_JOB_KIND => { let payload = match serde_json::from_str::( job.request_payload_json.as_str(), ) { Ok(payload) => payload, Err(error) => { let message = format!("跳一跳生成任务参数解析失败:{error}"); fail_job(&state, &worker_id, &job, message.clone()).await?; return Err(message); } }; let request_context = RequestContext::new( format!("external-generation-worker-{}", job.job_id), format!("external-generation-worker {}", job.job_kind), std::time::Duration::ZERO, false, ); match execute_jump_hop_compile_draft_worker_job(&state, &request_context, payload).await { Ok(session) => { complete_job( &state, &worker_id, &job, Some( json!({ "sessionId": session.session_id, "status": session.status, }) .to_string(), ), ) .await } Err(response) => { let message = response_error_message(response).await; fail_job(&state, &worker_id, &job, message.clone()).await?; Err(message) } } } PUZZLE_CLEAR_COMPILE_DRAFT_JOB_KIND => { let payload = match serde_json::from_str::( job.request_payload_json.as_str(), ) { Ok(payload) => payload, Err(error) => { let message = format!("拼消消生成任务参数解析失败:{error}"); fail_job(&state, &worker_id, &job, message.clone()).await?; return Err(message); } }; let request_context = RequestContext::new( format!("external-generation-worker-{}", job.job_id), format!("external-generation-worker {}", job.job_kind), std::time::Duration::ZERO, false, ); match execute_puzzle_clear_compile_draft_worker_job(&state, &request_context, payload) .await { Ok(session) => { complete_job( &state, &worker_id, &job, Some( json!({ "sessionId": session.session_id, "status": session.status, }) .to_string(), ), ) .await } Err(response) => { let message = response_error_message(response).await; fail_job(&state, &worker_id, &job, message.clone()).await?; Err(message) } } } WOODEN_FISH_GENERATE_IMAGE_ASSETS_JOB_KIND => { let payload = match serde_json::from_str::( job.request_payload_json.as_str(), ) { Ok(payload) => payload, Err(error) => { let message = format!("敲木鱼图片生成任务参数解析失败:{error}"); fail_job(&state, &worker_id, &job, message.clone()).await?; return Err(message); } }; let request_context = RequestContext::new( format!("external-generation-worker-{}", job.job_id), format!("external-generation-worker {}", job.job_kind), std::time::Duration::ZERO, false, ); match execute_wooden_fish_generate_image_assets_worker_job( &state, &request_context, payload, ) .await { Ok(session) => { complete_job( &state, &worker_id, &job, Some( json!({ "sessionId": session.session_id, "status": session.status, }) .to_string(), ), ) .await } Err(response) => { let message = response_error_message(response).await; fail_job(&state, &worker_id, &job, message.clone()).await?; Err(message) } } } unknown => { warn!( job_id = job.job_id, job_kind = unknown, "external generation worker 收到暂不支持的任务类型" ); fail_job( &state, &worker_id, &job, format!("暂不支持的外部生成任务类型:{unknown}"), ) .await } } } async fn response_error_message(response: axum::response::Response) -> String { use axum::body::to_bytes; let status = response.status(); let body_bytes = match to_bytes(response.into_body(), 64 * 1024).await { Ok(bytes) => bytes, Err(error) => { return format!("外部生成任务失败:{status},响应读取失败:{error}"); } }; let body_text = String::from_utf8_lossy(&body_bytes).trim().to_string(); if body_text.is_empty() { return format!("外部生成任务失败:{status}"); } if let Ok(body_json) = serde_json::from_str::(&body_text) && let Some(message) = body_json .get("error") .and_then(|error| error.get("message")) .and_then(serde_json::Value::as_str) .map(str::trim) .filter(|message| !message.is_empty()) { return message.to_string(); } body_text } async fn fail_queue_job_after_worker_error( state: &AppState, worker_id: &str, job: &ExternalGenerationJobRecord, error: &crate::puzzle::PuzzleExternalGenerationWorkerError, message: &str, ) -> Result<(), String> { if error.should_fail_queue_job() { return fail_job(state, worker_id, job, message.to_string()).await; } warn!( job_id = job.job_id, job_kind = job.job_kind, "external generation worker 业务失败态尚未写回,保留任务租约等待后续重试" ); Ok(()) } async fn complete_job( state: &AppState, worker_id: &str, job: &ExternalGenerationJobRecord, result_payload_json: Option, ) -> Result<(), String> { state .spacetime_client() .complete_external_generation_job(ExternalGenerationJobCompleteRecordInput { job_id: job.job_id.clone(), worker_id: worker_id.to_string(), lease_token: require_job_lease_token(job)?, result_payload_json, completed_at_micros: current_utc_micros(), }) .await .map(|_| ()) .map_err(|error| error.to_string()) } async fn fail_job( state: &AppState, worker_id: &str, job: &ExternalGenerationJobRecord, error_message: String, ) -> Result<(), String> { let now_micros = current_utc_micros(); state .spacetime_client() .fail_external_generation_job(ExternalGenerationJobFailRecordInput { job_id: job.job_id.clone(), worker_id: worker_id.to_string(), lease_token: require_job_lease_token(job)?, error_message, retry_after_micros: now_micros.saturating_add(60_000_000), failed_at_micros: now_micros, }) .await .map(|_| ()) .map_err(|error| error.to_string()) } async fn renew_job_lease( state: &AppState, worker_id: &str, job: &ExternalGenerationJobRecord, lease: Duration, ) -> Result<(), String> { let now_micros = current_utc_micros(); state .spacetime_client() .renew_external_generation_job_lease(ExternalGenerationJobRenewLeaseRecordInput { job_id: job.job_id.clone(), worker_id: worker_id.to_string(), lease_token: require_job_lease_token(job)?, lease_expires_at_micros: now_micros.saturating_add(duration_micros_i64(lease)), renewed_at_micros: now_micros, }) .await .map(|_| ()) .map_err(|error| error.to_string()) } fn require_job_lease_token(job: &ExternalGenerationJobRecord) -> Result { job.lease_token .as_deref() .map(str::trim) .filter(|value| !value.is_empty()) .map(ToOwned::to_owned) .ok_or_else(|| format!("external_generation_job {} 缺少 lease token", job.job_id)) } fn build_external_generation_write_lease_guard( worker_id: &str, job: &ExternalGenerationJobRecord, ) -> Result { Ok(ExternalGenerationWriteLeaseGuard::from_claimed_job( job.job_id.clone(), worker_id.to_string(), require_job_lease_token(job)?, )) } fn duration_micros_i64(duration: Duration) -> i64 { duration.as_micros().min(i64::MAX as u128) as i64 } fn external_generation_worker_heartbeat_interval(lease: Duration) -> Duration { let heartbeat_millis = (lease.as_millis() / 3).clamp(250, 30_000) as u64; Duration::from_millis(heartbeat_millis) } fn current_utc_micros() -> i64 { offset_datetime_to_unix_micros(time::OffsetDateTime::now_utc()) } #[cfg(test)] mod tests { use super::*; #[test] fn worker_write_guard_uses_claimed_job_lease_token() { let job = external_generation_job_record_fixture(Some("lease-1")); let guard = build_external_generation_write_lease_guard("worker-a", &job) .expect("guard should build"); assert_eq!(guard.job_id.as_deref(), Some("extgen-1")); assert_eq!(guard.worker_id.as_deref(), Some("worker-a")); assert_eq!(guard.lease_token.as_deref(), Some("lease-1")); } #[test] fn worker_write_guard_requires_claimed_job_lease_token() { let job = external_generation_job_record_fixture(None); let error = build_external_generation_write_lease_guard("worker-a", &job) .expect_err("missing token should fail"); assert!(error.contains("缺少 lease token")); } fn external_generation_job_record_fixture( lease_token: Option<&str>, ) -> ExternalGenerationJobRecord { ExternalGenerationJobRecord { job_id: "extgen-1".to_string(), dedupe_key: "puzzle:generate_puzzle_images:session-1:extgen-1".to_string(), job_kind: PUZZLE_GENERATE_IMAGES_JOB_KIND.to_string(), owner_user_id: "user-1".to_string(), source_module: "puzzle".to_string(), source_entity_id: "session-1:puzzle-level-1".to_string(), request_label: "拼图关卡图片生成".to_string(), request_payload_json: "{}".to_string(), status: "running".to_string(), attempt: 1, max_attempts: 1, last_error_message: None, worker_id: Some("worker-a".to_string()), lease_expires_at: Some("2026-06-03T00:00:00Z".to_string()), available_at: "2026-06-03T00:00:00Z".to_string(), result_payload_json: None, created_at: "2026-06-03T00:00:00Z".to_string(), started_at: Some("2026-06-03T00:00:00Z".to_string()), completed_at: None, updated_at: "2026-06-03T00:00:00Z".to_string(), lease_token: lease_token.map(ToOwned::to_owned), } } }