import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import type {
  Request as ExpressRequest,
  Response as ExpressResponse,
} from 'express';
import type { Logger } from 'pino';
import type { AppConfig } from '../config.js';
import { HttpError, upstreamError } from '../errors.js';
import {
  extractApiErrorMessage,
  prepareApiResponse,
  prepareEventStreamResponse,
} from '../http.js';

/** One message of an OpenAI-style chat completion conversation. */
export type ChatMessage = {
  role: 'system' | 'user' | 'assistant';
  content: string;
};

/** JSON body posted to the upstream `/chat/completions` endpoint. */
type CompletionRequest = {
  model?: string;
  stream?: boolean;
  messages: ChatMessage[];
};

/** Per-call options accepted by `UpstreamLlmClient.requestCompletion`. */
type RequestExecutionOptions = {
  /** Caller cancellation; it aborts the upstream fetch until response headers arrive. */
  signal?: AbortSignal;
  /** Overrides the client-wide timeout when it is a positive, finite number. */
  timeoutMs?: number;
  /** Label copied into the log records of this call. */
  debugLabel?: string;
};

/** Non-streaming completion payload; only the fields read in this module are declared. */
type CompletionResponsePayload = {
  choices?: Array<{ message?: { content?: unknown } }>;
} | null;

/** One streamed (`stream: true`) completion chunk; only the fields read here are declared. */
type CompletionChunkPayload = {
  choices?: Array<{ delta?: { content?: unknown } }>;
} | null;

/** Fallback used when `LLM_REQUEST_TIMEOUT_MS` is missing or invalid. */
const DEFAULT_LLM_REQUEST_TIMEOUT_MS = 30000;

/**
 * Largest delay `setTimeout` honours. Node.js replaces larger values with 1 ms,
 * which would make an oversized timeout fire almost immediately.
 */
const MAX_TIMER_DELAY_MS = 2147483647;

/** Blank line terminating an SSE event; accepts both LF and CRLF line endings. */
const SSE_EVENT_BOUNDARY = /\r?\n\r?\n/u;

/** Strips trailing slashes so path segments can be appended safely. */
function normalizeBaseUrl(baseUrl: string) {
  return baseUrl.replace(/\/+$/u, '');
}

/** Builds the chat completions endpoint URL from the configured base URL. */
function buildCompletionUrl(baseUrl: string) {
  return `${normalizeBaseUrl(baseUrl)}/chat/completions`;
}

/** True for errors named `AbortError`, as raised when a fetch or stream is aborted. */
function isAbortLikeError(error: unknown) {
  return (
    (typeof DOMException !== 'undefined' &&
      error instanceof DOMException &&
      error.name === 'AbortError') ||
    (error instanceof Error && error.name === 'AbortError')
  );
}

/**
 * Coerces a candidate timeout into a usable `setTimeout` delay.
 *
 * @returns A whole number of milliseconds in `[1, MAX_TIMER_DELAY_MS]`, or
 *   `undefined` when `value` is not a positive, finite number.
 */
function normalizeTimeoutMs(value: unknown): number | undefined {
  if (typeof value !== 'number' || !Number.isFinite(value) || value <= 0) {
    return undefined;
  }
  return Math.min(Math.max(Math.round(value), 1), MAX_TIMER_DELAY_MS);
}

/** Reads the client-wide timeout from `LLM_REQUEST_TIMEOUT_MS`, falling back to the default. */
function readTimeoutMs(config: AppConfig) {
  return (
    normalizeTimeoutMs(Number(config.rawEnv.LLM_REQUEST_TIMEOUT_MS)) ??
    DEFAULT_LLM_REQUEST_TIMEOUT_MS
  );
}

/**
 * Returns the `choices[0].delta.content` strings carried by the `data:` lines of
 * one SSE event block, in order. `[DONE]` sentinels, empty payloads and frames
 * that are not valid JSON are skipped.
 */
function extractSseDeltas(eventBlock: string): string[] {
  const deltas: string[] = [];
  for (const rawLine of eventBlock.split(/\r?\n/u)) {
    const line = rawLine.trim();
    if (!line.startsWith('data:')) {
      continue;
    }
    const data = line.slice(5).trim();
    if (!data || data === '[DONE]') {
      continue;
    }
    let parsed: CompletionChunkPayload;
    try {
      parsed = JSON.parse(data) as CompletionChunkPayload;
    } catch {
      // Ignore malformed SSE frames from the upstream model.
      continue;
    }
    const delta = parsed?.choices?.[0]?.delta?.content;
    if (typeof delta === 'string' && delta.length > 0) {
      deltas.push(delta);
    }
  }
  return deltas;
}

