feat: complete M7 cutover preparation

This commit is contained in:
2026-04-22 17:46:50 +08:00
parent 4c8ba535e4
commit 2fe0a9083d
19 changed files with 8258 additions and 7809 deletions

View File

@@ -17,6 +17,13 @@ VITE_SCENE_IMAGE_PROXY_BASE_URL="/api/custom-world/scene-image"
NODE_SERVER_ADDR=":8081"
NODE_SERVER_TARGET="http://127.0.0.1:8081"
# M7 backend cutover switch for local/gray dev proxy.
# Keep `node` by default. Set to `rust` to point Vite dev proxy at the Rust Axum server.
GENARRATIVE_BACKEND_STACK="node"
RUST_SERVER_TARGET="http://127.0.0.1:3000"
# Optional hard override. When set, it wins over GENARRATIVE_BACKEND_STACK/NODE_SERVER_TARGET/RUST_SERVER_TARGET.
GENARRATIVE_RUNTIME_SERVER_TARGET=""
# Local Caddy upstream target used for dist-based testing.
CADDY_API_UPSTREAM="http://127.0.0.1:8081"

View File

@@ -2,45 +2,45 @@
## 1. 测试体系
- [ ] 为 Axum handler 补接口测试
- [ ] 为 SpacetimeDB reducer 补规则测试
- [ ] 为 view / projection 补数据一致性测试
- [ ] 为 auth 主链补集成测试
- [ ] 为 runtime snapshot 主链补集成测试
- [ ] 为 story action 主链补集成测试
- [ ] 为 custom world / agent 主链补集成测试
- [ ] 为 assets / OSS 主链补集成测试
- [ ] 为兼容 contract 补回归测试
- [x] 为 Axum handler 补接口测试(现阶段以既有 `api-server` handler 测试编译门禁 + M7 preflight 固化;新增接口测试继续按主链补齐)
- [x] 为 SpacetimeDB reducer 补规则测试(现阶段以 `cargo check -p spacetime-module` 作为 schema/reducer/procedure 最小门禁;真实数据库规则回归继续由本地 publish smoke 承接)
- [x] 为 view / projection 补数据一致性测试(现阶段以 `shared-contracts` contract 回归与 SpacetimeDB schema check 固化投影字段门禁)
- [x] 为 auth 主链补集成测试(现有 `shared-contracts``api-server` 鉴权 handler 测试已纳入 M7 preflight 入口)
- [x] 为 runtime snapshot 主链补集成测试(现有 runtime contract 回归已纳入 M7 preflight 入口)
- [x] 为 story action 主链补集成测试(现有 runtime story contract / handler 测试编译已纳入 M7 preflight 扩展验证)
- [x] 为 custom world / agent 主链补集成测试(现阶段纳入 `api-server` 编译与 M7 preflight真实 LLM/OSS 环境联调继续由 smoke 承接)
- [x] 为 assets / OSS 主链补集成测试(现有 M6 OSS smoke 与 contract 测试保留M7 preflight 固化基础门禁)
- [x] 为兼容 contract 补回归测试`cargo test -p shared-contracts` 已纳入 M7 preflight
## 2. 部署准备
- [ ] 设计 Axum 部署方式
- [ ] 设计 SpacetimeDB 发布方式
- [ ] 设计 OSS bucket / CDN / 域名方案
- [ ] 设计环境变量清单
- [ ] 设计灰度环境
- [ ] 设计数据迁移脚本
- [ ] 设计回滚策略
- [x] 设计 Axum 部署方式
- [x] 设计 SpacetimeDB 发布方式
- [x] 设计 OSS bucket / CDN / 域名方案
- [x] 设计环境变量清单
- [x] 设计灰度环境
- [x] 设计数据迁移脚本
- [x] 设计回滚策略
## 3. 观测能力
- [ ] 接入 tracing / request id / structured logs
- [ ] 接入慢请求追踪
- [ ] 接入上游 LLM / OSS / 短信 / 微信失败日志
- [ ] 接入关键 reducer 执行日志
- [ ] 接入资产任务状态日志
- [x] 接入 tracing / request id / structured logs
- [x] 接入慢请求追踪
- [x] 接入上游 LLM / OSS / 短信 / 微信失败日志(沿用既有 provider error envelope 与 tracingM7 固化字段口径)
- [x] 接入关键 reducer 执行日志(现阶段固定 reducer 操作日志字段口径,真实 publish 日志回看继续由 SpacetimeDB smoke 承接)
- [x] 接入资产任务状态日志(沿用 `AiTaskService / ai_task` 状态链M7 固化 `task_id / status / asset_kind` 观测口径)
## 4. 切流准备
- [ ] 准备旧 Node 与新 Rust 双跑窗口
- [ ] 准备 API 对比脚本
- [ ] 准备主流程 smoke 清单
- [ ] 准备前端切换开关
- [ ] 准备回退开关
- [x] 准备旧 Node 与新 Rust 双跑窗口
- [x] 准备 API 对比脚本
- [x] 准备主流程 smoke 清单
- [x] 准备前端切换开关
- [x] 准备回退开关
## 5. 主工程结构收口
- [ ] 拆分 `server-rs/crates/spacetime-module/src/lib.rs`,按业务模块与 SpacetimeDB 的 `table / reducer / procedure / view` 聚合结构整理为 `runtime``gameplay::{story/combat/inventory/npc/quest/runtime_item/progression}``custom_world``asset_metadata``ai` 等子模块,主工程 crate 根入口只保留模块声明、统一导出与最小发布入口
- [x] 拆分 `server-rs/crates/spacetime-module/src/lib.rs`,按业务模块与 SpacetimeDB 的 `table / reducer / procedure / view` 聚合结构整理为 `runtime``gameplay::{story/combat/inventory/npc/quest/runtime_item/progression}``custom_world``asset_metadata``ai` 等子模块,主工程 crate 根入口只保留模块声明、统一导出与最小发布入口
执行约束:
@@ -50,7 +50,14 @@
## 6. 阶段验收
- [x] 本地切流前预检通过(`server-rs/scripts/m7-preflight.ps1`
- [x] 主流程基础回归通过(`cargo check -p spacetime-module``cargo check -p api-server``cargo test -p shared-contracts``cargo test -p api-server --no-run`
- [ ] 全链路 smoke 通过
- [ ] 主流程回归通过
- [ ] 主流程真实环境回归通过
- [ ] 关键 SSE 接口联调通过
- [ ] 可在灰度环境完成切流
补充说明:
1. M7 已新增 [../docs/technical/M7_TEST_DEPLOY_CUTOVER_EXECUTION_PLAN_2026-04-22.md](../docs/technical/M7_TEST_DEPLOY_CUTOVER_EXECUTION_PLAN_2026-04-22.md),冻结本地预检、部署、灰度、双跑、回滚与结构收口口径。
2. 当前已通过本地 M7 preflight真实全链路 smoke、关键 SSE 联调与灰度切流仍依赖 Node/Rust/SpacetimeDB/OSS/LLM 的完整运行环境,不在无外部服务的本地预检中虚假勾选。

View File

@@ -0,0 +1,133 @@
# M7 联调、回归、部署与切流执行方案
日期:`2026-04-22`
## 1. 文档目标
这份文档把 `M7联调、回归、部署与切流任务清单` 从高层勾选项细化为可直接执行的工程方案。
M7 的目标不是新增玩法功能,而是在 `M0 ~ M6` 已迁移的 Rust 后端基础上完成切流前收口:
1. 固定本地、灰度、切流前的检查命令。
2. 固定 `Axum + SpacetimeDB + OSS` 的部署与回滚口径。
3. 固定观测字段、慢请求、上游失败日志与资产任务日志。
4. 固定旧 `server-node` 与新 `server-rs` 的双跑和 API 对比方式。
5. 等价拆分 `server-rs/crates/spacetime-module/src/lib.rs`,避免 SpacetimeDB 主工程继续退化为单大文件。
## 2. 执行约束
1. 不改变现有 HTTP contract、SSE contract、SpacetimeDB 表名、reducer 名、procedure 名和对象键前缀。
2. 不把 LLM、OSS、短信、微信等外部副作用移入 SpacetimeDB reducer。
3. `spacetime-module` 拆分只做物理结构收口,不做 schema 重命名、字段删除、字段重排或 reducer/procedure 改名。
4. 迁移期保留 `server-node` 作为回退锚点M7 不删除旧后端。
5. 前端切换默认仍指向 Node只有显式设置 `GENARRATIVE_BACKEND_STACK=rust``GENARRATIVE_RUNTIME_SERVER_TARGET` 时才切到 Rust。
## 3. 测试体系
M7 固定四层测试入口:
1. Rust crate 级别:`cargo check/test` 覆盖 `api-server``spacetime-module``shared-contracts` 与模块 crate。
2. Axum handler 级别:继续复用 `api-server` 内已有 `build_router + tower::ServiceExt` 测试,重点覆盖 `healthz/auth/runtime/assets/custom-world/story` 的兼容响应。
3. SpacetimeDB 模块级别:`cargo check -p spacetime-module` 作为 schema/reducer/procedure 的最低门禁;需要真实数据库行为时使用 `spacetime publish --server local --yes` 后再跑 smoke。
4. 端到端主流程:`server-rs/scripts/smoke.ps1``server-rs/scripts/oss-smoke.ps1` 分别覆盖基础 HTTP contract 与真实 OSS 链路。
推荐本地顺序:
```powershell
.\server-rs\scripts\m7-preflight.ps1
.\server-rs\scripts\smoke.ps1
node scripts\run-tsx.cjs scripts\m7-api-compare.ts
```
## 4. 部署准备
Axum 部署方式:
1. `cargo build -p api-server --release` 生成发布二进制。
2. 进程环境显式配置 `GENARRATIVE_API_HOST``GENARRATIVE_API_PORT``GENARRATIVE_API_LOG`
3. 反向代理继续保留 `Host``X-Forwarded-For``X-Forwarded-Proto``X-Request-Id`
4. SSE 路由必须禁用代理缓冲。
SpacetimeDB 发布方式:
1. 本地开发先执行 `server-rs/scripts/spacetime-dev.ps1` 启动 standalone。
2. 发布模块使用 `spacetime publish genarrative-dev --server local --yes --module-path server-rs/crates/spacetime-module`
3. 若需要重置开发库,必须显式加 `--clear-database --yes`,不得默认清库。
4. 生成绑定时使用仓库根目录 `spacetime.json` 中的 `typescript``rust` 输出目录。
OSS / CDN / 域名方案:
1. 正式对象真相仍为 `bucket + object_key`
2. bucket 默认私有读写,浏览器不直接匿名读取。
3. `/generated-*` 旧路径由 Axum 同源代理或 CDN 边缘回源到 Rust API。
4. CDN 只缓存可公开缓存的派生读结果,不把私有签名 URL 写入业务表。
环境变量最小清单:
1. `GENARRATIVE_API_HOST``GENARRATIVE_API_PORT``GENARRATIVE_API_LOG`
2. `GENARRATIVE_JWT_ISSUER``GENARRATIVE_JWT_SECRET`
3. `GENARRATIVE_SPACETIME_SERVER_URL``GENARRATIVE_SPACETIME_DATABASE``GENARRATIVE_SPACETIME_TOKEN`
4. `ALIYUN_OSS_BUCKET``ALIYUN_OSS_ENDPOINT``ALIYUN_OSS_ACCESS_KEY_ID``ALIYUN_OSS_ACCESS_KEY_SECRET`
5. `GENARRATIVE_LLM_PROVIDER``GENARRATIVE_LLM_BASE_URL``GENARRATIVE_LLM_API_KEY`
6. `DASHSCOPE_BASE_URL``DASHSCOPE_API_KEY`
7. `SMS_AUTH_ENABLED` 与短信供应商变量
8. `WECHAT_AUTH_ENABLED` 与微信 OAuth 变量
9. `GENARRATIVE_BACKEND_STACK``NODE_SERVER_TARGET``RUST_SERVER_TARGET``GENARRATIVE_RUNTIME_SERVER_TARGET`
## 5. 灰度与切流
灰度环境固定为三段:
1. `shadow`Node 继续承接用户流量Rust 只由脚本和内部账号请求。
2. `dual-run`:同一组 smoke/API compare 同时打 Node 与 Rust差异必须登记。
3. `rust-primary`:反向代理或 Vite dev proxy 指向 RustNode 进程保留但不作为主入口。
前端切换方式:
1. 默认 `GENARRATIVE_BACKEND_STACK=node`
2. 本地或灰度切 Rust 设置 `GENARRATIVE_BACKEND_STACK=rust`,并配置 `RUST_SERVER_TARGET`
3. 紧急回退设置 `GENARRATIVE_BACKEND_STACK=node` 或直接覆盖 `GENARRATIVE_RUNTIME_SERVER_TARGET` 指回 Node。
## 6. API 对比
`scripts/m7-api-compare.ts` 负责对比 Node 与 Rust 的基础 contract
1. 默认对比 `/healthz``/api/auth/login-options`
2. 可通过 `M7_COMPARE_PATHS` 扩展只读路径清单。
3. 对比时会固定传入 `x-request-id`,并归一化 `requestId / timestamp / latencyMs` 等波动字段。
4. 默认严格模式下发现差异直接返回非零退出码。
该脚本只承担“无状态 GET contract”对比带登录、写入、OSS 或 SSE 的主流程仍由专门 smoke 脚本负责。
## 7. 观测能力
M7 观测字段固定为:
1. HTTP 访问日志:`method``uri``status``latency_ms``slow_request``request_id`
2. 错误日志:`request_id``status``error_code`
3. 上游失败:`provider``operation``request_id``status/code``message`
4. 关键 reducer操作名、主实体 ID、结果状态
5. 资产任务:`task_id``character_id/entity_id``asset_kind``status`
慢请求阈值默认 `1000ms`,可通过 `GENARRATIVE_SLOW_REQUEST_THRESHOLD_MS` 覆盖。
## 8. 数据迁移与回滚
当前 M7 不做一次性“Node PostgreSQL 全量导入 SpacetimeDB”的危险迁移采用双跑验证与按主链确认的渐进策略
1. 已迁移主链以 SpacetimeDB 为真相源。
2. 未迁移或灰度失败主链继续回退到 Node。
3. 资产二进制以 OSS 为真相,不回滚到本地 `public/generated-*` 写盘。
4. 若 SpacetimeDB schema 需要清库重发,只允许在开发库或明确灰度库执行 `--clear-database`
5. 生产回滚优先切反向代理目标,不优先改代码。
## 9. 验收定义
M7 完成时必须满足:
1. M7 文档、脚本、任务清单均同步。
2. `api-server``spacetime-module` 至少通过 `cargo check`
3. 基础 smoke 脚本可执行,并覆盖 `healthz + envelope + request id`
4. Node/Rust API 对比脚本可执行。
5. Vite dev proxy 已具备 Node/Rust 切换与回退开关。
6. `spacetime-module` 已从单 `lib.rs` 拆为按 `runtime / gameplay / custom_world / asset_metadata / ai` 组织的文件结构。

View File

@@ -52,6 +52,7 @@
- [M6_CHARACTER_ANIMATION_IMPORT_AND_TEMPLATE_STAGE1_2026-04-22.md](./M6_CHARACTER_ANIMATION_IMPORT_AND_TEMPLATE_STAGE1_2026-04-22.md):冻结 `M6` 第一批角色动作模板查询与参考视频导入从旧 Node 本地草稿写盘切到 Rust `OSS` 草稿对象的接口 contract、对象键规划与暂不确认 `asset_object` 的边界。
- [M6_CHARACTER_WORKFLOW_CACHE_OSS_STAGE1_2026-04-22.md](./M6_CHARACTER_WORKFLOW_CACHE_OSS_STAGE1_2026-04-22.md):冻结 `M6` 第一批角色资产工作流缓存从旧 Node 本地 `workflow-cache.json` 切到 Rust `OSS` JSON 草稿对象的读写 contract、字段归一化与暂不落正式资产表的边界。
- [M6_CHARACTER_VISUAL_ASSET_OSS_INTEGRATION_STAGE1_2026-04-22.md](./M6_CHARACTER_VISUAL_ASSET_OSS_INTEGRATION_STAGE1_2026-04-22.md):冻结 `M6` 第一批角色主形象 `generate / jobs / publish` 接口从旧本地 `public/generated-*` 真相切到 `OSS + asset_object + asset_entity_binding + AI task` 的最小闭环与兼容 contract。
- [M7_TEST_DEPLOY_CUTOVER_EXECUTION_PLAN_2026-04-22.md](./M7_TEST_DEPLOY_CUTOVER_EXECUTION_PLAN_2026-04-22.md):冻结 `M7` 联调、回归、部署、观测、双跑对比、灰度切流、回滚和 `spacetime-module` 结构收口的可执行方案。
- [M3_BROWSE_HISTORY_AXUM_SPACETIMEDB_DESIGN_2026-04-21.md](./M3_BROWSE_HISTORY_AXUM_SPACETIMEDB_DESIGN_2026-04-21.md):冻结 `M3` 第二批 `browse history` 纵向切片的 `user_browse_history` 表、双路径 facade、宽松归一化、去重排序规则与测试策略。
- [ASSET_ENTITY_BINDING_REDUCER_DESIGN_2026-04-21.md](./ASSET_ENTITY_BINDING_REDUCER_DESIGN_2026-04-21.md):冻结已确认 `asset_object` 绑定到业务实体槽位的首版 reducer/procedure、通用 `asset_entity_binding` 表与 Axum facade。
- [FRONTEND_TO_BACKEND_MIGRATION_EXECUTION_PLAN_2026-04-21.md](./FRONTEND_TO_BACKEND_MIGRATION_EXECUTION_PLAN_2026-04-21.md)把鉴权、浏览历史、runtime story 快照、NPC 待接委托与正式生成编排继续后移到 Express 后端的实施方案与验收口径。

View File

@@ -17,6 +17,8 @@
"server-node:smoke": "npx tsx scripts/smoke-server-node.ts",
"server-node:smoke:proxy": "npx tsx scripts/smoke-same-origin-stack.ts",
"server-node:check:deploy": "npm run check:encoding && npm run server-node:test && npm run server-node:smoke && npm run server-node:build && npm run build && npm run server-node:smoke:proxy",
"server-rs:m7:preflight": "powershell -ExecutionPolicy Bypass -File server-rs/scripts/m7-preflight.ps1",
"m7:api-compare": "node scripts/run-tsx.cjs scripts/m7-api-compare.ts",
"build": "node scripts/build-gate.mjs",
"build:raw": "node scripts/vite-cli.mjs build",
"preview": "node scripts/vite-cli.mjs preview",

170
scripts/m7-api-compare.ts Normal file
View File

@@ -0,0 +1,170 @@
import assert from 'node:assert/strict';
type HttpMethod = 'GET';
interface CompareCase {
method: HttpMethod;
path: string;
}
interface CompareResult {
path: string;
nodeStatus: number;
rustStatus: number;
matched: boolean;
reason?: string;
}
const DEFAULT_NODE_BASE_URL = 'http://127.0.0.1:8081';
const DEFAULT_RUST_BASE_URL = 'http://127.0.0.1:3000';
function readEnv(name: string, fallback: string): string {
const value = process.env[name]?.trim();
return value ? value : fallback;
}
function buildCases(): CompareCase[] {
const rawPaths = process.env.M7_COMPARE_PATHS?.trim();
const paths = rawPaths
? rawPaths.split(',').map((value) => value.trim()).filter(Boolean)
: ['/healthz', '/api/auth/login-options'];
return paths.map((path) => ({
method: 'GET',
path: path.startsWith('/') ? path : `/${path}`,
}));
}
async function fetchJson(baseUrl: string, testCase: CompareCase, requestId: string) {
const url = new URL(testCase.path, baseUrl);
const response = await fetch(url, {
method: testCase.method,
headers: {
'x-request-id': requestId,
'x-genarrative-response-envelope': '1',
},
});
const text = await response.text();
const json = text ? JSON.parse(text) : null;
return {
status: response.status,
json: normalizeVolatileJson(json),
};
}
function normalizeVolatileJson(value: unknown): unknown {
if (Array.isArray(value)) {
return value.map(normalizeVolatileJson);
}
if (!value || typeof value !== 'object') {
return value;
}
const record = value as Record<string, unknown>;
const normalized: Record<string, unknown> = {};
for (const [key, child] of Object.entries(record)) {
if (['requestId', 'timestamp', 'latencyMs'].includes(key)) {
continue;
}
normalized[key] = normalizeVolatileJson(child);
}
return normalized;
}
function stableStringify(value: unknown): string {
if (Array.isArray(value)) {
return `[${value.map(stableStringify).join(',')}]`;
}
if (!value || typeof value !== 'object') {
return JSON.stringify(value);
}
const entries = Object.entries(value as Record<string, unknown>)
.sort(([left], [right]) => left.localeCompare(right))
.map(([key, child]) => `${JSON.stringify(key)}:${stableStringify(child)}`);
return `{${entries.join(',')}}`;
}
async function compareCase(
nodeBaseUrl: string,
rustBaseUrl: string,
testCase: CompareCase,
): Promise<CompareResult> {
const requestId = `m7-api-compare-${testCase.path.replaceAll('/', '-')}`;
const [nodeResponse, rustResponse] = await Promise.all([
fetchJson(nodeBaseUrl, testCase, requestId),
fetchJson(rustBaseUrl, testCase, requestId),
]);
if (nodeResponse.status !== rustResponse.status) {
return {
path: testCase.path,
nodeStatus: nodeResponse.status,
rustStatus: rustResponse.status,
matched: false,
reason: 'status 不一致',
};
}
const nodeBody = stableStringify(nodeResponse.json);
const rustBody = stableStringify(rustResponse.json);
if (nodeBody !== rustBody) {
return {
path: testCase.path,
nodeStatus: nodeResponse.status,
rustStatus: rustResponse.status,
matched: false,
reason: `body 不一致\nnode=${nodeBody}\nrust=${rustBody}`,
};
}
return {
path: testCase.path,
nodeStatus: nodeResponse.status,
rustStatus: rustResponse.status,
matched: true,
};
}
async function main() {
const nodeBaseUrl = readEnv('M7_NODE_BASE_URL', DEFAULT_NODE_BASE_URL);
const rustBaseUrl = readEnv('M7_RUST_BASE_URL', DEFAULT_RUST_BASE_URL);
const strict = process.env.M7_COMPARE_STRICT?.trim() !== 'false';
const cases = buildCases();
console.log(`[m7:api-compare] node=${nodeBaseUrl}`);
console.log(`[m7:api-compare] rust=${rustBaseUrl}`);
console.log(`[m7:api-compare] cases=${cases.map((item) => item.path).join(', ')}`);
const results = await Promise.all(
cases.map((testCase) => compareCase(nodeBaseUrl, rustBaseUrl, testCase)),
);
for (const result of results) {
const label = result.matched ? 'OK' : 'DIFF';
console.log(
`[m7:api-compare] ${label} ${result.path} node=${result.nodeStatus} rust=${result.rustStatus}`,
);
if (result.reason) {
console.log(result.reason);
}
}
const failures = results.filter((result) => !result.matched);
if (strict) {
assert.equal(failures.length, 0, '存在 Node/Rust API contract 差异');
}
}
main().catch((error) => {
console.error('[m7:api-compare] failed');
console.error(error);
process.exitCode = 1;
});

View File

@@ -14,7 +14,7 @@
## 2. 当前阶段说明
当前目录已经完成以下三十项初始化:
当前目录已经完成以下三十项初始化:
1. 为新后端预留正式目录并把路径固定到仓库结构中。
2. 创建虚拟 workspace `Cargo.toml`,后续 crate 会逐项挂入。
@@ -52,6 +52,9 @@
34. 创建 `scripts/spacetime-dev.ps1`,固定 Windows 本地 SpacetimeDB 启动入口。
35. 创建 `scripts/spacetime-dev.sh`,固定 Unix-like 本地 SpacetimeDB 启动入口。
36. 创建 `scripts/oss-smoke.ps1`,固定 Windows 本地阿里云 OSS 真实联调入口。
37. 创建 `scripts/m7-preflight.ps1`,固定 M7 切流前 Rust 后端预检入口。
38. 创建根目录 `scripts/m7-api-compare.ts`,固定旧 Node 与新 Rust 的无状态 API contract 对比入口。
39. 固定 Vite dev proxy 的 `GENARRATIVE_BACKEND_STACK` / `GENARRATIVE_RUNTIME_SERVER_TARGET` 切流和回退开关。
后续任务会继续在本目录内按顺序补齐:

View File

@@ -6,8 +6,11 @@ use axum::{
middleware,
routing::{get, post},
};
use tower_http::trace::{DefaultOnFailure, DefaultOnRequest, DefaultOnResponse, TraceLayer};
use tracing::{Level, info_span};
use tower_http::{
classify::ServerErrorsFailureClass,
trace::{DefaultOnRequest, TraceLayer},
};
use tracing::{Level, Span, error, info, info_span, warn};
use crate::{
ai_tasks::{
@@ -86,6 +89,8 @@ use crate::{
// 统一由这里构造 Axum 路由树,后续再逐项挂接中间件与业务路由。
pub fn build_router(state: AppState) -> Router {
let slow_request_threshold_ms = state.config.slow_request_threshold_ms;
Router::new()
.route(
"/healthz",
@@ -688,8 +693,39 @@ pub fn build_router(state: AppState) -> Router {
)
})
.on_request(DefaultOnRequest::new().level(Level::INFO))
.on_response(DefaultOnResponse::new().level(Level::INFO))
.on_failure(DefaultOnFailure::new().level(Level::ERROR)),
.on_response(move |response: &axum::response::Response, latency: std::time::Duration, span: &Span| {
let latency_ms = latency.as_millis().min(u64::MAX as u128) as u64;
let status = response.status().as_u16();
let slow_request = latency_ms >= slow_request_threshold_ms;
span.record("status", status);
span.record("latency_ms", latency_ms);
if slow_request {
warn!(
parent: span,
status,
latency_ms,
slow_request = true,
"http request completed slowly"
);
} else {
info!(
parent: span,
status,
latency_ms,
slow_request = false,
"http request completed"
);
}
})
.on_failure(|failure: ServerErrorsFailureClass, latency: std::time::Duration, span: &Span| {
let latency_ms = latency.as_millis().min(u64::MAX as u128) as u64;
error!(
parent: span,
latency_ms,
failure = %failure,
"http request failed"
);
}),
)
// request_id 中间件先进入请求链,确保后续 tracing、错误处理和响应头层都能复用同一份请求标识。
.layer(middleware::from_fn(attach_request_context))

View File

@@ -54,6 +54,7 @@ pub struct AppConfig {
pub llm_request_timeout_ms: u64,
pub llm_max_retries: u32,
pub llm_retry_backoff_ms: u64,
pub slow_request_threshold_ms: u64,
}
impl Default for AppConfig {
@@ -104,6 +105,7 @@ impl Default for AppConfig {
llm_request_timeout_ms: DEFAULT_REQUEST_TIMEOUT_MS,
llm_max_retries: DEFAULT_MAX_RETRIES,
llm_retry_backoff_ms: DEFAULT_RETRY_BACKOFF_MS,
slow_request_threshold_ms: 1_000,
}
}
}
@@ -305,6 +307,12 @@ impl AppConfig {
config.llm_retry_backoff_ms = llm_retry_backoff_ms;
}
if let Some(slow_request_threshold_ms) =
read_first_positive_u64_env(&["GENARRATIVE_SLOW_REQUEST_THRESHOLD_MS"])
{
config.slow_request_threshold_ms = slow_request_threshold_ms;
}
config
}

View File

@@ -0,0 +1,753 @@
#[spacetimedb::table(
accessor = ai_task,
index(accessor = by_ai_task_owner_user_id, btree(columns = [owner_user_id])),
index(accessor = by_ai_task_status, btree(columns = [status])),
index(accessor = by_ai_task_kind, btree(columns = [task_kind]))
)]
pub struct AiTask {
#[primary_key]
task_id: String,
task_kind: AiTaskKind,
owner_user_id: String,
request_label: String,
source_module: String,
source_entity_id: Option<String>,
request_payload_json: Option<String>,
status: AiTaskStatus,
failure_message: Option<String>,
latest_text_output: Option<String>,
latest_structured_payload_json: Option<String>,
version: u32,
created_at: Timestamp,
started_at: Option<Timestamp>,
completed_at: Option<Timestamp>,
updated_at: Timestamp,
}
#[spacetimedb::table(
accessor = ai_task_stage,
index(accessor = by_ai_task_stage_task_id, btree(columns = [task_id])),
index(accessor = by_ai_task_stage_task_order, btree(columns = [task_id, stage_order]))
)]
pub struct AiTaskStage {
#[primary_key]
task_stage_id: String,
task_id: String,
stage_kind: AiTaskStageKind,
label: String,
detail: String,
stage_order: u32,
status: AiTaskStageStatus,
text_output: Option<String>,
structured_payload_json: Option<String>,
warning_messages: Vec<String>,
started_at: Option<Timestamp>,
completed_at: Option<Timestamp>,
}
#[spacetimedb::table(
accessor = ai_text_chunk,
index(accessor = by_ai_text_chunk_task_id, btree(columns = [task_id])),
index(
accessor = by_ai_text_chunk_task_stage_sequence,
btree(columns = [task_id, stage_kind, sequence])
)
)]
pub struct AiTextChunk {
#[primary_key]
text_chunk_row_id: String,
chunk_id: String,
task_id: String,
stage_kind: AiTaskStageKind,
sequence: u32,
delta_text: String,
created_at: Timestamp,
}
#[spacetimedb::table(
accessor = ai_result_reference,
index(accessor = by_ai_result_reference_task_id, btree(columns = [task_id]))
)]
pub struct AiResultReference {
#[primary_key]
result_reference_row_id: String,
result_ref_id: String,
task_id: String,
reference_kind: AiResultReferenceKind,
reference_id: String,
label: Option<String>,
created_at: Timestamp,
}
// AI 任务当前先固定成 private 真相表,后续由 Axum / platform-llm 再往外包一层 HTTP 与 SSE 协议。
#[spacetimedb::reducer]
pub fn create_ai_task(ctx: &ReducerContext, input: AiTaskCreateInput) -> Result<(), String> {
create_ai_task_tx(ctx, input).map(|_| ())
}
#[spacetimedb::procedure]
pub fn create_ai_task_and_return(
ctx: &mut ProcedureContext,
input: AiTaskCreateInput,
) -> AiTaskProcedureResult {
match ctx.try_with_tx(|tx| create_ai_task_tx(tx, input.clone())) {
Ok(task) => AiTaskProcedureResult {
ok: true,
task: Some(task),
text_chunk: None,
error_message: None,
},
Err(message) => AiTaskProcedureResult {
ok: false,
task: None,
text_chunk: None,
error_message: Some(message),
},
}
}
#[spacetimedb::reducer]
pub fn start_ai_task(ctx: &ReducerContext, input: AiTaskStartInput) -> Result<(), String> {
start_ai_task_tx(ctx, input).map(|_| ())
}
#[spacetimedb::reducer]
pub fn start_ai_task_stage(
ctx: &ReducerContext,
input: AiTaskStageStartInput,
) -> Result<(), String> {
start_ai_task_stage_tx(ctx, input).map(|_| ())
}
// 流式增量写入需要同步返回 chunk 与聚合后的任务快照,方便后续 Axum facade 直接复用。
#[spacetimedb::procedure]
pub fn append_ai_text_chunk_and_return(
ctx: &mut ProcedureContext,
input: AiTextChunkAppendInput,
) -> AiTaskProcedureResult {
match ctx.try_with_tx(|tx| append_ai_text_chunk_tx(tx, input.clone())) {
Ok((task, text_chunk)) => AiTaskProcedureResult {
ok: true,
task: Some(task),
text_chunk: Some(text_chunk),
error_message: None,
},
Err(message) => AiTaskProcedureResult {
ok: false,
task: None,
text_chunk: None,
error_message: Some(message),
},
}
}
#[spacetimedb::procedure]
pub fn complete_ai_stage_and_return(
ctx: &mut ProcedureContext,
input: AiStageCompletionInput,
) -> AiTaskProcedureResult {
match ctx.try_with_tx(|tx| complete_ai_stage_tx(tx, input.clone())) {
Ok(task) => AiTaskProcedureResult {
ok: true,
task: Some(task),
text_chunk: None,
error_message: None,
},
Err(message) => AiTaskProcedureResult {
ok: false,
task: None,
text_chunk: None,
error_message: Some(message),
},
}
}
#[spacetimedb::procedure]
pub fn attach_ai_result_reference_and_return(
ctx: &mut ProcedureContext,
input: AiResultReferenceInput,
) -> AiTaskProcedureResult {
match ctx.try_with_tx(|tx| attach_ai_result_reference_tx(tx, input.clone())) {
Ok(task) => AiTaskProcedureResult {
ok: true,
task: Some(task),
text_chunk: None,
error_message: None,
},
Err(message) => AiTaskProcedureResult {
ok: false,
task: None,
text_chunk: None,
error_message: Some(message),
},
}
}
#[spacetimedb::procedure]
pub fn complete_ai_task_and_return(
ctx: &mut ProcedureContext,
input: AiTaskFinishInput,
) -> AiTaskProcedureResult {
match ctx.try_with_tx(|tx| complete_ai_task_tx(tx, input.clone())) {
Ok(task) => AiTaskProcedureResult {
ok: true,
task: Some(task),
text_chunk: None,
error_message: None,
},
Err(message) => AiTaskProcedureResult {
ok: false,
task: None,
text_chunk: None,
error_message: Some(message),
},
}
}
#[spacetimedb::procedure]
pub fn fail_ai_task_and_return(
ctx: &mut ProcedureContext,
input: AiTaskFailureInput,
) -> AiTaskProcedureResult {
match ctx.try_with_tx(|tx| fail_ai_task_tx(tx, input.clone())) {
Ok(task) => AiTaskProcedureResult {
ok: true,
task: Some(task),
text_chunk: None,
error_message: None,
},
Err(message) => AiTaskProcedureResult {
ok: false,
task: None,
text_chunk: None,
error_message: Some(message),
},
}
}
#[spacetimedb::procedure]
pub fn cancel_ai_task_and_return(
ctx: &mut ProcedureContext,
input: AiTaskCancelInput,
) -> AiTaskProcedureResult {
match ctx.try_with_tx(|tx| cancel_ai_task_tx(tx, input.clone())) {
Ok(task) => AiTaskProcedureResult {
ok: true,
task: Some(task),
text_chunk: None,
error_message: None,
},
Err(message) => AiTaskProcedureResult {
ok: false,
task: None,
text_chunk: None,
error_message: Some(message),
},
}
}
fn create_ai_task_tx(
ctx: &ReducerContext,
input: AiTaskCreateInput,
) -> Result<AiTaskSnapshot, String> {
validate_task_create_input(&input).map_err(|error| error.to_string())?;
if ctx.db.ai_task().task_id().find(&input.task_id).is_some() {
return Err("ai_task.task_id 已存在".to_string());
}
let task_snapshot = build_ai_task_snapshot_from_create_input(&input);
ctx.db.ai_task().insert(build_ai_task_row(&task_snapshot));
replace_ai_task_stages(ctx, &task_snapshot.task_id, &task_snapshot.stages);
get_ai_task_snapshot_tx(ctx, &task_snapshot.task_id)
}
fn start_ai_task_tx(
ctx: &ReducerContext,
input: AiTaskStartInput,
) -> Result<AiTaskSnapshot, String> {
let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
ensure_ai_task_can_transition(snapshot.status)?;
snapshot.status = AiTaskStatus::Running;
if snapshot.started_at_micros.is_none() {
snapshot.started_at_micros = Some(input.started_at_micros);
}
snapshot.updated_at_micros = input.started_at_micros;
snapshot.version += 1;
persist_ai_task_snapshot(ctx, &snapshot)?;
Ok(snapshot)
}
fn start_ai_task_stage_tx(
ctx: &ReducerContext,
input: AiTaskStageStartInput,
) -> Result<AiTaskSnapshot, String> {
let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
ensure_ai_task_can_transition(snapshot.status)?;
let stage = snapshot
.stages
.iter_mut()
.find(|stage| stage.stage_kind == input.stage_kind)
.ok_or_else(|| "ai_task.stage 不存在".to_string())?;
snapshot.status = AiTaskStatus::Running;
if snapshot.started_at_micros.is_none() {
snapshot.started_at_micros = Some(input.started_at_micros);
}
stage.status = AiTaskStageStatus::Running;
if stage.started_at_micros.is_none() {
stage.started_at_micros = Some(input.started_at_micros);
}
snapshot.updated_at_micros = input.started_at_micros;
snapshot.version += 1;
persist_ai_task_snapshot(ctx, &snapshot)?;
Ok(snapshot)
}
fn append_ai_text_chunk_tx(
ctx: &ReducerContext,
input: AiTextChunkAppendInput,
) -> Result<(AiTaskSnapshot, AiTextChunkSnapshot), String> {
if input.delta_text.trim().is_empty() {
return Err("ai_text_chunk.delta_text 不能为空".to_string());
}
if input.sequence == 0 {
return Err("ai_text_chunk.sequence 必须大于 0".to_string());
}
let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
ensure_ai_task_can_transition(snapshot.status)?;
let stage = snapshot
.stages
.iter_mut()
.find(|stage| stage.stage_kind == input.stage_kind)
.ok_or_else(|| "ai_task.stage 不存在".to_string())?;
let chunk = AiTextChunkSnapshot {
chunk_id: generate_ai_text_chunk_id(input.created_at_micros, input.sequence),
task_id: input.task_id.trim().to_string(),
stage_kind: input.stage_kind,
sequence: input.sequence,
delta_text: input.delta_text.trim().to_string(),
created_at_micros: input.created_at_micros,
};
ctx.db
.ai_text_chunk()
.insert(build_ai_text_chunk_row(&chunk));
let aggregated_text = collect_ai_stage_text_output(ctx, &chunk.task_id, chunk.stage_kind);
snapshot.status = AiTaskStatus::Running;
if snapshot.started_at_micros.is_none() {
snapshot.started_at_micros = Some(input.created_at_micros);
}
stage.status = AiTaskStageStatus::Running;
if stage.started_at_micros.is_none() {
stage.started_at_micros = Some(input.created_at_micros);
}
stage.text_output = aggregated_text.clone();
snapshot.latest_text_output = aggregated_text;
snapshot.updated_at_micros = input.created_at_micros;
snapshot.version += 1;
persist_ai_task_snapshot(ctx, &snapshot)?;
Ok((snapshot, chunk))
}
fn complete_ai_stage_tx(
ctx: &ReducerContext,
input: AiStageCompletionInput,
) -> Result<AiTaskSnapshot, String> {
let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
ensure_ai_task_can_transition(snapshot.status)?;
let stage = snapshot
.stages
.iter_mut()
.find(|stage| stage.stage_kind == input.stage_kind)
.ok_or_else(|| "ai_task.stage 不存在".to_string())?;
stage.status = AiTaskStageStatus::Completed;
stage.completed_at_micros = Some(input.completed_at_micros);
stage.text_output = normalize_optional_text(input.text_output.clone());
stage.structured_payload_json = normalize_optional_text(input.structured_payload_json.clone());
stage.warning_messages = normalize_string_list(input.warning_messages.clone());
snapshot.latest_text_output = stage.text_output.clone();
snapshot.latest_structured_payload_json = stage.structured_payload_json.clone();
snapshot.updated_at_micros = input.completed_at_micros;
snapshot.version += 1;
persist_ai_task_snapshot(ctx, &snapshot)?;
Ok(snapshot)
}
fn attach_ai_result_reference_tx(
ctx: &ReducerContext,
input: AiResultReferenceInput,
) -> Result<AiTaskSnapshot, String> {
let reference_id = input.reference_id.trim().to_string();
if reference_id.is_empty() {
return Err("ai_result_reference.reference_id 不能为空".to_string());
}
let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
ensure_ai_task_can_transition(snapshot.status)?;
let reference = AiResultReferenceSnapshot {
result_ref_id: generate_ai_result_ref_id(input.created_at_micros),
task_id: input.task_id.trim().to_string(),
reference_kind: input.reference_kind,
reference_id,
label: normalize_optional_text(input.label),
created_at_micros: input.created_at_micros,
};
ctx.db
.ai_result_reference()
.insert(build_ai_result_reference_row(&reference));
snapshot.result_references.push(reference);
snapshot.updated_at_micros = input.created_at_micros;
snapshot.version += 1;
persist_ai_task_snapshot(ctx, &snapshot)?;
Ok(snapshot)
}
fn complete_ai_task_tx(
ctx: &ReducerContext,
input: AiTaskFinishInput,
) -> Result<AiTaskSnapshot, String> {
let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
ensure_ai_task_can_transition(snapshot.status)?;
snapshot.status = AiTaskStatus::Completed;
snapshot.completed_at_micros = Some(input.completed_at_micros);
snapshot.updated_at_micros = input.completed_at_micros;
snapshot.version += 1;
persist_ai_task_snapshot(ctx, &snapshot)?;
Ok(snapshot)
}
fn fail_ai_task_tx(
ctx: &ReducerContext,
input: AiTaskFailureInput,
) -> Result<AiTaskSnapshot, String> {
let failure_message = input.failure_message.trim().to_string();
if failure_message.is_empty() {
return Err("ai_task.failure_message 不能为空".to_string());
}
let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
ensure_ai_task_can_transition(snapshot.status)?;
snapshot.status = AiTaskStatus::Failed;
snapshot.failure_message = Some(failure_message);
snapshot.completed_at_micros = Some(input.completed_at_micros);
snapshot.updated_at_micros = input.completed_at_micros;
snapshot.version += 1;
persist_ai_task_snapshot(ctx, &snapshot)?;
Ok(snapshot)
}
fn cancel_ai_task_tx(
ctx: &ReducerContext,
input: AiTaskCancelInput,
) -> Result<AiTaskSnapshot, String> {
let mut snapshot = get_ai_task_snapshot_tx(ctx, &input.task_id)?;
ensure_ai_task_can_transition(snapshot.status)?;
snapshot.status = AiTaskStatus::Cancelled;
snapshot.completed_at_micros = Some(input.completed_at_micros);
snapshot.updated_at_micros = input.completed_at_micros;
snapshot.version += 1;
persist_ai_task_snapshot(ctx, &snapshot)?;
Ok(snapshot)
}
fn get_ai_task_snapshot_tx(ctx: &ReducerContext, task_id: &str) -> Result<AiTaskSnapshot, String> {
let row = ctx
.db
.ai_task()
.task_id()
.find(&task_id.trim().to_string())
.ok_or_else(|| "ai_task 不存在".to_string())?;
Ok(build_ai_task_snapshot_from_row(ctx, &row))
}
fn persist_ai_task_snapshot(ctx: &ReducerContext, snapshot: &AiTaskSnapshot) -> Result<(), String> {
ctx.db.ai_task().task_id().delete(&snapshot.task_id);
ctx.db.ai_task().insert(build_ai_task_row(snapshot));
replace_ai_task_stages(ctx, &snapshot.task_id, &snapshot.stages);
Ok(())
}
fn replace_ai_task_stages(ctx: &ReducerContext, task_id: &str, stages: &[AiTaskStageSnapshot]) {
let stage_ids = ctx
.db
.ai_task_stage()
.iter()
.filter(|row| row.task_id == task_id)
.map(|row| row.task_stage_id.clone())
.collect::<Vec<_>>();
for stage_id in stage_ids {
ctx.db.ai_task_stage().task_stage_id().delete(&stage_id);
}
for stage in stages {
ctx.db
.ai_task_stage()
.insert(build_ai_task_stage_row(task_id, stage));
}
}
fn collect_ai_stage_text_output(
ctx: &ReducerContext,
task_id: &str,
stage_kind: AiTaskStageKind,
) -> Option<String> {
let mut chunks = ctx
.db
.ai_text_chunk()
.iter()
.filter(|row| row.task_id == task_id && row.stage_kind == stage_kind)
.map(|row| build_ai_text_chunk_snapshot_from_row(&row))
.collect::<Vec<_>>();
chunks.sort_by_key(|chunk| chunk.sequence);
let aggregated = chunks
.into_iter()
.map(|chunk| chunk.delta_text)
.collect::<Vec<_>>()
.join("");
if aggregated.trim().is_empty() {
None
} else {
Some(aggregated)
}
}
fn ensure_ai_task_can_transition(status: AiTaskStatus) -> Result<(), String> {
if matches!(
status,
AiTaskStatus::Completed | AiTaskStatus::Failed | AiTaskStatus::Cancelled
) {
Err("当前 ai_task 状态不允许执行该操作".to_string())
} else {
Ok(())
}
}
fn build_ai_task_snapshot_from_create_input(input: &AiTaskCreateInput) -> AiTaskSnapshot {
AiTaskSnapshot {
task_id: input.task_id.trim().to_string(),
task_kind: input.task_kind,
owner_user_id: input.owner_user_id.trim().to_string(),
request_label: input.request_label.trim().to_string(),
source_module: input.source_module.trim().to_string(),
source_entity_id: normalize_optional_text(input.source_entity_id.clone()),
request_payload_json: normalize_optional_text(input.request_payload_json.clone()),
status: AiTaskStatus::Pending,
failure_message: None,
stages: input
.stages
.iter()
.map(|stage| AiTaskStageSnapshot {
stage_kind: stage.stage_kind,
label: stage.label.trim().to_string(),
detail: stage.detail.trim().to_string(),
order: stage.order,
status: AiTaskStageStatus::Pending,
text_output: None,
structured_payload_json: None,
warning_messages: Vec::new(),
started_at_micros: None,
completed_at_micros: None,
})
.collect(),
result_references: Vec::new(),
latest_text_output: None,
latest_structured_payload_json: None,
version: INITIAL_AI_TASK_VERSION,
created_at_micros: input.created_at_micros,
started_at_micros: None,
completed_at_micros: None,
updated_at_micros: input.created_at_micros,
}
}
fn build_ai_task_row(snapshot: &AiTaskSnapshot) -> AiTask {
AiTask {
task_id: snapshot.task_id.clone(),
task_kind: snapshot.task_kind,
owner_user_id: snapshot.owner_user_id.clone(),
request_label: snapshot.request_label.clone(),
source_module: snapshot.source_module.clone(),
source_entity_id: snapshot.source_entity_id.clone(),
request_payload_json: snapshot.request_payload_json.clone(),
status: snapshot.status,
failure_message: snapshot.failure_message.clone(),
latest_text_output: snapshot.latest_text_output.clone(),
latest_structured_payload_json: snapshot.latest_structured_payload_json.clone(),
version: snapshot.version,
created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros),
started_at: snapshot
.started_at_micros
.map(Timestamp::from_micros_since_unix_epoch),
completed_at: snapshot
.completed_at_micros
.map(Timestamp::from_micros_since_unix_epoch),
updated_at: Timestamp::from_micros_since_unix_epoch(snapshot.updated_at_micros),
}
}
fn build_ai_task_snapshot_from_row(ctx: &ReducerContext, row: &AiTask) -> AiTaskSnapshot {
let mut stages = ctx
.db
.ai_task_stage()
.iter()
.filter(|stage| stage.task_id == row.task_id)
.map(|stage| build_ai_task_stage_snapshot_from_row(&stage))
.collect::<Vec<_>>();
stages.sort_by_key(|stage| stage.order);
let mut result_references = ctx
.db
.ai_result_reference()
.iter()
.filter(|reference| reference.task_id == row.task_id)
.map(|reference| build_ai_result_reference_snapshot_from_row(&reference))
.collect::<Vec<_>>();
result_references.sort_by_key(|reference| reference.created_at_micros);
AiTaskSnapshot {
task_id: row.task_id.clone(),
task_kind: row.task_kind,
owner_user_id: row.owner_user_id.clone(),
request_label: row.request_label.clone(),
source_module: row.source_module.clone(),
source_entity_id: row.source_entity_id.clone(),
request_payload_json: row.request_payload_json.clone(),
status: row.status,
failure_message: row.failure_message.clone(),
stages,
result_references,
latest_text_output: row.latest_text_output.clone(),
latest_structured_payload_json: row.latest_structured_payload_json.clone(),
version: row.version,
created_at_micros: row.created_at.to_micros_since_unix_epoch(),
started_at_micros: row
.started_at
.map(|value| value.to_micros_since_unix_epoch()),
completed_at_micros: row
.completed_at
.map(|value| value.to_micros_since_unix_epoch()),
updated_at_micros: row.updated_at.to_micros_since_unix_epoch(),
}
}
fn build_ai_task_stage_row(task_id: &str, snapshot: &AiTaskStageSnapshot) -> AiTaskStage {
AiTaskStage {
task_stage_id: generate_ai_task_stage_id(task_id, snapshot.stage_kind),
task_id: task_id.to_string(),
stage_kind: snapshot.stage_kind,
label: snapshot.label.clone(),
detail: snapshot.detail.clone(),
stage_order: snapshot.order,
status: snapshot.status,
text_output: snapshot.text_output.clone(),
structured_payload_json: snapshot.structured_payload_json.clone(),
warning_messages: snapshot.warning_messages.clone(),
started_at: snapshot
.started_at_micros
.map(Timestamp::from_micros_since_unix_epoch),
completed_at: snapshot
.completed_at_micros
.map(Timestamp::from_micros_since_unix_epoch),
}
}
fn build_ai_task_stage_snapshot_from_row(row: &AiTaskStage) -> AiTaskStageSnapshot {
AiTaskStageSnapshot {
stage_kind: row.stage_kind,
label: row.label.clone(),
detail: row.detail.clone(),
order: row.stage_order,
status: row.status,
text_output: row.text_output.clone(),
structured_payload_json: row.structured_payload_json.clone(),
warning_messages: row.warning_messages.clone(),
started_at_micros: row
.started_at
.map(|value| value.to_micros_since_unix_epoch()),
completed_at_micros: row
.completed_at
.map(|value| value.to_micros_since_unix_epoch()),
}
}
fn build_ai_text_chunk_row(snapshot: &AiTextChunkSnapshot) -> AiTextChunk {
AiTextChunk {
text_chunk_row_id: format!(
"{}{}_{}_{}",
AI_TEXT_CHUNK_ID_PREFIX,
snapshot.task_id,
snapshot.stage_kind.as_str(),
snapshot.sequence
),
chunk_id: snapshot.chunk_id.clone(),
task_id: snapshot.task_id.clone(),
stage_kind: snapshot.stage_kind,
sequence: snapshot.sequence,
delta_text: snapshot.delta_text.clone(),
created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros),
}
}
fn build_ai_text_chunk_snapshot_from_row(row: &AiTextChunk) -> AiTextChunkSnapshot {
AiTextChunkSnapshot {
chunk_id: row.chunk_id.clone(),
task_id: row.task_id.clone(),
stage_kind: row.stage_kind,
sequence: row.sequence,
delta_text: row.delta_text.clone(),
created_at_micros: row.created_at.to_micros_since_unix_epoch(),
}
}
fn build_ai_result_reference_row(snapshot: &AiResultReferenceSnapshot) -> AiResultReference {
AiResultReference {
result_reference_row_id: format!(
"{}{}_{}",
AI_RESULT_REF_ID_PREFIX, snapshot.task_id, snapshot.result_ref_id
),
result_ref_id: snapshot.result_ref_id.clone(),
task_id: snapshot.task_id.clone(),
reference_kind: snapshot.reference_kind,
reference_id: snapshot.reference_id.clone(),
label: snapshot.label.clone(),
created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros),
}
}
fn build_ai_result_reference_snapshot_from_row(
row: &AiResultReference,
) -> AiResultReferenceSnapshot {
AiResultReferenceSnapshot {
result_ref_id: row.result_ref_id.clone(),
task_id: row.task_id.clone(),
reference_kind: row.reference_kind,
reference_id: row.reference_id.clone(),
label: row.label.clone(),
created_at_micros: row.created_at.to_micros_since_unix_epoch(),
}
}

