收口微信领域能力

将 api-server 微信 HTTP/BFF 适配统一迁移到 wechat 目录。

将微信支付和虚拟支付消息协议细节下沉到 platform-wechat。

拆分 platform-wechat 的订阅消息与支付模块并补齐依赖。

修正微信相关测试的用户 ID 夹具并同步后端架构文档。
This commit is contained in:
kdletters
2026-06-08 21:05:37 +08:00
parent 11c5e3edf4
commit 088470a315
34 changed files with 925 additions and 837 deletions

View File

@@ -0,0 +1,526 @@
use axum::{
Json,
extract::{Extension, Query, State},
http::{HeaderMap, StatusCode},
response::{IntoResponse, Redirect, Response},
};
use module_auth::{
AuthLoginMethod, BindWechatPhoneInput, BindWechatVerifiedPhoneInput,
CreateWechatAuthStateInput, WechatAuthError,
};
use platform_auth::WechatAuthScene;
use shared_contracts::auth::{
WechatBindPhoneRequest, WechatBindPhoneResponse, WechatCallbackQuery,
WechatMiniProgramLoginRequest, WechatMiniProgramLoginResponse, WechatStartQuery,
WechatStartResponse,
};
use shared_kernel::normalize_optional_string;
use time::OffsetDateTime;
use url::Url;
use crate::{
api_response::json_success_body,
auth::AuthenticatedAccessToken,
auth_payload::map_auth_user_payload,
auth_session::{
attach_set_cookie_header, build_refresh_session_cookie_header, create_auth_session,
record_daily_login_tracking_event_after_auth_success,
},
http_error::AppError,
platform_errors::{attach_retry_after, map_wechat_provider_error},
request_context::RequestContext,
session_client::resolve_session_client_context,
state::AppState,
};
pub async fn start_wechat_login(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
headers: HeaderMap,
Query(query): Query<WechatStartQuery>,
) -> Result<Json<serde_json::Value>, AppError> {
if !state.config.wechat_auth_enabled {
return Err(AppError::from_status(StatusCode::BAD_REQUEST).with_message("微信登录暂未启用"));
}
let user_agent = headers
.get("user-agent")
.and_then(|value| value.to_str().ok())
.map(|value| value.to_string());
let scene = resolve_wechat_scene(user_agent.as_deref())?;
let state_record = state
.wechat_auth_state_service()
.create_state(
CreateWechatAuthStateInput {
redirect_path: normalize_redirect_path(
query.redirect_path.as_deref(),
&state.config.wechat_redirect_path,
),
scene: map_wechat_scene_to_domain(&scene),
request_user_agent: user_agent.clone(),
},
OffsetDateTime::now_utc(),
)
.map_err(map_wechat_auth_error)?;
let authorization_url = state
.wechat_provider()
.build_authorization_url(
&resolve_wechat_callback_url(&state, &headers)?,
&state_record.state.state_token,
&scene,
)
.map_err(map_wechat_provider_error)?;
Ok(json_success_body(
Some(&request_context),
WechatStartResponse { authorization_url },
))
}
pub async fn handle_wechat_callback(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
headers: HeaderMap,
Query(query): Query<WechatCallbackQuery>,
) -> Result<impl IntoResponse, AppError> {
if !state.config.wechat_auth_enabled {
return Err(AppError::from_status(StatusCode::BAD_REQUEST).with_message("微信登录暂未启用"));
}
let fallback_redirect = state.config.wechat_redirect_path.clone();
let state_token = query
.state
.as_deref()
.unwrap_or_default()
.trim()
.to_string();
if state_token.is_empty() {
return Ok(Redirect::to(&build_auth_result_redirect_url(
&fallback_redirect,
&[
("auth_provider", "wechat"),
("auth_error", "微信登录状态已失效,请重新发起登录。"),
],
))
.into_response());
}
let consumed = match state
.wechat_auth_state_service()
.consume_state(&state_token, OffsetDateTime::now_utc())
{
Ok(value) => value,
Err(_) => {
return Ok(Redirect::to(&build_auth_result_redirect_url(
&fallback_redirect,
&[
("auth_provider", "wechat"),
("auth_error", "微信登录状态已失效,请重新发起登录。"),
],
))
.into_response());
}
};
let redirect_path = consumed.state.redirect_path.clone();
let session_client = resolve_session_client_context(&headers);
let result = match state
.wechat_provider()
.resolve_callback_profile(query.code.as_deref(), query.mock_code.as_deref())
.await
{
Ok(profile) => state
.wechat_auth_service()
.resolve_login(module_auth::ResolveWechatLoginInput {
profile: map_wechat_profile_to_domain(profile),
})
.await
.map_err(map_wechat_auth_error),
Err(error) => Err(map_wechat_provider_error(error)),
};
match result {
Ok(result) => {
let signed_session = create_auth_session(
&state,
&result.user,
&session_client,
AuthLoginMethod::Wechat,
)?;
state
.sync_auth_store_snapshot_to_spacetime()
.await
.map_err(|error| {
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR)
.with_message(format!("同步认证快照失败:{error}"))
})?;
record_daily_login_tracking_event_after_auth_success(
&state,
&request_context,
&result.user.id,
AuthLoginMethod::Wechat,
)
.await;
let mut response = Redirect::to(&build_auth_result_redirect_url(
&redirect_path,
&[
("auth_provider", "wechat"),
("auth_token", signed_session.access_token.as_str()),
("auth_binding_status", result.user.binding_status.as_str()),
],
))
.into_response();
attach_set_cookie_header(
response.headers_mut(),
build_refresh_session_cookie_header(&state, &signed_session.refresh_token)?,
);
Ok(response)
}
Err(error) => Ok(Redirect::to(&build_auth_result_redirect_url(
&redirect_path,
&[("auth_provider", "wechat"), ("auth_error", error.message())],
))
.into_response()),
}
}
pub async fn bind_wechat_phone(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,
headers: HeaderMap,
Json(payload): Json<WechatBindPhoneRequest>,
) -> Result<impl IntoResponse, AppError> {
if !state.config.wechat_auth_enabled {
return Err(AppError::from_status(StatusCode::BAD_REQUEST).with_message("微信登录暂未启用"));
}
let result = if let Some(wechat_phone_code) = payload
.wechat_phone_code
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
let phone_profile = state
.wechat_provider()
.resolve_mini_program_phone_number(Some(wechat_phone_code))
.await
.map_err(map_wechat_provider_error)?;
state
.phone_auth_service()
.bind_wechat_verified_phone(BindWechatVerifiedPhoneInput {
user_id: authenticated.claims().user_id().to_string(),
phone_number: phone_profile.phone_number,
wechat_display_name: payload.display_name.clone(),
})
.await
.map_err(map_wechat_bind_phone_error)?
} else {
let phone = payload
.phone
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.ok_or_else(|| {
AppError::from_status(StatusCode::BAD_REQUEST).with_message("缺少需要绑定的手机号")
})?;
let code = payload
.code
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.ok_or_else(|| {
AppError::from_status(StatusCode::BAD_REQUEST).with_message("缺少短信验证码")
})?;
state
.phone_auth_service()
.bind_wechat_phone(
BindWechatPhoneInput {
user_id: authenticated.claims().user_id().to_string(),
phone_number: phone.to_string(),
verify_code: code.to_string(),
wechat_display_name: payload.display_name.clone(),
},
OffsetDateTime::now_utc(),
)
.await
.map_err(map_wechat_bind_phone_error)?
};
let session_client = resolve_session_client_context(&headers);
let signed_session = create_auth_session(
&state,
&result.user,
&session_client,
AuthLoginMethod::Wechat,
)?;
state
.sync_auth_store_snapshot_to_spacetime()
.await
.map_err(|error| {
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR)
.with_message(format!("同步认证快照失败:{error}"))
})?;
if result.activated_new_user {
crate::registration_reward::grant_new_user_registration_wallet_reward(
&state,
&request_context,
&result.user.id,
)
.await;
}
record_daily_login_tracking_event_after_auth_success(
&state,
&request_context,
&result.user.id,
AuthLoginMethod::Wechat,
)
.await;
let mut response_headers = HeaderMap::new();
attach_set_cookie_header(
&mut response_headers,
build_refresh_session_cookie_header(&state, &signed_session.refresh_token)?,
);
Ok((
response_headers,
json_success_body(
Some(&request_context),
WechatBindPhoneResponse {
token: signed_session.access_token,
user: map_auth_user_payload(result.user),
},
),
))
}
pub async fn login_wechat_mini_program(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
headers: HeaderMap,
Json(payload): Json<WechatMiniProgramLoginRequest>,
) -> Result<impl IntoResponse, AppError> {
if !state.config.wechat_auth_enabled {
return Err(AppError::from_status(StatusCode::BAD_REQUEST).with_message("微信登录暂未启用"));
}
let code = payload.code.trim();
if code.is_empty() {
return Err(
AppError::from_status(StatusCode::BAD_REQUEST).with_message("缺少微信授权 code")
);
}
let profile = state
.wechat_provider()
.resolve_mini_program_login_profile(Some(code))
.await
.map_err(map_wechat_provider_error)?;
let result = state
.wechat_auth_service()
.resolve_login(module_auth::ResolveWechatLoginInput {
profile: map_wechat_profile_to_domain_with_display_name(profile, payload.display_name),
})
.await
.map_err(map_wechat_auth_error)?;
let session_client = resolve_session_client_context(&headers);
let signed_session = create_auth_session(
&state,
&result.user,
&session_client,
AuthLoginMethod::Wechat,
)?;
state
.sync_auth_store_snapshot_to_spacetime()
.await
.map_err(|error| {
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR)
.with_message(format!("同步认证快照失败:{error}"))
})?;
let mut response_headers = HeaderMap::new();
attach_set_cookie_header(
&mut response_headers,
build_refresh_session_cookie_header(&state, &signed_session.refresh_token)?,
);
Ok((
response_headers,
json_success_body(
Some(&request_context),
WechatMiniProgramLoginResponse {
token: signed_session.access_token,
binding_status: result.user.binding_status.as_str().to_string(),
user: map_auth_user_payload(result.user),
created: result.created,
},
),
))
}
fn resolve_wechat_scene(user_agent: Option<&str>) -> Result<WechatAuthScene, AppError> {
let user_agent = user_agent.unwrap_or_default();
let is_wechat = user_agent.contains("MicroMessenger");
let is_mobile = user_agent.contains("Android")
|| user_agent.contains("iPhone")
|| user_agent.contains("iPad")
|| user_agent.contains("Mobile");
if is_wechat {
return Ok(WechatAuthScene::WechatInApp);
}
if is_mobile {
return Err(AppError::from_status(StatusCode::BAD_REQUEST)
.with_message("当前浏览器请使用手机号登录,或在微信内打开后再使用微信登录"));
}
Ok(WechatAuthScene::Desktop)
}
fn map_wechat_scene_to_domain(scene: &WechatAuthScene) -> module_auth::WechatAuthScene {
match scene {
WechatAuthScene::Desktop => module_auth::WechatAuthScene::Desktop,
WechatAuthScene::WechatInApp => module_auth::WechatAuthScene::WechatInApp,
}
}
fn map_wechat_profile_to_domain(
profile: platform_auth::WechatIdentityProfile,
) -> module_auth::WechatIdentityProfile {
module_auth::WechatIdentityProfile {
provider_uid: profile.provider_uid,
provider_union_id: profile.provider_union_id,
display_name: profile.display_name,
avatar_url: profile.avatar_url,
session_key: profile.session_key,
}
}
fn map_wechat_profile_to_domain_with_display_name(
profile: platform_auth::WechatIdentityProfile,
display_name: Option<String>,
) -> module_auth::WechatIdentityProfile {
let mut profile = map_wechat_profile_to_domain(profile);
if let Some(display_name) = normalize_optional_string(display_name) {
profile.display_name = Some(display_name);
}
profile
}
fn normalize_redirect_path(raw_value: Option<&str>, fallback: &str) -> String {
let Some(raw_value) = raw_value.map(str::trim).filter(|value| !value.is_empty()) else {
return fallback.to_string();
};
if raw_value.starts_with('/') {
return raw_value.to_string();
}
Url::parse(raw_value)
.map(|url| {
format!(
"{}{}{}",
url.path(),
url.query().map(|v| format!("?{v}")).unwrap_or_default(),
url.fragment().map(|v| format!("#{v}")).unwrap_or_default()
)
})
.unwrap_or_else(|_| fallback.to_string())
}
fn resolve_wechat_callback_url(state: &AppState, headers: &HeaderMap) -> Result<String, AppError> {
let proto = headers
.get("x-forwarded-proto")
.and_then(|value| value.to_str().ok())
.and_then(|value| value.split(',').next())
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or("http");
let host = headers
.get("x-forwarded-host")
.or_else(|| headers.get("host"))
.and_then(|value| value.to_str().ok())
.and_then(|value| value.split(',').next())
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or("127.0.0.1:3000");
Ok(format!(
"{proto}://{host}{}",
state.config.wechat_callback_path
))
}
fn build_auth_result_redirect_url(redirect_path: &str, params: &[(&str, &str)]) -> String {
let hash = params
.iter()
.map(|(key, value)| {
format!(
"{}={}",
urlencoding::encode(key),
urlencoding::encode(value)
)
})
.collect::<Vec<_>>()
.join("&");
let path_without_hash = redirect_path.split('#').next().unwrap_or("/");
format!(
"{}#{}",
if path_without_hash.is_empty() {
"/"
} else {
path_without_hash
},
hash
)
}
#[allow(dead_code)]
fn _assert_response_type(_: Response) {}
fn map_wechat_auth_error(error: WechatAuthError) -> AppError {
match error {
WechatAuthError::MissingProfile
| WechatAuthError::StateNotFound
| WechatAuthError::StateExpired
| WechatAuthError::StateConsumed
| WechatAuthError::MissingWechatIdentity => {
AppError::from_status(StatusCode::BAD_REQUEST).with_message(error.to_string())
}
WechatAuthError::UserNotFound => {
AppError::from_status(StatusCode::UNAUTHORIZED).with_message(error.to_string())
}
WechatAuthError::Store(_) | WechatAuthError::PasswordHash(_) => {
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_message(error.to_string())
}
}
}
fn map_wechat_bind_phone_error(error: module_auth::PhoneAuthError) -> AppError {
match error {
module_auth::PhoneAuthError::InvalidPhoneNumber
| module_auth::PhoneAuthError::InvalidVerifyCode
| module_auth::PhoneAuthError::VerifyCodeNotFound
| module_auth::PhoneAuthError::VerifyCodeExpired
| module_auth::PhoneAuthError::UserStateMismatch => {
AppError::from_status(StatusCode::BAD_REQUEST).with_message(error.to_string())
}
module_auth::PhoneAuthError::SendCoolingDown {
retry_after_seconds,
} => {
let app_error = AppError::from_status(StatusCode::TOO_MANY_REQUESTS)
.with_message(error.to_string())
.with_details(serde_json::json!({ "retryAfterSeconds": retry_after_seconds }));
attach_retry_after(app_error, retry_after_seconds)
}
module_auth::PhoneAuthError::VerifyAttemptsExceeded => {
AppError::from_status(StatusCode::TOO_MANY_REQUESTS).with_message(error.to_string())
}
module_auth::PhoneAuthError::UserNotFound => {
AppError::from_status(StatusCode::UNAUTHORIZED).with_message(error.to_string())
}
module_auth::PhoneAuthError::SmsProviderInvalidConfig(_) => {
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_message(error.to_string())
}
module_auth::PhoneAuthError::SmsProviderUpstream(_) => {
AppError::from_status(StatusCode::BAD_GATEWAY).with_message(error.to_string())
}
module_auth::PhoneAuthError::Store(_) | module_auth::PhoneAuthError::PasswordHash(_) => {
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_message(error.to_string())
}
}
}

