Files
Genarrative/server-rs/crates/spacetime-module/src/migration.rs

1321 lines
44 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use crate::runtime::analytics_date_dimension::analytics_date_dimension;
use crate::runtime::creation_entry_config::{creation_entry_config, creation_entry_type_config};
use crate::*;
use serde::{Deserialize, Serialize};
use spacetimedb::sats::de::serde::DeserializeWrapper;
use spacetimedb::sats::ser::serde::SerializeWrapper;
use std::collections::HashSet;
use crate::big_fish::big_fish_runtime_run;
use crate::match3d::tables::{
match3d_agent_message, match3d_agent_session, match3d_runtime_run, match3d_work_profile,
};
use crate::puzzle::{
puzzle_agent_message, puzzle_agent_session, puzzle_event, puzzle_leaderboard_entry,
puzzle_runtime_run, puzzle_work_profile,
};
use crate::square_hole::tables::{
square_hole_agent_message, square_hole_agent_session, square_hole_runtime_run,
square_hole_work_profile,
};
use crate::{
visual_novel_agent_message, visual_novel_agent_session, visual_novel_runtime_event,
visual_novel_runtime_history_entry, visual_novel_runtime_run, visual_novel_work_profile,
};
// Version written into exported migration files; `parse_migration_file` rejects any other value.
const MIGRATION_SCHEMA_VERSION: u32 = 1;
// Upper bound, in bytes, on one table name passed through `include_tables`.
const MIGRATION_MAX_TABLE_NAME_LEN: usize = 96;
// Upper bound, in bytes, on an `upload_id`.
const MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN: usize = 128;
// Upper bound, in bytes, on one uploaded migration JSON chunk (1 MiB).
const MIGRATION_MAX_IMPORT_CHUNK_BYTES: usize = 1024 * 1024;
// Upper bound, in characters, on the note stored with an operator grant.
const MIGRATION_MAX_OPERATOR_NOTE_CHARS: usize = 160;
// Minimum length, in characters, the configured bootstrap secret must have to be usable.
const MIGRATION_MIN_BOOTSTRAP_SECRET_LEN: usize = 16;
// Bootstrap secret read from the build environment at compile time; `None` when the variable
// was not set while compiling.
const MIGRATION_BOOTSTRAP_SECRET: Option<&str> =
option_env!("GENARRATIVE_SPACETIME_MIGRATION_BOOTSTRAP_SECRET");
// Table of identities allowed to run the migration procedures in this file.
// `require_migration_operator` looks the caller up here by primary key.
#[spacetimedb::table(accessor = database_migration_operator)]
pub struct DatabaseMigrationOperator {
// Identity that holds operator rights.
#[primary_key]
pub operator_identity: Identity,
// Transaction timestamp at which the grant row was written.
pub created_at: Timestamp,
// Identity of the caller that issued the grant.
pub created_by: Identity,
// Free-form note supplied with the grant.
pub note: String,
}
// Staging table for migration JSON uploaded in pieces. The btree index on `upload_id` lets all
// chunks of one upload be read or cleared together.
#[spacetimedb::table(
accessor = database_migration_import_chunk,
index(accessor = by_database_migration_import_upload, btree(columns = [upload_id]))
)]
pub struct DatabaseMigrationImportChunk {
// Key produced by `build_import_chunk_key` from `upload_id` and `chunk_index`.
#[primary_key]
pub chunk_key: String,
// Caller-chosen identifier shared by every chunk of one upload.
pub upload_id: String,
// Zero-based position of this chunk within the upload.
pub chunk_index: u32,
// Total number of chunks the uploader declared for this upload.
pub chunk_count: u32,
// Identity that uploaded the chunk; other identities may not overwrite or commit it.
pub operator_identity: Identity,
// Transaction timestamp at which the chunk was stored.
pub created_at: Timestamp,
// The slice of migration JSON text carried by this chunk.
pub chunk: String,
}
// Input of the export procedure.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationExportInput {
// Table names to export; an empty list means every whitelisted table.
pub include_tables: Vec<String>,
}
// Input of the direct (single-request) import procedures.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationImportInput {
// Full migration file as JSON text; must not be blank.
pub migration_json: String,
// Table names to import; an empty list means every whitelisted table found in the file.
pub include_tables: Vec<String>,
// When true, the tables about to be imported are emptied first. Rejected for incremental import.
pub replace_existing: bool,
// When true, only statistics are computed and nothing is written.
pub dry_run: bool,
}
// Input for staging one chunk of a migration JSON upload.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationImportChunkInput {
// Identifier of the upload this chunk belongs to.
pub upload_id: String,
// Zero-based position of the chunk; must be smaller than `chunk_count`.
pub chunk_index: u32,
// Total number of chunks in the upload; must be greater than zero.
pub chunk_count: u32,
// Chunk text; must be non-empty and at most `MIGRATION_MAX_IMPORT_CHUNK_BYTES` bytes.
pub chunk: String,
}
// Input for committing an upload that was staged chunk by chunk.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationImportChunksInput {
// Identifier of the staged upload to reassemble and import.
pub upload_id: String,
// Table names to import; an empty list means every whitelisted table found in the file.
pub include_tables: Vec<String>,
// When true, the tables about to be imported are emptied first. Rejected for incremental import.
pub replace_existing: bool,
// When true, only statistics are computed and no migration rows are written.
pub dry_run: bool,
}
// Input for discarding the staged chunks of one upload.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationImportChunksClearInput {
// Identifier of the upload whose staged chunks should be removed.
pub upload_id: String,
}
// How a failed row insert is treated during import.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DatabaseMigrationImportMode {
// A failed insert aborts the import with an error.
Strict,
// A failed insert is counted as a skipped row and the import continues.
Incremental,
}
// Input for granting migration operator rights to an identity.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationAuthorizeOperatorInput {
// Checked against the compiled-in bootstrap secret only while no operator exists yet.
pub bootstrap_secret: String,
// Identity to authorise, as 64 hexadecimal characters (an optional `0x` prefix is accepted).
pub operator_identity_hex: String,
// Free-form note stored with the grant; limited to `MIGRATION_MAX_OPERATOR_NOTE_CHARS` characters.
pub note: String,
}
// Input for removing migration operator rights from an identity.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationRevokeOperatorInput {
// Identity to revoke, as 64 hexadecimal characters (an optional `0x` prefix is accepted).
pub operator_identity_hex: String,
}
// Per-table row counters reported by export and import.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationTableStat {
// Name of the table the counters refer to.
pub table_name: String,
// Rows written to the migration file; left at 0 by the import paths.
pub exported_row_count: u64,
// Rows inserted (or, for a dry run, rows that would be attempted); left at 0 by export.
pub imported_row_count: u64,
// Rows not imported: table filtered out, table unsupported, or insert skipped.
pub skipped_row_count: u64,
}
// Non-fatal observation produced while importing.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationWarning {
// Table the warning refers to.
pub table_name: String,
// Machine-readable category; this file emits "dropped_table" and "dropped_field".
pub warning_kind: String,
// Human-readable description.
pub message: String,
}
// Result returned by the export, import and chunk procedures.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationProcedureResult {
// True when the procedure succeeded.
pub ok: bool,
// Always `MIGRATION_SCHEMA_VERSION` of the running module.
pub schema_version: u32,
// Exported migration file as JSON text; only set by a successful export.
pub migration_json: Option<String>,
// Per-table counters; empty on failure and for the chunk staging/clearing procedures.
pub table_stats: Vec<DatabaseMigrationTableStat>,
// Non-fatal warnings collected during import.
pub warnings: Vec<DatabaseMigrationWarning>,
// Failure description; `None` on success.
pub error_message: Option<String>,
}
// Result returned by the operator authorise/revoke procedures.
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationOperatorProcedureResult {
// True when the procedure succeeded.
pub ok: bool,
// Hex form of the affected operator identity; only set on success.
pub operator_identity_hex: Option<String>,
// Failure description; `None` on success.
pub error_message: Option<String>,
}
// Serialized shape of a migration file (the JSON document produced by export).
#[derive(Serialize, Deserialize)]
struct MigrationFile {
// Must equal `MIGRATION_SCHEMA_VERSION` for the file to be accepted on import.
schema_version: u32,
// Export time in microseconds since the Unix epoch.
exported_at_micros: i64,
// One entry per exported table.
tables: Vec<MigrationTable>,
}
// One table inside a migration file.
#[derive(Serialize, Deserialize)]
struct MigrationTable {
// Table accessor name, matched against the whitelist in `migration_tables!`.
name: String,
// Rows of the table, each as a JSON value.
rows: Vec<serde_json::Value>,
}
// Single source of truth for the migration whitelist.
// Invokes `$macro_name!` with the optional leading `$arg` expressions followed by the
// comma-separated list of table accessor identifiers below. Export, clearing, insertion and the
// whitelist check are all generated from this list, so a table is migrated only if it is listed here.
macro_rules! migration_tables {
($macro_name:ident $(, $arg:expr)* $(,)?) => {
$macro_name! {
$($arg,)*
auth_store_snapshot,
user_account,
auth_identity,
refresh_session,
ai_task,
ai_task_stage,
ai_text_chunk,
ai_result_reference,
ai_task_event,
runtime_snapshot,
runtime_setting,
creation_entry_config,
creation_entry_type_config,
user_browse_history,
profile_dashboard_state,
profile_wallet_ledger,
analytics_date_dimension,
tracking_event,
tracking_daily_stat,
profile_task_config,
profile_task_progress,
profile_task_reward_claim,
profile_redeem_code,
profile_redeem_code_usage,
profile_invite_code,
profile_referral_relation,
profile_played_world,
public_work_play_daily_stat,
public_work_like,
profile_membership,
profile_recharge_order,
profile_feedback_submission,
profile_save_archive,
player_progression,
chapter_progression,
npc_state,
story_session,
story_event,
inventory_slot,
battle_state,
treasure_record,
quest_record,
quest_log,
custom_world_profile,
custom_world_session,
custom_world_agent_session,
custom_world_agent_message,
custom_world_agent_operation,
custom_world_draft_card,
custom_world_gallery_entry,
asset_object,
asset_entity_binding,
asset_event,
puzzle_agent_session,
puzzle_agent_message,
puzzle_work_profile,
puzzle_event,
puzzle_runtime_run,
puzzle_leaderboard_entry,
match3d_agent_session,
match3d_agent_message,
match3d_work_profile,
match3d_runtime_run,
square_hole_agent_session,
square_hole_agent_message,
square_hole_work_profile,
square_hole_runtime_run,
visual_novel_agent_session,
visual_novel_agent_message,
visual_novel_work_profile,
visual_novel_runtime_run,
visual_novel_runtime_history_entry,
visual_novel_runtime_event,
big_fish_creation_session,
big_fish_agent_message,
big_fish_asset_slot,
big_fish_runtime_run,
big_fish_event
}
};
}
// Runs `collect_migration_table!` over the full whitelist from `migration_tables!`.
// `$ctx` is the transaction context, `$include_tables` the optional table filter and `$tables`
// the vector that receives one `MigrationTable` per exported table.
macro_rules! collect_all_migration_tables {
($ctx:expr, $include_tables:expr, $tables:expr) => {
migration_tables!(collect_migration_table, $ctx, $include_tables, $tables);
};
}
// For each listed table that passes `should_include_table`, serializes every row to JSON and
// pushes a `MigrationTable` named after the accessor onto `$tables`.
// The expansion uses `?`, so it must sit in a function whose error type accepts the `String`
// error returned by `row_to_json`.
macro_rules! collect_migration_table {
($ctx:expr, $include_tables:expr, $tables:expr, $($table:ident),+ $(,)?) => {
$(
if should_include_table($include_tables, stringify!($table)) {
let rows = $ctx
.db
.$table()
.iter()
.map(|row| row_to_json(&row))
.collect::<Result<Vec<_>, _>>()?;
$tables.push(MigrationTable {
name: stringify!($table).to_string(),
rows,
});
}
)+
};
}
// Runs `clear_migration_table!` over the full whitelist from `migration_tables!`.
// `$include_tables` is the optional filter deciding which of those tables are emptied.
macro_rules! clear_all_migration_tables {
($ctx:expr, $include_tables:expr) => {
migration_tables!(clear_migration_table, $ctx, $include_tables);
};
}
// For each listed table that passes `should_include_table`, deletes every row.
// Rows are collected into a `Vec` first so that deletion does not happen while the table
// iterator is still alive.
macro_rules! clear_migration_table {
($ctx:expr, $include_tables:expr, $($table:ident),+ $(,)?) => {
$(
if should_include_table($include_tables, stringify!($table)) {
for row in $ctx.db.$table().iter().collect::<Vec<_>>() {
$ctx.db.$table().delete(row);
}
}
)+
};
}
// Migration permissions live in their own table so that the ability to export private tables
// is not opened up to every logged-in identity.
// Grants operator rights and reports the outcome as a result struct instead of an error.
#[spacetimedb::procedure]
pub fn authorize_database_migration_operator(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationAuthorizeOperatorInput,
) -> DatabaseMigrationOperatorProcedureResult {
    let (operator_identity_hex, error_message) =
        match authorize_database_migration_operator_inner(ctx, input) {
            Ok(identity_hex) => (Some(identity_hex), None),
            Err(reason) => (None, Some(reason)),
        };
    DatabaseMigrationOperatorProcedureResult {
        // Success is exactly the absence of an error message.
        ok: error_message.is_none(),
        operator_identity_hex,
        error_message,
    }
}
// Revokes operator rights and reports the outcome as a result struct instead of an error.
#[spacetimedb::procedure]
pub fn revoke_database_migration_operator(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationRevokeOperatorInput,
) -> DatabaseMigrationOperatorProcedureResult {
    let (operator_identity_hex, error_message) =
        match revoke_database_migration_operator_inner(ctx, input) {
            Ok(identity_hex) => (Some(identity_hex), None),
            Err(reason) => (None, Some(reason)),
        };
    DatabaseMigrationOperatorProcedureResult {
        // Success is exactly the absence of an error message.
        ok: error_message.is_none(),
        operator_identity_hex,
        error_message,
    }
}
// Export runs as a procedure that returns the JSON string, because a reducer has no return
// value and cannot hand private-table contents to an external caller.
#[spacetimedb::procedure]
pub fn export_database_migration_to_file(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationExportInput,
) -> DatabaseMigrationProcedureResult {
    let (migration_json, table_stats, error_message) =
        match export_database_migration_to_file_inner(ctx, input) {
            Ok((json, stats)) => (Some(json), stats, None),
            Err(reason) => (None, Vec::new(), Some(reason)),
        };
    DatabaseMigrationProcedureResult {
        ok: error_message.is_none(),
        schema_version: MIGRATION_SCHEMA_VERSION,
        migration_json,
        table_stats,
        // Export never produces warnings.
        warnings: Vec::new(),
        error_message,
    }
}
// For import, the Node side reads the file and passes the JSON string in; the procedure is only
// responsible for validation and the table-writing transaction.
// This entry point uses strict mode: any failed row insert fails the import.
#[spacetimedb::procedure]
pub fn import_database_migration_from_file(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationImportInput,
) -> DatabaseMigrationProcedureResult {
    let outcome =
        import_database_migration_from_file_inner(ctx, input, DatabaseMigrationImportMode::Strict);
    let (table_stats, warnings, error_message) = match outcome {
        Ok((stats, collected_warnings)) => (stats, collected_warnings, None),
        Err(reason) => (Vec::new(), Vec::new(), Some(reason)),
    };
    DatabaseMigrationProcedureResult {
        ok: error_message.is_none(),
        schema_version: MIGRATION_SCHEMA_VERSION,
        migration_json: None,
        table_stats,
        warnings,
        error_message,
    }
}
// Incremental import only inserts rows missing from the target database; rows that conflict on
// a primary key or unique constraint are skipped, and existing data is never updated.
#[spacetimedb::procedure]
pub fn import_database_migration_incremental_from_file(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationImportInput,
) -> DatabaseMigrationProcedureResult {
    let outcome = import_database_migration_from_file_inner(
        ctx,
        input,
        DatabaseMigrationImportMode::Incremental,
    );
    let (table_stats, warnings, error_message) = match outcome {
        Ok((stats, collected_warnings)) => (stats, collected_warnings, None),
        Err(reason) => (Vec::new(), Vec::new(), Some(reason)),
    };
    DatabaseMigrationProcedureResult {
        ok: error_message.is_none(),
        schema_version: MIGRATION_SCHEMA_VERSION,
        migration_json: None,
        table_stats,
        warnings,
        error_message,
    }
}
// Large migration JSON is first written chunk by chunk into a private staging table so that a
// single HTTP request body does not trigger a SpacetimeDB 413.
#[spacetimedb::procedure]
pub fn put_database_migration_import_chunk(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationImportChunkInput,
) -> DatabaseMigrationProcedureResult {
    let outcome = put_database_migration_import_chunk_inner(ctx, input);
    // `is_ok` is read before `err` consumes the result.
    empty_database_migration_result(outcome.is_ok(), outcome.err())
}
// Committing from chunks keeps the same strict append semantics as a direct import; the staged
// chunks are cleaned up after a successful commit.
#[spacetimedb::procedure]
pub fn import_database_migration_from_chunks(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationImportChunksInput,
) -> DatabaseMigrationProcedureResult {
    let outcome = import_database_migration_from_chunks_inner(
        ctx,
        input,
        DatabaseMigrationImportMode::Strict,
    );
    match outcome {
        Err(reason) => empty_database_migration_result(false, Some(reason)),
        // Start from the empty success result and fill in what the import reported.
        Ok((table_stats, warnings)) => DatabaseMigrationProcedureResult {
            table_stats,
            warnings,
            ..empty_database_migration_result(true, None)
        },
    }
}
// Incremental commit from chunks only inserts rows missing from the target database; rows that
// conflict on a primary key or unique constraint are skipped.
#[spacetimedb::procedure]
pub fn import_database_migration_incremental_from_chunks(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationImportChunksInput,
) -> DatabaseMigrationProcedureResult {
    let outcome = import_database_migration_from_chunks_inner(
        ctx,
        input,
        DatabaseMigrationImportMode::Incremental,
    );
    match outcome {
        Err(reason) => empty_database_migration_result(false, Some(reason)),
        // Start from the empty success result and fill in what the import reported.
        Ok((table_stats, warnings)) => DatabaseMigrationProcedureResult {
            table_stats,
            warnings,
            ..empty_database_migration_result(true, None)
        },
    }
}
// Callers can explicitly clear the staged chunks of an upload_id after a failed upload or a
// failed commit.
#[spacetimedb::procedure]
pub fn clear_database_migration_import_chunks(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationImportChunksClearInput,
) -> DatabaseMigrationProcedureResult {
    let outcome = clear_database_migration_import_chunks_inner(ctx, input);
    // `is_ok` is read before `err` consumes the result.
    empty_database_migration_result(outcome.is_ok(), outcome.err())
}
// Builds the migration file for the requested tables and returns it as pretty-printed JSON
// together with per-table export counters.
// Fails when a table name is not whitelisted, the caller is not an operator, a row cannot be
// serialized, or the file cannot be rendered as JSON.
fn export_database_migration_to_file_inner(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationExportInput,
) -> Result<(String, Vec<DatabaseMigrationTableStat>), String> {
    let caller = ctx.sender();
    let table_filter = normalize_include_tables(&input.include_tables)?;
    let exported_at_micros = ctx.timestamp.to_micros_since_unix_epoch();

    // Authorisation and the table snapshot happen inside the same transaction.
    let snapshot = ctx.try_with_tx(|tx| {
        require_migration_operator(tx, caller)?;
        build_migration_file(tx, exported_at_micros, table_filter.as_ref())
    })?;

    let migration_json = serde_json::to_string_pretty(&snapshot)
        .map_err(|error| format!("迁移文件序列化失败: {error}"))?;
    let table_stats = build_export_stats(&snapshot.tables);
    Ok((migration_json, table_stats))
}
// Validates the request, parses `input.migration_json` and either applies it in one transaction
// or, for `dry_run`, only reports what would be imported.
// The caller is authorised before parsing and again inside the writing transaction.
fn import_database_migration_from_file_inner(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationImportInput,
    import_mode: DatabaseMigrationImportMode,
) -> Result<
    (
        Vec<DatabaseMigrationTableStat>,
        Vec<DatabaseMigrationWarning>,
    ),
    String,
