diff --git a/docs/technical/SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md b/docs/technical/SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md index 9dd88b5b..33e53949 100644 --- a/docs/technical/SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md +++ b/docs/technical/SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md @@ -95,13 +95,21 @@ node scripts/spacetime-revoke-migration-operator.mjs \ `import_database_migration_from_file(ctx, input)` +`import_database_migration_incremental_from_file(ctx, input)` + 输入字段: - `migration_json`: 导出 procedure 生成的完整迁移 JSON 字符串。 - `include_tables`: 可选表名白名单。为空时导入文件内所有支持表。 -- `replace_existing`: 是否先清空目标表。跨服务器全量迁移必须为 `true`。 +- `replace_existing`: 是否先清空本次迁移文件内实际导入的目标表。不会清空迁移文件未包含的表;分批迁移时只覆盖当前批次。 - `dry_run`: 只解析和统计,不写表。 +导入模式: + +- 默认严格追加:不清空目标表,逐行插入;遇到主键或唯一约束冲突时失败并回滚,适合确认目标库没有同表旧数据时使用。 +- 增量追加:调用 `import_database_migration_incremental_from_file`,不清空目标表;遇到已存在或唯一约束冲突的行会跳过并计入 `skipped_row_count`,只插入目标库缺失的行。该模式不会更新目标库已有行。 +- 覆盖导入:`replace_existing = true` 时先删除覆盖范围内的目标表旧数据,再插入迁移文件中的数据;只适合迁移文件是这些表完整快照的场景。 + 返回字段: - `ok`: 是否成功。 @@ -152,10 +160,22 @@ node scripts/spacetime-import-migration-json.mjs \ --server maincloud \ --database xushi-p4wfr \ --bootstrap-secret <服务器目标库发布时输出的随机密钥> \ - --in tmp/spacetime-migrations/source-2026-04-27.json \ - --replace-existing + --in tmp/spacetime-migrations/source-2026-04-27.json ``` +如果目标库已有部分数据,且只想补充缺失行,使用增量模式: + +```bash +node scripts/spacetime-import-migration-json.mjs \ + --server maincloud \ + --database xushi-p4wfr \ + --bootstrap-secret <服务器目标库发布时输出的随机密钥> \ + --in tmp/spacetime-migrations/source-2026-04-27.json \ + --incremental +``` + +如果目标库对应表已有数据,并且本次文件应作为这些表的覆盖来源,再显式追加 `--replace-existing`。脚本会把覆盖范围限定为迁移文件内实际包含且本次会导入的表,避免分批导入时清空文件外的其它表。 + 默认情况下,脚本会自动完成三步: 1. `POST /v1/identity` 创建临时 Web API identity/token。 @@ -169,6 +189,10 @@ node scripts/spacetime-import-migration-json.mjs \ 正式导入前建议先加 `--dry-run`,确认 JSON 可解析、版本匹配、表名都在迁移白名单内。 +`--dry-run` 不会模拟目标库主键或唯一约束冲突,因此增量模式的 `skipped_row_count` 只有真实导入时才准确。 + +不要在只想追加数据时使用 `--replace-existing`。该参数会先删除覆盖范围内的目标表旧数据,再插入迁移文件中的数据;如果源文件不是完整快照,会造成目标表数据丢失。 + 如需分批迁移,可用逗号分隔表名: ```bash diff --git a/scripts/spacetime-import-migration-json.mjs b/scripts/spacetime-import-migration-json.mjs index 1f47fd74..ba869e3a 100644 --- a/scripts/spacetime-import-migration-json.mjs +++ b/scripts/spacetime-import-migration-json.mjs @@ -16,6 +16,9 @@ try { if (!options.in) { throw new Error('必须传入 --in。'); } + if (options.incremental === true && options.replaceExisting === true) { + throw new Error('--incremental 不能和 --replace-existing 同时使用。'); + } const inPath = path.resolve(options.in); await assertReadableFile(inPath); @@ -75,13 +78,69 @@ async function prepareWebImportOptions(options) { } async function importMigrationJsonDirect(options, migrationJson) { + const includeTables = resolveImportIncludeTables(options, migrationJson); + const procedureName = + options.incremental === true + ? 'import_database_migration_incremental_from_file' + : 'import_database_migration_from_file'; const input = { migration_json: migrationJson, - include_tables: options.includeTables, + include_tables: includeTables, replace_existing: options.replaceExisting === true, dry_run: options.dryRun === true, }; - return callSpacetimeProcedure(options, 'import_database_migration_from_file', input); + if (options.replaceExisting === true) { + console.log( + `[spacetime:migration:import] replace-existing 仅覆盖本次文件内的表: ${includeTables.join(', ') || '无'}`, + ); + } else if (options.incremental === true) { + console.log(`[spacetime:migration:import] 使用增量模式,已存在或冲突的行会跳过`); + } + return callSpacetimeProcedure(options, procedureName, input); +} + +function resolveImportIncludeTables(options, migrationJson) { + if (options.replaceExisting !== true) { + return options.includeTables; + } + + const migrationTables = readMigrationTableNames(migrationJson); + if (options.includeTables.length === 0) { + return migrationTables; + } + + const requestedTables = new Set(options.includeTables); + return migrationTables.filter((tableName) => requestedTables.has(tableName)); +} + +function readMigrationTableNames(migrationJson) { + let payload; + try { + payload = JSON.parse(migrationJson); + } catch (error) { + throw new Error( + `迁移文件 JSON 解析失败: ${error instanceof Error ? error.message : String(error)}`, + ); + } + + if (!payload || !Array.isArray(payload.tables)) { + throw new Error('迁移文件缺少 tables 数组。'); + } + + const tableNames = []; + const seen = new Set(); + for (const table of payload.tables) { + if (!table || typeof table.name !== 'string' || !table.name.trim()) { + throw new Error('迁移文件 tables 内存在缺少 name 的表项。'); + } + const tableName = table.name.trim(); + if (!seen.has(tableName)) { + tableNames.push(tableName); + seen.add(tableName); + } + } + + return tableNames; } async function revokeTemporaryWebIdentity(options) { diff --git a/scripts/spacetime-migration-common.mjs b/scripts/spacetime-migration-common.mjs index 91e93e45..1b1261d4 100644 --- a/scripts/spacetime-migration-common.mjs +++ b/scripts/spacetime-migration-common.mjs @@ -67,6 +67,8 @@ export function parseArgs(argv) { .filter(Boolean); } else if (arg === '--replace-existing') { options.replaceExisting = true; + } else if (arg === '--incremental') { + options.incremental = true; } else if (arg === '--dry-run') { options.dryRun = true; } else if (arg === '--anonymous' || arg === '--no-config') { diff --git a/server-rs/crates/spacetime-module/src/migration.rs b/server-rs/crates/spacetime-module/src/migration.rs index 1ff8b286..3a128daf 100644 --- a/server-rs/crates/spacetime-module/src/migration.rs +++ b/server-rs/crates/spacetime-module/src/migration.rs @@ -37,6 +37,12 @@ pub struct DatabaseMigrationImportInput { pub dry_run: bool, } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum DatabaseMigrationImportMode { + Strict, + Incremental, +} + #[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)] pub struct DatabaseMigrationAuthorizeOperatorInput { pub bootstrap_secret: String, @@ -252,7 +258,36 @@ pub fn import_database_migration_from_file( ctx: &mut ProcedureContext, input: DatabaseMigrationImportInput, ) -> DatabaseMigrationProcedureResult { - match import_database_migration_from_file_inner(ctx, input) { + match import_database_migration_from_file_inner(ctx, input, DatabaseMigrationImportMode::Strict) + { + Ok(stats) => DatabaseMigrationProcedureResult { + ok: true, + schema_version: MIGRATION_SCHEMA_VERSION, + migration_json: None, + table_stats: stats, + error_message: None, + }, + Err(error) => DatabaseMigrationProcedureResult { + ok: false, + schema_version: MIGRATION_SCHEMA_VERSION, + migration_json: None, + table_stats: Vec::new(), + error_message: Some(error), + }, + } +} + +// 增量导入只插入目标库缺失的行;主键或唯一约束冲突的行会跳过,不更新已有数据。 +#[spacetimedb::procedure] +pub fn import_database_migration_incremental_from_file( + ctx: &mut ProcedureContext, + input: DatabaseMigrationImportInput, +) -> DatabaseMigrationProcedureResult { + match import_database_migration_from_file_inner( + ctx, + input, + DatabaseMigrationImportMode::Incremental, + ) { Ok(stats) => DatabaseMigrationProcedureResult { ok: true, schema_version: MIGRATION_SCHEMA_VERSION, @@ -292,9 +327,13 @@ fn export_database_migration_to_file_inner( fn import_database_migration_from_file_inner( ctx: &mut ProcedureContext, input: DatabaseMigrationImportInput, + import_mode: DatabaseMigrationImportMode, ) -> Result, String> { let caller = ctx.sender(); let included_tables = normalize_include_tables(&input.include_tables)?; + if import_mode == DatabaseMigrationImportMode::Incremental && input.replace_existing { + return Err("增量导入不能同时启用 replace_existing".to_string()); + } if input.migration_json.trim().is_empty() { return Err("migration_json 不能为空".to_string()); } @@ -319,6 +358,7 @@ fn import_database_migration_from_file_inner( &migration_file, included_tables.as_ref(), input.replace_existing, + import_mode, ) })? }; @@ -555,6 +595,7 @@ fn apply_migration_file( migration_file: &MigrationFile, include_tables: Option<&HashSet>, replace_existing: bool, + import_mode: DatabaseMigrationImportMode, ) -> Result, String> { let mut stats = Vec::new(); for table in &migration_file.tables { @@ -563,8 +604,10 @@ fn apply_migration_file( } } + let import_table_names = build_import_table_name_set(migration_file, include_tables); if replace_existing { - clear_all_migration_tables!(ctx, include_tables); + // replace_existing 只覆盖本次迁移文件实际会导入的表,避免分批导入时误清空其它迁移白名单表。 + clear_all_migration_tables!(ctx, Some(&import_table_names)); } for table in &migration_file.tables { @@ -578,18 +621,31 @@ fn apply_migration_file( continue; } - let imported_row_count = insert_migration_table_rows(ctx, table)?; + let (imported_row_count, skipped_row_count) = + insert_migration_table_rows(ctx, table, import_mode)?; stats.push(DatabaseMigrationTableStat { table_name: table.name.clone(), exported_row_count: 0, imported_row_count, - skipped_row_count: 0, + skipped_row_count, }); } Ok(stats) } +fn build_import_table_name_set( + migration_file: &MigrationFile, + include_tables: Option<&HashSet>, +) -> HashSet { + migration_file + .tables + .iter() + .filter(|table| should_include_table(include_tables, &table.name)) + .map(|table| table.name.clone()) + .collect() +} + fn row_to_json(row: &T) -> Result { serde_json::to_value(SerializeWrapper::from_ref(row)) .map_err(|error| format!("迁移行序列化失败: {error}")) @@ -607,23 +663,33 @@ where fn insert_migration_table_rows( ctx: &ReducerContext, table: &MigrationTable, -) -> Result { + import_mode: DatabaseMigrationImportMode, +) -> Result<(u64, u64), String> { macro_rules! insert_table_match_arm { ($($table:ident),+ $(,)?) => { match table.name.as_str() { $( stringify!($table) => { let mut imported = 0u64; + let mut skipped = 0u64; for value in &table.rows { let row = row_from_json(value) .map_err(|error| format!("{}: {error}", stringify!($table)))?; - ctx.db + let insert_result = ctx.db .$table() - .try_insert(row) - .map_err(|error| format!("{} 导入失败: {error}", stringify!($table)))?; - imported = imported.saturating_add(1); + .try_insert(row); + match insert_result { + Ok(_) => imported = imported.saturating_add(1), + Err(error) => { + if import_mode == DatabaseMigrationImportMode::Incremental { + skipped = skipped.saturating_add(1); + } else { + return Err(format!("{} 导入失败: {error}", stringify!($table))); + } + } + } } - Ok(imported) + Ok((imported, skipped)) } )+ _ => Err(format!("迁移表不在白名单内: {}", table.name)),