View File

@@ -0,0 +1,423 @@
use axum::{
Json,
extract::{Query, State},
http::{HeaderMap, HeaderValue, StatusCode, header::CONTENT_TYPE},
response::{IntoResponse, Response},
};
use bytes::Bytes;
use platform_wechat::pay::{
WechatMiniProgramMessagePushQuery, WechatMiniProgramOrderRequest, WechatPayConfig,
WechatPayError, WechatWebOrderRequest, decrypt_wechat_message_push_ciphertext,
parse_virtual_payment_notify, parse_wechat_mini_program_message_push_payload,
resolve_wechat_message_push_verify_response, verify_wechat_message_push_signature,
};
use serde::Serialize;
use serde_json::json;
use shared_kernel::offset_datetime_to_unix_micros;
use time::OffsetDateTime;
use tracing::{info, warn};
use crate::{config::AppConfig, http_error::AppError, state::AppState};
#[derive(Clone, Copy)]
enum VirtualPaymentNotifyResponseFormat {
Json,
Xml,
}
#[derive(Serialize)]
struct ApiWechatVirtualPaymentNotifyResponse {
#[serde(rename = "ErrCode")]
err_code: i32,
#[serde(rename = "ErrMsg")]
err_msg: String,
}
pub async fn handle_wechat_pay_notify(
State(state): State<AppState>,
headers: HeaderMap,
body: Bytes,
) -> Result<StatusCode, AppError> {
let notify = state
.wechat_pay_client()
.parse_notify(&headers, &body)
.map_err(map_wechat_pay_notify_error)?;
if notify.trade_state != "SUCCESS" {
info!(
order_id = notify.out_trade_no.as_str(),
trade_state = notify.trade_state.as_str(),
"收到非成功微信支付通知"
);
return Ok(StatusCode::NO_CONTENT);
}
let paid_at_micros = notify
.success_time
.as_deref()
.and_then(|value| shared_kernel::parse_rfc3339(value).ok())
.map(offset_datetime_to_unix_micros)
.unwrap_or_else(current_unix_micros);
state
.spacetime_client()
.mark_profile_recharge_order_paid(
notify.out_trade_no.clone(),
paid_at_micros,
notify.transaction_id.clone(),
)
.await
.map_err(|error| {
AppError::from_status(StatusCode::BAD_GATEWAY)
.with_message(format!("确认微信支付订单失败:{error}"))
})?;
info!(
order_id = notify.out_trade_no.as_str(),
"微信支付通知已确认订单入账"
);
Ok(StatusCode::NO_CONTENT)
}
pub async fn handle_wechat_virtual_payment_message_push_verify(
State(state): State<AppState>,
Query(query): Query<WechatMiniProgramMessagePushQuery>,
) -> Response {
let token = match read_wechat_message_push_config(
state.config.wechat_mini_program_message_token.as_deref(),
"WECHAT_MINIPROGRAM_MESSAGE_TOKEN",
) {
Ok(token) => token,
Err(error) => return build_wechat_message_push_verify_error_response(error),
};
let aes_key = match read_wechat_message_push_config(
state
.config
.wechat_mini_program_message_encoding_aes_key
.as_deref(),
"WECHAT_MINIPROGRAM_MESSAGE_ENCODING_AES_KEY",
) {
Ok(value) => value,
Err(error) => return build_wechat_message_push_verify_error_response(error),
};
match resolve_wechat_message_push_verify_response(
token,
aes_key,
state
.config
.wechat_mini_program_app_id
.as_deref()
.or(state.config.wechat_app_id.as_deref()),
&query,
) {
Ok(plaintext) => (StatusCode::OK, plaintext).into_response(),
Err(error) => build_wechat_message_push_verify_error_response(error),
}
}
pub async fn handle_wechat_virtual_payment_notify(
State(state): State<AppState>,
headers: HeaderMap,
Query(query): Query<WechatMiniProgramMessagePushQuery>,
body: Bytes,
) -> Response {
let response_format = detect_virtual_payment_notify_response_format(&headers, &body);
let encrypted_payload = match parse_wechat_mini_program_message_push_payload(&body) {
Ok(payload) => payload,
Err(error) => return build_virtual_payment_notify_error_response(error, response_format),
};
let token = match read_wechat_message_push_config(
state.config.wechat_mini_program_message_token.as_deref(),
"WECHAT_MINIPROGRAM_MESSAGE_TOKEN",
) {
Ok(token) => token,
Err(error) => return build_virtual_payment_notify_error_response(error, response_format),
};
let aes_key = match read_wechat_message_push_config(
state
.config
.wechat_mini_program_message_encoding_aes_key
.as_deref(),
"WECHAT_MINIPROGRAM_MESSAGE_ENCODING_AES_KEY",
) {
Ok(value) => value,
Err(error) => return build_virtual_payment_notify_error_response(error, response_format),
};
let signature = query
.msg_signature
.as_deref()
.or(query.signature.as_deref())
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or("");
let timestamp = query.timestamp.as_deref().map(str::trim).unwrap_or("");
let nonce = query.nonce.as_deref().map(str::trim).unwrap_or("");
if signature.is_empty() || timestamp.is_empty() || nonce.is_empty() {
return build_virtual_payment_notify_error_response(
WechatPayError::InvalidRequest("微信消息推送加密参数不完整".to_string()),
response_format,
);
}
if !verify_wechat_message_push_signature(
token,
timestamp,
nonce,
encrypted_payload.encrypt.as_str(),
signature,
) {
return build_virtual_payment_notify_error_response(
WechatPayError::InvalidSignature("微信消息推送 msg_signature 无效".to_string()),
response_format,
);
}
let notify_body = match decrypt_wechat_message_push_ciphertext(
aes_key,
encrypted_payload.encrypt.as_str(),
state
.config
.wechat_mini_program_app_id
.as_deref()
.or(state.config.wechat_app_id.as_deref()),
) {
Ok(body) => body,
Err(error) => return build_virtual_payment_notify_error_response(error, response_format),
};
let notify = match parse_virtual_payment_notify(notify_body.as_bytes()) {
Ok(notify) => notify,
Err(error) => return build_virtual_payment_notify_error_response(error, response_format),
};
if notify.event != "xpay_goods_deliver_notify" && notify.event != "xpay_coin_pay_notify" {
info!(
event = notify.event.as_str(),
order_id = notify.out_trade_no.as_str(),
"收到非订单入账虚拟支付推送"
);
return build_virtual_payment_notify_success_response(response_format);
}
let paid_at_micros = notify.paid_at_micros.unwrap_or_else(current_unix_micros);
if state
.spacetime_client()
.mark_profile_recharge_order_paid(
notify.out_trade_no.clone(),
paid_at_micros,
notify.transaction_id.clone(),
)
.await
.is_err()
{
warn!(
order_id = notify.out_trade_no.as_str(),
"确认微信虚拟支付订单失败"
);
return build_virtual_payment_notify_error_response(
WechatPayError::Upstream("确认微信虚拟支付订单失败".to_string()),
response_format,
);
}
state.publish_profile_recharge_order_update(notify.out_trade_no.clone());
info!(
event = notify.event.as_str(),
order_id = notify.out_trade_no.as_str(),
"微信虚拟支付推送已确认订单入账"
);
build_virtual_payment_notify_success_response(response_format)
}
pub fn build_wechat_pay_config(config: &AppConfig) -> WechatPayConfig {
WechatPayConfig {
enabled: config.wechat_pay_enabled,
provider: config.wechat_pay_provider.clone(),
app_id: config
.wechat_mini_program_app_id
.clone()
.or_else(|| config.wechat_app_id.clone()),
mch_id: config.wechat_pay_mch_id.clone(),
merchant_serial_no: config.wechat_pay_merchant_serial_no.clone(),
private_key_pem: config.wechat_pay_private_key_pem.clone(),
private_key_path: config.wechat_pay_private_key_path.clone(),
platform_public_key_pem: config.wechat_pay_platform_public_key_pem.clone(),
platform_public_key_path: config.wechat_pay_platform_public_key_path.clone(),
platform_serial_no: config.wechat_pay_platform_serial_no.clone(),
api_v3_key: config.wechat_pay_api_v3_key.clone(),
notify_url: config.wechat_pay_notify_url.clone(),
jsapi_endpoint: config.wechat_pay_jsapi_endpoint.clone(),
}
}
pub fn map_wechat_pay_error(error: WechatPayError) -> AppError {
match error {
WechatPayError::Disabled => AppError::from_status(StatusCode::BAD_REQUEST)
.with_message("微信支付暂未启用")
.with_details(json!({ "provider": "wechat_pay" })),
WechatPayError::InvalidConfig(message) => {
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE)
.with_message(message)
.with_details(json!({ "provider": "wechat_pay" }))
}
WechatPayError::InvalidRequest(message) => AppError::from_status(StatusCode::BAD_REQUEST)
.with_message(message)
.with_details(json!({ "provider": "wechat_pay" })),
WechatPayError::RequestFailed(message)
| WechatPayError::Upstream(message)
| WechatPayError::Deserialize(message)
| WechatPayError::Crypto(message) => AppError::from_status(StatusCode::BAD_GATEWAY)
.with_message(message)
.with_details(json!({ "provider": "wechat_pay" })),
WechatPayError::InvalidSignature(message) => {
AppError::from_status(StatusCode::UNAUTHORIZED)
.with_message("微信支付通知签名无效")
.with_details(json!({ "provider": "wechat_pay", "reason": message }))
}
}
}
pub fn map_wechat_pay_init_error(error: WechatPayError) -> crate::state::AppStateInitError {
crate::state::AppStateInitError::WechatPay(error.to_string())
}
pub fn build_wechat_payment_request(
order_id: String,
product_title: String,
amount_cents: u64,
payer_openid: String,
) -> WechatMiniProgramOrderRequest {
WechatMiniProgramOrderRequest {
order_id,
description: format!("陶泥儿 - {product_title}"),
amount_cents,
payer_openid,
}
}
pub fn build_wechat_web_payment_request(
order_id: String,
product_title: String,
amount_cents: u64,
payer_client_ip: String,
) -> WechatWebOrderRequest {
WechatWebOrderRequest {
order_id,
description: format!("陶泥儿 - {product_title}"),
amount_cents,
payer_client_ip,
}
}
pub fn current_unix_micros() -> i64 {
let value = OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000;
i64::try_from(value).unwrap_or(i64::MAX)
}
fn map_wechat_pay_notify_error(error: WechatPayError) -> AppError {
warn!(error = %error, "微信支付通知处理失败");
map_wechat_pay_error(error)
}
fn read_wechat_message_push_config<'a>(
value: Option<&'a str>,
key: &str,
) -> Result<&'a str, WechatPayError> {
value
.map(str::trim)
.filter(|value| !value.is_empty())
.ok_or_else(|| WechatPayError::InvalidConfig(format!("{key} 未配置")))
}
fn build_wechat_message_push_verify_error_response(error: WechatPayError) -> Response {
let message = match error {
WechatPayError::Disabled => "微信消息推送暂未启用".to_string(),
WechatPayError::InvalidConfig(message)
| WechatPayError::InvalidRequest(message)
| WechatPayError::RequestFailed(message)
| WechatPayError::Upstream(message)
| WechatPayError::Deserialize(message)
| WechatPayError::Crypto(message)
| WechatPayError::InvalidSignature(message) => message,
};
(StatusCode::BAD_REQUEST, message).into_response()
}
fn build_virtual_payment_notify_error_response(
error: WechatPayError,
response_format: VirtualPaymentNotifyResponseFormat,
) -> Response {
warn!(error = %error, "微信虚拟支付通知处理失败");
let message = match error {
WechatPayError::Disabled => "微信虚拟支付暂未启用".to_string(),
WechatPayError::InvalidConfig(message)
| WechatPayError::InvalidRequest(message)
| WechatPayError::RequestFailed(message)
| WechatPayError::Upstream(message)
| WechatPayError::Deserialize(message)
| WechatPayError::Crypto(message)
| WechatPayError::InvalidSignature(message) => message,
};
build_virtual_payment_notify_response(response_format, 1, message)
}
fn build_virtual_payment_notify_success_response(
response_format: VirtualPaymentNotifyResponseFormat,
) -> Response {
build_virtual_payment_notify_response(response_format, 0, "success")
}
fn build_virtual_payment_notify_response(
response_format: VirtualPaymentNotifyResponseFormat,
err_code: i32,
err_msg: impl Into<String>,
) -> Response {
let err_msg = err_msg.into();
match response_format {
VirtualPaymentNotifyResponseFormat::Json => Json(
build_wechat_virtual_payment_notify_response(err_code, err_msg),
)
.into_response(),
VirtualPaymentNotifyResponseFormat::Xml => {
let body = format!(
"<xml><ErrCode>{err_code}</ErrCode><ErrMsg><![CDATA[{err_msg}]]></ErrMsg></xml>"
);
let mut response = (StatusCode::OK, body).into_response();
response.headers_mut().insert(
CONTENT_TYPE,
HeaderValue::from_static("application/xml; charset=utf-8"),
);
response
}
}
}
fn build_wechat_virtual_payment_notify_response(
err_code: i32,
err_msg: impl Into<String>,
) -> ApiWechatVirtualPaymentNotifyResponse {
ApiWechatVirtualPaymentNotifyResponse {
err_code,
err_msg: err_msg.into(),
}
}
fn detect_virtual_payment_notify_response_format(
headers: &HeaderMap,
body: &[u8],
) -> VirtualPaymentNotifyResponseFormat {
let content_type = headers
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.unwrap_or("")
.to_ascii_lowercase();
if content_type.contains("xml") {
return VirtualPaymentNotifyResponseFormat::Xml;
}
let body_trimmed = body
.iter()
.copied()
.skip_while(|byte| byte.is_ascii_whitespace())
.next();
match body_trimmed {
Some(b'<') => VirtualPaymentNotifyResponseFormat::Xml,
_ => VirtualPaymentNotifyResponseFormat::Json,
}
}

