fix: retry VectorEngine image send timeouts
This commit is contained in:
@@ -9,6 +9,6 @@ base64 = { workspace = true }
|
||||
image = { workspace = true, features = ["jpeg", "png", "webp"] }
|
||||
reqwest = { workspace = true, features = ["json", "multipart", "rustls-tls"] }
|
||||
serde_json = { workspace = true }
|
||||
tokio = { workspace = true, features = ["time"] }
|
||||
tokio = { workspace = true, features = ["io-util", "macros", "net", "time"] }
|
||||
tracing = { workspace = true }
|
||||
platform-oss = { workspace = true }
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
use reqwest::header;
|
||||
use reqwest::{header, multipart};
|
||||
|
||||
const VECTOR_ENGINE_SEND_MAX_ATTEMPTS: u32 = 3;
|
||||
const VECTOR_ENGINE_SEND_RETRY_BASE_DELAY_MS: u64 = 500;
|
||||
|
||||
use super::{
|
||||
constants::{GPT_IMAGE_2_MODEL, VECTOR_ENGINE_PROVIDER},
|
||||
@@ -50,30 +53,49 @@ pub async fn create_vector_engine_image_generation(
|
||||
reference_images,
|
||||
);
|
||||
let started_at = std::time::Instant::now();
|
||||
let response = match http_client
|
||||
.post(request_url.as_str())
|
||||
.header(
|
||||
header::AUTHORIZATION,
|
||||
format!("Bearer {}", settings.api_key),
|
||||
)
|
||||
.header(header::ACCEPT, "application/json")
|
||||
.header(header::CONTENT_TYPE, "application/json")
|
||||
.json(&request_body)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(error) => {
|
||||
return Err(map_reqwest_error(
|
||||
format!("{failure_context}:创建图片生成任务失败").as_str(),
|
||||
request_url.as_str(),
|
||||
"request_send",
|
||||
error,
|
||||
started_at.elapsed().as_millis() as u64,
|
||||
Some(prompt.chars().count()),
|
||||
Some(reference_images.len()),
|
||||
Some(&request_body),
|
||||
));
|
||||
let mut attempt = 1;
|
||||
let response = loop {
|
||||
match http_client
|
||||
.post(request_url.as_str())
|
||||
.header(
|
||||
header::AUTHORIZATION,
|
||||
format!("Bearer {}", settings.api_key),
|
||||
)
|
||||
.header(header::ACCEPT, "application/json")
|
||||
.header(header::CONTENT_TYPE, "application/json")
|
||||
.json(&request_body)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(response) => break response,
|
||||
Err(error) => {
|
||||
if should_retry_vector_engine_send_error(&error, attempt) {
|
||||
retry_vector_engine_send_after_delay(
|
||||
"generation",
|
||||
request_url.as_str(),
|
||||
"request_send",
|
||||
attempt,
|
||||
&error,
|
||||
started_at.elapsed().as_millis() as u64,
|
||||
Some(prompt.chars().count()),
|
||||
Some(reference_images.len()),
|
||||
Some(&request_body),
|
||||
)
|
||||
.await;
|
||||
attempt += 1;
|
||||
continue;
|
||||
}
|
||||
return Err(map_reqwest_error(
|
||||
format!("{failure_context}:创建图片生成任务失败").as_str(),
|
||||
request_url.as_str(),
|
||||
"request_send",
|
||||
error,
|
||||
started_at.elapsed().as_millis() as u64,
|
||||
Some(prompt.chars().count()),
|
||||
Some(reference_images.len()),
|
||||
Some(&request_body),
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
let response_status = response.status();
|
||||
@@ -84,6 +106,7 @@ pub async fn create_vector_engine_image_generation(
|
||||
prompt_chars = prompt.chars().count(),
|
||||
size = %normalized_size,
|
||||
reference_image_count = reference_images.len(),
|
||||
attempt,
|
||||
elapsed_ms = started_at.elapsed().as_millis() as u64,
|
||||
failure_context,
|
||||
"VectorEngine 图片生成 HTTP 返回"
|
||||
@@ -167,26 +190,6 @@ pub async fn create_vector_engine_image_edit_with_references(
|
||||
reference_images,
|
||||
);
|
||||
|
||||
let mut form = reqwest::multipart::Form::new()
|
||||
.text("model", GPT_IMAGE_2_MODEL.to_string())
|
||||
.text(
|
||||
"prompt",
|
||||
build_prompt_with_negative(prompt, negative_prompt),
|
||||
)
|
||||
.text("n", candidate_count.clamp(1, 4).to_string())
|
||||
.text("size", normalized_size.clone());
|
||||
|
||||
for reference_image in reference_images.iter().take(5) {
|
||||
let image_part = reqwest::multipart::Part::bytes(reference_image.bytes.clone())
|
||||
.file_name(reference_image.file_name.clone())
|
||||
.mime_str(reference_image.mime_type.as_str())
|
||||
.map_err(|error| PlatformImageError::InvalidRequest {
|
||||
provider: VECTOR_ENGINE_PROVIDER,
|
||||
message: format!("{failure_context}:构造参考图失败:{error}"),
|
||||
})?;
|
||||
form = form.part("image", image_part);
|
||||
}
|
||||
|
||||
let reference_image_count = reference_images.iter().take(5).count();
|
||||
let reference_image_bytes_total: usize = reference_images
|
||||
.iter()
|
||||
@@ -214,29 +217,56 @@ pub async fn create_vector_engine_image_edit_with_references(
|
||||
failure_context,
|
||||
"VectorEngine 图片编辑请求参数"
|
||||
);
|
||||
let response = match http_client
|
||||
.post(request_url.as_str())
|
||||
.header(
|
||||
header::AUTHORIZATION,
|
||||
format!("Bearer {}", settings.api_key),
|
||||
)
|
||||
.header(header::ACCEPT, "application/json")
|
||||
.multipart(form)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(error) => {
|
||||
return Err(map_reqwest_error(
|
||||
format!("{failure_context}:创建图片编辑任务失败").as_str(),
|
||||
request_url.as_str(),
|
||||
"request_send",
|
||||
error,
|
||||
started_at.elapsed().as_millis() as u64,
|
||||
Some(prompt.chars().count()),
|
||||
Some(reference_image_count),
|
||||
Some(&request_params),
|
||||
));
|
||||
let mut attempt = 1;
|
||||
let response = loop {
|
||||
let form = build_vector_engine_image_edit_form(
|
||||
prompt,
|
||||
negative_prompt,
|
||||
normalized_size.as_str(),
|
||||
candidate_count,
|
||||
reference_images,
|
||||
failure_context,
|
||||
)?;
|
||||
match http_client
|
||||
.post(request_url.as_str())
|
||||
.header(
|
||||
header::AUTHORIZATION,
|
||||
format!("Bearer {}", settings.api_key),
|
||||
)
|
||||
.header(header::ACCEPT, "application/json")
|
||||
.multipart(form)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(response) => break response,
|
||||
Err(error) => {
|
||||
if should_retry_vector_engine_send_error(&error, attempt) {
|
||||
retry_vector_engine_send_after_delay(
|
||||
"edit",
|
||||
request_url.as_str(),
|
||||
"request_send",
|
||||
attempt,
|
||||
&error,
|
||||
started_at.elapsed().as_millis() as u64,
|
||||
Some(prompt.chars().count()),
|
||||
Some(reference_image_count),
|
||||
Some(&request_params),
|
||||
)
|
||||
.await;
|
||||
attempt += 1;
|
||||
continue;
|
||||
}
|
||||
return Err(map_reqwest_error(
|
||||
format!("{failure_context}:创建图片编辑任务失败").as_str(),
|
||||
request_url.as_str(),
|
||||
"request_send",
|
||||
error,
|
||||
started_at.elapsed().as_millis() as u64,
|
||||
Some(prompt.chars().count()),
|
||||
Some(reference_image_count),
|
||||
Some(&request_params),
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
let response_status = response.status();
|
||||
@@ -249,6 +279,7 @@ pub async fn create_vector_engine_image_edit_with_references(
|
||||
reference_image_count,
|
||||
reference_image_bytes_total,
|
||||
request_params = %request_params,
|
||||
attempt,
|
||||
elapsed_ms = started_at.elapsed().as_millis() as u64,
|
||||
failure_context,
|
||||
"VectorEngine 图片编辑 HTTP 返回"
|
||||
@@ -282,3 +313,75 @@ pub async fn create_vector_engine_image_edit_with_references(
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn build_vector_engine_image_edit_form(
|
||||
prompt: &str,
|
||||
negative_prompt: Option<&str>,
|
||||
normalized_size: &str,
|
||||
candidate_count: u32,
|
||||
reference_images: &[ReferenceImage],
|
||||
failure_context: &str,
|
||||
) -> Result<multipart::Form, PlatformImageError> {
|
||||
let mut form = multipart::Form::new()
|
||||
.text("model", GPT_IMAGE_2_MODEL.to_string())
|
||||
.text(
|
||||
"prompt",
|
||||
build_prompt_with_negative(prompt, negative_prompt),
|
||||
)
|
||||
.text("n", candidate_count.clamp(1, 4).to_string())
|
||||
.text("size", normalized_size.to_string());
|
||||
|
||||
for reference_image in reference_images.iter().take(5) {
|
||||
let image_part = multipart::Part::bytes(reference_image.bytes.clone())
|
||||
.file_name(reference_image.file_name.clone())
|
||||
.mime_str(reference_image.mime_type.as_str())
|
||||
.map_err(|error| PlatformImageError::InvalidRequest {
|
||||
provider: VECTOR_ENGINE_PROVIDER,
|
||||
message: format!("{failure_context}:构造参考图失败:{error}"),
|
||||
})?;
|
||||
form = form.part("image", image_part);
|
||||
}
|
||||
|
||||
Ok(form)
|
||||
}
|
||||
|
||||
fn should_retry_vector_engine_send_error(error: &reqwest::Error, attempt: u32) -> bool {
|
||||
attempt < VECTOR_ENGINE_SEND_MAX_ATTEMPTS && (error.is_timeout() || error.is_connect())
|
||||
}
|
||||
|
||||
async fn retry_vector_engine_send_after_delay(
|
||||
request_kind: &'static str,
|
||||
request_url: &str,
|
||||
failure_stage: &'static str,
|
||||
attempt: u32,
|
||||
error: &reqwest::Error,
|
||||
elapsed_ms: u64,
|
||||
prompt_chars: Option<usize>,
|
||||
reference_image_count: Option<usize>,
|
||||
request_params: Option<&serde_json::Value>,
|
||||
) {
|
||||
let delay_ms = VECTOR_ENGINE_SEND_RETRY_BASE_DELAY_MS * u64::from(attempt);
|
||||
tracing::warn!(
|
||||
provider = VECTOR_ENGINE_PROVIDER,
|
||||
endpoint = %request_url,
|
||||
request_kind,
|
||||
failure_stage,
|
||||
attempt,
|
||||
max_attempts = VECTOR_ENGINE_SEND_MAX_ATTEMPTS,
|
||||
retry_delay_ms = delay_ms,
|
||||
timeout = error.is_timeout(),
|
||||
connect = error.is_connect(),
|
||||
request = error.is_request(),
|
||||
body = error.is_body(),
|
||||
status = error.status().map(|status| status.as_u16()).unwrap_or_default(),
|
||||
error = %error,
|
||||
elapsed_ms,
|
||||
prompt_chars,
|
||||
reference_image_count,
|
||||
request_params = %request_params
|
||||
.map(|value| value.to_string())
|
||||
.unwrap_or_default(),
|
||||
"VectorEngine 图片请求发送失败,准备重试"
|
||||
);
|
||||
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
|
||||
}
|
||||
|
||||
@@ -1,8 +1,20 @@
|
||||
use platform_image::vector_engine::{
|
||||
GPT_IMAGE_2_MODEL, VECTOR_ENGINE_PROVIDER, VectorEngineImageSettings,
|
||||
build_vector_engine_image_request_body, vector_engine_images_edit_url,
|
||||
GPT_IMAGE_2_MODEL, ReferenceImage, VECTOR_ENGINE_PROVIDER, VectorEngineImageSettings,
|
||||
build_vector_engine_image_http_client, build_vector_engine_image_request_body,
|
||||
create_vector_engine_image_edit, vector_engine_images_edit_url,
|
||||
vector_engine_images_generation_url,
|
||||
};
|
||||
use std::{
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
net::TcpListener,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn vector_engine_module_exposes_provider_protocol_helpers() {
|
||||
@@ -30,3 +42,70 @@ fn vector_engine_module_exposes_provider_protocol_helpers() {
|
||||
"https://vector.example/v1/images/edits"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn vector_engine_image_edit_retries_send_timeout_once_and_succeeds() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("mock server should bind");
|
||||
let server_addr = listener
|
||||
.local_addr()
|
||||
.expect("mock server address should be readable");
|
||||
let request_count = Arc::new(AtomicUsize::new(0));
|
||||
let request_count_for_server = Arc::clone(&request_count);
|
||||
|
||||
let server = tokio::spawn(async move {
|
||||
loop {
|
||||
let Ok((mut stream, _)) = listener.accept().await else {
|
||||
break;
|
||||
};
|
||||
let request_index = request_count_for_server.fetch_add(1, Ordering::SeqCst);
|
||||
tokio::spawn(async move {
|
||||
let mut buffer = [0_u8; 4096];
|
||||
let _ = stream.read(&mut buffer).await;
|
||||
if request_index == 0 {
|
||||
tokio::time::sleep(Duration::from_millis(120)).await;
|
||||
return;
|
||||
}
|
||||
|
||||
let body = r#"{"data":[{"b64_json":"iVBORw0KGgpyZXN0"}]}"#;
|
||||
let response = format!(
|
||||
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
|
||||
body.len(),
|
||||
body
|
||||
);
|
||||
let _ = stream.write_all(response.as_bytes()).await;
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
let settings = VectorEngineImageSettings {
|
||||
base_url: format!("http://{server_addr}/v1"),
|
||||
api_key: "test-key".to_string(),
|
||||
request_timeout_ms: 40,
|
||||
};
|
||||
let http_client =
|
||||
build_vector_engine_image_http_client(&settings).expect("client should build");
|
||||
let reference_image = ReferenceImage {
|
||||
bytes: b"reference".to_vec(),
|
||||
mime_type: "image/png".to_string(),
|
||||
file_name: "reference.png".to_string(),
|
||||
};
|
||||
|
||||
let generated = create_vector_engine_image_edit(
|
||||
&http_client,
|
||||
&settings,
|
||||
"测试提示词",
|
||||
None,
|
||||
"1024x1024",
|
||||
&reference_image,
|
||||
"测试 VectorEngine 图片编辑失败",
|
||||
)
|
||||
.await
|
||||
.expect("second attempt should return generated image");
|
||||
|
||||
assert_eq!(generated.images.len(), 1);
|
||||
assert_eq!(generated.images[0].mime_type, "image/png");
|
||||
assert_eq!(request_count.load(Ordering::SeqCst), 2);
|
||||
server.abort();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user