fix: lock recharge flow until virtual payment settles

This commit is contained in:
kdletters
2026-06-02 01:47:39 +08:00
parent 1cb11bc1dd
commit 2fdeb34567
13 changed files with 1167 additions and 246 deletions

View File

@@ -321,9 +321,8 @@ fn validate_admin_creation_entry_config(
let unified_creation_spec_json = unified_creation_spec
.as_ref()
.map(|spec| {
encode_unified_creation_spec_response(spec).map_err(|error| {
AppError::from_status(StatusCode::BAD_REQUEST).with_message(error)
})
encode_unified_creation_spec_response(spec)
.map_err(|error| AppError::from_status(StatusCode::BAD_REQUEST).with_message(error))
})
.transpose()?;
Ok(module_runtime::CreationEntryTypeAdminUpsertInput {

View File

@@ -14,7 +14,8 @@ use crate::{
create_profile_recharge_order, get_profile_analytics_metric, get_profile_dashboard,
get_profile_play_stats, get_profile_recharge_center, get_profile_referral_invite_center,
get_profile_task_center, get_profile_wallet_ledger, redeem_profile_referral_invite_code,
redeem_profile_reward_code, submit_profile_feedback,
redeem_profile_reward_code, stream_wechat_profile_recharge_order_events,
submit_profile_feedback,
},
runtime_save::{list_profile_save_archives, resume_profile_save_archive},
state::AppState,
@@ -73,6 +74,12 @@ pub fn router(state: AppState) -> Router<AppState> {
middleware::from_fn_with_state(state.clone(), require_bearer_auth),
),
)
.route(
"/api/profile/recharge/orders/{order_id}/wechat/events",
get(stream_wechat_profile_recharge_order_events).route_layer(
middleware::from_fn_with_state(state.clone(), require_bearer_auth),
),
)
.route(
"/api/profile/feedback",
post(submit_profile_feedback)

View File

@@ -2,7 +2,10 @@ use axum::{
Json,
extract::{Extension, Path, Query, State},
http::{HeaderMap, StatusCode},
response::Response,
response::{
IntoResponse, Response,
sse::{Event, Sse},
},
};
use hmac::{Hmac, Mac};
use module_runtime::{
@@ -23,7 +26,7 @@ use module_runtime::{
RuntimeProfileWalletLedgerSourceType, RuntimeReferralInviteCenterRecord,
RuntimeTrackingScopeKind,
};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use sha2::Sha256;
use shared_contracts::runtime::{
@@ -63,10 +66,12 @@ use shared_contracts::runtime::{
RedeemProfileRewardCodeRequest, RedeemProfileRewardCodeResponse, SubmitProfileFeedbackRequest,
SubmitProfileFeedbackResponse, TRACKING_SCOPE_KIND_MODULE, TRACKING_SCOPE_KIND_SITE,
TRACKING_SCOPE_KIND_USER, TRACKING_SCOPE_KIND_WORK, WechatMiniProgramPaymentParamsResponse,
WechatMiniProgramVirtualPayParamsResponse,
WechatMiniProgramVirtualPayParamsResponse, WechatProfileRechargeOrderDoneEvent,
WechatProfileRechargeOrderErrorEvent,
};
use shared_kernel::{offset_datetime_to_unix_micros, parse_rfc3339};
use spacetime_client::SpacetimeClientError;
use std::{convert::Infallible, time::Duration};
use time::OffsetDateTime;
use crate::{
@@ -347,19 +352,19 @@ pub async fn confirm_wechat_profile_recharge_order(
if order.status == RuntimeProfileRechargeOrderStatus::Paid {
return Ok(json_success_body(
Some(&request_context),
ConfirmWechatProfileRechargeOrderResponse {
order: build_profile_recharge_order_response(order),
center: build_profile_recharge_center_response(center),
},
build_wechat_profile_recharge_order_confirmation(center, order),
));
}
if order.status != RuntimeProfileRechargeOrderStatus::Pending {
return Ok(json_success_body(
Some(&request_context),
ConfirmWechatProfileRechargeOrderResponse {
order: build_profile_recharge_order_response(order),
center: build_profile_recharge_center_response(center),
},
build_wechat_profile_recharge_order_confirmation(center, order),
));
}
if order.payment_channel == PROFILE_RECHARGE_PAYMENT_CHANNEL_WECHAT_MINI_PROGRAM_VIRTUAL {
return Ok(json_success_body(
Some(&request_context),
build_wechat_profile_recharge_order_confirmation(center, order),
));
}
@@ -381,10 +386,7 @@ pub async fn confirm_wechat_profile_recharge_order(
if wechat_order.trade_state != "SUCCESS" {
return Ok(json_success_body(
Some(&request_context),
ConfirmWechatProfileRechargeOrderResponse {
order: build_profile_recharge_order_response(order),
center: build_profile_recharge_center_response(center),
},
build_wechat_profile_recharge_order_confirmation(center, order),
));
}
@@ -406,13 +408,94 @@ pub async fn confirm_wechat_profile_recharge_order(
Ok(json_success_body(
Some(&request_context),
ConfirmWechatProfileRechargeOrderResponse {
order: build_profile_recharge_order_response(order),
center: build_profile_recharge_center_response(center),
},
build_wechat_profile_recharge_order_confirmation(center, order),
))
}
pub async fn stream_wechat_profile_recharge_order_events(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
Path(order_id): Path<String>,
) -> Result<Response, Response> {
let user_id = authenticated.claims().user_id().to_string();
let (center, order) = load_user_wechat_profile_recharge_order(
&state,
&request_context,
user_id.clone(),
order_id.clone(),
)
.await?;
let stream_state = state.clone();
let stream_context = request_context.clone();
let stream = async_stream::stream! {
let initial_response = build_wechat_profile_recharge_order_confirmation(center, order.clone());
yield Ok::<Event, Infallible>(wechat_profile_recharge_sse_json_event(
"order",
&initial_response,
));
if order.status != RuntimeProfileRechargeOrderStatus::Pending {
yield Ok::<Event, Infallible>(wechat_profile_recharge_sse_json_event(
"done",
&WechatProfileRechargeOrderDoneEvent {
order_id: order.order_id.clone(),
status: build_profile_recharge_order_status(order.status),
},
));
return;
}
let mut updates = stream_state.subscribe_profile_recharge_order_updates();
let mut poll_interval = tokio::time::interval(Duration::from_millis(1200));
for _ in 0..25 {
tokio::select! {
maybe_order_id = updates.recv() => {
if !matches!(maybe_order_id, Ok(ref value) if value == &order_id) {
continue;
}
}
_ = poll_interval.tick() => {}
}
match load_user_wechat_profile_recharge_order(
&stream_state,
&stream_context,
user_id.clone(),
order_id.clone(),
).await {
Ok((center, order)) => {
let response = build_wechat_profile_recharge_order_confirmation(center, order.clone());
yield Ok::<Event, Infallible>(wechat_profile_recharge_sse_json_event(
"order",
&response,
));
if order.status != RuntimeProfileRechargeOrderStatus::Pending {
yield Ok::<Event, Infallible>(wechat_profile_recharge_sse_json_event(
"done",
&WechatProfileRechargeOrderDoneEvent {
order_id: order.order_id.clone(),
status: build_profile_recharge_order_status(order.status),
},
));
return;
}
}
Err(_) => {
yield Ok::<Event, Infallible>(wechat_profile_recharge_sse_json_event(
"error",
&WechatProfileRechargeOrderErrorEvent {
message: "读取充值订单状态失败".to_string(),
},
));
return;
}
}
}
};
Ok(Sse::new(stream).into_response())
}
pub async fn submit_profile_feedback(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
@@ -1029,6 +1112,81 @@ fn runtime_profile_error_response(request_context: &RequestContext, error: AppEr
error.into_response_with_context(Some(request_context))
}
async fn load_user_wechat_profile_recharge_order(
state: &AppState,
request_context: &RequestContext,
user_id: String,
order_id: String,
) -> Result<
(
RuntimeProfileRechargeCenterRecord,
RuntimeProfileRechargeOrderRecord,
),
Response,
> {
let (center, order) = state
.spacetime_client()
.get_profile_recharge_order(order_id)
.await
.map_err(|error| {
runtime_profile_error_response(request_context, map_runtime_profile_client_error(error))
})?;
if order.user_id != user_id {
return Err(runtime_profile_error_response(
request_context,
AppError::from_status(StatusCode::NOT_FOUND).with_message("充值订单不存在"),
));
}
if !is_wechat_recharge_payment_channel(&order.payment_channel) {
return Err(runtime_profile_error_response(
request_context,
AppError::from_status(StatusCode::BAD_REQUEST)
.with_message("该充值订单不是微信支付订单"),
));
}
Ok((center, order))
}
fn build_wechat_profile_recharge_order_confirmation(
center: RuntimeProfileRechargeCenterRecord,
order: RuntimeProfileRechargeOrderRecord,
) -> ConfirmWechatProfileRechargeOrderResponse {
ConfirmWechatProfileRechargeOrderResponse {
order: build_profile_recharge_order_response(order),
center: build_profile_recharge_center_response(center),
}
}
fn build_profile_recharge_order_status(status: RuntimeProfileRechargeOrderStatus) -> String {
match status {
RuntimeProfileRechargeOrderStatus::Pending => "pending",
RuntimeProfileRechargeOrderStatus::Paid => "paid",
RuntimeProfileRechargeOrderStatus::Failed => "failed",
RuntimeProfileRechargeOrderStatus::Closed => "closed",
RuntimeProfileRechargeOrderStatus::Refunded => "refunded",
}
.to_string()
}
fn wechat_profile_recharge_sse_json_event<T>(event_name: &str, payload: &T) -> Event
where
T: Serialize,
{
Event::default()
.event(event_name)
.json_data(payload)
.unwrap_or_else(|_| {
Event::default()
.event("error")
.json_data(&WechatProfileRechargeOrderErrorEvent {
message: "充值订单状态事件序列化失败".to_string(),
})
.unwrap_or_else(|_| Event::default().event("error").data("{}"))
})
}
fn normalize_recharge_payment_channel(raw: Option<String>) -> Result<String, AppError> {
raw.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())

View File

@@ -27,7 +27,7 @@ use shared_contracts::creation_entry_config::CreationEntryConfigResponse;
use shared_contracts::creative_agent::CreativeAgentSessionSnapshot;
use spacetime_client::{SpacetimeClient, SpacetimeClientConfig, SpacetimeClientError};
use time::OffsetDateTime;
use tokio::sync::Semaphore;
use tokio::sync::{Semaphore, broadcast};
use tracing::{info, warn};
use crate::config::AppConfig;
@@ -257,6 +257,7 @@ pub struct AppStateInner {
// Phase 1 任务 E 的 creative session facade 暂存在 api-server。
// creative_agent_* 表由任务 D 收口后,这里只保留读写 facade。
creative_agent_sessions: Arc<Mutex<HashMap<String, CreativeAgentSessionRuntimeRecord>>>,
profile_recharge_order_updates: broadcast::Sender<String>,
#[cfg(test)]
// 测试环境允许在未启动 SpacetimeDB 时,用内存快照兜底当前 runtime story 回归链。
test_runtime_snapshot_store: Arc<Mutex<HashMap<String, RuntimeSnapshotRecord>>>,
@@ -394,6 +395,7 @@ impl AppState {
let llm_client = build_llm_client(&config)?;
let creative_agent_gpt5_client = build_creative_agent_gpt5_client(&config)?;
let http_request_permit_pools = HttpRequestPermitPools::from_config(&config);
let (profile_recharge_order_updates, _) = broadcast::channel(128);
Ok(Self(Arc::new(AppStateInner {
config,
@@ -423,6 +425,7 @@ impl AppState {
creative_agent_gpt5_client,
creative_agent_executor: Arc::new(MockLangChainRustAgentExecutor),
creative_agent_sessions: Arc::new(Mutex::new(HashMap::new())),
profile_recharge_order_updates,
#[cfg(test)]
test_runtime_snapshot_store: Arc::new(Mutex::new(HashMap::new())),
})))
@@ -710,6 +713,16 @@ impl AppState {
self.creative_agent_executor.clone()
}
pub fn subscribe_profile_recharge_order_updates(
&self,
) -> tokio::sync::broadcast::Receiver<String> {
self.profile_recharge_order_updates.subscribe()
}
pub fn publish_profile_recharge_order_update(&self, order_id: impl Into<String>) {
let _ = self.profile_recharge_order_updates.send(order_id.into());
}
pub fn get_creative_agent_session(
&self,
session_id: &str,

View File

@@ -9,9 +9,7 @@ use axum::{
};
use base64::{
Engine as _, alphabet,
engine::general_purpose::{
GeneralPurpose, GeneralPurposeConfig, STANDARD as BASE64_STANDARD,
},
engine::general_purpose::{GeneralPurpose, GeneralPurposeConfig, STANDARD as BASE64_STANDARD},
};
use bytes::Bytes;
use cbc::cipher::{BlockDecryptMut, KeyIvInit, block_padding::NoPadding};
@@ -1017,6 +1015,8 @@ pub async fn handle_wechat_virtual_payment_notify(
);
}
state.publish_profile_recharge_order_update(notify.out_trade_no.clone());
info!(
event = notify.event.as_str(),
order_id = notify.out_trade_no.as_str(),
@@ -1152,9 +1152,7 @@ fn resolve_wechat_message_push_verify_response(
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.ok_or_else(|| {
WechatPayError::InvalidRequest("微信消息推送校验参数不完整".to_string())
})?;
.ok_or_else(|| WechatPayError::InvalidRequest("微信消息推送校验参数不完整".to_string()))?;
if !verify_wechat_message_push_signature(token, timestamp, nonce, "", signature) {
return Err(WechatPayError::InvalidSignature(
"微信消息推送校验签名无效".to_string(),

View File

@@ -262,8 +262,18 @@ mod tests {
);
let jump_hop = build_phase1_unified_creation_spec("jump-hop").expect("jump-hop spec");
assert!(jump_hop.fields.iter().any(|field| field.id == "stylePreset"));
assert!(jump_hop.fields.iter().any(|field| field.id == "endMoodPrompt"));
assert!(
jump_hop
.fields
.iter()
.any(|field| field.id == "stylePreset")
);
assert!(
jump_hop
.fields
.iter()
.any(|field| field.id == "endMoodPrompt")
);
let wooden_fish =
build_phase1_unified_creation_spec("wooden-fish").expect("wooden-fish spec");

View File

@@ -313,6 +313,19 @@ pub struct ConfirmWechatProfileRechargeOrderResponse {
pub center: ProfileRechargeCenterResponse,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct WechatProfileRechargeOrderDoneEvent {
pub order_id: String,
pub status: String,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct WechatProfileRechargeOrderErrorEvent {
pub message: String,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ProfileFeedbackEvidenceItemRequest {