From d5627c536d82773071a9ca24ae7ae88a5980c26c Mon Sep 17 00:00:00 2001 From: kdletters Date: Wed, 22 Apr 2026 15:34:17 +0800 Subject: [PATCH] switch api server sse to axum builtin --- docs/technical/README.md | 2 +- ...ER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md | 154 +++++------- server-rs/crates/api-server/Cargo.toml | 2 +- .../crates/api-server/src/custom_world.rs | 46 +++- server-rs/crates/api-server/src/main.rs | 1 - server-rs/crates/api-server/src/sse.rs | 219 ------------------ 6 files changed, 100 insertions(+), 324 deletions(-) delete mode 100644 server-rs/crates/api-server/src/sse.rs diff --git a/docs/technical/README.md b/docs/technical/README.md index a20eb255..318c1cb8 100644 --- a/docs/technical/README.md +++ b/docs/technical/README.md @@ -42,7 +42,7 @@ - [SPACETIMEDB_CUSTOM_WORLD_AGENT_SESSION_STAGE6_DESIGN_2026-04-22.md](./SPACETIMEDB_CUSTOM_WORLD_AGENT_SESSION_STAGE6_DESIGN_2026-04-22.md):冻结 `M5` Agent session create / snapshot 的最小 SpacetimeDB 与 Axum facade 闭环,明确本轮不迁移 LLM、SSE、卡片更新和完整 action registry。 - [SPACETIMEDB_CUSTOM_WORLD_AGENT_MESSAGE_STAGE7_DESIGN_2026-04-22.md](./SPACETIMEDB_CUSTOM_WORLD_AGENT_MESSAGE_STAGE7_DESIGN_2026-04-22.md):冻结 `M5` Agent `message submit / operation query` 的 deterministic 最小闭环,明确同步写入 user/assistant 消息、`process_message` operation 与 session 进度推进规则。 - [SPACETIMEDB_CUSTOM_WORLD_AGENT_MESSAGE_STREAM_STAGE8_DESIGN_2026-04-22.md](./SPACETIMEDB_CUSTOM_WORLD_AGENT_MESSAGE_STREAM_STAGE8_DESIGN_2026-04-22.md):冻结 `M5` Agent `/messages/stream` 的最小兼容 SSE facade,明确复用 Stage 7 的同步写表逻辑,只输出当前前端真实消费的 `reply_delta / session / done / error` 事件。 -- [RUST_API_SERVER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md](./RUST_API_SERVER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md):冻结 `server-rs/crates/api-server` 内部 SSE 基础设施抽取口径,统一 Rust 侧 `text/event-stream` 响应头、事件编码与缓冲式 SSE 输出 helper。 +- [RUST_API_SERVER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md](./RUST_API_SERVER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md):冻结 `server-rs/crates/api-server` 的 SSE 使用口径,明确统一使用 Axum 内建 `Sse`,不再保留自定义 `sse.rs` 模块。 - [SPACETIMEDB_CUSTOM_WORLD_LIBRARY_DETAIL_STAGE5_EXTENSION_DESIGN_2026-04-22.md](./SPACETIMEDB_CUSTOM_WORLD_LIBRARY_DETAIL_STAGE5_EXTENSION_DESIGN_2026-04-22.md):补齐 `M5` Stage 5 遗漏的 owner-only `GET /api/runtime/custom-world-library/:profileId` 设计,冻结单条 profile detail 的 SpacetimeDB procedure、client facade、404 语义与 Axum 路由扩展方式。 - [SPACETIMEDB_CUSTOM_WORLD_WORKS_AND_AGENT_EXTENSION_STAGE9_DESIGN_2026-04-22.md](./SPACETIMEDB_CUSTOM_WORLD_WORKS_AND_AGENT_EXTENSION_STAGE9_DESIGN_2026-04-22.md):冻结 `M5` 剩余主链的 works、card detail、publish gate、supportedActions、action registry 与 AI/OSS 兼容路由边界,作为 Stage 9 到收口阶段的统一落地依据。 - [M6_CUSTOM_WORLD_ASSET_OSS_INTEGRATION_STAGE1_2026-04-22.md](./M6_CUSTOM_WORLD_ASSET_OSS_INTEGRATION_STAGE1_2026-04-22.md):冻结 `M6` 第一批 custom world 场景图、封面图、封面上传从本地 `public/` 临时落地切到 `OSS + asset_object + asset_entity_binding` 正式真相链的边界与槽位约定。 diff --git a/docs/technical/RUST_API_SERVER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md b/docs/technical/RUST_API_SERVER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md index 512569a7..aa9001eb 100644 --- a/docs/technical/RUST_API_SERVER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md +++ b/docs/technical/RUST_API_SERVER_SSE_INFRASTRUCTURE_DESIGN_2026-04-22.md @@ -1,131 +1,105 @@ -# Rust `api-server` SSE 基础设施设计(2026-04-22) +# Rust `api-server` SSE 使用口径(2026-04-22) 日期:`2026-04-22` ## 1. 文档目的 -这份文档用于冻结 `server-rs/crates/api-server` 内部的 SSE 基础设施抽取口径。 +这份文档用于冻结 `server-rs/crates/api-server` 的 SSE 实现口径。 -本轮目标只有两个: +本轮结论调整为:Rust `api-server` 不再维护自定义 `sse.rs` 基础设施,统一使用 Axum 内建的 `axum::response::sse::{Event, Sse}` 能力。 -1. 把当前散落在业务 handler 中的 `text/event-stream` 响应头与事件文本编码逻辑,收口为 `api-server` 可复用的 Rust 基础设施。 -2. 在同一基础设施上补出“实时写事件”的 writer 版本,为后续真流式接口预留稳定入口。 +本轮目标只有三个: + +1. 删除 `server-rs/crates/api-server/src/sse.rs` 自定义模块。 +2. 把现有 custom world message stream 切到 Axum 官方 SSE 类型。 +3. 保持现有业务事件协议与“一次性返回完整事件序列”的兼容语义不变。 本轮不做: -1. 不改前端消费协议 -2. 不把 custom world message stream 当场改成真实逐段 token streaming -3. 不引入跨 crate 的共享 `shared-contracts` SSE runtime helper -4. 不同时重构 story / runtime / txt mode 的未来流式接口 +1. 不改前端消费协议。 +2. 不把 custom world message stream 当场改成真实逐段 token streaming。 +3. 不引入跨 crate 的共享 SSE runtime helper。 +4. 不抽象 `reply_delta / session / done / error` 等业务事件名。 ## 2. 当前问题 -当前 Rust 侧 SSE 能力只在一个地方手写: +上一轮曾在 `server-rs/crates/api-server/src/sse.rs` 中抽出自定义 SSE helper,用于统一响应头、事件编码、缓冲式输出和实时 writer。 -1. `server-rs/crates/api-server/src/custom_world.rs` +继续保留这套自定义模块的问题是: -当前实现存在以下问题: +1. Axum 已经提供 `Sse`、`Event::json_data(...)` 和标准 SSE body 编码。 +2. 自定义文本编码需要自行维护换行、JSON 序列化、响应头等细节。 +3. 后续真流式接口如果继续沿用自定义 writer,会和 Axum 官方生态产生重复抽象。 +4. 当前项目已经以 Axum 作为 Rust HTTP 框架,优先使用框架内建能力更简单。 -1. `append_sse_event(...)` 与 `build_event_stream_response(...)` 直接写在业务文件里 -2. SSE 响应头、事件编码规则没有统一入口 -3. 后续如果再新增第二条 Rust SSE 路由,极容易复制一份近似实现 -4. 业务层和传输层耦合在一起,不利于测试 +## 3. 统一实现口径 -## 3. 抽取边界 +Rust `api-server` 的 SSE 路由统一使用: -本轮只抽以下基础能力: +```rust +use axum::response::sse::{Event, Sse}; +``` -1. 标准 SSE 响应头构造 -2. 单条事件编码 -3. 缓冲式 SSE body builder -4. 实时写事件的 SSE writer -5. 一次性返回完整 SSE 文本的响应构造 -6. 基于流 body 的实时 SSE 响应构造 +有限事件序列使用: -本轮明确不抽: +```rust +let stream = tokio_stream::iter(events); +Sse::new(stream).into_response() +``` -1. `reply_delta / session / done / error` 这些业务事件名 -2. 事件发送顺序 -3. custom world session 的查询与回复文本推导 -4. 业务错误到 SSE `error` 事件的映射策略 +实时流式接口后续直接使用: -原因固定如下: +```rust +Sse::new(event_stream) +``` -1. 这些内容属于业务协议,而不是通用传输设施 -2. 当前不同链路未来很可能有不同事件集合 -3. 先把传输层抽干净,后续真实流式能力才能稳定复用 +如需保持长连接,可在真实长流接口中追加: -## 4. 基础设施 API 口径 +```rust +.keep_alive(axum::response::sse::KeepAlive::default()) +``` -本轮在 `server-rs/crates/api-server/src/sse.rs` 提供: +## 4. custom world message stream 边界 -1. `SseEventBuffer` - - 面向当前最小兼容场景 - - 内部持有 `String` - - 提供 `push_json(event, payload)` 与 `into_response()` -2. `SseStreamWriter` - - 面向后续真流式场景 - - 内部持有实时写出的 channel sender - - 提供 `push_json(event, payload)` - - writer 被 drop 后,流自动结束 -3. `new_sse_stream()` - - 返回 `(SseStreamWriter, Response)` - - 业务 handler 可先把 `Response` 返回,再异步持续推送事件 -4. `build_sse_response(body)` - - 统一写入标准 SSE 响应头 -5. `encode_sse_event(body, event, payload)` - - 只负责把事件编码为: - ```text - event: xxx - data: {...} +`POST /api/runtime/custom-world/agent/sessions/:sessionId/messages/stream` 当前继续保持 Stage 8 文档冻结的最小语义: - ``` +1. 业务层先完成 deterministic 写表。 +2. 读取最新 session snapshot。 +3. 组装 `reply_delta`。 +4. 组装 `session`。 +5. 组装 `done`。 +6. 通过 Axum `Sse` 返回完整事件序列。 -## 5. 标准响应头 +本轮只替换传输层实现,不改变事件顺序、事件名和 payload 结构。 -所有通过本基础设施输出的 SSE 响应,统一包含: +## 5. 响应头说明 -1. `Content-Type: text/event-stream; charset=utf-8` +Axum `Sse` 默认写入: + +1. `Content-Type: text/event-stream` 2. `Cache-Control: no-cache` -3. `X-Accel-Buffering: no` -当前不默认加入: - -1. `Connection: keep-alive` +当前不再额外写入自定义 `X-Accel-Buffering: no` helper。 原因: -1. 当前 Rust `axum` 一次性 body 返回场景不依赖显式设置该头 -2. 保持最小必要头集合,避免提前固化未来长连接策略 +1. 本轮目标是移除项目自定义 SSE 模块,避免继续维护传输层封装。 +2. 当前 custom world stream 仍是短生命周期的兼容事件序列,不是长时间 token streaming。 +3. 如果未来某条真实长流接口需要反向代理禁用缓冲,应在该路由或统一 HTTP 中间件层显式评估,而不是恢复自定义 SSE 编码器。 -## 6. 与 custom world message stream 的关系 - -`POST /api/runtime/custom-world/agent/sessions/:sessionId/messages/stream` 当前仍然保持 Stage 8 文档冻结的最小语义: - -1. 业务层先完成 deterministic 写表 -2. 读取最新 session snapshot -3. 组装 `reply_delta` -4. 组装 `session` -5. 组装 `done` -6. 一次性返回完整 SSE 文本 - -本轮变化只在于: - -1. 事件编码和响应头不再手写在 `custom_world.rs` -2. 改由 `sse.rs` 基础设施承接 -3. 同时补出实时 writer 版本,但当前不强制业务路由马上切换 - -## 7. 验收标准 +## 6. 验收标准 当以下条件满足时,本轮视为完成: -1. `api-server/src/sse.rs` 已提供可复用 SSE helper -2. `api-server/src/sse.rs` 已同时提供缓冲式与实时写出式两种能力 -3. `custom_world.rs` 不再内联维护 SSE 编码与响应头细节 -4. `cargo fmt -p api-server` 通过 -5. `cargo check -p api-server` 通过 -6. `npm run check:encoding` 通过 +1. `api-server/src/sse.rs` 已删除。 +2. `api-server/src/main.rs` 不再声明 `mod sse;`。 +3. `custom_world.rs` 不再依赖 `crate::sse::SseEventBuffer`。 +4. custom world message stream 使用 Axum `Sse` 构造响应。 +5. 为旧自定义 writer 引入的 `bytes`、`tokio::sync` feature 等依赖已清理。 +6. `cargo fmt -p api-server` 通过。 +7. `cargo check -p api-server` 通过。 +8. `npm run check:encoding` 通过。 -## 8. 一句话结论 +## 7. 一句话结论 -本轮把 Rust `api-server` 里的 SSE 能力收口为“双模式传输层基础设施”,同时提供缓冲式输出与实时写事件 writer,但不改当前业务事件协议,也不强制现有 custom world 路由立即切到真流式。 +Rust `api-server` 的 SSE 能力以 Axum 内建 `Sse` 为唯一实现入口,不再保留项目自定义 `sse.rs` 模块;当前 custom world stream 只替换传输层,不改变业务协议。 diff --git a/server-rs/crates/api-server/Cargo.toml b/server-rs/crates/api-server/Cargo.toml index 538192d4..dd9df5db 100644 --- a/server-rs/crates/api-server/Cargo.toml +++ b/server-rs/crates/api-server/Cargo.toml @@ -28,7 +28,7 @@ shared-contracts = { path = "../shared-contracts" } shared-kernel = { path = "../shared-kernel" } shared-logging = { path = "../shared-logging" } spacetime-client = { path = "../spacetime-client" } -tokio = { version = "1", features = ["macros", "rt-multi-thread", "net", "sync"] } +tokio = { version = "1", features = ["macros", "rt-multi-thread", "net"] } tokio-stream = "0.1" time = { version = "0.3", features = ["formatting"] } tower-http = { version = "0.6", features = ["trace"] } diff --git a/server-rs/crates/api-server/src/custom_world.rs b/server-rs/crates/api-server/src/custom_world.rs index 8550b12e..bf473ab8 100644 --- a/server-rs/crates/api-server/src/custom_world.rs +++ b/server-rs/crates/api-server/src/custom_world.rs @@ -2,7 +2,10 @@ use axum::{ Json, extract::{Extension, Path, State, rejection::JsonRejection}, http::StatusCode, - response::Response, + response::{ + IntoResponse, Response, + sse::{Event, Sse}, + }, }; use module_custom_world::{ CustomWorldThemeMode, empty_agent_anchor_content_json, empty_agent_asset_coverage_json, @@ -34,10 +37,11 @@ use spacetime_client::{ CustomWorldResultPreviewBlockerRecord, CustomWorldSupportedActionRecord, CustomWorldWorkSummaryRecord, SpacetimeClientError, }; +use std::convert::Infallible; use crate::{ api_response::json_success_body, auth::AuthenticatedAccessToken, http_error::AppError, - request_context::RequestContext, sse::SseEventBuffer, state::AppState, + request_context::RequestContext, state::AppState, }; pub async fn get_custom_world_library( @@ -576,17 +580,23 @@ pub async fn stream_custom_world_agent_message( let session_response = map_custom_world_agent_session_response(session); let reply_text = resolve_stream_reply_text(&session_response); - // 这里先用“一次性构造完整 SSE 文本”的最小兼容方案, - // 复用 Stage 7 的同步 deterministic 写表逻辑,保证前端当前的 reader 协议可直接消费。 - let mut sse = SseEventBuffer::new(); - sse.push_json("reply_delta", &json!({ "text": reply_text })) - .map_err(|error| custom_world_error_response(&request_context, error))?; - sse.push_json("session", &json!({ "session": session_response })) - .map_err(|error| custom_world_error_response(&request_context, error))?; - sse.push_json("done", &json!({ "ok": true })) - .map_err(|error| custom_world_error_response(&request_context, error))?; + // 这里仍保持“一次性返回完整事件序列”的兼容语义; + // SSE 编码、标准响应头与 body frame 交给 Axum 内建实现维护。 + let events = vec![ + custom_world_sse_json_event("reply_delta", json!({ "text": reply_text })) + .map_err(|error| custom_world_error_response(&request_context, error))?, + custom_world_sse_json_event("session", json!({ "session": session_response })) + .map_err(|error| custom_world_error_response(&request_context, error))?, + custom_world_sse_json_event("done", json!({ "ok": true })) + .map_err(|error| custom_world_error_response(&request_context, error))?, + ]; + let stream = tokio_stream::iter( + events + .into_iter() + .map(|event| Ok::(event)), + ); - Ok(sse.into_response()) + Ok(Sse::new(stream).into_response()) } pub async fn get_custom_world_agent_operation( @@ -983,6 +993,18 @@ fn custom_world_error_response(request_context: &RequestContext, error: AppError error.into_response_with_context(Some(request_context)) } +fn custom_world_sse_json_event(event_name: &str, payload: Value) -> Result { + Event::default() + .event(event_name) + .json_data(payload) + .map_err(|error| { + AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({ + "provider": "sse", + "message": format!("SSE payload 序列化失败:{error}"), + })) + }) +} + fn resolve_author_display_name(_authenticated: &AuthenticatedAccessToken) -> String { "玩家".to_string() } diff --git a/server-rs/crates/api-server/src/main.rs b/server-rs/crates/api-server/src/main.rs index 5323963b..f40f0446 100644 --- a/server-rs/crates/api-server/src/main.rs +++ b/server-rs/crates/api-server/src/main.rs @@ -28,7 +28,6 @@ mod runtime_save; mod runtime_settings; mod runtime_story; mod session_client; -mod sse; mod state; mod story_battles; mod story_sessions; diff --git a/server-rs/crates/api-server/src/sse.rs b/server-rs/crates/api-server/src/sse.rs deleted file mode 100644 index d3020430..00000000 --- a/server-rs/crates/api-server/src/sse.rs +++ /dev/null @@ -1,219 +0,0 @@ -use axum::{ - body::Body, - http::{HeaderName, StatusCode, header}, - response::{IntoResponse, Response}, -}; -use bytes::Bytes; -use serde::Serialize; -use serde_json::json; -use std::convert::Infallible; -use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; - -use crate::http_error::AppError; - -/// 最小缓冲式 SSE builder,适用于“先完成业务,再一次性返回完整 SSE 文本”的兼容链路。 -#[derive(Default)] -pub struct SseEventBuffer { - body: String, -} - -impl SseEventBuffer { - pub fn new() -> Self { - Self::default() - } - - pub fn push_json(&mut self, event: &str, payload: &T) -> Result<(), AppError> - where - T: Serialize, - { - encode_sse_event(&mut self.body, event, payload) - } - - pub fn into_response(self) -> Response { - build_sse_response(self.body) - } -} - -/// 实时 SSE writer,适用于“先返回响应,再逐步推送事件”的真流式链路。 -#[derive(Clone)] -#[allow(dead_code)] -pub struct SseStreamWriter { - sender: mpsc::UnboundedSender>, -} - -#[allow(dead_code)] -impl SseStreamWriter { - pub fn push_json(&self, event: &str, payload: &T) -> Result<(), AppError> - where - T: Serialize, - { - let mut body = String::new(); - encode_sse_event(&mut body, event, payload)?; - self.sender.send(Ok(Bytes::from(body))).map_err(|_| { - AppError::from_status(StatusCode::GONE).with_details(json!({ - "provider": "sse", - "message": "实时 SSE 通道已关闭,无法继续写入事件", - })) - }) - } -} - -/// 创建一条实时 SSE 响应和对应 writer。 -/// -/// 典型用法: -/// 1. handler 先调用本函数拿到 `(writer, response)` -/// 2. 立即把 `response` 返回给客户端 -/// 3. 在后台任务里持续调用 `writer.push_json(...)` -/// 4. 所有 writer 被 drop 后,SSE 流自动结束 -#[allow(dead_code)] -pub fn new_sse_stream() -> (SseStreamWriter, Response) { - let (sender, receiver) = mpsc::unbounded_channel::>(); - let body = Body::from_stream(UnboundedReceiverStream::new(receiver)); - let response = build_sse_body_response(body); - - (SseStreamWriter { sender }, response) -} - -pub fn encode_sse_event(body: &mut String, event: &str, payload: &T) -> Result<(), AppError> -where - T: Serialize, -{ - let payload_text = serde_json::to_string(payload).map_err(|error| { - AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_details(json!({ - "provider": "sse", - "message": format!("SSE payload 序列化失败:{error}"), - })) - })?; - - body.push_str("event: "); - body.push_str(event); - body.push('\n'); - body.push_str("data: "); - body.push_str(&payload_text); - body.push_str("\n\n"); - - Ok(()) -} - -pub fn build_sse_response(body: String) -> Response { - build_sse_body_response(body) -} - -fn build_sse_body_response(body: impl IntoResponse) -> Response { - let mut response = body.into_response(); - let headers = response.headers_mut(); - headers.insert( - header::CONTENT_TYPE, - "text/event-stream; charset=utf-8" - .parse() - .expect("valid sse content-type"), - ); - headers.insert( - header::CACHE_CONTROL, - "no-cache".parse().expect("valid cache-control"), - ); - // 反向代理场景下显式关闭缓冲,避免 SSE 事件被聚合后才下发。 - headers.insert( - HeaderName::from_static("x-accel-buffering"), - "no".parse().expect("valid x-accel-buffering header"), - ); - - response -} - -#[cfg(test)] -mod tests { - use super::{SseEventBuffer, build_sse_response, encode_sse_event, new_sse_stream}; - use axum::body::to_bytes; - use serde_json::json; - - #[tokio::test] - async fn encode_sse_event_writes_standard_format() { - let mut body = String::new(); - encode_sse_event(&mut body, "reply_delta", &json!({ "text": "hello" })) - .expect("encoding should succeed"); - - assert_eq!(body, "event: reply_delta\ndata: {\"text\":\"hello\"}\n\n"); - } - - #[tokio::test] - async fn build_sse_response_sets_standard_headers() { - let response = build_sse_response("event: done\ndata: {\"ok\":true}\n\n".to_string()); - - assert_eq!( - response - .headers() - .get(header::CONTENT_TYPE) - .and_then(|value| value.to_str().ok()), - Some("text/event-stream; charset=utf-8") - ); - assert_eq!( - response - .headers() - .get(header::CACHE_CONTROL) - .and_then(|value| value.to_str().ok()), - Some("no-cache") - ); - assert_eq!( - response - .headers() - .get(HeaderName::from_static("x-accel-buffering")) - .and_then(|value| value.to_str().ok()), - Some("no") - ); - } - - #[tokio::test] - async fn sse_event_buffer_collects_events_and_returns_response() { - let mut buffer = SseEventBuffer::new(); - buffer - .push_json("reply_delta", &json!({ "text": "hello" })) - .expect("first event should encode"); - buffer - .push_json("done", &json!({ "ok": true })) - .expect("second event should encode"); - - let response = buffer.into_response(); - let body = to_bytes(response.into_body(), usize::MAX) - .await - .expect("response body should read"); - let text = String::from_utf8(body.to_vec()).expect("body should be utf8"); - - assert_eq!( - text, - "event: reply_delta\ndata: {\"text\":\"hello\"}\n\nevent: done\ndata: {\"ok\":true}\n\n" - ); - } - - #[tokio::test] - async fn sse_stream_writer_writes_events_into_live_response_body() { - let (writer, response) = new_sse_stream(); - - writer - .push_json("reply_delta", &json!({ "text": "hello" })) - .expect("first live event should encode"); - writer - .push_json("done", &json!({ "ok": true })) - .expect("second live event should encode"); - drop(writer); - - assert_eq!( - response - .headers() - .get(header::CONTENT_TYPE) - .and_then(|value| value.to_str().ok()), - Some("text/event-stream; charset=utf-8") - ); - - let body = to_bytes(response.into_body(), usize::MAX) - .await - .expect("live response body should read"); - let text = String::from_utf8(body.to_vec()).expect("live body should be utf8"); - - assert_eq!( - text, - "event: reply_delta\ndata: {\"text\":\"hello\"}\n\nevent: done\ndata: {\"ok\":true}\n\n" - ); - } -}