From 22f3f963def67708838d69d2a65abdfb5f9f3e49 Mon Sep 17 00:00:00 2001
From: kdletters
Date: Thu, 30 Apr 2026 15:20:49 +0800
Subject: [PATCH] Handle SpacetimeDB migration imports with chunked uploads

---
 docs/README.md                                |   2 +-
 ...DATABASE_MIGRATION_PIPELINES_2026-04-29.md |  14 +-
 docs/technical/README.md                      |   2 +
 ...N_STRING_MIGRATION_PROCEDURE_2026-04-27.md |  30 +-
 docs/technical/SPACETIMEDB_TABLE_CATALOG.md   |  16 +
 jenkins/Jenkinsfile.database-import           |   4 +
 scripts/spacetime-import-migration-json.mjs   | 135 ++++++-
 scripts/spacetime-migration-common.mjs        |  25 ++
 .../crates/spacetime-module/src/migration.rs  | 359 +++++++++++++++++-
 9 files changed, 567 insertions(+), 20 deletions(-)

diff --git a/docs/README.md b/docs/README.md
index 1af49af1..d03a1ec6 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -13,7 +13,7 @@
   Key addition: the RPG creation and runtime script responsibility map is in [RPG_CREATION_AND_RUNTIME_SCRIPT_RESPONSIBILITY_MAP_2026-04-28.md](./reference/RPG_CREATION_AND_RUNTIME_SCRIPT_RESPONSIBILITY_MAP_2026-04-28.md).
 - [PRD](./prd): product requirements and phase plans; the new RPG opening-animation design is in [AI_NATIVE_RPG_OPENING_ANIMATION_PRD_2026-04-25.md](./prd/AI_NATIVE_RPG_OPENING_ANIMATION_PRD_2026-04-25.md).
 
-For SpacetimeDB table-schema change constraints, automatic-migration boundaries, and the staged migration flow that preserves old data, see [SPACETIMEDB_SCHEMA_CHANGE_CONSTRAINTS.md](./technical/SPACETIMEDB_SCHEMA_CHANGE_CONSTRAINTS.md).
+For SpacetimeDB table-schema change constraints, automatic-migration boundaries, and the staged migration flow that preserves old data, see [SPACETIMEDB_SCHEMA_CHANGE_CONSTRAINTS.md](./technical/SPACETIMEDB_SCHEMA_CHANGE_CONSTRAINTS.md); for private-table migration JSON export/import, chunked imports that work around HTTP 413, and the Jenkins database migration pipelines, see [SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md](./technical/SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md) and [JENKINS_SPACETIMEDB_DATABASE_MIGRATION_PIPELINES_2026-04-29.md](./technical/JENKINS_SPACETIMEDB_DATABASE_MIGRATION_PIPELINES_2026-04-29.md).
 
 ## Recommended Reading Order

diff --git a/docs/technical/JENKINS_SPACETIMEDB_DATABASE_MIGRATION_PIPELINES_2026-04-29.md b/docs/technical/JENKINS_SPACETIMEDB_DATABASE_MIGRATION_PIPELINES_2026-04-29.md
index b40f30b6..a51de72e 100644
--- a/docs/technical/JENKINS_SPACETIMEDB_DATABASE_MIGRATION_PIPELINES_2026-04-29.md
+++ b/docs/technical/JENKINS_SPACETIMEDB_DATABASE_MIGRATION_PIPELINES_2026-04-29.md
@@ -72,12 +72,13 @@ Genarrative-Database-Import
 1. `INPUT_FILE`: required; path to the migration JSON file.
 2. `DATABASE`, `SERVER`, `SERVER_URL`, `DEPLOY_DIRECTORY`, `ROOT_DIR`: same as the export pipeline.
 3. `INCLUDE_TABLES`: optional; import only the listed tables.
-4. `DRY_RUN`: defaults to `true`; validate only, write nothing.
-5. `INCREMENTAL`: defaults to `true`; skip rows that already exist or conflict.
-6. `REPLACE_EXISTING`: defaults to `false`; overwrite only the tables covered by this migration file; must not be enabled together with `INCREMENTAL`.
-7. `BOOTSTRAP_SECRET`: optional; used to authorize the temporary Web API identity.
-8. `TOKEN`: optional; SpacetimeDB client connection token. If left empty, the script creates a temporary identity automatically and revokes it when finished.
-9. `NOTE`: note recorded with the migration authorization.
+4. `CHUNK_SIZE`: chunk size for the migration JSON, default `524288` bytes. The import script switches to chunked upload automatically when the file exceeds this size or when a direct import triggers HTTP 413.
+5. `DRY_RUN`: defaults to `true`; validate only, write nothing.
+6. `INCREMENTAL`: defaults to `true`; skip rows that already exist or conflict.
+7. `REPLACE_EXISTING`: defaults to `false`; overwrite only the tables covered by this migration file; must not be enabled together with `INCREMENTAL`.
+8. `BOOTSTRAP_SECRET`: optional; used to authorize the temporary Web API identity.
+9. `TOKEN`: optional; SpacetimeDB client connection token. If left empty, the script creates a temporary identity automatically and revokes it when finished.
+10. `NOTE`: note recorded with the migration authorization.
 
 ## 4. Security Boundaries
 
@@ -85,6 +86,7 @@
 2. `INCREMENTAL` and `REPLACE_EXISTING` are mutually exclusive; the Jenkinsfile blocks enabling both before execution.
 3. The Jenkinsfile never prints tokens; in production, pass sensitive values through Jenkins credentials or environment variables on the target machine.
 4. If `TOKEN` is not provided, the import script creates a temporary Web API identity and calls the migration authorize/revoke procedures to keep the permission window narrow.
+5. If the log shows `SpacetimeDB HTTP 413: Failed to buffer the request body: length limit exceeded`, lower `CHUNK_SIZE` to `262144` or less and rerun. The parameter only shrinks each individual HTTP body; it does not change which tables are imported.
 
 ## 5. Local Deployment Test Parameters
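To size `CHUNK_SIZE` before a rerun, the arithmetic is straightforward. A minimal sketch mirroring the threshold check in `scripts/spacetime-import-migration-json.mjs` (the helper name here is illustrative, not part of the repository):

```js
import { Buffer } from 'node:buffer';

// Illustrative: how many chunk uploads will a migration JSON need?
function estimateChunkCount(migrationJson, chunkSize = 524288) {
  const bytes = Buffer.byteLength(migrationJson, 'utf8');
  // Chunked upload only kicks in above CHUNK_SIZE; a direct import that
  // still hits HTTP 413 falls back to the same chunked path.
  return bytes <= chunkSize ? 1 : Math.ceil(bytes / chunkSize);
}
```

Halving `CHUNK_SIZE` from `524288` to `262144` roughly doubles the number of `put_database_migration_import_chunk` calls while halving each request body.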

diff --git a/docs/technical/README.md b/docs/technical/README.md
index 9e611bd3..effa1cc1 100644
--- a/docs/technical/README.md
+++ b/docs/technical/README.md
@@ -5,6 +5,8 @@
 ## Document List
 
 - [SPACETIMEDB_SCHEMA_CHANGE_CONSTRAINTS.md](./SPACETIMEDB_SCHEMA_CHANGE_CONSTRAINTS.md): freezes the SpacetimeDB table-schema change constraints, the acceptable scope of automatic migration, system behavior after conflicts, and the incremental migration flow that preserves old data; consult it first for anything involving `spacetime publish`, table field changes, or aligning `migration.rs`.
+- [SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md](./SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md): documents the SpacetimeDB private-table migration JSON export/import procedures, migration-operator authorization, chunked imports that work around HTTP 413, Jenkins automated migration backfill, and the import-script parameters.
+- [JENKINS_SPACETIMEDB_DATABASE_MIGRATION_PIPELINES_2026-04-29.md](./JENKINS_SPACETIMEDB_DATABASE_MIGRATION_PIPELINES_2026-04-29.md): documents the parameters of the two SCM-backed database migration pipelines `Genarrative-Database-Export` / `Genarrative-Database-Import`, the default dry-run, token boundaries, and the `CHUNK_SIZE` parameter for avoiding 413.
 - [RPG_PROMPT_FRONTEND_REMOVAL_AND_SERVER_RS_MIGRATION_2026-04-28.md](./RPG_PROMPT_FRONTEND_REMOVAL_AND_SERVER_RS_MIGRATION_2026-04-28.md): freezes the boundary that RPG prompts must not exist in the frontend; the frontend keeps only the API client, and prompts for private character chat, NPC dialogue, story continuation, and similar features are consolidated into `server-rs`.
 - [RPG_CREATION_RESULT_VIEW_BACKEND_TRUTH_MIGRATION_2026-04-28.md](./RPG_CREATION_RESULT_VIEW_BACKEND_TRUTH_MIGRATION_2026-04-28.md): freezes the boundaries for moving RPG creation result-page saving, Agent session/result-preview truth priority, and result-page entry adjudication to the backend result-view.
 - [RPG_CREATION_PROFILE_GENERATION_BACKEND_MIGRATION_2026-04-28.md](./RPG_CREATION_PROFILE_GENERATION_BACKEND_MIGRATION_2026-04-28.md): documents removing the non-browser legacy AI fallback from RPG creation profile generation; world drafts are generated uniformly through `/api/runtime/custom-world/profile` in `server-rs`.

diff --git a/docs/technical/SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md b/docs/technical/SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md
index 92bf4c85..294c0069 100644
--- a/docs/technical/SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md
+++ b/docs/technical/SPACETIMEDB_JSON_STRING_MIGRATION_PROCEDURE_2026-04-27.md
@@ -8,7 +8,7 @@ SpacetimeDB reducers must stay deterministic; they cannot access the filesystem or the network.
 
 1. The export procedure inside `spacetime-module` reads the migration table whitelist and returns the migration JSON string directly.
 2. The Node ops script calls the export procedure through `spacetime call` by default and writes the returned JSON string to a local file.
-3. The Node ops script reads the local JSON file. For imports it first creates a temporary Web API identity/token via `POST /v1/identity`, then uses the current CLI login to authorize that identity as a migration operator, and finally hands the JSON string to the import procedure in the HTTP request body.
+3. The Node ops script reads the local JSON file. For imports it first creates a temporary Web API identity/token via `POST /v1/identity`, then uses the current CLI login to authorize that identity as a migration operator; small files go to the import procedure directly in the HTTP request body, while large files are automatically split into chunks, uploaded, and then committed.
 4. The import procedure validates the JSON and the table whitelist, then writes to the target database inside a transaction.
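A condensed sketch of the identity step in this flow; the endpoint, header requirement, and revocation behavior are the ones described above, while the response shape is an assumption (the repository scripts wrap this in their own helpers):

```js
// Create the temporary Web API identity used for the import (step 3 above).
async function createTemporaryIdentity(serverUrl) {
  const response = await fetch(`${serverUrl}/v1/identity`, {
    method: 'POST',
    // An explicit JSON content type avoids HTTP 415 on some SpacetimeDB versions.
    headers: { 'Content-Type': 'application/json' },
  });
  if (!response.ok) throw new Error(`SpacetimeDB HTTP ${response.status}`);
  return response.json(); // assumed shape: { identity, token }
}
```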
 
 The procedures no longer access an HTTP file bridge and no longer accept local file paths on the deployment machine. This sidesteps SpacetimeDB's HTTP access restrictions for private/special-purpose addresses and avoids relaying private-table contents through a temporary HTTP service.
 
@@ -30,6 +30,8 @@ The SpacetimeDB Wasm runtime does not support `std::time::SystemTime::now()`; procedur
 `database_migration_operator` only gates who may call the migration procedures; it is never exported or imported, so the source database's operational permissions are not copied to the target database.
 
+Chunked imports of large files additionally use the private temporary table `database_migration_import_chunk` to stage uploaded fragments. The table only holds intermediate data for the current import: it is deleted automatically after a successful commit, and on failure the script makes a best-effort call to the cleanup procedure. It is not on the migration whitelist and is never exported into business migration JSON.
+
 On first authorization the operator table is empty, so the first operator must be authorized with the `GENARRATIVE_SPACETIME_MIGRATION_BOOTSTRAP_SECRET` bootstrap secret compiled into the module. The publish script automatically generates a strong random bootstrap secret while building or publishing the SpacetimeDB module, injects it into the wasm build environment, and prints it to the console; operators must record the secret printed for that database in that release. Once the table contains operators, subsequent authorization and revocation can only be initiated by existing operators; the bootstrap secret is no longer accepted as a way to escalate privileges.
 
 New procedures:
 
@@ -99,6 +101,14 @@ node scripts/spacetime-revoke-migration-operator.mjs \
 `import_database_migration_incremental_from_file(ctx, input)`
 
+`put_database_migration_import_chunk(ctx, input)`
+
+`import_database_migration_from_chunks(ctx, input)`
+
+`import_database_migration_incremental_from_chunks(ctx, input)`
+
+`clear_database_migration_import_chunks(ctx, input)`
+
 Input fields:
@@ -106,6 +116,15 @@
 - `migration_json`: the complete migration JSON string produced by the export procedure.
 - `replace_existing`: whether to first clear the target tables actually imported from this migration file. Tables the migration file does not include are never cleared; batched migrations only overwrite the current batch.
 - `dry_run`: parse and count only; write nothing.
+
+Chunked import fields:
+
+- `upload_id`: unique ID for this chunked upload; only ASCII letters, digits, hyphens, and underscores are allowed.
+- `chunk_index`: sequence number of the current chunk, starting from `0`.
+- `chunk_count`: total number of chunks in this upload.
+- `chunk`: the current fragment of the migration JSON, at most `1048576` bytes per chunk.
+
+The Node import script uses chunked import by default once the file exceeds `524288` bytes; if a direct import of a small file still hits `SpacetimeDB HTTP 413: Failed to buffer the request body: length limit exceeded`, it automatically falls back to the chunked flow as well. Use `--chunk-size <bytes>` or the `GENARRATIVE_SPACETIME_MIGRATION_CHUNK_SIZE` environment variable to reduce the chunk size.
 
 Import modes:
 
 - Default strict append: target tables are not cleared and rows are inserted one by one; any primary-key or unique-constraint conflict fails the import and rolls it back. Suitable when you have confirmed the target database holds no old data for the same tables.
 
@@ -252,8 +271,9 @@ node scripts/spacetime-import-migration-json.mjs \
 
 1. `POST /v1/identity` creates a temporary Web API identity/token.
 2. The machine's current `spacetime` CLI login is used to call `authorize_database_migration_operator`, authorizing this temporary identity.
-3. With `Authorization: Bearer <temporary token>`, the script calls `import_database_migration_from_file`, placing the complete migration JSON in the HTTP body.
-4. After the import request finishes, the script calls `revoke_database_migration_operator` with the same temporary Web API token to revoke that temporary identity.
+3. With `Authorization: Bearer <temporary token>`, the script imports the migration JSON. A file at or below `--chunk-size` calls `import_database_migration_from_file` directly; above the threshold, or when a direct import triggers HTTP 413, the script first calls `put_database_migration_import_chunk` once per chunk, then `import_database_migration_from_chunks` or `import_database_migration_incremental_from_chunks`.
+4. If a chunk upload or the commit fails, the script makes a best-effort call to `clear_database_migration_import_chunks` to clean up the temporary chunks.
+5. After the import request finishes, the script calls `revoke_database_migration_operator` with the same temporary Web API token to revoke that temporary identity.
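A representative dry-run invocation of this flow with a reduced chunk size (the `--in` flag is assumed from the import script's `options.in`; database/server flags are omitted here and follow the other examples in this document):

```
GENARRATIVE_SPACETIME_MIGRATION_CHUNK_SIZE=262144 \
node scripts/spacetime-import-migration-json.mjs \
  --in migration.json \
  --dry-run
```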
 
 Every POST request that hits the SpacetimeDB Web API directly must send an explicit `Content-Type: application/json`. Some SpacetimeDB versions reject requests that omit the content type or carry an unexpected media type and return `HTTP 415`, even when the body itself is valid JSON.
 
@@ -265,6 +285,8 @@ node scripts/spacetime-import-migration-json.mjs \
 
 `--dry-run` does not simulate primary-key or unique-constraint conflicts in the target database, so in incremental mode `skipped_row_count` is only accurate on a real import.
 
+If Jenkins or SpacetimeDB returns `HTTP 413`, lower the import pipeline's `CHUNK_SIZE` first, for example to `262144`. The parameter only affects the per-chunk request body uploaded to the procedure; it changes neither the table scope of the migration JSON nor the import semantics.
+
 Do not use `--replace-existing` when you only want to append data. The flag first deletes the old data in the covered target tables and then inserts the data from the migration file; if the source file is not a complete snapshot, target-table data will be lost.
 
 For batched migration, separate table names with commas:
@@ -303,6 +325,6 @@ node scripts/spacetime-export-migration-json.mjs \
 
 ## Risks and Limitations
 
-The migration JSON travels as a procedure return value and as an HTTP request body, so it is subject to SpacetimeDB call response/request limits and to intermediate proxy size limits. For large datasets, migrate in batches through `include_tables` first; if a single table is itself too large, add a chunking procedure rather than reviving the HTTP file bridge.
+The migration JSON travels as a procedure return value and as an HTTP request body, so it is subject to SpacetimeDB call response/request limits and to intermediate proxy size limits. The import side now has built-in chunked upload to get around the `HTTP 413` request-body limit; if the export response itself is too large, you still need to export in batches through `include_tables` first.
 
 Hand-writing JSON for `spacetime call` in PowerShell tends to get the double quotes stripped. Nor can the complete JSON of a large file go into a command-line argument: Linux returns `spawn E2BIG` when starting the child process. Use the Node scripts in this repository instead; they send the Web API request body directly, avoiding both shell re-processing and command-line length limits.

diff --git a/docs/technical/SPACETIMEDB_TABLE_CATALOG.md b/docs/technical/SPACETIMEDB_TABLE_CATALOG.md
index 8474f0a6..344597fd 100644
--- a/docs/technical/SPACETIMEDB_TABLE_CATALOG.md
+++ b/docs/technical/SPACETIMEDB_TABLE_CATALOG.md
@@ -30,6 +30,22 @@ spacetime sql "SELECT * FROM custom_world_gallery_entry"
 | Big fish eats small fish | `big_fish_creation_session`, `big_fish_agent_message`, `big_fish_asset_slot`, `big_fish_runtime_run` |
 | Assets | `asset_object`, `asset_entity_binding` |
 | AI tasks | `ai_task`, `ai_task_stage`, `ai_text_chunk`, `ai_result_reference` |
+| Ops migration | `database_migration_operator`, `database_migration_import_chunk` |
+
+## Ops Migration Tables
+
+### `database_migration_operator`
+
+- Purpose: migration-operator whitelist; gates who may call the export, import, authorize, and revoke migration procedures.
+- Schema: `operator_identity PK: Identity`, `created_at: Timestamp`, `created_by: Identity`, `note: String`.
+- Indexes: primary key `operator_identity`.
+
+### `database_migration_import_chunk`
+
+- Purpose: private temporary table for chunked imports of large migration JSON, used to avoid the `HTTP 413` triggered by an oversized single HTTP request body; cleaned up automatically by the import procedure after a successful commit, and best-effort cleaned by the script on failure.
+- Schema: `chunk_key PK: String`, `upload_id: String`, `chunk_index: u32`, `chunk_count: u32`, `operator_identity: Identity`, `created_at: Timestamp`, `chunk: String`.
+- Indexes: primary key `chunk_key`; secondary btree index on `upload_id`.
+- Migration boundary: not on the migration whitelist; never exported into business migration JSON.
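While an upload is in flight or after a failed commit, the staged fragments can be spot-checked with the same inspection pattern used elsewhere in this catalog (owner credentials are required, since the table is private):

```
spacetime sql "SELECT upload_id, chunk_index, chunk_count FROM database_migration_import_chunk"
```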
"${params.CHUNK_SIZE}") + fi if [[ "${params.DRY_RUN}" == "true" ]]; then args+=(--dry-run) fi diff --git a/scripts/spacetime-import-migration-json.mjs b/scripts/spacetime-import-migration-json.mjs index 0f14c12e..b3ba84ad 100644 --- a/scripts/spacetime-import-migration-json.mjs +++ b/scripts/spacetime-import-migration-json.mjs @@ -1,5 +1,6 @@ #!/usr/bin/env node +import { randomUUID } from 'node:crypto'; import { readFile } from 'node:fs/promises'; import path from 'node:path'; import { @@ -11,6 +12,8 @@ import { parseArgs, } from './spacetime-migration-common.mjs'; +const DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE = 512 * 1024; + try { const options = parseArgs(process.argv.slice(2)); if (!options.in) { @@ -30,7 +33,7 @@ try { const webOptions = await prepareWebImportOptions(options); let result; try { - result = await importMigrationJsonDirect(webOptions, migrationJson); + result = await importMigrationJsonWithFallback(webOptions, migrationJson); } finally { await revokeTemporaryWebIdentity(webOptions); } @@ -86,6 +89,25 @@ async function prepareWebImportOptions(options) { }; } +async function importMigrationJsonWithFallback(options, migrationJson) { + const chunkSize = resolveChunkSize(options); + if (Buffer.byteLength(migrationJson, 'utf8') > chunkSize) { + return importMigrationJsonChunked(options, migrationJson, chunkSize); + } + + try { + return await importMigrationJsonDirect(options, migrationJson); + } catch (error) { + if (!isRequestBodyTooLargeError(error)) { + throw error; + } + console.warn( + `[spacetime:migration:import] 直接导入触发 HTTP 413,改用 ${chunkSize} bytes 分片上传。`, + ); + return importMigrationJsonChunked(options, migrationJson, chunkSize); + } +} + async function importMigrationJsonDirect(options, migrationJson) { const includeTables = resolveImportIncludeTables(options, migrationJson); const procedureName = @@ -108,6 +130,60 @@ async function importMigrationJsonDirect(options, migrationJson) { return callSpacetimeProcedure(options, procedureName, input); } +async function importMigrationJsonChunked(options, migrationJson, chunkSize) { + const includeTables = resolveImportIncludeTables(options, migrationJson); + const procedureName = + options.incremental === true + ? 

diff --git a/scripts/spacetime-import-migration-json.mjs b/scripts/spacetime-import-migration-json.mjs
index 0f14c12e..b3ba84ad 100644
--- a/scripts/spacetime-import-migration-json.mjs
+++ b/scripts/spacetime-import-migration-json.mjs
@@ -1,5 +1,6 @@
 #!/usr/bin/env node
 
+import { randomUUID } from 'node:crypto';
 import { readFile } from 'node:fs/promises';
 import path from 'node:path';
 import {
@@ -11,6 +12,8 @@ import {
   parseArgs,
 } from './spacetime-migration-common.mjs';
 
+const DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE = 512 * 1024;
+
 try {
   const options = parseArgs(process.argv.slice(2));
   if (!options.in) {
@@ -30,7 +33,7 @@ try {
   const webOptions = await prepareWebImportOptions(options);
   let result;
   try {
-    result = await importMigrationJsonDirect(webOptions, migrationJson);
+    result = await importMigrationJsonWithFallback(webOptions, migrationJson);
   } finally {
     await revokeTemporaryWebIdentity(webOptions);
   }
@@ -86,6 +89,25 @@ async function prepareWebImportOptions(options) {
   };
 }
 
+async function importMigrationJsonWithFallback(options, migrationJson) {
+  const chunkSize = resolveChunkSize(options);
+  if (Buffer.byteLength(migrationJson, 'utf8') > chunkSize) {
+    return importMigrationJsonChunked(options, migrationJson, chunkSize);
+  }
+
+  try {
+    return await importMigrationJsonDirect(options, migrationJson);
+  } catch (error) {
+    if (!isRequestBodyTooLargeError(error)) {
+      throw error;
+    }
+    console.warn(
+      `[spacetime:migration:import] Direct import hit HTTP 413; retrying as a chunked upload with ${chunkSize}-byte chunks.`,
+    );
+    return importMigrationJsonChunked(options, migrationJson, chunkSize);
+  }
+}
+
 async function importMigrationJsonDirect(options, migrationJson) {
   const includeTables = resolveImportIncludeTables(options, migrationJson);
   const procedureName =
@@ -108,6 +130,60 @@ async function importMigrationJsonDirect(options, migrationJson) {
 
   return callSpacetimeProcedure(options, procedureName, input);
 }
 
+async function importMigrationJsonChunked(options, migrationJson, chunkSize) {
+  const includeTables = resolveImportIncludeTables(options, migrationJson);
+  const procedureName =
+    options.incremental === true
+      ? 'import_database_migration_incremental_from_chunks'
+      : 'import_database_migration_from_chunks';
+  const uploadId = `migration-${Date.now()}-${randomUUID()}`;
+  const chunks = splitStringByUtf8Bytes(migrationJson, chunkSize);
+  console.log(
+    `[spacetime:migration:import] Using chunked import: upload_id=${uploadId}, chunks=${chunks.length}, chunk_size=${chunkSize}`,
+  );
+  if (options.replaceExisting === true) {
+    console.log(
+      `[spacetime:migration:import] replace-existing only overwrites tables in this file: ${includeTables.join(', ') || 'none'}`,
+    );
+  } else if (options.incremental === true) {
+    console.log(`[spacetime:migration:import] Incremental mode: rows that already exist or conflict will be skipped`);
+  }
+
+  let committed = false;
+  try {
+    for (let index = 0; index < chunks.length; index += 1) {
+      const chunkResult = await callSpacetimeProcedure(
+        options,
+        'put_database_migration_import_chunk',
+        {
+          upload_id: uploadId,
+          chunk_index: index,
+          chunk_count: chunks.length,
+          chunk: chunks[index],
+        },
+      );
+      ensureProcedureOk(chunkResult);
+      console.log(
+        `[spacetime:migration:import] Uploaded migration chunk ${index + 1}/${chunks.length}`,
+      );
+    }
+
+    const result = await callSpacetimeProcedure(options, procedureName, {
+      upload_id: uploadId,
+      include_tables: includeTables,
+      replace_existing: options.replaceExisting === true,
+      dry_run: options.dryRun === true,
+    });
+    ensureProcedureOk(result);
+    committed = true;
+    return result;
+  } finally {
+    if (!committed) {
+      await clearMigrationChunksBestEffort(options, uploadId);
+    }
+  }
+}
+
 function resolveImportIncludeTables(options, migrationJson) {
   if (options.replaceExisting !== true) {
     return options.includeTables;
   }
@@ -152,6 +228,63 @@ function readMigrationTableNames(migrationJson) {
   return tableNames;
 }
 
+function resolveChunkSize(options) {
+  const chunkSize = options.chunkSize || DEFAULT_MIGRATION_IMPORT_CHUNK_SIZE;
+  if (chunkSize > 1024 * 1024) {
+    throw new Error('--chunk-size must not exceed 1048576; larger chunks would hit the migration chunk procedure per-chunk limit.');
+  }
+  return chunkSize;
+}
+
+function splitStringByUtf8Bytes(value, maxBytes) {
+  const chunks = [];
+  let current = '';
+  let currentBytes = 0;
+  for (const character of value) {
+    const characterBytes = Buffer.byteLength(character, 'utf8');
+    if (characterBytes > maxBytes) {
+      throw new Error(`A single character exceeds the chunk size; current chunk-size: ${maxBytes}`);
+    }
+    if (currentBytes + characterBytes > maxBytes && current) {
+      chunks.push(current);
+      current = '';
+      currentBytes = 0;
+    }
+    current += character;
+    currentBytes += characterBytes;
+  }
+  if (current) {
+    chunks.push(current);
+  }
+  return chunks;
+}
+
+function isRequestBodyTooLargeError(error) {
+  const message = error instanceof Error ? error.message : String(error);
+  return (
+    message.includes('HTTP 413') ||
+    message.toLowerCase().includes('length limit exceeded')
+  );
+}
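+
+// Note on splitStringByUtf8Bytes above: iterating by code point means a
+// multi-byte UTF-8 character is never cut in half, so every chunk is valid
+// UTF-8 on its own. For ASCII-heavy JSON, a 1.2 MiB file with the default
+// 512 KiB chunk size splits into three chunks (512 KiB, 512 KiB, remainder).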
+
+async function clearMigrationChunksBestEffort(options, uploadId) {
+  try {
+    const result = await callSpacetimeProcedure(
+      options,
+      'clear_database_migration_import_chunks',
+      { upload_id: uploadId },
+    );
+    ensureProcedureOk(result);
+    console.warn(`[spacetime:migration:import] Cleared the temporary chunks of the failed import: ${uploadId}`);
+  } catch (error) {
+    console.warn(
+      `[spacetime:migration:import] Failed to clear temporary migration chunks: ${
+        error instanceof Error ? error.message : String(error)
+      }`,
+    );
+  }
+}
+
 async function revokeTemporaryWebIdentity(options) {
   if (!options.temporaryWebIdentity) {
     return;

diff --git a/scripts/spacetime-migration-common.mjs b/scripts/spacetime-migration-common.mjs
index 99e93882..091f3ebe 100644
--- a/scripts/spacetime-migration-common.mjs
+++ b/scripts/spacetime-migration-common.mjs
@@ -4,6 +4,10 @@ import path from 'node:path';
 
 export function parseArgs(argv) {
   const options = {
+    chunkSize: parseOptionalPositiveInteger(
+      process.env.GENARRATIVE_SPACETIME_MIGRATION_CHUNK_SIZE,
+      'GENARRATIVE_SPACETIME_MIGRATION_CHUNK_SIZE',
+    ),
     database:
       process.env.GENARRATIVE_SPACETIME_MAINCLOUD_DATABASE ||
       process.env.GENARRATIVE_SPACETIME_DATABASE ||
@@ -48,6 +52,8 @@ export function parseArgs(argv) {
       options.token = readValue(arg);
     } else if (arg === '--bootstrap-secret') {
       options.bootstrapSecret = readValue(arg);
+    } else if (arg === '--chunk-size') {
+      options.chunkSize = parsePositiveInteger(readValue(arg), arg);
     } else if (arg === '--operator-identity') {
       options.operatorIdentity = readValue(arg);
     } else if (arg === '--note') {
@@ -81,6 +87,25 @@ export function parseArgs(argv) {
   return options;
 }
 
+export function parsePositiveInteger(value, name) {
+  if (!/^[1-9][0-9]*$/u.test(String(value).trim())) {
+    throw new Error(`${name} must be a positive integer.`);
+  }
+
+  const parsed = Number.parseInt(String(value).trim(), 10);
+  if (!Number.isSafeInteger(parsed)) {
+    throw new Error(`${name} exceeds the safe integer range.`);
+  }
+  return parsed;
+}
+
+function parseOptionalPositiveInteger(value, name) {
+  if (!value) {
+    return 0;
+  }
+  return parsePositiveInteger(value, name);
+}
+
 export function buildSpacetimeCallArgs(options, procedureName, input) {
   if (!options.database) {
     throw new Error('--database is required.');
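The regex gate in `parsePositiveInteger` rejects zero, negatives, decimals, and unit suffixes before parsing, so a malformed `CHUNK_SIZE` fails fast. A usage sketch of the exported helper:

```js
import { parsePositiveInteger } from './spacetime-migration-common.mjs';

parsePositiveInteger('262144', '--chunk-size'); // => 262144
// Each of the following throws "--chunk-size must be a positive integer.":
// parsePositiveInteger('0', '--chunk-size');
// parsePositiveInteger('-1', '--chunk-size');
// parsePositiveInteger('512k', '--chunk-size');
```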

diff --git a/server-rs/crates/spacetime-module/src/migration.rs b/server-rs/crates/spacetime-module/src/migration.rs
index 89b22121..2cd8dea2 100644
--- a/server-rs/crates/spacetime-module/src/migration.rs
+++ b/server-rs/crates/spacetime-module/src/migration.rs
@@ -10,6 +10,8 @@ use crate::puzzle::{
 
 const MIGRATION_SCHEMA_VERSION: u32 = 1;
 const MIGRATION_MAX_TABLE_NAME_LEN: usize = 96;
+const MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN: usize = 128;
+const MIGRATION_MAX_IMPORT_CHUNK_BYTES: usize = 1024 * 1024;
 const MIGRATION_MAX_OPERATOR_NOTE_CHARS: usize = 160;
 const MIGRATION_MIN_BOOTSTRAP_SECRET_LEN: usize = 16;
 const MIGRATION_BOOTSTRAP_SECRET: Option<&str> =
@@ -24,6 +26,21 @@
     pub note: String,
 }
 
+#[spacetimedb::table(
+    accessor = database_migration_import_chunk,
+    index(accessor = by_database_migration_import_upload, btree(columns = [upload_id]))
+)]
+pub struct DatabaseMigrationImportChunk {
+    #[primary_key]
+    pub chunk_key: String,
+    pub upload_id: String,
+    pub chunk_index: u32,
+    pub chunk_count: u32,
+    pub operator_identity: Identity,
+    pub created_at: Timestamp,
+    pub chunk: String,
+}
+
 #[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
 pub struct DatabaseMigrationExportInput {
     pub include_tables: Vec<String>,
@@ -37,6 +54,27 @@ pub struct DatabaseMigrationImportInput {
     pub dry_run: bool,
 }
 
+#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
+pub struct DatabaseMigrationImportChunkInput {
+    pub upload_id: String,
+    pub chunk_index: u32,
+    pub chunk_count: u32,
+    pub chunk: String,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
+pub struct DatabaseMigrationImportChunksInput {
+    pub upload_id: String,
+    pub include_tables: Vec<String>,
+    pub replace_existing: bool,
+    pub dry_run: bool,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
+pub struct DatabaseMigrationImportChunksClearInput {
+    pub upload_id: String,
+}
+
 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
 enum DatabaseMigrationImportMode {
     Strict,
@@ -321,6 +359,76 @@ pub fn import_database_migration_incremental_from_file(
     }
 }
 
+// Large migration JSON is staged into a private temporary table chunk by chunk,
+// so no single HTTP request body trips SpacetimeDB's 413 limit.
+#[spacetimedb::procedure]
+pub fn put_database_migration_import_chunk(
+    ctx: &mut ProcedureContext,
+    input: DatabaseMigrationImportChunkInput,
+) -> DatabaseMigrationProcedureResult {
+    match put_database_migration_import_chunk_inner(ctx, input) {
+        Ok(()) => empty_database_migration_result(true, None),
+        Err(error) => empty_database_migration_result(false, Some(error)),
+    }
+}
+
+// Chunked commit keeps the same strict-append semantics as a direct import;
+// temporary chunks are cleared after a successful commit.
+#[spacetimedb::procedure]
+pub fn import_database_migration_from_chunks(
+    ctx: &mut ProcedureContext,
+    input: DatabaseMigrationImportChunksInput,
+) -> DatabaseMigrationProcedureResult {
+    match import_database_migration_from_chunks_inner(
+        ctx,
+        input,
+        DatabaseMigrationImportMode::Strict,
+    ) {
+        Ok((stats, warnings)) => DatabaseMigrationProcedureResult {
+            ok: true,
+            schema_version: MIGRATION_SCHEMA_VERSION,
+            migration_json: None,
+            table_stats: stats,
+            warnings,
+            error_message: None,
+        },
+        Err(error) => empty_database_migration_result(false, Some(error)),
+    }
+}
+
+// Chunked incremental commit inserts only the rows missing from the target;
+// rows with primary-key or unique-constraint conflicts are skipped.
+#[spacetimedb::procedure]
+pub fn import_database_migration_incremental_from_chunks(
+    ctx: &mut ProcedureContext,
+    input: DatabaseMigrationImportChunksInput,
+) -> DatabaseMigrationProcedureResult {
+    match import_database_migration_from_chunks_inner(
+        ctx,
+        input,
+        DatabaseMigrationImportMode::Incremental,
+    ) {
+        Ok((stats, warnings)) => DatabaseMigrationProcedureResult {
+            ok: true,
+            schema_version: MIGRATION_SCHEMA_VERSION,
+            migration_json: None,
+            table_stats: stats,
+            warnings,
+            error_message: None,
+        },
+        Err(error) => empty_database_migration_result(false, Some(error)),
+    }
+}
+
+// Callers can explicitly clear the temporary chunks of an upload_id after a
+// failed upload or commit.
+#[spacetimedb::procedure]
+pub fn clear_database_migration_import_chunks(
+    ctx: &mut ProcedureContext,
+    input: DatabaseMigrationImportChunksClearInput,
+) -> DatabaseMigrationProcedureResult {
+    match clear_database_migration_import_chunks_inner(ctx, input) {
+        Ok(()) => empty_database_migration_result(true, None),
+        Err(error) => empty_database_migration_result(false, Some(error)),
+    }
+}
+
 fn export_database_migration_to_file_inner(
     ctx: &mut ProcedureContext,
     input: DatabaseMigrationExportInput,
@@ -361,14 +469,7 @@ fn import_database_migration_from_file_inner(
     }
     ctx.try_with_tx(|tx| require_migration_operator(tx, caller))?;
 
-    let migration_file = serde_json::from_str::<DatabaseMigrationFile>(&input.migration_json)
-        .map_err(|error| format!("failed to parse migration file JSON: {error}"))?;
-    if migration_file.schema_version != MIGRATION_SCHEMA_VERSION {
-        return Err(format!(
-            "migration file schema_version mismatch: expected {}, got {}",
-            MIGRATION_SCHEMA_VERSION, migration_file.schema_version
-        ));
-    }
+    let migration_file = parse_migration_file(&input.migration_json)?;
 
     let (stats, warnings) = if input.dry_run {
         build_import_dry_run_stats(&migration_file.tables, included_tables.as_ref())?
@@ -388,6 +489,158 @@ fn import_database_migration_from_file_inner(
     Ok((stats, warnings))
 }
 
+fn put_database_migration_import_chunk_inner(
+    ctx: &mut ProcedureContext,
+    input: DatabaseMigrationImportChunkInput,
+) -> Result<(), String> {
+    let caller = ctx.sender();
+    let upload_id = normalize_import_upload_id(&input.upload_id)?;
+    if input.chunk_count == 0 {
+        return Err("chunk_count must be greater than 0".to_string());
+    }
+    if input.chunk_index >= input.chunk_count {
+        return Err(format!(
+            "chunk_index out of range: {} / {}",
+            input.chunk_index, input.chunk_count
+        ));
+    }
+    if input.chunk.is_empty() {
+        return Err("migration JSON chunk must not be empty".to_string());
+    }
+    if input.chunk.len() > MIGRATION_MAX_IMPORT_CHUNK_BYTES {
+        return Err(format!(
+            "migration JSON chunk too large; at most {} bytes per chunk",
+            MIGRATION_MAX_IMPORT_CHUNK_BYTES
+        ));
+    }
+
+    let chunk_key = build_import_chunk_key(&upload_id, input.chunk_index);
+    ctx.try_with_tx(|tx| {
+        require_migration_operator(tx, caller)?;
+        if let Some(existing) = tx
+            .db
+            .database_migration_import_chunk()
+            .chunk_key()
+            .find(&chunk_key)
+        {
+            if existing.operator_identity != caller {
+                return Err(
+                    "a chunk with this key was already uploaded by another identity; refusing to overwrite"
+                        .to_string(),
+                );
+            }
+            tx.db
+                .database_migration_import_chunk()
+                .chunk_key()
+                .delete(&chunk_key);
+        }
+        tx.db
+            .database_migration_import_chunk()
+            .insert(DatabaseMigrationImportChunk {
+                chunk_key: chunk_key.clone(),
+                upload_id: upload_id.clone(),
+                chunk_index: input.chunk_index,
+                chunk_count: input.chunk_count,
+                operator_identity: caller,
+                created_at: tx.timestamp,
+                chunk: input.chunk.clone(),
+            });
+        Ok(())
+    })?;
+
+    Ok(())
+}
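+
+// Re-uploading an existing chunk_key is idempotent for the same operator: the
+// stored row is replaced in place, while a chunk_key already written by a
+// different identity is rejected rather than overwritten.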
+
+fn import_database_migration_from_chunks_inner(
+    ctx: &mut ProcedureContext,
+    input: DatabaseMigrationImportChunksInput,
+    import_mode: DatabaseMigrationImportMode,
+) -> Result<
+    (
+        Vec<DatabaseMigrationTableStats>,
+        Vec<String>,
+    ),
+    String,
+> {
+    let caller = ctx.sender();
+    let upload_id = normalize_import_upload_id(&input.upload_id)?;
+    let included_tables = normalize_include_tables(&input.include_tables)?;
+    if import_mode == DatabaseMigrationImportMode::Incremental && input.replace_existing {
+        return Err("incremental import cannot enable replace_existing at the same time".to_string());
+    }
+
+    let migration_json = ctx.try_with_tx(|tx| {
+        require_migration_operator(tx, caller)?;
+        read_database_migration_import_chunks(tx, &upload_id, caller)
+    })?;
+    let migration_file = parse_migration_file(&migration_json)?;
+
+    let (stats, warnings) = if input.dry_run {
+        build_import_dry_run_stats(&migration_file.tables, included_tables.as_ref())?
+    } else {
+        ctx.try_with_tx(|tx| {
+            require_migration_operator(tx, caller)?;
+            apply_migration_file(
+                tx,
+                &migration_file,
+                included_tables.as_ref(),
+                input.replace_existing,
+                import_mode,
+            )
+        })?
+    };
+
+    ctx.try_with_tx(|tx| {
+        require_migration_operator(tx, caller)?;
+        clear_database_migration_import_chunks_tx(tx, &upload_id);
+        Ok::<(), String>(())
+    })?;
+
+    Ok((stats, warnings))
+}
+
+fn clear_database_migration_import_chunks_inner(
+    ctx: &mut ProcedureContext,
+    input: DatabaseMigrationImportChunksClearInput,
+) -> Result<(), String> {
+    let caller = ctx.sender();
+    let upload_id = normalize_import_upload_id(&input.upload_id)?;
+    ctx.try_with_tx(|tx| {
+        require_migration_operator(tx, caller)?;
+        clear_database_migration_import_chunks_tx(tx, &upload_id);
+        Ok::<(), String>(())
+    })?;
+    Ok(())
+}
+
+fn empty_database_migration_result(
+    ok: bool,
+    error_message: Option<String>,
+) -> DatabaseMigrationProcedureResult {
+    DatabaseMigrationProcedureResult {
+        ok,
+        schema_version: MIGRATION_SCHEMA_VERSION,
+        migration_json: None,
+        table_stats: Vec::new(),
+        warnings: Vec::new(),
+        error_message,
+    }
+}
+
+fn parse_migration_file(migration_json: &str) -> Result<DatabaseMigrationFile, String> {
+    if migration_json.trim().is_empty() {
+        return Err("migration_json must not be empty".to_string());
+    }
+
+    let migration_file = serde_json::from_str::<DatabaseMigrationFile>(migration_json)
+        .map_err(|error| format!("failed to parse migration file JSON: {error}"))?;
+    if migration_file.schema_version != MIGRATION_SCHEMA_VERSION {
+        return Err(format!(
+            "migration file schema_version mismatch: expected {}, got {}",
+            MIGRATION_SCHEMA_VERSION, migration_file.schema_version
+        ));
+    }
+
+    Ok(migration_file)
+}
+
 fn authorize_database_migration_operator_inner(
     ctx: &mut ProcedureContext,
     input: DatabaseMigrationAuthorizeOperatorInput,
@@ -529,6 +782,96 @@ fn normalize_migration_operator_note(input: &str) -> Result<String, String> {
     Ok(note.to_string())
 }
 
+fn normalize_import_upload_id(input: &str) -> Result<String, String> {
+    let upload_id = input.trim();
+    if upload_id.is_empty() {
+        return Err("upload_id must not be empty".to_string());
+    }
+    if upload_id.len() > MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN {
+        return Err(format!(
+            "upload_id too long; at most {} bytes",
+            MIGRATION_MAX_IMPORT_UPLOAD_ID_LEN
+        ));
+    }
+    if !upload_id
+        .chars()
+        .all(|character| character.is_ascii_alphanumeric() || matches!(character, '-' | '_'))
+    {
+        return Err("upload_id may only use ASCII letters, digits, hyphens, or underscores".to_string());
+    }
+    Ok(upload_id.to_string())
+}
+
+fn build_import_chunk_key(upload_id: &str, chunk_index: u32) -> String {
+    format!("{upload_id}:{chunk_index:010}")
+}
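+
+// Example: build_import_chunk_key("upload-1", 2) yields "upload-1:0000000002".
+// Zero-padding the index to ten digits keeps lexicographic key order identical
+// to numeric chunk order for every chunk_index that fits in u32.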
+
+fn read_database_migration_import_chunks(
+    ctx: &ReducerContext,
+    upload_id: &str,
+    caller: Identity,
+) -> Result<String, String> {
+    let mut chunks = ctx
+        .db
+        .database_migration_import_chunk()
+        .by_database_migration_import_upload()
+        .filter(upload_id)
+        .collect::<Vec<_>>();
+    if chunks.is_empty() {
+        return Err(format!("no migration JSON chunks found: {upload_id}"));
+    }
+    if chunks.iter().any(|chunk| chunk.operator_identity != caller) {
+        return Err(
+            "the migration JSON chunks include fragments uploaded by another identity; refusing to commit"
+                .to_string(),
+        );
+    }
+
+    let chunk_count = chunks[0].chunk_count;
+    if chunk_count == 0 {
+        return Err("invalid migration JSON chunk count".to_string());
+    }
+    if chunks
+        .iter()
+        .any(|chunk| chunk.chunk_count != chunk_count || chunk.upload_id != upload_id)
+    {
+        return Err("inconsistent migration JSON chunk counts".to_string());
+    }
+    if chunks.len() != chunk_count as usize {
+        return Err(format!(
+            "migration JSON chunk upload incomplete; received {} / {}",
+            chunks.len(),
+            chunk_count
+        ));
+    }
+
+    chunks.sort_by_key(|chunk| chunk.chunk_index);
+    let mut expected_index = 0u32;
+    let mut migration_json = String::new();
+    for chunk in chunks {
+        if chunk.chunk_index != expected_index {
+            return Err(format!("missing migration JSON chunk index: {expected_index}"));
+        }
+        migration_json.push_str(&chunk.chunk);
+        expected_index = expected_index.saturating_add(1);
+    }
+
+    Ok(migration_json)
+}
+
+fn clear_database_migration_import_chunks_tx(ctx: &ReducerContext, upload_id: &str) {
+    let chunk_keys = ctx
+        .db
+        .database_migration_import_chunk()
+        .by_database_migration_import_upload()
+        .filter(upload_id)
+        .map(|chunk| chunk.chunk_key)
+        .collect::<Vec<_>>();
+    for chunk_key in chunk_keys {
+        ctx.db
+            .database_migration_import_chunk()
+            .chunk_key()
+            .delete(&chunk_key);
+    }
+}
+
 fn normalize_include_tables(input: &[String]) -> Result<Option<BTreeSet<String>>, String> {
     if input.is_empty() {
         return Ok(None);