#!/usr/bin/env node
import { randomUUID } from 'node:crypto';
import { readFile } from 'node:fs/promises';
import path from 'node:path';

import {
  assertReadableFile,
  callSpacetimeProcedure,
  callSpacetimeProcedureViaCli,
  createSpacetimeWebIdentity,
  ensureProcedureOk,
  parseArgs,
} from './spacetime-migration-common.mjs';

// Default upper bound, in UTF-8 bytes, for one uploaded chunk (512 KiB).
const DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE = 512 * 1024;

// Largest chunk size accepted from the command line (1 MiB). Per the error
// message below, larger values would hit the per-chunk limit of the chunk
// upload procedure.
const MAX_MIGRATION_IMPORT_CHUNK_SIZE = 1024 * 1024;

// Entry point: validate the CLI options, read the migration file, import it
// (directly or in chunks), then print the per-table statistics and warnings.
// Any failure is reported on stderr and the process exits with status 1.
try {
  const options = parseArgs(process.argv.slice(2));
  if (!options.in) {
    throw new Error('必须传入 --in。');
  }
  if (options.incremental === true && options.replaceExisting === true) {
    throw new Error('--incremental 不能和 --replace-existing 同时使用。');
  }

  const inPath = path.resolve(options.in);
  await assertReadableFile(inPath);
  const migrationJson = await readFile(inPath, 'utf8');
  if (!migrationJson.trim()) {
    throw new Error(`迁移文件为空: ${inPath}`);
  }

  const webOptions = await prepareWebImportOptions(options);
  let result;
  try {
    result = await importMigrationJsonWithFallback(webOptions, migrationJson);
  } finally {
    // Runs whether the import succeeded or failed, so a temporary operator
    // identity is never left authorized by this script on purpose.
    await revokeTemporaryWebIdentity(webOptions);
  }
  ensureProcedureOk(result);

  console.log(
    `[spacetime:migration:import] ${options.dryRun ? 'dry-run 完成' : '导入完成'}: ${inPath}`,
  );
  printTableStats(result.table_stats);
  printMigrationWarnings(result.warnings);
} catch (error) {
  console.error(
    `[spacetime:migration:import] ${error instanceof Error ? error.message : String(error)}`,
  );
  process.exit(1);
}

/**
 * Builds the option object used for the HTTP import.
 *
 * When a token is supplied it is used as-is. Otherwise a temporary identity is
 * created through the Web API and authorized as a migration operator via the
 * CLI; its token and identity are added to the returned options so the caller
 * can revoke it afterwards.
 *
 * @param {object} options - Parsed command-line options.
 * @returns {Promise<object>} A copy of `options` with `useHttp: true`, plus
 *   `token` and `temporaryWebIdentity` when a temporary identity was created.
 * @throws {Error} When authorizing the temporary identity fails; the original
 *   failure is attached as `cause`.
 */
async function prepareWebImportOptions(options) {
  if (options.token) {
    return { ...options, useHttp: true };
  }

  const identity = await createSpacetimeWebIdentity(options);
  console.log(
    `[spacetime:migration:import] 已通过 Web API 创建临时 identity: ${identity.identity}`,
  );

  try {
    const authorizeResult = await callSpacetimeProcedureViaCli(
      options,
      'authorize_database_migration_operator',
      {
        bootstrap_secret: options.bootstrapSecret || '',
        operator_identity_hex: identity.identity,
        note: options.note || 'temporary web api migration import',
      },
    );
    ensureProcedureOk(authorizeResult);
  } catch (error) {
    // Keep the original error reachable through `cause` so its stack is not
    // lost behind the user-facing explanation.
    throw new Error(
      `授权临时 Web API identity 失败。当前 spacetime CLI identity 必须已经是迁移操作员;如果目标库迁移操作员表不为空,bootstrap secret 不会越权授权新的操作员。可先用已有迁移操作员授权当前部署机 identity,或为导入脚本提供已有迁移操作员的 --token。原始错误: ${
        error instanceof Error ? error.message : String(error)
      }`,
      { cause: error },
    );
  }

  console.log(`[spacetime:migration:import] 已授权临时 Web API identity`);
  return {
    ...options,
    token: identity.token,
    temporaryWebIdentity: identity.identity,
    useHttp: true,
  };
}

