Close DDD refactor and remove generated asset proxy
This commit is contained in:
@@ -2,13 +2,17 @@ use axum::{
|
||||
Json,
|
||||
extract::{Extension, State},
|
||||
http::StatusCode,
|
||||
response::Response,
|
||||
response::{
|
||||
IntoResponse, Response,
|
||||
sse::{Event, Sse},
|
||||
},
|
||||
};
|
||||
use platform_llm::{LlmMessage, LlmMessageRole, LlmTextRequest};
|
||||
use serde_json::Value;
|
||||
use serde_json::{Value, json};
|
||||
use shared_contracts::llm::{
|
||||
LlmChatCompletionRequest, LlmChatCompletionResponse, LlmChatMessagePayload, LlmChatMessageRole,
|
||||
};
|
||||
use std::convert::Infallible;
|
||||
|
||||
use crate::{
|
||||
api_response::json_success_body, auth::AuthenticatedAccessToken, http_error::AppError,
|
||||
@@ -20,15 +24,7 @@ pub async fn proxy_llm_chat_completions(
|
||||
Extension(request_context): Extension<RequestContext>,
|
||||
Extension(_authenticated): Extension<AuthenticatedAccessToken>,
|
||||
Json(payload): Json<LlmChatCompletionRequest>,
|
||||
) -> Result<Json<Value>, Response> {
|
||||
if payload.stream {
|
||||
return Err(llm_error_response(
|
||||
&request_context,
|
||||
AppError::from_status(StatusCode::NOT_IMPLEMENTED)
|
||||
.with_message("Rust `api-server` 首版暂不支持流式 LLM 代理"),
|
||||
));
|
||||
}
|
||||
|
||||
) -> Result<Response, Response> {
|
||||
let llm_client = state.llm_client().ok_or_else(|| {
|
||||
llm_error_response(
|
||||
&request_context,
|
||||
@@ -48,6 +44,10 @@ pub async fn proxy_llm_chat_completions(
|
||||
enable_web_search: false,
|
||||
};
|
||||
|
||||
if payload.stream {
|
||||
return Ok(stream_llm_chat_completions(llm_client.clone(), request).into_response());
|
||||
}
|
||||
|
||||
let response = llm_client
|
||||
.request_text(request)
|
||||
.await
|
||||
@@ -61,7 +61,78 @@ pub async fn proxy_llm_chat_completions(
|
||||
content: response.content,
|
||||
finish_reason: response.finish_reason,
|
||||
},
|
||||
))
|
||||
)
|
||||
.into_response())
|
||||
}
|
||||
|
||||
fn stream_llm_chat_completions(
|
||||
llm_client: platform_llm::LlmClient,
|
||||
request: LlmTextRequest,
|
||||
) -> Sse<impl tokio_stream::Stream<Item = Result<Event, Infallible>>> {
|
||||
let stream = async_stream::stream! {
|
||||
let (delta_tx, mut delta_rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
|
||||
let llm_stream = llm_client.stream_text(request, move |delta| {
|
||||
let _ = delta_tx.send(json!({
|
||||
"delta": delta.delta_text,
|
||||
"content": delta.accumulated_text,
|
||||
"finishReason": delta.finish_reason,
|
||||
}));
|
||||
});
|
||||
tokio::pin!(llm_stream);
|
||||
|
||||
let llm_result = loop {
|
||||
// `platform-llm` 负责上游 SSE 解析;这里尽快把增量转成 API 层 SSE 事件。
|
||||
tokio::select! {
|
||||
result = &mut llm_stream => break result,
|
||||
maybe_delta = delta_rx.recv() => {
|
||||
if let Some(delta) = maybe_delta {
|
||||
yield Ok::<Event, Infallible>(llm_sse_json_event_or_error("delta", delta));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
while let Some(delta) = delta_rx.recv().await {
|
||||
yield Ok::<Event, Infallible>(llm_sse_json_event_or_error("delta", delta));
|
||||
}
|
||||
|
||||
match llm_result {
|
||||
Ok(response) => {
|
||||
yield Ok::<Event, Infallible>(llm_sse_json_event_or_error(
|
||||
"complete",
|
||||
json!(LlmChatCompletionResponse {
|
||||
id: response.response_id,
|
||||
model: response.model,
|
||||
content: response.content,
|
||||
finish_reason: response.finish_reason,
|
||||
}),
|
||||
));
|
||||
}
|
||||
Err(error) => {
|
||||
let app_error = map_llm_error(error);
|
||||
yield Ok::<Event, Infallible>(llm_sse_json_event_or_error(
|
||||
"error",
|
||||
json!({
|
||||
"code": app_error.code(),
|
||||
"message": app_error.message(),
|
||||
}),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
yield Ok::<Event, Infallible>(Event::default().data("[DONE]"));
|
||||
};
|
||||
|
||||
Sse::new(stream)
|
||||
}
|
||||
|
||||
fn llm_sse_json_event_or_error(event_name: &str, payload: Value) -> Event {
|
||||
match serde_json::to_string(&payload) {
|
||||
Ok(payload_text) => Event::default().event(event_name).data(payload_text),
|
||||
Err(_) => Event::default()
|
||||
.event("error")
|
||||
.data("{\"code\":\"INTERNAL_SERVER_ERROR\",\"message\":\"SSE payload 序列化失败\"}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn map_chat_message(message: LlmChatMessagePayload) -> LlmMessage {
|
||||
@@ -105,6 +176,7 @@ mod tests {
|
||||
status_line: &'static str,
|
||||
content_type: &'static str,
|
||||
body: String,
|
||||
extra_headers: Vec<(&'static str, &'static str)>,
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -113,6 +185,7 @@ mod tests {
|
||||
status_line: "200 OK",
|
||||
content_type: "application/json; charset=utf-8",
|
||||
body: r#"{"id":"resp_api_server_01","model":"ark-router-test","choices":[{"message":{"content":"代理成功"},"finish_reason":"stop"}]}"#.to_string(),
|
||||
extra_headers: Vec::new(),
|
||||
}]);
|
||||
let state = seed_authenticated_state(AppConfig {
|
||||
llm_base_url: server_url,
|
||||
@@ -176,8 +249,25 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn llm_chat_completions_rejects_stream_mode() {
|
||||
let state = seed_authenticated_state(AppConfig::default()).await;
|
||||
async fn llm_chat_completions_streams_sse_payload() {
|
||||
let server_url = spawn_mock_server(vec![MockResponse {
|
||||
status_line: "200 OK",
|
||||
content_type: "text/event-stream; charset=utf-8",
|
||||
body: concat!(
|
||||
"data: {\"choices\":[{\"delta\":{\"content\":\"你\"}}]}\n\n",
|
||||
"data: {\"choices\":[{\"delta\":{\"content\":\"好\"}}]}\n\n",
|
||||
"data: {\"choices\":[{\"finish_reason\":\"stop\"}]}\n\n",
|
||||
"data: [DONE]\n\n"
|
||||
)
|
||||
.to_string(),
|
||||
extra_headers: vec![("x-request-id", "req_llm_stream_01")],
|
||||
}]);
|
||||
let state = seed_authenticated_state(AppConfig {
|
||||
llm_base_url: server_url,
|
||||
llm_api_key: Some("test-key".to_string()),
|
||||
..AppConfig::default()
|
||||
})
|
||||
.await;
|
||||
let token = issue_access_token(&state);
|
||||
let app = build_router(state);
|
||||
|
||||
@@ -188,7 +278,6 @@ mod tests {
|
||||
.uri("/api/llm/chat/completions")
|
||||
.header("authorization", format!("Bearer {token}"))
|
||||
.header("content-type", "application/json")
|
||||
.header("x-genarrative-response-envelope", "v1")
|
||||
.body(Body::from(
|
||||
json!({
|
||||
"stream": true,
|
||||
@@ -203,7 +292,14 @@ mod tests {
|
||||
.await
|
||||
.expect("request should succeed");
|
||||
|
||||
assert_eq!(response.status(), StatusCode::NOT_IMPLEMENTED);
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
assert_eq!(
|
||||
response
|
||||
.headers()
|
||||
.get("content-type")
|
||||
.and_then(|value| value.to_str().ok()),
|
||||
Some("text/event-stream")
|
||||
);
|
||||
|
||||
let body = response
|
||||
.into_body()
|
||||
@@ -211,14 +307,15 @@ mod tests {
|
||||
.await
|
||||
.expect("body should collect")
|
||||
.to_bytes();
|
||||
let payload: Value =
|
||||
serde_json::from_slice(&body).expect("response body should be valid json");
|
||||
let body_text = String::from_utf8(body.to_vec()).expect("body should be utf8");
|
||||
|
||||
assert_eq!(payload["ok"], Value::Bool(false));
|
||||
assert_eq!(
|
||||
payload["error"]["code"],
|
||||
Value::String("NOT_IMPLEMENTED".to_string())
|
||||
);
|
||||
assert!(body_text.contains("event: delta"));
|
||||
assert!(body_text.contains(r#""delta":"你""#));
|
||||
assert!(body_text.contains(r#""content":"你好""#));
|
||||
assert!(body_text.contains("event: complete"));
|
||||
assert!(body_text.contains(r#""id":"req_llm_stream_01""#));
|
||||
assert!(body_text.contains(r#""finishReason":"stop""#));
|
||||
assert!(body_text.contains("data: [DONE]"));
|
||||
}
|
||||
|
||||
async fn seed_authenticated_state(config: AppConfig) -> AppState {
|
||||
@@ -306,13 +403,17 @@ mod tests {
|
||||
|
||||
fn write_response(stream: &mut std::net::TcpStream, response: MockResponse) {
|
||||
let body = response.body;
|
||||
let raw_response = format!(
|
||||
"HTTP/1.1 {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
|
||||
let mut raw_response = format!(
|
||||
"HTTP/1.1 {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n",
|
||||
response.status_line,
|
||||
response.content_type,
|
||||
body.len(),
|
||||
body
|
||||
body.len()
|
||||
);
|
||||
for (name, value) in response.extra_headers {
|
||||
raw_response.push_str(format!("{name}: {value}\r\n").as_str());
|
||||
}
|
||||
raw_response.push_str("\r\n");
|
||||
raw_response.push_str(body.as_str());
|
||||
|
||||
stream
|
||||
.write_all(raw_response.as_bytes())
|
||||
|
||||
Reference in New Issue
Block a user