feat: add spacetimedb json migration tooling
Some checks failed
CI / verify (push) Has been cancelled

This commit is contained in:
2026-04-27 14:54:26 +08:00
parent ded6f6ee2a
commit 9a79494c68
13 changed files with 1532 additions and 2 deletions

1
server-rs/Cargo.lock generated
View File

@@ -2698,6 +2698,7 @@ dependencies = [
"serde_json",
"shared-kernel",
"spacetimedb",
"spacetimedb-lib",
]
[[package]]

View File

@@ -11,6 +11,7 @@ crate-type = ["cdylib"]
log = { workspace = true }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
spacetimedb-lib = { version = "=2.1.0", default-features = false, features = ["serde"] }
module-ai = { path = "../module-ai", default-features = false, features = ["spacetime-types"] }
module-assets = { path = "../module-assets", default-features = false, features = ["spacetime-types"] }
module-big-fish = { path = "../module-big-fish", default-features = false, features = ["spacetime-types"] }

View File

@@ -20,7 +20,9 @@ use module_quest::{
};
pub(crate) use serde_json::{Map as JsonMap, Value as JsonValue, json};
pub(crate) use shared_kernel::format_timestamp_micros;
pub use spacetimedb::{ProcedureContext, ReducerContext, SpacetimeType, Table, Timestamp};
pub use spacetimedb::{
Identity, ProcedureContext, ReducerContext, SpacetimeType, Table, Timestamp,
};
use std::collections::HashSet;
mod ai;
@@ -29,6 +31,7 @@ mod auth;
mod big_fish;
mod domain_types;
mod entry;
mod migration;
mod puzzle;
mod runtime;
@@ -38,6 +41,7 @@ pub use auth::*;
pub use big_fish::*;
pub use domain_types::*;
pub use entry::*;
pub use migration::*;
pub use runtime::*;
#[spacetimedb::table(accessor = player_progression)]

View File

