Merge branch 'master' of http://82.157.175.59:3000/GenarrativeAI/Genarrative
This commit is contained in:
@@ -10,6 +10,8 @@ use crate::puzzle::{
|
||||
|
||||
const MIGRATION_SCHEMA_VERSION: u32 = 1;
|
||||
const MIGRATION_MAX_TABLE_NAME_LEN: usize = 96;
|
||||
const MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN: usize = 128;
|
||||
const MIGRATION_MAX_IMPORT_CHUNK_BYTES: usize = 1024 * 1024;
|
||||
const MIGRATION_MAX_OPERATOR_NOTE_CHARS: usize = 160;
|
||||
const MIGRATION_MIN_BOOTSTRAP_SECRET_LEN: usize = 16;
|
||||
const MIGRATION_BOOTSTRAP_SECRET: Option<&str> =
|
||||
@@ -24,6 +26,21 @@ pub struct DatabaseMigrationOperator {
|
||||
pub note: String,
|
||||
}
|
||||
|
||||
#[spacetimedb::table(
|
||||
accessor = database_migration_import_chunk,
|
||||
index(accessor = by_database_migration_import_upload, btree(columns = [upload_id]))
|
||||
)]
|
||||
pub struct DatabaseMigrationImportChunk {
|
||||
#[primary_key]
|
||||
pub chunk_key: String,
|
||||
pub upload_id: String,
|
||||
pub chunk_index: u32,
|
||||
pub chunk_count: u32,
|
||||
pub operator_identity: Identity,
|
||||
pub created_at: Timestamp,
|
||||
pub chunk: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationExportInput {
|
||||
pub include_tables: Vec<String>,
|
||||
@@ -37,6 +54,27 @@ pub struct DatabaseMigrationImportInput {
|
||||
pub dry_run: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationImportChunkInput {
|
||||
pub upload_id: String,
|
||||
pub chunk_index: u32,
|
||||
pub chunk_count: u32,
|
||||
pub chunk: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationImportChunksInput {
|
||||
pub upload_id: String,
|
||||
pub include_tables: Vec<String>,
|
||||
pub replace_existing: bool,
|
||||
pub dry_run: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
|
||||
pub struct DatabaseMigrationImportChunksClearInput {
|
||||
pub upload_id: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
enum DatabaseMigrationImportMode {
|
||||
Strict,
|
||||
@@ -322,6 +360,76 @@ pub fn import_database_migration_incremental_from_file(
|
||||
}
|
||||
}
|
||||
|
||||
// 大迁移 JSON 先按分片写入私有临时表,避免单次 HTTP request body 触发 SpacetimeDB 413。
|
||||
#[spacetimedb::procedure]
|
||||
pub fn put_database_migration_import_chunk(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationImportChunkInput,
|
||||
) -> DatabaseMigrationProcedureResult {
|
||||
match put_database_migration_import_chunk_inner(ctx, input) {
|
||||
Ok(()) => empty_database_migration_result(true, None),
|
||||
Err(error) => empty_database_migration_result(false, Some(error)),
|
||||
}
|
||||
}
|
||||
|
||||
// 分片提交保持与直接导入相同的严格追加语义;提交成功后清理临时分片。
|
||||
#[spacetimedb::procedure]
|
||||
pub fn import_database_migration_from_chunks(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationImportChunksInput,
|
||||
) -> DatabaseMigrationProcedureResult {
|
||||
match import_database_migration_from_chunks_inner(
|
||||
ctx,
|
||||
input,
|
||||
DatabaseMigrationImportMode::Strict,
|
||||
) {
|
||||
Ok((stats, warnings)) => DatabaseMigrationProcedureResult {
|
||||
ok: true,
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: stats,
|
||||
warnings,
|
||||
error_message: None,
|
||||
},
|
||||
Err(error) => empty_database_migration_result(false, Some(error)),
|
||||
}
|
||||
}
|
||||
|
||||
// 分片增量提交只插入目标库缺失的行;主键或唯一约束冲突的行会跳过。
|
||||
#[spacetimedb::procedure]
|
||||
pub fn import_database_migration_incremental_from_chunks(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationImportChunksInput,
|
||||
) -> DatabaseMigrationProcedureResult {
|
||||
match import_database_migration_from_chunks_inner(
|
||||
ctx,
|
||||
input,
|
||||
DatabaseMigrationImportMode::Incremental,
|
||||
) {
|
||||
Ok((stats, warnings)) => DatabaseMigrationProcedureResult {
|
||||
ok: true,
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: stats,
|
||||
warnings,
|
||||
error_message: None,
|
||||
},
|
||||
Err(error) => empty_database_migration_result(false, Some(error)),
|
||||
}
|
||||
}
|
||||
|
||||
// 调用方上传失败或提交失败时可显式清理同一 upload_id 的临时分片。
|
||||
#[spacetimedb::procedure]
|
||||
pub fn clear_database_migration_import_chunks(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationImportChunksClearInput,
|
||||
) -> DatabaseMigrationProcedureResult {
|
||||
match clear_database_migration_import_chunks_inner(ctx, input) {
|
||||
Ok(()) => empty_database_migration_result(true, None),
|
||||
Err(error) => empty_database_migration_result(false, Some(error)),
|
||||
}
|
||||
}
|
||||
|
||||
fn export_database_migration_to_file_inner(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationExportInput,
|
||||
@@ -362,14 +470,7 @@ fn import_database_migration_from_file_inner(
|
||||
}
|
||||
ctx.try_with_tx(|tx| require_migration_operator(tx, caller))?;
|
||||
|
||||
let migration_file = serde_json::from_str::<MigrationFile>(&input.migration_json)
|
||||
.map_err(|error| format!("迁移文件 JSON 解析失败: {error}"))?;
|
||||
if migration_file.schema_version != MIGRATION_SCHEMA_VERSION {
|
||||
return Err(format!(
|
||||
"迁移文件 schema_version 不匹配,期望 {},实际 {}",
|
||||
MIGRATION_SCHEMA_VERSION, migration_file.schema_version
|
||||
));
|
||||
}
|
||||
let migration_file = parse_migration_file(&input.migration_json)?;
|
||||
|
||||
let (stats, warnings) = if input.dry_run {
|
||||
build_import_dry_run_stats(&migration_file.tables, included_tables.as_ref())?
|
||||
@@ -389,6 +490,158 @@ fn import_database_migration_from_file_inner(
|
||||
Ok((stats, warnings))
|
||||
}
|
||||
|
||||
fn put_database_migration_import_chunk_inner(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationImportChunkInput,
|
||||
) -> Result<(), String> {
|
||||
let caller = ctx.sender();
|
||||
let upload_id = normalize_import_upload_id(&input.upload_id)?;
|
||||
if input.chunk_count == 0 {
|
||||
return Err("分片总数必须大于 0".to_string());
|
||||
}
|
||||
if input.chunk_index >= input.chunk_count {
|
||||
return Err(format!(
|
||||
"分片序号越界: {} / {}",
|
||||
input.chunk_index, input.chunk_count
|
||||
));
|
||||
}
|
||||
if input.chunk.is_empty() {
|
||||
return Err("迁移 JSON 分片不能为空".to_string());
|
||||
}
|
||||
if input.chunk.len() > MIGRATION_MAX_IMPORT_CHUNK_BYTES {
|
||||
return Err(format!(
|
||||
"迁移 JSON 分片过大,单片最多 {} bytes",
|
||||
MIGRATION_MAX_IMPORT_CHUNK_BYTES
|
||||
));
|
||||
}
|
||||
|
||||
let chunk_key = build_import_chunk_key(&upload_id, input.chunk_index);
|
||||
ctx.try_with_tx(|tx| {
|
||||
require_migration_operator(tx, caller)?;
|
||||
if let Some(existing) = tx
|
||||
.db
|
||||
.database_migration_import_chunk()
|
||||
.chunk_key()
|
||||
.find(&chunk_key)
|
||||
{
|
||||
if existing.operator_identity != caller {
|
||||
return Err("同名迁移分片已由其他 identity 上传,已拒绝覆盖".to_string());
|
||||
}
|
||||
tx.db
|
||||
.database_migration_import_chunk()
|
||||
.chunk_key()
|
||||
.delete(&chunk_key);
|
||||
}
|
||||
tx.db
|
||||
.database_migration_import_chunk()
|
||||
.insert(DatabaseMigrationImportChunk {
|
||||
chunk_key: chunk_key.clone(),
|
||||
upload_id: upload_id.clone(),
|
||||
chunk_index: input.chunk_index,
|
||||
chunk_count: input.chunk_count,
|
||||
operator_identity: caller,
|
||||
created_at: tx.timestamp,
|
||||
chunk: input.chunk.clone(),
|
||||
});
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn import_database_migration_from_chunks_inner(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationImportChunksInput,
|
||||
import_mode: DatabaseMigrationImportMode,
|
||||
) -> Result<
|
||||
(
|
||||
Vec<DatabaseMigrationTableStat>,
|
||||
Vec<DatabaseMigrationWarning>,
|
||||
),
|
||||
String,
|
||||
> {
|
||||
let caller = ctx.sender();
|
||||
let upload_id = normalize_import_upload_id(&input.upload_id)?;
|
||||
let included_tables = normalize_include_tables(&input.include_tables)?;
|
||||
if import_mode == DatabaseMigrationImportMode::Incremental && input.replace_existing {
|
||||
return Err("增量导入不能同时启用 replace_existing".to_string());
|
||||
}
|
||||
|
||||
let migration_json = ctx.try_with_tx(|tx| {
|
||||
require_migration_operator(tx, caller)?;
|
||||
read_database_migration_import_chunks(tx, &upload_id, caller)
|
||||
})?;
|
||||
let migration_file = parse_migration_file(&migration_json)?;
|
||||
|
||||
let (stats, warnings) = if input.dry_run {
|
||||
build_import_dry_run_stats(&migration_file.tables, included_tables.as_ref())?
|
||||
} else {
|
||||
ctx.try_with_tx(|tx| {
|
||||
require_migration_operator(tx, caller)?;
|
||||
apply_migration_file(
|
||||
tx,
|
||||
&migration_file,
|
||||
included_tables.as_ref(),
|
||||
input.replace_existing,
|
||||
import_mode,
|
||||
)
|
||||
})?
|
||||
};
|
||||
|
||||
ctx.try_with_tx(|tx| {
|
||||
require_migration_operator(tx, caller)?;
|
||||
clear_database_migration_import_chunks_tx(tx, &upload_id);
|
||||
Ok::<(), String>(())
|
||||
})?;
|
||||
|
||||
Ok((stats, warnings))
|
||||
}
|
||||
|
||||
fn clear_database_migration_import_chunks_inner(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationImportChunksClearInput,
|
||||
) -> Result<(), String> {
|
||||
let caller = ctx.sender();
|
||||
let upload_id = normalize_import_upload_id(&input.upload_id)?;
|
||||
ctx.try_with_tx(|tx| {
|
||||
require_migration_operator(tx, caller)?;
|
||||
clear_database_migration_import_chunks_tx(tx, &upload_id);
|
||||
Ok::<(), String>(())
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn empty_database_migration_result(
|
||||
ok: bool,
|
||||
error_message: Option<String>,
|
||||
) -> DatabaseMigrationProcedureResult {
|
||||
DatabaseMigrationProcedureResult {
|
||||
ok,
|
||||
schema_version: MIGRATION_SCHEMA_VERSION,
|
||||
migration_json: None,
|
||||
table_stats: Vec::new(),
|
||||
warnings: Vec::new(),
|
||||
error_message,
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_migration_file(migration_json: &str) -> Result<MigrationFile, String> {
|
||||
if migration_json.trim().is_empty() {
|
||||
return Err("migration_json 不能为空".to_string());
|
||||
}
|
||||
|
||||
let migration_file = serde_json::from_str::<MigrationFile>(migration_json)
|
||||
.map_err(|error| format!("迁移文件 JSON 解析失败: {error}"))?;
|
||||
if migration_file.schema_version != MIGRATION_SCHEMA_VERSION {
|
||||
return Err(format!(
|
||||
"迁移文件 schema_version 不匹配,期望 {},实际 {}",
|
||||
MIGRATION_SCHEMA_VERSION, migration_file.schema_version
|
||||
));
|
||||
}
|
||||
|
||||
Ok(migration_file)
|
||||
}
|
||||
|
||||
fn authorize_database_migration_operator_inner(
|
||||
ctx: &mut ProcedureContext,
|
||||
input: DatabaseMigrationAuthorizeOperatorInput,
|
||||
@@ -530,6 +783,96 @@ fn normalize_migration_operator_note(input: &str) -> Result<String, String> {
|
||||
Ok(note.to_string())
|
||||
}
|
||||
|
||||
fn normalize_import_upload_id(input: &str) -> Result<String, String> {
|
||||
let upload_id = input.trim();
|
||||
if upload_id.is_empty() {
|
||||
return Err("upload_id 不能为空".to_string());
|
||||
}
|
||||
if upload_id.len() > MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN {
|
||||
return Err(format!(
|
||||
"upload_id 过长,最多 {} bytes",
|
||||
MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN
|
||||
));
|
||||
}
|
||||
if !upload_id
|
||||
.chars()
|
||||
.all(|character| character.is_ascii_alphanumeric() || matches!(character, '-' | '_'))
|
||||
{
|
||||
return Err("upload_id 只能使用 ASCII 字母、数字、短横线或下划线".to_string());
|
||||
}
|
||||
Ok(upload_id.to_string())
|
||||
}
|
||||
|
||||
fn build_import_chunk_key(upload_id: &str, chunk_index: u32) -> String {
|
||||
format!("{upload_id}:{chunk_index:010}")
|
||||
}
|
||||
|
||||
fn read_database_migration_import_chunks(
|
||||
ctx: &ReducerContext,
|
||||
upload_id: &str,
|
||||
caller: Identity,
|
||||
) -> Result<String, String> {
|
||||
let mut chunks = ctx
|
||||
.db
|
||||
.database_migration_import_chunk()
|
||||
.by_database_migration_import_upload()
|
||||
.filter(upload_id)
|
||||
.collect::<Vec<_>>();
|
||||
if chunks.is_empty() {
|
||||
return Err(format!("未找到迁移 JSON 分片: {upload_id}"));
|
||||
}
|
||||
if chunks.iter().any(|chunk| chunk.operator_identity != caller) {
|
||||
return Err("迁移 JSON 分片包含其他 identity 上传的片段,已拒绝提交".to_string());
|
||||
}
|
||||
|
||||
let chunk_count = chunks[0].chunk_count;
|
||||
if chunk_count == 0 {
|
||||
return Err("迁移 JSON 分片总数不合法".to_string());
|
||||
}
|
||||
if chunks
|
||||
.iter()
|
||||
.any(|chunk| chunk.chunk_count != chunk_count || chunk.upload_id != upload_id)
|
||||
{
|
||||
return Err("迁移 JSON 分片总数不一致".to_string());
|
||||
}
|
||||
if chunks.len() != chunk_count as usize {
|
||||
return Err(format!(
|
||||
"迁移 JSON 分片未上传完整,已收到 {} / {}",
|
||||
chunks.len(),
|
||||
chunk_count
|
||||
));
|
||||
}
|
||||
|
||||
chunks.sort_by_key(|chunk| chunk.chunk_index);
|
||||
let mut expected_index = 0u32;
|
||||
let mut migration_json = String::new();
|
||||
for chunk in chunks {
|
||||
if chunk.chunk_index != expected_index {
|
||||
return Err(format!("迁移 JSON 分片缺失序号: {expected_index}"));
|
||||
}
|
||||
migration_json.push_str(&chunk.chunk);
|
||||
expected_index = expected_index.saturating_add(1);
|
||||
}
|
||||
|
||||
Ok(migration_json)
|
||||
}
|
||||
|
||||
fn clear_database_migration_import_chunks_tx(ctx: &ReducerContext, upload_id: &str) {
|
||||
let chunk_keys = ctx
|
||||
.db
|
||||
.database_migration_import_chunk()
|
||||
.by_database_migration_import_upload()
|
||||
.filter(upload_id)
|
||||
.map(|chunk| chunk.chunk_key)
|
||||
.collect::<Vec<_>>();
|
||||
for chunk_key in chunk_keys {
|
||||
ctx.db
|
||||
.database_migration_import_chunk()
|
||||
.chunk_key()
|
||||
.delete(&chunk_key);
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_include_tables(input: &[String]) -> Result<Option<HashSet<String>>, String> {
|
||||
if input.is_empty() {
|
||||
return Ok(None);
|
||||
|
||||
Reference in New Issue
Block a user