337 lines
10 KiB
JavaScript
337 lines
10 KiB
JavaScript
#!/usr/bin/env node
|
||
|
||
import { randomUUID } from 'node:crypto';
|
||
import { readFile } from 'node:fs/promises';
|
||
import path from 'node:path';
|
||
import {
|
||
assertReadableFile,
|
||
callSpacetimeProcedure,
|
||
callSpacetimeProcedureViaCli,
|
||
createSpacetimeWebIdentity,
|
||
ensureProcedureOk,
|
||
parseArgs,
|
||
} from './spacetime-migration-common.mjs';
|
||
|
||
// Chunk size in bytes used when --chunk-size is not supplied (512 KiB).
const DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE = 512 * 1024;

// Script entry point: validate the CLI options, read the migration file,
// run the import, then print the resulting table statistics and warnings.
// Any failure is reported on stderr and ends the process with exit code 1.
try {
  const options = parseArgs(process.argv.slice(2));
  if (!options.in) {
    throw new Error('必须传入 --in。');
  }

  const wantsIncremental = options.incremental === true;
  const wantsReplace = options.replaceExisting === true;
  if (wantsIncremental && wantsReplace) {
    throw new Error('--incremental 不能和 --replace-existing 同时使用。');
  }

  const inPath = path.resolve(options.in);
  await assertReadableFile(inPath);

  const migrationJson = await readFile(inPath, 'utf8');
  if (migrationJson.trim() === '') {
    throw new Error(`迁移文件为空: ${inPath}`);
  }

  const webOptions = await prepareWebImportOptions(options);

  let result;
  try {
    result = await importMigrationJsonWithFallback(webOptions, migrationJson);
  } finally {
    // Runs whether the import succeeded or failed; it is a no-op when no
    // temporary identity was recorded on webOptions.
    await revokeTemporaryWebIdentity(webOptions);
  }
  ensureProcedureOk(result);

  const outcome = options.dryRun ? 'dry-run 完成' : '导入完成';
  console.log(`[spacetime:migration:import] ${outcome}: ${inPath}`);
  printTableStats(result.table_stats);
  printMigrationWarnings(result.warnings);
} catch (error) {
  const reason = error instanceof Error ? error.message : String(error);
  console.error(`[spacetime:migration:import] ${reason}`);
  process.exit(1);
}
|
||
|
||
/**
 * Builds the option object used for HTTP procedure calls.
 *
 * When `options.token` is already set, the options are copied as-is with
 * `useHttp: true`. Otherwise a new identity is requested through
 * `createSpacetimeWebIdentity`, authorized by calling the
 * `authorize_database_migration_operator` procedure through the CLI helper,
 * and its token / identity are merged into the returned options.
 *
 * @param {object} options Parsed CLI options (reads `token`,
 *   `bootstrapSecret` and `note`).
 * @returns {Promise<object>} A copy of `options` with `useHttp: true`, plus
 *   `token` and `temporaryWebIdentity` when a temporary identity was created.
 * @throws {Error} When authorizing the temporary identity fails. The
 *   underlying error is attached as `cause` so its stack is not lost.
 */
async function prepareWebImportOptions(options) {
  if (options.token) {
    return { ...options, useHttp: true };
  }

  const identity = await createSpacetimeWebIdentity(options);
  console.log(
    `[spacetime:migration:import] 已通过 Web API 创建临时 identity: ${identity.identity}`,
  );

  try {
    const authorizeResult = await callSpacetimeProcedureViaCli(
      options,
      'authorize_database_migration_operator',
      {
        bootstrap_secret: options.bootstrapSecret || '',
        operator_identity_hex: identity.identity,
        note: options.note || 'temporary web api migration import',
      },
    );
    ensureProcedureOk(authorizeResult);
  } catch (error) {
    // Keep the operator-facing guidance in the message, but also chain the
    // original error so callers and debuggers can still inspect it.
    throw new Error(
      `授权临时 Web API identity 失败。当前 spacetime CLI identity 必须已经是迁移操作员;如果目标库迁移操作员表不为空,bootstrap secret 不会越权授权新的操作员。可先用已有迁移操作员授权当前部署机 identity,或为导入脚本提供已有迁移操作员的 --token。原始错误: ${
        error instanceof Error ? error.message : String(error)
      }`,
      { cause: error },
    );
  }
  console.log(`[spacetime:migration:import] 已授权临时 Web API identity`);

  return {
    ...options,
    token: identity.token,
    temporaryWebIdentity: identity.identity,
    useHttp: true,
  };
}
|
||
|
||
/**
 * Imports the migration JSON, choosing between a single request and a
 * chunked upload.
 *
 * Payloads whose UTF-8 size exceeds the resolved chunk size go straight to
 * the chunked path. Smaller payloads are sent directly first; if that
 * attempt fails with an error recognised by `isRequestBodyTooLargeError`,
 * the chunked path is used instead. Any other error is rethrown unchanged.
 *
 * @param {object} options Import options.
 * @param {string} migrationJson Raw migration file contents.
 * @returns {Promise<object>} The procedure result of whichever path ran.
 */
