Files
Genarrative/scripts/spacetime-import-migration-json.mjs
kdletters 22f3f963de
Some checks failed
CI / verify (push) Has been cancelled
Handle SpacetimeDB migration imports with chunked uploads
2026-04-30 15:20:49 +08:00

337 lines
10 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env node
import { randomUUID } from 'node:crypto';
import { readFile } from 'node:fs/promises';
import path from 'node:path';
import {
assertReadableFile,
callSpacetimeProcedure,
callSpacetimeProcedureViaCli,
createSpacetimeWebIdentity,
ensureProcedureOk,
parseArgs,
} from './spacetime-migration-common.mjs';
// Chunk size, in UTF-8 bytes, used when --chunk-size is not supplied (512 KiB).
const DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE = 512 * 1024;

/**
 * Script entry point: validates the CLI options, reads the migration file,
 * runs the import and prints the resulting table statistics and warnings.
 *
 * Any failure is thrown to the caller, which reports it and exits non-zero.
 *
 * @returns {Promise<void>}
 */
async function runMigrationImport() {
  const options = parseArgs(process.argv.slice(2));
  if (!options.in) {
    throw new Error('必须传入 --in。');
  }
  const wantsIncremental = options.incremental === true;
  const wantsReplaceExisting = options.replaceExisting === true;
  if (wantsIncremental && wantsReplaceExisting) {
    throw new Error('--incremental 不能和 --replace-existing 同时使用。');
  }

  const inPath = path.resolve(options.in);
  await assertReadableFile(inPath);
  const migrationJson = await readFile(inPath, 'utf8');
  if (migrationJson.trim().length === 0) {
    throw new Error(`迁移文件为空: ${inPath}`);
  }

  const webOptions = await prepareWebImportOptions(options);
  let result;
  try {
    result = await importMigrationJsonWithFallback(webOptions, migrationJson);
  } finally {
    // Runs whether the import succeeded or failed, so a temporary identity is
    // never left behind by this step.
    await revokeTemporaryWebIdentity(webOptions);
  }
  ensureProcedureOk(result);

  const outcome = options.dryRun ? 'dry-run 完成' : '导入完成';
  console.log(`[spacetime:migration:import] ${outcome}: ${inPath}`);
  printTableStats(result.table_stats);
  printMigrationWarnings(result.warnings);
}

try {
  await runMigrationImport();
} catch (error) {
  const reason = error instanceof Error ? error.message : String(error);
  console.error(`[spacetime:migration:import] ${reason}`);
  process.exit(1);
}
/**
 * Builds the option object used for the HTTP import calls.
 *
 * When `options.token` is set, the options are returned with `useHttp: true`
 * added and nothing else happens. Otherwise a temporary identity is obtained
 * from `createSpacetimeWebIdentity`, authorized through the CLI by calling the
 * `authorize_database_migration_operator` procedure, and its token is attached
 * to the returned options.
 *
 * @param {object} options - Parsed CLI options.
 * @returns {Promise<object>} A copy of `options` with `useHttp: true`; on the
 *   temporary-identity path it also carries `token` and `temporaryWebIdentity`.
 * @throws {Error} When authorizing the temporary identity fails. The original
 *   failure is preserved on the thrown error's `cause`.
 */
async function prepareWebImportOptions(options) {
  if (options.token) {
    return { ...options, useHttp: true };
  }
  const identity = await createSpacetimeWebIdentity(options);
  console.log(
    `[spacetime:migration:import] 已通过 Web API 创建临时 identity: ${identity.identity}`,
  );
  try {
    const authorizeResult = await callSpacetimeProcedureViaCli(
      options,
      'authorize_database_migration_operator',
      {
        bootstrap_secret: options.bootstrapSecret || '',
        operator_identity_hex: identity.identity,
        note: options.note || 'temporary web api migration import',
      },
    );
    ensureProcedureOk(authorizeResult);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    // Keep the original error attached so its stack and type are not lost
    // behind the explanatory message.
    throw new Error(
      `授权临时 Web API identity 失败。当前 spacetime CLI identity 必须已经是迁移操作员如果目标库迁移操作员表不为空bootstrap secret 不会越权授权新的操作员。可先用已有迁移操作员授权当前部署机 identity或为导入脚本提供已有迁移操作员的 --token。原始错误: ${reason}`,
      { cause: error },
    );
  }
  console.log(`[spacetime:migration:import] 已授权临时 Web API identity`);
  return {
    ...options,
    token: identity.token,
    temporaryWebIdentity: identity.identity,
    useHttp: true,
  };
}
/**
 * Imports the migration JSON, choosing between a single request and a
 * chunked upload.
 *
 * A payload whose UTF-8 size exceeds the resolved chunk size goes straight to
 * the chunked path. Smaller payloads are sent directly; if that attempt fails
 * with a "request body too large" error, the chunked path is used instead.
 * Any other error is rethrown unchanged.
 *
 * @param {object} options - Import options.
 * @param {string} migrationJson - Raw migration file contents.
 * @returns {Promise<object>} Result of whichever import path ran.
 */