/** Raised when the upstream LLM does not answer within the configured timeout. */
export class UpstreamLlmTimeoutError extends HttpError {
  constructor(message = 'LLM 上游请求超时') {
    super(502, message, {
      code: 'UPSTREAM_TIMEOUT',
    });
    this.name = 'UpstreamLlmTimeoutError';
  }
}

/** Raised when fetch fails at the network level before any upstream response. */
export class UpstreamLlmConnectivityError extends HttpError {
  constructor(message = '无法连接 LLM 上游服务') {
    super(502, message, {
      code: 'UPSTREAM_CONNECTIVITY',
    });
    this.name = 'UpstreamLlmConnectivityError';
  }
}

/** Matches `UpstreamLlmTimeoutError` instances and any `HttpError` with code `UPSTREAM_TIMEOUT`. */
export function isUpstreamLlmTimeoutError(
  error: unknown,
): error is UpstreamLlmTimeoutError {
  return (
    error instanceof UpstreamLlmTimeoutError ||
    (error instanceof HttpError && error.code === 'UPSTREAM_TIMEOUT')
  );
}

/** Matches `UpstreamLlmConnectivityError` instances and any `HttpError` with code `UPSTREAM_CONNECTIVITY`. */
export function isUpstreamLlmConnectivityError(
  error: unknown,
): error is UpstreamLlmConnectivityError {
  return (
    error instanceof UpstreamLlmConnectivityError ||
    (error instanceof HttpError && error.code === 'UPSTREAM_CONNECTIVITY')
  );
}

/**
 * Client for an OpenAI-compatible `/chat/completions` upstream configured via
 * `config.llm` (`baseUrl`, `apiKey`, `model`).
 */
export class UpstreamLlmClient {
  readonly logger: Logger;

  /** Timeout applied by `requestCompletion` when the caller supplies none. */
  private readonly requestTimeoutMs: number;

  constructor(
    private readonly config: AppConfig,
    logger: Logger,
  ) {
    this.logger = logger;
    this.requestTimeoutMs = readTimeoutMs(config);
  }

  /** Uses the caller's model when it is non-blank, otherwise the configured default. */
  private resolveModel(model?: string) {
    return model?.trim() || this.config.llm.model;
  }

  /** Whether `LLM_DEBUG_LOG=true`, which enables logging of full prompts and responses. */
  private isDebugLogEnabled() {
    return this.config.rawEnv.LLM_DEBUG_LOG === 'true';
  }

  /**
   * Builds the upstream request headers.
   *
   * @throws The `upstreamError` result when `LLM_API_KEY` is not configured.
   */
  private buildHeaders() {
    if (!this.config.llm.apiKey) {
      throw upstreamError('服务端缺少 LLM_API_KEY');
    }

    return {
      Authorization: `Bearer ${this.config.llm.apiKey}`,
      'Content-Type': 'application/json',
    };
  }

  /**
   * Combines an optional caller signal with a timeout into one AbortSignal.
   * `didTimeout()` reports whether the timer (rather than the caller) fired.
   * `cleanup()` must be called to clear the timer and detach from the caller's signal.
   */
  private createRequestSignal(
    externalSignal?: AbortSignal,
    timeoutMs = this.requestTimeoutMs,
  ) {
    const controller = new AbortController();
    let timedOut = false;
    const handleAbort = () => controller.abort(externalSignal?.reason);
    const timeout = setTimeout(() => {
      timedOut = true;
      controller.abort();
    }, timeoutMs);

    if (externalSignal) {
      if (externalSignal.aborted) {
        handleAbort();
      } else {
        externalSignal.addEventListener('abort', handleAbort, {
          once: true,
        });
      }
    }

    return {
      signal: controller.signal,
      didTimeout() {
        return timedOut;
      },
      cleanup() {
        clearTimeout(timeout);
        externalSignal?.removeEventListener('abort', handleAbort);
      },
    };
  }

