From 51509259479ca3306b22158aa3cc731a3a8a3236 Mon Sep 17 00:00:00 2001 From: kdletters <61648117+kdletters@users.noreply.github.com> Date: Fri, 5 Jun 2026 17:03:39 +0800 Subject: [PATCH] fix: send VectorEngine images with libcurl --- .hermes/shared-memory/pitfalls.md | 8 + ...】server-rs与SpacetimeDB数据契约-2026-05-15.md | 2 +- server-rs/Cargo.lock | 60 ++- server-rs/Cargo.toml | 1 + server-rs/crates/platform-image/Cargo.toml | 1 + .../src/vector_engine/client.rs | 205 +++++----- .../src/vector_engine/curl_transport.rs | 363 ++++++++++++++++++ .../platform-image/src/vector_engine/mod.rs | 1 + .../src/vector_engine/transport.rs | 134 +------ 9 files changed, 526 insertions(+), 249 deletions(-) create mode 100644 server-rs/crates/platform-image/src/vector_engine/curl_transport.rs diff --git a/.hermes/shared-memory/pitfalls.md b/.hermes/shared-memory/pitfalls.md index 1ea47a77..a85427e1 100644 --- a/.hermes/shared-memory/pitfalls.md +++ b/.hermes/shared-memory/pitfalls.md @@ -1730,6 +1730,14 @@ - 验证:定向测试 `cargo test -p api-server generated_asset_sheet_two_items_per_row --manifest-path server-rs/Cargo.toml -- --nocapture` 应通过,且错位透明样本应按连通域切出完整视图。 - 关联:`server-rs/crates/api-server/src/generated_asset_sheets.rs`、`server-rs/crates/api-server/src/match3d/item_assets.rs`。 +## 腾讯云 release 上 VectorEngine `SendRequest` 超时先查出口链路与重试 + +- 现象:release 机器调用 VectorEngine `gpt-image-2` 的 `/v1/images/generations` 或 `/v1/images/edits` 偶发 `client error (SendRequest) -> connection error -> Connection timed out (os error 110)`,应用层表现为 504;本地通常正常。 +- 原因:本地 DNS 可能走代理 / 加速出口,而腾讯云 release 直接解析到 VectorEngine 真实边缘节点。实测同一张约 2.37MB PNG、同一 edits 请求,`curl` 5/5 成功,但 `reqwest/hyper` 会间歇性超时;固定 `40.160.33.47` 也只能改善,不能根治。 +- 处理:不要优先关闭 multipart,也不要直接把 `SendRequest` 解释成上游业务拒绝。VectorEngine 图片 `generations` / `edits` 上游 POST 单独使用 `libcurl`;参考图下载和响应图片 URL 下载仍用 `reqwest`。send 阶段 timeout / connect error 在 `platform-image` 内最多重试 5 次,使用指数退避和短抖动;日志字段 `attempt`、`max_attempts`、`retry_delay_ms`、`reference_image_bytes_total`、`request_params` 是定位依据。 +- 验证:release 上先看 `journalctl -u genarrative-api.service` 中 `VectorEngine 图片请求发送失败,准备重试` 与最终 `HTTP 返回`;若仍失败,再用同一图片分别跑 curl 与最小 reqwest 探针对照。 +- 关联:`server-rs/crates/platform-image/src/vector_engine/client.rs`、`docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md`。 + ## 个人中心不再保留直达“存档”按钮入口 - 现象:2026-05-25 起,移动端“我的”页顶部改为品牌行 + 扫码 / 设置按钮,设置区和次级入口不再提供独立的 `存档` 按钮;用户仍可在“玩过”弹窗里查看可继续存档。 diff --git a/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md b/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md index 3fbda7e4..5168a06a 100644 --- a/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md +++ b/docs/【后端架构】server-rs与SpacetimeDB数据契约-2026-05-15.md @@ -168,7 +168,7 @@ npm run check:server-rs-ddd ## 外部服务与资产 - LLM:`GENARRATIVE_LLM_*`,创意 Agent 另用 `APIMART_BASE_URL` / `APIMART_API_KEY`。 -- 图片生成:VectorEngine `gpt-image-2` 图片 provider 归属 `platform-image`,密钥只在后端环境变量中;`api-server` 内的 `openai_image_generation.rs` 只是兼容调用面和外部失败审计桥接,不再承载 provider 协议实现。实际外部生成运行记录统一落 `tracking_event`,`event_key = external_generation_run`,metadata 记录开始 / 结束时间、耗时、状态、成功标记、失败原因、provider task id 和结果摘要,不再写回过时的 `ai_task`。APIMart 只保留给创意 Agent `gpt-5` Responses 文本 / 多模态链路;DashScope 只按仍在使用的历史能力单独处理,不作为 GPT-image-2 兜底。 +- 图片生成:VectorEngine `gpt-image-2` 图片 provider 归属 `platform-image`,密钥只在后端环境变量中;`api-server` 内的 `openai_image_generation.rs` 只是兼容调用面和外部失败审计桥接,不再承载 provider 协议实现。实际外部生成运行记录统一落 `tracking_event`,`event_key = external_generation_run`,metadata 记录开始 / 结束时间、耗时、状态、成功标记、失败原因、provider task id 和结果摘要,不再写回过时的 `ai_task`。APIMart 只保留给创意 Agent `gpt-5` Responses 文本 / 多模态链路;DashScope 只按仍在使用的历史能力单独处理,不作为 GPT-image-2 兜底。VectorEngine `/v1/images/generations` 和 `/v1/images/edits` 上游 POST 使用 `libcurl` 发送;`reqwest` 只保留给参考图 URL 下载和响应中图片 URL 下载。`request_send` 阶段的 curl timeout / connect error 按可重试传输错误处理,最多尝试 5 次,并使用指数退避加短抖动;排障时优先看 `attempt`、`max_attempts`、`retry_delay_ms`、`reference_image_bytes_total` 和 `request_params`,不要把 `SendRequest` 当成上游业务错误。 - Match3D 物品 sheet:关卡整图完成后走 VectorEngine `/v1/images/edits` multipart `image`,模型为 `gpt-image-2`,`2K 1:1` 输出 `10*10` spritesheet;物品 sheet prompt 固定要求纯绿色绿幕背景,后端上传 OSS 前必须把绿幕扣成透明 PNG,并把透明整图写入 `itemSpritesheetImageSrc/itemSpritesheetImageObjectKey`。后端优先按透明 alpha 连通域从该 sheet 识别真实素材矩形并持久化 20 个物品、每个 5 个形态;识别数量不足时才回退 `10*10` 固定网格。通用系列素材图集的行列索引按每行 2 个物品计算,必须落在 `1..=10`,难度只决定运行态加载 3 / 9 / 15 / 20 种。 - Match3D UI spritesheet 和背景派生图:关卡整图作为参考图并发生成 `1K 1:1` UI spritesheet 与 `1K 9:16` 背景图,模型均为 `gpt-image-2`。UI spritesheet prompt 固定要求纯绿色绿幕背景,后端上传 OSS 前必须把绿幕扣成透明 PNG;背景图必须合成为全画幅不透明 PNG。 - Match3D 1:1 容器 UI:VectorEngine `/v1/images/edits` multipart 参考图。该容器参考图是后端生图协议输入,必须通过 `include_bytes!` 随 `api-server` 编译进二进制,避免 API 单独发布或运行目录缺少 `public/` 时生成失败。 diff --git a/server-rs/Cargo.lock b/server-rs/Cargo.lock index 4024b992..5b149323 100644 --- a/server-rs/Cargo.lock +++ b/server-rs/Cargo.lock @@ -636,6 +636,36 @@ dependencies = [ "memchr", ] +[[package]] +name = "curl" +version = "0.4.49" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79fc3b6dd0b87ba36e565715bf9a2ced221311db47bd18011676f24a6066edbc" +dependencies = [ + "curl-sys", + "libc", + "openssl-probe", + "openssl-sys", + "schannel", + "socket2 0.6.3", + "windows-sys 0.59.0", +] + +[[package]] +name = "curl-sys" +version = "0.4.88+curl-8.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "644816de6547255eff4e491a1dda1c19b7237f00b62a61e6e64859ce4f2906d0" +dependencies = [ + "cc", + "libc", + "libz-sys", + "openssl-sys", + "pkg-config", + "vcpkg", + "windows-sys 0.61.2", +] + [[package]] name = "darling" version = "0.23.0" @@ -1310,7 +1340,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.3", "tokio", "tower-service", "tracing", @@ -1668,6 +1698,18 @@ dependencies = [ "glob", ] +[[package]] +name = "libz-sys" +version = "1.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85bc9657773828b90eeb625adff10eeac83cc21bbfd8e23a03eaa8a33c9e28d9" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -2405,6 +2447,7 @@ name = "platform-image" version = "0.1.0" dependencies = [ "base64 0.22.1", + "curl", "image", "platform-oss", "reqwest 0.12.28", @@ -2606,7 +2649,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.5.10", + "socket2 0.6.3", "thiserror 2.0.18", "tokio", "tracing", @@ -2643,7 +2686,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.3", "tracing", "windows-sys 0.52.0", ] @@ -4571,7 +4614,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] @@ -4651,6 +4694,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.61.2" diff --git a/server-rs/Cargo.toml b/server-rs/Cargo.toml index 404b3401..fdb306a3 100644 --- a/server-rs/Cargo.toml +++ b/server-rs/Cargo.toml @@ -96,6 +96,7 @@ axum = "0.8" base64 = "0.22" cbc = { version = "0.1", features = ["alloc"] } bytes = "1" +curl = "0.4" dotenvy = "0.15" flate2 = "1" futures-util = "0.3" diff --git a/server-rs/crates/platform-image/Cargo.toml b/server-rs/crates/platform-image/Cargo.toml index b5a6feca..9da08834 100644 --- a/server-rs/crates/platform-image/Cargo.toml +++ b/server-rs/crates/platform-image/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] base64 = { workspace = true } +curl = { workspace = true } image = { workspace = true, features = ["jpeg", "png", "webp"] } reqwest = { workspace = true, features = ["json", "multipart", "rustls-tls"] } serde_json = { workspace = true } diff --git a/server-rs/crates/platform-image/src/vector_engine/client.rs b/server-rs/crates/platform-image/src/vector_engine/client.rs index afae18da..6fcd23dd 100644 --- a/server-rs/crates/platform-image/src/vector_engine/client.rs +++ b/server-rs/crates/platform-image/src/vector_engine/client.rs @@ -1,19 +1,22 @@ -use reqwest::{header, multipart}; +use std::time::{SystemTime, UNIX_EPOCH}; -const VECTOR_ENGINE_SEND_MAX_ATTEMPTS: u32 = 3; +const VECTOR_ENGINE_SEND_MAX_ATTEMPTS: u32 = 5; const VECTOR_ENGINE_SEND_RETRY_BASE_DELAY_MS: u64 = 500; +const VECTOR_ENGINE_SEND_RETRY_MAX_JITTER_MS: u64 = 999; use super::{ constants::{GPT_IMAGE_2_MODEL, VECTOR_ENGINE_PROVIDER}, + curl_transport::{ + map_curl_error, send_vector_engine_json_request_with_curl, + send_vector_engine_multipart_edit_request_with_curl, + }, error::PlatformImageError, image_source::resolve_reference_images, request::{ - build_prompt_with_negative, build_vector_engine_image_edit_request_log_params, - build_vector_engine_image_request_body, normalize_image_size, - vector_engine_images_edit_url, vector_engine_images_generation_url, + build_vector_engine_image_edit_request_log_params, build_vector_engine_image_request_body, + normalize_image_size, vector_engine_images_edit_url, vector_engine_images_generation_url, }, response::handle_vector_engine_response, - transport::map_reqwest_error, types::{GeneratedImages, ReferenceImage, VectorEngineImageSettings}, }; @@ -55,27 +58,27 @@ pub async fn create_vector_engine_image_generation( let started_at = std::time::Instant::now(); let mut attempt = 1; let response = loop { - match http_client - .post(request_url.as_str()) - .header( - header::AUTHORIZATION, - format!("Bearer {}", settings.api_key), - ) - .header(header::ACCEPT, "application/json") - .header(header::CONTENT_TYPE, "application/json") - .json(&request_body) - .send() - .await + match send_vector_engine_json_request_with_curl( + request_url.as_str(), + settings.api_key.as_str(), + &request_body, + settings.request_timeout_ms, + ) + .await { Ok(response) => break response, Err(error) => { - if should_retry_vector_engine_send_error(&error, attempt) { + if should_retry_vector_engine_curl_send_error(&error, attempt) { retry_vector_engine_send_after_delay( "generation", request_url.as_str(), "request_send", attempt, - &error, + error.is_timeout(), + error.is_connect(), + true, + false, + error.to_string().as_str(), started_at.elapsed().as_millis() as u64, Some(prompt.chars().count()), Some(reference_images.len()), @@ -85,7 +88,7 @@ pub async fn create_vector_engine_image_generation( attempt += 1; continue; } - return Err(map_reqwest_error( + return Err(map_curl_error( format!("{failure_context}:创建图片生成任务失败").as_str(), request_url.as_str(), "request_send", @@ -98,11 +101,11 @@ pub async fn create_vector_engine_image_generation( } } }; - let response_status = response.status(); + let response_status = response.status; tracing::info!( provider = VECTOR_ENGINE_PROVIDER, endpoint = %request_url, - status = response_status.as_u16(), + status = response_status, prompt_chars = prompt.chars().count(), size = %normalized_size, reference_image_count = reference_images.len(), @@ -111,25 +114,11 @@ pub async fn create_vector_engine_image_generation( failure_context, "VectorEngine 图片生成 HTTP 返回" ); - let response_text = match response.text().await { - Ok(response_text) => response_text, - Err(error) => { - return Err(map_reqwest_error( - format!("{failure_context}:读取图片生成响应失败").as_str(), - request_url.as_str(), - "response_body", - error, - started_at.elapsed().as_millis() as u64, - Some(prompt.chars().count()), - Some(reference_images.len()), - Some(&request_body), - )); - } - }; + let response_text = response.body; handle_vector_engine_response( http_client, request_url.as_str(), - response_status.as_u16(), + response_status, response_text.as_str(), failure_context, started_at.elapsed().as_millis() as u64, @@ -219,34 +208,31 @@ pub async fn create_vector_engine_image_edit_with_references( ); let mut attempt = 1; let response = loop { - let form = build_vector_engine_image_edit_form( + match send_vector_engine_multipart_edit_request_with_curl( + request_url.as_str(), + settings.api_key.as_str(), prompt, negative_prompt, normalized_size.as_str(), candidate_count, reference_images, - failure_context, - )?; - match http_client - .post(request_url.as_str()) - .header( - header::AUTHORIZATION, - format!("Bearer {}", settings.api_key), - ) - .header(header::ACCEPT, "application/json") - .multipart(form) - .send() - .await + settings.request_timeout_ms, + ) + .await { Ok(response) => break response, Err(error) => { - if should_retry_vector_engine_send_error(&error, attempt) { + if should_retry_vector_engine_curl_send_error(&error, attempt) { retry_vector_engine_send_after_delay( "edit", request_url.as_str(), "request_send", attempt, - &error, + error.is_timeout(), + error.is_connect(), + true, + false, + error.to_string().as_str(), started_at.elapsed().as_millis() as u64, Some(prompt.chars().count()), Some(reference_image_count), @@ -256,7 +242,7 @@ pub async fn create_vector_engine_image_edit_with_references( attempt += 1; continue; } - return Err(map_reqwest_error( + return Err(map_curl_error( format!("{failure_context}:创建图片编辑任务失败").as_str(), request_url.as_str(), "request_send", @@ -269,11 +255,11 @@ pub async fn create_vector_engine_image_edit_with_references( } } }; - let response_status = response.status(); + let response_status = response.status; tracing::info!( provider = VECTOR_ENGINE_PROVIDER, endpoint = %request_url, - status = response_status.as_u16(), + status = response_status, prompt_chars = prompt.chars().count(), size = %normalized_size, reference_image_count, @@ -284,25 +270,11 @@ pub async fn create_vector_engine_image_edit_with_references( failure_context, "VectorEngine 图片编辑 HTTP 返回" ); - let response_text = match response.text().await { - Ok(response_text) => response_text, - Err(error) => { - return Err(map_reqwest_error( - format!("{failure_context}:读取图片编辑响应失败").as_str(), - request_url.as_str(), - "response_body", - error, - started_at.elapsed().as_millis() as u64, - Some(prompt.chars().count()), - Some(reference_image_count), - Some(&request_params), - )); - } - }; + let response_text = response.body; handle_vector_engine_response( http_client, request_url.as_str(), - response_status.as_u16(), + response_status, response_text.as_str(), failure_context, started_at.elapsed().as_millis() as u64, @@ -314,38 +286,10 @@ pub async fn create_vector_engine_image_edit_with_references( .await } -fn build_vector_engine_image_edit_form( - prompt: &str, - negative_prompt: Option<&str>, - normalized_size: &str, - candidate_count: u32, - reference_images: &[ReferenceImage], - failure_context: &str, -) -> Result { - let mut form = multipart::Form::new() - .text("model", GPT_IMAGE_2_MODEL.to_string()) - .text( - "prompt", - build_prompt_with_negative(prompt, negative_prompt), - ) - .text("n", candidate_count.clamp(1, 4).to_string()) - .text("size", normalized_size.to_string()); - - for reference_image in reference_images.iter().take(5) { - let image_part = multipart::Part::bytes(reference_image.bytes.clone()) - .file_name(reference_image.file_name.clone()) - .mime_str(reference_image.mime_type.as_str()) - .map_err(|error| PlatformImageError::InvalidRequest { - provider: VECTOR_ENGINE_PROVIDER, - message: format!("{failure_context}:构造参考图失败:{error}"), - })?; - form = form.part("image", image_part); - } - - Ok(form) -} - -fn should_retry_vector_engine_send_error(error: &reqwest::Error, attempt: u32) -> bool { +fn should_retry_vector_engine_curl_send_error( + error: &super::curl_transport::VectorEngineCurlError, + attempt: u32, +) -> bool { attempt < VECTOR_ENGINE_SEND_MAX_ATTEMPTS && (error.is_timeout() || error.is_connect()) } @@ -354,13 +298,17 @@ async fn retry_vector_engine_send_after_delay( request_url: &str, failure_stage: &'static str, attempt: u32, - error: &reqwest::Error, + timeout: bool, + connect: bool, + request: bool, + body: bool, + error: &str, elapsed_ms: u64, prompt_chars: Option, reference_image_count: Option, request_params: Option<&serde_json::Value>, ) { - let delay_ms = VECTOR_ENGINE_SEND_RETRY_BASE_DELAY_MS * u64::from(attempt); + let delay_ms = vector_engine_send_retry_delay_ms(attempt, vector_engine_send_retry_jitter_ms()); tracing::warn!( provider = VECTOR_ENGINE_PROVIDER, endpoint = %request_url, @@ -369,12 +317,12 @@ async fn retry_vector_engine_send_after_delay( attempt, max_attempts = VECTOR_ENGINE_SEND_MAX_ATTEMPTS, retry_delay_ms = delay_ms, - timeout = error.is_timeout(), - connect = error.is_connect(), - request = error.is_request(), - body = error.is_body(), - status = error.status().map(|status| status.as_u16()).unwrap_or_default(), - error = %error, + timeout, + connect, + request, + body, + status = 0, + error, elapsed_ms, prompt_chars, reference_image_count, @@ -385,3 +333,36 @@ async fn retry_vector_engine_send_after_delay( ); tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; } + +fn vector_engine_send_retry_delay_ms(attempt: u32, jitter_ms: u64) -> u64 { + let exponential_factor = 1_u64 << attempt.saturating_sub(1).min(10); + let bounded_jitter_ms = jitter_ms.min(VECTOR_ENGINE_SEND_RETRY_MAX_JITTER_MS); + VECTOR_ENGINE_SEND_RETRY_BASE_DELAY_MS * exponential_factor + bounded_jitter_ms +} + +fn vector_engine_send_retry_jitter_ms() -> u64 { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.subsec_nanos()) + .unwrap_or_default(); + u64::from(nanos) % (VECTOR_ENGINE_SEND_RETRY_MAX_JITTER_MS + 1) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn vector_engine_send_retry_policy_allows_four_retries_before_final_attempt() { + assert_eq!(VECTOR_ENGINE_SEND_MAX_ATTEMPTS, 5); + } + + #[test] + fn vector_engine_send_retry_delay_uses_exponential_backoff_with_bounded_jitter() { + assert_eq!(vector_engine_send_retry_delay_ms(1, 0), 500); + assert_eq!(vector_engine_send_retry_delay_ms(2, 0), 1_000); + assert_eq!(vector_engine_send_retry_delay_ms(3, 0), 2_000); + assert_eq!(vector_engine_send_retry_delay_ms(4, 0), 4_000); + assert_eq!(vector_engine_send_retry_delay_ms(4, 999), 4_999); + } +} diff --git a/server-rs/crates/platform-image/src/vector_engine/curl_transport.rs b/server-rs/crates/platform-image/src/vector_engine/curl_transport.rs new file mode 100644 index 00000000..626e368c --- /dev/null +++ b/server-rs/crates/platform-image/src/vector_engine/curl_transport.rs @@ -0,0 +1,363 @@ +use std::{error::Error, fmt, time::Duration}; + +use curl::{ + FormError, + easy::{Easy, Form, List}, +}; +use serde_json::Value; + +use super::{ + audit::build_failure_audit, + constants::{GPT_IMAGE_2_MODEL, VECTOR_ENGINE_PROVIDER}, + error::PlatformImageError, + request::build_prompt_with_negative, + types::ReferenceImage, +}; + +#[derive(Debug)] +pub(crate) struct VectorEngineCurlResponse { + pub(crate) status: u16, + pub(crate) body: String, +} + +#[derive(Debug)] +pub(crate) enum VectorEngineCurlError { + Curl(curl::Error), + Form(FormError), + WorkerJoin(tokio::task::JoinError), +} + +impl VectorEngineCurlError { + pub(crate) fn is_timeout(&self) -> bool { + match self { + Self::Curl(error) => error.is_operation_timedout(), + Self::Form(_) | Self::WorkerJoin(_) => false, + } + } + + pub(crate) fn is_connect(&self) -> bool { + match self { + Self::Curl(error) => { + error.is_couldnt_connect() + || error.is_couldnt_resolve_host() + || error.is_couldnt_resolve_proxy() + } + Self::Form(_) | Self::WorkerJoin(_) => false, + } + } +} + +impl fmt::Display for VectorEngineCurlError { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Curl(error) => write!(formatter, "{error}"), + Self::Form(error) => write!(formatter, "multipart form error: {error}"), + Self::WorkerJoin(error) => write!(formatter, "curl worker join failed: {error}"), + } + } +} + +impl Error for VectorEngineCurlError {} + +impl From for VectorEngineCurlError { + fn from(error: curl::Error) -> Self { + Self::Curl(error) + } +} + +impl From for VectorEngineCurlError { + fn from(error: FormError) -> Self { + Self::Form(error) + } +} + +pub(crate) async fn send_vector_engine_json_request_with_curl( + request_url: &str, + api_key: &str, + request_body: &Value, + timeout_ms: u64, +) -> Result { + let request_url = request_url.to_string(); + let api_key = api_key.to_string(); + let request_body = request_body.to_string(); + tokio::task::spawn_blocking(move || { + send_json_request_with_curl_blocking( + request_url.as_str(), + api_key.as_str(), + request_body.as_str(), + timeout_ms, + ) + }) + .await + .map_err(VectorEngineCurlError::WorkerJoin)? +} + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn send_vector_engine_multipart_edit_request_with_curl( + request_url: &str, + api_key: &str, + prompt: &str, + negative_prompt: Option<&str>, + normalized_size: &str, + candidate_count: u32, + reference_images: &[ReferenceImage], + timeout_ms: u64, +) -> Result { + let request_url = request_url.to_string(); + let api_key = api_key.to_string(); + let prompt = prompt.to_string(); + let negative_prompt = negative_prompt.map(str::to_string); + let normalized_size = normalized_size.to_string(); + let reference_images = reference_images.iter().take(5).cloned().collect::>(); + tokio::task::spawn_blocking(move || { + send_multipart_edit_request_with_curl_blocking( + request_url.as_str(), + api_key.as_str(), + prompt.as_str(), + negative_prompt.as_deref(), + normalized_size.as_str(), + candidate_count, + reference_images.as_slice(), + timeout_ms, + ) + }) + .await + .map_err(VectorEngineCurlError::WorkerJoin)? +} + +pub(crate) fn map_curl_error( + context: &str, + request_url: &str, + failure_stage: &'static str, + error: VectorEngineCurlError, + latency_ms: u64, + prompt_chars: Option, + reference_image_count: Option, + request_params: Option<&Value>, +) -> PlatformImageError { + let is_timeout = error.is_timeout(); + let is_connect = error.is_connect(); + let source = error.to_string(); + let message = format!("{context}:{source}"); + let audit = build_failure_audit( + request_url, + context, + failure_stage, + None, + None, + is_timeout, + is_connect, + message.as_str(), + Some(source.clone()), + None, + Some(latency_ms), + prompt_chars, + reference_image_count, + ); + tracing::warn!( + provider = VECTOR_ENGINE_PROVIDER, + endpoint = %request_url, + failure_stage, + timeout = is_timeout, + connect = is_connect, + request = true, + body = false, + status = 0, + source = %source, + source_chain = %source, + source_chain_depth = 1, + message = %message, + elapsed_ms = latency_ms, + prompt_chars, + reference_image_count, + request_params = %request_params + .map(|value| value.to_string()) + .unwrap_or_default(), + "VectorEngine 图片 libcurl 请求失败" + ); + + PlatformImageError::Request { + provider: VECTOR_ENGINE_PROVIDER, + message, + endpoint: Some(request_url.to_string()), + timeout: is_timeout, + connect: is_connect, + request: true, + body: false, + status_code: None, + source: Some(source), + audit: Some(audit), + } +} + +fn send_json_request_with_curl_blocking( + request_url: &str, + api_key: &str, + request_body: &str, + timeout_ms: u64, +) -> Result { + let mut headers = vector_engine_curl_headers(api_key)?; + headers.append("Content-Type: application/json")?; + let mut easy = Easy::new(); + easy.url(request_url)?; + easy.post(true)?; + easy.http_headers(headers)?; + easy.timeout(Duration::from_millis(timeout_ms.max(1)))?; + easy.post_fields_copy(request_body.as_bytes())?; + Ok(perform_curl_request(easy)?) +} + +#[allow(clippy::too_many_arguments)] +fn send_multipart_edit_request_with_curl_blocking( + request_url: &str, + api_key: &str, + prompt: &str, + negative_prompt: Option<&str>, + normalized_size: &str, + candidate_count: u32, + reference_images: &[ReferenceImage], + timeout_ms: u64, +) -> Result { + let mut form = Form::new(); + form.part("model") + .contents(GPT_IMAGE_2_MODEL.as_bytes()) + .add()?; + form.part("prompt") + .contents(build_prompt_with_negative(prompt, negative_prompt).as_bytes()) + .add()?; + form.part("n") + .contents(candidate_count.clamp(1, 4).to_string().as_bytes()) + .add()?; + form.part("size") + .contents(normalized_size.as_bytes()) + .add()?; + + for reference_image in reference_images { + form.part("image") + .contents(reference_image.bytes.as_slice()) + .content_type(reference_image.mime_type.as_str()) + .filename(reference_image.file_name.as_str()) + .add()?; + } + + let headers = vector_engine_curl_headers(api_key)?; + let mut easy = Easy::new(); + easy.url(request_url)?; + easy.httppost(form)?; + easy.http_headers(headers)?; + easy.timeout(Duration::from_millis(timeout_ms.max(1)))?; + Ok(perform_curl_request(easy)?) +} + +fn vector_engine_curl_headers(api_key: &str) -> Result { + let mut headers = List::new(); + headers.append(format!("Authorization: Bearer {api_key}").as_str())?; + headers.append("Accept: application/json")?; + Ok(headers) +} + +fn perform_curl_request(mut easy: Easy) -> Result { + let mut body = Vec::new(); + { + let mut transfer = easy.transfer(); + transfer.write_function(|data| { + body.extend_from_slice(data); + Ok(data.len()) + })?; + transfer.perform()?; + } + let status = easy.response_code()? as u16; + let body = String::from_utf8_lossy(body.as_slice()).into_owned(); + Ok(VectorEngineCurlResponse { status, body }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::vector_engine::types::ReferenceImage; + use std::time::Duration; + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpListener, + }; + + #[tokio::test] + async fn vector_engine_curl_transport_posts_json_request() { + let (base_url, server) = start_single_response_server().await; + let response = send_vector_engine_json_request_with_curl( + format!("{base_url}/v1/images/generations").as_str(), + "test-key", + &serde_json::json!({"model":"gpt-image-2","prompt":"测试"}), + 1_000, + ) + .await + .expect("curl json request should succeed"); + + assert_eq!(response.status, 200); + assert_eq!(response.body, "{\"data\":[]}"); + server.abort(); + } + + #[tokio::test] + async fn vector_engine_curl_transport_posts_multipart_request() { + let (base_url, server) = start_single_response_server().await; + let response = send_vector_engine_multipart_edit_request_with_curl( + format!("{base_url}/v1/images/edits").as_str(), + "test-key", + "测试提示词", + None, + "1024x1024", + 1, + &[ReferenceImage { + bytes: b"reference".to_vec(), + mime_type: "image/png".to_string(), + file_name: "reference.png".to_string(), + }], + 1_000, + ) + .await + .expect("curl multipart request should succeed"); + + assert_eq!(response.status, 200); + assert_eq!(response.body, "{\"data\":[]}"); + server.abort(); + } + + async fn start_single_response_server() -> (String, tokio::task::JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("mock server should bind"); + let addr = listener + .local_addr() + .expect("mock server addr should be readable"); + let server = tokio::spawn(async move { + let Ok((mut stream, _)) = listener.accept().await else { + return; + }; + let mut request = Vec::new(); + let mut buffer = [0_u8; 4096]; + loop { + let Ok(read) = stream.read(&mut buffer).await else { + return; + }; + if read == 0 { + return; + } + request.extend_from_slice(&buffer[..read]); + if request.windows(4).any(|window| window == b"\r\n\r\n") { + break; + } + } + tokio::time::sleep(Duration::from_millis(10)).await; + let body = "{\"data\":[]}"; + let response = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}", + body.len(), + body + ); + let _ = stream.write_all(response.as_bytes()).await; + }); + + (format!("http://{addr}"), server) + } +} diff --git a/server-rs/crates/platform-image/src/vector_engine/mod.rs b/server-rs/crates/platform-image/src/vector_engine/mod.rs index 6cdcf543..4bd39b30 100644 --- a/server-rs/crates/platform-image/src/vector_engine/mod.rs +++ b/server-rs/crates/platform-image/src/vector_engine/mod.rs @@ -1,6 +1,7 @@ mod audit; mod client; mod constants; +mod curl_transport; mod error; mod image_source; mod payload; diff --git a/server-rs/crates/platform-image/src/vector_engine/transport.rs b/server-rs/crates/platform-image/src/vector_engine/transport.rs index c74d6e04..6a63878b 100644 --- a/server-rs/crates/platform-image/src/vector_engine/transport.rs +++ b/server-rs/crates/platform-image/src/vector_engine/transport.rs @@ -1,10 +1,7 @@ -use std::{error::Error, time::Duration}; - -use serde_json::Value; +use std::time::Duration; use super::{ - audit::build_failure_audit, constants::VECTOR_ENGINE_PROVIDER, error::PlatformImageError, - types::VectorEngineImageSettings, + constants::VECTOR_ENGINE_PROVIDER, error::PlatformImageError, types::VectorEngineImageSettings, }; pub fn build_vector_engine_image_http_client( @@ -20,130 +17,3 @@ pub fn build_vector_engine_image_http_client( message: format!("构造 VectorEngine 图片生成 HTTP 客户端失败:{error}"), }) } - -pub(super) fn map_reqwest_error( - context: &str, - request_url: &str, - failure_stage: &'static str, - error: reqwest::Error, - latency_ms: u64, - prompt_chars: Option, - reference_image_count: Option, - request_params: Option<&Value>, -) -> PlatformImageError { - let is_timeout = error.is_timeout(); - let is_connect = error.is_connect(); - let source_chain_parts = collect_error_source_chain(&error); - let source = source_chain_parts.first().cloned(); - let source_chain_depth = source_chain_parts.len(); - let source_chain = if source_chain_parts.is_empty() { - None - } else { - Some(source_chain_parts.join(" -> ")) - }; - let message = format!("{context}:{error}"); - let audit = build_failure_audit( - request_url, - context, - failure_stage, - error.status().map(|status| status.as_u16()), - None, - is_timeout, - is_connect, - message.as_str(), - source_chain.clone().or_else(|| source.clone()), - None, - Some(latency_ms), - prompt_chars, - reference_image_count, - ); - tracing::warn!( - provider = VECTOR_ENGINE_PROVIDER, - endpoint = %request_url, - failure_stage, - timeout = is_timeout, - connect = is_connect, - request = error.is_request(), - body = error.is_body(), - status = error.status().map(|status| status.as_u16()).unwrap_or_default(), - source = %source.clone().unwrap_or_default(), - source_chain = %source_chain.clone().unwrap_or_default(), - source_chain_depth, - message = %message, - elapsed_ms = latency_ms, - prompt_chars, - reference_image_count, - request_params = %request_params - .map(|value| value.to_string()) - .unwrap_or_default(), - "VectorEngine 图片请求发送失败" - ); - - PlatformImageError::Request { - provider: VECTOR_ENGINE_PROVIDER, - message, - endpoint: Some(request_url.to_string()), - timeout: is_timeout, - connect: is_connect, - request: error.is_request(), - body: error.is_body(), - status_code: error.status().map(|status| status.as_u16()), - source: source_chain.or(source), - audit: Some(audit), - } -} - -fn collect_error_source_chain(error: &(dyn Error + 'static)) -> Vec { - let mut chain = Vec::new(); - let mut next = error.source(); - while let Some(source) = next { - chain.push(source.to_string()); - next = source.source(); - } - chain -} - -#[cfg(test)] -mod tests { - use super::*; - use std::fmt; - - #[derive(Debug)] - struct TestError { - message: &'static str, - source: Option>, - } - - impl fmt::Display for TestError { - fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - formatter.write_str(self.message) - } - } - - impl Error for TestError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - self.source - .as_deref() - .map(|source| source as &(dyn Error + 'static)) - } - } - - #[test] - fn collect_error_source_chain_keeps_nested_causes() { - let error = TestError { - message: "top", - source: Some(Box::new(TestError { - message: "middle", - source: Some(Box::new(TestError { - message: "bottom", - source: None, - })), - })), - }; - - assert_eq!( - collect_error_source_chain(&error), - vec!["middle".to_string(), "bottom".to_string()] - ); - } -}