1
This commit is contained in:
@@ -1,11 +1,18 @@
|
||||
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

import type {
  Request as ExpressRequest,
  Response as ExpressResponse,
} from 'express';
import type { Logger } from 'pino';

import type { AppConfig } from '../config.js';
import { HttpError, upstreamError } from '../errors.js';
import {
  extractApiErrorMessage,
  prepareApiResponse,
  prepareEventStreamResponse,
} from '../http.js';
|
||||
|
||||
export type ChatMessage = {
|
||||
role: 'system' | 'user' | 'assistant';
|
||||
@@ -18,6 +25,14 @@ type CompletionRequest = {
|
||||
messages: ChatMessage[];
|
||||
};
|
||||
|
||||
/** Per-call options accepted by `UpstreamLlmClient.requestCompletion`. */
interface RequestExecutionOptions {
  /** Caller-owned signal; when it aborts, the upstream request is aborted too. */
  signal?: AbortSignal;
  /** Timeout override in milliseconds; ignored unless it is a positive number. */
  timeoutMs?: number;
  /** Free-form tag, trimmed and attached to log records as `llm_debug_label`. */
  debugLabel?: string;
}

/** Request timeout (ms) used when `LLM_REQUEST_TIMEOUT_MS` is missing or not a positive number. */
const DEFAULT_LLM_REQUEST_TIMEOUT_MS = 30_000;
|
||||
|
||||
/**
 * Returns `baseUrl` with every trailing `/` removed, so a path can be appended
 * with exactly one separator.
 */
function normalizeBaseUrl(baseUrl: string) {
  const SLASH = 0x2f; // char code of '/'
  let end = baseUrl.length;

  // Walk back from the end past the run of trailing slashes.
  while (end > 0 && baseUrl.charCodeAt(end - 1) === SLASH) {
    end -= 1;
  }

  return baseUrl.slice(0, end);
}
|
||||
@@ -26,11 +41,69 @@ function buildCompletionUrl(baseUrl: string) {
|
||||
return `${normalizeBaseUrl(baseUrl)}/chat/completions`;
|
||||
}
|
||||
|
||||
/**
 * Tells whether `error` looks like the rejection produced by an aborted
 * request: a `DOMException` (when that global exists) or any `Error` whose
 * `name` is `'AbortError'`.
 */
function isAbortLikeError(error: unknown) {
  const ABORT_NAME = 'AbortError';

  // DOMException is not guaranteed to exist as a global in every runtime.
  if (typeof DOMException !== 'undefined' && error instanceof DOMException) {
    if (error.name === ABORT_NAME) {
      return true;
    }
  }

  return error instanceof Error && error.name === ABORT_NAME;
}
|
||||
|
||||
/**
 * Resolves the upstream request timeout from `config.rawEnv.LLM_REQUEST_TIMEOUT_MS`.
 *
 * @param config Application config whose `rawEnv` holds the raw setting.
 * @returns A whole number of milliseconds in `[1, 2^31 - 1]`, or
 *   `DEFAULT_LLM_REQUEST_TIMEOUT_MS` when the setting is missing, not a finite
 *   number, or not positive.
 */
function readTimeoutMs(config: AppConfig) {
  // Node timers fall back to a 1 ms delay when the delay exceeds 2^31 - 1,
  // so an oversized setting would otherwise time out almost immediately.
  const maxTimerDelayMs = 2 ** 31 - 1;
  const parsed = Number(config.rawEnv.LLM_REQUEST_TIMEOUT_MS);

  if (!Number.isFinite(parsed) || parsed <= 0) {
    return DEFAULT_LLM_REQUEST_TIMEOUT_MS;
  }

  // Positive fractions below 0.5 round to 0; keep the result a usable delay.
  return Math.min(Math.max(Math.round(parsed), 1), maxTimerDelayMs);
}
|
||||
|
||||
/**
 * Raised when the request to the LLM upstream is aborted by the client's own
 * timeout timer.
 *
 * Passes `502` as the first `HttpError` argument (presumably the HTTP status —
 * confirm against `HttpError`) and tags the error with code `UPSTREAM_TIMEOUT`.
 */
export class UpstreamLlmTimeoutError extends HttpError {
  /** @param message Error message; defaults to the built-in timeout text. */
  constructor(message = 'LLM 上游请求超时') {
    super(502, message, { code: 'UPSTREAM_TIMEOUT' });

    // Explicit name so logs and serializers do not show the base class name.
    this.name = 'UpstreamLlmTimeoutError';
  }
}
|
||||
|
||||
/**
 * Raised when the LLM upstream cannot be reached at all.
 *
 * Passes `502` as the first `HttpError` argument (presumably the HTTP status —
 * confirm against `HttpError`) and tags the error with code
 * `UPSTREAM_CONNECTIVITY`.
 */
export class UpstreamLlmConnectivityError extends HttpError {
  /** @param message Error message; defaults to the built-in connectivity text. */
  constructor(message = '无法连接 LLM 上游服务') {
    super(502, message, { code: 'UPSTREAM_CONNECTIVITY' });

    // Explicit name so logs and serializers do not show the base class name.
    this.name = 'UpstreamLlmConnectivityError';
  }
}
|
||||
|
||||
/**
 * Type guard for upstream timeout failures.
 *
 * Accepts a real `UpstreamLlmTimeoutError` instance, and also any `HttpError`
 * whose `code` is `'UPSTREAM_TIMEOUT'`.
 */
export function isUpstreamLlmTimeoutError(
  error: unknown,
): error is UpstreamLlmTimeoutError {
  if (error instanceof UpstreamLlmTimeoutError) {
    return true;
  }

  return error instanceof HttpError && error.code === 'UPSTREAM_TIMEOUT';
}
|
||||
|
||||
/**
 * Type guard for upstream connectivity failures.
 *
 * Accepts a real `UpstreamLlmConnectivityError` instance, and also any
 * `HttpError` whose `code` is `'UPSTREAM_CONNECTIVITY'`.
 */
export function isUpstreamLlmConnectivityError(
  error: unknown,
): error is UpstreamLlmConnectivityError {
  if (error instanceof UpstreamLlmConnectivityError) {
    return true;
  }

  return error instanceof HttpError && error.code === 'UPSTREAM_CONNECTIVITY';
}
|
||||
|
||||
export class UpstreamLlmClient {
|
||||
readonly logger: Logger;
|
||||
private readonly requestTimeoutMs: number;
|
||||
|
||||
constructor(
|
||||
private readonly config: AppConfig,
|
||||
private readonly logger: Logger,
|
||||
) {}
|
||||
logger: Logger,
|
||||
) {
|
||||
this.logger = logger;
|
||||
this.requestTimeoutMs = readTimeoutMs(config);
|
||||
}
|
||||
|
||||
private resolveModel(model?: string) {
|
||||
return model?.trim() || this.config.llm.model;
|
||||
@@ -47,24 +120,128 @@ export class UpstreamLlmClient {
|
||||
};
|
||||
}
|
||||
|
||||
async requestCompletion(body: CompletionRequest, signal?: AbortSignal) {
|
||||
const response = await fetch(buildCompletionUrl(this.config.llm.baseUrl), {
|
||||
method: 'POST',
|
||||
headers: this.buildHeaders(),
|
||||
body: JSON.stringify({
|
||||
...body,
|
||||
model: this.resolveModel(body.model),
|
||||
}),
|
||||
signal,
|
||||
});
|
||||
/**
 * Builds the abort signal used for one upstream request by combining an
 * optional caller signal with a timeout timer.
 *
 * @param externalSignal Caller signal; its abort (and reason) is forwarded.
 * @param timeoutMs Delay before the request is aborted as timed out.
 * @returns `signal` to hand to `fetch`, `didTimeout()` which is true only when
 *   the timer itself triggered the abort, and `cleanup()` which clears the
 *   timer and detaches the listener from `externalSignal`.
 */
private createRequestSignal(
  externalSignal?: AbortSignal,
  timeoutMs = this.requestTimeoutMs,
) {
  const controller = new AbortController();
  let timedOut = false;
  let timeout: ReturnType<typeof setTimeout> | undefined;
  const handleAbort = () => controller.abort(externalSignal?.reason);

  if (externalSignal?.aborted) {
    // Already cancelled by the caller: abort right away and never arm the
    // timer, so this can not later be reported as a timeout.
    handleAbort();
  } else {
    externalSignal?.addEventListener('abort', handleAbort, { once: true });

    timeout = setTimeout(() => {
      // If the caller aborted first, keep didTimeout() false.
      if (controller.signal.aborted) {
        return;
      }
      timedOut = true;
      controller.abort();
    }, timeoutMs);
  }

  return {
    signal: controller.signal,
    didTimeout() {
      return timedOut;
    },
    cleanup() {
      clearTimeout(timeout);
      externalSignal?.removeEventListener('abort', handleAbort);
    },
  };
}
|
||||
|
||||
/**
 * Creates an abort signal that fires when `request` emits `'close'`.
 *
 * NOTE(review): depending on the Node.js version, `'close'` on an incoming
 * request may also fire once the request has completed rather than only on a
 * client disconnect — confirm against the runtime in use.
 *
 * @param request Incoming Express request to watch.
 * @returns `signal` that aborts on `'close'`, and `cleanup()` which removes
 *   the listener again.
 */
private attachRequestAbort(request: ExpressRequest) {
  const abortController = new AbortController();

  const onRequestClose = () => {
    abortController.abort();
  };
  request.on('close', onRequestClose);

  const detach = () => {
    request.off('close', onRequestClose);
  };

  return { signal: abortController.signal, cleanup: detach };
}
|
||||
|
||||
/**
 * Sends a chat-completion request to the upstream and returns the raw fetch
 * response once its status line and headers have arrived.
 *
 * @param body Completion payload; `model` is replaced by `resolveModel(body.model)`.
 * @param options Optional caller signal, timeout override and log label.
 * @returns The upstream `Response` (status is 2xx; the body is not consumed).
 * @throws UpstreamLlmTimeoutError when the client's own timer aborted the request.
 * @throws UpstreamLlmConnectivityError when `fetch` rejects with a `TypeError`.
 * @throws The value built by `upstreamError(...)` when the upstream answers non-2xx.
 * @throws Any other `fetch` rejection unchanged (e.g. an abort from `options.signal`).
 */
async requestCompletion(
  body: CompletionRequest,
  options: RequestExecutionOptions = {},
) {
  // Node timers fall back to a 1 ms delay above 2^31 - 1 (Infinity included),
  // which would turn "very long" into "immediate"; clamp instead.
  const maxTimerDelayMs = 2 ** 31 - 1;
  const timeoutMs =
    typeof options.timeoutMs === 'number' && options.timeoutMs > 0
      ? Math.min(Math.max(Math.round(options.timeoutMs), 1), maxTimerDelayMs)
      : this.requestTimeoutMs;
  const requestSignal = this.createRequestSignal(options.signal, timeoutMs);
  const model = this.resolveModel(body.model);
  const debugLabel =
    typeof options.debugLabel === 'string' && options.debugLabel.trim()
      ? options.debugLabel.trim()
      : undefined;

  this.logger.debug(
    {
      llm_model: model,
      llm_stream: body.stream === true,
      llm_timeout_ms: timeoutMs,
      llm_debug_label: debugLabel,
    },
    'llm upstream request started',
  );

  let response: globalThis.Response;
  try {
    response = await fetch(buildCompletionUrl(this.config.llm.baseUrl), {
      method: 'POST',
      headers: this.buildHeaders(),
      body: JSON.stringify({
        ...body,
        model,
      }),
      signal: requestSignal.signal,
    });
  } catch (error) {
    // Only an abort caused by our own timer counts as a timeout.
    if (requestSignal.didTimeout() && isAbortLikeError(error)) {
      throw new UpstreamLlmTimeoutError();
    }

    // fetch reports network-level failures as TypeError.
    if (error instanceof TypeError) {
      throw new UpstreamLlmConnectivityError();
    }

    this.logger.warn(
      {
        err: error,
        llm_model: model,
        llm_stream: body.stream === true,
        llm_debug_label: debugLabel,
      },
      'llm upstream request failed',
    );
    throw error;
  } finally {
    // NOTE(review): this stops the timer and unlinks options.signal as soon as
    // headers arrive, so neither covers reading the response body — confirm
    // that is intended for streamed responses.
    requestSignal.cleanup();
  }

  if (!response.ok) {
    const rawText = await response.text();
    throw upstreamError(extractApiErrorMessage(rawText, 'LLM 上游请求失败'));
  }

  this.logger.debug(
    {
      llm_model: model,
      llm_stream: body.stream === true,
      llm_status: response.status,
      llm_debug_label: debugLabel,
    },
    'llm upstream request succeeded',
  );

  return response;
}
|
||||
|
||||
@@ -73,6 +250,8 @@ export class UpstreamLlmClient {
|
||||
userPrompt: string;
|
||||
model?: string;
|
||||
signal?: AbortSignal;
|
||||
timeoutMs?: number;
|
||||
debugLabel?: string;
|
||||
}) {
|
||||
const response = await this.requestCompletion(
|
||||
{
|
||||
@@ -82,7 +261,11 @@ export class UpstreamLlmClient {
|
||||
{ role: 'user', content: params.userPrompt },
|
||||
],
|
||||
},
|
||||
params.signal,
|
||||
{
|
||||
signal: params.signal,
|
||||
timeoutMs: params.timeoutMs,
|
||||
debugLabel: params.debugLabel,
|
||||
},
|
||||
);
|
||||
const rawText = await response.text();
|
||||
const parsed = JSON.parse(rawText) as {
|
||||
@@ -101,69 +284,116 @@ export class UpstreamLlmClient {
|
||||
return content;
|
||||
}
|
||||
|
||||
/**
 * Proxies a chat-completion request to the upstream and streams the upstream
 * body back to the client.
 *
 * The upstream call is tied to the incoming request through
 * `attachRequestAbort`, and that link is kept until streaming has finished.
 *
 * @param request Incoming Express request (used for abort tracking and response preparation).
 * @param body Payload forwarded as-is, except that a missing or blank `model`
 *   is replaced by the configured default model.
 * @param response Express response that receives the upstream status, content type and body.
 * @throws The value built by `upstreamError(...)` when the upstream answers non-2xx.
 * @throws Any `fetch` or streaming failure that is not explained by the client having gone away.
 */
async forwardCompletion(
  request: ExpressRequest,
  body: Record<string, unknown>,
  response: ExpressResponse,
) {
  const requestAbort = this.attachRequestAbort(request);

  try {
    let upstreamResponse: globalThis.Response;

    try {
      upstreamResponse = await fetch(buildCompletionUrl(this.config.llm.baseUrl), {
        method: 'POST',
        headers: this.buildHeaders(),
        body: JSON.stringify({
          ...body,
          model:
            typeof body.model === 'string' && body.model.trim()
              ? body.model
              : this.config.llm.model,
        }),
        signal: requestAbort.signal,
      });
    } catch (error) {
      if (requestAbort.signal.aborted && response.writableEnded) {
        return;
      }
      throw error;
    }

    if (!upstreamResponse.ok) {
      const rawText = await upstreamResponse.text();
      throw upstreamError(extractApiErrorMessage(rawText, 'LLM 上游请求失败'));
    }

    prepareApiResponse(request, response, {
      statusCode: upstreamResponse.status,
      headers: {
        'Content-Type':
          upstreamResponse.headers.get('content-type') ||
          'application/json; charset=utf-8',
      },
    });

    if (!upstreamResponse.body) {
      response.end();
      return;
    }

    try {
      // `.pipe()` returns the destination stream, so awaiting it does not wait
      // for the transfer; `pipeline` resolves once the copy has completed and
      // rejects (tearing down both streams) on failure.
      await pipeline(Readable.fromWeb(upstreamResponse.body as never), response);
    } catch (error) {
      // A client that went away mid-stream is not an error worth reporting.
      const clientGone =
        requestAbort.signal.aborted ||
        (error instanceof Error &&
          (error as { code?: unknown }).code === 'ERR_STREAM_PREMATURE_CLOSE');
      if (clientGone) {
        return;
      }
      throw error;
    }
  } finally {
    // Detach the request listener exactly once, whichever path was taken.
    requestAbort.cleanup();
  }
}
|
||||
|
||||
/**
 * Requests a streamed completion for a system + user prompt pair and relays
 * the upstream event stream to the client.
 *
 * The upstream call is tied to the incoming request through
 * `attachRequestAbort`, and that link is kept until streaming has finished.
 *
 * @param params.request Incoming Express request (abort tracking and response preparation).
 * @param params.systemPrompt Content of the `system` message.
 * @param params.userPrompt Content of the `user` message.
 * @param params.response Express response that receives the event stream.
 * @param params.model Optional model override passed to `requestCompletion`.
 * @throws Whatever `requestCompletion` throws, unless the request was aborted
 *   and the response has already ended.
 * @throws Any streaming failure that is not explained by the client having gone away.
 */
async forwardSseText(params: {
  request: ExpressRequest;
  systemPrompt: string;
  userPrompt: string;
  response: ExpressResponse;
  model?: string;
}) {
  const requestAbort = this.attachRequestAbort(params.request);

  try {
    let upstreamResponse: globalThis.Response;

    try {
      upstreamResponse = await this.requestCompletion(
        {
          model: params.model,
          stream: true,
          messages: [
            { role: 'system', content: params.systemPrompt },
            { role: 'user', content: params.userPrompt },
          ],
        },
        {
          signal: requestAbort.signal,
        },
      );
    } catch (error) {
      if (requestAbort.signal.aborted && params.response.writableEnded) {
        return;
      }
      throw error;
    }

    prepareEventStreamResponse(params.request, params.response, {
      statusCode: upstreamResponse.status,
      headers: {
        'Content-Type':
          upstreamResponse.headers.get('content-type') ||
          'text/event-stream; charset=utf-8',
      },
    });

    if (!upstreamResponse.body) {
      params.response.end();
      return;
    }

    try {
      // `.pipe()` returns the destination stream, so awaiting it does not wait
      // for the transfer; `pipeline` resolves once the copy has completed and
      // rejects (tearing down both streams) on failure.
      await pipeline(
        Readable.fromWeb(upstreamResponse.body as never),
        params.response,
      );
    } catch (error) {
      // A client that went away mid-stream is not an error worth reporting.
      const clientGone =
        requestAbort.signal.aborted ||
        (error instanceof Error &&
          (error as { code?: unknown }).code === 'ERR_STREAM_PREMATURE_CLOSE');
      if (clientGone) {
        return;
      }
      throw error;
    }
  } finally {
    // Detach the request listener exactly once, whichever path was taken.
    requestAbort.cleanup();
  }
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user