async function importMigrationJsonWithFallback(options, migrationJson) {
  const chunkSize = resolveChunkSize(options);
  const exceedsChunkSize = Buffer.byteLength(migrationJson, 'utf8') > chunkSize;

  if (!exceedsChunkSize) {
    try {
      return await importMigrationJsonDirect(options, migrationJson);
    } catch (error) {
      if (!isRequestBodyTooLargeError(error)) {
        throw error;
      }
      console.warn(
        `[spacetime:migration:import] 直接导入触发 HTTP 413,改用 ${chunkSize} bytes 分片上传。`,
      );
    }
  }

  return importMigrationJsonChunked(options, migrationJson, chunkSize);
}
|
||
|
||
/**
 * Sends the whole migration JSON in a single procedure call.
 *
 * Calls `import_database_migration_incremental_from_file` when
 * `options.incremental` is exactly `true`, otherwise
 * `import_database_migration_from_file`.
 *
 * @param {object} options Import options (reads `incremental`,
 *   `replaceExisting`, `dryRun`).
 * @param {string} migrationJson Raw migration file contents.
 * @returns {Promise<object>} Whatever `callSpacetimeProcedure` resolves to.
 */
async function importMigrationJsonDirect(options, migrationJson) {
  const includeTables = resolveImportIncludeTables(options, migrationJson);
  const isIncremental = options.incremental === true;
  const isReplace = options.replaceExisting === true;

  if (isReplace) {
    const tableList = includeTables.join(', ') || '无';
    console.log(
      `[spacetime:migration:import] replace-existing 仅覆盖本次文件内的表: ${tableList}`,
    );
  } else if (isIncremental) {
    console.log('[spacetime:migration:import] 使用增量模式,已存在或冲突的行会跳过');
  }

  const procedureName = isIncremental
    ? 'import_database_migration_incremental_from_file'
    : 'import_database_migration_from_file';

  return callSpacetimeProcedure(options, procedureName, {
    migration_json: migrationJson,
    include_tables: includeTables,
    replace_existing: isReplace,
    dry_run: options.dryRun === true,
  });
}
|
||
|
||
/**
 * Uploads the migration JSON in UTF-8-sized pieces, then triggers the import.
 *
 * Each piece is sent with `put_database_migration_import_chunk` under one
 * generated upload id; afterwards the `..._from_chunks` procedure (the
 * incremental variant when `options.incremental` is exactly `true`) is
 * called with that id. If any step throws, the uploaded chunks are cleared
 * on a best-effort basis and the original error is rethrown.
 *
 * @param {object} options Import options (reads `incremental`,
 *   `replaceExisting`, `dryRun`).
 * @param {string} migrationJson Raw migration file contents.
 * @param {number} chunkSize Maximum UTF-8 byte size of one chunk.
 * @returns {Promise<object>} The result of the final import procedure.
 */
async function importMigrationJsonChunked(options, migrationJson, chunkSize) {
  const includeTables = resolveImportIncludeTables(options, migrationJson);
  const isIncremental = options.incremental === true;
  const isReplace = options.replaceExisting === true;
  const finalizeProcedure = isIncremental
    ? 'import_database_migration_incremental_from_chunks'
    : 'import_database_migration_from_chunks';

  const uploadId = `migration-${Date.now()}-${randomUUID()}`;
  const chunks = splitStringByUtf8Bytes(migrationJson, chunkSize);
  const total = chunks.length;

  console.log(
    `[spacetime:migration:import] 使用分片导入: upload_id=${uploadId}, chunks=${total}, chunk_size=${chunkSize}`,
  );
  if (isReplace) {
    const tableList = includeTables.join(', ') || '无';
    console.log(
      `[spacetime:migration:import] replace-existing 仅覆盖本次文件内的表: ${tableList}`,
    );
  } else if (isIncremental) {
    console.log('[spacetime:migration:import] 使用增量模式,已存在或冲突的行会跳过');
  }

  try {
    // Chunks are uploaded one at a time, in index order.
    for (const [index, chunk] of chunks.entries()) {
      const chunkResult = await callSpacetimeProcedure(
        options,
        'put_database_migration_import_chunk',
        {
          upload_id: uploadId,
          chunk_index: index,
          chunk_count: total,
          chunk,
        },
      );
      ensureProcedureOk(chunkResult);
      console.log(`[spacetime:migration:import] 已上传迁移分片 ${index + 1}/${total}`);
    }

    const result = await callSpacetimeProcedure(options, finalizeProcedure, {
      upload_id: uploadId,
      include_tables: includeTables,
      replace_existing: isReplace,
      dry_run: options.dryRun === true,
    });
    ensureProcedureOk(result);
    return result;
  } catch (error) {
    // Anything that failed before the import was confirmed leaves chunks
    // behind under this upload id; try to remove them, then surface the error.
    await clearMigrationChunksBestEffort(options, uploadId);
    throw error;
  }
}
|
||
|
||
/**
 * Determines the table list to send as `include_tables`.
 *
 * Outside replace-existing mode `options.includeTables` is returned
 * untouched. In replace-existing mode the list is restricted to tables that
 * are actually present in the migration file: every table in the file when
 * no tables were requested, otherwise the requested tables that the file
 * contains, in file order.
 *
 * @param {object} options Import options (reads `replaceExisting`,
 *   `includeTables`).
 * @param {string} migrationJson Raw migration file contents.
 * @returns {string[] | undefined} Table names, or `options.includeTables`
 *   as-is when replace-existing is off.
 * @throws {Error} When replace-existing is on and the file cannot be parsed
 *   into a table list.
 */
