扩展外部生成Worker队列

新增外部生成队列概览和单任务状态契约

将跳一跳、拼消消、敲木鱼图片生成动作接入worker队列

前端生成等待页展示当前任务和队列数量

更新外部生成worker运维文档和团队决策记录
This commit is contained in:
2026-06-12 23:15:55 +08:00
parent 3bccfd1a83
commit 951caac32d
43 changed files with 1913 additions and 67 deletions

View File

@@ -15,14 +15,26 @@ use tokio::{
use tracing::{error, info, warn};
use crate::{
jump_hop::{
JUMP_HOP_COMPILE_DRAFT_JOB_KIND, JumpHopCompileDraftWorkerPayload,
execute_jump_hop_compile_draft_worker_job,
},
puzzle::{
ExternalGenerationWriteLeaseGuard, PuzzleCompileDraftWorkerPayload,
PuzzleGenerateImagesWorkerPayload, PuzzleGenerateUiBackgroundWorkerPayload,
execute_puzzle_compile_draft_worker_job, execute_puzzle_generate_images_worker_job,
execute_puzzle_generate_ui_background_worker_job, release_puzzle_compile_background_claim,
},
puzzle_clear::{
PUZZLE_CLEAR_COMPILE_DRAFT_JOB_KIND, PuzzleClearCompileDraftWorkerPayload,
execute_puzzle_clear_compile_draft_worker_job,
},
request_context::RequestContext,
state::{AppState, PuzzleApiState},
wooden_fish::{
WOODEN_FISH_GENERATE_IMAGE_ASSETS_JOB_KIND, WoodenFishGenerateImageAssetsWorkerPayload,
execute_wooden_fish_generate_image_assets_worker_job,
},
};
pub(crate) const PUZZLE_COMPILE_DRAFT_JOB_KIND: &str = "puzzle_compile_draft";
@@ -395,6 +407,135 @@ async fn process_external_generation_job_once(
}
}
}
JUMP_HOP_COMPILE_DRAFT_JOB_KIND => {
let payload = match serde_json::from_str::<JumpHopCompileDraftWorkerPayload>(
job.request_payload_json.as_str(),
) {
Ok(payload) => payload,
Err(error) => {
let message = format!("跳一跳生成任务参数解析失败:{error}");
fail_job(&state, &worker_id, &job, message.clone()).await?;
return Err(message);
}
};
let request_context = RequestContext::new(
format!("external-generation-worker-{}", job.job_id),
format!("external-generation-worker {}", job.job_kind),
std::time::Duration::ZERO,
false,
);
match execute_jump_hop_compile_draft_worker_job(&state, &request_context, payload).await
{
Ok(session) => {
complete_job(
&state,
&worker_id,
&job,
Some(
json!({
"sessionId": session.session_id,
"status": session.status,
})
.to_string(),
),
)
.await
}
Err(response) => {
let message = response_error_message(response).await;
fail_job(&state, &worker_id, &job, message.clone()).await?;
Err(message)
}
}
}
PUZZLE_CLEAR_COMPILE_DRAFT_JOB_KIND => {
let payload = match serde_json::from_str::<PuzzleClearCompileDraftWorkerPayload>(
job.request_payload_json.as_str(),
) {
Ok(payload) => payload,
Err(error) => {
let message = format!("拼消消生成任务参数解析失败:{error}");
fail_job(&state, &worker_id, &job, message.clone()).await?;
return Err(message);
}
};
let request_context = RequestContext::new(
format!("external-generation-worker-{}", job.job_id),
format!("external-generation-worker {}", job.job_kind),
std::time::Duration::ZERO,
false,
);
match execute_puzzle_clear_compile_draft_worker_job(&state, &request_context, payload)
.await
{
Ok(session) => {
complete_job(
&state,
&worker_id,
&job,
Some(
json!({
"sessionId": session.session_id,
"status": session.status,
})
.to_string(),
),
)
.await
}
Err(response) => {
let message = response_error_message(response).await;
fail_job(&state, &worker_id, &job, message.clone()).await?;
Err(message)
}
}
}
WOODEN_FISH_GENERATE_IMAGE_ASSETS_JOB_KIND => {
let payload = match serde_json::from_str::<WoodenFishGenerateImageAssetsWorkerPayload>(
job.request_payload_json.as_str(),
) {
Ok(payload) => payload,
Err(error) => {
let message = format!("敲木鱼图片生成任务参数解析失败:{error}");
fail_job(&state, &worker_id, &job, message.clone()).await?;
return Err(message);
}
};
let request_context = RequestContext::new(
format!("external-generation-worker-{}", job.job_id),
format!("external-generation-worker {}", job.job_kind),
std::time::Duration::ZERO,
false,
);
match execute_wooden_fish_generate_image_assets_worker_job(
&state,
&request_context,
payload,
)
.await
{
Ok(session) => {
complete_job(
&state,
&worker_id,
&job,
Some(
json!({
"sessionId": session.session_id,
"status": session.status,
})
.to_string(),
),
)
.await
}
Err(response) => {
let message = response_error_message(response).await;
fail_job(&state, &worker_id, &job, message.clone()).await?;
Err(message)
}
}
}
unknown => {
warn!(
job_id = job.job_id,
@@ -412,6 +553,32 @@ async fn process_external_generation_job_once(
}
}
async fn response_error_message(response: axum::response::Response) -> String {
use axum::body::to_bytes;
let status = response.status();
let body_bytes = match to_bytes(response.into_body(), 64 * 1024).await {
Ok(bytes) => bytes,
Err(error) => {
return format!("外部生成任务失败:{status},响应读取失败:{error}");
}
};
let body_text = String::from_utf8_lossy(&body_bytes).trim().to_string();
if body_text.is_empty() {
return format!("外部生成任务失败:{status}");
}
if let Ok(body_json) = serde_json::from_str::<serde_json::Value>(&body_text)
&& let Some(message) = body_json
.get("error")
.and_then(|error| error.get("message"))
.and_then(serde_json::Value::as_str)
.map(str::trim)
.filter(|message| !message.is_empty())
{
return message.to_string();
}
body_text
}
async fn fail_queue_job_after_worker_error(
state: &AppState,
worker_id: &str,