feat: workerize external generation

This commit is contained in:
2026-06-05 17:29:08 +08:00
parent 5150925947
commit 8d54ea3374
60 changed files with 5285 additions and 700 deletions

View File

@@ -0,0 +1,572 @@
use std::{future::Future, io, pin::Pin, time::Duration};
use axum::extract::FromRef;
use serde_json::json;
use shared_kernel::offset_datetime_to_unix_micros;
use spacetime_client::{
ExternalGenerationJobClaimRecordInput, ExternalGenerationJobCompleteRecordInput,
ExternalGenerationJobFailRecordInput, ExternalGenerationJobRecord,
ExternalGenerationJobRenewLeaseRecordInput,
};
use tokio::{
task::JoinSet,
time::{Instant, sleep},
};
use tracing::{error, info, warn};
use crate::{
puzzle::{
ExternalGenerationWriteLeaseGuard, PuzzleCompileDraftWorkerPayload,
PuzzleGenerateImagesWorkerPayload, PuzzleGenerateUiBackgroundWorkerPayload,
execute_puzzle_compile_draft_worker_job, execute_puzzle_generate_images_worker_job,
execute_puzzle_generate_ui_background_worker_job,
},
request_context::RequestContext,
state::{AppState, PuzzleApiState},
};
pub(crate) const PUZZLE_COMPILE_DRAFT_JOB_KIND: &str = "puzzle_compile_draft";
pub(crate) const PUZZLE_GENERATE_IMAGES_JOB_KIND: &str = "puzzle_generate_images";
pub(crate) const PUZZLE_GENERATE_UI_BACKGROUND_JOB_KIND: &str = "puzzle_generate_ui_background";
pub(crate) async fn run_external_generation_worker(state: AppState) -> Result<(), io::Error> {
let worker_id = state.config.external_generation_worker_id.clone();
let concurrency = state.config.external_generation_worker_concurrency.max(1);
let poll_interval = state.config.external_generation_worker_poll_interval;
let lease = state.config.external_generation_worker_lease;
let mut tasks = JoinSet::new();
let mut shutdown = external_generation_worker_shutdown_signal();
info!(
worker_id,
concurrency,
poll_interval_ms = poll_interval.as_millis(),
lease_seconds = lease.as_secs(),
"external generation worker 已启动"
);
loop {
while tasks.len() >= concurrency {
if await_worker_task_or_shutdown(&mut tasks, &mut shutdown).await {
drain_external_generation_worker_tasks(&mut tasks).await;
return Ok(());
}
}
let available = concurrency.saturating_sub(tasks.len()).max(1);
let now_micros = current_utc_micros();
let lease_expires_at_micros = now_micros.saturating_add(duration_micros_i64(lease));
let claim_jobs = state.spacetime_client().claim_external_generation_jobs(
ExternalGenerationJobClaimRecordInput {
worker_id: worker_id.clone(),
limit: available.min(u32::MAX as usize) as u32,
lease_expires_at_micros,
claimed_at_micros: now_micros,
},
);
tokio::pin!(claim_jobs);
let jobs = match tokio::select! {
_ = shutdown.as_mut() => {
drain_external_generation_worker_tasks(&mut tasks).await;
return Ok(());
}
result = &mut claim_jobs => result
} {
Ok(jobs) => jobs,
Err(error) => {
error!(error = %error, "领取外部生成任务失败,等待下一轮重试");
if await_one_task_or_sleep_or_shutdown(
&mut tasks,
sleep(poll_interval),
&mut shutdown,
)
.await
{
drain_external_generation_worker_tasks(&mut tasks).await;
return Ok(());
}
continue;
}
};
if jobs.is_empty() {
if await_one_task_or_sleep_or_shutdown(&mut tasks, sleep(poll_interval), &mut shutdown)
.await
{
drain_external_generation_worker_tasks(&mut tasks).await;
return Ok(());
}
continue;
}
for job in jobs {
let state = state.clone();
let worker_id = worker_id.clone();
tasks.spawn(async move {
if let Err(error) =
process_external_generation_job(state, worker_id, lease, job).await
{
error!(error = %error, "external generation worker 执行任务失败");
}
});
}
}
}
type ExternalGenerationShutdownSignal = Pin<Box<dyn Future<Output = ()> + Send>>;
fn external_generation_worker_shutdown_signal() -> ExternalGenerationShutdownSignal {
Box::pin(async {
wait_for_external_generation_worker_shutdown_signal().await;
})
}
#[cfg(unix)]
async fn wait_for_external_generation_worker_shutdown_signal() {
use tokio::signal::unix::{SignalKind, signal};
let mut sigterm = signal(SignalKind::terminate()).ok();
tokio::select! {
result = tokio::signal::ctrl_c() => {
if let Err(error) = result {
warn!(error = %error, "external generation worker 监听 SIGINT 失败");
}
}
_ = async {
if let Some(sigterm) = sigterm.as_mut() {
sigterm.recv().await;
} else {
std::future::pending::<()>().await;
}
} => {}
}
}
#[cfg(not(unix))]
async fn wait_for_external_generation_worker_shutdown_signal() {
if let Err(error) = tokio::signal::ctrl_c().await {
warn!(error = %error, "external generation worker 监听 Ctrl-C 失败");
}
}
async fn await_worker_task(tasks: &mut JoinSet<()>) {
if let Some(result) = tasks.join_next().await
&& let Err(error) = result
{
error!(error = %error, "external generation worker 子任务 panic");
}
}
async fn await_worker_task_or_shutdown(
tasks: &mut JoinSet<()>,
shutdown: &mut ExternalGenerationShutdownSignal,
) -> bool {
tokio::select! {
_ = shutdown.as_mut() => true,
_ = await_worker_task(tasks) => false,
}
}
async fn await_one_task_or_sleep_or_shutdown(
tasks: &mut JoinSet<()>,
sleeper: impl Future<Output = ()>,
shutdown: &mut ExternalGenerationShutdownSignal,
) -> bool {
tokio::pin!(sleeper);
if tasks.is_empty() {
tokio::select! {
_ = shutdown.as_mut() => true,
_ = &mut sleeper => false,
}
} else {
tokio::select! {
_ = shutdown.as_mut() => true,
_ = &mut sleeper => false,
result = tasks.join_next() => {
if let Some(Err(error)) = result {
error!(error = %error, "external generation worker 子任务 panic");
}
false
}
}
}
}
async fn drain_external_generation_worker_tasks(tasks: &mut JoinSet<()>) {
info!(
in_flight_jobs = tasks.len(),
"external generation worker 收到停机信号,停止领取新任务并等待当前任务完成"
);
while !tasks.is_empty() {
await_worker_task(tasks).await;
}
info!("external generation worker 已完成优雅停机");
}
async fn process_external_generation_job(
state: AppState,
worker_id: String,
lease: Duration,
job: ExternalGenerationJobRecord,
) -> Result<(), String> {
let heartbeat_interval = external_generation_worker_heartbeat_interval(lease);
let work = process_external_generation_job_once(state.clone(), worker_id.clone(), job.clone());
tokio::pin!(work);
let heartbeat = sleep(heartbeat_interval);
tokio::pin!(heartbeat);
loop {
tokio::select! {
biased;
result = &mut work => return result,
_ = &mut heartbeat => {
renew_job_lease(&state, &worker_id, &job, lease).await?;
heartbeat.as_mut().reset(Instant::now() + heartbeat_interval);
}
}
}
}
async fn process_external_generation_job_once(
state: AppState,
worker_id: String,
job: ExternalGenerationJobRecord,
) -> Result<(), String> {
match job.job_kind.as_str() {
PUZZLE_COMPILE_DRAFT_JOB_KIND => {
let payload = match serde_json::from_str::<PuzzleCompileDraftWorkerPayload>(
job.request_payload_json.as_str(),
) {
Ok(payload) => payload,
Err(error) => {
let message = format!("拼图生成任务参数解析失败:{error}");
fail_job(&state, &worker_id, &job, message.clone()).await?;
return Err(message);
}
};
let request_context = RequestContext::new(
format!("external-generation-worker-{}", job.job_id),
format!("external-generation-worker {}", job.job_kind),
std::time::Duration::ZERO,
false,
);
let puzzle_state = PuzzleApiState::from_ref(&state);
let write_guard = build_external_generation_write_lease_guard(&worker_id, &job)?;
match execute_puzzle_compile_draft_worker_job(
&puzzle_state,
&request_context,
payload,
write_guard,
)
.await
{
Ok(session) => {
complete_job(
&state,
&worker_id,
&job,
Some(
json!({
"sessionId": session.session_id,
"progressPercent": session.progress_percent,
})
.to_string(),
),
)
.await
}
Err(error) => {
let message = error.body_text();
fail_queue_job_after_worker_error(&state, &worker_id, &job, &error, &message)
.await?;
Err(message)
}
}
}
PUZZLE_GENERATE_IMAGES_JOB_KIND => {
let payload = match serde_json::from_str::<PuzzleGenerateImagesWorkerPayload>(
job.request_payload_json.as_str(),
) {
Ok(payload) => payload,
Err(error) => {
let message = format!("拼图关卡图片生成任务参数解析失败:{error}");
fail_job(&state, &worker_id, &job, message.clone()).await?;
return Err(message);
}
};
let request_context = RequestContext::new(
format!("external-generation-worker-{}", job.job_id),
format!("external-generation-worker {}", job.job_kind),
std::time::Duration::ZERO,
false,
);
let puzzle_state = PuzzleApiState::from_ref(&state);
let write_guard = build_external_generation_write_lease_guard(&worker_id, &job)?;
match execute_puzzle_generate_images_worker_job(
&puzzle_state,
&request_context,
payload,
write_guard,
)
.await
{
Ok(session) => {
complete_job(
&state,
&worker_id,
&job,
Some(
json!({
"sessionId": session.session_id,
"progressPercent": session.progress_percent,
})
.to_string(),
),
)
.await
}
Err(error) => {
let message = error.body_text();
fail_queue_job_after_worker_error(&state, &worker_id, &job, &error, &message)
.await?;
Err(message)
}
}
}
PUZZLE_GENERATE_UI_BACKGROUND_JOB_KIND => {
let payload = match serde_json::from_str::<PuzzleGenerateUiBackgroundWorkerPayload>(
job.request_payload_json.as_str(),
) {
Ok(payload) => payload,
Err(error) => {
let message = format!("拼图 UI 背景图生成任务参数解析失败:{error}");
fail_job(&state, &worker_id, &job, message.clone()).await?;
return Err(message);
}
};
let request_context = RequestContext::new(
format!("external-generation-worker-{}", job.job_id),
format!("external-generation-worker {}", job.job_kind),
std::time::Duration::ZERO,
false,
);
let puzzle_state = PuzzleApiState::from_ref(&state);
let write_guard = build_external_generation_write_lease_guard(&worker_id, &job)?;
match execute_puzzle_generate_ui_background_worker_job(
&puzzle_state,
&request_context,
payload,
write_guard,
)
.await
{
Ok(session) => {
complete_job(
&state,
&worker_id,
&job,
Some(
json!({
"sessionId": session.session_id,
"progressPercent": session.progress_percent,
})
.to_string(),
),
)
.await
}
Err(error) => {
let message = error.body_text();
fail_queue_job_after_worker_error(&state, &worker_id, &job, &error, &message)
.await?;
Err(message)
}
}
}
unknown => {
warn!(
job_id = job.job_id,
job_kind = unknown,
"external generation worker 收到暂不支持的任务类型"
);
fail_job(
&state,
&worker_id,
&job,
format!("暂不支持的外部生成任务类型:{unknown}"),
)
.await
}
}
}
async fn fail_queue_job_after_worker_error(
state: &AppState,
worker_id: &str,
job: &ExternalGenerationJobRecord,
error: &crate::puzzle::PuzzleExternalGenerationWorkerError,
message: &str,
) -> Result<(), String> {
if error.should_fail_queue_job() {
return fail_job(state, worker_id, job, message.to_string()).await;
}
warn!(
job_id = job.job_id,
job_kind = job.job_kind,
"external generation worker 业务失败态尚未写回,保留任务租约等待后续重试"
);
Ok(())
}
async fn complete_job(
state: &AppState,
worker_id: &str,
job: &ExternalGenerationJobRecord,
result_payload_json: Option<String>,
) -> Result<(), String> {
state
.spacetime_client()
.complete_external_generation_job(ExternalGenerationJobCompleteRecordInput {
job_id: job.job_id.clone(),
worker_id: worker_id.to_string(),
lease_token: require_job_lease_token(job)?,
result_payload_json,
completed_at_micros: current_utc_micros(),
})
.await
.map(|_| ())
.map_err(|error| error.to_string())
}
async fn fail_job(
state: &AppState,
worker_id: &str,
job: &ExternalGenerationJobRecord,
error_message: String,
) -> Result<(), String> {
let now_micros = current_utc_micros();
state
.spacetime_client()
.fail_external_generation_job(ExternalGenerationJobFailRecordInput {
job_id: job.job_id.clone(),
worker_id: worker_id.to_string(),
lease_token: require_job_lease_token(job)?,
error_message,
retry_after_micros: now_micros.saturating_add(60_000_000),
failed_at_micros: now_micros,
})
.await
.map(|_| ())
.map_err(|error| error.to_string())
}
async fn renew_job_lease(
state: &AppState,
worker_id: &str,
job: &ExternalGenerationJobRecord,
lease: Duration,
) -> Result<(), String> {
let now_micros = current_utc_micros();
state
.spacetime_client()
.renew_external_generation_job_lease(ExternalGenerationJobRenewLeaseRecordInput {
job_id: job.job_id.clone(),
worker_id: worker_id.to_string(),
lease_token: require_job_lease_token(job)?,
lease_expires_at_micros: now_micros.saturating_add(duration_micros_i64(lease)),
renewed_at_micros: now_micros,
})
.await
.map(|_| ())
.map_err(|error| error.to_string())
}
fn require_job_lease_token(job: &ExternalGenerationJobRecord) -> Result<String, String> {
job.lease_token
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToOwned::to_owned)
.ok_or_else(|| format!("external_generation_job {} 缺少 lease token", job.job_id))
}
fn build_external_generation_write_lease_guard(
worker_id: &str,
job: &ExternalGenerationJobRecord,
) -> Result<ExternalGenerationWriteLeaseGuard, String> {
Ok(ExternalGenerationWriteLeaseGuard {
job_id: job.job_id.clone(),
worker_id: worker_id.to_string(),
lease_token: require_job_lease_token(job)?,
})
}
fn duration_micros_i64(duration: Duration) -> i64 {
duration.as_micros().min(i64::MAX as u128) as i64
}
fn external_generation_worker_heartbeat_interval(lease: Duration) -> Duration {
let heartbeat_millis = (lease.as_millis() / 3).clamp(250, 30_000) as u64;
Duration::from_millis(heartbeat_millis)
}
fn current_utc_micros() -> i64 {
offset_datetime_to_unix_micros(time::OffsetDateTime::now_utc())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn worker_write_guard_uses_claimed_job_lease_token() {
let job = external_generation_job_record_fixture(Some("lease-1"));
let guard = build_external_generation_write_lease_guard("worker-a", &job)
.expect("guard should build");
assert_eq!(guard.job_id, "extgen-1");
assert_eq!(guard.worker_id, "worker-a");
assert_eq!(guard.lease_token, "lease-1");
}
#[test]
fn worker_write_guard_requires_claimed_job_lease_token() {
let job = external_generation_job_record_fixture(None);
let error = build_external_generation_write_lease_guard("worker-a", &job)
.expect_err("missing token should fail");
assert!(error.contains("缺少 lease token"));
}
fn external_generation_job_record_fixture(
lease_token: Option<&str>,
) -> ExternalGenerationJobRecord {
ExternalGenerationJobRecord {
job_id: "extgen-1".to_string(),
dedupe_key: "puzzle:generate_puzzle_images:session-1:extgen-1".to_string(),
job_kind: PUZZLE_GENERATE_IMAGES_JOB_KIND.to_string(),
owner_user_id: "user-1".to_string(),
source_module: "puzzle".to_string(),
source_entity_id: "session-1:puzzle-level-1".to_string(),
request_label: "拼图关卡图片生成".to_string(),
request_payload_json: "{}".to_string(),
status: "running".to_string(),
attempt: 1,
max_attempts: 1,
last_error_message: None,
worker_id: Some("worker-a".to_string()),
lease_expires_at: Some("2026-06-03T00:00:00Z".to_string()),
available_at: "2026-06-03T00:00:00Z".to_string(),
result_payload_json: None,
created_at: "2026-06-03T00:00:00Z".to_string(),
started_at: Some("2026-06-03T00:00:00Z".to_string()),
completed_at: None,
updated_at: "2026-06-03T00:00:00Z".to_string(),
lease_token: lease_token.map(ToOwned::to_owned),
}
}
}