Files
Genarrative/scripts/spacetime-import-migration-json.mjs
kdletters 1ccb8a710d
Some checks failed
CI / verify (push) Has been cancelled
Add migration token parameters to Jenkins deploy flows
2026-04-30 10:48:58 +08:00

204 lines
6.0 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env node
import { readFile } from 'node:fs/promises';
import path from 'node:path';
import {
assertReadableFile,
callSpacetimeProcedure,
callSpacetimeProcedureViaCli,
createSpacetimeWebIdentity,
ensureProcedureOk,
parseArgs,
} from './spacetime-migration-common.mjs';
// Script entry point: validate the CLI options, load the migration file,
// run the import, then print the outcome. Any failure is reported with the
// script's log prefix and the process exits with status 1.
try {
  const cliOptions = parseArgs(process.argv.slice(2));
  if (!cliOptions.in) {
    throw new Error('必须传入 --in。');
  }
  const wantsIncremental = cliOptions.incremental === true;
  if (wantsIncremental && cliOptions.replaceExisting === true) {
    throw new Error('--incremental 不能和 --replace-existing 同时使用。');
  }

  const migrationFilePath = path.resolve(cliOptions.in);
  await assertReadableFile(migrationFilePath);
  const migrationText = await readFile(migrationFilePath, 'utf8');
  if (migrationText.trim().length === 0) {
    throw new Error(`迁移文件为空: ${migrationFilePath}`);
  }

  const importOptions = await prepareWebImportOptions(cliOptions);
  let importResult;
  try {
    importResult = await importMigrationJsonDirect(importOptions, migrationText);
  } finally {
    // Runs whether or not the import succeeded; it is a no-op when no
    // temporary identity was recorded on the options.
    await revokeTemporaryWebIdentity(importOptions);
  }
  ensureProcedureOk(importResult);

  const outcomeLabel = cliOptions.dryRun ? 'dry-run 完成' : '导入完成';
  console.log(`[spacetime:migration:import] ${outcomeLabel}: ${migrationFilePath}`);
  printTableStats(importResult.table_stats);
  printMigrationWarnings(importResult.warnings);
} catch (failure) {
  const reason = failure instanceof Error ? failure.message : String(failure);
  console.error(`[spacetime:migration:import] ${reason}`);
  process.exit(1);
}
/**
 * Builds the option set used for the import call (`useHttp: true`).
 *
 * When `options.token` is supplied it is used as-is. Otherwise a temporary
 * Web API identity is created, authorized as a migration operator through the
 * spacetime CLI, and its token is placed on the returned options. The
 * identity is also recorded under `temporaryWebIdentity`, which
 * `revokeTemporaryWebIdentity` reads to revoke it afterwards.
 *
 * @param {object} options - Parsed CLI options (reads `token`,
 *   `bootstrapSecret` and `note`).
 * @returns {Promise<object>} A copy of `options` with `useHttp: true`, plus
 *   `token` and `temporaryWebIdentity` when a temporary identity was created.
 * @throws {Error} When authorizing the temporary identity fails; the
 *   underlying error is attached as `cause`.
 */
async function prepareWebImportOptions(options) {
  if (options.token) {
    return { ...options, useHttp: true };
  }

  const identity = await createSpacetimeWebIdentity(options);
  console.log(
    `[spacetime:migration:import] 已通过 Web API 创建临时 identity: ${identity.identity}`,
  );

  try {
    const authorizeResult = await callSpacetimeProcedureViaCli(
      options,
      'authorize_database_migration_operator',
      {
        bootstrap_secret: options.bootstrapSecret || '',
        operator_identity_hex: identity.identity,
        note: options.note || 'temporary web api migration import',
      },
    );
    ensureProcedureOk(authorizeResult);
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    // Keep the original error reachable via `cause` so its stack and type are
    // not lost when the message is wrapped with operator guidance.
    throw new Error(
      `授权临时 Web API identity 失败。当前 spacetime CLI identity 必须已经是迁移操作员如果目标库迁移操作员表不为空bootstrap secret 不会越权授权新的操作员。可先用已有迁移操作员授权当前部署机 identity或为导入脚本提供已有迁移操作员的 --token。原始错误: ${detail}`,
      { cause: error },
    );
  }

  console.log(`[spacetime:migration:import] 已授权临时 Web API identity`);
  return {
    ...options,
    token: identity.token,
    temporaryWebIdentity: identity.identity,
    useHttp: true,
  };
}
/**
 * Calls the import procedure with the migration JSON text.
 *
 * The incremental procedure is selected when `options.incremental === true`;
 * otherwise the regular file-import procedure is used. A one-line notice is
 * logged for replace-existing mode and, failing that, for incremental mode.
 *
 * @param {object} options - Import options (reads `incremental`,
 *   `replaceExisting`, `dryRun`); passed through to `callSpacetimeProcedure`.
 * @param {string} migrationJson - Raw migration file contents.
 * @returns {Promise<*>} Whatever `callSpacetimeProcedure` resolves to.
 */
