extract rust sse infrastructure

This commit is contained in:
2026-04-22 14:52:30 +08:00
parent 91fb8edee7
commit 28ba990123
4 changed files with 140 additions and 23 deletions

View File

@@ -6,14 +6,15 @@
这份文档用于冻结 `server-rs/crates/api-server` 内部的 SSE 基础设施抽取口径。 这份文档用于冻结 `server-rs/crates/api-server` 内部的 SSE 基础设施抽取口径。
本轮目标只有个: 本轮目标只有个:
1. 把当前散落在业务 handler 中的 `text/event-stream` 响应头与事件文本编码逻辑,收口为 `api-server` 可复用的 Rust 基础设施。 1. 把当前散落在业务 handler 中的 `text/event-stream` 响应头与事件文本编码逻辑,收口为 `api-server` 可复用的 Rust 基础设施。
2. 在同一基础设施上补出“实时写事件”的 writer 版本,为后续真流式接口预留稳定入口。
本轮不做: 本轮不做:
1. 不改前端消费协议 1. 不改前端消费协议
2. 不把 custom world message stream 改成真实逐段 token streaming 2. 不把 custom world message stream 当场改成真实逐段 token streaming
3. 不引入跨 crate 的共享 `shared-contracts` SSE runtime helper 3. 不引入跨 crate 的共享 `shared-contracts` SSE runtime helper
4. 不同时重构 story / runtime / txt mode 的未来流式接口 4. 不同时重构 story / runtime / txt mode 的未来流式接口
@@ -37,7 +38,9 @@
1. 标准 SSE 响应头构造 1. 标准 SSE 响应头构造
2. 单条事件编码 2. 单条事件编码
3. 缓冲式 SSE body builder 3. 缓冲式 SSE body builder
4. 一次性返回完整 SSE 文本的响应构造 4. 实时写事件的 SSE writer
5. 一次性返回完整 SSE 文本的响应构造
6. 基于流 body 的实时 SSE 响应构造
本轮明确不抽: 本轮明确不抽:
@@ -60,9 +63,17 @@
- 面向当前最小兼容场景 - 面向当前最小兼容场景
- 内部持有 `String` - 内部持有 `String`
- 提供 `push_json(event, payload)``into_response()` - 提供 `push_json(event, payload)``into_response()`
2. `build_sse_response(body)` 2. `SseStreamWriter`
- 面向后续真流式场景
- 内部持有实时写出的 channel sender
- 提供 `push_json(event, payload)`
- writer 被 drop 后,流自动结束
3. `new_sse_stream()`
- 返回 `(SseStreamWriter, Response)`
- 业务 handler 可先把 `Response` 返回,再异步持续推送事件
4. `build_sse_response(body)`
- 统一写入标准 SSE 响应头 - 统一写入标准 SSE 响应头
3. `encode_sse_event(body, event, payload)` 5. `encode_sse_event(body, event, payload)`
- 只负责把事件编码为: - 只负责把事件编码为:
```text ```text
event: xxx event: xxx
@@ -89,7 +100,7 @@
## 6. 与 custom world message stream 的关系 ## 6. 与 custom world message stream 的关系
`POST /api/runtime/custom-world/agent/sessions/:sessionId/messages/stream` 仍然保持 Stage 8 文档冻结的最小语义: `POST /api/runtime/custom-world/agent/sessions/:sessionId/messages/stream` 当前仍然保持 Stage 8 文档冻结的最小语义:
1. 业务层先完成 deterministic 写表 1. 业务层先完成 deterministic 写表
2. 读取最新 session snapshot 2. 读取最新 session snapshot
@@ -102,17 +113,19 @@
1. 事件编码和响应头不再手写在 `custom_world.rs` 1. 事件编码和响应头不再手写在 `custom_world.rs`
2. 改由 `sse.rs` 基础设施承接 2. 改由 `sse.rs` 基础设施承接
3. 同时补出实时 writer 版本,但当前不强制业务路由马上切换
## 7. 验收标准 ## 7. 验收标准
当以下条件满足时,本轮视为完成: 当以下条件满足时,本轮视为完成:
1. `api-server/src/sse.rs` 已提供可复用 SSE helper 1. `api-server/src/sse.rs` 已提供可复用 SSE helper
2. `custom_world.rs` 不再内联维护 SSE 编码与响应头细节 2. `api-server/src/sse.rs` 已同时提供缓冲式与实时写出式两种能力
3. `cargo fmt -p api-server` 通过 3. `custom_world.rs` 不再内联维护 SSE 编码与响应头细节
4. `cargo check -p api-server` 通过 4. `cargo fmt -p api-server` 通过
5. `npm run check:encoding` 通过 5. `cargo check -p api-server` 通过
6. `npm run check:encoding` 通过
## 8. 一句话结论 ## 8. 一句话结论
本轮把 Rust `api-server` 里的 SSE 能力收口为“最小传输层基础设施”,统一事件编码与响应头,但不改业务事件协议和当前 custom world 的同步伪流式语义 本轮把 Rust `api-server` 里的 SSE 能力收口为“双模式传输层基础设施”,同时提供缓冲式输出与实时写事件 writer,但不改当前业务事件协议,也不强制现有 custom world 路由立即切到真流式