/**
 * Imports the migration JSON, choosing between a single request and a chunked
 * upload.
 *
 * Payloads larger than the resolved chunk size go straight to the chunked
 * path. Smaller payloads are sent directly and retried in chunks only when the
 * direct attempt fails with a "request body too large" style error.
 *
 * @param {object} options - Import options (see `prepareWebImportOptions`).
 * @param {string} migrationJson - Raw migration file contents.
 * @returns {Promise<object>} The procedure result of the import.
 */
async function importMigrationJsonWithFallback(options, migrationJson) {
  const chunkSize = resolveChunkSize(options);
  if (Buffer.byteLength(migrationJson, 'utf8') > chunkSize) {
    return importMigrationJsonChunked(options, migrationJson, chunkSize);
  }

  try {
    return await importMigrationJsonDirect(options, migrationJson);
  } catch (error) {
    if (!isRequestBodyTooLargeError(error)) {
      throw error;
    }
    console.warn(
      `[spacetime:migration:import] 直接导入触发 HTTP 413,改用 ${chunkSize} bytes 分片上传。`,
    );
    return importMigrationJsonChunked(options, migrationJson, chunkSize);
  }
}

/**
 * Logs which import mode is active, if any special mode was requested.
 *
 * @param {object} options - Import options.
 * @param {string[]} includeTables - Tables that will be sent as `include_tables`.
 */
function logImportMode(options, includeTables) {
  if (options.replaceExisting === true) {
    console.log(
      `[spacetime:migration:import] replace-existing 仅覆盖本次文件内的表: ${includeTables.join(', ') || '无'}`,
    );
  } else if (options.incremental === true) {
    console.log(`[spacetime:migration:import] 使用增量模式,已存在或冲突的行会跳过`);
  }
}

/**
 * Sends the whole migration JSON in a single procedure call.
 *
 * @param {object} options - Import options.
 * @param {string} migrationJson - Raw migration file contents.
 * @returns {Promise<object>} The procedure result.
 */
async function importMigrationJsonDirect(options, migrationJson) {
  const includeTables = resolveImportIncludeTables(options, migrationJson);
  const procedureName =
    options.incremental === true
      ? 'import_database_migration_incremental_from_file'
      : 'import_database_migration_from_file';
  const input = {
    migration_json: migrationJson,
    include_tables: includeTables,
    replace_existing: options.replaceExisting === true,
    dry_run: options.dryRun === true,
  };

  logImportMode(options, includeTables);
  return callSpacetimeProcedure(options, procedureName, input);
}

/**
 * Uploads the migration JSON chunk by chunk under a fresh upload id and then
 * calls the chunk-based import procedure.
 *
 * If any step fails before the import procedure reports success, a best-effort
 * request is made to clear the uploaded chunks.
 *
 * @param {object} options - Import options.
 * @param {string} migrationJson - Raw migration file contents.
 * @param {number} chunkSize - Maximum UTF-8 byte size of each chunk.
 * @returns {Promise<object>} The procedure result of the final import call.
 */
async function importMigrationJsonChunked(options, migrationJson, chunkSize) {
  const includeTables = resolveImportIncludeTables(options, migrationJson);
  const procedureName =
    options.incremental === true
      ? 'import_database_migration_incremental_from_chunks'
      : 'import_database_migration_from_chunks';
  const uploadId = `migration-${Date.now()}-${randomUUID()}`;
  const chunks = splitStringByUtf8Bytes(migrationJson, chunkSize);

  console.log(
    `[spacetime:migration:import] 使用分片导入: upload_id=${uploadId}, chunks=${chunks.length}, chunk_size=${chunkSize}`,
  );
  logImportMode(options, includeTables);

  // Flipped only after the import procedure reported success; anything else
  // leaves the uploaded chunks to be cleared in `finally`.
  let committed = false;
  try {
    for (let index = 0; index < chunks.length; index += 1) {
      const chunkResult = await callSpacetimeProcedure(
        options,
        'put_database_migration_import_chunk',
        {
          upload_id: uploadId,
          chunk_index: index,
          chunk_count: chunks.length,
          chunk: chunks[index],
        },
      );
      ensureProcedureOk(chunkResult);
      console.log(
        `[spacetime:migration:import] 已上传迁移分片 ${index + 1}/${chunks.length}`,
      );
    }

    const result = await callSpacetimeProcedure(options, procedureName, {
      upload_id: uploadId,
      include_tables: includeTables,
      replace_existing: options.replaceExisting === true,
      dry_run: options.dryRun === true,
    });
    ensureProcedureOk(result);
    committed = true;
    return result;
  } finally {
    if (!committed) {
      await clearMigrationChunksBestEffort(options, uploadId);
    }
  }
}

