use axum::{ body::Body, extract::State, http::{HeaderMap, Request, Response}, middleware::Next, }; use http_body_util::BodyExt; use opentelemetry::{KeyValue, global, metrics::Counter}; use std::sync::{ Arc, OnceLock, atomic::{AtomicI64, Ordering}, }; use tracing::{info, warn}; use crate::{ request_context::resolve_request_id, state::{AppState, HttpRequestPermitPoolKind}, }; static HTTP_RESPONSE_BODY_IN_FLIGHT: AtomicI64 = AtomicI64::new(0); static TRACKING_OUTBOX_PENDING_BYTES: AtomicI64 = AtomicI64::new(0); static TRACKING_OUTBOX_PENDING_FILES: AtomicI64 = AtomicI64::new(0); static HTTP_REQUEST_PERMITS_AVAILABLE: OnceLock = OnceLock::new(); // 集中维护 api-server HTTP 观测,避免在 handler 中散落高基数字段或重复创建 instrument。 pub async fn record_http_observability( State(state): State, request: Request, next: Next, ) -> Response { let method = request.method().as_str().to_string(); let route = observability_route(request.uri().path()); let scheme = resolve_request_scheme(request.headers()); let path = request.uri().path().to_string(); let request_id = resolve_request_id(&request).unwrap_or_else(|| "unknown".to_string()); let base_labels = http_base_labels(method.clone(), route.clone()); let metrics = http_metrics(); metrics.in_flight.add(1, &base_labels); let started_at = std::time::Instant::now(); let response = next.run(request).await; let status = response.status().as_u16(); let status_class = status_class(status); let latency_ms = started_at.elapsed().as_millis().min(u64::MAX as u128) as u64; let slow_request = latency_ms >= state.config.slow_request_threshold_ms; let labels = http_response_labels(base_labels, status); metrics.requests.add(1, &labels); metrics .duration .record(started_at.elapsed().as_secs_f64(), &labels); metrics.in_flight.add(-1, &labels[..2]); if slow_request { warn!( request_id = %request_id, http.request.method = %method, http.route = %route, url.scheme = %scheme, url.path = %path, http.response.status_code = status, status, status_class, latency_ms, slow_request = true, "http request completed slowly" ); } else { info!( request_id = %request_id, http.request.method = %method, http.route = %route, url.scheme = %scheme, url.path = %path, http.response.status_code = status, status, status_class, latency_ms, slow_request = false, "http request completed" ); } track_response_body_in_flight(response) } pub(crate) fn update_http_request_permits_available( pool: HttpRequestPermitPoolKind, available: usize, ) { HTTP_REQUEST_PERMITS_AVAILABLE .get_or_init(register_http_request_permits_available_metric) .store(pool, available); } pub(crate) fn record_puzzle_gallery_cache_hit() { puzzle_gallery_cache_metrics().hits.add(1, &[]); } pub(crate) fn record_puzzle_gallery_cache_stale_hit() { puzzle_gallery_cache_metrics().stale_hits.add(1, &[]); } pub(crate) fn record_puzzle_gallery_cache_miss() { puzzle_gallery_cache_metrics().misses.add(1, &[]); } pub(crate) fn record_puzzle_gallery_cache_refresh_started() { puzzle_gallery_cache_metrics().refreshes_started.add(1, &[]); } pub(crate) fn record_puzzle_gallery_cache_refresh_failed() { puzzle_gallery_cache_metrics().refreshes_failed.add(1, &[]); } pub(crate) fn record_puzzle_gallery_cache_rebuild( duration: std::time::Duration, data_bytes: usize, ) { let metrics = puzzle_gallery_cache_metrics(); metrics.rebuilds.add(1, &[]); metrics.rebuild_duration.record(duration.as_secs_f64(), &[]); metrics .data_json_bytes .record(data_bytes.min(u64::MAX as usize) as u64, &[]); } pub(crate) fn record_tracking_outbox_enqueued() { tracking_outbox_metrics().enqueued.add(1, &[]); } pub(crate) fn record_tracking_outbox_dropped(reason: &'static str) { tracking_outbox_metrics() .dropped .add(1, &[KeyValue::new("reason", reason)]); } pub(crate) fn record_tracking_outbox_sealed(reason: &'static str) { tracking_outbox_metrics() .sealed_files .add(1, &[KeyValue::new("reason", reason)]); } pub(crate) fn record_tracking_outbox_corrupt_file() { tracking_outbox_metrics().corrupt_files.add(1, &[]); } pub(crate) fn record_tracking_outbox_flush( duration: std::time::Duration, accepted_count: u32, file_bytes: u64, failed: bool, ) { let status_class = if failed { "error" } else { "ok" }; let labels = [KeyValue::new("status_class", status_class)]; let metrics = tracking_outbox_metrics(); metrics.flushes.add(1, &labels); metrics .flush_duration .record(duration.as_secs_f64(), &labels); metrics .flushed_events .add(u64::from(accepted_count), &labels); metrics.flushed_bytes.add(file_bytes, &labels); } pub(crate) fn update_tracking_outbox_pending_bytes(bytes: u64) { TRACKING_OUTBOX_PENDING_BYTES.store(bytes.min(i64::MAX as u64) as i64, Ordering::Relaxed); } pub(crate) fn update_tracking_outbox_pending_files(files: usize) { TRACKING_OUTBOX_PENDING_FILES.store(files.min(i64::MAX as usize) as i64, Ordering::Relaxed); } fn track_response_body_in_flight(response: Response) -> Response { response.map(|body| { HTTP_RESPONSE_BODY_IN_FLIGHT.fetch_add(1, Ordering::Relaxed); let guard = ResponseBodyInFlightGuard; Body::new(body.map_frame(move |frame| { let _guard = &guard; frame })) }) } struct HttpMetrics { requests: Counter, in_flight: opentelemetry::metrics::UpDownCounter, duration: opentelemetry::metrics::Histogram, } struct PuzzleGalleryCacheMetrics { hits: Counter, stale_hits: Counter, misses: Counter, refreshes_started: Counter, refreshes_failed: Counter, rebuilds: Counter, rebuild_duration: opentelemetry::metrics::Histogram, data_json_bytes: opentelemetry::metrics::Histogram, } struct TrackingOutboxMetrics { enqueued: Counter, dropped: Counter, sealed_files: Counter, corrupt_files: Counter, flushes: Counter, flush_duration: opentelemetry::metrics::Histogram, flushed_events: Counter, flushed_bytes: Counter, } struct HttpRequestPermitsAvailableGauges { default: Arc, gallery: Arc, detail: Arc, admin: Arc, } impl HttpRequestPermitsAvailableGauges { fn new() -> Self { Self { default: Arc::new(AtomicI64::new(0)), gallery: Arc::new(AtomicI64::new(0)), detail: Arc::new(AtomicI64::new(0)), admin: Arc::new(AtomicI64::new(0)), } } fn store(&self, pool: HttpRequestPermitPoolKind, available: usize) { let value = available.min(i64::MAX as usize) as i64; match pool { HttpRequestPermitPoolKind::Default => &self.default, HttpRequestPermitPoolKind::Gallery => &self.gallery, HttpRequestPermitPoolKind::Detail => &self.detail, HttpRequestPermitPoolKind::Admin => &self.admin, } .store(value, Ordering::Relaxed); } } struct ResponseBodyInFlightGuard; impl Drop for ResponseBodyInFlightGuard { fn drop(&mut self) { HTTP_RESPONSE_BODY_IN_FLIGHT.fetch_sub(1, Ordering::Relaxed); } } fn http_metrics() -> &'static HttpMetrics { static METRICS: std::sync::OnceLock = std::sync::OnceLock::new(); METRICS.get_or_init(|| { let meter = global::meter("genarrative-api"); HttpMetrics { requests: meter .u64_counter("genarrative.http.server.requests") .with_description("HTTP request count grouped by route and status class") .build(), in_flight: meter .i64_up_down_counter("http.server.active_requests") .with_unit("{request}") .with_description("Number of active HTTP server requests") .build(), duration: meter .f64_histogram("http.server.request.duration") .with_unit("s") .with_description("Duration of HTTP server requests") .build(), } }) } fn puzzle_gallery_cache_metrics() -> &'static PuzzleGalleryCacheMetrics { static METRICS: std::sync::OnceLock = std::sync::OnceLock::new(); METRICS.get_or_init(|| { let meter = global::meter("genarrative-api"); PuzzleGalleryCacheMetrics { hits: meter .u64_counter("genarrative.puzzle_gallery.cache.hits") .with_description("Puzzle gallery response cache hits") .build(), stale_hits: meter .u64_counter("genarrative.puzzle_gallery.cache.stale_hits") .with_description("Puzzle gallery stale response cache hits") .build(), misses: meter .u64_counter("genarrative.puzzle_gallery.cache.misses") .with_description("Puzzle gallery response cache misses") .build(), refreshes_started: meter .u64_counter("genarrative.puzzle_gallery.cache.refreshes_started") .with_description("Puzzle gallery background refresh start count") .build(), refreshes_failed: meter .u64_counter("genarrative.puzzle_gallery.cache.refreshes_failed") .with_description("Puzzle gallery background refresh failure count") .build(), rebuilds: meter .u64_counter("genarrative.puzzle_gallery.cache.rebuilds") .with_description("Puzzle gallery response cache rebuild count") .build(), rebuild_duration: meter .f64_histogram("genarrative.puzzle_gallery.cache.rebuild.duration") .with_unit("s") .with_description("Puzzle gallery response cache rebuild duration") .build(), data_json_bytes: meter .u64_histogram("genarrative.puzzle_gallery.cache.data_json_bytes") .with_unit("By") .with_description("Serialized puzzle gallery data JSON size") .build(), } }) } fn tracking_outbox_metrics() -> &'static TrackingOutboxMetrics { static METRICS: std::sync::OnceLock = std::sync::OnceLock::new(); METRICS.get_or_init(|| { let meter = global::meter("genarrative-api"); TrackingOutboxMetrics { enqueued: meter .u64_counter("genarrative.tracking_outbox.events.enqueued") .with_description("Tracking events appended to the local outbox") .build(), dropped: meter .u64_counter("genarrative.tracking_outbox.events.dropped") .with_description("Tracking events dropped by local outbox protection") .build(), sealed_files: meter .u64_counter("genarrative.tracking_outbox.files.sealed") .with_description("Tracking outbox active files sealed for flushing") .build(), corrupt_files: meter .u64_counter("genarrative.tracking_outbox.files.corrupt") .with_description( "Tracking outbox sealed files quarantined because they could not be parsed", ) .build(), flushes: meter .u64_counter("genarrative.tracking_outbox.flushes") .with_description("Tracking outbox sealed file flush attempts") .build(), flush_duration: meter .f64_histogram("genarrative.tracking_outbox.flush.duration") .with_unit("s") .with_description("Tracking outbox sealed file flush duration") .build(), flushed_events: meter .u64_counter("genarrative.tracking_outbox.events.flushed") .with_description("Tracking events accepted by SpacetimeDB batch procedure") .build(), flushed_bytes: meter .u64_counter("genarrative.tracking_outbox.bytes.flushed") .with_unit("By") .with_description("Tracking outbox bytes removed after successful flush") .build(), } }) } fn register_http_request_permits_available_metric() -> HttpRequestPermitsAvailableGauges { let gauges = HttpRequestPermitsAvailableGauges::new(); let meter = global::meter("genarrative-api"); let default_gauge = gauges.default.clone(); let gallery_gauge = gauges.gallery.clone(); let detail_gauge = gauges.detail.clone(); let admin_gauge = gauges.admin.clone(); meter .i64_observable_up_down_counter("genarrative.http.server.request_permits.available") .with_unit("{permit}") .with_description("Available api-server HTTP backpressure permits") .with_callback(move |observer| { observer.observe( default_gauge.load(Ordering::Relaxed), &[KeyValue::new( "pool", HttpRequestPermitPoolKind::Default.as_str(), )], ); observer.observe( gallery_gauge.load(Ordering::Relaxed), &[KeyValue::new( "pool", HttpRequestPermitPoolKind::Gallery.as_str(), )], ); observer.observe( detail_gauge.load(Ordering::Relaxed), &[KeyValue::new( "pool", HttpRequestPermitPoolKind::Detail.as_str(), )], ); observer.observe( admin_gauge.load(Ordering::Relaxed), &[KeyValue::new( "pool", HttpRequestPermitPoolKind::Admin.as_str(), )], ); }) .build(); gauges } pub(crate) fn register_http_runtime_metrics() { static REGISTERED: OnceLock<()> = OnceLock::new(); REGISTERED.get_or_init(|| { let meter = global::meter("genarrative-api"); meter .i64_observable_up_down_counter("genarrative.http.server.response_bodies.in_flight") .with_unit("{response}") .with_description("HTTP response bodies still owned by Axum/Hyper") .with_callback(|observer| { observer.observe(HTTP_RESPONSE_BODY_IN_FLIGHT.load(Ordering::Relaxed), &[]); }) .build(); meter .i64_observable_up_down_counter("genarrative.tracking_outbox.pending.bytes") .with_unit("By") .with_description("Tracking outbox bytes waiting on local disk") .with_callback(|observer| { observer.observe(TRACKING_OUTBOX_PENDING_BYTES.load(Ordering::Relaxed), &[]); }) .build(); meter .i64_observable_up_down_counter("genarrative.tracking_outbox.pending.files") .with_unit("{file}") .with_description("Tracking outbox sealed files waiting for flush") .with_callback(|observer| { observer.observe(TRACKING_OUTBOX_PENDING_FILES.load(Ordering::Relaxed), &[]); }) .build(); }); } fn http_base_labels(method: String, route: String) -> Vec { vec![ KeyValue::new("http.request.method", method), KeyValue::new("http.route", route), ] } fn http_response_labels(mut labels: Vec, status: u16) -> Vec { labels.push(KeyValue::new("status_class", status_class(status))); labels } fn status_class(status: u16) -> &'static str { match status { 100..=199 => "1xx", 200..=299 => "2xx", 300..=399 => "3xx", 400..=499 => "4xx", 500..=599 => "5xx", _ => "unknown", } } pub(crate) fn observability_route(path: &str) -> String { if path.starts_with("/api/runtime/puzzle/gallery") { "/api/runtime/puzzle/gallery".to_string() } else if path.starts_with("/api/runtime/custom-world-gallery") { "/api/runtime/custom-world-gallery".to_string() } else if path.starts_with("/admin/api/") { "/admin/api/*".to_string() } else if path.starts_with("/api/") { "/api/*".to_string() } else { "other".to_string() } } pub(crate) fn resolve_request_scheme(headers: &HeaderMap) -> String { headers .get("x-forwarded-proto") .and_then(|value| value.to_str().ok()) .and_then(|value| value.split(',').next()) .map(str::trim) .filter(|value| !value.is_empty()) .unwrap_or("http") .to_string() } #[cfg(test)] mod tests { use axum::http::{HeaderMap, HeaderValue}; use super::{observability_route, resolve_request_scheme}; #[test] fn observability_route_keeps_metrics_labels_low_cardinality() { assert_eq!( observability_route("/api/runtime/puzzle/gallery?cursor=abc"), "/api/runtime/puzzle/gallery" ); assert_eq!( observability_route("/api/runtime/puzzle/runs/run-123/history"), "/api/*" ); assert_eq!(observability_route("/admin/api/debug/http"), "/admin/api/*"); } #[test] fn resolve_request_scheme_uses_forwarded_proto_first_value() { let mut headers = HeaderMap::new(); headers.insert("x-forwarded-proto", HeaderValue::from_static("https, http")); assert_eq!(resolve_request_scheme(&headers), "https"); } }