  /**
   * Returns a signal that aborts when the downstream client disconnects before
   * `response` has finished.
   *
   * This listens on the response rather than the request: since Node.js 16 the
   * request's 'close' event fires once its body has been consumed, not when the
   * socket goes away. A response 'close' without `writableFinished` means the
   * connection ended prematurely.
   */
  private attachRequestAbort(response: ExpressResponse) {
    const controller = new AbortController();
    const handleClose = () => {
      if (!response.writableFinished) {
        controller.abort();
      }
    };

    if (response.destroyed) {
      controller.abort();
    } else {
      response.on('close', handleClose);
    }

    return {
      signal: controller.signal,
      cleanup() {
        response.removeListener('close', handleClose);
      },
    };
  }

  /**
   * Streams an upstream body into `response` and resolves once it is fully written.
   * When there is no body, the response is simply ended.
   *
   * Failures after the headers were committed are logged rather than rethrown.
   * `pipeline()` has already destroyed `response` at that point, so a caller could no
   * longer send an error payload. A disconnect of the client itself (`clientSignal`
   * aborted) is treated as a normal end.
   */
  private async pipeUpstreamBody(
    upstreamResponse: globalThis.Response,
    response: ExpressResponse,
    clientSignal: AbortSignal,
  ) {
    if (!upstreamResponse.body) {
      response.end();
      return;
    }

    try {
      // `as never`: the DOM and node:stream/web ReadableStream typings do not line up.
      await pipeline(Readable.fromWeb(upstreamResponse.body as never), response);
    } catch (error) {
      if (clientSignal.aborted) {
        return;
      }
      this.logger.warn(
        { err: error, llm_status: upstreamResponse.status },
        'llm upstream stream forwarding failed',
      );
    }
  }

  /**
   * POSTs a chat completion request upstream and resolves once response headers arrive.
   *
   * The timeout and `options.signal` cover only the phase up to the response headers.
   * Reading the returned body is the caller's responsibility.
   *
   * @throws The `upstreamError` result for a missing API key or a non-2xx status.
   * @throws UpstreamLlmTimeoutError when the timeout fires before headers arrive.
   * @throws UpstreamLlmConnectivityError when fetch rejects with a `TypeError` (network error).
   * @throws The original error for any other failure, e.g. a caller abort.
   */
  async requestCompletion(
    body: CompletionRequest,
    options: RequestExecutionOptions = {},
  ) {
    const timeoutMs =
      normalizeTimeoutMs(options.timeoutMs) ?? this.requestTimeoutMs;
    const model = this.resolveModel(body.model);
    const stream = body.stream === true;
    const debugLabel =
      typeof options.debugLabel === 'string' && options.debugLabel.trim()
        ? options.debugLabel.trim()
        : undefined;

    if (this.isDebugLogEnabled()) {
      this.logger.info(
        {
          llm_model: model,
          llm_debug_label: debugLabel,
          llm_messages: body.messages,
        },
        '[LLM_DEBUG] Request prompt',
      );
    }

    this.logger.debug(
      {
        llm_model: model,
        llm_stream: stream,
        llm_timeout_ms: timeoutMs,
        llm_debug_label: debugLabel,
      },
      'llm upstream request started',
    );

    // Prepare everything that can throw before starting the timer, so a failure
    // here cannot leave a pending timeout behind or be misclassified below.
    const headers = this.buildHeaders();
    const payload = JSON.stringify({
      ...body,
      model,
    });
    const requestSignal = this.createRequestSignal(options.signal, timeoutMs);

    let response: globalThis.Response;
    try {
      response = await fetch(buildCompletionUrl(this.config.llm.baseUrl), {
        method: 'POST',
        headers,
        body: payload,
        signal: requestSignal.signal,
      });
    } catch (error) {
      const timedOut = requestSignal.didTimeout() && isAbortLikeError(error);
      // Log before classifying: the typed errors below drop the original cause.
      this.logger.warn(
        {
          err: error,
          llm_model: model,
          llm_stream: stream,
          llm_timed_out: timedOut,
          llm_debug_label: debugLabel,
        },
        'llm upstream request failed',
      );
      if (timedOut) {
        throw new UpstreamLlmTimeoutError();
      }
      // fetch rejects with a TypeError when a network error occurs.
      if (error instanceof TypeError) {
        throw new UpstreamLlmConnectivityError();
      }
      throw error;
    } finally {
      requestSignal.cleanup();
    }

    if (!response.ok) {
      const rawText = await response.text();
      throw upstreamError(extractApiErrorMessage(rawText, 'LLM 上游请求失败'));
    }

    this.logger.debug(
      {
        llm_model: model,
        llm_stream: stream,
        llm_status: response.status,
        llm_debug_label: debugLabel,
      },
      'llm upstream request succeeded',
    );

    return response;
  }

