diff --git a/.hermes/shared-memory/decision-log.md b/.hermes/shared-memory/decision-log.md index 2044455f..dacfd637 100644 --- a/.hermes/shared-memory/decision-log.md +++ b/.hermes/shared-memory/decision-log.md @@ -1728,3 +1728,11 @@ - 影响范围:`shared-contracts` 默认 spec、`module-runtime` 入口配置响应、`spacetime-module` 后台保存校验、后台入口开关页摘要和前端 fallback spec。 - 验证方式:`GET /api/creation-entry/config` 中各玩法 `unifiedCreationSpec.title` 等于已保存契约内容;后台只修改入口名称时不应隐式改写已保存的统一创作页表头。 - 关联文档:`docs/【玩法创作】平台入口与玩法链路-2026-05-15.md`。 + +## 2026-06-11 资产计费边界改为 fail-closed 并补偿退款 + +- 背景:图片 / 资产生成入口曾在钱包或 SpacetimeDB 预扣费连通性异常时允许继续生成,且失败后同步退款如果遇到 SpacetimeDB 短暂不可用缺少本地补偿;拼图首图后台任务还使用 api-server 进程内 HashSet 互斥,多实例下不能防重复。 +- 决策:暂不实现 token 限流。所有资产生成预扣费改为 fail-closed,预扣费失败直接返回错误;支持 retry 的计费 ledger id 统一包含 HTTP `request_id`,前端静默刷新重试复用同一个 `x-request-id`。生成失败后的退款先同步调用 SpacetimeDB,失败则写入 `wallet-refund-outbox` 本地文件并由后台 worker 重放。拼图首图后台生成互斥改为 SpacetimeDB `puzzle_background_compile_task` 表,使用 `task_id + request_id` 作为 claim id,释放时校验 claim id,避免旧任务误删新租约。 +- 影响范围:`api-server` 资产计费包裹、钱包退款补偿、拼图首图后台生成、`spacetime-module` 拼图 task 表、`spacetime-client` bindings/facade、前端 API request id 复用和后端架构文档。 +- 验证方式:`npm run spacetime:generate`、`npm run check:spacetime-schema`、`npm run check:spacetime-runtime-access`、`node scripts/check-server-rs-ddd-boundaries.mjs`、`cargo check -p api-server --manifest-path server-rs/Cargo.toml`、`cargo test -p api-server --manifest-path server-rs/Cargo.toml wallet_refund_outbox`、`cargo test -p api-server --manifest-path server-rs/Cargo.toml asset_operation`、`npm run test -- src/services/apiClient.test.ts`、`npm run check:encoding`。 +- 关联文档:`docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md`。 diff --git a/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md b/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md index 82249ceb..b5d01d81 100644 --- a/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md +++ b/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md @@ -190,6 +190,10 @@ npm run check:server-rs-ddd 1. `creation_entry_type_config.unified_creation_spec_json` 内的 `mudPointCost` 是玩法新建草稿初始生成的泥点成本真相源,同时供入口卡展示和前端余额前置校验使用;旧契约缺失时允许按代码默认成本兜底。 2. `api-server` 执行拼图首图生成、抓大鹅完整草稿生成和汪汪声浪初始三图生成时,必须通过 `GET /api/creation-entry/config` 同源配置解析对应玩法成本后再调用钱包扣费 procedure,不得继续使用前端或后端硬编码常量作为实际扣费真相。 3. 结果页单图重生成、发布、道具使用和其它独立资产操作仍按各自业务操作成本执行;不要把初始草稿成本误套到这些单次操作上。 +4. 资产操作的预扣费必须 fail-closed:钱包或 SpacetimeDB 预扣费不可达、超时或返回业务错误时,`api-server` 直接返回错误,不允许继续调用图片、音频、GLB 等外部生成 provider。 +5. 需要支持 HTTP retry 的计费 ledger id 必须包含当前请求的 `request_id`;前端 `fetchWithApiAuth` 同一次业务请求的静默刷新重试复用同一个 `x-request-id`,后端不得再使用 prompt 指纹或随机 asset id 作为扣费幂等键。 +6. 外部生成已预扣费但后续失败时必须先同步调用钱包退款;若 SpacetimeDB 暂不可用,退款请求写入 `wallet-refund-outbox` 本地文件并由后台 worker 重放。默认启用,配置项为 `GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED`、`GENARRATIVE_WALLET_REFUND_OUTBOX_DIR`、`GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE`、`GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS` 和 `GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES`。outbox 文件按 refund ledger id 幂等落盘;成功重放后删除,坏文件隔离为 `corrupt-*`。 +7. 拼图首图后台生成的跨实例互斥锁必须落在 SpacetimeDB `puzzle_background_compile_task` 表,claim id 由 `task_id + request_id` 构成,释放时必须校验 claim id,避免旧后台任务释放新请求抢到的租约。 ## 外部服务与资产 @@ -640,6 +644,12 @@ npm run check:server-rs-ddd - Rust 结构体:`PuzzleAgentSessionRow` - 源码:`server-rs/crates/spacetime-module/src/puzzle.rs` +### `puzzle_background_compile_task` + +- Rust 结构体:`PuzzleBackgroundCompileTaskRow` +- 源码:`server-rs/crates/spacetime-module/src/puzzle.rs` +- 说明:拼图首图后台生成的跨 api-server 实例互斥 claim 表,只保存活动任务租约,不表达最终生成结果;`task_id` 为主键,`claim_id` 用于释放时防止误删新租约,租约超时时间为 30 分钟。 + ### `puzzle_event` - Rust 结构体:`PuzzleEvent` diff --git a/server-rs/crates/api-server/src/asset_billing.rs b/server-rs/crates/api-server/src/asset_billing.rs index 613ce234..c6430554 100644 --- a/server-rs/crates/api-server/src/asset_billing.rs +++ b/server-rs/crates/api-server/src/asset_billing.rs @@ -4,7 +4,11 @@ use axum::http::StatusCode; use serde_json::json; use spacetime_client::SpacetimeClientError; -use crate::{http_error::AppError, state::AppState}; +use crate::{ + http_error::AppError, + state::AppState, + wallet_refund_outbox::{WalletRefundOutboxEnqueueOutcome, WalletRefundOutboxRecord}, +}; pub(crate) const ASSET_OPERATION_POINTS_COST: u64 = 1; @@ -90,22 +94,11 @@ async fn consume_asset_operation_points( .await { Ok(_) => Ok(true), - Err(error) if should_skip_asset_operation_billing_for_connectivity(&error) => { - // 中文注释:外部生图不应被 Maincloud 钱包短暂 503 阻断;此时跳过扣费,让业务链路继续,避免用户重复点击。 - tracing::warn!( - owner_user_id, - asset_kind, - asset_id, - error = %error, - "资产操作泥点预扣因 SpacetimeDB 连接不可用而降级跳过" - ); - Ok(false) - } Err(error) => Err(map_asset_operation_wallet_error(error)), } } -/// 外部生成或发布 mutation 失败后补偿退款;退款失败只记日志,避免覆盖原始业务错误。 +/// 外部生成或发布 mutation 失败后补偿退款;立即退款失败会进入 outbox,避免覆盖原始业务错误。 async fn refund_asset_operation_points( state: &AppState, owner_user_id: &str, @@ -117,22 +110,74 @@ async fn refund_asset_operation_points( "asset_operation_refund:{}:{}:{}", owner_user_id, asset_kind, asset_id ); + let created_at_micros = current_utc_micros(); if let Err(error) = state .spacetime_client() .refund_profile_wallet_points( owner_user_id.to_string(), points_cost, - ledger_id, - current_utc_micros(), + ledger_id.clone(), + created_at_micros, ) .await { + let refund_error = error.to_string(); + if let Some(outbox) = state.wallet_refund_outbox() { + match outbox + .enqueue(WalletRefundOutboxRecord { + owner_user_id: owner_user_id.to_string(), + amount: points_cost, + ledger_id: ledger_id.clone(), + created_at_micros, + asset_kind: asset_kind.to_string(), + asset_id: asset_id.to_string(), + }) + .await + { + Ok(WalletRefundOutboxEnqueueOutcome::Enqueued) => { + tracing::warn!( + owner_user_id, + asset_kind, + asset_id, + ledger_id, + error = %refund_error, + "资产操作失败后的泥点退款立即执行失败,已写入 wallet refund outbox" + ); + return; + } + Ok(WalletRefundOutboxEnqueueOutcome::Dropped { reason }) => { + tracing::error!( + owner_user_id, + asset_kind, + asset_id, + ledger_id, + reason, + error = %refund_error, + "资产操作失败后的泥点退款立即执行失败,且 wallet refund outbox 因容量限制丢弃" + ); + return; + } + Err(outbox_error) => { + tracing::error!( + owner_user_id, + asset_kind, + asset_id, + ledger_id, + refund_error = %refund_error, + outbox_error = %outbox_error, + "资产操作失败后的泥点退款立即执行失败,且写入 wallet refund outbox 失败" + ); + return; + } + } + } tracing::error!( owner_user_id, asset_kind, asset_id, - error = %error, - "资产操作失败后的泥点退款失败" + ledger_id, + error = %refund_error, + "资产操作失败后的泥点退款失败,且 wallet refund outbox 未启用" ); } } @@ -185,7 +230,7 @@ mod tests { use super::*; #[test] - fn asset_operation_billing_skips_spacetime_connectivity_errors() { + fn asset_operation_connectivity_errors_are_classified_for_non_billing_fallbacks() { assert_eq!(ASSET_OPERATION_POINTS_COST, 1); assert!(should_skip_asset_operation_billing_for_connectivity( &SpacetimeClientError::ConnectDropped diff --git a/server-rs/crates/api-server/src/bark_battle.rs b/server-rs/crates/api-server/src/bark_battle.rs index fa33c9ee..51f775ce 100644 --- a/server-rs/crates/api-server/src/bark_battle.rs +++ b/server-rs/crates/api-server/src/bark_battle.rs @@ -306,11 +306,12 @@ pub async fn generate_bark_battle_image_asset( .filter(|value| !value.is_empty()) .map(ToString::to_string); let points_cost = resolve_bark_battle_image_asset_points_cost(&state, &payload).await; + let billing_asset_id = request_context.request_id().to_string(); let result = execute_billable_asset_operation_with_cost( &state, &owner_user_id, bark_battle_slot_asset_kind(&slot), - asset_id.as_str(), + billing_asset_id.as_str(), points_cost, async { generate_and_persist_bark_battle_image_asset( diff --git a/server-rs/crates/api-server/src/big_fish.rs b/server-rs/crates/api-server/src/big_fish.rs index 1fc8636a..e8969117 100644 --- a/server-rs/crates/api-server/src/big_fish.rs +++ b/server-rs/crates/api-server/src/big_fish.rs @@ -722,7 +722,7 @@ pub async fn execute_big_fish_action( "big_fish_publish_game" => Some("big_fish_publish_game"), _ => None, }; - let billing_asset_id = format!("{session_id}:{now}"); + let billing_asset_id = format!("{}:{}:{}", session_id, action, request_context.request_id()); let session_operation = async { match action.as_str() { "big_fish_compile_draft" => { diff --git a/server-rs/crates/api-server/src/config.rs b/server-rs/crates/api-server/src/config.rs index 3ca4a2a6..b23bb782 100644 --- a/server-rs/crates/api-server/src/config.rs +++ b/server-rs/crates/api-server/src/config.rs @@ -32,6 +32,11 @@ pub struct AppConfig { pub tracking_outbox_batch_size: usize, pub tracking_outbox_flush_interval: Duration, pub tracking_outbox_max_bytes: u64, + pub wallet_refund_outbox_enabled: bool, + pub wallet_refund_outbox_dir: PathBuf, + pub wallet_refund_outbox_batch_size: usize, + pub wallet_refund_outbox_flush_interval: Duration, + pub wallet_refund_outbox_max_bytes: u64, pub log_filter: String, pub otel_enabled: bool, pub admin_username: Option, @@ -183,6 +188,11 @@ impl Default for AppConfig { tracking_outbox_batch_size: 500, tracking_outbox_flush_interval: Duration::from_millis(1_000), tracking_outbox_max_bytes: 256 * 1024 * 1024, + wallet_refund_outbox_enabled: true, + wallet_refund_outbox_dir: PathBuf::from("server-rs/.data/wallet-refund-outbox"), + wallet_refund_outbox_batch_size: 100, + wallet_refund_outbox_flush_interval: Duration::from_millis(1_000), + wallet_refund_outbox_max_bytes: 64 * 1024 * 1024, log_filter: "info,tower_http=info".to_string(), otel_enabled: false, admin_username: None, @@ -409,6 +419,27 @@ impl AppConfig { { config.tracking_outbox_max_bytes = max_bytes; } + if let Some(enabled) = read_first_bool_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED"]) { + config.wallet_refund_outbox_enabled = enabled; + } + if let Some(dir) = read_first_non_empty_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_DIR"]) { + config.wallet_refund_outbox_dir = PathBuf::from(dir); + } + if let Some(batch_size) = + read_first_usize_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE"]) + { + config.wallet_refund_outbox_batch_size = batch_size; + } + if let Some(flush_interval_ms) = + read_first_positive_u64_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS"]) + { + config.wallet_refund_outbox_flush_interval = Duration::from_millis(flush_interval_ms); + } + if let Some(max_bytes) = + read_first_positive_u64_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES"]) + { + config.wallet_refund_outbox_max_bytes = max_bytes; + } if let Some(otel_enabled) = read_first_bool_env(&["GENARRATIVE_OTEL_ENABLED"]) { config.otel_enabled = otel_enabled; } @@ -1380,6 +1411,11 @@ mod tests { std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE"); std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS"); std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES"); + std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED"); + std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_DIR"); + std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE"); + std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS"); + std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES"); std::env::remove_var("GENARRATIVE_OTEL_ENABLED"); std::env::set_var("GENARRATIVE_API_LISTEN_BACKLOG", "2048"); std::env::set_var("GENARRATIVE_API_WORKER_THREADS", "6"); @@ -1396,6 +1432,14 @@ mod tests { std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE", "250"); std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS", "2000"); std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES", "1048576"); + std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED", "false"); + std::env::set_var( + "GENARRATIVE_WALLET_REFUND_OUTBOX_DIR", + "/tmp/genarrative-wallet-refund-outbox", + ); + std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE", "50"); + std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS", "3000"); + std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES", "524288"); std::env::set_var("GENARRATIVE_OTEL_ENABLED", "true"); } @@ -1421,6 +1465,17 @@ mod tests { std::time::Duration::from_millis(2_000) ); assert_eq!(config.tracking_outbox_max_bytes, 1_048_576); + assert!(!config.wallet_refund_outbox_enabled); + assert_eq!( + config.wallet_refund_outbox_dir, + std::path::PathBuf::from("/tmp/genarrative-wallet-refund-outbox") + ); + assert_eq!(config.wallet_refund_outbox_batch_size, 50); + assert_eq!( + config.wallet_refund_outbox_flush_interval, + std::time::Duration::from_millis(3_000) + ); + assert_eq!(config.wallet_refund_outbox_max_bytes, 524_288); assert!(config.otel_enabled); unsafe { @@ -1436,6 +1491,11 @@ mod tests { std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE"); std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS"); std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES"); + std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED"); + std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_DIR"); + std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE"); + std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS"); + std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES"); std::env::remove_var("GENARRATIVE_OTEL_ENABLED"); } } diff --git a/server-rs/crates/api-server/src/custom_world_ai.rs b/server-rs/crates/api-server/src/custom_world_ai.rs index d235f028..be751aa4 100644 --- a/server-rs/crates/api-server/src/custom_world_ai.rs +++ b/server-rs/crates/api-server/src/custom_world_ai.rs @@ -547,11 +547,12 @@ pub async fn generate_custom_world_scene_image( require_openai_image_settings(&state) .map_err(|error| custom_world_ai_error_response(&request_context, error))?; let asset_id = format!("custom-scene-{}", current_utc_millis()); + let billing_asset_id = request_context.request_id().to_string(); let asset = execute_billable_asset_operation( &state, &owner_user_id, "scene_image", - asset_id.as_str(), + billing_asset_id.as_str(), async { let settings = require_openai_image_settings(&state)?.with_external_api_audit_context( &request_context, @@ -806,11 +807,12 @@ pub async fn generate_custom_world_cover_image( require_dashscope_settings(&state) .map_err(|error| custom_world_ai_error_response(&request_context, error))?; let asset_id = format!("custom-cover-{}", current_utc_millis()); + let billing_asset_id = request_context.request_id().to_string(); let asset = execute_billable_asset_operation( &state, &owner_user_id, "custom_world_cover", - asset_id.as_str(), + billing_asset_id.as_str(), async { let settings = require_dashscope_settings(&state)?; let http_client = build_dashscope_http_client(&settings)?; @@ -1011,11 +1013,12 @@ pub async fn generate_custom_world_opening_cg( .map_err(|error| custom_world_ai_error_response(&request_context, error))?; let opening_cg_id = normalized.opening_cg_id.clone(); + let billing_asset_id = request_context.request_id().to_string(); let generated = execute_billable_asset_operation_with_cost( &state, &owner_user_id, "custom_world_opening_cg", - opening_cg_id.as_str(), + billing_asset_id.as_str(), OPENING_CG_POINTS_COST, async { let image_settings = require_openai_image_settings(&state)? diff --git a/server-rs/crates/api-server/src/main.rs b/server-rs/crates/api-server/src/main.rs index 1867c754..3b7d8e8c 100644 --- a/server-rs/crates/api-server/src/main.rs +++ b/server-rs/crates/api-server/src/main.rs @@ -89,6 +89,7 @@ mod tracking_outbox; mod vector_engine_audio_generation; mod visual_novel; mod volcengine_speech; +mod wallet_refund_outbox; mod wechat; mod wooden_fish; mod work_author; @@ -115,6 +116,7 @@ use crate::{ config::AppConfig, state::{AppState, AppStateInitError}, tracking_outbox::TrackingOutbox, + wallet_refund_outbox::WalletRefundOutbox, }; const API_SERVER_STARTUP_STACK_SIZE_BYTES: usize = 32 * 1024 * 1024; @@ -125,6 +127,7 @@ const AUTH_STORE_STARTUP_RETRY_INTERVAL: Duration = Duration::from_secs(5); struct ShutdownContext { app_state: Option, tracking_outbox: Option>, + wallet_refund_outbox: Option>, outbox_flush_timeout: Duration, } @@ -178,11 +181,16 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> { if let Some(outbox) = tracking_outbox.clone() { outbox.spawn_worker(); } + let wallet_refund_outbox = state.wallet_refund_outbox(); + if let Some(outbox) = wallet_refund_outbox.clone() { + outbox.spawn_worker(); + } ( build_router(state.clone()), ShutdownContext { app_state: Some(state), tracking_outbox, + wallet_refund_outbox, outbox_flush_timeout, }, ) @@ -192,6 +200,7 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> { ShutdownContext { app_state: None, tracking_outbox: None, + wallet_refund_outbox: None, outbox_flush_timeout, }, ), @@ -271,12 +280,8 @@ async fn finalize_shutdown(context: ShutdownContext) { state.mark_not_ready(); } - let Some(outbox) = context.tracking_outbox else { - return; - }; - if context.outbox_flush_timeout.is_zero() { - warn!("api-server 退出时 tracking outbox flush timeout 为 0,跳过主动 flush"); + warn!("api-server 退出时 outbox flush timeout 为 0,跳过主动 flush"); return; } @@ -284,22 +289,45 @@ async fn finalize_shutdown(context: ShutdownContext) { .outbox_flush_timeout .as_millis() .min(u128::from(u64::MAX)) as u64; - info!(timeout_ms, "api-server 退出前封存并 flush tracking outbox"); - match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await { - Ok(Ok(())) => { - info!("api-server 退出前 tracking outbox flush 完成"); + if let Some(outbox) = context.tracking_outbox { + info!(timeout_ms, "api-server 退出前封存并 flush tracking outbox"); + match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await { + Ok(Ok(())) => { + info!("api-server 退出前 tracking outbox flush 完成"); + } + Ok(Err(error)) => { + warn!( + error = %error, + "api-server 退出前 tracking outbox flush 未完成,已保留本地文件等待下次启动重试" + ); + } + Err(_) => { + warn!( + timeout_ms, + "api-server 退出前 tracking outbox flush 超时,已保留本地文件等待下次启动重试" + ); + } } - Ok(Err(error)) => { - warn!( - error = %error, - "api-server 退出前 tracking outbox flush 未完成,已保留本地文件等待下次启动重试" - ); - } - Err(_) => { - warn!( - timeout_ms, - "api-server 退出前 tracking outbox flush 超时,已保留本地文件等待下次启动重试" - ); + } + + if let Some(outbox) = context.wallet_refund_outbox { + info!(timeout_ms, "api-server 退出前 flush wallet refund outbox"); + match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await { + Ok(Ok(())) => { + info!("api-server 退出前 wallet refund outbox flush 完成"); + } + Ok(Err(error)) => { + warn!( + error = %error, + "api-server 退出前 wallet refund outbox flush 未完成,已保留本地文件等待下次启动重试" + ); + } + Err(_) => { + warn!( + timeout_ms, + "api-server 退出前 wallet refund outbox flush 超时,已保留本地文件等待下次启动重试" + ); + } } } } diff --git a/server-rs/crates/api-server/src/match3d.rs b/server-rs/crates/api-server/src/match3d.rs index 1513d3ee..61828c65 100644 --- a/server-rs/crates/api-server/src/match3d.rs +++ b/server-rs/crates/api-server/src/match3d.rs @@ -1,4 +1,4 @@ -use std::{ +use std::{ collections::BTreeMap, convert::Infallible, future::Future, @@ -65,10 +65,7 @@ use spacetime_client::{ use crate::{ api_response::json_success_body, - asset_billing::{ - execute_billable_asset_operation_with_cost, map_asset_operation_wallet_error, - should_skip_asset_operation_billing_for_connectivity, - }, + asset_billing::{execute_billable_asset_operation_with_cost, map_asset_operation_wallet_error}, auth::{AuthenticatedAccessToken, RuntimePrincipal}, config::AppConfig, generated_asset_sheets::apply_generated_asset_sheet_green_screen_alpha, @@ -354,13 +351,6 @@ impl Match3DItemAssetsGenerationPlan { Self::Replace(plan) => plan.requested_item_names.len(), } } - - fn billing_fingerprint_source(&self) -> String { - match self { - Self::Append(plan) => format!("append:{}", plan.requested_item_names.join("|")), - Self::Replace(plan) => format!("replace:{}", plan.requested_item_names.join("|")), - } - } } fn serialize_match3d_generated_item_assets(assets: &[Match3DGeneratedItemAsset]) -> Option { diff --git a/server-rs/crates/api-server/src/match3d/draft.rs b/server-rs/crates/api-server/src/match3d/draft.rs index eb99ec38..3c8a8c2d 100644 --- a/server-rs/crates/api-server/src/match3d/draft.rs +++ b/server-rs/crates/api-server/src/match3d/draft.rs @@ -162,7 +162,12 @@ pub(super) async fn compile_match3d_draft_for_session( let initial_tags = requested_tags .clone() .unwrap_or_else(|| fallback_work_metadata.tags.clone()); - let billing_asset_id = format!("{}:{}:{}", session_id, profile_id, current_utc_micros()); + let billing_asset_id = format!( + "{}:{}:{}", + session_id, + profile_id, + request_context.request_id() + ); let points_cost = crate::creation_entry_config::resolve_creation_entry_mud_point_cost( state, "match3d", @@ -514,15 +519,6 @@ async fn consume_match3d_draft_generation_points( .await { Ok(_) => Ok(true), - Err(error) if should_skip_asset_operation_billing_for_connectivity(&error) => { - tracing::warn!( - owner_user_id, - billing_asset_id, - error = %error, - "抓大鹅草稿泥点预扣因 SpacetimeDB 连接不可用而降级跳过" - ); - Ok(false) - } Err(error) => Err(match3d_error_response( request_context, MATCH3D_AGENT_PROVIDER, diff --git a/server-rs/crates/api-server/src/match3d/handlers.rs b/server-rs/crates/api-server/src/match3d/handlers.rs index 6eed1281..bc3462ef 100644 --- a/server-rs/crates/api-server/src/match3d/handlers.rs +++ b/server-rs/crates/api-server/src/match3d/handlers.rs @@ -751,7 +751,6 @@ pub async fn generate_match3d_background_image_for_work( )?; let prompt = normalize_match3d_background_prompt(payload.prompt.as_str()); ensure_non_empty(&request_context, MATCH3D_WORKS_PROVIDER, &prompt, "prompt")?; - let prompt_fingerprint = build_match3d_prompt_fingerprint(prompt.as_str()); let context = load_match3d_work_asset_context(&state, &request_context, &authenticated, &profile_id) @@ -763,7 +762,12 @@ pub async fn generate_match3d_background_image_for_work( config, assets, } = context; - let billing_asset_id = format!("{}:{}:{}", session_id, profile_id, prompt_fingerprint); + let billing_asset_id = format!( + "{}:{}:{}", + session_id, + profile_id, + request_context.request_id() + ); let (generated_background, generated_assets) = execute_billable_asset_operation_with_cost( &state, owner_user_id.as_str(), @@ -860,7 +864,6 @@ pub async fn generate_match3d_container_image_for_work( )?; let prompt = normalize_match3d_background_prompt(payload.prompt.as_str()); ensure_non_empty(&request_context, MATCH3D_WORKS_PROVIDER, &prompt, "prompt")?; - let prompt_fingerprint = build_match3d_prompt_fingerprint(prompt.as_str()); let context = load_match3d_work_asset_context(&state, &request_context, &authenticated, &profile_id) @@ -874,7 +877,9 @@ pub async fn generate_match3d_container_image_for_work( } = context; let billing_asset_id = format!( "{}:{}:{}:container", - session_id, profile_id, prompt_fingerprint + session_id, + profile_id, + request_context.request_id() ); let (generated_background, generated_assets) = execute_billable_asset_operation_with_cost( &state, @@ -1017,7 +1022,7 @@ pub async fn generate_match3d_item_assets_for_work( session_id, profile_id, billed_item_count, - build_match3d_prompt_fingerprint(generation_plan.billing_fingerprint_source().as_str()) + request_context.request_id() ); let generated_assets = execute_billable_asset_operation_with_cost( &state, diff --git a/server-rs/crates/api-server/src/match3d/item_assets.rs b/server-rs/crates/api-server/src/match3d/item_assets.rs index 670d3ca8..5de20f2e 100644 --- a/server-rs/crates/api-server/src/match3d/item_assets.rs +++ b/server-rs/crates/api-server/src/match3d/item_assets.rs @@ -1208,14 +1208,6 @@ pub(super) fn normalize_match3d_background_prompt(raw: &str) -> String { .to_string() } -pub(super) fn build_match3d_prompt_fingerprint(value: &str) -> String { - let mut hash = 0u32; - for character in value.chars() { - hash = hash.wrapping_mul(31).wrapping_add(character as u32); - } - format!("{hash:08x}") -} - pub(super) fn build_fallback_match3d_background_prompt(config: &Match3DConfigJson) -> String { let theme = config.theme_text.trim(); let normalized_theme = if theme.is_empty() { "抓大鹅" } else { theme }; diff --git a/server-rs/crates/api-server/src/puzzle.rs b/server-rs/crates/api-server/src/puzzle.rs index b428f2f2..6581676c 100644 --- a/server-rs/crates/api-server/src/puzzle.rs +++ b/server-rs/crates/api-server/src/puzzle.rs @@ -1,6 +1,5 @@ use std::{ - collections::{BTreeMap, HashSet}, - sync::{Mutex, OnceLock}, + collections::BTreeMap, time::{Instant, SystemTime, UNIX_EPOCH}, }; @@ -56,17 +55,19 @@ use spacetime_client::{ PuzzleAgentMessageRecord, PuzzleAgentMessageSubmitRecordInput, PuzzleAgentSessionCreateRecordInput, PuzzleAgentSessionRecord, PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord, PuzzleAnchorPackRecord, - PuzzleAudioAssetRecord, PuzzleCreatorIntentRecord, PuzzleDraftCompileFailureRecordInput, - PuzzleDraftLevelRecord, PuzzleFormDraftRecord, PuzzleFormDraftSaveRecordInput, - PuzzleGeneratedImageCandidateRecord, PuzzleGeneratedImagesSaveRecordInput, - PuzzleLeaderboardEntryRecord, PuzzleLeaderboardSubmitRecordInput, PuzzlePublishRecordInput, - PuzzleRecommendedNextWorkRecord, PuzzleResultDraftRecord, PuzzleResultPreviewBlockerRecord, - PuzzleResultPreviewFindingRecord, PuzzleResultPreviewRecord, PuzzleRunDragRecordInput, - PuzzleRunPauseRecordInput, PuzzleRunPropRecordInput, PuzzleRunRecord, - PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput, PuzzleSelectCoverImageRecordInput, - PuzzleUiBackgroundSaveRecordInput, PuzzleWorkLikeReportRecordInput, - PuzzleWorkPointIncentiveClaimRecordInput, PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput, - PuzzleWorkUpsertRecordInput, SpacetimeClientError, + PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput, + PuzzleBackgroundCompileTaskReleaseRecordInput, PuzzleCreatorIntentRecord, + PuzzleDraftCompileFailureRecordInput, PuzzleDraftLevelRecord, PuzzleFormDraftRecord, + PuzzleFormDraftSaveRecordInput, PuzzleGeneratedImageCandidateRecord, + PuzzleGeneratedImagesSaveRecordInput, PuzzleLeaderboardEntryRecord, + PuzzleLeaderboardSubmitRecordInput, PuzzlePublishRecordInput, PuzzleRecommendedNextWorkRecord, + PuzzleResultDraftRecord, PuzzleResultPreviewBlockerRecord, PuzzleResultPreviewFindingRecord, + PuzzleResultPreviewRecord, PuzzleRunDragRecordInput, PuzzleRunPauseRecordInput, + PuzzleRunPropRecordInput, PuzzleRunRecord, PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput, + PuzzleSelectCoverImageRecordInput, PuzzleUiBackgroundSaveRecordInput, + PuzzleWorkLikeReportRecordInput, PuzzleWorkPointIncentiveClaimRecordInput, + PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput, PuzzleWorkUpsertRecordInput, + SpacetimeClientError, }; use std::convert::Infallible; @@ -134,38 +135,51 @@ const PUZZLE_UI_BACKGROUND_PROMPT_FALLBACK_MARKER: &str = const PUZZLE_VECTOR_ENGINE_SQUARE_IMAGE_SIZE: &str = "1024x1024"; const PUZZLE_VECTOR_ENGINE_PORTRAIT_IMAGE_SIZE: &str = "1024x1536"; -static PUZZLE_BACKGROUND_COMPILE_TASKS: OnceLock>> = OnceLock::new(); - -fn puzzle_background_compile_tasks() -> &'static Mutex> { - PUZZLE_BACKGROUND_COMPILE_TASKS.get_or_init(|| Mutex::new(HashSet::new())) +fn build_puzzle_background_compile_task_id(session_id: &str) -> String { + format!("puzzle_initial_background:{session_id}") } -fn try_register_puzzle_background_compile_task(session_id: &str) -> bool { - match puzzle_background_compile_tasks().lock() { - Ok(mut tasks) => tasks.insert(session_id.to_string()), - Err(error) => { +fn build_puzzle_background_compile_claim_id(task_id: &str, request_id: &str) -> String { + format!("{task_id}:{request_id}") +} + +async fn release_claimed_puzzle_background_compile_task( + state: &PuzzleApiState, + task_id: &str, + claim_id: &str, + session_id: &str, + owner_user_id: &str, +) { + let result = state + .spacetime_client() + .release_puzzle_background_compile_task(PuzzleBackgroundCompileTaskReleaseRecordInput { + task_id: task_id.to_string(), + claim_id: claim_id.to_string(), + session_id: session_id.to_string(), + owner_user_id: owner_user_id.to_string(), + }) + .await; + match result { + Ok(true) => {} + Ok(false) => { tracing::warn!( provider = PUZZLE_AGENT_API_BASE_PROVIDER, + task_id, + claim_id, session_id, - error = %error, - "拼图后台生成任务注册表锁已损坏,允许本次任务继续" + owner_user_id, + "拼图首图后台生成任务释放未命中当前 claim" ); - true - } - } -} - -fn unregister_puzzle_background_compile_task(session_id: &str) { - match puzzle_background_compile_tasks().lock() { - Ok(mut tasks) => { - tasks.remove(session_id); } Err(error) => { tracing::warn!( provider = PUZZLE_AGENT_API_BASE_PROVIDER, + task_id, + claim_id, session_id, + owner_user_id, error = %error, - "拼图后台生成任务注册表解锁失败,忽略清理" + "拼图首图后台生成任务释放失败" ); } } diff --git a/server-rs/crates/api-server/src/puzzle/handlers.rs b/server-rs/crates/api-server/src/puzzle/handlers.rs index 2fa1a265..d7a2b0bd 100644 --- a/server-rs/crates/api-server/src/puzzle/handlers.rs +++ b/server-rs/crates/api-server/src/puzzle/handlers.rs @@ -588,7 +588,7 @@ pub async fn execute_puzzle_agent_action( let owner_user_id = authenticated.claims().user_id().to_string(); let now = current_utc_micros(); let action = payload.action.trim().to_string(); - let billing_asset_id = format!("{session_id}:{now}"); + let billing_asset_id = format!("{}:{}:{}", session_id, action, request_context.request_id()); let mut operation_consumed_points = 0; tracing::info!( provider = PUZZLE_AGENT_API_BASE_PROVIDER, @@ -695,156 +695,207 @@ pub async fn execute_puzzle_agent_action( Err(response) => return Err(response), }; let session = if ai_redraw { - if !try_register_puzzle_background_compile_task(&compile_session_id) { - tracing::info!( - provider = PUZZLE_AGENT_API_BASE_PROVIDER, - session_id = %compile_session_id, - owner_user_id = %owner_user_id, - "拼图首图后台生成任务已存在,本次 action 直接返回生成中会话" - ); - state - .spacetime_client() - .get_puzzle_agent_session(compile_session_id.clone(), owner_user_id.clone()) - .await - .map(mark_puzzle_initial_generation_started_snapshot) - .map_err(map_puzzle_client_error) - } else { - let compiled_session = state - .spacetime_client() - .compile_puzzle_agent_draft( - compile_session_id.clone(), - owner_user_id.clone(), - now, - ) - .await - .map_err(map_puzzle_compile_error); - match compiled_session { - Ok(compiled_session) => { - let response_session = mark_puzzle_initial_generation_started_snapshot( - compiled_session.clone(), - ); - let background_state = state.clone(); - let background_request_context = request_context.clone(); - let background_session_id = compile_session_id.clone(); - let background_owner_user_id = owner_user_id.clone(); - let background_prompt_text = prompt_text.map(str::to_string); - let background_reference_image_src = - primary_reference_image_src.map(str::to_string); - let background_image_model = payload.image_model.clone(); - let background_points_cost = puzzle_draft_generation_points_cost; - let background_work_name = compiled_session - .draft - .as_ref() - .map(|draft| draft.work_title.clone()); - let background_billing_asset_id = - format!("{background_session_id}:compile_puzzle_draft"); - tokio::spawn(async move { - let operation_owner_user_id = background_owner_user_id.clone(); - let background_root_state = background_state.root_state().clone(); - let operation_state = background_state.clone(); - let result = execute_billable_asset_operation_with_cost( - &background_root_state, - &background_owner_user_id, - "puzzle_initial_image", - &background_billing_asset_id, - background_points_cost, - async move { - generate_puzzle_initial_cover_from_compiled_session( - &operation_state, - &background_request_context, - compiled_session, - operation_owner_user_id, - background_prompt_text.as_deref(), - background_reference_image_src.as_deref(), - background_image_model.as_deref(), - current_utc_micros(), - ) - .await - }, - ) - .await; - match result { - Ok(session) => { - send_generation_result_subscribe_message_after_completion( - &background_root_state, - GenerationResultSubscribeMessage { - owner_user_id: background_owner_user_id.clone(), - task_name: Some("拼图".to_string()), - work_name: session - .draft - .as_ref() - .map(|draft| draft.work_title.clone()), - status: - GenerationResultSubscribeMessageStatus::Succeeded, - consumed_points: background_points_cost, - completed_at_micros: current_utc_micros(), - page: Some("/pages/web-view/index".to_string()), - }, - ) - .await; - tracing::info!( - provider = PUZZLE_AGENT_API_BASE_PROVIDER, - session_id = %session.session_id, - owner_user_id = %background_owner_user_id, - "拼图首图后台生成任务完成" - ); - } - Err(error) => { - let error_message = error.body_text(); - let failed_at_micros = current_utc_micros(); - let failure_result = background_state - .spacetime_client() - .mark_puzzle_draft_generation_failed( - PuzzleDraftCompileFailureRecordInput { - session_id: background_session_id.clone(), - owner_user_id: background_owner_user_id.clone(), - error_message: error_message.clone(), - failed_at_micros, - }, + let background_task_id = + build_puzzle_background_compile_task_id(&compile_session_id); + let background_claim_id = build_puzzle_background_compile_claim_id( + &background_task_id, + request_context.request_id(), + ); + let claim_result = state + .spacetime_client() + .claim_puzzle_background_compile_task( + PuzzleBackgroundCompileTaskClaimRecordInput { + task_id: background_task_id.clone(), + claim_id: background_claim_id.clone(), + session_id: compile_session_id.clone(), + owner_user_id: owner_user_id.clone(), + claimed_at_micros: current_utc_micros(), + }, + ) + .await + .map_err(map_puzzle_client_error); + match claim_result { + Ok(false) => { + tracing::info!( + provider = PUZZLE_AGENT_API_BASE_PROVIDER, + session_id = %compile_session_id, + owner_user_id = %owner_user_id, + task_id = %background_task_id, + "拼图首图后台生成任务已存在,本次 action 直接返回生成中会话" + ); + state + .spacetime_client() + .get_puzzle_agent_session( + compile_session_id.clone(), + owner_user_id.clone(), + ) + .await + .map(mark_puzzle_initial_generation_started_snapshot) + .map_err(map_puzzle_client_error) + } + Ok(true) => { + let compiled_session = state + .spacetime_client() + .compile_puzzle_agent_draft( + compile_session_id.clone(), + owner_user_id.clone(), + now, + ) + .await + .map_err(map_puzzle_compile_error); + match compiled_session { + Ok(compiled_session) => { + let response_session = + mark_puzzle_initial_generation_started_snapshot( + compiled_session.clone(), + ); + let background_state = state.clone(); + let background_request_context = request_context.clone(); + let background_session_id = compile_session_id.clone(); + let background_owner_user_id = owner_user_id.clone(); + let background_task_id = background_task_id.clone(); + let background_claim_id = background_claim_id.clone(); + let background_prompt_text = prompt_text.map(str::to_string); + let background_reference_image_src = + primary_reference_image_src.map(str::to_string); + let background_image_model = payload.image_model.clone(); + let background_points_cost = puzzle_draft_generation_points_cost; + let background_work_name = compiled_session + .draft + .as_ref() + .map(|draft| draft.work_title.clone()); + let background_billing_asset_id = format!( + "{background_session_id}:compile_puzzle_draft:{}", + background_request_context.request_id() + ); + tokio::spawn(async move { + let operation_owner_user_id = background_owner_user_id.clone(); + let background_root_state = + background_state.root_state().clone(); + let operation_state = background_state.clone(); + let result = execute_billable_asset_operation_with_cost( + &background_root_state, + &background_owner_user_id, + "puzzle_initial_image", + &background_billing_asset_id, + background_points_cost, + async move { + generate_puzzle_initial_cover_from_compiled_session( + &operation_state, + &background_request_context, + compiled_session, + operation_owner_user_id, + background_prompt_text.as_deref(), + background_reference_image_src.as_deref(), + background_image_model.as_deref(), + current_utc_micros(), ) - .await; - if let Err(mark_error) = failure_result { - tracing::warn!( - provider = PUZZLE_AGENT_API_BASE_PROVIDER, - session_id = %background_session_id, - owner_user_id = %background_owner_user_id, - message = %mark_error, - "拼图首图后台生成失败态回写失败" - ); - } else { + .await + }, + ) + .await; + match result { + Ok(session) => { send_generation_result_subscribe_message_after_completion( &background_root_state, GenerationResultSubscribeMessage { owner_user_id: background_owner_user_id.clone(), task_name: Some("拼图".to_string()), - work_name: background_work_name.clone(), + work_name: session + .draft + .as_ref() + .map(|draft| draft.work_title.clone()), status: - GenerationResultSubscribeMessageStatus::Failed, - consumed_points: 0, - completed_at_micros: failed_at_micros, + GenerationResultSubscribeMessageStatus::Succeeded, + consumed_points: background_points_cost, + completed_at_micros: current_utc_micros(), page: Some("/pages/web-view/index".to_string()), }, ) .await; + tracing::info!( + provider = PUZZLE_AGENT_API_BASE_PROVIDER, + session_id = %session.session_id, + owner_user_id = %background_owner_user_id, + "拼图首图后台生成任务完成" + ); + } + Err(error) => { + let error_message = error.body_text(); + let failed_at_micros = current_utc_micros(); + let failure_result = background_state + .spacetime_client() + .mark_puzzle_draft_generation_failed( + PuzzleDraftCompileFailureRecordInput { + session_id: background_session_id.clone(), + owner_user_id: background_owner_user_id + .clone(), + error_message: error_message.clone(), + failed_at_micros, + }, + ) + .await; + if let Err(mark_error) = failure_result { + tracing::warn!( + provider = PUZZLE_AGENT_API_BASE_PROVIDER, + session_id = %background_session_id, + owner_user_id = %background_owner_user_id, + message = %mark_error, + "拼图首图后台生成失败态回写失败" + ); + } else { + send_generation_result_subscribe_message_after_completion( + &background_root_state, + GenerationResultSubscribeMessage { + owner_user_id: background_owner_user_id + .clone(), + task_name: Some("拼图".to_string()), + work_name: background_work_name.clone(), + status: + GenerationResultSubscribeMessageStatus::Failed, + consumed_points: 0, + completed_at_micros: failed_at_micros, + page: Some( + "/pages/web-view/index".to_string(), + ), + }, + ) + .await; + } + tracing::warn!( + provider = PUZZLE_AGENT_API_BASE_PROVIDER, + session_id = %background_session_id, + owner_user_id = %background_owner_user_id, + message = %error_message, + "拼图首图后台生成任务失败" + ); } - tracing::warn!( - provider = PUZZLE_AGENT_API_BASE_PROVIDER, - session_id = %background_session_id, - owner_user_id = %background_owner_user_id, - message = %error_message, - "拼图首图后台生成任务失败" - ); } - } - unregister_puzzle_background_compile_task(&background_session_id); - }); - Ok(response_session) - } - Err(error) => { - unregister_puzzle_background_compile_task(&compile_session_id); - Err(error) + release_claimed_puzzle_background_compile_task( + &background_state, + &background_task_id, + &background_claim_id, + &background_session_id, + &background_owner_user_id, + ) + .await; + }); + Ok(response_session) + } + Err(error) => { + release_claimed_puzzle_background_compile_task( + &state, + &background_task_id, + &background_claim_id, + &compile_session_id, + &owner_user_id, + ) + .await; + Err(error) + } } } + Err(error) => Err(error), } } else { compile_puzzle_draft_with_uploaded_cover( @@ -2231,7 +2282,7 @@ pub async fn use_puzzle_runtime_prop( } }; let should_sync_freeze_boundary = matches!(prop_kind.as_str(), "freezeTime" | "freeze_time"); - let billing_asset_id = format!("{}:{}:{}", run_id, prop_kind, current_utc_micros()); + let billing_asset_id = format!("{}:{}:{}", run_id, prop_kind, request_context.request_id()); let reducer_owner_user_id = owner_user_id.clone(); let reducer_run_id = run_id.clone(); let fallback_run_id = run_id.clone(); diff --git a/server-rs/crates/api-server/src/state.rs b/server-rs/crates/api-server/src/state.rs index d8882747..57653736 100644 --- a/server-rs/crates/api-server/src/state.rs +++ b/server-rs/crates/api-server/src/state.rs @@ -41,6 +41,7 @@ use tracing::{info, warn}; use crate::config::AppConfig; use crate::puzzle_gallery_cache::PuzzleGalleryCache; use crate::tracking_outbox::TrackingOutbox; +use crate::wallet_refund_outbox::WalletRefundOutbox; use crate::wechat::pay::{build_wechat_pay_config, map_wechat_pay_init_error}; use crate::wechat::provider::build_wechat_provider; use crate::work_author::{ @@ -263,6 +264,7 @@ pub struct AppStateInner { spacetime_client: SpacetimeClient, puzzle_gallery_cache: PuzzleGalleryCache, tracking_outbox: Option>, + wallet_refund_outbox: Option>, llm_client: Option, creative_agent_gpt5_client: Option, creative_agent_executor: Arc, @@ -406,6 +408,8 @@ impl AppState { procedure_timeout: config.spacetime_procedure_timeout, }); let tracking_outbox = TrackingOutbox::from_config(&config, spacetime_client.clone()); + let wallet_refund_outbox = + WalletRefundOutbox::from_config(&config, spacetime_client.clone()); let llm_client = build_llm_client(&config)?; let creative_agent_gpt5_client = build_creative_agent_gpt5_client(&config)?; let http_request_permit_pools = HttpRequestPermitPools::from_config(&config); @@ -441,6 +445,7 @@ impl AppState { spacetime_client, puzzle_gallery_cache: PuzzleGalleryCache::new(), tracking_outbox, + wallet_refund_outbox, llm_client, creative_agent_gpt5_client, creative_agent_executor: Arc::new(MockLangChainRustAgentExecutor), @@ -922,6 +927,10 @@ impl AppState { self.tracking_outbox.clone() } + pub fn wallet_refund_outbox(&self) -> Option> { + self.wallet_refund_outbox.clone() + } + pub fn llm_client(&self) -> Option<&LlmClient> { self.llm_client.as_ref() } diff --git a/server-rs/crates/api-server/src/wallet_refund_outbox.rs b/server-rs/crates/api-server/src/wallet_refund_outbox.rs new file mode 100644 index 00000000..0169cd2a --- /dev/null +++ b/server-rs/crates/api-server/src/wallet_refund_outbox.rs @@ -0,0 +1,463 @@ +use std::{ + fmt, + path::{Path, PathBuf}, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use spacetime_client::{SpacetimeClient, SpacetimeClientError}; +use tokio::{ + fs::{self, File, OpenOptions}, + io::{AsyncReadExt, AsyncWriteExt}, + sync::{Mutex, Notify}, + time::sleep, +}; +use tracing::{debug, warn}; + +use crate::config::AppConfig; + +const PENDING_FILE_PREFIX: &str = "refund-"; +const CORRUPT_FILE_PREFIX: &str = "corrupt-"; +const TEMP_FILE_PREFIX: &str = "tmp-"; +const OUTBOX_FILE_EXTENSION: &str = ".json"; + +#[derive(Clone)] +pub struct WalletRefundOutbox { + dir: PathBuf, + batch_size: usize, + flush_interval: Duration, + max_bytes: u64, + spacetime_client: SpacetimeClient, + enqueue_lock: Arc>, + flush_notify: Arc, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) struct WalletRefundOutboxRecord { + pub owner_user_id: String, + pub amount: u64, + pub ledger_id: String, + pub created_at_micros: i64, + pub asset_kind: String, + pub asset_id: String, +} + +#[derive(Debug)] +pub enum WalletRefundOutboxEnqueueOutcome { + Enqueued, + Dropped { reason: &'static str }, +} + +#[derive(Debug)] +pub enum WalletRefundOutboxError { + Io(std::io::Error), + Json(serde_json::Error), + Spacetime(SpacetimeClientError), +} + +impl WalletRefundOutbox { + pub fn from_config(config: &AppConfig, spacetime_client: SpacetimeClient) -> Option> { + if !config.wallet_refund_outbox_enabled { + return None; + } + + Some(Arc::new(Self { + dir: config.wallet_refund_outbox_dir.clone(), + batch_size: config.wallet_refund_outbox_batch_size.max(1), + flush_interval: config.wallet_refund_outbox_flush_interval, + max_bytes: config.wallet_refund_outbox_max_bytes, + spacetime_client, + enqueue_lock: Arc::new(Mutex::new(())), + flush_notify: Arc::new(Notify::new()), + })) + } + + pub async fn enqueue( + &self, + record: WalletRefundOutboxRecord, + ) -> Result { + let _guard = self.enqueue_lock.lock().await; + fs::create_dir_all(&self.dir).await?; + + let pending_path = self.pending_path_for_ledger(&record.ledger_id); + if fs::metadata(&pending_path).await.is_ok() { + self.flush_notify.notify_one(); + return Ok(WalletRefundOutboxEnqueueOutcome::Enqueued); + } + + let bytes = serde_json::to_vec(&record)?; + let line_bytes = bytes.len().min(u64::MAX as usize) as u64; + let current_bytes = directory_size_if_exists(&self.dir).unwrap_or(0); + if current_bytes.saturating_add(line_bytes) > self.max_bytes { + return Ok(WalletRefundOutboxEnqueueOutcome::Dropped { + reason: "max_bytes", + }); + } + + let temp_path = self.temp_path(); + let mut file = OpenOptions::new() + .create_new(true) + .write(true) + .open(&temp_path) + .await?; + file.write_all(&bytes).await?; + file.flush().await?; + file.sync_data().await?; + drop(file); + if fs::metadata(&pending_path).await.is_ok() { + let _ = fs::remove_file(&temp_path).await; + self.flush_notify.notify_one(); + return Ok(WalletRefundOutboxEnqueueOutcome::Enqueued); + } + fs::rename(&temp_path, &pending_path).await?; + sync_directory_metadata(&self.dir).await?; + self.flush_notify.notify_one(); + Ok(WalletRefundOutboxEnqueueOutcome::Enqueued) + } + + pub fn spawn_worker(self: Arc) { + tokio::spawn(async move { + loop { + tokio::select! { + _ = sleep(self.flush_interval) => { + if let Err(error) = self.flush_pending_files_once().await { + warn!(error = %error, "wallet refund outbox 重放退款失败,将保留文件等待重试"); + } + } + _ = self.flush_notify.notified() => { + if let Err(error) = self.flush_pending_files_once().await { + warn!(error = %error, "wallet refund outbox 主动重放退款失败,将保留文件等待重试"); + } + } + } + } + }); + } + + pub async fn flush_for_shutdown(&self) -> Result<(), WalletRefundOutboxError> { + self.flush_pending_files_once().await + } + + async fn flush_pending_files_once(&self) -> Result<(), WalletRefundOutboxError> { + fs::create_dir_all(&self.dir).await?; + let pending_files = self.list_pending_files().await?; + for path in pending_files.into_iter().take(self.batch_size) { + let record = match read_refund_record(&path).await { + Ok(record) => record, + Err(error) if error.is_data_corruption() => { + let corrupt_path = self.corrupt_path_for(&path); + fs::rename(&path, &corrupt_path).await?; + sync_directory_metadata(&self.dir).await?; + warn!( + error = %error, + source = %path.display(), + target = %corrupt_path.display(), + "wallet refund outbox 文件无法解析,已隔离" + ); + continue; + } + Err(error) => return Err(error), + }; + + match self + .spacetime_client + .refund_profile_wallet_points( + record.owner_user_id.clone(), + record.amount, + record.ledger_id.clone(), + record.created_at_micros, + ) + .await + { + Ok(_) => { + match fs::remove_file(&path).await { + Ok(()) => {} + Err(error) if error.kind() == std::io::ErrorKind::NotFound => {} + Err(error) => return Err(error.into()), + } + sync_directory_metadata(&self.dir).await?; + debug!( + ledger_id = %record.ledger_id, + owner_user_id = %record.owner_user_id, + asset_kind = %record.asset_kind, + asset_id = %record.asset_id, + path = %path.display(), + "wallet refund outbox 退款已重放并删除文件" + ); + } + Err(error) => return Err(WalletRefundOutboxError::Spacetime(error)), + } + } + Ok(()) + } + + async fn list_pending_files(&self) -> Result, WalletRefundOutboxError> { + let mut entries = fs::read_dir(&self.dir).await?; + let mut files = Vec::new(); + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + let Some(name) = path.file_name().and_then(|value| value.to_str()) else { + continue; + }; + if name.starts_with(PENDING_FILE_PREFIX) && name.ends_with(OUTBOX_FILE_EXTENSION) { + files.push(path); + } + } + files.sort(); + Ok(files) + } + + fn pending_path_for_ledger(&self, ledger_id: &str) -> PathBuf { + self.dir.join(format!( + "{PENDING_FILE_PREFIX}{}{OUTBOX_FILE_EXTENSION}", + ledger_id_hash(ledger_id) + )) + } + + fn temp_path(&self) -> PathBuf { + self.dir.join(format!( + "{TEMP_FILE_PREFIX}{}-{uuid}{OUTBOX_FILE_EXTENSION}", + current_unix_micros(), + uuid = uuid::Uuid::new_v4() + )) + } + + fn corrupt_path_for(&self, path: &Path) -> PathBuf { + let name = path + .file_name() + .and_then(|value| value.to_str()) + .unwrap_or("unknown.json"); + self.dir.join(format!( + "{CORRUPT_FILE_PREFIX}{}-{uuid}-{name}", + current_unix_micros(), + uuid = uuid::Uuid::new_v4() + )) + } +} + +impl fmt::Debug for WalletRefundOutbox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WalletRefundOutbox") + .field("dir", &self.dir) + .field("batch_size", &self.batch_size) + .field("flush_interval", &self.flush_interval) + .field("max_bytes", &self.max_bytes) + .finish() + } +} + +impl fmt::Display for WalletRefundOutboxError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Io(error) => write!(f, "{error}"), + Self::Json(error) => write!(f, "{error}"), + Self::Spacetime(error) => write!(f, "{error}"), + } + } +} + +impl From for WalletRefundOutboxError { + fn from(value: std::io::Error) -> Self { + Self::Io(value) + } +} + +impl From for WalletRefundOutboxError { + fn from(value: serde_json::Error) -> Self { + Self::Json(value) + } +} + +impl WalletRefundOutboxError { + fn is_data_corruption(&self) -> bool { + matches!(self, Self::Json(_)) + } +} + +async fn read_refund_record( + path: &Path, +) -> Result { + let mut file = File::open(path).await?; + let mut bytes = Vec::new(); + file.read_to_end(&mut bytes).await?; + Ok(serde_json::from_slice::(&bytes)?) +} + +fn directory_size_if_exists(path: &Path) -> Result { + if !path.is_dir() { + return Ok(0); + } + + let mut total = 0u64; + for entry in std::fs::read_dir(path)? { + let entry = entry?; + if !is_pending_outbox_file_name(&entry.file_name()) { + continue; + } + let metadata = entry.metadata()?; + if metadata.is_file() { + total = total.saturating_add(metadata.len()); + } + } + Ok(total) +} + +fn current_unix_micros() -> u128 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_micros() +} + +fn ledger_id_hash(ledger_id: &str) -> String { + hex::encode(Sha256::digest(ledger_id.as_bytes())) +} + +fn is_pending_outbox_file_name(name: &std::ffi::OsStr) -> bool { + name.to_str().is_some_and(|value| { + value.starts_with(PENDING_FILE_PREFIX) && value.ends_with(OUTBOX_FILE_EXTENSION) + }) +} + +async fn sync_directory_metadata(path: &Path) -> Result<(), WalletRefundOutboxError> { + let path = path.to_path_buf(); + tokio::task::spawn_blocking(move || { + let dir = std::fs::File::open(path)?; + dir.sync_all() + }) + .await + .map_err(|error| std::io::Error::new(std::io::ErrorKind::Other, error.to_string()))??; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample_record(ledger_id: &str) -> WalletRefundOutboxRecord { + WalletRefundOutboxRecord { + owner_user_id: "user-1".to_string(), + amount: 2, + ledger_id: ledger_id.to_string(), + created_at_micros: 1_713_680_000_000_000, + asset_kind: "puzzle_initial_image".to_string(), + asset_id: "asset-1".to_string(), + } + } + + fn test_dir(name: &str) -> PathBuf { + let dir = std::env::temp_dir().join(format!( + "genarrative-wallet-refund-outbox-{name}-{}", + current_unix_micros() + )); + let _ = std::fs::remove_dir_all(&dir); + dir + } + + fn test_outbox(dir: PathBuf, max_bytes: u64) -> Arc { + let config = AppConfig { + wallet_refund_outbox_dir: dir, + wallet_refund_outbox_batch_size: 500, + wallet_refund_outbox_flush_interval: Duration::from_secs(60), + wallet_refund_outbox_max_bytes: max_bytes, + ..AppConfig::default() + }; + WalletRefundOutbox::from_config( + &config, + SpacetimeClient::new(spacetime_client::SpacetimeClientConfig { + server_url: "http://127.0.0.1:1".to_string(), + database: "missing".to_string(), + token: None, + pool_size: 1, + procedure_timeout: Duration::from_millis(10), + }), + ) + .expect("outbox should be enabled") + } + + #[tokio::test] + async fn enqueue_is_idempotent_per_ledger_id() { + let dir = test_dir("idempotent"); + let outbox = test_outbox(dir.clone(), 1024 * 1024); + + outbox.enqueue(sample_record("ledger-1")).await.unwrap(); + outbox.enqueue(sample_record("ledger-1")).await.unwrap(); + + let pending_count = std::fs::read_dir(&dir) + .unwrap() + .filter_map(Result::ok) + .filter(|entry| is_pending_outbox_file_name(&entry.file_name())) + .count(); + assert_eq!(pending_count, 1); + + let _ = std::fs::remove_dir_all(dir); + } + + #[tokio::test] + async fn enqueue_drops_when_outbox_exceeds_max_bytes() { + let dir = test_dir("max-bytes"); + let outbox = test_outbox(dir.clone(), 1); + + let outcome = outbox.enqueue(sample_record("ledger-1")).await.unwrap(); + + assert!(matches!( + outcome, + WalletRefundOutboxEnqueueOutcome::Dropped { + reason: "max_bytes" + } + )); + assert!(!dir.exists() || std::fs::read_dir(&dir).unwrap().next().is_none()); + + let _ = std::fs::remove_dir_all(dir); + } + + #[tokio::test] + async fn flush_quarantines_corrupt_file() { + let dir = test_dir("corrupt"); + std::fs::create_dir_all(&dir).unwrap(); + let pending_path = dir.join(format!("{PENDING_FILE_PREFIX}bad{OUTBOX_FILE_EXTENSION}")); + std::fs::write(&pending_path, b"{not-json}").unwrap(); + let outbox = test_outbox(dir.clone(), 1024 * 1024); + + outbox.flush_pending_files_once().await.unwrap(); + + assert!(!pending_path.exists()); + let corrupt_count = std::fs::read_dir(&dir) + .unwrap() + .filter_map(Result::ok) + .filter(|entry| { + entry + .file_name() + .to_str() + .is_some_and(|name| name.starts_with(CORRUPT_FILE_PREFIX)) + }) + .count(); + assert_eq!(corrupt_count, 1); + + let _ = std::fs::remove_dir_all(dir); + } + + #[tokio::test] + async fn shutdown_flush_keeps_file_when_spacetime_is_unavailable() { + let dir = test_dir("shutdown"); + let outbox = test_outbox(dir.clone(), 1024 * 1024); + + outbox.enqueue(sample_record("ledger-1")).await.unwrap(); + let result = outbox.flush_for_shutdown().await; + + assert!( + matches!(result, Err(WalletRefundOutboxError::Spacetime(_))), + "missing test SpacetimeDB should keep refund file for retry" + ); + let pending_count = std::fs::read_dir(&dir) + .unwrap() + .filter_map(Result::ok) + .filter(|entry| is_pending_outbox_file_name(&entry.file_name())) + .count(); + assert_eq!(pending_count, 1); + + let _ = std::fs::remove_dir_all(dir); + } +} diff --git a/server-rs/crates/module-puzzle/src/application.rs b/server-rs/crates/module-puzzle/src/application.rs index eca056e2..ee1061ba 100644 --- a/server-rs/crates/module-puzzle/src/application.rs +++ b/server-rs/crates/module-puzzle/src/application.rs @@ -20,6 +20,14 @@ pub struct PuzzleAgentSessionProcedureResult { pub error_message: Option, } +#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct PuzzleBackgroundCompileTaskProcedureResult { + pub ok: bool, + pub claimed: bool, + pub error_message: Option, +} + #[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))] #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct PuzzleWorksProcedureResult { diff --git a/server-rs/crates/module-puzzle/src/commands.rs b/server-rs/crates/module-puzzle/src/commands.rs index 85f975e5..994ecd9e 100644 --- a/server-rs/crates/module-puzzle/src/commands.rs +++ b/server-rs/crates/module-puzzle/src/commands.rs @@ -68,6 +68,25 @@ pub struct PuzzleDraftCompileInput { pub compiled_at_micros: i64, } +#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct PuzzleBackgroundCompileTaskClaimInput { + pub task_id: String, + pub claim_id: String, + pub session_id: String, + pub owner_user_id: String, + pub claimed_at_micros: i64, +} + +#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct PuzzleBackgroundCompileTaskReleaseInput { + pub task_id: String, + pub claim_id: String, + pub session_id: String, + pub owner_user_id: String, +} + #[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))] #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct PuzzleDraftCompileFailureInput { diff --git a/server-rs/crates/spacetime-client/src/lib.rs b/server-rs/crates/spacetime-client/src/lib.rs index 626b34d3..fc4cd607 100644 --- a/server-rs/crates/spacetime-client/src/lib.rs +++ b/server-rs/crates/spacetime-client/src/lib.rs @@ -51,7 +51,8 @@ pub use mapper::{ PublicWorkGalleryEntryRecord, PuzzleAgentMessageFinalizeRecordInput, PuzzleAgentMessageRecord, PuzzleAgentMessageSubmitRecordInput, PuzzleAgentSessionCreateRecordInput, PuzzleAgentSessionRecord, PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord, - PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBoardRecord, PuzzleCellPositionRecord, + PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput, + PuzzleBackgroundCompileTaskReleaseRecordInput, PuzzleBoardRecord, PuzzleCellPositionRecord, PuzzleClearActionRequest, PuzzleClearActionResponse, PuzzleClearActionType, PuzzleClearBoardCell, PuzzleClearBoardSnapshot, PuzzleClearCardAsset, PuzzleClearDraftResponse, PuzzleClearGenerationStatus, PuzzleClearImageAsset, PuzzleClearNextLevelRequest, diff --git a/server-rs/crates/spacetime-client/src/mapper.rs b/server-rs/crates/spacetime-client/src/mapper.rs index c1f4c069..6ba49d7d 100644 --- a/server-rs/crates/spacetime-client/src/mapper.rs +++ b/server-rs/crates/spacetime-client/src/mapper.rs @@ -101,7 +101,8 @@ pub use self::puzzle::{ PuzzleAgentMessageFinalizeRecordInput, PuzzleAgentMessageRecord, PuzzleAgentMessageSubmitRecordInput, PuzzleAgentSessionCreateRecordInput, PuzzleAgentSessionRecord, PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord, - PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBoardRecord, PuzzleCellPositionRecord, + PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput, + PuzzleBackgroundCompileTaskReleaseRecordInput, PuzzleBoardRecord, PuzzleCellPositionRecord, PuzzleCreatorIntentRecord, PuzzleDraftCompileFailureRecordInput, PuzzleDraftLevelRecord, PuzzleFormDraftRecord, PuzzleFormDraftSaveRecordInput, PuzzleGalleryCardRecord, PuzzleGeneratedImageCandidateRecord, PuzzleGeneratedImagesSaveRecordInput, @@ -199,10 +200,10 @@ pub(crate) use self::public_work::{ map_public_work_gallery_entry, map_public_work_gallery_entry_to_detail_entry, }; pub(crate) use self::puzzle::{ - map_puzzle_agent_session_procedure_result, map_puzzle_gallery_card_view_row, - map_puzzle_run_procedure_result, map_puzzle_work_procedure_result, - map_puzzle_works_procedure_result, map_runtime_profile_wallet_ledger_source_type_back, - parse_puzzle_agent_stage_record, + map_puzzle_agent_session_procedure_result, map_puzzle_background_compile_task_procedure_result, + map_puzzle_gallery_card_view_row, map_puzzle_run_procedure_result, + map_puzzle_work_procedure_result, map_puzzle_works_procedure_result, + map_runtime_profile_wallet_ledger_source_type_back, parse_puzzle_agent_stage_record, }; pub(crate) use self::puzzle_clear::{ map_puzzle_clear_agent_session_procedure_result, map_puzzle_clear_gallery_card_view_row, diff --git a/server-rs/crates/spacetime-client/src/mapper/puzzle.rs b/server-rs/crates/spacetime-client/src/mapper/puzzle.rs index ae57c440..d11564de 100644 --- a/server-rs/crates/spacetime-client/src/mapper/puzzle.rs +++ b/server-rs/crates/spacetime-client/src/mapper/puzzle.rs @@ -13,6 +13,16 @@ pub(crate) fn map_puzzle_agent_session_procedure_result( Ok(map_puzzle_agent_session_snapshot(session)) } +pub(crate) fn map_puzzle_background_compile_task_procedure_result( + result: PuzzleBackgroundCompileTaskProcedureResult, +) -> Result { + if !result.ok { + return Err(SpacetimeClientError::procedure_failed(result.error_message)); + } + + Ok(result.claimed) +} + pub(crate) fn map_puzzle_work_procedure_result( result: PuzzleWorkProcedureResult, ) -> Result { @@ -614,6 +624,23 @@ pub struct PuzzleFormDraftSaveRecordInput { pub saved_at_micros: i64, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PuzzleBackgroundCompileTaskClaimRecordInput { + pub task_id: String, + pub claim_id: String, + pub session_id: String, + pub owner_user_id: String, + pub claimed_at_micros: i64, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PuzzleBackgroundCompileTaskReleaseRecordInput { + pub task_id: String, + pub claim_id: String, + pub session_id: String, + pub owner_user_id: String, +} + #[derive(Clone, Debug, PartialEq, Eq)] pub struct PuzzleAgentMessageSubmitRecordInput { pub session_id: String, diff --git a/server-rs/crates/spacetime-client/src/module_bindings.rs b/server-rs/crates/spacetime-client/src/module_bindings.rs index cb53cae4..55677168 100644 --- a/server-rs/crates/spacetime-client/src/module_bindings.rs +++ b/server-rs/crates/spacetime-client/src/module_bindings.rs @@ -204,6 +204,7 @@ pub mod chapter_progression_table; pub mod chapter_progression_type; pub mod checkpoint_wooden_fish_run_procedure; pub mod claim_profile_task_reward_and_return_procedure; +pub mod claim_puzzle_background_compile_task_procedure; pub mod claim_puzzle_work_point_incentive_procedure; pub mod clear_database_migration_import_chunks_procedure; pub mod clear_platform_browse_history_and_return_procedure; @@ -628,6 +629,11 @@ pub mod puzzle_anchor_item_type; pub mod puzzle_anchor_pack_type; pub mod puzzle_anchor_status_type; pub mod puzzle_audio_asset_type; +pub mod puzzle_background_compile_task_claim_input_type; +pub mod puzzle_background_compile_task_procedure_result_type; +pub mod puzzle_background_compile_task_release_input_type; +pub mod puzzle_background_compile_task_row_type; +pub mod puzzle_background_compile_task_table; pub mod puzzle_board_snapshot_type; pub mod puzzle_cell_position_type; pub mod puzzle_clear_agent_session_create_input_type; @@ -766,6 +772,7 @@ pub mod redeem_profile_reward_code_procedure; pub mod refresh_session_table; pub mod refresh_session_type; pub mod refund_profile_wallet_points_and_return_procedure; +pub mod release_puzzle_background_compile_task_procedure; pub mod remix_big_fish_work_procedure; pub mod remix_custom_world_profile_procedure; pub mod remix_puzzle_work_procedure; @@ -1312,6 +1319,7 @@ pub use chapter_progression_table::*; pub use chapter_progression_type::ChapterProgression; pub use checkpoint_wooden_fish_run_procedure::checkpoint_wooden_fish_run; pub use claim_profile_task_reward_and_return_procedure::claim_profile_task_reward_and_return; +pub use claim_puzzle_background_compile_task_procedure::claim_puzzle_background_compile_task; pub use claim_puzzle_work_point_incentive_procedure::claim_puzzle_work_point_incentive; pub use clear_database_migration_import_chunks_procedure::clear_database_migration_import_chunks; pub use clear_platform_browse_history_and_return_procedure::clear_platform_browse_history_and_return; @@ -1736,6 +1744,11 @@ pub use puzzle_anchor_item_type::PuzzleAnchorItem; pub use puzzle_anchor_pack_type::PuzzleAnchorPack; pub use puzzle_anchor_status_type::PuzzleAnchorStatus; pub use puzzle_audio_asset_type::PuzzleAudioAsset; +pub use puzzle_background_compile_task_claim_input_type::PuzzleBackgroundCompileTaskClaimInput; +pub use puzzle_background_compile_task_procedure_result_type::PuzzleBackgroundCompileTaskProcedureResult; +pub use puzzle_background_compile_task_release_input_type::PuzzleBackgroundCompileTaskReleaseInput; +pub use puzzle_background_compile_task_row_type::PuzzleBackgroundCompileTaskRow; +pub use puzzle_background_compile_task_table::*; pub use puzzle_board_snapshot_type::PuzzleBoardSnapshot; pub use puzzle_cell_position_type::PuzzleCellPosition; pub use puzzle_clear_agent_session_create_input_type::PuzzleClearAgentSessionCreateInput; @@ -1874,6 +1887,7 @@ pub use redeem_profile_reward_code_procedure::redeem_profile_reward_code; pub use refresh_session_table::*; pub use refresh_session_type::RefreshSession; pub use refund_profile_wallet_points_and_return_procedure::refund_profile_wallet_points_and_return; +pub use release_puzzle_background_compile_task_procedure::release_puzzle_background_compile_task; pub use remix_big_fish_work_procedure::remix_big_fish_work; pub use remix_custom_world_profile_procedure::remix_custom_world_profile; pub use remix_puzzle_work_procedure::remix_puzzle_work; @@ -2569,6 +2583,7 @@ pub struct DbUpdate { public_work_play_daily_stat: __sdk::TableUpdate, puzzle_agent_message: __sdk::TableUpdate, puzzle_agent_session: __sdk::TableUpdate, + puzzle_background_compile_task: __sdk::TableUpdate, puzzle_clear_agent_session: __sdk::TableUpdate, puzzle_clear_event: __sdk::TableUpdate, puzzle_clear_gallery_card_view: __sdk::TableUpdate, @@ -2854,6 +2869,11 @@ impl TryFrom<__ws::v2::TransactionUpdate> for DbUpdate { "puzzle_agent_session" => db_update.puzzle_agent_session.append( puzzle_agent_session_table::parse_table_update(table_update)?, ), + "puzzle_background_compile_task" => { + db_update.puzzle_background_compile_task.append( + puzzle_background_compile_task_table::parse_table_update(table_update)?, + ) + } "puzzle_clear_agent_session" => db_update.puzzle_clear_agent_session.append( puzzle_clear_agent_session_table::parse_table_update(table_update)?, ), @@ -3373,6 +3393,12 @@ impl __sdk::DbUpdate for DbUpdate { &self.puzzle_agent_session, ) .with_updates_by_pk(|row| &row.session_id); + diff.puzzle_background_compile_task = cache + .apply_diff_to_table::( + "puzzle_background_compile_task", + &self.puzzle_background_compile_task, + ) + .with_updates_by_pk(|row| &row.task_id); diff.puzzle_clear_agent_session = cache .apply_diff_to_table::( "puzzle_clear_agent_session", @@ -3828,6 +3854,9 @@ impl __sdk::DbUpdate for DbUpdate { "puzzle_agent_session" => db_update .puzzle_agent_session .append(__sdk::parse_row_list_as_inserts(table_rows.rows)?), + "puzzle_background_compile_task" => db_update + .puzzle_background_compile_task + .append(__sdk::parse_row_list_as_inserts(table_rows.rows)?), "puzzle_clear_agent_session" => db_update .puzzle_clear_agent_session .append(__sdk::parse_row_list_as_inserts(table_rows.rows)?), @@ -4192,6 +4221,9 @@ impl __sdk::DbUpdate for DbUpdate { "puzzle_agent_session" => db_update .puzzle_agent_session .append(__sdk::parse_row_list_as_deletes(table_rows.rows)?), + "puzzle_background_compile_task" => db_update + .puzzle_background_compile_task + .append(__sdk::parse_row_list_as_deletes(table_rows.rows)?), "puzzle_clear_agent_session" => db_update .puzzle_clear_agent_session .append(__sdk::parse_row_list_as_deletes(table_rows.rows)?), @@ -4410,6 +4442,7 @@ pub struct AppliedDiff<'r> { public_work_play_daily_stat: __sdk::TableAppliedDiff<'r, PublicWorkPlayDailyStat>, puzzle_agent_message: __sdk::TableAppliedDiff<'r, PuzzleAgentMessageRow>, puzzle_agent_session: __sdk::TableAppliedDiff<'r, PuzzleAgentSessionRow>, + puzzle_background_compile_task: __sdk::TableAppliedDiff<'r, PuzzleBackgroundCompileTaskRow>, puzzle_clear_agent_session: __sdk::TableAppliedDiff<'r, PuzzleClearAgentSessionRow>, puzzle_clear_event: __sdk::TableAppliedDiff<'r, PuzzleClearEventRow>, puzzle_clear_gallery_card_view: __sdk::TableAppliedDiff<'r, PuzzleClearGalleryCardViewRow>, @@ -4829,6 +4862,11 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> { &self.puzzle_agent_session, event, ); + callbacks.invoke_table_row_callbacks::( + "puzzle_background_compile_task", + &self.puzzle_background_compile_task, + event, + ); callbacks.invoke_table_row_callbacks::( "puzzle_clear_agent_session", &self.puzzle_clear_agent_session, @@ -5766,6 +5804,7 @@ impl __sdk::SpacetimeModule for RemoteModule { public_work_play_daily_stat_table::register_table(client_cache); puzzle_agent_message_table::register_table(client_cache); puzzle_agent_session_table::register_table(client_cache); + puzzle_background_compile_task_table::register_table(client_cache); puzzle_clear_agent_session_table::register_table(client_cache); puzzle_clear_event_table::register_table(client_cache); puzzle_clear_gallery_card_view_table::register_table(client_cache); @@ -5885,6 +5924,7 @@ impl __sdk::SpacetimeModule for RemoteModule { "public_work_play_daily_stat", "puzzle_agent_message", "puzzle_agent_session", + "puzzle_background_compile_task", "puzzle_clear_agent_session", "puzzle_clear_event", "puzzle_clear_gallery_card_view", diff --git a/server-rs/crates/spacetime-client/src/module_bindings/claim_puzzle_background_compile_task_procedure.rs b/server-rs/crates/spacetime-client/src/module_bindings/claim_puzzle_background_compile_task_procedure.rs new file mode 100644 index 00000000..45b9de6d --- /dev/null +++ b/server-rs/crates/spacetime-client/src/module_bindings/claim_puzzle_background_compile_task_procedure.rs @@ -0,0 +1,59 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +use super::puzzle_background_compile_task_claim_input_type::PuzzleBackgroundCompileTaskClaimInput; +use super::puzzle_background_compile_task_procedure_result_type::PuzzleBackgroundCompileTaskProcedureResult; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +struct ClaimPuzzleBackgroundCompileTaskArgs { + pub input: PuzzleBackgroundCompileTaskClaimInput, +} + +impl __sdk::InModule for ClaimPuzzleBackgroundCompileTaskArgs { + type Module = super::RemoteModule; +} + +#[allow(non_camel_case_types)] +/// Extension trait for access to the procedure `claim_puzzle_background_compile_task`. +/// +/// Implemented for [`super::RemoteProcedures`]. +pub trait claim_puzzle_background_compile_task { + fn claim_puzzle_background_compile_task(&self, input: PuzzleBackgroundCompileTaskClaimInput) { + self.claim_puzzle_background_compile_task_then(input, |_, _| {}); + } + + fn claim_puzzle_background_compile_task_then( + &self, + input: PuzzleBackgroundCompileTaskClaimInput, + + __callback: impl FnOnce( + &super::ProcedureEventContext, + Result, + ) + Send + + 'static, + ); +} + +impl claim_puzzle_background_compile_task for super::RemoteProcedures { + fn claim_puzzle_background_compile_task_then( + &self, + input: PuzzleBackgroundCompileTaskClaimInput, + + __callback: impl FnOnce( + &super::ProcedureEventContext, + Result, + ) + Send + + 'static, + ) { + self.imp + .invoke_procedure_with_callback::<_, PuzzleBackgroundCompileTaskProcedureResult>( + "claim_puzzle_background_compile_task", + ClaimPuzzleBackgroundCompileTaskArgs { input }, + __callback, + ); + } +} diff --git a/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_claim_input_type.rs b/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_claim_input_type.rs new file mode 100644 index 00000000..f721ad91 --- /dev/null +++ b/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_claim_input_type.rs @@ -0,0 +1,19 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +pub struct PuzzleBackgroundCompileTaskClaimInput { + pub task_id: String, + pub claim_id: String, + pub session_id: String, + pub owner_user_id: String, + pub claimed_at_micros: i64, +} + +impl __sdk::InModule for PuzzleBackgroundCompileTaskClaimInput { + type Module = super::RemoteModule; +} diff --git a/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_procedure_result_type.rs b/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_procedure_result_type.rs new file mode 100644 index 00000000..8c85082e --- /dev/null +++ b/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_procedure_result_type.rs @@ -0,0 +1,17 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +pub struct PuzzleBackgroundCompileTaskProcedureResult { + pub ok: bool, + pub claimed: bool, + pub error_message: Option, +} + +impl __sdk::InModule for PuzzleBackgroundCompileTaskProcedureResult { + type Module = super::RemoteModule; +} diff --git a/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_release_input_type.rs b/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_release_input_type.rs new file mode 100644 index 00000000..f4d983fa --- /dev/null +++ b/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_release_input_type.rs @@ -0,0 +1,18 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +pub struct PuzzleBackgroundCompileTaskReleaseInput { + pub task_id: String, + pub claim_id: String, + pub session_id: String, + pub owner_user_id: String, +} + +impl __sdk::InModule for PuzzleBackgroundCompileTaskReleaseInput { + type Module = super::RemoteModule; +} diff --git a/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_row_type.rs b/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_row_type.rs new file mode 100644 index 00000000..49e1bf07 --- /dev/null +++ b/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_row_type.rs @@ -0,0 +1,66 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +pub struct PuzzleBackgroundCompileTaskRow { + pub task_id: String, + pub claim_id: String, + pub session_id: String, + pub owner_user_id: String, + pub created_at: __sdk::Timestamp, + pub updated_at: __sdk::Timestamp, +} + +impl __sdk::InModule for PuzzleBackgroundCompileTaskRow { + type Module = super::RemoteModule; +} + +/// Column accessor struct for the table `PuzzleBackgroundCompileTaskRow`. +/// +/// Provides typed access to columns for query building. +pub struct PuzzleBackgroundCompileTaskRowCols { + pub task_id: __sdk::__query_builder::Col, + pub claim_id: __sdk::__query_builder::Col, + pub session_id: __sdk::__query_builder::Col, + pub owner_user_id: __sdk::__query_builder::Col, + pub created_at: __sdk::__query_builder::Col, + pub updated_at: __sdk::__query_builder::Col, +} + +impl __sdk::__query_builder::HasCols for PuzzleBackgroundCompileTaskRow { + type Cols = PuzzleBackgroundCompileTaskRowCols; + fn cols(table_name: &'static str) -> Self::Cols { + PuzzleBackgroundCompileTaskRowCols { + task_id: __sdk::__query_builder::Col::new(table_name, "task_id"), + claim_id: __sdk::__query_builder::Col::new(table_name, "claim_id"), + session_id: __sdk::__query_builder::Col::new(table_name, "session_id"), + owner_user_id: __sdk::__query_builder::Col::new(table_name, "owner_user_id"), + created_at: __sdk::__query_builder::Col::new(table_name, "created_at"), + updated_at: __sdk::__query_builder::Col::new(table_name, "updated_at"), + } + } +} + +/// Indexed column accessor struct for the table `PuzzleBackgroundCompileTaskRow`. +/// +/// Provides typed access to indexed columns for query building. +pub struct PuzzleBackgroundCompileTaskRowIxCols { + pub session_id: __sdk::__query_builder::IxCol, + pub task_id: __sdk::__query_builder::IxCol, +} + +impl __sdk::__query_builder::HasIxCols for PuzzleBackgroundCompileTaskRow { + type IxCols = PuzzleBackgroundCompileTaskRowIxCols; + fn ix_cols(table_name: &'static str) -> Self::IxCols { + PuzzleBackgroundCompileTaskRowIxCols { + session_id: __sdk::__query_builder::IxCol::new(table_name, "session_id"), + task_id: __sdk::__query_builder::IxCol::new(table_name, "task_id"), + } + } +} + +impl __sdk::__query_builder::CanBeLookupTable for PuzzleBackgroundCompileTaskRow {} diff --git a/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_table.rs b/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_table.rs new file mode 100644 index 00000000..227085b5 --- /dev/null +++ b/server-rs/crates/spacetime-client/src/module_bindings/puzzle_background_compile_task_table.rs @@ -0,0 +1,169 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use super::puzzle_background_compile_task_row_type::PuzzleBackgroundCompileTaskRow; +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +/// Table handle for the table `puzzle_background_compile_task`. +/// +/// Obtain a handle from the [`PuzzleBackgroundCompileTaskTableAccess::puzzle_background_compile_task`] method on [`super::RemoteTables`], +/// like `ctx.db.puzzle_background_compile_task()`. +/// +/// Users are encouraged not to explicitly reference this type, +/// but to directly chain method calls, +/// like `ctx.db.puzzle_background_compile_task().on_insert(...)`. +pub struct PuzzleBackgroundCompileTaskTableHandle<'ctx> { + imp: __sdk::TableHandle, + ctx: std::marker::PhantomData<&'ctx super::RemoteTables>, +} + +#[allow(non_camel_case_types)] +/// Extension trait for access to the table `puzzle_background_compile_task`. +/// +/// Implemented for [`super::RemoteTables`]. +pub trait PuzzleBackgroundCompileTaskTableAccess { + #[allow(non_snake_case)] + /// Obtain a [`PuzzleBackgroundCompileTaskTableHandle`], which mediates access to the table `puzzle_background_compile_task`. + fn puzzle_background_compile_task(&self) -> PuzzleBackgroundCompileTaskTableHandle<'_>; +} + +impl PuzzleBackgroundCompileTaskTableAccess for super::RemoteTables { + fn puzzle_background_compile_task(&self) -> PuzzleBackgroundCompileTaskTableHandle<'_> { + PuzzleBackgroundCompileTaskTableHandle { + imp: self + .imp + .get_table::("puzzle_background_compile_task"), + ctx: std::marker::PhantomData, + } + } +} + +pub struct PuzzleBackgroundCompileTaskInsertCallbackId(__sdk::CallbackId); +pub struct PuzzleBackgroundCompileTaskDeleteCallbackId(__sdk::CallbackId); + +impl<'ctx> __sdk::Table for PuzzleBackgroundCompileTaskTableHandle<'ctx> { + type Row = PuzzleBackgroundCompileTaskRow; + type EventContext = super::EventContext; + + fn count(&self) -> u64 { + self.imp.count() + } + fn iter(&self) -> impl Iterator + '_ { + self.imp.iter() + } + + type InsertCallbackId = PuzzleBackgroundCompileTaskInsertCallbackId; + + fn on_insert( + &self, + callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static, + ) -> PuzzleBackgroundCompileTaskInsertCallbackId { + PuzzleBackgroundCompileTaskInsertCallbackId(self.imp.on_insert(Box::new(callback))) + } + + fn remove_on_insert(&self, callback: PuzzleBackgroundCompileTaskInsertCallbackId) { + self.imp.remove_on_insert(callback.0) + } + + type DeleteCallbackId = PuzzleBackgroundCompileTaskDeleteCallbackId; + + fn on_delete( + &self, + callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static, + ) -> PuzzleBackgroundCompileTaskDeleteCallbackId { + PuzzleBackgroundCompileTaskDeleteCallbackId(self.imp.on_delete(Box::new(callback))) + } + + fn remove_on_delete(&self, callback: PuzzleBackgroundCompileTaskDeleteCallbackId) { + self.imp.remove_on_delete(callback.0) + } +} + +pub struct PuzzleBackgroundCompileTaskUpdateCallbackId(__sdk::CallbackId); + +impl<'ctx> __sdk::TableWithPrimaryKey for PuzzleBackgroundCompileTaskTableHandle<'ctx> { + type UpdateCallbackId = PuzzleBackgroundCompileTaskUpdateCallbackId; + + fn on_update( + &self, + callback: impl FnMut(&Self::EventContext, &Self::Row, &Self::Row) + Send + 'static, + ) -> PuzzleBackgroundCompileTaskUpdateCallbackId { + PuzzleBackgroundCompileTaskUpdateCallbackId(self.imp.on_update(Box::new(callback))) + } + + fn remove_on_update(&self, callback: PuzzleBackgroundCompileTaskUpdateCallbackId) { + self.imp.remove_on_update(callback.0) + } +} + +/// Access to the `task_id` unique index on the table `puzzle_background_compile_task`, +/// which allows point queries on the field of the same name +/// via the [`PuzzleBackgroundCompileTaskTaskIdUnique::find`] method. +/// +/// Users are encouraged not to explicitly reference this type, +/// but to directly chain method calls, +/// like `ctx.db.puzzle_background_compile_task().task_id().find(...)`. +pub struct PuzzleBackgroundCompileTaskTaskIdUnique<'ctx> { + imp: __sdk::UniqueConstraintHandle, + phantom: std::marker::PhantomData<&'ctx super::RemoteTables>, +} + +impl<'ctx> PuzzleBackgroundCompileTaskTableHandle<'ctx> { + /// Get a handle on the `task_id` unique index on the table `puzzle_background_compile_task`. + pub fn task_id(&self) -> PuzzleBackgroundCompileTaskTaskIdUnique<'ctx> { + PuzzleBackgroundCompileTaskTaskIdUnique { + imp: self.imp.get_unique_constraint::("task_id"), + phantom: std::marker::PhantomData, + } + } +} + +impl<'ctx> PuzzleBackgroundCompileTaskTaskIdUnique<'ctx> { + /// Find the subscribed row whose `task_id` column value is equal to `col_val`, + /// if such a row is present in the client cache. + pub fn find(&self, col_val: &String) -> Option { + self.imp.find(col_val) + } +} + +#[doc(hidden)] +pub(super) fn register_table(client_cache: &mut __sdk::ClientCache) { + let _table = client_cache + .get_or_make_table::("puzzle_background_compile_task"); + _table.add_unique_constraint::("task_id", |row| &row.task_id); +} + +#[doc(hidden)] +pub(super) fn parse_table_update( + raw_updates: __ws::v2::TableUpdate, +) -> __sdk::Result<__sdk::TableUpdate> { + __sdk::TableUpdate::parse_table_update(raw_updates).map_err(|e| { + __sdk::InternalError::failed_parse( + "TableUpdate", + "TableUpdate", + ) + .with_cause(e) + .into() + }) +} + +#[allow(non_camel_case_types)] +/// Extension trait for query builder access to the table `PuzzleBackgroundCompileTaskRow`. +/// +/// Implemented for [`__sdk::QueryTableAccessor`]. +pub trait puzzle_background_compile_taskQueryTableAccess { + #[allow(non_snake_case)] + /// Get a query builder for the table `PuzzleBackgroundCompileTaskRow`. + fn puzzle_background_compile_task( + &self, + ) -> __sdk::__query_builder::Table; +} + +impl puzzle_background_compile_taskQueryTableAccess for __sdk::QueryTableAccessor { + fn puzzle_background_compile_task( + &self, + ) -> __sdk::__query_builder::Table { + __sdk::__query_builder::Table::new("puzzle_background_compile_task") + } +} diff --git a/server-rs/crates/spacetime-client/src/module_bindings/release_puzzle_background_compile_task_procedure.rs b/server-rs/crates/spacetime-client/src/module_bindings/release_puzzle_background_compile_task_procedure.rs new file mode 100644 index 00000000..3b85afae --- /dev/null +++ b/server-rs/crates/spacetime-client/src/module_bindings/release_puzzle_background_compile_task_procedure.rs @@ -0,0 +1,62 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +use super::puzzle_background_compile_task_procedure_result_type::PuzzleBackgroundCompileTaskProcedureResult; +use super::puzzle_background_compile_task_release_input_type::PuzzleBackgroundCompileTaskReleaseInput; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +struct ReleasePuzzleBackgroundCompileTaskArgs { + pub input: PuzzleBackgroundCompileTaskReleaseInput, +} + +impl __sdk::InModule for ReleasePuzzleBackgroundCompileTaskArgs { + type Module = super::RemoteModule; +} + +#[allow(non_camel_case_types)] +/// Extension trait for access to the procedure `release_puzzle_background_compile_task`. +/// +/// Implemented for [`super::RemoteProcedures`]. +pub trait release_puzzle_background_compile_task { + fn release_puzzle_background_compile_task( + &self, + input: PuzzleBackgroundCompileTaskReleaseInput, + ) { + self.release_puzzle_background_compile_task_then(input, |_, _| {}); + } + + fn release_puzzle_background_compile_task_then( + &self, + input: PuzzleBackgroundCompileTaskReleaseInput, + + __callback: impl FnOnce( + &super::ProcedureEventContext, + Result, + ) + Send + + 'static, + ); +} + +impl release_puzzle_background_compile_task for super::RemoteProcedures { + fn release_puzzle_background_compile_task_then( + &self, + input: PuzzleBackgroundCompileTaskReleaseInput, + + __callback: impl FnOnce( + &super::ProcedureEventContext, + Result, + ) + Send + + 'static, + ) { + self.imp + .invoke_procedure_with_callback::<_, PuzzleBackgroundCompileTaskProcedureResult>( + "release_puzzle_background_compile_task", + ReleasePuzzleBackgroundCompileTaskArgs { input }, + __callback, + ); + } +} diff --git a/server-rs/crates/spacetime-client/src/puzzle.rs b/server-rs/crates/spacetime-client/src/puzzle.rs index 25ec5ad9..8916e853 100644 --- a/server-rs/crates/spacetime-client/src/puzzle.rs +++ b/server-rs/crates/spacetime-client/src/puzzle.rs @@ -1,8 +1,10 @@ use super::*; use crate::mapper::*; +use crate::module_bindings::claim_puzzle_background_compile_task_procedure::claim_puzzle_background_compile_task; use crate::module_bindings::claim_puzzle_work_point_incentive_procedure::claim_puzzle_work_point_incentive; use crate::module_bindings::delete_puzzle_work_procedure::delete_puzzle_work; use crate::module_bindings::record_puzzle_work_like_procedure::record_puzzle_work_like; +use crate::module_bindings::release_puzzle_background_compile_task_procedure::release_puzzle_background_compile_task; use crate::module_bindings::remix_puzzle_work_procedure::remix_puzzle_work; use crate::module_bindings::save_puzzle_ui_background_procedure::save_puzzle_ui_background; @@ -194,6 +196,67 @@ impl SpacetimeClient { .await } + pub async fn claim_puzzle_background_compile_task( + &self, + input: PuzzleBackgroundCompileTaskClaimRecordInput, + ) -> Result { + let procedure_input = PuzzleBackgroundCompileTaskClaimInput { + task_id: input.task_id, + claim_id: input.claim_id, + session_id: input.session_id, + owner_user_id: input.owner_user_id, + claimed_at_micros: input.claimed_at_micros, + }; + + self.call_after_connect( + "claim_puzzle_background_compile_task", + move |connection, sender| { + connection + .procedures() + .claim_puzzle_background_compile_task_then( + procedure_input, + move |_, result| { + let mapped = result + .map_err(SpacetimeClientError::from_sdk_error) + .and_then(map_puzzle_background_compile_task_procedure_result); + send_once(&sender, mapped); + }, + ); + }, + ) + .await + } + + pub async fn release_puzzle_background_compile_task( + &self, + input: PuzzleBackgroundCompileTaskReleaseRecordInput, + ) -> Result { + let procedure_input = PuzzleBackgroundCompileTaskReleaseInput { + task_id: input.task_id, + claim_id: input.claim_id, + session_id: input.session_id, + owner_user_id: input.owner_user_id, + }; + + self.call_after_connect( + "release_puzzle_background_compile_task", + move |connection, sender| { + connection + .procedures() + .release_puzzle_background_compile_task_then( + procedure_input, + move |_, result| { + let mapped = result + .map_err(SpacetimeClientError::from_sdk_error) + .and_then(map_puzzle_background_compile_task_procedure_result); + send_once(&sender, mapped); + }, + ); + }, + ) + .await + } + pub async fn save_puzzle_generated_images( &self, input: PuzzleGeneratedImagesSaveRecordInput, diff --git a/server-rs/crates/spacetime-module/src/migration.rs b/server-rs/crates/spacetime-module/src/migration.rs index 6f249e3b..6de73df5 100644 --- a/server-rs/crates/spacetime-module/src/migration.rs +++ b/server-rs/crates/spacetime-module/src/migration.rs @@ -20,8 +20,8 @@ use crate::match3d::tables::{ match_3_d_work_profile, match3d_agent_message, match3d_agent_session, match3d_runtime_run, }; use crate::puzzle::{ - puzzle_agent_message, puzzle_agent_session, puzzle_event, puzzle_leaderboard_entry, - puzzle_runtime_run, puzzle_work_profile, + puzzle_agent_message, puzzle_agent_session, puzzle_background_compile_task, puzzle_event, + puzzle_leaderboard_entry, puzzle_runtime_run, puzzle_work_profile, }; use crate::puzzle_clear::tables::{ puzzle_clear_agent_session, puzzle_clear_event, puzzle_clear_runtime_run, @@ -229,6 +229,7 @@ macro_rules! migration_tables { asset_entity_binding, asset_event, puzzle_agent_session, + puzzle_background_compile_task, puzzle_agent_message, puzzle_work_profile, puzzle_event, diff --git a/server-rs/crates/spacetime-module/src/puzzle.rs b/server-rs/crates/spacetime-module/src/puzzle.rs index c5ed877c..2106cccb 100644 --- a/server-rs/crates/spacetime-module/src/puzzle.rs +++ b/server-rs/crates/spacetime-module/src/puzzle.rs @@ -10,14 +10,16 @@ use module_puzzle::{ PUZZLE_NEXT_LEVEL_MODE_SIMILAR_WORKS, PuzzleAgentMessageFinalizeInput, PuzzleAgentMessageKind, PuzzleAgentMessageRole, PuzzleAgentMessageSnapshot, PuzzleAgentSessionCreateInput, PuzzleAgentSessionGetInput, PuzzleAgentSessionProcedureResult, PuzzleAgentSessionSnapshot, - PuzzleAgentStage, PuzzleAnchorPack, PuzzleDraftCompileFailureInput, PuzzleDraftCompileInput, - PuzzleFormDraftSaveInput, PuzzleGeneratedImageCandidate, PuzzleGeneratedImagesSaveInput, - PuzzleLeaderboardEntry, PuzzleLeaderboardSubmitInput, PuzzlePublicationStatus, - PuzzlePublishInput, PuzzleRecommendedNextWork, PuzzleResultDraft, PuzzleRunDragInput, - PuzzleRunGetInput, PuzzleRunNextLevelInput, PuzzleRunPauseInput, PuzzleRunProcedureResult, - PuzzleRunPropInput, PuzzleRunSnapshot, PuzzleRunStartInput, PuzzleRunSwapInput, - PuzzleRuntimeLevelStatus, PuzzleSelectCoverImageInput, PuzzleUiBackgroundSaveInput, - PuzzleWorkDeleteInput, PuzzleWorkGetInput, PuzzleWorkLikeRecordInput as PuzzleWorkLikeInput, + PuzzleAgentStage, PuzzleAnchorPack, PuzzleBackgroundCompileTaskClaimInput, + PuzzleBackgroundCompileTaskProcedureResult, PuzzleBackgroundCompileTaskReleaseInput, + PuzzleDraftCompileFailureInput, PuzzleDraftCompileInput, PuzzleFormDraftSaveInput, + PuzzleGeneratedImageCandidate, PuzzleGeneratedImagesSaveInput, PuzzleLeaderboardEntry, + PuzzleLeaderboardSubmitInput, PuzzlePublicationStatus, PuzzlePublishInput, + PuzzleRecommendedNextWork, PuzzleResultDraft, PuzzleRunDragInput, PuzzleRunGetInput, + PuzzleRunNextLevelInput, PuzzleRunPauseInput, PuzzleRunProcedureResult, PuzzleRunPropInput, + PuzzleRunSnapshot, PuzzleRunStartInput, PuzzleRunSwapInput, PuzzleRuntimeLevelStatus, + PuzzleSelectCoverImageInput, PuzzleUiBackgroundSaveInput, PuzzleWorkDeleteInput, + PuzzleWorkGetInput, PuzzleWorkLikeRecordInput as PuzzleWorkLikeInput, PuzzleWorkPointIncentiveClaimInput, PuzzleWorkProcedureResult, PuzzleWorkProfile, PuzzleWorkRemixInput, PuzzleWorkUpsertInput, PuzzleWorksListInput, PuzzleWorksProcedureResult, apply_publish_overrides_to_draft, apply_selected_candidate, build_form_draft_from_seed, @@ -38,6 +40,7 @@ use spacetimedb::{ use crate::auth::user_account; const PUZZLE_POINT_INCENTIVE_DEFAULT_U64: u64 = 0; +const PUZZLE_BACKGROUND_COMPILE_TASK_LEASE_MICROS: i64 = 30 * 60 * 1_000_000; const WORK_VISIBLE_DEFAULT: bool = true; /// 拼图 Agent session 真相表。 @@ -62,6 +65,22 @@ pub struct PuzzleAgentSessionRow { updated_at: Timestamp, } +/// 拼图首图后台编译活动任务表。 +/// 中文注释:该表只保存跨 api-server 实例互斥 claim,不表达最终生成结果。 +#[spacetimedb::table( + accessor = puzzle_background_compile_task, + index(accessor = by_puzzle_background_compile_task_session_id, btree(columns = [session_id])) +)] +pub struct PuzzleBackgroundCompileTaskRow { + #[primary_key] + task_id: String, + claim_id: String, + session_id: String, + owner_user_id: String, + created_at: Timestamp, + updated_at: Timestamp, +} + /// 拼图 Agent 消息真相表。 #[spacetimedb::table( accessor = puzzle_agent_message, @@ -388,6 +407,44 @@ pub fn mark_puzzle_draft_generation_failed( } } +#[spacetimedb::procedure] +pub fn claim_puzzle_background_compile_task( + ctx: &mut ProcedureContext, + input: PuzzleBackgroundCompileTaskClaimInput, +) -> PuzzleBackgroundCompileTaskProcedureResult { + match ctx.try_with_tx(|tx| claim_puzzle_background_compile_task_tx(tx, input.clone())) { + Ok(claimed) => PuzzleBackgroundCompileTaskProcedureResult { + ok: true, + claimed, + error_message: None, + }, + Err(message) => PuzzleBackgroundCompileTaskProcedureResult { + ok: false, + claimed: false, + error_message: Some(message), + }, + } +} + +#[spacetimedb::procedure] +pub fn release_puzzle_background_compile_task( + ctx: &mut ProcedureContext, + input: PuzzleBackgroundCompileTaskReleaseInput, +) -> PuzzleBackgroundCompileTaskProcedureResult { + match ctx.try_with_tx(|tx| release_puzzle_background_compile_task_tx(tx, input.clone())) { + Ok(released) => PuzzleBackgroundCompileTaskProcedureResult { + ok: true, + claimed: released, + error_message: None, + }, + Err(message) => PuzzleBackgroundCompileTaskProcedureResult { + ok: false, + claimed: false, + error_message: Some(message), + }, + } +} + /// 保存拼图入口表单草稿。 /// 中文注释:该 procedure 只更新 session 与创作中心草稿卡,不触发图片生成或发布校验。 #[spacetimedb::procedure] @@ -1024,6 +1081,84 @@ fn compile_puzzle_agent_draft_tx( ) } +fn claim_puzzle_background_compile_task_tx( + ctx: &TxContext, + input: PuzzleBackgroundCompileTaskClaimInput, +) -> Result { + let task_id = normalize_required_puzzle_task_field(&input.task_id, "拼图后台任务 ID")?; + let claim_id = normalize_required_puzzle_task_field(&input.claim_id, "拼图后台任务 claim ID")?; + let session_id = normalize_required_puzzle_task_field(&input.session_id, "拼图 session ID")?; + let owner_user_id = normalize_required_puzzle_task_field(&input.owner_user_id, "拼图用户 ID")?; + let claimed_at = Timestamp::from_micros_since_unix_epoch(input.claimed_at_micros); + + get_owned_session_row(ctx, &session_id, &owner_user_id)?; + if let Some(existing) = ctx + .db + .puzzle_background_compile_task() + .task_id() + .find(&task_id) + { + if !is_stale_puzzle_background_compile_task(&existing, input.claimed_at_micros) { + return Ok(false); + } + ctx.db + .puzzle_background_compile_task() + .task_id() + .delete(&task_id); + } + + ctx.db + .puzzle_background_compile_task() + .insert(PuzzleBackgroundCompileTaskRow { + task_id, + claim_id, + session_id, + owner_user_id, + created_at: claimed_at, + updated_at: claimed_at, + }); + Ok(true) +} + +fn release_puzzle_background_compile_task_tx( + ctx: &TxContext, + input: PuzzleBackgroundCompileTaskReleaseInput, +) -> Result { + let task_id = normalize_required_puzzle_task_field(&input.task_id, "拼图后台任务 ID")?; + let claim_id = normalize_required_puzzle_task_field(&input.claim_id, "拼图后台任务 claim ID")?; + let session_id = normalize_required_puzzle_task_field(&input.session_id, "拼图 session ID")?; + let owner_user_id = normalize_required_puzzle_task_field(&input.owner_user_id, "拼图用户 ID")?; + + let Some(row) = ctx + .db + .puzzle_background_compile_task() + .task_id() + .find(&task_id) + else { + return Ok(false); + }; + if row.session_id != session_id || row.owner_user_id != owner_user_id { + return Err("无权释放该拼图后台任务".to_string()); + } + if row.claim_id != claim_id { + return Ok(false); + } + + ctx.db + .puzzle_background_compile_task() + .task_id() + .delete(&task_id); + Ok(true) +} + +fn is_stale_puzzle_background_compile_task( + row: &PuzzleBackgroundCompileTaskRow, + now_micros: i64, +) -> bool { + now_micros.saturating_sub(row.updated_at.to_micros_since_unix_epoch()) + >= PUZZLE_BACKGROUND_COMPILE_TASK_LEASE_MICROS +} + fn mark_puzzle_draft_generation_failed_tx( ctx: &TxContext, input: PuzzleDraftCompileFailureInput, @@ -2950,6 +3085,14 @@ fn get_owned_session_row( Ok(row) } +fn normalize_required_puzzle_task_field(value: &str, field_name: &str) -> Result { + let normalized = value.trim(); + if normalized.is_empty() { + return Err(format!("{field_name} 不能为空")); + } + Ok(normalized.to_string()) +} + fn get_owned_run_row( ctx: &TxContext, run_id: &str, diff --git a/src/services/apiClient.test.ts b/src/services/apiClient.test.ts index 9f657156..d57a6dac 100644 --- a/src/services/apiClient.test.ts +++ b/src/services/apiClient.test.ts @@ -66,6 +66,9 @@ describe('apiClient', () => { dispatchEvent: dispatchEventMock, localStorage: createLocalStorageMock(), }); + vi.stubGlobal('crypto', { + randomUUID: () => '11111111-2222-3333-4444-555555555555', + }); fetchMock.mockReset(); dispatchEventMock.mockReset(); clearStoredAccessToken({ emit: false }); @@ -121,6 +124,7 @@ describe('apiClient', () => { credentials: 'same-origin', headers: expect.objectContaining({ Authorization: 'Bearer expired-token', + 'x-request-id': 'web-11111111-2222-3333-4444-555555555555', 'x-genarrative-response-envelope': 'v1', }), }), @@ -140,6 +144,7 @@ describe('apiClient', () => { credentials: 'same-origin', headers: expect.objectContaining({ Authorization: 'Bearer fresh-token', + 'x-request-id': 'web-11111111-2222-3333-4444-555555555555', }), }), ); diff --git a/src/services/apiClient.ts b/src/services/apiClient.ts index 7d106555..bf8eacd4 100644 --- a/src/services/apiClient.ts +++ b/src/services/apiClient.ts @@ -40,6 +40,8 @@ export type ApiRequestOptions = { notifyAuthStateChange?: boolean; // 推荐页自动加载作品这类局部后台请求失败时,只应让当前卡片报错,不应清空全局登录态。 clearAuthOnUnauthorized?: boolean; + // 同一次业务请求在客户端重试时复用 request id,后端据此做计费幂等。 + requestId?: string; }; export const BACKGROUND_AUTH_REQUEST_OPTIONS = { @@ -99,6 +101,22 @@ function normalizeHeaders(headers?: HeadersInit) { return nextHeaders; } +function buildClientRequestId() { + const randomId = + typeof crypto !== 'undefined' && typeof crypto.randomUUID === 'function' + ? crypto.randomUUID() + : `${Date.now().toString(36)}-${Math.random().toString(36).slice(2)}`; + return `web-${randomId}`; +} + +function resolveRequestIdHeader(headers: Record, options: ApiRequestOptions) { + const explicitRequestId = options.requestId?.trim(); + const existingRequestId = Object.entries(headers).find( + ([key, value]) => key.toLowerCase() === REQUEST_ID_HEADER && value.trim(), + )?.[1]; + return explicitRequestId || existingRequestId || buildClientRequestId(); +} + function coerceMeta(value: unknown): Partial { if (!isRecord(value)) { return {}; @@ -582,12 +600,14 @@ export async function fetchWithApiAuth( const retry = resolveRetryOptions(method, options.retry); const authFailurePolicy = resolveAuthFailurePolicy(options); const requestSignal = init.signal ?? undefined; + const requestId = resolveRequestIdHeader(normalizeHeaders(init.headers), options); let attempt = 0; let refreshAttempted = false; for (;;) { try { let requestHeaders = withAuthorizationHeaders(init.headers, options); + requestHeaders[REQUEST_ID_HEADER] = requestId; let hasAuthHeader = Boolean( requestHeaders.Authorization?.trim() || requestHeaders.authorization?.trim(), @@ -603,6 +623,7 @@ export async function fetchWithApiAuth( // 避免把后端原始 “缺少 Bearer Token” 直接暴露给业务 UI。 await ensureStoredAccessToken(); requestHeaders = withAuthorizationHeaders(init.headers, options); + requestHeaders[REQUEST_ID_HEADER] = requestId; hasAuthHeader = Boolean( requestHeaders.Authorization?.trim() || requestHeaders.authorization?.trim(),