feat: add incremental spacetime migration import
Some checks failed
CI / verify (push) Has been cancelled

This commit is contained in:
2026-04-27 17:15:45 +08:00
parent 1e4a64f542
commit e9a6cd38f9
4 changed files with 166 additions and 15 deletions

View File

@@ -37,6 +37,12 @@ pub struct DatabaseMigrationImportInput {
pub dry_run: bool,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DatabaseMigrationImportMode {
Strict,
Incremental,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationAuthorizeOperatorInput {
pub bootstrap_secret: String,
@@ -252,7 +258,36 @@ pub fn import_database_migration_from_file(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportInput,
) -> DatabaseMigrationProcedureResult {
match import_database_migration_from_file_inner(ctx, input) {
match import_database_migration_from_file_inner(ctx, input, DatabaseMigrationImportMode::Strict)
{
Ok(stats) => DatabaseMigrationProcedureResult {
ok: true,
schema_version: MIGRATION_SCHEMA_VERSION,
migration_json: None,
table_stats: stats,
error_message: None,
},
Err(error) => DatabaseMigrationProcedureResult {
ok: false,
schema_version: MIGRATION_SCHEMA_VERSION,
migration_json: None,
table_stats: Vec::new(),
error_message: Some(error),
},
}
}
// 增量导入只插入目标库缺失的行;主键或唯一约束冲突的行会跳过,不更新已有数据。
#[spacetimedb::procedure]
pub fn import_database_migration_incremental_from_file(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportInput,
) -> DatabaseMigrationProcedureResult {
match import_database_migration_from_file_inner(
ctx,
input,
DatabaseMigrationImportMode::Incremental,
) {
Ok(stats) => DatabaseMigrationProcedureResult {
ok: true,
schema_version: MIGRATION_SCHEMA_VERSION,
@@ -292,9 +327,13 @@ fn export_database_migration_to_file_inner(
fn import_database_migration_from_file_inner(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportInput,
import_mode: DatabaseMigrationImportMode,
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
let caller = ctx.sender();
let included_tables = normalize_include_tables(&input.include_tables)?;
if import_mode == DatabaseMigrationImportMode::Incremental && input.replace_existing {
return Err("增量导入不能同时启用 replace_existing".to_string());
}
if input.migration_json.trim().is_empty() {
return Err("migration_json 不能为空".to_string());
}
@@ -319,6 +358,7 @@ fn import_database_migration_from_file_inner(
&migration_file,
included_tables.as_ref(),
input.replace_existing,
import_mode,
)
})?
};
@@ -555,6 +595,7 @@ fn apply_migration_file(
migration_file: &MigrationFile,
include_tables: Option<&HashSet<String>>,
replace_existing: bool,
import_mode: DatabaseMigrationImportMode,
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
let mut stats = Vec::new();
for table in &migration_file.tables {
@@ -563,8 +604,10 @@ fn apply_migration_file(
}
}
let import_table_names = build_import_table_name_set(migration_file, include_tables);
if replace_existing {
clear_all_migration_tables!(ctx, include_tables);
// replace_existing 只覆盖本次迁移文件实际会导入的表,避免分批导入时误清空其它迁移白名单表。
clear_all_migration_tables!(ctx, Some(&import_table_names));
}
for table in &migration_file.tables {
@@ -578,18 +621,31 @@ fn apply_migration_file(
continue;
}
let imported_row_count = insert_migration_table_rows(ctx, table)?;
let (imported_row_count, skipped_row_count) =
insert_migration_table_rows(ctx, table, import_mode)?;
stats.push(DatabaseMigrationTableStat {
table_name: table.name.clone(),
exported_row_count: 0,
imported_row_count,
skipped_row_count: 0,
skipped_row_count,
});
}
Ok(stats)
}
fn build_import_table_name_set(
migration_file: &MigrationFile,
include_tables: Option<&HashSet<String>>,
) -> HashSet<String> {
migration_file
.tables
.iter()
.filter(|table| should_include_table(include_tables, &table.name))
.map(|table| table.name.clone())
.collect()
}
fn row_to_json<T: spacetimedb::Serialize>(row: &T) -> Result<serde_json::Value, String> {
serde_json::to_value(SerializeWrapper::from_ref(row))
.map_err(|error| format!("迁移行序列化失败: {error}"))
@@ -607,23 +663,33 @@ where
fn insert_migration_table_rows(
ctx: &ReducerContext,
table: &MigrationTable,
) -> Result<u64, String> {
import_mode: DatabaseMigrationImportMode,
) -> Result<(u64, u64), String> {
macro_rules! insert_table_match_arm {
($($table:ident),+ $(,)?) => {
match table.name.as_str() {
$(
stringify!($table) => {
let mut imported = 0u64;
let mut skipped = 0u64;
for value in &table.rows {
let row = row_from_json(value)
.map_err(|error| format!("{}: {error}", stringify!($table)))?;
ctx.db
let insert_result = ctx.db
.$table()
.try_insert(row)
.map_err(|error| format!("{} 导入失败: {error}", stringify!($table)))?;
imported = imported.saturating_add(1);
.try_insert(row);
match insert_result {
Ok(_) => imported = imported.saturating_add(1),
Err(error) => {
if import_mode == DatabaseMigrationImportMode::Incremental {
skipped = skipped.saturating_add(1);
} else {
return Err(format!("{} 导入失败: {error}", stringify!($table)));
}
}
}
}
Ok(imported)
Ok((imported, skipped))
}
)+
_ => Err(format!("迁移表不在白名单内: {}", table.name)),