> {
    let caller = ctx.sender();
    let table_filter = normalize_include_tables(&input.include_tables)?;

    let incremental = import_mode == DatabaseMigrationImportMode::Incremental;
    if incremental && input.replace_existing {
        return Err("增量导入不能同时启用 replace_existing".to_string());
    }
    if input.migration_json.trim().is_empty() {
        return Err("migration_json 不能为空".to_string());
    }

    // Reject unauthorised callers before spending time on parsing.
    ctx.try_with_tx(|tx| require_migration_operator(tx, caller))?;
    let migration_file = parse_migration_file(&input.migration_json)?;

    if input.dry_run {
        return build_import_dry_run_stats(&migration_file.tables, table_filter.as_ref());
    }

    let applied = ctx.try_with_tx(|tx| {
        require_migration_operator(tx, caller)?;
        apply_migration_file(
            tx,
            &migration_file,
            table_filter.as_ref(),
            input.replace_existing,
            import_mode,
        )
    })?;
    Ok(applied)
}
fn put_database_migration_import_chunk_inner(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportChunkInput,
) -> Result<(), String> {
let caller = ctx.sender();
let upload_id = normalize_import_upload_id(&input.upload_id)?;
if input.chunk_count == 0 {
return Err("分片总数必须大于 0".to_string());
}
if input.chunk_index >= input.chunk_count {
return Err(format!(
"分片序号越界: {} / {}",
input.chunk_index, input.chunk_count
));
}
if input.chunk.is_empty() {
return Err("迁移 JSON 分片不能为空".to_string());
}
if input.chunk.len() > MIGRATION_MAX_IMPORT_CHUNK_BYTES {
return Err(format!(
"迁移 JSON 分片过大,单片最多 {} bytes",
MIGRATION_MAX_IMPORT_CHUNK_BYTES
));
}
let chunk_key = build_import_chunk_key(&upload_id, input.chunk_index);
ctx.try_with_tx(|tx| {
require_migration_operator(tx, caller)?;
if let Some(existing) = tx
.db
.database_migration_import_chunk()
.chunk_key()
.find(&chunk_key)
{
if existing.operator_identity != caller {
return Err("同名迁移分片已由其他 identity 上传,已拒绝覆盖".to_string());
}
tx.db
.database_migration_import_chunk()
.chunk_key()
.delete(&chunk_key);
}
tx.db
.database_migration_import_chunk()
.insert(DatabaseMigrationImportChunk {
chunk_key: chunk_key.clone(),
upload_id: upload_id.clone(),
chunk_index: input.chunk_index,
chunk_count: input.chunk_count,
operator_identity: caller,
created_at: tx.timestamp,
chunk: input.chunk.clone(),
});
Ok(())
})?;
Ok(())
}
// Reassembles the chunks staged under `input.upload_id`, parses them as a migration file and
// either applies it or, for `dry_run`, only reports what would be imported.
//
// For a real import the staged chunks are removed inside the same transaction that writes the
// rows. Import and cleanup therefore commit or roll back together: a committed import can no
// longer be reported as failed because a separate cleanup transaction failed afterwards, which
// would invite a retry that collides with the rows already written.
//
// Errors: invalid `upload_id` or table names, `replace_existing` combined with incremental
// mode, unauthorised caller, missing/incomplete/foreign chunks, unparsable JSON, or any error
// from applying the file. On such an error the staged chunks are left in place.
fn import_database_migration_from_chunks_inner(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationImportChunksInput,
    import_mode: DatabaseMigrationImportMode,
) -> Result<
    (
        Vec<DatabaseMigrationTableStat>,
        Vec<DatabaseMigrationWarning>,
    ),
    String,