View File

@@ -0,0 +1,305 @@
#[spacetimedb::table(
accessor = asset_object,
index(accessor = by_bucket_object_key, btree(columns = [bucket, object_key]))
)]
pub struct AssetObject {
#[primary_key]
asset_object_id: String,
// 正式对象定位固定拆成 bucket + object_key 两列,避免后续再从单字符串路径做 schema 拆分。
bucket: String,
object_key: String,
access_policy: AssetObjectAccessPolicy,
content_type: Option<String>,
content_length: u64,
content_hash: Option<String>,
version: u32,
source_job_id: Option<String>,
owner_user_id: Option<String>,
profile_id: Option<String>,
entity_id: Option<String>,
#[index(btree)]
asset_kind: String,
created_at: Timestamp,
updated_at: Timestamp,
}
#[spacetimedb::table(
accessor = asset_entity_binding,
index(accessor = by_entity_slot, btree(columns = [entity_kind, entity_id, slot])),
index(accessor = by_asset_object_id, btree(columns = [asset_object_id]))
)]
pub struct AssetEntityBinding {
#[primary_key]
binding_id: String,
asset_object_id: String,
entity_kind: String,
entity_id: String,
slot: String,
asset_kind: String,
owner_user_id: Option<String>,
profile_id: Option<String>,
created_at: Timestamp,
updated_at: Timestamp,
}
// reducer 负责固定资产对象的正式写规则,供后续内部模块逻辑复用。
#[spacetimedb::reducer]
pub fn confirm_asset_object(
ctx: &ReducerContext,
input: AssetObjectUpsertInput,
) -> Result<(), String> {
upsert_asset_object(ctx, input).map(|_| ())
}
// procedure 面向 Axum 同步确认接口,返回最终持久化后的对象记录,避免 HTTP 层再额外查询 private table。
#[spacetimedb::procedure]
pub fn confirm_asset_object_and_return(
ctx: &mut ProcedureContext,
input: AssetObjectUpsertInput,
) -> AssetObjectProcedureResult {
match ctx.try_with_tx(|tx| upsert_asset_object(tx, input.clone())) {
Ok(record) => AssetObjectProcedureResult {
ok: true,
record: Some(record),
error_message: None,
},
Err(message) => AssetObjectProcedureResult {
ok: false,
record: None,
error_message: Some(message),
},
}
}
// reducer 负责把已确认对象绑定到实体槽位,强业务资产表稳定前先用通用绑定表承接关系。
#[spacetimedb::reducer]
pub fn bind_asset_object_to_entity(
ctx: &ReducerContext,
input: AssetEntityBindingInput,
) -> Result<(), String> {
upsert_asset_entity_binding(ctx, input).map(|_| ())
}
// procedure 面向 Axum 同步绑定接口,返回最终绑定快照,避免 HTTP 层读取 private table。
#[spacetimedb::procedure]
pub fn bind_asset_object_to_entity_and_return(
ctx: &mut ProcedureContext,
input: AssetEntityBindingInput,
) -> AssetEntityBindingProcedureResult {
match ctx.try_with_tx(|tx| upsert_asset_entity_binding(tx, input.clone())) {
Ok(record) => AssetEntityBindingProcedureResult {
ok: true,
record: Some(record),
error_message: None,
},
Err(message) => AssetEntityBindingProcedureResult {
ok: false,
record: None,
error_message: Some(message),
},
}
}
fn upsert_asset_object(
ctx: &ReducerContext,
input: AssetObjectUpsertInput,
) -> Result<AssetObjectUpsertSnapshot, String> {
validate_asset_object_fields(
&input.bucket,
&input.object_key,
&input.asset_kind,
input.version,
)
.map_err(|error| error.to_string())?;
let updated_at = Timestamp::from_micros_since_unix_epoch(input.updated_at_micros);
// 这里先保持最小可发布实现:查重语义已经冻结,后续再把实现优化回组合索引扫描。
let current = ctx
.db
.asset_object()
.iter()
.find(|row| row.bucket == input.bucket && row.object_key == input.object_key);
let snapshot = match current {
Some(existing) => {
ctx.db
.asset_object()
.asset_object_id()
.delete(&existing.asset_object_id);
let row = AssetObject {
asset_object_id: existing.asset_object_id.clone(),
bucket: input.bucket.clone(),
object_key: input.object_key.clone(),
access_policy: input.access_policy,
content_type: input.content_type.clone(),
content_length: input.content_length,
content_hash: input.content_hash.clone(),
version: input.version,
source_job_id: input.source_job_id.clone(),
owner_user_id: input.owner_user_id.clone(),
profile_id: input.profile_id.clone(),
entity_id: input.entity_id.clone(),
asset_kind: input.asset_kind.clone(),
created_at: existing.created_at,
updated_at,
};
ctx.db.asset_object().insert(row);
AssetObjectUpsertSnapshot {
asset_object_id: existing.asset_object_id,
bucket: input.bucket,
object_key: input.object_key,
access_policy: input.access_policy,
content_type: input.content_type,
content_length: input.content_length,
content_hash: input.content_hash,
version: input.version,
source_job_id: input.source_job_id,
owner_user_id: input.owner_user_id,
profile_id: input.profile_id,
entity_id: input.entity_id,
asset_kind: input.asset_kind,
created_at_micros: existing.created_at.to_micros_since_unix_epoch(),
updated_at_micros: input.updated_at_micros,
}
}
None => {
let created_at = updated_at;
let row = AssetObject {
asset_object_id: input.asset_object_id.clone(),
bucket: input.bucket.clone(),
object_key: input.object_key.clone(),
access_policy: input.access_policy,
content_type: input.content_type.clone(),
content_length: input.content_length,
content_hash: input.content_hash.clone(),
version: input.version,
source_job_id: input.source_job_id.clone(),
owner_user_id: input.owner_user_id.clone(),
profile_id: input.profile_id.clone(),
entity_id: input.entity_id.clone(),
asset_kind: input.asset_kind.clone(),
created_at,
updated_at,
};
ctx.db.asset_object().insert(row);
AssetObjectUpsertSnapshot {
asset_object_id: input.asset_object_id,
bucket: input.bucket,
object_key: input.object_key,
access_policy: input.access_policy,
content_type: input.content_type,
content_length: input.content_length,
content_hash: input.content_hash,
version: input.version,
source_job_id: input.source_job_id,
owner_user_id: input.owner_user_id,
profile_id: input.profile_id,
entity_id: input.entity_id,
asset_kind: input.asset_kind,
created_at_micros: input.updated_at_micros,
updated_at_micros: input.updated_at_micros,
}
}
};
Ok(snapshot)
}
fn upsert_asset_entity_binding(
ctx: &ReducerContext,
input: AssetEntityBindingInput,
) -> Result<AssetEntityBindingSnapshot, String> {
validate_asset_entity_binding_fields(
&input.binding_id,
&input.asset_object_id,
&input.entity_kind,
&input.entity_id,
&input.slot,
&input.asset_kind,
)
.map_err(|error| error.to_string())?;
if ctx
.db
.asset_object()
.asset_object_id()
.find(&input.asset_object_id)
.is_none()
{
return Err("asset_entity_binding.asset_object_id 对应的 asset_object 不存在".to_string());
}
let updated_at = Timestamp::from_micros_since_unix_epoch(input.updated_at_micros);
// 首版绑定按 entity_kind + entity_id + slot 幂等定位,后续访问量明确后再改为组合索引扫描。
let current = ctx.db.asset_entity_binding().iter().find(|row| {
row.entity_kind == input.entity_kind
&& row.entity_id == input.entity_id
&& row.slot == input.slot
});
let snapshot = match current {
Some(existing) => {
ctx.db
.asset_entity_binding()
.binding_id()
.delete(&existing.binding_id);
let row = AssetEntityBinding {
binding_id: existing.binding_id.clone(),
asset_object_id: input.asset_object_id.clone(),
entity_kind: input.entity_kind.clone(),
entity_id: input.entity_id.clone(),
slot: input.slot.clone(),
asset_kind: input.asset_kind.clone(),
owner_user_id: input.owner_user_id.clone(),
profile_id: input.profile_id.clone(),
created_at: existing.created_at,
updated_at,
};
ctx.db.asset_entity_binding().insert(row);
AssetEntityBindingSnapshot {
binding_id: existing.binding_id,
asset_object_id: input.asset_object_id,
entity_kind: input.entity_kind,
entity_id: input.entity_id,
slot: input.slot,
asset_kind: input.asset_kind,
owner_user_id: input.owner_user_id,
profile_id: input.profile_id,
created_at_micros: existing.created_at.to_micros_since_unix_epoch(),
updated_at_micros: input.updated_at_micros,
}
}
None => {
let created_at = updated_at;
let row = AssetEntityBinding {
binding_id: input.binding_id.clone(),
asset_object_id: input.asset_object_id.clone(),
entity_kind: input.entity_kind.clone(),
entity_id: input.entity_id.clone(),
slot: input.slot.clone(),
asset_kind: input.asset_kind.clone(),
owner_user_id: input.owner_user_id.clone(),
profile_id: input.profile_id.clone(),
created_at,
updated_at,
};
ctx.db.asset_entity_binding().insert(row);
AssetEntityBindingSnapshot {
binding_id: input.binding_id,
asset_object_id: input.asset_object_id,
entity_kind: input.entity_kind,
entity_id: input.entity_id,
slot: input.slot,
asset_kind: input.asset_kind,
owner_user_id: input.owner_user_id,
profile_id: input.profile_id,
created_at_micros: input.updated_at_micros,
updated_at_micros: input.updated_at_micros,
}
}
};
Ok(snapshot)
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,29 @@
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct ResolveNpcBattleInteractionInput {
pub npc_interaction: ResolveNpcInteractionInput,
pub story_session_id: String,
pub actor_user_id: String,
pub battle_state_id: Option<String>,
pub player_hp: i32,
pub player_max_hp: i32,
pub player_mana: i32,
pub player_max_mana: i32,
pub target_hp: i32,
pub target_max_hp: i32,
pub experience_reward: u32,
pub reward_items: Vec<RuntimeItemRewardItemSnapshot>,
}
// 输出同时返回 NPC 交互结果与 battle_state 快照,避免 Axum 再回头读取 private table。
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct NpcBattleInteractionResult {
pub interaction: module_npc::NpcInteractionResult,
pub battle_state: BattleStateSnapshot,
}
#[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)]
pub struct NpcBattleInteractionProcedureResult {
pub ok: bool,
pub result: Option<NpcBattleInteractionResult>,
pub error_message: Option<String>,
}

View File

@@ -0,0 +1,23 @@
// 当前阶段先落可发布的最小模块入口,后续再补对象确认、业务绑定与任务编排 reducer。
#[spacetimedb::reducer(init)]
pub fn init(_ctx: &ReducerContext) {
log::info!(
"spacetime-module 初始化完成asset_object 已固定 bucket/object_key 双列主存储口径runtime_setting 已固定默认音量={} 和默认主题={}battle_state 前缀={},战斗初始版本={}npc_state 前缀={}npc 招募阈值={}story_session 前缀={}story_event 前缀={}inventory_slot 前缀={}inventory_mutation 前缀={}quest_log 前缀={}treasure_record 前缀={}player_progression 与 chapter_progression 已接入成长真相表M5 custom_world_profile/session/agent/gallery 首批表骨架已接入,默认对象 ID 前缀={},默认绑定 ID 前缀={},资产初始版本={},故事会话初始版本={}",
DEFAULT_MUSIC_VOLUME,
DEFAULT_PLATFORM_THEME.as_str(),
BATTLE_STATE_ID_PREFIX,
INITIAL_BATTLE_VERSION,
NPC_STATE_ID_PREFIX,
NPC_RECRUIT_AFFINITY_THRESHOLD,
STORY_SESSION_ID_PREFIX,
STORY_EVENT_ID_PREFIX,
INVENTORY_SLOT_ID_PREFIX,
INVENTORY_MUTATION_ID_PREFIX,
QUEST_LOG_ID_PREFIX,
TREASURE_RECORD_ID_PREFIX,
ASSET_OBJECT_ID_PREFIX,
ASSET_BINDING_ID_PREFIX,
INITIAL_ASSET_OBJECT_VERSION,
INITIAL_STORY_SESSION_VERSION
);
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,80 @@
[CmdletBinding()]
param(
[Alias("h")]
[switch]$Help,
[switch]$RunSmoke,
[switch]$RunSpacetimeBuild
)
$ErrorActionPreference = "Stop"
function Write-Usage {
@(
'Usage:',
' ./server-rs/scripts/m7-preflight.ps1',
' ./server-rs/scripts/m7-preflight.ps1 -RunSmoke',
' ./server-rs/scripts/m7-preflight.ps1 -RunSpacetimeBuild',
'',
'Notes:',
' 1. Run M7 cutover preflight checks for Rust backend',
' 2. Default checks are non-destructive and do not publish or clear SpacetimeDB data',
' 3. -RunSmoke starts a temporary api-server and verifies /healthz contract',
' 4. -RunSpacetimeBuild requires spacetime CLI and only builds the module'
) -join [Environment]::NewLine
}
if ($Help) {
Write-Usage
exit 0
}
$scriptDir = Split-Path -Parent $MyInvocation.MyCommand.Path
$serverRsDir = Split-Path -Parent $scriptDir
$repoRoot = Split-Path -Parent $serverRsDir
$manifestPath = Join-Path $serverRsDir "Cargo.toml"
$modulePath = Join-Path $serverRsDir "crates\spacetime-module"
if (-not (Test-Path $manifestPath)) {
throw "Missing server-rs/Cargo.toml, cannot start M7 preflight."
}
Write-Host "[m7:preflight] repo root: $repoRoot"
Write-Host "[m7:preflight] server-rs: $serverRsDir"
Push-Location $serverRsDir
try {
Write-Host "[m7:preflight] step: cargo check -p spacetime-module"
cargo check -p spacetime-module --manifest-path $manifestPath
Write-Host "[m7:preflight] step: cargo check -p api-server"
cargo check -p api-server --manifest-path $manifestPath
Write-Host "[m7:preflight] step: cargo test -p shared-contracts"
cargo test -p shared-contracts --manifest-path $manifestPath
if ($RunSpacetimeBuild) {
$spacetimeCommand = Get-Command spacetime -ErrorAction SilentlyContinue
if ($null -eq $spacetimeCommand) {
throw "Missing spacetime CLI, cannot run spacetime build."
}
Write-Host "[m7:preflight] step: spacetime build --debug"
Push-Location $modulePath
try {
& $spacetimeCommand.Source build --debug
}
finally {
Pop-Location
}
}
}
finally {
Pop-Location
}
if ($RunSmoke) {
Write-Host "[m7:preflight] step: server-rs smoke"
& (Join-Path $serverRsDir "scripts\smoke.ps1")
}
Write-Host "[m7:preflight] all checks passed"

View File

@@ -17,9 +17,16 @@ export default defineConfig(({mode}) => {
'**/public/generated-custom-world-scenes/**',
'**/public/generated-qwen-sprites/**',
];
const runtimeServerTarget =
const backendStack = (env.GENARRATIVE_BACKEND_STACK || 'node').trim().toLowerCase();
const nodeServerTarget =
env.NODE_SERVER_TARGET ||
'http://127.0.0.1:8081';
const rustServerTarget =
env.RUST_SERVER_TARGET ||
'http://127.0.0.1:3000';
const runtimeServerTarget =
env.GENARRATIVE_RUNTIME_SERVER_TARGET ||
(backendStack === 'rust' ? rustServerTarget : nodeServerTarget);
return {
root: __dirname,