function resolveImportIncludeTables(options, migrationJson) {
  if (options.replaceExisting !== true) {
    return options.includeTables;
  }

  const migrationTables = readMigrationTableNames(migrationJson);

  // A missing list is treated like an empty one ("no filter") instead of
  // crashing on `.length`, matching how the other branch tolerates it.
  const requested = options.includeTables ?? [];
  if (requested.length === 0) {
    return migrationTables;
  }

  // NOTE(review): if none of the requested tables exist in the file this
  // returns []. Confirm how the server-side procedure interprets an empty
  // include_tables list together with replace_existing.
  const requestedTables = new Set(requested);
  return migrationTables.filter((tableName) => requestedTables.has(tableName));
}
|
||
|
||
/**
 * Extracts the distinct table names listed in a migration file.
 *
 * The JSON must be an object with a `tables` array whose entries each have a
 * non-blank string `name`. Names are trimmed and de-duplicated, keeping the
 * order of first appearance.
 *
 * @param {string} migrationJson Raw migration file contents.
 * @returns {string[]} Unique, trimmed table names in file order.
 * @throws {Error} When the JSON cannot be parsed (the parser error is
 *   attached as `cause`), when `tables` is not an array, or when an entry
 *   has no usable `name`.
 */
function readMigrationTableNames(migrationJson) {
  let payload;
  try {
    payload = JSON.parse(migrationJson);
  } catch (error) {
    throw new Error(
      `迁移文件 JSON 解析失败: ${error instanceof Error ? error.message : String(error)}`,
      { cause: error },
    );
  }

  if (!payload || !Array.isArray(payload.tables)) {
    throw new Error('迁移文件缺少 tables 数组。');
  }

  // A Set keeps insertion order, so it handles both de-duplication and
  // first-appearance ordering.
  const tableNames = new Set();
  for (const table of payload.tables) {
    if (!table || typeof table.name !== 'string' || !table.name.trim()) {
      throw new Error('迁移文件 tables 内存在缺少 name 的表项。');
    }
    tableNames.add(table.name.trim());
  }

  return [...tableNames];
}
|
||
|
||
/**
 * Resolves the chunk size, in bytes, used for chunked uploads.
 *
 * A falsy `options.chunkSize` (undefined, null, 0, '') selects
 * `DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE`. Numeric strings are converted to
 * numbers so later byte-count comparisons never rely on implicit coercion.
 *
 * @param {object} options Import options (reads `chunkSize`).
 * @returns {number} A positive integer no larger than 1048576.
 * @throws {Error} When the value is not a positive integer or exceeds
 *   1048576 bytes.
 */
function resolveChunkSize(options) {
  const requested = options.chunkSize || DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE;
  const chunkSize = typeof requested === 'string' ? Number(requested.trim()) : requested;

  // Reject NaN, fractions, negatives, booleans and other non-numeric values:
  // they would otherwise make every size comparison downstream meaningless.
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new Error(`--chunk-size 必须是正整数,当前值: ${String(requested)}`);
  }
  if (chunkSize > 1024 * 1024) {
    throw new Error('--chunk-size 不能超过 1048576,避免触发迁移分片 procedure 单片限制。');
  }
  return chunkSize;
}
|
||
|
||
/**
 * Splits a string into consecutive pieces whose UTF-8 encoding is at most
 * `maxBytes` long, never cutting through a code point.
 *
 * @param {string} value Text to split.
 * @param {number} maxBytes Maximum encoded size of one piece, in bytes.
 * @returns {string[]} Pieces that concatenate back to `value`; empty for an
 *   empty string.
 * @throws {Error} When a single code point needs more than `maxBytes` bytes.
 */
