From 4a6c126366a48d91b7b126b06d6ceaae4469b267 Mon Sep 17 00:00:00 2001 From: kdletters Date: Fri, 12 Jun 2026 15:21:35 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E5=A4=96=E9=83=A8=E7=94=9F?= =?UTF-8?q?=E6=88=90Worker=E5=8A=A8=E6=80=81=E6=89=A9=E7=BC=A9=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增外部生成controller进程角色与systemd服务 补齐队列统计procedure与spacetime-client绑定 更新生产部署脚本、健康巡检和server provision的worker/controller口径 新增容器worker smoke脚本并同步运维文档与团队记忆 --- .dockerignore | 1 + .gitignore | 1 + .hermes/shared-memory/decision-log.md | 3 +- .hermes/shared-memory/development-workflow.md | 10 + .hermes/shared-memory/pitfalls.md | 6 +- deploy/container/README.md | 58 ++ deploy/container/docker-compose.loadtest.yml | 6 +- ...external-generation-controller.env.example | 13 + ...ive-external-generation-controller.service | 27 + ...端架构】外部生成Worker化方案-2026-06-03.md | 21 +- ...发运维】本地开发验证与生产运维-2026-05-15.md | 10 +- package.json | 1 + scripts/check-production-ops-guardrails.mjs | 40 + scripts/container-worker-smoke.mjs | 839 ++++++++++++++++++ scripts/deploy/production-api-deploy.sh | 52 +- scripts/jenkins-server-provision.sh | 41 +- scripts/ops/production-health-patrol.mjs | 62 ++ server-rs/crates/api-server/Cargo.toml | 2 +- server-rs/crates/api-server/src/config.rs | 81 ++ .../external_generation_worker_controller.rs | 465 ++++++++++ server-rs/crates/api-server/src/main.rs | 15 +- .../src/external_generation.rs | 19 + server-rs/crates/spacetime-client/src/lib.rs | 13 +- .../crates/spacetime-client/src/mapper.rs | 2 + .../src/mapper/external_generation.rs | 37 + .../spacetime-client/src/module_bindings.rs | 6 + ...ation_queue_stats_procedure_result_type.rs | 19 + ...al_generation_queue_stats_snapshot_type.rs | 23 + ...ration_queue_stats_and_return_procedure.rs | 54 ++ .../src/external_generation.rs | 131 +++ 30 files changed, 2030 insertions(+), 28 deletions(-) create mode 100644 deploy/env/external-generation-controller.env.example create mode 100644 deploy/systemd/genarrative-external-generation-controller.service create mode 100644 scripts/container-worker-smoke.mjs create mode 100644 server-rs/crates/api-server/src/external_generation_worker_controller.rs create mode 100644 server-rs/crates/spacetime-client/src/module_bindings/external_generation_queue_stats_procedure_result_type.rs create mode 100644 server-rs/crates/spacetime-client/src/module_bindings/external_generation_queue_stats_snapshot_type.rs create mode 100644 server-rs/crates/spacetime-client/src/module_bindings/get_external_generation_queue_stats_and_return_procedure.rs diff --git a/.dockerignore b/.dockerignore index 422a9ea0..35a52aba 100644 --- a/.dockerignore +++ b/.dockerignore @@ -22,6 +22,7 @@ tmp .env.secrets.* spacetime.local.json deploy/container/api-server.env +deploy/container/worker-smoke server-rs/target server-rs/target-* diff --git a/.gitignore b/.gitignore index 2885233d..9a953a92 100644 --- a/.gitignore +++ b/.gitignore @@ -42,6 +42,7 @@ temp*build*/ .env.secrets.local spacetime.local.json deploy/container/api-server.env +deploy/container/worker-smoke/ # Local load-test data extracted from private migration files scripts/loadtest/data/*.local.json diff --git a/.hermes/shared-memory/decision-log.md b/.hermes/shared-memory/decision-log.md index 6571817d..65dba7cf 100644 --- a/.hermes/shared-memory/decision-log.md +++ b/.hermes/shared-memory/decision-log.md @@ -174,7 +174,8 @@ - 背景:拼图首图、图集、音频等外部生成链路长期占用 `api-server` HTTP handler,导致扩容只能放大 API 进程,且 HTTP 超时和外部 provider 波动会直接影响创作入口。 - 决策:外部生成任务统一进入 SpacetimeDB `external_generation_job` 持久队列,由 `api-server` 的 `external-generation-worker` 进程角色 claim lease 后执行;HTTP 角色只做鉴权、表单/状态初始化、入队和返回 `queued/running/completed/failed` 操作状态。生产通过 systemd worker 模板增加实例数或提高 `GENARRATIVE_EXTERNAL_GENERATION_WORKER_CONCURRENCY` 动态扩缩容,`GENARRATIVE_PROCESS_ROLE=all` 仅用于本地 smoke。拼图 `compile_puzzle_draft`、结果页 `generate_puzzle_images` 与 `generate_puzzle_ui_background` 已接入 worker;业务写回必须在 SpacetimeDB transaction 内校验 `external_generation_job` 的 `job_id + worker_id + lease_token`、job kind、owner 和 source entity,其中首图 worker 的前置 `compile_puzzle_agent_draft` 也必须带 guard。worker 核心业务写回失败不能返回内存快照并把 job 标成 completed;失败态业务写回成功后才能把 job 标成 failed,失败态未写回则保留租约等待后续重领。拼图业务失败不自动重试,只保留 lease 过期后的崩溃重领,避免钱包扣退费幂等漂移。生产发布会启用默认 `genarrative-external-generation-worker@1.service` 并等待 worker active,worker 停机时停止 claim 新任务并 drain 当前任务。 - 2026-06-07 追加:`GENARRATIVE_EXTERNAL_GENERATION_MODE` 使用 `queue|inline` 显式策略;生产和容器扩缩容验证保持 `queue`。本地开发若需要同步等待结果,应通过 `.env.local` 或本机环境显式配置为 `inline`,由 HTTP handler 复用同一 worker executor 直接返回 `completed`,不创建 `external_generation_job`,不支持 worker 动态扩缩容;脚本不得硬编码该策略。拼图写回 guard 字段改为可选,queue 路径仍必须完整校验 `job_id + worker_id + lease_token`;inline 路径只允许三项同时为空,半空 guard 仍拒绝。 -- 影响范围:`server-rs/crates/spacetime-module/src/external_generation.rs`、`server-rs/crates/spacetime-client/src/external_generation.rs`、`server-rs/crates/api-server/src/external_generation_worker.rs`、`deploy/systemd/genarrative-external-generation-worker@.service`、`scripts/deploy/production-api-deploy.sh`、`scripts/jenkins-server-provision.sh`、拼图 `compile_puzzle_draft`、拼图 `generate_puzzle_images`、拼图 `generate_puzzle_ui_background`、生产 env 模板和运维文档。 +- 2026-06-11 追加:生产新增固定 `external-generation-controller` 进程角色和 `genarrative-external-generation-controller.service`。controller 只读取 `get_external_generation_queue_stats_and_return` 队列统计并管理 `genarrative-external-generation-worker@N.service`,不监听 HTTP、不执行外部生成任务;默认保留 `@1`,按 `claimable_pending + running_active + expired_running` 计算目标实例数,上限由 `GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_MAX_WORKERS` 控制,缩容需要连续空闲轮数且每轮只停最高编号一个实例。 +- 影响范围:`server-rs/crates/spacetime-module/src/external_generation.rs`、`server-rs/crates/spacetime-client/src/external_generation.rs`、`server-rs/crates/api-server/src/external_generation_worker.rs`、`server-rs/crates/api-server/src/external_generation_worker_controller.rs`、`deploy/systemd/genarrative-external-generation-worker@.service`、`deploy/systemd/genarrative-external-generation-controller.service`、`deploy/env/external-generation-controller.env.example`、`scripts/deploy/production-api-deploy.sh`、`scripts/jenkins-server-provision.sh`、拼图 `compile_puzzle_draft`、拼图 `generate_puzzle_images`、拼图 `generate_puzzle_ui_background`、生产 env 模板和运维文档。 - 验证方式:`npm run spacetime:generate`、`npm run check:spacetime-schema`、`npm run check:server-rs-ddd`、`cargo check -p api-server --manifest-path server-rs/Cargo.toml`,并在 queue 模式下用 `GENARRATIVE_PROCESS_ROLE=all npm run dev` smoke 至少一次 queued -> worker 完成链路;本地 inline 排查只确认不创建 `external_generation_job`。 - 关联文档:`docs/technical/【后端架构】外部生成Worker化方案-2026-06-03.md`、`docs/【开发运维】本地开发验证与生产运维-2026-05-15.md`、`docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md`。 diff --git a/.hermes/shared-memory/development-workflow.md b/.hermes/shared-memory/development-workflow.md index 2e7cab4e..ca86734c 100644 --- a/.hermes/shared-memory/development-workflow.md +++ b/.hermes/shared-memory/development-workflow.md @@ -120,6 +120,16 @@ npm run server-manager:panel npm run dev:spacetime:logs ``` +本机隔离验证外部生成 worker 队列、API-only 更新和 worker 动态扩缩容时,优先使用: + +```bash +npm run container:worker-smoke -- smoke +``` + +该命令生成 `deploy/container/worker-smoke/` 下的 gitignored env 与端口 state,启动独立 compose project 和独立 SpacetimeDB,用 unsupported job 验证 worker claim / fail 回写;排查时用 `api-update` 确认 API 重建不触碰 worker,用 `scale ` 调整 worker 数量。 +`external_generation_job` 是 private table,worker-smoke 通过 worker 日志里的 job_id 和 unsupported 记录确认消费,不通过 CLI SQL 查询队列表。 +worker-smoke 默认把本机 `spacetime` CLI 打成轻量 SpacetimeDB 镜像,避免首次 smoke 依赖官方大镜像下载。若容器内 Cargo 下载依赖不稳定,追加 `--local-binary`,让容器内 Cargo 复用本机 Cargo 缓存构建当前 `api-server` 二进制,并把产物放进 Debian bookworm smoke runtime;可用 `GENARRATIVE_WORKER_SMOKE_LOCAL_BASE_IMAGE` 覆盖运行时基础镜像;隔离端口或库数据需要重建时追加 `--force`。 + 后台管理前端: ```bash diff --git a/.hermes/shared-memory/pitfalls.md b/.hermes/shared-memory/pitfalls.md index 52a1fbcb..c5f7d8bd 100644 --- a/.hermes/shared-memory/pitfalls.md +++ b/.hermes/shared-memory/pitfalls.md @@ -27,9 +27,9 @@ - 现象:拼图首关生成接口返回 `queued`,但生成页长时间不完成,重启 `genarrative-api.service` 也没有推进任务。 - 原因:HTTP 角色只入队,不再直接调用外部 provider;如果没有运行 `GENARRATIVE_PROCESS_ROLE=external-generation-worker` 或 `all` 的进程,`external_generation_job` 会停留在 `pending/running`,直到有 worker claim。 -- 处理:生产用 `systemctl enable --now genarrative-external-generation-worker@1.service` 启动至少一个 worker;首次 API deploy 会在默认 worker pattern 下自动启用并启动 `@1`,并等待 worker active。扩容继续启动 `@2.service` 等实例,缩容停止多余实例;worker 收到停机信号后会停止 claim 新任务并等待当前任务完成。本地 smoke 可临时用 `GENARRATIVE_PROCESS_ROLE=all npm run dev`;本地若只想同步排查可通过 `.env.local` 或本机环境设置 `GENARRATIVE_EXTERNAL_GENERATION_MODE=inline`,但这不会创建 job,也不能验证 worker 扩缩容。 -- 验证:`systemctl status 'genarrative-external-generation-worker@*.service'` 能看到 worker 实例;queue 模式下任务被 claim 后 `worker_id` 与 `lease_expires_at` 会更新,完成后 session 进入 ready 或 failed;inline 模式下不应产生新的 `external_generation_job`。 -- 关联:`deploy/systemd/genarrative-external-generation-worker@.service`、`deploy/env/external-generation-worker.env.example`、`server-rs/crates/spacetime-module/src/external_generation.rs`、`docs/【开发运维】本地开发验证与生产运维-2026-05-15.md`。 +- 处理:生产用 `systemctl enable --now genarrative-external-generation-worker@1.service genarrative-external-generation-controller.service` 启动保底 worker 和 controller;首次 API deploy 会在默认 worker pattern 下自动启用并启动 `@1`、等待 worker active,并重启验活 controller。扩容默认交给 controller 按队列统计启动 `@2.service` 等实例,手动扩缩容只作为兜底;worker 收到停机信号后会停止 claim 新任务并等待当前任务完成。本地 smoke 可临时用 `GENARRATIVE_PROCESS_ROLE=all npm run dev`;本地若只想同步排查可通过 `.env.local` 或本机环境设置 `GENARRATIVE_EXTERNAL_GENERATION_MODE=inline`,但这不会创建 job,也不能验证 worker 扩缩容。 +- 验证:`systemctl status genarrative-external-generation-controller.service 'genarrative-external-generation-worker@*.service'` 能看到 controller 和 worker 实例;queue 模式下任务被 claim 后 `worker_id` 与 `lease_expires_at` 会更新,完成后 session 进入 ready 或 failed;inline 模式下不应产生新的 `external_generation_job`。 +- 关联:`deploy/systemd/genarrative-external-generation-worker@.service`、`deploy/systemd/genarrative-external-generation-controller.service`、`deploy/env/external-generation-controller.env.example`、`server-rs/crates/spacetime-module/src/external_generation.rs`、`docs/【开发运维】本地开发验证与生产运维-2026-05-15.md`。 ## 外部生成 worker 业务写回必须同事务校验 lease guard diff --git a/deploy/container/README.md b/deploy/container/README.md index cd493ab1..0baccf5f 100644 --- a/deploy/container/README.md +++ b/deploy/container/README.md @@ -97,6 +97,64 @@ npm run container:up -- --scale external-generation-worker=1 external-generation 动态扩缩容验证必须保持 `GENARRATIVE_EXTERNAL_GENERATION_MODE=queue`;`inline` 模式下生成请求由 `api-server` 同步执行,不会被这些 worker 实例消费。 +### 外部生成 Worker 隔离 Smoke + +如果只想在本机隔离验证 worker 模式,不复用 `deploy/container/api-server.env`,使用专用脚本: + +```bash +npm run container:worker-smoke -- smoke +``` + +该脚本会生成 gitignored 的 `deploy/container/worker-smoke/api-server.env` 与端口 state,使用独立 compose project、独立 SpacetimeDB 数据卷和独立 host 端口,完成 `build -> up-spacetime -> publish -> up -> enqueue -> api-update -> enqueue`。测试 job 使用 `worker_smoke_unsupported` 类型,不访问真实 VectorEngine、LLM 或 OSS;预期结果是 worker 领取队列任务后按“不支持的任务类型”执行失败分支,从而验证队列 claim、lease、失败回写路径和 API / worker 进程隔离。`external_generation_job` 是 private table,脚本通过 worker 日志里的 job_id 和 unsupported 记录确认消费,不通过 CLI SQL 绕过权限。`smoke` 默认只启动 `api-server` 与 `external-generation-worker`,避免无关前端 / Nginx 镜像构建;需要同时验证 Nginx 时可分步执行 `up --with-nginx`。 + +分步排查时可执行: + +```bash +npm run container:worker-smoke -- init --force +npm run container:worker-smoke -- build +npm run container:worker-smoke -- up-spacetime +npm run container:worker-smoke -- publish +npm run container:worker-smoke -- up +npm run container:worker-smoke -- enqueue before-update +npm run container:worker-smoke -- api-update +npm run container:worker-smoke -- enqueue after-update +npm run container:worker-smoke -- status +``` + +如果隔离端口或库数据需要重置: + +```bash +npm run container:worker-smoke -- smoke --force +``` + +`container:worker-smoke` 默认会把本机 `spacetime` 2.4.1 CLI 打成轻量 SpacetimeDB 镜像,避免首次 smoke 必须拉取官方大镜像;普通 `npm run container:*` 压测仍默认使用 `clockworklabs/spacetime:v2.4.1`。如果 Docker build 阶段在容器内拉取 crates.io 依赖不稳定,可让容器内 Cargo 复用本机 Cargo 缓存构建当前二进制,再打入临时 smoke 镜像。该模式默认使用 `rust:1.93-bookworm` 作为 builder、Debian bookworm smoke runtime 承载构建产物;需要换 builder 镜像时设置 `GENARRATIVE_WORKER_SMOKE_CARGO_IMAGE`,需要换运行时基础镜像时设置 `GENARRATIVE_WORKER_SMOKE_LOCAL_BASE_IMAGE`: + +```bash +npm run container:worker-smoke -- smoke --local-binary +``` + +`api-update` 只会 `--force-recreate api-server`,并校验 `external-generation-worker` 容器 ID 不变;如要同时重建 API 镜像,使用: + +```bash +npm run container:worker-smoke -- api-update --build +``` + +验证 worker 动态扩缩容: + +```bash +npm run container:worker-smoke -- scale 3 +npm run container:worker-smoke -- ps +npm run container:worker-smoke -- enqueue scaled-workers +npm run container:worker-smoke -- scale 1 +``` + +查看或清理隔离环境: + +```bash +npm run container:worker-smoke -- logs external-generation-worker +npm run container:worker-smoke -- down -v +``` + 停止: ```bash diff --git a/deploy/container/docker-compose.loadtest.yml b/deploy/container/docker-compose.loadtest.yml index 7bb1f324..2466617a 100644 --- a/deploy/container/docker-compose.loadtest.yml +++ b/deploy/container/docker-compose.loadtest.yml @@ -2,7 +2,7 @@ name: genarrative-container-loadtest services: spacetimedb: - image: clockworklabs/spacetime:v2.4.1 + image: ${GENARRATIVE_CONTAINER_SPACETIME_IMAGE:-clockworklabs/spacetime:v2.4.1} user: root command: [ @@ -44,7 +44,7 @@ services: cpus: "2.0" mem_limit: 1g env_file: - - ./api-server.env + - ${GENARRATIVE_CONTAINER_API_ENV_FILE:-./api-server.env} environment: GENARRATIVE_API_HOST: 0.0.0.0 GENARRATIVE_API_PORT: 8082 @@ -77,7 +77,7 @@ services: cpus: "2.0" mem_limit: 1g env_file: - - ./api-server.env + - ${GENARRATIVE_CONTAINER_API_ENV_FILE:-./api-server.env} environment: GENARRATIVE_PROCESS_ROLE: external-generation-worker GENARRATIVE_TRACKING_OUTBOX_DIR: /var/lib/genarrative/tracking-outbox-worker diff --git a/deploy/env/external-generation-controller.env.example b/deploy/env/external-generation-controller.env.example new file mode 100644 index 00000000..e6d0ceca --- /dev/null +++ b/deploy/env/external-generation-controller.env.example @@ -0,0 +1,13 @@ +# 复制到 /etc/genarrative/external-generation-controller.env 后按机器容量调整。 +# controller 只管理 systemd worker 实例;SpacetimeDB、外部 provider 密钥继续复用 api-server.env。 +# systemd unit 会强制设置 GENARRATIVE_PROCESS_ROLE=external-generation-controller。 + +GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_MIN_WORKERS=1 +GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_MAX_WORKERS=8 +GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_TARGET_JOBS_PER_WORKER=2 +GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_POLL_INTERVAL_MS=10000 +GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_SCALE_DOWN_IDLE_ROUNDS=6 +GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_SERVICE_TEMPLATE=genarrative-external-generation-worker@{}.service +GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_DRY_RUN=false +GENARRATIVE_API_LOG=info,tower_http=info +OTEL_SERVICE_NAME=genarrative-external-generation-controller diff --git a/deploy/systemd/genarrative-external-generation-controller.service b/deploy/systemd/genarrative-external-generation-controller.service new file mode 100644 index 00000000..5f54bda4 --- /dev/null +++ b/deploy/systemd/genarrative-external-generation-controller.service @@ -0,0 +1,27 @@ +[Unit] +Description=Genarrative External Generation Worker Controller +After=network-online.target spacetimedb.service +Wants=network-online.target +Requires=spacetimedb.service + +[Service] +Type=simple +WorkingDirectory=/opt/genarrative/current +EnvironmentFile=/etc/genarrative/api-server.env +EnvironmentFile=-/etc/genarrative/external-generation-controller.env +ExecStart=/usr/bin/env GENARRATIVE_PROCESS_ROLE=external-generation-controller GENARRATIVE_TRACKING_OUTBOX_DIR=/var/lib/genarrative/tracking-outbox/controller OTEL_SERVICE_NAME=genarrative-external-generation-controller /opt/genarrative/current/api-server +Restart=always +RestartSec=5 +KillSignal=SIGINT +TimeoutStopSec=120 +LimitNOFILE=65535 +TasksMax=512 + +# controller 需要调用 systemctl 管理 worker@N 实例,因此不降为 genarrative 用户。 +# 它只复用 api-server 发布包和 SpacetimeDB 配置,不直接执行外部生成任务。 +PrivateTmp=true +ProtectSystem=full +ReadWritePaths=/opt/genarrative /var/lib/genarrative + +[Install] +WantedBy=multi-user.target diff --git a/docs/technical/【后端架构】外部生成Worker化方案-2026-06-03.md b/docs/technical/【后端架构】外部生成Worker化方案-2026-06-03.md index 290c90a5..6a9013a0 100644 --- a/docs/technical/【后端架构】外部生成Worker化方案-2026-06-03.md +++ b/docs/technical/【后端架构】外部生成Worker化方案-2026-06-03.md @@ -24,6 +24,7 @@ - `renew_external_generation_job_lease_and_return`:worker 长任务执行期间按 `worker_id + lease_token` 续租,防止外部生成超过单次 lease 后被重复领取。 - `complete_external_generation_job_and_return`:worker 成功后按 `worker_id + lease_token` 写入 `result_payload_json`,任务进入 `completed`。 - `fail_external_generation_job_and_return`:worker 失败后按 `worker_id + lease_token` 回写错误,并按 `max_attempts` 决定回到 `pending` 重试或进入 `failed`。 +- `get_external_generation_queue_stats_and_return`:controller 读取队列积压、运行中任务和过期 lease 数量,用于计算 worker 目标实例数;该 procedure 只读 `external_generation_job`,不直接操作 systemd。 这个 Module 的 **Seam** 在 SpacetimeDB procedure + `spacetime-client` facade;`api-server` HTTP role 和 worker role 都只依赖这个 Interface。外部 provider、OSS、计费补偿、玩法草稿回写仍留在 `api-server` worker implementation 内,不进入 SpacetimeDB reducer。 @@ -82,6 +83,7 @@ pending/running -> cancelled (预留) - `api`:只启动 HTTP server。 - `external-generation-worker`:只启动外部生成 worker,不监听 HTTP。 +- `external-generation-controller`:只启动 worker controller,不监听 HTTP,也不直接执行外部生成任务。 - `all`:本地开发可同时启动 HTTP 与 worker。 worker 配置: @@ -91,7 +93,17 @@ worker 配置: - `GENARRATIVE_EXTERNAL_GENERATION_WORKER_POLL_INTERVAL_MS`:空队列轮询间隔。 - `GENARRATIVE_EXTERNAL_GENERATION_WORKER_LEASE_SECONDS`:任务 lease 时长;worker 会按约三分之一 lease、最长 30 秒的间隔续租。该值应覆盖一次心跳网络抖动窗口,不需要大于完整外部生成链路耗时。 -动态缩扩容方式:生产通过 `deploy/systemd/genarrative-external-generation-worker@.service` 或进程管理器启动更多 `external-generation-worker` 实例;无需改变 HTTP 进程数。缩容或发布重启 worker 时,进程收到 SIGINT/SIGTERM 后会停止 claim 新任务并等待当前任务完成;若进程被硬杀、机器断电或超过 systemd `TimeoutStopSec`,未完成任务会在 lease 过期后被其它 worker 重新领取。容器链路已有独立 `external-generation-worker` compose service;扩 worker 必须扩这个 worker service,不能只扩 `api-server` HTTP service。 +controller 配置: + +- `GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_MIN_WORKERS`:保底 worker 实例数,生产默认 `1`,controller 不会主动停止 `@1`。 +- `GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_MAX_WORKERS`:自动扩容上限,生产模板默认 `8`。 +- `GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_TARGET_JOBS_PER_WORKER`:每个 worker 实例承担的目标未完成任务数,默认 `2`;目标实例数按 `claimable_pending + running_active + expired_running` 计算后夹在 min/max 之间,避免把已包含过期 running 的 `claimable_count` 重复计入。 +- `GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_POLL_INTERVAL_MS`:controller 轮询队列统计的间隔,默认 `10000`。 +- `GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_SCALE_DOWN_IDLE_ROUNDS`:连续多少轮无可领取、无运行中、无过期 running 后才允许缩容,默认 `6`;缩容每轮只停止最高编号的一个实例。 +- `GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_SERVICE_TEMPLATE`:systemd worker 模板,默认 `genarrative-external-generation-worker@{}.service`。 +- `GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_DRY_RUN`:只记录决策不执行 systemctl,默认 `false`。 + +动态缩扩容方式:生产默认由 `deploy/systemd/genarrative-external-generation-controller.service` 启动 `GENARRATIVE_PROCESS_ROLE=external-generation-controller`,controller 读取 `get_external_generation_queue_stats_and_return` 后对 `genarrative-external-generation-worker@N.service` 执行精确 `systemctl start/stop`;无需改变 HTTP 进程数。controller 只操作 `@1..@MAX` 中的缺口或最高编号多余实例,保留 `@1` 作为保底 worker。缩容或发布重启 worker 时,进程收到 SIGINT/SIGTERM 后会停止 claim 新任务并等待当前任务完成;若进程被硬杀、机器断电或超过 systemd `TimeoutStopSec`,未完成任务会在 lease 过期后被其它 worker 重新领取。容器链路已有独立 `external-generation-worker` compose service;扩 worker 必须扩这个 worker service,不能只扩 `api-server` HTTP service。 ## 已接入的拼图纵切 @@ -147,13 +159,14 @@ GENARRATIVE_PROCESS_ROLE=all npm run dev curl -f http://127.0.0.1:/healthz ``` -本地同步排查可显式使用 `GENARRATIVE_EXTERNAL_GENERATION_MODE=inline npm run dev:api-server`,用于确认 provider、OSS 和 SpacetimeDB 写回链路本身是否可行;该模式不覆盖 worker 队列 smoke。生产 smoke 需要保持 `GENARRATIVE_EXTERNAL_GENERATION_MODE=queue`,并至少启动一个 `api` 角色和一个 `external-generation-worker` 角色;发布脚本会在默认 worker pattern 下自动启用并启动 `genarrative-external-generation-worker@1.service`,并等待 worker active。若 worker 数量归零,生成任务会保持 `queued/running`,不会由 HTTP 进程偷偷执行。 +本地同步排查可显式使用 `GENARRATIVE_EXTERNAL_GENERATION_MODE=inline npm run dev:api-server`,用于确认 provider、OSS 和 SpacetimeDB 写回链路本身是否可行;该模式不覆盖 worker 队列 smoke。生产 smoke 需要保持 `GENARRATIVE_EXTERNAL_GENERATION_MODE=queue`,并至少启动一个 `api` 角色、一个 `external-generation-worker` 角色和一个 `external-generation-controller` 角色;发布脚本会在默认 worker pattern 下自动启用并启动 `genarrative-external-generation-worker@1.service`,重启并验活 `genarrative-external-generation-controller.service`。若 worker 数量归零,生成任务会保持 `queued/running`,不会由 HTTP 进程偷偷执行。 -systemd 生产扩缩容示例: +systemd 生产 controller 与手动兜底示例: ```bash systemctl enable --now genarrative-external-generation-worker@1.service +systemctl enable --now genarrative-external-generation-controller.service systemctl start genarrative-external-generation-worker@2.service systemctl stop genarrative-external-generation-worker@2.service -systemctl status 'genarrative-external-generation-worker@*.service' +systemctl status genarrative-external-generation-controller.service 'genarrative-external-generation-worker@*.service' ``` diff --git a/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md b/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md index ad486e3e..312d079b 100644 --- a/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md +++ b/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md @@ -53,6 +53,8 @@ Linux 本机多用户并发开发时,`npm run dev` 和 `npm run dev:*` 单模 本地排查外部内容生成 worker 时,可临时用 `GENARRATIVE_PROCESS_ROLE=all npm run dev:api-server` 让同一 Rust 进程同时监听 HTTP 并消费 `external_generation_job` 队列。该模式只用于 smoke;生产默认 `GENARRATIVE_PROCESS_ROLE=api`,外部生成任务由独立 `GENARRATIVE_PROCESS_ROLE=external-generation-worker` 进程消费。外部生成执行策略由 `GENARRATIVE_EXTERNAL_GENERATION_MODE` 控制,生产与容器扩缩容验证保持 `queue`,拼图首图 `compile_puzzle_draft`、结果页关卡图片 `generate_puzzle_images` 和结果页 UI 背景 `generate_puzzle_ui_background` 会进入持久队列;worker 数量为 0 时,HTTP 只返回 queued/running,不会兜底执行外部 provider。本地如果要让 `npm run dev` 或 `npm run dev:api-server` 同步等待生成结果,应在 `.env.local` 或本机环境显式配置 `GENARRATIVE_EXTERNAL_GENERATION_MODE=inline`,由 handler 直接复用 worker executor 并在完成后返回 `completed`;该配置不得硬编码进 `scripts/dev.mjs`,且 inline 不创建 `external_generation_job`、不提供动态扩缩容能力。 +需要验证“更新 API 不停 worker”和“worker 是否持续消费队列”时,优先使用隔离容器 smoke:`npm run container:worker-smoke -- smoke`。该脚本生成 gitignored 的 `deploy/container/worker-smoke/api-server.env`,启动独立 compose project 与独立 SpacetimeDB,发布当前 `spacetime-module` 后写入 `worker_smoke_unsupported` 测试 job;预期 worker claim 后执行 unsupported 失败分支,再执行 API-only recreate 并确认 worker 容器 ID 不变,最后再次入队验证 API 更新后队列仍可消费。`external_generation_job` 是 private table,脚本通过 worker 日志确认 job_id 被消费,不用 CLI SQL 查询私表。该 smoke 不读取 `.env.local`,也不依赖真实 VectorEngine / OSS 密钥;真实生图链路联调再在本地私有 env 中补齐 provider 配置。worker-smoke 默认把本机 `spacetime` CLI 打成轻量 SpacetimeDB 镜像,避免本机首次 smoke 依赖官方大镜像下载。若容器内 Cargo 拉取 crates.io 依赖不稳定,可用 `npm run container:worker-smoke -- smoke --local-binary` 让容器内 Cargo 复用本机 Cargo 缓存构建当前二进制,再打入 Debian bookworm smoke runtime 临时镜像;可用 `GENARRATIVE_WORKER_SMOKE_LOCAL_BASE_IMAGE` 覆盖运行时基础镜像;若隔离端口或库数据需要重建,追加 `--force`。 + 本地只做账号/UI smoke 且需要短信登录时,`SMS_AUTH_PROVIDER` 应显式设为 `mock`,并把 `SMS_AUTH_MOCK_VERIFY_CODE` 设为固定值(当前常用 `123456`),再重启 `npm run dev` 或 `npm run dev:api-server`。如果 `.env.local` 还保留 `SMS_AUTH_PROVIDER=aliyun`,`POST /api/auth/phone/login` 用 mock 验证码会稳定报“验证码错误”,不是前端表单问题。真实短信联调再切回 `aliyun` 并重启。 微信小程序虚拟支付使用 `WECHAT_MINI_PROGRAM_VIRTUAL_PAYMENT_OFFER_ID`、`WECHAT_MINI_PROGRAM_VIRTUAL_PAYMENT_APP_KEY`、`WECHAT_MINI_PROGRAM_VIRTUAL_PAYMENT_SANDBOX_APP_KEY` 和 `WECHAT_MINI_PROGRAM_VIRTUAL_PAYMENT_ENV` 配置。小程序充值统一走 `wechat_mp_virtual` / `wx.requestVirtualPayment`:泥点属于代币(`coin`),`buyQuantity` 按当前充值商品快照里的 `points_amount` 传;会员和后台新增道具类商品走 `short_series_goods`,`productId` 对应微信后台道具 ID。旧登录快照若缺 `session_key`,需要用户在小程序内重新登录后再支付;客户端成功回调不是最终到账,仍以后端通知或查询确认订单为准。详细口径见 `docs/【技术方案】微信虚拟支付接入-2026-05-26.md`。 @@ -262,11 +264,12 @@ Jenkins 按 web / api / Spacetime module / build / deploy / publish 拆分 `Genarrative-Server-Provision` 会安装并启用 `genarrative-health-patrol.timer`,默认每 5 分钟运行一次 `genarrative-health-patrol.service`。巡检脚本随 API release 归档到 `/opt/genarrative/current/scripts/ops/production-health-patrol.mjs`,只读检查: -- `genarrative-api.service`、`spacetimedb.service`、`nginx.service` 是否 active。 +- `genarrative-api.service`、`genarrative-external-generation-controller.service`、`spacetimedb.service`、`nginx.service` 是否 active。 +- 至少一个 `genarrative-external-generation-worker@*.service` 实例是否 active;如果 controller 存活但 worker 全部退出,巡检直接返回 `CRITICAL`,避免外部生成队列长期无人消费。 - API 直连 `/healthz`、`/readyz`。 - SpacetimeDB 直连 `/v1/ping`。 - 默认直连 API 端口检查 `/api/creation-entry/config`、`/api/runtime/puzzle/gallery`、`/api/runtime/custom-world-gallery`;如需走 Nginx / 公网域名,在 `/etc/genarrative/health-patrol.env` 配置 `GENARRATIVE_HEALTH_PATROL_PUBLIC_BASE_URL=https://<域名>`。 -- 最近 15 分钟 `genarrative-api.service`、`spacetimedb.service`、`nginx.service` 的 `err..alert` 日志。 +- 最近 15 分钟 `genarrative-api.service`、`genarrative-external-generation-controller.service`、`genarrative-external-generation-worker@*.service`、`spacetimedb.service`、`nginx.service` 的 `err..alert` 日志。 巡检输出总状态 `OK / WARNING / CRITICAL`;只有 `CRITICAL` 默认让 systemd service 失败,`WARNING` 只写日志和状态文件,避免历史日志噪声把 timer 长期打成失败。最近一次结果写入 `/var/lib/genarrative/health-patrol/status.json`。手动执行: @@ -304,7 +307,7 @@ dev 服务器上的 Gitea 内网入口固定为 `http://10.2.0.10/GenarrativeAI/ 生产环境变量模板:`deploy/env/api-server.env.example`。真实密钥只放服务器,不提交 Git,不写入文档示例。 -`api-server` 进程角色由 `GENARRATIVE_PROCESS_ROLE` 控制:`api` 只监听 HTTP,`external-generation-worker` 只消费外部生成队列,`all` 仅用于本地或临时 smoke。外部生成策略由 `GENARRATIVE_EXTERNAL_GENERATION_MODE` 控制,生产和容器压测默认保持 `queue`;`inline` 只用于本地或低并发同步排查,HTTP handler 会直接复用 worker executor,完成后返回 `completed`,但不会落 `external_generation_job`,也不能通过增加 worker 进程扩吞吐。外部生成 worker 使用同一发布包和同一套 SpacetimeDB 配置,按实例数和 `GENARRATIVE_EXTERNAL_GENERATION_WORKER_CONCURRENCY` 动态扩缩;扩容时增加 worker 进程或提高单进程并发,缩容时停止多余 worker。worker 收到 SIGINT/SIGTERM 后会停止 claim 新任务并等待当前任务完成;若进程被硬杀、机器断电或超过 systemd `TimeoutStopSec`,未完成任务才会在 lease 过期后由其它 worker 重领。每个 worker 实例应设置唯一 `GENARRATIVE_EXTERNAL_GENERATION_WORKER_ID`,默认会用主机名和 pid 兜底;systemd 生产模板 `deploy/systemd/genarrative-external-generation-worker@.service` 会用 `%H-%i` 生成实例 ID,并把 tracking outbox 隔离到 `/var/lib/genarrative/tracking-outbox/%H-%i`。`Genarrative-Server-Provision` 会默认 enable 首个 `genarrative-external-generation-worker@1.service`,并在已存在 `/opt/genarrative/current/api-server` 时随 API 一起重启;首次 API deploy 会在默认 worker pattern 下自动 `enable --now genarrative-external-generation-worker@1.service` 并等待 worker active。手动持久化首个实例可用 `systemctl enable --now genarrative-external-generation-worker@1.service`,横向扩容用 `systemctl start genarrative-external-generation-worker@2.service` / `@3.service`,缩容用 `systemctl stop genarrative-external-generation-worker@N.service`。worker 专属参数模板是 `deploy/env/external-generation-worker.env.example`,密钥与 SpacetimeDB 连接仍复用 `/etc/genarrative/api-server.env`。API 发布脚本默认会重启并验活 `genarrative-external-generation-worker@*.service`;若本次只发 HTTP 且不希望滚动 worker,可传 `--no-worker-services`。`GENARRATIVE_EXTERNAL_GENERATION_WORKER_POLL_INTERVAL_MS` 控制空队列轮询间隔,`GENARRATIVE_EXTERNAL_GENERATION_WORKER_LEASE_SECONDS` 控制单次 lease,worker 会约每三分之一 lease、最长 30 秒续租;该值应覆盖一次心跳网络抖动窗口,不需要大于完整外部生成链路耗时。SpacetimeDB 使用自身事务时间计算 claim/renew/complete/fail,完成和失败回写还会校验 `lease_token` 与未过期 lease,避免同一 job 被过期 worker 覆盖。当前拼图首关生成只做 lease 崩溃重领,不做业务失败自动重试,避免 worker 退款和重试成功之间产生钱包账本漂移。 +`api-server` 进程角色由 `GENARRATIVE_PROCESS_ROLE` 控制:`api` 只监听 HTTP,`external-generation-worker` 只消费外部生成队列,`external-generation-controller` 只管理 worker systemd 实例,`all` 仅用于本地或临时 smoke,不隐式启动 controller。外部生成策略由 `GENARRATIVE_EXTERNAL_GENERATION_MODE` 控制,生产和容器压测默认保持 `queue`;`inline` 只用于本地或低并发同步排查,HTTP handler 会直接复用 worker executor,完成后返回 `completed`,但不会落 `external_generation_job`,也不能通过增加 worker 进程扩吞吐。外部生成 worker 使用同一发布包和同一套 SpacetimeDB 配置,按实例数和 `GENARRATIVE_EXTERNAL_GENERATION_WORKER_CONCURRENCY` 动态扩缩;生产默认由 `genarrative-external-generation-controller.service` 读取 `get_external_generation_queue_stats_and_return`,按 `claimable_pending + running_active + expired_running` 计算目标 worker 数,并对 `genarrative-external-generation-worker@N.service` 精确执行 `systemctl start/stop`。controller 参数模板是 `deploy/env/external-generation-controller.env.example`:默认保底 `MIN_WORKERS=1`、上限 `MAX_WORKERS=8`、每 worker 目标 `TARGET_JOBS_PER_WORKER=2`、`POLL_INTERVAL_MS=10000`、连续 `SCALE_DOWN_IDLE_ROUNDS=6` 轮完全空闲才缩容;缩容每轮只停止最高编号的一个实例,且不主动停止 `@1`。worker 收到 SIGINT/SIGTERM 后会停止 claim 新任务并等待当前任务完成;若进程被硬杀、机器断电或超过 systemd `TimeoutStopSec`,未完成任务才会在 lease 过期后由其它 worker 重领。每个 worker 实例应设置唯一 `GENARRATIVE_EXTERNAL_GENERATION_WORKER_ID`,默认会用主机名和 pid 兜底;systemd 生产模板 `deploy/systemd/genarrative-external-generation-worker@.service` 会用 `%H-%i` 生成实例 ID,并把 tracking outbox 隔离到 `/var/lib/genarrative/tracking-outbox/%H-%i`。`Genarrative-Server-Provision` 会安装 worker 模板、controller unit 和两份专属 env 模板,默认 enable 首个 `genarrative-external-generation-worker@1.service` 与 `genarrative-external-generation-controller.service`;首次 API deploy 会在默认 worker pattern 下自动 `enable --now genarrative-external-generation-worker@1.service` 并等待 worker active,同时重启并验活 controller。手动兜底扩容仍可用 `systemctl start genarrative-external-generation-worker@2.service` / `@3.service`,缩容用 `systemctl stop genarrative-external-generation-worker@N.service`;controller 下轮会按队列压力修正到目标实例数。worker 专属参数模板是 `deploy/env/external-generation-worker.env.example`,密钥与 SpacetimeDB 连接仍复用 `/etc/genarrative/api-server.env`。API 发布脚本默认会重启并验活 `genarrative-external-generation-worker@*.service` 和 `genarrative-external-generation-controller.service`;若本次只发 HTTP 且不希望滚动 worker,可传 `--no-worker-services`,若不希望重启 controller 可传 `--no-worker-controller`。`GENARRATIVE_EXTERNAL_GENERATION_WORKER_POLL_INTERVAL_MS` 控制空队列轮询间隔,`GENARRATIVE_EXTERNAL_GENERATION_WORKER_LEASE_SECONDS` 控制单次 lease,worker 会约每三分之一 lease、最长 30 秒续租;该值应覆盖一次心跳网络抖动窗口,不需要大于完整外部生成链路耗时。SpacetimeDB 使用自身事务时间计算 claim/renew/complete/fail,完成和失败回写还会校验 `lease_token` 与未过期 lease,避免同一 job 被过期 worker 覆盖。当前拼图首关生成只做 lease 崩溃重领,不做业务失败自动重试,避免 worker 退款和重试成功之间产生钱包账本漂移。 `Genarrative-Server-Provision` 会安装 systemd 模板和 Nginx 站点模板,不再安装 clang / lld / pkg-config / OpenSSL headers / sccache 等通用构建链依赖。因 VectorEngine 图片上游 POST 已改用 `libcurl`,当前 Linux release 构建出的 `api-server` 运行时需要 `OPENSSL_3.2.0` 符号;Ubuntu 24.04 apt 默认只提供 OpenSSL 3.0.x,不能直接满足该符号版本。Provision 会把 OpenSSL `3.2.0` 独立安装到 `/opt/genarrative/openssl-3.2.0`,校验官方 tarball SHA256,并只通过 `genarrative-api.service` 的 `LD_LIBRARY_PATH=/opt/genarrative/openssl-3.2.0/lib64:/opt/genarrative/openssl-3.2.0/lib` 让 api-server 使用,避免替换系统 OpenSSL 或影响 ssh / nginx / apt。Ubuntu / apt 目标机为完成这一步会安装 `build-essential`、`ca-certificates`、`curl`、`perl`、`tar` 等 OpenSSL 运行时自举工具;这只服务于独立 OpenSSL 运行时安装,不代表 provision 重新承担 api-server 构建职责。Ubuntu / apt 目标机会额外安装 `libnginx-mod-http-brotli-filter` 与 `libnginx-mod-http-brotli-static`,随后由 `scripts/jenkins-server-provision.sh` 通过临时 `nginx -t` 配置探测 Brotli 指令是否可用;该临时配置必须先 `include /etc/nginx/modules-enabled/*.conf`,因为 apt 安装的 Brotli 是动态模块,不会出现在普通 `nginx -V` 编译参数里。探测成功才在渲染后的 `deploy/nginx/genarrative.conf` / `genarrative-dev-http.conf` 中启用 Brotli,避免未安装模块的机器直接写入无效配置。Provision 写入 Genarrative Nginx 站点时会把 `/etc/nginx/sites-enabled/default*` 移到 `/etc/nginx/sites-disabled/`,避免 Debian / Certbot 默认站点继续占用 `genarrative.world` / `www.genarrative.world` 并在 `nginx -T` 中出现 `conflicting server name ... ignored`。如果 `nginx -t` 失败,脚本会恢复写入前的 Genarrative 配置和被移动的默认站点。 @@ -336,6 +339,7 @@ npm run container:down 容器方案默认暴露 `http://127.0.0.1:18080`,`api-server` 在容器内监听 `0.0.0.0:8082`,Nginx 通过 `api-server:8082` upstream 反代 `/api/` 和 `/admin/api/`。SpacetimeDB 也纳入 compose,容器内由 `spacetimedb:3101` 提供服务,宿主机通过 `http://127.0.0.1:13101` 进行模块发布;Collector 镜像使用 `otel/opentelemetry-collector-contrib:0.151.0`。生产 provision 侧现在由目标 dev / release agent 自己准备 `provision-tools/otelcol-contrib`,并安装本机 `otelcol-contrib.service`,真实库名、token 和外部服务密钥只写本地 `deploy/container/api-server.env`,不提交 Git。完整拓扑、端口、k6 参数和 OTLP debug exporter 使用方法见 `deploy/container/README.md`。 `npm run container:config` 默认只做 quiet 校验,避免把本地 env 中的 token 展开到终端;确需排查完整 compose 时再传 `-- --print`。 +隔离验证 worker 队列和 API-only 更新时使用 `npm run container:worker-smoke -- smoke`。该命令不复用 `deploy/container/api-server.env`,会在 `deploy/container/worker-smoke/` 生成本机专用 env 与端口 state,并使用 unsupported job 验证 worker claim / fail 回写,不需要真实外部生成密钥;本机 crates.io 网络不稳时使用 `--local-binary`,由容器内 Cargo 复用本机 Cargo 缓存构建,并把产物放进 Debian bookworm smoke runtime。 OpenTelemetry 现阶段默认开启 OTLP traces / metrics / logs,但本地日志与 Nginx 文件日志仍保留: diff --git a/package.json b/package.json index c4b39b8d..de61e0f3 100644 --- a/package.json +++ b/package.json @@ -55,6 +55,7 @@ "container:ps": "node scripts/container-compose.mjs ps", "container:config": "node scripts/container-compose.mjs config", "container:k6": "node scripts/container-compose.mjs k6", + "container:worker-smoke": "node scripts/container-worker-smoke.mjs", "check": "npm run lint && npm run test && npm run build && npm run check:content", "check:data": "node scripts/run-tsx.cjs scripts/validate-content.ts", "check:overrides": "node scripts/run-tsx.cjs scripts/validate-overrides.ts", diff --git a/scripts/check-production-ops-guardrails.mjs b/scripts/check-production-ops-guardrails.mjs index 6a8537f0..89ce254e 100644 --- a/scripts/check-production-ops-guardrails.mjs +++ b/scripts/check-production-ops-guardrails.mjs @@ -23,6 +23,46 @@ const checks = [ includes: 'genarrative-health-patrol.timer', reason: 'Server-Provision 必须安装并启用健康巡检 timer。', }, + { + file: 'scripts/jenkins-server-provision.sh', + includes: 'genarrative-external-generation-controller.service', + reason: 'Server-Provision 必须安装并启用外部生成 worker controller。', + }, + { + file: 'scripts/jenkins-server-provision.sh', + includes: 'genarrative-external-generation-worker@1.service', + reason: 'Server-Provision 必须启用外部生成保底 worker 实例。', + }, + { + file: 'scripts/deploy/production-api-deploy.sh', + includes: 'ensure_default_worker_service', + reason: 'API Deploy 必须在缺少 worker 实例时补启动默认外部生成 worker。', + }, + { + file: 'scripts/deploy/production-api-deploy.sh', + includes: 'wait_for_worker_services', + reason: 'API Deploy 必须等待外部生成 worker 实例 active。', + }, + { + file: 'scripts/deploy/production-api-deploy.sh', + includes: 'wait_for_worker_controller_service', + reason: 'API Deploy 必须重启并验活外部生成 worker controller。', + }, + { + file: 'deploy/systemd/genarrative-external-generation-worker@.service', + includes: 'GENARRATIVE_PROCESS_ROLE=external-generation-worker', + reason: '外部生成 worker 模板必须作为独立 worker 进程角色运行。', + }, + { + file: 'deploy/systemd/genarrative-external-generation-controller.service', + includes: 'GENARRATIVE_PROCESS_ROLE=external-generation-controller', + reason: '外部生成 worker controller 必须作为独立进程角色运行。', + }, + { + file: 'scripts/ops/production-health-patrol.mjs', + includes: 'checkActiveWorkerInstances', + reason: '生产健康巡检必须检查至少一个外部生成 worker 实例 active。', + }, { file: 'scripts/build-production-release.sh', includes: 'production-health-patrol.mjs', diff --git a/scripts/container-worker-smoke.mjs b/scripts/container-worker-smoke.mjs new file mode 100644 index 00000000..48da3a5e --- /dev/null +++ b/scripts/container-worker-smoke.mjs @@ -0,0 +1,839 @@ +import {spawn} from 'node:child_process'; +import { + chmodSync, + copyFileSync, + existsSync, + mkdirSync, + readFileSync, + writeFileSync, +} from 'node:fs'; +import net from 'node:net'; +import path from 'node:path'; + +const [, , rawCommand = 'help', ...rawArgs] = process.argv; + +const projectRoot = process.cwd(); +const composeFile = path.join('deploy', 'container', 'docker-compose.loadtest.yml'); +const smokeDir = path.join('deploy', 'container', 'worker-smoke'); +const envPath = path.join(smokeDir, 'api-server.env'); +const statePath = path.join(smokeDir, 'state.json'); +const localImageDir = path.join(smokeDir, 'image'); +const localImageDockerfilePath = path.join(localImageDir, 'Dockerfile.local'); +const localImageBinaryPath = path.join(localImageDir, 'api-server'); +const localCargoTargetDir = path.join('server-rs', 'target-worker-smoke'); +const localSpacetimeImageDir = path.join(smokeDir, 'spacetimedb-image'); +const localSpacetimeDockerfilePath = path.join(localSpacetimeImageDir, 'Dockerfile.local'); +const localSpacetimeBinaryPath = path.join(localSpacetimeImageDir, 'spacetime'); +const localSpacetimeStandalonePath = path.join( + localSpacetimeImageDir, + 'spacetimedb-standalone', +); +const projectName = process.env.GENARRATIVE_WORKER_SMOKE_PROJECT || 'genarrative-worker-smoke'; +const defaultDatabase = + process.env.GENARRATIVE_WORKER_SMOKE_DATABASE || 'genarrative-worker-smoke'; + +const command = rawCommand.trim(); +const supportedCommands = new Set([ + 'help', + 'init', + 'build', + 'up-spacetime', + 'publish', + 'up', + 'enqueue', + 'status', + 'api-update', + 'scale', + 'logs', + 'ps', + 'down', + 'smoke', +]); + +if (!supportedCommands.has(command)) { + printHelp(true); + process.exit(1); +} + +try { + await main(); +} catch (error) { + console.error(`[worker-smoke] ${error.message}`); + process.exit(1); +} + +async function main() { + switch (command) { + case 'help': + printHelp(false); + return; + case 'init': + await ensureStateAndEnv({force: rawArgs.includes('--force')}); + return; + case 'build': + await ensureStateAndEnv(); + await buildRuntimeImages(); + return; + case 'up-spacetime': + await ensureStateAndEnv(); + await ensureSpacetimeImage(); + await dockerCompose(['up', '-d', 'spacetimedb', 'otelcol']); + await waitForSpacetime(); + return; + case 'publish': + await ensureStateAndEnv(); + await publishModule(); + return; + case 'up': + await ensureStateAndEnv(); + await upRuntime(); + await waitForApi(); + return; + case 'enqueue': + await ensureStateAndEnv(); + await enqueueSmokeJob(); + return; + case 'status': + await ensureStateAndEnv(); + await printQueueStatus(); + return; + case 'api-update': + await ensureStateAndEnv(); + await apiOnlyUpdate({build: rawArgs.includes('--build')}); + return; + case 'scale': + await ensureStateAndEnv(); + await scaleWorkers(rawArgs[0] ?? '1'); + return; + case 'logs': + await ensureStateAndEnv(); + await dockerCompose(['logs', ...rawArgs]); + return; + case 'ps': + await ensureStateAndEnv(); + await dockerCompose(['ps', ...rawArgs]); + return; + case 'down': + await ensureStateAndEnv({create: false}); + await dockerCompose(['down', ...rawArgs]); + return; + case 'smoke': + await runSmoke(); + return; + default: + throw new Error(`未知命令: ${command}`); + } +} + +async function runSmoke() { + if (rawArgs.includes('--force')) { + await ensureStateAndEnv(); + await dockerComposeCapture(['down', '-v'], {allowFailure: true}); + } + const state = await ensureStateAndEnv({force: rawArgs.includes('--force')}); + await assertSavedPortsAvailableForNewProject(state); + console.log( + `[worker-smoke] 使用隔离环境 project=${projectName} database=${state.database}`, + ); + await buildRuntimeImages(); + await ensureSpacetimeImage(); + await dockerCompose(['up', '-d', 'spacetimedb', 'otelcol']); + await waitForSpacetime(); + await publishModule(); + await upRuntime(); + await waitForApi(); + await assertWorkersRunning(); + + const beforeWorkerIds = await getContainerIds('external-generation-worker'); + console.log(`[worker-smoke] worker 容器: ${beforeWorkerIds.join(', ')}`); + + const firstJobId = await enqueueSmokeJob({label: 'before-api-update'}); + await waitForJobConsumed(firstJobId); + + await apiOnlyUpdate({build: false}); + const afterWorkerIds = await getContainerIds('external-generation-worker'); + if (beforeWorkerIds.join('\n') !== afterWorkerIds.join('\n')) { + throw new Error( + `api-update 后 worker 容器发生变化: before=${beforeWorkerIds.join(',')} after=${afterWorkerIds.join(',')}`, + ); + } + console.log('[worker-smoke] api-only 更新未重建 worker 容器。'); + + const secondJobId = await enqueueSmokeJob({label: 'after-api-update'}); + await waitForJobConsumed(secondJobId); + await printQueueStatus(); + console.log('[worker-smoke] smoke 通过:worker 独立消费队列,API-only 更新未停止 worker。'); +} + +async function buildRuntimeImages() { + const imageMode = resolveImageMode(); + if (imageMode === 'local-binary') { + await buildLocalBinaryRuntimeImages(); + return; + } + await dockerCompose(['build', 'api-server', 'external-generation-worker']); +} + +function resolveImageMode() { + if (rawArgs.includes('--local-binary')) { + return 'local-binary'; + } + const envMode = process.env.GENARRATIVE_WORKER_SMOKE_IMAGE_MODE; + if (!envMode || envMode === 'dockerfile') { + return 'dockerfile'; + } + if (envMode === 'local-binary') { + return 'local-binary'; + } + throw new Error( + `GENARRATIVE_WORKER_SMOKE_IMAGE_MODE 仅支持 dockerfile 或 local-binary: ${envMode}`, + ); +} + +async function buildLocalBinaryRuntimeImages() { + const profile = + rawArgs.includes('--release') || + process.env.GENARRATIVE_WORKER_SMOKE_CARGO_PROFILE === 'release' + ? 'release' + : 'debug'; + const buildArgs = ['build', '-p', 'api-server', '--manifest-path', 'server-rs/Cargo.toml']; + if (profile === 'release') { + buildArgs.push('--release'); + } + const cargoImage = resolveLocalBinaryCargoImage(); + const cargoHome = resolveLocalBinaryCargoHome(); + mkdirSync(cargoHome, {recursive: true}); + + console.log( + `[worker-smoke] 使用 ${cargoImage} 复用本机 Cargo 缓存构建 ${profile} api-server 二进制。`, + ); + await run('docker', [ + 'run', + '--rm', + '-u', + currentUserSpec(), + '-v', + `${projectRoot}:/workspace`, + '-v', + `${cargoHome}:/cargo-home`, + '-w', + '/workspace', + '-e', + 'HOME=/cargo-home', + '-e', + 'CARGO_HOME=/cargo-home', + '-e', + `CARGO_TARGET_DIR=/workspace/${toContainerPath(localCargoTargetDir)}`, + cargoImage, + 'cargo', + '--config', + 'build.rustc-wrapper=""', + '--config', + 'target.x86_64-unknown-linux-gnu.linker="cc"', + '--config', + 'target.x86_64-unknown-linux-gnu.rustflags=[]', + ...buildArgs, + ]); + + const sourceBinaryPath = path.join(localCargoTargetDir, profile, 'api-server'); + if (!existsSync(sourceBinaryPath)) { + throw new Error(`未找到 worker smoke api-server 二进制: ${sourceBinaryPath}`); + } + + mkdirSync(localImageDir, {recursive: true}); + copyFileSync(sourceBinaryPath, localImageBinaryPath); + chmodSync(localImageBinaryPath, 0o755); + + const baseImage = await resolveLocalBinaryBaseImage(); + writeFileSync(localImageDockerfilePath, buildLocalBinaryDockerfile(baseImage), 'utf8'); + + await run('docker', [ + 'build', + '-f', + localImageDockerfilePath, + '-t', + `${projectName}-api-server`, + '-t', + `${projectName}-external-generation-worker`, + localImageDir, + ]); +} + +function resolveLocalBinaryCargoImage() { + return process.env.GENARRATIVE_WORKER_SMOKE_CARGO_IMAGE || 'rust:1.93-bookworm'; +} + +function resolveLocalBinaryCargoHome() { + if (process.env.GENARRATIVE_WORKER_SMOKE_CARGO_HOME) { + return path.resolve(process.env.GENARRATIVE_WORKER_SMOKE_CARGO_HOME); + } + if (!process.env.HOME) { + throw new Error('未找到 HOME,无法挂载本机 Cargo 缓存。'); + } + return path.join(process.env.HOME, '.cargo'); +} + +function currentUserSpec() { + if (typeof process.getuid === 'function' && typeof process.getgid === 'function') { + return `${process.getuid()}:${process.getgid()}`; + } + return '0:0'; +} + +async function ensureSpacetimeImage() { + if (process.env.GENARRATIVE_WORKER_SMOKE_SPACETIME_IMAGE_MODE === 'official') { + return; + } + const imageName = localSpacetimeImageName(); + const existingImage = await runCapture('docker', ['image', 'inspect', imageName], { + allowFailure: true, + quiet: true, + }); + if (existingImage.code === 0 && !rawArgs.includes('--force')) { + return; + } + + const spacetimePath = await resolveSpacetimeBinaryPath(); + if (!spacetimePath) { + throw new Error('未找到本机 spacetime CLI,无法构建隔离 SpacetimeDB 镜像。'); + } + + mkdirSync(localSpacetimeImageDir, {recursive: true}); + copyFileSync(spacetimePath, localSpacetimeBinaryPath); + chmodSync(localSpacetimeBinaryPath, 0o755); + const standalonePath = path.join(path.dirname(spacetimePath), 'spacetimedb-standalone'); + if (!existsSync(standalonePath)) { + throw new Error(`未找到本机 spacetimedb-standalone: ${standalonePath}`); + } + copyFileSync(standalonePath, localSpacetimeStandalonePath); + chmodSync(localSpacetimeStandalonePath, 0o755); + writeFileSync(localSpacetimeDockerfilePath, buildLocalSpacetimeDockerfile(), 'utf8'); + + console.log(`[worker-smoke] 使用本机 spacetime CLI 构建隔离镜像: ${imageName}`); + await run('docker', [ + 'build', + '-f', + localSpacetimeDockerfilePath, + '-t', + imageName, + localSpacetimeImageDir, + ]); +} + +function buildLocalSpacetimeDockerfile() { + return `FROM debian:bookworm-slim +WORKDIR /var/lib/spacetimedb +RUN apt-get update && \\ + apt-get install -y --no-install-recommends ca-certificates libstdc++6 zlib1g && \\ + rm -rf /var/lib/apt/lists/* +COPY spacetime /usr/local/bin/spacetime +COPY spacetimedb-standalone /usr/local/bin/spacetimedb-standalone +RUN chmod 0755 /usr/local/bin/spacetime /usr/local/bin/spacetimedb-standalone +ENTRYPOINT ["spacetime"] +`; +} + +async function resolveSpacetimeBinaryPath() { + if (process.env.GENARRATIVE_WORKER_SMOKE_SPACETIME_BIN) { + return process.env.GENARRATIVE_WORKER_SMOKE_SPACETIME_BIN; + } + const versionResult = await runCapture('spacetime', ['--version'], {quiet: true}); + const pathMatch = versionResult.stdout.match(/^spacetime Path:\s*(.+)$/mu); + if (pathMatch?.[1]) { + return pathMatch[1].trim(); + } + const whichResult = await runCapture('which', ['spacetime'], {quiet: true}); + return whichResult.stdout.trim(); +} + +async function resolveLocalBinaryBaseImage() { + if (process.env.GENARRATIVE_WORKER_SMOKE_LOCAL_BASE_IMAGE) { + return process.env.GENARRATIVE_WORKER_SMOKE_LOCAL_BASE_IMAGE; + } + return 'debian:bookworm-slim'; +} + +function buildLocalBinaryDockerfile(baseImage) { + return `FROM ${baseImage} +WORKDIR /srv/genarrative +RUN apt-get update && \\ + apt-get install -y --no-install-recommends ca-certificates curl libssl3 zlib1g libzstd1 && \\ + rm -rf /var/lib/apt/lists/* && \\ + (id -u genarrative >/dev/null 2>&1 || useradd --system --create-home --home-dir /srv/genarrative --shell /usr/sbin/nologin genarrative) +COPY api-server /usr/local/bin/api-server +RUN chmod 0755 /usr/local/bin/api-server && \\ + mkdir -p /var/lib/genarrative/auth /var/lib/genarrative/tracking-outbox && \\ + chown -R genarrative:genarrative /srv/genarrative /var/lib/genarrative +USER genarrative +EXPOSE 8082 +ENV GENARRATIVE_ENV=container \\ + GENARRATIVE_API_HOST=0.0.0.0 \\ + GENARRATIVE_API_PORT=8082 \\ + GENARRATIVE_TRACKING_OUTBOX_DIR=/var/lib/genarrative/tracking-outbox +CMD ["api-server"] +`; +} + +function toContainerPath(localPath) { + return localPath.split(path.sep).join('/'); +} + +async function upRuntime() { + const services = ['api-server', 'external-generation-worker']; + if (rawArgs.includes('--with-nginx')) { + services.push('nginx'); + } + await dockerCompose(['up', '-d', ...services]); +} + +async function ensureStateAndEnv(options = {}) { + const {force = false, create = true} = options; + if (!create && !existsSync(statePath)) { + return defaultState(); + } + mkdirSync(smokeDir, {recursive: true}); + + if (!existsSync(statePath) || force) { + const state = { + database: defaultDatabase, + spacetimePort: await findAvailablePort( + Number(process.env.GENARRATIVE_WORKER_SMOKE_SPACETIME_PORT || 19101), + ), + httpPort: await findAvailablePort( + Number(process.env.GENARRATIVE_WORKER_SMOKE_HTTP_PORT || 19080), + ), + otlpGrpcPort: await findAvailablePort( + Number(process.env.GENARRATIVE_WORKER_SMOKE_OTLP_GRPC_PORT || 15317), + ), + otlpHttpPort: await findAvailablePort( + Number(process.env.GENARRATIVE_WORKER_SMOKE_OTLP_HTTP_PORT || 15318), + ), + createdAt: new Date().toISOString(), + }; + writeFileSync(statePath, `${JSON.stringify(state, null, 2)}\n`, 'utf8'); + } + + const state = readState(); + if (!existsSync(envPath) || force) { + writeFileSync(envPath, buildSmokeEnv(state), 'utf8'); + } + console.log(`[worker-smoke] env=${envPath}`); + console.log(`[worker-smoke] state=${statePath}`); + console.log(`[worker-smoke] SpacetimeDB=http://127.0.0.1:${state.spacetimePort}`); + console.log(`[worker-smoke] Nginx=http://127.0.0.1:${state.httpPort}`); + return state; +} + +function buildSmokeEnv(state) { + return `# 本文件由 scripts/container-worker-smoke.mjs 生成,仅用于本机隔离 worker smoke。 +# 不要在这里写真实生产密钥;目录 deploy/container/worker-smoke/ 已被 gitignore。 +GENARRATIVE_ENV=container-worker-smoke +GENARRATIVE_API_HOST=0.0.0.0 +GENARRATIVE_API_PORT=8082 +GENARRATIVE_API_LOG=info,tower_http=info +GENARRATIVE_API_LISTEN_BACKLOG=256 +GENARRATIVE_API_WORKER_THREADS=2 +GENARRATIVE_PROCESS_ROLE=api +GENARRATIVE_EXTERNAL_GENERATION_MODE=queue +GENARRATIVE_EXTERNAL_GENERATION_WORKER_ID= +GENARRATIVE_EXTERNAL_GENERATION_WORKER_CONCURRENCY=1 +GENARRATIVE_EXTERNAL_GENERATION_WORKER_POLL_INTERVAL_MS=500 +GENARRATIVE_EXTERNAL_GENERATION_WORKER_LEASE_SECONDS=60 +GENARRATIVE_API_MAX_CONCURRENT_REQUESTS=64 +GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS=32 +GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS=16 +GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS=8 +GENARRATIVE_TRACKING_OUTBOX_ENABLED=false +GENARRATIVE_TRACKING_OUTBOX_DIR=/var/lib/genarrative/tracking-outbox + +GENARRATIVE_OTEL_ENABLED=false +OTEL_SERVICE_NAME=genarrative-worker-smoke-api +OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4318 +OTEL_RESOURCE_ATTRIBUTES=deployment.environment=worker-smoke,service.namespace=genarrative + +GENARRATIVE_INTERNAL_API_SECRET=worker-smoke-internal-secret +GENARRATIVE_JWT_ISSUER=genarrative-worker-smoke +GENARRATIVE_JWT_SECRET=worker-smoke-jwt-secret +AUTH_REFRESH_COOKIE_SECURE=false +GENARRATIVE_DEV_PASSWORD_ENTRY_AUTO_REGISTER_ENABLED=true + +GENARRATIVE_SPACETIME_SERVER_URL=http://spacetimedb:3101 +GENARRATIVE_SPACETIME_DATABASE=${state.database} +GENARRATIVE_SPACETIME_TOKEN= +GENARRATIVE_SPACETIME_POOL_SIZE=2 +GENARRATIVE_SPACETIME_PROCEDURE_TIMEOUT_SECONDS=15 + +GENARRATIVE_LLM_PROVIDER=openai-compatible +GENARRATIVE_LLM_BASE_URL= +GENARRATIVE_LLM_API_KEY= +GENARRATIVE_LLM_MODEL= +VECTOR_ENGINE_BASE_URL= +VECTOR_ENGINE_API_KEY= +ALIYUN_OSS_BUCKET= +ALIYUN_OSS_ENDPOINT=oss-cn-shanghai.aliyuncs.com +ALIYUN_OSS_ACCESS_KEY_ID= +ALIYUN_OSS_ACCESS_KEY_SECRET= +WECHAT_MINIPROGRAM_MESSAGE_TOKEN= +WECHAT_MINIPROGRAM_MESSAGE_ENCODING_AES_KEY= +`; +} + +function defaultState() { + return { + database: defaultDatabase, + spacetimePort: 19101, + httpPort: 19080, + otlpGrpcPort: 15317, + otlpHttpPort: 15318, + }; +} + +function readState() { + if (!existsSync(statePath)) { + return defaultState(); + } + return JSON.parse(readFileSync(statePath, 'utf8')); +} + +async function findAvailablePort(startPort) { + for (let port = startPort; port < startPort + 100; port += 1) { + if (await isPortAvailable(port)) { + return port; + } + } + throw new Error(`未找到可用端口: ${startPort}-${startPort + 99}`); +} + +function isPortAvailable(port) { + return new Promise((resolve) => { + const server = net.createServer(); + server.once('error', () => resolve(false)); + server.once('listening', () => { + server.close(() => resolve(true)); + }); + server.listen(port, '127.0.0.1'); + }); +} + +async function publishModule() { + const state = readState(); + const serverUrl = spacetimeServerUrl(state); + const publishArgs = [ + 'publish', + state.database, + '--server', + serverUrl, + '--module-path', + 'server-rs/crates/spacetime-module', + '--delete-data=on-conflict', + '--anonymous', + '--yes=all', + '--no-config', + ]; + const buildOptions = process.env.GENARRATIVE_WORKER_SMOKE_STDB_BUILD_OPTIONS; + if (buildOptions) { + publishArgs.push('--build-options', buildOptions); + } + await run('spacetime', publishArgs); +} + +async function enqueueSmokeJob(options = {}) { + if (!rawArgs.includes('--no-worker-check')) { + await assertWorkersRunning(); + } + const state = readState(); + const nowMicros = Date.now() * 1000; + const suffix = `${Date.now()}-${Math.random().toString(16).slice(2, 8)}`; + const jobId = `extgen-smoke-${suffix}`; + const label = options.label || rawArgs[0] || 'manual'; + const input = { + job_id: jobId, + dedupe_key: `worker-smoke:${label}:${suffix}`, + job_kind: 'worker_smoke_unsupported', + owner_user_id: 'worker-smoke-user', + source_module: 'worker-smoke', + source_entity_id: `worker-smoke-entity-${suffix}`, + request_label: `worker-smoke ${label}`, + request_payload_json: JSON.stringify({label, suffix}), + max_attempts: 1, + available_at_micros: nowMicros, + created_at_micros: nowMicros, + }; + + await run('spacetime', [ + 'call', + '--server', + spacetimeServerUrl(state), + '--anonymous', + '--yes', + '--no-config', + state.database, + 'enqueue_external_generation_job_and_return', + JSON.stringify(input), + ]); + console.log(`[worker-smoke] 已入队测试 job: ${jobId}`); + return jobId; +} + +async function printQueueStatus() { + console.log('[worker-smoke] external_generation_job 是 private table,status 显示最近 worker 日志:'); + await printServiceLogs('external-generation-worker', 120); +} + +async function waitForJobConsumed(jobId) { + const deadline = Date.now() + 60_000; + let lastOutput = ''; + while (Date.now() < deadline) { + const result = await dockerComposeCapture( + ['logs', '--no-color', 'external-generation-worker'], + {allowFailure: true, quiet: true}, + ); + lastOutput = `${result.stdout}\n${result.stderr}`; + if (lastOutput.includes(jobId) && lastOutput.includes('暂不支持的任务类型')) { + console.log(`[worker-smoke] job ${jobId} 已被 worker 领取并执行到 unsupported 分支。`); + return; + } + await sleep(1000); + } + await printServiceLogs('external-generation-worker', 120); + throw new Error(`等待 worker 消费 job ${jobId} 超时,最后输出:\n${lastOutput}`); +} + +async function assertSavedPortsAvailableForNewProject(state) { + const existingContainers = await getProjectContainerIds(); + if (existingContainers.length > 0) { + return; + } + const ports = [ + ['SpacetimeDB', state.spacetimePort], + ['Nginx', state.httpPort], + ['OTLP gRPC', state.otlpGrpcPort], + ['OTLP HTTP', state.otlpHttpPort], + ]; + for (const [label, port] of ports) { + if (!(await isPortAvailable(port))) { + throw new Error( + `${label} 端口 ${port} 已被占用;可执行 npm run container:worker-smoke -- smoke --force 重新分配隔离端口。`, + ); + } + } +} + +async function getProjectContainerIds() { + const result = await dockerComposeCapture(['ps', '-q'], { + allowFailure: true, + quiet: true, + }); + if (result.code !== 0) { + return []; + } + return result.stdout + .split(/\r?\n/u) + .map((line) => line.trim()) + .filter(Boolean); +} + +async function assertWorkersRunning() { + const result = await dockerComposeCapture( + ['ps', '--status', 'running', '-q', 'external-generation-worker'], + {allowFailure: true, quiet: true}, + ); + const workerIds = result.stdout + .split(/\r?\n/u) + .map((line) => line.trim()) + .filter(Boolean); + if (result.code === 0 && workerIds.length > 0) { + return; + } + await printServiceLogs('external-generation-worker', 80); + throw new Error('external-generation-worker 未处于 running 状态,已输出最近日志。'); +} + +async function printServiceLogs(service, tail = 80) { + await dockerComposeCapture(['logs', '--tail', String(tail), service], { + allowFailure: true, + }); +} + +async function waitForSpacetime() { + const state = readState(); + const url = `${spacetimeServerUrl(state)}/v1/ping`; + await waitForHttp(url, 'SpacetimeDB'); +} + +async function waitForApi() { + const deadline = Date.now() + 120_000; + while (Date.now() < deadline) { + const result = await dockerComposeCapture( + ['exec', '-T', 'api-server', 'curl', '-fsS', 'http://127.0.0.1:8082/healthz'], + {allowFailure: true, quiet: true}, + ); + if (result.code === 0) { + console.log('[worker-smoke] api-server 已就绪: api-server:8082/healthz'); + return; + } + await sleep(2000); + } + throw new Error('api-server 等待超时: api-server:8082/healthz'); +} + +async function waitForHttp(url, label) { + const deadline = Date.now() + 120_000; + while (Date.now() < deadline) { + const result = await runCapture('curl', ['-fsS', '--max-time', '3', url], { + allowFailure: true, + }); + if (result.code === 0) { + console.log(`[worker-smoke] ${label} 已就绪: ${url}`); + return; + } + await sleep(2000); + } + throw new Error(`${label} 等待超时: ${url}`); +} + +async function apiOnlyUpdate({build}) { + const beforeWorkerIds = await getContainerIds('external-generation-worker'); + const args = ['up', '-d', '--no-deps', '--force-recreate']; + if (build) { + args.push('--build'); + } + args.push('api-server'); + await dockerCompose(args); + await waitForApi(); + const afterWorkerIds = await getContainerIds('external-generation-worker'); + if (beforeWorkerIds.join('\n') !== afterWorkerIds.join('\n')) { + throw new Error('API-only 更新不应重建 external-generation-worker 容器'); + } + console.log('[worker-smoke] API-only 更新完成,worker 容器保持不变。'); +} + +async function scaleWorkers(rawCount) { + const count = Number.parseInt(rawCount, 10); + if (!Number.isInteger(count) || count < 0 || count > 16) { + throw new Error(`worker 数量必须是 0-16 的整数: ${rawCount}`); + } + await dockerCompose([ + 'up', + '-d', + '--scale', + `external-generation-worker=${count}`, + 'external-generation-worker', + ]); +} + +async function getContainerIds(service) { + const result = await dockerComposeCapture(['ps', '-q', service]); + return result.stdout + .split(/\r?\n/u) + .map((line) => line.trim()) + .filter(Boolean) + .sort(); +} + +async function dockerCompose(args) { + await run('docker', composeArgs(args), {env: composeEnv()}); +} + +async function dockerComposeCapture(args, options = {}) { + return runCapture('docker', composeArgs(args), { + env: composeEnv(), + ...options, + }); +} + +function composeArgs(args) { + return ['compose', '-p', projectName, '-f', composeFile, ...args]; +} + +function composeEnv() { + const state = readState(); + return { + ...process.env, + GENARRATIVE_CONTAINER_API_ENV_FILE: './worker-smoke/api-server.env', + GENARRATIVE_CONTAINER_SPACETIME_IMAGE: + process.env.GENARRATIVE_CONTAINER_SPACETIME_IMAGE || localSpacetimeImageName(), + GENARRATIVE_CONTAINER_SPACETIME_PORT: String(state.spacetimePort), + GENARRATIVE_CONTAINER_HTTP_PORT: String(state.httpPort), + GENARRATIVE_CONTAINER_OTLP_GRPC_PORT: String(state.otlpGrpcPort), + GENARRATIVE_CONTAINER_OTLP_HTTP_PORT: String(state.otlpHttpPort), + }; +} + +function localSpacetimeImageName() { + return `${projectName}-spacetimedb:2.4.1`; +} + +function spacetimeServerUrl(state) { + return `http://127.0.0.1:${state.spacetimePort}`; +} + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function run(commandName, args, options = {}) { + const result = await runCapture(commandName, args, options); + if (result.code !== 0 && !options.allowFailure) { + throw new Error(`${commandName} ${args.join(' ')} 失败,exit=${result.code}`); + } + return result; +} + +function runCapture(commandName, args, options = {}) { + return new Promise((resolve, reject) => { + const child = spawn(commandName, args, { + cwd: projectRoot, + env: options.env ?? process.env, + shell: false, + }); + let stdout = ''; + let stderr = ''; + child.stdout?.on('data', (chunk) => { + const text = chunk.toString(); + stdout += text; + if (!options.quiet) { + process.stdout.write(text); + } + }); + child.stderr?.on('data', (chunk) => { + const text = chunk.toString(); + stderr += text; + if (!options.quiet) { + process.stderr.write(text); + } + }); + child.on('error', reject); + child.on('exit', (code, signal) => { + if (signal) { + reject(new Error(`${commandName} 被信号终止: ${signal}`)); + return; + } + resolve({code: code ?? 0, stdout, stderr}); + }); + }); +} + +function printHelp(isError) { + const output = isError ? console.error : console.log; + output(`Usage: npm run container:worker-smoke -- + +Commands: + init [--force] 生成隔离 env 与端口 state + build [--local-binary] [--release] + 构建 api-server / worker 镜像;--local-binary 让容器内 Cargo 复用本机缓存 + up-spacetime 启动隔离 SpacetimeDB 与 otelcol + publish 向隔离 SpacetimeDB 发布 spacetime-module + up [--with-nginx] 启动 api-server / worker;需要 Nginx 时显式加 --with-nginx + enqueue [label] [--no-worker-check] + 写入一个 unsupported 测试 job,验证 worker claim/fail + status 查看最近 worker 日志;external_generation_job 是 private table + api-update [--build] 仅重建/重启 api-server,不触碰 worker + scale 调整 external-generation-worker 实例数 + ps 查看隔离 compose 状态 + logs [service] 查看隔离 compose 日志 + down [-v] 停止隔离 compose,-v 会清理数据卷 + smoke [--force] [--local-binary] [--release] + 一键执行 build -> publish -> up -> enqueue -> api-update -> enqueue +`); +} diff --git a/scripts/deploy/production-api-deploy.sh b/scripts/deploy/production-api-deploy.sh index f50eacfe..e215b3e9 100644 --- a/scripts/deploy/production-api-deploy.sh +++ b/scripts/deploy/production-api-deploy.sh @@ -5,11 +5,11 @@ set -euo pipefail usage() { cat <<'EOF' 用法: - ./scripts/deploy/production-api-deploy.sh --source-dir build/ [--version ] [--release-root /opt/genarrative/releases] [--current-link /opt/genarrative/current] [--service genarrative-api.service] [--worker-service-pattern 'genarrative-external-generation-worker@*.service'] [--no-worker-services] [--health-url http://127.0.0.1:8082/readyz] [--api-env-file /etc/genarrative/api-server.env] [--database genarrative-prod] [--spacetime-server-url http://127.0.0.1:3101] + ./scripts/deploy/production-api-deploy.sh --source-dir build/ [--version ] [--release-root /opt/genarrative/releases] [--current-link /opt/genarrative/current] [--service genarrative-api.service] [--worker-service-pattern 'genarrative-external-generation-worker@*.service'] [--no-worker-services] [--worker-controller-service genarrative-external-generation-controller.service] [--no-worker-controller] [--health-url http://127.0.0.1:8082/readyz] [--api-env-file /etc/genarrative/api-server.env] [--database genarrative-prod] [--spacetime-server-url http://127.0.0.1:3101] 说明: 进入维护模式,校验并发布 api-server 单文件,更新 current 链接,重启 systemd 服务并执行 readiness 检查。 - 默认同时重启已加载的外部生成 worker 实例;未启用 worker 单元时会自动跳过。 + 默认同时重启外部生成 worker controller 和已加载的 worker 实例;未启用 worker 单元时会自动跳过。 若传入 --database,会在重启前把 GENARRATIVE_SPACETIME_DATABASE 写入 api-server 环境文件,避免服务继续读取旧库。 失败时保留维护模式。 EOF @@ -317,6 +317,43 @@ wait_for_worker_services() { return 1 } +ensure_worker_controller_service() { + local service="$1" + + if [[ -z "${service}" ]]; then + return 0 + fi + + if ! systemctl cat "${service}" >/dev/null 2>&1; then + echo "[production-api-deploy] 缺少外部生成 worker controller systemd 单元: ${service}" >&2 + return 1 + fi + + echo "[production-api-deploy] 启用并重启外部生成 worker controller: ${service}" + systemctl enable "${service}" + systemctl restart "${service}" +} + +wait_for_worker_controller_service() { + local service="$1" + + if [[ -z "${service}" ]]; then + return 0 + fi + + echo "[production-api-deploy] 等待外部生成 worker controller active: ${service}" + for _ in {1..30}; do + if systemctl is-active --quiet "${service}"; then + return 0 + fi + sleep 2 + done + + systemctl --no-pager --full status "${service}" || true + echo "[production-api-deploy] 外部生成 worker controller 未在超时时间内进入 active,发布失败。" >&2 + return 1 +} + SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" SOURCE_DIR="" VERSION="" @@ -324,6 +361,7 @@ RELEASE_ROOT="/opt/genarrative/releases" CURRENT_LINK="/opt/genarrative/current" SERVICE_NAME="genarrative-api.service" WORKER_SERVICE_PATTERN="genarrative-external-generation-worker@*.service" +WORKER_CONTROLLER_SERVICE="genarrative-external-generation-controller.service" HEALTH_URL="http://127.0.0.1:8082/readyz" API_ENV_FILE="/etc/genarrative/api-server.env" DATABASE="" @@ -364,6 +402,14 @@ while [[ $# -gt 0 ]]; do WORKER_SERVICE_PATTERN="" shift ;; + --worker-controller-service) + WORKER_CONTROLLER_SERVICE="${2:?缺少 --worker-controller-service 的值}" + shift 2 + ;; + --no-worker-controller) + WORKER_CONTROLLER_SERVICE="" + shift + ;; --health-url) HEALTH_URL="${2:?缺少 --health-url 的值}" shift 2 @@ -488,6 +534,8 @@ echo "[production-api-deploy] 重启服务: ${SERVICE_NAME}" systemctl restart "${SERVICE_NAME}" restart_worker_services "${WORKER_SERVICE_PATTERN}" wait_for_worker_services "${WORKER_SERVICE_PATTERN}" +ensure_worker_controller_service "${WORKER_CONTROLLER_SERVICE}" +wait_for_worker_controller_service "${WORKER_CONTROLLER_SERVICE}" echo "[production-api-deploy] 等待 readiness: ${HEALTH_URL}" for _ in {1..30}; do diff --git a/scripts/jenkins-server-provision.sh b/scripts/jenkins-server-provision.sh index 7e8f772d..9cc62397 100755 --- a/scripts/jenkins-server-provision.sh +++ b/scripts/jenkins-server-provision.sh @@ -5,6 +5,7 @@ PROVISION_TOOLS_DIR="${PROVISION_TOOLS_DIR:-provision-tools}" SPACETIME_BIN_SOURCE="${SPACETIME_BIN_SOURCE:-${PROVISION_TOOLS_DIR}/spacetime/spacetime}" OTELCOL_BIN_SOURCE="${OTELCOL_BIN_SOURCE:-${PROVISION_TOOLS_DIR}/otelcol-contrib}" WORKER_ENV_FILE="${WORKER_ENV_FILE:-/etc/genarrative/external-generation-worker.env}" +CONTROLLER_ENV_FILE="${CONTROLLER_ENV_FILE:-/etc/genarrative/external-generation-controller.env}" GENARRATIVE_OPENSSL_VERSION="${GENARRATIVE_OPENSSL_VERSION:-3.2.0}" GENARRATIVE_OPENSSL_PREFIX="${GENARRATIVE_OPENSSL_PREFIX:-/opt/genarrative/openssl-3.2.0}" GENARRATIVE_OPENSSL_SOURCE_URL="${GENARRATIVE_OPENSSL_SOURCE_URL:-https://github.com/openssl/openssl/releases/download/openssl-${GENARRATIVE_OPENSSL_VERSION}/openssl-${GENARRATIVE_OPENSSL_VERSION}.tar.gz}" @@ -542,6 +543,10 @@ render_external_generation_worker_env_example() { cat deploy/env/external-generation-worker.env.example } +render_external_generation_controller_env_example() { + cat deploy/env/external-generation-controller.env.example +} + render_otelcol_service() { cat deploy/systemd/otelcol-contrib.service } @@ -740,6 +745,18 @@ render_external_generation_worker_service() { deploy/systemd/genarrative-external-generation-worker@.service } +render_external_generation_controller_service() { + local current_escaped api_env_escaped controller_env_escaped + current_escaped="$(escape_sed_replacement "${CURRENT_LINK}")" + api_env_escaped="$(escape_sed_replacement "${API_ENV_FILE}")" + controller_env_escaped="$(escape_sed_replacement "${CONTROLLER_ENV_FILE}")" + sed \ + -e "s|/opt/genarrative/current|${current_escaped}|g" \ + -e "s|/etc/genarrative/api-server.env|${api_env_escaped}|g" \ + -e "s|/etc/genarrative/external-generation-controller.env|${controller_env_escaped}|g" \ + deploy/systemd/genarrative-external-generation-controller.service +} + render_database_backup_service() { local current_escaped env_escaped current_escaped="$(escape_sed_replacement "${CURRENT_LINK}")" @@ -761,6 +778,7 @@ render_health_patrol_service() { require_path deploy/systemd/spacetimedb.service require_path deploy/systemd/genarrative-api.service require_path deploy/systemd/genarrative-external-generation-worker@.service +require_path deploy/systemd/genarrative-external-generation-controller.service require_path deploy/systemd/genarrative-database-backup.service require_path deploy/systemd/genarrative-database-backup.timer require_path deploy/systemd/genarrative-health-patrol.service @@ -772,6 +790,7 @@ require_path deploy/nginx/genarrative-dev-http.conf require_path deploy/nginx/snippets/genarrative-maintenance.conf require_path deploy/env/api-server.env.example require_path deploy/env/external-generation-worker.env.example +require_path deploy/env/external-generation-controller.env.example require_path scripts/deploy/maintenance-on.sh require_path scripts/deploy/maintenance-off.sh require_path scripts/deploy/maintenance-status.sh @@ -816,21 +835,24 @@ sync_spacetime_install "${SPACETIME_ROOT}" spacetimedb_service="$(mktemp)" api_service="$(mktemp)" external_generation_worker_service="$(mktemp)" +external_generation_controller_service="$(mktemp)" database_backup_service="$(mktemp)" health_patrol_service="$(mktemp)" render_spacetimedb_service >"${spacetimedb_service}" render_api_service >"${api_service}" render_external_generation_worker_service >"${external_generation_worker_service}" +render_external_generation_controller_service >"${external_generation_controller_service}" render_database_backup_service >"${database_backup_service}" render_health_patrol_service >"${health_patrol_service}" install_file "${spacetimedb_service}" /etc/systemd/system/spacetimedb.service 0644 install_file "${api_service}" /etc/systemd/system/genarrative-api.service 0644 install_file "${external_generation_worker_service}" /etc/systemd/system/genarrative-external-generation-worker@.service 0644 +install_file "${external_generation_controller_service}" /etc/systemd/system/genarrative-external-generation-controller.service 0644 install_file "${database_backup_service}" /etc/systemd/system/genarrative-database-backup.service 0644 install_file deploy/systemd/genarrative-database-backup.timer /etc/systemd/system/genarrative-database-backup.timer 0644 install_file "${health_patrol_service}" /etc/systemd/system/genarrative-health-patrol.service 0644 install_file deploy/systemd/genarrative-health-patrol.timer /etc/systemd/system/genarrative-health-patrol.timer 0644 -rm -f "${spacetimedb_service}" "${api_service}" "${external_generation_worker_service}" "${database_backup_service}" "${health_patrol_service}" +rm -f "${spacetimedb_service}" "${api_service}" "${external_generation_worker_service}" "${external_generation_controller_service}" "${database_backup_service}" "${health_patrol_service}" if [[ ! -f "${API_ENV_FILE}" ]]; then echo "+ create ${API_ENV_FILE} from example" @@ -855,6 +877,17 @@ else echo "[server-provision] 已存在 worker 环境文件,保留不覆盖: ${WORKER_ENV_FILE}" fi +if [[ ! -f "${CONTROLLER_ENV_FILE}" ]]; then + echo "+ create ${CONTROLLER_ENV_FILE} from example" + if [[ "${DRY_RUN}" != "true" ]]; then + render_external_generation_controller_env_example >"${CONTROLLER_ENV_FILE}" + chmod 0600 "${CONTROLLER_ENV_FILE}" + chown root:root "${CONTROLLER_ENV_FILE}" + fi +else + echo "[server-provision] 已存在 controller 环境文件,保留不覆盖: ${CONTROLLER_ENV_FILE}" +fi + if [[ "${ENABLE_OTELCOL:-true}" == "true" ]]; then sync_otelcol_install otelcol_service="$(mktemp)" @@ -876,7 +909,7 @@ if [[ "${ENABLE_SERVICES}" == "true" ]]; then if [[ "${ENABLE_OTELCOL:-true}" == "true" ]]; then run_cmd systemctl enable otelcol-contrib.service fi - run_cmd systemctl enable spacetimedb.service genarrative-api.service genarrative-database-backup.timer genarrative-external-generation-worker@1.service genarrative-health-patrol.timer + run_cmd systemctl enable spacetimedb.service genarrative-api.service genarrative-database-backup.timer genarrative-external-generation-worker@1.service genarrative-external-generation-controller.service genarrative-health-patrol.timer if [[ "${ENABLE_OTELCOL:-true}" == "true" ]]; then run_cmd systemctl restart otelcol-contrib.service fi @@ -887,8 +920,10 @@ if [[ "${ENABLE_SERVICES}" == "true" ]]; then run_cmd systemctl restart genarrative-api.service run_cmd systemctl enable --now genarrative-external-generation-worker@1.service run_cmd systemctl restart genarrative-external-generation-worker@1.service + run_cmd systemctl enable --now genarrative-external-generation-controller.service + run_cmd systemctl restart genarrative-external-generation-controller.service else - echo "[server-provision] 尚未发现 ${CURRENT_LINK}/api-server,跳过 api-server 和外部生成 worker 首次启动。后续 API deploy 会启用并启动默认 worker 实例。" + echo "[server-provision] 尚未发现 ${CURRENT_LINK}/api-server,跳过 api-server、外部生成 worker 和 controller 首次启动。后续 API deploy 会启用并启动默认 worker 与 controller。" fi fi diff --git a/scripts/ops/production-health-patrol.mjs b/scripts/ops/production-health-patrol.mjs index 219d8e29..01a1265c 100644 --- a/scripts/ops/production-health-patrol.mjs +++ b/scripts/ops/production-health-patrol.mjs @@ -20,9 +20,11 @@ const DEFAULT_PUBLIC_PATHS = [ const DEFAULT_SERVICES = [ 'genarrative-api.service', + 'genarrative-external-generation-controller.service', 'spacetimedb.service', 'nginx.service', ]; +const WORKER_SERVICE_PATTERN = 'genarrative-external-generation-worker@*.service'; function usage() { console.log(`Usage: @@ -216,6 +218,61 @@ async function checkService(serviceName, timeoutMs) { ); } +async function checkActiveWorkerInstances(config) { + const result = await runCommand( + 'systemctl', + [ + 'list-units', + WORKER_SERVICE_PATTERN, + '--type=service', + '--state=active', + '--no-legend', + '--plain', + '--no-pager', + ], + config.timeoutMs, + ); + if (result.code !== 0) { + return checkResult( + 'service:external-generation-workers', + 'CRITICAL', + '无法枚举外部生成 worker 实例', + { + command: result.command, + stderr: result.stderr.trim() || result.error, + }, + ); + } + + const services = result.stdout + .split('\n') + .map((line) => line.trim().split(/\s+/u)[0]) + .filter((service) => + /^genarrative-external-generation-worker@.+\.service$/u.test(service), + ); + + if (services.length === 0) { + return checkResult( + 'service:external-generation-workers', + 'CRITICAL', + '没有 active 的外部生成 worker 实例', + { + command: result.command, + }, + ); + } + + return checkResult( + 'service:external-generation-workers', + 'OK', + `${services.length} 个 worker active`, + { + command: result.command, + services, + }, + ); +} + function requestUrl(url, timeoutMs) { return new Promise((resolve) => { const startedAt = Date.now(); @@ -310,6 +367,10 @@ async function checkRecentJournal(config) { '-u', 'genarrative-api.service', '-u', + 'genarrative-external-generation-controller.service', + '-u', + WORKER_SERVICE_PATTERN, + '-u', 'spacetimedb.service', '-u', 'nginx.service', @@ -426,6 +487,7 @@ async function main() { for (const serviceName of DEFAULT_SERVICES) { checks.push(await checkService(serviceName, config.timeoutMs)); } + checks.push(await checkActiveWorkerInstances(config)); checks.push(await checkHttp('api:/healthz', joinUrl(config.apiBaseUrl, '/healthz'), config)); checks.push(await checkHttp('api:/readyz', joinUrl(config.apiBaseUrl, '/readyz'), config)); diff --git a/server-rs/crates/api-server/Cargo.toml b/server-rs/crates/api-server/Cargo.toml index dc38ad00..93df6459 100644 --- a/server-rs/crates/api-server/Cargo.toml +++ b/server-rs/crates/api-server/Cargo.toml @@ -56,7 +56,7 @@ shared-kernel = { workspace = true } shared-logging = { workspace = true } socket2 = { workspace = true } spacetime-client = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time", "sync", "fs", "io-util", "signal"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time", "sync", "fs", "io-util", "signal", "process"] } tokio-stream = { workspace = true } futures-util = { workspace = true } time = { workspace = true, features = ["formatting"] } diff --git a/server-rs/crates/api-server/src/config.rs b/server-rs/crates/api-server/src/config.rs index 0ff1f7aa..b4af828a 100644 --- a/server-rs/crates/api-server/src/config.rs +++ b/server-rs/crates/api-server/src/config.rs @@ -28,6 +28,13 @@ pub struct AppConfig { pub external_generation_worker_concurrency: usize, pub external_generation_worker_poll_interval: Duration, pub external_generation_worker_lease: Duration, + pub external_generation_controller_min_workers: usize, + pub external_generation_controller_max_workers: usize, + pub external_generation_controller_target_jobs_per_worker: usize, + pub external_generation_controller_poll_interval: Duration, + pub external_generation_controller_scale_down_idle_rounds: u32, + pub external_generation_controller_service_template: String, + pub external_generation_controller_dry_run: bool, pub max_concurrent_requests: Option, pub gallery_max_concurrent_requests: Option, pub detail_max_concurrent_requests: Option, @@ -181,6 +188,7 @@ pub struct AppConfig { pub enum ProcessRole { Api, ExternalGenerationWorker, + ExternalGenerationController, All, } @@ -208,6 +216,7 @@ impl ProcessRole { match self { Self::Api => "api", Self::ExternalGenerationWorker => "external-generation-worker", + Self::ExternalGenerationController => "external-generation-controller", Self::All => "all", } } @@ -219,6 +228,10 @@ impl ProcessRole { pub fn runs_external_generation_worker(self) -> bool { matches!(self, Self::ExternalGenerationWorker | Self::All) } + + pub fn runs_external_generation_controller(self) -> bool { + matches!(self, Self::ExternalGenerationController) + } } impl Default for AppConfig { @@ -234,6 +247,14 @@ impl Default for AppConfig { external_generation_worker_concurrency: 2, external_generation_worker_poll_interval: Duration::from_millis(2_000), external_generation_worker_lease: Duration::from_secs(3_600), + external_generation_controller_min_workers: 1, + external_generation_controller_max_workers: 8, + external_generation_controller_target_jobs_per_worker: 2, + external_generation_controller_poll_interval: Duration::from_millis(10_000), + external_generation_controller_scale_down_idle_rounds: 6, + external_generation_controller_service_template: + "genarrative-external-generation-worker@{}.service".to_string(), + external_generation_controller_dry_run: false, max_concurrent_requests: None, gallery_max_concurrent_requests: None, detail_max_concurrent_requests: None, @@ -459,6 +480,49 @@ impl AppConfig { ]) { config.external_generation_worker_lease = Duration::from_secs(lease_seconds.max(1)); } + if let Some(min_workers) = + read_first_usize_env(&["GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_MIN_WORKERS"]) + { + config.external_generation_controller_min_workers = min_workers; + } + if let Some(max_workers) = + read_first_usize_env(&["GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_MAX_WORKERS"]) + { + config.external_generation_controller_max_workers = max_workers; + } + if config.external_generation_controller_max_workers + < config.external_generation_controller_min_workers + { + config.external_generation_controller_max_workers = + config.external_generation_controller_min_workers; + } + if let Some(target_jobs_per_worker) = read_first_usize_env(&[ + "GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_TARGET_JOBS_PER_WORKER", + ]) { + config.external_generation_controller_target_jobs_per_worker = + target_jobs_per_worker.max(1); + } + if let Some(poll_interval_ms) = read_first_positive_u64_env(&[ + "GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_POLL_INTERVAL_MS", + ]) { + config.external_generation_controller_poll_interval = + Duration::from_millis(poll_interval_ms); + } + if let Some(idle_rounds) = read_first_u32_env(&[ + "GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_SCALE_DOWN_IDLE_ROUNDS", + ]) { + config.external_generation_controller_scale_down_idle_rounds = idle_rounds; + } + if let Some(service_template) = read_first_non_empty_env(&[ + "GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_SERVICE_TEMPLATE", + ]) { + config.external_generation_controller_service_template = service_template; + } + if let Some(dry_run) = + read_first_bool_env(&["GENARRATIVE_EXTERNAL_GENERATION_CONTROLLER_DRY_RUN"]) + { + config.external_generation_controller_dry_run = dry_run; + } if let Some(max_concurrent_requests) = read_first_usize_env(&["GENARRATIVE_API_MAX_CONCURRENT_REQUESTS"]) { @@ -1214,6 +1278,9 @@ fn parse_process_role(value: &str) -> Option { "external-generation-worker" | "external_generation_worker" | "worker" => { Some(ProcessRole::ExternalGenerationWorker) } + "external-generation-controller" | "external_generation_controller" | "controller" => { + Some(ProcessRole::ExternalGenerationController) + } "all" => Some(ProcessRole::All), _ => None, } @@ -1419,15 +1486,29 @@ mod tests { parse_process_role("worker"), Some(ProcessRole::ExternalGenerationWorker) ); + assert_eq!( + parse_process_role("controller"), + Some(ProcessRole::ExternalGenerationController) + ); + assert_eq!( + parse_process_role("'external_generation_controller'"), + Some(ProcessRole::ExternalGenerationController) + ); assert_eq!(parse_process_role("all"), Some(ProcessRole::All)); assert_eq!(parse_process_role("unknown"), None); assert!(ProcessRole::Api.runs_http()); assert!(!ProcessRole::Api.runs_external_generation_worker()); + assert!(!ProcessRole::Api.runs_external_generation_controller()); assert!(!ProcessRole::ExternalGenerationWorker.runs_http()); assert!(ProcessRole::ExternalGenerationWorker.runs_external_generation_worker()); + assert!(!ProcessRole::ExternalGenerationWorker.runs_external_generation_controller()); + assert!(!ProcessRole::ExternalGenerationController.runs_http()); + assert!(!ProcessRole::ExternalGenerationController.runs_external_generation_worker()); + assert!(ProcessRole::ExternalGenerationController.runs_external_generation_controller()); assert!(ProcessRole::All.runs_http()); assert!(ProcessRole::All.runs_external_generation_worker()); + assert!(!ProcessRole::All.runs_external_generation_controller()); } #[test] diff --git a/server-rs/crates/api-server/src/external_generation_worker_controller.rs b/server-rs/crates/api-server/src/external_generation_worker_controller.rs new file mode 100644 index 00000000..3c4e588c --- /dev/null +++ b/server-rs/crates/api-server/src/external_generation_worker_controller.rs @@ -0,0 +1,465 @@ +use std::{collections::BTreeSet, future::Future, io, pin::Pin, process::Stdio, time::Duration}; + +use spacetime_client::ExternalGenerationQueueStatsRecord; +use tokio::{ + process::Command, + time::{Instant, sleep}, +}; +use tracing::{error, info, warn}; + +use crate::state::AppState; + +#[derive(Clone, Debug)] +struct ExternalGenerationWorkerControllerConfig { + min_workers: usize, + max_workers: usize, + target_jobs_per_worker: usize, + poll_interval: Duration, + scale_down_idle_rounds: u32, + service_template: String, + dry_run: bool, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +struct ExternalGenerationWorkerControllerDecision { + desired_workers: usize, + should_scale_down: bool, + idle_rounds: u32, +} + +#[derive(Debug, Default)] +struct ExternalGenerationWorkerControllerState { + idle_rounds: u32, +} + +pub(crate) async fn run_external_generation_worker_controller( + state: AppState, +) -> Result<(), io::Error> { + let config = ExternalGenerationWorkerControllerConfig::from_state(&state); + let mut controller_state = ExternalGenerationWorkerControllerState::default(); + let mut shutdown = external_generation_controller_shutdown_signal(); + + info!( + min_workers = config.min_workers, + max_workers = config.max_workers, + target_jobs_per_worker = config.target_jobs_per_worker, + poll_interval_ms = config.poll_interval.as_millis(), + scale_down_idle_rounds = config.scale_down_idle_rounds, + service_template = config.service_template, + dry_run = config.dry_run, + "external generation worker controller 已启动" + ); + + loop { + let tick = run_external_generation_controller_tick(&state, &config, &mut controller_state); + tokio::select! { + _ = shutdown.as_mut() => { + info!("external generation worker controller 收到停机信号"); + return Ok(()); + } + result = tick => { + if let Err(error) = result { + error!(error = %error, "external generation worker controller 本轮扩缩容失败"); + } + } + } + + let next_tick = sleep(config.poll_interval); + tokio::pin!(next_tick); + tokio::select! { + _ = shutdown.as_mut() => { + info!("external generation worker controller 收到停机信号"); + return Ok(()); + } + _ = &mut next_tick => {} + } + } +} + +async fn run_external_generation_controller_tick( + state: &AppState, + config: &ExternalGenerationWorkerControllerConfig, + controller_state: &mut ExternalGenerationWorkerControllerState, +) -> Result<(), String> { + let stats = state + .spacetime_client() + .get_external_generation_queue_stats() + .await + .map_err(|error| format!("读取 external_generation_job 队列统计失败:{error}"))?; + let active_instances = list_active_external_generation_worker_instances(config).await?; + let current_workers = active_instances.len(); + let decision = decide_external_generation_worker_target( + &stats, + current_workers, + controller_state.idle_rounds, + config, + ); + controller_state.idle_rounds = decision.idle_rounds; + + info!( + pending = stats.pending_count, + delayed_pending = stats.delayed_pending_count, + claimable = stats.claimable_count, + running_active = stats.running_active_count, + expired_running = stats.expired_running_count, + oldest_claimable_age_ms = stats.oldest_claimable_age_micros.unwrap_or(0) / 1_000, + current_workers, + desired_workers = decision.desired_workers, + idle_rounds = decision.idle_rounds, + "external generation worker controller 完成队列评估" + ); + + reconcile_external_generation_worker_instances(config, &active_instances, &decision).await +} + +fn decide_external_generation_worker_target( + stats: &ExternalGenerationQueueStatsRecord, + current_workers: usize, + previous_idle_rounds: u32, + config: &ExternalGenerationWorkerControllerConfig, +) -> ExternalGenerationWorkerControllerDecision { + let pressure = stats + .claimable_pending_count + .saturating_add(stats.running_active_count) + .saturating_add(stats.expired_running_count); + let desired_from_pressure = + ceil_div_usize(pressure as usize, config.target_jobs_per_worker.max(1)); + let desired_workers = desired_from_pressure.clamp(config.min_workers, config.max_workers); + let is_idle = stats.claimable_count == 0 + && stats.expired_running_count == 0 + && stats.running_active_count == 0 + && desired_workers <= config.min_workers; + let idle_rounds = if is_idle { + previous_idle_rounds.saturating_add(1) + } else { + 0 + }; + let should_scale_down = current_workers > desired_workers + && idle_rounds >= config.scale_down_idle_rounds + && config.scale_down_idle_rounds > 0; + + ExternalGenerationWorkerControllerDecision { + desired_workers, + should_scale_down, + idle_rounds, + } +} + +async fn reconcile_external_generation_worker_instances( + config: &ExternalGenerationWorkerControllerConfig, + active_instances: &BTreeSet, + decision: &ExternalGenerationWorkerControllerDecision, +) -> Result<(), String> { + let current_workers = active_instances.len(); + let mut started = 0usize; + for instance in 1..=config.max_workers { + if current_workers.saturating_add(started) >= decision.desired_workers { + break; + } + if !active_instances.contains(&instance) { + systemctl_worker_instance(config, "start", instance).await?; + started = started.saturating_add(1); + } + } + + if decision.desired_workers > current_workers && started == 0 { + warn!( + current_workers, + desired_workers = decision.desired_workers, + "external generation worker controller 未找到可启动的缺口实例" + ); + } + if started > 0 { + return Ok(()); + } + + if decision.should_scale_down && decision.desired_workers < current_workers { + if let Some(instance) = active_instances + .iter() + .rev() + .copied() + .find(|instance| *instance > config.min_workers.max(1)) + { + systemctl_worker_instance(config, "stop", instance).await?; + } + } + + Ok(()) +} + +async fn list_active_external_generation_worker_instances( + config: &ExternalGenerationWorkerControllerConfig, +) -> Result, String> { + let mut active_instances = BTreeSet::new(); + for instance in 1..=config.max_workers { + if is_external_generation_worker_instance_active(config, instance).await? { + active_instances.insert(instance); + } + } + Ok(active_instances) +} + +async fn is_external_generation_worker_instance_active( + config: &ExternalGenerationWorkerControllerConfig, + instance: usize, +) -> Result { + let service = format_worker_service_name(&config.service_template, instance)?; + if config.dry_run { + return Ok(instance <= config.min_workers); + } + + let output = Command::new("systemctl") + .arg("is-active") + .arg("--quiet") + .arg(&service) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .output() + .await + .map_err(|error| format!("执行 systemctl is-active {service} 失败:{error}"))?; + Ok(output.status.success()) +} + +async fn systemctl_worker_instance( + config: &ExternalGenerationWorkerControllerConfig, + action: &str, + instance: usize, +) -> Result<(), String> { + let service = format_worker_service_name(&config.service_template, instance)?; + if config.dry_run { + info!( + action, + service, "external generation worker controller dry-run 跳过 systemctl" + ); + return Ok(()); + } + + let started_at = Instant::now(); + let output = Command::new("systemctl") + .arg(action) + .arg(&service) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .output() + .await + .map_err(|error| format!("执行 systemctl {action} {service} 失败:{error}"))?; + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!( + "systemctl {action} {service} 返回失败 status={} stderr={}", + output.status, stderr + )); + } + + info!( + action, + service, + elapsed_ms = started_at.elapsed().as_millis(), + "external generation worker controller 已执行 systemctl" + ); + Ok(()) +} + +fn format_worker_service_name(template: &str, instance: usize) -> Result { + let instance = instance.to_string(); + if template.contains("{}") { + return Ok(template.replacen("{}", &instance, 1)); + } + if template.contains("%i") { + return Ok(template.replacen("%i", &instance, 1)); + } + Err("external generation controller service template 必须包含 {} 或 %i".to_string()) +} + +fn ceil_div_usize(value: usize, divisor: usize) -> usize { + if value == 0 { + 0 + } else { + value.saturating_add(divisor.saturating_sub(1)) / divisor.max(1) + } +} + +impl ExternalGenerationWorkerControllerConfig { + fn from_state(state: &AppState) -> Self { + let min_workers = state.config.external_generation_controller_min_workers; + let max_workers = state + .config + .external_generation_controller_max_workers + .max(min_workers); + Self { + min_workers, + max_workers, + target_jobs_per_worker: state + .config + .external_generation_controller_target_jobs_per_worker + .max(1), + poll_interval: state.config.external_generation_controller_poll_interval, + scale_down_idle_rounds: state + .config + .external_generation_controller_scale_down_idle_rounds, + service_template: state + .config + .external_generation_controller_service_template + .clone(), + dry_run: state.config.external_generation_controller_dry_run, + } + } +} + +type ExternalGenerationControllerShutdownSignal = Pin + Send>>; + +fn external_generation_controller_shutdown_signal() -> ExternalGenerationControllerShutdownSignal { + Box::pin(async { + wait_for_external_generation_controller_shutdown_signal().await; + }) +} + +#[cfg(unix)] +async fn wait_for_external_generation_controller_shutdown_signal() { + use tokio::signal::unix::{SignalKind, signal}; + + let mut sigterm = signal(SignalKind::terminate()).ok(); + tokio::select! { + result = tokio::signal::ctrl_c() => { + if let Err(error) = result { + warn!(error = %error, "external generation worker controller 监听 SIGINT 失败"); + } + } + _ = async { + if let Some(sigterm) = sigterm.as_mut() { + sigterm.recv().await; + } else { + std::future::pending::<()>().await; + } + } => {} + } +} + +#[cfg(not(unix))] +async fn wait_for_external_generation_controller_shutdown_signal() { + if let Err(error) = tokio::signal::ctrl_c().await { + warn!(error = %error, "external generation worker controller 监听 Ctrl-C 失败"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn scales_up_to_max_when_queue_pressure_is_high() { + let config = controller_config_fixture(); + let stats = stats_fixture(120, 0, 8); + + let decision = decide_external_generation_worker_target(&stats, 1, 0, &config); + + assert_eq!(decision.desired_workers, 8); + assert!(!decision.should_scale_down); + assert_eq!(decision.idle_rounds, 0); + } + + #[test] + fn scale_down_requires_consecutive_idle_rounds() { + let config = controller_config_fixture(); + let stats = stats_fixture(0, 0, 0); + + let first = decide_external_generation_worker_target(&stats, 5, 0, &config); + let ready = decide_external_generation_worker_target( + &stats, + 5, + config.scale_down_idle_rounds.saturating_sub(1), + &config, + ); + + assert_eq!(first.desired_workers, config.min_workers); + assert!(!first.should_scale_down); + assert!(ready.should_scale_down); + } + + #[test] + fn running_jobs_hold_capacity_before_scale_down() { + let config = controller_config_fixture(); + let stats = stats_fixture(0, 6, 0); + + let decision = decide_external_generation_worker_target(&stats, 5, 5, &config); + + assert_eq!(decision.desired_workers, 3); + assert!(!decision.should_scale_down); + assert_eq!(decision.idle_rounds, 0); + } + + #[test] + fn expired_running_jobs_are_not_counted_twice_as_claimable_pressure() { + let config = controller_config_fixture(); + let stats = stats_fixture(0, 0, 3); + + let decision = decide_external_generation_worker_target(&stats, 1, 0, &config); + + assert_eq!(decision.desired_workers, 2); + assert!(!decision.should_scale_down); + } + + #[test] + fn formats_worker_service_name_with_supported_templates() { + assert_eq!( + format_worker_service_name("genarrative-external-generation-worker@{}.service", 3) + .expect("format"), + "genarrative-external-generation-worker@3.service" + ); + assert_eq!( + format_worker_service_name("worker@%i.service", 7).expect("format"), + "worker@7.service" + ); + assert!(format_worker_service_name("worker.service", 1).is_err()); + } + + #[tokio::test] + async fn dry_run_reconcile_does_not_start_low_number_gaps_when_capacity_is_enough() { + let config = controller_config_fixture(); + let active_instances = BTreeSet::from([3usize, 4usize]); + let decision = ExternalGenerationWorkerControllerDecision { + desired_workers: 2, + should_scale_down: false, + idle_rounds: 0, + }; + + let result = + reconcile_external_generation_worker_instances(&config, &active_instances, &decision) + .await; + + assert!(result.is_ok()); + } + + fn controller_config_fixture() -> ExternalGenerationWorkerControllerConfig { + ExternalGenerationWorkerControllerConfig { + min_workers: 1, + max_workers: 8, + target_jobs_per_worker: 2, + poll_interval: Duration::from_secs(10), + scale_down_idle_rounds: 3, + service_template: "genarrative-external-generation-worker@{}.service".to_string(), + dry_run: true, + } + } + + fn stats_fixture( + claimable_pending_count: u32, + running_active_count: u32, + expired_running_count: u32, + ) -> ExternalGenerationQueueStatsRecord { + let claimable_count = claimable_pending_count.saturating_add(expired_running_count); + ExternalGenerationQueueStatsRecord { + pending_count: claimable_pending_count, + delayed_pending_count: 0, + claimable_pending_count, + running_active_count, + expired_running_count, + terminal_count: 0, + claimable_count, + oldest_claimable_age_micros: None, + now_micros: 0, + } + } +} diff --git a/server-rs/crates/api-server/src/main.rs b/server-rs/crates/api-server/src/main.rs index 3b6765ed..1cb2ee7c 100644 --- a/server-rs/crates/api-server/src/main.rs +++ b/server-rs/crates/api-server/src/main.rs @@ -41,6 +41,7 @@ mod edutainment_baby_object; mod error_middleware; mod external_api_audit; mod external_generation_worker; +mod external_generation_worker_controller; pub(crate) mod generated_asset_sheets; mod generated_image_assets; mod health; @@ -116,6 +117,7 @@ use crate::{ app::{build_router, build_spacetime_unavailable_router}, config::AppConfig, external_generation_worker::run_external_generation_worker, + external_generation_worker_controller::run_external_generation_worker_controller, state::{AppState, AppStateInitError}, tracking_outbox::TrackingOutbox, wallet_refund_outbox::WalletRefundOutbox, @@ -188,9 +190,18 @@ async fn run_worker_only(config: AppConfig) -> Result<(), io::Error> { spawn_app_state_background_workers(&state); info!( process_role = process_role.as_str(), - "api-server 以 worker 角色启动" + "api-server 以非 HTTP 角色启动" ); - run_external_generation_worker(state).await + if process_role.runs_external_generation_worker() { + run_external_generation_worker(state).await + } else if process_role.runs_external_generation_controller() { + run_external_generation_worker_controller(state).await + } else { + Err(io::Error::other(format!( + "不支持的非 HTTP 进程角色:{}", + process_role.as_str() + ))) + } } async fn run_http_role(config: AppConfig) -> Result<(), io::Error> { diff --git a/server-rs/crates/spacetime-client/src/external_generation.rs b/server-rs/crates/spacetime-client/src/external_generation.rs index 5fda9610..a437bc43 100644 --- a/server-rs/crates/spacetime-client/src/external_generation.rs +++ b/server-rs/crates/spacetime-client/src/external_generation.rs @@ -126,4 +126,23 @@ impl SpacetimeClient { ) .await } + + pub async fn get_external_generation_queue_stats( + &self, + ) -> Result { + self.call_after_connect( + "get_external_generation_queue_stats_and_return", + move |connection, sender| { + connection + .procedures() + .get_external_generation_queue_stats_and_return_then(move |_, result| { + let mapped = result + .map_err(SpacetimeClientError::from_sdk_error) + .and_then(map_external_generation_queue_stats_result); + send_once(&sender, mapped); + }); + }, + ) + .await + } } diff --git a/server-rs/crates/spacetime-client/src/lib.rs b/server-rs/crates/spacetime-client/src/lib.rs index 22465f95..2394403d 100644 --- a/server-rs/crates/spacetime-client/src/lib.rs +++ b/server-rs/crates/spacetime-client/src/lib.rs @@ -33,12 +33,13 @@ pub use mapper::{ CustomWorldWorkSummaryRecord, ExternalGenerationJobClaimRecordInput, ExternalGenerationJobCompleteRecordInput, ExternalGenerationJobEnqueueRecordInput, ExternalGenerationJobFailRecordInput, ExternalGenerationJobRecord, - ExternalGenerationJobRenewLeaseRecordInput, JumpHopActionRequest, JumpHopActionResponse, - JumpHopActionType, JumpHopCharacterAsset, JumpHopDifficulty, JumpHopDraftResponse, - JumpHopGalleryCardResponse, JumpHopGalleryDetailResponse, JumpHopGalleryResponse, - JumpHopGenerationStatus, JumpHopJumpRequest, JumpHopJumpResponse, JumpHopJumpResult, - JumpHopLastJump, JumpHopPath, JumpHopPlatform, JumpHopRestartRunRequest, JumpHopRunResponse, - JumpHopRunStatus, JumpHopRuntimeRunSnapshotResponse, JumpHopScoring, JumpHopSessionResponse, + ExternalGenerationJobRenewLeaseRecordInput, ExternalGenerationQueueStatsRecord, + JumpHopActionRequest, JumpHopActionResponse, JumpHopActionType, JumpHopCharacterAsset, + JumpHopDifficulty, JumpHopDraftResponse, JumpHopGalleryCardResponse, + JumpHopGalleryDetailResponse, JumpHopGalleryResponse, JumpHopGenerationStatus, + JumpHopJumpRequest, JumpHopJumpResponse, JumpHopJumpResult, JumpHopLastJump, JumpHopPath, + JumpHopPlatform, JumpHopRestartRunRequest, JumpHopRunResponse, JumpHopRunStatus, + JumpHopRuntimeRunSnapshotResponse, JumpHopScoring, JumpHopSessionResponse, JumpHopSessionSnapshotResponse, JumpHopStartRunRequest, JumpHopStylePreset, JumpHopTileAsset, JumpHopTileType, JumpHopWorkDetailResponse, JumpHopWorkMutationResponse, JumpHopWorkProfileResponse, JumpHopWorkSummaryResponse, JumpHopWorksResponse, diff --git a/server-rs/crates/spacetime-client/src/mapper.rs b/server-rs/crates/spacetime-client/src/mapper.rs index c64108f6..622098ae 100644 --- a/server-rs/crates/spacetime-client/src/mapper.rs +++ b/server-rs/crates/spacetime-client/src/mapper.rs @@ -73,6 +73,7 @@ pub use self::external_generation::{ ExternalGenerationJobClaimRecordInput, ExternalGenerationJobCompleteRecordInput, ExternalGenerationJobEnqueueRecordInput, ExternalGenerationJobFailRecordInput, ExternalGenerationJobRecord, ExternalGenerationJobRenewLeaseRecordInput, + ExternalGenerationQueueStatsRecord, }; pub use self::jump_hop::{ JumpHopActionRequest, JumpHopActionResponse, JumpHopActionType, JumpHopCharacterAsset, @@ -186,6 +187,7 @@ pub(crate) use self::custom_world::{ }; pub(crate) use self::external_generation::{ map_external_generation_job_claim_result, map_external_generation_job_procedure_result, + map_external_generation_queue_stats_result, }; pub(crate) use self::inventory::{ map_runtime_inventory_state_procedure_result, map_runtime_item_reward_item_snapshot, diff --git a/server-rs/crates/spacetime-client/src/mapper/external_generation.rs b/server-rs/crates/spacetime-client/src/mapper/external_generation.rs index 52d55f4f..e0dafe26 100644 --- a/server-rs/crates/spacetime-client/src/mapper/external_generation.rs +++ b/server-rs/crates/spacetime-client/src/mapper/external_generation.rs @@ -94,6 +94,30 @@ pub(crate) fn map_external_generation_job_claim_result( .collect()) } +pub(crate) fn map_external_generation_queue_stats_result( + result: ExternalGenerationQueueStatsProcedureResult, +) -> Result { + if !result.ok { + return Err(SpacetimeClientError::procedure_failed(result.error_message)); + } + + let stats = result.stats.ok_or_else(|| { + SpacetimeClientError::missing_snapshot("external_generation queue stats 快照") + })?; + + Ok(ExternalGenerationQueueStatsRecord { + pending_count: stats.pending_count, + delayed_pending_count: stats.delayed_pending_count, + claimable_pending_count: stats.claimable_pending_count, + running_active_count: stats.running_active_count, + expired_running_count: stats.expired_running_count, + terminal_count: stats.terminal_count, + claimable_count: stats.claimable_count, + oldest_claimable_age_micros: stats.oldest_claimable_age_micros, + now_micros: stats.now_micros, + }) +} + fn map_external_generation_job_snapshot( snapshot: ExternalGenerationJobSnapshot, ) -> ExternalGenerationJobRecord { @@ -199,3 +223,16 @@ pub struct ExternalGenerationJobRecord { pub updated_at: String, pub lease_token: Option, } + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ExternalGenerationQueueStatsRecord { + pub pending_count: u32, + pub delayed_pending_count: u32, + pub claimable_pending_count: u32, + pub running_active_count: u32, + pub expired_running_count: u32, + pub terminal_count: u32, + pub claimable_count: u32, + pub oldest_claimable_age_micros: Option, + pub now_micros: i64, +} diff --git a/server-rs/crates/spacetime-client/src/module_bindings.rs b/server-rs/crates/spacetime-client/src/module_bindings.rs index c3ffbcab..29fa2e03 100644 --- a/server-rs/crates/spacetime-client/src/module_bindings.rs +++ b/server-rs/crates/spacetime-client/src/module_bindings.rs @@ -360,6 +360,8 @@ pub mod external_generation_job_renew_lease_input_type; pub mod external_generation_job_snapshot_type; pub mod external_generation_job_table; pub mod external_generation_job_type; +pub mod external_generation_queue_stats_procedure_result_type; +pub mod external_generation_queue_stats_snapshot_type; pub mod fail_ai_task_and_return_procedure; pub mod fail_external_generation_job_and_return_procedure; pub mod finalize_big_fish_agent_message_turn_procedure; @@ -386,6 +388,7 @@ pub mod get_custom_world_agent_session_procedure; pub mod get_custom_world_gallery_detail_by_code_procedure; pub mod get_custom_world_gallery_detail_procedure; pub mod get_custom_world_library_detail_procedure; +pub mod get_external_generation_queue_stats_and_return_procedure; pub mod get_jump_hop_agent_session_procedure; pub mod get_jump_hop_leaderboard_procedure; pub mod get_jump_hop_run_procedure; @@ -1491,6 +1494,8 @@ pub use external_generation_job_renew_lease_input_type::ExternalGenerationJobRen pub use external_generation_job_snapshot_type::ExternalGenerationJobSnapshot; pub use external_generation_job_table::*; pub use external_generation_job_type::ExternalGenerationJob; +pub use external_generation_queue_stats_procedure_result_type::ExternalGenerationQueueStatsProcedureResult; +pub use external_generation_queue_stats_snapshot_type::ExternalGenerationQueueStatsSnapshot; pub use fail_ai_task_and_return_procedure::fail_ai_task_and_return; pub use fail_external_generation_job_and_return_procedure::fail_external_generation_job_and_return; pub use finalize_big_fish_agent_message_turn_procedure::finalize_big_fish_agent_message_turn; @@ -1517,6 +1522,7 @@ pub use get_custom_world_agent_session_procedure::get_custom_world_agent_session pub use get_custom_world_gallery_detail_by_code_procedure::get_custom_world_gallery_detail_by_code; pub use get_custom_world_gallery_detail_procedure::get_custom_world_gallery_detail; pub use get_custom_world_library_detail_procedure::get_custom_world_library_detail; +pub use get_external_generation_queue_stats_and_return_procedure::get_external_generation_queue_stats_and_return; pub use get_jump_hop_agent_session_procedure::get_jump_hop_agent_session; pub use get_jump_hop_leaderboard_procedure::get_jump_hop_leaderboard; pub use get_jump_hop_run_procedure::get_jump_hop_run; diff --git a/server-rs/crates/spacetime-client/src/module_bindings/external_generation_queue_stats_procedure_result_type.rs b/server-rs/crates/spacetime-client/src/module_bindings/external_generation_queue_stats_procedure_result_type.rs new file mode 100644 index 00000000..2061d33f --- /dev/null +++ b/server-rs/crates/spacetime-client/src/module_bindings/external_generation_queue_stats_procedure_result_type.rs @@ -0,0 +1,19 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +use super::external_generation_queue_stats_snapshot_type::ExternalGenerationQueueStatsSnapshot; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +pub struct ExternalGenerationQueueStatsProcedureResult { + pub ok: bool, + pub stats: Option, + pub error_message: Option, +} + +impl __sdk::InModule for ExternalGenerationQueueStatsProcedureResult { + type Module = super::RemoteModule; +} diff --git a/server-rs/crates/spacetime-client/src/module_bindings/external_generation_queue_stats_snapshot_type.rs b/server-rs/crates/spacetime-client/src/module_bindings/external_generation_queue_stats_snapshot_type.rs new file mode 100644 index 00000000..ae98e521 --- /dev/null +++ b/server-rs/crates/spacetime-client/src/module_bindings/external_generation_queue_stats_snapshot_type.rs @@ -0,0 +1,23 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +pub struct ExternalGenerationQueueStatsSnapshot { + pub pending_count: u32, + pub delayed_pending_count: u32, + pub claimable_pending_count: u32, + pub running_active_count: u32, + pub expired_running_count: u32, + pub terminal_count: u32, + pub claimable_count: u32, + pub oldest_claimable_age_micros: Option, + pub now_micros: i64, +} + +impl __sdk::InModule for ExternalGenerationQueueStatsSnapshot { + type Module = super::RemoteModule; +} diff --git a/server-rs/crates/spacetime-client/src/module_bindings/get_external_generation_queue_stats_and_return_procedure.rs b/server-rs/crates/spacetime-client/src/module_bindings/get_external_generation_queue_stats_and_return_procedure.rs new file mode 100644 index 00000000..9d7a98a0 --- /dev/null +++ b/server-rs/crates/spacetime-client/src/module_bindings/get_external_generation_queue_stats_and_return_procedure.rs @@ -0,0 +1,54 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +use super::external_generation_queue_stats_procedure_result_type::ExternalGenerationQueueStatsProcedureResult; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +struct GetExternalGenerationQueueStatsAndReturnArgs {} + +impl __sdk::InModule for GetExternalGenerationQueueStatsAndReturnArgs { + type Module = super::RemoteModule; +} + +#[allow(non_camel_case_types)] +/// Extension trait for access to the procedure `get_external_generation_queue_stats_and_return`. +/// +/// Implemented for [`super::RemoteProcedures`]. +pub trait get_external_generation_queue_stats_and_return { + fn get_external_generation_queue_stats_and_return(&self) { + self.get_external_generation_queue_stats_and_return_then(|_, _| {}); + } + + fn get_external_generation_queue_stats_and_return_then( + &self, + + __callback: impl FnOnce( + &super::ProcedureEventContext, + Result, + ) + Send + + 'static, + ); +} + +impl get_external_generation_queue_stats_and_return for super::RemoteProcedures { + fn get_external_generation_queue_stats_and_return_then( + &self, + + __callback: impl FnOnce( + &super::ProcedureEventContext, + Result, + ) + Send + + 'static, + ) { + self.imp + .invoke_procedure_with_callback::<_, ExternalGenerationQueueStatsProcedureResult>( + "get_external_generation_queue_stats_and_return", + GetExternalGenerationQueueStatsAndReturnArgs {}, + __callback, + ); + } +} diff --git a/server-rs/crates/spacetime-module/src/external_generation.rs b/server-rs/crates/spacetime-module/src/external_generation.rs index f44e5b8f..829d9969 100644 --- a/server-rs/crates/spacetime-module/src/external_generation.rs +++ b/server-rs/crates/spacetime-module/src/external_generation.rs @@ -137,6 +137,27 @@ pub struct ExternalGenerationJobProcedureResult { pub error_message: Option, } +#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)] +pub struct ExternalGenerationQueueStatsSnapshot { + pub pending_count: u32, + pub delayed_pending_count: u32, + pub claimable_pending_count: u32, + pub running_active_count: u32, + pub expired_running_count: u32, + // 中文注释:保留字段兼容已生成 bindings;controller 只按非终态队列压力扩缩容,不每轮扫描历史终态任务。 + pub terminal_count: u32, + pub claimable_count: u32, + pub oldest_claimable_age_micros: Option, + pub now_micros: i64, +} + +#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)] +pub struct ExternalGenerationQueueStatsProcedureResult { + pub ok: bool, + pub stats: Option, + pub error_message: Option, +} + #[spacetimedb::procedure] pub fn enqueue_external_generation_job_and_return( ctx: &mut ProcedureContext, @@ -197,6 +218,24 @@ pub fn fail_external_generation_job_and_return( } } +#[spacetimedb::procedure] +pub fn get_external_generation_queue_stats_and_return( + ctx: &mut ProcedureContext, +) -> ExternalGenerationQueueStatsProcedureResult { + match ctx.try_with_tx(|tx| get_external_generation_queue_stats_tx(tx)) { + Ok(stats) => ExternalGenerationQueueStatsProcedureResult { + ok: true, + stats: Some(stats), + error_message: None, + }, + Err(message) => ExternalGenerationQueueStatsProcedureResult { + ok: false, + stats: None, + error_message: Some(message), + }, + } +} + fn enqueue_external_generation_job_tx( ctx: &ReducerContext, input: ExternalGenerationJobEnqueueInput, @@ -427,6 +466,58 @@ fn fail_external_generation_job_tx( Ok(map_external_generation_job_row(row)) } +fn get_external_generation_queue_stats_tx( + ctx: &ReducerContext, +) -> Result { + let now = ctx.timestamp; + let now_micros = now.to_micros_since_unix_epoch(); + let mut stats = ExternalGenerationQueueStatsSnapshot { + pending_count: 0, + delayed_pending_count: 0, + claimable_pending_count: 0, + running_active_count: 0, + expired_running_count: 0, + terminal_count: 0, + claimable_count: 0, + oldest_claimable_age_micros: None, + now_micros, + }; + + for row in ctx + .db + .external_generation_job() + .by_external_generation_job_status_available() + .filter(&EXTERNAL_GENERATION_STATUS_PENDING.to_string()) + { + stats.pending_count = stats.pending_count.saturating_add(1); + if is_external_generation_job_claimable(&row, now) { + stats.claimable_pending_count = stats.claimable_pending_count.saturating_add(1); + record_external_generation_claimable_age(&mut stats, &row, now_micros); + } else { + stats.delayed_pending_count = stats.delayed_pending_count.saturating_add(1); + } + } + + for row in ctx + .db + .external_generation_job() + .by_external_generation_job_status_available() + .filter(&EXTERNAL_GENERATION_STATUS_RUNNING.to_string()) + { + if is_external_generation_job_claimable(&row, now) { + stats.expired_running_count = stats.expired_running_count.saturating_add(1); + record_external_generation_claimable_age(&mut stats, &row, now_micros); + } else { + stats.running_active_count = stats.running_active_count.saturating_add(1); + } + } + + stats.claimable_count = stats + .claimable_pending_count + .saturating_add(stats.expired_running_count); + Ok(stats) +} + pub(crate) fn validate_external_generation_job_lease_for_tx( ctx: &ReducerContext, job_id: &str, @@ -524,6 +615,22 @@ fn is_external_generation_job_claimable(row: &ExternalGenerationJob, now: Timest } } +fn record_external_generation_claimable_age( + stats: &mut ExternalGenerationQueueStatsSnapshot, + row: &ExternalGenerationJob, + now_micros: i64, +) { + let age = now_micros + .saturating_sub(row.available_at.to_micros_since_unix_epoch()) + .max(0); + stats.oldest_claimable_age_micros = Some( + stats + .oldest_claimable_age_micros + .map(|current| current.max(age)) + .unwrap_or(age), + ); +} + fn persist_external_generation_job_row(ctx: &ReducerContext, row: ExternalGenerationJob) { ctx.db .external_generation_job() @@ -725,6 +832,30 @@ mod tests { assert_ne!(first, second); } + #[test] + fn claimable_age_keeps_oldest_available_job() { + let mut stats = ExternalGenerationQueueStatsSnapshot { + pending_count: 0, + delayed_pending_count: 0, + claimable_pending_count: 0, + running_active_count: 0, + expired_running_count: 0, + terminal_count: 0, + claimable_count: 0, + oldest_claimable_age_micros: None, + now_micros: 10_000, + }; + let mut old_job = external_generation_job_fixture(EXTERNAL_GENERATION_STATUS_PENDING); + old_job.available_at = micros(1_000); + let mut newer_job = external_generation_job_fixture(EXTERNAL_GENERATION_STATUS_RUNNING); + newer_job.available_at = micros(8_000); + + record_external_generation_claimable_age(&mut stats, &newer_job, 10_000); + record_external_generation_claimable_age(&mut stats, &old_job, 10_000); + + assert_eq!(stats.oldest_claimable_age_micros, Some(9_000)); + } + #[test] fn positive_duration_between_client_times_is_preserved() { assert_eq!(