Merge branch 'codex/cache-view-procedure-hotpaths'
This commit is contained in:
@@ -11,6 +11,7 @@ base64 = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
dotenvy = { workspace = true }
|
||||
image = { workspace = true, features = ["jpeg", "png", "webp"] }
|
||||
http-body-util = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["json", "multipart", "rustls-tls"] }
|
||||
webp = { workspace = true }
|
||||
module-ai = { workspace = true }
|
||||
@@ -45,7 +46,7 @@ shared-kernel = { workspace = true }
|
||||
shared-logging = { workspace = true }
|
||||
socket2 = { workspace = true }
|
||||
spacetime-client = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time"] }
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time", "sync"] }
|
||||
tokio-stream = { workspace = true }
|
||||
futures-util = { workspace = true }
|
||||
time = { workspace = true, features = ["formatting"] }
|
||||
@@ -57,6 +58,9 @@ urlencoding = { workspace = true }
|
||||
uuid = { workspace = true, features = ["v4"] }
|
||||
zip = { workspace = true, features = ["deflate"] }
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
windows-sys = { workspace = true, features = ["Win32_Foundation", "Win32_System_Diagnostics_ToolHelp", "Win32_System_ProcessStatus", "Win32_System_Threading"] }
|
||||
|
||||
[dev-dependencies]
|
||||
base64 = { workspace = true }
|
||||
hmac = { workspace = true }
|
||||
|
||||
@@ -1,4 +1,13 @@
|
||||
use axum::Json;
|
||||
use std::convert::Infallible;
|
||||
|
||||
use axum::{
|
||||
Json,
|
||||
body::Body,
|
||||
http::{HeaderValue, header},
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use futures_util::stream;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
#[cfg(test)]
|
||||
@@ -32,6 +41,30 @@ where
|
||||
Json(serde_json::to_value(data).unwrap_or(Value::Null))
|
||||
}
|
||||
|
||||
pub fn json_success_data_bytes_response(
|
||||
request_context: Option<&RequestContext>,
|
||||
data_json: Bytes,
|
||||
) -> Response {
|
||||
if let Some(context) = request_context
|
||||
&& context.wants_envelope()
|
||||
{
|
||||
let meta = serde_json::to_vec(&build_api_response_meta(Some(context)))
|
||||
.map(Bytes::from)
|
||||
.unwrap_or_else(|_| Bytes::from_static(b"null"));
|
||||
let chunks = [
|
||||
Bytes::from_static(b"{\"ok\":true,\"data\":"),
|
||||
data_json,
|
||||
Bytes::from_static(b",\"error\":null,\"meta\":"),
|
||||
meta,
|
||||
Bytes::from_static(b"}"),
|
||||
];
|
||||
let stream = stream::iter(chunks.into_iter().map(Ok::<Bytes, Infallible>));
|
||||
return json_body_response(Body::from_stream(stream));
|
||||
}
|
||||
|
||||
json_bytes_response(data_json)
|
||||
}
|
||||
|
||||
pub fn json_error_body(
|
||||
request_context: Option<&RequestContext>,
|
||||
error: &ApiErrorPayload,
|
||||
@@ -65,6 +98,19 @@ fn build_api_response_meta(request_context: Option<&RequestContext>) -> ApiRespo
|
||||
)
|
||||
}
|
||||
|
||||
fn json_bytes_response(bytes: Bytes) -> Response {
|
||||
json_body_response(Body::from(bytes))
|
||||
}
|
||||
|
||||
fn json_body_response(body: Body) -> Response {
|
||||
let mut response = body.into_response();
|
||||
response.headers_mut().insert(
|
||||
header::CONTENT_TYPE,
|
||||
HeaderValue::from_static("application/json; charset=utf-8"),
|
||||
);
|
||||
response
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -106,6 +152,31 @@ mod tests {
|
||||
assert!(body.get("meta").is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn success_response_streams_cached_data_inside_standard_envelope() {
|
||||
use http_body_util::BodyExt;
|
||||
|
||||
let request_context = build_request_context(true);
|
||||
let response = json_success_data_bytes_response(
|
||||
Some(&request_context),
|
||||
Bytes::from_static(br#"{"items":[]}"#),
|
||||
);
|
||||
let body = response
|
||||
.into_body()
|
||||
.collect()
|
||||
.await
|
||||
.expect("response body should collect")
|
||||
.to_bytes();
|
||||
let payload: Value = serde_json::from_slice(&body).expect("body should be json");
|
||||
|
||||
assert_eq!(payload["ok"], Value::Bool(true));
|
||||
assert_eq!(payload["data"]["items"], Value::Array(Vec::new()));
|
||||
assert_eq!(
|
||||
payload["meta"]["requestId"],
|
||||
Value::String("req-test".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn error_body_returns_legacy_shape_without_envelope_header() {
|
||||
let request_context = build_request_context(false);
|
||||
|
||||
@@ -15,6 +15,7 @@ use tracing::{Level, Span, error, info_span};
|
||||
|
||||
use crate::{
|
||||
auth::{AuthenticatedAccessToken, require_bearer_auth},
|
||||
backpressure::limit_concurrent_requests,
|
||||
creation_entry_config::require_creation_entry_route_enabled,
|
||||
error_middleware::normalize_error_response,
|
||||
modules,
|
||||
@@ -76,6 +77,11 @@ pub fn build_router(state: AppState) -> Router {
|
||||
state.clone(),
|
||||
require_creation_entry_route_enabled,
|
||||
))
|
||||
// HTTP 背压在业务路由外侧快拒绝,避免过载请求继续占用 SpacetimeDB facade 与业务执行资源。
|
||||
.layer(middleware::from_fn_with_state(
|
||||
state.clone(),
|
||||
limit_concurrent_requests,
|
||||
))
|
||||
// 错误归一化层放在 tracing 里侧,让 tracing 记录到最终对外返回的状态与错误体形态。
|
||||
.layer(middleware::from_fn(normalize_error_response))
|
||||
// 响应头回写放在错误归一化外侧,确保最终写回的是归一化后的最终响应。
|
||||
|
||||
245
server-rs/crates/api-server/src/backpressure.rs
Normal file
245
server-rs/crates/api-server/src/backpressure.rs
Normal file
@@ -0,0 +1,245 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{
|
||||
body::Body,
|
||||
extract::{Request, State},
|
||||
http::{HeaderValue, StatusCode, header::RETRY_AFTER},
|
||||
middleware::Next,
|
||||
response::Response,
|
||||
};
|
||||
use http_body_util::BodyExt;
|
||||
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
|
||||
|
||||
use crate::{
|
||||
http_error::AppError,
|
||||
request_context::RequestContext,
|
||||
state::{AppState, HttpRequestPermitPool},
|
||||
};
|
||||
|
||||
pub async fn limit_concurrent_requests(
|
||||
State(state): State<AppState>,
|
||||
request: Request,
|
||||
next: Next,
|
||||
) -> Response {
|
||||
if should_bypass_backpressure(&request) {
|
||||
return next.run(request).await;
|
||||
}
|
||||
|
||||
let Some(permit_pool) = state.http_request_permit_pool() else {
|
||||
return next.run(request).await;
|
||||
};
|
||||
|
||||
match acquire_http_request_permit(permit_pool) {
|
||||
Ok(permit) => hold_permit_until_response_body_dropped(next.run(request).await, permit),
|
||||
Err(_) => reject_overloaded_request(&request),
|
||||
}
|
||||
}
|
||||
|
||||
fn acquire_http_request_permit(
|
||||
permit_pool: Arc<HttpRequestPermitPool>,
|
||||
) -> Result<HttpRequestPermitGuard, TryAcquireError> {
|
||||
match permit_pool.clone().try_acquire_owned() {
|
||||
Ok(permit) => {
|
||||
crate::telemetry::update_http_request_permits_available(permit_pool.available_permits());
|
||||
Ok(HttpRequestPermitGuard {
|
||||
permit: Some(permit),
|
||||
permit_pool,
|
||||
})
|
||||
}
|
||||
Err(error) => {
|
||||
crate::telemetry::update_http_request_permits_available(permit_pool.available_permits());
|
||||
Err(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn hold_permit_until_response_body_dropped(
|
||||
response: Response,
|
||||
permit: HttpRequestPermitGuard,
|
||||
) -> Response {
|
||||
response.map(|body| {
|
||||
Body::new(body.map_frame(move |frame| {
|
||||
let _permit_guard = &permit;
|
||||
frame
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
struct HttpRequestPermitGuard {
|
||||
permit: Option<OwnedSemaphorePermit>,
|
||||
permit_pool: Arc<HttpRequestPermitPool>,
|
||||
}
|
||||
|
||||
impl Drop for HttpRequestPermitGuard {
|
||||
fn drop(&mut self) {
|
||||
drop(self.permit.take());
|
||||
crate::telemetry::update_http_request_permits_available(self.permit_pool.available_permits());
|
||||
}
|
||||
}
|
||||
|
||||
fn reject_overloaded_request(request: &Request<Body>) -> Response {
|
||||
let request_context = request.extensions().get::<RequestContext>().cloned();
|
||||
let mut response = AppError::from_status(StatusCode::TOO_MANY_REQUESTS)
|
||||
.with_message("服务繁忙,请稍后重试")
|
||||
.into_response_with_context(request_context.as_ref());
|
||||
response
|
||||
.headers_mut()
|
||||
.insert(RETRY_AFTER, HeaderValue::from_static("1"));
|
||||
response
|
||||
}
|
||||
|
||||
fn should_bypass_backpressure(request: &Request<Body>) -> bool {
|
||||
request.uri().path() == "/healthz"
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{
|
||||
Router,
|
||||
body::Body,
|
||||
extract::Extension,
|
||||
http::{Request, StatusCode, header::RETRY_AFTER},
|
||||
middleware,
|
||||
routing::get,
|
||||
};
|
||||
use tokio::sync::Notify;
|
||||
use tower::ServiceExt;
|
||||
|
||||
use crate::{config::AppConfig, state::AppState};
|
||||
|
||||
use super::limit_concurrent_requests;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HeldRequestGate {
|
||||
entered: Arc<Notify>,
|
||||
release: Arc<Notify>,
|
||||
}
|
||||
|
||||
async fn held_request(Extension(gate): Extension<HeldRequestGate>) -> &'static str {
|
||||
gate.entered.notify_one();
|
||||
gate.release.notified().await;
|
||||
"ok"
|
||||
}
|
||||
|
||||
async fn fast_request() -> &'static str {
|
||||
"ok"
|
||||
}
|
||||
|
||||
fn test_request(path: &str) -> Request<Body> {
|
||||
Request::builder()
|
||||
.uri(path)
|
||||
.body(Body::empty())
|
||||
.expect("test request should build")
|
||||
}
|
||||
|
||||
fn build_test_app(max_concurrent_requests: usize, gate: HeldRequestGate) -> Router {
|
||||
let mut config = AppConfig::default();
|
||||
config.max_concurrent_requests = Some(max_concurrent_requests);
|
||||
let state = AppState::new(config).expect("state should build");
|
||||
|
||||
Router::new()
|
||||
.route("/held", get(held_request))
|
||||
.route("/fast", get(fast_request))
|
||||
.route("/healthz", get(fast_request))
|
||||
.layer(middleware::from_fn_with_state(
|
||||
state.clone(),
|
||||
limit_concurrent_requests,
|
||||
))
|
||||
.layer(Extension(gate))
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn returns_429_when_concurrency_permits_are_exhausted() {
|
||||
let gate = HeldRequestGate {
|
||||
entered: Arc::new(Notify::new()),
|
||||
release: Arc::new(Notify::new()),
|
||||
};
|
||||
let app = build_test_app(1, gate.clone());
|
||||
let entered = gate.entered.notified();
|
||||
|
||||
let held_response = tokio::spawn(app.clone().oneshot(test_request("/held")));
|
||||
entered.await;
|
||||
|
||||
let rejected_response = app
|
||||
.clone()
|
||||
.oneshot(test_request("/fast"))
|
||||
.await
|
||||
.expect("rejected request should complete");
|
||||
assert_eq!(rejected_response.status(), StatusCode::TOO_MANY_REQUESTS);
|
||||
assert_eq!(
|
||||
rejected_response
|
||||
.headers()
|
||||
.get(RETRY_AFTER)
|
||||
.and_then(|value| value.to_str().ok()),
|
||||
Some("1")
|
||||
);
|
||||
|
||||
gate.release.notify_one();
|
||||
let completed_response = held_response
|
||||
.await
|
||||
.expect("held request task should join")
|
||||
.expect("held request should complete");
|
||||
assert_eq!(completed_response.status(), StatusCode::OK);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn healthz_bypasses_concurrency_backpressure() {
|
||||
let gate = HeldRequestGate {
|
||||
entered: Arc::new(Notify::new()),
|
||||
release: Arc::new(Notify::new()),
|
||||
};
|
||||
let app = build_test_app(1, gate.clone());
|
||||
let entered = gate.entered.notified();
|
||||
|
||||
let held_response = tokio::spawn(app.clone().oneshot(test_request("/held")));
|
||||
entered.await;
|
||||
|
||||
let health_response = app
|
||||
.clone()
|
||||
.oneshot(test_request("/healthz"))
|
||||
.await
|
||||
.expect("healthz request should complete");
|
||||
assert_eq!(health_response.status(), StatusCode::OK);
|
||||
|
||||
gate.release.notify_one();
|
||||
let completed_response = held_response
|
||||
.await
|
||||
.expect("held request task should join")
|
||||
.expect("held request should complete");
|
||||
assert_eq!(completed_response.status(), StatusCode::OK);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn permit_is_held_until_response_body_is_dropped() {
|
||||
let gate = HeldRequestGate {
|
||||
entered: Arc::new(Notify::new()),
|
||||
release: Arc::new(Notify::new()),
|
||||
};
|
||||
let app = build_test_app(1, gate);
|
||||
|
||||
let first_response = app
|
||||
.clone()
|
||||
.oneshot(test_request("/fast"))
|
||||
.await
|
||||
.expect("first request should complete");
|
||||
assert_eq!(first_response.status(), StatusCode::OK);
|
||||
|
||||
let rejected_response = app
|
||||
.clone()
|
||||
.oneshot(test_request("/fast"))
|
||||
.await
|
||||
.expect("second request should complete");
|
||||
assert_eq!(rejected_response.status(), StatusCode::TOO_MANY_REQUESTS);
|
||||
|
||||
drop(first_response);
|
||||
|
||||
let accepted_response = app
|
||||
.oneshot(test_request("/fast"))
|
||||
.await
|
||||
.expect("third request should complete");
|
||||
assert_eq!(accepted_response.status(), StatusCode::OK);
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ pub struct AppConfig {
|
||||
pub bind_port: u16,
|
||||
pub listen_backlog: i32,
|
||||
pub worker_threads: Option<usize>,
|
||||
pub max_concurrent_requests: Option<usize>,
|
||||
pub log_filter: String,
|
||||
pub otel_enabled: bool,
|
||||
pub admin_username: Option<String>,
|
||||
@@ -152,6 +153,7 @@ impl Default for AppConfig {
|
||||
bind_port: 3000,
|
||||
listen_backlog: 1024,
|
||||
worker_threads: None,
|
||||
max_concurrent_requests: None,
|
||||
log_filter: "info,tower_http=info".to_string(),
|
||||
otel_enabled: false,
|
||||
admin_username: None,
|
||||
@@ -315,6 +317,11 @@ impl AppConfig {
|
||||
if let Some(worker_threads) = read_first_usize_env(&["GENARRATIVE_API_WORKER_THREADS"]) {
|
||||
config.worker_threads = Some(worker_threads);
|
||||
}
|
||||
if let Some(max_concurrent_requests) =
|
||||
read_first_usize_env(&["GENARRATIVE_API_MAX_CONCURRENT_REQUESTS"])
|
||||
{
|
||||
config.max_concurrent_requests = Some(max_concurrent_requests);
|
||||
}
|
||||
if let Some(otel_enabled) = read_first_bool_env(&["GENARRATIVE_OTEL_ENABLED"]) {
|
||||
config.otel_enabled = otel_enabled;
|
||||
}
|
||||
@@ -1198,20 +1205,24 @@ mod tests {
|
||||
unsafe {
|
||||
std::env::remove_var("GENARRATIVE_API_LISTEN_BACKLOG");
|
||||
std::env::remove_var("GENARRATIVE_API_WORKER_THREADS");
|
||||
std::env::remove_var("GENARRATIVE_API_MAX_CONCURRENT_REQUESTS");
|
||||
std::env::remove_var("GENARRATIVE_OTEL_ENABLED");
|
||||
std::env::set_var("GENARRATIVE_API_LISTEN_BACKLOG", "2048");
|
||||
std::env::set_var("GENARRATIVE_API_WORKER_THREADS", "6");
|
||||
std::env::set_var("GENARRATIVE_API_MAX_CONCURRENT_REQUESTS", "128");
|
||||
std::env::set_var("GENARRATIVE_OTEL_ENABLED", "true");
|
||||
}
|
||||
|
||||
let config = AppConfig::from_env();
|
||||
assert_eq!(config.listen_backlog, 2048);
|
||||
assert_eq!(config.worker_threads, Some(6));
|
||||
assert_eq!(config.max_concurrent_requests, Some(128));
|
||||
assert!(config.otel_enabled);
|
||||
|
||||
unsafe {
|
||||
std::env::remove_var("GENARRATIVE_API_LISTEN_BACKLOG");
|
||||
std::env::remove_var("GENARRATIVE_API_WORKER_THREADS");
|
||||
std::env::remove_var("GENARRATIVE_API_MAX_CONCURRENT_REQUESTS");
|
||||
std::env::remove_var("GENARRATIVE_OTEL_ENABLED");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ mod auth_payload;
|
||||
mod auth_public_user;
|
||||
mod auth_session;
|
||||
mod auth_sessions;
|
||||
mod backpressure;
|
||||
mod bark_battle;
|
||||
mod big_fish;
|
||||
mod big_fish_agent_turn;
|
||||
@@ -55,9 +56,11 @@ mod password_management;
|
||||
mod phone_auth;
|
||||
mod platform_errors;
|
||||
mod profile_identity;
|
||||
mod process_metrics;
|
||||
mod prompt;
|
||||
mod puzzle;
|
||||
mod puzzle_agent_turn;
|
||||
mod puzzle_gallery_cache;
|
||||
mod refresh_session;
|
||||
mod registration_reward;
|
||||
mod request_context;
|
||||
@@ -138,6 +141,8 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
|
||||
enabled: config.otel_enabled,
|
||||
},
|
||||
)?;
|
||||
process_metrics::register_process_metrics();
|
||||
telemetry::register_http_runtime_metrics();
|
||||
|
||||
let bind_address = config.bind_socket_addr();
|
||||
let listen_backlog = config.listen_backlog;
|
||||
@@ -148,6 +153,7 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
|
||||
let state = restore_app_state_for_startup(config)
|
||||
.await
|
||||
.map_err(|error| std::io::Error::other(format!("初始化应用状态失败:{error}")))?;
|
||||
state.puzzle_gallery_cache().spawn_cleanup_task();
|
||||
let router = build_router(state);
|
||||
|
||||
info!(
|
||||
|
||||
306
server-rs/crates/api-server/src/process_metrics.rs
Normal file
306
server-rs/crates/api-server/src/process_metrics.rs
Normal file
@@ -0,0 +1,306 @@
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use opentelemetry::global;
|
||||
use tracing::warn;
|
||||
|
||||
// 进程指标只描述 api-server 自身,不携带请求、用户或作品维度,避免 OTLP 指标高基数膨胀。
|
||||
pub(crate) fn register_process_metrics() {
|
||||
static REGISTERED: OnceLock<()> = OnceLock::new();
|
||||
REGISTERED.get_or_init(register_process_metrics_once);
|
||||
}
|
||||
|
||||
fn register_process_metrics_once() {
|
||||
let meter = global::meter("genarrative-api");
|
||||
|
||||
meter
|
||||
.i64_observable_up_down_counter("process.memory.usage")
|
||||
.with_unit("By")
|
||||
.with_description("api-server process physical memory usage")
|
||||
.with_callback(|observer| {
|
||||
let Some(snapshot) = ProcessMetricsSnapshot::collect() else {
|
||||
return;
|
||||
};
|
||||
observer.observe(to_i64(snapshot.rss_bytes), &[]);
|
||||
})
|
||||
.build();
|
||||
|
||||
meter
|
||||
.i64_observable_up_down_counter("process.memory.virtual")
|
||||
.with_unit("By")
|
||||
.with_description("api-server committed virtual memory")
|
||||
.with_callback(|observer| {
|
||||
let Some(snapshot) = ProcessMetricsSnapshot::collect() else {
|
||||
return;
|
||||
};
|
||||
if let Some(virtual_bytes) = snapshot.virtual_bytes {
|
||||
observer.observe(to_i64(virtual_bytes), &[]);
|
||||
}
|
||||
})
|
||||
.build();
|
||||
|
||||
meter
|
||||
.i64_observable_up_down_counter("genarrative.process.memory.private")
|
||||
.with_unit("By")
|
||||
.with_description("api-server private memory for local diagnostics")
|
||||
.with_callback(|observer| {
|
||||
let Some(snapshot) = ProcessMetricsSnapshot::collect() else {
|
||||
return;
|
||||
};
|
||||
if let Some(private_bytes) = snapshot.private_bytes {
|
||||
observer.observe(to_i64(private_bytes), &[]);
|
||||
}
|
||||
})
|
||||
.build();
|
||||
|
||||
meter
|
||||
.i64_observable_up_down_counter("process.thread.count")
|
||||
.with_unit("{thread}")
|
||||
.with_description("api-server process thread count")
|
||||
.with_callback(|observer| {
|
||||
let Some(snapshot) = ProcessMetricsSnapshot::collect() else {
|
||||
return;
|
||||
};
|
||||
observer.observe(to_i64(snapshot.thread_count), &[]);
|
||||
})
|
||||
.build();
|
||||
|
||||
meter
|
||||
.i64_observable_up_down_counter("process.windows.handle.count")
|
||||
.with_unit("{handle}")
|
||||
.with_description("api-server process handle count on Windows")
|
||||
.with_callback(|observer| {
|
||||
let Some(snapshot) = ProcessMetricsSnapshot::collect() else {
|
||||
return;
|
||||
};
|
||||
if let Some(handle_count) = snapshot.windows_handle_count {
|
||||
observer.observe(to_i64(handle_count), &[]);
|
||||
}
|
||||
})
|
||||
.build();
|
||||
|
||||
meter
|
||||
.i64_observable_up_down_counter("process.unix.file_descriptor.count")
|
||||
.with_unit("{file_descriptor}")
|
||||
.with_description("api-server process file descriptor count on Unix")
|
||||
.with_callback(|observer| {
|
||||
let Some(snapshot) = ProcessMetricsSnapshot::collect() else {
|
||||
return;
|
||||
};
|
||||
if let Some(fd_count) = snapshot.unix_fd_count {
|
||||
observer.observe(to_i64(fd_count), &[]);
|
||||
}
|
||||
})
|
||||
.build();
|
||||
}
|
||||
|
||||
fn to_i64(value: u64) -> i64 {
|
||||
value.min(i64::MAX as u64) as i64
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
struct ProcessMetricsSnapshot {
|
||||
rss_bytes: u64,
|
||||
private_bytes: Option<u64>,
|
||||
virtual_bytes: Option<u64>,
|
||||
thread_count: u64,
|
||||
windows_handle_count: Option<u64>,
|
||||
unix_fd_count: Option<u64>,
|
||||
}
|
||||
|
||||
impl ProcessMetricsSnapshot {
|
||||
fn collect() -> Option<Self> {
|
||||
collect_process_metrics()
|
||||
.inspect_err(|error| {
|
||||
warn!(%error, "采集 api-server 进程内存指标失败");
|
||||
})
|
||||
.ok()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn collect_process_metrics() -> Result<ProcessMetricsSnapshot, String> {
|
||||
use windows_sys::Win32::{
|
||||
System::{
|
||||
ProcessStatus::{GetProcessMemoryInfo, PROCESS_MEMORY_COUNTERS_EX},
|
||||
Threading::{GetCurrentProcess, GetCurrentProcessId, GetProcessHandleCount},
|
||||
},
|
||||
};
|
||||
|
||||
let handle = unsafe { GetCurrentProcess() };
|
||||
let mut counters = PROCESS_MEMORY_COUNTERS_EX {
|
||||
cb: std::mem::size_of::<PROCESS_MEMORY_COUNTERS_EX>() as u32,
|
||||
..Default::default()
|
||||
};
|
||||
let ok = unsafe {
|
||||
GetProcessMemoryInfo(
|
||||
handle,
|
||||
std::ptr::addr_of_mut!(counters).cast(),
|
||||
counters.cb,
|
||||
)
|
||||
};
|
||||
if ok == 0 {
|
||||
return Err("GetProcessMemoryInfo returned false".to_string());
|
||||
}
|
||||
|
||||
let mut handle_count = 0_u32;
|
||||
let handle_count = if unsafe { GetProcessHandleCount(handle, &mut handle_count) } == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(u64::from(handle_count))
|
||||
};
|
||||
|
||||
Ok(ProcessMetricsSnapshot {
|
||||
rss_bytes: counters.WorkingSetSize as u64,
|
||||
private_bytes: Some(counters.PrivateUsage as u64),
|
||||
virtual_bytes: Some(counters.PrivateUsage as u64),
|
||||
thread_count: u64::from(unsafe { GetCurrentProcessId() }.thread_count()?),
|
||||
windows_handle_count: handle_count,
|
||||
unix_fd_count: None,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
trait WindowsProcessThreadCount {
|
||||
fn thread_count(self) -> Result<u32, String>;
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
impl WindowsProcessThreadCount for u32 {
|
||||
fn thread_count(self) -> Result<u32, String> {
|
||||
use windows_sys::Win32::{
|
||||
Foundation::{CloseHandle, INVALID_HANDLE_VALUE},
|
||||
System::Diagnostics::ToolHelp::{
|
||||
CreateToolhelp32Snapshot, PROCESSENTRY32, Process32First, Process32Next,
|
||||
TH32CS_SNAPPROCESS,
|
||||
},
|
||||
};
|
||||
|
||||
let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
|
||||
if snapshot == INVALID_HANDLE_VALUE {
|
||||
return Err("CreateToolhelp32Snapshot returned INVALID_HANDLE_VALUE".to_string());
|
||||
}
|
||||
|
||||
let mut entry = PROCESSENTRY32 {
|
||||
dwSize: std::mem::size_of::<PROCESSENTRY32>() as u32,
|
||||
..Default::default()
|
||||
};
|
||||
let mut found = None;
|
||||
let mut ok = unsafe { Process32First(snapshot, &mut entry) };
|
||||
while ok != 0 {
|
||||
if entry.th32ProcessID == self {
|
||||
found = Some(entry.cntThreads);
|
||||
break;
|
||||
}
|
||||
ok = unsafe { Process32Next(snapshot, &mut entry) };
|
||||
}
|
||||
unsafe {
|
||||
CloseHandle(snapshot);
|
||||
}
|
||||
|
||||
found.ok_or_else(|| format!("process {self} not found in ToolHelp snapshot"))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn collect_process_metrics() -> Result<ProcessMetricsSnapshot, String> {
|
||||
let status = std::fs::read_to_string("/proc/self/status")
|
||||
.map_err(|error| format!("read /proc/self/status failed: {error}"))?;
|
||||
let statm = std::fs::read_to_string("/proc/self/statm")
|
||||
.map_err(|error| format!("read /proc/self/statm failed: {error}"))?;
|
||||
let page_size = linux_page_size_bytes()?;
|
||||
|
||||
let rss_bytes = parse_status_kb(&status, "VmRSS:")
|
||||
.map(|value| value * 1024)
|
||||
.or_else(|| parse_statm_pages(&statm, 1).map(|value| value * page_size))
|
||||
.ok_or_else(|| "missing VmRSS/statm resident field".to_string())?;
|
||||
let virtual_bytes = parse_status_kb(&status, "VmSize:")
|
||||
.map(|value| value * 1024)
|
||||
.or_else(|| parse_statm_pages(&statm, 0).map(|value| value * page_size))
|
||||
.ok_or_else(|| "missing VmSize/statm size field".to_string())?;
|
||||
let private_bytes = parse_status_kb(&status, "VmData:").map(|value| value * 1024);
|
||||
let thread_count = parse_status_u64(&status, "Threads:")
|
||||
.ok_or_else(|| "missing Threads field".to_string())?;
|
||||
|
||||
Ok(ProcessMetricsSnapshot {
|
||||
rss_bytes,
|
||||
private_bytes,
|
||||
virtual_bytes: Some(virtual_bytes),
|
||||
thread_count,
|
||||
windows_handle_count: None,
|
||||
unix_fd_count: linux_fd_count(),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn linux_page_size_bytes() -> Result<u64, String> {
|
||||
let output = std::process::Command::new("getconf")
|
||||
.arg("PAGESIZE")
|
||||
.output()
|
||||
.map_err(|error| format!("getconf PAGESIZE failed: {error}"))?;
|
||||
if !output.status.success() {
|
||||
return Err(format!("getconf PAGESIZE exited with {}", output.status));
|
||||
}
|
||||
let text = String::from_utf8(output.stdout)
|
||||
.map_err(|error| format!("getconf PAGESIZE output is not utf8: {error}"))?;
|
||||
text.trim()
|
||||
.parse::<u64>()
|
||||
.map_err(|error| format!("parse PAGESIZE failed: {error}"))
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn linux_fd_count() -> Option<u64> {
|
||||
let entries = std::fs::read_dir("/proc/self/fd").ok()?;
|
||||
Some(entries.filter_map(Result::ok).count() as u64)
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn parse_status_kb(status: &str, key: &str) -> Option<u64> {
|
||||
parse_status_u64(status, key)
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn parse_status_u64(status: &str, key: &str) -> Option<u64> {
|
||||
status.lines().find_map(|line| {
|
||||
let rest = line.strip_prefix(key)?.trim();
|
||||
rest.split_whitespace().next()?.parse::<u64>().ok()
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn parse_statm_pages(statm: &str, index: usize) -> Option<u64> {
|
||||
statm
|
||||
.split_whitespace()
|
||||
.nth(index)?
|
||||
.parse::<u64>()
|
||||
.ok()
|
||||
}
|
||||
|
||||
#[cfg(not(any(windows, target_os = "linux")))]
|
||||
fn collect_process_metrics() -> Result<ProcessMetricsSnapshot, String> {
|
||||
Err("process metrics are only implemented for Windows and Linux".to_string())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[cfg(target_os = "linux")]
|
||||
use super::{parse_statm_pages, parse_status_kb, parse_status_u64};
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
#[test]
|
||||
fn parses_linux_proc_status_memory_fields() {
|
||||
let status = "Name:\tapi-server\nVmSize:\t 123456 kB\nVmRSS:\t 7890 kB\nVmData:\t 3456 kB\nThreads:\t37\n";
|
||||
|
||||
assert_eq!(parse_status_kb(status, "VmRSS:"), Some(7890));
|
||||
assert_eq!(parse_status_kb(status, "VmSize:"), Some(123456));
|
||||
assert_eq!(parse_status_kb(status, "VmData:"), Some(3456));
|
||||
assert_eq!(parse_status_u64(status, "Threads:"), Some(37));
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
#[test]
|
||||
fn parses_linux_statm_pages() {
|
||||
assert_eq!(parse_statm_pages("100 20 0 0 0 0 0", 0), Some(100));
|
||||
assert_eq!(parse_statm_pages("100 20 0 0 0 0 0", 1), Some(20));
|
||||
assert_eq!(parse_statm_pages("100 20", 7), None);
|
||||
}
|
||||
}
|
||||
@@ -38,7 +38,7 @@ use shared_contracts::{
|
||||
PuzzleResultPreviewBlockerResponse, PuzzleResultPreviewEnvelopeResponse,
|
||||
PuzzleResultPreviewFindingResponse, SendPuzzleAgentMessageRequest,
|
||||
},
|
||||
puzzle_gallery::{PuzzleGalleryDetailResponse, PuzzleGalleryResponse},
|
||||
puzzle_gallery::PuzzleGalleryDetailResponse,
|
||||
puzzle_runtime::{
|
||||
AdvancePuzzleNextLevelRequest, DragPuzzlePieceRequest, PuzzleBoardSnapshotResponse,
|
||||
PuzzleCellPositionResponse, PuzzleLeaderboardEntryResponse, PuzzleMergedGroupStateResponse,
|
||||
@@ -59,16 +59,16 @@ use spacetime_client::{
|
||||
PuzzleAgentSessionCreateRecordInput, PuzzleAgentSessionRecord,
|
||||
PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord, PuzzleAnchorPackRecord,
|
||||
PuzzleAudioAssetRecord, PuzzleCreatorIntentRecord, PuzzleDraftLevelRecord,
|
||||
PuzzleFormDraftRecord, PuzzleFormDraftSaveRecordInput, PuzzleGeneratedImageCandidateRecord,
|
||||
PuzzleGeneratedImagesSaveRecordInput, PuzzleLeaderboardEntryRecord,
|
||||
PuzzleLeaderboardSubmitRecordInput, PuzzlePublishRecordInput, PuzzleRecommendedNextWorkRecord,
|
||||
PuzzleResultDraftRecord, PuzzleResultPreviewBlockerRecord, PuzzleResultPreviewFindingRecord,
|
||||
PuzzleResultPreviewRecord, PuzzleRunDragRecordInput, PuzzleRunPauseRecordInput,
|
||||
PuzzleRunPropRecordInput, PuzzleRunRecord, PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput,
|
||||
PuzzleSelectCoverImageRecordInput, PuzzleUiBackgroundSaveRecordInput,
|
||||
PuzzleWorkLikeReportRecordInput, PuzzleWorkPointIncentiveClaimRecordInput,
|
||||
PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput, PuzzleWorkUpsertRecordInput,
|
||||
SpacetimeClientError,
|
||||
PuzzleFormDraftRecord, PuzzleFormDraftSaveRecordInput, PuzzleGalleryCardRecord,
|
||||
PuzzleGeneratedImageCandidateRecord, PuzzleGeneratedImagesSaveRecordInput,
|
||||
PuzzleLeaderboardEntryRecord, PuzzleLeaderboardSubmitRecordInput, PuzzlePublishRecordInput,
|
||||
PuzzleRecommendedNextWorkRecord, PuzzleResultDraftRecord, PuzzleResultPreviewBlockerRecord,
|
||||
PuzzleResultPreviewFindingRecord, PuzzleResultPreviewRecord, PuzzleRunDragRecordInput,
|
||||
PuzzleRunPauseRecordInput, PuzzleRunPropRecordInput, PuzzleRunRecord,
|
||||
PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput, PuzzleSelectCoverImageRecordInput,
|
||||
PuzzleUiBackgroundSaveRecordInput, PuzzleWorkLikeReportRecordInput,
|
||||
PuzzleWorkPointIncentiveClaimRecordInput, PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput,
|
||||
PuzzleWorkUpsertRecordInput, SpacetimeClientError,
|
||||
};
|
||||
use std::convert::Infallible;
|
||||
|
||||
@@ -103,6 +103,7 @@ use crate::{
|
||||
PuzzleAgentTurnRequest, build_failed_finalize_record_input, build_finalize_record_input,
|
||||
run_puzzle_agent_turn,
|
||||
},
|
||||
puzzle_gallery_cache::{build_puzzle_gallery_window_response, puzzle_gallery_cached_json},
|
||||
request_context::RequestContext,
|
||||
state::AppState,
|
||||
vector_engine_audio_generation::{
|
||||
@@ -1528,7 +1529,19 @@ pub async fn claim_puzzle_work_point_incentive(
|
||||
pub async fn list_puzzle_gallery(
|
||||
State(state): State<AppState>,
|
||||
Extension(request_context): Extension<RequestContext>,
|
||||
) -> Result<Json<Value>, Response> {
|
||||
) -> Result<Response, Response> {
|
||||
if let Some(response) = state.puzzle_gallery_cache().read_fresh_response().await {
|
||||
crate::telemetry::record_puzzle_gallery_cache_hit();
|
||||
return Ok(puzzle_gallery_cached_json(&request_context, response));
|
||||
}
|
||||
crate::telemetry::record_puzzle_gallery_cache_miss();
|
||||
let _rebuild_guard = state.puzzle_gallery_cache().acquire_rebuild_guard().await;
|
||||
if let Some(response) = state.puzzle_gallery_cache().read_fresh_response().await {
|
||||
crate::telemetry::record_puzzle_gallery_cache_hit();
|
||||
return Ok(puzzle_gallery_cached_json(&request_context, response));
|
||||
}
|
||||
|
||||
let rebuild_started_at = std::time::Instant::now();
|
||||
let items = state
|
||||
.spacetime_client()
|
||||
.list_puzzle_gallery()
|
||||
@@ -1541,15 +1554,32 @@ pub async fn list_puzzle_gallery(
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(json_success_body(
|
||||
Some(&request_context),
|
||||
PuzzleGalleryResponse {
|
||||
items: items
|
||||
.into_iter()
|
||||
.map(|item| map_puzzle_work_summary_response(&state, item))
|
||||
.collect(),
|
||||
},
|
||||
))
|
||||
let response = build_puzzle_gallery_window_response(
|
||||
items
|
||||
.into_iter()
|
||||
.map(|item| map_puzzle_gallery_card_response(&state, item))
|
||||
.collect(),
|
||||
);
|
||||
let cached_response = state
|
||||
.puzzle_gallery_cache()
|
||||
.store_response(response)
|
||||
.await
|
||||
.map_err(|error| {
|
||||
puzzle_error_response(
|
||||
&request_context,
|
||||
PUZZLE_GALLERY_PROVIDER,
|
||||
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({
|
||||
"provider": PUZZLE_GALLERY_PROVIDER,
|
||||
"message": format!("拼图广场缓存序列化失败:{error}"),
|
||||
})),
|
||||
)
|
||||
})?;
|
||||
crate::telemetry::record_puzzle_gallery_cache_rebuild(
|
||||
rebuild_started_at.elapsed(),
|
||||
cached_response.data_json_len(),
|
||||
);
|
||||
|
||||
Ok(puzzle_gallery_cached_json(&request_context, cached_response))
|
||||
}
|
||||
|
||||
pub async fn get_puzzle_gallery_detail(
|
||||
|
||||
@@ -342,6 +342,49 @@ pub(super) fn map_puzzle_work_summary_response(
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn map_puzzle_gallery_card_response(
|
||||
state: &AppState,
|
||||
item: PuzzleGalleryCardRecord,
|
||||
) -> PuzzleWorkSummaryResponse {
|
||||
let author = resolve_work_author_by_user_id(
|
||||
state,
|
||||
&item.owner_user_id,
|
||||
Some(&item.author_display_name),
|
||||
None,
|
||||
);
|
||||
PuzzleWorkSummaryResponse {
|
||||
work_id: item.work_id,
|
||||
profile_id: item.profile_id,
|
||||
owner_user_id: item.owner_user_id,
|
||||
source_session_id: item.source_session_id,
|
||||
author_display_name: author.display_name,
|
||||
work_title: item.work_title,
|
||||
work_description: item.work_description,
|
||||
level_name: item.level_name,
|
||||
summary: item.summary,
|
||||
theme_tags: item.theme_tags,
|
||||
cover_image_src: item.cover_image_src,
|
||||
cover_asset_id: item.cover_asset_id,
|
||||
publication_status: item.publication_status,
|
||||
updated_at: item.updated_at,
|
||||
published_at: item.published_at,
|
||||
play_count: item.play_count,
|
||||
remix_count: item.remix_count,
|
||||
like_count: item.like_count,
|
||||
recent_play_count_7d: item.recent_play_count_7d,
|
||||
point_incentive_total_half_points: item.point_incentive_total_half_points,
|
||||
point_incentive_claimed_points: item.point_incentive_claimed_points,
|
||||
point_incentive_total_points: item.point_incentive_total_half_points as f64 / 2.0,
|
||||
point_incentive_claimable_points: item
|
||||
.point_incentive_total_half_points
|
||||
.saturating_div(2)
|
||||
.saturating_sub(item.point_incentive_claimed_points),
|
||||
publish_ready: item.publish_ready,
|
||||
generation_status: item.generation_status,
|
||||
levels: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn map_puzzle_work_profile_response(
|
||||
state: &AppState,
|
||||
item: PuzzleWorkProfileRecord,
|
||||
|
||||
208
server-rs/crates/api-server/src/puzzle_gallery_cache.rs
Normal file
208
server-rs/crates/api-server/src/puzzle_gallery_cache.rs
Normal file
@@ -0,0 +1,208 @@
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use axum::response::Response;
|
||||
use bytes::Bytes;
|
||||
use shared_contracts::{
|
||||
puzzle_gallery::{PuzzleGalleryResponse, PuzzleGalleryWorkRefResponse},
|
||||
puzzle_works::PuzzleWorkSummaryResponse,
|
||||
};
|
||||
use tokio::{
|
||||
sync::{Mutex, MutexGuard, RwLock},
|
||||
time,
|
||||
};
|
||||
|
||||
use crate::{api_response::json_success_data_bytes_response, request_context::RequestContext};
|
||||
|
||||
const PUZZLE_GALLERY_PRIMARY_ITEM_COUNT: usize = 10;
|
||||
const PUZZLE_GALLERY_PREVIEW_REF_COUNT: usize = 10;
|
||||
const PUZZLE_GALLERY_CACHE_TTL: Duration = Duration::from_secs(5);
|
||||
const PUZZLE_GALLERY_CACHE_MAX_IDLE: Duration = Duration::from_secs(300);
|
||||
const PUZZLE_GALLERY_CACHE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PuzzleGalleryCache {
|
||||
inner: Arc<RwLock<Option<PuzzleGalleryCacheEntry>>>,
|
||||
rebuild_lock: Arc<Mutex<()>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct PuzzleGalleryCacheEntry {
|
||||
data_json: Bytes,
|
||||
built_at: Instant,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PuzzleGalleryCachedResponse {
|
||||
data_json: Bytes,
|
||||
}
|
||||
|
||||
impl PuzzleGalleryCachedResponse {
|
||||
pub fn data_json_len(&self) -> usize {
|
||||
self.data_json.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl PuzzleGalleryCache {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(RwLock::new(None)),
|
||||
rebuild_lock: Arc::new(Mutex::new(())),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn acquire_rebuild_guard(&self) -> MutexGuard<'_, ()> {
|
||||
self.rebuild_lock.lock().await
|
||||
}
|
||||
|
||||
pub async fn read_fresh_response(&self) -> Option<PuzzleGalleryCachedResponse> {
|
||||
let guard = self.inner.read().await;
|
||||
let entry = guard.as_ref()?;
|
||||
let now = Instant::now();
|
||||
if now.duration_since(entry.built_at) > PUZZLE_GALLERY_CACHE_TTL {
|
||||
return None;
|
||||
}
|
||||
Some(PuzzleGalleryCachedResponse {
|
||||
data_json: entry.data_json.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn store_response(
|
||||
&self,
|
||||
response: PuzzleGalleryResponse,
|
||||
) -> Result<PuzzleGalleryCachedResponse, serde_json::Error> {
|
||||
let now = Instant::now();
|
||||
let cached = PuzzleGalleryCachedResponse {
|
||||
data_json: Bytes::from(serde_json::to_vec(&response)?),
|
||||
};
|
||||
*self.inner.write().await = Some(PuzzleGalleryCacheEntry {
|
||||
data_json: cached.data_json.clone(),
|
||||
built_at: now,
|
||||
});
|
||||
Ok(cached)
|
||||
}
|
||||
|
||||
pub fn spawn_cleanup_task(&self) {
|
||||
let cache = self.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut interval = time::interval(PUZZLE_GALLERY_CACHE_CLEANUP_INTERVAL);
|
||||
loop {
|
||||
interval.tick().await;
|
||||
cache.cleanup_idle_entry().await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn cleanup_idle_entry(&self) {
|
||||
let mut guard = self.inner.write().await;
|
||||
if let Some(entry) = guard.as_ref()
|
||||
&& Instant::now().duration_since(entry.built_at) > PUZZLE_GALLERY_CACHE_MAX_IDLE
|
||||
{
|
||||
*guard = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_puzzle_gallery_window_response(
|
||||
items: Vec<PuzzleWorkSummaryResponse>,
|
||||
) -> PuzzleGalleryResponse {
|
||||
let total_count = items.len().min(u32::MAX as usize) as u32;
|
||||
let preview_refs = items
|
||||
.iter()
|
||||
.skip(PUZZLE_GALLERY_PRIMARY_ITEM_COUNT)
|
||||
.take(PUZZLE_GALLERY_PREVIEW_REF_COUNT)
|
||||
.map(|item| PuzzleGalleryWorkRefResponse {
|
||||
work_id: item.work_id.clone(),
|
||||
profile_id: item.profile_id.clone(),
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let next_cursor = items
|
||||
.get(PUZZLE_GALLERY_PRIMARY_ITEM_COUNT + PUZZLE_GALLERY_PREVIEW_REF_COUNT)
|
||||
.map(|item| item.profile_id.clone());
|
||||
let has_more =
|
||||
items.len() > PUZZLE_GALLERY_PRIMARY_ITEM_COUNT + PUZZLE_GALLERY_PREVIEW_REF_COUNT;
|
||||
|
||||
PuzzleGalleryResponse {
|
||||
items: items
|
||||
.into_iter()
|
||||
.take(PUZZLE_GALLERY_PRIMARY_ITEM_COUNT)
|
||||
.collect(),
|
||||
preview_refs,
|
||||
has_more,
|
||||
next_cursor,
|
||||
total_count,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn puzzle_gallery_cached_json(
|
||||
request_context: &RequestContext,
|
||||
response: PuzzleGalleryCachedResponse,
|
||||
) -> Response {
|
||||
json_success_data_bytes_response(Some(request_context), response.data_json)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn build_summary(index: usize) -> PuzzleWorkSummaryResponse {
|
||||
PuzzleWorkSummaryResponse {
|
||||
work_id: format!("work-{index}"),
|
||||
profile_id: format!("profile-{index}"),
|
||||
owner_user_id: "user-1".to_string(),
|
||||
source_session_id: None,
|
||||
author_display_name: "作者".to_string(),
|
||||
work_title: format!("作品 {index}"),
|
||||
work_description: "描述".to_string(),
|
||||
level_name: "第一关".to_string(),
|
||||
summary: "摘要".to_string(),
|
||||
theme_tags: Vec::new(),
|
||||
cover_image_src: None,
|
||||
cover_asset_id: None,
|
||||
publication_status: "published".to_string(),
|
||||
updated_at: "2026-05-01T00:00:00Z".to_string(),
|
||||
published_at: None,
|
||||
play_count: 0,
|
||||
remix_count: 0,
|
||||
like_count: 0,
|
||||
recent_play_count_7d: 0,
|
||||
point_incentive_total_half_points: 0,
|
||||
point_incentive_claimed_points: 0,
|
||||
point_incentive_total_points: 0.0,
|
||||
point_incentive_claimable_points: 0,
|
||||
publish_ready: true,
|
||||
generation_status: Some("ready".to_string()),
|
||||
levels: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_window_returns_primary_cards_preview_refs_and_cursor() {
|
||||
let response =
|
||||
build_puzzle_gallery_window_response((0..25).map(build_summary).collect::<Vec<_>>());
|
||||
|
||||
assert_eq!(response.total_count, 25);
|
||||
assert_eq!(response.items.len(), 10);
|
||||
assert_eq!(response.preview_refs.len(), 10);
|
||||
assert_eq!(response.items[0].profile_id, "profile-0");
|
||||
assert_eq!(response.items[9].profile_id, "profile-9");
|
||||
assert_eq!(response.preview_refs[0].profile_id, "profile-10");
|
||||
assert_eq!(response.preview_refs[9].profile_id, "profile-19");
|
||||
assert!(response.has_more);
|
||||
assert_eq!(response.next_cursor.as_deref(), Some("profile-20"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_window_handles_short_gallery_without_more_cursor() {
|
||||
let response =
|
||||
build_puzzle_gallery_window_response((0..8).map(build_summary).collect::<Vec<_>>());
|
||||
|
||||
assert_eq!(response.total_count, 8);
|
||||
assert_eq!(response.items.len(), 8);
|
||||
assert!(response.preview_refs.is_empty());
|
||||
assert!(!response.has_more);
|
||||
assert_eq!(response.next_cursor, None);
|
||||
}
|
||||
}
|
||||
@@ -27,20 +27,25 @@ use shared_contracts::creation_entry_config::CreationEntryConfigResponse;
|
||||
use shared_contracts::creative_agent::CreativeAgentSessionSnapshot;
|
||||
use spacetime_client::{SpacetimeClient, SpacetimeClientConfig, SpacetimeClientError};
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::Semaphore;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::config::AppConfig;
|
||||
use crate::puzzle_gallery_cache::PuzzleGalleryCache;
|
||||
use crate::wechat_pay::{WechatPayClient, map_wechat_pay_init_error};
|
||||
use crate::wechat_provider::build_wechat_provider;
|
||||
|
||||
const ADMIN_ROLE: &str = "admin";
|
||||
|
||||
pub type HttpRequestPermitPool = Semaphore;
|
||||
|
||||
// 当前阶段先保留最小共享状态壳,后续逐步接入配置、客户端与平台适配。
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AppState {
|
||||
// 配置会在后续中间件、路由和平台适配接入时逐步消费。
|
||||
#[allow(dead_code)]
|
||||
pub config: AppConfig,
|
||||
http_request_permit_pool: Option<Arc<HttpRequestPermitPool>>,
|
||||
auth_jwt_config: JwtConfig,
|
||||
admin_runtime: Option<AdminRuntime>,
|
||||
refresh_cookie_config: RefreshCookieConfig,
|
||||
@@ -60,6 +65,7 @@ pub struct AppState {
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
ai_task_service: AiTaskService,
|
||||
spacetime_client: SpacetimeClient,
|
||||
puzzle_gallery_cache: PuzzleGalleryCache,
|
||||
llm_client: Option<LlmClient>,
|
||||
creative_agent_gpt5_client: Option<LlmClient>,
|
||||
creative_agent_executor: Arc<MockLangChainRustAgentExecutor>,
|
||||
@@ -192,9 +198,14 @@ impl AppState {
|
||||
});
|
||||
let llm_client = build_llm_client(&config)?;
|
||||
let creative_agent_gpt5_client = build_creative_agent_gpt5_client(&config)?;
|
||||
let http_request_permit_pool = config
|
||||
.max_concurrent_requests
|
||||
.map(HttpRequestPermitPool::new)
|
||||
.map(Arc::new);
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
http_request_permit_pool,
|
||||
auth_jwt_config,
|
||||
admin_runtime,
|
||||
refresh_cookie_config,
|
||||
@@ -214,6 +225,7 @@ impl AppState {
|
||||
wechat_pay_client,
|
||||
ai_task_service,
|
||||
spacetime_client,
|
||||
puzzle_gallery_cache: PuzzleGalleryCache::new(),
|
||||
llm_client,
|
||||
creative_agent_gpt5_client,
|
||||
creative_agent_executor: Arc::new(MockLangChainRustAgentExecutor),
|
||||
@@ -235,6 +247,10 @@ impl AppState {
|
||||
&self.refresh_cookie_config
|
||||
}
|
||||
|
||||
pub fn http_request_permit_pool(&self) -> Option<Arc<HttpRequestPermitPool>> {
|
||||
self.http_request_permit_pool.clone()
|
||||
}
|
||||
|
||||
pub async fn upsert_creation_entry_type_config(
|
||||
&self,
|
||||
input: module_runtime::CreationEntryTypeAdminUpsertInput,
|
||||
@@ -464,6 +480,10 @@ impl AppState {
|
||||
&self.spacetime_client
|
||||
}
|
||||
|
||||
pub fn puzzle_gallery_cache(&self) -> &PuzzleGalleryCache {
|
||||
&self.puzzle_gallery_cache
|
||||
}
|
||||
|
||||
pub fn llm_client(&self) -> Option<&LlmClient> {
|
||||
self.llm_client.as_ref()
|
||||
}
|
||||
|
||||
@@ -4,11 +4,19 @@ use axum::{
|
||||
http::{HeaderMap, Request, Response},
|
||||
middleware::Next,
|
||||
};
|
||||
use http_body_util::BodyExt;
|
||||
use opentelemetry::{KeyValue, global, metrics::Counter};
|
||||
use std::sync::{
|
||||
Arc, OnceLock,
|
||||
atomic::{AtomicI64, Ordering},
|
||||
};
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::{request_context::resolve_request_id, state::AppState};
|
||||
|
||||
static HTTP_RESPONSE_BODY_IN_FLIGHT: AtomicI64 = AtomicI64::new(0);
|
||||
static HTTP_REQUEST_PERMITS_AVAILABLE: OnceLock<Arc<AtomicI64>> = OnceLock::new();
|
||||
|
||||
// 集中维护 api-server HTTP 观测,避免在 handler 中散落高基数字段或重复创建 instrument。
|
||||
pub async fn record_http_observability(
|
||||
State(state): State<AppState>,
|
||||
@@ -67,7 +75,46 @@ pub async fn record_http_observability(
|
||||
);
|
||||
}
|
||||
|
||||
response
|
||||
track_response_body_in_flight(response)
|
||||
}
|
||||
|
||||
pub(crate) fn update_http_request_permits_available(available: usize) {
|
||||
let gauge = HTTP_REQUEST_PERMITS_AVAILABLE.get_or_init(|| {
|
||||
let gauge = Arc::new(AtomicI64::new(0));
|
||||
register_http_request_permits_available_metric(gauge.clone());
|
||||
gauge
|
||||
});
|
||||
gauge.store(available.min(i64::MAX as usize) as i64, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub(crate) fn record_puzzle_gallery_cache_hit() {
|
||||
puzzle_gallery_cache_metrics().hits.add(1, &[]);
|
||||
}
|
||||
|
||||
pub(crate) fn record_puzzle_gallery_cache_miss() {
|
||||
puzzle_gallery_cache_metrics().misses.add(1, &[]);
|
||||
}
|
||||
|
||||
pub(crate) fn record_puzzle_gallery_cache_rebuild(duration: std::time::Duration, data_bytes: usize) {
|
||||
let metrics = puzzle_gallery_cache_metrics();
|
||||
metrics.rebuilds.add(1, &[]);
|
||||
metrics
|
||||
.rebuild_duration
|
||||
.record(duration.as_secs_f64(), &[]);
|
||||
metrics
|
||||
.data_json_bytes
|
||||
.record(data_bytes.min(u64::MAX as usize) as u64, &[]);
|
||||
}
|
||||
|
||||
fn track_response_body_in_flight(response: Response<Body>) -> Response<Body> {
|
||||
response.map(|body| {
|
||||
HTTP_RESPONSE_BODY_IN_FLIGHT.fetch_add(1, Ordering::Relaxed);
|
||||
let guard = ResponseBodyInFlightGuard;
|
||||
Body::new(body.map_frame(move |frame| {
|
||||
let _guard = &guard;
|
||||
frame
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
struct HttpMetrics {
|
||||
@@ -76,6 +123,22 @@ struct HttpMetrics {
|
||||
duration: opentelemetry::metrics::Histogram<f64>,
|
||||
}
|
||||
|
||||
struct PuzzleGalleryCacheMetrics {
|
||||
hits: Counter<u64>,
|
||||
misses: Counter<u64>,
|
||||
rebuilds: Counter<u64>,
|
||||
rebuild_duration: opentelemetry::metrics::Histogram<f64>,
|
||||
data_json_bytes: opentelemetry::metrics::Histogram<u64>,
|
||||
}
|
||||
|
||||
struct ResponseBodyInFlightGuard;
|
||||
|
||||
impl Drop for ResponseBodyInFlightGuard {
|
||||
fn drop(&mut self) {
|
||||
HTTP_RESPONSE_BODY_IN_FLIGHT.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
fn http_metrics() -> &'static HttpMetrics {
|
||||
static METRICS: std::sync::OnceLock<HttpMetrics> = std::sync::OnceLock::new();
|
||||
METRICS.get_or_init(|| {
|
||||
@@ -99,6 +162,64 @@ fn http_metrics() -> &'static HttpMetrics {
|
||||
})
|
||||
}
|
||||
|
||||
fn puzzle_gallery_cache_metrics() -> &'static PuzzleGalleryCacheMetrics {
|
||||
static METRICS: std::sync::OnceLock<PuzzleGalleryCacheMetrics> = std::sync::OnceLock::new();
|
||||
METRICS.get_or_init(|| {
|
||||
let meter = global::meter("genarrative-api");
|
||||
PuzzleGalleryCacheMetrics {
|
||||
hits: meter
|
||||
.u64_counter("genarrative.puzzle_gallery.cache.hits")
|
||||
.with_description("Puzzle gallery response cache hits")
|
||||
.build(),
|
||||
misses: meter
|
||||
.u64_counter("genarrative.puzzle_gallery.cache.misses")
|
||||
.with_description("Puzzle gallery response cache misses")
|
||||
.build(),
|
||||
rebuilds: meter
|
||||
.u64_counter("genarrative.puzzle_gallery.cache.rebuilds")
|
||||
.with_description("Puzzle gallery response cache rebuild count")
|
||||
.build(),
|
||||
rebuild_duration: meter
|
||||
.f64_histogram("genarrative.puzzle_gallery.cache.rebuild.duration")
|
||||
.with_unit("s")
|
||||
.with_description("Puzzle gallery response cache rebuild duration")
|
||||
.build(),
|
||||
data_json_bytes: meter
|
||||
.u64_histogram("genarrative.puzzle_gallery.cache.data_json_bytes")
|
||||
.with_unit("By")
|
||||
.with_description("Serialized puzzle gallery data JSON size")
|
||||
.build(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn register_http_request_permits_available_metric(gauge: Arc<AtomicI64>) {
|
||||
let meter = global::meter("genarrative-api");
|
||||
meter
|
||||
.i64_observable_up_down_counter("genarrative.http.server.request_permits.available")
|
||||
.with_unit("{permit}")
|
||||
.with_description("Available api-server HTTP backpressure permits")
|
||||
.with_callback(move |observer| {
|
||||
observer.observe(gauge.load(Ordering::Relaxed), &[]);
|
||||
})
|
||||
.build();
|
||||
}
|
||||
|
||||
pub(crate) fn register_http_runtime_metrics() {
|
||||
static REGISTERED: OnceLock<()> = OnceLock::new();
|
||||
REGISTERED.get_or_init(|| {
|
||||
let meter = global::meter("genarrative-api");
|
||||
meter
|
||||
.i64_observable_up_down_counter("genarrative.http.server.response_bodies.in_flight")
|
||||
.with_unit("{response}")
|
||||
.with_description("HTTP response bodies still owned by Axum/Hyper")
|
||||
.with_callback(|observer| {
|
||||
observer.observe(HTTP_RESPONSE_BODY_IN_FLIGHT.load(Ordering::Relaxed), &[]);
|
||||
})
|
||||
.build();
|
||||
});
|
||||
}
|
||||
|
||||
fn http_base_labels(method: String, route: String) -> Vec<KeyValue> {
|
||||
vec![
|
||||
KeyValue::new("http.request.method", method),
|
||||
|
||||
Reference in New Issue
Block a user