/**
 * Determines the `include_tables` list sent to the import procedure.
 *
 * Outside replace-existing mode the requested list is passed through
 * unchanged. In replace-existing mode the list is limited to tables that are
 * actually present in the migration file, optionally narrowed to the requested
 * tables.
 *
 * NOTE(review): assumes `options.includeTables` is always an array in
 * replace-existing mode (presumably defaulted by `parseArgs`) — confirm.
 *
 * @param {object} options - Import options.
 * @param {string} migrationJson - Raw migration file contents.
 * @returns {string[]} Table names to import.
 */
function resolveImportIncludeTables(options, migrationJson) {
  if (options.replaceExisting !== true) {
    return options.includeTables;
  }

  const migrationTables = readMigrationTableNames(migrationJson);
  if (options.includeTables.length === 0) {
    return migrationTables;
  }

  const requestedTables = new Set(options.includeTables);
  return migrationTables.filter((tableName) => requestedTables.has(tableName));
}

/**
 * Extracts the distinct, trimmed table names from the migration JSON, in
 * first-seen order.
 *
 * @param {string} migrationJson - Raw migration file contents.
 * @returns {string[]} Unique table names.
 * @throws {Error} When the JSON cannot be parsed (parse error attached as
 *   `cause`), has no `tables` array, or contains an entry without a name.
 */
function readMigrationTableNames(migrationJson) {
  let payload;
  try {
    payload = JSON.parse(migrationJson);
  } catch (error) {
    throw new Error(
      `迁移文件 JSON 解析失败: ${error instanceof Error ? error.message : String(error)}`,
      { cause: error },
    );
  }

  if (!payload || !Array.isArray(payload.tables)) {
    throw new Error('迁移文件缺少 tables 数组。');
  }

  const tableNames = [];
  const seen = new Set();
  for (const table of payload.tables) {
    if (!table || typeof table.name !== 'string' || !table.name.trim()) {
      throw new Error('迁移文件 tables 内存在缺少 name 的表项。');
    }
    const tableName = table.name.trim();
    if (!seen.has(tableName)) {
      tableNames.push(tableName);
      seen.add(tableName);
    }
  }
  return tableNames;
}

/**
 * Resolves the chunk size to use, falling back to the default when none (or a
 * falsy value) was supplied.
 *
 * @param {object} options - Import options; `chunkSize` is optional.
 * @returns {number} Chunk size in UTF-8 bytes, in the range (0, 1048576].
 * @throws {Error} When the value exceeds the maximum, is not a number, or is
 *   not positive.
 */
function resolveChunkSize(options) {
  const requested = options.chunkSize || DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE;
  const chunkSize = Number(requested);

  if (chunkSize > MAX_MIGRATION_IMPORT_CHUNK_SIZE) {
    throw new Error('--chunk-size 不能超过 1048576,避免触发迁移分片 procedure 单片限制。');
  }
  // A non-numeric value would make every later size comparison false and
  // silently disable chunking; a non-positive one can never hold a character.
  if (Number.isNaN(chunkSize) || chunkSize <= 0) {
    throw new Error(`--chunk-size 必须是正数,当前值: ${String(requested)}`);
  }
  return chunkSize;
}

/**
 * Splits a string into consecutive pieces whose UTF-8 encoding does not exceed
 * `maxBytes` each. Iteration is per code point, so a surrogate pair is never
 * divided between two pieces.
 *
 * @param {string} value - Text to split.
 * @param {number} maxBytes - Maximum UTF-8 byte length of one piece.
 * @returns {string[]} The pieces in order; empty for an empty string.
 * @throws {Error} When a single character is larger than `maxBytes`.
 */
