diff --git a/.hermes/shared-memory/decision-log.md b/.hermes/shared-memory/decision-log.md index 060e73c7..0c062b55 100644 --- a/.hermes/shared-memory/decision-log.md +++ b/.hermes/shared-memory/decision-log.md @@ -551,3 +551,11 @@ - 影响范围:用户侧任务中心、后台任务配置、运营查询、埋点查询、钱包流水。 - 验证方式:非 `user` scope 的个人任务配置应被 API 和领域构造层拒绝;任务查询与埋点查询分别放在 `docs/operations/` 和 `docs/tracking/`。 - 关联文档:`PROFILE_TASK_AND_TRACKING_SYSTEM_2026-05-03.md`、`RUNTIME_PROFILE_TASK_SCOPE_2026-05-04.md`、`ANALYTICS_DATE_DIMENSION_IMPLEMENTATION_2026-05-04.md`。 + +## 普通 route tracking 先写本机 outbox 再批量入库 + +- 背景:公开作品列表压测中,成功响应后的全局 route tracking 会逐条调用 SpacetimeDB,导致数据库内存和事务压力先到边界。 +- 决策:普通 HTTP route tracking 先写入 `api-server` 本机 NDJSON outbox,后台按数量或时间阈值批量调用 SpacetimeDB;`daily_login`、`work_play_start`、支付、任务领奖、钱包等关键事件保持同步直写。 +- 默认阈值:每批 500 条或 1 秒 flush 一次;outbox 磁盘上限 256 MiB,超过后丢弃低价值 route 事件并记录指标 / 日志。 +- 影响范围:`api-server` tracking 中间件、SpacetimeDB tracking procedure、部署数据目录、OTLP 指标和运维排障。 +- 验证方式:数据库不可用时公开 route 请求不失败且 outbox 文件保留;恢复后批量写入成功并删除本地 sealed 文件;关键事件仍立即影响任务 / 统计。 diff --git a/.hermes/shared-memory/pitfalls.md b/.hermes/shared-memory/pitfalls.md index ccc90e04..f6a9e8d6 100644 --- a/.hermes/shared-memory/pitfalls.md +++ b/.hermes/shared-memory/pitfalls.md @@ -848,6 +848,14 @@ - 验证:宿主机 k6 打 `http://127.0.0.1:18080`,`PEAK_RPS=1000` 等价约 2000 HTTP req/s;320 档无 dropped iterations、无 5xx、无 OOM,200 请求 `request_time p95` 约 0.292s。336 / 352 档 p95 升到约 0.31s / 0.32s,SpacetimeDB 内存尾部可到约 `880MiB / 896MiB`。 - 关联:`deploy/container/nginx.conf`、`deploy/container/api-server.env.example`、`deploy/container/README.md`、`server-rs/crates/api-server/src/tracking.rs`。 +## tracking outbox 成功入库后删除 sealed 文件 + +- 现象:普通 route tracking 改为本机 outbox 后,容易误以为入库成功只需要清空文件内容。 +- 原因:清空文件会扩大崩溃窗口,进程在 truncate 和确认之间异常退出时可能丢失未确认事件。 +- 处理:当前 active NDJSON 达到数量或时间阈值后原子 rename 为 sealed 文件;后台批量 flush sealed 文件,SpacetimeDB 返回成功后直接删除该文件,失败则保留文件等待重试。sealed 文件如果出现无法解析的坏行,重命名为 `corrupt-*` 隔离并记录指标,避免阻塞后续批量入库。该路径是至少一次投递,重复事件由 `tracking_event.event_id` 幂等跳过。 +- 验证:模拟 SpacetimeDB 不可用时 sealed 文件保留;恢复后批量 procedure 成功,sealed 文件消失,`tracking_event` 与 `tracking_daily_stat` 均更新。 +- 关联:`docs/【开发运维】本地开发验证与生产运维-2026-05-15.md`、`server-rs/crates/api-server/src/tracking.rs`、`server-rs/crates/spacetime-module/src/runtime/profile.rs`。 + ## 后台表查询展示 SpacetimeDB 枚举时不要套用 Option 解码 - 现象:后台“表查询”查看 `profile_recharge_order` 时,`kind` 和 `status` 显示为空数组 `[]`,例如充值订单原始行里 `points_60` 的类型和状态都不可读。 diff --git a/deploy/container/api-server.Dockerfile b/deploy/container/api-server.Dockerfile index 40897357..1a0c1eaa 100644 --- a/deploy/container/api-server.Dockerfile +++ b/deploy/container/api-server.Dockerfile @@ -15,7 +15,7 @@ RUN apt-get update && \ COPY --from=rust-builder /tmp/api-server /usr/local/bin/api-server -RUN mkdir -p /var/lib/genarrative/auth && \ +RUN mkdir -p /var/lib/genarrative/auth /var/lib/genarrative/tracking-outbox && \ chown -R genarrative:genarrative /srv/genarrative /var/lib/genarrative USER genarrative @@ -24,7 +24,8 @@ EXPOSE 8082 ENV GENARRATIVE_ENV=container \ GENARRATIVE_API_HOST=0.0.0.0 \ GENARRATIVE_API_PORT=8082 \ - GENARRATIVE_AUTH_STORE_PATH=/var/lib/genarrative/auth/auth-store.json + GENARRATIVE_AUTH_STORE_PATH=/var/lib/genarrative/auth/auth-store.json \ + GENARRATIVE_TRACKING_OUTBOX_DIR=/var/lib/genarrative/tracking-outbox CMD ["api-server"] diff --git a/deploy/container/api-server.env.example b/deploy/container/api-server.env.example index e2fad8c5..6c559c0e 100644 --- a/deploy/container/api-server.env.example +++ b/deploy/container/api-server.env.example @@ -12,6 +12,11 @@ GENARRATIVE_API_MAX_CONCURRENT_REQUESTS=512 GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS=320 GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS=64 GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS=16 +GENARRATIVE_TRACKING_OUTBOX_ENABLED=true +GENARRATIVE_TRACKING_OUTBOX_DIR=/var/lib/genarrative/tracking-outbox +GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE=500 +GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS=1000 +GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES=268435456 GENARRATIVE_OTEL_ENABLED=false OTEL_SERVICE_NAME=genarrative-api diff --git a/deploy/container/docker-compose.loadtest.yml b/deploy/container/docker-compose.loadtest.yml index 29b6b73e..c7e00cbc 100644 --- a/deploy/container/docker-compose.loadtest.yml +++ b/deploy/container/docker-compose.loadtest.yml @@ -53,6 +53,7 @@ services: - "host.docker.internal:host-gateway" volumes: - api-auth-store:/var/lib/genarrative/auth + - api-tracking-outbox:/var/lib/genarrative/tracking-outbox ulimits: nofile: soft: 4096 @@ -138,4 +139,5 @@ services: volumes: spacetime-data: api-auth-store: + api-tracking-outbox: nginx-logs: diff --git a/deploy/env/api-server.env.example b/deploy/env/api-server.env.example index 373f142d..bd265993 100644 --- a/deploy/env/api-server.env.example +++ b/deploy/env/api-server.env.example @@ -11,6 +11,11 @@ GENARRATIVE_API_MAX_CONCURRENT_REQUESTS=512 GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS=64 GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS=32 GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS=16 +GENARRATIVE_TRACKING_OUTBOX_ENABLED=true +GENARRATIVE_TRACKING_OUTBOX_DIR=/var/lib/genarrative/tracking-outbox +GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE=500 +GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS=1000 +GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES=268435456 GENARRATIVE_OTEL_ENABLED=false OTEL_SERVICE_NAME=genarrative-api OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318 diff --git a/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md b/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md index 2ac833a4..decb4f96 100644 --- a/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md +++ b/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md @@ -596,11 +596,13 @@ npm run check:server-rs-ddd - Rust 结构体:`TrackingDailyStat` - 源码:`server-rs/crates/spacetime-module/src/runtime/profile.rs` +- 写入:由单条或批量 tracking procedure 在同一事务中随 `tracking_event` 更新,作为运营查询和个人任务进度的聚合投影。 ### `tracking_event` - Rust 结构体:`TrackingEvent` - 源码:`server-rs/crates/spacetime-module/src/runtime/profile.rs` +- 写入:关键业务埋点同步调用单条 procedure;普通 HTTP route tracking 由 `api-server` 本机 outbox 批量调用 `record_tracking_events_and_return`。`event_id` 必须稳定且全局唯一,批量重试时用唯一索引做幂等跳过。 ### `treasure_record` diff --git a/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md b/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md index 6ef7fe0b..0e1e3ad1 100644 --- a/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md +++ b/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md @@ -233,7 +233,7 @@ cargo test -p platform-auth --manifest-path server-rs/Cargo.toml aliyun_send_sms ## 埋点与运营查询 -用户行为埋点原始事实写入 `tracking_event`,聚合投影写入 `tracking_daily_stat`。任务配置、进度、领奖、钱包流水分别写入: +用户行为埋点原始事实写入 `tracking_event`,聚合投影写入 `tracking_daily_stat`。高频 HTTP route tracking 不直接阻塞请求链路:`api-server` 将普通 route tracking 先写入本机 tracking outbox,再由后台 worker 按数量或时间阈值批量写入 SpacetimeDB;`daily_login`、作品游玩 `work_play_start`、付费、任务领奖和钱包相关关键事件继续同步直写数据库,避免用户任务进度、游玩统计或支付状态出现可感知延迟。任务配置、进度、领奖、钱包流水分别写入: - `profile_task_config` - `profile_task_progress` @@ -242,6 +242,18 @@ cargo test -p platform-auth --manifest-path server-rs/Cargo.toml aliyun_send_sms 个人任务首版 scope 仅支持 `user`。后台、RPG、大鱼吃小鱼、Visual Novel、Story、Combat 等特定链路按 tracking 中间件排除规则处理;作品游玩统一使用 `work_play_start`。 +tracking outbox 默认配置: + +```env +GENARRATIVE_TRACKING_OUTBOX_ENABLED=true +GENARRATIVE_TRACKING_OUTBOX_DIR=/var/lib/genarrative/tracking-outbox +GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE=500 +GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS=1000 +GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES=268435456 +``` + +outbox 采用 NDJSON 文件保存原始事件。达到 `BATCH_SIZE` 或 `FLUSH_INTERVAL_MS` 任一阈值后,当前 active 文件会被原子切换为 sealed 文件并进入批量 flush;SpacetimeDB 批量 procedure 返回成功后删除 sealed 文件,失败则保留文件并重试。`MAX_BYTES` 是磁盘保护阈值,不是 flush 阈值;超过后低价值 route tracking 可以被丢弃并记录日志 / 指标,关键同步事件不进入该丢弃路径。sealed 文件若出现无法解析的坏行,会重命名为 `corrupt-*` 隔离并记录 `genarrative.tracking_outbox.files.corrupt` 指标,避免一个坏文件阻塞后续批量入库。该机制提供至少一次投递语义,依赖 `tracking_event.event_id` 幂等跳过重复事件。 + 常用检查思路: ```sql diff --git a/scripts/jenkins-server-provision.sh b/scripts/jenkins-server-provision.sh index bbabf2bd..aeb0db57 100755 --- a/scripts/jenkins-server-provision.sh +++ b/scripts/jenkins-server-provision.sh @@ -569,7 +569,7 @@ echo "[server-provision] target=${DEPLOY_TARGET}, dry_run=${DRY_RUN}, nginx_conf run_cmd id install_build_dependencies install_sccache -run_cmd mkdir -p "${SPACETIME_ROOT}" "${RELEASE_ROOT}" "$(dirname "${CURRENT_LINK}")" "$(dirname "${WEB_LINK}")" /etc/genarrative /var/lib/genarrative/maintenance /var/lib/genarrative/auth +run_cmd mkdir -p "${SPACETIME_ROOT}" "${RELEASE_ROOT}" "$(dirname "${CURRENT_LINK}")" "$(dirname "${WEB_LINK}")" /etc/genarrative /var/lib/genarrative/maintenance /var/lib/genarrative/auth /var/lib/genarrative/tracking-outbox if ! id spacetimedb >/dev/null 2>&1; then run_cmd useradd --system --home-dir "${SPACETIME_ROOT}" --shell /usr/sbin/nologin spacetimedb diff --git a/server-rs/crates/api-server/Cargo.toml b/server-rs/crates/api-server/Cargo.toml index ce4ef1e6..b423be50 100644 --- a/server-rs/crates/api-server/Cargo.toml +++ b/server-rs/crates/api-server/Cargo.toml @@ -46,7 +46,7 @@ shared-kernel = { workspace = true } shared-logging = { workspace = true } socket2 = { workspace = true } spacetime-client = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time", "sync"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time", "sync", "fs", "io-util"] } tokio-stream = { workspace = true } futures-util = { workspace = true } time = { workspace = true, features = ["formatting"] } diff --git a/server-rs/crates/api-server/src/config.rs b/server-rs/crates/api-server/src/config.rs index e2e965f6..041df9b5 100644 --- a/server-rs/crates/api-server/src/config.rs +++ b/server-rs/crates/api-server/src/config.rs @@ -26,6 +26,11 @@ pub struct AppConfig { pub gallery_max_concurrent_requests: Option, pub detail_max_concurrent_requests: Option, pub admin_max_concurrent_requests: Option, + pub tracking_outbox_enabled: bool, + pub tracking_outbox_dir: PathBuf, + pub tracking_outbox_batch_size: usize, + pub tracking_outbox_flush_interval: Duration, + pub tracking_outbox_max_bytes: u64, pub log_filter: String, pub otel_enabled: bool, pub admin_username: Option, @@ -160,6 +165,11 @@ impl Default for AppConfig { gallery_max_concurrent_requests: None, detail_max_concurrent_requests: None, admin_max_concurrent_requests: None, + tracking_outbox_enabled: true, + tracking_outbox_dir: PathBuf::from("server-rs/.data/tracking-outbox"), + tracking_outbox_batch_size: 500, + tracking_outbox_flush_interval: Duration::from_millis(1_000), + tracking_outbox_max_bytes: 256 * 1024 * 1024, log_filter: "info,tower_http=info".to_string(), otel_enabled: false, admin_username: None, @@ -343,6 +353,26 @@ impl AppConfig { { config.admin_max_concurrent_requests = Some(max_concurrent_requests); } + if let Some(enabled) = read_first_bool_env(&["GENARRATIVE_TRACKING_OUTBOX_ENABLED"]) { + config.tracking_outbox_enabled = enabled; + } + if let Some(dir) = read_first_non_empty_env(&["GENARRATIVE_TRACKING_OUTBOX_DIR"]) { + config.tracking_outbox_dir = PathBuf::from(dir); + } + if let Some(batch_size) = read_first_usize_env(&["GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE"]) + { + config.tracking_outbox_batch_size = batch_size; + } + if let Some(flush_interval_ms) = + read_first_positive_u64_env(&["GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS"]) + { + config.tracking_outbox_flush_interval = Duration::from_millis(flush_interval_ms); + } + if let Some(max_bytes) = + read_first_positive_u64_env(&["GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES"]) + { + config.tracking_outbox_max_bytes = max_bytes; + } if let Some(otel_enabled) = read_first_bool_env(&["GENARRATIVE_OTEL_ENABLED"]) { config.otel_enabled = otel_enabled; } @@ -1230,6 +1260,11 @@ mod tests { std::env::remove_var("GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS"); std::env::remove_var("GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS"); std::env::remove_var("GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS"); + std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_ENABLED"); + std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_DIR"); + std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE"); + std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS"); + std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES"); std::env::remove_var("GENARRATIVE_OTEL_ENABLED"); std::env::set_var("GENARRATIVE_API_LISTEN_BACKLOG", "2048"); std::env::set_var("GENARRATIVE_API_WORKER_THREADS", "6"); @@ -1237,6 +1272,14 @@ mod tests { std::env::set_var("GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS", "64"); std::env::set_var("GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS", "32"); std::env::set_var("GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS", "16"); + std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_ENABLED", "false"); + std::env::set_var( + "GENARRATIVE_TRACKING_OUTBOX_DIR", + "/tmp/genarrative-tracking-outbox", + ); + std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE", "250"); + std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS", "2000"); + std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES", "1048576"); std::env::set_var("GENARRATIVE_OTEL_ENABLED", "true"); } @@ -1247,6 +1290,17 @@ mod tests { assert_eq!(config.gallery_max_concurrent_requests, Some(64)); assert_eq!(config.detail_max_concurrent_requests, Some(32)); assert_eq!(config.admin_max_concurrent_requests, Some(16)); + assert!(!config.tracking_outbox_enabled); + assert_eq!( + config.tracking_outbox_dir, + std::path::PathBuf::from("/tmp/genarrative-tracking-outbox") + ); + assert_eq!(config.tracking_outbox_batch_size, 250); + assert_eq!( + config.tracking_outbox_flush_interval, + std::time::Duration::from_millis(2_000) + ); + assert_eq!(config.tracking_outbox_max_bytes, 1_048_576); assert!(config.otel_enabled); unsafe { @@ -1256,6 +1310,11 @@ mod tests { std::env::remove_var("GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS"); std::env::remove_var("GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS"); std::env::remove_var("GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS"); + std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_ENABLED"); + std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_DIR"); + std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE"); + std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS"); + std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES"); std::env::remove_var("GENARRATIVE_OTEL_ENABLED"); } } diff --git a/server-rs/crates/api-server/src/main.rs b/server-rs/crates/api-server/src/main.rs index 665f3526..01ed6555 100644 --- a/server-rs/crates/api-server/src/main.rs +++ b/server-rs/crates/api-server/src/main.rs @@ -55,8 +55,8 @@ mod password_entry; mod password_management; mod phone_auth; mod platform_errors; -mod profile_identity; mod process_metrics; +mod profile_identity; mod prompt; mod puzzle; mod puzzle_agent_turn; @@ -80,6 +80,7 @@ mod story_battles; mod story_sessions; mod telemetry; mod tracking; +mod tracking_outbox; mod vector_engine_audio_generation; mod visual_novel; mod volcengine_speech; @@ -154,6 +155,9 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> { .await .map_err(|error| std::io::Error::other(format!("初始化应用状态失败:{error}")))?; state.puzzle_gallery_cache().spawn_cleanup_task(); + if let Some(outbox) = state.tracking_outbox() { + outbox.spawn_worker(); + } let router = build_router(state); info!( diff --git a/server-rs/crates/api-server/src/state.rs b/server-rs/crates/api-server/src/state.rs index 5ae4244f..9249e4e5 100644 --- a/server-rs/crates/api-server/src/state.rs +++ b/server-rs/crates/api-server/src/state.rs @@ -33,6 +33,7 @@ use tracing::{info, warn}; use crate::config::AppConfig; use crate::puzzle_gallery_cache::PuzzleGalleryCache; +use crate::tracking_outbox::TrackingOutbox; use crate::wechat_pay::{WechatPayClient, map_wechat_pay_init_error}; use crate::wechat_provider::build_wechat_provider; @@ -167,6 +168,7 @@ pub struct AppStateInner { ai_task_service: AiTaskService, spacetime_client: SpacetimeClient, puzzle_gallery_cache: PuzzleGalleryCache, + tracking_outbox: Option>, llm_client: Option, creative_agent_gpt5_client: Option, creative_agent_executor: Arc, @@ -297,6 +299,7 @@ impl AppState { pool_size: config.spacetime_pool_size, procedure_timeout: config.spacetime_procedure_timeout, }); + let tracking_outbox = TrackingOutbox::from_config(&config, spacetime_client.clone()); let llm_client = build_llm_client(&config)?; let creative_agent_gpt5_client = build_creative_agent_gpt5_client(&config)?; let http_request_permit_pools = HttpRequestPermitPools::from_config(&config); @@ -324,6 +327,7 @@ impl AppState { ai_task_service, spacetime_client, puzzle_gallery_cache: PuzzleGalleryCache::new(), + tracking_outbox, llm_client, creative_agent_gpt5_client, creative_agent_executor: Arc::new(MockLangChainRustAgentExecutor), @@ -582,6 +586,10 @@ impl AppState { &self.puzzle_gallery_cache } + pub fn tracking_outbox(&self) -> Option> { + self.tracking_outbox.clone() + } + pub fn llm_client(&self) -> Option<&LlmClient> { self.llm_client.as_ref() } diff --git a/server-rs/crates/api-server/src/telemetry.rs b/server-rs/crates/api-server/src/telemetry.rs index 9c40fdd7..8c217634 100644 --- a/server-rs/crates/api-server/src/telemetry.rs +++ b/server-rs/crates/api-server/src/telemetry.rs @@ -18,6 +18,8 @@ use crate::{ }; static HTTP_RESPONSE_BODY_IN_FLIGHT: AtomicI64 = AtomicI64::new(0); +static TRACKING_OUTBOX_PENDING_BYTES: AtomicI64 = AtomicI64::new(0); +static TRACKING_OUTBOX_PENDING_FILES: AtomicI64 = AtomicI64::new(0); static HTTP_REQUEST_PERMITS_AVAILABLE: OnceLock = OnceLock::new(); @@ -123,6 +125,53 @@ pub(crate) fn record_puzzle_gallery_cache_rebuild( .record(data_bytes.min(u64::MAX as usize) as u64, &[]); } +pub(crate) fn record_tracking_outbox_enqueued() { + tracking_outbox_metrics().enqueued.add(1, &[]); +} + +pub(crate) fn record_tracking_outbox_dropped(reason: &'static str) { + tracking_outbox_metrics() + .dropped + .add(1, &[KeyValue::new("reason", reason)]); +} + +pub(crate) fn record_tracking_outbox_sealed(reason: &'static str) { + tracking_outbox_metrics() + .sealed_files + .add(1, &[KeyValue::new("reason", reason)]); +} + +pub(crate) fn record_tracking_outbox_corrupt_file() { + tracking_outbox_metrics().corrupt_files.add(1, &[]); +} + +pub(crate) fn record_tracking_outbox_flush( + duration: std::time::Duration, + accepted_count: u32, + file_bytes: u64, + failed: bool, +) { + let status_class = if failed { "error" } else { "ok" }; + let labels = [KeyValue::new("status_class", status_class)]; + let metrics = tracking_outbox_metrics(); + metrics.flushes.add(1, &labels); + metrics + .flush_duration + .record(duration.as_secs_f64(), &labels); + metrics + .flushed_events + .add(u64::from(accepted_count), &labels); + metrics.flushed_bytes.add(file_bytes, &labels); +} + +pub(crate) fn update_tracking_outbox_pending_bytes(bytes: u64) { + TRACKING_OUTBOX_PENDING_BYTES.store(bytes.min(i64::MAX as u64) as i64, Ordering::Relaxed); +} + +pub(crate) fn update_tracking_outbox_pending_files(files: usize) { + TRACKING_OUTBOX_PENDING_FILES.store(files.min(i64::MAX as usize) as i64, Ordering::Relaxed); +} + fn track_response_body_in_flight(response: Response) -> Response { response.map(|body| { HTTP_RESPONSE_BODY_IN_FLIGHT.fetch_add(1, Ordering::Relaxed); @@ -151,6 +200,17 @@ struct PuzzleGalleryCacheMetrics { data_json_bytes: opentelemetry::metrics::Histogram, } +struct TrackingOutboxMetrics { + enqueued: Counter, + dropped: Counter, + sealed_files: Counter, + corrupt_files: Counter, + flushes: Counter, + flush_duration: opentelemetry::metrics::Histogram, + flushed_events: Counter, + flushed_bytes: Counter, +} + struct HttpRequestPermitsAvailableGauges { default: Arc, gallery: Arc, @@ -254,6 +314,51 @@ fn puzzle_gallery_cache_metrics() -> &'static PuzzleGalleryCacheMetrics { }) } +fn tracking_outbox_metrics() -> &'static TrackingOutboxMetrics { + static METRICS: std::sync::OnceLock = std::sync::OnceLock::new(); + METRICS.get_or_init(|| { + let meter = global::meter("genarrative-api"); + TrackingOutboxMetrics { + enqueued: meter + .u64_counter("genarrative.tracking_outbox.events.enqueued") + .with_description("Tracking events appended to the local outbox") + .build(), + dropped: meter + .u64_counter("genarrative.tracking_outbox.events.dropped") + .with_description("Tracking events dropped by local outbox protection") + .build(), + sealed_files: meter + .u64_counter("genarrative.tracking_outbox.files.sealed") + .with_description("Tracking outbox active files sealed for flushing") + .build(), + corrupt_files: meter + .u64_counter("genarrative.tracking_outbox.files.corrupt") + .with_description( + "Tracking outbox sealed files quarantined because they could not be parsed", + ) + .build(), + flushes: meter + .u64_counter("genarrative.tracking_outbox.flushes") + .with_description("Tracking outbox sealed file flush attempts") + .build(), + flush_duration: meter + .f64_histogram("genarrative.tracking_outbox.flush.duration") + .with_unit("s") + .with_description("Tracking outbox sealed file flush duration") + .build(), + flushed_events: meter + .u64_counter("genarrative.tracking_outbox.events.flushed") + .with_description("Tracking events accepted by SpacetimeDB batch procedure") + .build(), + flushed_bytes: meter + .u64_counter("genarrative.tracking_outbox.bytes.flushed") + .with_unit("By") + .with_description("Tracking outbox bytes removed after successful flush") + .build(), + } + }) +} + fn register_http_request_permits_available_metric() -> HttpRequestPermitsAvailableGauges { let gauges = HttpRequestPermitsAvailableGauges::new(); let meter = global::meter("genarrative-api"); @@ -311,6 +416,22 @@ pub(crate) fn register_http_runtime_metrics() { observer.observe(HTTP_RESPONSE_BODY_IN_FLIGHT.load(Ordering::Relaxed), &[]); }) .build(); + meter + .i64_observable_up_down_counter("genarrative.tracking_outbox.pending.bytes") + .with_unit("By") + .with_description("Tracking outbox bytes waiting on local disk") + .with_callback(|observer| { + observer.observe(TRACKING_OUTBOX_PENDING_BYTES.load(Ordering::Relaxed), &[]); + }) + .build(); + meter + .i64_observable_up_down_counter("genarrative.tracking_outbox.pending.files") + .with_unit("{file}") + .with_description("Tracking outbox sealed files waiting for flush") + .with_callback(|observer| { + observer.observe(TRACKING_OUTBOX_PENDING_FILES.load(Ordering::Relaxed), &[]); + }) + .build(); }); } diff --git a/server-rs/crates/api-server/src/tracking.rs b/server-rs/crates/api-server/src/tracking.rs index 0f3aad21..ad3b187c 100644 --- a/server-rs/crates/api-server/src/tracking.rs +++ b/server-rs/crates/api-server/src/tracking.rs @@ -85,7 +85,7 @@ pub async fn record_route_tracking_event_after_success( draft.owner_user_id = draft.user_id.clone(); } - record_tracking_event_after_success(state, request_context, draft).await; + record_route_tracking_event_via_outbox_after_success(state, request_context, draft).await; } fn resolve_route_tracking_spec(method: &Method, path: &str) -> Option { @@ -524,26 +524,101 @@ pub async fn record_tracking_event_after_success( request_context: &RequestContext, draft: TrackingEventDraft, ) { - let occurred_at_micros = OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000; - let event_id = build_tracking_event_id(&draft, occurred_at_micros); - let event_key = draft.event_key.to_string(); - let scope_kind = draft.scope_kind; - let scope_id = draft.scope_id; - let metadata_json = draft.metadata.to_string(); + record_tracking_event_input_after_success( + state, + request_context, + build_tracking_event_input(draft), + ) + .await; +} + +async fn record_route_tracking_event_via_outbox_after_success( + state: &AppState, + request_context: &RequestContext, + draft: TrackingEventDraft, +) { + let event = build_tracking_event_input(draft); + let event_key = event.event_key.clone(); + let scope_kind = event.scope_kind; + let scope_id = event.scope_id.clone(); + + if let Some(outbox) = state.tracking_outbox() { + match outbox.enqueue(event.clone()).await { + Ok(crate::tracking_outbox::TrackingOutboxEnqueueOutcome::Enqueued) => { + tracing::debug!( + request_id = request_context.request_id(), + operation = request_context.operation(), + event_key = %event_key, + scope_kind = %scope_kind.as_str(), + scope_id = %scope_id, + "后端 route 埋点已写入本机 outbox" + ); + return; + } + Ok(crate::tracking_outbox::TrackingOutboxEnqueueOutcome::Dropped { reason }) => { + tracing::warn!( + request_id = request_context.request_id(), + operation = request_context.operation(), + event_key = %event_key, + scope_kind = %scope_kind.as_str(), + scope_id = %scope_id, + reason, + "后端 route 埋点因 outbox 保护阈值被丢弃,主业务流程继续" + ); + return; + } + Err(error) => { + tracing::warn!( + request_id = request_context.request_id(), + operation = request_context.operation(), + event_key = %event_key, + scope_kind = %scope_kind.as_str(), + scope_id = %scope_id, + error = %error, + "后端 route 埋点写入 outbox 失败,回退同步直写 SpacetimeDB" + ); + } + } + } + + record_tracking_event_input_after_success(state, request_context, event).await; +} + +async fn record_tracking_event_input_after_success( + state: &AppState, + request_context: &RequestContext, + event: module_runtime::RuntimeTrackingEventInput, +) { + let event_key = event.event_key.clone(); + let log_scope_kind = event.scope_kind; + let scope_id = event.scope_id.clone(); + + let module_runtime::RuntimeTrackingEventInput { + event_id, + event_key: procedure_event_key, + scope_kind: procedure_scope_kind, + scope_id: procedure_scope_id, + user_id, + owner_user_id, + profile_id, + module_key, + metadata_json, + occurred_at_micros, + } = event; match state .spacetime_client() .record_tracking_event( event_id, - event_key.clone(), - scope_kind, - scope_id.clone(), - draft.user_id, - draft.owner_user_id, - draft.profile_id, - draft.module_key.map(str::to_string), + procedure_event_key, + procedure_scope_kind, + procedure_scope_id, + user_id, + owner_user_id, + profile_id, + module_key, metadata_json, - occurred_at_micros as i64, + occurred_at_micros, ) .await { @@ -551,7 +626,7 @@ pub async fn record_tracking_event_after_success( request_id = request_context.request_id(), operation = request_context.operation(), event_key = %event_key, - scope_kind = %scope_kind.as_str(), + scope_kind = %log_scope_kind.as_str(), scope_id = %scope_id, "后端埋点已记录" ), @@ -559,7 +634,7 @@ pub async fn record_tracking_event_after_success( request_id = request_context.request_id(), operation = request_context.operation(), event_key = %event_key, - scope_kind = %scope_kind.as_str(), + scope_kind = %log_scope_kind.as_str(), scope_id = %scope_id, error = %error, "后端埋点记录失败,主业务流程继续" @@ -567,6 +642,26 @@ pub async fn record_tracking_event_after_success( } } +fn build_tracking_event_input( + draft: TrackingEventDraft, +) -> module_runtime::RuntimeTrackingEventInput { + let occurred_at_micros = OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000; + let event_id = build_tracking_event_id(&draft, occurred_at_micros); + + module_runtime::RuntimeTrackingEventInput { + event_id, + event_key: draft.event_key.to_string(), + scope_kind: draft.scope_kind, + scope_id: draft.scope_id, + user_id: draft.user_id, + owner_user_id: draft.owner_user_id, + profile_id: draft.profile_id, + module_key: draft.module_key.map(str::to_string), + metadata_json: draft.metadata.to_string(), + occurred_at_micros: occurred_at_micros as i64, + } +} + fn build_tracking_event_id(draft: &TrackingEventDraft, occurred_at_micros: i128) -> String { if draft.event_key == "daily_login" && draft.scope_kind == RuntimeTrackingScopeKind::User diff --git a/server-rs/crates/api-server/src/tracking_outbox.rs b/server-rs/crates/api-server/src/tracking_outbox.rs new file mode 100644 index 00000000..19a61ed6 --- /dev/null +++ b/server-rs/crates/api-server/src/tracking_outbox.rs @@ -0,0 +1,594 @@ +use std::{ + fmt, + path::{Path, PathBuf}, + sync::Arc, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; + +use module_runtime::RuntimeTrackingEventInput; +use serde::{Deserialize, Serialize}; +use spacetime_client::{SpacetimeClient, SpacetimeClientError}; +use tokio::{ + fs::{self, File, OpenOptions}, + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + sync::Mutex, + time::sleep, +}; +use tracing::{debug, warn}; + +use crate::config::AppConfig; + +const ACTIVE_FILE_NAME: &str = "active.ndjson"; +const SEALED_FILE_PREFIX: &str = "sealed-"; +const CORRUPT_FILE_PREFIX: &str = "corrupt-"; +const SEALED_FILE_EXTENSION: &str = ".ndjson"; + +#[derive(Clone)] +pub struct TrackingOutbox { + dir: PathBuf, + batch_size: usize, + flush_interval: Duration, + max_bytes: u64, + spacetime_client: SpacetimeClient, + inner: Arc>, +} + +struct TrackingOutboxInner { + initialized: bool, + active_file: Option, + active_count: usize, + active_bytes: u64, + total_bytes: u64, + last_sealed_at: Instant, +} + +#[derive(Debug)] +pub enum TrackingOutboxEnqueueOutcome { + Enqueued, + Dropped { reason: &'static str }, +} + +#[derive(Debug)] +pub enum TrackingOutboxError { + Io(std::io::Error), + Json(serde_json::Error), + Spacetime(SpacetimeClientError), +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +struct TrackingOutboxRecord { + event: RuntimeTrackingEventInput, +} + +impl TrackingOutbox { + pub fn from_config(config: &AppConfig, spacetime_client: SpacetimeClient) -> Option> { + if !config.tracking_outbox_enabled { + return None; + } + + let total_bytes = directory_size_if_exists(&config.tracking_outbox_dir).unwrap_or(0); + let outbox = Self { + dir: config.tracking_outbox_dir.clone(), + batch_size: config.tracking_outbox_batch_size.max(1), + flush_interval: config.tracking_outbox_flush_interval, + max_bytes: config.tracking_outbox_max_bytes, + spacetime_client, + inner: Arc::new(Mutex::new(TrackingOutboxInner { + initialized: false, + active_file: None, + active_count: 0, + active_bytes: 0, + total_bytes, + last_sealed_at: Instant::now(), + })), + }; + crate::telemetry::update_tracking_outbox_pending_bytes(total_bytes); + Some(Arc::new(outbox)) + } + + pub async fn enqueue( + &self, + event: RuntimeTrackingEventInput, + ) -> Result { + let record = TrackingOutboxRecord { event }; + let mut line = serde_json::to_vec(&record)?; + line.push(b'\n'); + let line_bytes = line.len().min(u64::MAX as usize) as u64; + + let mut inner = self.inner.lock().await; + self.ensure_initialized_locked(&mut inner).await?; + + if inner.total_bytes.saturating_add(line_bytes) > self.max_bytes { + crate::telemetry::record_tracking_outbox_dropped("max_bytes"); + return Ok(TrackingOutboxEnqueueOutcome::Dropped { + reason: "max_bytes", + }); + } + + let active_path = self.active_path(); + if inner.active_file.is_none() { + inner.active_file = Some( + OpenOptions::new() + .create(true) + .append(true) + .open(&active_path) + .await?, + ); + } + + let file = inner + .active_file + .as_mut() + .expect("active file should be open before append"); + file.write_all(&line).await?; + inner.active_count = inner.active_count.saturating_add(1); + inner.active_bytes = inner.active_bytes.saturating_add(line_bytes); + inner.total_bytes = inner.total_bytes.saturating_add(line_bytes); + crate::telemetry::record_tracking_outbox_enqueued(); + crate::telemetry::update_tracking_outbox_pending_bytes(inner.total_bytes); + + if inner.active_count >= self.batch_size { + self.seal_active_locked(&mut inner, "batch_size").await?; + } + + Ok(TrackingOutboxEnqueueOutcome::Enqueued) + } + + pub fn spawn_worker(self: Arc) { + tokio::spawn(async move { + loop { + sleep(self.flush_interval).await; + if let Err(error) = self.seal_active_if_due().await { + warn!(error = %error, "tracking outbox 定时封存 active 文件失败"); + } + if let Err(error) = self.flush_sealed_files_once().await { + warn!(error = %error, "tracking outbox 批量写入 SpacetimeDB 失败,将保留 sealed 文件等待重试"); + } + } + }); + } + + async fn seal_active_if_due(&self) -> Result<(), TrackingOutboxError> { + let mut inner = self.inner.lock().await; + self.ensure_initialized_locked(&mut inner).await?; + if inner.active_count == 0 || inner.last_sealed_at.elapsed() < self.flush_interval { + return Ok(()); + } + + self.seal_active_locked(&mut inner, "flush_interval").await + } + + async fn flush_sealed_files_once(&self) -> Result<(), TrackingOutboxError> { + self.ensure_initialized().await?; + + let sealed_files = self.list_sealed_files().await?; + crate::telemetry::update_tracking_outbox_pending_files(sealed_files.len()); + for path in sealed_files { + let started_at = Instant::now(); + let metadata = fs::metadata(&path).await?; + let file_bytes = metadata.len(); + let events = match read_outbox_events(&path).await { + Ok(events) => events, + Err(error) if error.is_data_corruption() => { + let corrupt_path = self.corrupt_path_for(&path); + fs::rename(&path, &corrupt_path).await?; + self.subtract_total_bytes(file_bytes).await; + crate::telemetry::record_tracking_outbox_corrupt_file(); + warn!( + error = %error, + source = %path.display(), + target = %corrupt_path.display(), + "tracking outbox sealed 文件含无法解析的记录,已隔离并继续处理后续文件" + ); + continue; + } + Err(error) => return Err(error), + }; + if events.is_empty() { + fs::remove_file(&path).await?; + self.subtract_total_bytes(file_bytes).await; + continue; + } + + match self.spacetime_client.record_tracking_events(events).await { + Ok(accepted_count) => { + fs::remove_file(&path).await?; + self.subtract_total_bytes(file_bytes).await; + crate::telemetry::record_tracking_outbox_flush( + started_at.elapsed(), + accepted_count, + file_bytes, + false, + ); + debug!( + accepted_count, + file_bytes, + path = %path.display(), + "tracking outbox sealed 文件已批量入库并删除" + ); + } + Err(error) => { + crate::telemetry::record_tracking_outbox_flush( + started_at.elapsed(), + 0, + file_bytes, + true, + ); + return Err(TrackingOutboxError::Spacetime(error)); + } + } + } + + Ok(()) + } + + async fn ensure_initialized(&self) -> Result<(), TrackingOutboxError> { + let mut inner = self.inner.lock().await; + self.ensure_initialized_locked(&mut inner).await + } + + async fn ensure_initialized_locked( + &self, + inner: &mut TrackingOutboxInner, + ) -> Result<(), TrackingOutboxError> { + if inner.initialized { + return Ok(()); + } + + fs::create_dir_all(&self.dir).await?; + self.seal_existing_active_file().await?; + inner.total_bytes = directory_size(&self.dir).await?; + inner.initialized = true; + inner.last_sealed_at = Instant::now(); + crate::telemetry::update_tracking_outbox_pending_bytes(inner.total_bytes); + Ok(()) + } + + async fn seal_active_locked( + &self, + inner: &mut TrackingOutboxInner, + reason: &'static str, + ) -> Result<(), TrackingOutboxError> { + if inner.active_count == 0 && inner.active_bytes == 0 { + return Ok(()); + } + + if let Some(mut file) = inner.active_file.take() { + file.flush().await?; + file.sync_data().await?; + drop(file); + } + + let active_path = self.active_path(); + match fs::metadata(&active_path).await { + Ok(metadata) if metadata.len() > 0 => { + let sealed_path = self.next_sealed_path(); + fs::rename(&active_path, &sealed_path).await?; + crate::telemetry::record_tracking_outbox_sealed(reason); + debug!( + reason, + event_count = inner.active_count, + file_bytes = metadata.len(), + path = %sealed_path.display(), + "tracking outbox active 文件已封存" + ); + } + Ok(_) => { + let _ = fs::remove_file(&active_path).await; + } + Err(error) if error.kind() == std::io::ErrorKind::NotFound => {} + Err(error) => return Err(error.into()), + } + + inner.active_count = 0; + inner.active_bytes = 0; + inner.last_sealed_at = Instant::now(); + Ok(()) + } + + async fn seal_existing_active_file(&self) -> Result<(), TrackingOutboxError> { + let active_path = self.active_path(); + match fs::metadata(&active_path).await { + Ok(metadata) if metadata.len() > 0 => { + fs::rename(&active_path, self.next_sealed_path()).await?; + crate::telemetry::record_tracking_outbox_sealed("startup"); + } + Ok(_) => { + let _ = fs::remove_file(&active_path).await; + } + Err(error) if error.kind() == std::io::ErrorKind::NotFound => {} + Err(error) => return Err(error.into()), + } + Ok(()) + } + + async fn list_sealed_files(&self) -> Result, TrackingOutboxError> { + let mut entries = fs::read_dir(&self.dir).await?; + let mut files = Vec::new(); + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + let Some(name) = path.file_name().and_then(|value| value.to_str()) else { + continue; + }; + if name.starts_with(SEALED_FILE_PREFIX) && name.ends_with(SEALED_FILE_EXTENSION) { + files.push(path); + } + } + files.sort(); + Ok(files) + } + + async fn subtract_total_bytes(&self, bytes: u64) { + let mut inner = self.inner.lock().await; + inner.total_bytes = inner.total_bytes.saturating_sub(bytes); + crate::telemetry::update_tracking_outbox_pending_bytes(inner.total_bytes); + } + + fn active_path(&self) -> PathBuf { + self.dir.join(ACTIVE_FILE_NAME) + } + + fn next_sealed_path(&self) -> PathBuf { + self.dir.join(format!( + "{SEALED_FILE_PREFIX}{}-{uuid}{SEALED_FILE_EXTENSION}", + current_unix_micros(), + uuid = uuid::Uuid::new_v4() + )) + } + + fn corrupt_path_for(&self, path: &Path) -> PathBuf { + let name = path + .file_name() + .and_then(|value| value.to_str()) + .unwrap_or("unknown.ndjson"); + self.dir.join(format!( + "{CORRUPT_FILE_PREFIX}{}-{uuid}-{name}", + current_unix_micros(), + uuid = uuid::Uuid::new_v4() + )) + } +} + +impl fmt::Debug for TrackingOutbox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TrackingOutbox") + .field("dir", &self.dir) + .field("batch_size", &self.batch_size) + .field("flush_interval", &self.flush_interval) + .field("max_bytes", &self.max_bytes) + .finish() + } +} + +impl fmt::Display for TrackingOutboxError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Io(error) => write!(f, "{error}"), + Self::Json(error) => write!(f, "{error}"), + Self::Spacetime(error) => write!(f, "{error}"), + } + } +} + +impl From for TrackingOutboxError { + fn from(value: std::io::Error) -> Self { + Self::Io(value) + } +} + +impl From for TrackingOutboxError { + fn from(value: serde_json::Error) -> Self { + Self::Json(value) + } +} + +impl TrackingOutboxError { + fn is_data_corruption(&self) -> bool { + matches!(self, Self::Json(_)) + } +} + +async fn read_outbox_events( + path: &Path, +) -> Result, TrackingOutboxError> { + let file = File::open(path).await?; + let mut lines = BufReader::new(file).lines(); + let mut events = Vec::new(); + while let Some(line) = lines.next_line().await? { + if line.trim().is_empty() { + continue; + } + let record = serde_json::from_str::(&line)?; + events.push(record.event); + } + Ok(events) +} + +async fn directory_size(path: &Path) -> Result { + let mut total = 0u64; + let mut entries = fs::read_dir(path).await?; + while let Some(entry) = entries.next_entry().await? { + if !is_pending_outbox_file_name(&entry.file_name()) { + continue; + } + let metadata = entry.metadata().await?; + if metadata.is_file() { + total = total.saturating_add(metadata.len()); + } + } + Ok(total) +} + +fn directory_size_if_exists(path: &Path) -> Result { + if !path.is_dir() { + return Ok(0); + } + + let mut total = 0u64; + for entry in std::fs::read_dir(path)? { + let entry = entry?; + if !is_pending_outbox_file_name(&entry.file_name()) { + continue; + } + let metadata = entry.metadata()?; + if metadata.is_file() { + total = total.saturating_add(metadata.len()); + } + } + Ok(total) +} + +fn current_unix_micros() -> u128 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_micros() +} + +fn is_pending_outbox_file_name(name: &std::ffi::OsStr) -> bool { + name.to_str().is_some_and(|value| { + value == ACTIVE_FILE_NAME + || (value.starts_with(SEALED_FILE_PREFIX) && value.ends_with(SEALED_FILE_EXTENSION)) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample_event(event_id: &str) -> RuntimeTrackingEventInput { + RuntimeTrackingEventInput { + event_id: event_id.to_string(), + event_key: "puzzle_route_success".to_string(), + scope_kind: module_runtime::RuntimeTrackingScopeKind::Site, + scope_id: "site".to_string(), + user_id: None, + owner_user_id: None, + profile_id: None, + module_key: Some("puzzle".to_string()), + metadata_json: "{}".to_string(), + occurred_at_micros: 1_713_680_000_000_000, + } + } + + fn test_dir(name: &str) -> PathBuf { + let dir = std::env::temp_dir().join(format!( + "genarrative-tracking-outbox-{name}-{}", + current_unix_micros() + )); + let _ = std::fs::remove_dir_all(&dir); + dir + } + + fn test_outbox(dir: PathBuf, batch_size: usize, max_bytes: u64) -> Arc { + let config = AppConfig { + tracking_outbox_dir: dir, + tracking_outbox_batch_size: batch_size, + tracking_outbox_max_bytes: max_bytes, + tracking_outbox_flush_interval: Duration::from_secs(60), + ..AppConfig::default() + }; + TrackingOutbox::from_config( + &config, + SpacetimeClient::new(spacetime_client::SpacetimeClientConfig { + server_url: "http://127.0.0.1:1".to_string(), + database: "missing".to_string(), + token: None, + pool_size: 1, + procedure_timeout: Duration::from_millis(10), + }), + ) + .expect("outbox should be enabled") + } + + #[tokio::test] + async fn enqueue_seals_active_file_when_batch_size_reached() { + let dir = test_dir("batch"); + let outbox = test_outbox(dir.clone(), 2, 1024 * 1024); + + outbox.enqueue(sample_event("event-1")).await.unwrap(); + outbox.enqueue(sample_event("event-2")).await.unwrap(); + + assert!(!dir.join(ACTIVE_FILE_NAME).exists()); + let sealed_count = std::fs::read_dir(&dir) + .unwrap() + .filter_map(Result::ok) + .filter(|entry| { + entry + .file_name() + .to_str() + .is_some_and(|name| name.starts_with(SEALED_FILE_PREFIX)) + }) + .count(); + assert_eq!(sealed_count, 1); + + let _ = std::fs::remove_dir_all(dir); + } + + #[tokio::test] + async fn enqueue_drops_when_outbox_exceeds_max_bytes() { + let dir = test_dir("max-bytes"); + let outbox = test_outbox(dir.clone(), 500, 1); + + let outcome = outbox.enqueue(sample_event("event-1")).await.unwrap(); + + assert!(matches!( + outcome, + TrackingOutboxEnqueueOutcome::Dropped { + reason: "max_bytes" + } + )); + assert!(!dir.join(ACTIVE_FILE_NAME).exists()); + + let _ = std::fs::remove_dir_all(dir); + } + + #[tokio::test] + async fn flush_quarantines_corrupt_sealed_file() { + let dir = test_dir("corrupt"); + std::fs::create_dir_all(&dir).unwrap(); + let sealed_path = dir.join(format!("{SEALED_FILE_PREFIX}bad{SEALED_FILE_EXTENSION}")); + std::fs::write(&sealed_path, b"{not-json}\n").unwrap(); + let outbox = test_outbox(dir.clone(), 500, 1024 * 1024); + + outbox.flush_sealed_files_once().await.unwrap(); + + assert!(!sealed_path.exists()); + let corrupt_count = std::fs::read_dir(&dir) + .unwrap() + .filter_map(Result::ok) + .filter(|entry| { + entry + .file_name() + .to_str() + .is_some_and(|name| name.starts_with(CORRUPT_FILE_PREFIX)) + }) + .count(); + assert_eq!(corrupt_count, 1); + + let _ = std::fs::remove_dir_all(dir); + } + + #[test] + fn directory_size_excludes_quarantined_corrupt_files() { + let dir = test_dir("directory-size"); + std::fs::create_dir_all(&dir).unwrap(); + std::fs::write(dir.join(ACTIVE_FILE_NAME), b"active").unwrap(); + std::fs::write( + dir.join(format!("{SEALED_FILE_PREFIX}one{SEALED_FILE_EXTENSION}")), + b"sealed", + ) + .unwrap(); + std::fs::write( + dir.join(format!("{CORRUPT_FILE_PREFIX}one{SEALED_FILE_EXTENSION}")), + b"corrupt", + ) + .unwrap(); + + let total = directory_size_if_exists(&dir).unwrap(); + + assert_eq!(total, 12); + + let _ = std::fs::remove_dir_all(dir); + } +} diff --git a/server-rs/crates/module-runtime/src/domain.rs b/server-rs/crates/module-runtime/src/domain.rs index a10f0cc2..4d1da0bc 100644 --- a/server-rs/crates/module-runtime/src/domain.rs +++ b/server-rs/crates/module-runtime/src/domain.rs @@ -706,6 +706,14 @@ pub struct RuntimeTrackingEventProcedureResult { pub error_message: Option, } +#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct RuntimeTrackingEventBatchProcedureResult { + pub ok: bool, + pub accepted_count: u32, + pub error_message: Option, +} + #[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct RuntimeProfileTaskConfigSnapshot { diff --git a/server-rs/crates/spacetime-client/src/mapper.rs b/server-rs/crates/spacetime-client/src/mapper.rs index 6ee3b1ca..79b17304 100644 --- a/server-rs/crates/spacetime-client/src/mapper.rs +++ b/server-rs/crates/spacetime-client/src/mapper.rs @@ -246,6 +246,23 @@ impl From for RuntimeProfileTa } } +impl From for RuntimeTrackingEventInput { + fn from(input: module_runtime::RuntimeTrackingEventInput) -> Self { + Self { + event_id: input.event_id, + event_key: input.event_key, + scope_kind: map_runtime_tracking_scope_kind(input.scope_kind), + scope_id: input.scope_id, + user_id: input.user_id, + owner_user_id: input.owner_user_id, + profile_id: input.profile_id, + module_key: input.module_key, + metadata_json: input.metadata_json, + occurred_at_micros: input.occurred_at_micros, + } + } +} + impl From for AnalyticsMetricQueryInput { fn from(input: module_runtime::AnalyticsMetricQueryInput) -> Self { Self { @@ -1211,6 +1228,16 @@ pub(crate) fn map_runtime_tracking_event_procedure_result( Ok(()) } +pub(crate) fn map_runtime_tracking_event_batch_procedure_result( + result: RuntimeTrackingEventBatchProcedureResult, +) -> Result { + if !result.ok { + return Err(SpacetimeClientError::procedure_failed(result.error_message)); + } + + Ok(result.accepted_count) +} + pub(crate) fn map_runtime_profile_task_center_procedure_result( result: RuntimeProfileTaskCenterProcedureResult, ) -> Result { diff --git a/server-rs/crates/spacetime-client/src/module_bindings/mod.rs b/server-rs/crates/spacetime-client/src/module_bindings/mod.rs index 984ccd36..6a53dc72 100644 --- a/server-rs/crates/spacetime-client/src/module_bindings/mod.rs +++ b/server-rs/crates/spacetime-client/src/module_bindings/mod.rs @@ -624,6 +624,7 @@ pub mod record_custom_world_profile_play_procedure; pub mod record_daily_login_tracking_event_and_return_procedure; pub mod record_puzzle_work_like_procedure; pub mod record_tracking_event_and_return_procedure; +pub mod record_tracking_events_and_return_procedure; pub mod record_visual_novel_runtime_event_procedure; pub mod redeem_profile_referral_invite_code_procedure; pub mod redeem_profile_reward_code_procedure; @@ -764,6 +765,7 @@ pub mod runtime_snapshot_row_type; pub mod runtime_snapshot_table; pub mod runtime_snapshot_type; pub mod runtime_snapshot_upsert_input_type; +pub mod runtime_tracking_event_batch_procedure_result_type; pub mod runtime_tracking_event_input_type; pub mod runtime_tracking_event_procedure_result_type; pub mod runtime_tracking_scope_kind_type; @@ -1548,6 +1550,7 @@ pub use record_custom_world_profile_play_procedure::record_custom_world_profile_ pub use record_daily_login_tracking_event_and_return_procedure::record_daily_login_tracking_event_and_return; pub use record_puzzle_work_like_procedure::record_puzzle_work_like; pub use record_tracking_event_and_return_procedure::record_tracking_event_and_return; +pub use record_tracking_events_and_return_procedure::record_tracking_events_and_return; pub use record_visual_novel_runtime_event_procedure::record_visual_novel_runtime_event; pub use redeem_profile_referral_invite_code_procedure::redeem_profile_referral_invite_code; pub use redeem_profile_reward_code_procedure::redeem_profile_reward_code; @@ -1688,6 +1691,7 @@ pub use runtime_snapshot_row_type::RuntimeSnapshotRow; pub use runtime_snapshot_table::*; pub use runtime_snapshot_type::RuntimeSnapshot; pub use runtime_snapshot_upsert_input_type::RuntimeSnapshotUpsertInput; +pub use runtime_tracking_event_batch_procedure_result_type::RuntimeTrackingEventBatchProcedureResult; pub use runtime_tracking_event_input_type::RuntimeTrackingEventInput; pub use runtime_tracking_event_procedure_result_type::RuntimeTrackingEventProcedureResult; pub use runtime_tracking_scope_kind_type::RuntimeTrackingScopeKind; diff --git a/server-rs/crates/spacetime-client/src/module_bindings/record_tracking_events_and_return_procedure.rs b/server-rs/crates/spacetime-client/src/module_bindings/record_tracking_events_and_return_procedure.rs new file mode 100644 index 00000000..428e378f --- /dev/null +++ b/server-rs/crates/spacetime-client/src/module_bindings/record_tracking_events_and_return_procedure.rs @@ -0,0 +1,59 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +use super::runtime_tracking_event_batch_procedure_result_type::RuntimeTrackingEventBatchProcedureResult; +use super::runtime_tracking_event_input_type::RuntimeTrackingEventInput; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +struct RecordTrackingEventsAndReturnArgs { + pub inputs: Vec, +} + +impl __sdk::InModule for RecordTrackingEventsAndReturnArgs { + type Module = super::RemoteModule; +} + +#[allow(non_camel_case_types)] +/// Extension trait for access to the procedure `record_tracking_events_and_return`. +/// +/// Implemented for [`super::RemoteProcedures`]. +pub trait record_tracking_events_and_return { + fn record_tracking_events_and_return(&self, inputs: Vec) { + self.record_tracking_events_and_return_then(inputs, |_, _| {}); + } + + fn record_tracking_events_and_return_then( + &self, + inputs: Vec, + + __callback: impl FnOnce( + &super::ProcedureEventContext, + Result, + ) + Send + + 'static, + ); +} + +impl record_tracking_events_and_return for super::RemoteProcedures { + fn record_tracking_events_and_return_then( + &self, + inputs: Vec, + + __callback: impl FnOnce( + &super::ProcedureEventContext, + Result, + ) + Send + + 'static, + ) { + self.imp + .invoke_procedure_with_callback::<_, RuntimeTrackingEventBatchProcedureResult>( + "record_tracking_events_and_return", + RecordTrackingEventsAndReturnArgs { inputs }, + __callback, + ); + } +} diff --git a/server-rs/crates/spacetime-client/src/module_bindings/runtime_tracking_event_batch_procedure_result_type.rs b/server-rs/crates/spacetime-client/src/module_bindings/runtime_tracking_event_batch_procedure_result_type.rs new file mode 100644 index 00000000..1d4d72d2 --- /dev/null +++ b/server-rs/crates/spacetime-client/src/module_bindings/runtime_tracking_event_batch_procedure_result_type.rs @@ -0,0 +1,17 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +pub struct RuntimeTrackingEventBatchProcedureResult { + pub ok: bool, + pub accepted_count: u32, + pub error_message: Option, +} + +impl __sdk::InModule for RuntimeTrackingEventBatchProcedureResult { + type Module = super::RemoteModule; +} diff --git a/server-rs/crates/spacetime-client/src/runtime.rs b/server-rs/crates/spacetime-client/src/runtime.rs index baac3495..1b9429a7 100644 --- a/server-rs/crates/spacetime-client/src/runtime.rs +++ b/server-rs/crates/spacetime-client/src/runtime.rs @@ -585,6 +585,35 @@ impl SpacetimeClient { .await } + pub async fn record_tracking_events( + &self, + events: Vec, + ) -> Result { + if events.is_empty() { + return Ok(0); + } + + let procedure_inputs = events + .into_iter() + .map(crate::module_bindings::RuntimeTrackingEventInput::from) + .collect::>(); + + self.call_after_connect( + "record_tracking_events_and_return", + move |connection, sender| { + connection + .procedures() + .record_tracking_events_and_return_then(procedure_inputs, move |_, result| { + let mapped = result + .map_err(SpacetimeClientError::from_sdk_error) + .and_then(map_runtime_tracking_event_batch_procedure_result); + send_once(&sender, mapped); + }); + }, + ) + .await + } + pub async fn get_profile_task_center( &self, user_id: String, diff --git a/server-rs/crates/spacetime-module/src/runtime/profile.rs b/server-rs/crates/spacetime-module/src/runtime/profile.rs index 10f3c59e..d1bbb3c3 100644 --- a/server-rs/crates/spacetime-module/src/runtime/profile.rs +++ b/server-rs/crates/spacetime-module/src/runtime/profile.rs @@ -558,6 +558,33 @@ pub fn record_tracking_event_and_return( } } +// 高频 route tracking 由 api-server 本机 outbox 批量写入,减少公开列表热路径上的 procedure 调用次数。 +#[spacetimedb::procedure] +pub fn record_tracking_events_and_return( + ctx: &mut ProcedureContext, + inputs: Vec, +) -> RuntimeTrackingEventBatchProcedureResult { + match ctx.try_with_tx(|tx| { + let mut accepted_count = 0u32; + for input in &inputs { + record_tracking_event(tx, input.clone())?; + accepted_count = accepted_count.saturating_add(1); + } + Ok(accepted_count) + }) { + Ok(accepted_count) => RuntimeTrackingEventBatchProcedureResult { + ok: true, + accepted_count, + error_message: None, + }, + Err(message) => RuntimeTrackingEventBatchProcedureResult { + ok: false, + accepted_count: 0, + error_message: Some(message), + }, + } +} + // 登录成功埋点由认证链路主动调用;任务中心只负责读取和刷新任务进度。 #[spacetimedb::procedure] pub fn record_daily_login_tracking_event_and_return( @@ -1539,6 +1566,19 @@ mod tests { assert!(!should_skip_existing_tracking_event_id(false)); } + #[test] + fn tracking_batch_result_reports_accepted_count() { + let result = RuntimeTrackingEventBatchProcedureResult { + ok: true, + accepted_count: 2, + error_message: None, + }; + + assert!(result.ok); + assert_eq!(result.accepted_count, 2); + assert!(result.error_message.is_none()); + } + #[test] fn recent_public_work_play_counts_group_requested_profiles_in_window() { let now_micros = PUBLIC_WORK_PLAY_DAY_MICROS * 10;