Fail closed when SpacetimeDB auth restore is unavailable
This commit is contained in:
@@ -2,11 +2,12 @@ use axum::{
|
||||
Router,
|
||||
body::Body,
|
||||
extract::{Extension, FromRef},
|
||||
http::Request,
|
||||
http::{Request, StatusCode},
|
||||
middleware,
|
||||
response::Response,
|
||||
routing::{get, post},
|
||||
};
|
||||
use serde_json::json;
|
||||
use tower_http::{
|
||||
classify::ServerErrorsFailureClass,
|
||||
trace::{DefaultOnRequest, TraceLayer},
|
||||
@@ -18,6 +19,7 @@ use crate::{
|
||||
backpressure::limit_concurrent_requests,
|
||||
creation_entry_config::require_creation_entry_route_enabled,
|
||||
error_middleware::normalize_error_response,
|
||||
http_error::AppError,
|
||||
modules,
|
||||
request_context::{RequestContext, attach_request_context, resolve_request_id},
|
||||
response_headers::propagate_request_id_header,
|
||||
@@ -164,6 +166,96 @@ pub fn build_router(state: AppState) -> Router {
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
pub fn build_spacetime_unavailable_router(message: String) -> Router {
|
||||
Router::new()
|
||||
.fallback(spacetime_unavailable_handler)
|
||||
.layer(Extension(SpacetimeUnavailableState {
|
||||
message: message.into(),
|
||||
}))
|
||||
// 依赖不可用模式不挂业务 state,统一返回 503,并继续保留 request_id / API 版本 / 耗时响应头。
|
||||
.layer(middleware::from_fn(normalize_error_response))
|
||||
.layer(middleware::from_fn(propagate_request_id_header))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.make_span_with(|request: &Request<Body>| {
|
||||
let request_id =
|
||||
resolve_request_id(request).unwrap_or_else(|| "unknown".to_string());
|
||||
let route = crate::telemetry::observability_route(request.uri().path());
|
||||
let scheme = crate::telemetry::resolve_request_scheme(request.headers());
|
||||
let span_name = format!("{} {}", request.method(), route);
|
||||
|
||||
info_span!(
|
||||
"http.request",
|
||||
otel.kind = "server",
|
||||
otel.name = %span_name,
|
||||
otel.status_code = tracing::field::Empty,
|
||||
http.response.status_code = tracing::field::Empty,
|
||||
method = %request.method(),
|
||||
http.request.method = %request.method(),
|
||||
http.route = %route,
|
||||
url.scheme = %scheme,
|
||||
url.path = %request.uri().path(),
|
||||
request_id = %request_id,
|
||||
status = tracing::field::Empty,
|
||||
latency_ms = tracing::field::Empty,
|
||||
)
|
||||
})
|
||||
.on_request(DefaultOnRequest::new().level(Level::INFO))
|
||||
.on_response(
|
||||
|response: &axum::response::Response,
|
||||
latency: std::time::Duration,
|
||||
span: &Span| {
|
||||
let latency_ms = latency.as_millis().min(u64::MAX as u128) as u64;
|
||||
let status = response.status().as_u16();
|
||||
span.record("status", status);
|
||||
span.record("http.response.status_code", status);
|
||||
span.record(
|
||||
"otel.status_code",
|
||||
if response.status().is_server_error() {
|
||||
"ERROR"
|
||||
} else {
|
||||
"OK"
|
||||
},
|
||||
);
|
||||
span.record("latency_ms", latency_ms);
|
||||
},
|
||||
)
|
||||
.on_failure(
|
||||
|failure: ServerErrorsFailureClass,
|
||||
latency: std::time::Duration,
|
||||
span: &Span| {
|
||||
let latency_ms = latency.as_millis().min(u64::MAX as u128) as u64;
|
||||
error!(
|
||||
parent: span,
|
||||
latency_ms,
|
||||
failure = %failure,
|
||||
"http request failed"
|
||||
);
|
||||
},
|
||||
),
|
||||
)
|
||||
.layer(middleware::from_fn(attach_request_context))
|
||||
}
|
||||
|
||||
/// Request extension carried by the SpacetimeDB-unavailable router.
#[derive(Debug, Clone)]
struct SpacetimeUnavailableState {
    /// Reason text supplied when the router was built; `spacetime_unavailable_handler`
    /// copies it into the `message` field of the error details.
    message: std::sync::Arc<str>,
}
|
||||
|
||||
async fn spacetime_unavailable_handler(
|
||||
Extension(state): Extension<SpacetimeUnavailableState>,
|
||||
Extension(request_context): Extension<RequestContext>,
|
||||
) -> Response {
|
||||
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE)
|
||||
.with_message("SpacetimeDB 暂不可用,api-server 正在等待数据库恢复")
|
||||
.with_details(json!({
|
||||
"provider": "spacetimedb",
|
||||
"reason": "spacetime_startup_unavailable",
|
||||
"message": state.message.as_ref(),
|
||||
}))
|
||||
.into_response_with_context(Some(&request_context))
|
||||
}
|
||||
|
||||
async fn record_api_tracking_after_success(
|
||||
axum::extract::State(state): axum::extract::State<AppState>,
|
||||
Extension(request_context): Extension<RequestContext>,
|
||||
@@ -368,7 +460,7 @@ mod tests {
|
||||
|
||||
use crate::{config::AppConfig, state::AppState};
|
||||
|
||||
use super::build_router;
|
||||
use super::{build_router, build_spacetime_unavailable_router};
|
||||
|
||||
const TEST_PASSWORD: &str = "secret123";
|
||||
const INTERNAL_TEST_SECRET: &str = "test-internal-secret";
|
||||
@@ -564,6 +656,38 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spacetime_unavailable_router_returns_service_unavailable_for_requests() {
|
||||
let app = build_spacetime_unavailable_router("SpacetimeDB 启动恢复认证快照超时".to_string());
|
||||
|
||||
let response = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri("/api/auth/login-options")
|
||||
.header("x-request-id", "req-spacetime-unavailable")
|
||||
.body(Body::empty())
|
||||
.expect("request should build"),
|
||||
)
|
||||
.await
|
||||
.expect("request should succeed");
|
||||
|
||||
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
|
||||
assert_eq!(
|
||||
response
|
||||
.headers()
|
||||
.get("x-request-id")
|
||||
.and_then(|value| value.to_str().ok()),
|
||||
Some("req-spacetime-unavailable")
|
||||
);
|
||||
let body = read_json_response(response).await;
|
||||
assert_eq!(body["error"]["code"], "SERVICE_UNAVAILABLE");
|
||||
assert_eq!(
|
||||
body["error"]["details"]["reason"],
|
||||
"spacetime_startup_unavailable"
|
||||
);
|
||||
assert_eq!(body["error"]["details"]["provider"], "spacetimedb");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn creation_entry_route_disabled_returns_service_unavailable() {
|
||||
let state = AppState::new(AppConfig::default()).expect("state should build");
|
||||
|
||||
@@ -11,7 +11,6 @@ use platform_speech::{
|
||||
};
|
||||
|
||||
const DEFAULT_INTERNAL_API_SECRET: &str = "genarrative-dev-internal-bridge";
|
||||
const DEFAULT_AUTH_STORE_PATH: &str = "server-rs/.data/auth-store.json";
|
||||
const SPACETIME_LOCAL_CONFIG_FILE: &str = "spacetime.local.json";
|
||||
pub(crate) const DEFAULT_VECTOR_ENGINE_IMAGE_REQUEST_TIMEOUT_MS: u64 = 1_000_000;
|
||||
|
||||
@@ -45,7 +44,6 @@ pub struct AppConfig {
|
||||
pub refresh_cookie_secure: bool,
|
||||
pub refresh_cookie_same_site: String,
|
||||
pub refresh_session_ttl_days: u32,
|
||||
pub auth_store_path: PathBuf,
|
||||
pub dev_password_entry_auto_register_enabled: bool,
|
||||
pub sms_auth_enabled: bool,
|
||||
pub sms_auth_provider: String,
|
||||
@@ -184,7 +182,6 @@ impl Default for AppConfig {
|
||||
refresh_cookie_secure: false,
|
||||
refresh_cookie_same_site: "Lax".to_string(),
|
||||
refresh_session_ttl_days: 30,
|
||||
auth_store_path: PathBuf::from(DEFAULT_AUTH_STORE_PATH),
|
||||
dev_password_entry_auto_register_enabled: false,
|
||||
sms_auth_enabled: false,
|
||||
sms_auth_provider: "mock".to_string(),
|
||||
@@ -433,9 +430,6 @@ impl AppConfig {
|
||||
config.refresh_session_ttl_days = refresh_session_ttl_days;
|
||||
}
|
||||
|
||||
if let Some(auth_store_path) = read_first_non_empty_env(&["GENARRATIVE_AUTH_STORE_PATH"]) {
|
||||
config.auth_store_path = PathBuf::from(auth_store_path);
|
||||
}
|
||||
if let Some(enabled) =
|
||||
read_first_bool_env(&["GENARRATIVE_DEV_PASSWORD_ENTRY_AUTO_REGISTER_ENABLED"])
|
||||
{
|
||||
|
||||
@@ -236,7 +236,6 @@ mod tests {
|
||||
AccessTokenClaims, AccessTokenClaimsInput, AuthProvider, BindingStatus, sign_access_token,
|
||||
};
|
||||
use serde_json::{Value, json};
|
||||
use std::path::PathBuf;
|
||||
use time::OffsetDateTime;
|
||||
use tower::ServiceExt;
|
||||
|
||||
@@ -394,12 +393,7 @@ mod tests {
|
||||
}
|
||||
|
||||
async fn build_test_state(label: &str) -> AppState {
|
||||
let mut config = AppConfig::default();
|
||||
config.auth_store_path = PathBuf::from(format!(
|
||||
".codex-temp/api-server-auth-store-creation-doc-{label}.json"
|
||||
));
|
||||
let _ = std::fs::remove_file(&config.auth_store_path);
|
||||
|
||||
AppState::new(config).expect("state should build")
|
||||
let _ = label;
|
||||
AppState::new(AppConfig::default()).expect("state should build")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,9 +107,13 @@ use std::{
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::runtime::Builder as TokioRuntimeBuilder;
|
||||
use tokio::time::timeout;
|
||||
use tracing::{info, warn};
|
||||
use tracing::{error, info};
|
||||
|
||||
use crate::{app::build_router, config::AppConfig, state::AppState};
|
||||
use crate::{
|
||||
app::{build_router, build_spacetime_unavailable_router},
|
||||
config::AppConfig,
|
||||
state::{AppState, AppStateInitError},
|
||||
};
|
||||
|
||||
const API_SERVER_STARTUP_STACK_SIZE_BYTES: usize = 32 * 1024 * 1024;
|
||||
const AUTH_STORE_STARTUP_RESTORE_TIMEOUT: Duration = Duration::from_secs(8);
|
||||
@@ -156,14 +160,21 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
|
||||
let otel_enabled = config.otel_enabled;
|
||||
let listener = build_tcp_listener(bind_address, listen_backlog)?;
|
||||
|
||||
let state = restore_app_state_for_startup(config)
|
||||
.await
|
||||
.map_err(|error| std::io::Error::other(format!("初始化应用状态失败:{error}")))?;
|
||||
state.puzzle_gallery_cache().spawn_cleanup_task();
|
||||
if let Some(outbox) = state.tracking_outbox() {
|
||||
outbox.spawn_worker();
|
||||
}
|
||||
let router = build_router(state);
|
||||
let router = match restore_app_state_for_startup(config).await {
|
||||
Ok(state) => {
|
||||
state.puzzle_gallery_cache().spawn_cleanup_task();
|
||||
if let Some(outbox) = state.tracking_outbox() {
|
||||
outbox.spawn_worker();
|
||||
}
|
||||
build_router(state)
|
||||
}
|
||||
Err(AppStateInitError::DependencyUnavailable(message)) => {
|
||||
build_spacetime_unavailable_router(message)
|
||||
}
|
||||
Err(error) => {
|
||||
return Err(std::io::Error::other(format!("初始化应用状态失败:{error}")));
|
||||
}
|
||||
};
|
||||
|
||||
info!(
|
||||
%bind_address,
|
||||
@@ -192,7 +203,6 @@ fn build_tcp_listener(
|
||||
async fn restore_app_state_for_startup(
|
||||
config: AppConfig,
|
||||
) -> Result<AppState, state::AppStateInitError> {
|
||||
let fallback_config = config.clone();
|
||||
match timeout(
|
||||
AUTH_STORE_STARTUP_RESTORE_TIMEOUT,
|
||||
AppState::try_restore_auth_store_from_spacetime(config),
|
||||
@@ -201,11 +211,13 @@ async fn restore_app_state_for_startup(
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(_) => {
|
||||
warn!(
|
||||
error!(
|
||||
timeout_seconds = AUTH_STORE_STARTUP_RESTORE_TIMEOUT.as_secs(),
|
||||
"启动恢复认证快照超时,跳过远端恢复并继续启动 api-server"
|
||||
"启动等待 SpacetimeDB 恢复认证快照超时,api-server 将进入依赖不可用模式"
|
||||
);
|
||||
AppState::new(fallback_config)
|
||||
Err(state::AppStateInitError::DependencyUnavailable(
|
||||
"SpacetimeDB 启动恢复认证快照超时".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
error::Error,
|
||||
fmt, fs,
|
||||
fmt,
|
||||
sync::{Arc, Mutex},
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use axum::extract::FromRef;
|
||||
@@ -300,6 +299,7 @@ pub enum AppStateInitError {
|
||||
Jwt(JwtError),
|
||||
RefreshCookie(RefreshCookieError),
|
||||
AuthStore(String),
|
||||
DependencyUnavailable(String),
|
||||
SmsProvider(SmsProviderError),
|
||||
WechatPay(String),
|
||||
Oss(OssError),
|
||||
@@ -308,12 +308,12 @@ pub enum AppStateInitError {
|
||||
|
||||
impl AppState {
|
||||
pub fn new(config: AppConfig) -> Result<Self, AppStateInitError> {
|
||||
#[cfg(test)]
|
||||
let auth_store = InMemoryAuthStore::default();
|
||||
#[cfg(not(test))]
|
||||
let auth_store = InMemoryAuthStore::from_persistence_path(config.auth_store_path.clone())
|
||||
.map_err(AppStateInitError::AuthStore)?;
|
||||
Self::new_with_auth_store(config, auth_store)
|
||||
Self::new_with_empty_auth_store(config)
|
||||
}
|
||||
|
||||
pub fn new_with_empty_auth_store(config: AppConfig) -> Result<Self, AppStateInitError> {
|
||||
// 中文注释:api-server 不再把本地 auth-store.json 当作用户认证真相源,启动恢复只允许来自 SpacetimeDB。
|
||||
Self::new_with_auth_store(config, InMemoryAuthStore::default())
|
||||
}
|
||||
|
||||
fn new_with_auth_store(
|
||||
@@ -549,8 +549,8 @@ impl AppState {
|
||||
OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000,
|
||||
)
|
||||
.map_err(|_| SpacetimeClientError::Runtime("认证快照更新时间超出 i64 范围".to_string()))?;
|
||||
// 本地 auth_store 是当前认证请求的即时真相源;SpacetimeDB 正式认证表用于跨进程恢复。
|
||||
// 远端数据库挂起或网络异常时,只降级远端恢复能力,不能让已成功的登录/刷新/退出回滚为失败。
|
||||
// 当前进程内 auth_store 是认证请求的即时工作集;SpacetimeDB 正式认证表用于跨进程恢复。
|
||||
// 远端数据库挂起或网络异常时,只降级后续恢复能力,不能让已成功的登录/刷新/退出回滚为失败。
|
||||
#[cfg(not(test))]
|
||||
if let Err(error) = self
|
||||
.spacetime_client
|
||||
@@ -577,64 +577,42 @@ impl AppState {
|
||||
pool_size: config.spacetime_pool_size,
|
||||
procedure_timeout: config.spacetime_procedure_timeout,
|
||||
});
|
||||
let mut candidates = Vec::new();
|
||||
let mut spacetime_restore_available = false;
|
||||
let mut restore_errors = Vec::new();
|
||||
|
||||
match spacetime_client
|
||||
.export_auth_store_snapshot_from_tables()
|
||||
.await
|
||||
{
|
||||
Ok(snapshot) => {
|
||||
spacetime_restore_available = true;
|
||||
if let Some(candidate) = auth_store_candidate_from_snapshot_record(
|
||||
snapshot,
|
||||
AuthStoreRestoreSource::SpacetimeTables,
|
||||
)? {
|
||||
candidates.push(candidate);
|
||||
let state = Self::new_with_auth_store(config, candidate.auth_store)?;
|
||||
info!(
|
||||
source = candidate.source.as_str(),
|
||||
updated_at_micros = candidate.updated_at_micros,
|
||||
"已恢复认证快照"
|
||||
);
|
||||
return Ok(state);
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
warn!(error = %error, "从 SpacetimeDB 表恢复认证快照失败");
|
||||
restore_errors.push(error.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
match spacetime_client.get_auth_store_snapshot().await {
|
||||
Ok(snapshot) => {
|
||||
if let Some(candidate) = auth_store_candidate_from_snapshot_record(
|
||||
snapshot,
|
||||
AuthStoreRestoreSource::SpacetimeSnapshot,
|
||||
)? {
|
||||
candidates.push(candidate);
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
warn!(error = %error, "从 SpacetimeDB 快照记录恢复认证快照失败");
|
||||
}
|
||||
if !spacetime_restore_available {
|
||||
return Err(AppStateInitError::DependencyUnavailable(format!(
|
||||
"SpacetimeDB 认证恢复不可用:{}",
|
||||
restore_errors.join("; ")
|
||||
)));
|
||||
}
|
||||
|
||||
if let Some(candidate) = auth_store_candidate_from_local_file(&config)? {
|
||||
candidates.push(candidate);
|
||||
}
|
||||
|
||||
if let Some(candidate) = select_auth_store_restore_candidate(candidates) {
|
||||
let source = candidate.source;
|
||||
let should_sync_to_spacetime = source == AuthStoreRestoreSource::LocalFile;
|
||||
let state = Self::new_with_auth_store(config, candidate.auth_store)?;
|
||||
info!(
|
||||
source = source.as_str(),
|
||||
updated_at_micros = candidate.updated_at_micros,
|
||||
"已恢复认证快照"
|
||||
);
|
||||
if should_sync_to_spacetime {
|
||||
if let Err(error) = state.sync_auth_store_snapshot_to_spacetime().await {
|
||||
warn!(
|
||||
error = %error,
|
||||
"本地认证快照回写 SpacetimeDB 失败,当前启动继续"
|
||||
);
|
||||
}
|
||||
}
|
||||
return Ok(state);
|
||||
}
|
||||
|
||||
Self::new(config)
|
||||
Self::new_with_empty_auth_store(config)
|
||||
}
|
||||
|
||||
pub fn refresh_session_service(&self) -> &RefreshSessionService {
|
||||
@@ -988,16 +966,12 @@ impl AppState {
|
||||
/// Origin of an auth-store snapshot considered during startup restore.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AuthStoreRestoreSource {
    /// Snapshot exported from the SpacetimeDB tables.
    SpacetimeTables,
    /// Snapshot read from the SpacetimeDB snapshot record.
    SpacetimeSnapshot,
    /// Snapshot loaded from the local auth-store file.
    LocalFile,
}

impl AuthStoreRestoreSource {
    /// Returns the stable snake_case label used for this source in log fields.
    fn as_str(self) -> &'static str {
        match self {
            AuthStoreRestoreSource::SpacetimeTables => "spacetime_tables",
            AuthStoreRestoreSource::SpacetimeSnapshot => "spacetime_snapshot",
            AuthStoreRestoreSource::LocalFile => "local_file",
        }
    }
}
|
||||
@@ -1029,57 +1003,14 @@ fn auth_store_candidate_from_snapshot_record(
|
||||
}))
|
||||
}
|
||||
|
||||
fn auth_store_candidate_from_local_file(
|
||||
config: &AppConfig,
|
||||
) -> Result<Option<AuthStoreRestoreCandidate>, AppStateInitError> {
|
||||
if !config.auth_store_path.is_file() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let updated_at_micros = fs::metadata(&config.auth_store_path)
|
||||
.ok()
|
||||
.and_then(|metadata| metadata.modified().ok())
|
||||
.and_then(system_time_to_unix_micros);
|
||||
let auth_store = InMemoryAuthStore::from_persistence_path(config.auth_store_path.clone())
|
||||
.map_err(AppStateInitError::AuthStore)?;
|
||||
|
||||
Ok(Some(AuthStoreRestoreCandidate {
|
||||
source: AuthStoreRestoreSource::LocalFile,
|
||||
updated_at_micros,
|
||||
auth_store,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Converts a `SystemTime` into microseconds since the Unix epoch.
///
/// Returns `None` when the instant lies before the epoch or when the microsecond count
/// does not fit in an `i64`.
fn system_time_to_unix_micros(system_time: SystemTime) -> Option<i64> {
    system_time
        .duration_since(UNIX_EPOCH)
        .ok()
        .and_then(|since_epoch| i64::try_from(since_epoch.as_micros()).ok())
}
|
||||
|
||||
fn select_auth_store_restore_candidate(
|
||||
candidates: Vec<AuthStoreRestoreCandidate>,
|
||||
) -> Option<AuthStoreRestoreCandidate> {
|
||||
candidates.into_iter().max_by_key(|candidate| {
|
||||
(
|
||||
candidate.updated_at_micros.unwrap_or(i64::MIN),
|
||||
auth_store_restore_source_priority(candidate.source),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn auth_store_restore_source_priority(source: AuthStoreRestoreSource) -> u8 {
|
||||
match source {
|
||||
AuthStoreRestoreSource::SpacetimeTables => 3,
|
||||
AuthStoreRestoreSource::SpacetimeSnapshot => 2,
|
||||
AuthStoreRestoreSource::LocalFile => 1,
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for AppStateInitError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Jwt(error) => write!(f, "{error}"),
|
||||
Self::RefreshCookie(error) => write!(f, "{error}"),
|
||||
Self::AuthStore(error) | Self::WechatPay(error) => write!(f, "{error}"),
|
||||
Self::AuthStore(error) | Self::DependencyUnavailable(error) | Self::WechatPay(error) => {
|
||||
write!(f, "{error}")
|
||||
}
|
||||
Self::SmsProvider(error) => write!(f, "{error}"),
|
||||
Self::Oss(error) => write!(f, "{error}"),
|
||||
Self::Llm(error) => write!(f, "{error}"),
|
||||
|
||||
Reference in New Issue
Block a user