Add SpacetimeDB conflict migration flow
This commit is contained in:
@@ -122,6 +122,38 @@ node scripts/spacetime-revoke-migration-operator.mjs \
|
||||
|
||||
## Node 脚本
|
||||
|
||||
### 发布冲突自动迁移
|
||||
|
||||
`npm run spacetime:publish:maincloud` 默认采用冲突感知发布:
|
||||
|
||||
1. 先不清库发布新 wasm。
|
||||
2. 如果发布成功,流程结束。
|
||||
3. 如果发布失败且输出可判定为 schema 冲突,脚本自动导出旧库迁移 JSON 到 `tmp/spacetime-migrations/maincloud/<database>/<timestamp>.json`。
|
||||
4. 导出成功后执行清库发布新 wasm。
|
||||
5. 新 wasm 发布成功后,把第 3 步导出的 JSON 自动导入回灌。
|
||||
|
||||
任一阶段失败都会中止流程,并保留已经导出的迁移 JSON。非 schema 冲突的发布失败不会进入迁移流程。
|
||||
|
||||
```bash
|
||||
npm run spacetime:publish:maincloud -- --database xushi-p4wfr
|
||||
```
|
||||
|
||||
可选参数:
|
||||
|
||||
- `--no-migrate-on-conflict`:禁用冲突自动迁移,只保留原始发布失败。
|
||||
- `--migration-dir <dir>`:指定迁移 JSON 输出目录。
|
||||
- `--clear-database`:显式清库发布;该模式代表人工确认清库,不触发自动迁移。
|
||||
|
||||
冲突自动迁移需要发布脚本本次生成的 `GENARRATIVE_SPACETIME_MIGRATION_BOOTSTRAP_SECRET`。因此不要和 `--no-migration-bootstrap-secret` 同时使用。
|
||||
|
||||
### 删除表和删除字段
|
||||
|
||||
迁移文件来自旧模块时,可能包含新模块已经删除的表或字段。导入阶段按以下规则处理:
|
||||
|
||||
- 迁移文件包含新模块已删除或不在白名单内的表时,不中断迁移;该表全部行计入 `skipped_row_count`,并在导入结束后统一展示 `dropped_table` 告警。
|
||||
- 迁移行包含新模块已删除的旧字段时,导入 procedure 会尝试丢弃旧字段后继续反序列化;恢复成功则导入该行,并在导入结束后统一展示 `dropped_field` 告警。
|
||||
- 新模块新增必填字段、字段类型变化、枚举不兼容等无法通过“丢弃旧字段”恢复的情况仍会失败并回滚,避免写入不完整数据。
|
||||
|
||||
本机导出时,先确保本机 SpacetimeDB 服务和源数据库可访问,然后授权本机调用身份:
|
||||
|
||||
```bash
|
||||
|
||||
@@ -3,7 +3,9 @@
|
||||
import { writeFile } from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
import {
|
||||
callSpacetimeProcedureAuto,
|
||||
callSpacetimeProcedure,
|
||||
callSpacetimeProcedureViaCli,
|
||||
createSpacetimeWebIdentity,
|
||||
ensureParentDir,
|
||||
ensureProcedureOk,
|
||||
parseArgs,
|
||||
@@ -18,11 +20,13 @@ try {
|
||||
const input = {
|
||||
include_tables: options.includeTables,
|
||||
};
|
||||
const result = await callSpacetimeProcedureAuto(
|
||||
options,
|
||||
'export_database_migration_to_file',
|
||||
input,
|
||||
);
|
||||
const webOptions = await prepareWebExportOptions(options);
|
||||
let result;
|
||||
try {
|
||||
result = await callSpacetimeProcedure(webOptions, 'export_database_migration_to_file', input);
|
||||
} finally {
|
||||
await revokeTemporaryWebIdentity(webOptions);
|
||||
}
|
||||
ensureProcedureOk(result);
|
||||
|
||||
if (typeof result.migration_json !== 'string' || result.migration_json.trim() === '') {
|
||||
@@ -35,6 +39,7 @@ try {
|
||||
|
||||
console.log(`[spacetime:migration:export] 已写入 ${outPath}`);
|
||||
printTableStats(result.table_stats);
|
||||
printMigrationWarnings(result.warnings);
|
||||
} catch (error) {
|
||||
console.error(
|
||||
`[spacetime:migration:export] ${error instanceof Error ? error.message : String(error)}`,
|
||||
@@ -42,6 +47,58 @@ try {
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
async function prepareWebExportOptions(options) {
|
||||
if (options.token) {
|
||||
return { ...options, useHttp: true };
|
||||
}
|
||||
|
||||
const identity = await createSpacetimeWebIdentity(options);
|
||||
console.log(
|
||||
`[spacetime:migration:export] 已通过 Web API 创建临时 identity: ${identity.identity}`,
|
||||
);
|
||||
|
||||
const authorizeResult = await callSpacetimeProcedureViaCli(
|
||||
options,
|
||||
'authorize_database_migration_operator',
|
||||
{
|
||||
bootstrap_secret: options.bootstrapSecret || '',
|
||||
operator_identity_hex: identity.identity,
|
||||
note: options.note || 'temporary web api migration export',
|
||||
},
|
||||
);
|
||||
ensureProcedureOk(authorizeResult);
|
||||
console.log(`[spacetime:migration:export] 已授权临时 Web API identity`);
|
||||
|
||||
return {
|
||||
...options,
|
||||
token: identity.token,
|
||||
temporaryWebIdentity: identity.identity,
|
||||
useHttp: true,
|
||||
};
|
||||
}
|
||||
|
||||
async function revokeTemporaryWebIdentity(options) {
|
||||
if (!options.temporaryWebIdentity) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const revokeResult = await callSpacetimeProcedure(
|
||||
options,
|
||||
'revoke_database_migration_operator',
|
||||
{ operator_identity_hex: options.temporaryWebIdentity },
|
||||
);
|
||||
ensureProcedureOk(revokeResult);
|
||||
console.log(`[spacetime:migration:export] 已撤销临时 Web API identity`);
|
||||
} catch (error) {
|
||||
console.warn(
|
||||
`[spacetime:migration:export] 撤销临时 Web API identity 失败: ${
|
||||
error instanceof Error ? error.message : String(error)
|
||||
}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function printTableStats(tableStats) {
|
||||
if (!Array.isArray(tableStats) || tableStats.length === 0) {
|
||||
return;
|
||||
@@ -53,3 +110,18 @@ function printTableStats(tableStats) {
|
||||
}));
|
||||
console.table(rows);
|
||||
}
|
||||
|
||||
function printMigrationWarnings(warnings) {
|
||||
if (!Array.isArray(warnings) || warnings.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.warn('[spacetime:migration:export] 迁移告警:');
|
||||
console.table(
|
||||
warnings.map((warning) => ({
|
||||
table: warning.table_name,
|
||||
kind: warning.warning_kind,
|
||||
message: warning.message,
|
||||
})),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@ try {
|
||||
`[spacetime:migration:import] ${options.dryRun ? 'dry-run 完成' : '导入完成'}: ${inPath}`,
|
||||
);
|
||||
printTableStats(result.table_stats);
|
||||
printMigrationWarnings(result.warnings);
|
||||
} catch (error) {
|
||||
console.error(
|
||||
`[spacetime:migration:import] ${error instanceof Error ? error.message : String(error)}`,
|
||||
@@ -177,3 +178,18 @@ function printTableStats(tableStats) {
|
||||
}));
|
||||
console.table(rows);
|
||||
}
|
||||
|
||||
function printMigrationWarnings(warnings) {
|
||||
if (!Array.isArray(warnings) || warnings.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.warn('[spacetime:migration:import] 迁移告警汇总:');
|
||||
console.table(
|
||||
warnings.map((warning) => ({
|
||||
table: warning.table_name,
|
||||
kind: warning.warning_kind,
|
||||
message: warning.message,
|
||||
})),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -250,12 +250,24 @@ function normalizeSatsProduct(value) {
|
||||
};
|
||||
}
|
||||
|
||||
if (value.length === 5) {
|
||||
return {
|
||||
ok: normalizeSatsValue(value[0]),
|
||||
schema_version: normalizeSatsValue(value[1]),
|
||||
migration_json: normalizeSatsOption(value[2]),
|
||||
table_stats: normalizeTableStats(value[3]),
|
||||
warnings: [],
|
||||
error_message: normalizeSatsOption(value[4]),
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
ok: normalizeSatsValue(value[0]),
|
||||
schema_version: normalizeSatsValue(value[1]),
|
||||
migration_json: normalizeSatsOption(value[2]),
|
||||
table_stats: normalizeTableStats(value[3]),
|
||||
error_message: normalizeSatsOption(value[4]),
|
||||
warnings: normalizeMigrationWarnings(value[4]),
|
||||
error_message: normalizeSatsOption(value[5]),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -309,6 +321,28 @@ function normalizeTableStats(value) {
|
||||
});
|
||||
}
|
||||
|
||||
function normalizeMigrationWarnings(value) {
|
||||
if (!Array.isArray(value)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return value.map((entry) => {
|
||||
if (entry && typeof entry === 'object' && !Array.isArray(entry)) {
|
||||
return normalizeSatsValue(entry);
|
||||
}
|
||||
|
||||
if (Array.isArray(entry)) {
|
||||
return {
|
||||
table_name: normalizeSatsValue(entry[0]),
|
||||
warning_kind: normalizeSatsValue(entry[1]),
|
||||
message: normalizeSatsValue(entry[2]),
|
||||
};
|
||||
}
|
||||
|
||||
return entry;
|
||||
});
|
||||
}
|
||||
|
||||
export function resolveServerUrl(options) {
|
||||
if (options.serverUrl) {
|
||||
return options.serverUrl;
|
||||
|
||||
@@ -7,6 +7,8 @@ SERVER_RS_DIR="${REPO_ROOT}/server-rs"
|
||||
MODULE_PATH="${SERVER_RS_DIR}/target/wasm32-unknown-unknown/release/spacetime_module.wasm"
|
||||
SPACETIME_SERVER_ALIAS="maincloud"
|
||||
CLEAR_DATABASE=0
|
||||
MIGRATE_ON_CONFLICT=1
|
||||
MIGRATION_DIR=""
|
||||
MIGRATION_BOOTSTRAP_SECRET=""
|
||||
MIGRATION_BOOTSTRAP_SECRET_MODE="auto"
|
||||
|
||||
@@ -41,11 +43,13 @@ usage() {
|
||||
npm run spacetime:publish:maincloud
|
||||
npm run spacetime:publish:maincloud -- --database <database>
|
||||
npm run spacetime:publish:maincloud -- --clear-database
|
||||
npm run spacetime:publish:maincloud -- --no-migrate-on-conflict
|
||||
npm run spacetime:publish:maincloud -- --no-migration-bootstrap-secret
|
||||
|
||||
说明:
|
||||
发布 server-rs/crates/spacetime-module 到 SpacetimeDB Maincloud。
|
||||
数据库名优先读取 --database,其次读取 GENARRATIVE_SPACETIME_MAINCLOUD_DATABASE。
|
||||
默认遇到 schema 冲突时会先导出迁移 JSON,再清库发布并导入回灌。
|
||||
默认在构建 wasm 前随机生成迁移引导密钥,注入 GENARRATIVE_SPACETIME_MIGRATION_BOOTSTRAP_SECRET 并显示在控制台。
|
||||
EOF
|
||||
}
|
||||
@@ -80,6 +84,71 @@ prepare_migration_bootstrap_secret() {
|
||||
echo "[spacetime:maincloud] 迁移引导密钥: ${MIGRATION_BOOTSTRAP_SECRET}"
|
||||
}
|
||||
|
||||
timestamp_slug() {
|
||||
node -e 'process.stdout.write(new Date().toISOString().replace(/[:.]/g, "-"));'
|
||||
}
|
||||
|
||||
is_publish_conflict_output() {
|
||||
local output="$1"
|
||||
[[ "${output}" == *"conflict"* ]] || [[ "${output}" == *"schema"* && "${output}" == *"clear"* ]]
|
||||
}
|
||||
|
||||
run_publish() {
|
||||
local output_file="$1"
|
||||
shift
|
||||
set +e
|
||||
spacetime "$@" >"${output_file}" 2>&1
|
||||
local status=$?
|
||||
set -e
|
||||
cat "${output_file}"
|
||||
return "${status}"
|
||||
}
|
||||
|
||||
run_conflict_migration_publish() {
|
||||
local migration_root migration_file publish_log
|
||||
|
||||
if [[ "${MIGRATION_BOOTSTRAP_SECRET_MODE}" == "disabled" ]]; then
|
||||
echo "[spacetime:maincloud] schema 冲突需要迁移引导密钥;请去掉 --no-migration-bootstrap-secret 后重试。" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
migration_root="${MIGRATION_DIR:-${REPO_ROOT}/tmp/spacetime-migrations/maincloud/${SPACETIME_DATABASE}}"
|
||||
mkdir -p "${migration_root}"
|
||||
migration_file="${migration_root}/$(timestamp_slug).json"
|
||||
publish_log="$(mktemp)"
|
||||
|
||||
echo "[spacetime:maincloud] 检测到 schema 冲突,开始导出旧库迁移 JSON: ${migration_file}"
|
||||
node "${REPO_ROOT}/scripts/spacetime-export-migration-json.mjs" \
|
||||
--server "${SPACETIME_SERVER_ALIAS}" \
|
||||
--server-url "${SPACETIME_SERVER_URL}" \
|
||||
--database "${SPACETIME_DATABASE}" \
|
||||
--bootstrap-secret "${MIGRATION_BOOTSTRAP_SECRET}" \
|
||||
--out "${migration_file}" \
|
||||
--note "publish conflict export $(date -u +%Y-%m-%dT%H:%M:%SZ)"
|
||||
|
||||
echo "[spacetime:maincloud] 清库发布新 SpacetimeDB wasm"
|
||||
if ! run_publish "${publish_log}" publish "${SPACETIME_DATABASE}" --server "${SPACETIME_SERVER_ALIAS}" --bin-path "${MODULE_PATH}" --clear-database --yes; then
|
||||
echo "[spacetime:maincloud] 清库发布失败,迁移 JSON 已保留: ${migration_file}" >&2
|
||||
rm -f "${publish_log}"
|
||||
exit 1
|
||||
fi
|
||||
rm -f "${publish_log}"
|
||||
|
||||
echo "[spacetime:maincloud] 导入迁移 JSON 回灌数据"
|
||||
if ! node "${REPO_ROOT}/scripts/spacetime-import-migration-json.mjs" \
|
||||
--server "${SPACETIME_SERVER_ALIAS}" \
|
||||
--server-url "${SPACETIME_SERVER_URL}" \
|
||||
--database "${SPACETIME_DATABASE}" \
|
||||
--bootstrap-secret "${MIGRATION_BOOTSTRAP_SECRET}" \
|
||||
--in "${migration_file}" \
|
||||
--note "publish conflict import $(date -u +%Y-%m-%dT%H:%M:%SZ)"; then
|
||||
echo "[spacetime:maincloud] 导入失败,迁移 JSON 已保留: ${migration_file}" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "[spacetime:maincloud] schema 冲突迁移完成,迁移 JSON: ${migration_file}"
|
||||
}
|
||||
|
||||
load_env_file "${REPO_ROOT}/.env"
|
||||
load_env_file "${REPO_ROOT}/.env.local"
|
||||
|
||||
@@ -104,6 +173,14 @@ while [[ $# -gt 0 ]]; do
|
||||
CLEAR_DATABASE=1
|
||||
shift
|
||||
;;
|
||||
--no-migrate-on-conflict)
|
||||
MIGRATE_ON_CONFLICT=0
|
||||
shift
|
||||
;;
|
||||
--migration-dir)
|
||||
MIGRATION_DIR="${2:?缺少 --migration-dir 的值}"
|
||||
shift 2
|
||||
;;
|
||||
--migration-bootstrap-secret)
|
||||
MIGRATION_BOOTSTRAP_SECRET="${2:?缺少 --migration-bootstrap-secret 的值}"
|
||||
MIGRATION_BOOTSTRAP_SECRET_MODE="manual"
|
||||
@@ -166,7 +243,19 @@ if [[ "${CLEAR_DATABASE}" -eq 1 ]]; then
|
||||
fi
|
||||
|
||||
echo "[spacetime:maincloud] 发布 SpacetimeDB wasm: ${SPACETIME_DATABASE} -> ${SPACETIME_SERVER_ALIAS}"
|
||||
spacetime "${PUBLISH_ARGS[@]}"
|
||||
PUBLISH_LOG="$(mktemp)"
|
||||
if ! run_publish "${PUBLISH_LOG}" "${PUBLISH_ARGS[@]}"; then
|
||||
PUBLISH_OUTPUT="$(cat "${PUBLISH_LOG}")"
|
||||
rm -f "${PUBLISH_LOG}"
|
||||
if [[ "${CLEAR_DATABASE}" -eq 0 && "${MIGRATE_ON_CONFLICT}" -eq 1 ]] && is_publish_conflict_output "${PUBLISH_OUTPUT}"; then
|
||||
run_conflict_migration_publish
|
||||
else
|
||||
echo "[spacetime:maincloud] 发布失败。" >&2
|
||||
exit 1
|
||||
fi
|
||||
else
|
||||
rm -f "${PUBLISH_LOG}"
|
||||
fi
|
||||
|
||||
cat <<EOF
|
||||
[spacetime:maincloud] 发布完成。api-server 可使用以下环境:
|
||||
|
||||
@@ -63,12 +63,20 @@ pub struct DatabaseMigrationTableStat {
|
||||
pub skipped_row_count: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationWarning {
|
||||
pub table_name: String,
|
||||
pub warning_kind: String,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationProcedureResult {
|
||||
pub ok: bool,
|
||||
pub schema_version: u32,
|
||||
pub migration_json: Option<String>,
|
||||
pub table_stats: Vec<DatabaseMigrationTableStat>,
|
||||
pub warnings: Vec<DatabaseMigrationWarning>,
|
||||
pub error_message: Option<String>,
|
||||
}
|
||||
|
||||
@@ -239,6 +247,7 @@ pub fn export_database_migration_to_file(
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: Some(migration_json),
|
||||
table_stats: stats,
|
||||
warnings: Vec::new(),
|
||||
error_message: None,
|
||||
},
|
||||
Err(error) => DatabaseMigrationProcedureResult {
|
||||
@@ -246,6 +255,7 @@ pub fn export_database_migration_to_file(
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: Vec::new(),
|
||||
warnings: Vec::new(),
|
||||
error_message: Some(error),
|
||||
},
|
||||
}
|
||||
@@ -259,11 +269,12 @@ pub fn import_database_migration_from_file(
|
||||
) -> DatabaseMigrationProcedureResult {
|
||||
match import_database_migration_from_file_inner(ctx, input, DatabaseMigrationImportMode::Strict)
|
||||
{
|
||||
Ok(stats) => DatabaseMigrationProcedureResult {
|
||||
Ok((stats, warnings)) => DatabaseMigrationProcedureResult {
|
||||
ok: true,
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: stats,
|
||||
warnings,
|
||||
error_message: None,
|
||||
},
|
||||
Err(error) => DatabaseMigrationProcedureResult {
|
||||
@@ -271,6 +282,7 @@ pub fn import_database_migration_from_file(
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: Vec::new(),
|
||||
warnings: Vec::new(),
|
||||
error_message: Some(error),
|
||||
},
|
||||
}
|
||||
@@ -287,11 +299,12 @@ pub fn import_database_migration_incremental_from_file(
|
||||
input,
|
||||
DatabaseMigrationImportMode::Incremental,
|
||||
) {
|
||||
Ok(stats) => DatabaseMigrationProcedureResult {
|
||||
Ok((stats, warnings)) => DatabaseMigrationProcedureResult {
|
||||
ok: true,
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: stats,
|
||||
warnings,
|
||||
error_message: None,
|
||||
},
|
||||
Err(error) => DatabaseMigrationProcedureResult {
|
||||
@@ -299,6 +312,7 @@ pub fn import_database_migration_incremental_from_file(
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: Vec::new(),
|
||||
warnings: Vec::new(),
|
||||
error_message: Some(error),
|
||||
},
|
||||
}
|
||||
@@ -327,7 +341,13 @@ fn import_database_migration_from_file_inner(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationImportInput,
|
||||
import_mode: DatabaseMigrationImportMode,
|
||||
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
|
||||
) -> Result<
|
||||
(
|
||||
Vec<DatabaseMigrationTableStat>,
|
||||
Vec<DatabaseMigrationWarning>,
|
||||
),
|
||||
String,
|
||||
> {
|
||||
let caller = ctx.sender();
|
||||
let included_tables = normalize_include_tables(&input.include_tables)?;
|
||||
if import_mode == DatabaseMigrationImportMode::Incremental && input.replace_existing {
|
||||
@@ -347,7 +367,7 @@ fn import_database_migration_from_file_inner(
|
||||
));
|
||||
}
|
||||
|
||||
let stats = if input.dry_run {
|
||||
let (stats, warnings) = if input.dry_run {
|
||||
build_import_dry_run_stats(&migration_file.tables, included_tables.as_ref())?
|
||||
} else {
|
||||
ctx.try_with_tx(|tx| {
|
||||
@@ -362,7 +382,7 @@ fn import_database_migration_from_file_inner(
|
||||
})?
|
||||
};
|
||||
|
||||
Ok(stats)
|
||||
Ok((stats, warnings))
|
||||
}
|
||||
|
||||
fn authorize_database_migration_operator_inner(
|
||||
@@ -564,11 +584,25 @@ fn build_export_stats(tables: &[MigrationTable]) -> Vec<DatabaseMigrationTableSt
|
||||
fn build_import_dry_run_stats(
|
||||
tables: &[MigrationTable],
|
||||
include_tables: Option<&HashSet<String>>,
|
||||
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
|
||||
) -> Result<
|
||||
(
|
||||
Vec<DatabaseMigrationTableStat>,
|
||||
Vec<DatabaseMigrationWarning>,
|
||||
),
|
||||
String,
|
||||
> {
|
||||
let mut stats = Vec::new();
|
||||
let mut warnings = Vec::new();
|
||||
for table in tables {
|
||||
if !is_supported_migration_table(&table.name) {
|
||||
return Err(format!("迁移文件包含不支持的表: {}", table.name));
|
||||
warnings.push(build_dropped_table_warning(table));
|
||||
stats.push(DatabaseMigrationTableStat {
|
||||
table_name: table.name.clone(),
|
||||
exported_row_count: 0,
|
||||
imported_row_count: 0,
|
||||
skipped_row_count: table.rows.len() as u64,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
if should_include_table(include_tables, &table.name) {
|
||||
stats.push(DatabaseMigrationTableStat {
|
||||
@@ -586,7 +620,7 @@ fn build_import_dry_run_stats(
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(stats)
|
||||
Ok((stats, warnings))
|
||||
}
|
||||
|
||||
fn apply_migration_file(
|
||||
@@ -595,13 +629,15 @@ fn apply_migration_file(
|
||||
include_tables: Option<&HashSet<String>>,
|
||||
replace_existing: bool,
|
||||
import_mode: DatabaseMigrationImportMode,
|
||||
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
|
||||
) -> Result<
|
||||
(
|
||||
Vec<DatabaseMigrationTableStat>,
|
||||
Vec<DatabaseMigrationWarning>,
|
||||
),
|
||||
String,
|
||||
> {
|
||||
let mut stats = Vec::new();
|
||||
for table in &migration_file.tables {
|
||||
if !is_supported_migration_table(&table.name) {
|
||||
return Err(format!("迁移文件包含不支持的表: {}", table.name));
|
||||
}
|
||||
}
|
||||
let mut warnings = Vec::new();
|
||||
|
||||
let import_table_names = build_import_table_name_set(migration_file, include_tables);
|
||||
if replace_existing {
|
||||
@@ -610,6 +646,17 @@ fn apply_migration_file(
|
||||
}
|
||||
|
||||
for table in &migration_file.tables {
|
||||
if !is_supported_migration_table(&table.name) {
|
||||
warnings.push(build_dropped_table_warning(table));
|
||||
stats.push(DatabaseMigrationTableStat {
|
||||
table_name: table.name.clone(),
|
||||
exported_row_count: 0,
|
||||
imported_row_count: 0,
|
||||
skipped_row_count: table.rows.len() as u64,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if !should_include_table(include_tables, &table.name) {
|
||||
stats.push(DatabaseMigrationTableStat {
|
||||
table_name: table.name.clone(),
|
||||
@@ -621,7 +668,7 @@ fn apply_migration_file(
|
||||
}
|
||||
|
||||
let (imported_row_count, skipped_row_count) =
|
||||
insert_migration_table_rows(ctx, table, import_mode)?;
|
||||
insert_migration_table_rows(ctx, table, import_mode, &mut warnings)?;
|
||||
stats.push(DatabaseMigrationTableStat {
|
||||
table_name: table.name.clone(),
|
||||
exported_row_count: 0,
|
||||
@@ -630,7 +677,7 @@ fn apply_migration_file(
|
||||
});
|
||||
}
|
||||
|
||||
Ok(stats)
|
||||
Ok((stats, warnings))
|
||||
}
|
||||
|
||||
fn build_import_table_name_set(
|
||||
@@ -645,24 +692,101 @@ fn build_import_table_name_set(
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn build_dropped_table_warning(table: &MigrationTable) -> DatabaseMigrationWarning {
|
||||
DatabaseMigrationWarning {
|
||||
table_name: table.name.clone(),
|
||||
warning_kind: "dropped_table".to_string(),
|
||||
message: format!(
|
||||
"迁移文件包含当前模块已删除或未加入白名单的表 {},已跳过 {} 行",
|
||||
table.name,
|
||||
table.rows.len()
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_dropped_field_warning(table_name: &str, field_name: &str) -> DatabaseMigrationWarning {
|
||||
DatabaseMigrationWarning {
|
||||
table_name: table_name.to_string(),
|
||||
warning_kind: "dropped_field".to_string(),
|
||||
message: format!("表 {table_name} 的旧字段 {field_name} 当前已不存在,已在导入时丢弃"),
|
||||
}
|
||||
}
|
||||
|
||||
fn row_to_json<T: spacetimedb::Serialize>(row: &T) -> Result<serde_json::Value, String> {
|
||||
serde_json::to_value(SerializeWrapper::from_ref(row))
|
||||
.map_err(|error| format!("迁移行序列化失败: {error}"))
|
||||
}
|
||||
|
||||
fn row_from_json<T>(value: &serde_json::Value) -> Result<T, String>
|
||||
fn row_from_json<T>(
|
||||
table_name: &str,
|
||||
value: &serde_json::Value,
|
||||
warnings: &mut Vec<DatabaseMigrationWarning>,
|
||||
) -> Result<T, String>
|
||||
where
|
||||
T: for<'de> spacetimedb::Deserialize<'de>,
|
||||
{
|
||||
let wrapped: DeserializeWrapper<T> = serde_json::from_value(value.clone())
|
||||
.map_err(|error| format!("迁移行反序列化失败: {error}"))?;
|
||||
let wrapped = match serde_json::from_value::<DeserializeWrapper<T>>(value.clone()) {
|
||||
Ok(row) => row,
|
||||
Err(original_error) => recover_row_with_deleted_fields::<T>(
|
||||
table_name,
|
||||
value,
|
||||
&original_error.to_string(),
|
||||
warnings,
|
||||
)
|
||||
.ok_or_else(|| format!("迁移行反序列化失败,且无法通过丢弃旧字段恢复: {original_error}"))?,
|
||||
};
|
||||
Ok(wrapped.0)
|
||||
}
|
||||
|
||||
fn recover_row_with_deleted_fields<T>(
|
||||
table_name: &str,
|
||||
value: &serde_json::Value,
|
||||
error_message: &str,
|
||||
warnings: &mut Vec<DatabaseMigrationWarning>,
|
||||
) -> Option<DeserializeWrapper<T>>
|
||||
where
|
||||
T: for<'de> spacetimedb::Deserialize<'de>,
|
||||
{
|
||||
let mut candidate = value.as_object()?.clone();
|
||||
let mut next_error = error_message.to_string();
|
||||
|
||||
loop {
|
||||
let field_name = extract_unknown_field_name(&next_error)?;
|
||||
candidate.remove(&field_name)?;
|
||||
warnings.push(build_dropped_field_warning(table_name, &field_name));
|
||||
|
||||
match serde_json::from_value::<DeserializeWrapper<T>>(serde_json::Value::Object(
|
||||
candidate.clone(),
|
||||
)) {
|
||||
Ok(row) => return Some(row),
|
||||
Err(error) => next_error = error.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_unknown_field_name(error_message: &str) -> Option<String> {
|
||||
let marker = "unknown field";
|
||||
let marker_index = error_message.find(marker)?;
|
||||
let after_marker = error_message[marker_index + marker.len()..].trim_start();
|
||||
|
||||
for quote in ['`', '"', '\''] {
|
||||
if let Some(rest) = after_marker.strip_prefix(quote) {
|
||||
let end_index = rest.find(quote)?;
|
||||
return Some(rest[..end_index].to_string());
|
||||
}
|
||||
}
|
||||
|
||||
after_marker
|
||||
.split(|character: char| !character.is_ascii_alphanumeric() && character != '_')
|
||||
.find(|value| !value.is_empty())
|
||||
.map(str::to_string)
|
||||
}
|
||||
|
||||
fn insert_migration_table_rows(
|
||||
ctx: &ReducerContext,
|
||||
table: &MigrationTable,
|
||||
import_mode: DatabaseMigrationImportMode,
|
||||
warnings: &mut Vec<DatabaseMigrationWarning>,
|
||||
) -> Result<(u64, u64), String> {
|
||||
macro_rules! insert_table_match_arm {
|
||||
($($table:ident),+ $(,)?) => {
|
||||
@@ -672,7 +796,7 @@ fn insert_migration_table_rows(
|
||||
let mut imported = 0u64;
|
||||
let mut skipped = 0u64;
|
||||
for value in &table.rows {
|
||||
let row = row_from_json(value)
|
||||
let row = row_from_json(stringify!($table), value, warnings)
|
||||
.map_err(|error| format!("{}: {error}", stringify!($table)))?;
|
||||
let insert_result = ctx.db
|
||||
.$table()
|
||||
|
||||
Reference in New Issue
Block a user