async function importMigrationJsonWithFallback(options, migrationJson) {
  const chunkSize = resolveChunkSize(options);
  const payloadBytes = Buffer.byteLength(migrationJson, 'utf8');
  const uploadInChunks = () =>
    importMigrationJsonChunked(options, migrationJson, chunkSize);

  if (payloadBytes > chunkSize) {
    return uploadInChunks();
  }

  try {
    return await importMigrationJsonDirect(options, migrationJson);
  } catch (error) {
    if (isRequestBodyTooLargeError(error)) {
      console.warn(
        `[spacetime:migration:import] 直接导入触发 HTTP 413改用 ${chunkSize} bytes 分片上传。`,
      );
      return uploadInChunks();
    }
    throw error;
  }
}
/**
 * Sends the whole migration JSON to the import procedure in one call.
 *
 * Calls `import_database_migration_incremental_from_file` when
 * `options.incremental` is exactly `true`, otherwise
 * `import_database_migration_from_file`.
 *
 * @param {object} options - Import options.
 * @param {string} migrationJson - Raw migration file contents.
 * @returns {Promise<object>} Whatever `callSpacetimeProcedure` resolves to.
 */
async function importMigrationJsonDirect(options, migrationJson) {
  const includeTables = resolveImportIncludeTables(options, migrationJson);
  const isIncremental = options.incremental === true;
  const isReplaceExisting = options.replaceExisting === true;

  let procedureName = 'import_database_migration_from_file';
  if (isIncremental) {
    procedureName = 'import_database_migration_incremental_from_file';
  }

  const input = {
    migration_json: migrationJson,
    include_tables: includeTables,
    replace_existing: isReplaceExisting,
    dry_run: options.dryRun === true,
  };

  // Tell the operator which mode is in effect before issuing the call.
  if (isReplaceExisting) {
    const tableList = includeTables.join(', ') || '无';
    console.log(
      `[spacetime:migration:import] replace-existing 仅覆盖本次文件内的表: ${tableList}`,
    );
  } else if (isIncremental) {
    console.log(`[spacetime:migration:import] 使用增量模式,已存在或冲突的行会跳过`);
  }

  return callSpacetimeProcedure(options, procedureName, input);
}
/**
 * Uploads the migration JSON in pieces and then asks the database to import
 * the assembled upload.
 *
 * Each piece is sent with `put_database_migration_import_chunk` under a fresh
 * upload id. The final call is
 * `import_database_migration_incremental_from_chunks` when
 * `options.incremental` is exactly `true`, otherwise
 * `import_database_migration_from_chunks`. If anything fails before the final
 * call has been confirmed OK, a best-effort cleanup of the uploaded pieces is
 * attempted and the failure is rethrown.
 *
 * @param {object} options - Import options.
 * @param {string} migrationJson - Raw migration file contents.
 * @param {number} chunkSize - Maximum UTF-8 byte size of one piece.
 * @returns {Promise<object>} Result of the final import procedure.
 */
async function importMigrationJsonChunked(options, migrationJson, chunkSize) {
  const includeTables = resolveImportIncludeTables(options, migrationJson);
  const isIncremental = options.incremental === true;
  const isReplaceExisting = options.replaceExisting === true;
  const procedureName = isIncremental
    ? 'import_database_migration_incremental_from_chunks'
    : 'import_database_migration_from_chunks';
  const uploadId = `migration-${Date.now()}-${randomUUID()}`;
  const chunks = splitStringByUtf8Bytes(migrationJson, chunkSize);
  const chunkCount = chunks.length;

  console.log(
    `[spacetime:migration:import] 使用分片导入: upload_id=${uploadId}, chunks=${chunkCount}, chunk_size=${chunkSize}`,
  );
  if (isReplaceExisting) {
    const tableList = includeTables.join(', ') || '无';
    console.log(
      `[spacetime:migration:import] replace-existing 仅覆盖本次文件内的表: ${tableList}`,
    );
  } else if (isIncremental) {
    console.log(`[spacetime:migration:import] 使用增量模式,已存在或冲突的行会跳过`);
  }

  try {
    // Pieces are uploaded strictly one after another, in index order.
    for (const [index, chunk] of chunks.entries()) {
      const chunkResult = await callSpacetimeProcedure(
        options,
        'put_database_migration_import_chunk',
        {
          upload_id: uploadId,
          chunk_index: index,
          chunk_count: chunkCount,
          chunk,
        },
      );
      ensureProcedureOk(chunkResult);
      console.log(
        `[spacetime:migration:import] 已上传迁移分片 ${index + 1}/${chunkCount}`,
      );
    }

    const result = await callSpacetimeProcedure(options, procedureName, {
      upload_id: uploadId,
      include_tables: includeTables,
      replace_existing: isReplaceExisting,
      dry_run: options.dryRun === true,
    });
    ensureProcedureOk(result);
    return result;
  } catch (error) {
    // Reached only when the upload or the final import did not complete OK.
    await clearMigrationChunksBestEffort(options, uploadId);
    throw error;
  }
}
/**
 * Decides which table names to pass as `include_tables`.
 *
 * Outside replace-existing mode the caller's `options.includeTables` is
 * returned as-is. In replace-existing mode the list is limited to tables that
 * are actually present in the migration file: all of them when no tables were
 * requested, otherwise the requested ones in file order.
 *
 * NOTE(review): when none of the requested tables appear in the file the
 * result is an empty array - confirm the import procedure does not treat an
 * empty `include_tables` as "every table".
 *
 * @param {object} options - Import options (`replaceExisting`, `includeTables`).
 * @param {string} migrationJson - Raw migration file contents.
 * @returns {string[]} Table names to include.
 */
