合并 origin/master

合入 master 的钱包退款 outbox、拼图后台编译互斥与公开链路更新

保留当前分支外部生成 worker 队列语义,并对齐拼图首图 claim 释放顺序
This commit is contained in:
2026-06-11 23:06:41 +08:00
70 changed files with 3167 additions and 538 deletions

View File

@@ -4,7 +4,11 @@ use axum::http::StatusCode;
use serde_json::json;
use spacetime_client::SpacetimeClientError;
use crate::{http_error::AppError, state::AppState};
use crate::{
http_error::AppError,
state::AppState,
wallet_refund_outbox::{WalletRefundOutboxEnqueueOutcome, WalletRefundOutboxRecord},
};
pub(crate) const ASSET_OPERATION_POINTS_COST: u64 = 1;
@@ -104,22 +108,11 @@ async fn consume_asset_operation_points(
.await
{
Ok(_) => Ok(true),
Err(error) if should_skip_asset_operation_billing_for_connectivity(&error) => {
// 中文注释:外部生图不应被 Maincloud 钱包短暂 503 阻断;此时跳过扣费,让业务链路继续,避免用户重复点击。
tracing::warn!(
owner_user_id,
asset_kind,
asset_id,
error = %error,
"资产操作泥点预扣因 SpacetimeDB 连接不可用而降级跳过"
);
Ok(false)
}
Err(error) => Err(map_asset_operation_wallet_error(error)),
}
}
/// 外部生成或发布 mutation 失败后补偿退款;退款失败只记日志,避免覆盖原始业务错误。
/// 外部生成或发布 mutation 失败后补偿退款;立即退款失败会进入 outbox,避免覆盖原始业务错误。
async fn refund_asset_operation_points(
state: &AppState,
owner_user_id: &str,
@@ -131,22 +124,74 @@ async fn refund_asset_operation_points(
"asset_operation_refund:{}:{}:{}",
owner_user_id, asset_kind, asset_id
);
let created_at_micros = current_utc_micros();
if let Err(error) = state
.spacetime_client()
.refund_profile_wallet_points(
owner_user_id.to_string(),
points_cost,
ledger_id,
current_utc_micros(),
ledger_id.clone(),
created_at_micros,
)
.await
{
let refund_error = error.to_string();
if let Some(outbox) = state.wallet_refund_outbox() {
match outbox
.enqueue(WalletRefundOutboxRecord {
owner_user_id: owner_user_id.to_string(),
amount: points_cost,
ledger_id: ledger_id.clone(),
created_at_micros,
asset_kind: asset_kind.to_string(),
asset_id: asset_id.to_string(),
})
.await
{
Ok(WalletRefundOutboxEnqueueOutcome::Enqueued) => {
tracing::warn!(
owner_user_id,
asset_kind,
asset_id,
ledger_id,
error = %refund_error,
"资产操作失败后的泥点退款立即执行失败,已写入 wallet refund outbox"
);
return;
}
Ok(WalletRefundOutboxEnqueueOutcome::Dropped { reason }) => {
tracing::error!(
owner_user_id,
asset_kind,
asset_id,
ledger_id,
reason,
error = %refund_error,
"资产操作失败后的泥点退款立即执行失败,且 wallet refund outbox 因容量限制丢弃"
);
return;
}
Err(outbox_error) => {
tracing::error!(
owner_user_id,
asset_kind,
asset_id,
ledger_id,
refund_error = %refund_error,
outbox_error = %outbox_error,
"资产操作失败后的泥点退款立即执行失败,且写入 wallet refund outbox 失败"
);
return;
}
}
}
tracing::error!(
owner_user_id,
asset_kind,
asset_id,
error = %error,
"资产操作失败后的泥点退款失败"
ledger_id,
error = %refund_error,
"资产操作失败后的泥点退款失败,且 wallet refund outbox 未启用"
);
}
}
@@ -199,7 +244,7 @@ mod tests {
use super::*;
#[test]
fn asset_operation_billing_skips_spacetime_connectivity_errors() {
fn asset_operation_connectivity_errors_are_classified_for_non_billing_fallbacks() {
assert_eq!(ASSET_OPERATION_POINTS_COST, 1);
assert!(should_skip_asset_operation_billing_for_connectivity(
&SpacetimeClientError::ConnectDropped

View File

@@ -37,7 +37,7 @@ use time::{Duration as TimeDuration, OffsetDateTime};
use crate::{
api_response::json_success_body,
asset_billing::execute_billable_asset_operation_with_cost,
auth::AuthenticatedAccessToken,
auth::{AuthenticatedAccessToken, RuntimePrincipal},
generated_image_assets::{
GeneratedImageAssetAdapter, GeneratedImageAssetDataUrl,
adapter::{GeneratedImageAssetAdapterMetadata, GeneratedImageAssetPersistInput},
@@ -306,11 +306,12 @@ pub async fn generate_bark_battle_image_asset(
.filter(|value| !value.is_empty())
.map(ToString::to_string);
let points_cost = resolve_bark_battle_image_asset_points_cost(&state, &payload).await;
let billing_asset_id = request_context.request_id().to_string();
let result = execute_billable_asset_operation_with_cost(
&state,
&owner_user_id,
bark_battle_slot_asset_kind(&slot),
asset_id.as_str(),
billing_asset_id.as_str(),
points_cost,
async {
generate_and_persist_bark_battle_image_asset(
@@ -506,13 +507,13 @@ pub async fn get_bark_battle_runtime_config(
State(state): State<AppState>,
Path(work_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, &work_id, "workId")?;
let config = state
.spacetime_client()
.get_bark_battle_runtime_config(work_id, Some(authenticated.claims().user_id().to_string()))
.get_bark_battle_runtime_config(work_id, Some(principal.subject().to_string()))
.await
.map_err(|error| {
bark_battle_error_response(&request_context, map_bark_battle_client_error(error))
@@ -526,7 +527,7 @@ pub async fn start_bark_battle_run(
State(state): State<AppState>,
Path(work_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<BarkBattleRunStartRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let maybe_payload = payload.ok().map(|Json(payload)| payload);
@@ -543,7 +544,7 @@ pub async fn start_bark_battle_run(
};
ensure_non_empty(&request_context, &work_id, "workId")?;
let owner_user_id = authenticated.claims().user_id().to_string();
let owner_user_id = principal.subject().to_string();
let runtime_config = state
.spacetime_client()
.get_bark_battle_runtime_config(work_id.clone(), Some(owner_user_id.clone()))
@@ -593,12 +594,13 @@ pub async fn start_bark_battle_run(
record_work_play_start_after_success(
&state,
&request_context,
WorkPlayTrackingDraft::new(
WorkPlayTrackingDraft::runtime_principal(
BARK_BATTLE_PLAY_TYPE_ID,
work_id.clone(),
&authenticated,
&principal,
"/api/runtime/bark-battle/...",
)
.owner_user_id(owner_user_id.clone())
.extra(json!({
"runId": run_snapshot.run_id,
"workId": work_id,
@@ -607,6 +609,7 @@ pub async fn start_bark_battle_run(
"difficultyPreset": runtime_config.difficulty_preset,
"sourceRoute": request.source_route,
"clientRuntimeVersion": request.client_runtime_version,
"principalKind": principal.kind().as_str(),
})),
)
.await;
@@ -638,12 +641,12 @@ pub async fn get_bark_battle_run(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, &run_id, "runId")?;
let run = state
.spacetime_client()
.get_bark_battle_run(run_id, authenticated.claims().user_id().to_string())
.get_bark_battle_run(run_id, principal.subject().to_string())
.await
.map_err(|error| {
bark_battle_error_response(&request_context, map_bark_battle_client_error(error))
@@ -657,7 +660,7 @@ pub async fn finish_bark_battle_run(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<BarkBattleRunFinishRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let Json(payload) = bark_battle_json(payload, &request_context)?;
@@ -698,7 +701,7 @@ pub async fn finish_bark_battle_run(
.finish_bark_battle_run(BarkBattleRunFinishRecordInput {
run_id,
run_token: payload.run_token,
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
work_id: payload.work_id.clone(),
config_version: u64::from(payload.config_version),
ruleset_version: payload.ruleset_version.clone(),

View File

@@ -63,7 +63,7 @@ use crate::{
},
api_response::json_success_body,
asset_billing::execute_billable_asset_operation,
auth::AuthenticatedAccessToken,
auth::{AuthenticatedAccessToken, RuntimePrincipal},
character_visual_assets::try_apply_background_alpha_to_png,
http_error::AppError,
platform_errors::map_oss_error,
@@ -224,7 +224,7 @@ pub async fn record_big_fish_play(
State(state): State<AppState>,
Path(session_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<RecordBigFishPlayRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let Json(payload) = payload.map_err(|error| {
@@ -242,7 +242,7 @@ pub async fn record_big_fish_play(
.spacetime_client()
.record_big_fish_play(BigFishPlayReportRecordInput {
session_id: session_id.clone(),
user_id: authenticated.claims().user_id().to_string(),
user_id: principal.subject().to_string(),
elapsed_ms: payload.elapsed_ms.unwrap_or(0),
reported_at_micros: current_utc_micros(),
})
@@ -254,13 +254,14 @@ pub async fn record_big_fish_play(
record_work_play_start_after_success(
&state,
&request_context,
WorkPlayTrackingDraft::new(
WorkPlayTrackingDraft::runtime_principal(
"big-fish",
session_id.clone(),
&authenticated,
&principal,
"/api/runtime/big-fish/sessions/{session_id}/play",
)
.run_id(session_id.clone()),
.run_id(session_id.clone())
.owner_user_id(principal.subject().to_string()),
)
.await;
@@ -279,7 +280,7 @@ pub async fn start_big_fish_run(
State(state): State<AppState>,
Path(session_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, &session_id, "sessionId")?;
@@ -288,7 +289,7 @@ pub async fn start_big_fish_run(
.start_big_fish_run(BigFishRunStartRecordInput {
run_id: build_prefixed_uuid_id("big-fish-run-"),
session_id,
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
started_at_micros: current_utc_micros(),
})
.await
@@ -339,13 +340,13 @@ pub async fn get_big_fish_run(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, &run_id, "runId")?;
let run = state
.spacetime_client()
.get_big_fish_run(run_id, authenticated.claims().user_id().to_string())
.get_big_fish_run(run_id, principal.subject().to_string())
.await
.map_err(|error| {
big_fish_error_response(&request_context, map_big_fish_client_error(error))
@@ -363,7 +364,7 @@ pub async fn submit_big_fish_input(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<SubmitBigFishInputRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let Json(payload) = payload.map_err(|error| {
@@ -384,7 +385,7 @@ pub async fn submit_big_fish_input(
.spacetime_client()
.submit_big_fish_input(BigFishInputSubmitRecordInput {
run_id,
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
x: payload.x,
y: payload.y,
submitted_at_micros: current_utc_micros(),
@@ -721,7 +722,7 @@ pub async fn execute_big_fish_action(
"big_fish_publish_game" => Some("big_fish_publish_game"),
_ => None,
};
let billing_asset_id = format!("{session_id}:{now}");
let billing_asset_id = format!("{}:{}:{}", session_id, action, request_context.request_id());
let session_operation = async {
match action.as_str() {
"big_fish_compile_draft" => {

View File

@@ -38,6 +38,11 @@ pub struct AppConfig {
pub tracking_outbox_batch_size: usize,
pub tracking_outbox_flush_interval: Duration,
pub tracking_outbox_max_bytes: u64,
pub wallet_refund_outbox_enabled: bool,
pub wallet_refund_outbox_dir: PathBuf,
pub wallet_refund_outbox_batch_size: usize,
pub wallet_refund_outbox_flush_interval: Duration,
pub wallet_refund_outbox_max_bytes: u64,
pub log_filter: String,
pub otel_enabled: bool,
pub admin_username: Option<String>,
@@ -239,6 +244,11 @@ impl Default for AppConfig {
tracking_outbox_batch_size: 500,
tracking_outbox_flush_interval: Duration::from_millis(1_000),
tracking_outbox_max_bytes: 256 * 1024 * 1024,
wallet_refund_outbox_enabled: true,
wallet_refund_outbox_dir: PathBuf::from("server-rs/.data/wallet-refund-outbox"),
wallet_refund_outbox_batch_size: 100,
wallet_refund_outbox_flush_interval: Duration::from_millis(1_000),
wallet_refund_outbox_max_bytes: 64 * 1024 * 1024,
log_filter: "info,tower_http=info".to_string(),
otel_enabled: false,
admin_username: None,
@@ -494,6 +504,27 @@ impl AppConfig {
{
config.tracking_outbox_max_bytes = max_bytes;
}
if let Some(enabled) = read_first_bool_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED"]) {
config.wallet_refund_outbox_enabled = enabled;
}
if let Some(dir) = read_first_non_empty_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_DIR"]) {
config.wallet_refund_outbox_dir = PathBuf::from(dir);
}
if let Some(batch_size) =
read_first_usize_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE"])
{
config.wallet_refund_outbox_batch_size = batch_size;
}
if let Some(flush_interval_ms) =
read_first_positive_u64_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS"])
{
config.wallet_refund_outbox_flush_interval = Duration::from_millis(flush_interval_ms);
}
if let Some(max_bytes) =
read_first_positive_u64_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES"])
{
config.wallet_refund_outbox_max_bytes = max_bytes;
}
if let Some(otel_enabled) = read_first_bool_env(&["GENARRATIVE_OTEL_ENABLED"]) {
config.otel_enabled = otel_enabled;
}
@@ -1593,6 +1624,11 @@ mod tests {
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_DIR");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES");
std::env::remove_var("GENARRATIVE_OTEL_ENABLED");
std::env::set_var("GENARRATIVE_API_LISTEN_BACKLOG", "2048");
std::env::set_var("GENARRATIVE_API_WORKER_THREADS", "6");
@@ -1609,6 +1645,14 @@ mod tests {
std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE", "250");
std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS", "2000");
std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES", "1048576");
std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED", "false");
std::env::set_var(
"GENARRATIVE_WALLET_REFUND_OUTBOX_DIR",
"/tmp/genarrative-wallet-refund-outbox",
);
std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE", "50");
std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS", "3000");
std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES", "524288");
std::env::set_var("GENARRATIVE_OTEL_ENABLED", "true");
}
@@ -1634,6 +1678,17 @@ mod tests {
std::time::Duration::from_millis(2_000)
);
assert_eq!(config.tracking_outbox_max_bytes, 1_048_576);
assert!(!config.wallet_refund_outbox_enabled);
assert_eq!(
config.wallet_refund_outbox_dir,
std::path::PathBuf::from("/tmp/genarrative-wallet-refund-outbox")
);
assert_eq!(config.wallet_refund_outbox_batch_size, 50);
assert_eq!(
config.wallet_refund_outbox_flush_interval,
std::time::Duration::from_millis(3_000)
);
assert_eq!(config.wallet_refund_outbox_max_bytes, 524_288);
assert!(config.otel_enabled);
unsafe {
@@ -1649,6 +1704,11 @@ mod tests {
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_DIR");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES");
std::env::remove_var("GENARRATIVE_OTEL_ENABLED");
}
}

View File

@@ -547,11 +547,12 @@ pub async fn generate_custom_world_scene_image(
require_openai_image_settings(&state)
.map_err(|error| custom_world_ai_error_response(&request_context, error))?;
let asset_id = format!("custom-scene-{}", current_utc_millis());
let billing_asset_id = request_context.request_id().to_string();
let asset = execute_billable_asset_operation(
&state,
&owner_user_id,
"scene_image",
asset_id.as_str(),
billing_asset_id.as_str(),
async {
let settings = require_openai_image_settings(&state)?.with_external_api_audit_context(
&request_context,
@@ -806,11 +807,12 @@ pub async fn generate_custom_world_cover_image(
require_dashscope_settings(&state)
.map_err(|error| custom_world_ai_error_response(&request_context, error))?;
let asset_id = format!("custom-cover-{}", current_utc_millis());
let billing_asset_id = request_context.request_id().to_string();
let asset = execute_billable_asset_operation(
&state,
&owner_user_id,
"custom_world_cover",
asset_id.as_str(),
billing_asset_id.as_str(),
async {
let settings = require_dashscope_settings(&state)?;
let http_client = build_dashscope_http_client(&settings)?;
@@ -1011,11 +1013,12 @@ pub async fn generate_custom_world_opening_cg(
.map_err(|error| custom_world_ai_error_response(&request_context, error))?;
let opening_cg_id = normalized.opening_cg_id.clone();
let billing_asset_id = request_context.request_id().to_string();
let generated = execute_billable_asset_operation_with_cost(
&state,
&owner_user_id,
"custom_world_opening_cg",
opening_cg_id.as_str(),
billing_asset_id.as_str(),
OPENING_CG_POINTS_COST,
async {
let image_settings = require_openai_image_settings(&state)?

View File

@@ -19,7 +19,7 @@ use crate::{
ExternalGenerationWriteLeaseGuard, PuzzleCompileDraftWorkerPayload,
PuzzleGenerateImagesWorkerPayload, PuzzleGenerateUiBackgroundWorkerPayload,
execute_puzzle_compile_draft_worker_job, execute_puzzle_generate_images_worker_job,
execute_puzzle_generate_ui_background_worker_job,
execute_puzzle_generate_ui_background_worker_job, release_puzzle_compile_background_claim,
},
request_context::RequestContext,
state::{AppState, PuzzleApiState},
@@ -256,13 +256,13 @@ async fn process_external_generation_job_once(
match execute_puzzle_compile_draft_worker_job(
&puzzle_state,
&request_context,
payload,
payload.clone(),
write_guard,
)
.await
{
Ok(session) => {
complete_job(
let result = complete_job(
&state,
&worker_id,
&job,
@@ -274,12 +274,23 @@ async fn process_external_generation_job_once(
.to_string(),
),
)
.await
.await;
if result.is_ok() {
release_puzzle_compile_background_claim(&puzzle_state, &payload);
}
result
}
Err(error) => {
let message = error.body_text();
fail_queue_job_after_worker_error(&state, &worker_id, &job, &error, &message)
.await?;
let should_release_claim = error.should_fail_queue_job();
let result = fail_queue_job_after_worker_error(
&state, &worker_id, &job, &error, &message,
)
.await;
if result.is_ok() && should_release_claim {
release_puzzle_compile_background_claim(&puzzle_state, &payload);
}
result?;
Err(message)
}
}

View File

@@ -90,6 +90,7 @@ mod tracking_outbox;
mod vector_engine_audio_generation;
mod visual_novel;
mod volcengine_speech;
mod wallet_refund_outbox;
mod wechat;
mod wooden_fish;
mod work_author;
@@ -117,6 +118,7 @@ use crate::{
external_generation_worker::run_external_generation_worker,
state::{AppState, AppStateInitError},
tracking_outbox::TrackingOutbox,
wallet_refund_outbox::WalletRefundOutbox,
};
const API_SERVER_STARTUP_STACK_SIZE_BYTES: usize = 32 * 1024 * 1024;
@@ -127,6 +129,7 @@ const AUTH_STORE_STARTUP_RETRY_INTERVAL: Duration = Duration::from_secs(5);
struct ShutdownContext {
app_state: Option<AppState>,
tracking_outbox: Option<Arc<TrackingOutbox>>,
wallet_refund_outbox: Option<Arc<WalletRefundOutbox>>,
outbox_flush_timeout: Duration,
}
@@ -204,6 +207,7 @@ async fn run_http_role(config: AppConfig) -> Result<(), io::Error> {
Ok(state) => {
spawn_app_state_background_workers(&state);
let tracking_outbox = state.tracking_outbox();
let wallet_refund_outbox = state.wallet_refund_outbox();
let worker_state = process_role
.runs_external_generation_worker()
.then(|| state.clone());
@@ -212,6 +216,7 @@ async fn run_http_role(config: AppConfig) -> Result<(), io::Error> {
ShutdownContext {
app_state: Some(state),
tracking_outbox,
wallet_refund_outbox,
outbox_flush_timeout,
},
worker_state,
@@ -222,6 +227,7 @@ async fn run_http_role(config: AppConfig) -> Result<(), io::Error> {
ShutdownContext {
app_state: None,
tracking_outbox: None,
wallet_refund_outbox: None,
outbox_flush_timeout,
},
None,
@@ -310,12 +316,8 @@ async fn finalize_shutdown(context: ShutdownContext) {
state.mark_not_ready();
}
let Some(outbox) = context.tracking_outbox else {
return;
};
if context.outbox_flush_timeout.is_zero() {
warn!("api-server 退出时 tracking outbox flush timeout 为 0跳过主动 flush");
warn!("api-server 退出时 outbox flush timeout 为 0跳过主动 flush");
return;
}
@@ -323,22 +325,45 @@ async fn finalize_shutdown(context: ShutdownContext) {
.outbox_flush_timeout
.as_millis()
.min(u128::from(u64::MAX)) as u64;
info!(timeout_ms, "api-server 退出前封存并 flush tracking outbox");
match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await {
Ok(Ok(())) => {
info!("api-server 退出前 tracking outbox flush 完成");
if let Some(outbox) = context.tracking_outbox {
info!(timeout_ms, "api-server 退出前封存并 flush tracking outbox");
match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await {
Ok(Ok(())) => {
info!("api-server 退出前 tracking outbox flush 完成");
}
Ok(Err(error)) => {
warn!(
error = %error,
"api-server 退出前 tracking outbox flush 未完成,已保留本地文件等待下次启动重试"
);
}
Err(_) => {
warn!(
timeout_ms,
"api-server 退出前 tracking outbox flush 超时,已保留本地文件等待下次启动重试"
);
}
}
Ok(Err(error)) => {
warn!(
error = %error,
"api-server 退出前 tracking outbox flush 未完成,已保留本地文件等待下次启动重试"
);
}
Err(_) => {
warn!(
timeout_ms,
"api-server 退出前 tracking outbox flush 超时,已保留本地文件等待下次启动重试"
);
}
if let Some(outbox) = context.wallet_refund_outbox {
info!(timeout_ms, "api-server 退出前 flush wallet refund outbox");
match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await {
Ok(Ok(())) => {
info!("api-server 退出前 wallet refund outbox flush 完成");
}
Ok(Err(error)) => {
warn!(
error = %error,
"api-server 退出前 wallet refund outbox flush 未完成,已保留本地文件等待下次启动重试"
);
}
Err(_) => {
warn!(
timeout_ms,
"api-server 退出前 wallet refund outbox flush 超时,已保留本地文件等待下次启动重试"
);
}
}
}
}
@@ -348,6 +373,9 @@ fn spawn_app_state_background_workers(state: &AppState) {
if let Some(outbox) = state.tracking_outbox() {
outbox.spawn_worker();
}
if let Some(outbox) = state.wallet_refund_outbox() {
outbox.spawn_worker();
}
}
fn build_tcp_listener(

View File

@@ -1,4 +1,4 @@
use std::{
use std::{
collections::BTreeMap,
convert::Infallible,
future::Future,
@@ -65,11 +65,8 @@ use spacetime_client::{
use crate::{
api_response::json_success_body,
asset_billing::{
execute_billable_asset_operation_with_cost, map_asset_operation_wallet_error,
should_skip_asset_operation_billing_for_connectivity,
},
auth::AuthenticatedAccessToken,
asset_billing::{execute_billable_asset_operation_with_cost, map_asset_operation_wallet_error},
auth::{AuthenticatedAccessToken, RuntimePrincipal},
config::AppConfig,
generated_asset_sheets::apply_generated_asset_sheet_green_screen_alpha,
http_error::AppError,
@@ -354,13 +351,6 @@ impl Match3DItemAssetsGenerationPlan {
Self::Replace(plan) => plan.requested_item_names.len(),
}
}
fn billing_fingerprint_source(&self) -> String {
match self {
Self::Append(plan) => format!("append:{}", plan.requested_item_names.join("|")),
Self::Replace(plan) => format!("replace:{}", plan.requested_item_names.join("|")),
}
}
}
fn serialize_match3d_generated_item_assets(assets: &[Match3DGeneratedItemAsset]) -> Option<String> {

View File

@@ -162,7 +162,12 @@ pub(super) async fn compile_match3d_draft_for_session(
let initial_tags = requested_tags
.clone()
.unwrap_or_else(|| fallback_work_metadata.tags.clone());
let billing_asset_id = format!("{}:{}:{}", session_id, profile_id, current_utc_micros());
let billing_asset_id = format!(
"{}:{}:{}",
session_id,
profile_id,
request_context.request_id()
);
let points_cost = crate::creation_entry_config::resolve_creation_entry_mud_point_cost(
state,
"match3d",
@@ -514,15 +519,6 @@ async fn consume_match3d_draft_generation_points(
.await
{
Ok(_) => Ok(true),
Err(error) if should_skip_asset_operation_billing_for_connectivity(&error) => {
tracing::warn!(
owner_user_id,
billing_asset_id,
error = %error,
"抓大鹅草稿泥点预扣因 SpacetimeDB 连接不可用而降级跳过"
);
Ok(false)
}
Err(error) => Err(match3d_error_response(
request_context,
MATCH3D_AGENT_PROVIDER,

View File

@@ -751,7 +751,6 @@ pub async fn generate_match3d_background_image_for_work(
)?;
let prompt = normalize_match3d_background_prompt(payload.prompt.as_str());
ensure_non_empty(&request_context, MATCH3D_WORKS_PROVIDER, &prompt, "prompt")?;
let prompt_fingerprint = build_match3d_prompt_fingerprint(prompt.as_str());
let context =
load_match3d_work_asset_context(&state, &request_context, &authenticated, &profile_id)
@@ -763,7 +762,12 @@ pub async fn generate_match3d_background_image_for_work(
config,
assets,
} = context;
let billing_asset_id = format!("{}:{}:{}", session_id, profile_id, prompt_fingerprint);
let billing_asset_id = format!(
"{}:{}:{}",
session_id,
profile_id,
request_context.request_id()
);
let (generated_background, generated_assets) = execute_billable_asset_operation_with_cost(
&state,
owner_user_id.as_str(),
@@ -860,7 +864,6 @@ pub async fn generate_match3d_container_image_for_work(
)?;
let prompt = normalize_match3d_background_prompt(payload.prompt.as_str());
ensure_non_empty(&request_context, MATCH3D_WORKS_PROVIDER, &prompt, "prompt")?;
let prompt_fingerprint = build_match3d_prompt_fingerprint(prompt.as_str());
let context =
load_match3d_work_asset_context(&state, &request_context, &authenticated, &profile_id)
@@ -874,7 +877,9 @@ pub async fn generate_match3d_container_image_for_work(
} = context;
let billing_asset_id = format!(
"{}:{}:{}:container",
session_id, profile_id, prompt_fingerprint
session_id,
profile_id,
request_context.request_id()
);
let (generated_background, generated_assets) = execute_billable_asset_operation_with_cost(
&state,
@@ -1017,7 +1022,7 @@ pub async fn generate_match3d_item_assets_for_work(
session_id,
profile_id,
billed_item_count,
build_match3d_prompt_fingerprint(generation_plan.billing_fingerprint_source().as_str())
request_context.request_id()
);
let generated_assets = execute_billable_asset_operation_with_cost(
&state,
@@ -1171,7 +1176,7 @@ pub async fn start_match3d_run(
State(state): State<AppState>,
Path(profile_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<StartMatch3DRunRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let maybe_payload = payload.ok().map(|Json(payload)| payload);
@@ -1191,7 +1196,7 @@ pub async fn start_match3d_run(
.spacetime_client()
.start_match3d_run(Match3DRunStartRecordInput {
run_id: build_prefixed_uuid_id(MATCH3D_RUN_ID_PREFIX),
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
profile_id: profile_id.clone(),
started_at_ms: current_utc_ms(),
item_type_count_override: maybe_payload
@@ -1211,15 +1216,17 @@ pub async fn start_match3d_run(
record_work_play_start_after_success(
&state,
&request_context,
WorkPlayTrackingDraft::new(
WorkPlayTrackingDraft::runtime_principal(
"match3d",
profile_id.clone(),
&authenticated,
&principal,
"/api/runtime/match3d/...",
)
.profile_id(profile_id.clone())
.owner_user_id(principal.subject().to_string())
.extra(json!({
"runId": run.run_id,
"principalKind": principal.kind().as_str(),
})),
)
.await;
@@ -1236,13 +1243,13 @@ pub async fn get_match3d_run(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, MATCH3D_RUNTIME_PROVIDER, &run_id, "runId")?;
let run = state
.spacetime_client()
.get_match3d_run(run_id, authenticated.claims().user_id().to_string())
.get_match3d_run(run_id, principal.subject().to_string())
.await
.map_err(|error| {
match3d_error_response(
@@ -1264,7 +1271,7 @@ pub async fn click_match3d_item(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<ClickMatch3DItemRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let Json(payload) = match3d_json(payload, &request_context, MATCH3D_RUNTIME_PROVIDER)?;
@@ -1286,7 +1293,7 @@ pub async fn click_match3d_item(
.spacetime_client()
.click_match3d_item(Match3DRunClickRecordInput {
run_id: payload.run_id.unwrap_or(run_id),
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
item_instance_id: payload.item_instance_id,
client_snapshot_version: payload.client_snapshot_version.min(u32::MAX as u64) as u32,
client_event_id: payload.client_event_id,
@@ -1313,7 +1320,7 @@ pub async fn stop_match3d_run(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<StopMatch3DRunRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let _ = payload.ok();
@@ -1323,7 +1330,7 @@ pub async fn stop_match3d_run(
.spacetime_client()
.stop_match3d_run(Match3DRunStopRecordInput {
run_id,
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
stopped_at_ms: current_utc_ms(),
})
.await
@@ -1347,7 +1354,7 @@ pub async fn restart_match3d_run(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, MATCH3D_RUNTIME_PROVIDER, &run_id, "runId")?;
@@ -1356,7 +1363,7 @@ pub async fn restart_match3d_run(
.restart_match3d_run(Match3DRunRestartRecordInput {
source_run_id: run_id,
next_run_id: build_prefixed_uuid_id(MATCH3D_RUN_ID_PREFIX),
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
restarted_at_ms: current_utc_ms(),
})
.await
@@ -1380,7 +1387,7 @@ pub async fn finish_match3d_time_up(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&request_context, MATCH3D_RUNTIME_PROVIDER, &run_id, "runId")?;
@@ -1388,7 +1395,7 @@ pub async fn finish_match3d_time_up(
.spacetime_client()
.finish_match3d_time_up(Match3DRunTimeUpRecordInput {
run_id,
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
finished_at_ms: current_utc_ms(),
})
.await

View File

@@ -1208,14 +1208,6 @@ pub(super) fn normalize_match3d_background_prompt(raw: &str) -> String {
.to_string()
}
pub(super) fn build_match3d_prompt_fingerprint(value: &str) -> String {
let mut hash = 0u32;
for character in value.chars() {
hash = hash.wrapping_mul(31).wrapping_add(character as u32);
}
format!("{hash:08x}")
}
pub(super) fn build_fallback_match3d_background_prompt(config: &Match3DConfigJson) -> String {
let theme = config.theme_text.trim();
let normalized_theme = if theme.is_empty() { "抓大鹅" } else { theme };

View File

@@ -4,7 +4,7 @@ use axum::{
};
use crate::{
auth::require_bearer_auth,
auth::{require_bearer_auth, require_runtime_principal_auth},
bark_battle::{
create_bark_battle_draft, delete_bark_battle_work, finish_bark_battle_run,
generate_bark_battle_image_asset, get_bark_battle_run, get_bark_battle_runtime_config,
@@ -66,26 +66,28 @@ pub fn router(state: AppState) -> Router<AppState> {
"/api/runtime/bark-battle/works/{work_id}/config",
get(get_bark_battle_runtime_config).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/bark-battle/works/{work_id}/runs",
post(start_bark_battle_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/bark-battle/runs/{run_id}",
get(get_bark_battle_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/bark-battle/runs/{run_id}/finish",
post(finish_bark_battle_run)
.route_layer(middleware::from_fn_with_state(state, require_bearer_auth)),
post(finish_bark_battle_run).route_layer(middleware::from_fn_with_state(
state,
require_runtime_principal_auth,
)),
)
}

View File

@@ -4,7 +4,7 @@ use axum::{
};
use crate::{
auth::require_bearer_auth,
auth::{require_bearer_auth, require_runtime_principal_auth},
big_fish::{
create_big_fish_session, delete_big_fish_work, execute_big_fish_action, get_big_fish_run,
get_big_fish_session, get_big_fish_works, list_big_fish_gallery,
@@ -85,35 +85,35 @@ pub fn router(state: AppState) -> Router<AppState> {
"/api/runtime/big-fish/sessions/{session_id}/play",
post(record_big_fish_play).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/big-fish/works/{session_id}/play",
post(record_big_fish_play).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/big-fish/sessions/{session_id}/runs",
post(start_big_fish_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/big-fish/runs/{run_id}",
get(get_big_fish_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/big-fish/runs/{run_id}/input",
post(submit_big_fish_input).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
}

View File

@@ -4,7 +4,7 @@ use axum::{
};
use crate::{
auth::require_bearer_auth,
auth::{require_bearer_auth, require_runtime_principal_auth},
match3d::{
click_match3d_item, compile_match3d_agent_draft, create_match3d_agent_session,
delete_match3d_work, execute_match3d_agent_action, finish_match3d_time_up,
@@ -139,42 +139,42 @@ pub fn router(state: AppState) -> Router<AppState> {
"/api/runtime/match3d/works/{profile_id}/runs",
post(start_match3d_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/match3d/runs/{run_id}",
get(get_match3d_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/match3d/runs/{run_id}/click",
post(click_match3d_item).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/match3d/runs/{run_id}/stop",
post(stop_match3d_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/match3d/runs/{run_id}/restart",
post(restart_match3d_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/match3d/runs/{run_id}/time-up",
post(finish_match3d_time_up).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
}

View File

@@ -4,7 +4,7 @@ use axum::{
};
use crate::{
auth::require_bearer_auth,
auth::{require_bearer_auth, require_runtime_principal_auth},
square_hole::{
compile_square_hole_agent_draft, create_square_hole_agent_session, delete_square_hole_work,
drop_square_hole_shape, execute_square_hole_agent_action, finish_square_hole_time_up,
@@ -101,42 +101,42 @@ pub fn router(state: AppState) -> Router<AppState> {
"/api/runtime/square-hole/works/{profile_id}/runs",
post(start_square_hole_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/square-hole/runs/{run_id}",
get(get_square_hole_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/square-hole/runs/{run_id}/drop",
post(drop_square_hole_shape).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/square-hole/runs/{run_id}/stop",
post(stop_square_hole_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/square-hole/runs/{run_id}/restart",
post(restart_square_hole_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/square-hole/runs/{run_id}/time-up",
post(finish_square_hole_time_up).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
}

View File

@@ -4,7 +4,7 @@ use axum::{
};
use crate::{
auth::require_bearer_auth,
auth::{require_bearer_auth, require_runtime_principal_auth},
state::AppState,
vector_engine_audio_generation::{
create_background_music_task, create_sound_effect_task,
@@ -151,33 +151,35 @@ pub fn router(state: AppState) -> Router<AppState> {
"/api/runtime/visual-novel/works/{profile_id}/runs",
post(start_visual_novel_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/visual-novel/runs/{run_id}",
get(get_visual_novel_run).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/visual-novel/runs/{run_id}/actions/stream",
post(stream_visual_novel_action).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/visual-novel/runs/{run_id}/history",
get(list_visual_novel_history).route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
require_runtime_principal_auth,
)),
)
.route(
"/api/runtime/visual-novel/runs/{run_id}/regenerate",
post(regenerate_visual_novel_run)
.route_layer(middleware::from_fn_with_state(state, require_bearer_auth)),
post(regenerate_visual_novel_run).route_layer(middleware::from_fn_with_state(
state,
require_runtime_principal_auth,
)),
)
}

View File

@@ -55,7 +55,8 @@ use spacetime_client::{
ExternalGenerationJobEnqueueRecordInput, PuzzleAgentMessageRecord,
PuzzleAgentMessageSubmitRecordInput, PuzzleAgentSessionCreateRecordInput,
PuzzleAgentSessionRecord, PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord,
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleCreatorIntentRecord,
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput,
PuzzleBackgroundCompileTaskReleaseRecordInput, PuzzleCreatorIntentRecord,
PuzzleDraftCompileFailureRecordInput, PuzzleDraftLevelRecord, PuzzleFormDraftRecord,
PuzzleFormDraftSaveRecordInput, PuzzleGeneratedImageCandidateRecord,
PuzzleGeneratedImagesSaveRecordInput, PuzzleLeaderboardEntryRecord,
@@ -138,6 +139,75 @@ const PUZZLE_UI_BACKGROUND_PROMPT_FALLBACK_MARKER: &str =
const PUZZLE_VECTOR_ENGINE_SQUARE_IMAGE_SIZE: &str = "1024x1024";
const PUZZLE_VECTOR_ENGINE_PORTRAIT_IMAGE_SIZE: &str = "1024x1536";
fn build_puzzle_background_compile_task_id(session_id: &str) -> String {
format!("puzzle_initial_background:{session_id}")
}
fn build_puzzle_background_compile_claim_id(task_id: &str, request_id: &str) -> String {
format!("{task_id}:{request_id}")
}
async fn release_claimed_puzzle_background_compile_task(
state: &PuzzleApiState,
task_id: &str,
claim_id: &str,
session_id: &str,
owner_user_id: &str,
) {
let result = state
.spacetime_client()
.release_puzzle_background_compile_task(PuzzleBackgroundCompileTaskReleaseRecordInput {
task_id: task_id.to_string(),
claim_id: claim_id.to_string(),
session_id: session_id.to_string(),
owner_user_id: owner_user_id.to_string(),
})
.await;
match result {
Ok(true) => {}
Ok(false) => {
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
task_id,
claim_id,
session_id,
owner_user_id,
"拼图首图后台生成任务释放未命中当前 claim"
);
}
Err(error) => {
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
task_id,
claim_id,
session_id,
owner_user_id,
error = %error,
"拼图首图后台生成任务释放失败"
);
}
}
}
pub(crate) fn spawn_release_claimed_puzzle_background_compile_task(
state: PuzzleApiState,
task_id: String,
claim_id: String,
session_id: String,
owner_user_id: String,
) {
tokio::spawn(async move {
release_claimed_puzzle_background_compile_task(
&state,
&task_id,
&claim_id,
&session_id,
&owner_user_id,
)
.await;
});
}
fn has_puzzle_cover_image_src(value: &Option<String>) -> bool {
value
.as_deref()

View File

@@ -157,6 +157,10 @@ pub(crate) struct PuzzleCompileDraftWorkerPayload {
#[serde(default)]
pub image_model: Option<String>,
pub requested_at_micros: i64,
#[serde(default)]
pub background_task_id: Option<String>,
#[serde(default)]
pub background_claim_id: Option<String>,
}
pub(crate) async fn execute_puzzle_compile_draft_worker_job(
@@ -228,6 +232,11 @@ pub(crate) async fn execute_puzzle_compile_draft_worker_job(
)
.await;
}
release_inline_puzzle_compile_background_claim(
state,
&payload,
&external_generation_guard,
);
Ok(session)
}
Err(error) => {
@@ -255,6 +264,11 @@ pub(crate) async fn execute_puzzle_compile_draft_worker_job(
},
)
.await;
release_inline_puzzle_compile_background_claim(
state,
&payload,
&external_generation_guard,
);
Err(PuzzleExternalGenerationWorkerError::with_failure_state_written(error))
}
Err(mark_error) => {
@@ -265,6 +279,37 @@ pub(crate) async fn execute_puzzle_compile_draft_worker_job(
}
}
fn release_inline_puzzle_compile_background_claim(
state: &PuzzleApiState,
payload: &PuzzleCompileDraftWorkerPayload,
external_generation_guard: &ExternalGenerationWriteLeaseGuard,
) {
if external_generation_guard.job_id.is_some() {
return;
}
release_puzzle_compile_background_claim(state, payload);
}
pub(crate) fn release_puzzle_compile_background_claim(
state: &PuzzleApiState,
payload: &PuzzleCompileDraftWorkerPayload,
) {
let (Some(task_id), Some(claim_id)) = (
payload.background_task_id.as_ref(),
payload.background_claim_id.as_ref(),
) else {
return;
};
spawn_release_claimed_puzzle_background_compile_task(
state.clone(),
task_id.clone(),
claim_id.clone(),
payload.session_id.clone(),
payload.owner_user_id.clone(),
);
}
pub(crate) async fn mark_puzzle_compile_failure_for_worker(
state: &PuzzleApiState,
session_id: &str,

View File

@@ -588,7 +588,7 @@ pub async fn execute_puzzle_agent_action(
let owner_user_id = authenticated.claims().user_id().to_string();
let now = current_utc_micros();
let action = payload.action.trim().to_string();
let billing_asset_id = format!("{session_id}:{now}");
let billing_asset_id = format!("{}:{}:{}", session_id, action, request_context.request_id());
tracing::info!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %session_id,
@@ -658,6 +658,79 @@ pub async fn execute_puzzle_agent_action(
reference_image_src: primary_reference_image_src.map(ToOwned::to_owned),
image_model: payload.image_model.clone(),
requested_at_micros: now,
background_task_id: None,
background_claim_id: None,
};
let worker_payload = if ai_redraw {
let background_task_id =
build_puzzle_background_compile_task_id(&compile_session_id);
let background_claim_id = build_puzzle_background_compile_claim_id(
&background_task_id,
request_context.request_id(),
);
let claim_result = state
.spacetime_client()
.claim_puzzle_background_compile_task(
PuzzleBackgroundCompileTaskClaimRecordInput {
task_id: background_task_id.clone(),
claim_id: background_claim_id.clone(),
session_id: compile_session_id.clone(),
owner_user_id: owner_user_id.clone(),
claimed_at_micros: current_utc_micros(),
},
)
.await
.map_err(|error| {
puzzle_error_response(
&request_context,
PUZZLE_AGENT_API_BASE_PROVIDER,
map_puzzle_client_error(error),
)
})?;
if !claim_result {
tracing::info!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %compile_session_id,
owner_user_id = %owner_user_id,
task_id = %background_task_id,
"拼图首图后台生成任务已存在,本次 action 直接返回生成中会话"
);
let session = state
.spacetime_client()
.get_puzzle_agent_session(compile_session_id.clone(), owner_user_id.clone())
.await
.map(mark_puzzle_initial_generation_started_snapshot)
.map_err(|error| {
puzzle_error_response(
&request_context,
PUZZLE_AGENT_API_BASE_PROVIDER,
map_puzzle_client_error(error),
)
})?;
return Ok(json_success_body(
Some(&request_context),
PuzzleAgentActionResponse {
operation: PuzzleAgentOperationResponse {
operation_id: background_task_id,
operation_type: "compile_puzzle_draft".to_string(),
status: "running".to_string(),
phase_label: "首关拼图草稿".to_string(),
phase_detail: "首关草稿生成已在后台处理中。".to_string(),
progress: session.progress_percent.max(10),
error: None,
},
session: map_puzzle_agent_session_response(session),
},
));
}
PuzzleCompileDraftWorkerPayload {
background_task_id: Some(background_task_id),
background_claim_id: Some(background_claim_id),
..worker_payload
}
} else {
worker_payload
};
if state
.root_state()
@@ -675,7 +748,7 @@ pub async fn execute_puzzle_agent_action(
let session = execute_puzzle_compile_draft_worker_job(
&state,
&request_context,
worker_payload,
worker_payload.clone(),
ExternalGenerationWriteLeaseGuard::inline(),
)
.await
@@ -707,6 +780,18 @@ pub async fn execute_puzzle_agent_action(
));
}
let request_payload_json = serde_json::to_string(&worker_payload).map_err(|error| {
if let (Some(task_id), Some(claim_id)) = (
worker_payload.background_task_id.as_deref(),
worker_payload.background_claim_id.as_deref(),
) {
spawn_release_claimed_puzzle_background_compile_task(
state.clone(),
task_id.to_string(),
claim_id.to_string(),
compile_session_id.clone(),
owner_user_id.clone(),
);
}
puzzle_error_response(
&request_context,
PUZZLE_AGENT_API_BASE_PROVIDER,
@@ -736,6 +821,18 @@ pub async fn execute_puzzle_agent_action(
})
.await
.map_err(|error| {
if let (Some(task_id), Some(claim_id)) = (
worker_payload.background_task_id.as_deref(),
worker_payload.background_claim_id.as_deref(),
) {
spawn_release_claimed_puzzle_background_compile_task(
state.clone(),
task_id.to_string(),
claim_id.to_string(),
compile_session_id.clone(),
owner_user_id.clone(),
);
}
puzzle_error_response(
&request_context,
PUZZLE_AGENT_API_BASE_PROVIDER,
@@ -2034,7 +2131,7 @@ pub async fn use_puzzle_runtime_prop(
}
};
let should_sync_freeze_boundary = matches!(prop_kind.as_str(), "freezeTime" | "freeze_time");
let billing_asset_id = format!("{}:{}:{}", run_id, prop_kind, current_utc_micros());
let billing_asset_id = format!("{}:{}:{}", run_id, prop_kind, request_context.request_id());
let reducer_owner_user_id = owner_user_id.clone();
let reducer_run_id = run_id.clone();
let fallback_run_id = run_id.clone();

View File

@@ -69,7 +69,7 @@ use crate::generated_image_assets::{
use crate::{
ai_generation_drafts::{AiGenerationDraftContext, AiGenerationDraftWriter},
api_response::json_success_body,
auth::AuthenticatedAccessToken,
auth::{AuthenticatedAccessToken, RuntimePrincipal},
http_error::AppError,
openai_image_generation::{
DownloadedOpenAiImage, build_openai_image_http_client, create_openai_image_generation,
@@ -739,7 +739,7 @@ pub async fn start_square_hole_run(
State(state): State<AppState>,
Path(profile_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<StartSquareHoleRunRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let maybe_payload = payload.ok().map(|Json(payload)| payload);
@@ -758,7 +758,7 @@ pub async fn start_square_hole_run(
.spacetime_client()
.start_square_hole_run(SquareHoleRunStartRecordInput {
run_id: build_prefixed_uuid_id(SQUARE_HOLE_RUN_ID_PREFIX),
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
profile_id: profile_id.clone(),
started_at_ms: current_utc_ms(),
})
@@ -774,15 +774,17 @@ pub async fn start_square_hole_run(
record_work_play_start_after_success(
&state,
&request_context,
WorkPlayTrackingDraft::new(
WorkPlayTrackingDraft::runtime_principal(
"square-hole",
profile_id.clone(),
&authenticated,
&principal,
"/api/runtime/square-hole/...",
)
.profile_id(profile_id.clone())
.owner_user_id(principal.subject().to_string())
.extra(json!({
"runId": run.run_id,
"principalKind": principal.kind().as_str(),
})),
)
.await;
@@ -799,7 +801,7 @@ pub async fn get_square_hole_run(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(
&request_context,
@@ -810,7 +812,7 @@ pub async fn get_square_hole_run(
let run = state
.spacetime_client()
.get_square_hole_run(run_id, authenticated.claims().user_id().to_string())
.get_square_hole_run(run_id, principal.subject().to_string())
.await
.map_err(|error| {
square_hole_error_response(
@@ -832,7 +834,7 @@ pub async fn drop_square_hole_shape(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<DropSquareHoleShapeRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let Json(payload) = square_hole_json(payload, &request_context, SQUARE_HOLE_RUNTIME_PROVIDER)?;
@@ -859,7 +861,7 @@ pub async fn drop_square_hole_shape(
.spacetime_client()
.drop_square_hole_shape(SquareHoleRunDropRecordInput {
run_id: payload.run_id.unwrap_or(run_id),
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
hole_id: payload.hole_id,
client_snapshot_version: payload.client_snapshot_version,
client_event_id: payload.client_event_id,
@@ -887,7 +889,7 @@ pub async fn stop_square_hole_run(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<StopSquareHoleRunRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let _ = payload.ok();
@@ -902,7 +904,7 @@ pub async fn stop_square_hole_run(
.spacetime_client()
.stop_square_hole_run(SquareHoleRunStopRecordInput {
run_id,
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
stopped_at_ms: current_utc_ms(),
})
.await
@@ -926,7 +928,7 @@ pub async fn restart_square_hole_run(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(
&request_context,
@@ -940,7 +942,7 @@ pub async fn restart_square_hole_run(
.restart_square_hole_run(SquareHoleRunRestartRecordInput {
source_run_id: run_id,
next_run_id: build_prefixed_uuid_id(SQUARE_HOLE_RUN_ID_PREFIX),
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
restarted_at_ms: current_utc_ms(),
})
.await
@@ -964,7 +966,7 @@ pub async fn finish_square_hole_time_up(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(
&request_context,
@@ -977,7 +979,7 @@ pub async fn finish_square_hole_time_up(
.spacetime_client()
.finish_square_hole_time_up(SquareHoleRunTimeUpRecordInput {
run_id,
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
finished_at_ms: current_utc_ms(),
})
.await

View File

@@ -41,6 +41,7 @@ use tracing::{info, warn};
use crate::config::AppConfig;
use crate::puzzle_gallery_cache::PuzzleGalleryCache;
use crate::tracking_outbox::TrackingOutbox;
use crate::wallet_refund_outbox::WalletRefundOutbox;
use crate::wechat::pay::{build_wechat_pay_config, map_wechat_pay_init_error};
use crate::wechat::provider::build_wechat_provider;
use crate::work_author::{
@@ -263,6 +264,7 @@ pub struct AppStateInner {
spacetime_client: SpacetimeClient,
puzzle_gallery_cache: PuzzleGalleryCache,
tracking_outbox: Option<Arc<TrackingOutbox>>,
wallet_refund_outbox: Option<Arc<WalletRefundOutbox>>,
llm_client: Option<LlmClient>,
creative_agent_gpt5_client: Option<LlmClient>,
creative_agent_executor: Arc<MockLangChainRustAgentExecutor>,
@@ -406,6 +408,8 @@ impl AppState {
procedure_timeout: config.spacetime_procedure_timeout,
});
let tracking_outbox = TrackingOutbox::from_config(&config, spacetime_client.clone());
let wallet_refund_outbox =
WalletRefundOutbox::from_config(&config, spacetime_client.clone());
let llm_client = build_llm_client(&config)?;
let creative_agent_gpt5_client = build_creative_agent_gpt5_client(&config)?;
let http_request_permit_pools = HttpRequestPermitPools::from_config(&config);
@@ -441,6 +445,7 @@ impl AppState {
spacetime_client,
puzzle_gallery_cache: PuzzleGalleryCache::new(),
tracking_outbox,
wallet_refund_outbox,
llm_client,
creative_agent_gpt5_client,
creative_agent_executor: Arc::new(MockLangChainRustAgentExecutor),
@@ -922,6 +927,10 @@ impl AppState {
self.tracking_outbox.clone()
}
pub fn wallet_refund_outbox(&self) -> Option<Arc<WalletRefundOutbox>> {
self.wallet_refund_outbox.clone()
}
pub fn llm_client(&self) -> Option<&LlmClient> {
self.llm_client.as_ref()
}

View File

@@ -30,7 +30,7 @@ use time::OffsetDateTime;
use crate::{
api_response::json_success_body,
auth::AuthenticatedAccessToken,
auth::{AuthenticatedAccessToken, RuntimePrincipal},
http_error::AppError,
prompt::visual_novel as vn_prompt,
request_context::RequestContext,
@@ -434,7 +434,7 @@ pub async fn start_visual_novel_run(
State(state): State<AppState>,
Path(profile_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<contract::VisualNovelStartRunRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let Json(payload) = parse_json_payload(&request_context, payload)?;
@@ -453,7 +453,7 @@ pub async fn start_visual_novel_run(
.spacetime_client()
.start_visual_novel_run(VisualNovelRunStartRecordInput {
run_id: build_prefixed_uuid_id(domain::VISUAL_NOVEL_RUN_ID_PREFIX),
owner_user_id: authenticated.claims().user_id().to_string(),
owner_user_id: principal.subject().to_string(),
profile_id: profile_id.clone(),
mode: run_mode_to_wire(&payload.mode).to_string(),
snapshot_json: None,
@@ -467,16 +467,18 @@ pub async fn start_visual_novel_run(
record_work_play_start_after_success(
&state,
&request_context,
WorkPlayTrackingDraft::new(
WorkPlayTrackingDraft::runtime_principal(
"visual-novel",
profile_id.clone(),
&authenticated,
&principal,
"/api/runtime/visual-novel/...",
)
.profile_id(profile_id.clone())
.owner_user_id(principal.subject().to_string())
.extra(json!({
"mode": run_mode_to_wire(&payload.mode),
"runId": run.run_id,
"principalKind": principal.kind().as_str(),
})),
)
.await;
@@ -493,12 +495,12 @@ pub async fn get_visual_novel_run(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&run_id, "runId")?;
let run = state
.spacetime_client()
.get_visual_novel_run(run_id, authenticated.claims().user_id().to_string())
.get_visual_novel_run(run_id, principal.subject().to_string())
.await
.map_err(|error| {
visual_novel_error_response(&request_context, map_spacetime_error(error))
@@ -516,13 +518,13 @@ pub async fn stream_visual_novel_action(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<contract::VisualNovelRuntimeActionRequest>, JsonRejection>,
) -> Result<Response, Response> {
let Json(payload) = parse_json_payload(&request_context, payload)?;
ensure_non_empty(&run_id, "runId")?;
ensure_non_empty(&payload.client_event_id, "clientEventId")?;
let owner_user_id = authenticated.claims().user_id().to_string();
let owner_user_id = principal.subject().to_string();
let run = state
.spacetime_client()
.get_visual_novel_run(run_id.clone(), owner_user_id.clone())
@@ -569,12 +571,12 @@ pub async fn list_visual_novel_history(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
) -> Result<Json<Value>, Response> {
ensure_non_empty(&run_id, "runId")?;
let history = state
.spacetime_client()
.list_visual_novel_runtime_history(run_id, authenticated.claims().user_id().to_string())
.list_visual_novel_runtime_history(run_id, principal.subject().to_string())
.await
.map_err(|error| {
visual_novel_error_response(&request_context, map_spacetime_error(error))
@@ -595,13 +597,13 @@ pub async fn regenerate_visual_novel_run(
State(state): State<AppState>,
Path(run_id): Path<String>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Extension(principal): Extension<RuntimePrincipal>,
payload: Result<Json<contract::VisualNovelRegenerateRequest>, JsonRejection>,
) -> Result<Json<Value>, Response> {
let Json(payload) = parse_json_payload(&request_context, payload)?;
ensure_non_empty(&run_id, "runId")?;
ensure_non_empty(&payload.history_entry_id, "historyEntryId")?;
let owner_user_id = authenticated.claims().user_id().to_string();
let owner_user_id = principal.subject().to_string();
let run = state
.spacetime_client()
.get_visual_novel_run(run_id.clone(), owner_user_id.clone())

View File

@@ -0,0 +1,463 @@
use std::{
fmt,
path::{Path, PathBuf},
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use spacetime_client::{SpacetimeClient, SpacetimeClientError};
use tokio::{
fs::{self, File, OpenOptions},
io::{AsyncReadExt, AsyncWriteExt},
sync::{Mutex, Notify},
time::sleep,
};
use tracing::{debug, warn};
use crate::config::AppConfig;
const PENDING_FILE_PREFIX: &str = "refund-";
const CORRUPT_FILE_PREFIX: &str = "corrupt-";
const TEMP_FILE_PREFIX: &str = "tmp-";
const OUTBOX_FILE_EXTENSION: &str = ".json";
#[derive(Clone)]
pub struct WalletRefundOutbox {
dir: PathBuf,
batch_size: usize,
flush_interval: Duration,
max_bytes: u64,
spacetime_client: SpacetimeClient,
enqueue_lock: Arc<Mutex<()>>,
flush_notify: Arc<Notify>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct WalletRefundOutboxRecord {
pub owner_user_id: String,
pub amount: u64,
pub ledger_id: String,
pub created_at_micros: i64,
pub asset_kind: String,
pub asset_id: String,
}
#[derive(Debug)]
pub enum WalletRefundOutboxEnqueueOutcome {
Enqueued,
Dropped { reason: &'static str },
}
#[derive(Debug)]
pub enum WalletRefundOutboxError {
Io(std::io::Error),
Json(serde_json::Error),
Spacetime(SpacetimeClientError),
}
impl WalletRefundOutbox {
pub fn from_config(config: &AppConfig, spacetime_client: SpacetimeClient) -> Option<Arc<Self>> {
if !config.wallet_refund_outbox_enabled {
return None;
}
Some(Arc::new(Self {
dir: config.wallet_refund_outbox_dir.clone(),
batch_size: config.wallet_refund_outbox_batch_size.max(1),
flush_interval: config.wallet_refund_outbox_flush_interval,
max_bytes: config.wallet_refund_outbox_max_bytes,
spacetime_client,
enqueue_lock: Arc::new(Mutex::new(())),
flush_notify: Arc::new(Notify::new()),
}))
}
pub async fn enqueue(
&self,
record: WalletRefundOutboxRecord,
) -> Result<WalletRefundOutboxEnqueueOutcome, WalletRefundOutboxError> {
let _guard = self.enqueue_lock.lock().await;
fs::create_dir_all(&self.dir).await?;
let pending_path = self.pending_path_for_ledger(&record.ledger_id);
if fs::metadata(&pending_path).await.is_ok() {
self.flush_notify.notify_one();
return Ok(WalletRefundOutboxEnqueueOutcome::Enqueued);
}
let bytes = serde_json::to_vec(&record)?;
let line_bytes = bytes.len().min(u64::MAX as usize) as u64;
let current_bytes = directory_size_if_exists(&self.dir).unwrap_or(0);
if current_bytes.saturating_add(line_bytes) > self.max_bytes {
return Ok(WalletRefundOutboxEnqueueOutcome::Dropped {
reason: "max_bytes",
});
}
let temp_path = self.temp_path();
let mut file = OpenOptions::new()
.create_new(true)
.write(true)
.open(&temp_path)
.await?;
file.write_all(&bytes).await?;
file.flush().await?;
file.sync_data().await?;
drop(file);
if fs::metadata(&pending_path).await.is_ok() {
let _ = fs::remove_file(&temp_path).await;
self.flush_notify.notify_one();
return Ok(WalletRefundOutboxEnqueueOutcome::Enqueued);
}
fs::rename(&temp_path, &pending_path).await?;
sync_directory_metadata(&self.dir).await?;
self.flush_notify.notify_one();
Ok(WalletRefundOutboxEnqueueOutcome::Enqueued)
}
pub fn spawn_worker(self: Arc<Self>) {
tokio::spawn(async move {
loop {
tokio::select! {
_ = sleep(self.flush_interval) => {
if let Err(error) = self.flush_pending_files_once().await {
warn!(error = %error, "wallet refund outbox 重放退款失败,将保留文件等待重试");
}
}
_ = self.flush_notify.notified() => {
if let Err(error) = self.flush_pending_files_once().await {
warn!(error = %error, "wallet refund outbox 主动重放退款失败,将保留文件等待重试");
}
}
}
}
});
}
pub async fn flush_for_shutdown(&self) -> Result<(), WalletRefundOutboxError> {
self.flush_pending_files_once().await
}
async fn flush_pending_files_once(&self) -> Result<(), WalletRefundOutboxError> {
fs::create_dir_all(&self.dir).await?;
let pending_files = self.list_pending_files().await?;
for path in pending_files.into_iter().take(self.batch_size) {
let record = match read_refund_record(&path).await {
Ok(record) => record,
Err(error) if error.is_data_corruption() => {
let corrupt_path = self.corrupt_path_for(&path);
fs::rename(&path, &corrupt_path).await?;
sync_directory_metadata(&self.dir).await?;
warn!(
error = %error,
source = %path.display(),
target = %corrupt_path.display(),
"wallet refund outbox 文件无法解析,已隔离"
);
continue;
}
Err(error) => return Err(error),
};
match self
.spacetime_client
.refund_profile_wallet_points(
record.owner_user_id.clone(),
record.amount,
record.ledger_id.clone(),
record.created_at_micros,
)
.await
{
Ok(_) => {
match fs::remove_file(&path).await {
Ok(()) => {}
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
Err(error) => return Err(error.into()),
}
sync_directory_metadata(&self.dir).await?;
debug!(
ledger_id = %record.ledger_id,
owner_user_id = %record.owner_user_id,
asset_kind = %record.asset_kind,
asset_id = %record.asset_id,
path = %path.display(),
"wallet refund outbox 退款已重放并删除文件"
);
}
Err(error) => return Err(WalletRefundOutboxError::Spacetime(error)),
}
}
Ok(())
}
async fn list_pending_files(&self) -> Result<Vec<PathBuf>, WalletRefundOutboxError> {
let mut entries = fs::read_dir(&self.dir).await?;
let mut files = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
let Some(name) = path.file_name().and_then(|value| value.to_str()) else {
continue;
};
if name.starts_with(PENDING_FILE_PREFIX) && name.ends_with(OUTBOX_FILE_EXTENSION) {
files.push(path);
}
}
files.sort();
Ok(files)
}
fn pending_path_for_ledger(&self, ledger_id: &str) -> PathBuf {
self.dir.join(format!(
"{PENDING_FILE_PREFIX}{}{OUTBOX_FILE_EXTENSION}",
ledger_id_hash(ledger_id)
))
}
fn temp_path(&self) -> PathBuf {
self.dir.join(format!(
"{TEMP_FILE_PREFIX}{}-{uuid}{OUTBOX_FILE_EXTENSION}",
current_unix_micros(),
uuid = uuid::Uuid::new_v4()
))
}
fn corrupt_path_for(&self, path: &Path) -> PathBuf {
let name = path
.file_name()
.and_then(|value| value.to_str())
.unwrap_or("unknown.json");
self.dir.join(format!(
"{CORRUPT_FILE_PREFIX}{}-{uuid}-{name}",
current_unix_micros(),
uuid = uuid::Uuid::new_v4()
))
}
}
impl fmt::Debug for WalletRefundOutbox {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WalletRefundOutbox")
.field("dir", &self.dir)
.field("batch_size", &self.batch_size)
.field("flush_interval", &self.flush_interval)
.field("max_bytes", &self.max_bytes)
.finish()
}
}
impl fmt::Display for WalletRefundOutboxError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Io(error) => write!(f, "{error}"),
Self::Json(error) => write!(f, "{error}"),
Self::Spacetime(error) => write!(f, "{error}"),
}
}
}
impl From<std::io::Error> for WalletRefundOutboxError {
fn from(value: std::io::Error) -> Self {
Self::Io(value)
}
}
impl From<serde_json::Error> for WalletRefundOutboxError {
fn from(value: serde_json::Error) -> Self {
Self::Json(value)
}
}
impl WalletRefundOutboxError {
fn is_data_corruption(&self) -> bool {
matches!(self, Self::Json(_))
}
}
async fn read_refund_record(
path: &Path,
) -> Result<WalletRefundOutboxRecord, WalletRefundOutboxError> {
let mut file = File::open(path).await?;
let mut bytes = Vec::new();
file.read_to_end(&mut bytes).await?;
Ok(serde_json::from_slice::<WalletRefundOutboxRecord>(&bytes)?)
}
fn directory_size_if_exists(path: &Path) -> Result<u64, std::io::Error> {
if !path.is_dir() {
return Ok(0);
}
let mut total = 0u64;
for entry in std::fs::read_dir(path)? {
let entry = entry?;
if !is_pending_outbox_file_name(&entry.file_name()) {
continue;
}
let metadata = entry.metadata()?;
if metadata.is_file() {
total = total.saturating_add(metadata.len());
}
}
Ok(total)
}
fn current_unix_micros() -> u128 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_micros()
}
fn ledger_id_hash(ledger_id: &str) -> String {
hex::encode(Sha256::digest(ledger_id.as_bytes()))
}
fn is_pending_outbox_file_name(name: &std::ffi::OsStr) -> bool {
name.to_str().is_some_and(|value| {
value.starts_with(PENDING_FILE_PREFIX) && value.ends_with(OUTBOX_FILE_EXTENSION)
})
}
async fn sync_directory_metadata(path: &Path) -> Result<(), WalletRefundOutboxError> {
let path = path.to_path_buf();
tokio::task::spawn_blocking(move || {
let dir = std::fs::File::open(path)?;
dir.sync_all()
})
.await
.map_err(|error| std::io::Error::new(std::io::ErrorKind::Other, error.to_string()))??;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_record(ledger_id: &str) -> WalletRefundOutboxRecord {
WalletRefundOutboxRecord {
owner_user_id: "user-1".to_string(),
amount: 2,
ledger_id: ledger_id.to_string(),
created_at_micros: 1_713_680_000_000_000,
asset_kind: "puzzle_initial_image".to_string(),
asset_id: "asset-1".to_string(),
}
}
fn test_dir(name: &str) -> PathBuf {
let dir = std::env::temp_dir().join(format!(
"genarrative-wallet-refund-outbox-{name}-{}",
current_unix_micros()
));
let _ = std::fs::remove_dir_all(&dir);
dir
}
fn test_outbox(dir: PathBuf, max_bytes: u64) -> Arc<WalletRefundOutbox> {
let config = AppConfig {
wallet_refund_outbox_dir: dir,
wallet_refund_outbox_batch_size: 500,
wallet_refund_outbox_flush_interval: Duration::from_secs(60),
wallet_refund_outbox_max_bytes: max_bytes,
..AppConfig::default()
};
WalletRefundOutbox::from_config(
&config,
SpacetimeClient::new(spacetime_client::SpacetimeClientConfig {
server_url: "http://127.0.0.1:1".to_string(),
database: "missing".to_string(),
token: None,
pool_size: 1,
procedure_timeout: Duration::from_millis(10),
}),
)
.expect("outbox should be enabled")
}
#[tokio::test]
async fn enqueue_is_idempotent_per_ledger_id() {
let dir = test_dir("idempotent");
let outbox = test_outbox(dir.clone(), 1024 * 1024);
outbox.enqueue(sample_record("ledger-1")).await.unwrap();
outbox.enqueue(sample_record("ledger-1")).await.unwrap();
let pending_count = std::fs::read_dir(&dir)
.unwrap()
.filter_map(Result::ok)
.filter(|entry| is_pending_outbox_file_name(&entry.file_name()))
.count();
assert_eq!(pending_count, 1);
let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn enqueue_drops_when_outbox_exceeds_max_bytes() {
let dir = test_dir("max-bytes");
let outbox = test_outbox(dir.clone(), 1);
let outcome = outbox.enqueue(sample_record("ledger-1")).await.unwrap();
assert!(matches!(
outcome,
WalletRefundOutboxEnqueueOutcome::Dropped {
reason: "max_bytes"
}
));
assert!(!dir.exists() || std::fs::read_dir(&dir).unwrap().next().is_none());
let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn flush_quarantines_corrupt_file() {
let dir = test_dir("corrupt");
std::fs::create_dir_all(&dir).unwrap();
let pending_path = dir.join(format!("{PENDING_FILE_PREFIX}bad{OUTBOX_FILE_EXTENSION}"));
std::fs::write(&pending_path, b"{not-json}").unwrap();
let outbox = test_outbox(dir.clone(), 1024 * 1024);
outbox.flush_pending_files_once().await.unwrap();
assert!(!pending_path.exists());
let corrupt_count = std::fs::read_dir(&dir)
.unwrap()
.filter_map(Result::ok)
.filter(|entry| {
entry
.file_name()
.to_str()
.is_some_and(|name| name.starts_with(CORRUPT_FILE_PREFIX))
})
.count();
assert_eq!(corrupt_count, 1);
let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn shutdown_flush_keeps_file_when_spacetime_is_unavailable() {
let dir = test_dir("shutdown");
let outbox = test_outbox(dir.clone(), 1024 * 1024);
outbox.enqueue(sample_record("ledger-1")).await.unwrap();
let result = outbox.flush_for_shutdown().await;
assert!(
matches!(result, Err(WalletRefundOutboxError::Spacetime(_))),
"missing test SpacetimeDB should keep refund file for retry"
);
let pending_count = std::fs::read_dir(&dir)
.unwrap()
.filter_map(Result::ok)
.filter(|entry| is_pending_outbox_file_name(&entry.file_name()))
.count();
assert_eq!(pending_count, 1);
let _ = std::fs::remove_dir_all(dir);
}
}

View File

@@ -20,6 +20,14 @@ pub struct PuzzleAgentSessionProcedureResult {
pub error_message: Option<String>,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PuzzleBackgroundCompileTaskProcedureResult {
pub ok: bool,
pub claimed: bool,
pub error_message: Option<String>,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PuzzleWorksProcedureResult {

View File

@@ -71,6 +71,25 @@ pub struct PuzzleDraftCompileInput {
pub external_generation_lease_token: Option<String>,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PuzzleBackgroundCompileTaskClaimInput {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
pub claimed_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PuzzleBackgroundCompileTaskReleaseInput {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PuzzleDraftCompileFailureInput {

View File

@@ -54,7 +54,8 @@ pub use mapper::{
PublicWorkGalleryEntryRecord, PuzzleAgentMessageFinalizeRecordInput, PuzzleAgentMessageRecord,
PuzzleAgentMessageSubmitRecordInput, PuzzleAgentSessionCreateRecordInput,
PuzzleAgentSessionRecord, PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord,
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBoardRecord, PuzzleCellPositionRecord,
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput,
PuzzleBackgroundCompileTaskReleaseRecordInput, PuzzleBoardRecord, PuzzleCellPositionRecord,
PuzzleClearActionRequest, PuzzleClearActionResponse, PuzzleClearActionType,
PuzzleClearBoardCell, PuzzleClearBoardSnapshot, PuzzleClearCardAsset, PuzzleClearDraftResponse,
PuzzleClearGenerationStatus, PuzzleClearImageAsset, PuzzleClearNextLevelRequest,
@@ -353,7 +354,7 @@ type ProcedureResultSender<T> =
type ReducerResultSender = Arc<Mutex<Option<oneshot::Sender<Result<(), SpacetimeClientError>>>>>;
struct SpacetimeConnectionPool {
slots: Vec<tokio::sync::Mutex<PooledConnectionSlot>>,
slots: Vec<PooledConnectionSlot>,
permits: Arc<Semaphore>,
}
@@ -376,8 +377,10 @@ impl SpacetimeStageError {
}
struct PooledConnectionSlot {
connection: Option<PooledConnection>,
in_use: bool,
// 槽位占用标记独立成原子量:抢占/复位不依赖锁,租约 Drop 兜底可以同步完成。
in_use: AtomicBool,
// in_use=true 的持有者独占本槽连接,正常情况下锁上不会有竞争。
connection: tokio::sync::Mutex<Option<PooledConnection>>,
}
struct PooledConnection {
@@ -390,9 +393,28 @@ struct PooledConnection {
struct PooledConnectionLease {
slot_index: usize,
connection: Option<PooledConnection>,
pool: Arc<SpacetimeConnectionPool>,
_permit: OwnedSemaphorePermit,
}
impl Drop for PooledConnectionLease {
// 租约 Drop 兜底:请求 future 被取消(如客户端断开导致 handler 被丢弃)时,
// 也必须归还连接并复位槽位,否则槽位会永久停留在 in_use 状态、连接池逐渐耗尽。
fn drop(&mut self) {
let slot = &self.pool.slots[self.slot_index];
if let Some(connection) = self.connection.take() {
if !connection.is_broken() {
if let Ok(mut slot_connection) = slot.connection.try_lock() {
*slot_connection = Some(connection);
}
// try_lock 理论上不会失败in_use 持有者独占);万一失败只丢弃连接,不丢槽位。
}
}
slot.in_use.store(false, Ordering::Release);
// _permit 随 Drop 自动归还信号量。
}
}
impl SpacetimeClient {
pub fn new(config: SpacetimeClientConfig) -> Self {
let pool_size = config.pool_size.max(1) as usize;
@@ -405,11 +427,9 @@ impl SpacetimeClient {
..config
};
let slots = (0..pool_size)
.map(|_| {
tokio::sync::Mutex::new(PooledConnectionSlot {
connection: None,
in_use: false,
})
.map(|_| PooledConnectionSlot {
in_use: AtomicBool::new(false),
connection: tokio::sync::Mutex::new(None),
})
.collect::<Vec<_>>();
let pool = Arc::new(SpacetimeConnectionPool {
@@ -683,42 +703,49 @@ impl SpacetimeClient {
)
})?;
loop {
for (slot_index, slot) in self.pool.slots.iter().enumerate() {
if let Ok(mut slot_guard) = slot.try_lock() {
if slot_guard.in_use {
continue;
}
let reusable_connection = slot_guard
.connection
.take()
.filter(|connection| !connection.is_broken());
slot_guard.in_use = true;
drop(slot_guard);
// 持有 permit 即保证最多 pool_size 个并发持有者,必然能抢到一个空闲槽位;
// CAS 抢占后立即构造租约,后续任何失败/取消都由租约 Drop 兜底复位槽位。
let slot_index = self
.pool
.slots
.iter()
.position(|slot| {
slot.in_use
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
})
.ok_or_else(|| {
SpacetimeStageError::new(
SpacetimeClientStage::PoolAcquire,
SpacetimeClientError::Runtime(
"SpacetimeDB 连接池 permit 与槽位状态不一致".to_string(),
),
)
})?;
let connection = if let Some(connection) = reusable_connection {
connection
} else {
match self.build_pooled_connection(operation_timeout).await {
Ok(connection) => connection,
Err(error) => {
let mut slot_guard = self.pool.slots[slot_index].lock().await;
slot_guard.in_use = false;
return Err(error);
}
}
};
let mut lease = PooledConnectionLease {
slot_index,
connection: None,
pool: self.pool.clone(),
_permit: permit,
};
return Ok(PooledConnectionLease {
slot_index,
connection: Some(connection),
_permit: permit,
});
}
}
let reusable_connection = self.pool.slots[slot_index]
.connection
.lock()
.await
.take()
.filter(|connection| !connection.is_broken());
tokio::task::yield_now().await;
}
let connection = if let Some(connection) = reusable_connection {
connection
} else {
// 建连失败时直接返回错误,槽位与 permit 由 lease Drop 自动归还。
self.build_pooled_connection(operation_timeout).await?
};
lease.connection = Some(connection);
Ok(lease)
}
async fn build_pooled_connection(
@@ -916,18 +943,10 @@ impl SpacetimeClient {
Ok(subscription)
}
async fn release_connection(&self, mut lease: PooledConnectionLease) {
let mut slot_guard = self.pool.slots[lease.slot_index].lock().await;
slot_guard.in_use = false;
let Some(connection) = lease.connection.take() else {
slot_guard.connection = None;
return;
};
if connection.is_broken() {
slot_guard.connection = None;
} else {
slot_guard.connection = Some(connection);
}
async fn release_connection(&self, lease: PooledConnectionLease) {
// 显式归还与“请求被取消”的隐式归还共用同一套租约 Drop 兜底逻辑,
// 保证任何路径下槽位与 permit 都会复位,连接池不会被慢慢泄漏占满。
drop(lease);
}
// 超时后必须统一归还租约;若连接已先一步断开则回传断线,否则标记坏连接并回传超时。
@@ -1132,4 +1151,78 @@ mod tests {
SpacetimeClientError::Runtime(_)
));
}
fn test_client(pool_size: u32, procedure_timeout: Duration) -> SpacetimeClient {
SpacetimeClient::new(SpacetimeClientConfig {
// 指向本机不可达端口:测试只验证连接池行为,不需要真实 SpacetimeDB。
server_url: "http://127.0.0.1:9".to_string(),
database: "pool-test".to_string(),
token: None,
pool_size,
procedure_timeout,
})
}
/// 复现线上故障机制:修复前请求 future 被取消时租约不会归还,槽位永久停留在 in_use
/// 后续 acquire 拿着 permit 空转挂死。修复后租约 Drop 必须同时复位槽位与 permit。
#[tokio::test]
async fn dropped_lease_releases_slot_and_permit() {
let client = test_client(1, Duration::from_millis(200));
let permit = client
.pool
.permits
.clone()
.acquire_owned()
.await
.expect("permit should acquire");
client.pool.slots[0].in_use.store(true, Ordering::SeqCst);
assert_eq!(client.pool.permits.available_permits(), 0);
// 模拟请求被取消:租约未经过 release_connection 直接被 Drop。
let lease = PooledConnectionLease {
slot_index: 0,
connection: None,
pool: client.pool.clone(),
_permit: permit,
};
drop(lease);
assert!(
!client.pool.slots[0].in_use.load(Ordering::SeqCst),
"租约 Drop 后槽位必须复位,否则连接池会被泄漏占满"
);
assert_eq!(
client.pool.permits.available_permits(),
1,
"租约 Drop 后 permit 必须归还"
);
}
/// 池内 permit 全部被占用持续在途请求acquire 必须在超时窗口内返回
/// pool_acquire 超时,而不是无限等待。
#[tokio::test]
async fn acquire_times_out_at_pool_acquire_when_pool_is_busy() {
let client = test_client(1, Duration::from_millis(200));
let _held_permit = client
.pool
.permits
.clone()
.acquire_owned()
.await
.expect("permit should acquire");
let result = tokio::time::timeout(
Duration::from_secs(5),
client.acquire_connection_with_timeout(Duration::from_millis(200)),
)
.await
.expect("acquire 必须在超时窗口内返回,而不是空转挂死");
let error = match result {
Ok(_) => panic!("池占满时应返回 pool_acquire 超时"),
Err(error) => error,
};
assert_eq!(error.stage, SpacetimeClientStage::PoolAcquire);
assert!(matches!(error.error, SpacetimeClientError::Timeout));
}
}

View File

@@ -107,7 +107,8 @@ pub use self::puzzle::{
PuzzleAgentMessageFinalizeRecordInput, PuzzleAgentMessageRecord,
PuzzleAgentMessageSubmitRecordInput, PuzzleAgentSessionCreateRecordInput,
PuzzleAgentSessionRecord, PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord,
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBoardRecord, PuzzleCellPositionRecord,
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput,
PuzzleBackgroundCompileTaskReleaseRecordInput, PuzzleBoardRecord, PuzzleCellPositionRecord,
PuzzleCreatorIntentRecord, PuzzleDraftCompileFailureRecordInput, PuzzleDraftLevelRecord,
PuzzleFormDraftRecord, PuzzleFormDraftSaveRecordInput, PuzzleGalleryCardRecord,
PuzzleGeneratedImageCandidateRecord, PuzzleGeneratedImagesSaveRecordInput,
@@ -208,10 +209,10 @@ pub(crate) use self::public_work::{
map_public_work_gallery_entry, map_public_work_gallery_entry_to_detail_entry,
};
pub(crate) use self::puzzle::{
map_puzzle_agent_session_procedure_result, map_puzzle_gallery_card_view_row,
map_puzzle_run_procedure_result, map_puzzle_work_procedure_result,
map_puzzle_works_procedure_result, map_runtime_profile_wallet_ledger_source_type_back,
parse_puzzle_agent_stage_record,
map_puzzle_agent_session_procedure_result, map_puzzle_background_compile_task_procedure_result,
map_puzzle_gallery_card_view_row, map_puzzle_run_procedure_result,
map_puzzle_work_procedure_result, map_puzzle_works_procedure_result,
map_runtime_profile_wallet_ledger_source_type_back, parse_puzzle_agent_stage_record,
};
pub(crate) use self::puzzle_clear::{
map_puzzle_clear_agent_session_procedure_result, map_puzzle_clear_gallery_card_view_row,

View File

@@ -13,6 +13,16 @@ pub(crate) fn map_puzzle_agent_session_procedure_result(
Ok(map_puzzle_agent_session_snapshot(session))
}
pub(crate) fn map_puzzle_background_compile_task_procedure_result(
result: PuzzleBackgroundCompileTaskProcedureResult,
) -> Result<bool, SpacetimeClientError> {
if !result.ok {
return Err(SpacetimeClientError::procedure_failed(result.error_message));
}
Ok(result.claimed)
}
pub(crate) fn map_puzzle_work_procedure_result(
result: PuzzleWorkProcedureResult,
) -> Result<PuzzleWorkProfileRecord, SpacetimeClientError> {
@@ -614,6 +624,23 @@ pub struct PuzzleFormDraftSaveRecordInput {
pub saved_at_micros: i64,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PuzzleBackgroundCompileTaskClaimRecordInput {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
pub claimed_at_micros: i64,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PuzzleBackgroundCompileTaskReleaseRecordInput {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PuzzleAgentMessageSubmitRecordInput {
pub session_id: String,

View File

@@ -205,6 +205,7 @@ pub mod chapter_progression_type;
pub mod checkpoint_wooden_fish_run_procedure;
pub mod claim_external_generation_jobs_and_return_procedure;
pub mod claim_profile_task_reward_and_return_procedure;
pub mod claim_puzzle_background_compile_task_procedure;
pub mod claim_puzzle_work_point_incentive_procedure;
pub mod clear_database_migration_import_chunks_procedure;
pub mod clear_platform_browse_history_and_return_procedure;
@@ -642,6 +643,11 @@ pub mod puzzle_anchor_item_type;
pub mod puzzle_anchor_pack_type;
pub mod puzzle_anchor_status_type;
pub mod puzzle_audio_asset_type;
pub mod puzzle_background_compile_task_claim_input_type;
pub mod puzzle_background_compile_task_procedure_result_type;
pub mod puzzle_background_compile_task_release_input_type;
pub mod puzzle_background_compile_task_row_type;
pub mod puzzle_background_compile_task_table;
pub mod puzzle_board_snapshot_type;
pub mod puzzle_cell_position_type;
pub mod puzzle_clear_agent_session_create_input_type;
@@ -781,6 +787,7 @@ pub mod redeem_profile_reward_code_procedure;
pub mod refresh_session_table;
pub mod refresh_session_type;
pub mod refund_profile_wallet_points_and_return_procedure;
pub mod release_puzzle_background_compile_task_procedure;
pub mod remix_big_fish_work_procedure;
pub mod remix_custom_world_profile_procedure;
pub mod remix_puzzle_work_procedure;
@@ -1329,6 +1336,7 @@ pub use chapter_progression_type::ChapterProgression;
pub use checkpoint_wooden_fish_run_procedure::checkpoint_wooden_fish_run;
pub use claim_external_generation_jobs_and_return_procedure::claim_external_generation_jobs_and_return;
pub use claim_profile_task_reward_and_return_procedure::claim_profile_task_reward_and_return;
pub use claim_puzzle_background_compile_task_procedure::claim_puzzle_background_compile_task;
pub use claim_puzzle_work_point_incentive_procedure::claim_puzzle_work_point_incentive;
pub use clear_database_migration_import_chunks_procedure::clear_database_migration_import_chunks;
pub use clear_platform_browse_history_and_return_procedure::clear_platform_browse_history_and_return;
@@ -1766,6 +1774,11 @@ pub use puzzle_anchor_item_type::PuzzleAnchorItem;
pub use puzzle_anchor_pack_type::PuzzleAnchorPack;
pub use puzzle_anchor_status_type::PuzzleAnchorStatus;
pub use puzzle_audio_asset_type::PuzzleAudioAsset;
pub use puzzle_background_compile_task_claim_input_type::PuzzleBackgroundCompileTaskClaimInput;
pub use puzzle_background_compile_task_procedure_result_type::PuzzleBackgroundCompileTaskProcedureResult;
pub use puzzle_background_compile_task_release_input_type::PuzzleBackgroundCompileTaskReleaseInput;
pub use puzzle_background_compile_task_row_type::PuzzleBackgroundCompileTaskRow;
pub use puzzle_background_compile_task_table::*;
pub use puzzle_board_snapshot_type::PuzzleBoardSnapshot;
pub use puzzle_cell_position_type::PuzzleCellPosition;
pub use puzzle_clear_agent_session_create_input_type::PuzzleClearAgentSessionCreateInput;
@@ -1905,6 +1918,7 @@ pub use redeem_profile_reward_code_procedure::redeem_profile_reward_code;
pub use refresh_session_table::*;
pub use refresh_session_type::RefreshSession;
pub use refund_profile_wallet_points_and_return_procedure::refund_profile_wallet_points_and_return;
pub use release_puzzle_background_compile_task_procedure::release_puzzle_background_compile_task;
pub use remix_big_fish_work_procedure::remix_big_fish_work;
pub use remix_custom_world_profile_procedure::remix_custom_world_profile;
pub use remix_puzzle_work_procedure::remix_puzzle_work;
@@ -2602,6 +2616,7 @@ pub struct DbUpdate {
public_work_play_daily_stat: __sdk::TableUpdate<PublicWorkPlayDailyStat>,
puzzle_agent_message: __sdk::TableUpdate<PuzzleAgentMessageRow>,
puzzle_agent_session: __sdk::TableUpdate<PuzzleAgentSessionRow>,
puzzle_background_compile_task: __sdk::TableUpdate<PuzzleBackgroundCompileTaskRow>,
puzzle_clear_agent_session: __sdk::TableUpdate<PuzzleClearAgentSessionRow>,
puzzle_clear_event: __sdk::TableUpdate<PuzzleClearEventRow>,
puzzle_clear_gallery_card_view: __sdk::TableUpdate<PuzzleClearGalleryCardViewRow>,
@@ -2890,6 +2905,11 @@ impl TryFrom<__ws::v2::TransactionUpdate> for DbUpdate {
"puzzle_agent_session" => db_update.puzzle_agent_session.append(
puzzle_agent_session_table::parse_table_update(table_update)?,
),
"puzzle_background_compile_task" => {
db_update.puzzle_background_compile_task.append(
puzzle_background_compile_task_table::parse_table_update(table_update)?,
)
}
"puzzle_clear_agent_session" => db_update.puzzle_clear_agent_session.append(
puzzle_clear_agent_session_table::parse_table_update(table_update)?,
),
@@ -3415,6 +3435,12 @@ impl __sdk::DbUpdate for DbUpdate {
&self.puzzle_agent_session,
)
.with_updates_by_pk(|row| &row.session_id);
diff.puzzle_background_compile_task = cache
.apply_diff_to_table::<PuzzleBackgroundCompileTaskRow>(
"puzzle_background_compile_task",
&self.puzzle_background_compile_task,
)
.with_updates_by_pk(|row| &row.task_id);
diff.puzzle_clear_agent_session = cache
.apply_diff_to_table::<PuzzleClearAgentSessionRow>(
"puzzle_clear_agent_session",
@@ -3873,6 +3899,9 @@ impl __sdk::DbUpdate for DbUpdate {
"puzzle_agent_session" => db_update
.puzzle_agent_session
.append(__sdk::parse_row_list_as_inserts(table_rows.rows)?),
"puzzle_background_compile_task" => db_update
.puzzle_background_compile_task
.append(__sdk::parse_row_list_as_inserts(table_rows.rows)?),
"puzzle_clear_agent_session" => db_update
.puzzle_clear_agent_session
.append(__sdk::parse_row_list_as_inserts(table_rows.rows)?),
@@ -4240,6 +4269,9 @@ impl __sdk::DbUpdate for DbUpdate {
"puzzle_agent_session" => db_update
.puzzle_agent_session
.append(__sdk::parse_row_list_as_deletes(table_rows.rows)?),
"puzzle_background_compile_task" => db_update
.puzzle_background_compile_task
.append(__sdk::parse_row_list_as_deletes(table_rows.rows)?),
"puzzle_clear_agent_session" => db_update
.puzzle_clear_agent_session
.append(__sdk::parse_row_list_as_deletes(table_rows.rows)?),
@@ -4459,6 +4491,7 @@ pub struct AppliedDiff<'r> {
public_work_play_daily_stat: __sdk::TableAppliedDiff<'r, PublicWorkPlayDailyStat>,
puzzle_agent_message: __sdk::TableAppliedDiff<'r, PuzzleAgentMessageRow>,
puzzle_agent_session: __sdk::TableAppliedDiff<'r, PuzzleAgentSessionRow>,
puzzle_background_compile_task: __sdk::TableAppliedDiff<'r, PuzzleBackgroundCompileTaskRow>,
puzzle_clear_agent_session: __sdk::TableAppliedDiff<'r, PuzzleClearAgentSessionRow>,
puzzle_clear_event: __sdk::TableAppliedDiff<'r, PuzzleClearEventRow>,
puzzle_clear_gallery_card_view: __sdk::TableAppliedDiff<'r, PuzzleClearGalleryCardViewRow>,
@@ -4883,6 +4916,11 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
&self.puzzle_agent_session,
event,
);
callbacks.invoke_table_row_callbacks::<PuzzleBackgroundCompileTaskRow>(
"puzzle_background_compile_task",
&self.puzzle_background_compile_task,
event,
);
callbacks.invoke_table_row_callbacks::<PuzzleClearAgentSessionRow>(
"puzzle_clear_agent_session",
&self.puzzle_clear_agent_session,
@@ -5821,6 +5859,7 @@ impl __sdk::SpacetimeModule for RemoteModule {
public_work_play_daily_stat_table::register_table(client_cache);
puzzle_agent_message_table::register_table(client_cache);
puzzle_agent_session_table::register_table(client_cache);
puzzle_background_compile_task_table::register_table(client_cache);
puzzle_clear_agent_session_table::register_table(client_cache);
puzzle_clear_event_table::register_table(client_cache);
puzzle_clear_gallery_card_view_table::register_table(client_cache);
@@ -5941,6 +5980,7 @@ impl __sdk::SpacetimeModule for RemoteModule {
"public_work_play_daily_stat",
"puzzle_agent_message",
"puzzle_agent_session",
"puzzle_background_compile_task",
"puzzle_clear_agent_session",
"puzzle_clear_event",
"puzzle_clear_gallery_card_view",

View File

@@ -0,0 +1,59 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
use super::puzzle_background_compile_task_claim_input_type::PuzzleBackgroundCompileTaskClaimInput;
use super::puzzle_background_compile_task_procedure_result_type::PuzzleBackgroundCompileTaskProcedureResult;
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
struct ClaimPuzzleBackgroundCompileTaskArgs {
pub input: PuzzleBackgroundCompileTaskClaimInput,
}
impl __sdk::InModule for ClaimPuzzleBackgroundCompileTaskArgs {
type Module = super::RemoteModule;
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the procedure `claim_puzzle_background_compile_task`.
///
/// Implemented for [`super::RemoteProcedures`].
pub trait claim_puzzle_background_compile_task {
fn claim_puzzle_background_compile_task(&self, input: PuzzleBackgroundCompileTaskClaimInput) {
self.claim_puzzle_background_compile_task_then(input, |_, _| {});
}
fn claim_puzzle_background_compile_task_then(
&self,
input: PuzzleBackgroundCompileTaskClaimInput,
__callback: impl FnOnce(
&super::ProcedureEventContext,
Result<PuzzleBackgroundCompileTaskProcedureResult, __sdk::InternalError>,
) + Send
+ 'static,
);
}
impl claim_puzzle_background_compile_task for super::RemoteProcedures {
fn claim_puzzle_background_compile_task_then(
&self,
input: PuzzleBackgroundCompileTaskClaimInput,
__callback: impl FnOnce(
&super::ProcedureEventContext,
Result<PuzzleBackgroundCompileTaskProcedureResult, __sdk::InternalError>,
) + Send
+ 'static,
) {
self.imp
.invoke_procedure_with_callback::<_, PuzzleBackgroundCompileTaskProcedureResult>(
"claim_puzzle_background_compile_task",
ClaimPuzzleBackgroundCompileTaskArgs { input },
__callback,
);
}
}

View File

@@ -0,0 +1,19 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct PuzzleBackgroundCompileTaskClaimInput {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
pub claimed_at_micros: i64,
}
impl __sdk::InModule for PuzzleBackgroundCompileTaskClaimInput {
type Module = super::RemoteModule;
}

View File

@@ -0,0 +1,17 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct PuzzleBackgroundCompileTaskProcedureResult {
pub ok: bool,
pub claimed: bool,
pub error_message: Option<String>,
}
impl __sdk::InModule for PuzzleBackgroundCompileTaskProcedureResult {
type Module = super::RemoteModule;
}

View File

@@ -0,0 +1,18 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct PuzzleBackgroundCompileTaskReleaseInput {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
}
impl __sdk::InModule for PuzzleBackgroundCompileTaskReleaseInput {
type Module = super::RemoteModule;
}

View File

@@ -0,0 +1,66 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct PuzzleBackgroundCompileTaskRow {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
pub created_at: __sdk::Timestamp,
pub updated_at: __sdk::Timestamp,
}
impl __sdk::InModule for PuzzleBackgroundCompileTaskRow {
type Module = super::RemoteModule;
}
/// Column accessor struct for the table `PuzzleBackgroundCompileTaskRow`.
///
/// Provides typed access to columns for query building.
pub struct PuzzleBackgroundCompileTaskRowCols {
pub task_id: __sdk::__query_builder::Col<PuzzleBackgroundCompileTaskRow, String>,
pub claim_id: __sdk::__query_builder::Col<PuzzleBackgroundCompileTaskRow, String>,
pub session_id: __sdk::__query_builder::Col<PuzzleBackgroundCompileTaskRow, String>,
pub owner_user_id: __sdk::__query_builder::Col<PuzzleBackgroundCompileTaskRow, String>,
pub created_at: __sdk::__query_builder::Col<PuzzleBackgroundCompileTaskRow, __sdk::Timestamp>,
pub updated_at: __sdk::__query_builder::Col<PuzzleBackgroundCompileTaskRow, __sdk::Timestamp>,
}
impl __sdk::__query_builder::HasCols for PuzzleBackgroundCompileTaskRow {
type Cols = PuzzleBackgroundCompileTaskRowCols;
fn cols(table_name: &'static str) -> Self::Cols {
PuzzleBackgroundCompileTaskRowCols {
task_id: __sdk::__query_builder::Col::new(table_name, "task_id"),
claim_id: __sdk::__query_builder::Col::new(table_name, "claim_id"),
session_id: __sdk::__query_builder::Col::new(table_name, "session_id"),
owner_user_id: __sdk::__query_builder::Col::new(table_name, "owner_user_id"),
created_at: __sdk::__query_builder::Col::new(table_name, "created_at"),
updated_at: __sdk::__query_builder::Col::new(table_name, "updated_at"),
}
}
}
/// Indexed column accessor struct for the table `PuzzleBackgroundCompileTaskRow`.
///
/// Provides typed access to indexed columns for query building.
pub struct PuzzleBackgroundCompileTaskRowIxCols {
pub session_id: __sdk::__query_builder::IxCol<PuzzleBackgroundCompileTaskRow, String>,
pub task_id: __sdk::__query_builder::IxCol<PuzzleBackgroundCompileTaskRow, String>,
}
impl __sdk::__query_builder::HasIxCols for PuzzleBackgroundCompileTaskRow {
type IxCols = PuzzleBackgroundCompileTaskRowIxCols;
fn ix_cols(table_name: &'static str) -> Self::IxCols {
PuzzleBackgroundCompileTaskRowIxCols {
session_id: __sdk::__query_builder::IxCol::new(table_name, "session_id"),
task_id: __sdk::__query_builder::IxCol::new(table_name, "task_id"),
}
}
}
impl __sdk::__query_builder::CanBeLookupTable for PuzzleBackgroundCompileTaskRow {}

View File

@@ -0,0 +1,169 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use super::puzzle_background_compile_task_row_type::PuzzleBackgroundCompileTaskRow;
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
/// Table handle for the table `puzzle_background_compile_task`.
///
/// Obtain a handle from the [`PuzzleBackgroundCompileTaskTableAccess::puzzle_background_compile_task`] method on [`super::RemoteTables`],
/// like `ctx.db.puzzle_background_compile_task()`.
///
/// Users are encouraged not to explicitly reference this type,
/// but to directly chain method calls,
/// like `ctx.db.puzzle_background_compile_task().on_insert(...)`.
pub struct PuzzleBackgroundCompileTaskTableHandle<'ctx> {
imp: __sdk::TableHandle<PuzzleBackgroundCompileTaskRow>,
ctx: std::marker::PhantomData<&'ctx super::RemoteTables>,
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the table `puzzle_background_compile_task`.
///
/// Implemented for [`super::RemoteTables`].
pub trait PuzzleBackgroundCompileTaskTableAccess {
#[allow(non_snake_case)]
/// Obtain a [`PuzzleBackgroundCompileTaskTableHandle`], which mediates access to the table `puzzle_background_compile_task`.
fn puzzle_background_compile_task(&self) -> PuzzleBackgroundCompileTaskTableHandle<'_>;
}
impl PuzzleBackgroundCompileTaskTableAccess for super::RemoteTables {
fn puzzle_background_compile_task(&self) -> PuzzleBackgroundCompileTaskTableHandle<'_> {
PuzzleBackgroundCompileTaskTableHandle {
imp: self
.imp
.get_table::<PuzzleBackgroundCompileTaskRow>("puzzle_background_compile_task"),
ctx: std::marker::PhantomData,
}
}
}
pub struct PuzzleBackgroundCompileTaskInsertCallbackId(__sdk::CallbackId);
pub struct PuzzleBackgroundCompileTaskDeleteCallbackId(__sdk::CallbackId);
impl<'ctx> __sdk::Table for PuzzleBackgroundCompileTaskTableHandle<'ctx> {
type Row = PuzzleBackgroundCompileTaskRow;
type EventContext = super::EventContext;
fn count(&self) -> u64 {
self.imp.count()
}
fn iter(&self) -> impl Iterator<Item = PuzzleBackgroundCompileTaskRow> + '_ {
self.imp.iter()
}
type InsertCallbackId = PuzzleBackgroundCompileTaskInsertCallbackId;
fn on_insert(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static,
) -> PuzzleBackgroundCompileTaskInsertCallbackId {
PuzzleBackgroundCompileTaskInsertCallbackId(self.imp.on_insert(Box::new(callback)))
}
fn remove_on_insert(&self, callback: PuzzleBackgroundCompileTaskInsertCallbackId) {
self.imp.remove_on_insert(callback.0)
}
type DeleteCallbackId = PuzzleBackgroundCompileTaskDeleteCallbackId;
fn on_delete(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static,
) -> PuzzleBackgroundCompileTaskDeleteCallbackId {
PuzzleBackgroundCompileTaskDeleteCallbackId(self.imp.on_delete(Box::new(callback)))
}
fn remove_on_delete(&self, callback: PuzzleBackgroundCompileTaskDeleteCallbackId) {
self.imp.remove_on_delete(callback.0)
}
}
pub struct PuzzleBackgroundCompileTaskUpdateCallbackId(__sdk::CallbackId);
impl<'ctx> __sdk::TableWithPrimaryKey for PuzzleBackgroundCompileTaskTableHandle<'ctx> {
type UpdateCallbackId = PuzzleBackgroundCompileTaskUpdateCallbackId;
fn on_update(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row, &Self::Row) + Send + 'static,
) -> PuzzleBackgroundCompileTaskUpdateCallbackId {
PuzzleBackgroundCompileTaskUpdateCallbackId(self.imp.on_update(Box::new(callback)))
}
fn remove_on_update(&self, callback: PuzzleBackgroundCompileTaskUpdateCallbackId) {
self.imp.remove_on_update(callback.0)
}
}
/// Access to the `task_id` unique index on the table `puzzle_background_compile_task`,
/// which allows point queries on the field of the same name
/// via the [`PuzzleBackgroundCompileTaskTaskIdUnique::find`] method.
///
/// Users are encouraged not to explicitly reference this type,
/// but to directly chain method calls,
/// like `ctx.db.puzzle_background_compile_task().task_id().find(...)`.
pub struct PuzzleBackgroundCompileTaskTaskIdUnique<'ctx> {
imp: __sdk::UniqueConstraintHandle<PuzzleBackgroundCompileTaskRow, String>,
phantom: std::marker::PhantomData<&'ctx super::RemoteTables>,
}
impl<'ctx> PuzzleBackgroundCompileTaskTableHandle<'ctx> {
/// Get a handle on the `task_id` unique index on the table `puzzle_background_compile_task`.
pub fn task_id(&self) -> PuzzleBackgroundCompileTaskTaskIdUnique<'ctx> {
PuzzleBackgroundCompileTaskTaskIdUnique {
imp: self.imp.get_unique_constraint::<String>("task_id"),
phantom: std::marker::PhantomData,
}
}
}
impl<'ctx> PuzzleBackgroundCompileTaskTaskIdUnique<'ctx> {
/// Find the subscribed row whose `task_id` column value is equal to `col_val`,
/// if such a row is present in the client cache.
pub fn find(&self, col_val: &String) -> Option<PuzzleBackgroundCompileTaskRow> {
self.imp.find(col_val)
}
}
#[doc(hidden)]
pub(super) fn register_table(client_cache: &mut __sdk::ClientCache<super::RemoteModule>) {
let _table = client_cache
.get_or_make_table::<PuzzleBackgroundCompileTaskRow>("puzzle_background_compile_task");
_table.add_unique_constraint::<String>("task_id", |row| &row.task_id);
}
#[doc(hidden)]
pub(super) fn parse_table_update(
raw_updates: __ws::v2::TableUpdate,
) -> __sdk::Result<__sdk::TableUpdate<PuzzleBackgroundCompileTaskRow>> {
__sdk::TableUpdate::parse_table_update(raw_updates).map_err(|e| {
__sdk::InternalError::failed_parse(
"TableUpdate<PuzzleBackgroundCompileTaskRow>",
"TableUpdate",
)
.with_cause(e)
.into()
})
}
#[allow(non_camel_case_types)]
/// Extension trait for query builder access to the table `PuzzleBackgroundCompileTaskRow`.
///
/// Implemented for [`__sdk::QueryTableAccessor`].
pub trait puzzle_background_compile_taskQueryTableAccess {
#[allow(non_snake_case)]
/// Get a query builder for the table `PuzzleBackgroundCompileTaskRow`.
fn puzzle_background_compile_task(
&self,
) -> __sdk::__query_builder::Table<PuzzleBackgroundCompileTaskRow>;
}
impl puzzle_background_compile_taskQueryTableAccess for __sdk::QueryTableAccessor {
fn puzzle_background_compile_task(
&self,
) -> __sdk::__query_builder::Table<PuzzleBackgroundCompileTaskRow> {
__sdk::__query_builder::Table::new("puzzle_background_compile_task")
}
}

View File

@@ -0,0 +1,62 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
use super::puzzle_background_compile_task_procedure_result_type::PuzzleBackgroundCompileTaskProcedureResult;
use super::puzzle_background_compile_task_release_input_type::PuzzleBackgroundCompileTaskReleaseInput;
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
struct ReleasePuzzleBackgroundCompileTaskArgs {
pub input: PuzzleBackgroundCompileTaskReleaseInput,
}
impl __sdk::InModule for ReleasePuzzleBackgroundCompileTaskArgs {
type Module = super::RemoteModule;
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the procedure `release_puzzle_background_compile_task`.
///
/// Implemented for [`super::RemoteProcedures`].
pub trait release_puzzle_background_compile_task {
fn release_puzzle_background_compile_task(
&self,
input: PuzzleBackgroundCompileTaskReleaseInput,
) {
self.release_puzzle_background_compile_task_then(input, |_, _| {});
}
fn release_puzzle_background_compile_task_then(
&self,
input: PuzzleBackgroundCompileTaskReleaseInput,
__callback: impl FnOnce(
&super::ProcedureEventContext,
Result<PuzzleBackgroundCompileTaskProcedureResult, __sdk::InternalError>,
) + Send
+ 'static,
);
}
impl release_puzzle_background_compile_task for super::RemoteProcedures {
fn release_puzzle_background_compile_task_then(
&self,
input: PuzzleBackgroundCompileTaskReleaseInput,
__callback: impl FnOnce(
&super::ProcedureEventContext,
Result<PuzzleBackgroundCompileTaskProcedureResult, __sdk::InternalError>,
) + Send
+ 'static,
) {
self.imp
.invoke_procedure_with_callback::<_, PuzzleBackgroundCompileTaskProcedureResult>(
"release_puzzle_background_compile_task",
ReleasePuzzleBackgroundCompileTaskArgs { input },
__callback,
);
}
}

View File

@@ -1,8 +1,10 @@
use super::*;
use crate::mapper::*;
use crate::module_bindings::claim_puzzle_background_compile_task_procedure::claim_puzzle_background_compile_task;
use crate::module_bindings::claim_puzzle_work_point_incentive_procedure::claim_puzzle_work_point_incentive;
use crate::module_bindings::delete_puzzle_work_procedure::delete_puzzle_work;
use crate::module_bindings::record_puzzle_work_like_procedure::record_puzzle_work_like;
use crate::module_bindings::release_puzzle_background_compile_task_procedure::release_puzzle_background_compile_task;
use crate::module_bindings::remix_puzzle_work_procedure::remix_puzzle_work;
use crate::module_bindings::save_puzzle_ui_background_procedure::save_puzzle_ui_background;
@@ -275,6 +277,67 @@ impl SpacetimeClient {
.await
}
pub async fn claim_puzzle_background_compile_task(
&self,
input: PuzzleBackgroundCompileTaskClaimRecordInput,
) -> Result<bool, SpacetimeClientError> {
let procedure_input = PuzzleBackgroundCompileTaskClaimInput {
task_id: input.task_id,
claim_id: input.claim_id,
session_id: input.session_id,
owner_user_id: input.owner_user_id,
claimed_at_micros: input.claimed_at_micros,
};
self.call_after_connect(
"claim_puzzle_background_compile_task",
move |connection, sender| {
connection
.procedures()
.claim_puzzle_background_compile_task_then(
procedure_input,
move |_, result| {
let mapped = result
.map_err(SpacetimeClientError::from_sdk_error)
.and_then(map_puzzle_background_compile_task_procedure_result);
send_once(&sender, mapped);
},
);
},
)
.await
}
pub async fn release_puzzle_background_compile_task(
&self,
input: PuzzleBackgroundCompileTaskReleaseRecordInput,
) -> Result<bool, SpacetimeClientError> {
let procedure_input = PuzzleBackgroundCompileTaskReleaseInput {
task_id: input.task_id,
claim_id: input.claim_id,
session_id: input.session_id,
owner_user_id: input.owner_user_id,
};
self.call_after_connect(
"release_puzzle_background_compile_task",
move |connection, sender| {
connection
.procedures()
.release_puzzle_background_compile_task_then(
procedure_input,
move |_, result| {
let mapped = result
.map_err(SpacetimeClientError::from_sdk_error)
.and_then(map_puzzle_background_compile_task_procedure_result);
send_once(&sender, mapped);
},
);
},
)
.await
}
pub async fn save_puzzle_generated_images(
&self,
input: PuzzleGeneratedImagesSaveRecordInput,

View File

@@ -20,8 +20,8 @@ use crate::match3d::tables::{
match_3_d_work_profile, match3d_agent_message, match3d_agent_session, match3d_runtime_run,
};
use crate::puzzle::{
puzzle_agent_message, puzzle_agent_session, puzzle_event, puzzle_leaderboard_entry,
puzzle_runtime_run, puzzle_work_profile,
puzzle_agent_message, puzzle_agent_session, puzzle_background_compile_task, puzzle_event,
puzzle_leaderboard_entry, puzzle_runtime_run, puzzle_work_profile,
};
use crate::puzzle_clear::tables::{
puzzle_clear_agent_session, puzzle_clear_event, puzzle_clear_runtime_run,
@@ -230,6 +230,7 @@ macro_rules! migration_tables {
asset_entity_binding,
asset_event,
puzzle_agent_session,
puzzle_background_compile_task,
puzzle_agent_message,
puzzle_work_profile,
puzzle_event,

View File

@@ -10,19 +10,20 @@ use module_puzzle::{
PUZZLE_NEXT_LEVEL_MODE_SIMILAR_WORKS, PuzzleAgentMessageFinalizeInput, PuzzleAgentMessageKind,
PuzzleAgentMessageRole, PuzzleAgentMessageSnapshot, PuzzleAgentSessionCreateInput,
PuzzleAgentSessionGetInput, PuzzleAgentSessionProcedureResult, PuzzleAgentSessionSnapshot,
PuzzleAgentStage, PuzzleAnchorPack, PuzzleDraftCompileFailureInput, PuzzleDraftCompileInput,
PuzzleFormDraftSaveInput, PuzzleGeneratedImageCandidate, PuzzleGeneratedImagesSaveInput,
PuzzleLeaderboardEntry, PuzzleLeaderboardSubmitInput, PuzzleLevelGenerationFailureInput,
PuzzlePublicationStatus, PuzzlePublishInput, PuzzleRecommendedNextWork, PuzzleResultDraft,
PuzzleRunDragInput, PuzzleRunGetInput, PuzzleRunNextLevelInput, PuzzleRunPauseInput,
PuzzleRunProcedureResult, PuzzleRunPropInput, PuzzleRunSnapshot, PuzzleRunStartInput,
PuzzleRunSwapInput, PuzzleRuntimeLevelStatus, PuzzleSelectCoverImageInput,
PuzzleUiBackgroundSaveInput, PuzzleWorkDeleteInput, PuzzleWorkGetInput,
PuzzleWorkLikeRecordInput as PuzzleWorkLikeInput, PuzzleWorkPointIncentiveClaimInput,
PuzzleWorkProcedureResult, PuzzleWorkProfile, PuzzleWorkRemixInput, PuzzleWorkUpsertInput,
PuzzleWorksListInput, PuzzleWorksProcedureResult, apply_publish_overrides_to_draft,
apply_selected_candidate, build_form_draft_from_seed, build_result_preview,
compile_result_draft_from_seed, create_work_profile, infer_anchor_pack,
PuzzleAgentStage, PuzzleAnchorPack, PuzzleBackgroundCompileTaskClaimInput,
PuzzleBackgroundCompileTaskProcedureResult, PuzzleBackgroundCompileTaskReleaseInput,
PuzzleDraftCompileFailureInput, PuzzleDraftCompileInput, PuzzleFormDraftSaveInput,
PuzzleGeneratedImageCandidate, PuzzleGeneratedImagesSaveInput, PuzzleLeaderboardEntry,
PuzzleLeaderboardSubmitInput, PuzzleLevelGenerationFailureInput, PuzzlePublicationStatus,
PuzzlePublishInput, PuzzleRecommendedNextWork, PuzzleResultDraft, PuzzleRunDragInput,
PuzzleRunGetInput, PuzzleRunNextLevelInput, PuzzleRunPauseInput, PuzzleRunProcedureResult,
PuzzleRunPropInput, PuzzleRunSnapshot, PuzzleRunStartInput, PuzzleRunSwapInput,
PuzzleRuntimeLevelStatus, PuzzleSelectCoverImageInput, PuzzleUiBackgroundSaveInput,
PuzzleWorkDeleteInput, PuzzleWorkGetInput, PuzzleWorkLikeRecordInput as PuzzleWorkLikeInput,
PuzzleWorkPointIncentiveClaimInput, PuzzleWorkProcedureResult, PuzzleWorkProfile,
PuzzleWorkRemixInput, PuzzleWorkUpsertInput, PuzzleWorksListInput, PuzzleWorksProcedureResult,
apply_publish_overrides_to_draft, apply_selected_candidate, build_form_draft_from_seed,
build_result_preview, compile_result_draft_from_seed, create_work_profile, infer_anchor_pack,
mark_failed_puzzle_result_draft_generation, normalize_puzzle_draft, normalize_puzzle_levels,
normalize_theme_tags, publish_work_profile, replace_puzzle_level, select_next_profiles,
selected_profile_level_after_runtime_level, selected_puzzle_level, tag_similarity_score,
@@ -40,6 +41,7 @@ use crate::auth::user_account;
use crate::validate_external_generation_job_lease_for_tx;
const PUZZLE_POINT_INCENTIVE_DEFAULT_U64: u64 = 0;
const PUZZLE_BACKGROUND_COMPILE_TASK_LEASE_MICROS: i64 = 30 * 60 * 1_000_000;
const WORK_VISIBLE_DEFAULT: bool = true;
const PUZZLE_EXTERNAL_GENERATION_SOURCE_MODULE: &str = "puzzle";
const PUZZLE_COMPILE_DRAFT_JOB_KIND: &str = "puzzle_compile_draft";
@@ -68,6 +70,22 @@ pub struct PuzzleAgentSessionRow {
updated_at: Timestamp,
}
/// 拼图首图后台编译活动任务表。
/// 中文注释:该表只保存跨 api-server 实例互斥 claim不表达最终生成结果。
#[spacetimedb::table(
accessor = puzzle_background_compile_task,
index(accessor = by_puzzle_background_compile_task_session_id, btree(columns = [session_id]))
)]
pub struct PuzzleBackgroundCompileTaskRow {
#[primary_key]
task_id: String,
claim_id: String,
session_id: String,
owner_user_id: String,
created_at: Timestamp,
updated_at: Timestamp,
}
/// 拼图 Agent 消息真相表。
#[spacetimedb::table(
accessor = puzzle_agent_message,
@@ -413,6 +431,43 @@ pub fn mark_puzzle_level_generation_failed(
}
}
#[spacetimedb::procedure]
pub fn claim_puzzle_background_compile_task(
ctx: &mut ProcedureContext,
input: PuzzleBackgroundCompileTaskClaimInput,
) -> PuzzleBackgroundCompileTaskProcedureResult {
match ctx.try_with_tx(|tx| claim_puzzle_background_compile_task_tx(tx, input.clone())) {
Ok(claimed) => PuzzleBackgroundCompileTaskProcedureResult {
ok: true,
claimed,
error_message: None,
},
Err(message) => PuzzleBackgroundCompileTaskProcedureResult {
ok: false,
claimed: false,
error_message: Some(message),
},
}
}
#[spacetimedb::procedure]
pub fn release_puzzle_background_compile_task(
ctx: &mut ProcedureContext,
input: PuzzleBackgroundCompileTaskReleaseInput,
) -> PuzzleBackgroundCompileTaskProcedureResult {
match ctx.try_with_tx(|tx| release_puzzle_background_compile_task_tx(tx, input.clone())) {
Ok(released) => PuzzleBackgroundCompileTaskProcedureResult {
ok: true,
claimed: released,
error_message: None,
},
Err(message) => PuzzleBackgroundCompileTaskProcedureResult {
ok: false,
claimed: false,
error_message: Some(message),
},
}
}
/// 保存拼图入口表单草稿。
/// 中文注释:该 procedure 只更新 session 与创作中心草稿卡,不触发图片生成或发布校验。
#[spacetimedb::procedure]
@@ -1060,6 +1115,84 @@ fn compile_puzzle_agent_draft_tx(
)
}
fn claim_puzzle_background_compile_task_tx(
ctx: &TxContext,
input: PuzzleBackgroundCompileTaskClaimInput,
) -> Result<bool, String> {
let task_id = normalize_required_puzzle_task_field(&input.task_id, "拼图后台任务 ID")?;
let claim_id = normalize_required_puzzle_task_field(&input.claim_id, "拼图后台任务 claim ID")?;
let session_id = normalize_required_puzzle_task_field(&input.session_id, "拼图 session ID")?;
let owner_user_id = normalize_required_puzzle_task_field(&input.owner_user_id, "拼图用户 ID")?;
let claimed_at = Timestamp::from_micros_since_unix_epoch(input.claimed_at_micros);
get_owned_session_row(ctx, &session_id, &owner_user_id)?;
if let Some(existing) = ctx
.db
.puzzle_background_compile_task()
.task_id()
.find(&task_id)
{
if !is_stale_puzzle_background_compile_task(&existing, input.claimed_at_micros) {
return Ok(false);
}
ctx.db
.puzzle_background_compile_task()
.task_id()
.delete(&task_id);
}
ctx.db
.puzzle_background_compile_task()
.insert(PuzzleBackgroundCompileTaskRow {
task_id,
claim_id,
session_id,
owner_user_id,
created_at: claimed_at,
updated_at: claimed_at,
});
Ok(true)
}
fn release_puzzle_background_compile_task_tx(
ctx: &TxContext,
input: PuzzleBackgroundCompileTaskReleaseInput,
) -> Result<bool, String> {
let task_id = normalize_required_puzzle_task_field(&input.task_id, "拼图后台任务 ID")?;
let claim_id = normalize_required_puzzle_task_field(&input.claim_id, "拼图后台任务 claim ID")?;
let session_id = normalize_required_puzzle_task_field(&input.session_id, "拼图 session ID")?;
let owner_user_id = normalize_required_puzzle_task_field(&input.owner_user_id, "拼图用户 ID")?;
let Some(row) = ctx
.db
.puzzle_background_compile_task()
.task_id()
.find(&task_id)
else {
return Ok(false);
};
if row.session_id != session_id || row.owner_user_id != owner_user_id {
return Err("无权释放该拼图后台任务".to_string());
}
if row.claim_id != claim_id {
return Ok(false);
}
ctx.db
.puzzle_background_compile_task()
.task_id()
.delete(&task_id);
Ok(true)
}
fn is_stale_puzzle_background_compile_task(
row: &PuzzleBackgroundCompileTaskRow,
now_micros: i64,
) -> bool {
now_micros.saturating_sub(row.updated_at.to_micros_since_unix_epoch())
>= PUZZLE_BACKGROUND_COMPILE_TASK_LEASE_MICROS
}
fn mark_puzzle_draft_generation_failed_tx(
ctx: &TxContext,
input: PuzzleDraftCompileFailureInput,
@@ -3162,6 +3295,14 @@ fn get_owned_session_row(
Ok(row)
}
fn normalize_required_puzzle_task_field(value: &str, field_name: &str) -> Result<String, String> {
let normalized = value.trim();
if normalized.is_empty() {
return Err(format!("{field_name} 不能为空"));
}
Ok(normalized.to_string())
}
fn get_owned_run_row(
ctx: &TxContext,
run_id: &str,