function splitStringByUtf8Bytes(value, maxBytes) {
  const pieces = [];
  let pieceStart = 0; // UTF-16 index where the pending piece begins
  let cursor = 0; // UTF-16 index just past the last consumed code point
  let pieceBytes = 0; // encoded size of the pending piece

  // Iterating a string yields whole code points, so surrogate pairs stay
  // together.
  for (const symbol of value) {
    const symbolBytes = Buffer.byteLength(symbol, 'utf8');
    if (symbolBytes > maxBytes) {
      throw new Error(`单个字符超过 chunk-size,当前 chunk-size: ${maxBytes}`);
    }

    const wouldOverflow = pieceBytes + symbolBytes > maxBytes;
    if (wouldOverflow && cursor > pieceStart) {
      pieces.push(value.slice(pieceStart, cursor));
      pieceStart = cursor;
      pieceBytes = 0;
    }

    cursor += symbol.length;
    pieceBytes += symbolBytes;
  }

  if (cursor > pieceStart) {
    pieces.push(value.slice(pieceStart, cursor));
  }
  return pieces;
}
|
||
|
||
/**
 * Tells whether an error reports that the request body was too large.
 *
 * The error text (an Error's `message`, or the stringified value otherwise)
 * matches when it contains `HTTP 413` or, ignoring case,
 * `length limit exceeded`.
 *
 * @param {unknown} error The caught value.
 * @returns {boolean} `true` when the text matches either marker.
 */
function isRequestBodyTooLargeError(error) {
  const text = error instanceof Error ? error.message : String(error);
  if (text.includes('HTTP 413')) {
    return true;
  }
  return text.toLowerCase().includes('length limit exceeded');
}
|
||
|
||
/**
 * Asks the server to drop the chunks stored under `uploadId`.
 *
 * Never throws: both success and failure are only reported through
 * `console.warn`, so the caller's original error is not masked.
 *
 * @param {object} options Import options forwarded to the procedure call.
 * @param {string} uploadId Upload id whose chunks should be removed.
 * @returns {Promise<void>}
 */
async function clearMigrationChunksBestEffort(options, uploadId) {
  const input = { upload_id: uploadId };
  try {
    ensureProcedureOk(
      await callSpacetimeProcedure(options, 'clear_database_migration_import_chunks', input),
    );
    console.warn(`[spacetime:migration:import] 已清理失败导入的临时分片: ${uploadId}`);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    console.warn(`[spacetime:migration:import] 清理临时迁移分片失败: ${reason}`);
  }
}
|
||
|
||
/**
 * Revokes the temporary identity recorded in `options.temporaryWebIdentity`.
 *
 * Does nothing when no temporary identity is recorded. A failed revocation
 * is only logged with `console.warn`; this function never throws.
 *
 * @param {object} options Import options (reads `temporaryWebIdentity`).
 * @returns {Promise<void>}
 */
async function revokeTemporaryWebIdentity(options) {
  const identityHex = options.temporaryWebIdentity;
  if (!identityHex) {
    return;
  }

  try {
    ensureProcedureOk(
      await callSpacetimeProcedure(options, 'revoke_database_migration_operator', {
        operator_identity_hex: identityHex,
      }),
    );
    console.log('[spacetime:migration:import] 已撤销临时 Web API identity');
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    console.warn(`[spacetime:migration:import] 撤销临时 Web API identity 失败: ${reason}`);
  }
}
|
||
|
||
/**
 * Prints per-table import statistics with `console.table`.
 *
 * Nothing is printed when `tableStats` is not an array or is empty.
 *
 * @param {Array<object>} tableStats Entries with `table_name`,
 *   `imported_row_count` and `skipped_row_count`.
 * @returns {void}
 */
function printTableStats(tableStats) {
  const hasStats = Array.isArray(tableStats) && tableStats.length > 0;
  if (!hasStats) {
    return;
  }

  console.table(
    tableStats.map(
      ({
        table_name: table,
        imported_row_count: imported,
        skipped_row_count: skipped,
      }) => ({ table, imported, skipped }),
    ),
  );
}
|
||
|
||
/**
 * Prints the migration warnings: a header line via `console.warn`, then a
 * `console.table` of the entries.
 *
 * Nothing is printed when `warnings` is not an array or is empty.
 *
 * @param {Array<object>} warnings Entries with `table_name`, `warning_kind`
 *   and `message`.
 * @returns {void}
 */
function printMigrationWarnings(warnings) {
  const hasWarnings = Array.isArray(warnings) && warnings.length > 0;
  if (!hasWarnings) {
    return;
  }

  const rows = warnings.map(({ table_name: table, warning_kind: kind, message }) => ({
    table,
    kind,
    message,
  }));

  console.warn('[spacetime:migration:import] 迁移告警汇总:');
  console.table(rows);
}
|