> {
    let caller = ctx.sender();
    let upload_id = normalize_import_upload_id(&input.upload_id)?;
    let included_tables = normalize_include_tables(&input.include_tables)?;
    if import_mode == DatabaseMigrationImportMode::Incremental && input.replace_existing {
        return Err("增量导入不能同时启用 replace_existing".to_string());
    }

    let migration_json = ctx.try_with_tx(|tx| {
        require_migration_operator(tx, caller)?;
        read_database_migration_import_chunks(tx, &upload_id, caller)
    })?;
    let migration_file = parse_migration_file(&migration_json)?;

    if input.dry_run {
        let report = build_import_dry_run_stats(&migration_file.tables, included_tables.as_ref())?;
        // NOTE(review): a dry run also discards the staged chunks, as it always has, so a
        // following real import needs a fresh upload — confirm this is what callers expect.
        ctx.try_with_tx(|tx| {
            require_migration_operator(tx, caller)?;
            clear_database_migration_import_chunks_tx(tx, &upload_id);
            Ok::<(), String>(())
        })?;
        return Ok(report);
    }

    let applied = ctx.try_with_tx(|tx| {
        require_migration_operator(tx, caller)?;
        let outcome = apply_migration_file(
            tx,
            &migration_file,
            included_tables.as_ref(),
            input.replace_existing,
            import_mode,
        )?;
        // Same transaction as the import: the chunks disappear only if the rows are committed.
        clear_database_migration_import_chunks_tx(tx, &upload_id);
        Ok::<_, String>(outcome)
    })?;
    Ok(applied)
}
// Removes every staged chunk of `input.upload_id` after checking that the caller is an operator.
// Fails on an invalid `upload_id` or an unauthorised caller.
fn clear_database_migration_import_chunks_inner(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationImportChunksClearInput,
) -> Result<(), String> {
    let caller = ctx.sender();
    let upload_id = normalize_import_upload_id(&input.upload_id)?;
    ctx.try_with_tx(|tx| -> Result<(), String> {
        require_migration_operator(tx, caller)?;
        clear_database_migration_import_chunks_tx(tx, &upload_id);
        Ok(())
    })?;
    Ok(())
}
// Builds a procedure result that carries no JSON, no table statistics and no warnings — only
// the success flag, the module's schema version and an optional error message.
fn empty_database_migration_result(
    ok: bool,
    error_message: Option<String>,
) -> DatabaseMigrationProcedureResult {
    let table_stats = Vec::new();
    let warnings = Vec::new();
    DatabaseMigrationProcedureResult {
        schema_version: MIGRATION_SCHEMA_VERSION,
        ok,
        error_message,
        migration_json: None,
        table_stats,
        warnings,
    }
}
// Parses migration JSON text into a `MigrationFile`.
// Fails when the text is blank, is not valid JSON for `MigrationFile`, or carries a
// `schema_version` different from `MIGRATION_SCHEMA_VERSION`.
fn parse_migration_file(migration_json: &str) -> Result<MigrationFile, String> {
    if migration_json.trim().is_empty() {
        return Err("migration_json 不能为空".to_string());
    }

    let parsed: MigrationFile = match serde_json::from_str(migration_json) {
        Ok(file) => file,
        Err(error) => return Err(format!("迁移文件 JSON 解析失败: {error}")),
    };

    let actual_version = parsed.schema_version;
    if actual_version == MIGRATION_SCHEMA_VERSION {
        Ok(parsed)
    } else {
        Err(format!(
            "迁移文件 schema_version 不匹配,期望 {},实际 {}",
            MIGRATION_SCHEMA_VERSION, actual_version
        ))
    }
}
// Validates the authorise request and writes the operator grant in one transaction.
// Returns the hex form of the authorised identity.
fn authorize_database_migration_operator_inner(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationAuthorizeOperatorInput,
) -> Result<String, String> {
    let caller = ctx.sender();
    let target_identity = parse_migration_operator_identity(&input.operator_identity_hex)?;
    let note = normalize_migration_operator_note(&input.note)?;
    // Surrounding whitespace is not part of the secret.
    let provided_secret = input.bootstrap_secret.trim().to_string();

    ctx.try_with_tx(|tx| {
        // The note is cloned for each invocation of the closure.
        let note_for_row = note.clone();
        authorize_database_migration_operator_tx(
            tx,
            caller,
            target_identity,
            &provided_secret,
            note_for_row,
        )
    })?;

    let identity_hex = target_identity.to_hex().to_string();
    Ok(identity_hex)
}
// Removes the operator grant of the given identity; the caller must itself be an operator.
// Fails when the identity hex is invalid, the caller is not authorised, or the target has no
// grant. Returns the hex form of the revoked identity.
fn revoke_database_migration_operator_inner(
    ctx: &mut ProcedureContext,
    input: DatabaseMigrationRevokeOperatorInput,
) -> Result<String, String> {
    let caller = ctx.sender();
    let target_identity = parse_migration_operator_identity(&input.operator_identity_hex)?;

    ctx.try_with_tx(|tx| {
        require_migration_operator(tx, caller)?;
        let existing_grant = tx
            .db
            .database_migration_operator()
            .operator_identity()
            .find(&target_identity);
        match existing_grant {
            None => Err("迁移操作员不存在".to_string()),
            Some(_) => {
                tx.db
                    .database_migration_operator()
                    .operator_identity()
                    .delete(&target_identity);
                Ok(())
            }
        }
    })?;

    let identity_hex = target_identity.to_hex().to_string();
    Ok(identity_hex)
}
// Transaction body of the authorise procedure.
// While the operator table is empty the bootstrap secret is required; once at least one
// operator exists, the caller must be an operator instead. An existing grant for the target
// identity is replaced by a fresh row.
fn authorize_database_migration_operator_tx(
    ctx: &ReducerContext,
    caller: Identity,
    operator_identity: Identity,
    bootstrap_secret: &str,
    note: String,
) -> Result<(), String> {
    let table_is_empty = ctx.db.database_migration_operator().iter().next().is_none();
    if table_is_empty {
        require_migration_bootstrap_secret(bootstrap_secret)?;
    } else {
        require_migration_operator(ctx, caller)?;
    }

    let existing_grant = ctx
        .db
        .database_migration_operator()
        .operator_identity()
        .find(&operator_identity);
    if let Some(_) = existing_grant {
        // Remove the old row first so the insert below cannot hit the primary key.
        ctx.db
            .database_migration_operator()
            .operator_identity()
            .delete(&operator_identity);
    }

    let grant = DatabaseMigrationOperator {
        operator_identity,
        created_at: ctx.timestamp,
        created_by: caller,
        note,
    };
    ctx.db.database_migration_operator().insert(grant);
    Ok(())
}
// Succeeds only when `caller` has a row in the operator table; otherwise returns the
// "not authorised" error message.
fn require_migration_operator(ctx: &ReducerContext, caller: Identity) -> Result<(), String> {
    let grant = ctx
        .db
        .database_migration_operator()
        .operator_identity()
        .find(&caller);
    match grant {
        Some(_) => Ok(()),
        None => Err("当前 identity 未被授权执行数据库迁移".to_string()),
    }
}
// Checks `input` against the bootstrap secret compiled into the module.
//
// Fails when no non-blank secret was configured at build time, when the configured secret is
// shorter than `MIGRATION_MIN_BOOTSTRAP_SECRET_LEN` characters, or when `input` does not match.
//
// The comparison always visits every byte of the configured secret and accumulates the
// differences, rather than stopping at the first mismatch as `!=` on strings does. How long
// the check takes therefore does not reveal how many leading bytes of a guess were correct.
fn require_migration_bootstrap_secret(input: &str) -> Result<(), String> {
    let configured_secret = MIGRATION_BOOTSTRAP_SECRET
        .map(str::trim)
        .filter(|secret| !secret.is_empty())
        .ok_or_else(|| "迁移引导密钥未配置,无法创建首个操作员".to_string())?;
    if configured_secret.chars().count() < MIGRATION_MIN_BOOTSTRAP_SECRET_LEN {
        return Err("迁移引导密钥长度不足,至少需要 16 个字符".to_string());
    }

    let expected = configured_secret.as_bytes();
    let provided = input.as_bytes();
    // A length mismatch already makes the accumulator non-zero.
    let mut difference = expected.len() ^ provided.len();
    for (index, expected_byte) in expected.iter().enumerate() {
        // Positions past the end of `provided` compare against 0; the length term above
        // guarantees rejection in that case regardless of the byte values.
        let provided_byte = provided.get(index).copied().unwrap_or(0);
        difference |= usize::from(expected_byte ^ provided_byte);
    }
    if difference != 0 {
        return Err("迁移引导密钥不正确".to_string());
    }
    Ok(())
}
// Parses an operator identity from its hexadecimal text form.
//
// Surrounding whitespace is ignored and a single leading `0x` is accepted. The remainder must
// be exactly 64 bytes long and parse via `Identity::from_hex`.
//
// Only one `0x` is removed: `strip_prefix` is used instead of `trim_start_matches`, which
// strips the pattern repeatedly and would accept malformed input such as `0x0x…`.
fn parse_migration_operator_identity(input: &str) -> Result<Identity, String> {
    let trimmed = input.trim();
    let identity_hex = trimmed.strip_prefix("0x").unwrap_or(trimmed);
    if identity_hex.len() != 64 {
        return Err("operator_identity_hex 必须是 64 位十六进制 identity".to_string());
    }
    Identity::from_hex(identity_hex)
        .map_err(|error| format!("operator_identity_hex 格式不合法: {error}"))
}
// Trims the operator note and enforces the `MIGRATION_MAX_OPERATOR_NOTE_CHARS` limit, counted
// in characters rather than bytes. Returns the trimmed note.
fn normalize_migration_operator_note(input: &str) -> Result<String, String> {
    let trimmed = input.trim();
    let within_limit = trimmed.chars().count() <= MIGRATION_MAX_OPERATOR_NOTE_CHARS;
    if within_limit {
        Ok(trimmed.to_string())
    } else {
        Err(format!(
            "迁移操作员备注过长,最多 {} 个字符",
            MIGRATION_MAX_OPERATOR_NOTE_CHARS
        ))
    }
}
// Trims and validates an upload id.
// The id must be non-empty, at most `MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN` bytes long and consist
// only of ASCII letters, digits, `-` and `_`. Returns the trimmed id.
fn normalize_import_upload_id(input: &str) -> Result<String, String> {
    let trimmed = input.trim();
    if trimmed.is_empty() {
        return Err("upload_id 不能为空".to_string());
    }
    if trimmed.len() > MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN {
        return Err(format!(
            "upload_id 过长,最多 {} bytes",
            MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN
        ));
    }
    // Every byte of a non-ASCII character is >= 0x80 and fails this test, so checking bytes
    // rejects exactly the same inputs as checking characters.
    let has_forbidden_byte = trimmed
        .bytes()
        .any(|byte| !(byte.is_ascii_alphanumeric() || byte == b'-' || byte == b'_'));
    if has_forbidden_byte {
        return Err("upload_id 只能使用 ASCII 字母、数字、短横线或下划线".to_string());
    }
    Ok(trimmed.to_string())
}
// Builds the primary key of a staged chunk: the upload id, a colon, and the chunk index
// zero-padded to ten digits.
fn build_import_chunk_key(upload_id: &str, chunk_index: u32) -> String {
    let chunk_key = format!("{upload_id}:{chunk_index:010}");
    chunk_key
}
// Loads every staged chunk of `upload_id`, validates the set and returns the chunks
// concatenated in index order.
// The set is rejected when it is empty, contains a chunk uploaded by another identity, declares
// a zero or inconsistent chunk count, is incomplete, or has a gap in its indices.
fn read_database_migration_import_chunks(
    ctx: &ReducerContext,
    upload_id: &str,
    caller: Identity,
) -> Result<String, String> {
    let mut staged: Vec<_> = ctx
        .db
        .database_migration_import_chunk()
        .by_database_migration_import_upload()
        .filter(upload_id)
        .collect();

    // The first chunk's declared total is the reference every other chunk must agree with.
    let declared_count = match staged.first() {
        Some(first) => first.chunk_count,
        None => return Err(format!("未找到迁移 JSON 分片: {upload_id}")),
    };

    let all_owned_by_caller = staged.iter().all(|chunk| chunk.operator_identity == caller);
    if !all_owned_by_caller {
        return Err("迁移 JSON 分片包含其他 identity 上传的片段,已拒绝提交".to_string());
    }
    if declared_count == 0 {
        return Err("迁移 JSON 分片总数不合法".to_string());
    }
    let consistent = staged
        .iter()
        .all(|chunk| chunk.chunk_count == declared_count && chunk.upload_id == upload_id);
    if !consistent {
        return Err("迁移 JSON 分片总数不一致".to_string());
    }
    if staged.len() != declared_count as usize {
        return Err(format!(
            "迁移 JSON 分片未上传完整,已收到 {} / {}",
            staged.len(),
            declared_count
        ));
    }

    staged.sort_by_key(|chunk| chunk.chunk_index);
    let mut migration_json = String::new();
    // After sorting, the chunk at position i must carry index i; anything else is a gap.
    for (expected_index, chunk) in staged.iter().enumerate() {
        if chunk.chunk_index as usize != expected_index {
            return Err(format!("迁移 JSON 分片缺失序号: {expected_index}"));
        }
        migration_json.push_str(&chunk.chunk);
    }
    Ok(migration_json)
}
// Deletes every staged chunk of `upload_id` inside the current transaction.
// The keys are gathered first so that no row is deleted while the index iterator is alive.
fn clear_database_migration_import_chunks_tx(ctx: &ReducerContext, upload_id: &str) {
    let staged_keys: Vec<_> = ctx
        .db
        .database_migration_import_chunk()
        .by_database_migration_import_upload()
        .filter(upload_id)
        .map(|chunk| chunk.chunk_key)
        .collect();

    staged_keys.iter().for_each(|staged_key| {
        ctx.db
            .database_migration_import_chunk()
            .chunk_key()
            .delete(staged_key);
    });
}
// Turns the caller-supplied table list into an optional filter set.
// An empty list yields `None` (no filter). Otherwise names are trimmed, blank entries are
// ignored, and every remaining name must fit `MIGRATION_MAX_TABLE_NAME_LEN` bytes and be on
// the migration whitelist.
// NOTE(review): a non-empty list that contains only blank names yields `Some` of an empty set,
// i.e. a filter that matches no table — confirm this is intended.
fn normalize_include_tables(input: &[String]) -> Result<Option<HashSet<String>>, String> {
    if input.is_empty() {
        return Ok(None);
    }

    let mut selected = HashSet::new();
    let candidate_names = input
        .iter()
        .map(|raw_name| raw_name.trim())
        .filter(|name| !name.is_empty());
    for name in candidate_names {
        if name.len() > MIGRATION_MAX_TABLE_NAME_LEN {
            return Err(format!("迁移表名过长: {name}"));
        }
        if !is_supported_migration_table(name) {
            return Err(format!("迁移表不在白名单内: {name}"));
        }
        selected.insert(name.to_string());
    }
    Ok(Some(selected))
}
// Tells whether `table_name` passes the optional filter: with no filter every table is
// included, otherwise only tables present in the set.
fn should_include_table(include_tables: Option<&HashSet<String>>, table_name: &str) -> bool {
    match include_tables {
        Some(allowed) => allowed.contains(table_name),
        None => true,
    }
}
// Snapshots every whitelisted table that passes `include_tables` into a `MigrationFile`
// stamped with the current schema version and the given export time.
// Fails when a row cannot be serialized to JSON.
fn build_migration_file(
    ctx: &ReducerContext,
    exported_at_micros: i64,
    include_tables: Option<&HashSet<String>>,
) -> Result<MigrationFile, String> {
    let mut exported_tables = Vec::new();
    // Expands to one guarded export step per whitelisted table.
    collect_all_migration_tables!(ctx, include_tables, exported_tables);
    let migration_file = MigrationFile {
        schema_version: MIGRATION_SCHEMA_VERSION,
        exported_at_micros,
        tables: exported_tables,
    };
    Ok(migration_file)
}
// Produces one statistics entry per exported table, recording the number of rows written to
// the migration file; the import counters stay at zero.
fn build_export_stats(tables: &[MigrationTable]) -> Vec<DatabaseMigrationTableStat> {
    let mut stats = Vec::with_capacity(tables.len());
    for table in tables {
        let exported_row_count = table.rows.len() as u64;
        stats.push(DatabaseMigrationTableStat {
            table_name: table.name.clone(),
            exported_row_count,
            imported_row_count: 0,
            skipped_row_count: 0,
        });
    }
    stats
}
// Computes what an import of `tables` would do without touching the database.
// Tables outside the whitelist produce a "dropped_table" warning and count all rows as skipped;
// whitelisted tables excluded by the filter also count as skipped; the rest count all rows as
// imported.
fn build_import_dry_run_stats(
    tables: &[MigrationTable],
    include_tables: Option<&HashSet<String>>,
) -> Result<
    (
        Vec<DatabaseMigrationTableStat>,
        Vec<DatabaseMigrationWarning>,
    ),
    String,
