use serde::Serialize; use crate::{ProcedureContext, ReducerContext, SpacetimeType, Table, Timestamp}; use super::{ mapper::{ AuthUserSnapshot, PersistentAuthStoreSnapshot, RefreshSessionSnapshot, StoredPasswordUserSnapshot, StoredRefreshSessionSnapshot, StoredWechatIdentitySnapshot, sanitize_identity_component, }, tables::{ AuthIdentity, AuthStoreProjectionMeta, AuthStoreSnapshot, RefreshSession, UserAccount, auth_identity, auth_store_projection_meta, auth_store_snapshot, refresh_session, user_account, }, }; const AUTH_STORE_PROJECTION_META_ID: &str = "default"; const AUTH_STORE_SNAPSHOT_META_NEXT_USER_ID: &str = "meta/next_user_id"; const AUTH_STORE_SNAPSHOT_USER_PREFIX: &str = "user/"; const AUTH_STORE_SNAPSHOT_PHONE_PREFIX: &str = "phone/"; const AUTH_STORE_SNAPSHOT_SESSION_PREFIX: &str = "session/"; const AUTH_STORE_SNAPSHOT_SESSION_HASH_PREFIX: &str = "session_hash/"; const AUTH_STORE_SNAPSHOT_WECHAT_PREFIX: &str = "wechat/"; const AUTH_STORE_SNAPSHOT_UNION_PREFIX: &str = "union/"; #[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)] pub struct AuthStoreSnapshotRecord { pub snapshot_json: Option, pub updated_at_micros: Option, } #[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)] pub struct AuthStoreSnapshotUpsertInput { pub snapshot_json: String, pub updated_at_micros: i64, } #[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)] pub struct AuthStoreSnapshotProcedureResult { pub ok: bool, pub record: Option, pub error_message: Option, } fn normalize_user_account_tags( tags: Option>, ) -> Result, module_runtime::RuntimeProfileFieldError> { module_runtime::normalize_profile_user_tags(tags.unwrap_or_default()) } fn prefixed_snapshot_id(prefix: &str, value: &str) -> String { format!("{prefix}{}", sanitize_identity_component(value)) } fn upsert_auth_snapshot_row( ctx: &ReducerContext, snapshot_id: String, snapshot_json: String, updated_at: Timestamp, ) { if ctx .db .auth_store_snapshot() .snapshot_id() .find(&snapshot_id) .is_some() { ctx.db .auth_store_snapshot() .snapshot_id() .delete(&snapshot_id); } ctx.db.auth_store_snapshot().insert(AuthStoreSnapshot { snapshot_id, snapshot_json, updated_at, }); } fn auth_store_snapshot_user_row_id(user_id: &str) -> String { prefixed_snapshot_id(AUTH_STORE_SNAPSHOT_USER_PREFIX, user_id) } fn auth_store_snapshot_phone_row_id(phone_number: &str, user_id: &str) -> String { prefixed_snapshot_id( AUTH_STORE_SNAPSHOT_PHONE_PREFIX, &format!("{phone_number}|{user_id}"), ) } fn auth_store_snapshot_session_row_id(session_id: &str) -> String { prefixed_snapshot_id(AUTH_STORE_SNAPSHOT_SESSION_PREFIX, session_id) } fn auth_store_snapshot_session_hash_row_id(refresh_token_hash: &str, session_id: &str) -> String { prefixed_snapshot_id( AUTH_STORE_SNAPSHOT_SESSION_HASH_PREFIX, &format!("{refresh_token_hash}|{session_id}"), ) } fn auth_store_snapshot_wechat_row_id(provider_uid: &str, user_id: &str) -> String { prefixed_snapshot_id( AUTH_STORE_SNAPSHOT_WECHAT_PREFIX, &format!("{provider_uid}|{user_id}"), ) } fn auth_store_snapshot_union_row_id(union_id: &str, user_id: &str) -> String { prefixed_snapshot_id( AUTH_STORE_SNAPSHOT_UNION_PREFIX, &format!("{union_id}|{user_id}"), ) } fn snapshot_has_user_rows(snapshot: &PersistentAuthStoreSnapshot) -> bool { !snapshot.users_by_username.is_empty() } fn to_snapshot_row_json(label: &str, value: &T) -> Result { serde_json::to_string(value).map_err(|error| format!("{label} 序列化失败:{error}")) } #[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)] pub struct AuthStoreSnapshotImportRecord { pub imported_user_count: u32, pub imported_identity_count: u32, pub imported_refresh_session_count: u32, } #[derive(Clone, Debug, PartialEq, Eq, SpacetimeType)] pub struct AuthStoreSnapshotImportProcedureResult { pub ok: bool, pub record: Option, pub error_message: Option, } // Axum 运行期认证变更直接导入正式认证表,并把快照拆成行级记录;禁止再写 `auth_store_snapshot/default`。 #[spacetimedb::procedure] pub fn import_auth_store_snapshot_json( ctx: &mut ProcedureContext, input: AuthStoreSnapshotUpsertInput, ) -> AuthStoreSnapshotImportProcedureResult { match ctx.try_with_tx(|tx| import_auth_store_snapshot_json_tx(tx, input.clone())) { Ok(record) => AuthStoreSnapshotImportProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => AuthStoreSnapshotImportProcedureResult { ok: false, record: None, error_message: Some(message), }, } } // Axum 启动时可从正式表重新导出 module-auth 使用的整份认证快照。 #[spacetimedb::procedure] pub fn export_auth_store_snapshot_from_tables( ctx: &mut ProcedureContext, ) -> AuthStoreSnapshotProcedureResult { match ctx.try_with_tx(|tx| export_auth_store_snapshot_from_tables_tx(tx)) { Ok(record) => AuthStoreSnapshotProcedureResult { ok: true, record: Some(record), error_message: None, }, Err(message) => AuthStoreSnapshotProcedureResult { ok: false, record: None, error_message: Some(message), }, } } fn import_auth_store_snapshot_json_tx( ctx: &ReducerContext, input: AuthStoreSnapshotUpsertInput, ) -> Result { import_auth_store_snapshot_json_value_tx(ctx, &input.snapshot_json, input.updated_at_micros) } fn import_auth_store_snapshot_json_value_tx( ctx: &ReducerContext, snapshot_json: &str, updated_at_micros: i64, ) -> Result { let snapshot_json = snapshot_json.trim(); if snapshot_json.is_empty() { return Err("认证快照 JSON 不能为空".to_string()); } let parsed = serde_json::from_str::(snapshot_json) .map_err(|error| format!("认证快照 JSON 解析失败:{error}"))?; if !snapshot_has_user_rows(&parsed) { return Err("认证快照缺少用户记录,拒绝导入正式表".to_string()); } upsert_auth_store_snapshot_rows(ctx, &parsed, updated_at_micros)?; upsert_auth_projection_meta(ctx, updated_at_micros); let mut imported_user_count = 0_u32; let mut imported_identity_count = 0_u32; let mut imported_refresh_session_count = 0_u32; for stored_user in parsed.users_by_username.into_values() { let user = stored_user.user; let user_id = user.id.clone(); if ctx.db.user_account().user_id().find(&user_id).is_some() { ctx.db.user_account().user_id().delete(&user_id); } ctx.db.user_account().insert(UserAccount { user_id: user_id.clone(), public_user_code: user.public_user_code, username: user.username, display_name: user.display_name, avatar_url: user.avatar_url, phone_number_masked: user.phone_number_masked, phone_number_e164: stored_user.phone_number.clone(), login_method: user.login_method, binding_status: user.binding_status, wechat_bound: user.wechat_bound, password_hash: stored_user.password_hash, password_login_enabled: stored_user.password_login_enabled, token_version: user.token_version, user_tags: Some( module_runtime::normalize_profile_user_tags(user.user_tags) .map_err(|error| error.to_string())?, ), }); imported_user_count += 1; if let Some(phone_number) = stored_user.phone_number { let identity_id = format!("authi_phone_{}", sanitize_identity_component(&phone_number)); if ctx .db .auth_identity() .identity_id() .find(&identity_id) .is_some() { ctx.db.auth_identity().identity_id().delete(&identity_id); } ctx.db.auth_identity().insert(AuthIdentity { identity_id, user_id, provider: "phone".to_string(), provider_uid: phone_number.clone(), provider_union_id: None, phone_e164: Some(phone_number), display_name: None, avatar_url: None, }); imported_identity_count += 1; } } for identity in parsed.wechat_identity_by_provider_uid.into_values() { let identity_id = format!( "authi_wechat_{}", sanitize_identity_component(&identity.provider_uid) ); if ctx .db .auth_identity() .identity_id() .find(&identity_id) .is_some() { ctx.db.auth_identity().identity_id().delete(&identity_id); } ctx.db.auth_identity().insert(AuthIdentity { identity_id, user_id: identity.user_id, provider: "wechat".to_string(), provider_uid: identity.provider_uid, provider_union_id: identity.provider_union_id, phone_e164: None, display_name: identity.display_name, avatar_url: identity.avatar_url, }); imported_identity_count += 1; } for stored_session in parsed.sessions_by_id.into_values() { let session = stored_session.session; let client_info_json = serde_json::to_string(&session.client_info) .map_err(|error| format!("客户端身份序列化失败:{error}"))?; if ctx .db .refresh_session() .session_id() .find(&session.session_id) .is_some() { ctx.db .refresh_session() .session_id() .delete(&session.session_id); } ctx.db.refresh_session().insert(RefreshSession { session_id: session.session_id, user_id: session.user_id, refresh_token_hash: session.refresh_token_hash, issued_by_provider: session.issued_by_provider, client_info_json, expires_at: session.expires_at, revoked_at: session.revoked_at, created_at: session.created_at, updated_at: session.updated_at, last_seen_at: session.last_seen_at, }); imported_refresh_session_count += 1; } Ok(AuthStoreSnapshotImportRecord { imported_user_count, imported_identity_count, imported_refresh_session_count, }) } fn upsert_auth_store_snapshot_rows( ctx: &ReducerContext, snapshot: &PersistentAuthStoreSnapshot, updated_at_micros: i64, ) -> Result<(), String> { let updated_at = Timestamp::from_micros_since_unix_epoch(updated_at_micros); let desired_ids = auth_store_snapshot_row_ids(snapshot); for row in ctx.db.auth_store_snapshot().iter().collect::>() { if !desired_ids.contains(&row.snapshot_id) { ctx.db .auth_store_snapshot() .snapshot_id() .delete(&row.snapshot_id); } } upsert_auth_snapshot_row( ctx, AUTH_STORE_SNAPSHOT_META_NEXT_USER_ID.to_string(), to_snapshot_row_json("认证快照 next_user_id", &snapshot.next_user_id)?, updated_at, ); for user in snapshot.users_by_username.values() { upsert_auth_snapshot_row( ctx, auth_store_snapshot_user_row_id(&user.user.id), to_snapshot_row_json("认证快照用户", user)?, updated_at, ); } for (phone_number, user_id) in &snapshot.phone_to_user_id { upsert_auth_snapshot_row( ctx, auth_store_snapshot_phone_row_id(phone_number, user_id), to_snapshot_row_json("认证快照手机号索引", user_id)?, updated_at, ); } for session in snapshot.sessions_by_id.values() { upsert_auth_snapshot_row( ctx, auth_store_snapshot_session_row_id(&session.session.session_id), to_snapshot_row_json("认证快照会话", session)?, updated_at, ); } for (refresh_token_hash, session_id) in &snapshot.session_id_by_refresh_token_hash { upsert_auth_snapshot_row( ctx, auth_store_snapshot_session_hash_row_id(refresh_token_hash, session_id), to_snapshot_row_json("认证快照 refresh token 索引", session_id)?, updated_at, ); } for identity in snapshot.wechat_identity_by_provider_uid.values() { upsert_auth_snapshot_row( ctx, auth_store_snapshot_wechat_row_id(&identity.provider_uid, &identity.user_id), to_snapshot_row_json("认证快照微信身份", identity)?, updated_at, ); } for (union_id, user_id) in &snapshot.user_id_by_provider_union_id { upsert_auth_snapshot_row( ctx, auth_store_snapshot_union_row_id(union_id, user_id), to_snapshot_row_json("认证快照微信 union 索引", user_id)?, updated_at, ); } Ok(()) } fn auth_store_snapshot_row_ids( snapshot: &PersistentAuthStoreSnapshot, ) -> std::collections::HashSet { let mut ids = std::collections::HashSet::new(); ids.insert(AUTH_STORE_SNAPSHOT_META_NEXT_USER_ID.to_string()); for user in snapshot.users_by_username.values() { ids.insert(auth_store_snapshot_user_row_id(&user.user.id)); } for (phone_number, user_id) in &snapshot.phone_to_user_id { ids.insert(auth_store_snapshot_phone_row_id(phone_number, user_id)); } for session in snapshot.sessions_by_id.values() { ids.insert(auth_store_snapshot_session_row_id( &session.session.session_id, )); } for (refresh_token_hash, session_id) in &snapshot.session_id_by_refresh_token_hash { ids.insert(auth_store_snapshot_session_hash_row_id( refresh_token_hash, session_id, )); } for identity in snapshot.wechat_identity_by_provider_uid.values() { ids.insert(auth_store_snapshot_wechat_row_id( &identity.provider_uid, &identity.user_id, )); } for (union_id, user_id) in &snapshot.user_id_by_provider_union_id { ids.insert(auth_store_snapshot_union_row_id(union_id, user_id)); } ids } fn export_auth_store_snapshot_from_tables_tx( ctx: &ReducerContext, ) -> Result { let users = ctx.db.user_account().iter().collect::>(); let identities = ctx.db.auth_identity().iter().collect::>(); let sessions = ctx.db.refresh_session().iter().collect::>(); if users.is_empty() && identities.is_empty() && sessions.is_empty() { return Ok(AuthStoreSnapshotRecord { snapshot_json: None, updated_at_micros: None, }); } let updated_at_micros = ctx .db .auth_store_projection_meta() .meta_id() .find(&AUTH_STORE_PROJECTION_META_ID.to_string()) .map(|row| row.updated_at.to_micros_since_unix_epoch()); let valid_user_ids = users .iter() .map(|user| user.user_id.clone()) .collect::>(); let mut phone_identity_by_user_id = std::collections::HashMap::new(); let mut phone_to_user_id = std::collections::HashMap::new(); let mut wechat_identity_by_provider_uid = std::collections::HashMap::new(); let mut user_id_by_provider_union_id = std::collections::HashMap::new(); for identity in identities { if !valid_user_ids.contains(&identity.user_id) { continue; } match identity.provider.as_str() { "phone" => { let phone_number = identity .phone_e164 .clone() .unwrap_or_else(|| identity.provider_uid.clone()); phone_to_user_id.insert(phone_number.clone(), identity.user_id.clone()); phone_identity_by_user_id.insert(identity.user_id, phone_number); } "wechat" => { if let Some(union_id) = identity.provider_union_id.clone() { user_id_by_provider_union_id.insert(union_id, identity.user_id.clone()); } wechat_identity_by_provider_uid.insert( identity.provider_uid.clone(), StoredWechatIdentitySnapshot { user_id: identity.user_id, provider_uid: identity.provider_uid, provider_union_id: identity.provider_union_id, display_name: identity.display_name, avatar_url: identity.avatar_url, }, ); } _ => {} } } let mut next_user_id = 1_u64; let mut users_by_username = std::collections::HashMap::new(); for user in users { if let Some(numeric_id) = user .user_id .strip_prefix("user_") .and_then(|value| value.parse::().ok()) { next_user_id = next_user_id.max(numeric_id.saturating_add(1)); } let auth_user = AuthUserSnapshot { id: user.user_id.clone(), public_user_code: user.public_user_code, username: user.username.clone(), display_name: user.display_name, avatar_url: user.avatar_url, phone_number_masked: user.phone_number_masked, login_method: user.login_method, binding_status: user.binding_status, wechat_bound: user.wechat_bound, token_version: user.token_version, user_tags: normalize_user_account_tags(user.user_tags) .map_err(|error| error.to_string())?, }; users_by_username.insert( user.username, StoredPasswordUserSnapshot { user: auth_user, password_hash: user.password_hash, password_login_enabled: user.password_login_enabled, phone_number: user .phone_number_e164 .or_else(|| phone_identity_by_user_id.remove(&user.user_id)), }, ); } let mut sessions_by_id = std::collections::HashMap::new(); let mut session_id_by_refresh_token_hash = std::collections::HashMap::new(); for session in sessions { if !valid_user_ids.contains(&session.user_id) { continue; } let client_info = serde_json::from_str::(&session.client_info_json) .map_err(|error| format!("refresh session 客户端信息 JSON 解析失败:{error}"))?; session_id_by_refresh_token_hash.insert( session.refresh_token_hash.clone(), session.session_id.clone(), ); sessions_by_id.insert( session.session_id.clone(), StoredRefreshSessionSnapshot { session: RefreshSessionSnapshot { session_id: session.session_id, user_id: session.user_id, refresh_token_hash: session.refresh_token_hash, issued_by_provider: session.issued_by_provider, client_info, expires_at: session.expires_at, revoked_at: session.revoked_at, created_at: session.created_at, updated_at: session.updated_at, last_seen_at: session.last_seen_at, }, }, ); } let snapshot = PersistentAuthStoreSnapshot { next_user_id, users_by_username, phone_to_user_id, sessions_by_id, session_id_by_refresh_token_hash, wechat_identity_by_provider_uid, user_id_by_provider_union_id, }; if let Some(updated_at_micros) = updated_at_micros { upsert_auth_store_snapshot_rows(ctx, &snapshot, updated_at_micros)?; } let snapshot_json = serde_json::to_string_pretty(&snapshot) .map_err(|error| format!("序列化认证快照失败:{error}"))?; Ok(AuthStoreSnapshotRecord { snapshot_json: Some(snapshot_json), updated_at_micros, }) } fn upsert_auth_projection_meta(ctx: &ReducerContext, updated_at_micros: i64) { let meta_id = AUTH_STORE_PROJECTION_META_ID.to_string(); if ctx .db .auth_store_projection_meta() .meta_id() .find(&meta_id) .is_some() { ctx.db .auth_store_projection_meta() .meta_id() .delete(&meta_id); } ctx.db .auth_store_projection_meta() .insert(AuthStoreProjectionMeta { meta_id, updated_at: Timestamp::from_micros_since_unix_epoch(updated_at_micros), }); } #[cfg(test)] mod tests { use super::*; fn sample_snapshot() -> PersistentAuthStoreSnapshot { let user = StoredPasswordUserSnapshot { user: AuthUserSnapshot { id: "user_00000042".to_string(), public_user_code: "GN-000042".to_string(), username: "phone_42".to_string(), display_name: "测试玩家".to_string(), avatar_url: None, phone_number_masked: Some("138****8000".to_string()), login_method: "phone".to_string(), binding_status: "active".to_string(), wechat_bound: true, token_version: 3, user_tags: vec!["early".to_string()], }, password_hash: "hash-42".to_string(), password_login_enabled: true, phone_number: Some("+8613800008000".to_string()), }; let session = StoredRefreshSessionSnapshot { session: RefreshSessionSnapshot { session_id: "usess_42".to_string(), user_id: "user_00000042".to_string(), refresh_token_hash: "refresh-hash-42".to_string(), issued_by_provider: "phone".to_string(), client_info: serde_json::json!({"clientType":"web"}), expires_at: "2026-06-01T00:00:00Z".to_string(), revoked_at: None, created_at: "2026-05-27T00:00:00Z".to_string(), updated_at: "2026-05-27T00:00:00Z".to_string(), last_seen_at: "2026-05-27T00:00:00Z".to_string(), }, }; let identity = StoredWechatIdentitySnapshot { user_id: "user_00000042".to_string(), provider_uid: "wx-openid-42".to_string(), provider_union_id: Some("wx-union-42".to_string()), display_name: Some("微信玩家".to_string()), avatar_url: None, }; PersistentAuthStoreSnapshot { next_user_id: 43, users_by_username: std::collections::HashMap::from([("phone_42".to_string(), user)]), phone_to_user_id: std::collections::HashMap::from([( "+8613800008000".to_string(), "user_00000042".to_string(), )]), sessions_by_id: std::collections::HashMap::from([("usess_42".to_string(), session)]), session_id_by_refresh_token_hash: std::collections::HashMap::from([( "refresh-hash-42".to_string(), "usess_42".to_string(), )]), wechat_identity_by_provider_uid: std::collections::HashMap::from([( "wx-openid-42".to_string(), identity, )]), user_id_by_provider_union_id: std::collections::HashMap::from([( "wx-union-42".to_string(), "user_00000042".to_string(), )]), } } #[test] fn auth_store_snapshot_row_ids_are_row_level_without_default_aggregate() { let ids = auth_store_snapshot_row_ids(&sample_snapshot()); assert!(!ids.contains("default")); assert!(ids.contains(AUTH_STORE_SNAPSHOT_META_NEXT_USER_ID)); assert!(ids.contains(&auth_store_snapshot_user_row_id("user_00000042"))); assert!(ids.contains(&auth_store_snapshot_phone_row_id( "+8613800008000", "user_00000042" ))); assert!(ids.contains(&auth_store_snapshot_session_row_id("usess_42"))); assert!(ids.contains(&auth_store_snapshot_session_hash_row_id( "refresh-hash-42", "usess_42" ))); assert!(ids.contains(&auth_store_snapshot_wechat_row_id( "wx-openid-42", "user_00000042" ))); assert!(ids.contains(&auth_store_snapshot_union_row_id( "wx-union-42", "user_00000042" ))); } #[test] fn auth_store_snapshot_user_row_key_is_stable_after_username_change() { let before = sample_snapshot(); let mut after = sample_snapshot(); let mut renamed_user = after .users_by_username .remove("phone_42") .expect("sample user exists"); renamed_user.user.username = "renamed_42".to_string(); after .users_by_username .insert("renamed_42".to_string(), renamed_user); assert_eq!( auth_store_snapshot_row_ids(&before), auth_store_snapshot_row_ids(&after) ); } }