  /**
   * Sends a system + user prompt pair and returns the trimmed content of the first choice.
   *
   * @throws The `upstreamError` result when the body is not JSON or the content is empty.
   * @throws Any error raised by `requestCompletion`.
   */
  async requestMessageContent(params: {
    systemPrompt: string;
    userPrompt: string;
    model?: string;
    signal?: AbortSignal;
    timeoutMs?: number;
    debugLabel?: string;
  }) {
    const response = await this.requestCompletion(
      {
        model: params.model,
        messages: [
          { role: 'system', content: params.systemPrompt },
          { role: 'user', content: params.userPrompt },
        ],
      },
      {
        signal: params.signal,
        timeoutMs: params.timeoutMs,
        debugLabel: params.debugLabel,
      },
    );

    const rawText = await response.text();
    let parsed: CompletionResponsePayload;
    try {
      parsed = JSON.parse(rawText) as CompletionResponsePayload;
    } catch {
      throw upstreamError('LLM 返回内容不是有效的 JSON');
    }

    // Guard the type: a non-string content (e.g. structured parts) must not crash `.trim()`.
    const rawContent = parsed?.choices?.[0]?.message?.content;
    const content = typeof rawContent === 'string' ? rawContent.trim() : '';
    if (!content) {
      throw upstreamError('LLM 返回内容为空');
    }

    if (this.isDebugLogEnabled()) {
      this.logger.info(
        {
          llm_debug_label: params.debugLabel,
          llm_response_content: content,
          llm_response_length: content.length,
        },
        '[LLM_DEBUG] Response content',
      );
    }

    return content;
  }

  /**
   * Streams a completion and returns the full trimmed text once the stream ends.
   *
   * `onUpdate` receives the accumulated (untrimmed) text after every non-empty delta.
   * It is best-effort: if it throws, the failure is logged once and streaming continues.
   * Aborting `params.signal` while the body is being read cancels the upstream stream
   * and rethrows the abort reason instead of returning partial text.
   *
   * @throws The `upstreamError` result when there is no body or the final text is empty.
   */
  async streamMessageContent(params: {
    systemPrompt: string;
    userPrompt: string;
    model?: string;
    signal?: AbortSignal;
    timeoutMs?: number;
    debugLabel?: string;
    onUpdate?: (text: string) => void;
  }) {
    const response = await this.requestCompletion(
      {
        model: params.model,
        stream: true,
        messages: [
          { role: 'system', content: params.systemPrompt },
          { role: 'user', content: params.userPrompt },
        ],
      },
      {
        signal: params.signal,
        timeoutMs: params.timeoutMs,
        debugLabel: params.debugLabel,
      },
    );

    if (!response.body) {
      throw upstreamError('LLM 流式响应体不可用');
    }

    const signal = params.signal;
    const reader = response.body.getReader();
    const decoder = new TextDecoder('utf-8');
    let buffer = '';
    let accumulatedText = '';
    let updateFailureLogged = false;
    let fullyRead = false;

    // Appends the deltas of one complete SSE event and notifies onUpdate per delta.
    const consumeEvent = (eventBlock: string) => {
      for (const delta of extractSseDeltas(eventBlock)) {
        accumulatedText += delta;
        try {
          params.onUpdate?.(accumulatedText);
        } catch (error) {
          if (!updateFailureLogged) {
            updateFailureLogged = true;
            this.logger.warn(
              { err: error, llm_debug_label: params.debugLabel },
              'llm stream onUpdate callback failed',
            );
          }
        }
      }
    };

    // requestCompletion detaches the caller's signal once headers arrive, so
    // propagate aborts to the body reader here. Cancelling resolves the pending
    // read() as done, which ends the loop below.
    const cancelOnAbort = () => {
      void reader.cancel(signal?.reason).catch(() => undefined);
    };
    signal?.addEventListener('abort', cancelOnAbort, { once: true });
    if (signal?.aborted) {
      cancelOnAbort();
    }

    try {
      for (;;) {
        const { done, value } = await reader.read();
        if (done) {
          break;
        }

        buffer += decoder.decode(value, { stream: true });
        const blocks = buffer.split(SSE_EVENT_BOUNDARY);
        // The last piece is an unterminated event (possibly empty); keep it for the next chunk.
        buffer = blocks.pop() ?? '';
        for (const block of blocks) {
          consumeEvent(block);
        }
      }
      fullyRead = true;
    } finally {
      signal?.removeEventListener('abort', cancelOnAbort);
      if (!fullyRead) {
        // Reading stopped early: release the upstream connection.
        void reader.cancel().catch(() => undefined);
      }
    }

    if (signal?.aborted) {
      throw (
        signal.reason ?? new DOMException('LLM stream aborted', 'AbortError')
      );
    }

    // Flush the decoder and accept a final event that lacks its trailing blank line.
    buffer += decoder.decode();
    consumeEvent(buffer);

    const content = accumulatedText.trim();
    if (!content) {
      throw upstreamError('LLM 返回内容为空');
    }

    return content;
  }

