|
|
|
|
@@ -63,12 +63,20 @@ pub struct DatabaseMigrationTableStat {
|
|
|
|
|
pub skipped_row_count: u64,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
|
|
|
|
pub struct DatabaseMigrationWarning {
|
|
|
|
|
pub table_name: String,
|
|
|
|
|
pub warning_kind: String,
|
|
|
|
|
pub message: String,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
|
|
|
|
pub struct DatabaseMigrationProcedureResult {
|
|
|
|
|
pub ok: bool,
|
|
|
|
|
pub schema_version: u32,
|
|
|
|
|
pub migration_json: Option<String>,
|
|
|
|
|
pub table_stats: Vec<DatabaseMigrationTableStat>,
|
|
|
|
|
pub warnings: Vec<DatabaseMigrationWarning>,
|
|
|
|
|
pub error_message: Option<String>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -241,6 +249,7 @@ pub fn export_database_migration_to_file(
|
|
|
|
|
schema_version: MIGRATION_SCHEMA_VERSION,
|
|
|
|
|
migration_json: Some(migration_json),
|
|
|
|
|
table_stats: stats,
|
|
|
|
|
warnings: Vec::new(),
|
|
|
|
|
error_message: None,
|
|
|
|
|
},
|
|
|
|
|
Err(error) => DatabaseMigrationProcedureResult {
|
|
|
|
|
@@ -248,6 +257,7 @@ pub fn export_database_migration_to_file(
|
|
|
|
|
schema_version: MIGRATION_SCHEMA_VERSION,
|
|
|
|
|
migration_json: None,
|
|
|
|
|
table_stats: Vec::new(),
|
|
|
|
|
warnings: Vec::new(),
|
|
|
|
|
error_message: Some(error),
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
@@ -261,11 +271,12 @@ pub fn import_database_migration_from_file(
|
|
|
|
|
) -> DatabaseMigrationProcedureResult {
|
|
|
|
|
match import_database_migration_from_file_inner(ctx, input, DatabaseMigrationImportMode::Strict)
|
|
|
|
|
{
|
|
|
|
|
Ok(stats) => DatabaseMigrationProcedureResult {
|
|
|
|
|
Ok((stats, warnings)) => DatabaseMigrationProcedureResult {
|
|
|
|
|
ok: true,
|
|
|
|
|
schema_version: MIGRATION_SCHEMA_VERSION,
|
|
|
|
|
migration_json: None,
|
|
|
|
|
table_stats: stats,
|
|
|
|
|
warnings,
|
|
|
|
|
error_message: None,
|
|
|
|
|
},
|
|
|
|
|
Err(error) => DatabaseMigrationProcedureResult {
|
|
|
|
|
@@ -273,6 +284,7 @@ pub fn import_database_migration_from_file(
|
|
|
|
|
schema_version: MIGRATION_SCHEMA_VERSION,
|
|
|
|
|
migration_json: None,
|
|
|
|
|
table_stats: Vec::new(),
|
|
|
|
|
warnings: Vec::new(),
|
|
|
|
|
error_message: Some(error),
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
@@ -289,11 +301,12 @@ pub fn import_database_migration_incremental_from_file(
|
|
|
|
|
input,
|
|
|
|
|
DatabaseMigrationImportMode::Incremental,
|
|
|
|
|
) {
|
|
|
|
|
Ok(stats) => DatabaseMigrationProcedureResult {
|
|
|
|
|
Ok((stats, warnings)) => DatabaseMigrationProcedureResult {
|
|
|
|
|
ok: true,
|
|
|
|
|
schema_version: MIGRATION_SCHEMA_VERSION,
|
|
|
|
|
migration_json: None,
|
|
|
|
|
table_stats: stats,
|
|
|
|
|
warnings,
|
|
|
|
|
error_message: None,
|
|
|
|
|
},
|
|
|
|
|
Err(error) => DatabaseMigrationProcedureResult {
|
|
|
|
|
@@ -301,6 +314,7 @@ pub fn import_database_migration_incremental_from_file(
|
|
|
|
|
schema_version: MIGRATION_SCHEMA_VERSION,
|
|
|
|
|
migration_json: None,
|
|
|
|
|
table_stats: Vec::new(),
|
|
|
|
|
warnings: Vec::new(),
|
|
|
|
|
error_message: Some(error),
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
@@ -329,7 +343,13 @@ fn import_database_migration_from_file_inner(
|
|
|
|
|
ctx: &mut ProcedureContext,
|
|
|
|
|
input: DatabaseMigrationImportInput,
|
|
|
|
|
import_mode: DatabaseMigrationImportMode,
|
|
|
|
|
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
|
|
|
|
|
) -> Result<
|
|
|
|
|
(
|
|
|
|
|
Vec<DatabaseMigrationTableStat>,
|
|
|
|
|
Vec<DatabaseMigrationWarning>,
|
|
|
|
|
),
|
|
|
|
|
String,
|
|
|
|
|
> {
|
|
|
|
|
let caller = ctx.sender();
|
|
|
|
|
let included_tables = normalize_include_tables(&input.include_tables)?;
|
|
|
|
|
if import_mode == DatabaseMigrationImportMode::Incremental && input.replace_existing {
|
|
|
|
|
@@ -349,7 +369,7 @@ fn import_database_migration_from_file_inner(
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let stats = if input.dry_run {
|
|
|
|
|
let (stats, warnings) = if input.dry_run {
|
|
|
|
|
build_import_dry_run_stats(&migration_file.tables, included_tables.as_ref())?
|
|
|
|
|
} else {
|
|
|
|
|
ctx.try_with_tx(|tx| {
|
|
|
|
|
@@ -364,7 +384,7 @@ fn import_database_migration_from_file_inner(
|
|
|
|
|
})?
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Ok(stats)
|
|
|
|
|
Ok((stats, warnings))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn authorize_database_migration_operator_inner(
|
|
|
|
|
@@ -566,11 +586,25 @@ fn build_export_stats(tables: &[MigrationTable]) -> Vec<DatabaseMigrationTableSt
|
|
|
|
|
fn build_import_dry_run_stats(
|
|
|
|
|
tables: &[MigrationTable],
|
|
|
|
|
include_tables: Option<&HashSet<String>>,
|
|
|
|
|
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
|
|
|
|
|
) -> Result<
|
|
|
|
|
(
|
|
|
|
|
Vec<DatabaseMigrationTableStat>,
|
|
|
|
|
Vec<DatabaseMigrationWarning>,
|
|
|
|
|
),
|
|
|
|
|
String,
|
|
|
|
|
> {
|
|
|
|
|
let mut stats = Vec::new();
|
|
|
|
|
let mut warnings = Vec::new();
|
|
|
|
|
for table in tables {
|
|
|
|
|
if !is_supported_migration_table(&table.name) {
|
|
|
|
|
return Err(format!("迁移文件包含不支持的表: {}", table.name));
|
|
|
|
|
warnings.push(build_dropped_table_warning(table));
|
|
|
|
|
stats.push(DatabaseMigrationTableStat {
|
|
|
|
|
table_name: table.name.clone(),
|
|
|
|
|
exported_row_count: 0,
|
|
|
|
|
imported_row_count: 0,
|
|
|
|
|
skipped_row_count: table.rows.len() as u64,
|
|
|
|
|
});
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
if should_include_table(include_tables, &table.name) {
|
|
|
|
|
stats.push(DatabaseMigrationTableStat {
|
|
|
|
|
@@ -588,7 +622,7 @@ fn build_import_dry_run_stats(
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(stats)
|
|
|
|
|
Ok((stats, warnings))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn apply_migration_file(
|
|
|
|
|
@@ -597,13 +631,15 @@ fn apply_migration_file(
|
|
|
|
|
include_tables: Option<&HashSet<String>>,
|
|
|
|
|
replace_existing: bool,
|
|
|
|
|
import_mode: DatabaseMigrationImportMode,
|
|
|
|
|
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
|
|
|
|
|
) -> Result<
|
|
|
|
|
(
|
|
|
|
|
Vec<DatabaseMigrationTableStat>,
|
|
|
|
|
Vec<DatabaseMigrationWarning>,
|
|
|
|
|
),
|
|
|
|
|
String,
|
|
|
|
|
> {
|
|
|
|
|
let mut stats = Vec::new();
|
|
|
|
|
for table in &migration_file.tables {
|
|
|
|
|
if !is_supported_migration_table(&table.name) {
|
|
|
|
|
return Err(format!("迁移文件包含不支持的表: {}", table.name));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
let mut warnings = Vec::new();
|
|
|
|
|
|
|
|
|
|
let import_table_names = build_import_table_name_set(migration_file, include_tables);
|
|
|
|
|
if replace_existing {
|
|
|
|
|
@@ -612,6 +648,17 @@ fn apply_migration_file(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for table in &migration_file.tables {
|
|
|
|
|
if !is_supported_migration_table(&table.name) {
|
|
|
|
|
warnings.push(build_dropped_table_warning(table));
|
|
|
|
|
stats.push(DatabaseMigrationTableStat {
|
|
|
|
|
table_name: table.name.clone(),
|
|
|
|
|
exported_row_count: 0,
|
|
|
|
|
imported_row_count: 0,
|
|
|
|
|
skipped_row_count: table.rows.len() as u64,
|
|
|
|
|
});
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !should_include_table(include_tables, &table.name) {
|
|
|
|
|
stats.push(DatabaseMigrationTableStat {
|
|
|
|
|
table_name: table.name.clone(),
|
|
|
|
|
@@ -623,7 +670,7 @@ fn apply_migration_file(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let (imported_row_count, skipped_row_count) =
|
|
|
|
|
insert_migration_table_rows(ctx, table, import_mode)?;
|
|
|
|
|
insert_migration_table_rows(ctx, table, import_mode, &mut warnings)?;
|
|
|
|
|
stats.push(DatabaseMigrationTableStat {
|
|
|
|
|
table_name: table.name.clone(),
|
|
|
|
|
exported_row_count: 0,
|
|
|
|
|
@@ -632,7 +679,7 @@ fn apply_migration_file(
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(stats)
|
|
|
|
|
Ok((stats, warnings))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn build_import_table_name_set(
|
|
|
|
|
@@ -647,17 +694,49 @@ fn build_import_table_name_set(
|
|
|
|
|
.collect()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn build_dropped_table_warning(table: &MigrationTable) -> DatabaseMigrationWarning {
|
|
|
|
|
DatabaseMigrationWarning {
|
|
|
|
|
table_name: table.name.clone(),
|
|
|
|
|
warning_kind: "dropped_table".to_string(),
|
|
|
|
|
message: format!(
|
|
|
|
|
"迁移文件包含当前模块已删除或未加入白名单的表 {},已跳过 {} 行",
|
|
|
|
|
table.name,
|
|
|
|
|
table.rows.len()
|
|
|
|
|
),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn build_dropped_field_warning(table_name: &str, field_name: &str) -> DatabaseMigrationWarning {
|
|
|
|
|
DatabaseMigrationWarning {
|
|
|
|
|
table_name: table_name.to_string(),
|
|
|
|
|
warning_kind: "dropped_field".to_string(),
|
|
|
|
|
message: format!("表 {table_name} 的旧字段 {field_name} 当前已不存在,已在导入时丢弃"),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn row_to_json<T: spacetimedb::Serialize>(row: &T) -> Result<serde_json::Value, String> {
|
|
|
|
|
serde_json::to_value(SerializeWrapper::from_ref(row))
|
|
|
|
|
.map_err(|error| format!("迁移行序列化失败: {error}"))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn row_from_json<T>(value: &serde_json::Value) -> Result<T, String>
|
|
|
|
|
fn row_from_json<T>(
|
|
|
|
|
table_name: &str,
|
|
|
|
|
value: &serde_json::Value,
|
|
|
|
|
warnings: &mut Vec<DatabaseMigrationWarning>,
|
|
|
|
|
) -> Result<T, String>
|
|
|
|
|
where
|
|
|
|
|
T: for<'de> spacetimedb::Deserialize<'de>,
|
|
|
|
|
{
|
|
|
|
|
let wrapped: DeserializeWrapper<T> = serde_json::from_value(value.clone())
|
|
|
|
|
.map_err(|error| format!("迁移行反序列化失败: {error}"))?;
|
|
|
|
|
let wrapped = match serde_json::from_value::<DeserializeWrapper<T>>(value.clone()) {
|
|
|
|
|
Ok(row) => row,
|
|
|
|
|
Err(original_error) => recover_row_with_deleted_fields::<T>(
|
|
|
|
|
table_name,
|
|
|
|
|
value,
|
|
|
|
|
&original_error.to_string(),
|
|
|
|
|
warnings,
|
|
|
|
|
)
|
|
|
|
|
.ok_or_else(|| format!("迁移行反序列化失败,且无法通过丢弃旧字段恢复: {original_error}"))?,
|
|
|
|
|
};
|
|
|
|
|
Ok(wrapped.0)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -711,10 +790,55 @@ fn normalize_migration_row(table_name: &str, value: &serde_json::Value) -> serde
|
|
|
|
|
next_value
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn recover_row_with_deleted_fields<T>(
|
|
|
|
|
table_name: &str,
|
|
|
|
|
value: &serde_json::Value,
|
|
|
|
|
error_message: &str,
|
|
|
|
|
warnings: &mut Vec<DatabaseMigrationWarning>,
|
|
|
|
|
) -> Option<DeserializeWrapper<T>>
|
|
|
|
|
where
|
|
|
|
|
T: for<'de> spacetimedb::Deserialize<'de>,
|
|
|
|
|
{
|
|
|
|
|
let mut candidate = value.as_object()?.clone();
|
|
|
|
|
let mut next_error = error_message.to_string();
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
let field_name = extract_unknown_field_name(&next_error)?;
|
|
|
|
|
candidate.remove(&field_name)?;
|
|
|
|
|
warnings.push(build_dropped_field_warning(table_name, &field_name));
|
|
|
|
|
|
|
|
|
|
match serde_json::from_value::<DeserializeWrapper<T>>(serde_json::Value::Object(
|
|
|
|
|
candidate.clone(),
|
|
|
|
|
)) {
|
|
|
|
|
Ok(row) => return Some(row),
|
|
|
|
|
Err(error) => next_error = error.to_string(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn extract_unknown_field_name(error_message: &str) -> Option<String> {
|
|
|
|
|
let marker = "unknown field";
|
|
|
|
|
let marker_index = error_message.find(marker)?;
|
|
|
|
|
let after_marker = error_message[marker_index + marker.len()..].trim_start();
|
|
|
|
|
|
|
|
|
|
for quote in ['`', '"', '\''] {
|
|
|
|
|
if let Some(rest) = after_marker.strip_prefix(quote) {
|
|
|
|
|
let end_index = rest.find(quote)?;
|
|
|
|
|
return Some(rest[..end_index].to_string());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
after_marker
|
|
|
|
|
.split(|character: char| !character.is_ascii_alphanumeric() && character != '_')
|
|
|
|
|
.find(|value| !value.is_empty())
|
|
|
|
|
.map(str::to_string)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn insert_migration_table_rows(
|
|
|
|
|
ctx: &ReducerContext,
|
|
|
|
|
table: &MigrationTable,
|
|
|
|
|
import_mode: DatabaseMigrationImportMode,
|
|
|
|
|
warnings: &mut Vec<DatabaseMigrationWarning>,
|
|
|
|
|
) -> Result<(u64, u64), String> {
|
|
|
|
|
macro_rules! insert_table_match_arm {
|
|
|
|
|
($($table:ident),+ $(,)?) => {
|
|
|
|
|
@@ -725,7 +849,7 @@ fn insert_migration_table_rows(
|
|
|
|
|
let mut skipped = 0u64;
|
|
|
|
|
for value in &table.rows {
|
|
|
|
|
let normalized_value = normalize_migration_row(stringify!($table), value);
|
|
|
|
|
let row = row_from_json(&normalized_value)
|
|
|
|
|
let row = row_from_json(stringify!($table), &normalized_value, warnings)
|
|
|
|
|
.map_err(|error| format!("{}: {error}", stringify!($table)))?;
|
|
|
|
|
let insert_result = ctx.db
|
|
|
|
|
.$table()
|
|
|
|
|
|