Handle SpacetimeDB migration imports with chunked uploads
Some checks failed
CI / verify (push) Has been cancelled

2026-04-30 15:20:49 +08:00
parent 1ccb8a710d
commit 22f3f963de
9 changed files with 567 additions and 20 deletions


@@ -13,7 +13,7 @@
Key addition: the RPG creation and runtime script responsibility map is in [RPG_CREATION_AND_RUNTIME_SCRIPT_RESPONSIBILITY_MAP_2026-04-28.md](./reference/RPG_CREATION_AND_RUNTIME_SCRIPT_RESPONSIBILITY_MAP_2026-04-28.md).
- [PRD](./prd): product requirements and phase plans; the new RPG opening-animation plan is in [AI_NATIVE_RPG_OPENING_ANIMATION_PRD_2026-04-25.md](./prd/AI_NATIVE_RPG_OPENING_ANIMATION_PRD_2026-04-25.md).
SpacetimeDB table schema changes, auto-migration boundaries, and the phased migration flow that preserves old data are covered in [SPACETIMEDB_SCHEMA_CHANGE_CONSTRAINTS.md](./technical/SPACETIMEDB_SCHEMA_CHANGE_CONSTRAINTS.md).
SpacetimeDB table schema changes, auto-migration boundaries, and the phased migration flow that preserves old data are covered in [SPACETIMEDB_SCHEMA_CHANGE_CONSTRAINTS.md](./technical/SPACETIMEDB_SCHEMA_CHANGE_CONSTRAINTS.md); private-table migration JSON export/import, HTTP 413 chunked import, and the Jenkins database migration pipelines are covered in [SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md](./technical/SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md) and [JENKINS_SPACETIMEDB_DATABASE_MIGRATION_PIPELINES_2026-04-29.md](./technical/JENKINS_SPACETIMEDB_DATABASE_MIGRATION_PIPELINES_2026-04-29.md).
## Recommended reading order


@@ -72,12 +72,13 @@ Genarrative-Database-Import
1. `INPUT_FILE`: required; path to the migration JSON file.
2. `DATABASE` / `SERVER` / `SERVER_URL` / `DEPLOY_DIRECTORY` / `ROOT_DIR`: same as the export pipeline.
3. `INCLUDE_TABLES`: optional; import only the listed tables.
4. `DRY_RUN`: defaults to `true`; validate only, write nothing.
5. `INCREMENTAL`: defaults to `true`; skip rows that already exist or conflict.
6. `REPLACE_EXISTING`: defaults to `false`; overwrites only the tables covered by this migration file; cannot be enabled together with `INCREMENTAL`.
7. `BOOTSTRAP_SECRET`: optional; used to authorize the temporary Web API identity.
8. `TOKEN`: optional SpacetimeDB client connection token; when left empty the script creates a temporary identity and revokes it when done.
9. `NOTE`: note recorded with the migration authorization.
4. `CHUNK_SIZE`: migration JSON chunk size, default `524288` bytes. The import script automatically switches to chunked upload when the file exceeds this size or a direct import hits HTTP 413.
5. `DRY_RUN`: defaults to `true`; validate only, write nothing.
6. `INCREMENTAL`: defaults to `true`; skip rows that already exist or conflict.
7. `REPLACE_EXISTING`: defaults to `false`; overwrites only the tables covered by this migration file; cannot be enabled together with `INCREMENTAL`.
8. `BOOTSTRAP_SECRET`: optional; used to authorize the temporary Web API identity.
9. `TOKEN`: optional SpacetimeDB client connection token; when left empty the script creates a temporary identity and revokes it when done.
10. `NOTE`: note recorded with the migration authorization.
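As a rough sketch of how these parameters reach the import script: the pipeline's shell step forwards `--include`, `--chunk-size`, `--dry-run`, and `--replace-existing`; the helper name and the `--in` flag below are illustrative assumptions, not the pipeline's actual code.

```javascript
// Sketch: map pipeline parameters onto flags of
// scripts/spacetime-import-migration-json.mjs. Hypothetical helper; the
// authoritative mapping is the Jenkinsfile shell step.
function buildImportArgs(params) {
  const args = ['--in', params.INPUT_FILE]; // '--in' is an assumed flag name
  if (params.INCLUDE_TABLES) args.push('--include', params.INCLUDE_TABLES);
  if (params.CHUNK_SIZE) args.push('--chunk-size', String(params.CHUNK_SIZE));
  if (params.DRY_RUN) args.push('--dry-run');
  if (params.REPLACE_EXISTING) args.push('--replace-existing');
  return args;
}
```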
## 4. Security boundaries
@@ -85,6 +86,7 @@ Genarrative-Database-Import
2. `INCREMENTAL` and `REPLACE_EXISTING` are mutually exclusive; the Jenkinsfile blocks enabling both before execution.
3. The Jenkinsfile never prints tokens; in production, pass sensitive values via Jenkins credentials or environment variables on the target machine.
4. If `TOKEN` is not passed, the import script creates a temporary Web API identity and calls the migration authorize/revoke procedures to narrow the permission window.
5. If the log shows `SpacetimeDB HTTP 413: Failed to buffer the request body: length limit exceeded`, lower `CHUNK_SIZE` to `262144` or smaller and rerun. The parameter only shrinks each single HTTP body; it does not change which tables are imported.
## 5. Local deployment test parameters


@@ -5,6 +5,8 @@
## Document list
- [SPACETIMEDB_SCHEMA_CHANGE_CONSTRAINTS.md](./SPACETIMEDB_SCHEMA_CHANGE_CONSTRAINTS.md): freezes the SpacetimeDB table schema change constraints, the acceptable scope of auto-migration, system behavior after conflicts, and the incremental migration flow that preserves old data; consult it first for anything involving `spacetime publish`, table field changes, or `migration.rs` alignment.
- [SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md](./SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md): records the SpacetimeDB private-table migration JSON export/import procedures, migration operator authorization, HTTP 413 chunked import, Jenkins automated migration backfill, and the import script parameters.
- [JENKINS_SPACETIMEDB_DATABASE_MIGRATION_PIPELINES_2026-04-29.md](./JENKINS_SPACETIMEDB_DATABASE_MIGRATION_PIPELINES_2026-04-29.md): records the parameters of the two SCM-backed database migration pipelines `Genarrative-Database-Export` / `Genarrative-Database-Import`, the dry-run default, token boundaries, and the `CHUNK_SIZE` parameter for avoiding 413.
- [RPG_PROMPT_FRONTEND_REMOVAL_AND_SERVER_RS_MIGRATION_2026-04-28.md](./RPG_PROMPT_FRONTEND_REMOVAL_AND_SERVER_RS_MIGRATION_2026-04-28.md): freezes the boundary that RPG prompts must not live in the frontend; the frontend keeps only the API client, and prompts for character private chat / NPC dialogue / story continuation are consolidated into `server-rs`.
- [RPG_CREATION_RESULT_VIEW_BACKEND_TRUTH_MIGRATION_2026-04-28.md](./RPG_CREATION_RESULT_VIEW_BACKEND_TRUTH_MIGRATION_2026-04-28.md): freezes the boundaries for moving RPG creation result-page saving, Agent session/result preview truth priority, and result-page entry adjudication into the backend result-view.
- [RPG_CREATION_PROFILE_GENERATION_BACKEND_MIGRATION_2026-04-28.md](./RPG_CREATION_PROFILE_GENERATION_BACKEND_MIGRATION_2026-04-28.md): records removal of the non-browser legacy AI fallback for RPG creation profile generation; world drafts are generated uniformly through the `server-rs` `/api/runtime/custom-world/profile` endpoint.


@@ -8,7 +8,7 @@ SpacetimeDB reducers must stay deterministic and cannot access the filesystem or network.
1. The export procedure inside `spacetime-module` reads the migration table whitelist and returns the migration JSON string directly.
2. The Node ops script calls the export procedure via `spacetime call` by default and writes the returned JSON string to a local file.
3. The Node ops script reads the local JSON file. On import it first creates a temporary Web API identity/token via `POST /v1/identity`, then uses the current CLI login to authorize that identity as a migration operator, and finally passes the JSON string to the import procedure in the HTTP request body.
3. The Node ops script reads the local JSON file. On import it first creates a temporary Web API identity/token via `POST /v1/identity`, then uses the current CLI login to authorize that identity as a migration operator; small files are passed to the import procedure directly in the HTTP request body, while large files are automatically split into chunks, uploaded, and then committed.
4. The import procedure validates the JSON against the table whitelist and writes to the target database in a transaction.
The procedures no longer access an HTTP file bridge and no longer accept local file paths on the deploy machine. This sidesteps SpacetimeDB's HTTP access restrictions on private/special-purpose addresses and avoids relaying private-table contents through a temporary HTTP service.
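The import half of this flow can be sketched as an orchestration with the network calls injected as plain functions; all names here are illustrative, the real logic lives in the Node scripts:

```javascript
// Sketch of the import orchestration: create a temporary identity, authorize
// it as a migration operator, submit the migration JSON, and always revoke
// the identity afterwards. The injected `io` functions stand in for the real
// Web API / spacetime CLI calls (hypothetical names).
async function runImportFlow(io, migrationJson) {
  const identity = await io.createTemporaryIdentity(); // POST /v1/identity
  await io.authorizeOperator(identity);                // via current CLI login
  try {
    return await io.callImportProcedure(identity, migrationJson);
  } finally {
    await io.revokeOperator(identity);                 // shrink the permission window
  }
}
```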
@@ -30,6 +30,8 @@ The SpacetimeDB Wasm runtime does not support `std::time::SystemTime::now()`; procedur
`database_migration_operator` only controls permission to call the migration procedures; it is never exported or imported, so the source database's ops permissions are not copied into the target database.
Chunked import of large files additionally uses the private temporary table `database_migration_import_chunk` to stage uploaded fragments. The table only holds intermediate data for the current import: it is deleted automatically after a successful commit, and on failure the script makes a best-effort call to the cleanup procedure. It is not in the migration whitelist and is never exported into business migration JSON.
On first authorization the operator table is empty, so the first operator must be authorized with the `GENARRATIVE_SPACETIME_MIGRATION_BOOTSTRAP_SECRET` bootstrap secret compiled into the module. The publish script generates a strong random bootstrap secret when building or publishing the SpacetimeDB module, injects it into the wasm build environment, and prints it to the console; operators must record the secret printed for each database's release. Once the table contains operators, later authorization and revocation can only be initiated by existing operators; the bootstrap secret can no longer be used to escalate privileges.
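The two authorization paths can be sketched as a payload builder; the field names below are assumptions for illustration, not the module's exact `DatabaseMigrationAuthorizeOperatorInput` definition:

```javascript
// Sketch: build the authorize-operator procedure input. When the operator
// table is empty, the bootstrap secret authorizes the first operator; after
// that only an existing operator's token may authorize, so the secret is
// omitted. Field names are assumed.
function buildAuthorizeInput({ operatorIdentity, note, bootstrapSecret }) {
  if (!operatorIdentity) throw new Error('operator identity is required');
  const input = { operator_identity: operatorIdentity, note: note || '' };
  if (bootstrapSecret) input.bootstrap_secret = bootstrapSecret;
  return input;
}
```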
New procedures:
@@ -99,6 +101,14 @@ node scripts/spacetime-revoke-migration-operator.mjs \
`import_database_migration_incremental_from_file(ctx, input)`
`put_database_migration_import_chunk(ctx, input)`
`import_database_migration_from_chunks(ctx, input)`
`import_database_migration_incremental_from_chunks(ctx, input)`
`clear_database_migration_import_chunks(ctx, input)`
Input fields:
- `migration_json`: the complete migration JSON string produced by the export procedure.
@@ -106,6 +116,15 @@ node scripts/spacetime-revoke-migration-operator.mjs \
- `replace_existing`: whether to first clear the target tables actually imported by this migration file. Tables not included in the file are untouched; in batched migrations only the current batch is overwritten.
- `dry_run`: parse and count only; write nothing.
Chunked import fields:
- `upload_id`: unique ID for this chunked upload; only ASCII letters, digits, hyphens, and underscores are allowed.
- `chunk_index`: index of the current chunk, starting at `0`.
- `chunk_count`: total number of chunks in this upload.
- `chunk`: the current migration JSON fragment, at most `1048576` bytes per chunk.
The Node import script uses chunked import by default when the file exceeds `524288` bytes; if a small file's direct import still hits `SpacetimeDB HTTP 413: Failed to buffer the request body: length limit exceeded`, it also falls back to the chunked flow automatically. Use `--chunk-size <bytes>` or the environment variable `GENARRATIVE_SPACETIME_MIGRATION_CHUNK_SIZE` to reduce the chunk size.
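The byte-safe splitting step can be sketched as follows (simplified from the import script; the guard against a single character larger than the chunk size is omitted here):

```javascript
// Split a migration JSON string into chunks of at most maxBytes UTF-8 bytes,
// never cutting through a code point.
function splitByUtf8Bytes(value, maxBytes) {
  const chunks = [];
  let current = '';
  let currentBytes = 0;
  for (const ch of value) { // iterates code points, not UTF-16 units
    const bytes = Buffer.byteLength(ch, 'utf8');
    if (currentBytes + bytes > maxBytes && current) {
      chunks.push(current);
      current = '';
      currentBytes = 0;
    }
    current += ch;
    currentBytes += bytes;
  }
  if (current) chunks.push(current);
  return chunks;
}
```

Rejoining the chunks in index order must reproduce the original string byte for byte, which is what the commit procedure relies on.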
Import modes:
- Strict append by default: target tables are not cleared and rows are inserted one by one; a primary-key or unique-constraint conflict fails the import and rolls back. Use this when the target database is known to contain no old data in those tables.
@@ -252,8 +271,9 @@ node scripts/spacetime-import-migration-json.mjs \
1. `POST /v1/identity` creates a temporary Web API identity/token.
2. The current machine's `spacetime` CLI login is used to call `authorize_database_migration_operator` to authorize that temporary identity.
3. `Authorization: Bearer <temporary token>` is used to call `import_database_migration_from_file` with the complete migration JSON in the HTTP body.
4. After the import request finishes, the script uses the same temporary Web API token to call `revoke_database_migration_operator` to revoke the temporary identity.
3. `Authorization: Bearer <temporary token>` is used to import the migration JSON. Files no larger than `--chunk-size` go directly to `import_database_migration_from_file`; above the threshold, or when a direct import hits HTTP 413, the script first calls `put_database_migration_import_chunk` once per chunk and then calls `import_database_migration_from_chunks` or `import_database_migration_incremental_from_chunks`.
4. If a chunk upload or the commit fails, the script makes a best-effort call to `clear_database_migration_import_chunks` to clean up temporary chunks.
5. After the import request finishes, the script uses the same temporary Web API token to call `revoke_database_migration_operator` to revoke the temporary identity.
All POST requests that access the SpacetimeDB Web API directly must explicitly send `Content-Type: application/json`. Some SpacetimeDB versions reject requests that omit the content type or carry an unexpected media type and return `HTTP 415`, even when the body itself is valid JSON.
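A request-builder sketch for those Web API calls; the URL path here is illustrative (not SpacetimeDB's documented endpoint shape), the point is that the JSON content type is always set explicitly:

```javascript
// Sketch: every direct SpacetimeDB Web API POST carries an explicit JSON
// content type; some server versions answer HTTP 415 otherwise. The URL path
// is a placeholder, not the real endpoint.
function buildProcedureRequest(serverUrl, database, procedureName, token, input) {
  return {
    method: 'POST',
    url: `${serverUrl}/v1/database/${database}/procedure/${procedureName}`,
    headers: {
      Authorization: `Bearer ${token}`,
      'Content-Type': 'application/json', // required to avoid HTTP 415
    },
    body: JSON.stringify(input),
  };
}
```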
@@ -265,6 +285,8 @@ node scripts/spacetime-import-migration-json.mjs \
`--dry-run` does not simulate primary-key or unique-constraint conflicts in the target database, so the incremental mode's `skipped_row_count` is only accurate on a real import.
If Jenkins or SpacetimeDB returns `HTTP 413`, first lower the import pipeline's `CHUNK_SIZE`, e.g. to `262144`. The parameter only affects the size of each request body uploaded to the procedure; it does not change the migration JSON's table scope or import semantics.
Do not use `--replace-existing` when you only want to append data. The flag first deletes old rows in the covered target tables and then inserts the rows from the migration file; if the source file is not a complete snapshot, target-table data will be lost.
For batched migration, separate table names with commas:
@@ -303,6 +325,6 @@ node scripts/spacetime-export-migration-json.mjs \
## Risks and limitations
Migration JSON travels as a procedure return value and as an HTTP request body, so it is subject to SpacetimeDB response-body, request-body, and intermediate-proxy size limits. For large datasets, migrate in batches via `include_tables` first; if a single table is still too large, add a chunked procedure rather than reviving the HTTP file bridge.
Migration JSON travels as a procedure return value and as an HTTP request body, so it is subject to SpacetimeDB response-body, request-body, and intermediate-proxy size limits. The import side now has built-in chunked upload to work around the `HTTP 413` request-body limit; if the export response itself is too large, you still need to export in batches via `include_tables`.
Hand-writing JSON for `spacetime call` in PowerShell tends to get the double quotes stripped. A complete JSON file also cannot be placed on the command line when importing large files, or Linux returns `spawn E2BIG` when spawning the child process. Prefer the Node scripts in the repo, which send the request body through the Web API directly, avoiding shell reprocessing and command-line length limits.


@@ -30,6 +30,22 @@ spacetime sql <db> "SELECT * FROM custom_world_gallery_entry"
| Big Fish (大鱼吃小鱼) | `big_fish_creation_session`, `big_fish_agent_message`, `big_fish_asset_slot`, `big_fish_runtime_run` |
| Assets | `asset_object`, `asset_entity_binding` |
| AI tasks | `ai_task`, `ai_task_stage`, `ai_text_chunk`, `ai_result_reference` |
| Ops migration | `database_migration_operator`, `database_migration_import_chunk` |
## Ops migration tables
### `database_migration_operator`
- Purpose: migration operator whitelist; controls permission to call the export, import, authorize, and revoke migration procedures.
- Schema: `operator_identity PK: Identity`, `created_at: Timestamp`, `created_by: Identity`, `note: String`
- Indexes: primary key `operator_identity`
### `database_migration_import_chunk`
- Purpose: private temporary table for chunked import of large migration JSON, used to avoid the `HTTP 413` caused by an oversized single HTTP request body; cleared automatically by the import procedure after a successful commit, and best-effort cleaned by the script on failure.
- Schema: `chunk_key PK: String`, `upload_id: String`, `chunk_index: u32`, `chunk_count: u32`, `operator_identity: Identity`, `created_at: Timestamp`, `chunk: String`
- Indexes: primary key `chunk_key`; secondary index on `upload_id`
- Migration boundary: not in the migration whitelist; never exported into business migration JSON.
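Based on the module code in this commit, chunk rows are keyed by `upload_id` plus a zero-padded index; a sketch of the key shape and the `upload_id` constraint (mirroring `normalize_import_upload_id` and `build_import_chunk_key`):

```javascript
// Sketch: chunk_key is "<upload_id>:<chunk_index zero-padded to 10 digits>",
// and upload_id is restricted to at most 128 ASCII letters, digits, '-', '_'.
function isValidUploadId(uploadId) {
  return /^[A-Za-z0-9_-]+$/.test(uploadId) && uploadId.length <= 128;
}

function buildChunkKey(uploadId, chunkIndex) {
  if (!isValidUploadId(uploadId)) throw new Error('invalid upload_id');
  return `${uploadId}:${String(chunkIndex).padStart(10, '0')}`;
}
```

The zero-padding keeps chunk keys lexicographically sorted in index order within one upload.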
## Authentication tables


@@ -16,6 +16,7 @@ pipeline {
string(name: 'ROOT_DIR', defaultValue: '', description: 'spacetime CLI root-dir; optional, takes precedence over DEPLOY_DIRECTORY')
string(name: 'INPUT_FILE', defaultValue: '', description: 'Required; migration JSON file path, relative to the source root or absolute')
string(name: 'INCLUDE_TABLES', defaultValue: '', description: 'Optional; comma-separated table-name whitelist')
string(name: 'CHUNK_SIZE', defaultValue: '524288', description: 'Migration JSON chunk size, default 512KiB; used to avoid SpacetimeDB HTTP 413')
booleanParam(name: 'DRY_RUN', defaultValue: true, description: 'Validate the import only; write no data')
booleanParam(name: 'INCREMENTAL', defaultValue: true, description: 'Incremental import; skip rows that already exist or conflict')
booleanParam(name: 'REPLACE_EXISTING', defaultValue: false, description: 'Overwrite the tables covered by this file; cannot be enabled together with INCREMENTAL')
@@ -80,6 +81,9 @@ pipeline {
if [[ -n "${params.INCLUDE_TABLES}" ]]; then
args+=(--include "${params.INCLUDE_TABLES}")
fi
if [[ -n "${params.CHUNK_SIZE}" ]]; then
args+=(--chunk-size "${params.CHUNK_SIZE}")
fi
if [[ "${params.DRY_RUN}" == "true" ]]; then
args+=(--dry-run)
fi


@@ -1,5 +1,6 @@
#!/usr/bin/env node
import { randomUUID } from 'node:crypto';
import { readFile } from 'node:fs/promises';
import path from 'node:path';
import {
@@ -11,6 +12,8 @@ import {
parseArgs,
} from './spacetime-migration-common.mjs';
const DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE = 512 * 1024;
try {
const options = parseArgs(process.argv.slice(2));
if (!options.in) {
@@ -30,7 +33,7 @@ try {
const webOptions = await prepareWebImportOptions(options);
let result;
try {
result = await importMigrationJsonDirect(webOptions, migrationJson);
result = await importMigrationJsonWithFallback(webOptions, migrationJson);
} finally {
await revokeTemporaryWebIdentity(webOptions);
}
@@ -86,6 +89,25 @@ async function prepareWebImportOptions(options) {
};
}
async function importMigrationJsonWithFallback(options, migrationJson) {
const chunkSize = resolveChunkSize(options);
if (Buffer.byteLength(migrationJson, 'utf8') > chunkSize) {
return importMigrationJsonChunked(options, migrationJson, chunkSize);
}
try {
return await importMigrationJsonDirect(options, migrationJson);
} catch (error) {
if (!isRequestBodyTooLargeError(error)) {
throw error;
}
console.warn(
`[spacetime:migration:import] Direct import hit HTTP 413; falling back to ${chunkSize}-byte chunked upload.`,
);
return importMigrationJsonChunked(options, migrationJson, chunkSize);
}
}
async function importMigrationJsonDirect(options, migrationJson) {
const includeTables = resolveImportIncludeTables(options, migrationJson);
const procedureName =
@@ -108,6 +130,60 @@ async function importMigrationJsonDirect(options, migrationJson) {
return callSpacetimeProcedure(options, procedureName, input);
}
async function importMigrationJsonChunked(options, migrationJson, chunkSize) {
const includeTables = resolveImportIncludeTables(options, migrationJson);
const procedureName =
options.incremental === true
? 'import_database_migration_incremental_from_chunks'
: 'import_database_migration_from_chunks';
const uploadId = `migration-${Date.now()}-${randomUUID()}`;
const chunks = splitStringByUtf8Bytes(migrationJson, chunkSize);
console.log(
`[spacetime:migration:import] Using chunked import: upload_id=${uploadId}, chunks=${chunks.length}, chunk_size=${chunkSize}`,
);
if (options.replaceExisting === true) {
console.log(
`[spacetime:migration:import] replace-existing only overwrites tables in this file: ${includeTables.join(', ') || 'none'}`,
);
} else if (options.incremental === true) {
console.log(`[spacetime:migration:import] Incremental mode: rows that already exist or conflict will be skipped`);
}
let committed = false;
try {
for (let index = 0; index < chunks.length; index += 1) {
const chunkResult = await callSpacetimeProcedure(
options,
'put_database_migration_import_chunk',
{
upload_id: uploadId,
chunk_index: index,
chunk_count: chunks.length,
chunk: chunks[index],
},
);
ensureProcedureOk(chunkResult);
console.log(
`[spacetime:migration:import] Uploaded migration chunk ${index + 1}/${chunks.length}`,
);
}
const result = await callSpacetimeProcedure(options, procedureName, {
upload_id: uploadId,
include_tables: includeTables,
replace_existing: options.replaceExisting === true,
dry_run: options.dryRun === true,
});
ensureProcedureOk(result);
committed = true;
return result;
} finally {
if (!committed) {
await clearMigrationChunksBestEffort(options, uploadId);
}
}
}
function resolveImportIncludeTables(options, migrationJson) {
if (options.replaceExisting !== true) {
return options.includeTables;
@@ -152,6 +228,63 @@ function readMigrationTableNames(migrationJson) {
return tableNames;
}
function resolveChunkSize(options) {
const chunkSize = options.chunkSize || DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE;
if (chunkSize > 1024 * 1024) {
throw new Error('--chunk-size must not exceed 1048576, to stay within the migration chunk procedure per-chunk limit.');
}
return chunkSize;
}
function splitStringByUtf8Bytes(value, maxBytes) {
const chunks = [];
let current = '';
let currentBytes = 0;
for (const character of value) {
const characterBytes = Buffer.byteLength(character, 'utf8');
if (characterBytes > maxBytes) {
throw new Error(`A single character exceeds chunk-size; current chunk-size: ${maxBytes}`);
}
if (currentBytes + characterBytes > maxBytes && current) {
chunks.push(current);
current = '';
currentBytes = 0;
}
current += character;
currentBytes += characterBytes;
}
if (current) {
chunks.push(current);
}
return chunks;
}
function isRequestBodyTooLargeError(error) {
const message = error instanceof Error ? error.message : String(error);
return (
message.includes('HTTP 413') ||
message.toLowerCase().includes('length limit exceeded')
);
}
async function clearMigrationChunksBestEffort(options, uploadId) {
try {
const result = await callSpacetimeProcedure(
options,
'clear_database_migration_import_chunks',
{ upload_id: uploadId },
);
ensureProcedureOk(result);
console.warn(`[spacetime:migration:import] Cleaned up temporary chunks of the failed import: ${uploadId}`);
} catch (error) {
console.warn(
`[spacetime:migration:import] Failed to clean up temporary migration chunks: ${
error instanceof Error ? error.message : String(error)
}`,
);
}
}
async function revokeTemporaryWebIdentity(options) {
if (!options.temporaryWebIdentity) {
return;


@@ -4,6 +4,10 @@ import path from 'node:path';
export function parseArgs(argv) {
const options = {
chunkSize: parseOptionalPositiveInteger(
process.env.GENARRATIVE_SPACETIME_MIGRATION_CHUNK_SIZE,
'GENARRATIVE_SPACETIME_MIGRATION_CHUNK_SIZE',
),
database:
process.env.GENARRATIVE_SPACETIME_MAINCLOUD_DATABASE ||
process.env.GENARRATIVE_SPACETIME_DATABASE ||
@@ -48,6 +52,8 @@ export function parseArgs(argv) {
options.token = readValue(arg);
} else if (arg === '--bootstrap-secret') {
options.bootstrapSecret = readValue(arg);
} else if (arg === '--chunk-size') {
options.chunkSize = parsePositiveInteger(readValue(arg), arg);
} else if (arg === '--operator-identity') {
options.operatorIdentity = readValue(arg);
} else if (arg === '--note') {
@@ -81,6 +87,25 @@ export function parseArgs(argv) {
return options;
}
export function parsePositiveInteger(value, name) {
if (!/^[1-9][0-9]*$/u.test(String(value).trim())) {
throw new Error(`${name} must be a positive integer.`);
}
const parsed = Number.parseInt(String(value).trim(), 10);
if (!Number.isSafeInteger(parsed)) {
throw new Error(`${name} is outside the safe integer range.`);
}
return parsed;
}
function parseOptionalPositiveInteger(value, name) {
if (!value) {
return 0;
}
return parsePositiveInteger(value, name);
}
export function buildSpacetimeCallArgs(options, procedureName, input) {
if (!options.database) {
throw new Error('--database is required.');


@@ -10,6 +10,8 @@ use crate::puzzle::{
const MIGRATION_SCHEMA_VERSION: u32 = 1;
const MIGRATION_MAX_TABLE_NAME_LEN: usize = 96;
const MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN: usize = 128;
const MIGRATION_MAX_IMPORT_CHUNK_BYTES: usize = 1024 * 1024;
const MIGRATION_MAX_OPERATOR_NOTE_CHARS: usize = 160;
const MIGRATION_MIN_BOOTSTRAP_SECRET_LEN: usize = 16;
const MIGRATION_BOOTSTRAP_SECRET: Option<&str> =
@@ -24,6 +26,21 @@ pub struct DatabaseMigrationOperator {
pub note: String,
}
#[spacetimedb::table(
accessor = database_migration_import_chunk,
index(accessor = by_database_migration_import_upload, btree(columns = [upload_id]))
)]
pub struct DatabaseMigrationImportChunk {
#[primary_key]
pub chunk_key: String,
pub upload_id: String,
pub chunk_index: u32,
pub chunk_count: u32,
pub operator_identity: Identity,
pub created_at: Timestamp,
pub chunk: String,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationExportInput {
pub include_tables: Vec<String>,
@@ -37,6 +54,27 @@ pub struct DatabaseMigrationImportInput {
pub dry_run: bool,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationImportChunkInput {
pub upload_id: String,
pub chunk_index: u32,
pub chunk_count: u32,
pub chunk: String,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationImportChunksInput {
pub upload_id: String,
pub include_tables: Vec<String>,
pub replace_existing: bool,
pub dry_run: bool,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct DatabaseMigrationImportChunksClearInput {
pub upload_id: String,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DatabaseMigrationImportMode {
Strict,
@@ -321,6 +359,76 @@ pub fn import_database_migration_incremental_from_file(
}
}
// Large migration JSON is first staged into a private temp table in chunks, so a single HTTP request body does not trip SpacetimeDB's 413 limit.
#[spacetimedb::procedure]
pub fn put_database_migration_import_chunk(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportChunkInput,
) -> DatabaseMigrationProcedureResult {
match put_database_migration_import_chunk_inner(ctx, input) {
Ok(()) => empty_database_migration_result(true, None),
Err(error) => empty_database_migration_result(false, Some(error)),
}
}
// Chunked commit keeps the same strict-append semantics as direct import; temporary chunks are cleared after a successful commit.
#[spacetimedb::procedure]
pub fn import_database_migration_from_chunks(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportChunksInput,
) -> DatabaseMigrationProcedureResult {
match import_database_migration_from_chunks_inner(
ctx,
input,
DatabaseMigrationImportMode::Strict,
) {
Ok((stats, warnings)) => DatabaseMigrationProcedureResult {
ok: true,
schema_version: MIGRATION_SCHEMA_VERSION,
migration_json: None,
table_stats: stats,
warnings,
error_message: None,
},
Err(error) => empty_database_migration_result(false, Some(error)),
}
}
// Chunked incremental commit only inserts rows missing from the target; rows with primary-key or unique-constraint conflicts are skipped.
#[spacetimedb::procedure]
pub fn import_database_migration_incremental_from_chunks(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportChunksInput,
) -> DatabaseMigrationProcedureResult {
match import_database_migration_from_chunks_inner(
ctx,
input,
DatabaseMigrationImportMode::Incremental,
) {
Ok((stats, warnings)) => DatabaseMigrationProcedureResult {
ok: true,
schema_version: MIGRATION_SCHEMA_VERSION,
migration_json: None,
table_stats: stats,
warnings,
error_message: None,
},
Err(error) => empty_database_migration_result(false, Some(error)),
}
}
// After a failed upload or commit, callers can explicitly clear the temporary chunks for the same upload_id.
#[spacetimedb::procedure]
pub fn clear_database_migration_import_chunks(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportChunksClearInput,
) -> DatabaseMigrationProcedureResult {
match clear_database_migration_import_chunks_inner(ctx, input) {
Ok(()) => empty_database_migration_result(true, None),
Err(error) => empty_database_migration_result(false, Some(error)),
}
}
fn export_database_migration_to_file_inner(
ctx: &mut ProcedureContext,
input: DatabaseMigrationExportInput,
@@ -361,14 +469,7 @@ fn import_database_migration_from_file_inner(
}
ctx.try_with_tx(|tx| require_migration_operator(tx, caller))?;
let migration_file = serde_json::from_str::<MigrationFile>(&input.migration_json)
.map_err(|error| format!("failed to parse migration file JSON: {error}"))?;
if migration_file.schema_version != MIGRATION_SCHEMA_VERSION {
return Err(format!(
"migration file schema_version mismatch: expected {}, got {}",
MIGRATION_SCHEMA_VERSION, migration_file.schema_version
));
}
let migration_file = parse_migration_file(&input.migration_json)?;
let (stats, warnings) = if input.dry_run {
build_import_dry_run_stats(&migration_file.tables, included_tables.as_ref())?
@@ -388,6 +489,158 @@ fn import_database_migration_from_file_inner(
Ok((stats, warnings))
}
fn put_database_migration_import_chunk_inner(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportChunkInput,
) -> Result<(), String> {
let caller = ctx.sender();
let upload_id = normalize_import_upload_id(&input.upload_id)?;
if input.chunk_count == 0 {
return Err("chunk_count must be greater than 0".to_string());
}
if input.chunk_index >= input.chunk_count {
return Err(format!(
"chunk_index out of range: {} / {}",
input.chunk_index, input.chunk_count
));
}
if input.chunk.is_empty() {
return Err("migration JSON chunk must not be empty".to_string());
}
if input.chunk.len() > MIGRATION_MAX_IMPORT_CHUNK_BYTES {
return Err(format!(
"migration JSON chunk too large; at most {} bytes per chunk",
MIGRATION_MAX_IMPORT_CHUNK_BYTES
));
}
let chunk_key = build_import_chunk_key(&upload_id, input.chunk_index);
ctx.try_with_tx(|tx| {
require_migration_operator(tx, caller)?;
if let Some(existing) = tx
.db
.database_migration_import_chunk()
.chunk_key()
.find(&chunk_key)
{
if existing.operator_identity != caller {
return Err("a chunk with this key was uploaded by another identity; refusing to overwrite".to_string());
}
tx.db
.database_migration_import_chunk()
.chunk_key()
.delete(&chunk_key);
}
tx.db
.database_migration_import_chunk()
.insert(DatabaseMigrationImportChunk {
chunk_key: chunk_key.clone(),
upload_id: upload_id.clone(),
chunk_index: input.chunk_index,
chunk_count: input.chunk_count,
operator_identity: caller,
created_at: tx.timestamp,
chunk: input.chunk.clone(),
});
Ok(())
})?;
Ok(())
}
fn import_database_migration_from_chunks_inner(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportChunksInput,
import_mode: DatabaseMigrationImportMode,
) -> Result<
(
Vec<DatabaseMigrationTableStat>,
Vec<DatabaseMigrationWarning>,
),
String,
> {
let caller = ctx.sender();
let upload_id = normalize_import_upload_id(&input.upload_id)?;
let included_tables = normalize_include_tables(&input.include_tables)?;
if import_mode == DatabaseMigrationImportMode::Incremental && input.replace_existing {
return Err("incremental import cannot enable replace_existing at the same time".to_string());
}
let migration_json = ctx.try_with_tx(|tx| {
require_migration_operator(tx, caller)?;
read_database_migration_import_chunks(tx, &upload_id, caller)
})?;
let migration_file = parse_migration_file(&migration_json)?;
let (stats, warnings) = if input.dry_run {
build_import_dry_run_stats(&migration_file.tables, included_tables.as_ref())?
} else {
ctx.try_with_tx(|tx| {
require_migration_operator(tx, caller)?;
apply_migration_file(
tx,
&migration_file,
included_tables.as_ref(),
input.replace_existing,
import_mode,
)
})?
};
ctx.try_with_tx(|tx| {
require_migration_operator(tx, caller)?;
clear_database_migration_import_chunks_tx(tx, &upload_id);
Ok::<(), String>(())
})?;
Ok((stats, warnings))
}
fn clear_database_migration_import_chunks_inner(
ctx: &mut ProcedureContext,
input: DatabaseMigrationImportChunksClearInput,
) -> Result<(), String> {
let caller = ctx.sender();
let upload_id = normalize_import_upload_id(&input.upload_id)?;
ctx.try_with_tx(|tx| {
require_migration_operator(tx, caller)?;
clear_database_migration_import_chunks_tx(tx, &upload_id);
Ok::<(), String>(())
})?;
Ok(())
}
fn empty_database_migration_result(
ok: bool,
error_message: Option<String>,
) -> DatabaseMigrationProcedureResult {
DatabaseMigrationProcedureResult {
ok,
schema_version: MIGRATION_SCHEMA_VERSION,
migration_json: None,
table_stats: Vec::new(),
warnings: Vec::new(),
error_message,
}
}
fn parse_migration_file(migration_json: &str) -> Result<MigrationFile, String> {
if migration_json.trim().is_empty() {
return Err("migration_json must not be empty".to_string());
}
let migration_file = serde_json::from_str::<MigrationFile>(migration_json)
.map_err(|error| format!("failed to parse migration file JSON: {error}"))?;
if migration_file.schema_version != MIGRATION_SCHEMA_VERSION {
return Err(format!(
"migration file schema_version mismatch: expected {}, got {}",
MIGRATION_SCHEMA_VERSION, migration_file.schema_version
));
}
Ok(migration_file)
}
fn authorize_database_migration_operator_inner(
ctx: &mut ProcedureContext,
input: DatabaseMigrationAuthorizeOperatorInput,
@@ -529,6 +782,96 @@ fn normalize_migration_operator_note(input: &str) -> Result<String, String> {
Ok(note.to_string())
}
fn normalize_import_upload_id(input: &str) -> Result<String, String> {
let upload_id = input.trim();
if upload_id.is_empty() {
return Err("upload_id must not be empty".to_string());
}
if upload_id.len() > MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN {
return Err(format!(
"upload_id too long; at most {} bytes",
MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN
));
}
if !upload_id
.chars()
.all(|character| character.is_ascii_alphanumeric() || matches!(character, '-' | '_'))
{
return Err("upload_id may only contain ASCII letters, digits, hyphens, or underscores".to_string());
}
Ok(upload_id.to_string())
}
fn build_import_chunk_key(upload_id: &str, chunk_index: u32) -> String {
format!("{upload_id}:{chunk_index:010}")
}
fn read_database_migration_import_chunks(
ctx: &ReducerContext,
upload_id: &str,
caller: Identity,
) -> Result<String, String> {
let mut chunks = ctx
.db
.database_migration_import_chunk()
.by_database_migration_import_upload()
.filter(upload_id)
.collect::<Vec<_>>();
if chunks.is_empty() {
return Err(format!("migration JSON chunks not found: {upload_id}"));
}
if chunks.iter().any(|chunk| chunk.operator_identity != caller) {
return Err("migration JSON chunks include fragments uploaded by another identity; refusing to commit".to_string());
}
let chunk_count = chunks[0].chunk_count;
if chunk_count == 0 {
return Err("invalid migration JSON chunk count".to_string());
}
if chunks
.iter()
.any(|chunk| chunk.chunk_count != chunk_count || chunk.upload_id != upload_id)
{
return Err("inconsistent migration JSON chunk count across chunks".to_string());
}
if chunks.len() != chunk_count as usize {
return Err(format!(
"migration JSON chunks incomplete; received {} / {}",
chunks.len(),
chunk_count
));
}
chunks.sort_by_key(|chunk| chunk.chunk_index);
let mut expected_index = 0u32;
let mut migration_json = String::new();
for chunk in chunks {
if chunk.chunk_index != expected_index {
return Err(format!("missing migration JSON chunk index: {expected_index}"));
}
migration_json.push_str(&chunk.chunk);
expected_index = expected_index.saturating_add(1);
}
Ok(migration_json)
}
fn clear_database_migration_import_chunks_tx(ctx: &ReducerContext, upload_id: &str) {
let chunk_keys = ctx
.db
.database_migration_import_chunk()
.by_database_migration_import_upload()
.filter(upload_id)
.map(|chunk| chunk.chunk_key)
.collect::<Vec<_>>();
for chunk_key in chunk_keys {
ctx.db
.database_migration_import_chunk()
.chunk_key()
.delete(&chunk_key);
}
}
fn normalize_include_tables(input: &[String]) -> Result<Option<HashSet<String>>, String> {
if input.is_empty() {
return Ok(None);