function resolveImportIncludeTables(options, migrationJson) {
  const requested = options.includeTables;
  if (options.replaceExisting !== true) {
    return requested;
  }

  const tablesInFile = readMigrationTableNames(migrationJson);
  if (requested.length === 0) {
    return tablesInFile;
  }

  const wanted = new Set(requested);
  return tablesInFile.filter((name) => wanted.has(name));
}
/**
 * Extracts the distinct table names listed in a migration file.
 *
 * The JSON must be an object with a `tables` array whose entries each have a
 * non-blank string `name`. Names are trimmed, and duplicates are dropped while
 * keeping first-seen order.
 *
 * @param {string} migrationJson - Raw migration file contents.
 * @returns {string[]} Trimmed, de-duplicated table names in file order.
 * @throws {Error} When the JSON cannot be parsed (the parser's error is kept
 *   on `cause`), when `tables` is missing or not an array, or when an entry
 *   has no usable `name`.
 */
function readMigrationTableNames(migrationJson) {
  let payload;
  try {
    payload = JSON.parse(migrationJson);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    // Attach the parser error so its type and stack remain available.
    throw new Error(`迁移文件 JSON 解析失败: ${reason}`, { cause: error });
  }
  if (!payload || !Array.isArray(payload.tables)) {
    throw new Error('迁移文件缺少 tables 数组。');
  }

  // A Set iterates in insertion order, so it handles both de-duplication and
  // first-seen ordering.
  const tableNames = new Set();
  for (const table of payload.tables) {
    if (!table || typeof table.name !== 'string' || !table.name.trim()) {
      throw new Error('迁移文件 tables 内存在缺少 name 的表项。');
    }
    tableNames.add(table.name.trim());
  }
  return [...tableNames];
}
/**
 * Resolves the chunk size (in UTF-8 bytes) to use for chunked uploads.
 *
 * A missing or otherwise falsy `options.chunkSize` selects
 * `DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE`. A numeric string is converted to a
 * number. Anything that is not a positive integer is rejected here, instead
 * of flowing into the byte-size comparisons and the splitter where it would
 * produce misleading errors or a single oversized chunk.
 *
 * @param {object} options - Import options; only `chunkSize` is read.
 * @returns {number} Chunk size in bytes, between 1 and 1048576 inclusive.
 * @throws {Error} When the value is not a positive integer or exceeds
 *   1048576.
 */
function resolveChunkSize(options) {
  const maxChunkSize = 1024 * 1024;
  const requested = options.chunkSize;
  if (!requested) {
    return DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE;
  }

  // Accept "65536" as well as 65536; booleans and other types fail the
  // integer check below.
  const chunkSize =
    typeof requested === 'string' ? Number(requested.trim()) : requested;
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new Error(`--chunk-size 必须是正整数: ${String(requested)}`);
  }
  if (chunkSize > maxChunkSize) {
    throw new Error('--chunk-size 不能超过 1048576避免触发迁移分片 procedure 单片限制。');
  }
  return chunkSize;
}
/**
 * Splits a string into consecutive pieces whose UTF-8 encoding is at most
 * `maxBytes` each, never cutting through a code point.
 *
 * Concatenating the returned pieces yields the original string. An empty
 * input produces an empty array.
 *
 * @param {string} value - Text to split.
 * @param {number} maxBytes - Upper bound on the UTF-8 size of one piece.
 * @returns {string[]} The pieces, in order.
 * @throws {Error} When a single code point is larger than `maxBytes`.
 */
