diff --git a/.hermes/shared-memory/decision-log.md b/.hermes/shared-memory/decision-log.md index 2a616fff..f9f9bdd3 100644 --- a/.hermes/shared-memory/decision-log.md +++ b/.hermes/shared-memory/decision-log.md @@ -16,6 +16,14 @@ --- +## 2026-06-03 前端 SSE 客户端传输层统一收口 + +- 背景:创作 Agent、创意互动 Agent、视觉小说运行态和微信充值订单状态等多个前端 client 曾各自手写 SSE 边界扫描、`TextDecoder` 解码、JSON 解析和流结束 flush,导致 CRLF / LF、UTF-8 尾部、多行 `data:` 和提前停止释放 reader 的处理容易漂移。 +- 决策:前端 SSE 传输层统一使用 `src/services/sseStream.ts`;`readSseStream` 负责事件边界、解码 flush、多行 data 和提前停止取消 reader,`readSseJsonStream` 负责 JSON object 事件解析与异常 JSON 静默跳过。业务 client 只保留领域事件归一化、结果聚合和中文错误文案,后续不得复制 `findSseEventBoundary`、`parseSseEventBlock` 或手写 reader 循环。 +- 影响范围:`src/services/sseStream.ts`、`src/services/aiService.ts`、`src/services/creation-agent/creationAgentSse.ts`、`src/services/creative-agent/creativeAgentSse.ts`、`src/services/visual-novel-runtime/visualNovelRuntimeSse.ts`、`src/services/rpg-entry/rpgProfileClient.ts`、前端 SSE 相关测试与架构文档。 +- 验证方式:`npm run test -- src/services/sseStream.test.ts src/services/creation-agent/creationAgentSse.test.ts src/services/creative-agent/creativeAgentSse.test.ts src/services/visual-novel-runtime/visualNovelRuntimeSse.test.ts src/services/rpg-entry/rpgProfileClient.test.ts src/services/ai.test.ts`、`npm run typecheck`、`npm run check:encoding`、相关文件 `npx eslint ... --max-warnings 0` 通过。 +- 关联文档:`docs/technical/【前端架构】SSE客户端传输层收口约定-2026-06-03.md`。 + ## 2026-06-03 最近创作只复用创作模板入口 - 背景:底部加号创作入口的“最近创作”最初由真实作品架摘要驱动,但页面曾按作品标题、摘要和生成状态渲染独立最近创作卡,和其它模板页签的卡片样式及点击语义不一致。 diff --git a/docs/README.md b/docs/README.md index 0bd60aaa..154cd937 100644 --- a/docs/README.md +++ b/docs/README.md @@ -37,6 +37,8 @@ SpacetimeDB 表结构变更、自动迁移边界和保留旧数据的分阶段 AI 文字游戏模板接入以 [AI_NATIVE_TEXT_GAME_TEMPLATE_MOKU_REFERENCE_PRD_2026-05-05.md](./prd/AI_NATIVE_TEXT_GAME_TEMPLATE_MOKU_REFERENCE_PRD_2026-05-05.md) 为最新口径:只吸收 MOKU / 幕间类 AI 文游的剧本游乐场、自由行动、AI GM、记忆和模拟器强反馈经验,禁止迁入外部社区、支付、榜单、私有存档或回放。 +前端 Server-Sent Events 客户端传输层收口到 `src/services/sseStream.ts`,事件边界、UTF-8 flush、JSON 解析跳过和提前取消约定见 [【前端架构】SSE客户端传输层收口约定-2026-06-03.md](./technical/%E3%80%90%E5%89%8D%E7%AB%AF%E6%9E%B6%E6%9E%84%E3%80%91SSE%E5%AE%A2%E6%88%B7%E7%AB%AF%E4%BC%A0%E8%BE%93%E5%B1%82%E6%94%B6%E5%8F%A3%E7%BA%A6%E5%AE%9A-2026-06-03.md)。 + ## 推荐阅读顺序 1. 先看 [经验沉淀](./experience/README.md),快速建立这个项目的开发共识。 diff --git a/docs/technical/【前端架构】SSE客户端传输层收口约定-2026-06-03.md b/docs/technical/【前端架构】SSE客户端传输层收口约定-2026-06-03.md new file mode 100644 index 00000000..0948019a --- /dev/null +++ b/docs/technical/【前端架构】SSE客户端传输层收口约定-2026-06-03.md @@ -0,0 +1,34 @@ +# SSE 客户端传输层收口约定 + +更新时间:`2026-06-03` + +## 背景 + +前端多个服务 client 需要读取 Server-Sent Events,包括创作 Agent、创意互动 Agent、视觉小说运行态和微信充值订单状态。旧实现分别在各自文件里手写事件边界查找、`TextDecoder` 解码、JSON 解析和流结束 flush,容易出现 CRLF / LF 边界不一致、UTF-8 多字节字符尾部丢失、错误事件处理漂移,以及长连接达到最终状态后没有及时释放的问题。 + +## 决策 + +前端 SSE 的传输层统一收口到 `src/services/sseStream.ts`: + +- `readSseStream` 负责读取 `Response.body`、识别 `\n\n` 与 `\r\n\r\n` 事件边界、合并多行 `data:`、flush `TextDecoder` 尾部缓冲,并支持事件处理函数返回 `false` 后取消 reader。 +- `readSseJsonStream` 只在传输事件基础上解析 JSON object,空 data 与异常 JSON 继续按旧口径静默跳过。 +- 各业务 client 只保留领域事件归一化、最终结果聚合和中文错误文案,不再重复实现 SSE 边界扫描、reader 循环或 UTF-8 flush。 +- OpenAI 兼容流、`[DONE]` 哨兵或其它非 JSON SSE 可直接使用 `readSseStream`;业务 JSON 事件优先使用 `readSseJsonStream`。 + +## 落地范围 + +本次先收口以下客户端: + +- `src/services/aiService.ts` +- `src/services/creation-agent/creationAgentSse.ts` +- `src/services/creative-agent/creativeAgentSse.ts` +- `src/services/visual-novel-runtime/visualNovelRuntimeSse.ts` +- `src/services/rpg-entry/rpgProfileClient.ts` + +后续新增 SSE client 时不得复制 `findSseEventBoundary`、`parseSseEventBlock` 或手写 reader 循环;若确实需要特殊 framing,应先扩展 `sseStream.ts` 的传输能力,再在业务 client 中处理领域语义。 + +## 验收 + +- `src/services/sseStream.test.ts` 覆盖 CRLF / LF 边界、UTF-8 尾部 flush、异常 JSON 跳过和提前停止取消 reader。 +- 已有 OpenAI 兼容文本流、NPC 聊天流、创作 Agent、创意互动 Agent、视觉小说运行态和充值订单状态测试继续通过。 +- `npm run typecheck` 不产生新的类型错误。 diff --git a/src/services/aiService.ts b/src/services/aiService.ts index eda0fb45..4112042e 100644 --- a/src/services/aiService.ts +++ b/src/services/aiService.ts @@ -35,13 +35,14 @@ import type { TextStreamOptions, } from './aiTypes'; import { fetchWithApiAuth, requestJson } from './apiClient'; -import { type CharacterChatTargetStatus } from './rpgRuntimeChatTypes'; import { parseLineListContent } from './llmParsers'; import { buildStoryMomentFromRuntimeProjection, getStoryRuntimeProjection, resolveRuntimeStoryAction, } from './rpg-runtime/rpgRuntimeStoryClient'; +import { type CharacterChatTargetStatus } from './rpgRuntimeChatTypes'; +import { parseSseJsonObject, readSseJsonStream, readSseStream } from './sseStream'; const RUNTIME_API_BASE = '/api/runtime'; @@ -108,81 +109,96 @@ async function requestPlainTextStream( throw new Error('streaming response body is unavailable'); } - const reader = response.body.getReader(); - const decoder = new TextDecoder('utf-8'); - let buffer = ''; let accumulatedText = ''; - for (;;) { - const { done, value } = await reader.read(); - if (done) { - break; + await readSseStream(response, ({ data }) => { + if (data === '[DONE]') { + return false; } - buffer += decoder.decode(value, { stream: true }); - - while (buffer.includes('\n\n')) { - const boundary = buffer.indexOf('\n\n'); - const eventBlock = buffer.slice(0, boundary); - buffer = buffer.slice(boundary + 2); - - for (const rawLine of eventBlock.split(/\r?\n/u)) { - const line = rawLine.trim(); - if (!line.startsWith('data:')) { - continue; - } - - const data = line.slice(5).trim(); - if (!data || data === '[DONE]') { - continue; - } - - try { - const parsed = JSON.parse(data); - const delta = parsed?.choices?.[0]?.delta?.content; - if (typeof delta === 'string' && delta.length > 0) { - accumulatedText += delta; - options.onUpdate?.(accumulatedText); - } - } catch { - // Ignore malformed SSE frames. - } - } + const parsed = parseSseJsonObject(data); + if (!parsed) { + return; } - } + + const delta = readPlainTextStreamDelta(parsed); + if (delta) { + accumulatedText += delta; + options.onUpdate?.(accumulatedText); + } + }); return accumulatedText.trim(); } -type ParsedSseEvent = { - event: string | null; - data: string; -}; +function asRecord(value: unknown): Record | null { + return typeof value === 'object' && value !== null + ? (value as Record) + : null; +} -function parseSseEventBlock(eventBlock: string): ParsedSseEvent | null { - let eventName: string | null = null; - const dataLines: string[] = []; +function readPlainTextStreamDelta(parsed: Record) { + const choices = Array.isArray(parsed.choices) ? parsed.choices : []; + const firstChoice = asRecord(choices[0]); + const delta = asRecord(firstChoice?.delta); + const content = delta?.content; + return typeof content === 'string' ? content : ''; +} - for (const rawLine of eventBlock.split(/\r?\n/u)) { - const line = rawLine.trim(); - if (!line) continue; - if (line.startsWith('event:')) { - eventName = line.slice(6).trim() || null; - continue; - } - if (line.startsWith('data:')) { - dataLines.push(line.slice(5).trim()); - } - } +function readSseEventMessage( + parsed: Record, + fallbackMessage: string, +) { + return typeof parsed.message === 'string' ? parsed.message : fallbackMessage; +} - if (dataLines.length === 0) { - return null; - } +function coerceNpcChatTurnResult( + parsed: Record, +): NpcChatTurnResult { + return parsed as unknown as NpcChatTurnResult; +} - return { - event: eventName, - data: dataLines.join('\n'), +function readNpcReplyDelta(parsed: Record) { + return typeof parsed.text === 'string' ? parsed.text : ''; +} + +function readNpcCompletedReply(result: NpcChatTurnResult) { + return typeof result.npcReply === 'string' ? result.npcReply : ''; +} + +async function readNpcChatTurnFromSse( + response: Response, + options: { onReplyUpdate?: (text: string) => void } = {}, +): Promise { + let accumulatedReply = ''; + const completedResultRef: { current: NpcChatTurnResult | null } = { + current: null, }; + + await readSseJsonStream(response, ({ eventName, parsed }) => { + if (eventName === 'reply_delta') { + accumulatedReply = readNpcReplyDelta(parsed); + options.onReplyUpdate?.(accumulatedReply); + return; + } + + if (eventName === 'complete') { + completedResultRef.current = coerceNpcChatTurnResult(parsed); + accumulatedReply = readNpcCompletedReply(completedResultRef.current); + options.onReplyUpdate?.(accumulatedReply); + return false; + } + + if (eventName === 'error') { + throw new Error(readSseEventMessage(parsed, 'NPC 聊天续写失败')); + } + }); + + if (!completedResultRef.current) { + throw new Error('NPC 聊天续写结果为空'); + } + + return completedResultRef.current; } export async function generateInitialStory( @@ -508,72 +524,9 @@ export async function streamNpcChatTurn( throw new Error('streaming response body is unavailable'); } - const reader = response.body.getReader(); - const decoder = new TextDecoder('utf-8'); - let buffer = ''; - let accumulatedReply = ''; - let completedResult: NpcChatTurnResult | null = null; - - for (;;) { - const { done, value } = await reader.read(); - if (done) { - break; - } - - buffer += decoder.decode(value, { stream: true }); - - while (buffer.includes('\n\n')) { - const boundary = buffer.indexOf('\n\n'); - const eventBlock = buffer.slice(0, boundary); - buffer = buffer.slice(boundary + 2); - - const parsedEvent = parseSseEventBlock(eventBlock); - if (!parsedEvent) { - continue; - } - - if (parsedEvent.data === '[DONE]') { - continue; - } - - if (parsedEvent.event === 'reply_delta') { - const payloadRecord = JSON.parse(parsedEvent.data) as Record< - string, - unknown - >; - const nextText = - typeof payloadRecord.text === 'string' ? payloadRecord.text : ''; - accumulatedReply = nextText; - options.onReplyUpdate?.(accumulatedReply); - continue; - } - - if (parsedEvent.event === 'complete') { - completedResult = JSON.parse(parsedEvent.data) as NpcChatTurnResult; - accumulatedReply = completedResult.npcReply; - options.onReplyUpdate?.(accumulatedReply); - continue; - } - - if (parsedEvent.event === 'error') { - const payloadRecord = JSON.parse(parsedEvent.data) as Record< - string, - unknown - >; - throw new Error( - typeof payloadRecord.message === 'string' - ? payloadRecord.message - : 'NPC 聊天续写失败', - ); - } - } - } - - if (!completedResult) { - throw new Error('NPC 聊天续写结果为空'); - } - - return completedResult; + return readNpcChatTurnFromSse(response, { + onReplyUpdate: options.onReplyUpdate, + }); } export async function streamNpcRecruitDialogue( diff --git a/src/services/creation-agent/creationAgentSse.ts b/src/services/creation-agent/creationAgentSse.ts index f8ed2f8a..a12dda95 100644 --- a/src/services/creation-agent/creationAgentSse.ts +++ b/src/services/creation-agent/creationAgentSse.ts @@ -1,5 +1,6 @@ import type { VisualNovelAgentStreamEvent } from '../../../packages/shared/src/contracts/visualNovel'; import type { TextStreamOptions } from '../aiTypes'; +import { readSseJsonStream } from '../sseStream'; type CreationAgentSseOptions = TextStreamOptions & { fallbackMessage: string; @@ -24,65 +25,6 @@ type CreationAgentSseOptions = TextStreamOptions & { | null; }; -function findSseEventBoundary(buffer: string) { - const lfBoundary = buffer.indexOf('\n\n'); - const crlfBoundary = buffer.indexOf('\r\n\r\n'); - - if (lfBoundary === -1 && crlfBoundary === -1) { - return null; - } - - if (lfBoundary === -1) { - return { - index: crlfBoundary, - length: 4, - }; - } - - if (crlfBoundary === -1 || lfBoundary < crlfBoundary) { - return { - index: lfBoundary, - length: 2, - }; - } - - return { - index: crlfBoundary, - length: 4, - }; -} - -function parseSseEventBlock(eventBlock: string) { - let eventName = 'message'; - const dataLines: string[] = []; - - for (const rawLine of eventBlock.split(/\r?\n/u)) { - const line = rawLine.trim(); - - if (line.startsWith('event:')) { - eventName = line.slice(6).trim() || 'message'; - continue; - } - - if (line.startsWith('data:')) { - dataLines.push(line.slice(5).trim()); - } - } - - return { - eventName, - data: dataLines.join('\n'), - }; -} - -function parseJsonObject(data: string) { - try { - return JSON.parse(data) as Record; - } catch { - return null; - } -} - type NormalizedCreationAgentSseEvent = NonNullable< CreationAgentSseOptions['normalizeEvent'] > extends (eventName: string, parsed: Record) => infer TResult @@ -147,71 +89,30 @@ export async function readCreationAgentSessionFromSse( response: Response, options: CreationAgentSseOptions, ) { - const streamBody = response.body; - if (!streamBody) { - throw new Error('streaming response body is unavailable'); - } - - const reader = streamBody.getReader(); - const decoder = new TextDecoder('utf-8'); const resolveSession = options.resolveSession ?? ((rawSession: unknown) => (rawSession as TSession | null) ?? null); - let buffer = ''; let finalSession: TSession | null = null; const normalizeEvent = options.normalizeEvent ?? normalizeDefaultCreationAgentEvent; - const consumeBuffer = () => { - for (;;) { - const boundary = findSseEventBoundary(buffer); - if (!boundary) { - break; - } - - const eventBlock = buffer.slice(0, boundary.index); - buffer = buffer.slice(boundary.index + boundary.length); - const { eventName, data } = parseSseEventBlock(eventBlock); - - if (!data) { - continue; - } - - const parsed = parseJsonObject(data); - if (!parsed) { - continue; - } + await readSseJsonStream(response, ({ eventName, parsed }) => { const normalized = normalizeEvent(eventName, parsed); if (normalized?.kind === 'reply_delta') { options.onUpdate?.(normalized.text); - continue; + return; } if (normalized?.kind === 'session') { finalSession = resolveSession(normalized.session); - continue; + return; } if (normalized?.kind === 'error') { throw new Error(normalized.message || options.fallbackMessage); } - } - }; - - for (;;) { - const { done, value } = await reader.read(); - if (done) { - break; - } - - buffer += decoder.decode(value, { stream: true }); - consumeBuffer(); - } - - // 流结束后再 flush 一次解码器,避免 UTF-8 多字节字符残留在内部缓冲里。 - buffer += decoder.decode(); - consumeBuffer(); + }); if (!finalSession) { throw new Error(options.incompleteMessage); diff --git a/src/services/creative-agent/creativeAgentSse.ts b/src/services/creative-agent/creativeAgentSse.ts index 9e2bdf74..306d6547 100644 --- a/src/services/creative-agent/creativeAgentSse.ts +++ b/src/services/creative-agent/creativeAgentSse.ts @@ -4,6 +4,7 @@ import type { CreativeDraftEditResult, } from '../../../packages/shared/src/contracts/creativeAgent'; import type { TextStreamOptions } from '../aiTypes'; +import { readSseJsonStream } from '../sseStream'; type CreativeAgentSseOptions = TextStreamOptions & { fallbackMessage: string; @@ -16,65 +17,6 @@ type CreativeAgentSseResult = { draftEditResult: CreativeDraftEditResult | null; }; -function findSseEventBoundary(buffer: string) { - const lfBoundary = buffer.indexOf('\n\n'); - const crlfBoundary = buffer.indexOf('\r\n\r\n'); - - if (lfBoundary === -1 && crlfBoundary === -1) { - return null; - } - - if (lfBoundary === -1) { - return { - index: crlfBoundary, - length: 4, - }; - } - - if (crlfBoundary === -1 || lfBoundary < crlfBoundary) { - return { - index: lfBoundary, - length: 2, - }; - } - - return { - index: crlfBoundary, - length: 4, - }; -} - -function parseSseEventBlock(eventBlock: string) { - let eventName = 'message'; - const dataLines: string[] = []; - - for (const rawLine of eventBlock.split(/\r?\n/u)) { - const line = rawLine.trim(); - - if (line.startsWith('event:')) { - eventName = line.slice(6).trim() || 'message'; - continue; - } - - if (line.startsWith('data:')) { - dataLines.push(line.slice(5).trim()); - } - } - - return { - eventName, - data: dataLines.join('\n'), - }; -} - -function parseJsonObject(data: string) { - try { - return JSON.parse(data) as Record; - } catch { - return null; - } -} - function normalizeCreativeAgentSseEvent( eventName: string, data: Record, @@ -105,13 +47,9 @@ function normalizeCreativeAgentSseEvent( function handleParsedCreativeAgentEvent( eventName: string, - parsed: Record | null, + parsed: Record, options: CreativeAgentSseOptions, ): Partial | null { - if (!parsed) { - return null; - } - const normalizedEvent = normalizeCreativeAgentSseEvent(eventName, parsed); if (normalizedEvent) { options.onEvent?.(normalizedEvent); @@ -168,59 +106,24 @@ export async function readCreativeAgentResultFromSse( response: Response, options: CreativeAgentSseOptions, ): Promise { - const streamBody = response.body; - if (!streamBody) { - throw new Error('streaming response body is unavailable'); - } - - const reader = streamBody.getReader(); - const decoder = new TextDecoder('utf-8'); - let buffer = ''; const result: CreativeAgentSseResult = { session: null, draftEditResult: null, }; - const consumeBuffer = () => { - for (;;) { - const boundary = findSseEventBoundary(buffer); - if (!boundary) { - break; - } - - const eventBlock = buffer.slice(0, boundary.index); - buffer = buffer.slice(boundary.index + boundary.length); - const { eventName, data } = parseSseEventBlock(eventBlock); - if (!data) { - continue; - } - - const nextResult = handleParsedCreativeAgentEvent( - eventName, - parseJsonObject(data), - options, - ); - if (nextResult?.session) { - result.session = nextResult.session; - } - if (nextResult?.draftEditResult) { - result.draftEditResult = nextResult.draftEditResult; - } + await readSseJsonStream(response, ({ eventName, parsed }) => { + const nextResult = handleParsedCreativeAgentEvent( + eventName, + parsed, + options, + ); + if (nextResult?.session) { + result.session = nextResult.session; } - }; - - for (;;) { - const { done, value } = await reader.read(); - if (done) { - break; + if (nextResult?.draftEditResult) { + result.draftEditResult = nextResult.draftEditResult; } - - buffer += decoder.decode(value, { stream: true }); - consumeBuffer(); - } - - buffer += decoder.decode(); - consumeBuffer(); + }); if (!result.session) { throw new Error(options.incompleteMessage); diff --git a/src/services/rpg-entry/rpgProfileClient.ts b/src/services/rpg-entry/rpgProfileClient.ts index 0751d4db..8aab97bc 100644 --- a/src/services/rpg-entry/rpgProfileClient.ts +++ b/src/services/rpg-entry/rpgProfileClient.ts @@ -1,14 +1,14 @@ import type { + ClaimProfileTaskRewardResponse, ConfirmWechatProfileRechargeOrderResponse, CreateProfileRechargeOrderResponse, - ClaimProfileTaskRewardResponse, PlatformBrowseHistoryBatchSyncRequest, PlatformBrowseHistoryResponse, PlatformBrowseHistoryWriteEntry, ProfileDashboardSummary, ProfilePlayStatsResponse, - ProfileReferralInviteCenterResponse, ProfileRechargeCenterResponse, + ProfileReferralInviteCenterResponse, ProfileSaveArchiveListResponse, ProfileSaveArchiveResumeResponse, ProfileTaskCenterResponse, @@ -24,10 +24,11 @@ import { rehydrateSavedSnapshot } from '../../persistence/runtimeSnapshot'; import type { HydratedSavedGameSnapshot } from '../../persistence/runtimeSnapshotTypes'; import { fetchWithApiAuth } from '../apiClient'; import { - RUNTIME_BACKGROUND_AUTH_OPTIONS, requestRpgRuntimeJson, + RUNTIME_BACKGROUND_AUTH_OPTIONS, type RuntimeRequestOptions, } from '../rpg-runtime/rpgRuntimeRequest'; +import { readSseJsonStream } from '../sseStream'; export type { RuntimeRequestOptions }; @@ -132,65 +133,6 @@ type RechargeOrderSseEvent = payload: { message: string }; }; -function findSseEventBoundary(buffer: string) { - const lfBoundary = buffer.indexOf('\n\n'); - const crlfBoundary = buffer.indexOf('\r\n\r\n'); - - if (lfBoundary === -1 && crlfBoundary === -1) { - return null; - } - - if (lfBoundary === -1) { - return { - index: crlfBoundary, - length: 4, - }; - } - - if (crlfBoundary === -1 || lfBoundary < crlfBoundary) { - return { - index: lfBoundary, - length: 2, - }; - } - - return { - index: crlfBoundary, - length: 4, - }; -} - -function parseSseEventBlock(eventBlock: string) { - let eventName = 'message'; - const dataLines: string[] = []; - - for (const rawLine of eventBlock.split(/\r?\n/u)) { - const line = rawLine.trim(); - - if (line.startsWith('event:')) { - eventName = line.slice(6).trim() || 'message'; - continue; - } - - if (line.startsWith('data:')) { - dataLines.push(line.slice(5).trim()); - } - } - - return { - eventName, - data: dataLines.join('\n'), - }; -} - -function parseJsonObject(data: string) { - try { - return JSON.parse(data) as Record; - } catch { - return null; - } -} - function normalizeRechargeOrderSseEvent( eventName: string, parsed: Record, @@ -264,81 +206,33 @@ export async function watchWechatRpgProfileRechargeOrder( throw new Error('streaming response body is unavailable'); } - const reader = response.body.getReader(); - const decoder = new TextDecoder('utf-8'); - let buffer = ''; let finalResponse: ConfirmWechatProfileRechargeOrderResponse | null = null; let lastResponse: ConfirmWechatProfileRechargeOrderResponse | null = null; - let streamDone = false; - const consumeBuffer = () => { - for (;;) { - const boundary = findSseEventBoundary(buffer); - if (!boundary) { - break; - } - - const eventBlock = buffer.slice(0, boundary.index); - buffer = buffer.slice(boundary.index + boundary.length); - const { eventName, data } = parseSseEventBlock(eventBlock); - - if (!data) { - continue; - } - - const parsed = parseJsonObject(data); - if (!parsed) { - continue; - } - const normalized = normalizeRechargeOrderSseEvent(eventName, parsed); - if (!normalized) { - continue; - } - - if (normalized.type === 'order') { - lastResponse = normalized.payload; - if (normalized.payload.order.status !== 'pending') { - finalResponse = normalized.payload; - } - continue; - } - - if (normalized.type === 'done') { - streamDone = true; - if (!finalResponse && lastResponse) { - finalResponse = lastResponse; - } - continue; - } - - throw new Error(normalized.payload.message || '订阅充值订单状态失败'); - } - }; - - for (;;) { - const { done, value } = await reader.read(); - if (done) { - break; + await readSseJsonStream(response, ({ eventName, parsed }) => { + const normalized = normalizeRechargeOrderSseEvent(eventName, parsed); + if (!normalized) { + return; } - buffer += decoder.decode(value, { stream: true }); - consumeBuffer(); - if (finalResponse) { - break; + if (normalized.type === 'order') { + lastResponse = normalized.payload; + if (normalized.payload.order.status !== 'pending') { + finalResponse = normalized.payload; + return false; + } + return; } - if (streamDone) { - break; - } - } - buffer += decoder.decode(); - consumeBuffer(); - - if (!finalResponse) { - if (lastResponse) { - finalResponse = lastResponse; + if (normalized.type === 'done') { + if (!finalResponse && lastResponse) { + finalResponse = lastResponse; + } + return false; } - } + + throw new Error(normalized.payload.message || '订阅充值订单状态失败'); + }); if (!finalResponse) { throw new Error('充值订单状态流返回不完整'); diff --git a/src/services/sseStream.test.ts b/src/services/sseStream.test.ts new file mode 100644 index 00000000..8049d0ba --- /dev/null +++ b/src/services/sseStream.test.ts @@ -0,0 +1,98 @@ +import { expect, test } from 'vitest'; + +import { readSseJsonStream, readSseStream } from './sseStream'; + +function createChunkedStreamResponse(chunks: Uint8Array[]) { + const stream = new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + controller.enqueue(chunk); + } + controller.close(); + }, + }); + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream; charset=utf-8', + }, + }); +} + +test('readSseJsonStream flushes decoder tail and handles CRLF boundaries', async () => { + const encoder = new TextEncoder(); + const prefix = encoder.encode('event: reply_delta\r\ndata: {"text":"'); + const replyBytes = encoder.encode('溪上春风'); + const suffix = encoder.encode('"}\r\n\r\n'); + const splitIndex = replyBytes.length - 1; + const events: Array<{ eventName: string; parsed: Record }> = + []; + + await readSseJsonStream( + createChunkedStreamResponse([ + new Uint8Array([...prefix, ...replyBytes.slice(0, splitIndex)]), + new Uint8Array([...replyBytes.slice(splitIndex), ...suffix]), + ]), + ({ eventName, parsed }) => { + events.push({ eventName, parsed }); + }, + ); + + expect(events).toEqual([ + { + eventName: 'reply_delta', + parsed: { text: '溪上春风' }, + }, + ]); +}); + +test('readSseJsonStream skips malformed json and keeps valid LF events', async () => { + const encoder = new TextEncoder(); + const events: Array<{ eventName: string; parsed: Record }> = + []; + + await readSseJsonStream( + createChunkedStreamResponse([ + encoder.encode( + 'event: malformed\ndata: not-json\n\n' + + 'event: ready\ndata: {"value":7}\n\n', + ), + ]), + ({ eventName, parsed }) => { + events.push({ eventName, parsed }); + }, + ); + + expect(events).toEqual([ + { + eventName: 'ready', + parsed: { value: 7 }, + }, + ]); +}); + +test('readSseStream can stop early and cancel the reader', async () => { + const encoder = new TextEncoder(); + let cancelled = false; + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode( + 'event: first\ndata: one\n\n' + 'event: second\ndata: two\n\n', + ), + ); + }, + cancel() { + cancelled = true; + }, + }); + const events: string[] = []; + + await readSseStream(new Response(stream), ({ eventName }) => { + events.push(eventName); + return false; + }); + + expect(events).toEqual(['first']); + expect(cancelled).toBe(true); +}); diff --git a/src/services/sseStream.ts b/src/services/sseStream.ts new file mode 100644 index 00000000..6659faaa --- /dev/null +++ b/src/services/sseStream.ts @@ -0,0 +1,168 @@ +export type SseStreamEvent = { + eventName: string; + data: string; +}; + +export type SseJsonStreamEvent = SseStreamEvent & { + parsed: Record; +}; + +type SseEventBoundary = { + index: number; + length: number; +}; + +type SseStreamEventHandler = ( + event: TEvent, +) => void | boolean; + +function findSseEventBoundary(buffer: string): SseEventBoundary | null { + const lfBoundary = buffer.indexOf('\n\n'); + const crlfBoundary = buffer.indexOf('\r\n\r\n'); + + if (lfBoundary === -1 && crlfBoundary === -1) { + return null; + } + + if (lfBoundary === -1) { + return { + index: crlfBoundary, + length: 4, + }; + } + + if (crlfBoundary === -1 || lfBoundary < crlfBoundary) { + return { + index: lfBoundary, + length: 2, + }; + } + + return { + index: crlfBoundary, + length: 4, + }; +} + +function parseSseEventBlock(eventBlock: string): SseStreamEvent | null { + let eventName = 'message'; + const dataLines: string[] = []; + + for (const rawLine of eventBlock.split(/\r?\n/u)) { + const line = rawLine.trim(); + + if (line.startsWith('event:')) { + eventName = line.slice(6).trim() || 'message'; + continue; + } + + if (line.startsWith('data:')) { + dataLines.push(line.slice(5).trim()); + } + } + + const data = dataLines.join('\n'); + if (!data) { + return null; + } + + return { + eventName, + data, + }; +} + +export function parseSseJsonObject(data: string): Record | null { + try { + const parsed = JSON.parse(data) as unknown; + return typeof parsed === 'object' && parsed !== null + ? (parsed as Record) + : null; + } catch { + return null; + } +} + +export async function readSseStream( + response: Response, + onEvent: SseStreamEventHandler, +) { + const streamBody = response.body; + if (!streamBody) { + throw new Error('streaming response body is unavailable'); + } + + const reader = streamBody.getReader(); + const decoder = new TextDecoder('utf-8'); + let buffer = ''; + let shouldContinue = true; + let completed = false; + + const consumeBuffer = () => { + for (;;) { + if (!shouldContinue) { + break; + } + + const boundary = findSseEventBoundary(buffer); + if (!boundary) { + break; + } + + const eventBlock = buffer.slice(0, boundary.index); + buffer = buffer.slice(boundary.index + boundary.length); + const event = parseSseEventBlock(eventBlock); + if (!event) { + continue; + } + + if (onEvent(event) === false) { + shouldContinue = false; + } + } + }; + + try { + for (;;) { + const { done, value } = await reader.read(); + if (done) { + break; + } + + buffer += decoder.decode(value, { stream: true }); + consumeBuffer(); + if (!shouldContinue) { + break; + } + } + + if (shouldContinue) { + // 流结束后 flush 解码器,避免 UTF-8 多字节字符残留在内部缓冲里。 + buffer += decoder.decode(); + consumeBuffer(); + completed = true; + } + } finally { + if (!completed && typeof reader.cancel === 'function') { + await reader.cancel().catch(() => {}); + } + reader.releaseLock?.(); + } +} + +export function readSseJsonStream( + response: Response, + onEvent: SseStreamEventHandler, +) { + return readSseStream(response, (event) => { + const parsed = parseSseJsonObject(event.data); + if (!parsed) { + return; + } + + return onEvent({ + ...event, + parsed, + }); + }); +} diff --git a/src/services/visual-novel-runtime/visualNovelRuntimeSse.ts b/src/services/visual-novel-runtime/visualNovelRuntimeSse.ts index a4892f2f..1aac77e8 100644 --- a/src/services/visual-novel-runtime/visualNovelRuntimeSse.ts +++ b/src/services/visual-novel-runtime/visualNovelRuntimeSse.ts @@ -2,6 +2,7 @@ import type { VisualNovelRunSnapshot, VisualNovelRuntimeStreamEvent, } from '../../../packages/shared/src/contracts/visualNovel'; +import { readSseJsonStream } from '../sseStream'; type VisualNovelRuntimeSseOptions = { fallbackMessage: string; @@ -9,65 +10,6 @@ type VisualNovelRuntimeSseOptions = { onEvent?: (event: VisualNovelRuntimeStreamEvent) => void; }; -function findSseEventBoundary(buffer: string) { - const lfBoundary = buffer.indexOf('\n\n'); - const crlfBoundary = buffer.indexOf('\r\n\r\n'); - - if (lfBoundary === -1 && crlfBoundary === -1) { - return null; - } - - if (lfBoundary === -1) { - return { - index: crlfBoundary, - length: 4, - }; - } - - if (crlfBoundary === -1 || lfBoundary < crlfBoundary) { - return { - index: lfBoundary, - length: 2, - }; - } - - return { - index: crlfBoundary, - length: 4, - }; -} - -function parseSseEventBlock(eventBlock: string) { - let eventName = 'message'; - const dataLines: string[] = []; - - for (const rawLine of eventBlock.split(/\r?\n/u)) { - const line = rawLine.trim(); - - if (line.startsWith('event:')) { - eventName = line.slice(6).trim() || 'message'; - continue; - } - - if (line.startsWith('data:')) { - dataLines.push(line.slice(5).trim()); - } - } - - return { - eventName, - data: dataLines.join('\n'), - }; -} - -function parseJsonObject(data: string) { - try { - return JSON.parse(data) as Record; - } catch { - return null; - } -} - function normalizeVisualNovelRuntimeEvent( eventName: string, parsed: Record, @@ -115,59 +57,19 @@ export async function readVisualNovelRuntimeRunFromSse( response: Response, options: VisualNovelRuntimeSseOptions, ) { - const streamBody = response.body; - if (!streamBody) { - throw new Error('streaming response body is unavailable'); - } - - const reader = streamBody.getReader(); - const decoder = new TextDecoder('utf-8'); - let buffer = ''; let finalRun: VisualNovelRunSnapshot | null = null; - const consumeBuffer = () => { - for (;;) { - const boundary = findSseEventBoundary(buffer); - if (!boundary) { - break; - } - - const eventBlock = buffer.slice(0, boundary.index); - buffer = buffer.slice(boundary.index + boundary.length); - const { eventName, data } = parseSseEventBlock(eventBlock); - if (!data) { - continue; - } - - const parsed = parseJsonObject(data); - if (!parsed) { - continue; - } - - const event = normalizeVisualNovelRuntimeEvent(eventName, parsed); - if (!event) { - continue; - } - - const nextRun = handleVisualNovelRuntimeEvent(event, options); - if (nextRun) { - finalRun = nextRun; - } - } - }; - - for (;;) { - const { done, value } = await reader.read(); - if (done) { - break; + await readSseJsonStream(response, ({ eventName, parsed }) => { + const event = normalizeVisualNovelRuntimeEvent(eventName, parsed); + if (!event) { + return; } - buffer += decoder.decode(value, { stream: true }); - consumeBuffer(); - } - - buffer += decoder.decode(); - consumeBuffer(); + const nextRun = handleVisualNovelRuntimeEvent(event, options); + if (nextRun) { + finalRun = nextRun; + } + }); if (!finalRun) { throw new Error(options.incompleteMessage);