#!/usr/bin/env node import { readFile } from 'node:fs/promises'; import path from 'node:path'; import { assertReadableFile, callSpacetimeProcedure, callSpacetimeProcedureViaCli, createSpacetimeWebIdentity, ensureProcedureOk, parseArgs, } from './spacetime-migration-common.mjs'; try { const options = parseArgs(process.argv.slice(2)); if (!options.in) { throw new Error('必须传入 --in。'); } if (options.incremental === true && options.replaceExisting === true) { throw new Error('--incremental 不能和 --replace-existing 同时使用。'); } const inPath = path.resolve(options.in); await assertReadableFile(inPath); const migrationJson = await readFile(inPath, 'utf8'); if (!migrationJson.trim()) { throw new Error(`迁移文件为空: ${inPath}`); } const webOptions = await prepareWebImportOptions(options); let result; try { result = await importMigrationJsonDirect(webOptions, migrationJson); } finally { await revokeTemporaryWebIdentity(webOptions); } ensureProcedureOk(result); console.log( `[spacetime:migration:import] ${options.dryRun ? 'dry-run 完成' : '导入完成'}: ${inPath}`, ); printTableStats(result.table_stats); printMigrationWarnings(result.warnings); } catch (error) { console.error( `[spacetime:migration:import] ${error instanceof Error ? error.message : String(error)}`, ); process.exit(1); } async function prepareWebImportOptions(options) { if (options.token) { return { ...options, useHttp: true }; } const identity = await createSpacetimeWebIdentity(options); console.log( `[spacetime:migration:import] 已通过 Web API 创建临时 identity: ${identity.identity}`, ); const authorizeResult = await callSpacetimeProcedureViaCli( options, 'authorize_database_migration_operator', { bootstrap_secret: options.bootstrapSecret || '', operator_identity_hex: identity.identity, note: options.note || 'temporary web api migration import', }, ); ensureProcedureOk(authorizeResult); console.log(`[spacetime:migration:import] 已授权临时 Web API identity`); return { ...options, token: identity.token, temporaryWebIdentity: identity.identity, useHttp: true, }; } async function importMigrationJsonDirect(options, migrationJson) { const includeTables = resolveImportIncludeTables(options, migrationJson); const procedureName = options.incremental === true ? 'import_database_migration_incremental_from_file' : 'import_database_migration_from_file'; const input = { migration_json: migrationJson, include_tables: includeTables, replace_existing: options.replaceExisting === true, dry_run: options.dryRun === true, }; if (options.replaceExisting === true) { console.log( `[spacetime:migration:import] replace-existing 仅覆盖本次文件内的表: ${includeTables.join(', ') || '无'}`, ); } else if (options.incremental === true) { console.log(`[spacetime:migration:import] 使用增量模式,已存在或冲突的行会跳过`); } return callSpacetimeProcedure(options, procedureName, input); } function resolveImportIncludeTables(options, migrationJson) { if (options.replaceExisting !== true) { return options.includeTables; } const migrationTables = readMigrationTableNames(migrationJson); if (options.includeTables.length === 0) { return migrationTables; } const requestedTables = new Set(options.includeTables); return migrationTables.filter((tableName) => requestedTables.has(tableName)); } function readMigrationTableNames(migrationJson) { let payload; try { payload = JSON.parse(migrationJson); } catch (error) { throw new Error( `迁移文件 JSON 解析失败: ${error instanceof Error ? error.message : String(error)}`, ); } if (!payload || !Array.isArray(payload.tables)) { throw new Error('迁移文件缺少 tables 数组。'); } const tableNames = []; const seen = new Set(); for (const table of payload.tables) { if (!table || typeof table.name !== 'string' || !table.name.trim()) { throw new Error('迁移文件 tables 内存在缺少 name 的表项。'); } const tableName = table.name.trim(); if (!seen.has(tableName)) { tableNames.push(tableName); seen.add(tableName); } } return tableNames; } async function revokeTemporaryWebIdentity(options) { if (!options.temporaryWebIdentity) { return; } try { const revokeResult = await callSpacetimeProcedure( options, 'revoke_database_migration_operator', { operator_identity_hex: options.temporaryWebIdentity }, ); ensureProcedureOk(revokeResult); console.log(`[spacetime:migration:import] 已撤销临时 Web API identity`); } catch (error) { console.warn( `[spacetime:migration:import] 撤销临时 Web API identity 失败: ${ error instanceof Error ? error.message : String(error) }`, ); } } function printTableStats(tableStats) { if (!Array.isArray(tableStats) || tableStats.length === 0) { return; } const rows = tableStats.map((stat) => ({ table: stat.table_name, imported: stat.imported_row_count, skipped: stat.skipped_row_count, })); console.table(rows); } function printMigrationWarnings(warnings) { if (!Array.isArray(warnings) || warnings.length === 0) { return; } console.warn('[spacetime:migration:import] 迁移告警汇总:'); console.table( warnings.map((warning) => ({ table: warning.table_name, kind: warning.warning_kind, message: warning.message, })), ); }