完善外部生成Worker动态扩缩容

新增外部生成controller进程角色与systemd服务

补齐队列统计procedure与spacetime-client绑定

更新生产部署脚本、健康巡检和server provision的worker/controller口径

新增容器worker smoke脚本并同步运维文档与团队记忆
This commit is contained in:
2026-06-12 15:21:35 +08:00
parent 69815d918a
commit 4a6c126366
30 changed files with 2030 additions and 28 deletions

View File

@@ -56,7 +56,7 @@ shared-kernel = { workspace = true }
shared-logging = { workspace = true }
socket2 = { workspace = true }
spacetime-client = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time", "sync", "fs", "io-util", "signal"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time", "sync", "fs", "io-util", "signal", "process"] }
tokio-stream = { workspace = true }
futures-util = { workspace = true }
time = { workspace = true, features = ["formatting"] }

View File

@@ -28,6 +28,13 @@ pub struct AppConfig {
pub external_generation_worker_concurrency: usize,
pub external_generation_worker_poll_interval: Duration,
pub external_generation_worker_lease: Duration,
pub external_generation_controller_min_workers: usize,
pub external_generation_controller_max_workers: usize,
pub external_generation_controller_target_jobs_per_worker: usize,
pub external_generation_controller_poll_interval: Duration,
pub external_generation_controller_scale_down_idle_rounds: u32,
pub external_generation_controller_service_template: String,
pub external_generation_controller_dry_run: bool,
pub max_concurrent_requests: Option<usize>,
pub gallery_max_concurrent_requests: Option<usize>,
pub detail_max_concurrent_requests: Option<usize>,
@@ -181,6 +188,7 @@ pub struct AppConfig {
/// Role a single process instance runs as.
pub enum ProcessRole {
/// Serves HTTP; runs neither the external generation worker nor the controller.
Api,
/// Runs the external generation worker without serving HTTP.
ExternalGenerationWorker,
/// Runs the external generation worker controller without serving HTTP
/// or running a worker itself.
ExternalGenerationController,
/// Serves HTTP and runs the external generation worker; the controller is
/// not included in this role.
All,
}
@@ -208,6 +216,7 @@ impl ProcessRole {
match self {
Self::Api => "api",
Self::ExternalGenerationWorker => "external-generation-worker",
Self::ExternalGenerationController => "external-generation-controller",
Self::All => "all",
}
}
@@ -219,6 +228,10 @@ impl ProcessRole {
/// Returns `true` for the roles that run the external generation worker:
/// the dedicated worker role and `All`.
pub fn runs_external_generation_worker(self) -> bool {
    match self {
        Self::ExternalGenerationWorker | Self::All => true,
        Self::Api | Self::ExternalGenerationController => false,
    }
}
/// Returns `true` only for the dedicated controller role; `All` does not
/// include the controller.
pub fn runs_external_generation_controller(self) -> bool {
    match self {
        Self::ExternalGenerationController => true,
        Self::Api | Self::ExternalGenerationWorker | Self::All => false,
    }
}
}
impl Default for AppConfig {
@@ -234,6 +247,14 @@ impl Default for AppConfig {
external_generation_worker_concurrency: 2,
external_generation_worker_poll_interval: Duration::from_millis(2_000),
external_generation_worker_lease: Duration::from_secs(3_600),
external_generation_controller_min_workers: 1,
external_generation_controller_max_workers: 8,
external_generation_controller_target_jobs_per_worker: 2,
external_generation_controller_poll_interval: Duration::from_millis(10_000),
external_generation_controller_scale_down_idle_rounds: 6,
external_generation_controller_service_template:
"genarrative-external-generation-worker@{}.service".to_string(),
external_generation_controller_dry_run: false,
max_concurrent_requests: None,
gallery_max_concurrent_requests: None,
detail_max_concurrent_requests: None,
@@ -459,6 +480,49 @@ impl AppConfig {
]) {
config.external_generation_worker_lease = Duration::from_secs(lease_seconds.max(1));
}
if let Some(min_workers) =
read_first_usize_env(&["GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_MIN_WORKERS"])
{
config.external_generation_controller_min_workers = min_workers;
}
if let Some(max_workers) =
read_first_usize_env(&["GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_MAX_WORKERS"])
{
config.external_generation_controller_max_workers = max_workers;
}
if config.external_generation_controller_max_workers
< config.external_generation_controller_min_workers
{
config.external_generation_controller_max_workers =
config.external_generation_controller_min_workers;
}
if let Some(target_jobs_per_worker) = read_first_usize_env(&[
"GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_TARGET_JOBS_PER_WORKER",
]) {
config.external_generation_controller_target_jobs_per_worker =
target_jobs_per_worker.max(1);
}
if let Some(poll_interval_ms) = read_first_positive_u64_env(&[
"GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_POLL_INTERVAL_MS",
]) {
config.external_generation_controller_poll_interval =
Duration::from_millis(poll_interval_ms);
}
if let Some(idle_rounds) = read_first_u32_env(&[
"GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_SCALE_DOWN_IDLE_ROUNDS",
]) {
config.external_generation_controller_scale_down_idle_rounds = idle_rounds;
}
if let Some(service_template) = read_first_non_empty_env(&[
"GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_SERVICE_TEMPLATE",
]) {
config.external_generation_controller_service_template = service_template;
}
if let Some(dry_run) =
read_first_bool_env(&["GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_DRY_RUN"])
{
config.external_generation_controller_dry_run = dry_run;
}
if let Some(max_concurrent_requests) =
read_first_usize_env(&["GENARRATIVE_API_MAX_CONCURRENT_REQUESTS"])
{
@@ -1214,6 +1278,9 @@ fn parse_process_role(value: &str) -> Option<ProcessRole> {
"external-generation-worker" | "external_generation_worker" | "worker" => {
Some(ProcessRole::ExternalGenerationWorker)
}
"external-generation-controller" | "external_generation_controller" | "controller" => {
Some(ProcessRole::ExternalGenerationController)
}
"all" => Some(ProcessRole::All),
_ => None,
}
@@ -1419,15 +1486,29 @@ mod tests {
parse_process_role("worker"),
Some(ProcessRole::ExternalGenerationWorker)
);
assert_eq!(
parse_process_role("controller"),
Some(ProcessRole::ExternalGenerationController)
);
assert_eq!(
parse_process_role("'external_generation_controller'"),
Some(ProcessRole::ExternalGenerationController)
);
assert_eq!(parse_process_role("all"), Some(ProcessRole::All));
assert_eq!(parse_process_role("unknown"), None);
assert!(ProcessRole::Api.runs_http());
assert!(!ProcessRole::Api.runs_external_generation_worker());
assert!(!ProcessRole::Api.runs_external_generation_controller());
assert!(!ProcessRole::ExternalGenerationWorker.runs_http());
assert!(ProcessRole::ExternalGenerationWorker.runs_external_generation_worker());
assert!(!ProcessRole::ExternalGenerationWorker.runs_external_generation_controller());
assert!(!ProcessRole::ExternalGenerationController.runs_http());
assert!(!ProcessRole::ExternalGenerationController.runs_external_generation_worker());
assert!(ProcessRole::ExternalGenerationController.runs_external_generation_controller());
assert!(ProcessRole::All.runs_http());
assert!(ProcessRole::All.runs_external_generation_worker());
assert!(!ProcessRole::All.runs_external_generation_controller());
}
#[test]

View File

@@ -0,0 +1,465 @@
use std::{collections::BTreeSet, future::Future, io, pin::Pin, process::Stdio, time::Duration};
use spacetime_client::ExternalGenerationQueueStatsRecord;
use tokio::{
process::Command,
time::{Instant, sleep},
};
use tracing::{error, info, warn};
use crate::state::AppState;
/// Controller settings, copied out of the application config once at startup.
#[derive(Clone, Debug)]
struct ExternalGenerationWorkerControllerConfig {
/// Lower bound applied when clamping the desired worker count. In dry-run
/// mode, instances numbered up to this value are reported as active.
min_workers: usize,
/// Upper bound for the desired worker count and the highest instance number
/// that is probed or started.
max_workers: usize,
/// Number of outstanding jobs one worker is expected to absorb; used as the
/// divisor when turning queue pressure into a worker count.
target_jobs_per_worker: usize,
/// Pause between two controller ticks.
poll_interval: Duration,
/// Consecutive idle ticks required before an instance may be stopped;
/// `0` disables scale-down.
scale_down_idle_rounds: u32,
/// systemd unit name template; must contain `{}` or `%i`, which is replaced
/// by the instance number.
service_template: String,
/// When set, no `systemctl` command is executed; actions are only logged.
dry_run: bool,
}
/// Result of evaluating one queue statistics sample.
#[derive(Clone, Debug, Eq, PartialEq)]
struct ExternalGenerationWorkerControllerDecision {
/// Worker count derived from queue pressure, clamped to the configured
/// minimum and maximum.
desired_workers: usize,
/// Whether the idle threshold has been reached and there are more workers
/// than desired, so one instance may be stopped.
should_scale_down: bool,
/// Updated count of consecutive idle ticks (reset to `0` on any activity).
idle_rounds: u32,
}
/// Mutable state carried from one controller tick to the next.
#[derive(Debug, Default)]
struct ExternalGenerationWorkerControllerState {
/// Number of consecutive ticks in which the queue was evaluated as idle.
idle_rounds: u32,
}
pub(crate) async fn run_external_generation_worker_controller(
state: AppState,
) -> Result<(), io::Error> {
let config = ExternalGenerationWorkerControllerConfig::from_state(&state);
let mut controller_state = ExternalGenerationWorkerControllerState::default();
let mut shutdown = external_generation_controller_shutdown_signal();
info!(
min_workers = config.min_workers,
max_workers = config.max_workers,
target_jobs_per_worker = config.target_jobs_per_worker,
poll_interval_ms = config.poll_interval.as_millis(),
scale_down_idle_rounds = config.scale_down_idle_rounds,
service_template = config.service_template,
dry_run = config.dry_run,
"external generation worker controller 已启动"
);
loop {
let tick = run_external_generation_controller_tick(&state, &config, &mut controller_state);
tokio::select! {
_ = shutdown.as_mut() => {
info!("external generation worker controller 收到停机信号");
return Ok(());
}
result = tick => {
if let Err(error) = result {
error!(error = %error, "external generation worker controller 本轮扩缩容失败");
}
}
}
let next_tick = sleep(config.poll_interval);
tokio::pin!(next_tick);
tokio::select! {
_ = shutdown.as_mut() => {
info!("external generation worker controller 收到停机信号");
return Ok(());
}
_ = &mut next_tick => {}
}
}
}
/// Performs a single controller tick: read queue statistics, probe which
/// worker instances are active, compute the scaling decision, log it, and
/// reconcile the running instances with that decision.
///
/// The idle-round counter in `controller_state` is updated before reconciling,
/// so it advances even when the reconcile step fails.
///
/// # Errors
/// Returns a message when the queue statistics cannot be read, when probing an
/// instance fails, or when reconciling fails.
async fn run_external_generation_controller_tick(
state: &AppState,
config: &ExternalGenerationWorkerControllerConfig,
controller_state: &mut ExternalGenerationWorkerControllerState,
) -> Result<(), String> {
let stats = state
.spacetime_client()
.get_external_generation_queue_stats()
.await
.map_err(|error| format!("读取 external_generation_job 队列统计失败:{error}"))?;
let active_instances = list_active_external_generation_worker_instances(config).await?;
let current_workers = active_instances.len();
let decision = decide_external_generation_worker_target(
&stats,
current_workers,
controller_state.idle_rounds,
config,
);
// Persist the idle counter for the next tick before any systemctl action runs.
controller_state.idle_rounds = decision.idle_rounds;
info!(
pending = stats.pending_count,
delayed_pending = stats.delayed_pending_count,
claimable = stats.claimable_count,
running_active = stats.running_active_count,
expired_running = stats.expired_running_count,
// Microseconds divided by 1_000 to log milliseconds; a missing age is logged as 0.
oldest_claimable_age_ms = stats.oldest_claimable_age_micros.unwrap_or(0) / 1_000,
current_workers,
desired_workers = decision.desired_workers,
idle_rounds = decision.idle_rounds,
"external generation worker controller 完成队列评估"
);
reconcile_external_generation_worker_instances(config, &active_instances, &decision).await
}
/// Turns one queue statistics sample into a scaling decision.
///
/// Pressure is the sum of claimable pending, actively running and expired
/// running jobs. The desired worker count is that pressure divided by
/// `target_jobs_per_worker` (rounded up) and clamped to the configured
/// minimum and maximum.
///
/// A tick counts as idle only when nothing is claimable or running and the
/// desired count does not exceed the minimum. Scale-down is permitted once the
/// idle streak reaches `scale_down_idle_rounds` (a threshold of `0` disables
/// it) and more workers are running than desired.
fn decide_external_generation_worker_target(
    stats: &ExternalGenerationQueueStatsRecord,
    current_workers: usize,
    previous_idle_rounds: u32,
    config: &ExternalGenerationWorkerControllerConfig,
) -> ExternalGenerationWorkerControllerDecision {
    let jobs_per_worker = config.target_jobs_per_worker.max(1);
    let outstanding_jobs = [
        stats.claimable_pending_count,
        stats.running_active_count,
        stats.expired_running_count,
    ]
    .into_iter()
    .fold(0u32, u32::saturating_add);
    let desired_workers = ceil_div_usize(outstanding_jobs as usize, jobs_per_worker)
        .clamp(config.min_workers, config.max_workers);

    let nothing_in_flight = [
        stats.claimable_count,
        stats.expired_running_count,
        stats.running_active_count,
    ]
    .iter()
    .all(|count| *count == 0);

    let mut idle_rounds = 0;
    if nothing_in_flight && desired_workers <= config.min_workers {
        idle_rounds = previous_idle_rounds.saturating_add(1);
    }

    let threshold = config.scale_down_idle_rounds;
    let should_scale_down =
        threshold > 0 && idle_rounds >= threshold && current_workers > desired_workers;

    ExternalGenerationWorkerControllerDecision {
        desired_workers,
        should_scale_down,
        idle_rounds,
    }
}
/// Moves the set of running worker instances toward the decision.
///
/// Scale-up: starts the lowest-numbered inactive instances until the running
/// count reaches `desired_workers`. If anything was started, the tick ends
/// without considering scale-down.
///
/// Scale-down: when permitted by the decision, stops at most one instance per
/// call — the highest-numbered active one — and only if its number is above
/// `max(min_workers, 1)`.
///
/// # Errors
/// Propagates the first failing `systemctl` invocation.
async fn reconcile_external_generation_worker_instances(
    config: &ExternalGenerationWorkerControllerConfig,
    active_instances: &BTreeSet<usize>,
    decision: &ExternalGenerationWorkerControllerDecision,
) -> Result<(), String> {
    let current_workers = active_instances.len();
    let shortfall = decision.desired_workers.saturating_sub(current_workers);

    let mut started = 0usize;
    let startable = (1..=config.max_workers)
        .filter(|slot| !active_instances.contains(slot))
        .take(shortfall);
    for slot in startable {
        systemctl_worker_instance(config, "start", slot).await?;
        started += 1;
    }
    if started > 0 {
        return Ok(());
    }

    // Nothing was started from here on.
    if decision.desired_workers > current_workers {
        warn!(
            current_workers,
            desired_workers = decision.desired_workers,
            "external generation worker controller 未找到可启动的缺口实例"
        );
    }

    let may_scale_down = decision.should_scale_down && decision.desired_workers < current_workers;
    if !may_scale_down {
        return Ok(());
    }

    // The set is ordered, so the last element is the only candidate that matters:
    // if it is not above the floor, no other active instance is either.
    let floor = config.min_workers.max(1);
    let victim = active_instances
        .last()
        .copied()
        .filter(|instance| *instance > floor);
    if let Some(instance) = victim {
        systemctl_worker_instance(config, "stop", instance).await?;
    }
    Ok(())
}
/// Probes instance numbers `1..=max_workers` one after another and returns
/// the numbers whose service is reported as active.
///
/// # Errors
/// Stops at and returns the first probe failure.
async fn list_active_external_generation_worker_instances(
    config: &ExternalGenerationWorkerControllerConfig,
) -> Result<BTreeSet<usize>, String> {
    let mut running = BTreeSet::new();
    for slot in 1..=config.max_workers {
        let is_running = is_external_generation_worker_instance_active(config, slot).await?;
        if is_running {
            running.insert(slot);
        }
    }
    Ok(running)
}
/// Reports whether the systemd unit for `instance` is currently active.
///
/// The service name is always derived from the template first, so an invalid
/// template is reported even in dry-run mode. In dry-run mode no command is
/// executed and instances numbered up to `min_workers` are reported as active.
///
/// Otherwise `systemctl is-active --quiet <service>` is run and its exit
/// status decides the answer. Only the status is awaited: tokio's `output()`
/// would unconditionally re-pipe stdout/stderr and thereby ignore the
/// `Stdio::null()` configuration below.
///
/// # Errors
/// Returns a message when the template is invalid or `systemctl` cannot be
/// spawned.
async fn is_external_generation_worker_instance_active(
    config: &ExternalGenerationWorkerControllerConfig,
    instance: usize,
) -> Result<bool, String> {
    let service = format_worker_service_name(&config.service_template, instance)?;
    if config.dry_run {
        return Ok(instance <= config.min_workers);
    }
    let status = Command::new("systemctl")
        .arg("is-active")
        .arg("--quiet")
        .arg(&service)
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
        .await
        .map_err(|error| format!("执行 systemctl is-active {service} 失败:{error}"))?;
    Ok(status.success())
}
/// Runs `systemctl <action> <service>` for the given worker instance.
///
/// The service name is derived from the template before anything else. In
/// dry-run mode the call is only logged. Otherwise the command is executed,
/// its duration is logged on success, and a non-zero exit status is turned
/// into an error carrying the captured stderr.
///
/// # Errors
/// Returns a message when the template is invalid, when `systemctl` cannot be
/// spawned, or when it exits unsuccessfully.
async fn systemctl_worker_instance(
    config: &ExternalGenerationWorkerControllerConfig,
    action: &str,
    instance: usize,
) -> Result<(), String> {
    let service = format_worker_service_name(&config.service_template, instance)?;
    if config.dry_run {
        info!(
            action,
            service, "external generation worker controller dry-run 跳过 systemctl"
        );
        return Ok(());
    }

    let started_at = Instant::now();
    let mut command = Command::new("systemctl");
    command
        .arg(action)
        .arg(&service)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());

    let output = match command.output().await {
        Ok(output) => output,
        Err(error) => {
            return Err(format!("执行 systemctl {action} {service} 失败:{error}"));
        }
    };

    if output.status.success() {
        info!(
            action,
            service,
            elapsed_ms = started_at.elapsed().as_millis(),
            "external generation worker controller 已执行 systemctl"
        );
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    Err(format!(
        "systemctl {action} {service} 返回失败 status={} stderr={}",
        output.status, stderr
    ))
}
/// Builds a service name by substituting the instance number into `template`.
///
/// The first occurrence of `{}` is replaced; if the template has no `{}`, the
/// first occurrence of `%i` is replaced instead. Only one occurrence is ever
/// substituted.
///
/// # Errors
/// Returns a message when the template contains neither placeholder.
fn format_worker_service_name(template: &str, instance: usize) -> Result<String, String> {
    // Checked in priority order: `{}` wins when both placeholders are present.
    const PLACEHOLDERS: [&str; 2] = ["{}", "%i"];

    let chosen = PLACEHOLDERS
        .into_iter()
        .find(|candidate| template.contains(*candidate));
    match chosen {
        Some(placeholder) => Ok(template.replacen(placeholder, &instance.to_string(), 1)),
        None => {
            Err("external generation controller service template 必须包含 {} 或 %i".to_string())
        }
    }
}
/// Divides `value` by `divisor`, rounding up.
///
/// A `divisor` of `0` is treated as `1`, so the function never panics. Uses
/// `usize::div_ceil`, which stays exact for values near `usize::MAX` where an
/// add-then-divide formulation would saturate and round down.
fn ceil_div_usize(value: usize, divisor: usize) -> usize {
    value.div_ceil(divisor.max(1))
}
impl ExternalGenerationWorkerControllerConfig {
    /// Copies the controller settings out of the application state.
    ///
    /// Two values are normalised on the way: `max_workers` is raised to at
    /// least `min_workers`, and `target_jobs_per_worker` to at least `1`.
    fn from_state(state: &AppState) -> Self {
        let app_config = &state.config;
        let min_workers = app_config.external_generation_controller_min_workers;
        let max_workers = app_config
            .external_generation_controller_max_workers
            .max(min_workers);
        let target_jobs_per_worker = app_config
            .external_generation_controller_target_jobs_per_worker
            .max(1);
        let service_template = app_config
            .external_generation_controller_service_template
            .clone();

        Self {
            min_workers,
            max_workers,
            target_jobs_per_worker,
            poll_interval: app_config.external_generation_controller_poll_interval,
            scale_down_idle_rounds: app_config
                .external_generation_controller_scale_down_idle_rounds,
            service_template,
            dry_run: app_config.external_generation_controller_dry_run,
        }
    }
}
/// Boxed, pinned future that completes once a shutdown signal is observed.
type ExternalGenerationControllerShutdownSignal = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Creates the shutdown future polled by the controller loop. Boxing it lets
/// the loop re-poll the same future across iterations via `as_mut()`.
fn external_generation_controller_shutdown_signal() -> ExternalGenerationControllerShutdownSignal {
    Box::pin(wait_for_external_generation_controller_shutdown_signal())
}
/// Unix variant: completes when either Ctrl-C (SIGINT) or SIGTERM is observed.
///
/// If the SIGTERM listener cannot be installed, that branch waits forever and
/// only the Ctrl-C branch can complete the future.
// NOTE(review): an error from `ctrl_c()` also completes this future after the
// warning is logged, so the caller treats a failed SIGINT listener like a
// shutdown request — confirm this is intended.
#[cfg(unix)]
async fn wait_for_external_generation_controller_shutdown_signal() {
use tokio::signal::unix::{SignalKind, signal};
// Registration failure is tolerated; it is handled in the second select branch.
let mut sigterm = signal(SignalKind::terminate()).ok();
tokio::select! {
result = tokio::signal::ctrl_c() => {
if let Err(error) = result {
warn!(error = %error, "external generation worker controller 监听 SIGINT 失败");
}
}
_ = async {
if let Some(sigterm) = sigterm.as_mut() {
sigterm.recv().await;
} else {
// No SIGTERM listener: never resolve, leaving the decision to the Ctrl-C branch.
std::future::pending::<()>().await;
}
} => {}
}
}
/// Non-Unix variant: completes once Ctrl-C is observed. A failure to listen is
/// logged as a warning, after which the future completes as well.
#[cfg(not(unix))]
async fn wait_for_external_generation_controller_shutdown_signal() {
    match tokio::signal::ctrl_c().await {
        Ok(()) => {}
        Err(error) => {
            warn!(error = %error, "external generation worker controller 监听 Ctrl-C 失败");
        }
    }
}
// Unit tests for the scaling decision, service-name formatting and dry-run reconcile.
#[cfg(test)]
mod tests {
use super::*;
// Pressure far above capacity must clamp the desired count to `max_workers`.
#[test]
fn scales_up_to_max_when_queue_pressure_is_high() {
let config = controller_config_fixture();
let stats = stats_fixture(120, 0, 8);
let decision = decide_external_generation_worker_target(&stats, 1, 0, &config);
assert_eq!(decision.desired_workers, 8);
assert!(!decision.should_scale_down);
assert_eq!(decision.idle_rounds, 0);
}
// A single idle tick is not enough; scale-down is allowed only once the idle
// streak reaches the configured threshold.
#[test]
fn scale_down_requires_consecutive_idle_rounds() {
let config = controller_config_fixture();
let stats = stats_fixture(0, 0, 0);
let first = decide_external_generation_worker_target(&stats, 5, 0, &config);
// Starting one round below the threshold makes this tick the one that reaches it.
let ready = decide_external_generation_worker_target(
&stats,
5,
config.scale_down_idle_rounds.saturating_sub(1),
&config,
);
assert_eq!(first.desired_workers, config.min_workers);
assert!(!first.should_scale_down);
assert!(ready.should_scale_down);
}
// Jobs that are still running count as pressure and reset the idle streak.
#[test]
fn running_jobs_hold_capacity_before_scale_down() {
let config = controller_config_fixture();
let stats = stats_fixture(0, 6, 0);
let decision = decide_external_generation_worker_target(&stats, 5, 5, &config);
assert_eq!(decision.desired_workers, 3);
assert!(!decision.should_scale_down);
assert_eq!(decision.idle_rounds, 0);
}
// Three expired running jobs yield ceil(3 / 2) = 2 workers; counting them a
// second time through `claimable_count` would yield 3.
#[test]
fn expired_running_jobs_are_not_counted_twice_as_claimable_pressure() {
let config = controller_config_fixture();
let stats = stats_fixture(0, 0, 3);
let decision = decide_external_generation_worker_target(&stats, 1, 0, &config);
assert_eq!(decision.desired_workers, 2);
assert!(!decision.should_scale_down);
}
// Both `{}` and `%i` placeholders are supported; a template without either is rejected.
#[test]
fn formats_worker_service_name_with_supported_templates() {
assert_eq!(
format_worker_service_name("genarrative-external-generation-worker@{}.service", 3)
.expect("format"),
"genarrative-external-generation-worker@3.service"
);
assert_eq!(
format_worker_service_name("worker@%i.service", 7).expect("format"),
"worker@7.service"
);
assert!(format_worker_service_name("worker.service", 1).is_err());
}
// With two active instances and a desired count of two, reconcile must succeed
// in dry-run mode. Only the `Ok` result is asserted here.
#[tokio::test]
async fn dry_run_reconcile_does_not_start_low_number_gaps_when_capacity_is_enough() {
let config = controller_config_fixture();
let active_instances = BTreeSet::from([3usize, 4usize]);
let decision = ExternalGenerationWorkerControllerDecision {
desired_workers: 2,
should_scale_down: false,
idle_rounds: 0,
};
let result =
reconcile_external_generation_worker_instances(&config, &active_instances, &decision)
.await;
assert!(result.is_ok());
}
// Shared config: 1..=8 workers, 2 jobs per worker, scale-down after 3 idle
// rounds, dry-run enabled so no systemctl command is executed.
fn controller_config_fixture() -> ExternalGenerationWorkerControllerConfig {
ExternalGenerationWorkerControllerConfig {
min_workers: 1,
max_workers: 8,
target_jobs_per_worker: 2,
poll_interval: Duration::from_secs(10),
scale_down_idle_rounds: 3,
service_template: "genarrative-external-generation-worker@{}.service".to_string(),
dry_run: true,
}
}
// Builds a stats record from the three pressure inputs; `claimable_count` is
// derived as claimable pending plus expired running.
fn stats_fixture(
claimable_pending_count: u32,
running_active_count: u32,
expired_running_count: u32,
) -> ExternalGenerationQueueStatsRecord {
let claimable_count = claimable_pending_count.saturating_add(expired_running_count);
ExternalGenerationQueueStatsRecord {
pending_count: claimable_pending_count,
delayed_pending_count: 0,
claimable_pending_count,
running_active_count,
expired_running_count,
terminal_count: 0,
claimable_count,
oldest_claimable_age_micros: None,
now_micros: 0,
}
}
}

View File

@@ -41,6 +41,7 @@ mod edutainment_baby_object;
mod error_middleware;
mod external_api_audit;
mod external_generation_worker;
mod external_generation_worker_controller;
pub(crate) mod generated_asset_sheets;
mod generated_image_assets;
mod health;
@@ -116,6 +117,7 @@ use crate::{
app::{build_router, build_spacetime_unavailable_router},
config::AppConfig,
external_generation_worker::run_external_generation_worker,
external_generation_worker_controller::run_external_generation_worker_controller,
state::{AppState, AppStateInitError},
tracking_outbox::TrackingOutbox,
wallet_refund_outbox::WalletRefundOutbox,
@@ -188,9 +190,18 @@ async fn run_worker_only(config: AppConfig) -> Result<(), io::Error> {
spawn_app_state_background_workers(&state);
info!(
process_role = process_role.as_str(),
"api-server 以 worker 角色启动"
"api-server 以非 HTTP 角色启动"
);
run_external_generation_worker(state).await
if process_role.runs_external_generation_worker() {
run_external_generation_worker(state).await
} else if process_role.runs_external_generation_controller() {
run_external_generation_worker_controller(state).await
} else {
Err(io::Error::other(format!(
"不支持的非 HTTP 进程角色:{}",
process_role.as_str()
)))
}
}
async fn run_http_role(config: AppConfig) -> Result<(), io::Error> {

View File

@@ -126,4 +126,23 @@ impl SpacetimeClient {
)
.await
}
/// Calls the `get_external_generation_queue_stats_and_return` procedure and
/// maps its result into an [`ExternalGenerationQueueStatsRecord`].
///
/// # Errors
/// Returns a [`SpacetimeClientError`] when the SDK reports a failure or when
/// mapping the procedure result fails.
pub async fn get_external_generation_queue_stats(
&self,
) -> Result<ExternalGenerationQueueStatsRecord, SpacetimeClientError> {
self.call_after_connect(
"get_external_generation_queue_stats_and_return",
move |connection, sender| {
connection
.procedures()
.get_external_generation_queue_stats_and_return_then(move |_, result| {
// Convert the SDK error first, then validate and map the payload.
let mapped = result
.map_err(SpacetimeClientError::from_sdk_error)
.and_then(map_external_generation_queue_stats_result);
send_once(&sender, mapped);
});
},
)
.await
}
}

View File

@@ -33,12 +33,13 @@ pub use mapper::{
CustomWorldWorkSummaryRecord, ExternalGenerationJobClaimRecordInput,
ExternalGenerationJobCompleteRecordInput, ExternalGenerationJobEnqueueRecordInput,
ExternalGenerationJobFailRecordInput, ExternalGenerationJobRecord,
ExternalGenerationJobRenewLeaseRecordInput, JumpHopActionRequest, JumpHopActionResponse,
JumpHopActionType, JumpHopCharacterAsset, JumpHopDifficulty, JumpHopDraftResponse,
JumpHopGalleryCardResponse, JumpHopGalleryDetailResponse, JumpHopGalleryResponse,
JumpHopGenerationStatus, JumpHopJumpRequest, JumpHopJumpResponse, JumpHopJumpResult,
JumpHopLastJump, JumpHopPath, JumpHopPlatform, JumpHopRestartRunRequest, JumpHopRunResponse,
JumpHopRunStatus, JumpHopRuntimeRunSnapshotResponse, JumpHopScoring, JumpHopSessionResponse,
ExternalGenerationJobRenewLeaseRecordInput, ExternalGenerationQueueStatsRecord,
JumpHopActionRequest, JumpHopActionResponse, JumpHopActionType, JumpHopCharacterAsset,
JumpHopDifficulty, JumpHopDraftResponse, JumpHopGalleryCardResponse,
JumpHopGalleryDetailResponse, JumpHopGalleryResponse, JumpHopGenerationStatus,
JumpHopJumpRequest, JumpHopJumpResponse, JumpHopJumpResult, JumpHopLastJump, JumpHopPath,
JumpHopPlatform, JumpHopRestartRunRequest, JumpHopRunResponse, JumpHopRunStatus,
JumpHopRuntimeRunSnapshotResponse, JumpHopScoring, JumpHopSessionResponse,
JumpHopSessionSnapshotResponse, JumpHopStartRunRequest, JumpHopStylePreset, JumpHopTileAsset,
JumpHopTileType, JumpHopWorkDetailResponse, JumpHopWorkMutationResponse,
JumpHopWorkProfileResponse, JumpHopWorkSummaryResponse, JumpHopWorksResponse,

View File

@@ -73,6 +73,7 @@ pub use self::external_generation::{
ExternalGenerationJobClaimRecordInput, ExternalGenerationJobCompleteRecordInput,
ExternalGenerationJobEnqueueRecordInput, ExternalGenerationJobFailRecordInput,
ExternalGenerationJobRecord, ExternalGenerationJobRenewLeaseRecordInput,
ExternalGenerationQueueStatsRecord,
};
pub use self::jump_hop::{
JumpHopActionRequest, JumpHopActionResponse, JumpHopActionType, JumpHopCharacterAsset,
@@ -186,6 +187,7 @@ pub(crate) use self::custom_world::{
};
pub(crate) use self::external_generation::{
map_external_generation_job_claim_result, map_external_generation_job_procedure_result,
map_external_generation_queue_stats_result,
};
pub(crate) use self::inventory::{
map_runtime_inventory_state_procedure_result, map_runtime_item_reward_item_snapshot,

View File

@@ -94,6 +94,30 @@ pub(crate) fn map_external_generation_job_claim_result(
.collect())
}
pub(crate) fn map_external_generation_queue_stats_result(
result: ExternalGenerationQueueStatsProcedureResult,
) -> Result<ExternalGenerationQueueStatsRecord, SpacetimeClientError> {
if !result.ok {
return Err(SpacetimeClientError::procedure_failed(result.error_message));
}
let stats = result.stats.ok_or_else(|| {
SpacetimeClientError::missing_snapshot("external_generation queue stats 快照")
})?;
Ok(ExternalGenerationQueueStatsRecord {
pending_count: stats.pending_count,
delayed_pending_count: stats.delayed_pending_count,
claimable_pending_count: stats.claimable_pending_count,
running_active_count: stats.running_active_count,
expired_running_count: stats.expired_running_count,
terminal_count: stats.terminal_count,
claimable_count: stats.claimable_count,
oldest_claimable_age_micros: stats.oldest_claimable_age_micros,
now_micros: stats.now_micros,
})
}
fn map_external_generation_job_snapshot(
snapshot: ExternalGenerationJobSnapshot,
) -> ExternalGenerationJobRecord {
@@ -199,3 +223,16 @@ pub struct ExternalGenerationJobRecord {
pub updated_at: String,
pub lease_token: Option<String>,
}
/// Client-side copy of the external generation queue statistics returned by
/// the `get_external_generation_queue_stats_and_return` procedure.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExternalGenerationQueueStatsRecord {
/// Number of jobs in the pending status.
pub pending_count: u32,
/// Pending jobs that are not claimable yet.
pub delayed_pending_count: u32,
/// Pending jobs that are claimable now.
pub claimable_pending_count: u32,
/// Running jobs that are not claimable.
pub running_active_count: u32,
/// Running jobs that are claimable again.
pub expired_running_count: u32,
/// Count of terminal jobs as reported by the server; the server-side
/// computation shown alongside this change leaves it at 0.
pub terminal_count: u32,
/// Sum of `claimable_pending_count` and `expired_running_count`.
pub claimable_count: u32,
/// Age in microseconds of the oldest claimable job, or `None` when nothing
/// is claimable.
pub oldest_claimable_age_micros: Option<i64>,
/// Server timestamp of the sample, in microseconds since the Unix epoch.
pub now_micros: i64,
}

View File

@@ -360,6 +360,8 @@ pub mod external_generation_job_renew_lease_input_type;
pub mod external_generation_job_snapshot_type;
pub mod external_generation_job_table;
pub mod external_generation_job_type;
pub mod external_generation_queue_stats_procedure_result_type;
pub mod external_generation_queue_stats_snapshot_type;
pub mod fail_ai_task_and_return_procedure;
pub mod fail_external_generation_job_and_return_procedure;
pub mod finalize_big_fish_agent_message_turn_procedure;
@@ -386,6 +388,7 @@ pub mod get_custom_world_agent_session_procedure;
pub mod get_custom_world_gallery_detail_by_code_procedure;
pub mod get_custom_world_gallery_detail_procedure;
pub mod get_custom_world_library_detail_procedure;
pub mod get_external_generation_queue_stats_and_return_procedure;
pub mod get_jump_hop_agent_session_procedure;
pub mod get_jump_hop_leaderboard_procedure;
pub mod get_jump_hop_run_procedure;
@@ -1491,6 +1494,8 @@ pub use external_generation_job_renew_lease_input_type::ExternalGenerationJobRen
pub use external_generation_job_snapshot_type::ExternalGenerationJobSnapshot;
pub use external_generation_job_table::*;
pub use external_generation_job_type::ExternalGenerationJob;
pub use external_generation_queue_stats_procedure_result_type::ExternalGenerationQueueStatsProcedureResult;
pub use external_generation_queue_stats_snapshot_type::ExternalGenerationQueueStatsSnapshot;
pub use fail_ai_task_and_return_procedure::fail_ai_task_and_return;
pub use fail_external_generation_job_and_return_procedure::fail_external_generation_job_and_return;
pub use finalize_big_fish_agent_message_turn_procedure::finalize_big_fish_agent_message_turn;
@@ -1517,6 +1522,7 @@ pub use get_custom_world_agent_session_procedure::get_custom_world_agent_session
pub use get_custom_world_gallery_detail_by_code_procedure::get_custom_world_gallery_detail_by_code;
pub use get_custom_world_gallery_detail_procedure::get_custom_world_gallery_detail;
pub use get_custom_world_library_detail_procedure::get_custom_world_library_detail;
pub use get_external_generation_queue_stats_and_return_procedure::get_external_generation_queue_stats_and_return;
pub use get_jump_hop_agent_session_procedure::get_jump_hop_agent_session;
pub use get_jump_hop_leaderboard_procedure::get_jump_hop_leaderboard;
pub use get_jump_hop_run_procedure::get_jump_hop_run;

View File

@@ -0,0 +1,19 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
use super::external_generation_queue_stats_snapshot_type::ExternalGenerationQueueStatsSnapshot;
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct ExternalGenerationQueueStatsProcedureResult {
pub ok: bool,
pub stats: Option<ExternalGenerationQueueStatsSnapshot>,
pub error_message: Option<String>,
}
impl __sdk::InModule for ExternalGenerationQueueStatsProcedureResult {
type Module = super::RemoteModule;
}

View File

@@ -0,0 +1,23 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct ExternalGenerationQueueStatsSnapshot {
pub pending_count: u32,
pub delayed_pending_count: u32,
pub claimable_pending_count: u32,
pub running_active_count: u32,
pub expired_running_count: u32,
pub terminal_count: u32,
pub claimable_count: u32,
pub oldest_claimable_age_micros: Option<i64>,
pub now_micros: i64,
}
impl __sdk::InModule for ExternalGenerationQueueStatsSnapshot {
type Module = super::RemoteModule;
}

View File

@@ -0,0 +1,54 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
use super::external_generation_queue_stats_procedure_result_type::ExternalGenerationQueueStatsProcedureResult;
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
struct GetExternalGenerationQueueStatsAndReturnArgs {}
impl __sdk::InModule for GetExternalGenerationQueueStatsAndReturnArgs {
type Module = super::RemoteModule;
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the procedure `get_external_generation_queue_stats_and_return`.
///
/// Implemented for [`super::RemoteProcedures`].
pub trait get_external_generation_queue_stats_and_return {
fn get_external_generation_queue_stats_and_return(&self) {
self.get_external_generation_queue_stats_and_return_then(|_, _| {});
}
fn get_external_generation_queue_stats_and_return_then(
&self,
__callback: impl FnOnce(
&super::ProcedureEventContext,
Result<ExternalGenerationQueueStatsProcedureResult, __sdk::InternalError>,
) + Send
+ 'static,
);
}
impl get_external_generation_queue_stats_and_return for super::RemoteProcedures {
fn get_external_generation_queue_stats_and_return_then(
&self,
__callback: impl FnOnce(
&super::ProcedureEventContext,
Result<ExternalGenerationQueueStatsProcedureResult, __sdk::InternalError>,
) + Send
+ 'static,
) {
self.imp
.invoke_procedure_with_callback::<_, ExternalGenerationQueueStatsProcedureResult>(
"get_external_generation_queue_stats_and_return",
GetExternalGenerationQueueStatsAndReturnArgs {},
__callback,
);
}
}

View File

@@ -137,6 +137,27 @@ pub struct ExternalGenerationJobProcedureResult {
pub error_message: Option<String>,
}
/// Point-in-time statistics about the external generation job queue.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct ExternalGenerationQueueStatsSnapshot {
/// Number of jobs in the pending status.
pub pending_count: u32,
/// Pending jobs that are not claimable yet.
pub delayed_pending_count: u32,
/// Pending jobs that are claimable now.
pub claimable_pending_count: u32,
/// Running jobs that are not claimable.
pub running_active_count: u32,
/// Running jobs that are claimable again.
pub expired_running_count: u32,
// Field kept for compatibility with the already generated bindings. The
// controller scales only on non-terminal queue pressure, so historical
// terminal jobs are not scanned on every round.
pub terminal_count: u32,
/// Sum of `claimable_pending_count` and `expired_running_count`.
pub claimable_count: u32,
/// Age in microseconds of the oldest claimable job, or `None` when nothing
/// is claimable.
pub oldest_claimable_age_micros: Option<i64>,
/// Timestamp of the sample, in microseconds since the Unix epoch.
pub now_micros: i64,
}
/// Envelope returned by the queue statistics procedure.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct ExternalGenerationQueueStatsProcedureResult {
/// `true` when the statistics were computed successfully.
pub ok: bool,
/// The statistics; set on success, `None` on failure.
pub stats: Option<ExternalGenerationQueueStatsSnapshot>,
/// Failure description; set on failure, `None` on success.
pub error_message: Option<String>,
}
#[spacetimedb::procedure]
pub fn enqueue_external_generation_job_and_return(
ctx: &mut ProcedureContext,
@@ -197,6 +218,24 @@ pub fn fail_external_generation_job_and_return(
}
}
/// Procedure that computes the queue statistics inside a transaction and
/// wraps the outcome in a result envelope instead of failing the call.
#[spacetimedb::procedure]
pub fn get_external_generation_queue_stats_and_return(
    ctx: &mut ProcedureContext,
) -> ExternalGenerationQueueStatsProcedureResult {
    // Split the outcome into the two optional envelope fields; exactly one is set.
    let (stats, error_message) =
        match ctx.try_with_tx(|tx| get_external_generation_queue_stats_tx(tx)) {
            Ok(snapshot) => (Some(snapshot), None),
            Err(message) => (None, Some(message)),
        };
    ExternalGenerationQueueStatsProcedureResult {
        ok: stats.is_some(),
        stats,
        error_message,
    }
}
fn enqueue_external_generation_job_tx(
ctx: &ReducerContext,
input: ExternalGenerationJobEnqueueInput,
@@ -427,6 +466,58 @@ fn fail_external_generation_job_tx(
Ok(map_external_generation_job_row(row))
}
/// Computes the queue statistics from the pending and running jobs.
///
/// Only rows with the pending or running status are visited; `terminal_count`
/// is left at 0. This function never returns `Err` in its current form.
fn get_external_generation_queue_stats_tx(
ctx: &ReducerContext,
) -> Result<ExternalGenerationQueueStatsSnapshot, String> {
let now = ctx.timestamp;
let now_micros = now.to_micros_since_unix_epoch();
let mut stats = ExternalGenerationQueueStatsSnapshot {
pending_count: 0,
delayed_pending_count: 0,
claimable_pending_count: 0,
running_active_count: 0,
expired_running_count: 0,
terminal_count: 0,
claimable_count: 0,
oldest_claimable_age_micros: None,
now_micros,
};
// Pending jobs: split into claimable-now and delayed.
for row in ctx
.db
.external_generation_job()
.by_external_generation_job_status_available()
.filter(&EXTERNAL_GENERATION_STATUS_PENDING.to_string())
{
stats.pending_count = stats.pending_count.saturating_add(1);
if is_external_generation_job_claimable(&row, now) {
stats.claimable_pending_count = stats.claimable_pending_count.saturating_add(1);
record_external_generation_claimable_age(&mut stats, &row, now_micros);
} else {
stats.delayed_pending_count = stats.delayed_pending_count.saturating_add(1);
}
}
// Running jobs: a claimable running job is counted as expired, the rest as active.
for row in ctx
.db
.external_generation_job()
.by_external_generation_job_status_available()
.filter(&EXTERNAL_GENERATION_STATUS_RUNNING.to_string())
{
if is_external_generation_job_claimable(&row, now) {
stats.expired_running_count = stats.expired_running_count.saturating_add(1);
record_external_generation_claimable_age(&mut stats, &row, now_micros);
} else {
stats.running_active_count = stats.running_active_count.saturating_add(1);
}
}
// Total claimable work = claimable pending + expired running.
stats.claimable_count = stats
.claimable_pending_count
.saturating_add(stats.expired_running_count);
Ok(stats)
}
pub(crate) fn validate_external_generation_job_lease_for_tx(
ctx: &ReducerContext,
job_id: &str,
@@ -524,6 +615,22 @@ fn is_external_generation_job_claimable(row: &ExternalGenerationJob, now: Timest
}
}
/// Folds one claimable job into `stats.oldest_claimable_age_micros`.
///
/// The job's age is `now_micros` minus its `available_at`, never below zero;
/// the stored value becomes the larger of the previous maximum and this age.
fn record_external_generation_claimable_age(
    stats: &mut ExternalGenerationQueueStatsSnapshot,
    row: &ExternalGenerationJob,
    now_micros: i64,
) {
    let available_micros = row.available_at.to_micros_since_unix_epoch();
    let age = now_micros.saturating_sub(available_micros).max(0);
    let oldest = match stats.oldest_claimable_age_micros {
        Some(current) if current >= age => current,
        _ => age,
    };
    stats.oldest_claimable_age_micros = Some(oldest);
}
fn persist_external_generation_job_row(ctx: &ReducerContext, row: ExternalGenerationJob) {
ctx.db
.external_generation_job()
@@ -725,6 +832,30 @@ mod tests {
assert_ne!(first, second);
}
// Recording a newer job first and an older one second must leave the age of
// the older job (10_000 - 1_000 = 9_000) as the stored maximum.
#[test]
fn claimable_age_keeps_oldest_available_job() {
let mut stats = ExternalGenerationQueueStatsSnapshot {
pending_count: 0,
delayed_pending_count: 0,
claimable_pending_count: 0,
running_active_count: 0,
expired_running_count: 0,
terminal_count: 0,
claimable_count: 0,
oldest_claimable_age_micros: None,
now_micros: 10_000,
};
let mut old_job = external_generation_job_fixture(EXTERNAL_GENERATION_STATUS_PENDING);
old_job.available_at = micros(1_000);
let mut newer_job = external_generation_job_fixture(EXTERNAL_GENERATION_STATUS_RUNNING);
newer_job.available_at = micros(8_000);
// Order matters for the test: the smaller age is recorded before the larger one.
record_external_generation_claimable_age(&mut stats, &newer_job, 10_000);
record_external_generation_claimable_age(&mut stats, &old_job, 10_000);
assert_eq!(stats.oldest_claimable_age_micros, Some(9_000));
}
#[test]
fn positive_duration_between_client_times_is_preserved() {
assert_eq!(