> {
    let mut stats = Vec::with_capacity(tables.len());
    let mut warnings = Vec::new();

    for table in tables {
        let row_count = table.rows.len() as u64;
        let supported = is_supported_migration_table(&table.name);
        if !supported {
            warnings.push(build_dropped_table_warning(table));
        }

        let would_import = supported && should_include_table(include_tables, &table.name);
        let (imported_row_count, skipped_row_count) = if would_import {
            (row_count, 0)
        } else {
            (0, row_count)
        };
        stats.push(DatabaseMigrationTableStat {
            table_name: table.name.clone(),
            exported_row_count: 0,
            imported_row_count,
            skipped_row_count,
        });
    }

    Ok((stats, warnings))
}
// Writes the rows of `migration_file` into the database inside the current transaction.
// With `replace_existing`, the tables that this file will import are emptied first. Tables
// outside the whitelist produce a "dropped_table" warning and are skipped; whitelisted tables
// excluded by the filter are skipped silently. Returns per-table counters and the warnings.
fn apply_migration_file(
    ctx: &ReducerContext,
    migration_file: &MigrationFile,
    include_tables: Option<&HashSet<String>>,
    replace_existing: bool,
    import_mode: DatabaseMigrationImportMode,
) -> Result<
    (
        Vec<DatabaseMigrationTableStat>,
        Vec<DatabaseMigrationWarning>,
    ),
    String,