@@ -0,0 +1,648 @@
use crate::*;
use serde::{Deserialize, Serialize};
use spacetimedb_lib::sats::de::serde::DeserializeWrapper;
use spacetimedb_lib::sats::ser::serde::SerializeWrapper;
use std::collections::HashSet;
use crate::puzzle::{
puzzle_agent_message, puzzle_agent_session, puzzle_runtime_run, puzzle_work_profile,
};
const MIGRATION_SCHEMA_VERSION: u32 = 1;
const MIGRATION_MAX_TABLE_NAME_LEN: usize = 96;
const MIGRATION_MAX_OPERATOR_NOTE_CHARS: usize = 160;
const MIGRATION_MIN_BOOTSTRAP_SECRET_LEN: usize = 16;
const MIGRATION_BOOTSTRAP_SECRET: Option<&str> =
option_env!("GENARRATIVE_SPACETIME_MIGRATION_BOOTSTRAP_SECRET");
#[spacetimedb::table(accessor = database_migration_operator)]
pub struct DatabaseMigrationOperator {
#[primary_key]
pub operator_identity: Identity,
pub created_at: Timestamp,
pub created_by: Identity,
pub note: String,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationExportInput {
pub include_tables: Vec<String>,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationImportInput {
pub migration_json: String,
pub include_tables: Vec<String>,
pub replace_existing: bool,
pub dry_run: bool,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationAuthorizeOperatorInput {
pub bootstrap_secret: String,
pub operator_identity_hex: String,
pub note: String,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationRevokeOperatorInput {
pub operator_identity_hex: String,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationTableStat {
pub table_name: String,
pub exported_row_count: u64,
pub imported_row_count: u64,
pub skipped_row_count: u64,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationProcedureResult {
pub ok: bool,
pub schema_version: u32,
pub migration_json: Option<String>,
pub table_stats: Vec<DatabaseMigrationTableStat>,
pub error_message: Option<String>,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationOperatorProcedureResult {
pub ok: bool,
pub operator_identity_hex: Option<String>,
pub error_message: Option<String>,
}
#[derive(Serialize, Deserialize)]
struct MigrationFile {
schema_version: u32,
exported_at_micros: i64,
tables: Vec<MigrationTable>,
}
#[derive(Serialize, Deserialize)]
struct MigrationTable {
name: String,
rows: Vec<serde_json::Value>,
}
macro_rules! migration_tables {
($macro_name:ident $(, $arg:expr)* $(,)?) => {
$macro_name! {
$($arg,)*
auth_store_snapshot,
user_account,
auth_identity,
refresh_session,
ai_task,
ai_task_stage,
ai_text_chunk,
ai_result_reference,
runtime_snapshot,
runtime_setting,
user_browse_history,
profile_dashboard_state,
profile_wallet_ledger,
profile_invite_code,
profile_referral_relation,
profile_played_world,
profile_membership,
profile_recharge_order,
profile_save_archive,
player_progression,
chapter_progression,
npc_state,
story_session,
story_event,
inventory_slot,
battle_state,
treasure_record,
quest_record,
quest_log,
custom_world_profile,
custom_world_session,
custom_world_agent_session,
custom_world_agent_message,
custom_world_agent_operation,
custom_world_draft_card,
custom_world_gallery_entry,
asset_object,
asset_entity_binding,
puzzle_agent_session,
puzzle_agent_message,
puzzle_work_profile,
puzzle_runtime_run,
big_fish_creation_session,
big_fish_agent_message,
big_fish_asset_slot,
big_fish_runtime_run
}
};
}
macro_rules! collect_all_migration_tables {
($ctx:expr, $include_tables:expr, $tables:expr) => {
migration_tables!(collect_migration_table, $ctx, $include_tables, $tables);
};
}
macro_rules! collect_migration_table {
($ctx:expr, $include_tables:expr, $tables:expr, $($table:ident),+ $(,)?) => {
$(
if should_include_table($include_tables, stringify!($table)) {
let rows = $ctx
.db
.$table()
.iter()
.map(|row| row_to_json(&row))
.collect::<Result<Vec<_>, _>>()?;
$tables.push(MigrationTable {
name: stringify!($table).to_string(),
rows,
});
}
)+
};
}
macro_rules! clear_all_migration_tables {
($ctx:expr, $include_tables:expr) => {
migration_tables!(clear_migration_table, $ctx, $include_tables);
};
}
macro_rules! clear_migration_table {
($ctx:expr, $include_tables:expr, $($table:ident),+ $(,)?) => {
$(
if should_include_table($include_tables, stringify!($table)) {
for row in $ctx.db.$table().iter().collect::<Vec<_>>() {
$ctx.db.$table().delete(row);
}
}
)+
};
}
// 迁移权限独立存表,避免把 private 表导出能力开放给任意登录身份。
#[spacetimedb::procedure]
pub fn authorize_database_migration_operator(
ctx: &mut ProcedureContext,
input: DatabaseMigrationAuthorizeOperatorInput,
) -> DatabaseMigrationOperatorProcedureResult {
match authorize_database_migration_operator_inner(ctx, input) {
Ok(operator_identity_hex) => DatabaseMigrationOperatorProcedureResult {
ok: true,
operator_identity_hex: Some(operator_identity_hex),
error_message: None,
},
Err(error) => DatabaseMigrationOperatorProcedureResult {
ok: false,
operator_identity_hex: None,
error_message: Some(error),
},
}
}
#[spacetimedb::procedure]
pub fn revoke_database_migration_operator(
ctx: &mut ProcedureContext,
input: DatabaseMigrationRevokeOperatorInput,
) -> DatabaseMigrationOperatorProcedureResult {
match revoke_database_migration_operator_inner(ctx, input) {
Ok(operator_identity_hex) => DatabaseMigrationOperatorProcedureResult {
ok: true,
operator_identity_hex: Some(operator_identity_hex),
error_message: None,
},
Err(error) => DatabaseMigrationOperatorProcedureResult {
ok: false,
operator_identity_hex: None,
error_message: Some(error),
},
}
}
// 迁移导出走 procedure 返回 JSON 字符串,避免 reducer 无返回值且不能读取 private 表给外部。
#[spacetimedb::procedure]
pub fn export_database_migration_to_file(
ctx: &mut ProcedureContext,
input: DatabaseMigrationExportInput,
) -> DatabaseMigrationProcedureResult {
match export_database_migration_to_file_inner(ctx, input) {
Ok((migration_json, stats)) => DatabaseMigrationProcedureResult {
ok: true,
schema_version: MIGRATION_SCHEMA_VERSION,
migration_json: Some(migration_json),
table_stats: stats,
error_message: None,
},
Err(error) => DatabaseMigrationProcedureResult {
ok: false,
schema_version: MIGRATION_SCHEMA_VERSION,
migration_json: None,
table_stats: Vec::new(),
error_message: Some(error),
},
}
}
// 迁移导入由 Node 侧读文件后把 JSON 字符串传入procedure 只负责校验和写表事务。
#[spacetimedb::procedure]
pub fn import_database_migration_from_file(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportInput,
) -> DatabaseMigrationProcedureResult {
match import_database_migration_from_file_inner(ctx, input) {
Ok(stats) => DatabaseMigrationProcedureResult {
ok: true,
schema_version: MIGRATION_SCHEMA_VERSION,
migration_json: None,
table_stats: stats,
error_message: None,
},
Err(error) => DatabaseMigrationProcedureResult {
ok: false,
schema_version: MIGRATION_SCHEMA_VERSION,
migration_json: None,
table_stats: Vec::new(),
error_message: Some(error),
},
}
}
fn export_database_migration_to_file_inner(
ctx: &mut ProcedureContext,
input: DatabaseMigrationExportInput,
) -> Result<(String, Vec<DatabaseMigrationTableStat>), String> {
let caller = ctx.sender();
let included_tables = normalize_include_tables(&input.include_tables)?;
let exported_at_micros = ctx.timestamp.to_micros_since_unix_epoch();
let migration_file = ctx.try_with_tx(|tx| {
require_migration_operator(tx, caller)?;
build_migration_file(tx, exported_at_micros, included_tables.as_ref())
})?;
let stats = build_export_stats(&migration_file.tables);
let content = serde_json::to_string_pretty(&migration_file)
.map_err(|error| format!("迁移文件序列化失败: {error}"))?;
Ok((content, stats))
}
fn import_database_migration_from_file_inner(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportInput,
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
let caller = ctx.sender();
let included_tables = normalize_include_tables(&input.include_tables)?;
if input.migration_json.trim().is_empty() {
return Err("migration_json 不能为空".to_string());
}
ctx.try_with_tx(|tx| require_migration_operator(tx, caller))?;
let migration_file = serde_json::from_str::<MigrationFile>(&input.migration_json)
.map_err(|error| format!("迁移文件 JSON 解析失败: {error}"))?;
if migration_file.schema_version != MIGRATION_SCHEMA_VERSION {
return Err(format!(
"迁移文件 schema_version 不匹配,期望 {},实际 {}",
MIGRATION_SCHEMA_VERSION, migration_file.schema_version
));
}
let stats = if input.dry_run {
build_import_dry_run_stats(&migration_file.tables, included_tables.as_ref())?
} else {
ctx.try_with_tx(|tx| {
require_migration_operator(tx, caller)?;
apply_migration_file(
tx,
&migration_file,
included_tables.as_ref(),
input.replace_existing,
)
})?
};
Ok(stats)
}
fn authorize_database_migration_operator_inner(
ctx: &mut ProcedureContext,
input: DatabaseMigrationAuthorizeOperatorInput,
) -> Result<String, String> {
let caller = ctx.sender();
let operator_identity = parse_migration_operator_identity(&input.operator_identity_hex)?;
let note = normalize_migration_operator_note(&input.note)?;
let bootstrap_secret = input.bootstrap_secret.trim().to_string();
ctx.try_with_tx(|tx| {
authorize_database_migration_operator_tx(
tx,
caller,
operator_identity,
&bootstrap_secret,
note.clone(),
)
})?;
Ok(operator_identity.to_hex().to_string())
}
fn revoke_database_migration_operator_inner(
ctx: &mut ProcedureContext,
input: DatabaseMigrationRevokeOperatorInput,
) -> Result<String, String> {
let caller = ctx.sender();
let operator_identity = parse_migration_operator_identity(&input.operator_identity_hex)?;
ctx.try_with_tx(|tx| {
require_migration_operator(tx, caller)?;
if tx
.db
.database_migration_operator()
.operator_identity()
.find(&operator_identity)
.is_none()
{
return Err("迁移操作员不存在".to_string());
}
tx.db
.database_migration_operator()
.operator_identity()
.delete(&operator_identity);
Ok(())
})?;
Ok(operator_identity.to_hex().to_string())
}
fn authorize_database_migration_operator_tx(
ctx: &ReducerContext,
caller: Identity,
operator_identity: Identity,
bootstrap_secret: &str,
note: String,
) -> Result<(), String> {
let has_operator = ctx.db.database_migration_operator().iter().next().is_some();
if has_operator {
require_migration_operator(ctx, caller)?;
} else {
require_migration_bootstrap_secret(bootstrap_secret)?;
}
if ctx
.db
.database_migration_operator()
.operator_identity()
.find(&operator_identity)
.is_some()
{
ctx.db
.database_migration_operator()
.operator_identity()
.delete(&operator_identity);
}
ctx.db
.database_migration_operator()
.insert(DatabaseMigrationOperator {
operator_identity,
created_at: ctx.timestamp,
created_by: caller,
note,
});
Ok(())
}
fn require_migration_operator(ctx: &ReducerContext, caller: Identity) -> Result<(), String> {
if ctx
.db
.database_migration_operator()
.operator_identity()
.find(&caller)
.is_some()
{
Ok(())
} else {
Err("当前 identity 未被授权执行数据库迁移".to_string())
}
}
fn require_migration_bootstrap_secret(input: &str) -> Result<(), String> {
let configured_secret = MIGRATION_BOOTSTRAP_SECRET
.map(str::trim)
.filter(|secret| !secret.is_empty())
.ok_or_else(|| "迁移引导密钥未配置,无法创建首个操作员".to_string())?;
if configured_secret.chars().count() < MIGRATION_MIN_BOOTSTRAP_SECRET_LEN {
return Err("迁移引导密钥长度不足,至少需要 16 个字符".to_string());
}
if input != configured_secret {
return Err("迁移引导密钥不正确".to_string());
}
Ok(())
}
fn parse_migration_operator_identity(input: &str) -> Result<Identity, String> {
let identity_hex = input.trim().trim_start_matches("0x");
if identity_hex.len() != 64 {
return Err("operator_identity_hex 必须是 64 位十六进制 identity".to_string());
}
Identity::from_hex(identity_hex)
.map_err(|error| format!("operator_identity_hex 格式不合法: {error}"))
}
fn normalize_migration_operator_note(input: &str) -> Result<String, String> {
let note = input.trim();
if note.chars().count() > MIGRATION_MAX_OPERATOR_NOTE_CHARS {
return Err(format!(
"迁移操作员备注过长,最多 {} 个字符",
MIGRATION_MAX_OPERATOR_NOTE_CHARS
));
}
Ok(note.to_string())
}
fn normalize_include_tables(input: &[String]) -> Result<Option<HashSet<String>>, String> {
if input.is_empty() {
return Ok(None);
}
let mut tables = HashSet::new();
for raw_name in input {
let name = raw_name.trim();
if name.is_empty() {
continue;
}
if name.len() > MIGRATION_MAX_TABLE_NAME_LEN {
return Err(format!("迁移表名过长: {name}"));
}
if !is_supported_migration_table(name) {
return Err(format!("迁移表不在白名单内: {name}"));
}
tables.insert(name.to_string());
}
Ok(Some(tables))
}
fn should_include_table(include_tables: Option<&HashSet<String>>, table_name: &str) -> bool {
include_tables
.map(|tables| tables.contains(table_name))
.unwrap_or(true)
}
fn build_migration_file(
ctx: &ReducerContext,
exported_at_micros: i64,
include_tables: Option<&HashSet<String>>,
) -> Result<MigrationFile, String> {
let mut tables = Vec::new();
collect_all_migration_tables!(ctx, include_tables, tables);
Ok(MigrationFile {
schema_version: MIGRATION_SCHEMA_VERSION,
exported_at_micros,
tables,
})
}
fn build_export_stats(tables: &[MigrationTable]) -> Vec<DatabaseMigrationTableStat> {
tables
.iter()
.map(|table| DatabaseMigrationTableStat {
table_name: table.name.clone(),
exported_row_count: table.rows.len() as u64,
imported_row_count: 0,
skipped_row_count: 0,
})
.collect()
}
fn build_import_dry_run_stats(
tables: &[MigrationTable],
include_tables: Option<&HashSet<String>>,
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
let mut stats = Vec::new();
for table in tables {
if !is_supported_migration_table(&table.name) {
return Err(format!("迁移文件包含不支持的表: {}", table.name));
}
if should_include_table(include_tables, &table.name) {
stats.push(DatabaseMigrationTableStat {
table_name: table.name.clone(),
exported_row_count: 0,
imported_row_count: table.rows.len() as u64,
skipped_row_count: 0,
});
} else {
stats.push(DatabaseMigrationTableStat {
table_name: table.name.clone(),
exported_row_count: 0,
imported_row_count: 0,
skipped_row_count: table.rows.len() as u64,
});
}
}
Ok(stats)
}
fn apply_migration_file(
ctx: &ReducerContext,
migration_file: &MigrationFile,
include_tables: Option<&HashSet<String>>,
replace_existing: bool,
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
let mut stats = Vec::new();
for table in &migration_file.tables {
if !is_supported_migration_table(&table.name) {
return Err(format!("迁移文件包含不支持的表: {}", table.name));
}
}
if replace_existing {
clear_all_migration_tables!(ctx, include_tables);
}
for table in &migration_file.tables {
if !should_include_table(include_tables, &table.name) {
stats.push(DatabaseMigrationTableStat {
table_name: table.name.clone(),
exported_row_count: 0,
imported_row_count: 0,
skipped_row_count: table.rows.len() as u64,
});
continue;
}
let imported_row_count = insert_migration_table_rows(ctx, table)?;
stats.push(DatabaseMigrationTableStat {
table_name: table.name.clone(),
exported_row_count: 0,
imported_row_count,
skipped_row_count: 0,
});
}
Ok(stats)
}
fn row_to_json<T: spacetimedb::Serialize>(row: &T) -> Result<serde_json::Value, String> {
serde_json::to_value(SerializeWrapper::from_ref(row))
.map_err(|error| format!("迁移行序列化失败: {error}"))
}
fn row_from_json<T>(value: &serde_json::Value) -> Result<T, String>
where
T: for<'de> spacetimedb::Deserialize<'de>,
{
let wrapped: DeserializeWrapper<T> = serde_json::from_value(value.clone())
.map_err(|error| format!("迁移行反序列化失败: {error}"))?;
Ok(wrapped.0)
}
fn insert_migration_table_rows(
ctx: &ReducerContext,
table: &MigrationTable,
) -> Result<u64, String> {
macro_rules! insert_table_match_arm {
($($table:ident),+ $(,)?) => {
match table.name.as_str() {
$(
stringify!($table) => {
let mut imported = 0u64;
for value in &table.rows {
let row = row_from_json(value)
.map_err(|error| format!("{}: {error}", stringify!($table)))?;
ctx.db
.$table()
.try_insert(row)
.map_err(|error| format!("{} 导入失败: {error}", stringify!($table)))?;
imported = imported.saturating_add(1);
}
Ok(imported)
}
)+
_ => Err(format!("迁移表不在白名单内: {}", table.name)),
}
};
}
migration_tables!(insert_table_match_arm)
}
fn is_supported_migration_table(table_name: &str) -> bool {
macro_rules! supported_table_match {
($($table:ident),+ $(,)?) => {
matches!(
table_name,
$(stringify!($table))|+
)
};
}
migration_tables!(supported_table_match)
}