use std::future::Future; use axum::http::StatusCode; use serde_json::json; use spacetime_client::SpacetimeClientError; use crate::{http_error::AppError, state::AppState}; pub(crate) const ASSET_OPERATION_POINTS_COST: u64 = 1; /// 资产操作统一执行入口:业务层只声明操作类型与资源 ID,钱包扣退费由服务层收口。 pub(crate) async fn execute_billable_asset_operation( state: &AppState, owner_user_id: &str, asset_kind: &str, asset_id: &str, operation: Fut, ) -> Result where Fut: Future>, { execute_billable_asset_operation_with_cost( state, owner_user_id, asset_kind, asset_id, ASSET_OPERATION_POINTS_COST, operation, ) .await } /// 生图等特殊操作可声明独立泥点成本,避免修改全局资产操作默认价格。 pub(crate) async fn execute_billable_asset_operation_with_cost( state: &AppState, owner_user_id: &str, asset_kind: &str, asset_id: &str, points_cost: u64, operation: Fut, ) -> Result where Fut: Future>, { let points_consumed = consume_asset_operation_points(state, owner_user_id, asset_kind, asset_id, points_cost) .await?; match operation.await { Ok(value) => Ok(value), Err(error) => { if points_consumed { refund_asset_operation_points( state, owner_user_id, asset_kind, asset_id, points_cost, ) .await; } Err(error) } } } /// 资产操作统一预扣泥点;扣费流水 ID 由业务资源 ID 参与构造,保证重试幂等。 async fn consume_asset_operation_points( state: &AppState, owner_user_id: &str, asset_kind: &str, asset_id: &str, points_cost: u64, ) -> Result { let ledger_id = format!( "asset_operation_consume:{}:{}:{}", owner_user_id, asset_kind, asset_id ); match state .spacetime_client() .consume_profile_wallet_points( owner_user_id.to_string(), points_cost, ledger_id, current_utc_micros(), ) .await { Ok(_) => Ok(true), Err(error) if should_skip_asset_operation_billing_for_connectivity(&error) => { // 中文注释:外部生图不应被 Maincloud 钱包短暂 503 阻断;此时跳过扣费,让业务链路继续,避免用户重复点击。 tracing::warn!( owner_user_id, asset_kind, asset_id, error = %error, "资产操作泥点预扣因 SpacetimeDB 连接不可用而降级跳过" ); Ok(false) } Err(error) => Err(map_asset_operation_wallet_error(error)), } } /// 外部生成或发布 mutation 失败后补偿退款;退款失败只记日志,避免覆盖原始业务错误。 async fn refund_asset_operation_points( state: &AppState, owner_user_id: &str, asset_kind: &str, asset_id: &str, points_cost: u64, ) { let ledger_id = format!( "asset_operation_refund:{}:{}:{}", owner_user_id, asset_kind, asset_id ); if let Err(error) = state .spacetime_client() .refund_profile_wallet_points( owner_user_id.to_string(), points_cost, ledger_id, current_utc_micros(), ) .await { tracing::error!( owner_user_id, asset_kind, asset_id, error = %error, "资产操作失败后的泥点退款失败" ); } } pub(crate) fn map_asset_operation_wallet_error(error: SpacetimeClientError) -> AppError { let message = error.to_string(); tracing::warn!( provider = "profile-wallet", error = %message, "资产操作泥点预扣失败" ); let status = match &error { SpacetimeClientError::Procedure(message) if message.contains("泥点余额不足") => { StatusCode::CONFLICT } _ => StatusCode::BAD_GATEWAY, }; AppError::from_status(status).with_details(json!({ "provider": "profile-wallet", "message": message, })) } pub(crate) fn should_skip_asset_operation_billing_for_connectivity( error: &SpacetimeClientError, ) -> bool { match error { SpacetimeClientError::ConnectDropped | SpacetimeClientError::Timeout => true, SpacetimeClientError::Build(message) | SpacetimeClientError::Procedure(message) | SpacetimeClientError::Runtime(message) => { message.contains("503") || message.contains("Service Unavailable") || message.contains("Failed to connect") || message.contains("WebSocket") || message.contains("No such procedure") || message.contains("连接已断开") || message.contains("连接在返回结果前已断开") } } } fn current_utc_micros() -> i64 { time::OffsetDateTime::now_utc().unix_timestamp_nanos() as i64 / 1_000 } #[cfg(test)] mod tests { use super::*; #[test] fn asset_operation_billing_skips_spacetime_connectivity_errors() { assert_eq!(ASSET_OPERATION_POINTS_COST, 1); assert!(should_skip_asset_operation_billing_for_connectivity( &SpacetimeClientError::ConnectDropped )); assert!(should_skip_asset_operation_billing_for_connectivity( &SpacetimeClientError::Runtime( "Failed to connect: HTTP error: 503 Service Unavailable".to_string(), ), )); assert!(should_skip_asset_operation_billing_for_connectivity( &SpacetimeClientError::Procedure( "No such procedure: consume_profile_wallet_points_and_return".to_string(), ), )); assert!(!should_skip_asset_operation_billing_for_connectivity( &SpacetimeClientError::Procedure("泥点余额不足".to_string()), )); } }