Files
Genarrative/scripts/container-worker-smoke.mjs
kdletters 4a6c126366 完善外部生成Worker动态扩缩容
新增外部生成controller进程角色与systemd服务

补齐队列统计procedure与spacetime-client绑定

更新生产部署脚本、健康巡检和server provision的worker/controller口径

新增容器worker smoke脚本并同步运维文档与团队记忆
2026-06-12 15:21:35 +08:00

840 lines
26 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import {spawn} from 'node:child_process';
import {
chmodSync,
copyFileSync,
existsSync,
mkdirSync,
readFileSync,
writeFileSync,
} from 'node:fs';
import net from 'node:net';
import path from 'node:path';
const [, , rawCommand = 'help', ...rawArgs] = process.argv;
const projectRoot = process.cwd();
const composeFile = path.join('deploy', 'container', 'docker-compose.loadtest.yml');
const smokeDir = path.join('deploy', 'container', 'worker-smoke');
const envPath = path.join(smokeDir, 'api-server.env');
const statePath = path.join(smokeDir, 'state.json');
const localImageDir = path.join(smokeDir, 'image');
const localImageDockerfilePath = path.join(localImageDir, 'Dockerfile.local');
const localImageBinaryPath = path.join(localImageDir, 'api-server');
const localCargoTargetDir = path.join('server-rs', 'target-worker-smoke');
const localSpacetimeImageDir = path.join(smokeDir, 'spacetimedb-image');
const localSpacetimeDockerfilePath = path.join(localSpacetimeImageDir, 'Dockerfile.local');
const localSpacetimeBinaryPath = path.join(localSpacetimeImageDir, 'spacetime');
const localSpacetimeStandalonePath = path.join(
localSpacetimeImageDir,
'spacetimedb-standalone',
);
const projectName = process.env.GENARRATIVE_WORKER_SMOKE_PROJECT || 'genarrative-worker-smoke';
const defaultDatabase =
process.env.GENARRATIVE_WORKER_SMOKE_DATABASE || 'genarrative-worker-smoke';
const command = rawCommand.trim();
const supportedCommands = new Set([
'help',
'init',
'build',
'up-spacetime',
'publish',
'up',
'enqueue',
'status',
'api-update',
'scale',
'logs',
'ps',
'down',
'smoke',
]);
if (!supportedCommands.has(command)) {
printHelp(true);
process.exit(1);
}
try {
await main();
} catch (error) {
console.error(`[worker-smoke] ${error.message}`);
process.exit(1);
}
async function main() {
switch (command) {
case 'help':
printHelp(false);
return;
case 'init':
await ensureStateAndEnv({force: rawArgs.includes('--force')});
return;
case 'build':
await ensureStateAndEnv();
await buildRuntimeImages();
return;
case 'up-spacetime':
await ensureStateAndEnv();
await ensureSpacetimeImage();
await dockerCompose(['up', '-d', 'spacetimedb', 'otelcol']);
await waitForSpacetime();
return;
case 'publish':
await ensureStateAndEnv();
await publishModule();
return;
case 'up':
await ensureStateAndEnv();
await upRuntime();
await waitForApi();
return;
case 'enqueue':
await ensureStateAndEnv();
await enqueueSmokeJob();
return;
case 'status':
await ensureStateAndEnv();
await printQueueStatus();
return;
case 'api-update':
await ensureStateAndEnv();
await apiOnlyUpdate({build: rawArgs.includes('--build')});
return;
case 'scale':
await ensureStateAndEnv();
await scaleWorkers(rawArgs[0] ?? '1');
return;
case 'logs':
await ensureStateAndEnv();
await dockerCompose(['logs', ...rawArgs]);
return;
case 'ps':
await ensureStateAndEnv();
await dockerCompose(['ps', ...rawArgs]);
return;
case 'down':
await ensureStateAndEnv({create: false});
await dockerCompose(['down', ...rawArgs]);
return;
case 'smoke':
await runSmoke();
return;
default:
throw new Error(`未知命令: ${command}`);
}
}
async function runSmoke() {
if (rawArgs.includes('--force')) {
await ensureStateAndEnv();
await dockerComposeCapture(['down', '-v'], {allowFailure: true});
}
const state = await ensureStateAndEnv({force: rawArgs.includes('--force')});
await assertSavedPortsAvailableForNewProject(state);
console.log(
`[worker-smoke] 使用隔离环境 project=${projectName} database=${state.database}`,
);
await buildRuntimeImages();
await ensureSpacetimeImage();
await dockerCompose(['up', '-d', 'spacetimedb', 'otelcol']);
await waitForSpacetime();
await publishModule();
await upRuntime();
await waitForApi();
await assertWorkersRunning();
const beforeWorkerIds = await getContainerIds('external-generation-worker');
console.log(`[worker-smoke] worker 容器: ${beforeWorkerIds.join(', ')}`);
const firstJobId = await enqueueSmokeJob({label: 'before-api-update'});
await waitForJobConsumed(firstJobId);
await apiOnlyUpdate({build: false});
const afterWorkerIds = await getContainerIds('external-generation-worker');
if (beforeWorkerIds.join('\n') !== afterWorkerIds.join('\n')) {
throw new Error(
`api-update 后 worker 容器发生变化: before=${beforeWorkerIds.join(',')} after=${afterWorkerIds.join(',')}`,
);
}
console.log('[worker-smoke] api-only 更新未重建 worker 容器。');
const secondJobId = await enqueueSmokeJob({label: 'after-api-update'});
await waitForJobConsumed(secondJobId);
await printQueueStatus();
console.log('[worker-smoke] smoke 通过worker 独立消费队列API-only 更新未停止 worker。');
}
async function buildRuntimeImages() {
const imageMode = resolveImageMode();
if (imageMode === 'local-binary') {
await buildLocalBinaryRuntimeImages();
return;
}
await dockerCompose(['build', 'api-server', 'external-generation-worker']);
}
function resolveImageMode() {
if (rawArgs.includes('--local-binary')) {
return 'local-binary';
}
const envMode = process.env.GENARRATIVE_WORKER_SMOKE_IMAGE_MODE;
if (!envMode || envMode === 'dockerfile') {
return 'dockerfile';
}
if (envMode === 'local-binary') {
return 'local-binary';
}
throw new Error(
`GENARRATIVE_WORKER_SMOKE_IMAGE_MODE 仅支持 dockerfile 或 local-binary: ${envMode}`,
);
}
async function buildLocalBinaryRuntimeImages() {
const profile =
rawArgs.includes('--release') ||
process.env.GENARRATIVE_WORKER_SMOKE_CARGO_PROFILE === 'release'
? 'release'
: 'debug';
const buildArgs = ['build', '-p', 'api-server', '--manifest-path', 'server-rs/Cargo.toml'];
if (profile === 'release') {
buildArgs.push('--release');
}
const cargoImage = resolveLocalBinaryCargoImage();
const cargoHome = resolveLocalBinaryCargoHome();
mkdirSync(cargoHome, {recursive: true});
console.log(
`[worker-smoke] 使用 ${cargoImage} 复用本机 Cargo 缓存构建 ${profile} api-server 二进制。`,
);
await run('docker', [
'run',
'--rm',
'-u',
currentUserSpec(),
'-v',
`${projectRoot}:/workspace`,
'-v',
`${cargoHome}:/cargo-home`,
'-w',
'/workspace',
'-e',
'HOME=/cargo-home',
'-e',
'CARGO_HOME=/cargo-home',
'-e',
`CARGO_TARGET_DIR=/workspace/${toContainerPath(localCargoTargetDir)}`,
cargoImage,
'cargo',
'--config',
'build.rustc-wrapper=""',
'--config',
'target.x86_64-unknown-linux-gnu.linker="cc"',
'--config',
'target.x86_64-unknown-linux-gnu.rustflags=[]',
...buildArgs,
]);
const sourceBinaryPath = path.join(localCargoTargetDir, profile, 'api-server');
if (!existsSync(sourceBinaryPath)) {
throw new Error(`未找到 worker smoke api-server 二进制: ${sourceBinaryPath}`);
}
mkdirSync(localImageDir, {recursive: true});
copyFileSync(sourceBinaryPath, localImageBinaryPath);
chmodSync(localImageBinaryPath, 0o755);
const baseImage = await resolveLocalBinaryBaseImage();
writeFileSync(localImageDockerfilePath, buildLocalBinaryDockerfile(baseImage), 'utf8');
await run('docker', [
'build',
'-f',
localImageDockerfilePath,
'-t',
`${projectName}-api-server`,
'-t',
`${projectName}-external-generation-worker`,
localImageDir,
]);
}
function resolveLocalBinaryCargoImage() {
return process.env.GENARRATIVE_WORKER_SMOKE_CARGO_IMAGE || 'rust:1.93-bookworm';
}
function resolveLocalBinaryCargoHome() {
if (process.env.GENARRATIVE_WORKER_SMOKE_CARGO_HOME) {
return path.resolve(process.env.GENARRATIVE_WORKER_SMOKE_CARGO_HOME);
}
if (!process.env.HOME) {
throw new Error('未找到 HOME无法挂载本机 Cargo 缓存。');
}
return path.join(process.env.HOME, '.cargo');
}
function currentUserSpec() {
if (typeof process.getuid === 'function' && typeof process.getgid === 'function') {
return `${process.getuid()}:${process.getgid()}`;
}
return '0:0';
}
async function ensureSpacetimeImage() {
if (process.env.GENARRATIVE_WORKER_SMOKE_SPACETIME_IMAGE_MODE === 'official') {
return;
}
const imageName = localSpacetimeImageName();
const existingImage = await runCapture('docker', ['image', 'inspect', imageName], {
allowFailure: true,
quiet: true,
});
if (existingImage.code === 0 && !rawArgs.includes('--force')) {
return;
}
const spacetimePath = await resolveSpacetimeBinaryPath();
if (!spacetimePath) {
throw new Error('未找到本机 spacetime CLI无法构建隔离 SpacetimeDB 镜像。');
}
mkdirSync(localSpacetimeImageDir, {recursive: true});
copyFileSync(spacetimePath, localSpacetimeBinaryPath);
chmodSync(localSpacetimeBinaryPath, 0o755);
const standalonePath = path.join(path.dirname(spacetimePath), 'spacetimedb-standalone');
if (!existsSync(standalonePath)) {
throw new Error(`未找到本机 spacetimedb-standalone: ${standalonePath}`);
}
copyFileSync(standalonePath, localSpacetimeStandalonePath);
chmodSync(localSpacetimeStandalonePath, 0o755);
writeFileSync(localSpacetimeDockerfilePath, buildLocalSpacetimeDockerfile(), 'utf8');
console.log(`[worker-smoke] 使用本机 spacetime CLI 构建隔离镜像: ${imageName}`);
await run('docker', [
'build',
'-f',
localSpacetimeDockerfilePath,
'-t',
imageName,
localSpacetimeImageDir,
]);
}
function buildLocalSpacetimeDockerfile() {
return `FROM debian:bookworm-slim
WORKDIR /var/lib/spacetimedb
RUN apt-get update && \\
apt-get install -y --no-install-recommends ca-certificates libstdc++6 zlib1g && \\
rm -rf /var/lib/apt/lists/*
COPY spacetime /usr/local/bin/spacetime
COPY spacetimedb-standalone /usr/local/bin/spacetimedb-standalone
RUN chmod 0755 /usr/local/bin/spacetime /usr/local/bin/spacetimedb-standalone
ENTRYPOINT ["spacetime"]
`;
}
async function resolveSpacetimeBinaryPath() {
if (process.env.GENARRATIVE_WORKER_SMOKE_SPACETIME_BIN) {
return process.env.GENARRATIVE_WORKER_SMOKE_SPACETIME_BIN;
}
const versionResult = await runCapture('spacetime', ['--version'], {quiet: true});
const pathMatch = versionResult.stdout.match(/^spacetime Path:\s*(.+)$/mu);
if (pathMatch?.[1]) {
return pathMatch[1].trim();
}
const whichResult = await runCapture('which', ['spacetime'], {quiet: true});
return whichResult.stdout.trim();
}
async function resolveLocalBinaryBaseImage() {
if (process.env.GENARRATIVE_WORKER_SMOKE_LOCAL_BASE_IMAGE) {
return process.env.GENARRATIVE_WORKER_SMOKE_LOCAL_BASE_IMAGE;
}
return 'debian:bookworm-slim';
}
function buildLocalBinaryDockerfile(baseImage) {
return `FROM ${baseImage}
WORKDIR /srv/genarrative
RUN apt-get update && \\
apt-get install -y --no-install-recommends ca-certificates curl libssl3 zlib1g libzstd1 && \\
rm -rf /var/lib/apt/lists/* && \\
(id -u genarrative >/dev/null 2>&1 || useradd --system --create-home --home-dir /srv/genarrative --shell /usr/sbin/nologin genarrative)
COPY api-server /usr/local/bin/api-server
RUN chmod 0755 /usr/local/bin/api-server && \\
mkdir -p /var/lib/genarrative/auth /var/lib/genarrative/tracking-outbox && \\
chown -R genarrative:genarrative /srv/genarrative /var/lib/genarrative
USER genarrative
EXPOSE 8082
ENV GENARRATIVE_ENV=container \\
GENARRATIVE_API_HOST=0.0.0.0 \\
GENARRATIVE_API_PORT=8082 \\
GENARRATIVE_TRACKING_OUTBOX_DIR=/var/lib/genarrative/tracking-outbox
CMD ["api-server"]
`;
}
function toContainerPath(localPath) {
return localPath.split(path.sep).join('/');
}
async function upRuntime() {
const services = ['api-server', 'external-generation-worker'];
if (rawArgs.includes('--with-nginx')) {
services.push('nginx');
}
await dockerCompose(['up', '-d', ...services]);
}
async function ensureStateAndEnv(options = {}) {
const {force = false, create = true} = options;
if (!create && !existsSync(statePath)) {
return defaultState();
}
mkdirSync(smokeDir, {recursive: true});
if (!existsSync(statePath) || force) {
const state = {
database: defaultDatabase,
spacetimePort: await findAvailablePort(
Number(process.env.GENARRATIVE_WORKER_SMOKE_SPACETIME_PORT || 19101),
),
httpPort: await findAvailablePort(
Number(process.env.GENARRATIVE_WORKER_SMOKE_HTTP_PORT || 19080),
),
otlpGrpcPort: await findAvailablePort(
Number(process.env.GENARRATIVE_WORKER_SMOKE_OTLP_GRPC_PORT || 15317),
),
otlpHttpPort: await findAvailablePort(
Number(process.env.GENARRATIVE_WORKER_SMOKE_OTLP_HTTP_PORT || 15318),
),
createdAt: new Date().toISOString(),
};
writeFileSync(statePath, `${JSON.stringify(state, null, 2)}\n`, 'utf8');
}
const state = readState();
if (!existsSync(envPath) || force) {
writeFileSync(envPath, buildSmokeEnv(state), 'utf8');
}
console.log(`[worker-smoke] env=${envPath}`);
console.log(`[worker-smoke] state=${statePath}`);
console.log(`[worker-smoke] SpacetimeDB=http://127.0.0.1:${state.spacetimePort}`);
console.log(`[worker-smoke] Nginx=http://127.0.0.1:${state.httpPort}`);
return state;
}
function buildSmokeEnv(state) {
return `# 本文件由 scripts/container-worker-smoke.mjs 生成,仅用于本机隔离 worker smoke。
# 不要在这里写真实生产密钥;目录 deploy/container/worker-smoke/ 已被 gitignore。
GENARRATIVE_ENV=container-worker-smoke
GENARRATIVE_API_HOST=0.0.0.0
GENARRATIVE_API_PORT=8082
GENARRATIVE_API_LOG=info,tower_http=info
GENARRATIVE_API_LISTEN_BACKLOG=256
GENARRATIVE_API_WORKER_THREADS=2
GENARRATIVE_PROCESS_ROLE=api
GENARRATIVE_EXTERNAL_GENERATION_MODE=queue
GENARRATIVE_EXTERNAL_GENERATION_WORKER_ID=
GENARRATIVE_EXTERNAL_GENERATION_WORKER_CONCURRENCY=1
GENARRATIVE_EXTERNAL_GENERATION_WORKER_POLL_INTERVAL_MS=500
GENARRATIVE_EXTERNAL_GENERATION_WORKER_LEASE_SECONDS=60
GENARRATIVE_API_MAX_CONCURRENT_REQUESTS=64
GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS=32
GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS=16
GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS=8
GENARRATIVE_TRACKING_OUTBOX_ENABLED=false
GENARRATIVE_TRACKING_OUTBOX_DIR=/var/lib/genarrative/tracking-outbox
GENARRATIVE_OTEL_ENABLED=false
OTEL_SERVICE_NAME=genarrative-worker-smoke-api
OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4318
OTEL_RESOURCE_ATTRIBUTES=deployment.environment=worker-smoke,service.namespace=genarrative
GENARRATIVE_INTERNAL_API_SECRET=worker-smoke-internal-secret
GENARRATIVE_JWT_ISSUER=genarrative-worker-smoke
GENARRATIVE_JWT_SECRET=worker-smoke-jwt-secret
AUTH_REFRESH_COOKIE_SECURE=false
GENARRATIVE_DEV_PASSWORD_ENTRY_AUTO_REGISTER_ENABLED=true
GENARRATIVE_SPACETIME_SERVER_URL=http://spacetimedb:3101
GENARRATIVE_SPACETIME_DATABASE=${state.database}
GENARRATIVE_SPACETIME_TOKEN=
GENARRATIVE_SPACETIME_POOL_SIZE=2
GENARRATIVE_SPACETIME_PROCEDURE_TIMEOUT_SECONDS=15
GENARRATIVE_LLM_PROVIDER=openai-compatible
GENARRATIVE_LLM_BASE_URL=
GENARRATIVE_LLM_API_KEY=
GENARRATIVE_LLM_MODEL=
VECTOR_ENGINE_BASE_URL=
VECTOR_ENGINE_API_KEY=
ALIYUN_OSS_BUCKET=
ALIYUN_OSS_ENDPOINT=oss-cn-shanghai.aliyuncs.com
ALIYUN_OSS_ACCESS_KEY_ID=
ALIYUN_OSS_ACCESS_KEY_SECRET=
WECHAT_MINIPROGRAM_MESSAGE_TOKEN=
WECHAT_MINIPROGRAM_MESSAGE_ENCODING_AES_KEY=
`;
}
function defaultState() {
return {
database: defaultDatabase,
spacetimePort: 19101,
httpPort: 19080,
otlpGrpcPort: 15317,
otlpHttpPort: 15318,
};
}
function readState() {
if (!existsSync(statePath)) {
return defaultState();
}
return JSON.parse(readFileSync(statePath, 'utf8'));
}
async function findAvailablePort(startPort) {
for (let port = startPort; port < startPort + 100; port += 1) {
if (await isPortAvailable(port)) {
return port;
}
}
throw new Error(`未找到可用端口: ${startPort}-${startPort + 99}`);
}
function isPortAvailable(port) {
return new Promise((resolve) => {
const server = net.createServer();
server.once('error', () => resolve(false));
server.once('listening', () => {
server.close(() => resolve(true));
});
server.listen(port, '127.0.0.1');
});
}
async function publishModule() {
const state = readState();
const serverUrl = spacetimeServerUrl(state);
const publishArgs = [
'publish',
state.database,
'--server',
serverUrl,
'--module-path',
'server-rs/crates/spacetime-module',
'--delete-data=on-conflict',
'--anonymous',
'--yes=all',
'--no-config',
];
const buildOptions = process.env.GENARRATIVE_WORKER_SMOKE_STDB_BUILD_OPTIONS;
if (buildOptions) {
publishArgs.push('--build-options', buildOptions);
}
await run('spacetime', publishArgs);
}
async function enqueueSmokeJob(options = {}) {
if (!rawArgs.includes('--no-worker-check')) {
await assertWorkersRunning();
}
const state = readState();
const nowMicros = Date.now() * 1000;
const suffix = `${Date.now()}-${Math.random().toString(16).slice(2, 8)}`;
const jobId = `extgen-smoke-${suffix}`;
const label = options.label || rawArgs[0] || 'manual';
const input = {
job_id: jobId,
dedupe_key: `worker-smoke:${label}:${suffix}`,
job_kind: 'worker_smoke_unsupported',
owner_user_id: 'worker-smoke-user',
source_module: 'worker-smoke',
source_entity_id: `worker-smoke-entity-${suffix}`,
request_label: `worker-smoke ${label}`,
request_payload_json: JSON.stringify({label, suffix}),
max_attempts: 1,
available_at_micros: nowMicros,
created_at_micros: nowMicros,
};
await run('spacetime', [
'call',
'--server',
spacetimeServerUrl(state),
'--anonymous',
'--yes',
'--no-config',
state.database,
'enqueue_external_generation_job_and_return',
JSON.stringify(input),
]);
console.log(`[worker-smoke] 已入队测试 job: ${jobId}`);
return jobId;
}
async function printQueueStatus() {
console.log('[worker-smoke] external_generation_job 是 private tablestatus 显示最近 worker 日志:');
await printServiceLogs('external-generation-worker', 120);
}
async function waitForJobConsumed(jobId) {
const deadline = Date.now() + 60_000;
let lastOutput = '';
while (Date.now() < deadline) {
const result = await dockerComposeCapture(
['logs', '--no-color', 'external-generation-worker'],
{allowFailure: true, quiet: true},
);
lastOutput = `${result.stdout}\n${result.stderr}`;
if (lastOutput.includes(jobId) && lastOutput.includes('暂不支持的任务类型')) {
console.log(`[worker-smoke] job ${jobId} 已被 worker 领取并执行到 unsupported 分支。`);
return;
}
await sleep(1000);
}
await printServiceLogs('external-generation-worker', 120);
throw new Error(`等待 worker 消费 job ${jobId} 超时,最后输出:\n${lastOutput}`);
}
async function assertSavedPortsAvailableForNewProject(state) {
const existingContainers = await getProjectContainerIds();
if (existingContainers.length > 0) {
return;
}
const ports = [
['SpacetimeDB', state.spacetimePort],
['Nginx', state.httpPort],
['OTLP gRPC', state.otlpGrpcPort],
['OTLP HTTP', state.otlpHttpPort],
];
for (const [label, port] of ports) {
if (!(await isPortAvailable(port))) {
throw new Error(
`${label} 端口 ${port} 已被占用;可执行 npm run container:worker-smoke -- smoke --force 重新分配隔离端口。`,
);
}
}
}
async function getProjectContainerIds() {
const result = await dockerComposeCapture(['ps', '-q'], {
allowFailure: true,
quiet: true,
});
if (result.code !== 0) {
return [];
}
return result.stdout
.split(/\r?\n/u)
.map((line) => line.trim())
.filter(Boolean);
}
async function assertWorkersRunning() {
const result = await dockerComposeCapture(
['ps', '--status', 'running', '-q', 'external-generation-worker'],
{allowFailure: true, quiet: true},
);
const workerIds = result.stdout
.split(/\r?\n/u)
.map((line) => line.trim())
.filter(Boolean);
if (result.code === 0 && workerIds.length > 0) {
return;
}
await printServiceLogs('external-generation-worker', 80);
throw new Error('external-generation-worker 未处于 running 状态,已输出最近日志。');
}
async function printServiceLogs(service, tail = 80) {
await dockerComposeCapture(['logs', '--tail', String(tail), service], {
allowFailure: true,
});
}
async function waitForSpacetime() {
const state = readState();
const url = `${spacetimeServerUrl(state)}/v1/ping`;
await waitForHttp(url, 'SpacetimeDB');
}
async function waitForApi() {
const deadline = Date.now() + 120_000;
while (Date.now() < deadline) {
const result = await dockerComposeCapture(
['exec', '-T', 'api-server', 'curl', '-fsS', 'http://127.0.0.1:8082/healthz'],
{allowFailure: true, quiet: true},
);
if (result.code === 0) {
console.log('[worker-smoke] api-server 已就绪: api-server:8082/healthz');
return;
}
await sleep(2000);
}
throw new Error('api-server 等待超时: api-server:8082/healthz');
}
async function waitForHttp(url, label) {
const deadline = Date.now() + 120_000;
while (Date.now() < deadline) {
const result = await runCapture('curl', ['-fsS', '--max-time', '3', url], {
allowFailure: true,
});
if (result.code === 0) {
console.log(`[worker-smoke] ${label} 已就绪: ${url}`);
return;
}
await sleep(2000);
}
throw new Error(`${label} 等待超时: ${url}`);
}
async function apiOnlyUpdate({build}) {
const beforeWorkerIds = await getContainerIds('external-generation-worker');
const args = ['up', '-d', '--no-deps', '--force-recreate'];
if (build) {
args.push('--build');
}
args.push('api-server');
await dockerCompose(args);
await waitForApi();
const afterWorkerIds = await getContainerIds('external-generation-worker');
if (beforeWorkerIds.join('\n') !== afterWorkerIds.join('\n')) {
throw new Error('API-only 更新不应重建 external-generation-worker 容器');
}
console.log('[worker-smoke] API-only 更新完成worker 容器保持不变。');
}
async function scaleWorkers(rawCount) {
const count = Number.parseInt(rawCount, 10);
if (!Number.isInteger(count) || count < 0 || count > 16) {
throw new Error(`worker 数量必须是 0-16 的整数: ${rawCount}`);
}
await dockerCompose([
'up',
'-d',
'--scale',
`external-generation-worker=${count}`,
'external-generation-worker',
]);
}
async function getContainerIds(service) {
const result = await dockerComposeCapture(['ps', '-q', service]);
return result.stdout
.split(/\r?\n/u)
.map((line) => line.trim())
.filter(Boolean)
.sort();
}
async function dockerCompose(args) {
await run('docker', composeArgs(args), {env: composeEnv()});
}
async function dockerComposeCapture(args, options = {}) {
return runCapture('docker', composeArgs(args), {
env: composeEnv(),
...options,
});
}
function composeArgs(args) {
return ['compose', '-p', projectName, '-f', composeFile, ...args];
}
function composeEnv() {
const state = readState();
return {
...process.env,
GENARRATIVE_CONTAINER_API_ENV_FILE: './worker-smoke/api-server.env',
GENARRATIVE_CONTAINER_SPACETIME_IMAGE:
process.env.GENARRATIVE_CONTAINER_SPACETIME_IMAGE || localSpacetimeImageName(),
GENARRATIVE_CONTAINER_SPACETIME_PORT: String(state.spacetimePort),
GENARRATIVE_CONTAINER_HTTP_PORT: String(state.httpPort),
GENARRATIVE_CONTAINER_OTLP_GRPC_PORT: String(state.otlpGrpcPort),
GENARRATIVE_CONTAINER_OTLP_HTTP_PORT: String(state.otlpHttpPort),
};
}
function localSpacetimeImageName() {
return `${projectName}-spacetimedb:2.4.1`;
}
function spacetimeServerUrl(state) {
return `http://127.0.0.1:${state.spacetimePort}`;
}
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function run(commandName, args, options = {}) {
const result = await runCapture(commandName, args, options);
if (result.code !== 0 && !options.allowFailure) {
throw new Error(`${commandName} ${args.join(' ')} 失败exit=${result.code}`);
}
return result;
}
function runCapture(commandName, args, options = {}) {
return new Promise((resolve, reject) => {
const child = spawn(commandName, args, {
cwd: projectRoot,
env: options.env ?? process.env,
shell: false,
});
let stdout = '';
let stderr = '';
child.stdout?.on('data', (chunk) => {
const text = chunk.toString();
stdout += text;
if (!options.quiet) {
process.stdout.write(text);
}
});
child.stderr?.on('data', (chunk) => {
const text = chunk.toString();
stderr += text;
if (!options.quiet) {
process.stderr.write(text);
}
});
child.on('error', reject);
child.on('exit', (code, signal) => {
if (signal) {
reject(new Error(`${commandName} 被信号终止: ${signal}`));
return;
}
resolve({code: code ?? 0, stdout, stderr});
});
});
}
function printHelp(isError) {
const output = isError ? console.error : console.log;
output(`Usage: npm run container:worker-smoke -- <command>
Commands:
init [--force] 生成隔离 env 与端口 state
build [--local-binary] [--release]
构建 api-server / worker 镜像;--local-binary 让容器内 Cargo 复用本机缓存
up-spacetime 启动隔离 SpacetimeDB 与 otelcol
publish 向隔离 SpacetimeDB 发布 spacetime-module
up [--with-nginx] 启动 api-server / worker需要 Nginx 时显式加 --with-nginx
enqueue [label] [--no-worker-check]
写入一个 unsupported 测试 job验证 worker claim/fail
status 查看最近 worker 日志external_generation_job 是 private table
api-update [--build] 仅重建/重启 api-server不触碰 worker
scale <n> 调整 external-generation-worker 实例数
ps 查看隔离 compose 状态
logs [service] 查看隔离 compose 日志
down [-v] 停止隔离 compose-v 会清理数据卷
smoke [--force] [--local-binary] [--release]
一键执行 build -> publish -> up -> enqueue -> api-update -> enqueue
`);
}