13
server-rs/Cargo.lock generated
View File

@@ -71,6 +71,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"axum", "axum",
"base64 0.22.1", "base64 0.22.1",
"bytes",
"dotenvy", "dotenvy",
"hmac", "hmac",
"http-body-util", "http-body-util",
@@ -98,6 +99,7 @@ dependencies = [
"spacetime-client", "spacetime-client",
"time", "time",
"tokio", "tokio",
"tokio-stream",
"tower", "tower",
"tower-http", "tower-http",
"tracing", "tracing",
@@ -3036,6 +3038,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-tungstenite" name = "tokio-tungstenite"
version = "0.27.0" version = "0.27.0"

View File

@@ -6,6 +6,7 @@ license.workspace = true
[dependencies] [dependencies]
axum = "0.8" axum = "0.8"
bytes = "1"
dotenvy = "0.15" dotenvy = "0.15"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
module-ai = { path = "../module-ai" } module-ai = { path = "../module-ai" }
@@ -27,7 +28,8 @@ shared-contracts = { path = "../shared-contracts" }
shared-kernel = { path = "../shared-kernel" } shared-kernel = { path = "../shared-kernel" }
shared-logging = { path = "../shared-logging" } shared-logging = { path = "../shared-logging" }
spacetime-client = { path = "../spacetime-client" } spacetime-client = { path = "../spacetime-client" }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "net"] } tokio = { version = "1", features = ["macros", "rt-multi-thread", "net", "sync"] }
tokio-stream = "0.1"
time = { version = "0.3", features = ["formatting"] } time = { version = "0.3", features = ["formatting"] }
tower-http = { version = "0.6", features = ["trace"] } tower-http = { version = "0.6", features = ["trace"] }
tracing = "0.1" tracing = "0.1"

View File

