From 5a4a8a48927b5c8e5af0adb096d66614d0f4ca9c Mon Sep 17 00:00:00 2001 From: kdletters <61648117+kdletters@users.noreply.github.com> Date: Sun, 17 May 2026 20:52:15 +0800 Subject: [PATCH] feat(api-server): add container loadtest observability --- .dockerignore | 36 +++ .gitignore | 1 + .hermes/shared-memory/decision-log.md | 9 + .hermes/shared-memory/pitfalls.md | 8 + deploy/container/README.md | 132 ++++++++ deploy/container/api-server.Dockerfile | 49 +++ deploy/container/api-server.env.example | 35 ++ deploy/container/docker-compose.loadtest.yml | 85 +++++ deploy/container/nginx.conf | 133 ++++++++ deploy/container/otelcol.yaml | 23 ++ deploy/nginx/genarrative-dev-http.conf | 6 + deploy/nginx/genarrative.conf | 6 + ...】server-rs与SpacetimeDB数据契约-2026-05-15.md | 2 +- ...发运维】本地开发验证与生产运维-2026-05-15.md | 18 ++ package.json | 8 + scripts/container-compose.mjs | 99 ++++++ scripts/loadtest/README.md | 19 ++ server-rs/Cargo.lock | 1 + server-rs/Cargo.toml | 1 + server-rs/crates/api-server/Cargo.toml | 3 + .../crates/api-server/src/api_response.rs | 73 ++++- .../crates/api-server/src/backpressure.rs | 30 +- server-rs/crates/api-server/src/main.rs | 3 + .../crates/api-server/src/process_metrics.rs | 306 ++++++++++++++++++ server-rs/crates/api-server/src/puzzle.rs | 28 +- .../api-server/src/puzzle_gallery_cache.rs | 42 ++- server-rs/crates/api-server/src/telemetry.rs | 123 ++++++- .../crates/spacetime-client/src/big_fish.rs | 2 +- .../spacetime-client/src/custom_world.rs | 2 +- server-rs/crates/spacetime-client/src/lib.rs | 12 +- .../crates/spacetime-client/src/match3d.rs | 2 +- .../crates/spacetime-client/src/puzzle.rs | 2 +- .../crates/spacetime-client/src/runtime.rs | 2 +- .../spacetime-client/src/square_hole.rs | 2 +- .../crates/spacetime-client/src/telemetry.rs | 50 +++ .../spacetime-client/src/visual_novel.rs | 2 +- 36 files changed, 1325 insertions(+), 30 deletions(-) create mode 100644 .dockerignore create mode 100644 deploy/container/README.md create mode 100644 
deploy/container/api-server.Dockerfile create mode 100644 deploy/container/api-server.env.example create mode 100644 deploy/container/docker-compose.loadtest.yml create mode 100644 deploy/container/nginx.conf create mode 100644 deploy/container/otelcol.yaml create mode 100644 scripts/container-compose.mjs create mode 100644 server-rs/crates/api-server/src/process_metrics.rs diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..422a9ea0 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,36 @@ +.git +.codex-temp +.codex-logs +.codex-runlogs +.idea +.vite +node_modules +target +dist +coverage +logs +tmp +*.log +/*.png +/*.jpg +/*.jpeg +/*.webp + +.env +.env.local +.env.secrets.local +.env.secrets.* +spacetime.local.json +deploy/container/api-server.env + +server-rs/target +server-rs/target-* +server-rs/.data +server-rs/.spacetimedb + +public/generated-* + +scripts/loadtest/data/*.local.json +scripts/loadtest/data/k6-*.log +scripts/loadtest/data/k6-*summary*.md +scripts/loadtest/data/latest-*-prefix.txt diff --git a/.gitignore b/.gitignore index 6f27c449..11a83c89 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,7 @@ temp*build*/ .worktrees/ .env.secrets.local spacetime.local.json +deploy/container/api-server.env # Local load-test data extracted from private migration files scripts/loadtest/data/*.local.json diff --git a/.hermes/shared-memory/decision-log.md b/.hermes/shared-memory/decision-log.md index 478a9bb7..f34b87e6 100644 --- a/.hermes/shared-memory/decision-log.md +++ b/.hermes/shared-memory/decision-log.md @@ -16,6 +16,15 @@ --- +## 2026-05-17 容器化方案只作为隔离压测与预发模拟路径 + +- 背景:Windows 本机直连极高 VU 压测会放大本地连接与发送缓冲行为,和线上 Linux + Nginx + systemd 拓扑不一致;需要一个更接近生产网络层的模拟方案,但不能扰动当前生产发布链路。 +- 决策:新增 `deploy/container/` 容器化方案,使用 Docker Compose 组合 Linux release `api-server`、容器 Nginx、`otelcol-contrib` debug exporter 和可选 k6。该方案只用于本机或预发压测模拟,不替换当前生产 `systemd + Nginx + Jenkins` 路径。 +- 隔离边界:容器方案使用独立 `deploy/container/api-server.env`、独立 Nginx 配置、独立 compose 命令和默认 
`18080` 端口;真实 token 不进入镜像、不提交 Git;生产 systemd 单元、Jenkins 发布脚本和 `deploy/nginx/` 模板仍是正式线上来源。 +- 影响范围:`deploy/container/`、`scripts/container-compose.mjs`、`package.json` 容器命令、开发运维文档和容器 build context 排除规则。 +- 验证方式:执行 `npm run container:config` 展开 compose 配置;需要真实运行时再执行 `npm run container:build`、`npm run container:up`、`npm run container:k6`,并结合容器 Nginx log 与 OTLP debug exporter 判断瓶颈。 +- 关联文档:`deploy/container/README.md`、`docs/【开发运维】本地开发验证与生产运维-2026-05-15.md`。 + ## 2026-05-16 公开作品列表短期由 BFF 订阅读模型缓存 - 背景:作品列表压测和实时性讨论中,曾考虑让浏览器前端直接订阅公开作品列表,减少 HTTP 拉取和 BFF 压力。 diff --git a/.hermes/shared-memory/pitfalls.md b/.hermes/shared-memory/pitfalls.md index 1069ff7a..b4615f82 100644 --- a/.hermes/shared-memory/pitfalls.md +++ b/.hermes/shared-memory/pitfalls.md @@ -99,6 +99,14 @@ - 验证:搜索 `server-rs/crates/spacetime-client/src/puzzle.rs` 不应再出现 gallery 主路径调用 `list_puzzle_gallery_then`;搜索 `server-rs/crates/spacetime-client/src/lib.rs` 应订阅 `puzzle_gallery_card_view`;执行 `npm run spacetime:generate`、`cargo check --manifest-path server-rs/Cargo.toml -p spacetime-client`、`cargo check --manifest-path server-rs/Cargo.toml -p api-server` 和 schema/runtime access 检查。 - 关联:`server-rs/crates/spacetime-module/src/puzzle.rs`、`server-rs/crates/spacetime-client/src/lib.rs`、`server-rs/crates/spacetime-client/src/puzzle.rs`、`server-rs/crates/api-server/src/puzzle_gallery_cache.rs`、`/api/runtime/puzzle/gallery`。 +## Windows 本地直连高 VU 压测不要误判成业务内存泄漏 + +- 现象:本地 Windows release `api-server` 直连 K6 压测时,250 RPS、`PREALLOCATED_VUS=300` 能把进程 private memory 瞬时推到约 7GB;同样配置打 `/healthz` 小响应也能复现,压测结束后回落到 100MB 级。 +- 原因:高水位主要来自本机直连的 K6 VU / 长连接 / Hyper 发送链路和 Windows 连接缓冲,不是 SpacetimeDB procedure、拼图 JSON 缓存或 OTEL exporter。降低到接近真实并发的 VU 后,同样 250 RPS 拼图广场 p95 约 9ms,峰值约 600MB。 +- 处理:本地容量判断时让 `PREALLOCATED_VUS` / `MAX_VUS` 接近真实并发,不要把过高 VU 预分配当作默认吞吐测试;同时观察 
`process.memory.*`、`process.windows.handle.count`、`genarrative.http.server.response_bodies.in_flight`、`genarrative.http.server.request_permits.available`、`genarrative.puzzle_gallery.cache.*` 和 `genarrative.spacetime.read.*`。如果内存高但 body in-flight、背压 permit、cache rebuild 和 SpacetimeDB read 都不显示积压,优先按连接 / 发送链路高水位处理。 +- 验证:对照打 `/api/runtime/puzzle/gallery` 与 `/healthz`;对比 `PREALLOCATED_VUS=300 MAX_VUS=800` 和 `PREALLOCATED_VUS=20 MAX_VUS=40`;压测结束后继续采样 10 秒确认 private memory 回落。 +- 关联:`scripts/loadtest/README.md`、`docs/【开发运维】本地开发验证与生产运维-2026-05-15.md`、`server-rs/crates/api-server/src/process_metrics.rs`、`server-rs/crates/api-server/src/telemetry.rs`。 + ## 多玩法公开广场列表优先订阅 public view / read model - 现象:抓大鹅、方洞挑战、视觉小说、大鱼吃小鱼等公开列表如果沿用 `list_*_works` procedure,即使只读已发布作品,也会在每个 HTTP 请求里回到 SpacetimeDB WASM 侧扫描、反序列化配置并组装列表,50RPS 以上容易变成热点。 diff --git a/deploy/container/README.md b/deploy/container/README.md new file mode 100644 index 00000000..c9eb84c5 --- /dev/null +++ b/deploy/container/README.md @@ -0,0 +1,132 @@ +# Genarrative 容器化压测与隔离部署方案 + +本目录只服务本机或预发的容器化模拟压测,不替换当前生产 `systemd + Nginx + Jenkins` 发布路径。生产服务器仍以 `deploy/systemd/`、`deploy/nginx/`、`scripts/jenkins-*.sh` 和 `scripts/deploy/production-api-deploy.sh` 为准。 + +## 拓扑 + +```text +Docker Compose +├─ nginx :80 -> api-server:8082,负责静态站点、/admin/、/api/ 反代、upstream timing log、连接限制 +├─ api-server :8082,Linux release 构建,连接外部 SpacetimeDB +├─ otelcol :4317/4318,debug exporter,接收 traces / metrics / logs +└─ k6 profile=loadtest 时临时启动,在 compose 网络内压 nginx +``` + +默认 host 端口: + +- `http://127.0.0.1:18080`:容器 Nginx。 +- `127.0.0.1:4317` / `127.0.0.1:4318`:容器 Collector OTLP gRPC / HTTP。 + +如端口冲突,可设置: + +```powershell +$env:GENARRATIVE_CONTAINER_HTTP_PORT="18081" +$env:GENARRATIVE_CONTAINER_OTLP_HTTP_PORT="14318" +$env:GENARRATIVE_CONTAINER_OTLP_GRPC_PORT="14317" +``` + +## 初始化 + +```bash +npm run container:init +``` + +该命令会从 `deploy/container/api-server.env.example` 生成本地 `deploy/container/api-server.env`。真实 token、库名和外部服务密钥只写本地 env 文件,不提交 Git。 + 
+Docker Desktop 下默认通过 `host.docker.internal:3101` 连接宿主机上 `npm run dev` 启动的 SpacetimeDB: + +```env +GENARRATIVE_SPACETIME_SERVER_URL=http://host.docker.internal:3101 +GENARRATIVE_SPACETIME_DATABASE=genarrative-loadtest +GENARRATIVE_SPACETIME_TOKEN= +``` + +Linux Docker Engine 如果不能解析 `host.docker.internal`,Compose 已配置 `host-gateway`;仍不通时把 `GENARRATIVE_SPACETIME_SERVER_URL` 改成宿主机网关 IP 或同网络内的 SpacetimeDB 地址。 + +## 启动与验证 + +```bash +npm run container:config +npm run container:build +npm run container:up +npm run container:ps +curl -sS http://127.0.0.1:18080/api/runtime/puzzle/gallery +``` + +查看日志: + +```bash +npm run container:logs -- nginx +npm run container:logs -- api-server +npm run container:logs -- otelcol +``` + +`npm run container:config` 默认只校验配置,不打印完整 env。排查 compose 展开结果时可临时使用: + +```bash +npm run container:config -- --print +``` + +如果 `deploy/container/api-server.env` 已写入真实 token,不要把完整展开结果贴到公开渠道。 + +停止: + +```bash +npm run container:down +``` + +如需同时清理容器卷: + +```bash +npm run container:down -- -v +``` + +## 压测 + +k6 在 compose 网络内访问 `http://nginx`,避免 Windows 本机直连连接模型干扰 Linux 容器结果: + +```bash +npm run container:k6 +``` + +作品列表脚本一次 iteration 默认请求两个公开列表接口,因此目标 500 HTTP req/s 对应 `PEAK_RPS=250`: + +```powershell +$env:SCENARIO="spike" +$env:START_RPS="25" +$env:PEAK_RPS="250" +$env:HOLD="60s" +$env:END_RPS="25" +$env:PREALLOCATED_VUS="100" +$env:MAX_VUS="500" +$env:DETAIL_RATIO="0" +npm run container:k6 +``` + +如果要压 1000 HTTP req/s,把 `PEAK_RPS` 调到 `500`;如果要压 5000 HTTP req/s,把 `PEAK_RPS` 调到 `2500`,并同时提高 `PREALLOCATED_VUS` / `MAX_VUS`,观察是否先被带宽、Nginx `limit_conn` 或 api-server 背压限制。 + +## OTLP + +容器内 `otelcol` 默认使用 debug exporter。开启 api-server OTEL: + +```env +GENARRATIVE_OTEL_ENABLED=true +OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4318 +``` + +然后重建或重启容器: + +```bash +npm run container:up +npm run container:logs -- otelcol +``` + +Collector 日志会输出 traces / metrics / logs。接 Rider、Jaeger、Tempo、Prometheus、Grafana 或托管平台时,另建独立 Collector 配置,不直接改生产 systemd 或 Nginx 模板。 + +## 
隔离边界 + +- 不改生产 systemd 单元。 +- 不改 Jenkins 发布主流程。 +- 不要求真实 HTTPS 证书。 +- 不把真实 `.env`、`.env.local`、`.env.secrets.local` 或 `deploy/container/api-server.env` 放入 Docker build context。 +- 不在容器镜像里内置 SpacetimeDB 数据或 token。 diff --git a/deploy/container/api-server.Dockerfile b/deploy/container/api-server.Dockerfile new file mode 100644 index 00000000..5385b719 --- /dev/null +++ b/deploy/container/api-server.Dockerfile @@ -0,0 +1,49 @@ +FROM rust:1.88-bookworm AS rust-builder +WORKDIR /workspace + +COPY server-rs ./server-rs +RUN cargo build --release -p api-server --manifest-path server-rs/Cargo.toml && \ + cp server-rs/target/release/api-server /tmp/api-server + +FROM debian:bookworm-slim AS api-runtime +WORKDIR /srv/genarrative + +RUN apt-get update && \ + apt-get install -y --no-install-recommends ca-certificates curl && \ + rm -rf /var/lib/apt/lists/* && \ + useradd --system --create-home --home-dir /srv/genarrative --shell /usr/sbin/nologin genarrative + +COPY --from=rust-builder /tmp/api-server /usr/local/bin/api-server + +RUN mkdir -p /var/lib/genarrative/auth && \ + chown -R genarrative:genarrative /srv/genarrative /var/lib/genarrative + +USER genarrative +EXPOSE 8082 + +ENV GENARRATIVE_ENV=container \ + GENARRATIVE_API_HOST=0.0.0.0 \ + GENARRATIVE_API_PORT=8082 \ + GENARRATIVE_AUTH_STORE_PATH=/var/lib/genarrative/auth/auth-store.json + +CMD ["api-server"] + +FROM node:22-bookworm-slim AS web-builder +WORKDIR /workspace + +COPY package.json package-lock.json ./ +COPY apps/admin-web/package.json ./apps/admin-web/package.json +RUN npm ci + +COPY index.html metadata.json tsconfig.json vite.config.ts ./ +COPY src ./src +COPY public ./public +COPY media ./media +COPY packages ./packages +COPY apps/admin-web ./apps/admin-web +RUN npm run build:raw && npm run admin-web:build + +FROM nginx:1.27-alpine AS nginx-runtime +COPY --from=web-builder /workspace/dist /srv/genarrative/web +COPY --from=web-builder /workspace/apps/admin-web/dist /srv/genarrative/web/admin +COPY 
deploy/container/nginx.conf /etc/nginx/nginx.conf diff --git a/deploy/container/api-server.env.example b/deploy/container/api-server.env.example new file mode 100644 index 00000000..ad4ff549 --- /dev/null +++ b/deploy/container/api-server.env.example @@ -0,0 +1,35 @@ +# 复制为 deploy/container/api-server.env 后填入本机或预发值。 +# 该文件只用于容器隔离方案,不参与 systemd/Jenkins 生产部署。 +# 不要在这里写真实 token 后提交 Git。 + +GENARRATIVE_ENV=container +GENARRATIVE_API_HOST=0.0.0.0 +GENARRATIVE_API_PORT=8082 +GENARRATIVE_API_LOG=info,tower_http=info +GENARRATIVE_API_LISTEN_BACKLOG=1024 +GENARRATIVE_API_WORKER_THREADS=4 +GENARRATIVE_API_MAX_CONCURRENT_REQUESTS=512 + +GENARRATIVE_OTEL_ENABLED=false +OTEL_SERVICE_NAME=genarrative-api +OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4318 +OTEL_RESOURCE_ATTRIBUTES=deployment.environment=container,service.namespace=genarrative + +GENARRATIVE_INTERNAL_API_SECRET=CHANGE_ME_FOR_CONTAINER +GENARRATIVE_JWT_ISSUER=genarrative-container +GENARRATIVE_JWT_SECRET=CHANGE_ME_FOR_CONTAINER +AUTH_REFRESH_COOKIE_SECURE=false +GENARRATIVE_AUTH_STORE_PATH=/var/lib/genarrative/auth/auth-store.json + +# Docker Desktop 下连接宿主机 npm run dev 启动的 SpacetimeDB。 +# Linux Docker Engine 可改成宿主机网关 IP,或在 compose 里接入同一网络内的 SpacetimeDB。 +GENARRATIVE_SPACETIME_SERVER_URL=http://host.docker.internal:3101 +GENARRATIVE_SPACETIME_DATABASE=genarrative-loadtest +GENARRATIVE_SPACETIME_TOKEN= +GENARRATIVE_SPACETIME_POOL_SIZE=8 +GENARRATIVE_SPACETIME_PROCEDURE_TIMEOUT_SECONDS=45 + +GENARRATIVE_LLM_PROVIDER=openai-compatible +GENARRATIVE_LLM_BASE_URL= +GENARRATIVE_LLM_API_KEY= +GENARRATIVE_LLM_MODEL= diff --git a/deploy/container/docker-compose.loadtest.yml b/deploy/container/docker-compose.loadtest.yml new file mode 100644 index 00000000..2450e6ec --- /dev/null +++ b/deploy/container/docker-compose.loadtest.yml @@ -0,0 +1,85 @@ +name: genarrative-container-loadtest + +services: + api-server: + build: + context: ../.. 
+ dockerfile: deploy/container/api-server.Dockerfile + target: api-runtime + env_file: + - ./api-server.env + environment: + GENARRATIVE_API_HOST: 0.0.0.0 + GENARRATIVE_API_PORT: 8082 + OTEL_EXPORTER_OTLP_ENDPOINT: http://otelcol:4318 + extra_hosts: + - "host.docker.internal:host-gateway" + volumes: + - api-auth-store:/var/lib/genarrative/auth + depends_on: + otelcol: + condition: service_started + healthcheck: + test: ["CMD", "curl", "-fsS", "http://127.0.0.1:8082/healthz"] + interval: 10s + timeout: 3s + retries: 12 + start_period: 20s + + nginx: + build: + context: ../.. + dockerfile: deploy/container/api-server.Dockerfile + target: nginx-runtime + depends_on: + api-server: + condition: service_healthy + ports: + - "${GENARRATIVE_CONTAINER_HTTP_PORT:-18080}:80" + extra_hosts: + - "host.docker.internal:host-gateway" + volumes: + - nginx-logs:/var/log/nginx + healthcheck: + test: ["CMD", "wget", "-qO-", "http://127.0.0.1/api/runtime/puzzle/gallery"] + interval: 10s + timeout: 5s + retries: 12 + start_period: 20s + + otelcol: + image: otel/opentelemetry-collector-contrib:0.125.0 + command: ["--config=/etc/otelcol/config.yaml"] + volumes: + - ./otelcol.yaml:/etc/otelcol/config.yaml:ro + ports: + - "${GENARRATIVE_CONTAINER_OTLP_GRPC_PORT:-4317}:4317" + - "${GENARRATIVE_CONTAINER_OTLP_HTTP_PORT:-4318}:4318" + + k6: + image: grafana/k6:0.52.0 + profiles: ["loadtest"] + depends_on: + nginx: + condition: service_healthy + environment: + BASE_URL: http://nginx + WORKS_DATA: data/works-list.sample.json + SCENARIO: ${SCENARIO:-spike} + START_RPS: ${START_RPS:-5} + PEAK_RPS: ${PEAK_RPS:-250} + HOLD: ${HOLD:-60s} + END_RPS: ${END_RPS:-5} + PREALLOCATED_VUS: ${PREALLOCATED_VUS:-100} + MAX_VUS: ${MAX_VUS:-500} + DETAIL_RATIO: ${DETAIL_RATIO:-0} + SLEEP_MIN_SECONDS: ${SLEEP_MIN_SECONDS:-0} + SLEEP_MAX_SECONDS: ${SLEEP_MAX_SECONDS:-0} + volumes: + - ../../scripts/loadtest:/scripts/loadtest:ro + working_dir: /scripts/loadtest + command: ["run", "k6-works-list.js"] + +volumes: + 
api-auth-store: + nginx-logs: diff --git a/deploy/container/nginx.conf b/deploy/container/nginx.conf new file mode 100644 index 00000000..ae274c96 --- /dev/null +++ b/deploy/container/nginx.conf @@ -0,0 +1,133 @@ +worker_processes auto; + +events { + worker_connections 4096; +} + +http { + include /etc/nginx/mime.types; + default_type application/octet-stream; + + log_format genarrative_upstream + '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" "$http_user_agent" ' + 'request_time=$request_time upstream_connect_time=$upstream_connect_time ' + 'upstream_header_time=$upstream_header_time upstream_response_time=$upstream_response_time ' + 'upstream_status=$upstream_status request_id=$request_id'; + + upstream genarrative_api { + server api-server:8082; + keepalive 64; + } + + limit_conn_zone $binary_remote_addr zone=genarrative_api_conn:10m; + + sendfile on; + keepalive_timeout 65; + + gzip on; + gzip_vary on; + gzip_proxied any; + gzip_comp_level 5; + gzip_min_length 1024; + gzip_types + text/plain + text/css + text/javascript + application/javascript + application/json + application/xml + application/xml+rss + image/svg+xml; + + server { + listen 80; + server_name _; + + access_log /var/log/nginx/genarrative.access.log genarrative_upstream; + error_log /var/log/nginx/genarrative.error.log warn; + limit_conn_status 429; + limit_conn_log_level warn; + + root /srv/genarrative/web; + index index.html; + + location ^~ /admin/api/ { + default_type application/json; + limit_conn genarrative_api_conn 64; + + proxy_pass http://genarrative_api/admin/api/; + proxy_http_version 1.1; + proxy_set_header Connection ""; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header X-Request-Id $request_id; + } + + location = /admin { + return 301 /admin/; + } + + location ^~ /admin/assets/ { + 
try_files $uri =404; + } + + location ^~ /admin/ { + try_files $uri $uri/ /admin/index.html; + } + + location ^~ /assets/ { + try_files $uri =404; + } + + location ~ ^/api(?:/|$) { + default_type application/json; + limit_conn genarrative_api_conn 64; + + proxy_pass http://genarrative_api; + proxy_http_version 1.1; + proxy_buffering off; + proxy_read_timeout 3600s; + proxy_send_timeout 3600s; + add_header X-Accel-Buffering no always; + proxy_set_header Connection ""; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header X-Forwarded-Host $host; + proxy_set_header X-Request-Id $request_id; + } + + location ~ ^/(generated-|healthz) { + return 404; + } + + location ~ ^/v1/database/[^/]+/subscribe$ { + proxy_pass http://host.docker.internal:3101; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "Upgrade"; + proxy_set_header Host $host; + proxy_read_timeout 3600s; + } + + location ^~ /v1/identity { + proxy_pass http://host.docker.internal:3101; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "Upgrade"; + proxy_set_header Host $host; + } + + location ^~ /v1/ { + return 404; + } + + location / { + try_files $uri $uri/ /index.html; + } + } +} diff --git a/deploy/container/otelcol.yaml b/deploy/container/otelcol.yaml new file mode 100644 index 00000000..f86d0155 --- /dev/null +++ b/deploy/container/otelcol.yaml @@ -0,0 +1,23 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +exporters: + debug: + verbosity: detailed + +service: + pipelines: + traces: + receivers: [otlp] + exporters: [debug] + metrics: + receivers: [otlp] + exporters: [debug] + logs: + receivers: [otlp] + exporters: [debug] diff --git a/deploy/nginx/genarrative-dev-http.conf 
b/deploy/nginx/genarrative-dev-http.conf index d9e85b16..6c9bede4 100644 --- a/deploy/nginx/genarrative-dev-http.conf +++ b/deploy/nginx/genarrative-dev-http.conf @@ -13,11 +13,15 @@ upstream genarrative_api { keepalive 64; } +limit_conn_zone $binary_remote_addr zone=genarrative_api_conn:10m; + server { listen 80; server_name genarrative.example.com; access_log /var/log/nginx/genarrative.access.log genarrative_upstream; error_log /var/log/nginx/genarrative.error.log warn; + limit_conn_status 429; + limit_conn_log_level warn; gzip on; gzip_vary on; @@ -43,6 +47,7 @@ server { location ^~ /admin/api/ { default_type application/json; + limit_conn genarrative_api_conn 64; if ($genarrative_maintenance) { return 503 '{"ok":false,"error":{"code":"MAINTENANCE","message":"服务维护中"}}'; @@ -83,6 +88,7 @@ server { # 临时兼容主站仍在使用的 /api/* HTTP facade;前端完成 SpacetimeDB SDK 迁移后删除。 location ~ ^/api(?:/|$) { default_type application/json; + limit_conn genarrative_api_conn 64; if ($genarrative_maintenance) { return 503 '{"ok":false,"error":{"code":"MAINTENANCE","message":"服务维护中"}}'; diff --git a/deploy/nginx/genarrative.conf b/deploy/nginx/genarrative.conf index e0854442..984dd130 100644 --- a/deploy/nginx/genarrative.conf +++ b/deploy/nginx/genarrative.conf @@ -11,11 +11,15 @@ upstream genarrative_api { keepalive 64; } +limit_conn_zone $binary_remote_addr zone=genarrative_api_conn:10m; + server { listen 80; server_name genarrative.example.com; access_log /var/log/nginx/genarrative.access.log genarrative_upstream; error_log /var/log/nginx/genarrative.error.log warn; + limit_conn_status 429; + limit_conn_log_level warn; location /.well-known/acme-challenge/ { root /var/www/html; @@ -59,6 +63,7 @@ server { location ^~ /admin/api/ { default_type application/json; + limit_conn genarrative_api_conn 64; if ($genarrative_maintenance) { return 503 '{"ok":false,"error":{"code":"MAINTENANCE","message":"服务维护中"}}'; @@ -99,6 +104,7 @@ server { # 临时兼容主站仍在使用的 /api/* HTTP facade;前端完成 SpacetimeDB SDK 
迁移后删除。 location ~ ^/api(?:/|$) { default_type application/json; + limit_conn genarrative_api_conn 64; if ($genarrative_maintenance) { return 503 '{"ok":false,"error":{"code":"MAINTENANCE","message":"服务维护中"}}'; diff --git a/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md b/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md index 478bf090..2ac833a4 100644 --- a/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md +++ b/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md @@ -499,7 +499,7 @@ npm run check:server-rs-ddd - 接口:`GET /api/runtime/puzzle/gallery` - 响应契约:保留 `items` 字段兼容旧前端;当前 `items` 只返回前 10 个完整卡片,新增 `previewRefs` 返回后 10 个 `workId/profileId` 引用,并返回 `hasMore`、`nextCursor` 与 `totalCount`。 -- 缓存策略:`api-server` 在 `PuzzleGalleryCache` 中缓存最终 `PuzzleGalleryResponse` DTO。缓存 miss / 过期时单飞重建,避免并发请求重复排序、映射和 JSON DTO 构造;缓存短 TTL 刷新 `recentPlayCount7d`,后台 cleanup task 周期清理超过最大空闲窗口的旧响应。 +- 缓存策略:`api-server` 在 `PuzzleGalleryCache` 中缓存最终 `PuzzleGalleryResponse` 的预序列化 data JSON。缓存 miss / 过期时单飞重建,避免并发请求重复排序、映射、DTO 深拷贝和 `serde_json::Value` 树构造;开启响应 envelope 时只按请求拼接轻量 meta,缓存短 TTL 刷新 `recentPlayCount7d`,后台 cleanup task 周期清理超过最大空闲窗口的旧响应。OTLP 通过 `genarrative.puzzle_gallery.cache.*`、`genarrative.spacetime.read.*`、`genarrative.http.server.response_bodies.in_flight` 和 `genarrative.http.server.request_permits.available` 区分缓存重建、SpacetimeDB 本地订阅读、响应 body 生命周期和 HTTP 背压状态。 - 详情路径:公开详情、点赞、游玩记录和 Remix 仍按原有 procedure / reducer 路径处理;前端拿到 `previewRefs` 后如果需要展开更多内容,应优先使用后续列表窗口能力或详情 cache,不要把自动详情预取变成新的 procedure 热点。 ### api-server 长期订阅读模型 diff --git a/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md b/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md index d39d2529..f28c8fb8 100644 --- a/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md +++ b/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md @@ -161,6 +161,20 @@ Jenkins 按 web / api / Spacetime module / build / deploy / publish 拆分 - 作品列表短期继续由 `api-server` / BFF 订阅 SpacetimeDB 公开 read model 后读本地 cache,不让浏览器前端直接订阅完整列表;未来如新增 `public_work_gallery_entry` 等专用公开作品列表 read model,前端只可订阅稳定、低基数、公开的专用投影,禁止订阅 
`puzzle_work_profile`、`custom_world_profile` 等玩法源表后自行 join、聚合或判断权限。前端直订阅落地前必须先补齐权限、字段契约、排序 / 分页、埋点和 BFF 回退策略。 - 50 HTTP req/s 验收目标为 `http_req_failed < 1%`、`p95 < 2s`、`dropped_iterations = 0`,同时压测窗口内 Nginx 无新增 502。 +容器化压测与隔离部署方案单独放在 `deploy/container/`,用于本机或预发模拟 Linux release + Nginx + OTLP Collector 拓扑,不替换当前生产 `systemd + Nginx + Jenkins` 发布路径: + +```bash +npm run container:init +npm run container:config +npm run container:build +npm run container:up +npm run container:k6 +npm run container:down +``` + +容器方案默认暴露 `http://127.0.0.1:18080`,`api-server` 在容器内监听 `0.0.0.0:8082`,Nginx 通过 `api-server:8082` upstream 反代 `/api/` 和 `/admin/api/`。SpacetimeDB 默认仍连接宿主机 `http://host.docker.internal:3101`,真实库名、token 和外部服务密钥只写本地 `deploy/container/api-server.env`,不提交 Git。完整拓扑、端口、k6 参数和 OTLP debug exporter 使用方法见 `deploy/container/README.md`。 +`npm run container:config` 默认只做 quiet 校验,避免把本地 env 中的 token 展开到终端;确需排查完整 compose 时再传 `-- --print`。 + OpenTelemetry 现阶段可选 OTLP traces / metrics / logs,但本地日志与 Nginx 文件日志仍保留: - 默认 `GENARRATIVE_OTEL_ENABLED=false`,未开启时 api-server 不依赖 Collector。 @@ -169,6 +183,10 @@ OpenTelemetry 现阶段可选 OTLP traces / metrics / logs,但本地日志与 - api-server 当前发 OTLP HTTP,`OTEL_EXPORTER_OTLP_ENDPOINT` 指向 Collector HTTP base endpoint;不要改到 gRPC `4317` 或 Rider 端口,Rider 由 Collector 通过 `RIDER_OTLP_GRPC_ENDPOINT` 转发。 - 应用日志仍通过 `journalctl -u genarrative-api.service` 查看,Nginx 日志仍写文件;日志等级继续用 `GENARRATIVE_API_LOG` / `RUST_LOG` 控制,例如 `info,tower_http=info,spacetime_client=info`。 - debug exporter / Rider 转发都会同时接收 traces、metrics 和 logs。 +- api-server 会随 metrics 发送进程级指标:`process.memory.usage`、`process.memory.virtual`、`process.thread.count`、`genarrative.process.memory.private`;Windows 额外发送 `process.windows.handle.count`,Linux 额外发送 `process.unix.file_descriptor.count`。这些指标只描述当前进程,不携带请求、用户或作品 label。 +- HTTP 运行态补充发送 `genarrative.http.server.response_bodies.in_flight` 与 `genarrative.http.server.request_permits.available`,用于区分业务 handler / 背压 permit 是否仍被占用;拼图广场热点缓存补充发送 
`genarrative.puzzle_gallery.cache.*` 指标,记录命中、未命中、重建耗时和预序列化 data JSON 字节数。 +- SpacetimeDB 观测分为两类:procedure / reducer 调用继续用 `genarrative.spacetime.procedure.*`,订阅本地 cache 读使用 `genarrative.spacetime.read.*`。`read=list_puzzle_gallery` 表示拼图广场当前从 `puzzle_gallery_card_view` 本地 cache 读取,不再每个 HTTP 请求调用 `list_puzzle_gallery` procedure。 +- 本地 Windows 直连压测的内存高水位要结合 K6 VU / 连接数解释。250 RPS 下过高 `PREALLOCATED_VUS` 可能让 300 个本地 Established 连接把 `api-server` private memory 瞬时推到 GB 级,且 `/healthz` 小响应也能复现;若压测结束后回落、`response_bodies.in_flight` 和背压 permit 未显示业务积压,应优先按连接 / 发送链路高水位处理,而不是判断为 SpacetimeDB 或 JSON 缓存泄漏。 - Rider 的 Logs 面板只展示 log event 自身字段,不会自动展开父 span 的全部 attributes;请求完成日志会直接带 `request_id`、`http.request.method`、`http.route`、`url.scheme`、`url.path`、`http.response.status_code`、`status_class`、`latency_ms` 和 `slow_request`,完整链路继续到 Traces 面板按 trace/span 查看。 - 指标 label 只允许低基数字段:HTTP 使用 `method`、`route`、`status_class`,SpacetimeDB 调用使用 `procedure`、`status_class`;`request_id` 只进入 trace/log attribute,不进入 metric label。 diff --git a/package.json b/package.json index 720c0aff..4f65c0b6 100644 --- a/package.json +++ b/package.json @@ -45,6 +45,14 @@ "test:watch": "vitest", "loadtest:extract-works": "node scripts/loadtest/extract-works-list-data.mjs", "loadtest:k6:works": "k6 run scripts/loadtest/k6-works-list.js", + "container:init": "node scripts/container-compose.mjs init", + "container:build": "node scripts/container-compose.mjs build", + "container:up": "node scripts/container-compose.mjs up", + "container:down": "node scripts/container-compose.mjs down", + "container:logs": "node scripts/container-compose.mjs logs", + "container:ps": "node scripts/container-compose.mjs ps", + "container:config": "node scripts/container-compose.mjs config", + "container:k6": "node scripts/container-compose.mjs k6", "check": "npm run lint && npm run test && npm run build && npm run check:content", "check:data": "node scripts/run-tsx.cjs scripts/validate-content.ts", "check:overrides": "node 
scripts/run-tsx.cjs scripts/validate-overrides.ts", diff --git a/scripts/container-compose.mjs b/scripts/container-compose.mjs new file mode 100644 index 00000000..0ee92af5 --- /dev/null +++ b/scripts/container-compose.mjs @@ -0,0 +1,99 @@ +import {spawn} from 'node:child_process'; +import {copyFileSync, existsSync} from 'node:fs'; +import path from 'node:path'; + +const [, , rawCommand = 'help', ...args] = process.argv; +const command = rawCommand.trim(); +const printComposeConfig = args.includes('--print'); +const passthroughArgs = args.filter((arg) => arg !== '--print'); +const projectRoot = process.cwd(); +const composeFile = path.join('deploy', 'container', 'docker-compose.loadtest.yml'); +const envExamplePath = path.join('deploy', 'container', 'api-server.env.example'); +const envPath = path.join('deploy', 'container', 'api-server.env'); + +const supportedCommands = new Set(['init', 'build', 'up', 'down', 'logs', 'ps', 'config', 'k6']); + +if (command === 'help' || !supportedCommands.has(command)) { + printHelp(command !== 'help'); + process.exit(command === 'help' ? 0 : 1); +} + +if (command === 'init') { + ensureEnvFile(); + process.exit(0); +} + +if (!existsSync(envPath)) { + ensureEnvFile(); + console.error('[container] 请先检查 deploy/container/api-server.env 中的 SpacetimeDB 地址、库名和 token。'); + process.exit(1); +} + +const composeArgs = buildComposeArgs(command, passthroughArgs); +const child = spawn('docker', composeArgs, { + cwd: projectRoot, + env: process.env, + stdio: 'inherit', + shell: false, +}); + +child.on('error', (error) => { + console.error(`[container] docker compose 启动失败: ${error.message}`); + console.error('[container] 请确认 Docker Desktop 或 Docker Engine 已安装,并且 docker 在 PATH 中。'); + process.exit(1); +}); + +child.on('exit', (code, signal) => { + if (signal) { + console.error(`[container] docker compose 被信号终止: ${signal}`); + process.exit(1); + } + process.exit(code ?? 
0); +}); + +function buildComposeArgs(selectedCommand, extraArgs) { + const baseArgs = ['compose', '-f', composeFile]; + switch (selectedCommand) { + case 'build': + return [...baseArgs, 'build', ...extraArgs]; + case 'up': + return [...baseArgs, 'up', '-d', ...extraArgs]; + case 'down': + return [...baseArgs, 'down', ...extraArgs]; + case 'logs': + return [...baseArgs, 'logs', ...extraArgs]; + case 'ps': + return [...baseArgs, 'ps', ...extraArgs]; + case 'config': + return [...baseArgs, 'config', ...(printComposeConfig ? [] : ['--quiet']), ...extraArgs]; + case 'k6': + return [...baseArgs, '--profile', 'loadtest', 'run', '--rm', 'k6', ...extraArgs]; + default: + throw new Error(`unsupported command: ${selectedCommand}`); + } +} + +function ensureEnvFile() { + if (existsSync(envPath)) { + console.log(`[container] 已存在 ${envPath}`); + return; + } + copyFileSync(envExamplePath, envPath); + console.log(`[container] 已从 ${envExamplePath} 生成 ${envPath}`); +} + +function printHelp(isError) { + const output = isError ? 
console.error : console.log; + output(`Usage: npm run container:<command> -- [docker compose args] + +Commands: + container:init 生成 deploy/container/api-server.env + container:build 构建 api-server 容器镜像 + container:up 后台启动 api-server + nginx + otelcol + container:down 停止并清理容器 + container:logs 查看容器日志 + container:ps 查看容器状态 + container:config 校验 compose 配置,传 -- --print 可展开完整配置 + container:k6 在 compose 网络内运行 k6 +`); +} diff --git a/scripts/loadtest/README.md b/scripts/loadtest/README.md index 9ae8dc31..ef2e0307 100644 --- a/scripts/loadtest/README.md +++ b/scripts/loadtest/README.md @@ -312,6 +312,25 @@ OTLP logs 是远端观测增量,不替代本地日志;api-server 日志仍 Rider 的 Logs 面板展示的是 OTLP log event 自身字段,不会自动把父 span 的全部 attributes 摊平到每一条日志。请求完成日志会直接携带 `request_id`、`http.request.method`、`http.route`、`url.scheme`、`url.path`、`http.response.status_code`、`status_class`、`latency_ms` 和 `slow_request`;更完整的请求链路仍在 Traces 面板中按同一个 trace/span 关联查看。 +压测期间可在 Metrics 面板或 debug exporter 中观察进程内存指标: + +- `process.memory.usage`:进程常驻内存 / RSS。 +- `process.memory.virtual`:进程虚拟内存;Windows 当前按 `PrivateUsage` 上报,Linux 取 `VmSize`。 +- `genarrative.process.memory.private`:进程私有内存,Windows 来自 `PrivateUsage`,Linux 近似取 `/proc/self/status` 的 `VmData`。 +- `process.thread.count`:线程数。 +- `process.windows.handle.count`:Windows 句柄数。 +- `process.unix.file_descriptor.count`:Linux 文件描述符数。 +- `genarrative.http.server.response_bodies.in_flight`:Axum / Hyper 仍持有的响应 body 数;如果内存高但该值很低,说明热点不在业务 handler 生命周期内。 +- `genarrative.http.server.request_permits.available`:应用层 HTTP 背压剩余 permit 数;如果该值未接近 0,说明没有打满 `GENARRATIVE_API_MAX_CONCURRENT_REQUESTS`。 +- `genarrative.puzzle_gallery.cache.hits` / `genarrative.puzzle_gallery.cache.misses` / `genarrative.puzzle_gallery.cache.rebuilds`:拼图广场响应缓存命中、未命中和重建次数。 +- `genarrative.puzzle_gallery.cache.rebuild.duration`:拼图广场缓存重建耗时。 +- `genarrative.puzzle_gallery.cache.data_json_bytes`:拼图广场缓存内预序列化 data JSON 大小。 +- `genarrative.spacetime.read.calls` / `genarrative.spacetime.read.duration_ms`:SpacetimeDB 订阅本地 cache 
读次数和耗时;`read=list_puzzle_gallery` 表示当前路径走 view / local cache,不是 procedure。 + +若 `/api/runtime/puzzle/gallery` 单接口压测出现 GB 级瞬时内存峰值,先区分“持续泄漏”和“请求期分配峰值”:关闭 OTEL 后若峰值仍复现且压测结束后回落,主因通常不是 Collector / exporter。当前拼图广场列表命中缓存时应复用 `PuzzleGalleryCache` 中的预序列化 data JSON,只按请求拼接 envelope meta,不应每个请求重新深拷贝 `PuzzleGalleryResponse` 或构造完整 `serde_json::Value`。 + +本地 Windows 直连 `api-server` 压测还要单独看 K6 的 VU / 连接模型。已验证在 250 RPS、`PREALLOCATED_VUS=300` 时,哪怕打 `/healthz` 这种小响应,也可能因为本地 300 个 Established 连接触发 `api-server` private memory 瞬时升到约 7GB,压测结束后回落到 100MB 级;同样 250 RPS 改成 `PREALLOCATED_VUS=20 MAX_VUS=40` 后,拼图广场 p95 约 9ms,峰值降到约 600MB。这个现象说明高水位主要来自本机直连连接 / 发送链路,不等价于 SpacetimeDB 或拼图 JSON 缓存泄漏。做本地容量判断时优先让 VU 接近真实并发,避免用过高预分配 VU 把测试变成 Windows 本机连接缓冲压力测试;生产仍以 Nginx upstream keepalive、系统内存和 OTLP 指标一起判断。 + 线上回归辅助命令: ```bash diff --git a/server-rs/Cargo.lock b/server-rs/Cargo.lock index 0eb3d4a3..6661b048 100644 --- a/server-rs/Cargo.lock +++ b/server-rs/Cargo.lock @@ -131,6 +131,7 @@ dependencies = [ "urlencoding", "uuid", "webp", + "windows-sys 0.61.2", "zip", ] diff --git a/server-rs/Cargo.toml b/server-rs/Cargo.toml index 6500ac2f..bddf6c17 100644 --- a/server-rs/Cargo.toml +++ b/server-rs/Cargo.toml @@ -117,6 +117,7 @@ opentelemetry-otlp = { version = "0.31", default-features = false, features = [" opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace", "metrics", "logs"] } tracing-opentelemetry = { version = "0.32", default-features = false } tracing-subscriber = "0.3" +windows-sys = "0.61" url = "2" urlencoding = "2" uuid = "1" diff --git a/server-rs/crates/api-server/Cargo.toml b/server-rs/crates/api-server/Cargo.toml index 127c256f..ce4ef1e6 100644 --- a/server-rs/crates/api-server/Cargo.toml +++ b/server-rs/crates/api-server/Cargo.toml @@ -58,6 +58,9 @@ urlencoding = { workspace = true } uuid = { workspace = true, features = ["v4"] } zip = { workspace = true, features = ["deflate"] } +[target.'cfg(windows)'.dependencies] +windows-sys = { workspace = true, 
features = ["Win32_Foundation", "Win32_System_Diagnostics_ToolHelp", "Win32_System_ProcessStatus", "Win32_System_Threading"] } + [dev-dependencies] base64 = { workspace = true } hmac = { workspace = true } diff --git a/server-rs/crates/api-server/src/api_response.rs b/server-rs/crates/api-server/src/api_response.rs index 35a8bc64..c9e7ffee 100644 --- a/server-rs/crates/api-server/src/api_response.rs +++ b/server-rs/crates/api-server/src/api_response.rs @@ -1,4 +1,13 @@ -use axum::Json; +use std::convert::Infallible; + +use axum::{ + Json, + body::Body, + http::{HeaderValue, header}, + response::{IntoResponse, Response}, +}; +use bytes::Bytes; +use futures_util::stream; use serde::Serialize; use serde_json::Value; #[cfg(test)] @@ -32,6 +41,30 @@ where Json(serde_json::to_value(data).unwrap_or(Value::Null)) } +pub fn json_success_data_bytes_response( + request_context: Option<&RequestContext>, + data_json: Bytes, +) -> Response { + if let Some(context) = request_context + && context.wants_envelope() + { + let meta = serde_json::to_vec(&build_api_response_meta(Some(context))) + .map(Bytes::from) + .unwrap_or_else(|_| Bytes::from_static(b"null")); + let chunks = [ + Bytes::from_static(b"{\"ok\":true,\"data\":"), + data_json, + Bytes::from_static(b",\"error\":null,\"meta\":"), + meta, + Bytes::from_static(b"}"), + ]; + let stream = stream::iter(chunks.into_iter().map(Ok::)); + return json_body_response(Body::from_stream(stream)); + } + + json_bytes_response(data_json) +} + pub fn json_error_body( request_context: Option<&RequestContext>, error: &ApiErrorPayload, @@ -65,6 +98,19 @@ fn build_api_response_meta(request_context: Option<&RequestContext>) -> ApiRespo ) } +fn json_bytes_response(bytes: Bytes) -> Response { + json_body_response(Body::from(bytes)) +} + +fn json_body_response(body: Body) -> Response { + let mut response = body.into_response(); + response.headers_mut().insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json; charset=utf-8"), + 
); + response +} + #[cfg(test)] mod tests { use super::*; @@ -106,6 +152,31 @@ mod tests { assert!(body.get("meta").is_none()); } + #[tokio::test] + async fn success_response_streams_cached_data_inside_standard_envelope() { + use http_body_util::BodyExt; + + let request_context = build_request_context(true); + let response = json_success_data_bytes_response( + Some(&request_context), + Bytes::from_static(br#"{"items":[]}"#), + ); + let body = response + .into_body() + .collect() + .await + .expect("response body should collect") + .to_bytes(); + let payload: Value = serde_json::from_slice(&body).expect("body should be json"); + + assert_eq!(payload["ok"], Value::Bool(true)); + assert_eq!(payload["data"]["items"], Value::Array(Vec::new())); + assert_eq!( + payload["meta"]["requestId"], + Value::String("req-test".to_string()) + ); + } + #[test] fn error_body_returns_legacy_shape_without_envelope_header() { let request_context = build_request_context(false); diff --git a/server-rs/crates/api-server/src/backpressure.rs b/server-rs/crates/api-server/src/backpressure.rs index 4b310c56..6f9c5122 100644 --- a/server-rs/crates/api-server/src/backpressure.rs +++ b/server-rs/crates/api-server/src/backpressure.rs @@ -37,13 +37,25 @@ pub async fn limit_concurrent_requests( fn acquire_http_request_permit( permit_pool: Arc, -) -> Result { - permit_pool.try_acquire_owned() +) -> Result { + match permit_pool.clone().try_acquire_owned() { + Ok(permit) => { + crate::telemetry::update_http_request_permits_available(permit_pool.available_permits()); + Ok(HttpRequestPermitGuard { + permit: Some(permit), + permit_pool, + }) + } + Err(error) => { + crate::telemetry::update_http_request_permits_available(permit_pool.available_permits()); + Err(error) + } + } } fn hold_permit_until_response_body_dropped( response: Response, - permit: OwnedSemaphorePermit, + permit: HttpRequestPermitGuard, ) -> Response { response.map(|body| { Body::new(body.map_frame(move |frame| { @@ -53,6 +65,18 @@ fn 
hold_permit_until_response_body_dropped( }) } +struct HttpRequestPermitGuard { + permit: Option, + permit_pool: Arc, +} + +impl Drop for HttpRequestPermitGuard { + fn drop(&mut self) { + drop(self.permit.take()); + crate::telemetry::update_http_request_permits_available(self.permit_pool.available_permits()); + } +} + fn reject_overloaded_request(request: &Request) -> Response { let request_context = request.extensions().get::().cloned(); let mut response = AppError::from_status(StatusCode::TOO_MANY_REQUESTS) diff --git a/server-rs/crates/api-server/src/main.rs b/server-rs/crates/api-server/src/main.rs index 4c758721..665f3526 100644 --- a/server-rs/crates/api-server/src/main.rs +++ b/server-rs/crates/api-server/src/main.rs @@ -56,6 +56,7 @@ mod password_management; mod phone_auth; mod platform_errors; mod profile_identity; +mod process_metrics; mod prompt; mod puzzle; mod puzzle_agent_turn; @@ -140,6 +141,8 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> { enabled: config.otel_enabled, }, )?; + process_metrics::register_process_metrics(); + telemetry::register_http_runtime_metrics(); let bind_address = config.bind_socket_addr(); let listen_backlog = config.listen_backlog; diff --git a/server-rs/crates/api-server/src/process_metrics.rs b/server-rs/crates/api-server/src/process_metrics.rs new file mode 100644 index 00000000..5f27c8b8 --- /dev/null +++ b/server-rs/crates/api-server/src/process_metrics.rs @@ -0,0 +1,306 @@ +use std::sync::OnceLock; + +use opentelemetry::global; +use tracing::warn; + +// 进程指标只描述 api-server 自身,不携带请求、用户或作品维度,避免 OTLP 指标高基数膨胀。 +pub(crate) fn register_process_metrics() { + static REGISTERED: OnceLock<()> = OnceLock::new(); + REGISTERED.get_or_init(register_process_metrics_once); +} + +fn register_process_metrics_once() { + let meter = global::meter("genarrative-api"); + + meter + .i64_observable_up_down_counter("process.memory.usage") + .with_unit("By") + .with_description("api-server process physical memory usage") + 
.with_callback(|observer| { + let Some(snapshot) = ProcessMetricsSnapshot::collect() else { + return; + }; + observer.observe(to_i64(snapshot.rss_bytes), &[]); + }) + .build(); + + meter + .i64_observable_up_down_counter("process.memory.virtual") + .with_unit("By") + .with_description("api-server committed virtual memory") + .with_callback(|observer| { + let Some(snapshot) = ProcessMetricsSnapshot::collect() else { + return; + }; + if let Some(virtual_bytes) = snapshot.virtual_bytes { + observer.observe(to_i64(virtual_bytes), &[]); + } + }) + .build(); + + meter + .i64_observable_up_down_counter("genarrative.process.memory.private") + .with_unit("By") + .with_description("api-server private memory for local diagnostics") + .with_callback(|observer| { + let Some(snapshot) = ProcessMetricsSnapshot::collect() else { + return; + }; + if let Some(private_bytes) = snapshot.private_bytes { + observer.observe(to_i64(private_bytes), &[]); + } + }) + .build(); + + meter + .i64_observable_up_down_counter("process.thread.count") + .with_unit("{thread}") + .with_description("api-server process thread count") + .with_callback(|observer| { + let Some(snapshot) = ProcessMetricsSnapshot::collect() else { + return; + }; + observer.observe(to_i64(snapshot.thread_count), &[]); + }) + .build(); + + meter + .i64_observable_up_down_counter("process.windows.handle.count") + .with_unit("{handle}") + .with_description("api-server process handle count on Windows") + .with_callback(|observer| { + let Some(snapshot) = ProcessMetricsSnapshot::collect() else { + return; + }; + if let Some(handle_count) = snapshot.windows_handle_count { + observer.observe(to_i64(handle_count), &[]); + } + }) + .build(); + + meter + .i64_observable_up_down_counter("process.unix.file_descriptor.count") + .with_unit("{file_descriptor}") + .with_description("api-server process file descriptor count on Unix") + .with_callback(|observer| { + let Some(snapshot) = ProcessMetricsSnapshot::collect() else { + return; + }; + 
if let Some(fd_count) = snapshot.unix_fd_count { + observer.observe(to_i64(fd_count), &[]); + } + }) + .build(); +} + +fn to_i64(value: u64) -> i64 { + value.min(i64::MAX as u64) as i64 +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct ProcessMetricsSnapshot { + rss_bytes: u64, + private_bytes: Option, + virtual_bytes: Option, + thread_count: u64, + windows_handle_count: Option, + unix_fd_count: Option, +} + +impl ProcessMetricsSnapshot { + fn collect() -> Option { + collect_process_metrics() + .inspect_err(|error| { + warn!(%error, "采集 api-server 进程内存指标失败"); + }) + .ok() + } +} + +#[cfg(windows)] +fn collect_process_metrics() -> Result { + use windows_sys::Win32::{ + System::{ + ProcessStatus::{GetProcessMemoryInfo, PROCESS_MEMORY_COUNTERS_EX}, + Threading::{GetCurrentProcess, GetCurrentProcessId, GetProcessHandleCount}, + }, + }; + + let handle = unsafe { GetCurrentProcess() }; + let mut counters = PROCESS_MEMORY_COUNTERS_EX { + cb: std::mem::size_of::() as u32, + ..Default::default() + }; + let ok = unsafe { + GetProcessMemoryInfo( + handle, + std::ptr::addr_of_mut!(counters).cast(), + counters.cb, + ) + }; + if ok == 0 { + return Err("GetProcessMemoryInfo returned false".to_string()); + } + + let mut handle_count = 0_u32; + let handle_count = if unsafe { GetProcessHandleCount(handle, &mut handle_count) } == 0 { + None + } else { + Some(u64::from(handle_count)) + }; + + Ok(ProcessMetricsSnapshot { + rss_bytes: counters.WorkingSetSize as u64, + private_bytes: Some(counters.PrivateUsage as u64), + virtual_bytes: Some(counters.PrivateUsage as u64), + thread_count: u64::from(unsafe { GetCurrentProcessId() }.thread_count()?), + windows_handle_count: handle_count, + unix_fd_count: None, + }) +} + +#[cfg(windows)] +trait WindowsProcessThreadCount { + fn thread_count(self) -> Result; +} + +#[cfg(windows)] +impl WindowsProcessThreadCount for u32 { + fn thread_count(self) -> Result { + use windows_sys::Win32::{ + Foundation::{CloseHandle, INVALID_HANDLE_VALUE}, + 
System::Diagnostics::ToolHelp::{ + CreateToolhelp32Snapshot, PROCESSENTRY32, Process32First, Process32Next, + TH32CS_SNAPPROCESS, + }, + }; + + let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) }; + if snapshot == INVALID_HANDLE_VALUE { + return Err("CreateToolhelp32Snapshot returned INVALID_HANDLE_VALUE".to_string()); + } + + let mut entry = PROCESSENTRY32 { + dwSize: std::mem::size_of::() as u32, + ..Default::default() + }; + let mut found = None; + let mut ok = unsafe { Process32First(snapshot, &mut entry) }; + while ok != 0 { + if entry.th32ProcessID == self { + found = Some(entry.cntThreads); + break; + } + ok = unsafe { Process32Next(snapshot, &mut entry) }; + } + unsafe { + CloseHandle(snapshot); + } + + found.ok_or_else(|| format!("process {self} not found in ToolHelp snapshot")) + } +} + +#[cfg(target_os = "linux")] +fn collect_process_metrics() -> Result { + let status = std::fs::read_to_string("/proc/self/status") + .map_err(|error| format!("read /proc/self/status failed: {error}"))?; + let statm = std::fs::read_to_string("/proc/self/statm") + .map_err(|error| format!("read /proc/self/statm failed: {error}"))?; + let page_size = linux_page_size_bytes()?; + + let rss_bytes = parse_status_kb(&status, "VmRSS:") + .map(|value| value * 1024) + .or_else(|| parse_statm_pages(&statm, 1).map(|value| value * page_size)) + .ok_or_else(|| "missing VmRSS/statm resident field".to_string())?; + let virtual_bytes = parse_status_kb(&status, "VmSize:") + .map(|value| value * 1024) + .or_else(|| parse_statm_pages(&statm, 0).map(|value| value * page_size)) + .ok_or_else(|| "missing VmSize/statm size field".to_string())?; + let private_bytes = parse_status_kb(&status, "VmData:").map(|value| value * 1024); + let thread_count = parse_status_u64(&status, "Threads:") + .ok_or_else(|| "missing Threads field".to_string())?; + + Ok(ProcessMetricsSnapshot { + rss_bytes, + private_bytes, + virtual_bytes: Some(virtual_bytes), + thread_count, + 
windows_handle_count: None, + unix_fd_count: linux_fd_count(), + }) +} + +#[cfg(target_os = "linux")] +fn linux_page_size_bytes() -> Result { + let output = std::process::Command::new("getconf") + .arg("PAGESIZE") + .output() + .map_err(|error| format!("getconf PAGESIZE failed: {error}"))?; + if !output.status.success() { + return Err(format!("getconf PAGESIZE exited with {}", output.status)); + } + let text = String::from_utf8(output.stdout) + .map_err(|error| format!("getconf PAGESIZE output is not utf8: {error}"))?; + text.trim() + .parse::() + .map_err(|error| format!("parse PAGESIZE failed: {error}")) +} + +#[cfg(target_os = "linux")] +fn linux_fd_count() -> Option { + let entries = std::fs::read_dir("/proc/self/fd").ok()?; + Some(entries.filter_map(Result::ok).count() as u64) +} + +#[cfg(target_os = "linux")] +fn parse_status_kb(status: &str, key: &str) -> Option { + parse_status_u64(status, key) +} + +#[cfg(target_os = "linux")] +fn parse_status_u64(status: &str, key: &str) -> Option { + status.lines().find_map(|line| { + let rest = line.strip_prefix(key)?.trim(); + rest.split_whitespace().next()?.parse::().ok() + }) +} + +#[cfg(target_os = "linux")] +fn parse_statm_pages(statm: &str, index: usize) -> Option { + statm + .split_whitespace() + .nth(index)? 
+ .parse::() + .ok() +} + +#[cfg(not(any(windows, target_os = "linux")))] +fn collect_process_metrics() -> Result { + Err("process metrics are only implemented for Windows and Linux".to_string()) +} + +#[cfg(test)] +mod tests { + #[cfg(target_os = "linux")] + use super::{parse_statm_pages, parse_status_kb, parse_status_u64}; + + #[cfg(target_os = "linux")] + #[test] + fn parses_linux_proc_status_memory_fields() { + let status = "Name:\tapi-server\nVmSize:\t 123456 kB\nVmRSS:\t 7890 kB\nVmData:\t 3456 kB\nThreads:\t37\n"; + + assert_eq!(parse_status_kb(status, "VmRSS:"), Some(7890)); + assert_eq!(parse_status_kb(status, "VmSize:"), Some(123456)); + assert_eq!(parse_status_kb(status, "VmData:"), Some(3456)); + assert_eq!(parse_status_u64(status, "Threads:"), Some(37)); + } + + #[cfg(target_os = "linux")] + #[test] + fn parses_linux_statm_pages() { + assert_eq!(parse_statm_pages("100 20 0 0 0 0 0", 0), Some(100)); + assert_eq!(parse_statm_pages("100 20 0 0 0 0 0", 1), Some(20)); + assert_eq!(parse_statm_pages("100 20", 7), None); + } +} diff --git a/server-rs/crates/api-server/src/puzzle.rs b/server-rs/crates/api-server/src/puzzle.rs index cdf24f7e..4619c613 100644 --- a/server-rs/crates/api-server/src/puzzle.rs +++ b/server-rs/crates/api-server/src/puzzle.rs @@ -1529,15 +1529,19 @@ pub async fn claim_puzzle_work_point_incentive( pub async fn list_puzzle_gallery( State(state): State, Extension(request_context): Extension, -) -> Result, Response> { +) -> Result { if let Some(response) = state.puzzle_gallery_cache().read_fresh_response().await { + crate::telemetry::record_puzzle_gallery_cache_hit(); return Ok(puzzle_gallery_cached_json(&request_context, response)); } + crate::telemetry::record_puzzle_gallery_cache_miss(); let _rebuild_guard = state.puzzle_gallery_cache().acquire_rebuild_guard().await; if let Some(response) = state.puzzle_gallery_cache().read_fresh_response().await { + crate::telemetry::record_puzzle_gallery_cache_hit(); return 
Ok(puzzle_gallery_cached_json(&request_context, response)); } + let rebuild_started_at = std::time::Instant::now(); let items = state .spacetime_client() .list_puzzle_gallery() @@ -1556,12 +1560,26 @@ pub async fn list_puzzle_gallery( .map(|item| map_puzzle_gallery_card_response(&state, item)) .collect(), ); - state + let cached_response = state .puzzle_gallery_cache() - .store_response(response.clone()) - .await; + .store_response(response) + .await + .map_err(|error| { + puzzle_error_response( + &request_context, + PUZZLE_GALLERY_PROVIDER, + AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({ + "provider": PUZZLE_GALLERY_PROVIDER, + "message": format!("拼图广场缓存序列化失败:{error}"), + })), + ) + })?; + crate::telemetry::record_puzzle_gallery_cache_rebuild( + rebuild_started_at.elapsed(), + cached_response.data_json_len(), + ); - Ok(json_success_body(Some(&request_context), response)) + Ok(puzzle_gallery_cached_json(&request_context, cached_response)) } pub async fn get_puzzle_gallery_detail( diff --git a/server-rs/crates/api-server/src/puzzle_gallery_cache.rs b/server-rs/crates/api-server/src/puzzle_gallery_cache.rs index 179cb091..a6b9eb7d 100644 --- a/server-rs/crates/api-server/src/puzzle_gallery_cache.rs +++ b/server-rs/crates/api-server/src/puzzle_gallery_cache.rs @@ -3,8 +3,8 @@ use std::{ time::{Duration, Instant}, }; -use axum::Json; -use serde_json::Value; +use axum::response::Response; +use bytes::Bytes; use shared_contracts::{ puzzle_gallery::{PuzzleGalleryResponse, PuzzleGalleryWorkRefResponse}, puzzle_works::PuzzleWorkSummaryResponse, @@ -14,7 +14,7 @@ use tokio::{ time, }; -use crate::{api_response::json_success_body, request_context::RequestContext}; +use crate::{api_response::json_success_data_bytes_response, request_context::RequestContext}; const PUZZLE_GALLERY_PRIMARY_ITEM_COUNT: usize = 10; const PUZZLE_GALLERY_PREVIEW_REF_COUNT: usize = 10; @@ -30,10 +30,21 @@ pub struct PuzzleGalleryCache { #[derive(Clone, Debug)] struct 
PuzzleGalleryCacheEntry { - response: PuzzleGalleryResponse, + data_json: Bytes, built_at: Instant, } +#[derive(Clone, Debug)] +pub struct PuzzleGalleryCachedResponse { + data_json: Bytes, +} + +impl PuzzleGalleryCachedResponse { + pub fn data_json_len(&self) -> usize { + self.data_json.len() + } +} + impl PuzzleGalleryCache { pub fn new() -> Self { Self { @@ -46,22 +57,31 @@ impl PuzzleGalleryCache { self.rebuild_lock.lock().await } - pub async fn read_fresh_response(&self) -> Option { + pub async fn read_fresh_response(&self) -> Option { let guard = self.inner.read().await; let entry = guard.as_ref()?; let now = Instant::now(); if now.duration_since(entry.built_at) > PUZZLE_GALLERY_CACHE_TTL { return None; } - Some(entry.response.clone()) + Some(PuzzleGalleryCachedResponse { + data_json: entry.data_json.clone(), + }) } - pub async fn store_response(&self, response: PuzzleGalleryResponse) { + pub async fn store_response( + &self, + response: PuzzleGalleryResponse, + ) -> Result { let now = Instant::now(); + let cached = PuzzleGalleryCachedResponse { + data_json: Bytes::from(serde_json::to_vec(&response)?), + }; *self.inner.write().await = Some(PuzzleGalleryCacheEntry { - response, + data_json: cached.data_json.clone(), built_at: now, }); + Ok(cached) } pub fn spawn_cleanup_task(&self) { @@ -118,9 +138,9 @@ pub fn build_puzzle_gallery_window_response( pub fn puzzle_gallery_cached_json( request_context: &RequestContext, - response: PuzzleGalleryResponse, -) -> Json { - json_success_body(Some(request_context), response) + response: PuzzleGalleryCachedResponse, +) -> Response { + json_success_data_bytes_response(Some(request_context), response.data_json) } #[cfg(test)] diff --git a/server-rs/crates/api-server/src/telemetry.rs b/server-rs/crates/api-server/src/telemetry.rs index 40347d8d..39643976 100644 --- a/server-rs/crates/api-server/src/telemetry.rs +++ b/server-rs/crates/api-server/src/telemetry.rs @@ -4,11 +4,19 @@ use axum::{ http::{HeaderMap, Request, 
Response}, middleware::Next, }; +use http_body_util::BodyExt; use opentelemetry::{KeyValue, global, metrics::Counter}; +use std::sync::{ + Arc, OnceLock, + atomic::{AtomicI64, Ordering}, +}; use tracing::{info, warn}; use crate::{request_context::resolve_request_id, state::AppState}; +static HTTP_RESPONSE_BODY_IN_FLIGHT: AtomicI64 = AtomicI64::new(0); +static HTTP_REQUEST_PERMITS_AVAILABLE: OnceLock> = OnceLock::new(); + // 集中维护 api-server HTTP 观测,避免在 handler 中散落高基数字段或重复创建 instrument。 pub async fn record_http_observability( State(state): State, @@ -67,7 +75,46 @@ pub async fn record_http_observability( ); } - response + track_response_body_in_flight(response) +} + +pub(crate) fn update_http_request_permits_available(available: usize) { + let gauge = HTTP_REQUEST_PERMITS_AVAILABLE.get_or_init(|| { + let gauge = Arc::new(AtomicI64::new(0)); + register_http_request_permits_available_metric(gauge.clone()); + gauge + }); + gauge.store(available.min(i64::MAX as usize) as i64, Ordering::Relaxed); +} + +pub(crate) fn record_puzzle_gallery_cache_hit() { + puzzle_gallery_cache_metrics().hits.add(1, &[]); +} + +pub(crate) fn record_puzzle_gallery_cache_miss() { + puzzle_gallery_cache_metrics().misses.add(1, &[]); +} + +pub(crate) fn record_puzzle_gallery_cache_rebuild(duration: std::time::Duration, data_bytes: usize) { + let metrics = puzzle_gallery_cache_metrics(); + metrics.rebuilds.add(1, &[]); + metrics + .rebuild_duration + .record(duration.as_secs_f64(), &[]); + metrics + .data_json_bytes + .record(data_bytes.min(u64::MAX as usize) as u64, &[]); +} + +fn track_response_body_in_flight(response: Response) -> Response { + response.map(|body| { + HTTP_RESPONSE_BODY_IN_FLIGHT.fetch_add(1, Ordering::Relaxed); + let guard = ResponseBodyInFlightGuard; + Body::new(body.map_frame(move |frame| { + let _guard = &guard; + frame + })) + }) } struct HttpMetrics { @@ -76,6 +123,22 @@ struct HttpMetrics { duration: opentelemetry::metrics::Histogram, } +struct PuzzleGalleryCacheMetrics { 
+ hits: Counter, + misses: Counter, + rebuilds: Counter, + rebuild_duration: opentelemetry::metrics::Histogram, + data_json_bytes: opentelemetry::metrics::Histogram, +} + +struct ResponseBodyInFlightGuard; + +impl Drop for ResponseBodyInFlightGuard { + fn drop(&mut self) { + HTTP_RESPONSE_BODY_IN_FLIGHT.fetch_sub(1, Ordering::Relaxed); + } +} + fn http_metrics() -> &'static HttpMetrics { static METRICS: std::sync::OnceLock = std::sync::OnceLock::new(); METRICS.get_or_init(|| { @@ -99,6 +162,64 @@ fn http_metrics() -> &'static HttpMetrics { }) } +fn puzzle_gallery_cache_metrics() -> &'static PuzzleGalleryCacheMetrics { + static METRICS: std::sync::OnceLock = std::sync::OnceLock::new(); + METRICS.get_or_init(|| { + let meter = global::meter("genarrative-api"); + PuzzleGalleryCacheMetrics { + hits: meter + .u64_counter("genarrative.puzzle_gallery.cache.hits") + .with_description("Puzzle gallery response cache hits") + .build(), + misses: meter + .u64_counter("genarrative.puzzle_gallery.cache.misses") + .with_description("Puzzle gallery response cache misses") + .build(), + rebuilds: meter + .u64_counter("genarrative.puzzle_gallery.cache.rebuilds") + .with_description("Puzzle gallery response cache rebuild count") + .build(), + rebuild_duration: meter + .f64_histogram("genarrative.puzzle_gallery.cache.rebuild.duration") + .with_unit("s") + .with_description("Puzzle gallery response cache rebuild duration") + .build(), + data_json_bytes: meter + .u64_histogram("genarrative.puzzle_gallery.cache.data_json_bytes") + .with_unit("By") + .with_description("Serialized puzzle gallery data JSON size") + .build(), + } + }) +} + +fn register_http_request_permits_available_metric(gauge: Arc) { + let meter = global::meter("genarrative-api"); + meter + .i64_observable_up_down_counter("genarrative.http.server.request_permits.available") + .with_unit("{permit}") + .with_description("Available api-server HTTP backpressure permits") + .with_callback(move |observer| { + 
observer.observe(gauge.load(Ordering::Relaxed), &[]); + }) + .build(); +} + +pub(crate) fn register_http_runtime_metrics() { + static REGISTERED: OnceLock<()> = OnceLock::new(); + REGISTERED.get_or_init(|| { + let meter = global::meter("genarrative-api"); + meter + .i64_observable_up_down_counter("genarrative.http.server.response_bodies.in_flight") + .with_unit("{response}") + .with_description("HTTP response bodies still owned by Axum/Hyper") + .with_callback(|observer| { + observer.observe(HTTP_RESPONSE_BODY_IN_FLIGHT.load(Ordering::Relaxed), &[]); + }) + .build(); + }); +} + fn http_base_labels(method: String, route: String) -> Vec { vec![ KeyValue::new("http.request.method", method), diff --git a/server-rs/crates/spacetime-client/src/big_fish.rs b/server-rs/crates/spacetime-client/src/big_fish.rs index d9677e29..63325f3c 100644 --- a/server-rs/crates/spacetime-client/src/big_fish.rs +++ b/server-rs/crates/spacetime-client/src/big_fish.rs @@ -74,7 +74,7 @@ impl SpacetimeClient { pub async fn list_big_fish_gallery( &self, ) -> Result, SpacetimeClientError> { - self.read_after_connect(move |connection| { + self.read_after_connect("list_big_fish_gallery", move |connection| { let recent_play_counts = public_work_recent_play_counts(connection, "big-fish"); let mut items = connection .db() diff --git a/server-rs/crates/spacetime-client/src/custom_world.rs b/server-rs/crates/spacetime-client/src/custom_world.rs index d201c616..9d5f5285 100644 --- a/server-rs/crates/spacetime-client/src/custom_world.rs +++ b/server-rs/crates/spacetime-client/src/custom_world.rs @@ -199,7 +199,7 @@ impl SpacetimeClient { async fn read_custom_world_gallery_entries_from_cache( &self, ) -> Result, SpacetimeClientError> { - self.read_after_connect(move |connection| { + self.read_after_connect("list_custom_world_gallery", move |connection| { let recent_play_counts = public_work_recent_play_counts(connection, "custom-world"); let mut entries = connection .db() diff --git 
a/server-rs/crates/spacetime-client/src/lib.rs b/server-rs/crates/spacetime-client/src/lib.rs index be1f7e99..b3b33e7d 100644 --- a/server-rs/crates/spacetime-client/src/lib.rs +++ b/server-rs/crates/spacetime-client/src/lib.rs @@ -407,12 +407,21 @@ impl SpacetimeClient { async fn read_after_connect( &self, + read_name: &'static str, read: impl FnOnce(&DbConnection) -> Result + Send + 'static, ) -> Result where T: Send + 'static, { - let lease = self.acquire_connection().await?; + let metrics_guard = telemetry::begin_read(read_name); + let lease = match self.acquire_connection().await { + Ok(lease) => lease, + Err(error) => { + let final_result = Err(error); + metrics_guard.finish(&final_result); + return final_result; + } + }; let final_result = if let Some(connection) = lease.connection.as_ref() { read(&connection.connection) } else { @@ -422,6 +431,7 @@ impl SpacetimeClient { }; self.release_connection(lease).await; + metrics_guard.finish(&final_result); final_result } diff --git a/server-rs/crates/spacetime-client/src/match3d.rs b/server-rs/crates/spacetime-client/src/match3d.rs index e4b54328..df7fb762 100644 --- a/server-rs/crates/spacetime-client/src/match3d.rs +++ b/server-rs/crates/spacetime-client/src/match3d.rs @@ -225,7 +225,7 @@ impl SpacetimeClient { pub async fn list_match3d_gallery( &self, ) -> Result, SpacetimeClientError> { - self.read_after_connect(move |connection| { + self.read_after_connect("list_match3d_gallery", move |connection| { let mut items = connection .db() .match_3_d_gallery_view() diff --git a/server-rs/crates/spacetime-client/src/puzzle.rs b/server-rs/crates/spacetime-client/src/puzzle.rs index 517d62d6..f6ddd839 100644 --- a/server-rs/crates/spacetime-client/src/puzzle.rs +++ b/server-rs/crates/spacetime-client/src/puzzle.rs @@ -403,7 +403,7 @@ impl SpacetimeClient { pub async fn list_puzzle_gallery( &self, ) -> Result, SpacetimeClientError> { - self.read_after_connect(move |connection| { + 
self.read_after_connect("list_puzzle_gallery", move |connection| { let mut items = connection .db() .puzzle_gallery_card_view() diff --git a/server-rs/crates/spacetime-client/src/runtime.rs b/server-rs/crates/spacetime-client/src/runtime.rs index da9cd9d7..baac3495 100644 --- a/server-rs/crates/spacetime-client/src/runtime.rs +++ b/server-rs/crates/spacetime-client/src/runtime.rs @@ -5,7 +5,7 @@ impl SpacetimeClient { &self, ) -> Result { match self - .read_after_connect(move |connection| { + .read_after_connect("get_creation_entry_config", move |connection| { let config_id = module_runtime::CREATION_ENTRY_CONFIG_GLOBAL_ID.to_string(); let header = connection .db() diff --git a/server-rs/crates/spacetime-client/src/square_hole.rs b/server-rs/crates/spacetime-client/src/square_hole.rs index d59c2c56..0b8e9e26 100644 --- a/server-rs/crates/spacetime-client/src/square_hole.rs +++ b/server-rs/crates/spacetime-client/src/square_hole.rs @@ -228,7 +228,7 @@ impl SpacetimeClient { pub async fn list_square_hole_gallery( &self, ) -> Result, SpacetimeClientError> { - self.read_after_connect(move |connection| { + self.read_after_connect("list_square_hole_gallery", move |connection| { let mut items = connection .db() .square_hole_gallery_view() diff --git a/server-rs/crates/spacetime-client/src/telemetry.rs b/server-rs/crates/spacetime-client/src/telemetry.rs index 4893b1fd..c89e0f19 100644 --- a/server-rs/crates/spacetime-client/src/telemetry.rs +++ b/server-rs/crates/spacetime-client/src/telemetry.rs @@ -10,6 +10,11 @@ pub(crate) struct ProcedureMetricsGuard { started_at: std::time::Instant, } +pub(crate) struct ReadMetricsGuard { + read: &'static str, + started_at: std::time::Instant, +} + pub(crate) fn begin_procedure(procedure: &'static str) -> ProcedureMetricsGuard { ProcedureMetricsGuard { procedure, @@ -17,6 +22,13 @@ pub(crate) fn begin_procedure(procedure: &'static str) -> ProcedureMetricsGuard } } +pub(crate) fn begin_read(read: &'static str) -> ReadMetricsGuard { + 
ReadMetricsGuard { + read, + started_at: std::time::Instant::now(), + } +} + impl ProcedureMetricsGuard { pub(crate) fn finish(&self, result: &Result) { let duration = self.started_at.elapsed(); @@ -24,10 +36,20 @@ impl ProcedureMetricsGuard { } } +impl ReadMetricsGuard { + pub(crate) fn finish(&self, result: &Result) { + let duration = self.started_at.elapsed(); + record_read(self.read, duration, result.is_err()); + } +} + struct SpacetimeMetrics { calls: Counter, errors: Counter, duration_ms: opentelemetry::metrics::Histogram, + read_calls: Counter, + read_errors: Counter, + read_duration_ms: opentelemetry::metrics::Histogram, } fn spacetime_metrics() -> &'static SpacetimeMetrics { @@ -48,6 +70,19 @@ fn spacetime_metrics() -> &'static SpacetimeMetrics { .with_unit("ms") .with_description("SpacetimeDB procedure duration in milliseconds") .build(), + read_calls: meter + .u64_counter("genarrative.spacetime.read.calls") + .with_description("SpacetimeDB local subscription cache read count") + .build(), + read_errors: meter + .u64_counter("genarrative.spacetime.read.errors") + .with_description("SpacetimeDB local subscription cache read error count") + .build(), + read_duration_ms: meter + .f64_histogram("genarrative.spacetime.read.duration_ms") + .with_unit("ms") + .with_description("SpacetimeDB local subscription cache read duration in milliseconds") + .build(), } }) } @@ -66,3 +101,18 @@ fn record_procedure(procedure: &'static str, duration: Duration, failed: bool) { metrics.errors.add(1, &labels); } } + +fn record_read(read: &'static str, duration: Duration, failed: bool) { + let labels = vec![ + KeyValue::new("read", read), + KeyValue::new("status_class", if failed { "error" } else { "ok" }), + ]; + let metrics = spacetime_metrics(); + metrics.read_calls.add(1, &labels); + metrics + .read_duration_ms + .record(duration.as_secs_f64() * 1000.0, &labels); + if failed { + metrics.read_errors.add(1, &labels); + } +} diff --git 
a/server-rs/crates/spacetime-client/src/visual_novel.rs b/server-rs/crates/spacetime-client/src/visual_novel.rs index 5d736ffb..3454298f 100644 --- a/server-rs/crates/spacetime-client/src/visual_novel.rs +++ b/server-rs/crates/spacetime-client/src/visual_novel.rs @@ -239,7 +239,7 @@ impl SpacetimeClient { pub async fn list_visual_novel_gallery( &self, ) -> Result, SpacetimeClientError> { - self.read_after_connect(move |connection| { + self.read_after_connect("list_visual_novel_gallery", move |connection| { let mut items = connection .db() .visual_novel_gallery_view()