function splitStringByUtf8Bytes(value, maxBytes) {
  const pieces = [];
  let pieceStart = 0; // UTF-16 index where the currently open piece begins
  let pieceBytes = 0; // UTF-8 size accumulated in the open piece
  let cursor = 0; // UTF-16 index of the code point being examined

  // String iteration yields whole code points, so surrogate pairs stay intact.
  for (const codePoint of value) {
    const width = Buffer.byteLength(codePoint, 'utf8');
    if (width > maxBytes) {
      throw new Error(`单个字符超过 chunk-size当前 chunk-size: ${maxBytes}`);
    }
    const pieceIsOpen = cursor > pieceStart;
    if (pieceIsOpen && pieceBytes + width > maxBytes) {
      pieces.push(value.slice(pieceStart, cursor));
      pieceStart = cursor;
      pieceBytes = 0;
    }
    cursor += codePoint.length;
    pieceBytes += width;
  }

  if (cursor > pieceStart) {
    pieces.push(value.slice(pieceStart, cursor));
  }
  return pieces;
}
/**
 * Reports whether an error looks like a "request body too large" failure.
 *
 * The check is textual: the message must contain `HTTP 413` (case-sensitive)
 * or `length limit exceeded` (case-insensitive). Non-Error values are
 * stringified first.
 *
 * @param {unknown} error - The caught value.
 * @returns {boolean} `true` when either marker is present in the message.
 */
function isRequestBodyTooLargeError(error) {
  const text = error instanceof Error ? error.message : String(error);
  if (text.includes('HTTP 413')) {
    return true;
  }
  return text.toLowerCase().includes('length limit exceeded');
}
/**
 * Tries to remove the chunks stored under `uploadId` by calling
 * `clear_database_migration_import_chunks`.
 *
 * This is deliberately best-effort: a failure is reported with a warning and
 * never thrown, so it cannot mask the error that triggered the cleanup.
 *
 * @param {object} options - Import options forwarded to the procedure call.
 * @param {string} uploadId - Identifier of the upload to clear.
 * @returns {Promise<void>}
 */
async function clearMigrationChunksBestEffort(options, uploadId) {
  try {
    ensureProcedureOk(
      await callSpacetimeProcedure(
        options,
        'clear_database_migration_import_chunks',
        { upload_id: uploadId },
      ),
    );
    console.warn(`[spacetime:migration:import] 已清理失败导入的临时分片: ${uploadId}`);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    console.warn(`[spacetime:migration:import] 清理临时迁移分片失败: ${reason}`);
  }
}
/**
 * Revokes the temporary operator identity recorded on the options, if any,
 * by calling `revoke_database_migration_operator`.
 *
 * Does nothing when `options.temporaryWebIdentity` is not set. A failed
 * revocation is reported with a warning and never thrown.
 *
 * @param {object} options - Import options, possibly carrying
 *   `temporaryWebIdentity`.
 * @returns {Promise<void>}
 */
async function revokeTemporaryWebIdentity(options) {
  const operatorIdentity = options.temporaryWebIdentity;
  if (operatorIdentity) {
    try {
      ensureProcedureOk(
        await callSpacetimeProcedure(
          options,
          'revoke_database_migration_operator',
          { operator_identity_hex: operatorIdentity },
        ),
      );
      console.log(`[spacetime:migration:import] 已撤销临时 Web API identity`);
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      console.warn(
        `[spacetime:migration:import] 撤销临时 Web API identity 失败: ${reason}`,
      );
    }
  }
}
/**
 * Prints per-table import statistics as a console table.
 *
 * Prints nothing when `tableStats` is not an array or is empty.
 *
 * @param {Array<object>|undefined} tableStats - Entries with `table_name`,
 *   `imported_row_count` and `skipped_row_count`.
 * @returns {void}
 */
function printTableStats(tableStats) {
  const hasStats = Array.isArray(tableStats) && tableStats.length > 0;
  if (!hasStats) {
    return;
  }
  console.table(
    tableStats.map(
      ({
        table_name: table,
        imported_row_count: imported,
        skipped_row_count: skipped,
      }) => ({ table, imported, skipped }),
    ),
  );
}
/**
 * Prints the migration warnings as a console table, preceded by a heading.
 *
 * Prints nothing when `warnings` is not an array or is empty.
 *
 * @param {Array<object>|undefined} warnings - Entries with `table_name`,
 *   `warning_kind` and `message`.
 * @returns {void}
 */
function printMigrationWarnings(warnings) {
  const hasWarnings = Array.isArray(warnings) && warnings.length > 0;
  if (!hasWarnings) {
    return;
  }
  console.warn('[spacetime:migration:import] 迁移告警汇总:');
  const rows = warnings.map(
    ({ table_name: table, warning_kind: kind, message }) => ({
      table,
      kind,
      message,
    }),
  );
  console.table(rows);
}