@@ -1,9 +1,14 @@
use axum::{ use axum::{
body::Body,
http::{HeaderName, StatusCode, header}, http::{HeaderName, StatusCode, header},
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use bytes::Bytes;
use serde::Serialize; use serde::Serialize;
use serde_json::json; use serde_json::json;
use std::convert::Infallible;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::http_error::AppError; use crate::http_error::AppError;
@@ -30,6 +35,46 @@ impl SseEventBuffer {
} }
} }
/// 实时 SSE writer适用于“先返回响应再逐步推送事件”的真流式链路。
#[derive(Clone)]
#[allow(dead_code)]
pub struct SseStreamWriter {
sender: mpsc::UnboundedSender<Result<Bytes, Infallible>>,
}
#[allow(dead_code)]
impl SseStreamWriter {
pub fn push_json<T>(&self, event: &str, payload: &T) -> Result<(), AppError>
where
T: Serialize,
{
let mut body = String::new();
encode_sse_event(&mut body, event, payload)?;
self.sender.send(Ok(Bytes::from(body))).map_err(|_| {
AppError::from_status(StatusCode::GONE).with_details(json!({
"provider": "sse",
"message": "实时 SSE 通道已关闭,无法继续写入事件",
}))
})
}
}
/// 创建一条实时 SSE 响应和对应 writer。
///
/// 典型用法:
/// 1. handler 先调用本函数拿到 `(writer, response)`
/// 2. 立即把 `response` 返回给客户端
/// 3. 在后台任务里持续调用 `writer.push_json(...)`
/// 4. 所有 writer 被 drop 后SSE 流自动结束
#[allow(dead_code)]
pub fn new_sse_stream() -> (SseStreamWriter, Response) {
let (sender, receiver) = mpsc::unbounded_channel::<Result<Bytes, Infallible>>();
let body = Body::from_stream(UnboundedReceiverStream::new(receiver));
let response = build_sse_body_response(body);
(SseStreamWriter { sender }, response)
}
pub fn encode_sse_event<T>(body: &mut String, event: &str, payload: &T) -> Result<(), AppError> pub fn encode_sse_event<T>(body: &mut String, event: &str, payload: &T) -> Result<(), AppError>
where where
T: Serialize, T: Serialize,
@@ -52,21 +97,34 @@ where
} }
pub fn build_sse_response(body: String) -> Response { pub fn build_sse_response(body: String) -> Response {
( build_sse_body_response(body)
[ }
(header::CONTENT_TYPE, "text/event-stream; charset=utf-8"),
(header::CACHE_CONTROL, "no-cache"), fn build_sse_body_response(body: impl IntoResponse) -> Response {
let mut response = body.into_response();
let headers = response.headers_mut();
headers.insert(
header::CONTENT_TYPE,
"text/event-stream; charset=utf-8"
.parse()
.expect("valid sse content-type"),
);
headers.insert(
header::CACHE_CONTROL,
"no-cache".parse().expect("valid cache-control"),
);
// 反向代理场景下显式关闭缓冲,避免 SSE 事件被聚合后才下发。 // 反向代理场景下显式关闭缓冲,避免 SSE 事件被聚合后才下发。
(HeaderName::from_static("x-accel-buffering"), "no"), headers.insert(
], HeaderName::from_static("x-accel-buffering"),
body, "no".parse().expect("valid x-accel-buffering header"),
) );
.into_response()
response
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{SseEventBuffer, build_sse_response, encode_sse_event}; use super::{SseEventBuffer, build_sse_response, encode_sse_event, new_sse_stream};
use axum::body::to_bytes; use axum::body::to_bytes;
use serde_json::json; use serde_json::json;
@@ -127,4 +185,35 @@ mod tests {
"event: reply_delta\ndata: {\"text\":\"hello\"}\n\nevent: done\ndata: {\"ok\":true}\n\n" "event: reply_delta\ndata: {\"text\":\"hello\"}\n\nevent: done\ndata: {\"ok\":true}\n\n"
); );
} }
#[tokio::test]
async fn sse_stream_writer_writes_events_into_live_response_body() {
let (writer, response) = new_sse_stream();
writer
.push_json("reply_delta", &json!({ "text": "hello" }))
.expect("first live event should encode");
writer
.push_json("done", &json!({ "ok": true }))
.expect("second live event should encode");
drop(writer);
assert_eq!(
response
.headers()
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok()),
Some("text/event-stream; charset=utf-8")
);
let body = to_bytes(response.into_body(), usize::MAX)
.await
.expect("live response body should read");
let text = String::from_utf8(body.to_vec()).expect("live body should be utf8");
assert_eq!(
text,
"event: reply_delta\ndata: {\"text\":\"hello\"}\n\nevent: done\ndata: {\"ok\":true}\n\n"
);
}
} }