refactor: 收口前端 SSE 传输层

This commit is contained in:
2026-06-03 14:57:02 +08:00
parent 545ffa4b2c
commit 1eeb14c50f
10 changed files with 442 additions and 579 deletions

View File

@@ -35,13 +35,14 @@ import type {
TextStreamOptions,
} from './aiTypes';
import { fetchWithApiAuth, requestJson } from './apiClient';
import { type CharacterChatTargetStatus } from './rpgRuntimeChatTypes';
import { parseLineListContent } from './llmParsers';
import {
buildStoryMomentFromRuntimeProjection,
getStoryRuntimeProjection,
resolveRuntimeStoryAction,
} from './rpg-runtime/rpgRuntimeStoryClient';
import { type CharacterChatTargetStatus } from './rpgRuntimeChatTypes';
import { parseSseJsonObject, readSseJsonStream, readSseStream } from './sseStream';
const RUNTIME_API_BASE = '/api/runtime';
@@ -108,81 +109,96 @@ async function requestPlainTextStream(
throw new Error('streaming response body is unavailable');
}
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
let accumulatedText = '';
for (;;) {
const { done, value } = await reader.read();
if (done) {
break;
await readSseStream(response, ({ data }) => {
if (data === '[DONE]') {
return false;
}
buffer += decoder.decode(value, { stream: true });
while (buffer.includes('\n\n')) {
const boundary = buffer.indexOf('\n\n');
const eventBlock = buffer.slice(0, boundary);
buffer = buffer.slice(boundary + 2);
for (const rawLine of eventBlock.split(/\r?\n/u)) {
const line = rawLine.trim();
if (!line.startsWith('data:')) {
continue;
}
const data = line.slice(5).trim();
if (!data || data === '[DONE]') {
continue;
}
try {
const parsed = JSON.parse(data);
const delta = parsed?.choices?.[0]?.delta?.content;
if (typeof delta === 'string' && delta.length > 0) {
accumulatedText += delta;
options.onUpdate?.(accumulatedText);
}
} catch {
// Ignore malformed SSE frames.
}
}
const parsed = parseSseJsonObject(data);
if (!parsed) {
return;
}
}
const delta = readPlainTextStreamDelta(parsed);
if (delta) {
accumulatedText += delta;
options.onUpdate?.(accumulatedText);
}
});
return accumulatedText.trim();
}
type ParsedSseEvent = {
event: string | null;
data: string;
};
function asRecord(value: unknown): Record<string, unknown> | null {
return typeof value === 'object' && value !== null
? (value as Record<string, unknown>)
: null;
}
function parseSseEventBlock(eventBlock: string): ParsedSseEvent | null {
let eventName: string | null = null;
const dataLines: string[] = [];
function readPlainTextStreamDelta(parsed: Record<string, unknown>) {
const choices = Array.isArray(parsed.choices) ? parsed.choices : [];
const firstChoice = asRecord(choices[0]);
const delta = asRecord(firstChoice?.delta);
const content = delta?.content;
return typeof content === 'string' ? content : '';
}
for (const rawLine of eventBlock.split(/\r?\n/u)) {
const line = rawLine.trim();
if (!line) continue;
if (line.startsWith('event:')) {
eventName = line.slice(6).trim() || null;
continue;
}
if (line.startsWith('data:')) {
dataLines.push(line.slice(5).trim());
}
}
function readSseEventMessage(
parsed: Record<string, unknown>,
fallbackMessage: string,
) {
return typeof parsed.message === 'string' ? parsed.message : fallbackMessage;
}
if (dataLines.length === 0) {
return null;
}
function coerceNpcChatTurnResult(
parsed: Record<string, unknown>,
): NpcChatTurnResult {
return parsed as unknown as NpcChatTurnResult;
}
return {
event: eventName,
data: dataLines.join('\n'),
function readNpcReplyDelta(parsed: Record<string, unknown>) {
return typeof parsed.text === 'string' ? parsed.text : '';
}
function readNpcCompletedReply(result: NpcChatTurnResult) {
return typeof result.npcReply === 'string' ? result.npcReply : '';
}
async function readNpcChatTurnFromSse(
response: Response,
options: { onReplyUpdate?: (text: string) => void } = {},
): Promise<NpcChatTurnResult> {
let accumulatedReply = '';
const completedResultRef: { current: NpcChatTurnResult | null } = {
current: null,
};
await readSseJsonStream(response, ({ eventName, parsed }) => {
if (eventName === 'reply_delta') {
accumulatedReply = readNpcReplyDelta(parsed);
options.onReplyUpdate?.(accumulatedReply);
return;
}
if (eventName === 'complete') {
completedResultRef.current = coerceNpcChatTurnResult(parsed);
accumulatedReply = readNpcCompletedReply(completedResultRef.current);
options.onReplyUpdate?.(accumulatedReply);
return false;
}
if (eventName === 'error') {
throw new Error(readSseEventMessage(parsed, 'NPC 聊天续写失败'));
}
});
if (!completedResultRef.current) {
throw new Error('NPC 聊天续写结果为空');
}
return completedResultRef.current;
}
export async function generateInitialStory(
@@ -508,72 +524,9 @@ export async function streamNpcChatTurn(
throw new Error('streaming response body is unavailable');
}
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
let accumulatedReply = '';
let completedResult: NpcChatTurnResult | null = null;
for (;;) {
const { done, value } = await reader.read();
if (done) {
break;
}
buffer += decoder.decode(value, { stream: true });
while (buffer.includes('\n\n')) {
const boundary = buffer.indexOf('\n\n');
const eventBlock = buffer.slice(0, boundary);
buffer = buffer.slice(boundary + 2);
const parsedEvent = parseSseEventBlock(eventBlock);
if (!parsedEvent) {
continue;
}
if (parsedEvent.data === '[DONE]') {
continue;
}
if (parsedEvent.event === 'reply_delta') {
const payloadRecord = JSON.parse(parsedEvent.data) as Record<
string,
unknown
>;
const nextText =
typeof payloadRecord.text === 'string' ? payloadRecord.text : '';
accumulatedReply = nextText;
options.onReplyUpdate?.(accumulatedReply);
continue;
}
if (parsedEvent.event === 'complete') {
completedResult = JSON.parse(parsedEvent.data) as NpcChatTurnResult;
accumulatedReply = completedResult.npcReply;
options.onReplyUpdate?.(accumulatedReply);
continue;
}
if (parsedEvent.event === 'error') {
const payloadRecord = JSON.parse(parsedEvent.data) as Record<
string,
unknown
>;
throw new Error(
typeof payloadRecord.message === 'string'
? payloadRecord.message
: 'NPC 聊天续写失败',
);
}
}
}
if (!completedResult) {
throw new Error('NPC 聊天续写结果为空');
}
return completedResult;
return readNpcChatTurnFromSse(response, {
onReplyUpdate: options.onReplyUpdate,
});
}
export async function streamNpcRecruitDialogue(

View File

@@ -1,5 +1,6 @@
import type { VisualNovelAgentStreamEvent } from '../../../packages/shared/src/contracts/visualNovel';
import type { TextStreamOptions } from '../aiTypes';
import { readSseJsonStream } from '../sseStream';
type CreationAgentSseOptions<TSession> = TextStreamOptions & {
fallbackMessage: string;
@@ -24,65 +25,6 @@ type CreationAgentSseOptions<TSession> = TextStreamOptions & {
| null;
};
function findSseEventBoundary(buffer: string) {
const lfBoundary = buffer.indexOf('\n\n');
const crlfBoundary = buffer.indexOf('\r\n\r\n');
if (lfBoundary === -1 && crlfBoundary === -1) {
return null;
}
if (lfBoundary === -1) {
return {
index: crlfBoundary,
length: 4,
};
}
if (crlfBoundary === -1 || lfBoundary < crlfBoundary) {
return {
index: lfBoundary,
length: 2,
};
}
return {
index: crlfBoundary,
length: 4,
};
}
function parseSseEventBlock(eventBlock: string) {
let eventName = 'message';
const dataLines: string[] = [];
for (const rawLine of eventBlock.split(/\r?\n/u)) {
const line = rawLine.trim();
if (line.startsWith('event:')) {
eventName = line.slice(6).trim() || 'message';
continue;
}
if (line.startsWith('data:')) {
dataLines.push(line.slice(5).trim());
}
}
return {
eventName,
data: dataLines.join('\n'),
};
}
function parseJsonObject(data: string) {
try {
return JSON.parse(data) as Record<string, unknown>;
} catch {
return null;
}
}
type NormalizedCreationAgentSseEvent = NonNullable<
CreationAgentSseOptions<unknown>['normalizeEvent']
> extends (eventName: string, parsed: Record<string, unknown>) => infer TResult
@@ -147,71 +89,30 @@ export async function readCreationAgentSessionFromSse<TSession>(
response: Response,
options: CreationAgentSseOptions<TSession>,
) {
const streamBody = response.body;
if (!streamBody) {
throw new Error('streaming response body is unavailable');
}
const reader = streamBody.getReader();
const decoder = new TextDecoder('utf-8');
const resolveSession =
options.resolveSession ??
((rawSession: unknown) => (rawSession as TSession | null) ?? null);
let buffer = '';
let finalSession: TSession | null = null;
const normalizeEvent =
options.normalizeEvent ?? normalizeDefaultCreationAgentEvent;
const consumeBuffer = () => {
for (;;) {
const boundary = findSseEventBoundary(buffer);
if (!boundary) {
break;
}
const eventBlock = buffer.slice(0, boundary.index);
buffer = buffer.slice(boundary.index + boundary.length);
const { eventName, data } = parseSseEventBlock(eventBlock);
if (!data) {
continue;
}
const parsed = parseJsonObject(data);
if (!parsed) {
continue;
}
await readSseJsonStream(response, ({ eventName, parsed }) => {
const normalized = normalizeEvent(eventName, parsed);
if (normalized?.kind === 'reply_delta') {
options.onUpdate?.(normalized.text);
continue;
return;
}
if (normalized?.kind === 'session') {
finalSession = resolveSession(normalized.session);
continue;
return;
}
if (normalized?.kind === 'error') {
throw new Error(normalized.message || options.fallbackMessage);
}
}
};
for (;;) {
const { done, value } = await reader.read();
if (done) {
break;
}
buffer += decoder.decode(value, { stream: true });
consumeBuffer();
}
// 流结束后再 flush 一次解码器,避免 UTF-8 多字节字符残留在内部缓冲里。
buffer += decoder.decode();
consumeBuffer();
});
if (!finalSession) {
throw new Error(options.incompleteMessage);

View File

@@ -4,6 +4,7 @@ import type {
CreativeDraftEditResult,
} from '../../../packages/shared/src/contracts/creativeAgent';
import type { TextStreamOptions } from '../aiTypes';
import { readSseJsonStream } from '../sseStream';
type CreativeAgentSseOptions = TextStreamOptions & {
fallbackMessage: string;
@@ -16,65 +17,6 @@ type CreativeAgentSseResult = {
draftEditResult: CreativeDraftEditResult | null;
};
function findSseEventBoundary(buffer: string) {
const lfBoundary = buffer.indexOf('\n\n');
const crlfBoundary = buffer.indexOf('\r\n\r\n');
if (lfBoundary === -1 && crlfBoundary === -1) {
return null;
}
if (lfBoundary === -1) {
return {
index: crlfBoundary,
length: 4,
};
}
if (crlfBoundary === -1 || lfBoundary < crlfBoundary) {
return {
index: lfBoundary,
length: 2,
};
}
return {
index: crlfBoundary,
length: 4,
};
}
function parseSseEventBlock(eventBlock: string) {
let eventName = 'message';
const dataLines: string[] = [];
for (const rawLine of eventBlock.split(/\r?\n/u)) {
const line = rawLine.trim();
if (line.startsWith('event:')) {
eventName = line.slice(6).trim() || 'message';
continue;
}
if (line.startsWith('data:')) {
dataLines.push(line.slice(5).trim());
}
}
return {
eventName,
data: dataLines.join('\n'),
};
}
function parseJsonObject(data: string) {
try {
return JSON.parse(data) as Record<string, unknown>;
} catch {
return null;
}
}
function normalizeCreativeAgentSseEvent(
eventName: string,
data: Record<string, unknown>,
@@ -105,13 +47,9 @@ function normalizeCreativeAgentSseEvent(
function handleParsedCreativeAgentEvent(
eventName: string,
parsed: Record<string, unknown> | null,
parsed: Record<string, unknown>,
options: CreativeAgentSseOptions,
): Partial<CreativeAgentSseResult> | null {
if (!parsed) {
return null;
}
const normalizedEvent = normalizeCreativeAgentSseEvent(eventName, parsed);
if (normalizedEvent) {
options.onEvent?.(normalizedEvent);
@@ -168,59 +106,24 @@ export async function readCreativeAgentResultFromSse(
response: Response,
options: CreativeAgentSseOptions,
): Promise<CreativeAgentSseResult> {
const streamBody = response.body;
if (!streamBody) {
throw new Error('streaming response body is unavailable');
}
const reader = streamBody.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
const result: CreativeAgentSseResult = {
session: null,
draftEditResult: null,
};
const consumeBuffer = () => {
for (;;) {
const boundary = findSseEventBoundary(buffer);
if (!boundary) {
break;
}
const eventBlock = buffer.slice(0, boundary.index);
buffer = buffer.slice(boundary.index + boundary.length);
const { eventName, data } = parseSseEventBlock(eventBlock);
if (!data) {
continue;
}
const nextResult = handleParsedCreativeAgentEvent(
eventName,
parseJsonObject(data),
options,
);
if (nextResult?.session) {
result.session = nextResult.session;
}
if (nextResult?.draftEditResult) {
result.draftEditResult = nextResult.draftEditResult;
}
await readSseJsonStream(response, ({ eventName, parsed }) => {
const nextResult = handleParsedCreativeAgentEvent(
eventName,
parsed,
options,
);
if (nextResult?.session) {
result.session = nextResult.session;
}
};
for (;;) {
const { done, value } = await reader.read();
if (done) {
break;
if (nextResult?.draftEditResult) {
result.draftEditResult = nextResult.draftEditResult;
}
buffer += decoder.decode(value, { stream: true });
consumeBuffer();
}
buffer += decoder.decode();
consumeBuffer();
});
if (!result.session) {
throw new Error(options.incompleteMessage);

View File

@@ -1,14 +1,14 @@
import type {
ClaimProfileTaskRewardResponse,
ConfirmWechatProfileRechargeOrderResponse,
CreateProfileRechargeOrderResponse,
ClaimProfileTaskRewardResponse,
PlatformBrowseHistoryBatchSyncRequest,
PlatformBrowseHistoryResponse,
PlatformBrowseHistoryWriteEntry,
ProfileDashboardSummary,
ProfilePlayStatsResponse,
ProfileReferralInviteCenterResponse,
ProfileRechargeCenterResponse,
ProfileReferralInviteCenterResponse,
ProfileSaveArchiveListResponse,
ProfileSaveArchiveResumeResponse,
ProfileTaskCenterResponse,
@@ -24,10 +24,11 @@ import { rehydrateSavedSnapshot } from '../../persistence/runtimeSnapshot';
import type { HydratedSavedGameSnapshot } from '../../persistence/runtimeSnapshotTypes';
import { fetchWithApiAuth } from '../apiClient';
import {
RUNTIME_BACKGROUND_AUTH_OPTIONS,
requestRpgRuntimeJson,
RUNTIME_BACKGROUND_AUTH_OPTIONS,
type RuntimeRequestOptions,
} from '../rpg-runtime/rpgRuntimeRequest';
import { readSseJsonStream } from '../sseStream';
export type { RuntimeRequestOptions };
@@ -132,65 +133,6 @@ type RechargeOrderSseEvent =
payload: { message: string };
};
function findSseEventBoundary(buffer: string) {
const lfBoundary = buffer.indexOf('\n\n');
const crlfBoundary = buffer.indexOf('\r\n\r\n');
if (lfBoundary === -1 && crlfBoundary === -1) {
return null;
}
if (lfBoundary === -1) {
return {
index: crlfBoundary,
length: 4,
};
}
if (crlfBoundary === -1 || lfBoundary < crlfBoundary) {
return {
index: lfBoundary,
length: 2,
};
}
return {
index: crlfBoundary,
length: 4,
};
}
function parseSseEventBlock(eventBlock: string) {
let eventName = 'message';
const dataLines: string[] = [];
for (const rawLine of eventBlock.split(/\r?\n/u)) {
const line = rawLine.trim();
if (line.startsWith('event:')) {
eventName = line.slice(6).trim() || 'message';
continue;
}
if (line.startsWith('data:')) {
dataLines.push(line.slice(5).trim());
}
}
return {
eventName,
data: dataLines.join('\n'),
};
}
function parseJsonObject(data: string) {
try {
return JSON.parse(data) as Record<string, unknown>;
} catch {
return null;
}
}
function normalizeRechargeOrderSseEvent(
eventName: string,
parsed: Record<string, unknown>,
@@ -264,81 +206,33 @@ export async function watchWechatRpgProfileRechargeOrder(
throw new Error('streaming response body is unavailable');
}
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
let finalResponse: ConfirmWechatProfileRechargeOrderResponse | null = null;
let lastResponse: ConfirmWechatProfileRechargeOrderResponse | null = null;
let streamDone = false;
const consumeBuffer = () => {
for (;;) {
const boundary = findSseEventBoundary(buffer);
if (!boundary) {
break;
}
const eventBlock = buffer.slice(0, boundary.index);
buffer = buffer.slice(boundary.index + boundary.length);
const { eventName, data } = parseSseEventBlock(eventBlock);
if (!data) {
continue;
}
const parsed = parseJsonObject(data);
if (!parsed) {
continue;
}
const normalized = normalizeRechargeOrderSseEvent(eventName, parsed);
if (!normalized) {
continue;
}
if (normalized.type === 'order') {
lastResponse = normalized.payload;
if (normalized.payload.order.status !== 'pending') {
finalResponse = normalized.payload;
}
continue;
}
if (normalized.type === 'done') {
streamDone = true;
if (!finalResponse && lastResponse) {
finalResponse = lastResponse;
}
continue;
}
throw new Error(normalized.payload.message || '订阅充值订单状态失败');
}
};
for (;;) {
const { done, value } = await reader.read();
if (done) {
break;
await readSseJsonStream(response, ({ eventName, parsed }) => {
const normalized = normalizeRechargeOrderSseEvent(eventName, parsed);
if (!normalized) {
return;
}
buffer += decoder.decode(value, { stream: true });
consumeBuffer();
if (finalResponse) {
break;
if (normalized.type === 'order') {
lastResponse = normalized.payload;
if (normalized.payload.order.status !== 'pending') {
finalResponse = normalized.payload;
return false;
}
return;
}
if (streamDone) {
break;
}
}
buffer += decoder.decode();
consumeBuffer();
if (!finalResponse) {
if (lastResponse) {
finalResponse = lastResponse;
if (normalized.type === 'done') {
if (!finalResponse && lastResponse) {
finalResponse = lastResponse;
}
return false;
}
}
throw new Error(normalized.payload.message || '订阅充值订单状态失败');
});
if (!finalResponse) {
throw new Error('充值订单状态流返回不完整');

View File

@@ -0,0 +1,98 @@
import { expect, test } from 'vitest';
import { readSseJsonStream, readSseStream } from './sseStream';
function createChunkedStreamResponse(chunks: Uint8Array[]) {
const stream = new ReadableStream<Uint8Array>({
start(controller) {
for (const chunk of chunks) {
controller.enqueue(chunk);
}
controller.close();
},
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream; charset=utf-8',
},
});
}
test('readSseJsonStream flushes decoder tail and handles CRLF boundaries', async () => {
const encoder = new TextEncoder();
const prefix = encoder.encode('event: reply_delta\r\ndata: {"text":"');
const replyBytes = encoder.encode('溪上春风');
const suffix = encoder.encode('"}\r\n\r\n');
const splitIndex = replyBytes.length - 1;
const events: Array<{ eventName: string; parsed: Record<string, unknown> }> =
[];
await readSseJsonStream(
createChunkedStreamResponse([
new Uint8Array([...prefix, ...replyBytes.slice(0, splitIndex)]),
new Uint8Array([...replyBytes.slice(splitIndex), ...suffix]),
]),
({ eventName, parsed }) => {
events.push({ eventName, parsed });
},
);
expect(events).toEqual([
{
eventName: 'reply_delta',
parsed: { text: '溪上春风' },
},
]);
});
test('readSseJsonStream skips malformed json and keeps valid LF events', async () => {
const encoder = new TextEncoder();
const events: Array<{ eventName: string; parsed: Record<string, unknown> }> =
[];
await readSseJsonStream(
createChunkedStreamResponse([
encoder.encode(
'event: malformed\ndata: not-json\n\n' +
'event: ready\ndata: {"value":7}\n\n',
),
]),
({ eventName, parsed }) => {
events.push({ eventName, parsed });
},
);
expect(events).toEqual([
{
eventName: 'ready',
parsed: { value: 7 },
},
]);
});
test('readSseStream can stop early and cancel the reader', async () => {
const encoder = new TextEncoder();
let cancelled = false;
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(
encoder.encode(
'event: first\ndata: one\n\n' + 'event: second\ndata: two\n\n',
),
);
},
cancel() {
cancelled = true;
},
});
const events: string[] = [];
await readSseStream(new Response(stream), ({ eventName }) => {
events.push(eventName);
return false;
});
expect(events).toEqual(['first']);
expect(cancelled).toBe(true);
});

168
src/services/sseStream.ts Normal file
View File

@@ -0,0 +1,168 @@
export type SseStreamEvent = {
eventName: string;
data: string;
};
export type SseJsonStreamEvent = SseStreamEvent & {
parsed: Record<string, unknown>;
};
type SseEventBoundary = {
index: number;
length: number;
};
type SseStreamEventHandler<TEvent extends SseStreamEvent> = (
event: TEvent,
) => void | boolean;
function findSseEventBoundary(buffer: string): SseEventBoundary | null {
const lfBoundary = buffer.indexOf('\n\n');
const crlfBoundary = buffer.indexOf('\r\n\r\n');
if (lfBoundary === -1 && crlfBoundary === -1) {
return null;
}
if (lfBoundary === -1) {
return {
index: crlfBoundary,
length: 4,
};
}
if (crlfBoundary === -1 || lfBoundary < crlfBoundary) {
return {
index: lfBoundary,
length: 2,
};
}
return {
index: crlfBoundary,
length: 4,
};
}
function parseSseEventBlock(eventBlock: string): SseStreamEvent | null {
let eventName = 'message';
const dataLines: string[] = [];
for (const rawLine of eventBlock.split(/\r?\n/u)) {
const line = rawLine.trim();
if (line.startsWith('event:')) {
eventName = line.slice(6).trim() || 'message';
continue;
}
if (line.startsWith('data:')) {
dataLines.push(line.slice(5).trim());
}
}
const data = dataLines.join('\n');
if (!data) {
return null;
}
return {
eventName,
data,
};
}
export function parseSseJsonObject(data: string): Record<string, unknown> | null {
try {
const parsed = JSON.parse(data) as unknown;
return typeof parsed === 'object' && parsed !== null
? (parsed as Record<string, unknown>)
: null;
} catch {
return null;
}
}
export async function readSseStream(
response: Response,
onEvent: SseStreamEventHandler<SseStreamEvent>,
) {
const streamBody = response.body;
if (!streamBody) {
throw new Error('streaming response body is unavailable');
}
const reader = streamBody.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
let shouldContinue = true;
let completed = false;
const consumeBuffer = () => {
for (;;) {
if (!shouldContinue) {
break;
}
const boundary = findSseEventBoundary(buffer);
if (!boundary) {
break;
}
const eventBlock = buffer.slice(0, boundary.index);
buffer = buffer.slice(boundary.index + boundary.length);
const event = parseSseEventBlock(eventBlock);
if (!event) {
continue;
}
if (onEvent(event) === false) {
shouldContinue = false;
}
}
};
try {
for (;;) {
const { done, value } = await reader.read();
if (done) {
break;
}
buffer += decoder.decode(value, { stream: true });
consumeBuffer();
if (!shouldContinue) {
break;
}
}
if (shouldContinue) {
// 流结束后 flush 解码器,避免 UTF-8 多字节字符残留在内部缓冲里。
buffer += decoder.decode();
consumeBuffer();
completed = true;
}
} finally {
if (!completed && typeof reader.cancel === 'function') {
await reader.cancel().catch(() => {});
}
reader.releaseLock?.();
}
}
export function readSseJsonStream(
response: Response,
onEvent: SseStreamEventHandler<SseJsonStreamEvent>,
) {
return readSseStream(response, (event) => {
const parsed = parseSseJsonObject(event.data);
if (!parsed) {
return;
}
return onEvent({
...event,
parsed,
});
});
}

View File

@@ -2,6 +2,7 @@ import type {
VisualNovelRunSnapshot,
VisualNovelRuntimeStreamEvent,
} from '../../../packages/shared/src/contracts/visualNovel';
import { readSseJsonStream } from '../sseStream';
type VisualNovelRuntimeSseOptions = {
fallbackMessage: string;
@@ -9,65 +10,6 @@ type VisualNovelRuntimeSseOptions = {
onEvent?: (event: VisualNovelRuntimeStreamEvent) => void;
};
function findSseEventBoundary(buffer: string) {
const lfBoundary = buffer.indexOf('\n\n');
const crlfBoundary = buffer.indexOf('\r\n\r\n');
if (lfBoundary === -1 && crlfBoundary === -1) {
return null;
}
if (lfBoundary === -1) {
return {
index: crlfBoundary,
length: 4,
};
}
if (crlfBoundary === -1 || lfBoundary < crlfBoundary) {
return {
index: lfBoundary,
length: 2,
};
}
return {
index: crlfBoundary,
length: 4,
};
}
function parseSseEventBlock(eventBlock: string) {
let eventName = 'message';
const dataLines: string[] = [];
for (const rawLine of eventBlock.split(/\r?\n/u)) {
const line = rawLine.trim();
if (line.startsWith('event:')) {
eventName = line.slice(6).trim() || 'message';
continue;
}
if (line.startsWith('data:')) {
dataLines.push(line.slice(5).trim());
}
}
return {
eventName,
data: dataLines.join('\n'),
};
}
function parseJsonObject(data: string) {
try {
return JSON.parse(data) as Record<string, unknown>;
} catch {
return null;
}
}
function normalizeVisualNovelRuntimeEvent(
eventName: string,
parsed: Record<string, unknown>,
@@ -115,59 +57,19 @@ export async function readVisualNovelRuntimeRunFromSse(
response: Response,
options: VisualNovelRuntimeSseOptions,
) {
const streamBody = response.body;
if (!streamBody) {
throw new Error('streaming response body is unavailable');
}
const reader = streamBody.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
let finalRun: VisualNovelRunSnapshot | null = null;
const consumeBuffer = () => {
for (;;) {
const boundary = findSseEventBoundary(buffer);
if (!boundary) {
break;
}
const eventBlock = buffer.slice(0, boundary.index);
buffer = buffer.slice(boundary.index + boundary.length);
const { eventName, data } = parseSseEventBlock(eventBlock);
if (!data) {
continue;
}
const parsed = parseJsonObject(data);
if (!parsed) {
continue;
}
const event = normalizeVisualNovelRuntimeEvent(eventName, parsed);
if (!event) {
continue;
}
const nextRun = handleVisualNovelRuntimeEvent(event, options);
if (nextRun) {
finalRun = nextRun;
}
}
};
for (;;) {
const { done, value } = await reader.read();
if (done) {
break;
await readSseJsonStream(response, ({ eventName, parsed }) => {
const event = normalizeVisualNovelRuntimeEvent(eventName, parsed);
if (!event) {
return;
}
buffer += decoder.decode(value, { stream: true });
consumeBuffer();
}
buffer += decoder.decode();
consumeBuffer();
const nextRun = handleVisualNovelRuntimeEvent(event, options);
if (nextRun) {
finalRun = nextRun;
}
});
if (!finalRun) {
throw new Error(options.incompleteMessage);