function splitStringByUtf8Bytes(value, maxBytes) {
  const chunks = [];
  let current = '';
  let currentBytes = 0;

  for (const character of value) {
    const characterBytes = Buffer.byteLength(character, 'utf8');
    if (characterBytes > maxBytes) {
      throw new Error(`单个字符超过 chunk-size,当前 chunk-size: ${maxBytes}`);
    }
    // Close the current piece before it would overflow.
    if (currentBytes + characterBytes > maxBytes && current) {
      chunks.push(current);
      current = '';
      currentBytes = 0;
    }
    current += character;
    currentBytes += characterBytes;
  }

  if (current) {
    chunks.push(current);
  }
  return chunks;
}
/**
 * Reports whether a failure looks like a "request body too large" rejection.
 *
 * The check is purely textual: the message must contain `HTTP 413`
 * (case-sensitive) or `length limit exceeded` (case-insensitive).
 *
 * @param {unknown} error - Thrown value; non-Error values are stringified.
 * @returns {boolean} True when the message matches either marker.
 */
function isRequestBodyTooLargeError(error) {
  const text = error instanceof Error ? error.message : String(error);
  if (text.includes('HTTP 413')) {
    return true;
  }
  return text.toLowerCase().includes('length limit exceeded');
}

/**
 * Asks the server to drop the chunks stored under an upload id.
 *
 * Never throws: a failed cleanup is only reported as a warning.
 *
 * @param {object} options - Import options passed to the procedure call.
 * @param {string} uploadId - Upload id whose chunks should be cleared.
 * @returns {Promise<void>}
 */
async function clearMigrationChunksBestEffort(options, uploadId) {
  try {
    const clearResult = await callSpacetimeProcedure(
      options,
      'clear_database_migration_import_chunks',
      { upload_id: uploadId },
    );
    ensureProcedureOk(clearResult);
    console.warn(`[spacetime:migration:import] 已清理失败导入的临时分片: ${uploadId}`);
  } catch (failure) {
    const reason = failure instanceof Error ? failure.message : String(failure);
    console.warn(`[spacetime:migration:import] 清理临时迁移分片失败: ${reason}`);
  }
}

/**
 * Revokes the migration-operator authorization of the temporary Web API
 * identity, if the options carry one.
 *
 * Never throws: a failed revocation is only reported as a warning.
 *
 * @param {object} options - Import options; `temporaryWebIdentity` is optional.
 * @returns {Promise<void>}
 */
async function revokeTemporaryWebIdentity(options) {
  const temporaryIdentity = options.temporaryWebIdentity;
  if (temporaryIdentity) {
    try {
      const outcome = await callSpacetimeProcedure(
        options,
        'revoke_database_migration_operator',
        { operator_identity_hex: temporaryIdentity },
      );
      ensureProcedureOk(outcome);
      console.log(`[spacetime:migration:import] 已撤销临时 Web API identity`);
    } catch (failure) {
      const reason = failure instanceof Error ? failure.message : String(failure);
      console.warn(`[spacetime:migration:import] 撤销临时 Web API identity 失败: ${reason}`);
    }
  }
}

/**
 * Prints the per-table import statistics as a console table.
 *
 * Nothing is printed when the argument is not a non-empty array.
 *
 * @param {Array<object>} tableStats - Entries with `table_name`,
 *   `imported_row_count` and `skipped_row_count`.
 */
function printTableStats(tableStats) {
  const hasStats = Array.isArray(tableStats) && tableStats.length > 0;
  if (!hasStats) {
    return;
  }

  const rows = [];
  for (const entry of tableStats) {
    rows.push({
      table: entry.table_name,
      imported: entry.imported_row_count,
      skipped: entry.skipped_row_count,
    });
  }
  console.table(rows);
}

/**
 * Prints the migration warnings: a heading via `console.warn`, followed by a
 * console table.
 *
 * Nothing is printed when the argument is not a non-empty array.
 *
 * @param {Array<object>} warnings - Entries with `table_name`, `warning_kind`
 *   and `message`.
 */
function printMigrationWarnings(warnings) {
  const hasWarnings = Array.isArray(warnings) && warnings.length > 0;
  if (!hasWarnings) {
    return;
  }

  console.warn('[spacetime:migration:import] 迁移告警汇总:');
  const rows = [];
  for (const entry of warnings) {
    rows.push({
      table: entry.table_name,
      kind: entry.warning_kind,
      message: entry.message,
    });
  }
  console.table(rows);
}