fix: handle visual novel typed SSE events

This commit is contained in:
2026-05-13 20:44:22 +08:00
parent c3fbf7a30b
commit 2a75a19ece
6 changed files with 206 additions and 65 deletions

View File

@@ -1409,6 +1409,12 @@ fn build_public_work_like_id(source_type: &str, profile_id: &str, user_id: &str)
mod tests {
use super::*;
#[test]
fn duplicate_tracking_event_ids_are_treated_as_idempotent_replays() {
assert!(should_skip_existing_tracking_event_id(true));
assert!(!should_skip_existing_tracking_event_id(false));
}
#[test]
fn recent_public_work_play_counts_group_requested_profiles_in_window() {
let now_micros = PUBLIC_WORK_PLAY_DAY_MICROS * 10;
@@ -3223,6 +3229,10 @@ fn record_daily_login_tracking_event(ctx: &ReducerContext, user_id: &str) -> Res
)
}
fn should_skip_existing_tracking_event_id(event_exists: bool) -> bool {
event_exists
}
fn record_tracking_event(
ctx: &ReducerContext,
input: RuntimeTrackingEventInput,
@@ -3242,6 +3252,15 @@ fn record_tracking_event(
.map_err(|error| error.to_string())?;
let occurred_at = Timestamp::from_micros_since_unix_epoch(validated_input.occurred_at_micros);
let day_key = runtime_profile_beijing_day_key(validated_input.occurred_at_micros);
if should_skip_existing_tracking_event_id(
ctx.db
.tracking_event()
.event_id()
.find(&validated_input.event_id)
.is_some(),
) {
return Ok(());
}
// 中文注释:埋点事实与日期维表使用同一北京时间业务日桶,先幂等补齐维表,避免后续周/月/季/年聚合缺少 bucket 映射。
ensure_analytics_date_dimension_row(ctx, day_key)?;
ctx.db.tracking_event().insert(TrackingEvent {

View File

@@ -68,6 +68,28 @@ async function openCreationAgentSsePost(
return response;
}
type CreationAgentNormalizedStreamEvent =
| {
kind: 'reply_delta';
text: string;
}
| {
kind: 'session';
session: unknown;
}
| {
kind: 'error';
message: string;
}
| null;
type CreationAgentStreamOptions = TextStreamOptions & {
normalizeEvent?: (
eventName: string,
parsed: Record<string, unknown>,
) => CreationAgentNormalizedStreamEvent;
};
/**
* 三类作品创作 Agent 都遵循同一组 HTTP/SSE 端点形状。
* 这里统一请求骨架,玩法 client 只保留路径、类型与中文错误文案差异。
@@ -128,7 +150,7 @@ export function createCreationAgentClient<
const streamMessage = async (
sessionId: string,
payload: TSendMessagePayload,
options: TextStreamOptions = {},
options: CreationAgentStreamOptions = {},
): Promise<TSession> => {
const response = await openCreationAgentSsePost(
`${apiBase}/${encodeURIComponent(sessionId)}/messages/stream`,

View File

@@ -1,6 +1,9 @@
import { expect, test } from 'vitest';
import { expect, test, vi } from 'vitest';
import { readCreationAgentSessionFromSse } from './creationAgentSse';
import {
normalizeVisualNovelAgentStreamEvent,
readCreationAgentSessionFromSse,
} from './creationAgentSse';
function createChunkedStreamResponse(chunks: Uint8Array[]) {
const stream = new ReadableStream<Uint8Array>({
@@ -76,3 +79,51 @@ test('readCreationAgentSessionFromSse keeps streamed updates before error event'
expect(updates).toEqual(['先把方洞万能的反差定住。']);
});
test('readCreationAgentSessionFromSse can normalize typed visual novel stream events', async () => {
const encoder = new TextEncoder();
const session = {
sessionId: 'vn-session-1',
ownerUserId: 'user-1',
progressPercent: 100,
stage: 'draft_ready',
};
const onUpdate = vi.fn();
const response = createChunkedStreamResponse([
encoder.encode(
'data: {"type":"start","sessionId":"vn-session-1"}\n\n' +
'data: {"type":"phase","phase":"synthesis"}\n\n' +
'data: {"type":"text_delta","text":"视觉小说底稿已生成。"}\n\n' +
`data: ${JSON.stringify({ type: 'complete', session })}\n\n` +
'data: {"type":"done"}\n\n',
),
]);
await expect(
readCreationAgentSessionFromSse(response, {
fallbackMessage: '发送失败',
incompleteMessage: '结果不完整',
normalizeEvent: normalizeVisualNovelAgentStreamEvent,
onUpdate,
}),
).resolves.toEqual(session);
expect(onUpdate).toHaveBeenCalledWith('视觉小说底稿已生成。');
});
test('readCreationAgentSessionFromSse surfaces typed visual novel error events', async () => {
const encoder = new TextEncoder();
const response = createChunkedStreamResponse([
encoder.encode(
'data: {"type":"error","message":"视觉小说流式创作失败","retryable":true}\n\n',
),
]);
await expect(
readCreationAgentSessionFromSse(response, {
fallbackMessage: '发送失败',
incompleteMessage: '结果不完整',
normalizeEvent: normalizeVisualNovelAgentStreamEvent,
}),
).rejects.toThrow('视觉小说流式创作失败');
});

View File

@@ -1,9 +1,27 @@
import type { VisualNovelAgentStreamEvent } from '../../../packages/shared/src/contracts/visualNovel';
import type { TextStreamOptions } from '../aiTypes';
type CreationAgentSseOptions<TSession> = TextStreamOptions & {
fallbackMessage: string;
incompleteMessage: string;
resolveSession?: (rawSession: unknown) => TSession | null;
normalizeEvent?: (
eventName: string,
parsed: Record<string, unknown>,
) =>
| {
kind: 'reply_delta';
text: string;
}
| {
kind: 'session';
session: unknown;
}
| {
kind: 'error';
message: string;
}
| null;
};
function findSseEventBoundary(buffer: string) {
@@ -65,6 +83,66 @@ function parseJsonObject(data: string) {
}
}
type NormalizedCreationAgentSseEvent = NonNullable<
CreationAgentSseOptions<unknown>['normalizeEvent']
> extends (eventName: string, parsed: Record<string, unknown>) => infer TResult
? TResult
: never;
function normalizeDefaultCreationAgentEvent(
eventName: string,
parsed: Record<string, unknown>,
): NormalizedCreationAgentSseEvent {
if (eventName === 'reply_delta') {
const text = parsed.text;
return typeof text === 'string' ? { kind: 'reply_delta', text } : null;
}
if (eventName === 'session' && parsed.session) {
return { kind: 'session', session: parsed.session };
}
if (eventName === 'error') {
const message =
typeof parsed.message === 'string' && parsed.message.trim()
? parsed.message.trim()
: '';
return { kind: 'error', message };
}
return null;
}
export function normalizeVisualNovelAgentStreamEvent(
eventName: string,
parsed: Record<string, unknown>,
): NormalizedCreationAgentSseEvent {
const typedEventName =
eventName === 'message' && typeof parsed.type === 'string'
? parsed.type
: eventName;
const event = {
...parsed,
type: typedEventName,
} as VisualNovelAgentStreamEvent;
switch (event.type) {
case 'text_delta':
return typeof event.text === 'string'
? { kind: 'reply_delta', text: event.text }
: null;
case 'complete':
return event.session ? { kind: 'session', session: event.session } : null;
case 'error':
return {
kind: 'error',
message: event.message.trim(),
};
default:
return normalizeDefaultCreationAgentEvent(eventName, parsed);
}
}
export async function readCreationAgentSessionFromSse<TSession>(
response: Response,
options: CreationAgentSseOptions<TSession>,
@@ -81,15 +159,10 @@ export async function readCreationAgentSessionFromSse<TSession>(
((rawSession: unknown) => (rawSession as TSession | null) ?? null);
let buffer = '';
let finalSession: TSession | null = null;
const normalizeEvent =
options.normalizeEvent ?? normalizeDefaultCreationAgentEvent;
for (;;) {
const { done, value } = await reader.read();
if (done) {
break;
}
buffer += decoder.decode(value, { stream: true });
const consumeBuffer = () => {
for (;;) {
const boundary = findSseEventBoundary(buffer);
if (!boundary) {
@@ -105,70 +178,40 @@ export async function readCreationAgentSessionFromSse<TSession>(
}
const parsed = parseJsonObject(data);
if (!parsed) {
continue;
}
const normalized = normalizeEvent(eventName, parsed);
if (eventName === 'reply_delta' && parsed) {
const text = parsed.text;
if (typeof text === 'string') {
options.onUpdate?.(text);
}
if (normalized?.kind === 'reply_delta') {
options.onUpdate?.(normalized.text);
continue;
}
if (eventName === 'session' && parsed?.session) {
finalSession = resolveSession(parsed.session);
if (normalized?.kind === 'session') {
finalSession = resolveSession(normalized.session);
continue;
}
if (eventName === 'error' && parsed) {
const message =
typeof parsed.message === 'string' && parsed.message.trim()
? parsed.message.trim()
: options.fallbackMessage;
throw new Error(message);
if (normalized?.kind === 'error') {
throw new Error(normalized.message || options.fallbackMessage);
}
}
};
for (;;) {
const { done, value } = await reader.read();
if (done) {
break;
}
buffer += decoder.decode(value, { stream: true });
consumeBuffer();
}
// 流结束后再 flush 一次解码器,避免 UTF-8 多字节字符残留在内部缓冲里。
buffer += decoder.decode();
for (;;) {
const boundary = findSseEventBoundary(buffer);
if (!boundary) {
break;
}
const eventBlock = buffer.slice(0, boundary.index);
buffer = buffer.slice(boundary.index + boundary.length);
const { eventName, data } = parseSseEventBlock(eventBlock);
if (!data) {
continue;
}
const parsed = parseJsonObject(data);
if (eventName === 'reply_delta' && parsed) {
const text = parsed.text;
if (typeof text === 'string') {
options.onUpdate?.(text);
}
continue;
}
if (eventName === 'session' && parsed?.session) {
finalSession = resolveSession(parsed.session);
continue;
}
if (eventName === 'error' && parsed) {
const message =
typeof parsed.message === 'string' && parsed.message.trim()
? parsed.message.trim()
: options.fallbackMessage;
throw new Error(message);
}
}
consumeBuffer();
if (!finalSession) {
throw new Error(options.incompleteMessage);

View File

@@ -72,7 +72,7 @@ export async function streamRpgCreationMessage(
sessionId: string,
payload: SendRpgAgentMessageRequest,
options: TextStreamOptions = {},
) {
): Promise<RpgAgentSessionSnapshot> {
const response = await openRpgCreationSsePost(
`/api/runtime${RPG_AGENT_API_BASE}/${encodeURIComponent(sessionId)}/messages/stream`,
payload,

View File

@@ -9,7 +9,10 @@ import type {
} from '../../../packages/shared/src/contracts/visualNovel';
import type { TextStreamOptions } from '../aiTypes';
import { type ApiRetryOptions, requestJson } from '../apiClient';
import { createCreationAgentClient } from '../creation-agent';
import {
createCreationAgentClient,
normalizeVisualNovelAgentStreamEvent,
} from '../creation-agent';
const VISUAL_NOVEL_AGENT_API_BASE = '/api/creation/visual-novel/sessions';
const VISUAL_NOVEL_CREATION_WRITE_RETRY: ApiRetryOptions = {
@@ -61,7 +64,10 @@ export function streamVisualNovelMessage(
payload: SendVisualNovelMessageRequest,
options: TextStreamOptions = {},
) {
return visualNovelAgentHttpClient.streamMessage(sessionId, payload, options);
return visualNovelAgentHttpClient.streamMessage(sessionId, payload, {
...options,
normalizeEvent: normalizeVisualNovelAgentStreamEvent,
});
}
export function executeVisualNovelAction(