View File

@@ -0,0 +1,56 @@
use platform_auth::{
DEFAULT_WECHAT_ACCESS_TOKEN_ENDPOINT, DEFAULT_WECHAT_AUTHORIZE_ENDPOINT,
DEFAULT_WECHAT_JS_CODE_SESSION_ENDPOINT, DEFAULT_WECHAT_PHONE_NUMBER_ENDPOINT,
DEFAULT_WECHAT_STABLE_ACCESS_TOKEN_ENDPOINT, DEFAULT_WECHAT_USER_INFO_ENDPOINT,
WechatAuthConfig, WechatProvider,
};
use crate::config::AppConfig;
pub fn build_wechat_provider(config: &AppConfig) -> WechatProvider {
WechatProvider::new(WechatAuthConfig::new(
config.wechat_auth_enabled,
config.wechat_auth_provider.clone(),
config.wechat_app_id.clone(),
config.wechat_app_secret.clone(),
config.wechat_mini_program_app_id.clone(),
config.wechat_mini_program_app_secret.clone(),
normalize_wechat_endpoint(
&config.wechat_authorize_endpoint,
DEFAULT_WECHAT_AUTHORIZE_ENDPOINT,
),
normalize_wechat_endpoint(
&config.wechat_access_token_endpoint,
DEFAULT_WECHAT_ACCESS_TOKEN_ENDPOINT,
),
normalize_wechat_endpoint(
&config.wechat_user_info_endpoint,
DEFAULT_WECHAT_USER_INFO_ENDPOINT,
),
normalize_wechat_endpoint(
&config.wechat_js_code_session_endpoint,
DEFAULT_WECHAT_JS_CODE_SESSION_ENDPOINT,
),
normalize_wechat_endpoint(
&config.wechat_stable_access_token_endpoint,
DEFAULT_WECHAT_STABLE_ACCESS_TOKEN_ENDPOINT,
),
normalize_wechat_endpoint(
&config.wechat_phone_number_endpoint,
DEFAULT_WECHAT_PHONE_NUMBER_ENDPOINT,
),
config.wechat_mock_user_id.clone(),
config.wechat_mock_union_id.clone(),
config.wechat_mock_display_name.clone(),
config.wechat_mock_avatar_url.clone(),
))
}
fn normalize_wechat_endpoint(value: &str, fallback: &str) -> String {
let trimmed = value.trim();
if trimmed.is_empty() {
fallback.to_string()
} else {
trimmed.to_string()
}
}

