304 lines
10 KiB
Rust
304 lines
10 KiB
Rust
use axum::{
|
|
body::Body,
|
|
extract::State,
|
|
http::{HeaderMap, Request, Response},
|
|
middleware::Next,
|
|
};
|
|
use http_body_util::BodyExt;
|
|
use opentelemetry::{KeyValue, global, metrics::Counter};
|
|
use std::sync::{
|
|
Arc, OnceLock,
|
|
atomic::{AtomicI64, Ordering},
|
|
};
|
|
use tracing::{info, warn};
|
|
|
|
use crate::{request_context::resolve_request_id, state::AppState};
|
|
|
|
/// Number of response bodies wrapped by `track_response_body_in_flight` whose
/// guard has not been dropped yet. Incremented when a body is wrapped,
/// decremented in `ResponseBodyInFlightGuard::drop`, and read by the observable
/// instrument registered in `register_http_runtime_metrics`.
static HTTP_RESPONSE_BODY_IN_FLIGHT: AtomicI64 = AtomicI64::new(0);
|
|
/// Lazily created shared cell holding the most recent value passed to
/// `update_http_request_permits_available`. The first initialization also
/// registers the observable instrument that reads this cell.
static HTTP_REQUEST_PERMITS_AVAILABLE: OnceLock<Arc<AtomicI64>> = OnceLock::new();
|
|
|
|
// 集中维护 api-server HTTP 观测,避免在 handler 中散落高基数字段或重复创建 instrument。
|
|
pub async fn record_http_observability(
|
|
State(state): State<AppState>,
|
|
request: Request<Body>,
|
|
next: Next,
|
|
) -> Response<Body> {
|
|
let method = request.method().as_str().to_string();
|
|
let route = observability_route(request.uri().path());
|
|
let scheme = resolve_request_scheme(request.headers());
|
|
let path = request.uri().path().to_string();
|
|
let request_id = resolve_request_id(&request).unwrap_or_else(|| "unknown".to_string());
|
|
let base_labels = http_base_labels(method.clone(), route.clone());
|
|
let metrics = http_metrics();
|
|
metrics.in_flight.add(1, &base_labels);
|
|
let started_at = std::time::Instant::now();
|
|
|
|
let response = next.run(request).await;
|
|
let status = response.status().as_u16();
|
|
let status_class = status_class(status);
|
|
let latency_ms = started_at.elapsed().as_millis().min(u64::MAX as u128) as u64;
|
|
let slow_request = latency_ms >= state.config.slow_request_threshold_ms;
|
|
let labels = http_response_labels(base_labels, status);
|
|
metrics.requests.add(1, &labels);
|
|
metrics
|
|
.duration
|
|
.record(started_at.elapsed().as_secs_f64(), &labels);
|
|
metrics.in_flight.add(-1, &labels[..2]);
|
|
|
|
if slow_request {
|
|
warn!(
|
|
request_id = %request_id,
|
|
http.request.method = %method,
|
|
http.route = %route,
|
|
url.scheme = %scheme,
|
|
url.path = %path,
|
|
http.response.status_code = status,
|
|
status,
|
|
status_class,
|
|
latency_ms,
|
|
slow_request = true,
|
|
"http request completed slowly"
|
|
);
|
|
} else {
|
|
info!(
|
|
request_id = %request_id,
|
|
http.request.method = %method,
|
|
http.route = %route,
|
|
url.scheme = %scheme,
|
|
url.path = %path,
|
|
http.response.status_code = status,
|
|
status,
|
|
status_class,
|
|
latency_ms,
|
|
slow_request = false,
|
|
"http request completed"
|
|
);
|
|
}
|
|
|
|
track_response_body_in_flight(response)
|
|
}
|
|
|
|
pub(crate) fn update_http_request_permits_available(available: usize) {
|
|
let gauge = HTTP_REQUEST_PERMITS_AVAILABLE.get_or_init(|| {
|
|
let gauge = Arc::new(AtomicI64::new(0));
|
|
register_http_request_permits_available_metric(gauge.clone());
|
|
gauge
|
|
});
|
|
gauge.store(available.min(i64::MAX as usize) as i64, Ordering::Relaxed);
|
|
}
|
|
|
|
pub(crate) fn record_puzzle_gallery_cache_hit() {
|
|
puzzle_gallery_cache_metrics().hits.add(1, &[]);
|
|
}
|
|
|
|
pub(crate) fn record_puzzle_gallery_cache_miss() {
|
|
puzzle_gallery_cache_metrics().misses.add(1, &[]);
|
|
}
|
|
|
|
pub(crate) fn record_puzzle_gallery_cache_rebuild(duration: std::time::Duration, data_bytes: usize) {
|
|
let metrics = puzzle_gallery_cache_metrics();
|
|
metrics.rebuilds.add(1, &[]);
|
|
metrics
|
|
.rebuild_duration
|
|
.record(duration.as_secs_f64(), &[]);
|
|
metrics
|
|
.data_json_bytes
|
|
.record(data_bytes.min(u64::MAX as usize) as u64, &[]);
|
|
}
|
|
|
|
fn track_response_body_in_flight(response: Response<Body>) -> Response<Body> {
|
|
response.map(|body| {
|
|
HTTP_RESPONSE_BODY_IN_FLIGHT.fetch_add(1, Ordering::Relaxed);
|
|
let guard = ResponseBodyInFlightGuard;
|
|
Body::new(body.map_frame(move |frame| {
|
|
let _guard = &guard;
|
|
frame
|
|
}))
|
|
})
|
|
}
|
|
|
|
struct HttpMetrics {
|
|
requests: Counter<u64>,
|
|
in_flight: opentelemetry::metrics::UpDownCounter<i64>,
|
|
duration: opentelemetry::metrics::Histogram<f64>,
|
|
}
|
|
|
|
struct PuzzleGalleryCacheMetrics {
|
|
hits: Counter<u64>,
|
|
misses: Counter<u64>,
|
|
rebuilds: Counter<u64>,
|
|
rebuild_duration: opentelemetry::metrics::Histogram<f64>,
|
|
data_json_bytes: opentelemetry::metrics::Histogram<u64>,
|
|
}
|
|
|
|
struct ResponseBodyInFlightGuard;
|
|
|
|
impl Drop for ResponseBodyInFlightGuard {
|
|
fn drop(&mut self) {
|
|
HTTP_RESPONSE_BODY_IN_FLIGHT.fetch_sub(1, Ordering::Relaxed);
|
|
}
|
|
}
|
|
|
|
fn http_metrics() -> &'static HttpMetrics {
|
|
static METRICS: std::sync::OnceLock<HttpMetrics> = std::sync::OnceLock::new();
|
|
METRICS.get_or_init(|| {
|
|
let meter = global::meter("genarrative-api");
|
|
HttpMetrics {
|
|
requests: meter
|
|
.u64_counter("genarrative.http.server.requests")
|
|
.with_description("HTTP request count grouped by route and status class")
|
|
.build(),
|
|
in_flight: meter
|
|
.i64_up_down_counter("http.server.active_requests")
|
|
.with_unit("{request}")
|
|
.with_description("Number of active HTTP server requests")
|
|
.build(),
|
|
duration: meter
|
|
.f64_histogram("http.server.request.duration")
|
|
.with_unit("s")
|
|
.with_description("Duration of HTTP server requests")
|
|
.build(),
|
|
}
|
|
})
|
|
}
|
|
|
|
fn puzzle_gallery_cache_metrics() -> &'static PuzzleGalleryCacheMetrics {
|
|
static METRICS: std::sync::OnceLock<PuzzleGalleryCacheMetrics> = std::sync::OnceLock::new();
|
|
METRICS.get_or_init(|| {
|
|
let meter = global::meter("genarrative-api");
|
|
PuzzleGalleryCacheMetrics {
|
|
hits: meter
|
|
.u64_counter("genarrative.puzzle_gallery.cache.hits")
|
|
.with_description("Puzzle gallery response cache hits")
|
|
.build(),
|
|
misses: meter
|
|
.u64_counter("genarrative.puzzle_gallery.cache.misses")
|
|
.with_description("Puzzle gallery response cache misses")
|
|
.build(),
|
|
rebuilds: meter
|
|
.u64_counter("genarrative.puzzle_gallery.cache.rebuilds")
|
|
.with_description("Puzzle gallery response cache rebuild count")
|
|
.build(),
|
|
rebuild_duration: meter
|
|
.f64_histogram("genarrative.puzzle_gallery.cache.rebuild.duration")
|
|
.with_unit("s")
|
|
.with_description("Puzzle gallery response cache rebuild duration")
|
|
.build(),
|
|
data_json_bytes: meter
|
|
.u64_histogram("genarrative.puzzle_gallery.cache.data_json_bytes")
|
|
.with_unit("By")
|
|
.with_description("Serialized puzzle gallery data JSON size")
|
|
.build(),
|
|
}
|
|
})
|
|
}
|
|
|
|
fn register_http_request_permits_available_metric(gauge: Arc<AtomicI64>) {
|
|
let meter = global::meter("genarrative-api");
|
|
meter
|
|
.i64_observable_up_down_counter("genarrative.http.server.request_permits.available")
|
|
.with_unit("{permit}")
|
|
.with_description("Available api-server HTTP backpressure permits")
|
|
.with_callback(move |observer| {
|
|
observer.observe(gauge.load(Ordering::Relaxed), &[]);
|
|
})
|
|
.build();
|
|
}
|
|
|
|
pub(crate) fn register_http_runtime_metrics() {
|
|
static REGISTERED: OnceLock<()> = OnceLock::new();
|
|
REGISTERED.get_or_init(|| {
|
|
let meter = global::meter("genarrative-api");
|
|
meter
|
|
.i64_observable_up_down_counter("genarrative.http.server.response_bodies.in_flight")
|
|
.with_unit("{response}")
|
|
.with_description("HTTP response bodies still owned by Axum/Hyper")
|
|
.with_callback(|observer| {
|
|
observer.observe(HTTP_RESPONSE_BODY_IN_FLIGHT.load(Ordering::Relaxed), &[]);
|
|
})
|
|
.build();
|
|
});
|
|
}
|
|
|
|
fn http_base_labels(method: String, route: String) -> Vec<KeyValue> {
|
|
vec![
|
|
KeyValue::new("http.request.method", method),
|
|
KeyValue::new("http.route", route),
|
|
]
|
|
}
|
|
|
|
/// Appends a `status_class` attribute derived from `status` to `labels` and
/// returns the extended list.
fn http_response_labels(mut labels: Vec<KeyValue>, status: u16) -> Vec<KeyValue> {
    let class = status_class(status);
    labels.extend(std::iter::once(KeyValue::new("status_class", class)));
    labels
}
|
|
|
|
/// Maps an HTTP status code to its class label (`"1xx"` through `"5xx"`).
///
/// Codes below 100 or above 599 yield `"unknown"`.
fn status_class(status: u16) -> &'static str {
    const CLASSES: [&str; 5] = ["1xx", "2xx", "3xx", "4xx", "5xx"];

    // The hundreds digit selects the class; anything outside 1..=5 is unknown.
    match status / 100 {
        hundreds @ 1..=5 => CLASSES[usize::from(hundreds) - 1],
        _ => "unknown",
    }
}
|
|
|
|
/// Collapses a request path into a small fixed set of route labels so metric
/// attributes stay low-cardinality.
///
/// Prefixes are checked in order and the first match wins, so the specific
/// gallery prefixes take precedence over the generic `/api/` bucket. Paths
/// matching no prefix are labelled `"other"`.
pub(crate) fn observability_route(path: &str) -> String {
    // (path prefix, route label) pairs, most specific first.
    const ROUTE_PREFIXES: [(&str, &str); 4] = [
        ("/api/runtime/puzzle/gallery", "/api/runtime/puzzle/gallery"),
        (
            "/api/runtime/custom-world-gallery",
            "/api/runtime/custom-world-gallery",
        ),
        ("/admin/api/", "/admin/api/*"),
        ("/api/", "/api/*"),
    ];

    ROUTE_PREFIXES
        .iter()
        .find(|(prefix, _)| path.starts_with(*prefix))
        .map_or("other", |(_, label)| *label)
        .to_string()
}
|
|
|
|
pub(crate) fn resolve_request_scheme(headers: &HeaderMap) -> String {
|
|
headers
|
|
.get("x-forwarded-proto")
|
|
.and_then(|value| value.to_str().ok())
|
|
.and_then(|value| value.split(',').next())
|
|
.map(str::trim)
|
|
.filter(|value| !value.is_empty())
|
|
.unwrap_or("http")
|
|
.to_string()
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use axum::http::{HeaderMap, HeaderValue};

    use super::{observability_route, resolve_request_scheme};

    /// Distinct concrete paths must collapse onto the fixed set of route labels.
    #[test]
    fn observability_route_keeps_metrics_labels_low_cardinality() {
        let cases = [
            (
                "/api/runtime/puzzle/gallery?cursor=abc",
                "/api/runtime/puzzle/gallery",
            ),
            ("/api/runtime/puzzle/runs/run-123/history", "/api/*"),
            ("/admin/api/debug/http", "/admin/api/*"),
        ];

        for (path, expected) in cases {
            assert_eq!(observability_route(path), expected);
        }
    }

    /// With several forwarded protocols, only the first entry is used.
    #[test]
    fn resolve_request_scheme_uses_forwarded_proto_first_value() {
        let forwarded = HeaderValue::from_static("https, http");
        let mut headers = HeaderMap::new();
        headers.insert("x-forwarded-proto", forwarded);

        let scheme = resolve_request_scheme(&headers);

        assert_eq!(scheme, "https");
    }
}
|