Files
Genarrative/scripts/spacetime-import-migration-json.mjs

196 lines
5.5 KiB
JavaScript

#!/usr/bin/env node
import { readFile } from 'node:fs/promises';
import path from 'node:path';
import {
assertReadableFile,
callSpacetimeProcedure,
callSpacetimeProcedureViaCli,
createSpacetimeWebIdentity,
ensureProcedureOk,
parseArgs,
} from './spacetime-migration-common.mjs';
// Script entry point: validate the CLI options, load the migration file, run
// the import, then print the per-table stats and any warnings. Any failure is
// reported on stderr with the script prefix and the process exits with code 1.
try {
  const cliOptions = parseArgs(process.argv.slice(2));

  if (!cliOptions.in) {
    throw new Error('必须传入 --in。');
  }
  // The incremental and replace-existing modes are rejected when both are set.
  const wantsIncremental = cliOptions.incremental === true;
  if (wantsIncremental && cliOptions.replaceExisting === true) {
    throw new Error('--incremental 不能和 --replace-existing 同时使用。');
  }

  const migrationPath = path.resolve(cliOptions.in);
  await assertReadableFile(migrationPath);
  const migrationText = await readFile(migrationPath, 'utf8');
  if (migrationText.trim() === '') {
    throw new Error(`迁移文件为空: ${migrationPath}`);
  }

  const importOptions = await prepareWebImportOptions(cliOptions);
  let importResult;
  try {
    importResult = await importMigrationJsonDirect(importOptions, migrationText);
  } finally {
    // Runs whether or not the import call succeeded; the helper is a no-op
    // when no temporary identity was created.
    await revokeTemporaryWebIdentity(importOptions);
  }
  ensureProcedureOk(importResult);

  const outcomeLabel = cliOptions.dryRun ? 'dry-run 完成' : '导入完成';
  console.log(`[spacetime:migration:import] ${outcomeLabel}: ${migrationPath}`);
  printTableStats(importResult.table_stats);
  printMigrationWarnings(importResult.warnings);
} catch (error) {
  const reason = error instanceof Error ? error.message : String(error);
  console.error(`[spacetime:migration:import] ${reason}`);
  process.exit(1);
}
/**
 * Build the option set used for the HTTP import call.
 *
 * When the caller already supplied a token, the options are copied with
 * `useHttp: true` and nothing else happens. Otherwise a temporary identity is
 * created with `createSpacetimeWebIdentity`, authorized through the
 * `authorize_database_migration_operator` procedure (invoked with
 * `callSpacetimeProcedureViaCli`), and its token is attached to the result.
 *
 * @param {object} options Parsed CLI options; reads `token`,
 *   `bootstrapSecret` and `note`.
 * @returns {Promise<object>} A copy of `options` with `useHttp: true`; on the
 *   temporary-identity path it also carries `token` and
 *   `temporaryWebIdentity`.
 * @throws Whatever the identity creation, the authorize call, or
 *   `ensureProcedureOk` throws.
 */
async function prepareWebImportOptions(options) {
  // A caller-provided token short-circuits the temporary identity flow.
  if (options.token) {
    return { ...options, useHttp: true };
  }

  const { identity: identityHex, token: identityToken } =
    await createSpacetimeWebIdentity(options);
  console.log(
    `[spacetime:migration:import] 已通过 Web API 创建临时 identity: ${identityHex}`,
  );

  const authorizeInput = {
    bootstrap_secret: options.bootstrapSecret || '',
    operator_identity_hex: identityHex,
    note: options.note || 'temporary web api migration import',
  };
  const authorizeOutcome = await callSpacetimeProcedureViaCli(
    options,
    'authorize_database_migration_operator',
    authorizeInput,
  );
  ensureProcedureOk(authorizeOutcome);
  console.log('[spacetime:migration:import] 已授权临时 Web API identity');

  return {
    ...options,
    token: identityToken,
    temporaryWebIdentity: identityHex,
    useHttp: true,
  };
}
/**
 * Send the migration JSON to the import procedure.
 *
 * Calls `import_database_migration_incremental_from_file` when
 * `options.incremental === true`, otherwise
 * `import_database_migration_from_file`, and logs which mode is in effect
 * (replace-existing takes precedence in the log output).
 *
 * @param {object} options Import options; reads `incremental`,
 *   `replaceExisting` and `dryRun`, and is forwarded to
 *   `resolveImportIncludeTables` and `callSpacetimeProcedure`.
 * @param {string} migrationJson Raw text of the migration file.
 * @returns {Promise<*>} Whatever `callSpacetimeProcedure` resolves to.
 */
async function importMigrationJsonDirect(options, migrationJson) {
  const includeTables = resolveImportIncludeTables(options, migrationJson);
  const isReplace = options.replaceExisting === true;
  const isIncremental = options.incremental === true;

  if (isReplace) {
    const tableList = includeTables.join(', ') || '无';
    console.log(
      `[spacetime:migration:import] replace-existing 仅覆盖本次文件内的表: ${tableList}`,
    );
  } else if (isIncremental) {
    console.log('[spacetime:migration:import] 使用增量模式,已存在或冲突的行会跳过');
  }

  let procedureName = 'import_database_migration_from_file';
  if (isIncremental) {
    procedureName = 'import_database_migration_incremental_from_file';
  }

  return callSpacetimeProcedure(options, procedureName, {
    migration_json: migrationJson,
    include_tables: includeTables,
    replace_existing: isReplace,
    dry_run: options.dryRun === true,
  });
}
/**
 * Decide which table names are sent as the import's include list.
 *
 * Outside replace-existing mode `options.includeTables` is returned untouched.
 * In replace-existing mode the list is limited to tables that are present in
 * the migration file (in file order), optionally narrowed to the tables the
 * caller requested.
 *
 * @param {object} options Import options; reads `replaceExisting` and
 *   `includeTables`. A missing `includeTables` is treated as "no filter" in
 *   replace-existing mode.
 * @param {string} migrationJson Raw text of the migration file.
 * @returns {string[]|undefined} The include list; in replace-existing mode
 *   always a non-empty array.
 * @throws {Error} In replace-existing mode, when the file cannot be parsed
 *   (from `readMigrationTableNames`) or when no table is left to replace.
 */
