Merge remote-tracking branch 'origin/master' into codex/tiaoyitiao
This commit is contained in:
@@ -877,6 +877,46 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn readyz_reports_readiness_and_draining_state() {
|
||||
let state = AppState::new(AppConfig::default()).expect("state should build");
|
||||
let app = build_router(state.clone());
|
||||
|
||||
let ready_response = app
|
||||
.clone()
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri("/readyz")
|
||||
.header("x-request-id", "req-ready")
|
||||
.body(Body::empty())
|
||||
.expect("readyz request should build"),
|
||||
)
|
||||
.await
|
||||
.expect("readyz request should succeed");
|
||||
assert_eq!(ready_response.status(), StatusCode::OK);
|
||||
let ready_body = read_json_response(ready_response).await;
|
||||
assert_eq!(ready_body["ok"], Value::Bool(true));
|
||||
assert_eq!(ready_body["ready"], Value::Bool(true));
|
||||
|
||||
state.mark_not_ready();
|
||||
let draining_response = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri("/readyz")
|
||||
.header("x-request-id", "req-draining")
|
||||
.body(Body::empty())
|
||||
.expect("readyz request should build"),
|
||||
)
|
||||
.await
|
||||
.expect("readyz request should succeed");
|
||||
assert_eq!(draining_response.status(), StatusCode::SERVICE_UNAVAILABLE);
|
||||
let draining_body = read_json_response(draining_response).await;
|
||||
assert_eq!(
|
||||
draining_body["error"]["details"]["reason"],
|
||||
"api_server_draining"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn creative_agent_draft_edit_rejects_unconfirmed_template_session() {
|
||||
let app = build_internal_creative_agent_app();
|
||||
|
||||
@@ -102,7 +102,7 @@ fn reject_overloaded_request(request: &Request<Body>) -> Response {
|
||||
}
|
||||
|
||||
fn should_bypass_backpressure(request: &Request<Body>) -> bool {
|
||||
request.uri().path() == "/healthz"
|
||||
matches!(request.uri().path(), "/healthz" | "/readyz")
|
||||
}
|
||||
|
||||
fn classify_request_permit_pool(path: &str) -> HttpRequestPermitPoolKind {
|
||||
@@ -200,6 +200,7 @@ mod tests {
|
||||
.route("/held", get(held_request))
|
||||
.route("/fast", get(fast_request))
|
||||
.route("/healthz", get(fast_request))
|
||||
.route("/readyz", get(fast_request))
|
||||
.layer(middleware::from_fn_with_state(
|
||||
backpressure_state,
|
||||
limit_concurrent_requests,
|
||||
@@ -297,6 +298,13 @@ mod tests {
|
||||
.expect("healthz request should complete");
|
||||
assert_eq!(health_response.status(), StatusCode::OK);
|
||||
|
||||
let ready_response = app
|
||||
.clone()
|
||||
.oneshot(test_request("/readyz"))
|
||||
.await
|
||||
.expect("readyz request should complete");
|
||||
assert_eq!(ready_response.status(), StatusCode::OK);
|
||||
|
||||
gate.release.notify_one();
|
||||
let completed_response = held_response
|
||||
.await
|
||||
|
||||
@@ -25,6 +25,7 @@ pub struct AppConfig {
|
||||
pub gallery_max_concurrent_requests: Option<usize>,
|
||||
pub detail_max_concurrent_requests: Option<usize>,
|
||||
pub admin_max_concurrent_requests: Option<usize>,
|
||||
pub shutdown_outbox_flush_timeout: Duration,
|
||||
pub tracking_outbox_enabled: bool,
|
||||
pub tracking_outbox_dir: PathBuf,
|
||||
pub tracking_outbox_batch_size: usize,
|
||||
@@ -169,6 +170,7 @@ impl Default for AppConfig {
|
||||
gallery_max_concurrent_requests: None,
|
||||
detail_max_concurrent_requests: None,
|
||||
admin_max_concurrent_requests: None,
|
||||
shutdown_outbox_flush_timeout: Duration::from_millis(5_000),
|
||||
tracking_outbox_enabled: true,
|
||||
tracking_outbox_dir: PathBuf::from("server-rs/.data/tracking-outbox"),
|
||||
tracking_outbox_batch_size: 500,
|
||||
@@ -365,6 +367,11 @@ impl AppConfig {
|
||||
{
|
||||
config.admin_max_concurrent_requests = Some(max_concurrent_requests);
|
||||
}
|
||||
if let Some(timeout_ms) =
|
||||
read_first_positive_u64_env(&["GENARRATIVE_API_SHUTDOWN_OUTBOX_FLUSH_TIMEOUT_MS"])
|
||||
{
|
||||
config.shutdown_outbox_flush_timeout = Duration::from_millis(timeout_ms);
|
||||
}
|
||||
if let Some(enabled) = read_first_bool_env(&["GENARRATIVE_TRACKING_OUTBOX_ENABLED"]) {
|
||||
config.tracking_outbox_enabled = enabled;
|
||||
}
|
||||
@@ -1324,6 +1331,7 @@ mod tests {
|
||||
std::env::remove_var("GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS");
|
||||
std::env::remove_var("GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS");
|
||||
std::env::remove_var("GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS");
|
||||
std::env::remove_var("GENARRATIVE_API_SHUTDOWN_OUTBOX_FLUSH_TIMEOUT_MS");
|
||||
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_ENABLED");
|
||||
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_DIR");
|
||||
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE");
|
||||
@@ -1336,6 +1344,7 @@ mod tests {
|
||||
std::env::set_var("GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS", "64");
|
||||
std::env::set_var("GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS", "32");
|
||||
std::env::set_var("GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS", "16");
|
||||
std::env::set_var("GENARRATIVE_API_SHUTDOWN_OUTBOX_FLUSH_TIMEOUT_MS", "3000");
|
||||
std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_ENABLED", "false");
|
||||
std::env::set_var(
|
||||
"GENARRATIVE_TRACKING_OUTBOX_DIR",
|
||||
@@ -1354,6 +1363,10 @@ mod tests {
|
||||
assert_eq!(config.gallery_max_concurrent_requests, Some(64));
|
||||
assert_eq!(config.detail_max_concurrent_requests, Some(32));
|
||||
assert_eq!(config.admin_max_concurrent_requests, Some(16));
|
||||
assert_eq!(
|
||||
config.shutdown_outbox_flush_timeout,
|
||||
std::time::Duration::from_millis(3_000)
|
||||
);
|
||||
assert!(!config.tracking_outbox_enabled);
|
||||
assert_eq!(
|
||||
config.tracking_outbox_dir,
|
||||
@@ -1374,6 +1387,7 @@ mod tests {
|
||||
std::env::remove_var("GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS");
|
||||
std::env::remove_var("GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS");
|
||||
std::env::remove_var("GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS");
|
||||
std::env::remove_var("GENARRATIVE_API_SHUTDOWN_OUTBOX_FLUSH_TIMEOUT_MS");
|
||||
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_ENABLED");
|
||||
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_DIR");
|
||||
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE");
|
||||
|
||||
@@ -1,7 +1,15 @@
|
||||
use axum::{Json, extract::Extension};
|
||||
use axum::{
|
||||
Json,
|
||||
extract::{Extension, State},
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use serde_json::{Value, json};
|
||||
|
||||
use crate::{api_response::json_success_body, request_context::RequestContext};
|
||||
use crate::{
|
||||
api_response::json_success_body, http_error::AppError, request_context::RequestContext,
|
||||
state::AppState,
|
||||
};
|
||||
|
||||
pub async fn health_check(Extension(request_context): Extension<RequestContext>) -> Json<Value> {
|
||||
json_success_body(
|
||||
@@ -12,3 +20,28 @@ pub async fn health_check(Extension(request_context): Extension<RequestContext>)
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn readiness_check(
|
||||
State(state): State<AppState>,
|
||||
Extension(request_context): Extension<RequestContext>,
|
||||
) -> Response {
|
||||
if state.is_ready() {
|
||||
return json_success_body(
|
||||
Some(&request_context),
|
||||
json!({
|
||||
"ok": true,
|
||||
"ready": true,
|
||||
"service": "genarrative-api-server",
|
||||
}),
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
|
||||
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE)
|
||||
.with_message("api-server 正在退出,不再接收新流量")
|
||||
.with_details(json!({
|
||||
"reason": "api_server_draining",
|
||||
"ready": false,
|
||||
}))
|
||||
.into_response_with_context(Some(&request_context))
|
||||
}
|
||||
|
||||
@@ -99,25 +99,35 @@ use shared_logging::{OtelConfig, init_tracing};
|
||||
use socket2::{Domain, Protocol, Socket, Type};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
env, fs, io,
|
||||
env, fs, future, io,
|
||||
net::{SocketAddr, TcpListener as StdTcpListener},
|
||||
panic, thread,
|
||||
panic,
|
||||
sync::Arc,
|
||||
thread,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::runtime::Builder as TokioRuntimeBuilder;
|
||||
use tokio::time::timeout;
|
||||
use tracing::{error, info};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::{
|
||||
app::{build_router, build_spacetime_unavailable_router},
|
||||
config::AppConfig,
|
||||
state::{AppState, AppStateInitError},
|
||||
tracking_outbox::TrackingOutbox,
|
||||
};
|
||||
|
||||
const API_SERVER_STARTUP_STACK_SIZE_BYTES: usize = 32 * 1024 * 1024;
|
||||
const AUTH_STORE_STARTUP_RESTORE_TIMEOUT: Duration = Duration::from_secs(8);
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ShutdownContext {
|
||||
app_state: Option<AppState>,
|
||||
tracking_outbox: Option<Arc<TrackingOutbox>>,
|
||||
outbox_flush_timeout: Duration,
|
||||
}
|
||||
|
||||
fn main() -> Result<(), io::Error> {
|
||||
// Windows 本地调试下 Axum 路由树和启动恢复链较重,显式放大启动线程栈,避免 debug 构建在进入监听前栈溢出。
|
||||
let server_thread = thread::Builder::new()
|
||||
@@ -158,19 +168,33 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
|
||||
let listen_backlog = config.listen_backlog;
|
||||
let worker_threads = config.worker_threads;
|
||||
let otel_enabled = config.otel_enabled;
|
||||
let outbox_flush_timeout = config.shutdown_outbox_flush_timeout;
|
||||
let listener = build_tcp_listener(bind_address, listen_backlog)?;
|
||||
|
||||
let router = match restore_app_state_for_startup(config).await {
|
||||
let (router, shutdown_context) = match restore_app_state_for_startup(config).await {
|
||||
Ok(state) => {
|
||||
state.puzzle_gallery_cache().spawn_cleanup_task();
|
||||
if let Some(outbox) = state.tracking_outbox() {
|
||||
let tracking_outbox = state.tracking_outbox();
|
||||
if let Some(outbox) = tracking_outbox.clone() {
|
||||
outbox.spawn_worker();
|
||||
}
|
||||
build_router(state)
|
||||
}
|
||||
Err(AppStateInitError::DependencyUnavailable(message)) => {
|
||||
build_spacetime_unavailable_router(message)
|
||||
(
|
||||
build_router(state.clone()),
|
||||
ShutdownContext {
|
||||
app_state: Some(state),
|
||||
tracking_outbox,
|
||||
outbox_flush_timeout,
|
||||
},
|
||||
)
|
||||
}
|
||||
Err(AppStateInitError::DependencyUnavailable(message)) => (
|
||||
build_spacetime_unavailable_router(message),
|
||||
ShutdownContext {
|
||||
app_state: None,
|
||||
tracking_outbox: None,
|
||||
outbox_flush_timeout,
|
||||
},
|
||||
),
|
||||
Err(error) => {
|
||||
return Err(std::io::Error::other(format!(
|
||||
"初始化应用状态失败:{error}"
|
||||
@@ -186,7 +210,98 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
|
||||
"api-server 已完成 tracing 初始化并开始监听"
|
||||
);
|
||||
|
||||
axum::serve(listener, router).await
|
||||
let result = axum::serve(listener, router)
|
||||
.with_graceful_shutdown(shutdown_signal(shutdown_context.clone()))
|
||||
.await;
|
||||
finalize_shutdown(shutdown_context).await;
|
||||
result
|
||||
}
|
||||
|
||||
async fn shutdown_signal(context: ShutdownContext) {
|
||||
let signal = wait_for_shutdown_signal().await;
|
||||
if let Some(state) = context.app_state.as_ref() {
|
||||
state.mark_not_ready();
|
||||
}
|
||||
info!(
|
||||
signal,
|
||||
"api-server 收到退出信号,已标记 readiness 不可用并开始排空 HTTP 请求"
|
||||
);
|
||||
}
|
||||
|
||||
async fn wait_for_shutdown_signal() -> &'static str {
|
||||
#[cfg(unix)]
|
||||
{
|
||||
tokio::select! {
|
||||
signal = wait_for_ctrl_c_signal() => signal,
|
||||
signal = wait_for_sigterm_signal() => signal,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
wait_for_ctrl_c_signal().await
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_ctrl_c_signal() -> &'static str {
|
||||
if let Err(error) = tokio::signal::ctrl_c().await {
|
||||
error!(error = %error, "监听 SIGINT 失败,无法通过 Ctrl-C 触发优雅退出");
|
||||
future::pending::<()>().await;
|
||||
}
|
||||
"sigint"
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn wait_for_sigterm_signal() -> &'static str {
|
||||
let mut signal = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
|
||||
{
|
||||
Ok(signal) => signal,
|
||||
Err(error) => {
|
||||
error!(error = %error, "监听 SIGTERM 失败,无法通过 systemd terminate 触发优雅退出");
|
||||
future::pending::<()>().await;
|
||||
unreachable!("pending future never returns");
|
||||
}
|
||||
};
|
||||
signal.recv().await;
|
||||
"sigterm"
|
||||
}
|
||||
|
||||
async fn finalize_shutdown(context: ShutdownContext) {
|
||||
if let Some(state) = context.app_state.as_ref() {
|
||||
state.mark_not_ready();
|
||||
}
|
||||
|
||||
let Some(outbox) = context.tracking_outbox else {
|
||||
return;
|
||||
};
|
||||
|
||||
if context.outbox_flush_timeout.is_zero() {
|
||||
warn!("api-server 退出时 tracking outbox flush timeout 为 0,跳过主动 flush");
|
||||
return;
|
||||
}
|
||||
|
||||
let timeout_ms = context
|
||||
.outbox_flush_timeout
|
||||
.as_millis()
|
||||
.min(u128::from(u64::MAX)) as u64;
|
||||
info!(timeout_ms, "api-server 退出前封存并 flush tracking outbox");
|
||||
match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await {
|
||||
Ok(Ok(())) => {
|
||||
info!("api-server 退出前 tracking outbox flush 完成");
|
||||
}
|
||||
Ok(Err(error)) => {
|
||||
warn!(
|
||||
error = %error,
|
||||
"api-server 退出前 tracking outbox flush 未完成,已保留本地文件等待下次启动重试"
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
warn!(
|
||||
timeout_ms,
|
||||
"api-server 退出前 tracking outbox flush 超时,已保留本地文件等待下次启动重试"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn build_tcp_listener(
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
use axum::{Router, routing::get};
|
||||
|
||||
use crate::{health::health_check, state::AppState};
|
||||
use crate::{
|
||||
health::{health_check, readiness_check},
|
||||
state::AppState,
|
||||
};
|
||||
|
||||
pub fn router(_state: AppState) -> Router<AppState> {
|
||||
Router::new().route("/healthz", get(health_check))
|
||||
Router::new()
|
||||
.route("/healthz", get(health_check))
|
||||
.route("/readyz", get(readiness_check))
|
||||
}
|
||||
|
||||
@@ -2,7 +2,10 @@ use std::{
|
||||
collections::HashMap,
|
||||
error::Error,
|
||||
fmt,
|
||||
sync::{Arc, Mutex},
|
||||
sync::{
|
||||
Arc, Mutex,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
},
|
||||
};
|
||||
|
||||
use axum::extract::FromRef;
|
||||
@@ -229,6 +232,7 @@ pub struct AppStateInner {
|
||||
// 配置会在后续中间件、路由和平台适配接入时逐步消费。
|
||||
#[allow(dead_code)]
|
||||
pub config: AppConfig,
|
||||
ready: AtomicBool,
|
||||
http_request_permit_pools: HttpRequestPermitPools,
|
||||
auth_jwt_config: JwtConfig,
|
||||
admin_runtime: Option<AdminRuntime>,
|
||||
@@ -399,6 +403,7 @@ impl AppState {
|
||||
|
||||
Ok(Self(Arc::new(AppStateInner {
|
||||
config,
|
||||
ready: AtomicBool::new(true),
|
||||
http_request_permit_pools,
|
||||
auth_jwt_config,
|
||||
admin_runtime,
|
||||
@@ -447,6 +452,14 @@ impl AppState {
|
||||
self.http_request_permit_pools.clone()
|
||||
}
|
||||
|
||||
pub fn is_ready(&self) -> bool {
|
||||
self.ready.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub fn mark_not_ready(&self) {
|
||||
self.ready.store(false, Ordering::Release);
|
||||
}
|
||||
|
||||
pub async fn upsert_creation_entry_type_config(
|
||||
&self,
|
||||
input: module_runtime::CreationEntryTypeAdminUpsertInput,
|
||||
|
||||
@@ -159,6 +159,16 @@ impl TrackingOutbox {
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn flush_for_shutdown(&self) -> Result<(), TrackingOutboxError> {
|
||||
{
|
||||
let mut inner = self.inner.lock().await;
|
||||
self.ensure_initialized_locked(&mut inner).await?;
|
||||
self.seal_active_locked(&mut inner, "shutdown").await?;
|
||||
}
|
||||
|
||||
self.flush_sealed_files_once().await
|
||||
}
|
||||
|
||||
async fn seal_active_if_due(&self) -> Result<(), TrackingOutboxError> {
|
||||
let mut inner = self.inner.lock().await;
|
||||
self.ensure_initialized_locked(&mut inner).await?;
|
||||
@@ -176,7 +186,11 @@ impl TrackingOutbox {
|
||||
crate::telemetry::update_tracking_outbox_pending_files(sealed_files.len());
|
||||
for path in sealed_files {
|
||||
let started_at = Instant::now();
|
||||
let metadata = fs::metadata(&path).await?;
|
||||
let metadata = match fs::metadata(&path).await {
|
||||
Ok(metadata) => metadata,
|
||||
Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue,
|
||||
Err(error) => return Err(error.into()),
|
||||
};
|
||||
let file_bytes = metadata.len();
|
||||
let events = match read_outbox_events(&path).await {
|
||||
Ok(events) => events,
|
||||
@@ -203,7 +217,11 @@ impl TrackingOutbox {
|
||||
|
||||
match self.spacetime_client.record_tracking_events(events).await {
|
||||
Ok(accepted_count) => {
|
||||
fs::remove_file(&path).await?;
|
||||
match fs::remove_file(&path).await {
|
||||
Ok(()) => {}
|
||||
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(error) => return Err(error.into()),
|
||||
}
|
||||
self.subtract_total_bytes(file_bytes).await;
|
||||
crate::telemetry::record_tracking_outbox_flush(
|
||||
started_at.elapsed(),
|
||||
@@ -596,6 +614,34 @@ mod tests {
|
||||
let _ = std::fs::remove_dir_all(dir);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shutdown_flush_seals_active_file_for_later_retry() {
|
||||
let dir = test_dir("shutdown");
|
||||
let outbox = test_outbox(dir.clone(), 500, 1024 * 1024);
|
||||
|
||||
outbox.enqueue(sample_event("event-1")).await.unwrap();
|
||||
let result = outbox.flush_for_shutdown().await;
|
||||
|
||||
assert!(
|
||||
matches!(result, Err(TrackingOutboxError::Spacetime(_))),
|
||||
"missing test SpacetimeDB should keep sealed file for retry"
|
||||
);
|
||||
assert!(!dir.join(ACTIVE_FILE_NAME).exists());
|
||||
let sealed_count = std::fs::read_dir(&dir)
|
||||
.unwrap()
|
||||
.filter_map(Result::ok)
|
||||
.filter(|entry| {
|
||||
entry
|
||||
.file_name()
|
||||
.to_str()
|
||||
.is_some_and(|name| name.starts_with(SEALED_FILE_PREFIX))
|
||||
})
|
||||
.count();
|
||||
assert_eq!(sealed_count, 1);
|
||||
|
||||
let _ = std::fs::remove_dir_all(dir);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn directory_size_excludes_quarantined_corrupt_files() {
|
||||
let dir = test_dir("directory-size");
|
||||
|
||||
Reference in New Issue
Block a user