fix: 优化跳一跳运行态与地块资源

This commit is contained in:
2026-06-09 01:28:30 +08:00
parent c9c66f046b
commit a0473771f1
30 changed files with 3180 additions and 1010 deletions

View File

@@ -18,6 +18,7 @@ use super::{
},
response::handle_vector_engine_response,
types::{GeneratedImages, ReferenceImage, VectorEngineImageSettings},
util::truncate_raw,
};
pub async fn create_vector_engine_image_generation(
@@ -66,7 +67,25 @@ pub async fn create_vector_engine_image_generation(
)
.await
{
Ok(response) => break response,
Ok(response) => {
if should_retry_vector_engine_upstream_status(response.status, attempt) {
retry_vector_engine_upstream_status_after_delay(
"generation",
request_url.as_str(),
attempt,
response.status,
response.body.as_str(),
started_at.elapsed().as_millis() as u64,
Some(prompt.chars().count()),
Some(reference_images.len()),
Some(&request_body),
)
.await;
attempt += 1;
continue;
}
break response;
}
Err(error) => {
if should_retry_vector_engine_curl_send_error(&error, attempt) {
retry_vector_engine_send_after_delay(
@@ -75,7 +94,7 @@ pub async fn create_vector_engine_image_generation(
"request_send",
attempt,
error.is_timeout(),
error.is_connect(),
error.is_connect() || error.is_transient_transport(),
true,
false,
error.to_string().as_str(),
@@ -220,7 +239,25 @@ pub async fn create_vector_engine_image_edit_with_references(
)
.await
{
Ok(response) => break response,
Ok(response) => {
if should_retry_vector_engine_upstream_status(response.status, attempt) {
retry_vector_engine_upstream_status_after_delay(
"edit",
request_url.as_str(),
attempt,
response.status,
response.body.as_str(),
started_at.elapsed().as_millis() as u64,
Some(prompt.chars().count()),
Some(reference_image_count),
Some(&request_params),
)
.await;
attempt += 1;
continue;
}
break response;
}
Err(error) => {
if should_retry_vector_engine_curl_send_error(&error, attempt) {
retry_vector_engine_send_after_delay(
@@ -229,7 +266,7 @@ pub async fn create_vector_engine_image_edit_with_references(
"request_send",
attempt,
error.is_timeout(),
error.is_connect(),
error.is_connect() || error.is_transient_transport(),
true,
false,
error.to_string().as_str(),
@@ -290,7 +327,12 @@ fn should_retry_vector_engine_curl_send_error(
error: &super::curl_transport::VectorEngineCurlError,
attempt: u32,
) -> bool {
attempt < VECTOR_ENGINE_SEND_MAX_ATTEMPTS && (error.is_timeout() || error.is_connect())
attempt < VECTOR_ENGINE_SEND_MAX_ATTEMPTS
&& (error.is_timeout() || error.is_connect() || error.is_transient_transport())
}
fn should_retry_vector_engine_upstream_status(status: u16, attempt: u32) -> bool {
attempt < VECTOR_ENGINE_SEND_MAX_ATTEMPTS && (status == 408 || status == 429 || status >= 500)
}
async fn retry_vector_engine_send_after_delay(
@@ -334,6 +376,40 @@ async fn retry_vector_engine_send_after_delay(
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
}
async fn retry_vector_engine_upstream_status_after_delay(
request_kind: &'static str,
request_url: &str,
attempt: u32,
status: u16,
raw_body: &str,
elapsed_ms: u64,
prompt_chars: Option<usize>,
reference_image_count: Option<usize>,
request_params: Option<&serde_json::Value>,
) {
let delay_ms = vector_engine_send_retry_delay_ms(attempt, vector_engine_send_retry_jitter_ms());
tracing::warn!(
provider = VECTOR_ENGINE_PROVIDER,
endpoint = %request_url,
request_kind,
failure_stage = "upstream_status",
attempt,
max_attempts = VECTOR_ENGINE_SEND_MAX_ATTEMPTS,
retry_delay_ms = delay_ms,
status,
retryable = true,
elapsed_ms,
prompt_chars,
reference_image_count,
raw_excerpt = %truncate_raw(raw_body),
request_params = %request_params
.map(|value| value.to_string())
.unwrap_or_default(),
"VectorEngine 图片上游状态可重试,准备重试"
);
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
}
fn vector_engine_send_retry_delay_ms(attempt: u32, jitter_ms: u64) -> u64 {
let exponential_factor = 1_u64 << attempt.saturating_sub(1).min(10);
let bounded_jitter_ms = jitter_ms.min(VECTOR_ENGINE_SEND_RETRY_MAX_JITTER_MS);
@@ -357,6 +433,33 @@ mod tests {
assert_eq!(VECTOR_ENGINE_SEND_MAX_ATTEMPTS, 5);
}
#[test]
fn vector_engine_send_retry_policy_treats_ssl_reset_as_transient_transport() {
let error = super::super::curl_transport::VectorEngineCurlError::Curl(curl::Error::new(35));
assert!(error.is_transient_transport());
assert!(should_retry_vector_engine_curl_send_error(&error, 1));
assert!(!should_retry_vector_engine_curl_send_error(&error, 5));
}
#[test]
fn vector_engine_send_retry_policy_treats_recv_eof_as_transient_transport() {
let error = super::super::curl_transport::VectorEngineCurlError::Curl(curl::Error::new(56));
assert!(error.is_transient_transport());
assert!(should_retry_vector_engine_curl_send_error(&error, 1));
assert!(!should_retry_vector_engine_curl_send_error(&error, 5));
}
#[test]
fn vector_engine_send_retry_policy_treats_upstream_502_as_retryable() {
assert!(should_retry_vector_engine_upstream_status(502, 1));
assert!(should_retry_vector_engine_upstream_status(429, 1));
assert!(should_retry_vector_engine_upstream_status(408, 1));
assert!(!should_retry_vector_engine_upstream_status(400, 1));
assert!(!should_retry_vector_engine_upstream_status(502, 5));
}
#[test]
fn vector_engine_send_retry_delay_uses_exponential_backoff_with_bounded_jitter() {
assert_eq!(vector_engine_send_retry_delay_ms(1, 0), 500);

View File

@@ -45,6 +45,25 @@ impl VectorEngineCurlError {
Self::Form(_) | Self::WorkerJoin(_) => false,
}
}
pub(crate) fn is_transient_transport(&self) -> bool {
match self {
Self::Curl(error) => {
let message = error.to_string().to_ascii_lowercase();
error.is_ssl_connect_error()
|| error.is_recv_error()
|| error.is_send_error()
|| message.contains("connection reset")
|| message.contains("recv failure")
|| message.contains("receive failure")
|| message.contains("receiving data")
|| message.contains("unexpected eof")
|| message.contains("send failure")
|| message.contains("broken pipe")
}
Self::Form(_) | Self::WorkerJoin(_) => false,
}
}
}
impl fmt::Display for VectorEngineCurlError {
@@ -136,7 +155,7 @@ pub(crate) fn map_curl_error(
request_params: Option<&Value>,
) -> PlatformImageError {
let is_timeout = error.is_timeout();
let is_connect = error.is_connect();
let is_connect = error.is_connect() || error.is_transient_transport();
let source = error.to_string();
let message = format!("{context}{source}");
let audit = build_failure_audit(

View File

@@ -1,8 +1,8 @@
use platform_image::vector_engine::{
GPT_IMAGE_2_MODEL, ReferenceImage, VECTOR_ENGINE_PROVIDER, VectorEngineImageSettings,
build_vector_engine_image_http_client, build_vector_engine_image_request_body,
create_vector_engine_image_edit, vector_engine_images_edit_url,
vector_engine_images_generation_url,
create_vector_engine_image_edit, create_vector_engine_image_generation,
vector_engine_images_edit_url, vector_engine_images_generation_url,
};
use std::{
sync::{
@@ -109,3 +109,72 @@ async fn vector_engine_image_edit_retries_send_timeout_once_and_succeeds() {
assert_eq!(request_count.load(Ordering::SeqCst), 2);
server.abort();
}
#[tokio::test]
async fn vector_engine_image_generation_retries_upstream_502_once_and_succeeds() {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("mock server should bind");
let server_addr = listener
.local_addr()
.expect("mock server address should be readable");
let request_count = Arc::new(AtomicUsize::new(0));
let request_count_for_server = Arc::clone(&request_count);
let server = tokio::spawn(async move {
loop {
let Ok((mut stream, _)) = listener.accept().await else {
break;
};
let request_index = request_count_for_server.fetch_add(1, Ordering::SeqCst);
tokio::spawn(async move {
let mut buffer = [0_u8; 4096];
let _ = stream.read(&mut buffer).await;
if request_index == 0 {
let body = "<html><head><title>502 Bad Gateway</title></head><body><center><h1>502 Bad Gateway</h1></center><hr><center>nginx</center></body></html>";
let response = format!(
"HTTP/1.1 502 Bad Gateway\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{}",
body.len(),
body
);
let _ = stream.write_all(response.as_bytes()).await;
return;
}
let body = r#"{"data":[{"b64_json":"iVBORw0KGgpyZXN0"}]}"#;
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
body.len(),
body
);
let _ = stream.write_all(response.as_bytes()).await;
});
}
});
let settings = VectorEngineImageSettings {
base_url: format!("http://{server_addr}/v1"),
api_key: "test-key".to_string(),
request_timeout_ms: 1_000,
};
let http_client =
build_vector_engine_image_http_client(&settings).expect("client should build");
let generated = create_vector_engine_image_generation(
&http_client,
&settings,
"测试提示词",
None,
"1024x1024",
1,
&[],
"测试 VectorEngine 图片生成失败",
)
.await
.expect("second attempt should return generated image");
assert_eq!(generated.images.len(), 1);
assert_eq!(generated.images[0].mime_type, "image/png");
assert_eq!(request_count.load(Ordering::SeqCst), 2);
server.abort();
}