> {
    let import_table_names = build_import_table_name_set(migration_file, include_tables);
    if replace_existing {
        // replace_existing only clears the tables this migration file actually imports, so that
        // a batched import does not wipe other whitelisted tables by accident.
        clear_all_migration_tables!(ctx, Some(&import_table_names));
    }

    let mut stats = Vec::with_capacity(migration_file.tables.len());
    let mut warnings = Vec::new();
    for table in &migration_file.tables {
        let supported = is_supported_migration_table(&table.name);
        if !supported {
            warnings.push(build_dropped_table_warning(table));
        }

        let selected = supported && should_include_table(include_tables, &table.name);
        let (imported_row_count, skipped_row_count) = if selected {
            insert_migration_table_rows(ctx, table, import_mode, &mut warnings)?
        } else {
            (0, table.rows.len() as u64)
        };
        stats.push(DatabaseMigrationTableStat {
            table_name: table.name.clone(),
            exported_row_count: 0,
            imported_row_count,
            skipped_row_count,
        });
    }

    Ok((stats, warnings))
}
// Collects the names of the tables in `migration_file` that pass the optional filter.
fn build_import_table_name_set(
    migration_file: &MigrationFile,
    include_tables: Option<&HashSet<String>>,
) -> HashSet<String> {
    let mut selected_names = HashSet::new();
    for table in &migration_file.tables {
        if should_include_table(include_tables, &table.name) {
            selected_names.insert(table.name.clone());
        }
    }
    selected_names
}
// Builds the "dropped_table" warning for a table that appears in the migration file but is not
// on the whitelist; the message states the table name and how many rows were skipped.
fn build_dropped_table_warning(table: &MigrationTable) -> DatabaseMigrationWarning {
    let message = format!(
        "迁移文件包含当前模块已删除或未加入白名单的表 {},已跳过 {} 行",
        table.name,
        table.rows.len()
    );
    DatabaseMigrationWarning {
        table_name: table.name.clone(),
        warning_kind: "dropped_table".to_string(),
        message,
    }
}
// Builds the "dropped_field" warning for a field that a migration row still carries but that
// was discarded during import.
fn build_dropped_field_warning(table_name: &str, field_name: &str) -> DatabaseMigrationWarning {
    let message = format!("表 {table_name} 的旧字段 {field_name} 当前已不存在,已在导入时丢弃");
    DatabaseMigrationWarning {
        table_name: table_name.to_string(),
        warning_kind: "dropped_field".to_string(),
        message,
    }
}
// Serializes one table row to a JSON value through SpacetimeDB's serde bridge.
fn row_to_json<T: spacetimedb::Serialize>(row: &T) -> Result<serde_json::Value, String> {
    match serde_json::to_value(SerializeWrapper::from_ref(row)) {
        Ok(json) => Ok(json),
        Err(error) => Err(format!("迁移行序列化失败: {error}")),
    }
}
// Deserializes one migration row into the table's row type.
// When direct deserialization fails, a recovery pass tries dropping fields the current row
// type does not have; each such field is reported through `warnings`. If recovery is not
// possible, the original deserialization error is returned.
fn row_from_json<T>(
    table_name: &str,
    value: &serde_json::Value,
    warnings: &mut Vec<DatabaseMigrationWarning>,
) -> Result<T, String>
where
    T: for<'de> spacetimedb::Deserialize<'de>,
{
    let original_error = match serde_json::from_value::<DeserializeWrapper<T>>(value.clone()) {
        Ok(wrapped) => return Ok(wrapped.0),
        Err(error) => error,
    };

    let recovered = recover_row_with_deleted_fields::<T>(
        table_name,
        value,
        &original_error.to_string(),
        warnings,
    );
    match recovered {
        Some(wrapped) => Ok(wrapped.0),
        None => Err(format!(
            "迁移行反序列化失败,且无法通过丢弃旧字段恢复: {original_error}"
        )),
    }
}
// Returns a copy of a migration row with defaults filled in for fields that were added to a
// table after older migration files were exported. Fields already present are never
// overwritten; rows that are not JSON objects and tables without rules are returned unchanged.
fn normalize_migration_row(table_name: &str, value: &serde_json::Value) -> serde_json::Value {
    // Inserts a numeric `0` for every listed field the row does not already have.
    fn default_to_zero(
        fields: &mut serde_json::Map<String, serde_json::Value>,
        field_names: &[&str],
    ) {
        for field_name in field_names {
            fields
                .entry((*field_name).to_string())
                .or_insert_with(|| serde_json::Value::from(0));
        }
    }

    let mut normalized = value.clone();
    if let Some(fields) = normalized.as_object_mut() {
        match table_name {
            "user_account" => {
                // The avatar field was added after the auth tables were split; old migration
                // files are treated as "no avatar set".
                fields
                    .entry("avatar_url".to_string())
                    .or_insert(serde_json::Value::Null);
            }
            "profile_invite_code" => {
                // Invite-code metadata was added after the invite table; old migration files
                // get an empty JSON object.
                fields
                    .entry("metadata_json".to_string())
                    .or_insert_with(|| serde_json::Value::String("{}".to_string()));
            }
            "big_fish_creation_session" => {
                // Old migration files lack the public interaction counters; default them to 0
                // as for a newly created work.
                default_to_zero(fields, &["play_count", "remix_count", "like_count"]);
                fields
                    .entry("published_at".to_string())
                    .or_insert(serde_json::Value::Null);
            }
            "custom_world_profile" | "custom_world_gallery_entry" => {
                // Custom-world public interaction counters were added after the base work
                // tables; old migration files get 0.
                default_to_zero(fields, &["play_count", "remix_count", "like_count"]);
            }
            "puzzle_work_profile" => {
                // Puzzle public interaction counters were added after the base work table; old
                // migration files get 0.
                default_to_zero(
                    fields,
                    &[
                        "play_count",
                        "remix_count",
                        "like_count",
                        "point_incentive_total_half_points",
                        "point_incentive_claimed_points",
                    ],
                );
                // The multi-level field was added later; old migration files leave it empty and
                // the read layer fills in the first level.
                fields
                    .entry("levels_json".to_string())
                    .or_insert_with(|| serde_json::Value::from(""));
                // Work title/description were split out of the old level name/summary; old rows
                // are back-filled from those values.
                let title_fallback = match fields.get("level_name") {
                    Some(level_name) => level_name.clone(),
                    None => serde_json::Value::from(""),
                };
                fields
                    .entry("work_title".to_string())
                    .or_insert(title_fallback);
                let description_fallback = match fields.get("summary") {
                    Some(summary) => summary.clone(),
                    None => serde_json::Value::from(""),
                };
                fields
                    .entry("work_description".to_string())
                    .or_insert(description_fallback);
            }
            _ => {}
        }
    }
    normalized
}
// Tries to deserialize a row that failed because it carries fields the current row type no
// longer has.
//
// Starting from `error_message`, the unknown field named in the error is removed from a copy
// of the row and deserialization is retried; this repeats until it succeeds. Returns `None`
// when the row is not a JSON object, when the error does not name an unknown field, or when the
// named field is not present in the row (so removing it cannot help).
//
// Each dropped field is reported through `warnings`, but only once: the warning text depends
// solely on the table and field name, so pushing it again for every row would only repeat the
// same entry and let the warning list grow with the row count of the table.
fn recover_row_with_deleted_fields<T>(
    table_name: &str,
    value: &serde_json::Value,
    error_message: &str,
    warnings: &mut Vec<DatabaseMigrationWarning>,
) -> Option<DeserializeWrapper<T>>
where
    T: for<'de> spacetimedb::Deserialize<'de>,
{
    let mut candidate = value.as_object()?.clone();
    let mut next_error = error_message.to_string();
    loop {
        let field_name = extract_unknown_field_name(&next_error)?;
        // Every pass removes one field, so the loop ends once the row has no more to drop.
        candidate.remove(&field_name)?;
        let warning = build_dropped_field_warning(table_name, &field_name);
        if !warnings.contains(&warning) {
            warnings.push(warning);
        }
        match serde_json::from_value::<DeserializeWrapper<T>>(serde_json::Value::Object(
            candidate.clone(),
        )) {
            Ok(row) => return Some(row),
            Err(error) => next_error = error.to_string(),
        }
    }
}
// Extracts the field name that follows the text "unknown field" in a deserialization error.
// A name wrapped in backticks, double quotes or single quotes is returned without the quotes
// (`None` when the closing quote is missing). Without quotes, the first run of ASCII
// letters, digits and underscores after the marker is returned. `None` when the marker is
// absent or nothing usable follows it.
fn extract_unknown_field_name(error_message: &str) -> Option<String> {
    let (_, after_marker) = error_message.split_once("unknown field")?;
    let after_marker = after_marker.trim_start();

    let mut remaining = after_marker.chars();
    match remaining.next() {
        Some(quote @ ('`' | '"' | '\'')) => {
            // Everything after the opening quote, up to the matching closing quote.
            let quoted = remaining.as_str();
            quoted.find(quote).map(|end| quoted[..end].to_string())
        }
        _ => after_marker
            .split(|character: char| !character.is_ascii_alphanumeric() && character != '_')
            .find(|piece| !piece.is_empty())
            .map(str::to_string),
    }
}
// Inserts every row of `table` into the database table of the same name.
// Returns `(imported, skipped)` row counts. Fails when the table name is not on the whitelist,
// when a row cannot be deserialized, or — in strict mode — when an insert fails.
fn insert_migration_table_rows(
ctx: &ReducerContext,
table: &MigrationTable,
import_mode: DatabaseMigrationImportMode,
warnings: &mut Vec<DatabaseMigrationWarning>,
) -> Result<(u64, u64), String> {
// Generates one `match` arm per whitelisted table; the arm is selected by comparing
// `table.name` with the stringified accessor identifier.
macro_rules! insert_table_match_arm {
($($table:ident),+ $(,)?) => {
match table.name.as_str() {
$(
stringify!($table) => {
let mut imported = 0u64;
let mut skipped = 0u64;
for value in &table.rows {
// Fill in defaults for fields added after older files were exported, then
// deserialize into the row type of this table.
let normalized_value = normalize_migration_row(stringify!($table), value);
let row = row_from_json(stringify!($table), &normalized_value, warnings)
.map_err(|error| format!("{}: {error}", stringify!($table)))?;
let insert_result = ctx.db
.$table()
.try_insert(row);
match insert_result {
Ok(_) => imported = imported.saturating_add(1),
Err(error) => {
// Incremental mode counts any insert error as a skipped row; strict mode
// aborts the import with the error.
if import_mode == DatabaseMigrationImportMode::Incremental {
skipped = skipped.saturating_add(1);
} else {
return Err(format!("{} 导入失败: {error}", stringify!($table)));
}
}
}
}
Ok((imported, skipped))
}
)+
_ => Err(format!("迁移表不在白名单内: {}", table.name)),
}
};
}
migration_tables!(insert_table_match_arm)
}
// Tells whether `table_name` is one of the table accessors listed in `migration_tables!`.
fn is_supported_migration_table(table_name: &str) -> bool {
// Expands the whitelist into a single `matches!` over the stringified accessor names.
macro_rules! supported_table_match {
($($table:ident),+ $(,)?) => {
matches!(
table_name,
$(stringify!($table))|+
)
};
}
migration_tables!(supported_table_match)
}