  /**
   * Proxies a client-supplied completion body upstream and streams the upstream
   * response back unchanged. The model falls back to the configured default when blank.
   * Returns silently if the client disconnects before headers or mid-stream.
   *
   * @throws The `upstreamError` result for a missing API key or a non-2xx upstream status.
   */
  async forwardCompletion(
    request: ExpressRequest,
    body: Record<string, unknown>,
    response: ExpressResponse,
  ) {
    const requestAbort = this.attachRequestAbort(response);
    try {
      const headers = this.buildHeaders();
      const payload = JSON.stringify({
        ...body,
        model: this.resolveModel(
          typeof body.model === 'string' ? body.model : undefined,
        ),
      });

      let upstreamResponse: globalThis.Response;
      try {
        upstreamResponse = await fetch(
          buildCompletionUrl(this.config.llm.baseUrl),
          {
            method: 'POST',
            headers,
            body: payload,
            signal: requestAbort.signal,
          },
        );
      } catch (error) {
        if (requestAbort.signal.aborted) {
          // The client went away; there is nobody left to answer.
          return;
        }
        throw error;
      }

      if (!upstreamResponse.ok) {
        const rawText = await upstreamResponse.text();
        throw upstreamError(
          extractApiErrorMessage(rawText, 'LLM 上游请求失败'),
        );
      }

      prepareApiResponse(request, response, {
        statusCode: upstreamResponse.status,
        headers: {
          'Content-Type':
            upstreamResponse.headers.get('content-type') ||
            'application/json; charset=utf-8',
        },
      });

      await this.pipeUpstreamBody(
        upstreamResponse,
        response,
        requestAbort.signal,
      );
    } finally {
      requestAbort.cleanup();
    }
  }

  /**
   * Starts a streaming completion for a system + user prompt pair and relays the
   * upstream SSE stream to the client unchanged. Returns silently if the client
   * disconnects before headers or mid-stream.
   *
   * @throws Any error raised by `requestCompletion` while the client is still connected.
   */
  async forwardSseText(params: {
    request: ExpressRequest;
    systemPrompt: string;
    userPrompt: string;
    response: ExpressResponse;
    model?: string;
  }) {
    const requestAbort = this.attachRequestAbort(params.response);
    try {
      let upstreamResponse: globalThis.Response;
      try {
        upstreamResponse = await this.requestCompletion(
          {
            model: params.model,
            stream: true,
            messages: [
              { role: 'system', content: params.systemPrompt },
              { role: 'user', content: params.userPrompt },
            ],
          },
          {
            signal: requestAbort.signal,
          },
        );
      } catch (error) {
        if (requestAbort.signal.aborted) {
          // The client went away; there is nobody left to answer.
          return;
        }
        throw error;
      }

      prepareEventStreamResponse(params.request, params.response, {
        statusCode: upstreamResponse.status,
        headers: {
          'Content-Type':
            upstreamResponse.headers.get('content-type') ||
            'text/event-stream; charset=utf-8',
        },
      });

      await this.pipeUpstreamBody(
        upstreamResponse,
        params.response,
        requestAbort.signal,
      );
    } finally {
      requestAbort.cleanup();
    }
  }
}