diff --git a/.hermes/shared-memory/development-workflow.md b/.hermes/shared-memory/development-workflow.md index 9b077958..f1268ce4 100644 --- a/.hermes/shared-memory/development-workflow.md +++ b/.hermes/shared-memory/development-workflow.md @@ -198,7 +198,7 @@ npm run check:server-rs-ddd ## 生产压测与观测默认口径 - 作品列表 50 HTTP req/s 压测使用 `scripts/loadtest/README.md` 中的 K6 命令;当前脚本一次 iteration 请求两个公开列表接口,因此目标 50 HTTP req/s 对应 `PEAK_RPS=25`。 -- 生产 `api-server` 默认 backlog、worker threads、systemd 限制、Nginx upstream timing log 和 OTLP 开关以 `docs/【开发运维】本地开发验证与生产运维-2026-05-15.md` 为准。 +- 生产 `api-server` 默认 backlog、worker threads、HTTP 并发背压、systemd 限制、Nginx upstream timing log 和 OTLP 开关以 `docs/【开发运维】本地开发验证与生产运维-2026-05-15.md` 为准。 - OpenTelemetry 现阶段可选发送 traces / metrics / logs,但不会取代本地 `journalctl -u genarrative-api.service`、`logs/api-server/` 与 `/var/log/nginx/genarrative.*.log`。 - 指标 label 不写 raw URI、userId、profileId 或 request_id;request_id 只用于 trace/log 串联。 diff --git a/deploy/env/api-server.env.example b/deploy/env/api-server.env.example index 1d727052..dbf49936 100644 --- a/deploy/env/api-server.env.example +++ b/deploy/env/api-server.env.example @@ -7,6 +7,7 @@ GENARRATIVE_API_PORT=8082 GENARRATIVE_API_LOG=info,tower_http=info GENARRATIVE_API_LISTEN_BACKLOG=1024 GENARRATIVE_API_WORKER_THREADS=4 +GENARRATIVE_API_MAX_CONCURRENT_REQUESTS=512 GENARRATIVE_OTEL_ENABLED=false OTEL_SERVICE_NAME=genarrative-api OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318 diff --git a/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md b/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md index f15f9b7e..d39d2529 100644 --- a/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md +++ b/docs/【开发运维】本地开发验证与生产运维-2026-05-15.md @@ -154,6 +154,7 @@ Jenkins 按 web / api / Spacetime module / build / deploy / publish 拆分 50 HTTP req/s 首版压测优化口径: - `api-server` 生产模板默认 `GENARRATIVE_API_LISTEN_BACKLOG=1024`、`GENARRATIVE_API_WORKER_THREADS=4`;本地未设置 worker threads 时继续使用 Tokio 默认值。 +- `GENARRATIVE_API_MAX_CONCURRENT_REQUESTS=512` 开启应用内 HTTP 并发背压,超过并发许可时直接返回 `429 Too Many Requests` 和 `Retry-After: 1`,`/healthz` 不受该限制。该值不是 RPS 限速;如果压测中 429 上升但内存和 p95 收敛,说明背压正在保护进程,需要结合真实容量调阈值或在 Nginx 前置限流。直连 `api-server` 的极高 RPS 压测若出现 `connection refused`,通常已经打到 TCP 监听 / accept 层,应同时检查 backlog、Nginx upstream keepalive 和前置限流。 - `genarrative-api.service` 设置 `LimitNOFILE=65535`、`TasksMax=2048`;上线后用 `systemctl show genarrative-api.service -p LimitNOFILE -p TasksMax` 和 `cat /proc/$(pidof api-server)/limits` 核对。 - Nginx `/api/` 与 `/admin/api/` 通过 `genarrative_api` upstream 代理到 `127.0.0.1:8082`,upstream keepalive 为 64;压测时看 `/var/log/nginx/genarrative.access.log` 中的 `request_time`、`upstream_connect_time`、`upstream_header_time`、`upstream_response_time`、`upstream_status`、`request_id`。 - 作品列表 K6 脚本一次 iteration 默认请求两个公开接口,因此约 50 HTTP req/s 的目标命令使用 `SCENARIO=spike START_RPS=5 PEAK_RPS=25 HOLD=60s END_RPS=5 DETAIL_RATIO=0 npm run loadtest:k6:works`。 diff --git a/scripts/loadtest/README.md b/scripts/loadtest/README.md index be788df5..9ae8dc31 100644 --- a/scripts/loadtest/README.md +++ b/scripts/loadtest/README.md @@ -226,6 +226,8 @@ npm run loadtest:k6:works ## 排障 - 如果公开 gallery 返回 `creation_entry_disabled` 或 503,检查本地 creation entry 配置是否禁用了对应入口。 +- 如果高压下返回 429,优先确认目标环境是否设置了 `GENARRATIVE_API_MAX_CONCURRENT_REQUESTS`。429 表示 api-server 应用层背压已生效,不等同于业务错误;继续看内存、p95、`http_req_failed` 和 OTLP / Nginx timing 判断阈值是否偏低。 +- 如果直连 `api-server` 压测出现 `connection refused` 或 status 0,说明压力已经打到 TCP 监听 / accept 层;此时同时检查 `GENARRATIVE_API_LISTEN_BACKLOG`、Nginx upstream keepalive 和是否需要在 Nginx 前置限流,不能只靠应用层背压解释。 - 如果个人作品列表返回 401,确认 `AUTH_TOKEN` 是当前 api-server 可识别的 access token。 - 如果详情全部 404,确认是否已向目标环境导入与 `WORKS_DATA` 一致的数据。 @@ -315,6 +317,7 @@ Rider 的 Logs 面板展示的是 OTLP log event 自身字段,不会自动把 ```bash systemctl show genarrative-api.service -p LimitNOFILE -p TasksMax cat /proc/$(pidof api-server)/limits +tr '\0' '\n' < /proc/$(pidof api-server)/environ | grep GENARRATIVE_API_MAX_CONCURRENT_REQUESTS ss -ltnp | grep 8082 curl -sS http://127.0.0.1:8082/healthz ``` diff --git a/scripts/loadtest/data/works-list.sample.from-migration-1.json b/scripts/loadtest/data/works-list.sample.from-migration-1.json new file mode 100644 index 00000000..0a8b9def --- /dev/null +++ b/scripts/loadtest/data/works-list.sample.from-migration-1.json @@ -0,0 +1,218 @@ +{ + "source": "spacetime-migration-1.json", + "generatedAt": "2026-05-16T13:35:40.282Z", + "counts": { + "puzzle_work_profile": 3, + "custom_world_profile": 1, + "match3d_work_profile": 0, + "square_hole_work_profile": 0, + "visual_novel_work_profile": 0 + }, + "tables": { + "puzzle_work_profile": [ + { + "profile_id": "profile-001", + "work_id": "work-001", + "owner_user_id": "user-001", + "author_display_name": "author-001", + "cover_asset_id": "asset-001", + "cover_image_src": "/generated-puzzle-assets/puzzle-session-f38101d7277040fcb6fbc41fea8b714a/puzzle-session-f38101d7277040fcb6fbc41fea8b714a-candidate-2/asset-1777649330373133/image.png", + "work_title": "化学家", + "level_name": "文学家", + "summary": "几个文学家正站在山上面对着瀑布侃侃而谈", + "work_description": "一个穿着白大褂的化学家正在做酷炫的化学实验,背景是化学实验室", + "levels_json": "[{\"level_id\":\"puzzle-level-1777649242577-7\",\"level_name\":\"文学家\",\"picture_description\":\"几个文学家正站在山上面对着瀑布侃侃而谈\",\"candidates\":[{\"candidate_id\":\"puzzle-session-f38101d7277040fcb6fbc41fea8b714a-candidate-2\",\"image_src\":\"/generated-puzzle-assets/puzzle-session-f38101d7277040fcb6fbc41fea8b714a/puzzle-session-f38101d7277040fcb6fbc41fea8b714a-candidate-2/asset-1777649330373133/image.png\",\"asset_id\":\"asset-1777649330373133\",\"prompt\":\"几个文学家正站在山上面对着瀑布侃侃而谈\",\"actual_prompt\":\"请生成一张高清插画。画面主体:几个文学家正站在山上面对着瀑布侃侃而谈。画面…", + "anchor_pack_json": "{\"theme_promise\":{\"key\":\"themePromise\",\"label\":\"题材承诺\",\"value\":\"化学家\",\"status\":\"Locked\"},\"visual_subject\":{\"key\":\"visualSubject\",\"label\":\"画面主体\",\"value\":\"一个穿着白大褂的化学家正在做酷炫的化学实验,背景是化学实验室\",\"status\":\"Locked\"},\"visual_mood\":{\"key\":\"visualMood\",\"label\":\"视觉气质\",\"value\":\"清晰、适合拼图切块\",\"status\":\"Inferred\"},\"composition_hooks\":{\"key\":\"compositionHooks\",\"label\":\"拼图记忆点\",\"value\":\"主体轮廓、色块分区、局部细节\",\"status\":\"Inferred\"},\"tags_and_forbidden\":{\"key\":\"tagsAndForbidden\",\"label\":\"标签与禁忌\",\"value\":\"化学家、拼图、插画;禁止标题字\",\"status\":\"I…", + "theme_tags_json": "[\"化学家\",\"拼图\",\"插画\",\"禁止标题字\"]", + "publication_status": { + "Published": [] + }, + "play_count": 1, + "like_count": 0, + "remix_count": 1, + "updated_at": { + "__timestamp_micros_since_unix_epoch__": 1777703338322544 + }, + "created_at": { + "__timestamp_micros_since_unix_epoch__": 1777648804043558 + }, + "published_at": { + "__timestamp_micros_since_unix_epoch__": 1777649364112270 + } + }, + { + "profile_id": "profile-002", + "work_id": "work-002", + "owner_user_id": "user-002", + "author_display_name": "author-002", + "work_title": "我不知道", + "level_name": "", + "summary": "你猜我是谁", + "work_description": "你猜我是谁", + "levels_json": "[{\"level_id\":\"puzzle-level-1\",\"level_name\":\"\",\"picture_description\":\"真不知道\",\"candidates\":[],\"selected_candidate_id\":null,\"cover_image_src\":null,\"cover_asset_id\":null,\"generation_status\":\"idle\"}]", + "anchor_pack_json": "{\"theme_promise\":{\"key\":\"themePromise\",\"label\":\"题材承诺\",\"value\":\"我不知道\",\"status\":\"Locked\"},\"visual_subject\":{\"key\":\"visualSubject\",\"label\":\"画面主体\",\"value\":\"真不知道\",\"status\":\"Locked\"},\"visual_mood\":{\"key\":\"visualMood\",\"label\":\"视觉气质\",\"value\":\"清晰、适合拼图切块\",\"status\":\"Inferred\"},\"composition_hooks\":{\"key\":\"compositionHooks\",\"label\":\"拼图记忆点\",\"value\":\"主体轮廓、色块分区、局部细节\",\"status\":\"Inferred\"},\"tags_and_forbidden\":{\"key\":\"tagsAndForbidden\",\"label\":\"标签与禁忌\",\"value\":\"我不知道、拼图、插画;禁止标题字\",\"status\":\"Inferred\"}}", + "theme_tags_json": "[\"我不知道\"]", + "publication_status": { + "Draft": [] + }, + "play_count": 0, + "like_count": 0, + "remix_count": 0, + "updated_at": { + "__timestamp_micros_since_unix_epoch__": 1777619351714201 + }, + "created_at": { + "__timestamp_micros_since_unix_epoch__": 1777619336673245 + } + }, + { + "profile_id": "profile-003", + "work_id": "work-003", + "owner_user_id": "user-003", + "author_display_name": "author-002", + "work_title": "", + "level_name": "", + "summary": "", + "work_description": "", + "levels_json": "[{\"level_id\":\"puzzle-level-1\",\"level_name\":\"\",\"picture_description\":\"\",\"candidates\":[],\"selected_candidate_id\":null,\"cover_image_src\":null,\"cover_asset_id\":null,\"generation_status\":\"idle\"}]", + "anchor_pack_json": "{\"theme_promise\":{\"key\":\"themePromise\",\"label\":\"题材承诺\",\"value\":\"\",\"status\":\"Missing\"},\"visual_subject\":{\"key\":\"visualSubject\",\"label\":\"画面主体\",\"value\":\"\",\"status\":\"Missing\"},\"visual_mood\":{\"key\":\"visualMood\",\"label\":\"视觉气质\",\"value\":\"\",\"status\":\"Missing\"},\"composition_hooks\":{\"key\":\"compositionHooks\",\"label\":\"拼图记忆点\",\"value\":\"\",\"status\":\"Missing\"},\"tags_and_forbidden\":{\"key\":\"tagsAndForbidden\",\"label\":\"标签与禁忌\",\"value\":\"\",\"status\":\"Missing\"}}", + "theme_tags_json": "[\"拼图\",\"插画\",\"清晰构图\"]", + "publication_status": { + "Draft": [] + }, + "play_count": 0, + "like_count": 0, + "remix_count": 0, + "updated_at": { + "__timestamp_micros_since_unix_epoch__": 1777622285252380 + }, + "created_at": { + "__timestamp_micros_since_unix_epoch__": 1777622285252380 + } + } + ], + "custom_world_profile": [ + { + "profile_id": "profile-081", + "owner_user_id": "user-002", + "author_display_name": "author-012", + "author_public_user_code": "author-code-001", + "world_name": "青春飞扬校园", + "summary_text": "在现代校园中,玩家摆脱内卷,追求真实成长", + "subtitle": "反内卷的自由学习之旅", + "profile_payload_json": "{\"anchorContent\":null,\"anchorPack\":null,\"attributeSchema\":{\"generatedFrom\":{\"conflictCore\":\"与传统教育模式的冲突\",\"settingSummary\":\"在现代校园中,玩家摆脱内卷,追求真实成长\",\"tone\":\"积极向上,充满活力与创新\",\"worldName\":\"青春飞扬校园\",\"worldType\":\"CUSTOM\"},\"id\":\"schema:rpg-agent:1e15b44d:v1\",\"schemaVersion\":1,\"slots\":[{\"name\":\"知识储备\",\"slotId\":\"axis_a\"},{\"name\":\"创新思维\",\"slotId\":\"axis_b\"},{\"name\":\"社交能力\",\"slotId\":\"axis_c\"},{\"name\":\"抗压能力\",\"slotId\":\"axis_d\"},{\"name\":\"自我认知\",\"slotId\":\"axis_e\"},{\"name\":\"团队协作\",\"slotId\":\"axis_f\"}],\"worldId\":\"custom:青春飞扬校…", + "publication_status": { + "Draft": [] + }, + "play_count": 0, + "like_count": 0, + "remix_count": 0, + "updated_at": { + "__timestamp_micros_since_unix_epoch__": 1777532006629209 + }, + "created_at": { + "__timestamp_micros_since_unix_epoch__": 1777531745887256 + } + } + ], + "match3d_work_profile": [], + "square_hole_work_profile": [], + "visual_novel_work_profile": [] + }, + "profileIds": { + "puzzle": [ + "profile-001", + "profile-002", + "profile-003" + ], + "customWorld": [ + "profile-081" + ], + "match3d": [], + "squareHole": [], + "bigFish": [], + "visualNovel": [] + }, + "workIds": { + "puzzle": [ + "work-001", + "work-002", + "work-003" + ], + "customWorld": [], + "match3d": [], + "squareHole": [], + "bigFish": [], + "visualNovel": [] + }, + "normalizedWorks": [ + { + "type": "puzzle", + "workId": "work-001", + "profileId": "profile-001", + "ownerUserId": "user-001", + "title": "化学家", + "subtitle": "几个文学家正站在山上面对着瀑布侃侃而谈", + "publicationStatus": { + "Published": [] + }, + "playCount": 1, + "likeCount": 0, + "remixCount": 1, + "coverImageSrc": "/generated-puzzle-assets/puzzle-session-f38101d7277040fcb6fbc41fea8b714a/puzzle-session-f38101d7277040fcb6fbc41fea8b714a-candidate-2/asset-1777649330373133/image.png", + "updatedAt": { + "__timestamp_micros_since_unix_epoch__": 1777703338322544 + } + }, + { + "type": "puzzle", + "workId": "work-002", + "profileId": "profile-002", + "ownerUserId": "user-002", + "title": "我不知道", + "subtitle": "你猜我是谁", + "publicationStatus": { + "Draft": [] + }, + "playCount": 0, + "likeCount": 0, + "remixCount": 0, + "updatedAt": { + "__timestamp_micros_since_unix_epoch__": 1777619351714201 + } + }, + { + "type": "puzzle", + "workId": "work-003", + "profileId": "profile-003", + "ownerUserId": "user-003", + "title": "", + "subtitle": "", + "publicationStatus": { + "Draft": [] + }, + "playCount": 0, + "likeCount": 0, + "remixCount": 0, + "updatedAt": { + "__timestamp_micros_since_unix_epoch__": 1777622285252380 + } + }, + { + "type": "customWorld", + "profileId": "profile-081", + "ownerUserId": "user-002", + "title": "青春飞扬校园", + "subtitle": "反内卷的自由学习之旅", + "publicationStatus": { + "Draft": [] + }, + "playCount": 0, + "likeCount": 0, + "remixCount": 0, + "updatedAt": { + "__timestamp_micros_since_unix_epoch__": 1777532006629209 + } + } + ] +} diff --git a/server-rs/crates/api-server/Cargo.toml b/server-rs/crates/api-server/Cargo.toml index 92b07599..127c256f 100644 --- a/server-rs/crates/api-server/Cargo.toml +++ b/server-rs/crates/api-server/Cargo.toml @@ -11,6 +11,7 @@ base64 = { workspace = true } bytes = { workspace = true } dotenvy = { workspace = true } image = { workspace = true, features = ["jpeg", "png", "webp"] } +http-body-util = { workspace = true } reqwest = { workspace = true, features = ["json", "multipart", "rustls-tls"] } webp = { workspace = true } module-ai = { workspace = true } @@ -45,7 +46,7 @@ shared-kernel = { workspace = true } shared-logging = { workspace = true } socket2 = { workspace = true } spacetime-client = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "net", "time", "sync"] } tokio-stream = { workspace = true } futures-util = { workspace = true } time = { workspace = true, features = ["formatting"] } diff --git a/server-rs/crates/api-server/src/app.rs b/server-rs/crates/api-server/src/app.rs index 70cda406..ec886eb2 100644 --- a/server-rs/crates/api-server/src/app.rs +++ b/server-rs/crates/api-server/src/app.rs @@ -15,6 +15,7 @@ use tracing::{Level, Span, error, info_span}; use crate::{ auth::{AuthenticatedAccessToken, require_bearer_auth}, + backpressure::limit_concurrent_requests, creation_entry_config::require_creation_entry_route_enabled, error_middleware::normalize_error_response, modules, @@ -76,6 +77,11 @@ pub fn build_router(state: AppState) -> Router { state.clone(), require_creation_entry_route_enabled, )) + // HTTP 背压在业务路由外侧快拒绝,避免过载请求继续占用 SpacetimeDB facade 与业务执行资源。 + .layer(middleware::from_fn_with_state( + state.clone(), + limit_concurrent_requests, + )) // 错误归一化层放在 tracing 里侧,让 tracing 记录到最终对外返回的状态与错误体形态。 .layer(middleware::from_fn(normalize_error_response)) // 响应头回写放在错误归一化外侧,确保最终写回的是归一化后的最终响应。 diff --git a/server-rs/crates/api-server/src/backpressure.rs b/server-rs/crates/api-server/src/backpressure.rs new file mode 100644 index 00000000..4b310c56 --- /dev/null +++ b/server-rs/crates/api-server/src/backpressure.rs @@ -0,0 +1,221 @@ +use std::sync::Arc; + +use axum::{ + body::Body, + extract::{Request, State}, + http::{HeaderValue, StatusCode, header::RETRY_AFTER}, + middleware::Next, + response::Response, +}; +use http_body_util::BodyExt; +use tokio::sync::{OwnedSemaphorePermit, TryAcquireError}; + +use crate::{ + http_error::AppError, + request_context::RequestContext, + state::{AppState, HttpRequestPermitPool}, +}; + +pub async fn limit_concurrent_requests( + State(state): State, + request: Request, + next: Next, +) -> Response { + if should_bypass_backpressure(&request) { + return next.run(request).await; + } + + let Some(permit_pool) = state.http_request_permit_pool() else { + return next.run(request).await; + }; + + match acquire_http_request_permit(permit_pool) { + Ok(permit) => hold_permit_until_response_body_dropped(next.run(request).await, permit), + Err(_) => reject_overloaded_request(&request), + } +} + +fn acquire_http_request_permit( + permit_pool: Arc, +) -> Result { + permit_pool.try_acquire_owned() +} + +fn hold_permit_until_response_body_dropped( + response: Response, + permit: OwnedSemaphorePermit, +) -> Response { + response.map(|body| { + Body::new(body.map_frame(move |frame| { + let _permit_guard = &permit; + frame + })) + }) +} + +fn reject_overloaded_request(request: &Request) -> Response { + let request_context = request.extensions().get::().cloned(); + let mut response = AppError::from_status(StatusCode::TOO_MANY_REQUESTS) + .with_message("服务繁忙,请稍后重试") + .into_response_with_context(request_context.as_ref()); + response + .headers_mut() + .insert(RETRY_AFTER, HeaderValue::from_static("1")); + response +} + +fn should_bypass_backpressure(request: &Request) -> bool { + request.uri().path() == "/healthz" +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use axum::{ + Router, + body::Body, + extract::Extension, + http::{Request, StatusCode, header::RETRY_AFTER}, + middleware, + routing::get, + }; + use tokio::sync::Notify; + use tower::ServiceExt; + + use crate::{config::AppConfig, state::AppState}; + + use super::limit_concurrent_requests; + + #[derive(Clone)] + struct HeldRequestGate { + entered: Arc, + release: Arc, + } + + async fn held_request(Extension(gate): Extension) -> &'static str { + gate.entered.notify_one(); + gate.release.notified().await; + "ok" + } + + async fn fast_request() -> &'static str { + "ok" + } + + fn test_request(path: &str) -> Request { + Request::builder() + .uri(path) + .body(Body::empty()) + .expect("test request should build") + } + + fn build_test_app(max_concurrent_requests: usize, gate: HeldRequestGate) -> Router { + let mut config = AppConfig::default(); + config.max_concurrent_requests = Some(max_concurrent_requests); + let state = AppState::new(config).expect("state should build"); + + Router::new() + .route("/held", get(held_request)) + .route("/fast", get(fast_request)) + .route("/healthz", get(fast_request)) + .layer(middleware::from_fn_with_state( + state.clone(), + limit_concurrent_requests, + )) + .layer(Extension(gate)) + .with_state(state) + } + + #[tokio::test] + async fn returns_429_when_concurrency_permits_are_exhausted() { + let gate = HeldRequestGate { + entered: Arc::new(Notify::new()), + release: Arc::new(Notify::new()), + }; + let app = build_test_app(1, gate.clone()); + let entered = gate.entered.notified(); + + let held_response = tokio::spawn(app.clone().oneshot(test_request("/held"))); + entered.await; + + let rejected_response = app + .clone() + .oneshot(test_request("/fast")) + .await + .expect("rejected request should complete"); + assert_eq!(rejected_response.status(), StatusCode::TOO_MANY_REQUESTS); + assert_eq!( + rejected_response + .headers() + .get(RETRY_AFTER) + .and_then(|value| value.to_str().ok()), + Some("1") + ); + + gate.release.notify_one(); + let completed_response = held_response + .await + .expect("held request task should join") + .expect("held request should complete"); + assert_eq!(completed_response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn healthz_bypasses_concurrency_backpressure() { + let gate = HeldRequestGate { + entered: Arc::new(Notify::new()), + release: Arc::new(Notify::new()), + }; + let app = build_test_app(1, gate.clone()); + let entered = gate.entered.notified(); + + let held_response = tokio::spawn(app.clone().oneshot(test_request("/held"))); + entered.await; + + let health_response = app + .clone() + .oneshot(test_request("/healthz")) + .await + .expect("healthz request should complete"); + assert_eq!(health_response.status(), StatusCode::OK); + + gate.release.notify_one(); + let completed_response = held_response + .await + .expect("held request task should join") + .expect("held request should complete"); + assert_eq!(completed_response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn permit_is_held_until_response_body_is_dropped() { + let gate = HeldRequestGate { + entered: Arc::new(Notify::new()), + release: Arc::new(Notify::new()), + }; + let app = build_test_app(1, gate); + + let first_response = app + .clone() + .oneshot(test_request("/fast")) + .await + .expect("first request should complete"); + assert_eq!(first_response.status(), StatusCode::OK); + + let rejected_response = app + .clone() + .oneshot(test_request("/fast")) + .await + .expect("second request should complete"); + assert_eq!(rejected_response.status(), StatusCode::TOO_MANY_REQUESTS); + + drop(first_response); + + let accepted_response = app + .oneshot(test_request("/fast")) + .await + .expect("third request should complete"); + assert_eq!(accepted_response.status(), StatusCode::OK); + } +} diff --git a/server-rs/crates/api-server/src/config.rs b/server-rs/crates/api-server/src/config.rs index 955664de..890d79d2 100644 --- a/server-rs/crates/api-server/src/config.rs +++ b/server-rs/crates/api-server/src/config.rs @@ -22,6 +22,7 @@ pub struct AppConfig { pub bind_port: u16, pub listen_backlog: i32, pub worker_threads: Option, + pub max_concurrent_requests: Option, pub log_filter: String, pub otel_enabled: bool, pub admin_username: Option, @@ -152,6 +153,7 @@ impl Default for AppConfig { bind_port: 3000, listen_backlog: 1024, worker_threads: None, + max_concurrent_requests: None, log_filter: "info,tower_http=info".to_string(), otel_enabled: false, admin_username: None, @@ -315,6 +317,11 @@ impl AppConfig { if let Some(worker_threads) = read_first_usize_env(&["GENARRATIVE_API_WORKER_THREADS"]) { config.worker_threads = Some(worker_threads); } + if let Some(max_concurrent_requests) = + read_first_usize_env(&["GENARRATIVE_API_MAX_CONCURRENT_REQUESTS"]) + { + config.max_concurrent_requests = Some(max_concurrent_requests); + } if let Some(otel_enabled) = read_first_bool_env(&["GENARRATIVE_OTEL_ENABLED"]) { config.otel_enabled = otel_enabled; } @@ -1195,20 +1202,24 @@ mod tests { unsafe { std::env::remove_var("GENARRATIVE_API_LISTEN_BACKLOG"); std::env::remove_var("GENARRATIVE_API_WORKER_THREADS"); + std::env::remove_var("GENARRATIVE_API_MAX_CONCURRENT_REQUESTS"); std::env::remove_var("GENARRATIVE_OTEL_ENABLED"); std::env::set_var("GENARRATIVE_API_LISTEN_BACKLOG", "2048"); std::env::set_var("GENARRATIVE_API_WORKER_THREADS", "6"); + std::env::set_var("GENARRATIVE_API_MAX_CONCURRENT_REQUESTS", "128"); std::env::set_var("GENARRATIVE_OTEL_ENABLED", "true"); } let config = AppConfig::from_env(); assert_eq!(config.listen_backlog, 2048); assert_eq!(config.worker_threads, Some(6)); + assert_eq!(config.max_concurrent_requests, Some(128)); assert!(config.otel_enabled); unsafe { std::env::remove_var("GENARRATIVE_API_LISTEN_BACKLOG"); std::env::remove_var("GENARRATIVE_API_WORKER_THREADS"); + std::env::remove_var("GENARRATIVE_API_MAX_CONCURRENT_REQUESTS"); std::env::remove_var("GENARRATIVE_OTEL_ENABLED"); } } diff --git a/server-rs/crates/api-server/src/main.rs b/server-rs/crates/api-server/src/main.rs index d1f15cd9..40e880b3 100644 --- a/server-rs/crates/api-server/src/main.rs +++ b/server-rs/crates/api-server/src/main.rs @@ -13,6 +13,7 @@ mod auth_payload; mod auth_public_user; mod auth_session; mod auth_sessions; +mod backpressure; mod bark_battle; mod big_fish; mod big_fish_agent_turn; diff --git a/server-rs/crates/api-server/src/state.rs b/server-rs/crates/api-server/src/state.rs index ee4b9a7e..64cc88b4 100644 --- a/server-rs/crates/api-server/src/state.rs +++ b/server-rs/crates/api-server/src/state.rs @@ -27,6 +27,7 @@ use shared_contracts::creation_entry_config::CreationEntryConfigResponse; use shared_contracts::creative_agent::CreativeAgentSessionSnapshot; use spacetime_client::{SpacetimeClient, SpacetimeClientConfig, SpacetimeClientError}; use time::OffsetDateTime; +use tokio::sync::Semaphore; use tracing::{info, warn}; use crate::config::AppConfig; @@ -35,12 +36,15 @@ use crate::wechat_provider::build_wechat_provider; const ADMIN_ROLE: &str = "admin"; +pub type HttpRequestPermitPool = Semaphore; + // 当前阶段先保留最小共享状态壳,后续逐步接入配置、客户端与平台适配。 #[derive(Clone, Debug)] pub struct AppState { // 配置会在后续中间件、路由和平台适配接入时逐步消费。 #[allow(dead_code)] pub config: AppConfig, + http_request_permit_pool: Option>, auth_jwt_config: JwtConfig, admin_runtime: Option, refresh_cookie_config: RefreshCookieConfig, @@ -192,9 +196,14 @@ impl AppState { }); let llm_client = build_llm_client(&config)?; let creative_agent_gpt5_client = build_creative_agent_gpt5_client(&config)?; + let http_request_permit_pool = config + .max_concurrent_requests + .map(HttpRequestPermitPool::new) + .map(Arc::new); Ok(Self { config, + http_request_permit_pool, auth_jwt_config, admin_runtime, refresh_cookie_config, @@ -235,6 +244,10 @@ impl AppState { &self.refresh_cookie_config } + pub fn http_request_permit_pool(&self) -> Option> { + self.http_request_permit_pool.clone() + } + pub async fn upsert_creation_entry_type_config( &self, input: module_runtime::CreationEntryTypeAdminUpsertInput,