View File

@@ -0,0 +1,246 @@
use std::collections::BTreeMap;
use axum::http::StatusCode;
use platform_wechat::WechatSubscribeMessageRequest;
use time::{OffsetDateTime, UtcOffset};
use tracing::{info, warn};
use crate::{http_error::AppError, platform_errors::map_wechat_error, state::AppState};
const GENERATION_RESULT_TASK_NAME: &str = "AI创作生成";
const DEFAULT_WORK_NAME: &str = "AI创作作品";
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GenerationResultSubscribeMessageStatus {
Succeeded,
Failed,
}
#[derive(Clone, Debug)]
pub struct GenerationResultSubscribeMessage {
pub owner_user_id: String,
pub task_name: Option<String>,
pub work_name: Option<String>,
pub status: GenerationResultSubscribeMessageStatus,
pub consumed_points: u64,
pub completed_at_micros: i64,
pub page: Option<String>,
}
pub async fn send_generation_result_subscribe_message_after_completion(
state: &AppState,
message: GenerationResultSubscribeMessage,
) {
if let Err(error) = send_generation_result_subscribe_message(state, message).await {
warn!(
error = %error,
"微信小程序生成结果订阅消息发送失败,已忽略"
);
}
}
async fn send_generation_result_subscribe_message(
state: &AppState,
message: GenerationResultSubscribeMessage,
) -> Result<(), AppError> {
if !state.config.wechat_mini_program_subscribe_message_enabled {
return Ok(());
}
let template_id = state
.config
.wechat_mini_program_generation_result_template_id
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.ok_or_else(|| {
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE)
.with_message("微信订阅消息模板 ID 未配置")
})?;
let user = state
.auth_user_service()
.get_user_by_id(&message.owner_user_id)
.map_err(|error| {
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR)
.with_message(format!("读取微信订阅消息用户失败:{error}"))
})?
.ok_or_else(|| {
AppError::from_status(StatusCode::NOT_FOUND).with_message("微信订阅消息用户不存在")
})?;
let openid = user
.wechat_account
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.ok_or_else(|| {
AppError::from_status(StatusCode::BAD_REQUEST)
.with_message("用户未绑定微信小程序 openid")
})?;
state
.wechat_client()
.send_subscribe_message(WechatSubscribeMessageRequest {
touser: openid.to_string(),
template_id: template_id.to_string(),
page: message
.page
.clone()
.or_else(|| Some("/pages/web-view/index".to_string())),
miniprogram_state: Some(
normalize_miniprogram_state(
&state.config.wechat_mini_program_subscribe_message_state,
)
.to_string(),
),
lang: Some("zh_CN".to_string()),
data: build_generation_result_template_data(&message),
})
.await
.map_err(map_wechat_error)?;
info!(
owner_user_id = %message.owner_user_id,
template_id,
"微信小程序生成结果订阅消息已发送"
);
Ok(())
}
fn build_generation_result_template_data(
message: &GenerationResultSubscribeMessage,
) -> BTreeMap<String, String> {
BTreeMap::from([
(
"thing1".to_string(),
truncate_template_value(
message
.task_name
.as_deref()
.unwrap_or(GENERATION_RESULT_TASK_NAME),
20,
),
),
(
"phrase2".to_string(),
truncate_template_value(message.status.template_status_label(), 5),
),
(
"time4".to_string(),
truncate_template_value(
&format_generation_completed_time(message.completed_at_micros),
20,
),
),
(
"thing5".to_string(),
truncate_template_value(
message.work_name.as_deref().unwrap_or(DEFAULT_WORK_NAME),
20,
),
),
(
"number6".to_string(),
truncate_template_value(&message.consumed_points.to_string(), 32),
),
])
}
impl GenerationResultSubscribeMessageStatus {
fn template_status_label(self) -> &'static str {
match self {
Self::Succeeded => "已完成",
Self::Failed => "生成失败",
}
}
}
fn truncate_template_value(value: &str, max_chars: usize) -> String {
let trimmed = value.trim();
let mut result = String::new();
for character in trimmed.chars().take(max_chars) {
result.push(character);
}
if result.is_empty() {
DEFAULT_WORK_NAME.to_string()
} else {
result
}
}
fn format_generation_completed_time(completed_at_micros: i64) -> String {
let seconds = completed_at_micros.div_euclid(1_000_000);
let Ok(utc_time) = OffsetDateTime::from_unix_timestamp(seconds) else {
return "1970-01-01 08:00".to_string();
};
let beijing_offset = UtcOffset::from_hms(8, 0, 0).unwrap_or(UtcOffset::UTC);
let local_time = utc_time.to_offset(beijing_offset);
format!(
"{:04}-{:02}-{:02} {:02}:{:02}",
local_time.year(),
u8::from(local_time.month()),
local_time.day(),
local_time.hour(),
local_time.minute()
)
}
fn normalize_miniprogram_state(value: &str) -> &'static str {
match value.trim().to_ascii_lowercase().as_str() {
"developer" | "develop" | "dev" => "developer",
"trial" => "trial",
_ => "formal",
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn failed_generation_result_template_uses_failed_status_and_zero_points() {
let data = build_generation_result_template_data(&GenerationResultSubscribeMessage {
owner_user_id: "user-1".to_string(),
task_name: Some("拼图".to_string()),
work_name: Some("首关拼图".to_string()),
status: GenerationResultSubscribeMessageStatus::Failed,
consumed_points: 0,
completed_at_micros: 1_762_000_000_000_000,
page: None,
});
assert_eq!(data.get("phrase2").map(String::as_str), Some("生成失败"));
assert_eq!(data.get("number6").map(String::as_str), Some("0"));
}
#[test]
fn generation_result_template_time_uses_wechat_time_format() {
let data = build_generation_result_template_data(&GenerationResultSubscribeMessage {
owner_user_id: "user-1".to_string(),
task_name: Some("拼图".to_string()),
work_name: Some("首关拼图".to_string()),
status: GenerationResultSubscribeMessageStatus::Succeeded,
consumed_points: 15,
completed_at_micros: 0,
page: None,
});
assert_eq!(
data.get("time4").map(String::as_str),
Some("1970-01-01 08:00")
);
}
#[test]
fn generation_result_template_uses_task_template_name() {
let data = build_generation_result_template_data(&GenerationResultSubscribeMessage {
owner_user_id: "user-1".to_string(),
task_name: Some("敲木鱼".to_string()),
work_name: Some("功德木鱼".to_string()),
status: GenerationResultSubscribeMessageStatus::Succeeded,
consumed_points: 10,
completed_at_micros: 0,
page: None,
});
assert_eq!(data.get("thing1").map(String::as_str), Some("敲木鱼"));
}
}