From 2a75a19ece8d801963c9d43f9e84a19dc7bc0089 Mon Sep 17 00:00:00 2001 From: kdletters Date: Wed, 13 May 2026 20:44:22 +0800 Subject: [PATCH] fix: handle visual novel typed SSE events --- .../spacetime-module/src/runtime/profile.rs | 19 +++ .../creationAgentClientFactory.ts | 24 ++- .../creation-agent/creationAgentSse.test.ts | 55 +++++- .../creation-agent/creationAgentSse.ts | 161 +++++++++++------- .../rpg-creation/rpgCreationAgentClient.ts | 2 +- .../visualNovelCreationClient.ts | 10 +- 6 files changed, 206 insertions(+), 65 deletions(-) diff --git a/server-rs/crates/spacetime-module/src/runtime/profile.rs b/server-rs/crates/spacetime-module/src/runtime/profile.rs index e0fcfcd5..2d27ca06 100644 --- a/server-rs/crates/spacetime-module/src/runtime/profile.rs +++ b/server-rs/crates/spacetime-module/src/runtime/profile.rs @@ -1409,6 +1409,12 @@ fn build_public_work_like_id(source_type: &str, profile_id: &str, user_id: &str) mod tests { use super::*; + #[test] + fn duplicate_tracking_event_ids_are_treated_as_idempotent_replays() { + assert!(should_skip_existing_tracking_event_id(true)); + assert!(!should_skip_existing_tracking_event_id(false)); + } + #[test] fn recent_public_work_play_counts_group_requested_profiles_in_window() { let now_micros = PUBLIC_WORK_PLAY_DAY_MICROS * 10; @@ -3223,6 +3229,10 @@ fn record_daily_login_tracking_event(ctx: &ReducerContext, user_id: &str) -> Res ) } +fn should_skip_existing_tracking_event_id(event_exists: bool) -> bool { + event_exists +} + fn record_tracking_event( ctx: &ReducerContext, input: RuntimeTrackingEventInput, @@ -3242,6 +3252,15 @@ fn record_tracking_event( .map_err(|error| error.to_string())?; let occurred_at = Timestamp::from_micros_since_unix_epoch(validated_input.occurred_at_micros); let day_key = runtime_profile_beijing_day_key(validated_input.occurred_at_micros); + if should_skip_existing_tracking_event_id( + ctx.db + .tracking_event() + .event_id() + .find(&validated_input.event_id) + .is_some(), + ) { + return Ok(()); + } // 中文注释:埋点事实与日期维表使用同一北京时间业务日桶,先幂等补齐维表,避免后续周/月/季/年聚合缺少 bucket 映射。 ensure_analytics_date_dimension_row(ctx, day_key)?; ctx.db.tracking_event().insert(TrackingEvent { diff --git a/src/services/creation-agent/creationAgentClientFactory.ts b/src/services/creation-agent/creationAgentClientFactory.ts index 688cffec..208108da 100644 --- a/src/services/creation-agent/creationAgentClientFactory.ts +++ b/src/services/creation-agent/creationAgentClientFactory.ts @@ -68,6 +68,28 @@ async function openCreationAgentSsePost( return response; } +type CreationAgentNormalizedStreamEvent = + | { + kind: 'reply_delta'; + text: string; + } + | { + kind: 'session'; + session: unknown; + } + | { + kind: 'error'; + message: string; + } + | null; + +type CreationAgentStreamOptions = TextStreamOptions & { + normalizeEvent?: ( + eventName: string, + parsed: Record, + ) => CreationAgentNormalizedStreamEvent; +}; + /** * 三类作品创作 Agent 都遵循同一组 HTTP/SSE 端点形状。 * 这里统一请求骨架,玩法 client 只保留路径、类型与中文错误文案差异。 @@ -128,7 +150,7 @@ export function createCreationAgentClient< const streamMessage = async ( sessionId: string, payload: TSendMessagePayload, - options: TextStreamOptions = {}, + options: CreationAgentStreamOptions = {}, ): Promise => { const response = await openCreationAgentSsePost( `${apiBase}/${encodeURIComponent(sessionId)}/messages/stream`, diff --git a/src/services/creation-agent/creationAgentSse.test.ts b/src/services/creation-agent/creationAgentSse.test.ts index 70314f3b..daaca683 100644 --- a/src/services/creation-agent/creationAgentSse.test.ts +++ b/src/services/creation-agent/creationAgentSse.test.ts @@ -1,6 +1,9 @@ -import { expect, test } from 'vitest'; +import { expect, test, vi } from 'vitest'; -import { readCreationAgentSessionFromSse } from './creationAgentSse'; +import { + normalizeVisualNovelAgentStreamEvent, + readCreationAgentSessionFromSse, +} from './creationAgentSse'; function createChunkedStreamResponse(chunks: Uint8Array[]) { const stream = new ReadableStream({ @@ -76,3 +79,51 @@ test('readCreationAgentSessionFromSse keeps streamed updates before error event' expect(updates).toEqual(['先把方洞万能的反差定住。']); }); + +test('readCreationAgentSessionFromSse can normalize typed visual novel stream events', async () => { + const encoder = new TextEncoder(); + const session = { + sessionId: 'vn-session-1', + ownerUserId: 'user-1', + progressPercent: 100, + stage: 'draft_ready', + }; + const onUpdate = vi.fn(); + + const response = createChunkedStreamResponse([ + encoder.encode( + 'data: {"type":"start","sessionId":"vn-session-1"}\n\n' + + 'data: {"type":"phase","phase":"synthesis"}\n\n' + + 'data: {"type":"text_delta","text":"视觉小说底稿已生成。"}\n\n' + + `data: ${JSON.stringify({ type: 'complete', session })}\n\n` + + 'data: {"type":"done"}\n\n', + ), + ]); + + await expect( + readCreationAgentSessionFromSse(response, { + fallbackMessage: '发送失败', + incompleteMessage: '结果不完整', + normalizeEvent: normalizeVisualNovelAgentStreamEvent, + onUpdate, + }), + ).resolves.toEqual(session); + expect(onUpdate).toHaveBeenCalledWith('视觉小说底稿已生成。'); +}); + +test('readCreationAgentSessionFromSse surfaces typed visual novel error events', async () => { + const encoder = new TextEncoder(); + const response = createChunkedStreamResponse([ + encoder.encode( + 'data: {"type":"error","message":"视觉小说流式创作失败","retryable":true}\n\n', + ), + ]); + + await expect( + readCreationAgentSessionFromSse(response, { + fallbackMessage: '发送失败', + incompleteMessage: '结果不完整', + normalizeEvent: normalizeVisualNovelAgentStreamEvent, + }), + ).rejects.toThrow('视觉小说流式创作失败'); +}); diff --git a/src/services/creation-agent/creationAgentSse.ts b/src/services/creation-agent/creationAgentSse.ts index 219f9219..f8ed2f8a 100644 --- a/src/services/creation-agent/creationAgentSse.ts +++ b/src/services/creation-agent/creationAgentSse.ts @@ -1,9 +1,27 @@ +import type { VisualNovelAgentStreamEvent } from '../../../packages/shared/src/contracts/visualNovel'; import type { TextStreamOptions } from '../aiTypes'; type CreationAgentSseOptions = TextStreamOptions & { fallbackMessage: string; incompleteMessage: string; resolveSession?: (rawSession: unknown) => TSession | null; + normalizeEvent?: ( + eventName: string, + parsed: Record, + ) => + | { + kind: 'reply_delta'; + text: string; + } + | { + kind: 'session'; + session: unknown; + } + | { + kind: 'error'; + message: string; + } + | null; }; function findSseEventBoundary(buffer: string) { @@ -65,6 +83,66 @@ function parseJsonObject(data: string) { } } +type NormalizedCreationAgentSseEvent = NonNullable< + CreationAgentSseOptions['normalizeEvent'] +> extends (eventName: string, parsed: Record) => infer TResult + ? TResult + : never; + +function normalizeDefaultCreationAgentEvent( + eventName: string, + parsed: Record, +): NormalizedCreationAgentSseEvent { + if (eventName === 'reply_delta') { + const text = parsed.text; + return typeof text === 'string' ? { kind: 'reply_delta', text } : null; + } + + if (eventName === 'session' && parsed.session) { + return { kind: 'session', session: parsed.session }; + } + + if (eventName === 'error') { + const message = + typeof parsed.message === 'string' && parsed.message.trim() + ? parsed.message.trim() + : ''; + return { kind: 'error', message }; + } + + return null; +} + +export function normalizeVisualNovelAgentStreamEvent( + eventName: string, + parsed: Record, +): NormalizedCreationAgentSseEvent { + const typedEventName = + eventName === 'message' && typeof parsed.type === 'string' + ? parsed.type + : eventName; + const event = { + ...parsed, + type: typedEventName, + } as VisualNovelAgentStreamEvent; + + switch (event.type) { + case 'text_delta': + return typeof event.text === 'string' + ? { kind: 'reply_delta', text: event.text } + : null; + case 'complete': + return event.session ? { kind: 'session', session: event.session } : null; + case 'error': + return { + kind: 'error', + message: event.message.trim(), + }; + default: + return normalizeDefaultCreationAgentEvent(eventName, parsed); + } +} + export async function readCreationAgentSessionFromSse( response: Response, options: CreationAgentSseOptions, @@ -81,15 +159,10 @@ export async function readCreationAgentSessionFromSse( ((rawSession: unknown) => (rawSession as TSession | null) ?? null); let buffer = ''; let finalSession: TSession | null = null; + const normalizeEvent = + options.normalizeEvent ?? normalizeDefaultCreationAgentEvent; - for (;;) { - const { done, value } = await reader.read(); - if (done) { - break; - } - - buffer += decoder.decode(value, { stream: true }); - + const consumeBuffer = () => { for (;;) { const boundary = findSseEventBoundary(buffer); if (!boundary) { @@ -105,70 +178,40 @@ export async function readCreationAgentSessionFromSse( } const parsed = parseJsonObject(data); + if (!parsed) { + continue; + } + const normalized = normalizeEvent(eventName, parsed); - if (eventName === 'reply_delta' && parsed) { - const text = parsed.text; - if (typeof text === 'string') { - options.onUpdate?.(text); - } + if (normalized?.kind === 'reply_delta') { + options.onUpdate?.(normalized.text); continue; } - if (eventName === 'session' && parsed?.session) { - finalSession = resolveSession(parsed.session); + if (normalized?.kind === 'session') { + finalSession = resolveSession(normalized.session); continue; } - if (eventName === 'error' && parsed) { - const message = - typeof parsed.message === 'string' && parsed.message.trim() - ? parsed.message.trim() - : options.fallbackMessage; - throw new Error(message); + if (normalized?.kind === 'error') { + throw new Error(normalized.message || options.fallbackMessage); } } + }; + + for (;;) { + const { done, value } = await reader.read(); + if (done) { + break; + } + + buffer += decoder.decode(value, { stream: true }); + consumeBuffer(); } // 流结束后再 flush 一次解码器,避免 UTF-8 多字节字符残留在内部缓冲里。 buffer += decoder.decode(); - - for (;;) { - const boundary = findSseEventBoundary(buffer); - if (!boundary) { - break; - } - - const eventBlock = buffer.slice(0, boundary.index); - buffer = buffer.slice(boundary.index + boundary.length); - const { eventName, data } = parseSseEventBlock(eventBlock); - - if (!data) { - continue; - } - - const parsed = parseJsonObject(data); - - if (eventName === 'reply_delta' && parsed) { - const text = parsed.text; - if (typeof text === 'string') { - options.onUpdate?.(text); - } - continue; - } - - if (eventName === 'session' && parsed?.session) { - finalSession = resolveSession(parsed.session); - continue; - } - - if (eventName === 'error' && parsed) { - const message = - typeof parsed.message === 'string' && parsed.message.trim() - ? parsed.message.trim() - : options.fallbackMessage; - throw new Error(message); - } - } + consumeBuffer(); if (!finalSession) { throw new Error(options.incompleteMessage); diff --git a/src/services/rpg-creation/rpgCreationAgentClient.ts b/src/services/rpg-creation/rpgCreationAgentClient.ts index 593465a7..0a909009 100644 --- a/src/services/rpg-creation/rpgCreationAgentClient.ts +++ b/src/services/rpg-creation/rpgCreationAgentClient.ts @@ -72,7 +72,7 @@ export async function streamRpgCreationMessage( sessionId: string, payload: SendRpgAgentMessageRequest, options: TextStreamOptions = {}, -) { +): Promise { const response = await openRpgCreationSsePost( `/api/runtime${RPG_AGENT_API_BASE}/${encodeURIComponent(sessionId)}/messages/stream`, payload, diff --git a/src/services/visual-novel-creation/visualNovelCreationClient.ts b/src/services/visual-novel-creation/visualNovelCreationClient.ts index c755531b..d72ee86c 100644 --- a/src/services/visual-novel-creation/visualNovelCreationClient.ts +++ b/src/services/visual-novel-creation/visualNovelCreationClient.ts @@ -9,7 +9,10 @@ import type { } from '../../../packages/shared/src/contracts/visualNovel'; import type { TextStreamOptions } from '../aiTypes'; import { type ApiRetryOptions, requestJson } from '../apiClient'; -import { createCreationAgentClient } from '../creation-agent'; +import { + createCreationAgentClient, + normalizeVisualNovelAgentStreamEvent, +} from '../creation-agent'; const VISUAL_NOVEL_AGENT_API_BASE = '/api/creation/visual-novel/sessions'; const VISUAL_NOVEL_CREATION_WRITE_RETRY: ApiRetryOptions = { @@ -61,7 +64,10 @@ export function streamVisualNovelMessage( payload: SendVisualNovelMessageRequest, options: TextStreamOptions = {}, ) { - return visualNovelAgentHttpClient.streamMessage(sessionId, payload, options); + return visualNovelAgentHttpClient.streamMessage(sessionId, payload, { + ...options, + normalizeEvent: normalizeVisualNovelAgentStreamEvent, + }); } export function executeVisualNovelAction(