修复资产计费边界风险
资产生成预扣费改为 fail-closed,避免钱包异常时继续调用外部生成 新增钱包退款 outbox,退款失败时本地落盘并后台重放 拼图首图后台任务改用 SpacetimeDB claim 表实现跨实例互斥 计费 ledger id 统一绑定 request_id,并让前端重试复用 x-request-id 同步 SpacetimeDB bindings、后端架构文档和 Hermes 决策记录
This commit is contained in:
@@ -4,7 +4,11 @@ use axum::http::StatusCode;
|
||||
use serde_json::json;
|
||||
use spacetime_client::SpacetimeClientError;
|
||||
|
||||
use crate::{http_error::AppError, state::AppState};
|
||||
use crate::{
|
||||
http_error::AppError,
|
||||
state::AppState,
|
||||
wallet_refund_outbox::{WalletRefundOutboxEnqueueOutcome, WalletRefundOutboxRecord},
|
||||
};
|
||||
|
||||
pub(crate) const ASSET_OPERATION_POINTS_COST: u64 = 1;
|
||||
|
||||
@@ -90,22 +94,11 @@ async fn consume_asset_operation_points(
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(true),
|
||||
Err(error) if should_skip_asset_operation_billing_for_connectivity(&error) => {
|
||||
// 中文注释:外部生图不应被 Maincloud 钱包短暂 503 阻断;此时跳过扣费,让业务链路继续,避免用户重复点击。
|
||||
tracing::warn!(
|
||||
owner_user_id,
|
||||
asset_kind,
|
||||
asset_id,
|
||||
error = %error,
|
||||
"资产操作泥点预扣因 SpacetimeDB 连接不可用而降级跳过"
|
||||
);
|
||||
Ok(false)
|
||||
}
|
||||
Err(error) => Err(map_asset_operation_wallet_error(error)),
|
||||
}
|
||||
}
|
||||
|
||||
/// 外部生成或发布 mutation 失败后补偿退款;退款失败只记日志,避免覆盖原始业务错误。
|
||||
/// 外部生成或发布 mutation 失败后补偿退款;立即退款失败会进入 outbox,避免覆盖原始业务错误。
|
||||
async fn refund_asset_operation_points(
|
||||
state: &AppState,
|
||||
owner_user_id: &str,
|
||||
@@ -117,22 +110,74 @@ async fn refund_asset_operation_points(
|
||||
"asset_operation_refund:{}:{}:{}",
|
||||
owner_user_id, asset_kind, asset_id
|
||||
);
|
||||
let created_at_micros = current_utc_micros();
|
||||
if let Err(error) = state
|
||||
.spacetime_client()
|
||||
.refund_profile_wallet_points(
|
||||
owner_user_id.to_string(),
|
||||
points_cost,
|
||||
ledger_id,
|
||||
current_utc_micros(),
|
||||
ledger_id.clone(),
|
||||
created_at_micros,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let refund_error = error.to_string();
|
||||
if let Some(outbox) = state.wallet_refund_outbox() {
|
||||
match outbox
|
||||
.enqueue(WalletRefundOutboxRecord {
|
||||
owner_user_id: owner_user_id.to_string(),
|
||||
amount: points_cost,
|
||||
ledger_id: ledger_id.clone(),
|
||||
created_at_micros,
|
||||
asset_kind: asset_kind.to_string(),
|
||||
asset_id: asset_id.to_string(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(WalletRefundOutboxEnqueueOutcome::Enqueued) => {
|
||||
tracing::warn!(
|
||||
owner_user_id,
|
||||
asset_kind,
|
||||
asset_id,
|
||||
ledger_id,
|
||||
error = %refund_error,
|
||||
"资产操作失败后的泥点退款立即执行失败,已写入 wallet refund outbox"
|
||||
);
|
||||
return;
|
||||
}
|
||||
Ok(WalletRefundOutboxEnqueueOutcome::Dropped { reason }) => {
|
||||
tracing::error!(
|
||||
owner_user_id,
|
||||
asset_kind,
|
||||
asset_id,
|
||||
ledger_id,
|
||||
reason,
|
||||
error = %refund_error,
|
||||
"资产操作失败后的泥点退款立即执行失败,且 wallet refund outbox 因容量限制丢弃"
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(outbox_error) => {
|
||||
tracing::error!(
|
||||
owner_user_id,
|
||||
asset_kind,
|
||||
asset_id,
|
||||
ledger_id,
|
||||
refund_error = %refund_error,
|
||||
outbox_error = %outbox_error,
|
||||
"资产操作失败后的泥点退款立即执行失败,且写入 wallet refund outbox 失败"
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::error!(
|
||||
owner_user_id,
|
||||
asset_kind,
|
||||
asset_id,
|
||||
error = %error,
|
||||
"资产操作失败后的泥点退款失败"
|
||||
ledger_id,
|
||||
error = %refund_error,
|
||||
"资产操作失败后的泥点退款失败,且 wallet refund outbox 未启用"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -185,7 +230,7 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn asset_operation_billing_skips_spacetime_connectivity_errors() {
|
||||
fn asset_operation_connectivity_errors_are_classified_for_non_billing_fallbacks() {
|
||||
assert_eq!(ASSET_OPERATION_POINTS_COST, 1);
|
||||
assert!(should_skip_asset_operation_billing_for_connectivity(
|
||||
&SpacetimeClientError::ConnectDropped
|
||||
|
||||
@@ -306,11 +306,12 @@ pub async fn generate_bark_battle_image_asset(
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(ToString::to_string);
|
||||
let points_cost = resolve_bark_battle_image_asset_points_cost(&state, &payload).await;
|
||||
let billing_asset_id = request_context.request_id().to_string();
|
||||
let result = execute_billable_asset_operation_with_cost(
|
||||
&state,
|
||||
&owner_user_id,
|
||||
bark_battle_slot_asset_kind(&slot),
|
||||
asset_id.as_str(),
|
||||
billing_asset_id.as_str(),
|
||||
points_cost,
|
||||
async {
|
||||
generate_and_persist_bark_battle_image_asset(
|
||||
|
||||
@@ -722,7 +722,7 @@ pub async fn execute_big_fish_action(
|
||||
"big_fish_publish_game" => Some("big_fish_publish_game"),
|
||||
_ => None,
|
||||
};
|
||||
let billing_asset_id = format!("{session_id}:{now}");
|
||||
let billing_asset_id = format!("{}:{}:{}", session_id, action, request_context.request_id());
|
||||
let session_operation = async {
|
||||
match action.as_str() {
|
||||
"big_fish_compile_draft" => {
|
||||
|
||||
@@ -32,6 +32,11 @@ pub struct AppConfig {
|
||||
pub tracking_outbox_batch_size: usize,
|
||||
pub tracking_outbox_flush_interval: Duration,
|
||||
pub tracking_outbox_max_bytes: u64,
|
||||
pub wallet_refund_outbox_enabled: bool,
|
||||
pub wallet_refund_outbox_dir: PathBuf,
|
||||
pub wallet_refund_outbox_batch_size: usize,
|
||||
pub wallet_refund_outbox_flush_interval: Duration,
|
||||
pub wallet_refund_outbox_max_bytes: u64,
|
||||
pub log_filter: String,
|
||||
pub otel_enabled: bool,
|
||||
pub admin_username: Option<String>,
|
||||
@@ -183,6 +188,11 @@ impl Default for AppConfig {
|
||||
tracking_outbox_batch_size: 500,
|
||||
tracking_outbox_flush_interval: Duration::from_millis(1_000),
|
||||
tracking_outbox_max_bytes: 256 * 1024 * 1024,
|
||||
wallet_refund_outbox_enabled: true,
|
||||
wallet_refund_outbox_dir: PathBuf::from("server-rs/.data/wallet-refund-outbox"),
|
||||
wallet_refund_outbox_batch_size: 100,
|
||||
wallet_refund_outbox_flush_interval: Duration::from_millis(1_000),
|
||||
wallet_refund_outbox_max_bytes: 64 * 1024 * 1024,
|
||||
log_filter: "info,tower_http=info".to_string(),
|
||||
otel_enabled: false,
|
||||
admin_username: None,
|
||||
@@ -409,6 +419,27 @@ impl AppConfig {
|
||||
{
|
||||
config.tracking_outbox_max_bytes = max_bytes;
|
||||
}
|
||||
if let Some(enabled) = read_first_bool_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED"]) {
|
||||
config.wallet_refund_outbox_enabled = enabled;
|
||||
}
|
||||
if let Some(dir) = read_first_non_empty_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_DIR"]) {
|
||||
config.wallet_refund_outbox_dir = PathBuf::from(dir);
|
||||
}
|
||||
if let Some(batch_size) =
|
||||
read_first_usize_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE"])
|
||||
{
|
||||
config.wallet_refund_outbox_batch_size = batch_size;
|
||||
}
|
||||
if let Some(flush_interval_ms) =
|
||||
read_first_positive_u64_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS"])
|
||||
{
|
||||
config.wallet_refund_outbox_flush_interval = Duration::from_millis(flush_interval_ms);
|
||||
}
|
||||
if let Some(max_bytes) =
|
||||
read_first_positive_u64_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES"])
|
||||
{
|
||||
config.wallet_refund_outbox_max_bytes = max_bytes;
|
||||
}
|
||||
if let Some(otel_enabled) = read_first_bool_env(&["GENARRATIVE_OTEL_ENABLED"]) {
|
||||
config.otel_enabled = otel_enabled;
|
||||
}
|
||||
@@ -1380,6 +1411,11 @@ mod tests {
|
||||
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE");
|
||||
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS");
|
||||
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES");
|
||||
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED");
|
||||
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_DIR");
|
||||
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE");
|
||||
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS");
|
||||
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES");
|
||||
std::env::remove_var("GENARRATIVE_OTEL_ENABLED");
|
||||
std::env::set_var("GENARRATIVE_API_LISTEN_BACKLOG", "2048");
|
||||
std::env::set_var("GENARRATIVE_API_WORKER_THREADS", "6");
|
||||
@@ -1396,6 +1432,14 @@ mod tests {
|
||||
std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE", "250");
|
||||
std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS", "2000");
|
||||
std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES", "1048576");
|
||||
std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED", "false");
|
||||
std::env::set_var(
|
||||
"GENARRATIVE_WALLET_REFUND_OUTBOX_DIR",
|
||||
"/tmp/genarrative-wallet-refund-outbox",
|
||||
);
|
||||
std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE", "50");
|
||||
std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS", "3000");
|
||||
std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES", "524288");
|
||||
std::env::set_var("GENARRATIVE_OTEL_ENABLED", "true");
|
||||
}
|
||||
|
||||
@@ -1421,6 +1465,17 @@ mod tests {
|
||||
std::time::Duration::from_millis(2_000)
|
||||
);
|
||||
assert_eq!(config.tracking_outbox_max_bytes, 1_048_576);
|
||||
assert!(!config.wallet_refund_outbox_enabled);
|
||||
assert_eq!(
|
||||
config.wallet_refund_outbox_dir,
|
||||
std::path::PathBuf::from("/tmp/genarrative-wallet-refund-outbox")
|
||||
);
|
||||
assert_eq!(config.wallet_refund_outbox_batch_size, 50);
|
||||
assert_eq!(
|
||||
config.wallet_refund_outbox_flush_interval,
|
||||
std::time::Duration::from_millis(3_000)
|
||||
);
|
||||
assert_eq!(config.wallet_refund_outbox_max_bytes, 524_288);
|
||||
assert!(config.otel_enabled);
|
||||
|
||||
unsafe {
|
||||
@@ -1436,6 +1491,11 @@ mod tests {
|
||||
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE");
|
||||
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS");
|
||||
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES");
|
||||
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED");
|
||||
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_DIR");
|
||||
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE");
|
||||
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS");
|
||||
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES");
|
||||
std::env::remove_var("GENARRATIVE_OTEL_ENABLED");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -547,11 +547,12 @@ pub async fn generate_custom_world_scene_image(
|
||||
require_openai_image_settings(&state)
|
||||
.map_err(|error| custom_world_ai_error_response(&request_context, error))?;
|
||||
let asset_id = format!("custom-scene-{}", current_utc_millis());
|
||||
let billing_asset_id = request_context.request_id().to_string();
|
||||
let asset = execute_billable_asset_operation(
|
||||
&state,
|
||||
&owner_user_id,
|
||||
"scene_image",
|
||||
asset_id.as_str(),
|
||||
billing_asset_id.as_str(),
|
||||
async {
|
||||
let settings = require_openai_image_settings(&state)?.with_external_api_audit_context(
|
||||
&request_context,
|
||||
@@ -806,11 +807,12 @@ pub async fn generate_custom_world_cover_image(
|
||||
require_dashscope_settings(&state)
|
||||
.map_err(|error| custom_world_ai_error_response(&request_context, error))?;
|
||||
let asset_id = format!("custom-cover-{}", current_utc_millis());
|
||||
let billing_asset_id = request_context.request_id().to_string();
|
||||
let asset = execute_billable_asset_operation(
|
||||
&state,
|
||||
&owner_user_id,
|
||||
"custom_world_cover",
|
||||
asset_id.as_str(),
|
||||
billing_asset_id.as_str(),
|
||||
async {
|
||||
let settings = require_dashscope_settings(&state)?;
|
||||
let http_client = build_dashscope_http_client(&settings)?;
|
||||
@@ -1011,11 +1013,12 @@ pub async fn generate_custom_world_opening_cg(
|
||||
.map_err(|error| custom_world_ai_error_response(&request_context, error))?;
|
||||
|
||||
let opening_cg_id = normalized.opening_cg_id.clone();
|
||||
let billing_asset_id = request_context.request_id().to_string();
|
||||
let generated = execute_billable_asset_operation_with_cost(
|
||||
&state,
|
||||
&owner_user_id,
|
||||
"custom_world_opening_cg",
|
||||
opening_cg_id.as_str(),
|
||||
billing_asset_id.as_str(),
|
||||
OPENING_CG_POINTS_COST,
|
||||
async {
|
||||
let image_settings = require_openai_image_settings(&state)?
|
||||
|
||||
@@ -89,6 +89,7 @@ mod tracking_outbox;
|
||||
mod vector_engine_audio_generation;
|
||||
mod visual_novel;
|
||||
mod volcengine_speech;
|
||||
mod wallet_refund_outbox;
|
||||
mod wechat;
|
||||
mod wooden_fish;
|
||||
mod work_author;
|
||||
@@ -115,6 +116,7 @@ use crate::{
|
||||
config::AppConfig,
|
||||
state::{AppState, AppStateInitError},
|
||||
tracking_outbox::TrackingOutbox,
|
||||
wallet_refund_outbox::WalletRefundOutbox,
|
||||
};
|
||||
|
||||
const API_SERVER_STARTUP_STACK_SIZE_BYTES: usize = 32 * 1024 * 1024;
|
||||
@@ -125,6 +127,7 @@ const AUTH_STORE_STARTUP_RETRY_INTERVAL: Duration = Duration::from_secs(5);
|
||||
struct ShutdownContext {
|
||||
app_state: Option<AppState>,
|
||||
tracking_outbox: Option<Arc<TrackingOutbox>>,
|
||||
wallet_refund_outbox: Option<Arc<WalletRefundOutbox>>,
|
||||
outbox_flush_timeout: Duration,
|
||||
}
|
||||
|
||||
@@ -178,11 +181,16 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
|
||||
if let Some(outbox) = tracking_outbox.clone() {
|
||||
outbox.spawn_worker();
|
||||
}
|
||||
let wallet_refund_outbox = state.wallet_refund_outbox();
|
||||
if let Some(outbox) = wallet_refund_outbox.clone() {
|
||||
outbox.spawn_worker();
|
||||
}
|
||||
(
|
||||
build_router(state.clone()),
|
||||
ShutdownContext {
|
||||
app_state: Some(state),
|
||||
tracking_outbox,
|
||||
wallet_refund_outbox,
|
||||
outbox_flush_timeout,
|
||||
},
|
||||
)
|
||||
@@ -192,6 +200,7 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
|
||||
ShutdownContext {
|
||||
app_state: None,
|
||||
tracking_outbox: None,
|
||||
wallet_refund_outbox: None,
|
||||
outbox_flush_timeout,
|
||||
},
|
||||
),
|
||||
@@ -271,12 +280,8 @@ async fn finalize_shutdown(context: ShutdownContext) {
|
||||
state.mark_not_ready();
|
||||
}
|
||||
|
||||
let Some(outbox) = context.tracking_outbox else {
|
||||
return;
|
||||
};
|
||||
|
||||
if context.outbox_flush_timeout.is_zero() {
|
||||
warn!("api-server 退出时 tracking outbox flush timeout 为 0,跳过主动 flush");
|
||||
warn!("api-server 退出时 outbox flush timeout 为 0,跳过主动 flush");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -284,22 +289,45 @@ async fn finalize_shutdown(context: ShutdownContext) {
|
||||
.outbox_flush_timeout
|
||||
.as_millis()
|
||||
.min(u128::from(u64::MAX)) as u64;
|
||||
info!(timeout_ms, "api-server 退出前封存并 flush tracking outbox");
|
||||
match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await {
|
||||
Ok(Ok(())) => {
|
||||
info!("api-server 退出前 tracking outbox flush 完成");
|
||||
if let Some(outbox) = context.tracking_outbox {
|
||||
info!(timeout_ms, "api-server 退出前封存并 flush tracking outbox");
|
||||
match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await {
|
||||
Ok(Ok(())) => {
|
||||
info!("api-server 退出前 tracking outbox flush 完成");
|
||||
}
|
||||
Ok(Err(error)) => {
|
||||
warn!(
|
||||
error = %error,
|
||||
"api-server 退出前 tracking outbox flush 未完成,已保留本地文件等待下次启动重试"
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
warn!(
|
||||
timeout_ms,
|
||||
"api-server 退出前 tracking outbox flush 超时,已保留本地文件等待下次启动重试"
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(Err(error)) => {
|
||||
warn!(
|
||||
error = %error,
|
||||
"api-server 退出前 tracking outbox flush 未完成,已保留本地文件等待下次启动重试"
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
warn!(
|
||||
timeout_ms,
|
||||
"api-server 退出前 tracking outbox flush 超时,已保留本地文件等待下次启动重试"
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(outbox) = context.wallet_refund_outbox {
|
||||
info!(timeout_ms, "api-server 退出前 flush wallet refund outbox");
|
||||
match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await {
|
||||
Ok(Ok(())) => {
|
||||
info!("api-server 退出前 wallet refund outbox flush 完成");
|
||||
}
|
||||
Ok(Err(error)) => {
|
||||
warn!(
|
||||
error = %error,
|
||||
"api-server 退出前 wallet refund outbox flush 未完成,已保留本地文件等待下次启动重试"
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
warn!(
|
||||
timeout_ms,
|
||||
"api-server 退出前 wallet refund outbox flush 超时,已保留本地文件等待下次启动重试"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
convert::Infallible,
|
||||
future::Future,
|
||||
@@ -65,10 +65,7 @@ use spacetime_client::{
|
||||
|
||||
use crate::{
|
||||
api_response::json_success_body,
|
||||
asset_billing::{
|
||||
execute_billable_asset_operation_with_cost, map_asset_operation_wallet_error,
|
||||
should_skip_asset_operation_billing_for_connectivity,
|
||||
},
|
||||
asset_billing::{execute_billable_asset_operation_with_cost, map_asset_operation_wallet_error},
|
||||
auth::{AuthenticatedAccessToken, RuntimePrincipal},
|
||||
config::AppConfig,
|
||||
generated_asset_sheets::apply_generated_asset_sheet_green_screen_alpha,
|
||||
@@ -354,13 +351,6 @@ impl Match3DItemAssetsGenerationPlan {
|
||||
Self::Replace(plan) => plan.requested_item_names.len(),
|
||||
}
|
||||
}
|
||||
|
||||
fn billing_fingerprint_source(&self) -> String {
|
||||
match self {
|
||||
Self::Append(plan) => format!("append:{}", plan.requested_item_names.join("|")),
|
||||
Self::Replace(plan) => format!("replace:{}", plan.requested_item_names.join("|")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize_match3d_generated_item_assets(assets: &[Match3DGeneratedItemAsset]) -> Option<String> {
|
||||
|
||||
@@ -162,7 +162,12 @@ pub(super) async fn compile_match3d_draft_for_session(
|
||||
let initial_tags = requested_tags
|
||||
.clone()
|
||||
.unwrap_or_else(|| fallback_work_metadata.tags.clone());
|
||||
let billing_asset_id = format!("{}:{}:{}", session_id, profile_id, current_utc_micros());
|
||||
let billing_asset_id = format!(
|
||||
"{}:{}:{}",
|
||||
session_id,
|
||||
profile_id,
|
||||
request_context.request_id()
|
||||
);
|
||||
let points_cost = crate::creation_entry_config::resolve_creation_entry_mud_point_cost(
|
||||
state,
|
||||
"match3d",
|
||||
@@ -514,15 +519,6 @@ async fn consume_match3d_draft_generation_points(
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(true),
|
||||
Err(error) if should_skip_asset_operation_billing_for_connectivity(&error) => {
|
||||
tracing::warn!(
|
||||
owner_user_id,
|
||||
billing_asset_id,
|
||||
error = %error,
|
||||
"抓大鹅草稿泥点预扣因 SpacetimeDB 连接不可用而降级跳过"
|
||||
);
|
||||
Ok(false)
|
||||
}
|
||||
Err(error) => Err(match3d_error_response(
|
||||
request_context,
|
||||
MATCH3D_AGENT_PROVIDER,
|
||||
|
||||
@@ -751,7 +751,6 @@ pub async fn generate_match3d_background_image_for_work(
|
||||
)?;
|
||||
let prompt = normalize_match3d_background_prompt(payload.prompt.as_str());
|
||||
ensure_non_empty(&request_context, MATCH3D_WORKS_PROVIDER, &prompt, "prompt")?;
|
||||
let prompt_fingerprint = build_match3d_prompt_fingerprint(prompt.as_str());
|
||||
|
||||
let context =
|
||||
load_match3d_work_asset_context(&state, &request_context, &authenticated, &profile_id)
|
||||
@@ -763,7 +762,12 @@ pub async fn generate_match3d_background_image_for_work(
|
||||
config,
|
||||
assets,
|
||||
} = context;
|
||||
let billing_asset_id = format!("{}:{}:{}", session_id, profile_id, prompt_fingerprint);
|
||||
let billing_asset_id = format!(
|
||||
"{}:{}:{}",
|
||||
session_id,
|
||||
profile_id,
|
||||
request_context.request_id()
|
||||
);
|
||||
let (generated_background, generated_assets) = execute_billable_asset_operation_with_cost(
|
||||
&state,
|
||||
owner_user_id.as_str(),
|
||||
@@ -860,7 +864,6 @@ pub async fn generate_match3d_container_image_for_work(
|
||||
)?;
|
||||
let prompt = normalize_match3d_background_prompt(payload.prompt.as_str());
|
||||
ensure_non_empty(&request_context, MATCH3D_WORKS_PROVIDER, &prompt, "prompt")?;
|
||||
let prompt_fingerprint = build_match3d_prompt_fingerprint(prompt.as_str());
|
||||
|
||||
let context =
|
||||
load_match3d_work_asset_context(&state, &request_context, &authenticated, &profile_id)
|
||||
@@ -874,7 +877,9 @@ pub async fn generate_match3d_container_image_for_work(
|
||||
} = context;
|
||||
let billing_asset_id = format!(
|
||||
"{}:{}:{}:container",
|
||||
session_id, profile_id, prompt_fingerprint
|
||||
session_id,
|
||||
profile_id,
|
||||
request_context.request_id()
|
||||
);
|
||||
let (generated_background, generated_assets) = execute_billable_asset_operation_with_cost(
|
||||
&state,
|
||||
@@ -1017,7 +1022,7 @@ pub async fn generate_match3d_item_assets_for_work(
|
||||
session_id,
|
||||
profile_id,
|
||||
billed_item_count,
|
||||
build_match3d_prompt_fingerprint(generation_plan.billing_fingerprint_source().as_str())
|
||||
request_context.request_id()
|
||||
);
|
||||
let generated_assets = execute_billable_asset_operation_with_cost(
|
||||
&state,
|
||||
|
||||
@@ -1208,14 +1208,6 @@ pub(super) fn normalize_match3d_background_prompt(raw: &str) -> String {
|
||||
.to_string()
|
||||
}
|
||||
|
||||
pub(super) fn build_match3d_prompt_fingerprint(value: &str) -> String {
|
||||
let mut hash = 0u32;
|
||||
for character in value.chars() {
|
||||
hash = hash.wrapping_mul(31).wrapping_add(character as u32);
|
||||
}
|
||||
format!("{hash:08x}")
|
||||
}
|
||||
|
||||
pub(super) fn build_fallback_match3d_background_prompt(config: &Match3DConfigJson) -> String {
|
||||
let theme = config.theme_text.trim();
|
||||
let normalized_theme = if theme.is_empty() { "抓大鹅" } else { theme };
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, HashSet},
|
||||
sync::{Mutex, OnceLock},
|
||||
collections::BTreeMap,
|
||||
time::{Instant, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
@@ -56,17 +55,19 @@ use spacetime_client::{
|
||||
PuzzleAgentMessageRecord, PuzzleAgentMessageSubmitRecordInput,
|
||||
PuzzleAgentSessionCreateRecordInput, PuzzleAgentSessionRecord,
|
||||
PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord, PuzzleAnchorPackRecord,
|
||||
PuzzleAudioAssetRecord, PuzzleCreatorIntentRecord, PuzzleDraftCompileFailureRecordInput,
|
||||
PuzzleDraftLevelRecord, PuzzleFormDraftRecord, PuzzleFormDraftSaveRecordInput,
|
||||
PuzzleGeneratedImageCandidateRecord, PuzzleGeneratedImagesSaveRecordInput,
|
||||
PuzzleLeaderboardEntryRecord, PuzzleLeaderboardSubmitRecordInput, PuzzlePublishRecordInput,
|
||||
PuzzleRecommendedNextWorkRecord, PuzzleResultDraftRecord, PuzzleResultPreviewBlockerRecord,
|
||||
PuzzleResultPreviewFindingRecord, PuzzleResultPreviewRecord, PuzzleRunDragRecordInput,
|
||||
PuzzleRunPauseRecordInput, PuzzleRunPropRecordInput, PuzzleRunRecord,
|
||||
PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput, PuzzleSelectCoverImageRecordInput,
|
||||
PuzzleUiBackgroundSaveRecordInput, PuzzleWorkLikeReportRecordInput,
|
||||
PuzzleWorkPointIncentiveClaimRecordInput, PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput,
|
||||
PuzzleWorkUpsertRecordInput, SpacetimeClientError,
|
||||
PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput,
|
||||
PuzzleBackgroundCompileTaskReleaseRecordInput, PuzzleCreatorIntentRecord,
|
||||
PuzzleDraftCompileFailureRecordInput, PuzzleDraftLevelRecord, PuzzleFormDraftRecord,
|
||||
PuzzleFormDraftSaveRecordInput, PuzzleGeneratedImageCandidateRecord,
|
||||
PuzzleGeneratedImagesSaveRecordInput, PuzzleLeaderboardEntryRecord,
|
||||
PuzzleLeaderboardSubmitRecordInput, PuzzlePublishRecordInput, PuzzleRecommendedNextWorkRecord,
|
||||
PuzzleResultDraftRecord, PuzzleResultPreviewBlockerRecord, PuzzleResultPreviewFindingRecord,
|
||||
PuzzleResultPreviewRecord, PuzzleRunDragRecordInput, PuzzleRunPauseRecordInput,
|
||||
PuzzleRunPropRecordInput, PuzzleRunRecord, PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput,
|
||||
PuzzleSelectCoverImageRecordInput, PuzzleUiBackgroundSaveRecordInput,
|
||||
PuzzleWorkLikeReportRecordInput, PuzzleWorkPointIncentiveClaimRecordInput,
|
||||
PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput, PuzzleWorkUpsertRecordInput,
|
||||
SpacetimeClientError,
|
||||
};
|
||||
use std::convert::Infallible;
|
||||
|
||||
@@ -134,38 +135,51 @@ const PUZZLE_UI_BACKGROUND_PROMPT_FALLBACK_MARKER: &str =
|
||||
const PUZZLE_VECTOR_ENGINE_SQUARE_IMAGE_SIZE: &str = "1024x1024";
|
||||
const PUZZLE_VECTOR_ENGINE_PORTRAIT_IMAGE_SIZE: &str = "1024x1536";
|
||||
|
||||
static PUZZLE_BACKGROUND_COMPILE_TASKS: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
|
||||
|
||||
fn puzzle_background_compile_tasks() -> &'static Mutex<HashSet<String>> {
|
||||
PUZZLE_BACKGROUND_COMPILE_TASKS.get_or_init(|| Mutex::new(HashSet::new()))
|
||||
fn build_puzzle_background_compile_task_id(session_id: &str) -> String {
|
||||
format!("puzzle_initial_background:{session_id}")
|
||||
}
|
||||
|
||||
fn try_register_puzzle_background_compile_task(session_id: &str) -> bool {
|
||||
match puzzle_background_compile_tasks().lock() {
|
||||
Ok(mut tasks) => tasks.insert(session_id.to_string()),
|
||||
Err(error) => {
|
||||
fn build_puzzle_background_compile_claim_id(task_id: &str, request_id: &str) -> String {
|
||||
format!("{task_id}:{request_id}")
|
||||
}
|
||||
|
||||
async fn release_claimed_puzzle_background_compile_task(
|
||||
state: &PuzzleApiState,
|
||||
task_id: &str,
|
||||
claim_id: &str,
|
||||
session_id: &str,
|
||||
owner_user_id: &str,
|
||||
) {
|
||||
let result = state
|
||||
.spacetime_client()
|
||||
.release_puzzle_background_compile_task(PuzzleBackgroundCompileTaskReleaseRecordInput {
|
||||
task_id: task_id.to_string(),
|
||||
claim_id: claim_id.to_string(),
|
||||
session_id: session_id.to_string(),
|
||||
owner_user_id: owner_user_id.to_string(),
|
||||
})
|
||||
.await;
|
||||
match result {
|
||||
Ok(true) => {}
|
||||
Ok(false) => {
|
||||
tracing::warn!(
|
||||
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
|
||||
task_id,
|
||||
claim_id,
|
||||
session_id,
|
||||
error = %error,
|
||||
"拼图后台生成任务注册表锁已损坏,允许本次任务继续"
|
||||
owner_user_id,
|
||||
"拼图首图后台生成任务释放未命中当前 claim"
|
||||
);
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn unregister_puzzle_background_compile_task(session_id: &str) {
|
||||
match puzzle_background_compile_tasks().lock() {
|
||||
Ok(mut tasks) => {
|
||||
tasks.remove(session_id);
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::warn!(
|
||||
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
|
||||
task_id,
|
||||
claim_id,
|
||||
session_id,
|
||||
owner_user_id,
|
||||
error = %error,
|
||||
"拼图后台生成任务注册表解锁失败,忽略清理"
|
||||
"拼图首图后台生成任务释放失败"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -588,7 +588,7 @@ pub async fn execute_puzzle_agent_action(
|
||||
let owner_user_id = authenticated.claims().user_id().to_string();
|
||||
let now = current_utc_micros();
|
||||
let action = payload.action.trim().to_string();
|
||||
let billing_asset_id = format!("{session_id}:{now}");
|
||||
let billing_asset_id = format!("{}:{}:{}", session_id, action, request_context.request_id());
|
||||
let mut operation_consumed_points = 0;
|
||||
tracing::info!(
|
||||
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
|
||||
@@ -695,156 +695,207 @@ pub async fn execute_puzzle_agent_action(
|
||||
Err(response) => return Err(response),
|
||||
};
|
||||
let session = if ai_redraw {
|
||||
if !try_register_puzzle_background_compile_task(&compile_session_id) {
|
||||
tracing::info!(
|
||||
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
|
||||
session_id = %compile_session_id,
|
||||
owner_user_id = %owner_user_id,
|
||||
"拼图首图后台生成任务已存在,本次 action 直接返回生成中会话"
|
||||
);
|
||||
state
|
||||
.spacetime_client()
|
||||
.get_puzzle_agent_session(compile_session_id.clone(), owner_user_id.clone())
|
||||
.await
|
||||
.map(mark_puzzle_initial_generation_started_snapshot)
|
||||
.map_err(map_puzzle_client_error)
|
||||
} else {
|
||||
let compiled_session = state
|
||||
.spacetime_client()
|
||||
.compile_puzzle_agent_draft(
|
||||
compile_session_id.clone(),
|
||||
owner_user_id.clone(),
|
||||
now,
|
||||
)
|
||||
.await
|
||||
.map_err(map_puzzle_compile_error);
|
||||
match compiled_session {
|
||||
Ok(compiled_session) => {
|
||||
let response_session = mark_puzzle_initial_generation_started_snapshot(
|
||||
compiled_session.clone(),
|
||||
);
|
||||
let background_state = state.clone();
|
||||
let background_request_context = request_context.clone();
|
||||
let background_session_id = compile_session_id.clone();
|
||||
let background_owner_user_id = owner_user_id.clone();
|
||||
let background_prompt_text = prompt_text.map(str::to_string);
|
||||
let background_reference_image_src =
|
||||
primary_reference_image_src.map(str::to_string);
|
||||
let background_image_model = payload.image_model.clone();
|
||||
let background_points_cost = puzzle_draft_generation_points_cost;
|
||||
let background_work_name = compiled_session
|
||||
.draft
|
||||
.as_ref()
|
||||
.map(|draft| draft.work_title.clone());
|
||||
let background_billing_asset_id =
|
||||
format!("{background_session_id}:compile_puzzle_draft");
|
||||
tokio::spawn(async move {
|
||||
let operation_owner_user_id = background_owner_user_id.clone();
|
||||
let background_root_state = background_state.root_state().clone();
|
||||
let operation_state = background_state.clone();
|
||||
let result = execute_billable_asset_operation_with_cost(
|
||||
&background_root_state,
|
||||
&background_owner_user_id,
|
||||
"puzzle_initial_image",
|
||||
&background_billing_asset_id,
|
||||
background_points_cost,
|
||||
async move {
|
||||
generate_puzzle_initial_cover_from_compiled_session(
|
||||
&operation_state,
|
||||
&background_request_context,
|
||||
compiled_session,
|
||||
operation_owner_user_id,
|
||||
background_prompt_text.as_deref(),
|
||||
background_reference_image_src.as_deref(),
|
||||
background_image_model.as_deref(),
|
||||
current_utc_micros(),
|
||||
)
|
||||
.await
|
||||
},
|
||||
)
|
||||
.await;
|
||||
match result {
|
||||
Ok(session) => {
|
||||
send_generation_result_subscribe_message_after_completion(
|
||||
&background_root_state,
|
||||
GenerationResultSubscribeMessage {
|
||||
owner_user_id: background_owner_user_id.clone(),
|
||||
task_name: Some("拼图".to_string()),
|
||||
work_name: session
|
||||
.draft
|
||||
.as_ref()
|
||||
.map(|draft| draft.work_title.clone()),
|
||||
status:
|
||||
GenerationResultSubscribeMessageStatus::Succeeded,
|
||||
consumed_points: background_points_cost,
|
||||
completed_at_micros: current_utc_micros(),
|
||||
page: Some("/pages/web-view/index".to_string()),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
tracing::info!(
|
||||
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
|
||||
session_id = %session.session_id,
|
||||
owner_user_id = %background_owner_user_id,
|
||||
"拼图首图后台生成任务完成"
|
||||
);
|
||||
}
|
||||
Err(error) => {
|
||||
let error_message = error.body_text();
|
||||
let failed_at_micros = current_utc_micros();
|
||||
let failure_result = background_state
|
||||
.spacetime_client()
|
||||
.mark_puzzle_draft_generation_failed(
|
||||
PuzzleDraftCompileFailureRecordInput {
|
||||
session_id: background_session_id.clone(),
|
||||
owner_user_id: background_owner_user_id.clone(),
|
||||
error_message: error_message.clone(),
|
||||
failed_at_micros,
|
||||
},
|
||||
let background_task_id =
|
||||
build_puzzle_background_compile_task_id(&compile_session_id);
|
||||
let background_claim_id = build_puzzle_background_compile_claim_id(
|
||||
&background_task_id,
|
||||
request_context.request_id(),
|
||||
);
|
||||
let claim_result = state
|
||||
.spacetime_client()
|
||||
.claim_puzzle_background_compile_task(
|
||||
PuzzleBackgroundCompileTaskClaimRecordInput {
|
||||
task_id: background_task_id.clone(),
|
||||
claim_id: background_claim_id.clone(),
|
||||
session_id: compile_session_id.clone(),
|
||||
owner_user_id: owner_user_id.clone(),
|
||||
claimed_at_micros: current_utc_micros(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(map_puzzle_client_error);
|
||||
match claim_result {
|
||||
Ok(false) => {
|
||||
tracing::info!(
|
||||
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
|
||||
session_id = %compile_session_id,
|
||||
owner_user_id = %owner_user_id,
|
||||
task_id = %background_task_id,
|
||||
"拼图首图后台生成任务已存在,本次 action 直接返回生成中会话"
|
||||
);
|
||||
state
|
||||
.spacetime_client()
|
||||
.get_puzzle_agent_session(
|
||||
compile_session_id.clone(),
|
||||
owner_user_id.clone(),
|
||||
)
|
||||
.await
|
||||
.map(mark_puzzle_initial_generation_started_snapshot)
|
||||
.map_err(map_puzzle_client_error)
|
||||
}
|
||||
Ok(true) => {
|
||||
let compiled_session = state
|
||||
.spacetime_client()
|
||||
.compile_puzzle_agent_draft(
|
||||
compile_session_id.clone(),
|
||||
owner_user_id.clone(),
|
||||
now,
|
||||
)
|
||||
.await
|
||||
.map_err(map_puzzle_compile_error);
|
||||
match compiled_session {
|
||||
Ok(compiled_session) => {
|
||||
let response_session =
|
||||
mark_puzzle_initial_generation_started_snapshot(
|
||||
compiled_session.clone(),
|
||||
);
|
||||
let background_state = state.clone();
|
||||
let background_request_context = request_context.clone();
|
||||
let background_session_id = compile_session_id.clone();
|
||||
let background_owner_user_id = owner_user_id.clone();
|
||||
let background_task_id = background_task_id.clone();
|
||||
let background_claim_id = background_claim_id.clone();
|
||||
let background_prompt_text = prompt_text.map(str::to_string);
|
||||
let background_reference_image_src =
|
||||
primary_reference_image_src.map(str::to_string);
|
||||
let background_image_model = payload.image_model.clone();
|
||||
let background_points_cost = puzzle_draft_generation_points_cost;
|
||||
let background_work_name = compiled_session
|
||||
.draft
|
||||
.as_ref()
|
||||
.map(|draft| draft.work_title.clone());
|
||||
let background_billing_asset_id = format!(
|
||||
"{background_session_id}:compile_puzzle_draft:{}",
|
||||
background_request_context.request_id()
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
let operation_owner_user_id = background_owner_user_id.clone();
|
||||
let background_root_state =
|
||||
background_state.root_state().clone();
|
||||
let operation_state = background_state.clone();
|
||||
let result = execute_billable_asset_operation_with_cost(
|
||||
&background_root_state,
|
||||
&background_owner_user_id,
|
||||
"puzzle_initial_image",
|
||||
&background_billing_asset_id,
|
||||
background_points_cost,
|
||||
async move {
|
||||
generate_puzzle_initial_cover_from_compiled_session(
|
||||
&operation_state,
|
||||
&background_request_context,
|
||||
compiled_session,
|
||||
operation_owner_user_id,
|
||||
background_prompt_text.as_deref(),
|
||||
background_reference_image_src.as_deref(),
|
||||
background_image_model.as_deref(),
|
||||
current_utc_micros(),
|
||||
)
|
||||
.await;
|
||||
if let Err(mark_error) = failure_result {
|
||||
tracing::warn!(
|
||||
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
|
||||
session_id = %background_session_id,
|
||||
owner_user_id = %background_owner_user_id,
|
||||
message = %mark_error,
|
||||
"拼图首图后台生成失败态回写失败"
|
||||
);
|
||||
} else {
|
||||
.await
|
||||
},
|
||||
)
|
||||
.await;
|
||||
match result {
|
||||
Ok(session) => {
|
||||
send_generation_result_subscribe_message_after_completion(
|
||||
&background_root_state,
|
||||
GenerationResultSubscribeMessage {
|
||||
owner_user_id: background_owner_user_id.clone(),
|
||||
task_name: Some("拼图".to_string()),
|
||||
work_name: background_work_name.clone(),
|
||||
work_name: session
|
||||
.draft
|
||||
.as_ref()
|
||||
.map(|draft| draft.work_title.clone()),
|
||||
status:
|
||||
GenerationResultSubscribeMessageStatus::Failed,
|
||||
consumed_points: 0,
|
||||
completed_at_micros: failed_at_micros,
|
||||
GenerationResultSubscribeMessageStatus::Succeeded,
|
||||
consumed_points: background_points_cost,
|
||||
completed_at_micros: current_utc_micros(),
|
||||
page: Some("/pages/web-view/index".to_string()),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
tracing::info!(
|
||||
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
|
||||
session_id = %session.session_id,
|
||||
owner_user_id = %background_owner_user_id,
|
||||
"拼图首图后台生成任务完成"
|
||||
);
|
||||
}
|
||||
Err(error) => {
|
||||
let error_message = error.body_text();
|
||||
let failed_at_micros = current_utc_micros();
|
||||
let failure_result = background_state
|
||||
.spacetime_client()
|
||||
.mark_puzzle_draft_generation_failed(
|
||||
PuzzleDraftCompileFailureRecordInput {
|
||||
session_id: background_session_id.clone(),
|
||||
owner_user_id: background_owner_user_id
|
||||
.clone(),
|
||||
error_message: error_message.clone(),
|
||||
failed_at_micros,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
if let Err(mark_error) = failure_result {
|
||||
tracing::warn!(
|
||||
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
|
||||
session_id = %background_session_id,
|
||||
owner_user_id = %background_owner_user_id,
|
||||
message = %mark_error,
|
||||
"拼图首图后台生成失败态回写失败"
|
||||
);
|
||||
} else {
|
||||
send_generation_result_subscribe_message_after_completion(
|
||||
&background_root_state,
|
||||
GenerationResultSubscribeMessage {
|
||||
owner_user_id: background_owner_user_id
|
||||
.clone(),
|
||||
task_name: Some("拼图".to_string()),
|
||||
work_name: background_work_name.clone(),
|
||||
status:
|
||||
GenerationResultSubscribeMessageStatus::Failed,
|
||||
consumed_points: 0,
|
||||
completed_at_micros: failed_at_micros,
|
||||
page: Some(
|
||||
"/pages/web-view/index".to_string(),
|
||||
),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
tracing::warn!(
|
||||
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
|
||||
session_id = %background_session_id,
|
||||
owner_user_id = %background_owner_user_id,
|
||||
message = %error_message,
|
||||
"拼图首图后台生成任务失败"
|
||||
);
|
||||
}
|
||||
tracing::warn!(
|
||||
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
|
||||
session_id = %background_session_id,
|
||||
owner_user_id = %background_owner_user_id,
|
||||
message = %error_message,
|
||||
"拼图首图后台生成任务失败"
|
||||
);
|
||||
}
|
||||
}
|
||||
unregister_puzzle_background_compile_task(&background_session_id);
|
||||
});
|
||||
Ok(response_session)
|
||||
}
|
||||
Err(error) => {
|
||||
unregister_puzzle_background_compile_task(&compile_session_id);
|
||||
Err(error)
|
||||
release_claimed_puzzle_background_compile_task(
|
||||
&background_state,
|
||||
&background_task_id,
|
||||
&background_claim_id,
|
||||
&background_session_id,
|
||||
&background_owner_user_id,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
Ok(response_session)
|
||||
}
|
||||
Err(error) => {
|
||||
release_claimed_puzzle_background_compile_task(
|
||||
&state,
|
||||
&background_task_id,
|
||||
&background_claim_id,
|
||||
&compile_session_id,
|
||||
&owner_user_id,
|
||||
)
|
||||
.await;
|
||||
Err(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => Err(error),
|
||||
}
|
||||
} else {
|
||||
compile_puzzle_draft_with_uploaded_cover(
|
||||
@@ -2231,7 +2282,7 @@ pub async fn use_puzzle_runtime_prop(
|
||||
}
|
||||
};
|
||||
let should_sync_freeze_boundary = matches!(prop_kind.as_str(), "freezeTime" | "freeze_time");
|
||||
let billing_asset_id = format!("{}:{}:{}", run_id, prop_kind, current_utc_micros());
|
||||
let billing_asset_id = format!("{}:{}:{}", run_id, prop_kind, request_context.request_id());
|
||||
let reducer_owner_user_id = owner_user_id.clone();
|
||||
let reducer_run_id = run_id.clone();
|
||||
let fallback_run_id = run_id.clone();
|
||||
|
||||
@@ -41,6 +41,7 @@ use tracing::{info, warn};
|
||||
use crate::config::AppConfig;
|
||||
use crate::puzzle_gallery_cache::PuzzleGalleryCache;
|
||||
use crate::tracking_outbox::TrackingOutbox;
|
||||
use crate::wallet_refund_outbox::WalletRefundOutbox;
|
||||
use crate::wechat::pay::{build_wechat_pay_config, map_wechat_pay_init_error};
|
||||
use crate::wechat::provider::build_wechat_provider;
|
||||
use crate::work_author::{
|
||||
@@ -263,6 +264,7 @@ pub struct AppStateInner {
|
||||
spacetime_client: SpacetimeClient,
|
||||
puzzle_gallery_cache: PuzzleGalleryCache,
|
||||
tracking_outbox: Option<Arc<TrackingOutbox>>,
|
||||
wallet_refund_outbox: Option<Arc<WalletRefundOutbox>>,
|
||||
llm_client: Option<LlmClient>,
|
||||
creative_agent_gpt5_client: Option<LlmClient>,
|
||||
creative_agent_executor: Arc<MockLangChainRustAgentExecutor>,
|
||||
@@ -406,6 +408,8 @@ impl AppState {
|
||||
procedure_timeout: config.spacetime_procedure_timeout,
|
||||
});
|
||||
let tracking_outbox = TrackingOutbox::from_config(&config, spacetime_client.clone());
|
||||
let wallet_refund_outbox =
|
||||
WalletRefundOutbox::from_config(&config, spacetime_client.clone());
|
||||
let llm_client = build_llm_client(&config)?;
|
||||
let creative_agent_gpt5_client = build_creative_agent_gpt5_client(&config)?;
|
||||
let http_request_permit_pools = HttpRequestPermitPools::from_config(&config);
|
||||
@@ -441,6 +445,7 @@ impl AppState {
|
||||
spacetime_client,
|
||||
puzzle_gallery_cache: PuzzleGalleryCache::new(),
|
||||
tracking_outbox,
|
||||
wallet_refund_outbox,
|
||||
llm_client,
|
||||
creative_agent_gpt5_client,
|
||||
creative_agent_executor: Arc::new(MockLangChainRustAgentExecutor),
|
||||
@@ -922,6 +927,10 @@ impl AppState {
|
||||
self.tracking_outbox.clone()
|
||||
}
|
||||
|
||||
pub fn wallet_refund_outbox(&self) -> Option<Arc<WalletRefundOutbox>> {
|
||||
self.wallet_refund_outbox.clone()
|
||||
}
|
||||
|
||||
pub fn llm_client(&self) -> Option<&LlmClient> {
|
||||
self.llm_client.as_ref()
|
||||
}
|
||||
|
||||
463
server-rs/crates/api-server/src/wallet_refund_outbox.rs
Normal file
463
server-rs/crates/api-server/src/wallet_refund_outbox.rs
Normal file
@@ -0,0 +1,463 @@
|
||||
use std::{
|
||||
fmt,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
use spacetime_client::{SpacetimeClient, SpacetimeClientError};
|
||||
use tokio::{
|
||||
fs::{self, File, OpenOptions},
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
sync::{Mutex, Notify},
|
||||
time::sleep,
|
||||
};
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use crate::config::AppConfig;
|
||||
|
||||
const PENDING_FILE_PREFIX: &str = "refund-";
|
||||
const CORRUPT_FILE_PREFIX: &str = "corrupt-";
|
||||
const TEMP_FILE_PREFIX: &str = "tmp-";
|
||||
const OUTBOX_FILE_EXTENSION: &str = ".json";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct WalletRefundOutbox {
|
||||
dir: PathBuf,
|
||||
batch_size: usize,
|
||||
flush_interval: Duration,
|
||||
max_bytes: u64,
|
||||
spacetime_client: SpacetimeClient,
|
||||
enqueue_lock: Arc<Mutex<()>>,
|
||||
flush_notify: Arc<Notify>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub(crate) struct WalletRefundOutboxRecord {
|
||||
pub owner_user_id: String,
|
||||
pub amount: u64,
|
||||
pub ledger_id: String,
|
||||
pub created_at_micros: i64,
|
||||
pub asset_kind: String,
|
||||
pub asset_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum WalletRefundOutboxEnqueueOutcome {
|
||||
Enqueued,
|
||||
Dropped { reason: &'static str },
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum WalletRefundOutboxError {
|
||||
Io(std::io::Error),
|
||||
Json(serde_json::Error),
|
||||
Spacetime(SpacetimeClientError),
|
||||
}
|
||||
|
||||
impl WalletRefundOutbox {
|
||||
pub fn from_config(config: &AppConfig, spacetime_client: SpacetimeClient) -> Option<Arc<Self>> {
|
||||
if !config.wallet_refund_outbox_enabled {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(Arc::new(Self {
|
||||
dir: config.wallet_refund_outbox_dir.clone(),
|
||||
batch_size: config.wallet_refund_outbox_batch_size.max(1),
|
||||
flush_interval: config.wallet_refund_outbox_flush_interval,
|
||||
max_bytes: config.wallet_refund_outbox_max_bytes,
|
||||
spacetime_client,
|
||||
enqueue_lock: Arc::new(Mutex::new(())),
|
||||
flush_notify: Arc::new(Notify::new()),
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn enqueue(
|
||||
&self,
|
||||
record: WalletRefundOutboxRecord,
|
||||
) -> Result<WalletRefundOutboxEnqueueOutcome, WalletRefundOutboxError> {
|
||||
let _guard = self.enqueue_lock.lock().await;
|
||||
fs::create_dir_all(&self.dir).await?;
|
||||
|
||||
let pending_path = self.pending_path_for_ledger(&record.ledger_id);
|
||||
if fs::metadata(&pending_path).await.is_ok() {
|
||||
self.flush_notify.notify_one();
|
||||
return Ok(WalletRefundOutboxEnqueueOutcome::Enqueued);
|
||||
}
|
||||
|
||||
let bytes = serde_json::to_vec(&record)?;
|
||||
let line_bytes = bytes.len().min(u64::MAX as usize) as u64;
|
||||
let current_bytes = directory_size_if_exists(&self.dir).unwrap_or(0);
|
||||
if current_bytes.saturating_add(line_bytes) > self.max_bytes {
|
||||
return Ok(WalletRefundOutboxEnqueueOutcome::Dropped {
|
||||
reason: "max_bytes",
|
||||
});
|
||||
}
|
||||
|
||||
let temp_path = self.temp_path();
|
||||
let mut file = OpenOptions::new()
|
||||
.create_new(true)
|
||||
.write(true)
|
||||
.open(&temp_path)
|
||||
.await?;
|
||||
file.write_all(&bytes).await?;
|
||||
file.flush().await?;
|
||||
file.sync_data().await?;
|
||||
drop(file);
|
||||
if fs::metadata(&pending_path).await.is_ok() {
|
||||
let _ = fs::remove_file(&temp_path).await;
|
||||
self.flush_notify.notify_one();
|
||||
return Ok(WalletRefundOutboxEnqueueOutcome::Enqueued);
|
||||
}
|
||||
fs::rename(&temp_path, &pending_path).await?;
|
||||
sync_directory_metadata(&self.dir).await?;
|
||||
self.flush_notify.notify_one();
|
||||
Ok(WalletRefundOutboxEnqueueOutcome::Enqueued)
|
||||
}
|
||||
|
||||
pub fn spawn_worker(self: Arc<Self>) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = sleep(self.flush_interval) => {
|
||||
if let Err(error) = self.flush_pending_files_once().await {
|
||||
warn!(error = %error, "wallet refund outbox 重放退款失败,将保留文件等待重试");
|
||||
}
|
||||
}
|
||||
_ = self.flush_notify.notified() => {
|
||||
if let Err(error) = self.flush_pending_files_once().await {
|
||||
warn!(error = %error, "wallet refund outbox 主动重放退款失败,将保留文件等待重试");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn flush_for_shutdown(&self) -> Result<(), WalletRefundOutboxError> {
|
||||
self.flush_pending_files_once().await
|
||||
}
|
||||
|
||||
async fn flush_pending_files_once(&self) -> Result<(), WalletRefundOutboxError> {
|
||||
fs::create_dir_all(&self.dir).await?;
|
||||
let pending_files = self.list_pending_files().await?;
|
||||
for path in pending_files.into_iter().take(self.batch_size) {
|
||||
let record = match read_refund_record(&path).await {
|
||||
Ok(record) => record,
|
||||
Err(error) if error.is_data_corruption() => {
|
||||
let corrupt_path = self.corrupt_path_for(&path);
|
||||
fs::rename(&path, &corrupt_path).await?;
|
||||
sync_directory_metadata(&self.dir).await?;
|
||||
warn!(
|
||||
error = %error,
|
||||
source = %path.display(),
|
||||
target = %corrupt_path.display(),
|
||||
"wallet refund outbox 文件无法解析,已隔离"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
|
||||
match self
|
||||
.spacetime_client
|
||||
.refund_profile_wallet_points(
|
||||
record.owner_user_id.clone(),
|
||||
record.amount,
|
||||
record.ledger_id.clone(),
|
||||
record.created_at_micros,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
match fs::remove_file(&path).await {
|
||||
Ok(()) => {}
|
||||
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(error) => return Err(error.into()),
|
||||
}
|
||||
sync_directory_metadata(&self.dir).await?;
|
||||
debug!(
|
||||
ledger_id = %record.ledger_id,
|
||||
owner_user_id = %record.owner_user_id,
|
||||
asset_kind = %record.asset_kind,
|
||||
asset_id = %record.asset_id,
|
||||
path = %path.display(),
|
||||
"wallet refund outbox 退款已重放并删除文件"
|
||||
);
|
||||
}
|
||||
Err(error) => return Err(WalletRefundOutboxError::Spacetime(error)),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn list_pending_files(&self) -> Result<Vec<PathBuf>, WalletRefundOutboxError> {
|
||||
let mut entries = fs::read_dir(&self.dir).await?;
|
||||
let mut files = Vec::new();
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let path = entry.path();
|
||||
let Some(name) = path.file_name().and_then(|value| value.to_str()) else {
|
||||
continue;
|
||||
};
|
||||
if name.starts_with(PENDING_FILE_PREFIX) && name.ends_with(OUTBOX_FILE_EXTENSION) {
|
||||
files.push(path);
|
||||
}
|
||||
}
|
||||
files.sort();
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
fn pending_path_for_ledger(&self, ledger_id: &str) -> PathBuf {
|
||||
self.dir.join(format!(
|
||||
"{PENDING_FILE_PREFIX}{}{OUTBOX_FILE_EXTENSION}",
|
||||
ledger_id_hash(ledger_id)
|
||||
))
|
||||
}
|
||||
|
||||
fn temp_path(&self) -> PathBuf {
|
||||
self.dir.join(format!(
|
||||
"{TEMP_FILE_PREFIX}{}-{uuid}{OUTBOX_FILE_EXTENSION}",
|
||||
current_unix_micros(),
|
||||
uuid = uuid::Uuid::new_v4()
|
||||
))
|
||||
}
|
||||
|
||||
fn corrupt_path_for(&self, path: &Path) -> PathBuf {
|
||||
let name = path
|
||||
.file_name()
|
||||
.and_then(|value| value.to_str())
|
||||
.unwrap_or("unknown.json");
|
||||
self.dir.join(format!(
|
||||
"{CORRUPT_FILE_PREFIX}{}-{uuid}-{name}",
|
||||
current_unix_micros(),
|
||||
uuid = uuid::Uuid::new_v4()
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for WalletRefundOutbox {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("WalletRefundOutbox")
|
||||
.field("dir", &self.dir)
|
||||
.field("batch_size", &self.batch_size)
|
||||
.field("flush_interval", &self.flush_interval)
|
||||
.field("max_bytes", &self.max_bytes)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for WalletRefundOutboxError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Io(error) => write!(f, "{error}"),
|
||||
Self::Json(error) => write!(f, "{error}"),
|
||||
Self::Spacetime(error) => write!(f, "{error}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for WalletRefundOutboxError {
|
||||
fn from(value: std::io::Error) -> Self {
|
||||
Self::Io(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for WalletRefundOutboxError {
|
||||
fn from(value: serde_json::Error) -> Self {
|
||||
Self::Json(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl WalletRefundOutboxError {
|
||||
fn is_data_corruption(&self) -> bool {
|
||||
matches!(self, Self::Json(_))
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_refund_record(
|
||||
path: &Path,
|
||||
) -> Result<WalletRefundOutboxRecord, WalletRefundOutboxError> {
|
||||
let mut file = File::open(path).await?;
|
||||
let mut bytes = Vec::new();
|
||||
file.read_to_end(&mut bytes).await?;
|
||||
Ok(serde_json::from_slice::<WalletRefundOutboxRecord>(&bytes)?)
|
||||
}
|
||||
|
||||
fn directory_size_if_exists(path: &Path) -> Result<u64, std::io::Error> {
|
||||
if !path.is_dir() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let mut total = 0u64;
|
||||
for entry in std::fs::read_dir(path)? {
|
||||
let entry = entry?;
|
||||
if !is_pending_outbox_file_name(&entry.file_name()) {
|
||||
continue;
|
||||
}
|
||||
let metadata = entry.metadata()?;
|
||||
if metadata.is_file() {
|
||||
total = total.saturating_add(metadata.len());
|
||||
}
|
||||
}
|
||||
Ok(total)
|
||||
}
|
||||
|
||||
fn current_unix_micros() -> u128 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_micros()
|
||||
}
|
||||
|
||||
fn ledger_id_hash(ledger_id: &str) -> String {
|
||||
hex::encode(Sha256::digest(ledger_id.as_bytes()))
|
||||
}
|
||||
|
||||
fn is_pending_outbox_file_name(name: &std::ffi::OsStr) -> bool {
|
||||
name.to_str().is_some_and(|value| {
|
||||
value.starts_with(PENDING_FILE_PREFIX) && value.ends_with(OUTBOX_FILE_EXTENSION)
|
||||
})
|
||||
}
|
||||
|
||||
async fn sync_directory_metadata(path: &Path) -> Result<(), WalletRefundOutboxError> {
|
||||
let path = path.to_path_buf();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let dir = std::fs::File::open(path)?;
|
||||
dir.sync_all()
|
||||
})
|
||||
.await
|
||||
.map_err(|error| std::io::Error::new(std::io::ErrorKind::Other, error.to_string()))??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn sample_record(ledger_id: &str) -> WalletRefundOutboxRecord {
|
||||
WalletRefundOutboxRecord {
|
||||
owner_user_id: "user-1".to_string(),
|
||||
amount: 2,
|
||||
ledger_id: ledger_id.to_string(),
|
||||
created_at_micros: 1_713_680_000_000_000,
|
||||
asset_kind: "puzzle_initial_image".to_string(),
|
||||
asset_id: "asset-1".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn test_dir(name: &str) -> PathBuf {
|
||||
let dir = std::env::temp_dir().join(format!(
|
||||
"genarrative-wallet-refund-outbox-{name}-{}",
|
||||
current_unix_micros()
|
||||
));
|
||||
let _ = std::fs::remove_dir_all(&dir);
|
||||
dir
|
||||
}
|
||||
|
||||
fn test_outbox(dir: PathBuf, max_bytes: u64) -> Arc<WalletRefundOutbox> {
|
||||
let config = AppConfig {
|
||||
wallet_refund_outbox_dir: dir,
|
||||
wallet_refund_outbox_batch_size: 500,
|
||||
wallet_refund_outbox_flush_interval: Duration::from_secs(60),
|
||||
wallet_refund_outbox_max_bytes: max_bytes,
|
||||
..AppConfig::default()
|
||||
};
|
||||
WalletRefundOutbox::from_config(
|
||||
&config,
|
||||
SpacetimeClient::new(spacetime_client::SpacetimeClientConfig {
|
||||
server_url: "http://127.0.0.1:1".to_string(),
|
||||
database: "missing".to_string(),
|
||||
token: None,
|
||||
pool_size: 1,
|
||||
procedure_timeout: Duration::from_millis(10),
|
||||
}),
|
||||
)
|
||||
.expect("outbox should be enabled")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enqueue_is_idempotent_per_ledger_id() {
|
||||
let dir = test_dir("idempotent");
|
||||
let outbox = test_outbox(dir.clone(), 1024 * 1024);
|
||||
|
||||
outbox.enqueue(sample_record("ledger-1")).await.unwrap();
|
||||
outbox.enqueue(sample_record("ledger-1")).await.unwrap();
|
||||
|
||||
let pending_count = std::fs::read_dir(&dir)
|
||||
.unwrap()
|
||||
.filter_map(Result::ok)
|
||||
.filter(|entry| is_pending_outbox_file_name(&entry.file_name()))
|
||||
.count();
|
||||
assert_eq!(pending_count, 1);
|
||||
|
||||
let _ = std::fs::remove_dir_all(dir);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enqueue_drops_when_outbox_exceeds_max_bytes() {
|
||||
let dir = test_dir("max-bytes");
|
||||
let outbox = test_outbox(dir.clone(), 1);
|
||||
|
||||
let outcome = outbox.enqueue(sample_record("ledger-1")).await.unwrap();
|
||||
|
||||
assert!(matches!(
|
||||
outcome,
|
||||
WalletRefundOutboxEnqueueOutcome::Dropped {
|
||||
reason: "max_bytes"
|
||||
}
|
||||
));
|
||||
assert!(!dir.exists() || std::fs::read_dir(&dir).unwrap().next().is_none());
|
||||
|
||||
let _ = std::fs::remove_dir_all(dir);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn flush_quarantines_corrupt_file() {
|
||||
let dir = test_dir("corrupt");
|
||||
std::fs::create_dir_all(&dir).unwrap();
|
||||
let pending_path = dir.join(format!("{PENDING_FILE_PREFIX}bad{OUTBOX_FILE_EXTENSION}"));
|
||||
std::fs::write(&pending_path, b"{not-json}").unwrap();
|
||||
let outbox = test_outbox(dir.clone(), 1024 * 1024);
|
||||
|
||||
outbox.flush_pending_files_once().await.unwrap();
|
||||
|
||||
assert!(!pending_path.exists());
|
||||
let corrupt_count = std::fs::read_dir(&dir)
|
||||
.unwrap()
|
||||
.filter_map(Result::ok)
|
||||
.filter(|entry| {
|
||||
entry
|
||||
.file_name()
|
||||
.to_str()
|
||||
.is_some_and(|name| name.starts_with(CORRUPT_FILE_PREFIX))
|
||||
})
|
||||
.count();
|
||||
assert_eq!(corrupt_count, 1);
|
||||
|
||||
let _ = std::fs::remove_dir_all(dir);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shutdown_flush_keeps_file_when_spacetime_is_unavailable() {
|
||||
let dir = test_dir("shutdown");
|
||||
let outbox = test_outbox(dir.clone(), 1024 * 1024);
|
||||
|
||||
outbox.enqueue(sample_record("ledger-1")).await.unwrap();
|
||||
let result = outbox.flush_for_shutdown().await;
|
||||
|
||||
assert!(
|
||||
matches!(result, Err(WalletRefundOutboxError::Spacetime(_))),
|
||||
"missing test SpacetimeDB should keep refund file for retry"
|
||||
);
|
||||
let pending_count = std::fs::read_dir(&dir)
|
||||
.unwrap()
|
||||
.filter_map(Result::ok)
|
||||
.filter(|entry| is_pending_outbox_file_name(&entry.file_name()))
|
||||
.count();
|
||||
assert_eq!(pending_count, 1);
|
||||
|
||||
let _ = std::fs::remove_dir_all(dir);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user