From 28ba990123619e8cb387b2cfa03f485ebfaeadcd Mon Sep 17 00:00:00 2001 From: kdletters Date: Wed, 22 Apr 2026 14:52:30 +0800 Subject: [PATCH] extract rust sse infrastructure --- ...ER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md | 35 ++++-- server-rs/Cargo.lock | 13 ++ server-rs/crates/api-server/Cargo.toml | 4 +- server-rs/crates/api-server/src/sse.rs | 111 ++++++++++++++++-- 4 files changed, 140 insertions(+), 23 deletions(-) diff --git a/docs/technical/RUST_API_SERVER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md b/docs/technical/RUST_API_SERVER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md index fe31457a..512569a7 100644 --- a/docs/technical/RUST_API_SERVER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md +++ b/docs/technical/RUST_API_SERVER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md @@ -6,14 +6,15 @@ 这份文档用于冻结 `server-rs/crates/api-server` 内部的 SSE 基础设施抽取口径。 -本轮目标只有一个: +本轮目标只有两个: 1. 把当前散落在业务 handler 中的 `text/event-stream` 响应头与事件文本编码逻辑,收口为 `api-server` 可复用的 Rust 基础设施。 +2. 在同一基础设施上补出“实时写事件”的 writer 版本,为后续真流式接口预留稳定入口。 本轮不做: 1. 不改前端消费协议 -2. 不把 custom world message stream 改成真实逐段 token streaming +2. 不把 custom world message stream 当场改成真实逐段 token streaming 3. 不引入跨 crate 的共享 `shared-contracts` SSE runtime helper 4. 不同时重构 story / runtime / txt mode 的未来流式接口 @@ -37,7 +38,9 @@ 1. 标准 SSE 响应头构造 2. 单条事件编码 3. 缓冲式 SSE body builder -4. 一次性返回完整 SSE 文本的响应构造 +4. 实时写事件的 SSE writer +5. 一次性返回完整 SSE 文本的响应构造 +6. 基于流 body 的实时 SSE 响应构造 本轮明确不抽: @@ -60,9 +63,17 @@ - 面向当前最小兼容场景 - 内部持有 `String` - 提供 `push_json(event, payload)` 与 `into_response()` -2. `build_sse_response(body)` +2. `SseStreamWriter` + - 面向后续真流式场景 + - 内部持有实时写出的 channel sender + - 提供 `push_json(event, payload)` + - writer 被 drop 后,流自动结束 +3. `new_sse_stream()` + - 返回 `(SseStreamWriter, Response)` + - 业务 handler 可先把 `Response` 返回,再异步持续推送事件 +4. `build_sse_response(body)` - 统一写入标准 SSE 响应头 -3. `encode_sse_event(body, event, payload)` +5. `encode_sse_event(body, event, payload)` - 只负责把事件编码为: ```text event: xxx @@ -89,7 +100,7 @@ ## 6. 与 custom world message stream 的关系 -`POST /api/runtime/custom-world/agent/sessions/:sessionId/messages/stream` 仍然保持 Stage 8 文档冻结的最小语义: +`POST /api/runtime/custom-world/agent/sessions/:sessionId/messages/stream` 当前仍然保持 Stage 8 文档冻结的最小语义: 1. 业务层先完成 deterministic 写表 2. 读取最新 session snapshot @@ -102,17 +113,19 @@ 1. 事件编码和响应头不再手写在 `custom_world.rs` 2. 改由 `sse.rs` 基础设施承接 +3. 同时补出实时 writer 版本,但当前不强制业务路由马上切换 ## 7. 验收标准 当以下条件满足时,本轮视为完成: 1. `api-server/src/sse.rs` 已提供可复用 SSE helper -2. `custom_world.rs` 不再内联维护 SSE 编码与响应头细节 -3. `cargo fmt -p api-server` 通过 -4. `cargo check -p api-server` 通过 -5. `npm run check:encoding` 通过 +2. `api-server/src/sse.rs` 已同时提供缓冲式与实时写出式两种能力 +3. `custom_world.rs` 不再内联维护 SSE 编码与响应头细节 +4. `cargo fmt -p api-server` 通过 +5. `cargo check -p api-server` 通过 +6. `npm run check:encoding` 通过 ## 8. 一句话结论 -本轮把 Rust `api-server` 里的 SSE 能力收口为“最小传输层基础设施”,统一事件编码与响应头,但不改业务事件协议和当前 custom world 的同步伪流式语义。 +本轮把 Rust `api-server` 里的 SSE 能力收口为“双模式传输层基础设施”,同时提供缓冲式输出与实时写事件 writer,但不改当前业务事件协议,也不强制现有 custom world 路由立即切到真流式。 diff --git a/server-rs/Cargo.lock b/server-rs/Cargo.lock index c5c1db01..2891bee9 100644 --- a/server-rs/Cargo.lock +++ b/server-rs/Cargo.lock @@ -71,6 +71,7 @@ version = "0.1.0" dependencies = [ "axum", "base64 0.22.1", + "bytes", "dotenvy", "hmac", "http-body-util", @@ -98,6 +99,7 @@ dependencies = [ "spacetime-client", "time", "tokio", + "tokio-stream", "tower", "tower-http", "tracing", @@ -3036,6 +3038,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.27.0" diff --git a/server-rs/crates/api-server/Cargo.toml b/server-rs/crates/api-server/Cargo.toml index f8748289..538192d4 100644 --- a/server-rs/crates/api-server/Cargo.toml +++ b/server-rs/crates/api-server/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] axum = "0.8" +bytes = "1" dotenvy = "0.15" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } module-ai = { path = "../module-ai" } @@ -27,7 +28,8 @@ shared-contracts = { path = "../shared-contracts" } shared-kernel = { path = "../shared-kernel" } shared-logging = { path = "../shared-logging" } spacetime-client = { path = "../spacetime-client" } -tokio = { version = "1", features = ["macros", "rt-multi-thread", "net"] } +tokio = { version = "1", features = ["macros", "rt-multi-thread", "net", "sync"] } +tokio-stream = "0.1" time = { version = "0.3", features = ["formatting"] } tower-http = { version = "0.6", features = ["trace"] } tracing = "0.1" diff --git a/server-rs/crates/api-server/src/sse.rs b/server-rs/crates/api-server/src/sse.rs index 3dad8633..d3020430 100644 --- a/server-rs/crates/api-server/src/sse.rs +++ b/server-rs/crates/api-server/src/sse.rs @@ -1,9 +1,14 @@ use axum::{ + body::Body, http::{HeaderName, StatusCode, header}, response::{IntoResponse, Response}, }; +use bytes::Bytes; use serde::Serialize; use serde_json::json; +use std::convert::Infallible; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; use crate::http_error::AppError; @@ -30,6 +35,46 @@ impl SseEventBuffer { } } +/// 实时 SSE writer,适用于“先返回响应,再逐步推送事件”的真流式链路。 +#[derive(Clone)] +#[allow(dead_code)] +pub struct SseStreamWriter { + sender: mpsc::UnboundedSender>, +} + +#[allow(dead_code)] +impl SseStreamWriter { + pub fn push_json(&self, event: &str, payload: &T) -> Result<(), AppError> + where + T: Serialize, + { + let mut body = String::new(); + encode_sse_event(&mut body, event, payload)?; + self.sender.send(Ok(Bytes::from(body))).map_err(|_| { + AppError::from_status(StatusCode::GONE).with_details(json!({ + "provider": "sse", + "message": "实时 SSE 通道已关闭,无法继续写入事件", + })) + }) + } +} + +/// 创建一条实时 SSE 响应和对应 writer。 +/// +/// 典型用法: +/// 1. handler 先调用本函数拿到 `(writer, response)` +/// 2. 立即把 `response` 返回给客户端 +/// 3. 在后台任务里持续调用 `writer.push_json(...)` +/// 4. 所有 writer 被 drop 后,SSE 流自动结束 +#[allow(dead_code)] +pub fn new_sse_stream() -> (SseStreamWriter, Response) { + let (sender, receiver) = mpsc::unbounded_channel::>(); + let body = Body::from_stream(UnboundedReceiverStream::new(receiver)); + let response = build_sse_body_response(body); + + (SseStreamWriter { sender }, response) +} + pub fn encode_sse_event(body: &mut String, event: &str, payload: &T) -> Result<(), AppError> where T: Serialize, @@ -52,21 +97,34 @@ where } pub fn build_sse_response(body: String) -> Response { - ( - [ - (header::CONTENT_TYPE, "text/event-stream; charset=utf-8"), - (header::CACHE_CONTROL, "no-cache"), - // 反向代理场景下显式关闭缓冲,避免 SSE 事件被聚合后才下发。 - (HeaderName::from_static("x-accel-buffering"), "no"), - ], - body, - ) - .into_response() + build_sse_body_response(body) +} + +fn build_sse_body_response(body: impl IntoResponse) -> Response { + let mut response = body.into_response(); + let headers = response.headers_mut(); + headers.insert( + header::CONTENT_TYPE, + "text/event-stream; charset=utf-8" + .parse() + .expect("valid sse content-type"), + ); + headers.insert( + header::CACHE_CONTROL, + "no-cache".parse().expect("valid cache-control"), + ); + // 反向代理场景下显式关闭缓冲,避免 SSE 事件被聚合后才下发。 + headers.insert( + HeaderName::from_static("x-accel-buffering"), + "no".parse().expect("valid x-accel-buffering header"), + ); + + response } #[cfg(test)] mod tests { - use super::{SseEventBuffer, build_sse_response, encode_sse_event}; + use super::{SseEventBuffer, build_sse_response, encode_sse_event, new_sse_stream}; use axum::body::to_bytes; use serde_json::json; @@ -127,4 +185,35 @@ mod tests { "event: reply_delta\ndata: {\"text\":\"hello\"}\n\nevent: done\ndata: {\"ok\":true}\n\n" ); } + + #[tokio::test] + async fn sse_stream_writer_writes_events_into_live_response_body() { + let (writer, response) = new_sse_stream(); + + writer + .push_json("reply_delta", &json!({ "text": "hello" })) + .expect("first live event should encode"); + writer + .push_json("done", &json!({ "ok": true })) + .expect("second live event should encode"); + drop(writer); + + assert_eq!( + response + .headers() + .get(header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()), + Some("text/event-stream; charset=utf-8") + ); + + let body = to_bytes(response.into_body(), usize::MAX) + .await + .expect("live response body should read"); + let text = String::from_utf8(body.to_vec()).expect("live body should be utf8"); + + assert_eq!( + text, + "event: reply_delta\ndata: {\"text\":\"hello\"}\n\nevent: done\ndata: {\"ok\":true}\n\n" + ); + } }