Merge branch 'master' of http://82.157.175.59:3000/GenarrativeAI/Genarrative
Some checks failed
CI / verify (push) Has been cancelled
Some checks failed
CI / verify (push) Has been cancelled
This commit is contained in:
@@ -11,6 +11,7 @@ crate-type = ["cdylib"]
|
||||
log = { workspace = true }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
spacetimedb-lib = { version = "=2.1.0", default-features = false, features = ["serde"] }
|
||||
module-ai = { path = "../module-ai", default-features = false, features = ["spacetime-types"] }
|
||||
module-assets = { path = "../module-assets", default-features = false, features = ["spacetime-types"] }
|
||||
module-big-fish = { path = "../module-big-fish", default-features = false, features = ["spacetime-types"] }
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
mod assets;
|
||||
mod runtime;
|
||||
mod session;
|
||||
mod tables;
|
||||
|
||||
pub use assets::*;
|
||||
pub use runtime::*;
|
||||
pub use session::*;
|
||||
pub use tables::*;
|
||||
|
||||
@@ -1,198 +0,0 @@
|
||||
use crate::big_fish::tables::{big_fish_creation_session, big_fish_runtime_run};
|
||||
use crate::*;
|
||||
|
||||
#[spacetimedb::procedure]
|
||||
pub fn start_big_fish_run(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: BigFishRunStartInput,
|
||||
) -> BigFishRunProcedureResult {
|
||||
match ctx.try_with_tx(|tx| start_big_fish_run_tx(tx, input.clone())) {
|
||||
Ok(run) => BigFishRunProcedureResult {
|
||||
ok: true,
|
||||
run: Some(run),
|
||||
error_message: None,
|
||||
},
|
||||
Err(message) => BigFishRunProcedureResult {
|
||||
ok: false,
|
||||
run: None,
|
||||
error_message: Some(message),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[spacetimedb::procedure]
|
||||
pub fn submit_big_fish_input(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: BigFishRunInputSubmitInput,
|
||||
) -> BigFishRunProcedureResult {
|
||||
match ctx.try_with_tx(|tx| submit_big_fish_input_tx(tx, input.clone())) {
|
||||
Ok(run) => BigFishRunProcedureResult {
|
||||
ok: true,
|
||||
run: Some(run),
|
||||
error_message: None,
|
||||
},
|
||||
Err(message) => BigFishRunProcedureResult {
|
||||
ok: false,
|
||||
run: None,
|
||||
error_message: Some(message),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[spacetimedb::procedure]
|
||||
pub fn get_big_fish_run(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: BigFishRunGetInput,
|
||||
) -> BigFishRunProcedureResult {
|
||||
match ctx.try_with_tx(|tx| get_big_fish_run_tx(tx, input.clone())) {
|
||||
Ok(run) => BigFishRunProcedureResult {
|
||||
ok: true,
|
||||
run: Some(run),
|
||||
error_message: None,
|
||||
},
|
||||
Err(message) => BigFishRunProcedureResult {
|
||||
ok: false,
|
||||
run: None,
|
||||
error_message: Some(message),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn start_big_fish_run_tx(
|
||||
ctx: &ReducerContext,
|
||||
input: BigFishRunStartInput,
|
||||
) -> Result<BigFishRuntimeSnapshot, String> {
|
||||
validate_run_start_input(&input).map_err(|error| error.to_string())?;
|
||||
if ctx
|
||||
.db
|
||||
.big_fish_runtime_run()
|
||||
.run_id()
|
||||
.find(&input.run_id)
|
||||
.is_some()
|
||||
{
|
||||
return Err("big_fish_runtime_run.run_id 已存在".to_string());
|
||||
}
|
||||
let session = ctx
|
||||
.db
|
||||
.big_fish_creation_session()
|
||||
.session_id()
|
||||
.find(&input.session_id)
|
||||
.ok_or_else(|| "big_fish_creation_session 不存在".to_string())?;
|
||||
if session.owner_user_id != input.owner_user_id
|
||||
&& session.stage != BigFishCreationStage::Published
|
||||
{
|
||||
return Err("big_fish_creation_session 不存在".to_string());
|
||||
}
|
||||
let draft = session
|
||||
.draft_json
|
||||
.as_deref()
|
||||
.ok_or_else(|| "big_fish.draft 尚未编译".to_string())
|
||||
.and_then(|value| deserialize_draft(value).map_err(|error| error.to_string()))?;
|
||||
let snapshot = build_initial_runtime_snapshot(
|
||||
input.run_id.clone(),
|
||||
input.session_id.clone(),
|
||||
&draft,
|
||||
input.started_at_micros,
|
||||
);
|
||||
let now = Timestamp::from_micros_since_unix_epoch(input.started_at_micros);
|
||||
ctx.db.big_fish_runtime_run().insert(BigFishRuntimeRun {
|
||||
run_id: input.run_id,
|
||||
session_id: input.session_id,
|
||||
owner_user_id: input.owner_user_id,
|
||||
status: snapshot.status,
|
||||
snapshot_json: serialize_runtime_snapshot(&snapshot).map_err(|error| error.to_string())?,
|
||||
last_input_x: 0.0,
|
||||
last_input_y: 0.0,
|
||||
tick: snapshot.tick,
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
});
|
||||
|
||||
Ok(snapshot)
|
||||
}
|
||||
|
||||
fn submit_big_fish_input_tx(
|
||||
ctx: &ReducerContext,
|
||||
input: BigFishRunInputSubmitInput,
|
||||
) -> Result<BigFishRuntimeSnapshot, String> {
|
||||
validate_run_input_submit_input(&input).map_err(|error| error.to_string())?;
|
||||
let run = ctx
|
||||
.db
|
||||
.big_fish_runtime_run()
|
||||
.run_id()
|
||||
.find(&input.run_id)
|
||||
.filter(|row| row.owner_user_id == input.owner_user_id)
|
||||
.ok_or_else(|| "big_fish_runtime_run 不存在".to_string())?;
|
||||
let session = ctx
|
||||
.db
|
||||
.big_fish_creation_session()
|
||||
.session_id()
|
||||
.find(&run.session_id)
|
||||
.ok_or_else(|| "big_fish_creation_session 不存在".to_string())?;
|
||||
if session.owner_user_id != input.owner_user_id
|
||||
&& session.stage != BigFishCreationStage::Published
|
||||
{
|
||||
return Err("big_fish_creation_session 不存在".to_string());
|
||||
}
|
||||
let draft = session
|
||||
.draft_json
|
||||
.as_deref()
|
||||
.ok_or_else(|| "big_fish.draft 尚未编译".to_string())
|
||||
.and_then(|value| deserialize_draft(value).map_err(|error| error.to_string()))?;
|
||||
let current_snapshot =
|
||||
deserialize_runtime_snapshot(&run.snapshot_json).map_err(|error| error.to_string())?;
|
||||
let next_snapshot = advance_runtime_snapshot(
|
||||
current_snapshot,
|
||||
&draft.runtime_params,
|
||||
input.input_x,
|
||||
input.input_y,
|
||||
input.submitted_at_micros,
|
||||
);
|
||||
replace_big_fish_run(
|
||||
ctx,
|
||||
&run,
|
||||
BigFishRuntimeRun {
|
||||
run_id: run.run_id.clone(),
|
||||
session_id: run.session_id.clone(),
|
||||
owner_user_id: run.owner_user_id.clone(),
|
||||
status: next_snapshot.status,
|
||||
snapshot_json: serialize_runtime_snapshot(&next_snapshot)
|
||||
.map_err(|error| error.to_string())?,
|
||||
last_input_x: input.input_x,
|
||||
last_input_y: input.input_y,
|
||||
tick: next_snapshot.tick,
|
||||
created_at: run.created_at,
|
||||
updated_at: Timestamp::from_micros_since_unix_epoch(input.submitted_at_micros),
|
||||
},
|
||||
);
|
||||
|
||||
Ok(next_snapshot)
|
||||
}
|
||||
|
||||
fn get_big_fish_run_tx(
|
||||
ctx: &ReducerContext,
|
||||
input: BigFishRunGetInput,
|
||||
) -> Result<BigFishRuntimeSnapshot, String> {
|
||||
validate_run_get_input(&input).map_err(|error| error.to_string())?;
|
||||
let run = ctx
|
||||
.db
|
||||
.big_fish_runtime_run()
|
||||
.run_id()
|
||||
.find(&input.run_id)
|
||||
.filter(|row| row.owner_user_id == input.owner_user_id)
|
||||
.ok_or_else(|| "big_fish_runtime_run 不存在".to_string())?;
|
||||
|
||||
deserialize_runtime_snapshot(&run.snapshot_json).map_err(|error| error.to_string())
|
||||
}
|
||||
|
||||
fn replace_big_fish_run(
|
||||
ctx: &ReducerContext,
|
||||
current: &BigFishRuntimeRun,
|
||||
next: BigFishRuntimeRun,
|
||||
) {
|
||||
ctx.db
|
||||
.big_fish_runtime_run()
|
||||
.run_id()
|
||||
.delete(¤t.run_id);
|
||||
ctx.db.big_fish_runtime_run().insert(next);
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
use crate::big_fish::tables::{big_fish_agent_message, big_fish_creation_session};
|
||||
use crate::*;
|
||||
|
||||
const INITIAL_BIG_FISH_CREATION_PROGRESS_PERCENT: u32 = 0;
|
||||
|
||||
#[spacetimedb::procedure]
|
||||
pub fn create_big_fish_session(
|
||||
ctx: &mut ProcedureContext,
|
||||
@@ -182,7 +184,8 @@ pub(crate) fn create_big_fish_session_tx(
|
||||
owner_user_id: input.owner_user_id.clone(),
|
||||
seed_text: input.seed_text.trim().to_string(),
|
||||
current_turn: 0,
|
||||
progress_percent: 20,
|
||||
// 中文注释:欢迎语和初始锚点只建立工作台上下文,不能提前抬高创作进度。
|
||||
progress_percent: INITIAL_BIG_FISH_CREATION_PROGRESS_PERCENT,
|
||||
stage: BigFishCreationStage::CollectingAnchors,
|
||||
anchor_pack_json: serialize_anchor_pack(&anchor_pack)
|
||||
.map_err(|error| error.to_string())?,
|
||||
@@ -292,7 +295,7 @@ pub(crate) fn delete_big_fish_work_tx(
|
||||
.filter(|row| row.owner_user_id == input.owner_user_id)
|
||||
.ok_or_else(|| "big_fish_creation_session 不存在".to_string())?;
|
||||
|
||||
// 删除作品时同步清理 Agent 消息、素材槽与运行快照,避免创作页消失后残留孤儿数据。
|
||||
// 删除作品时同步清理 Agent 消息与素材槽;最终游玩模拟已经迁到前端,不再写后端运行快照。
|
||||
ctx.db
|
||||
.big_fish_creation_session()
|
||||
.session_id()
|
||||
@@ -318,18 +321,6 @@ pub(crate) fn delete_big_fish_work_tx(
|
||||
{
|
||||
ctx.db.big_fish_asset_slot().slot_id().delete(&slot.slot_id);
|
||||
}
|
||||
for run in ctx
|
||||
.db
|
||||
.big_fish_runtime_run()
|
||||
.iter()
|
||||
.filter(|row| {
|
||||
row.session_id == input.session_id && row.owner_user_id == input.owner_user_id
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
{
|
||||
ctx.db.big_fish_runtime_run().run_id().delete(&run.run_id);
|
||||
}
|
||||
|
||||
list_big_fish_works_tx(
|
||||
ctx,
|
||||
BigFishWorksListInput {
|
||||
@@ -707,6 +698,11 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn initial_big_fish_creation_progress_starts_from_zero() {
|
||||
assert_eq!(INITIAL_BIG_FISH_CREATION_PROGRESS_PERCENT, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn big_fish_direct_work_content_ignores_empty_created_session() {
|
||||
let empty_session =
|
||||
|
||||
@@ -51,22 +51,3 @@ pub struct BigFishAssetSlot {
|
||||
pub(crate) prompt_snapshot: String,
|
||||
pub(crate) updated_at: Timestamp,
|
||||
}
|
||||
|
||||
#[spacetimedb::table(
|
||||
accessor = big_fish_runtime_run,
|
||||
index(accessor = by_big_fish_run_owner_user_id, btree(columns = [owner_user_id])),
|
||||
index(accessor = by_big_fish_run_session_id, btree(columns = [session_id]))
|
||||
)]
|
||||
pub struct BigFishRuntimeRun {
|
||||
#[primary_key]
|
||||
pub(crate) run_id: String,
|
||||
pub(crate) session_id: String,
|
||||
pub(crate) owner_user_id: String,
|
||||
pub(crate) status: BigFishRunStatus,
|
||||
pub(crate) snapshot_json: String,
|
||||
pub(crate) last_input_x: f32,
|
||||
pub(crate) last_input_y: f32,
|
||||
pub(crate) tick: u64,
|
||||
pub(crate) created_at: Timestamp,
|
||||
pub(crate) updated_at: Timestamp,
|
||||
}
|
||||
|
||||
@@ -20,7 +20,9 @@ use module_quest::{
|
||||
};
|
||||
pub(crate) use serde_json::{Map as JsonMap, Value as JsonValue, json};
|
||||
pub(crate) use shared_kernel::format_timestamp_micros;
|
||||
pub use spacetimedb::{ProcedureContext, ReducerContext, SpacetimeType, Table, Timestamp};
|
||||
pub use spacetimedb::{
|
||||
Identity, ProcedureContext, ReducerContext, SpacetimeType, Table, Timestamp,
|
||||
};
|
||||
use std::collections::HashSet;
|
||||
|
||||
mod ai;
|
||||
@@ -29,6 +31,7 @@ mod auth;
|
||||
mod big_fish;
|
||||
mod domain_types;
|
||||
mod entry;
|
||||
mod migration;
|
||||
mod puzzle;
|
||||
mod runtime;
|
||||
|
||||
@@ -38,6 +41,7 @@ pub use auth::*;
|
||||
pub use big_fish::*;
|
||||
pub use domain_types::*;
|
||||
pub use entry::*;
|
||||
pub use migration::*;
|
||||
pub use runtime::*;
|
||||
|
||||
#[spacetimedb::table(accessor = player_progression)]
|
||||
|
||||
713
server-rs/crates/spacetime-module/src/migration.rs
Normal file
713
server-rs/crates/spacetime-module/src/migration.rs
Normal file
@@ -0,0 +1,713 @@
|
||||
use crate::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use spacetimedb_lib::sats::de::serde::DeserializeWrapper;
|
||||
use spacetimedb_lib::sats::ser::serde::SerializeWrapper;
|
||||
use std::collections::HashSet;
|
||||
|
||||
use crate::puzzle::{
|
||||
puzzle_agent_message, puzzle_agent_session, puzzle_runtime_run, puzzle_work_profile,
|
||||
};
|
||||
|
||||
const MIGRATION_SCHEMA_VERSION: u32 = 1;
|
||||
const MIGRATION_MAX_TABLE_NAME_LEN: usize = 96;
|
||||
const MIGRATION_MAX_OPERATOR_NOTE_CHARS: usize = 160;
|
||||
const MIGRATION_MIN_BOOTSTRAP_SECRET_LEN: usize = 16;
|
||||
const MIGRATION_BOOTSTRAP_SECRET: Option<&str> =
|
||||
option_env!("GENARRATIVE_SPACETIME_MIGRATION_BOOTSTRAP_SECRET");
|
||||
|
||||
#[spacetimedb::table(accessor = database_migration_operator)]
|
||||
pub struct DatabaseMigrationOperator {
|
||||
#[primary_key]
|
||||
pub operator_identity: Identity,
|
||||
pub created_at: Timestamp,
|
||||
pub created_by: Identity,
|
||||
pub note: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationExportInput {
|
||||
pub include_tables: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationImportInput {
|
||||
pub migration_json: String,
|
||||
pub include_tables: Vec<String>,
|
||||
pub replace_existing: bool,
|
||||
pub dry_run: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
enum DatabaseMigrationImportMode {
|
||||
Strict,
|
||||
Incremental,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationAuthorizeOperatorInput {
|
||||
pub bootstrap_secret: String,
|
||||
pub operator_identity_hex: String,
|
||||
pub note: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationRevokeOperatorInput {
|
||||
pub operator_identity_hex: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationTableStat {
|
||||
pub table_name: String,
|
||||
pub exported_row_count: u64,
|
||||
pub imported_row_count: u64,
|
||||
pub skipped_row_count: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationProcedureResult {
|
||||
pub ok: bool,
|
||||
pub schema_version: u32,
|
||||
pub migration_json: Option<String>,
|
||||
pub table_stats: Vec<DatabaseMigrationTableStat>,
|
||||
pub error_message: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationOperatorProcedureResult {
|
||||
pub ok: bool,
|
||||
pub operator_identity_hex: Option<String>,
|
||||
pub error_message: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct MigrationFile {
|
||||
schema_version: u32,
|
||||
exported_at_micros: i64,
|
||||
tables: Vec<MigrationTable>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct MigrationTable {
|
||||
name: String,
|
||||
rows: Vec<serde_json::Value>,
|
||||
}
|
||||
|
||||
macro_rules! migration_tables {
|
||||
($macro_name:ident $(, $arg:expr)* $(,)?) => {
|
||||
$macro_name! {
|
||||
$($arg,)*
|
||||
auth_store_snapshot,
|
||||
user_account,
|
||||
auth_identity,
|
||||
refresh_session,
|
||||
ai_task,
|
||||
ai_task_stage,
|
||||
ai_text_chunk,
|
||||
ai_result_reference,
|
||||
runtime_snapshot,
|
||||
runtime_setting,
|
||||
user_browse_history,
|
||||
profile_dashboard_state,
|
||||
profile_wallet_ledger,
|
||||
profile_invite_code,
|
||||
profile_referral_relation,
|
||||
profile_played_world,
|
||||
profile_membership,
|
||||
profile_recharge_order,
|
||||
profile_save_archive,
|
||||
player_progression,
|
||||
chapter_progression,
|
||||
npc_state,
|
||||
story_session,
|
||||
story_event,
|
||||
inventory_slot,
|
||||
battle_state,
|
||||
treasure_record,
|
||||
quest_record,
|
||||
quest_log,
|
||||
custom_world_profile,
|
||||
custom_world_session,
|
||||
custom_world_agent_session,
|
||||
custom_world_agent_message,
|
||||
custom_world_agent_operation,
|
||||
custom_world_draft_card,
|
||||
custom_world_gallery_entry,
|
||||
asset_object,
|
||||
asset_entity_binding,
|
||||
puzzle_agent_session,
|
||||
puzzle_agent_message,
|
||||
puzzle_work_profile,
|
||||
puzzle_runtime_run,
|
||||
big_fish_creation_session,
|
||||
big_fish_agent_message,
|
||||
big_fish_asset_slot
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! collect_all_migration_tables {
|
||||
($ctx:expr, $include_tables:expr, $tables:expr) => {
|
||||
migration_tables!(collect_migration_table, $ctx, $include_tables, $tables);
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! collect_migration_table {
|
||||
($ctx:expr, $include_tables:expr, $tables:expr, $($table:ident),+ $(,)?) => {
|
||||
$(
|
||||
if should_include_table($include_tables, stringify!($table)) {
|
||||
let rows = $ctx
|
||||
.db
|
||||
.$table()
|
||||
.iter()
|
||||
.map(|row| row_to_json(&row))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
$tables.push(MigrationTable {
|
||||
name: stringify!($table).to_string(),
|
||||
rows,
|
||||
});
|
||||
}
|
||||
)+
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! clear_all_migration_tables {
|
||||
($ctx:expr, $include_tables:expr) => {
|
||||
migration_tables!(clear_migration_table, $ctx, $include_tables);
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! clear_migration_table {
|
||||
($ctx:expr, $include_tables:expr, $($table:ident),+ $(,)?) => {
|
||||
$(
|
||||
if should_include_table($include_tables, stringify!($table)) {
|
||||
for row in $ctx.db.$table().iter().collect::<Vec<_>>() {
|
||||
$ctx.db.$table().delete(row);
|
||||
}
|
||||
}
|
||||
)+
|
||||
};
|
||||
}
|
||||
|
||||
// 迁移权限独立存表,避免把 private 表导出能力开放给任意登录身份。
|
||||
#[spacetimedb::procedure]
|
||||
pub fn authorize_database_migration_operator(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationAuthorizeOperatorInput,
|
||||
) -> DatabaseMigrationOperatorProcedureResult {
|
||||
match authorize_database_migration_operator_inner(ctx, input) {
|
||||
Ok(operator_identity_hex) => DatabaseMigrationOperatorProcedureResult {
|
||||
ok: true,
|
||||
operator_identity_hex: Some(operator_identity_hex),
|
||||
error_message: None,
|
||||
},
|
||||
Err(error) => DatabaseMigrationOperatorProcedureResult {
|
||||
ok: false,
|
||||
operator_identity_hex: None,
|
||||
error_message: Some(error),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[spacetimedb::procedure]
|
||||
pub fn revoke_database_migration_operator(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationRevokeOperatorInput,
|
||||
) -> DatabaseMigrationOperatorProcedureResult {
|
||||
match revoke_database_migration_operator_inner(ctx, input) {
|
||||
Ok(operator_identity_hex) => DatabaseMigrationOperatorProcedureResult {
|
||||
ok: true,
|
||||
operator_identity_hex: Some(operator_identity_hex),
|
||||
error_message: None,
|
||||
},
|
||||
Err(error) => DatabaseMigrationOperatorProcedureResult {
|
||||
ok: false,
|
||||
operator_identity_hex: None,
|
||||
error_message: Some(error),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// 迁移导出走 procedure 返回 JSON 字符串,避免 reducer 无返回值且不能读取 private 表给外部。
|
||||
#[spacetimedb::procedure]
|
||||
pub fn export_database_migration_to_file(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationExportInput,
|
||||
) -> DatabaseMigrationProcedureResult {
|
||||
match export_database_migration_to_file_inner(ctx, input) {
|
||||
Ok((migration_json, stats)) => DatabaseMigrationProcedureResult {
|
||||
ok: true,
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: Some(migration_json),
|
||||
table_stats: stats,
|
||||
error_message: None,
|
||||
},
|
||||
Err(error) => DatabaseMigrationProcedureResult {
|
||||
ok: false,
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: Vec::new(),
|
||||
error_message: Some(error),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// 迁移导入由 Node 侧读文件后把 JSON 字符串传入,procedure 只负责校验和写表事务。
|
||||
#[spacetimedb::procedure]
|
||||
pub fn import_database_migration_from_file(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationImportInput,
|
||||
) -> DatabaseMigrationProcedureResult {
|
||||
match import_database_migration_from_file_inner(ctx, input, DatabaseMigrationImportMode::Strict)
|
||||
{
|
||||
Ok(stats) => DatabaseMigrationProcedureResult {
|
||||
ok: true,
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: stats,
|
||||
error_message: None,
|
||||
},
|
||||
Err(error) => DatabaseMigrationProcedureResult {
|
||||
ok: false,
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: Vec::new(),
|
||||
error_message: Some(error),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// 增量导入只插入目标库缺失的行;主键或唯一约束冲突的行会跳过,不更新已有数据。
|
||||
#[spacetimedb::procedure]
|
||||
pub fn import_database_migration_incremental_from_file(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationImportInput,
|
||||
) -> DatabaseMigrationProcedureResult {
|
||||
match import_database_migration_from_file_inner(
|
||||
ctx,
|
||||
input,
|
||||
DatabaseMigrationImportMode::Incremental,
|
||||
) {
|
||||
Ok(stats) => DatabaseMigrationProcedureResult {
|
||||
ok: true,
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: stats,
|
||||
error_message: None,
|
||||
},
|
||||
Err(error) => DatabaseMigrationProcedureResult {
|
||||
ok: false,
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: Vec::new(),
|
||||
error_message: Some(error),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn export_database_migration_to_file_inner(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationExportInput,
|
||||
) -> Result<(String, Vec<DatabaseMigrationTableStat>), String> {
|
||||
let caller = ctx.sender();
|
||||
let included_tables = normalize_include_tables(&input.include_tables)?;
|
||||
let exported_at_micros = ctx.timestamp.to_micros_since_unix_epoch();
|
||||
|
||||
let migration_file = ctx.try_with_tx(|tx| {
|
||||
require_migration_operator(tx, caller)?;
|
||||
build_migration_file(tx, exported_at_micros, included_tables.as_ref())
|
||||
})?;
|
||||
let stats = build_export_stats(&migration_file.tables);
|
||||
let content = serde_json::to_string_pretty(&migration_file)
|
||||
.map_err(|error| format!("迁移文件序列化失败: {error}"))?;
|
||||
|
||||
Ok((content, stats))
|
||||
}
|
||||
|
||||
fn import_database_migration_from_file_inner(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationImportInput,
|
||||
import_mode: DatabaseMigrationImportMode,
|
||||
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
|
||||
let caller = ctx.sender();
|
||||
let included_tables = normalize_include_tables(&input.include_tables)?;
|
||||
if import_mode == DatabaseMigrationImportMode::Incremental && input.replace_existing {
|
||||
return Err("增量导入不能同时启用 replace_existing".to_string());
|
||||
}
|
||||
if input.migration_json.trim().is_empty() {
|
||||
return Err("migration_json 不能为空".to_string());
|
||||
}
|
||||
ctx.try_with_tx(|tx| require_migration_operator(tx, caller))?;
|
||||
|
||||
let migration_file = serde_json::from_str::<MigrationFile>(&input.migration_json)
|
||||
.map_err(|error| format!("迁移文件 JSON 解析失败: {error}"))?;
|
||||
if migration_file.schema_version != MIGRATION_SCHEMA_VERSION {
|
||||
return Err(format!(
|
||||
"迁移文件 schema_version 不匹配,期望 {},实际 {}",
|
||||
MIGRATION_SCHEMA_VERSION, migration_file.schema_version
|
||||
));
|
||||
}
|
||||
|
||||
let stats = if input.dry_run {
|
||||
build_import_dry_run_stats(&migration_file.tables, included_tables.as_ref())?
|
||||
} else {
|
||||
ctx.try_with_tx(|tx| {
|
||||
require_migration_operator(tx, caller)?;
|
||||
apply_migration_file(
|
||||
tx,
|
||||
&migration_file,
|
||||
included_tables.as_ref(),
|
||||
input.replace_existing,
|
||||
import_mode,
|
||||
)
|
||||
})?
|
||||
};
|
||||
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
fn authorize_database_migration_operator_inner(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationAuthorizeOperatorInput,
|
||||
) -> Result<String, String> {
|
||||
let caller = ctx.sender();
|
||||
let operator_identity = parse_migration_operator_identity(&input.operator_identity_hex)?;
|
||||
let note = normalize_migration_operator_note(&input.note)?;
|
||||
let bootstrap_secret = input.bootstrap_secret.trim().to_string();
|
||||
|
||||
ctx.try_with_tx(|tx| {
|
||||
authorize_database_migration_operator_tx(
|
||||
tx,
|
||||
caller,
|
||||
operator_identity,
|
||||
&bootstrap_secret,
|
||||
note.clone(),
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(operator_identity.to_hex().to_string())
|
||||
}
|
||||
|
||||
fn revoke_database_migration_operator_inner(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationRevokeOperatorInput,
|
||||
) -> Result<String, String> {
|
||||
let caller = ctx.sender();
|
||||
let operator_identity = parse_migration_operator_identity(&input.operator_identity_hex)?;
|
||||
|
||||
ctx.try_with_tx(|tx| {
|
||||
require_migration_operator(tx, caller)?;
|
||||
if tx
|
||||
.db
|
||||
.database_migration_operator()
|
||||
.operator_identity()
|
||||
.find(&operator_identity)
|
||||
.is_none()
|
||||
{
|
||||
return Err("迁移操作员不存在".to_string());
|
||||
}
|
||||
tx.db
|
||||
.database_migration_operator()
|
||||
.operator_identity()
|
||||
.delete(&operator_identity);
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
Ok(operator_identity.to_hex().to_string())
|
||||
}
|
||||
|
||||
fn authorize_database_migration_operator_tx(
|
||||
ctx: &ReducerContext,
|
||||
caller: Identity,
|
||||
operator_identity: Identity,
|
||||
bootstrap_secret: &str,
|
||||
note: String,
|
||||
) -> Result<(), String> {
|
||||
let has_operator = ctx.db.database_migration_operator().iter().next().is_some();
|
||||
if has_operator {
|
||||
require_migration_operator(ctx, caller)?;
|
||||
} else {
|
||||
require_migration_bootstrap_secret(bootstrap_secret)?;
|
||||
}
|
||||
|
||||
if ctx
|
||||
.db
|
||||
.database_migration_operator()
|
||||
.operator_identity()
|
||||
.find(&operator_identity)
|
||||
.is_some()
|
||||
{
|
||||
ctx.db
|
||||
.database_migration_operator()
|
||||
.operator_identity()
|
||||
.delete(&operator_identity);
|
||||
}
|
||||
|
||||
ctx.db
|
||||
.database_migration_operator()
|
||||
.insert(DatabaseMigrationOperator {
|
||||
operator_identity,
|
||||
created_at: ctx.timestamp,
|
||||
created_by: caller,
|
||||
note,
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn require_migration_operator(ctx: &ReducerContext, caller: Identity) -> Result<(), String> {
|
||||
if ctx
|
||||
.db
|
||||
.database_migration_operator()
|
||||
.operator_identity()
|
||||
.find(&caller)
|
||||
.is_some()
|
||||
{
|
||||
Ok(())
|
||||
} else {
|
||||
Err("当前 identity 未被授权执行数据库迁移".to_string())
|
||||
}
|
||||
}
|
||||
|
||||
fn require_migration_bootstrap_secret(input: &str) -> Result<(), String> {
|
||||
let configured_secret = MIGRATION_BOOTSTRAP_SECRET
|
||||
.map(str::trim)
|
||||
.filter(|secret| !secret.is_empty())
|
||||
.ok_or_else(|| "迁移引导密钥未配置,无法创建首个操作员".to_string())?;
|
||||
|
||||
if configured_secret.chars().count() < MIGRATION_MIN_BOOTSTRAP_SECRET_LEN {
|
||||
return Err("迁移引导密钥长度不足,至少需要 16 个字符".to_string());
|
||||
}
|
||||
if input != configured_secret {
|
||||
return Err("迁移引导密钥不正确".to_string());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn parse_migration_operator_identity(input: &str) -> Result<Identity, String> {
|
||||
let identity_hex = input.trim().trim_start_matches("0x");
|
||||
if identity_hex.len() != 64 {
|
||||
return Err("operator_identity_hex 必须是 64 位十六进制 identity".to_string());
|
||||
}
|
||||
|
||||
Identity::from_hex(identity_hex)
|
||||
.map_err(|error| format!("operator_identity_hex 格式不合法: {error}"))
|
||||
}
|
||||
|
||||
fn normalize_migration_operator_note(input: &str) -> Result<String, String> {
|
||||
let note = input.trim();
|
||||
if note.chars().count() > MIGRATION_MAX_OPERATOR_NOTE_CHARS {
|
||||
return Err(format!(
|
||||
"迁移操作员备注过长,最多 {} 个字符",
|
||||
MIGRATION_MAX_OPERATOR_NOTE_CHARS
|
||||
));
|
||||
}
|
||||
|
||||
Ok(note.to_string())
|
||||
}
|
||||
|
||||
fn normalize_include_tables(input: &[String]) -> Result<Option<HashSet<String>>, String> {
|
||||
if input.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let mut tables = HashSet::new();
|
||||
for raw_name in input {
|
||||
let name = raw_name.trim();
|
||||
if name.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if name.len() > MIGRATION_MAX_TABLE_NAME_LEN {
|
||||
return Err(format!("迁移表名过长: {name}"));
|
||||
}
|
||||
if !is_supported_migration_table(name) {
|
||||
return Err(format!("迁移表不在白名单内: {name}"));
|
||||
}
|
||||
tables.insert(name.to_string());
|
||||
}
|
||||
Ok(Some(tables))
|
||||
}
|
||||
|
||||
fn should_include_table(include_tables: Option<&HashSet<String>>, table_name: &str) -> bool {
|
||||
include_tables
|
||||
.map(|tables| tables.contains(table_name))
|
||||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
fn build_migration_file(
|
||||
ctx: &ReducerContext,
|
||||
exported_at_micros: i64,
|
||||
include_tables: Option<&HashSet<String>>,
|
||||
) -> Result<MigrationFile, String> {
|
||||
let mut tables = Vec::new();
|
||||
collect_all_migration_tables!(ctx, include_tables, tables);
|
||||
|
||||
Ok(MigrationFile {
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
exported_at_micros,
|
||||
tables,
|
||||
})
|
||||
}
|
||||
|
||||
fn build_export_stats(tables: &[MigrationTable]) -> Vec<DatabaseMigrationTableStat> {
|
||||
tables
|
||||
.iter()
|
||||
.map(|table| DatabaseMigrationTableStat {
|
||||
table_name: table.name.clone(),
|
||||
exported_row_count: table.rows.len() as u64,
|
||||
imported_row_count: 0,
|
||||
skipped_row_count: 0,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn build_import_dry_run_stats(
|
||||
tables: &[MigrationTable],
|
||||
include_tables: Option<&HashSet<String>>,
|
||||
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
|
||||
let mut stats = Vec::new();
|
||||
for table in tables {
|
||||
if !is_supported_migration_table(&table.name) {
|
||||
return Err(format!("迁移文件包含不支持的表: {}", table.name));
|
||||
}
|
||||
if should_include_table(include_tables, &table.name) {
|
||||
stats.push(DatabaseMigrationTableStat {
|
||||
table_name: table.name.clone(),
|
||||
exported_row_count: 0,
|
||||
imported_row_count: table.rows.len() as u64,
|
||||
skipped_row_count: 0,
|
||||
});
|
||||
} else {
|
||||
stats.push(DatabaseMigrationTableStat {
|
||||
table_name: table.name.clone(),
|
||||
exported_row_count: 0,
|
||||
imported_row_count: 0,
|
||||
skipped_row_count: table.rows.len() as u64,
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
fn apply_migration_file(
|
||||
ctx: &ReducerContext,
|
||||
migration_file: &MigrationFile,
|
||||
include_tables: Option<&HashSet<String>>,
|
||||
replace_existing: bool,
|
||||
import_mode: DatabaseMigrationImportMode,
|
||||
) -> Result<Vec<DatabaseMigrationTableStat>, String> {
|
||||
let mut stats = Vec::new();
|
||||
for table in &migration_file.tables {
|
||||
if !is_supported_migration_table(&table.name) {
|
||||
return Err(format!("迁移文件包含不支持的表: {}", table.name));
|
||||
}
|
||||
}
|
||||
|
||||
let import_table_names = build_import_table_name_set(migration_file, include_tables);
|
||||
if replace_existing {
|
||||
// replace_existing 只覆盖本次迁移文件实际会导入的表,避免分批导入时误清空其它迁移白名单表。
|
||||
clear_all_migration_tables!(ctx, Some(&import_table_names));
|
||||
}
|
||||
|
||||
for table in &migration_file.tables {
|
||||
if !should_include_table(include_tables, &table.name) {
|
||||
stats.push(DatabaseMigrationTableStat {
|
||||
table_name: table.name.clone(),
|
||||
exported_row_count: 0,
|
||||
imported_row_count: 0,
|
||||
skipped_row_count: table.rows.len() as u64,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
let (imported_row_count, skipped_row_count) =
|
||||
insert_migration_table_rows(ctx, table, import_mode)?;
|
||||
stats.push(DatabaseMigrationTableStat {
|
||||
table_name: table.name.clone(),
|
||||
exported_row_count: 0,
|
||||
imported_row_count,
|
||||
skipped_row_count,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
fn build_import_table_name_set(
|
||||
migration_file: &MigrationFile,
|
||||
include_tables: Option<&HashSet<String>>,
|
||||
) -> HashSet<String> {
|
||||
migration_file
|
||||
.tables
|
||||
.iter()
|
||||
.filter(|table| should_include_table(include_tables, &table.name))
|
||||
.map(|table| table.name.clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn row_to_json<T: spacetimedb::Serialize>(row: &T) -> Result<serde_json::Value, String> {
|
||||
serde_json::to_value(SerializeWrapper::from_ref(row))
|
||||
.map_err(|error| format!("迁移行序列化失败: {error}"))
|
||||
}
|
||||
|
||||
fn row_from_json<T>(value: &serde_json::Value) -> Result<T, String>
|
||||
where
|
||||
T: for<'de> spacetimedb::Deserialize<'de>,
|
||||
{
|
||||
let wrapped: DeserializeWrapper<T> = serde_json::from_value(value.clone())
|
||||
.map_err(|error| format!("迁移行反序列化失败: {error}"))?;
|
||||
Ok(wrapped.0)
|
||||
}
|
||||
|
||||
fn insert_migration_table_rows(
|
||||
ctx: &ReducerContext,
|
||||
table: &MigrationTable,
|
||||
import_mode: DatabaseMigrationImportMode,
|
||||
) -> Result<(u64, u64), String> {
|
||||
macro_rules! insert_table_match_arm {
|
||||
($($table:ident),+ $(,)?) => {
|
||||
match table.name.as_str() {
|
||||
$(
|
||||
stringify!($table) => {
|
||||
let mut imported = 0u64;
|
||||
let mut skipped = 0u64;
|
||||
for value in &table.rows {
|
||||
let row = row_from_json(value)
|
||||
.map_err(|error| format!("{}: {error}", stringify!($table)))?;
|
||||
let insert_result = ctx.db
|
||||
.$table()
|
||||
.try_insert(row);
|
||||
match insert_result {
|
||||
Ok(_) => imported = imported.saturating_add(1),
|
||||
Err(error) => {
|
||||
if import_mode == DatabaseMigrationImportMode::Incremental {
|
||||
skipped = skipped.saturating_add(1);
|
||||
} else {
|
||||
return Err(format!("{} 导入失败: {error}", stringify!($table)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok((imported, skipped))
|
||||
}
|
||||
)+
|
||||
_ => Err(format!("迁移表不在白名单内: {}", table.name)),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
migration_tables!(insert_table_match_arm)
|
||||
}
|
||||
|
||||
fn is_supported_migration_table(table_name: &str) -> bool {
|
||||
macro_rules! supported_table_match {
|
||||
($($table:ident),+ $(,)?) => {
|
||||
matches!(
|
||||
table_name,
|
||||
$(stringify!($table))|+
|
||||
)
|
||||
};
|
||||
}
|
||||
|
||||
migration_tables!(supported_table_match)
|
||||
}
|
||||
Reference in New Issue
Block a user