perf(api-server): tune gallery load shedding

This commit is contained in:
kdletters
2026-05-19 01:00:33 +08:00
parent 3eb292b403
commit 8038b6a6ee
22 changed files with 1178 additions and 80 deletions

View File

@@ -1,7 +1,7 @@
use axum::{
Router,
body::Body,
extract::Extension,
extract::{Extension, FromRef},
http::Request,
middleware,
response::Response,
@@ -22,7 +22,7 @@ use crate::{
request_context::{RequestContext, attach_request_context, resolve_request_id},
response_headers::propagate_request_id_header,
runtime_inventory::get_runtime_inventory_state,
state::AppState,
state::{AppState, BackpressureState},
telemetry::record_http_observability,
tracking::record_route_tracking_event_after_success,
vector_engine_audio_generation::{
@@ -79,7 +79,7 @@ pub fn build_router(state: AppState) -> Router {
))
// HTTP 背压在业务路由外侧快拒绝,避免过载请求继续占用 SpacetimeDB facade 与业务执行资源。
.layer(middleware::from_fn_with_state(
state.clone(),
BackpressureState::from_ref(&state),
limit_concurrent_requests,
))
// 错误归一化层放在 tracing 里侧,让 tracing 记录到最终对外返回的状态与错误体形态。

View File

@@ -13,11 +13,11 @@ use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use crate::{
http_error::AppError,
request_context::RequestContext,
state::{AppState, HttpRequestPermitPool},
state::{BackpressureState, HttpRequestPermitPool, HttpRequestPermitPoolKind},
};
pub async fn limit_concurrent_requests(
State(state): State<AppState>,
State(state): State<BackpressureState>,
request: Request,
next: Next,
) -> Response {
@@ -25,29 +25,38 @@ pub async fn limit_concurrent_requests(
return next.run(request).await;
}
let Some(permit_pool) = state.http_request_permit_pool() else {
let requested_pool = classify_request_permit_pool(request.uri().path());
let Some((permit_pool_kind, permit_pool)) = state.request_permit_pool(requested_pool) else {
return next.run(request).await;
};
match acquire_http_request_permit(permit_pool) {
match acquire_http_request_permit(permit_pool_kind, permit_pool) {
Ok(permit) => hold_permit_until_response_body_dropped(next.run(request).await, permit),
Err(_) => reject_overloaded_request(&request),
}
}
fn acquire_http_request_permit(
permit_pool_kind: HttpRequestPermitPoolKind,
permit_pool: Arc<HttpRequestPermitPool>,
) -> Result<HttpRequestPermitGuard, TryAcquireError> {
match permit_pool.clone().try_acquire_owned() {
Ok(permit) => {
crate::telemetry::update_http_request_permits_available(permit_pool.available_permits());
crate::telemetry::update_http_request_permits_available(
permit_pool_kind,
permit_pool.available_permits(),
);
Ok(HttpRequestPermitGuard {
permit_pool_kind,
permit: Some(permit),
permit_pool,
})
}
Err(error) => {
crate::telemetry::update_http_request_permits_available(permit_pool.available_permits());
crate::telemetry::update_http_request_permits_available(
permit_pool_kind,
permit_pool.available_permits(),
);
Err(error)
}
}
@@ -66,6 +75,7 @@ fn hold_permit_until_response_body_dropped(
}
struct HttpRequestPermitGuard {
permit_pool_kind: HttpRequestPermitPoolKind,
permit: Option<OwnedSemaphorePermit>,
permit_pool: Arc<HttpRequestPermitPool>,
}
@@ -73,7 +83,10 @@ struct HttpRequestPermitGuard {
impl Drop for HttpRequestPermitGuard {
fn drop(&mut self) {
drop(self.permit.take());
crate::telemetry::update_http_request_permits_available(self.permit_pool.available_permits());
crate::telemetry::update_http_request_permits_available(
self.permit_pool_kind,
self.permit_pool.available_permits(),
);
}
}
@@ -92,6 +105,44 @@ fn should_bypass_backpressure(request: &Request<Body>) -> bool {
request.uri().path() == "/healthz"
}
fn classify_request_permit_pool(path: &str) -> HttpRequestPermitPoolKind {
if is_gallery_list_path(path) {
HttpRequestPermitPoolKind::Gallery
} else if is_gallery_detail_path(path) {
HttpRequestPermitPoolKind::Detail
} else if path.starts_with("/admin/api/") {
HttpRequestPermitPoolKind::Admin
} else {
HttpRequestPermitPoolKind::Default
}
}
fn is_gallery_list_path(path: &str) -> bool {
matches!(
path,
"/api/runtime/puzzle/gallery" | "/api/runtime/custom-world-gallery"
)
}
fn is_gallery_detail_path(path: &str) -> bool {
let puzzle_prefix = "/api/runtime/puzzle/gallery/";
if let Some(profile_id) = path.strip_prefix(puzzle_prefix) {
return !profile_id.is_empty() && !profile_id.contains('/');
}
let custom_world_prefix = "/api/runtime/custom-world-gallery/";
if let Some(remainder) = path.strip_prefix(custom_world_prefix) {
let mut segments = remainder.split('/');
return matches!(
(segments.next(), segments.next(), segments.next()),
(Some(owner_user_id), Some(profile_id), None)
if !owner_user_id.is_empty() && !profile_id.is_empty()
);
}
false
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -107,9 +158,14 @@ mod tests {
use tokio::sync::Notify;
use tower::ServiceExt;
use crate::{config::AppConfig, state::AppState};
use axum::extract::FromRef;
use super::limit_concurrent_requests;
use crate::{
config::AppConfig,
state::{AppState, BackpressureState},
};
use super::{classify_request_permit_pool, limit_concurrent_requests};
#[derive(Clone)]
struct HeldRequestGate {
@@ -138,13 +194,50 @@ mod tests {
let mut config = AppConfig::default();
config.max_concurrent_requests = Some(max_concurrent_requests);
let state = AppState::new(config).expect("state should build");
let backpressure_state = BackpressureState::from_ref(&state);
Router::new()
.route("/held", get(held_request))
.route("/fast", get(fast_request))
.route("/healthz", get(fast_request))
.layer(middleware::from_fn_with_state(
state.clone(),
backpressure_state,
limit_concurrent_requests,
))
.layer(Extension(gate))
.with_state(state)
}
fn build_grouped_test_app(
default_max_concurrent_requests: usize,
gallery_max_concurrent_requests: usize,
admin_max_concurrent_requests: usize,
gate: HeldRequestGate,
) -> Router {
let mut config = AppConfig::default();
config.max_concurrent_requests = Some(default_max_concurrent_requests);
config.gallery_max_concurrent_requests = Some(gallery_max_concurrent_requests);
config.admin_max_concurrent_requests = Some(admin_max_concurrent_requests);
let state = AppState::new(config).expect("state should build");
let backpressure_state = BackpressureState::from_ref(&state);
Router::new()
.route("/held", get(held_request))
.route("/api/runtime/puzzle/gallery", get(held_request))
.route("/api/runtime/custom-world-gallery", get(held_request))
.route("/api/runtime/puzzle/gallery/profile-1", get(held_request))
.route(
"/api/runtime/puzzle/gallery/profile-1/like",
get(fast_request),
)
.route(
"/api/runtime/custom-world-gallery/user-1/profile-1",
get(held_request),
)
.route("/admin/api/overview", get(held_request))
.route("/fast", get(fast_request))
.layer(middleware::from_fn_with_state(
backpressure_state,
limit_concurrent_requests,
))
.layer(Extension(gate))
@@ -242,4 +335,147 @@ mod tests {
.expect("third request should complete");
assert_eq!(accepted_response.status(), StatusCode::OK);
}
#[tokio::test]
async fn gallery_pool_rejects_gallery_without_blocking_default_routes() {
let gate = HeldRequestGate {
entered: Arc::new(Notify::new()),
release: Arc::new(Notify::new()),
};
let app = build_grouped_test_app(2, 1, 1, gate.clone());
let entered = gate.entered.notified();
let held_response = tokio::spawn(
app.clone()
.oneshot(test_request("/api/runtime/puzzle/gallery")),
);
entered.await;
let rejected_gallery_response = app
.clone()
.oneshot(test_request("/api/runtime/custom-world-gallery"))
.await
.expect("rejected gallery request should complete");
assert_eq!(
rejected_gallery_response.status(),
StatusCode::TOO_MANY_REQUESTS
);
let accepted_default_response = app
.clone()
.oneshot(test_request("/fast"))
.await
.expect("default request should complete");
assert_eq!(accepted_default_response.status(), StatusCode::OK);
gate.release.notify_one();
let completed_response = held_response
.await
.expect("held request task should join")
.expect("held request should complete");
assert_eq!(completed_response.status(), StatusCode::OK);
}
#[tokio::test]
async fn detail_pool_falls_back_to_default_when_unset() {
let gate = HeldRequestGate {
entered: Arc::new(Notify::new()),
release: Arc::new(Notify::new()),
};
let mut config = AppConfig::default();
config.max_concurrent_requests = Some(1);
config.detail_max_concurrent_requests = None;
let state = AppState::new(config).expect("state should build");
let backpressure_state = BackpressureState::from_ref(&state);
let app = Router::new()
.route("/api/runtime/puzzle/gallery/profile-1", get(held_request))
.route("/fast", get(fast_request))
.layer(middleware::from_fn_with_state(
backpressure_state,
limit_concurrent_requests,
))
.layer(Extension(gate.clone()))
.with_state(state);
let entered = gate.entered.notified();
let held_response = tokio::spawn(
app.clone()
.oneshot(test_request("/api/runtime/puzzle/gallery/profile-1")),
);
entered.await;
let rejected_default_response = app
.clone()
.oneshot(test_request("/fast"))
.await
.expect("default request should complete");
assert_eq!(
rejected_default_response.status(),
StatusCode::TOO_MANY_REQUESTS
);
gate.release.notify_one();
let completed_response = held_response
.await
.expect("held request task should join")
.expect("held request should complete");
assert_eq!(completed_response.status(), StatusCode::OK);
}
#[tokio::test]
async fn admin_pool_is_isolated_from_default_routes() {
let gate = HeldRequestGate {
entered: Arc::new(Notify::new()),
release: Arc::new(Notify::new()),
};
let app = build_grouped_test_app(2, 1, 1, gate.clone());
let entered = gate.entered.notified();
let held_response = tokio::spawn(app.clone().oneshot(test_request("/admin/api/overview")));
entered.await;
let rejected_admin_response = app
.clone()
.oneshot(test_request("/admin/api/overview"))
.await
.expect("rejected admin request should complete");
assert_eq!(
rejected_admin_response.status(),
StatusCode::TOO_MANY_REQUESTS
);
let accepted_default_response = app
.clone()
.oneshot(test_request("/fast"))
.await
.expect("default request should complete");
assert_eq!(accepted_default_response.status(), StatusCode::OK);
gate.release.notify_one();
let completed_response = held_response
.await
.expect("held request task should join")
.expect("held request should complete");
assert_eq!(completed_response.status(), StatusCode::OK);
}
#[test]
fn classifies_only_exact_gallery_detail_paths_as_detail() {
assert_eq!(
classify_request_permit_pool("/api/runtime/puzzle/gallery/profile-1"),
crate::state::HttpRequestPermitPoolKind::Detail
);
assert_eq!(
classify_request_permit_pool("/api/runtime/puzzle/gallery/profile-1/like"),
crate::state::HttpRequestPermitPoolKind::Default
);
assert_eq!(
classify_request_permit_pool("/api/runtime/custom-world-gallery/user-1/profile-1"),
crate::state::HttpRequestPermitPoolKind::Detail
);
assert_eq!(
classify_request_permit_pool("/api/runtime/custom-world-gallery/user-1/profile-1/like"),
crate::state::HttpRequestPermitPoolKind::Default
);
}
}

View File

@@ -23,6 +23,9 @@ pub struct AppConfig {
pub listen_backlog: i32,
pub worker_threads: Option<usize>,
pub max_concurrent_requests: Option<usize>,
pub gallery_max_concurrent_requests: Option<usize>,
pub detail_max_concurrent_requests: Option<usize>,
pub admin_max_concurrent_requests: Option<usize>,
pub log_filter: String,
pub otel_enabled: bool,
pub admin_username: Option<String>,
@@ -154,6 +157,9 @@ impl Default for AppConfig {
listen_backlog: 1024,
worker_threads: None,
max_concurrent_requests: None,
gallery_max_concurrent_requests: None,
detail_max_concurrent_requests: None,
admin_max_concurrent_requests: None,
log_filter: "info,tower_http=info".to_string(),
otel_enabled: false,
admin_username: None,
@@ -322,6 +328,21 @@ impl AppConfig {
{
config.max_concurrent_requests = Some(max_concurrent_requests);
}
if let Some(max_concurrent_requests) =
read_first_usize_env(&["GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS"])
{
config.gallery_max_concurrent_requests = Some(max_concurrent_requests);
}
if let Some(max_concurrent_requests) =
read_first_usize_env(&["GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS"])
{
config.detail_max_concurrent_requests = Some(max_concurrent_requests);
}
if let Some(max_concurrent_requests) =
read_first_usize_env(&["GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS"])
{
config.admin_max_concurrent_requests = Some(max_concurrent_requests);
}
if let Some(otel_enabled) = read_first_bool_env(&["GENARRATIVE_OTEL_ENABLED"]) {
config.otel_enabled = otel_enabled;
}
@@ -1206,10 +1227,16 @@ mod tests {
std::env::remove_var("GENARRATIVE_API_LISTEN_BACKLOG");
std::env::remove_var("GENARRATIVE_API_WORKER_THREADS");
std::env::remove_var("GENARRATIVE_API_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_OTEL_ENABLED");
std::env::set_var("GENARRATIVE_API_LISTEN_BACKLOG", "2048");
std::env::set_var("GENARRATIVE_API_WORKER_THREADS", "6");
std::env::set_var("GENARRATIVE_API_MAX_CONCURRENT_REQUESTS", "128");
std::env::set_var("GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS", "64");
std::env::set_var("GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS", "32");
std::env::set_var("GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS", "16");
std::env::set_var("GENARRATIVE_OTEL_ENABLED", "true");
}
@@ -1217,12 +1244,18 @@ mod tests {
assert_eq!(config.listen_backlog, 2048);
assert_eq!(config.worker_threads, Some(6));
assert_eq!(config.max_concurrent_requests, Some(128));
assert_eq!(config.gallery_max_concurrent_requests, Some(64));
assert_eq!(config.detail_max_concurrent_requests, Some(32));
assert_eq!(config.admin_max_concurrent_requests, Some(16));
assert!(config.otel_enabled);
unsafe {
std::env::remove_var("GENARRATIVE_API_LISTEN_BACKLOG");
std::env::remove_var("GENARRATIVE_API_WORKER_THREADS");
std::env::remove_var("GENARRATIVE_API_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_API_GALLERY_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_API_DETAIL_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_API_ADMIN_MAX_CONCURRENT_REQUESTS");
std::env::remove_var("GENARRATIVE_OTEL_ENABLED");
}
}

View File

@@ -1,4 +1,7 @@
use std::sync::OnceLock;
use std::{
sync::{Mutex, OnceLock},
time::Instant,
};
use opentelemetry::global;
use tracing::warn;
@@ -52,6 +55,38 @@ fn register_process_metrics_once() {
})
.build();
meter
.f64_observable_counter("process.cpu.time")
.with_unit("s")
.with_description("api-server total user plus system CPU time")
.with_callback(|observer| {
let Some(snapshot) = ProcessMetricsSnapshot::collect() else {
return;
};
if let Some(cpu_time_seconds) = snapshot.cpu_time_seconds {
observer.observe(cpu_time_seconds, &[]);
}
})
.build();
meter
.f64_observable_gauge("genarrative.process.cpu.usage_percent")
.with_unit("%")
.with_description("api-server process CPU usage between metric collections")
.with_callback(|observer| {
let Some(snapshot) = ProcessMetricsSnapshot::collect() else {
return;
};
if let Some(cpu_time_seconds) = snapshot.cpu_time_seconds {
if let Some(usage_percent) =
process_cpu_usage_percent(cpu_time_seconds, Instant::now())
{
observer.observe(usage_percent, &[]);
}
}
})
.build();
meter
.i64_observable_up_down_counter("process.thread.count")
.with_unit("{thread}")
@@ -97,11 +132,12 @@ fn to_i64(value: u64) -> i64 {
value.min(i64::MAX as u64) as i64
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq)]
struct ProcessMetricsSnapshot {
rss_bytes: u64,
private_bytes: Option<u64>,
virtual_bytes: Option<u64>,
cpu_time_seconds: Option<f64>,
thread_count: u64,
windows_handle_count: Option<u64>,
unix_fd_count: Option<u64>,
@@ -111,12 +147,56 @@ impl ProcessMetricsSnapshot {
fn collect() -> Option<Self> {
collect_process_metrics()
.inspect_err(|error| {
warn!(%error, "采集 api-server 进程内存指标失败");
warn!(%error, "采集 api-server 进程指标失败");
})
.ok()
}
}
#[derive(Debug, Clone, Copy)]
struct CpuUsageSample {
cpu_time_seconds: f64,
observed_at: Instant,
}
fn process_cpu_usage_percent(cpu_time_seconds: f64, observed_at: Instant) -> Option<f64> {
static LAST_SAMPLE: OnceLock<Mutex<Option<CpuUsageSample>>> = OnceLock::new();
let mut last_sample = LAST_SAMPLE.get_or_init(|| Mutex::new(None)).lock().ok()?;
let previous = *last_sample;
*last_sample = Some(CpuUsageSample {
cpu_time_seconds,
observed_at,
});
let previous = previous?;
let wall_delta_seconds = observed_at
.checked_duration_since(previous.observed_at)?
.as_secs_f64();
cpu_usage_ratio_between_samples(
previous.cpu_time_seconds,
cpu_time_seconds,
0.0,
wall_delta_seconds,
)
.map(|ratio| ratio * 100.0)
}
fn cpu_usage_ratio_between_samples(
previous_cpu_seconds: f64,
current_cpu_seconds: f64,
previous_wall_seconds: f64,
current_wall_seconds: f64,
) -> Option<f64> {
let cpu_delta_seconds = current_cpu_seconds - previous_cpu_seconds;
let wall_delta_seconds = current_wall_seconds - previous_wall_seconds;
if cpu_delta_seconds < 0.0 || wall_delta_seconds <= 0.0 {
return None;
}
Some(cpu_delta_seconds / wall_delta_seconds)
}
#[cfg(windows)]
fn collect_process_metrics() -> Result<ProcessMetricsSnapshot, String> {
use windows_sys::Win32::{
@@ -149,16 +229,52 @@ fn collect_process_metrics() -> Result<ProcessMetricsSnapshot, String> {
Some(u64::from(handle_count))
};
let cpu_time_seconds = windows_process_cpu_time_seconds(handle);
Ok(ProcessMetricsSnapshot {
rss_bytes: counters.WorkingSetSize as u64,
private_bytes: Some(counters.PrivateUsage as u64),
virtual_bytes: Some(counters.PrivateUsage as u64),
cpu_time_seconds,
thread_count: u64::from(unsafe { GetCurrentProcessId() }.thread_count()?),
windows_handle_count: handle_count,
unix_fd_count: None,
})
}
#[cfg(windows)]
fn windows_process_cpu_time_seconds(handle: windows_sys::Win32::Foundation::HANDLE) -> Option<f64> {
use windows_sys::Win32::{
Foundation::FILETIME,
System::Threading::GetProcessTimes,
};
let mut creation_time = FILETIME::default();
let mut exit_time = FILETIME::default();
let mut kernel_time = FILETIME::default();
let mut user_time = FILETIME::default();
let ok = unsafe {
GetProcessTimes(
handle,
&mut creation_time,
&mut exit_time,
&mut kernel_time,
&mut user_time,
)
};
if ok == 0 {
return None;
}
let total_100ns = filetime_100ns(kernel_time) + filetime_100ns(user_time);
Some(total_100ns as f64 / 10_000_000.0)
}
#[cfg(windows)]
fn filetime_100ns(filetime: windows_sys::Win32::Foundation::FILETIME) -> u64 {
((filetime.dwHighDateTime as u64) << 32) | u64::from(filetime.dwLowDateTime)
}
#[cfg(windows)]
trait WindowsProcessThreadCount {
fn thread_count(self) -> Result<u32, String>;
@@ -207,6 +323,8 @@ fn collect_process_metrics() -> Result<ProcessMetricsSnapshot, String> {
.map_err(|error| format!("read /proc/self/status failed: {error}"))?;
let statm = std::fs::read_to_string("/proc/self/statm")
.map_err(|error| format!("read /proc/self/statm failed: {error}"))?;
let stat = std::fs::read_to_string("/proc/self/stat")
.map_err(|error| format!("read /proc/self/stat failed: {error}"))?;
let page_size = linux_page_size_bytes()?;
let rss_bytes = parse_status_kb(&status, "VmRSS:")
@@ -218,6 +336,7 @@ fn collect_process_metrics() -> Result<ProcessMetricsSnapshot, String> {
.or_else(|| parse_statm_pages(&statm, 0).map(|value| value * page_size))
.ok_or_else(|| "missing VmSize/statm size field".to_string())?;
let private_bytes = parse_status_kb(&status, "VmData:").map(|value| value * 1024);
let cpu_time_seconds = linux_cpu_time_seconds(&stat)?;
let thread_count = parse_status_u64(&status, "Threads:")
.ok_or_else(|| "missing Threads field".to_string())?;
@@ -225,12 +344,52 @@ fn collect_process_metrics() -> Result<ProcessMetricsSnapshot, String> {
rss_bytes,
private_bytes,
virtual_bytes: Some(virtual_bytes),
cpu_time_seconds: Some(cpu_time_seconds),
thread_count,
windows_handle_count: None,
unix_fd_count: linux_fd_count(),
})
}
#[cfg(target_os = "linux")]
fn linux_cpu_time_seconds(stat: &str) -> Result<f64, String> {
let cpu_ticks = parse_linux_proc_stat_cpu_ticks(stat)
.ok_or_else(|| "missing /proc/self/stat utime/stime fields".to_string())?;
let ticks_per_second = linux_clock_ticks_per_second()?;
Ok(cpu_ticks as f64 / ticks_per_second as f64)
}
#[cfg(target_os = "linux")]
fn linux_clock_ticks_per_second() -> Result<u64, String> {
static CLOCK_TICKS_PER_SECOND: OnceLock<Result<u64, String>> = OnceLock::new();
CLOCK_TICKS_PER_SECOND
.get_or_init(|| {
let output = std::process::Command::new("getconf")
.arg("CLK_TCK")
.output()
.map_err(|error| format!("getconf CLK_TCK failed: {error}"))?;
if !output.status.success() {
return Err(format!("getconf CLK_TCK exited with {}", output.status));
}
let text = String::from_utf8(output.stdout)
.map_err(|error| format!("getconf CLK_TCK output is not utf8: {error}"))?;
text.trim()
.parse::<u64>()
.map_err(|error| format!("parse CLK_TCK failed: {error}"))
})
.clone()
}
#[cfg(target_os = "linux")]
fn parse_linux_proc_stat_cpu_ticks(stat: &str) -> Option<u64> {
let fields_after_comm = stat.rsplit_once(") ")?.1;
let mut fields = fields_after_comm.split_whitespace();
let utime = fields.nth(11)?.parse::<u64>().ok()?;
let stime = fields.next()?.parse::<u64>().ok()?;
Some(utime + stime)
}
#[cfg(target_os = "linux")]
fn linux_page_size_bytes() -> Result<u64, String> {
let output = std::process::Command::new("getconf")
@@ -282,8 +441,12 @@ fn collect_process_metrics() -> Result<ProcessMetricsSnapshot, String> {
#[cfg(test)]
mod tests {
use super::cpu_usage_ratio_between_samples;
#[cfg(target_os = "linux")]
use super::{parse_statm_pages, parse_status_kb, parse_status_u64};
use super::{
parse_linux_proc_stat_cpu_ticks, parse_statm_pages, parse_status_kb, parse_status_u64,
};
#[cfg(target_os = "linux")]
#[test]
@@ -303,4 +466,28 @@ mod tests {
assert_eq!(parse_statm_pages("100 20 0 0 0 0 0", 1), Some(20));
assert_eq!(parse_statm_pages("100 20", 7), None);
}
#[cfg(target_os = "linux")]
#[test]
fn parses_linux_proc_stat_cpu_ticks_with_space_in_process_name() {
let stat = "123 (api server) S 1 2 3 4 5 6 7 8 9 10 120 30 0 0 20 0 18 0 12345";
assert_eq!(parse_linux_proc_stat_cpu_ticks(stat), Some(150));
}
#[test]
fn cpu_usage_ratio_uses_cpu_time_delta_over_wall_time() {
assert_eq!(
cpu_usage_ratio_between_samples(10.0, 12.5, 100.0, 101.0),
Some(2.5)
);
assert_eq!(
cpu_usage_ratio_between_samples(10.0, 9.0, 100.0, 101.0),
None
);
assert_eq!(
cpu_usage_ratio_between_samples(10.0, 11.0, 100.0, 100.0),
None
);
}
}

View File

@@ -1534,6 +1534,13 @@ pub async fn list_puzzle_gallery(
crate::telemetry::record_puzzle_gallery_cache_hit();
return Ok(puzzle_gallery_cached_json(&request_context, response));
}
if let Some(response) = state.puzzle_gallery_cache().read_stale_response().await {
crate::telemetry::record_puzzle_gallery_cache_stale_hit();
spawn_puzzle_gallery_cache_refresh(state.clone());
return Ok(puzzle_gallery_cached_json(&request_context, response));
}
crate::telemetry::record_puzzle_gallery_cache_miss();
let _rebuild_guard = state.puzzle_gallery_cache().acquire_rebuild_guard().await;
if let Some(response) = state.puzzle_gallery_cache().read_fresh_response().await {
@@ -1579,7 +1586,57 @@ pub async fn list_puzzle_gallery(
cached_response.data_json_len(),
);
Ok(puzzle_gallery_cached_json(&request_context, cached_response))
Ok(puzzle_gallery_cached_json(
&request_context,
cached_response,
))
}
fn spawn_puzzle_gallery_cache_refresh(state: AppState) {
let Some(rebuild_guard) = state
.puzzle_gallery_cache()
.try_acquire_owned_rebuild_guard()
else {
return;
};
crate::telemetry::record_puzzle_gallery_cache_refresh_started();
tokio::spawn(async move {
let _rebuild_guard = rebuild_guard;
let rebuild_started_at = std::time::Instant::now();
let refresh_result = async {
let items = state.spacetime_client().list_puzzle_gallery().await?;
let response = build_puzzle_gallery_window_response(
items
.into_iter()
.map(|item| map_puzzle_gallery_card_response(&state, item))
.collect(),
);
state
.puzzle_gallery_cache()
.store_response(response)
.await
.map_err(|error| SpacetimeClientError::Runtime(error.to_string()))
}
.await;
match refresh_result {
Ok(cached_response) => {
crate::telemetry::record_puzzle_gallery_cache_rebuild(
rebuild_started_at.elapsed(),
cached_response.data_json_len(),
);
}
Err(error) => {
crate::telemetry::record_puzzle_gallery_cache_refresh_failed();
tracing::warn!(
provider = PUZZLE_GALLERY_PROVIDER,
error = %error,
"puzzle gallery cache background refresh failed"
);
}
}
});
}
pub async fn get_puzzle_gallery_detail(

View File

@@ -10,7 +10,7 @@ use shared_contracts::{
puzzle_works::PuzzleWorkSummaryResponse,
};
use tokio::{
sync::{Mutex, MutexGuard, RwLock},
sync::{Mutex, MutexGuard, OwnedMutexGuard, RwLock},
time,
};
@@ -69,6 +69,18 @@ impl PuzzleGalleryCache {
})
}
pub async fn read_stale_response(&self) -> Option<PuzzleGalleryCachedResponse> {
let guard = self.inner.read().await;
let entry = guard.as_ref()?;
Some(PuzzleGalleryCachedResponse {
data_json: entry.data_json.clone(),
})
}
pub fn try_acquire_owned_rebuild_guard(&self) -> Option<OwnedMutexGuard<()>> {
self.rebuild_lock.clone().try_lock_owned().ok()
}
pub async fn store_response(
&self,
response: PuzzleGalleryResponse,
@@ -205,4 +217,36 @@ mod tests {
assert!(!response.has_more);
assert_eq!(response.next_cursor, None);
}
#[tokio::test]
async fn stale_response_remains_readable_after_fresh_ttl() {
let cache = PuzzleGalleryCache::new();
let response =
build_puzzle_gallery_window_response((0..8).map(build_summary).collect::<Vec<_>>());
cache
.store_response(response)
.await
.expect("cache response should serialize");
{
let mut guard = cache.inner.write().await;
let entry = guard.as_mut().expect("cache entry should exist");
entry.built_at = Instant::now() - PUZZLE_GALLERY_CACHE_TTL - Duration::from_secs(1);
}
assert!(cache.read_fresh_response().await.is_none());
assert!(cache.read_stale_response().await.is_some());
}
#[tokio::test]
async fn try_owned_rebuild_guard_allows_only_one_refresher() {
let cache = PuzzleGalleryCache::new();
let first_guard = cache.try_acquire_owned_rebuild_guard();
assert!(first_guard.is_some());
assert!(cache.try_acquire_owned_rebuild_guard().is_none());
drop(first_guard);
assert!(cache.try_acquire_owned_rebuild_guard().is_some());
}
}

View File

@@ -6,6 +6,7 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};
use axum::extract::FromRef;
use module_ai::{AiTaskService, InMemoryAiTaskStore};
use module_auth::{
AuthUserService, InMemoryAuthStore, PasswordEntryService, PhoneAuthService,
@@ -39,13 +40,113 @@ const ADMIN_ROLE: &str = "admin";
pub type HttpRequestPermitPool = Semaphore;
// 当前阶段先保留最小共享状态壳,后续逐步接入配置、客户端与平台适配。
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HttpRequestPermitPoolKind {
Default,
Gallery,
Detail,
Admin,
}
impl HttpRequestPermitPoolKind {
pub fn as_str(self) -> &'static str {
match self {
Self::Default => "default",
Self::Gallery => "gallery",
Self::Detail => "detail",
Self::Admin => "admin",
}
}
}
#[derive(Clone, Debug)]
pub struct AppState {
pub struct HttpRequestPermitPools {
default: Option<Arc<HttpRequestPermitPool>>,
gallery: Option<Arc<HttpRequestPermitPool>>,
detail: Option<Arc<HttpRequestPermitPool>>,
admin: Option<Arc<HttpRequestPermitPool>>,
}
impl HttpRequestPermitPools {
fn from_config(config: &AppConfig) -> Self {
Self {
default: config
.max_concurrent_requests
.map(HttpRequestPermitPool::new)
.map(Arc::new),
gallery: config
.gallery_max_concurrent_requests
.map(HttpRequestPermitPool::new)
.map(Arc::new),
detail: config
.detail_max_concurrent_requests
.map(HttpRequestPermitPool::new)
.map(Arc::new),
admin: config
.admin_max_concurrent_requests
.map(HttpRequestPermitPool::new)
.map(Arc::new),
}
}
pub fn pool(
&self,
kind: HttpRequestPermitPoolKind,
) -> Option<(HttpRequestPermitPoolKind, Arc<HttpRequestPermitPool>)> {
let selected = match kind {
HttpRequestPermitPoolKind::Default => self.default.clone(),
HttpRequestPermitPoolKind::Gallery => self.gallery.clone(),
HttpRequestPermitPoolKind::Detail => self.detail.clone(),
HttpRequestPermitPoolKind::Admin => self.admin.clone(),
};
selected.map(|pool| (kind, pool)).or_else(|| {
self.default
.clone()
.map(|pool| (HttpRequestPermitPoolKind::Default, pool))
})
}
}
#[derive(Clone, Debug)]
pub struct BackpressureState {
permit_pools: HttpRequestPermitPools,
}
impl BackpressureState {
pub fn request_permit_pool(
&self,
kind: HttpRequestPermitPoolKind,
) -> Option<(HttpRequestPermitPoolKind, Arc<HttpRequestPermitPool>)> {
self.permit_pools.pool(kind)
}
}
#[derive(Clone, Debug)]
pub struct AppState(Arc<AppStateInner>);
impl std::ops::Deref for AppState {
type Target = AppStateInner;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl FromRef<AppState> for BackpressureState {
fn from_ref(state: &AppState) -> Self {
Self {
permit_pools: state.http_request_permit_pools(),
}
}
}
// Axum/Hyper 会在路由树和连接 service 上频繁 clone stateAppState 外层必须保持浅拷贝。
#[derive(Debug)]
pub struct AppStateInner {
// 配置会在后续中间件、路由和平台适配接入时逐步消费。
#[allow(dead_code)]
pub config: AppConfig,
http_request_permit_pool: Option<Arc<HttpRequestPermitPool>>,
http_request_permit_pools: HttpRequestPermitPools,
auth_jwt_config: JwtConfig,
admin_runtime: Option<AdminRuntime>,
refresh_cookie_config: RefreshCookieConfig,
@@ -198,14 +299,11 @@ impl AppState {
});
let llm_client = build_llm_client(&config)?;
let creative_agent_gpt5_client = build_creative_agent_gpt5_client(&config)?;
let http_request_permit_pool = config
.max_concurrent_requests
.map(HttpRequestPermitPool::new)
.map(Arc::new);
let http_request_permit_pools = HttpRequestPermitPools::from_config(&config);
Ok(Self {
Ok(Self(Arc::new(AppStateInner {
config,
http_request_permit_pool,
http_request_permit_pools,
auth_jwt_config,
admin_runtime,
refresh_cookie_config,
@@ -232,7 +330,7 @@ impl AppState {
creative_agent_sessions: Arc::new(Mutex::new(HashMap::new())),
#[cfg(test)]
test_runtime_snapshot_store: Arc::new(Mutex::new(HashMap::new())),
})
})))
}
pub fn auth_jwt_config(&self) -> &JwtConfig {
@@ -247,8 +345,8 @@ impl AppState {
&self.refresh_cookie_config
}
pub fn http_request_permit_pool(&self) -> Option<Arc<HttpRequestPermitPool>> {
self.http_request_permit_pool.clone()
pub fn http_request_permit_pools(&self) -> HttpRequestPermitPools {
self.http_request_permit_pools.clone()
}
pub async fn upsert_creation_entry_type_config(

View File

@@ -12,10 +12,14 @@ use std::sync::{
};
use tracing::{info, warn};
use crate::{request_context::resolve_request_id, state::AppState};
use crate::{
request_context::resolve_request_id,
state::{AppState, HttpRequestPermitPoolKind},
};
static HTTP_RESPONSE_BODY_IN_FLIGHT: AtomicI64 = AtomicI64::new(0);
static HTTP_REQUEST_PERMITS_AVAILABLE: OnceLock<Arc<AtomicI64>> = OnceLock::new();
static HTTP_REQUEST_PERMITS_AVAILABLE: OnceLock<HttpRequestPermitsAvailableGauges> =
OnceLock::new();
// 集中维护 api-server HTTP 观测,避免在 handler 中散落高基数字段或重复创建 instrument。
pub async fn record_http_observability(
@@ -78,29 +82,42 @@ pub async fn record_http_observability(
track_response_body_in_flight(response)
}
pub(crate) fn update_http_request_permits_available(available: usize) {
let gauge = HTTP_REQUEST_PERMITS_AVAILABLE.get_or_init(|| {
let gauge = Arc::new(AtomicI64::new(0));
register_http_request_permits_available_metric(gauge.clone());
gauge
});
gauge.store(available.min(i64::MAX as usize) as i64, Ordering::Relaxed);
pub(crate) fn update_http_request_permits_available(
pool: HttpRequestPermitPoolKind,
available: usize,
) {
HTTP_REQUEST_PERMITS_AVAILABLE
.get_or_init(register_http_request_permits_available_metric)
.store(pool, available);
}
pub(crate) fn record_puzzle_gallery_cache_hit() {
puzzle_gallery_cache_metrics().hits.add(1, &[]);
}
pub(crate) fn record_puzzle_gallery_cache_stale_hit() {
puzzle_gallery_cache_metrics().stale_hits.add(1, &[]);
}
pub(crate) fn record_puzzle_gallery_cache_miss() {
puzzle_gallery_cache_metrics().misses.add(1, &[]);
}
pub(crate) fn record_puzzle_gallery_cache_rebuild(duration: std::time::Duration, data_bytes: usize) {
pub(crate) fn record_puzzle_gallery_cache_refresh_started() {
puzzle_gallery_cache_metrics().refreshes_started.add(1, &[]);
}
pub(crate) fn record_puzzle_gallery_cache_refresh_failed() {
puzzle_gallery_cache_metrics().refreshes_failed.add(1, &[]);
}
pub(crate) fn record_puzzle_gallery_cache_rebuild(
duration: std::time::Duration,
data_bytes: usize,
) {
let metrics = puzzle_gallery_cache_metrics();
metrics.rebuilds.add(1, &[]);
metrics
.rebuild_duration
.record(duration.as_secs_f64(), &[]);
metrics.rebuild_duration.record(duration.as_secs_f64(), &[]);
metrics
.data_json_bytes
.record(data_bytes.min(u64::MAX as usize) as u64, &[]);
@@ -125,12 +142,44 @@ struct HttpMetrics {
struct PuzzleGalleryCacheMetrics {
hits: Counter<u64>,
stale_hits: Counter<u64>,
misses: Counter<u64>,
refreshes_started: Counter<u64>,
refreshes_failed: Counter<u64>,
rebuilds: Counter<u64>,
rebuild_duration: opentelemetry::metrics::Histogram<f64>,
data_json_bytes: opentelemetry::metrics::Histogram<u64>,
}
struct HttpRequestPermitsAvailableGauges {
default: Arc<AtomicI64>,
gallery: Arc<AtomicI64>,
detail: Arc<AtomicI64>,
admin: Arc<AtomicI64>,
}
impl HttpRequestPermitsAvailableGauges {
fn new() -> Self {
Self {
default: Arc::new(AtomicI64::new(0)),
gallery: Arc::new(AtomicI64::new(0)),
detail: Arc::new(AtomicI64::new(0)),
admin: Arc::new(AtomicI64::new(0)),
}
}
fn store(&self, pool: HttpRequestPermitPoolKind, available: usize) {
let value = available.min(i64::MAX as usize) as i64;
match pool {
HttpRequestPermitPoolKind::Default => &self.default,
HttpRequestPermitPoolKind::Gallery => &self.gallery,
HttpRequestPermitPoolKind::Detail => &self.detail,
HttpRequestPermitPoolKind::Admin => &self.admin,
}
.store(value, Ordering::Relaxed);
}
}
struct ResponseBodyInFlightGuard;
impl Drop for ResponseBodyInFlightGuard {
@@ -171,10 +220,22 @@ fn puzzle_gallery_cache_metrics() -> &'static PuzzleGalleryCacheMetrics {
.u64_counter("genarrative.puzzle_gallery.cache.hits")
.with_description("Puzzle gallery response cache hits")
.build(),
stale_hits: meter
.u64_counter("genarrative.puzzle_gallery.cache.stale_hits")
.with_description("Puzzle gallery stale response cache hits")
.build(),
misses: meter
.u64_counter("genarrative.puzzle_gallery.cache.misses")
.with_description("Puzzle gallery response cache misses")
.build(),
refreshes_started: meter
.u64_counter("genarrative.puzzle_gallery.cache.refreshes_started")
.with_description("Puzzle gallery background refresh start count")
.build(),
refreshes_failed: meter
.u64_counter("genarrative.puzzle_gallery.cache.refreshes_failed")
.with_description("Puzzle gallery background refresh failure count")
.build(),
rebuilds: meter
.u64_counter("genarrative.puzzle_gallery.cache.rebuilds")
.with_description("Puzzle gallery response cache rebuild count")
@@ -193,16 +254,49 @@ fn puzzle_gallery_cache_metrics() -> &'static PuzzleGalleryCacheMetrics {
})
}
fn register_http_request_permits_available_metric(gauge: Arc<AtomicI64>) {
fn register_http_request_permits_available_metric() -> HttpRequestPermitsAvailableGauges {
let gauges = HttpRequestPermitsAvailableGauges::new();
let meter = global::meter("genarrative-api");
let default_gauge = gauges.default.clone();
let gallery_gauge = gauges.gallery.clone();
let detail_gauge = gauges.detail.clone();
let admin_gauge = gauges.admin.clone();
meter
.i64_observable_up_down_counter("genarrative.http.server.request_permits.available")
.with_unit("{permit}")
.with_description("Available api-server HTTP backpressure permits")
.with_callback(move |observer| {
observer.observe(gauge.load(Ordering::Relaxed), &[]);
observer.observe(
default_gauge.load(Ordering::Relaxed),
&[KeyValue::new(
"pool",
HttpRequestPermitPoolKind::Default.as_str(),
)],
);
observer.observe(
gallery_gauge.load(Ordering::Relaxed),
&[KeyValue::new(
"pool",
HttpRequestPermitPoolKind::Gallery.as_str(),
)],
);
observer.observe(
detail_gauge.load(Ordering::Relaxed),
&[KeyValue::new(
"pool",
HttpRequestPermitPoolKind::Detail.as_str(),
)],
);
observer.observe(
admin_gauge.load(Ordering::Relaxed),
&[KeyValue::new(
"pool",
HttpRequestPermitPoolKind::Admin.as_str(),
)],
);
})
.build();
gauges
}
pub(crate) fn register_http_runtime_metrics() {
@@ -284,19 +378,13 @@ mod tests {
observability_route("/api/runtime/puzzle/runs/run-123/history"),
"/api/*"
);
assert_eq!(
observability_route("/admin/api/debug/http"),
"/admin/api/*"
);
assert_eq!(observability_route("/admin/api/debug/http"), "/admin/api/*");
}
#[test]
fn resolve_request_scheme_uses_forwarded_proto_first_value() {
let mut headers = HeaderMap::new();
headers.insert(
"x-forwarded-proto",
HeaderValue::from_static("https, http"),
);
headers.insert("x-forwarded-proto", HeaderValue::from_static("https, http"));
assert_eq!(resolve_request_scheme(&headers), "https");
}