chore: add loadtest observability setup

This commit is contained in:
kdletters
2026-05-16 22:44:30 +08:00
parent 7f16e88e57
commit 0305b79440
55 changed files with 2867 additions and 1622 deletions

View File

@@ -43,6 +43,7 @@ sha2 = { workspace = true }
shared-contracts = { workspace = true, features = ["oss-contracts"] }
shared-kernel = { workspace = true }
shared-logging = { workspace = true }
socket2 = { workspace = true }
spacetime-client = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time"] }
tokio-stream = { workspace = true }
@@ -50,6 +51,7 @@ futures-util = { workspace = true }
time = { workspace = true, features = ["formatting"] }
tower-http = { workspace = true, features = ["trace"] }
tracing = { workspace = true }
opentelemetry = { workspace = true }
url = { workspace = true }
urlencoding = { workspace = true }
uuid = { workspace = true, features = ["v4"] }

View File

@@ -11,7 +11,7 @@ use tower_http::{
classify::ServerErrorsFailureClass,
trace::{DefaultOnRequest, TraceLayer},
};
use tracing::{Level, Span, error, info, info_span, warn};
use tracing::{Level, Span, error, info_span};
use crate::{
auth::{AuthenticatedAccessToken, require_bearer_auth},
@@ -22,6 +22,7 @@ use crate::{
response_headers::propagate_request_id_header,
runtime_inventory::get_runtime_inventory_state,
state::AppState,
telemetry::record_http_observability,
tracking::record_route_tracking_event_after_success,
vector_engine_audio_generation::{
create_background_music_task, create_sound_effect_task,
@@ -42,8 +43,6 @@ use crate::{
// 统一由这里构造 Axum 路由树,后续再逐项挂接中间件与业务路由。
pub fn build_router(state: AppState) -> Router {
let slow_request_threshold_ms = state.config.slow_request_threshold_ms;
Router::new()
.merge(modules::admin::router(state.clone()))
.merge(modules::health::router(state.clone()))
@@ -86,47 +85,55 @@ pub fn build_router(state: AppState) -> Router {
state.clone(),
record_api_tracking_after_success,
))
// HTTP 指标与请求完成日志放在 tracing span 内侧,日志事件可以继承当前 trace/span context。
.layer(middleware::from_fn_with_state(
state.clone(),
record_http_observability,
))
// 当前阶段先统一挂接 HTTP tracing后续 request_id、响应头与错误中间件继续在这里扩展。
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &Request<Body>| {
let request_id =
resolve_request_id(request).unwrap_or_else(|| "unknown".to_string());
let route = crate::telemetry::observability_route(request.uri().path());
let scheme = crate::telemetry::resolve_request_scheme(request.headers());
let span_name = format!("{} {}", request.method(), route);
info_span!(
"http.request",
otel.kind = "server",
otel.name = %span_name,
otel.status_code = tracing::field::Empty,
http.response.status_code = tracing::field::Empty,
method = %request.method(),
uri = %request.uri(),
http.request.method = %request.method(),
http.route = %route,
url.scheme = %scheme,
url.path = %request.uri().path(),
request_id = %request_id,
status = tracing::field::Empty,
latency_ms = tracing::field::Empty,
)
})
.on_request(DefaultOnRequest::new().level(Level::INFO))
.on_response(
move |response: &axum::response::Response,
latency: std::time::Duration,
span: &Span| {
|response: &axum::response::Response,
latency: std::time::Duration,
span: &Span| {
let latency_ms = latency.as_millis().min(u64::MAX as u128) as u64;
let status = response.status().as_u16();
let slow_request = latency_ms >= slow_request_threshold_ms;
span.record("status", status);
span.record("http.response.status_code", status);
span.record(
"otel.status_code",
if response.status().is_server_error() {
"ERROR"
} else {
"OK"
},
);
span.record("latency_ms", latency_ms);
if slow_request {
warn!(
parent: span,
status,
latency_ms,
slow_request = true,
"http request completed slowly"
);
} else {
info!(
parent: span,
status,
latency_ms,
slow_request = false,
"http request completed"
);
}
},
)
.on_failure(

View File

@@ -752,10 +752,14 @@ mod tests {
};
use hmac::{Hmac, Mac};
use http_body_util::BodyExt;
use platform_auth::{
AccessTokenClaims, AccessTokenClaimsInput, AuthProvider, BindingStatus, sign_access_token,
};
use reqwest::{Method, multipart};
use serde_json::{Value, json};
use sha2::{Digest, Sha256};
use shared_kernel::new_uuid_simple_string;
use time::OffsetDateTime;
use tower::ServiceExt;
use crate::{app::build_router, config::AppConfig, state::AppState};
@@ -873,13 +877,17 @@ mod tests {
..AppConfig::default()
};
let app = build_router(AppState::new(config).expect("state should build"));
let state = AppState::new(config).expect("state should build");
let token =
seed_authenticated_token(&state, "13800138120", "sess_assets_direct_upload").await;
let app = build_router(state);
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/assets/direct-upload-tickets")
.header("authorization", format!("Bearer {token}"))
.header("content-type", "application/json")
.header("x-request-id", "req-oss-ticket")
.header("x-genarrative-response-envelope", "1")
@@ -1693,6 +1701,33 @@ mod tests {
Ok(fields)
}
/// Test helper that produces a signed bearer token for a freshly seeded user.
///
/// Seeds a phone user (fixed password `"secret123"`) and a refresh session
/// through the `AppState` test helpers, builds access-token claims for that
/// user with the `Password` provider and the single role `"user"`, and signs
/// them with the state's JWT configuration.
///
/// Panics if the claims cannot be built or the token cannot be signed.
async fn seed_authenticated_token(
    state: &AppState,
    phone_number: &str,
    session_seed: &str,
) -> String {
    let seeded_user = state
        .seed_test_phone_user_with_password(phone_number, "secret123")
        .await;
    let session_id = state.seed_test_refresh_session_for_user(&seeded_user, session_seed);

    let claims_input = AccessTokenClaimsInput {
        user_id: seeded_user.id.clone(),
        session_id,
        provider: AuthProvider::Password,
        roles: vec!["user".to_string()],
        token_version: seeded_user.token_version,
        phone_verified: true,
        binding_status: BindingStatus::Active,
        display_name: Some(seeded_user.display_name.clone()),
    };

    let claims = AccessTokenClaims::from_input(
        claims_input,
        state.auth_jwt_config(),
        OffsetDateTime::now_utc(),
    )
    .expect("claims should build");

    sign_access_token(&claims, state.auth_jwt_config()).expect("token should sign")
}
fn build_object_url(
config: &AppConfig,
object_key: &str,

View File

@@ -20,7 +20,10 @@ pub(crate) const DEFAULT_VECTOR_ENGINE_IMAGE_REQUEST_TIMEOUT_MS: u64 = 1_000_000
pub struct AppConfig {
pub bind_host: String,
pub bind_port: u16,
pub listen_backlog: i32,
pub worker_threads: Option<usize>,
pub log_filter: String,
pub otel_enabled: bool,
pub admin_username: Option<String>,
pub admin_password: Option<String>,
pub admin_token_ttl_seconds: u64,
@@ -147,7 +150,10 @@ impl Default for AppConfig {
Self {
bind_host: "127.0.0.1".to_string(),
bind_port: 3000,
listen_backlog: 1024,
worker_threads: None,
log_filter: "info,tower_http=info".to_string(),
otel_enabled: false,
admin_username: None,
admin_password: None,
admin_token_ttl_seconds: 4 * 60 * 60,
@@ -301,6 +307,17 @@ impl AppConfig {
{
config.log_filter = log_filter;
}
if let Some(listen_backlog) =
read_first_positive_i32_env(&["GENARRATIVE_API_LISTEN_BACKLOG"])
{
config.listen_backlog = listen_backlog;
}
if let Some(worker_threads) = read_first_usize_env(&["GENARRATIVE_API_WORKER_THREADS"]) {
config.worker_threads = Some(worker_threads);
}
if let Some(otel_enabled) = read_first_bool_env(&["GENARRATIVE_OTEL_ENABLED"]) {
config.otel_enabled = otel_enabled;
}
config.admin_username = read_first_non_empty_env(&["GENARRATIVE_ADMIN_USERNAME"]);
config.admin_password = read_first_non_empty_env(&["GENARRATIVE_ADMIN_PASSWORD"]);
@@ -881,6 +898,14 @@ fn read_first_positive_u32_env(keys: &[&str]) -> Option<u32> {
})
}
/// Returns the first strictly positive `i32` found among the environment
/// variables named in `keys`, checked in order.
///
/// Variables that are unset, not valid Unicode, or do not parse as a positive
/// `i32` are skipped; `None` is returned when no key yields a value.
fn read_first_positive_i32_env(keys: &[&str]) -> Option<i32> {
    for key in keys {
        let Ok(raw) = env::var(key) else {
            continue;
        };
        if let Some(parsed) = parse_positive_i32(&raw) {
            return Some(parsed);
        }
    }
    None
}
fn read_first_positive_u64_env(keys: &[&str]) -> Option<u64> {
keys.iter().find_map(|key| {
env::var(key)
@@ -971,6 +996,15 @@ fn parse_positive_u32(raw: &str) -> Option<u32> {
Some(value)
}
/// Parses `raw` (ignoring surrounding whitespace) as an `i32` and keeps it only
/// when it is strictly greater than zero.
///
/// Returns `None` for unparsable input, out-of-range values, zero, and
/// negative numbers.
fn parse_positive_i32(raw: &str) -> Option<i32> {
    raw.trim()
        .parse::<i32>()
        .ok()
        .filter(|candidate| *candidate > 0)
}
fn parse_u32(raw: &str) -> Option<u32> {
raw.trim().parse::<u32>().ok()
}
@@ -1151,6 +1185,34 @@ mod tests {
}
}
// Checks that `AppConfig::from_env` picks up the listen backlog, worker thread
// count and OTel toggle from their environment variables.
#[test]
fn from_env_reads_api_runtime_performance_settings() {
    const OVERRIDES: [(&str, &str); 3] = [
        ("GENARRATIVE_API_LISTEN_BACKLOG", "2048"),
        ("GENARRATIVE_API_WORKER_THREADS", "6"),
        ("GENARRATIVE_OTEL_ENABLED", "true"),
    ];

    let _guard = ENV_LOCK
        .get_or_init(|| Mutex::new(()))
        .lock()
        .expect("env lock should not poison");

    // SAFETY: the process environment is only touched while `ENV_LOCK` is held.
    // NOTE(review): this assumes every other env-mutating test takes the same
    // lock — confirm against the rest of the test module.
    unsafe {
        // Start from a clean slate, then install the values under test.
        for (key, _) in &OVERRIDES {
            std::env::remove_var(key);
        }
        for (key, value) in &OVERRIDES {
            std::env::set_var(key, value);
        }
    }

    let config = AppConfig::from_env();

    assert_eq!(config.listen_backlog, 2048);
    assert_eq!(config.worker_threads, Some(6));
    assert!(config.otel_enabled);

    // SAFETY: still under `ENV_LOCK`; see the note on the setup block.
    unsafe {
        for (key, _) in &OVERRIDES {
            std::env::remove_var(key);
        }
    }
}
#[test]
fn from_env_reads_wechat_pay_settings() {
let _guard = ENV_LOCK

View File

@@ -75,6 +75,7 @@ mod square_hole_agent_turn;
mod state;
mod story_battles;
mod story_sessions;
mod telemetry;
mod tracking;
mod vector_engine_audio_generation;
mod visual_novel;
@@ -85,8 +86,15 @@ mod wechat_provider;
mod work_author;
mod work_play_tracking;
use shared_logging::init_tracing;
use std::{collections::HashSet, env, fs, io, panic, thread, time::Duration};
use shared_logging::{OtelConfig, init_tracing};
use socket2::{Domain, Protocol, Socket, Type};
use std::{
collections::HashSet,
env, fs, io,
net::{SocketAddr, TcpListener as StdTcpListener},
panic, thread,
time::Duration,
};
use tokio::net::TcpListener;
use tokio::runtime::Builder as TokioRuntimeBuilder;
use tokio::time::timeout;
@@ -103,12 +111,18 @@ fn main() -> Result<(), io::Error> {
.name("api-server-bootstrap".to_string())
.stack_size(API_SERVER_STARTUP_STACK_SIZE_BYTES)
.spawn(|| {
TokioRuntimeBuilder::new_multi_thread()
load_local_env_files();
let config = AppConfig::from_env();
let mut runtime_builder = TokioRuntimeBuilder::new_multi_thread();
runtime_builder
.enable_all()
.thread_name("api-server-worker")
.thread_stack_size(API_SERVER_STARTUP_STACK_SIZE_BYTES)
.build()?
.block_on(run_server())
.thread_stack_size(API_SERVER_STARTUP_STACK_SIZE_BYTES);
if let Some(worker_threads) = config.worker_threads {
runtime_builder.worker_threads(worker_threads);
}
runtime_builder.build()?.block_on(run_server(config))
})?;
match server_thread.join() {
@@ -117,28 +131,49 @@ fn main() -> Result<(), io::Error> {
}
}
async fn run_server() -> Result<(), io::Error> {
// 运行本地开发与联调时,优先从仓库根目录加载本地变量。
// 只尊重外层 shell 先注入的变量;后续本地文件需要能覆盖前序本地文件。
load_local_env_files();
// 统一先从配置对象读取监听地址,避免后续把环境变量读取散落到入口和路由层。
let config = AppConfig::from_env();
init_tracing(&config.log_filter)?;
async fn run_server(config: AppConfig) -> Result<(), io::Error> {
init_tracing(
&config.log_filter,
OtelConfig {
enabled: config.otel_enabled,
},
)?;
let bind_address = config.bind_socket_addr();
let listener = TcpListener::bind(bind_address).await?;
let listen_backlog = config.listen_backlog;
let worker_threads = config.worker_threads;
let otel_enabled = config.otel_enabled;
let listener = build_tcp_listener(bind_address, listen_backlog)?;
let state = restore_app_state_for_startup(config)
.await
.map_err(|error| std::io::Error::other(format!("初始化应用状态失败:{error}")))?;
let router = build_router(state);
info!(%bind_address, "api-server 已完成 tracing 初始化并开始监听");
info!(
%bind_address,
listen_backlog,
worker_threads = worker_threads.unwrap_or(0),
otel_enabled,
"api-server 已完成 tracing 初始化并开始监听"
);
axum::serve(listener, router).await
}
/// Builds the server's listening socket by hand so the accept backlog can be
/// configured, then hands it to Tokio.
///
/// * `bind_address` – address to bind; the socket domain (IPv4/IPv6) is derived
///   from it.
/// * `listen_backlog` – value passed to `listen(2)`.
///
/// # Errors
///
/// Returns any I/O error raised while creating, configuring, binding or
/// listening on the socket, or while registering it with the Tokio runtime.
fn build_tcp_listener(
    bind_address: SocketAddr,
    listen_backlog: i32,
) -> Result<TcpListener, io::Error> {
    let domain = Domain::for_address(bind_address);
    let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;

    // On Unix-like systems SO_REUSEADDR only allows rebinding a port whose
    // previous sockets linger in TIME_WAIT, which makes restarts painless.
    // On Windows the same option lets another process bind a port that is
    // actively in use, so it is skipped there — the same policy the default
    // Tokio/mio `TcpListener::bind` path applies.
    #[cfg(not(windows))]
    socket.set_reuse_address(true)?;

    // The socket is switched to non-blocking mode before it is converted with
    // `TcpListener::from_std` below.
    socket.set_nonblocking(true)?;
    socket.bind(&bind_address.into())?;
    socket.listen(listen_backlog)?;

    TcpListener::from_std(StdTcpListener::from(socket))
}
async fn restore_app_state_for_startup(
config: AppConfig,
) -> Result<AppState, state::AppStateInitError> {

View File

@@ -0,0 +1,182 @@
use axum::{
body::Body,
extract::State,
http::{HeaderMap, Request, Response},
middleware::Next,
};
use opentelemetry::{KeyValue, global, metrics::Counter};
use tracing::{info, warn};
use crate::{request_context::resolve_request_id, state::AppState};
// 集中维护 api-server HTTP 观测,避免在 handler 中散落高基数字段或重复创建 instrument。
pub async fn record_http_observability(
State(state): State<AppState>,
request: Request<Body>,
next: Next,
) -> Response<Body> {
let method = request.method().as_str().to_string();
let route = observability_route(request.uri().path());
let scheme = resolve_request_scheme(request.headers());
let path = request.uri().path().to_string();
let request_id = resolve_request_id(&request).unwrap_or_else(|| "unknown".to_string());
let base_labels = http_base_labels(method.clone(), route.clone());
let metrics = http_metrics();
metrics.in_flight.add(1, &base_labels);
let started_at = std::time::Instant::now();
let response = next.run(request).await;
let status = response.status().as_u16();
let status_class = status_class(status);
let latency_ms = started_at.elapsed().as_millis().min(u64::MAX as u128) as u64;
let slow_request = latency_ms >= state.config.slow_request_threshold_ms;
let labels = http_response_labels(base_labels, status);
metrics.requests.add(1, &labels);
metrics
.duration
.record(started_at.elapsed().as_secs_f64(), &labels);
metrics.in_flight.add(-1, &labels[..2]);
if slow_request {
warn!(
request_id = %request_id,
http.request.method = %method,
http.route = %route,
url.scheme = %scheme,
url.path = %path,
http.response.status_code = status,
status,
status_class,
latency_ms,
slow_request = true,
"http request completed slowly"
);
} else {
info!(
request_id = %request_id,
http.request.method = %method,
http.route = %route,
url.scheme = %scheme,
url.path = %path,
http.response.status_code = status,
status,
status_class,
latency_ms,
slow_request = false,
"http request completed"
);
}
response
}
/// Bundle of the OpenTelemetry instruments used by the HTTP observability
/// middleware; built once by `http_metrics` and shared for the process lifetime.
struct HttpMetrics {
/// Request counter (`genarrative.http.server.requests`), incremented once per
/// completed request.
requests: Counter<u64>,
/// Up/down counter (`http.server.active_requests`) tracking requests that are
/// currently being processed.
in_flight: opentelemetry::metrics::UpDownCounter<i64>,
/// Histogram (`http.server.request.duration`) of request durations in seconds.
duration: opentelemetry::metrics::Histogram<f64>,
}
/// Returns the lazily initialised, process-wide HTTP instruments.
///
/// The instruments are created on first use from the global meter named
/// `"genarrative-api"` and cached in a `OnceLock`, so later calls are cheap and
/// never create duplicates.
///
/// NOTE(review): the instruments are bound to whatever global meter provider is
/// installed at the time of the first call — presumably telemetry is
/// initialised before the first request is served; confirm.
fn http_metrics() -> &'static HttpMetrics {
    static METRICS: std::sync::OnceLock<HttpMetrics> = std::sync::OnceLock::new();

    /// Creates the three HTTP instruments from the global meter.
    fn build_instruments() -> HttpMetrics {
        let meter = global::meter("genarrative-api");

        let requests = meter
            .u64_counter("genarrative.http.server.requests")
            .with_description("HTTP request count grouped by route and status class")
            .build();

        let in_flight = meter
            .i64_up_down_counter("http.server.active_requests")
            .with_unit("{request}")
            .with_description("Number of active HTTP server requests")
            .build();

        let duration = meter
            .f64_histogram("http.server.request.duration")
            .with_unit("s")
            .with_description("Duration of HTTP server requests")
            .build();

        HttpMetrics {
            requests,
            in_flight,
            duration,
        }
    }

    METRICS.get_or_init(build_instruments)
}
/// Builds the attributes known before the request is handled: the HTTP method
/// followed by the low-cardinality route, in that order.
fn http_base_labels(method: String, route: String) -> Vec<KeyValue> {
    let method_label = KeyValue::new("http.request.method", method);
    let route_label = KeyValue::new("http.route", route);
    Vec::from([method_label, route_label])
}
/// Extends `labels` with a trailing `status_class` attribute derived from
/// `status` and returns the extended vector.
fn http_response_labels(mut labels: Vec<KeyValue>, status: u16) -> Vec<KeyValue> {
    let class_label = KeyValue::new("status_class", status_class(status));
    labels.push(class_label);
    labels
}
/// Maps an HTTP status code to its class label (`"1xx"` … `"5xx"`).
///
/// Codes outside `100..=599` yield `"unknown"`.
fn status_class(status: u16) -> &'static str {
    const CLASSES: [&str; 5] = ["1xx", "2xx", "3xx", "4xx", "5xx"];

    // The hundreds digit selects the class; 0 and anything above 5 is not a
    // valid HTTP status class.
    match status / 100 {
        hundreds @ 1..=5 => CLASSES[usize::from(hundreds) - 1],
        _ => "unknown",
    }
}
/// Collapses a request path into a small, fixed set of route labels so that
/// metric attributes and span names stay low-cardinality.
///
/// The first matching prefix wins, so the more specific gallery prefixes are
/// listed before the generic `/api/` bucket. Paths matching no prefix are
/// reported as `"other"`.
pub(crate) fn observability_route(path: &str) -> String {
    // (path prefix, label) pairs, checked in order.
    const PREFIX_LABELS: [(&str, &str); 4] = [
        ("/api/runtime/puzzle/gallery", "/api/runtime/puzzle/gallery"),
        (
            "/api/runtime/custom-world-gallery",
            "/api/runtime/custom-world-gallery",
        ),
        ("/admin/api/", "/admin/api/*"),
        ("/api/", "/api/*"),
    ];

    PREFIX_LABELS
        .iter()
        .find(|(prefix, _)| path.starts_with(*prefix))
        .map_or("other", |(_, label)| *label)
        .to_string()
}
/// Determines the request scheme from the `x-forwarded-proto` header.
///
/// When the header carries a comma-separated list, the first entry is used
/// after trimming whitespace. Falls back to `"http"` when the header is
/// missing, is not valid visible ASCII, or its first entry is empty.
pub(crate) fn resolve_request_scheme(headers: &HeaderMap) -> String {
    let forwarded = headers
        .get("x-forwarded-proto")
        .and_then(|value| value.to_str().ok());

    let first_entry = forwarded
        .and_then(|list| list.split(',').next())
        .map(str::trim);

    match first_entry {
        Some(scheme) if !scheme.is_empty() => scheme.to_string(),
        _ => "http".to_string(),
    }
}
#[cfg(test)]
mod tests {
    use axum::http::{HeaderMap, HeaderValue};

    use super::{observability_route, resolve_request_scheme};

    // Each concrete path must collapse into its fixed route label.
    #[test]
    fn observability_route_keeps_metrics_labels_low_cardinality() {
        let cases = [
            (
                "/api/runtime/puzzle/gallery?cursor=abc",
                "/api/runtime/puzzle/gallery",
            ),
            ("/api/runtime/puzzle/runs/run-123/history", "/api/*"),
            ("/admin/api/debug/http", "/admin/api/*"),
        ];

        for (path, expected) in cases.iter() {
            assert_eq!(observability_route(path), *expected);
        }
    }

    // Only the first entry of a comma-separated forwarded-proto list is used.
    #[test]
    fn resolve_request_scheme_uses_forwarded_proto_first_value() {
        let forwarded = HeaderValue::from_static("https, http");
        let mut headers = HeaderMap::new();
        headers.insert("x-forwarded-proto", forwarded);

        assert_eq!(resolve_request_scheme(&headers), "https");
    }
}