function resolveImportIncludeTables(options, migrationJson) {
  if (options.replaceExisting !== true) {
    return options.includeTables;
  }

  const migrationTables = readMigrationTableNames(migrationJson);
  const requestedTables = Array.isArray(options.includeTables)
    ? options.includeTables
    : [];

  // Refuse to continue with an empty replace list. An empty include list is
  // what the non-replace path forwards when no filter was requested.
  // NOTE(review): so the server presumably reads it as "all tables", which
  // would make replace_existing unrestricted — confirm against the procedure.
  if (migrationTables.length === 0) {
    throw new Error('迁移文件内没有任何表,无法执行 replace-existing 导入。');
  }
  if (requestedTables.length === 0) {
    return migrationTables;
  }

  const requestedSet = new Set(requestedTables);
  const matchedTables = migrationTables.filter((tableName) =>
    requestedSet.has(tableName),
  );
  if (matchedTables.length === 0) {
    throw new Error(
      `replace-existing 指定的表均不在迁移文件内: ${requestedTables.join(', ')}`,
    );
  }
  return matchedTables;
}
/**
 * Extract the distinct table names listed in a migration file.
 *
 * @param {string} migrationJson Raw JSON text; the parsed value must have a
 *   `tables` array whose entries each carry a non-blank string `name`.
 * @returns {string[]} Trimmed table names in first-seen order, without
 *   duplicates.
 * @throws {Error} When the text is not valid JSON, when `tables` is not an
 *   array, or when an entry has no usable `name`.
 */
function readMigrationTableNames(migrationJson) {
  let parsed;
  try {
    parsed = JSON.parse(migrationJson);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`迁移文件 JSON 解析失败: ${reason}`);
  }

  const tables = parsed?.tables;
  if (!Array.isArray(tables)) {
    throw new Error('迁移文件缺少 tables 数组。');
  }

  // A Set iterates in insertion order, so it keeps first-seen order while
  // dropping repeated names.
  const uniqueNames = new Set();
  for (const entry of tables) {
    const rawName = entry?.name;
    if (typeof rawName !== 'string' || rawName.trim() === '') {
      throw new Error('迁移文件 tables 内存在缺少 name 的表项。');
    }
    uniqueNames.add(rawName.trim());
  }
  return [...uniqueNames];
}
/**
 * Best-effort revocation of the temporary identity recorded on the options.
 *
 * Does nothing when `options.temporaryWebIdentity` is not set. A failure of
 * the `revoke_database_migration_operator` call is logged as a warning and
 * never rethrown.
 *
 * @param {object} options Import options; reads `temporaryWebIdentity` and is
 *   forwarded to `callSpacetimeProcedure`.
 * @returns {Promise<void>}
 */
async function revokeTemporaryWebIdentity(options) {
  const temporaryIdentity = options.temporaryWebIdentity;
  if (temporaryIdentity) {
    try {
      const revokeOutcome = await callSpacetimeProcedure(
        options,
        'revoke_database_migration_operator',
        { operator_identity_hex: temporaryIdentity },
      );
      ensureProcedureOk(revokeOutcome);
      console.log('[spacetime:migration:import] 已撤销临时 Web API identity');
    } catch (error) {
      // Deliberately swallowed after logging: revocation is cleanup only.
      const reason = error instanceof Error ? error.message : String(error);
      console.warn(
        `[spacetime:migration:import] 撤销临时 Web API identity 失败: ${reason}`,
      );
    }
  }
}
/**
 * Print per-table import counts as a console table.
 *
 * @param {Array<object>} tableStats Entries with `table_name`,
 *   `imported_row_count` and `skipped_row_count`; anything that is not a
 *   non-empty array prints nothing.
 * @returns {void}
 */
function printTableStats(tableStats) {
  if (Array.isArray(tableStats) && tableStats.length > 0) {
    const summary = [];
    for (const entry of tableStats) {
      summary.push({
        table: entry.table_name,
        imported: entry.imported_row_count,
        skipped: entry.skipped_row_count,
      });
    }
    console.table(summary);
  }
}
/**
 * Print the migration warnings: a header line on stderr followed by a console
 * table.
 *
 * @param {Array<object>} warnings Entries with `table_name`, `warning_kind`
 *   and `message`; anything that is not a non-empty array prints nothing.
 * @returns {void}
 */
function printMigrationWarnings(warnings) {
  if (Array.isArray(warnings) && warnings.length > 0) {
    console.warn('[spacetime:migration:import] 迁移告警汇总:');
    const summary = [];
    for (const entry of warnings) {
      summary.push({
        table: entry.table_name,
        kind: entry.warning_kind,
        message: entry.message,
      });
    }
    console.table(summary);
  }
}