async function importMigrationJsonDirect(options, migrationJson) {
  const tablesToImport = resolveImportIncludeTables(options, migrationJson);
  const isReplace = options.replaceExisting === true;
  const isIncremental = options.incremental === true;

  if (isReplace) {
    const tableList = tablesToImport.join(', ') || '无';
    console.log(
      `[spacetime:migration:import] replace-existing 仅覆盖本次文件内的表: ${tableList}`,
    );
  } else if (isIncremental) {
    console.log(`[spacetime:migration:import] 使用增量模式,已存在或冲突的行会跳过`);
  }

  let procedure = 'import_database_migration_from_file';
  if (isIncremental) {
    procedure = 'import_database_migration_incremental_from_file';
  }

  return callSpacetimeProcedure(options, procedure, {
    migration_json: migrationJson,
    include_tables: tablesToImport,
    replace_existing: isReplace,
    dry_run: options.dryRun === true,
  });
}
/**
 * Decides which table names are sent as `include_tables` for the import.
 *
 * Outside replace-existing mode the caller's `includeTables` value is
 * returned untouched. In replace-existing mode the result is limited to the
 * tables named in the migration file: all of them when `includeTables` is
 * empty, otherwise only those that were also requested (in file order).
 *
 * @param {object} options - Reads `replaceExisting` and `includeTables`.
 * @param {string} migrationJson - Raw migration file contents.
 * @returns {string[]} Table names to include.
 */
function resolveImportIncludeTables(options, migrationJson) {
  const { replaceExisting, includeTables } = options;
  if (replaceExisting !== true) {
    return includeTables;
  }

  const tablesInFile = readMigrationTableNames(migrationJson);
  if (includeTables.length === 0) {
    return tablesInFile;
  }

  const requested = new Set(includeTables);
  return tablesInFile.filter((name) => requested.has(name));
}
/**
 * Extracts the distinct table names listed in a migration JSON document.
 *
 * Names are trimmed and de-duplicated; the order of first appearance in
 * `tables` is preserved.
 *
 * @param {string} migrationJson - Raw migration file contents.
 * @returns {string[]} Unique, trimmed table names.
 * @throws {Error} When the text is not valid JSON (the parser error is
 *   attached as `cause`), when there is no `tables` array, or when an entry
 *   has no non-blank string `name`.
 */
function readMigrationTableNames(migrationJson) {
  let payload;
  try {
    payload = JSON.parse(migrationJson);
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    // Preserve the parser's error object so its type and stack stay available.
    throw new Error(`迁移文件 JSON 解析失败: ${detail}`, { cause: error });
  }

  if (!Array.isArray(payload?.tables)) {
    throw new Error('迁移文件缺少 tables 数组。');
  }

  const trimmedNames = payload.tables.map((table) => {
    if (typeof table?.name !== 'string' || !table.name.trim()) {
      throw new Error('迁移文件 tables 内存在缺少 name 的表项。');
    }
    return table.name.trim();
  });

  // A Set iterates in insertion order, so first occurrences keep their place.
  return [...new Set(trimmedNames)];
}
/**
 * Revokes the temporary identity recorded in `options.temporaryWebIdentity`,
 * if there is one.
 *
 * This is best-effort: a failed revoke is logged as a warning and never
 * thrown to the caller.
 *
 * @param {object} options - Import options; passed through to
 *   `callSpacetimeProcedure`.
 * @returns {Promise<void>}
 */
async function revokeTemporaryWebIdentity(options) {
  const { temporaryWebIdentity } = options;
  if (temporaryWebIdentity) {
    try {
      const outcome = await callSpacetimeProcedure(
        options,
        'revoke_database_migration_operator',
        { operator_identity_hex: temporaryWebIdentity },
      );
      ensureProcedureOk(outcome);
      console.log(`[spacetime:migration:import] 已撤销临时 Web API identity`);
    } catch (failure) {
      const reason = failure instanceof Error ? failure.message : String(failure);
      console.warn(`[spacetime:migration:import] 撤销临时 Web API identity 失败: ${reason}`);
    }
  }
}
/**
 * Prints per-table import statistics with `console.table`.
 *
 * Nothing is printed when `tableStats` is not an array or is empty.
 *
 * @param {Array<object>} tableStats - Entries providing `table_name`,
 *   `imported_row_count` and `skipped_row_count`.
 * @returns {void}
 */
function printTableStats(tableStats) {
  const hasStats = Array.isArray(tableStats) && tableStats.length > 0;
  if (!hasStats) {
    return;
  }

  const rows = [];
  for (const stat of tableStats) {
    rows.push({
      table: stat.table_name,
      imported: stat.imported_row_count,
      skipped: stat.skipped_row_count,
    });
  }
  console.table(rows);
}
/**
 * Prints a summary of migration warnings: a heading via `console.warn`
 * followed by a `console.table` of the warnings.
 *
 * Nothing is printed when `warnings` is not an array or is empty.
 *
 * @param {Array<object>} warnings - Entries providing `table_name`,
 *   `warning_kind` and `message`.
 * @returns {void}
 */
function printMigrationWarnings(warnings) {
  const hasWarnings = Array.isArray(warnings) && warnings.length > 0;
  if (!hasWarnings) {
    return;
  }

  console.warn('[spacetime:migration:import] 迁移告警汇总:');
  const rows = warnings.map((entry) => {
    const row = {
      table: entry.table_name,
      kind: entry.warning_kind,
      message: entry.message,
    };
    return row;
  });
  console.table(rows);
}