修复多端登录互相顶号

单设备退出只撤销当前 refresh session,不再提升账号级 token_version

认证中间件和 refresh 接口在本进程未命中会话时按需刷新 SpacetimeDB 认证工作集

补充多端登录与跨进程会话补水回归测试

同步项目文档和 Hermes 共享决策记录
This commit is contained in:
2026-06-07 20:54:35 +08:00
parent a5143fa0cb
commit cc84656a1f
9 changed files with 463 additions and 55 deletions

View File

@@ -3844,6 +3844,111 @@ mod tests {
assert_eq!(me_response.status(), StatusCode::UNAUTHORIZED);
}
#[tokio::test]
async fn logout_current_device_keeps_other_device_session_alive() {
let state = AppState::new(AppConfig::default()).expect("state should build");
seed_phone_user_with_password(&state, "13800138031", TEST_PASSWORD).await;
let app = build_router(state);
let first_login_response = password_login_request_with_client(
app.clone(),
"13800138031",
TEST_PASSWORD,
"logout-current-device",
"203.0.113.41",
)
.await;
let first_refresh_cookie = first_login_response
.headers()
.get("set-cookie")
.and_then(|value| value.to_str().ok())
.expect("first refresh cookie should exist")
.to_string();
let first_login_body = first_login_response
.into_body()
.collect()
.await
.expect("first login body should collect")
.to_bytes();
let first_access_token = read_access_token(&first_login_body);
let second_login_response = password_login_request_with_client(
app.clone(),
"13800138031",
TEST_PASSWORD,
"logout-other-device",
"203.0.113.42",
)
.await;
let second_refresh_cookie = second_login_response
.headers()
.get("set-cookie")
.and_then(|value| value.to_str().ok())
.expect("second refresh cookie should exist")
.to_string();
let second_login_body = second_login_response
.into_body()
.collect()
.await
.expect("second login body should collect")
.to_bytes();
let second_access_token = read_access_token(&second_login_body);
let logout_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/auth/logout")
.header("authorization", format!("Bearer {first_access_token}"))
.header("cookie", first_refresh_cookie)
.body(Body::empty())
.expect("logout request should build"),
)
.await
.expect("logout request should succeed");
assert_eq!(logout_response.status(), StatusCode::OK);
let first_me_response = app
.clone()
.oneshot(
Request::builder()
.uri("/api/auth/me")
.header("authorization", format!("Bearer {first_access_token}"))
.body(Body::empty())
.expect("first me request should build"),
)
.await
.expect("first me request should succeed");
assert_eq!(first_me_response.status(), StatusCode::UNAUTHORIZED);
let second_me_response = app
.clone()
.oneshot(
Request::builder()
.uri("/api/auth/me")
.header("authorization", format!("Bearer {second_access_token}"))
.body(Body::empty())
.expect("second me request should build"),
)
.await
.expect("second me request should succeed");
assert_eq!(second_me_response.status(), StatusCode::OK);
let second_refresh_response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/auth/refresh")
.header("cookie", second_refresh_cookie)
.body(Body::empty())
.expect("second refresh request should build"),
)
.await
.expect("second refresh request should succeed");
assert_eq!(second_refresh_response.status(), StatusCode::OK);
}
#[tokio::test]
async fn logout_succeeds_without_refresh_cookie_when_bearer_token_is_valid() {
let state = AppState::new(AppConfig::default()).expect("state should build");

View File

@@ -135,7 +135,10 @@ pub async fn require_bearer_auth(
mut request: Request,
next: Next,
) -> Result<Response, AppError> {
let Some(authenticated) = authenticate_request(&state, &request)? else {
let path = request.uri().path().to_string();
let headers = request.headers().clone();
let request_id = request_id_from_request(&request);
let Some(authenticated) = authenticate_request(&state, path, headers, request_id).await? else {
return Err(AppError::from_status(StatusCode::UNAUTHORIZED));
};
request.extensions_mut().insert(authenticated.clone());
@@ -151,7 +154,11 @@ pub async fn require_runtime_principal_auth(
mut request: Request,
next: Next,
) -> Result<Response, AppError> {
let Some(principal) = authenticate_runtime_principal(&state, &request)? else {
let path = request.uri().path().to_string();
let headers = request.headers().clone();
let request_id = request_id_from_request(&request);
let Some(principal) = authenticate_runtime_principal(&state, path, headers, request_id).await?
else {
return Err(AppError::from_status(StatusCode::UNAUTHORIZED));
};
request.extensions_mut().insert(principal.clone());
@@ -162,24 +169,21 @@ pub async fn require_runtime_principal_auth(
Ok(response)
}
fn authenticate_runtime_principal(
async fn authenticate_runtime_principal(
state: &AppState,
request: &Request,
path: String,
headers: HeaderMap,
request_id: String,
) -> Result<Option<RuntimePrincipal>, AppError> {
if !request.headers().contains_key(AUTHORIZATION) {
if !headers.contains_key(AUTHORIZATION) {
return Ok(None);
}
match authenticate_request(state, request) {
match authenticate_request(state, path, headers.clone(), request_id.clone()).await {
Ok(Some(authenticated)) => Ok(Some(RuntimePrincipal::User(authenticated))),
Ok(None) => Ok(None),
Err(_) => {
let bearer_token = extract_bearer_token(request.headers())?;
let request_id = request
.extensions()
.get::<RequestContext>()
.map(|context| context.request_id().to_string())
.unwrap_or_else(|| "unknown".to_string());
let bearer_token = extract_bearer_token(&headers)?;
let claims = verify_runtime_guest_token(&bearer_token, state.auth_jwt_config())
.map_err(|error| {
warn!(
@@ -202,26 +206,23 @@ fn authenticate_runtime_principal(
}
}
fn authenticate_request(
async fn authenticate_request(
state: &AppState,
request: &Request,
path: String,
headers: HeaderMap,
request_id: String,
) -> Result<Option<AuthenticatedAccessToken>, AppError> {
if allows_internal_forwarded_auth(request.uri().path()) {
if let Some(claims) = try_build_internal_forwarded_claims(&state, request.headers()) {
if allows_internal_forwarded_auth(&path) {
if let Some(claims) = try_build_internal_forwarded_claims(state, &headers) {
return Ok(Some(AuthenticatedAccessToken::new(claims)));
}
}
if !request.headers().contains_key(AUTHORIZATION) {
if !headers.contains_key(AUTHORIZATION) {
return Ok(None);
}
let bearer_token = extract_bearer_token(request.headers())?;
let request_id = request
.extensions()
.get::<RequestContext>()
.map(|context| context.request_id().to_string())
.unwrap_or_else(|| "unknown".to_string());
let bearer_token = extract_bearer_token(&headers)?;
let claims = verify_access_token(&bearer_token, state.auth_jwt_config()).map_err(|error| {
warn!(
%request_id,
@@ -230,7 +231,7 @@ fn authenticate_request(
);
AppError::from_status(StatusCode::UNAUTHORIZED)
})?;
let current_user = state
let mut current_user = state
.auth_user_service()
.get_user_by_id(claims.user_id())
.map_err(|error| {
@@ -240,15 +241,52 @@ fn authenticate_request(
"Bearer JWT 用户快照读取失败"
);
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR)
})?
.ok_or_else(|| {
warn!(
%request_id,
user_id = %claims.user_id(),
"Bearer JWT 对应用户不存在"
);
AppError::from_status(StatusCode::UNAUTHORIZED)
})?;
if current_user.is_none() {
warn!(
%request_id,
user_id = %claims.user_id(),
"Bearer JWT 对应用户不存在,准备刷新认证工作集后复查"
);
if refresh_auth_store_for_stale_bearer(state, &request_id, claims.user_id()).await {
current_user = state
.auth_user_service()
.get_user_by_id(claims.user_id())
.map_err(|error| {
warn!(
%request_id,
error = %error,
"Bearer JWT 用户快照刷新后读取失败"
);
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR)
})?;
}
}
let Some(mut current_user) = current_user else {
warn!(
%request_id,
user_id = %claims.user_id(),
"Bearer JWT 对应用户不存在"
);
return Err(AppError::from_status(StatusCode::UNAUTHORIZED));
};
if current_user.token_version != claims.token_version() {
if refresh_auth_store_for_stale_bearer(state, &request_id, claims.user_id()).await
&& let Some(refreshed_user) = state
.auth_user_service()
.get_user_by_id(claims.user_id())
.map_err(|error| {
warn!(
%request_id,
error = %error,
"Bearer JWT 用户版本刷新后读取失败"
);
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR)
})?
{
current_user = refreshed_user;
}
}
if current_user.token_version != claims.token_version() {
warn!(
%request_id,
@@ -261,7 +299,7 @@ fn authenticate_request(
.with_message("当前登录态已失效,请重新登录"));
}
let session_is_active = state
let mut session_is_active = state
.refresh_session_service()
.is_session_active_for_user(
claims.user_id(),
@@ -278,6 +316,27 @@ fn authenticate_request(
);
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR)
})?;
if !session_is_active
&& refresh_auth_store_for_stale_bearer(state, &request_id, claims.user_id()).await
{
session_is_active = state
.refresh_session_service()
.is_session_active_for_user(
claims.user_id(),
claims.session_id(),
OffsetDateTime::now_utc(),
)
.map_err(|error| {
warn!(
%request_id,
user_id = %claims.user_id(),
session_id = %claims.session_id(),
error = %error,
"Bearer JWT refresh session 刷新后状态读取失败"
);
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR)
})?;
}
if !session_is_active {
warn!(
%request_id,
@@ -292,6 +351,33 @@ fn authenticate_request(
Ok(Some(AuthenticatedAccessToken::new(claims)))
}
fn request_id_from_request(request: &Request) -> String {
request
.extensions()
.get::<RequestContext>()
.map(|context| context.request_id().to_string())
.unwrap_or_else(|| "unknown".to_string())
}
async fn refresh_auth_store_for_stale_bearer(
state: &AppState,
request_id: &str,
user_id: &str,
) -> bool {
match state.refresh_auth_store_from_spacetime().await {
Ok(refreshed) => refreshed,
Err(error) => {
warn!(
%request_id,
user_id = %user_id,
error = %error,
"刷新认证工作集失败,继续按本进程现有状态处理"
);
false
}
}
}
pub async fn inspect_auth_claims(
Extension(request_context): Extension<RequestContext>,
Extension(authenticated): Extension<AuthenticatedAccessToken>,

View File

@@ -7,6 +7,7 @@ use module_auth::{RefreshSessionError, RotateRefreshSessionInput};
use platform_auth::hash_refresh_session_token;
use shared_contracts::auth::RefreshSessionResponse;
use time::OffsetDateTime;
use tracing::warn;
use crate::{
api_response::json_success_body,
@@ -39,16 +40,48 @@ pub async fn refresh_session(
let next_refresh_token = platform_auth::create_refresh_session_token();
let next_refresh_token_hash = hash_refresh_session_token(&next_refresh_token);
let rotated = state
.refresh_session_service()
.rotate_session(
RotateRefreshSessionInput {
refresh_token_hash,
next_refresh_token_hash,
},
OffsetDateTime::now_utc(),
)
.map_err(|error| map_refresh_error_with_clear_cookie(&state, error))?;
let rotated = match state.refresh_session_service().rotate_session(
RotateRefreshSessionInput {
refresh_token_hash: refresh_token_hash.clone(),
next_refresh_token_hash: next_refresh_token_hash.clone(),
},
OffsetDateTime::now_utc(),
) {
Ok(rotated) => rotated,
Err(RefreshSessionError::SessionNotFound) => {
match state.refresh_auth_store_from_spacetime().await {
Ok(true) => {}
Ok(false) => {
return Err(map_refresh_error_with_clear_cookie(
&state,
RefreshSessionError::SessionNotFound,
));
}
Err(error) => {
warn!(
request_id = request_context.request_id(),
error = %error,
"refresh session 本地未命中后刷新认证工作集失败"
);
return Err(map_refresh_error_with_clear_cookie(
&state,
RefreshSessionError::SessionNotFound,
));
}
}
state
.refresh_session_service()
.rotate_session(
RotateRefreshSessionInput {
refresh_token_hash,
next_refresh_token_hash,
},
OffsetDateTime::now_utc(),
)
.map_err(|error| map_refresh_error_with_clear_cookie(&state, error))?
}
Err(error) => return Err(map_refresh_error_with_clear_cookie(&state, error)),
};
let access_token = sign_access_token_for_user(
&state,
&rotated.user,

View File

@@ -12,7 +12,8 @@ use axum::extract::FromRef;
use module_ai::{AiTaskService, InMemoryAiTaskStore};
use module_auth::{
AuthUserService, InMemoryAuthStore, PasswordEntryService, PhoneAuthService,
RefreshSessionService, WechatAuthService, WechatAuthStateService,
RefreshAuthStoreSnapshotResult, RefreshSessionService, WechatAuthService,
WechatAuthStateService,
};
use module_runtime::RuntimeSnapshotRecord;
#[cfg(test)]
@@ -660,6 +661,46 @@ impl AppState {
Ok(())
}
pub fn refresh_auth_store_from_snapshot_json(
&self,
snapshot_json: &str,
) -> Result<RefreshAuthStoreSnapshotResult, SpacetimeClientError> {
self.auth_store
.refresh_from_snapshot_json(snapshot_json)
.map_err(SpacetimeClientError::Runtime)
}
pub async fn refresh_auth_store_from_spacetime(&self) -> Result<bool, SpacetimeClientError> {
#[cfg(test)]
{
return Ok(false);
}
#[cfg(not(test))]
{
let snapshot = self
.spacetime_client
.export_auth_store_snapshot_from_tables()
.await?;
let Some(snapshot_json) = snapshot
.snapshot_json
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
else {
return Ok(false);
};
let result = self.refresh_auth_store_from_snapshot_json(snapshot_json)?;
info!(
user_count = result.user_count,
session_count = result.session_count,
updated_at_micros = snapshot.updated_at_micros,
"已按需刷新本进程认证工作集"
);
Ok(true)
}
}
pub async fn try_restore_auth_store_from_spacetime(
config: AppConfig,
) -> Result<Self, AppStateInitError> {