feat(api-server): audit external api failures

This commit is contained in:
kdletters
2026-05-21 16:33:13 +08:00
parent 487efff9c4
commit cc23b6020d
19 changed files with 2266 additions and 56 deletions

View File

@@ -0,0 +1,372 @@
use axum::http::StatusCode;
use module_runtime::RuntimeTrackingScopeKind;
use serde_json::{Value, json};
use time::OffsetDateTime;
use uuid::Uuid;
use crate::{state::AppState, tracking::TrackingEventDraft};
/// Event key given to every tracking event that audits a failed external API call.
pub(crate) const EXTERNAL_API_FAILURE_EVENT_KEY: &str = "external_api_call_failure";
/// Module key passed to `TrackingEventDraft::new` for external API failure audit events.
pub(crate) const EXTERNAL_API_AUDIT_MODULE_KEY: &str = "external-api";
/// Everything known about one failed call to an external API.
///
/// A draft is created with [`ExternalApiFailureDraft::new`], enriched through the
/// `with_*` builder methods, and then turned into telemetry and a tracking event.
#[derive(Debug, Clone)]
pub(crate) struct ExternalApiFailureDraft {
    /// Provider identifier; also used as the scope id of the tracking event.
    pub(crate) provider: &'static str,
    /// Endpoint that was being called, reported as the `endpoint` metadata entry.
    pub(crate) endpoint: String,
    /// Description of the operation that failed.
    pub(crate) operation: String,
    /// Label of the stage at which the call failed (e.g. `"upstream_status"`).
    pub(crate) failure_stage: &'static str,
    /// HTTP status code of the response, when one was received.
    pub(crate) status_code: Option<u16>,
    /// Explicit status class; when `None` the class is derived from `status_code`.
    pub(crate) status_class: Option<&'static str>,
    /// Whether the failure was a timeout.
    pub(crate) timeout: bool,
    /// Whether the failure is considered retryable.
    pub(crate) retryable: bool,
    /// Error message; truncated to 1,000 characters when written to metadata.
    pub(crate) error_message: String,
    /// Optional error source; trimmed, dropped when blank, truncated to 1,000 characters.
    pub(crate) error_source: Option<String>,
    /// Optional raw excerpt; trimmed, dropped when blank, truncated to 800 characters.
    pub(crate) raw_excerpt: Option<String>,
    /// Latency in milliseconds, when measured.
    pub(crate) latency_ms: Option<u64>,
    /// Prompt size (presumably a character count, judging by the name; confirm with callers).
    pub(crate) prompt_chars: Option<usize>,
    /// Number of reference images attached to the request, when applicable.
    pub(crate) reference_image_count: Option<usize>,
    /// Image model identifier, when applicable.
    pub(crate) image_model: Option<&'static str>,
}
impl ExternalApiFailureDraft {
    /// Creates a draft from the required fields.
    ///
    /// Every optional field starts as `None`, and both `timeout` and `retryable`
    /// start as `false`; use the `with_*` methods to fill them in.
    pub(crate) fn new(
        provider: &'static str,
        endpoint: impl Into<String>,
        operation: impl Into<String>,
        failure_stage: &'static str,
        error_message: impl Into<String>,
    ) -> Self {
        let endpoint = endpoint.into();
        let operation = operation.into();
        let error_message = error_message.into();
        Self {
            provider,
            endpoint,
            operation,
            failure_stage,
            error_message,
            timeout: false,
            retryable: false,
            status_code: None,
            status_class: None,
            error_source: None,
            raw_excerpt: None,
            latency_ms: None,
            prompt_chars: None,
            reference_image_count: None,
            image_model: None,
        }
    }

    /// Replaces the HTTP status code.
    pub(crate) fn with_status_code(self, status_code: Option<u16>) -> Self {
        Self {
            status_code,
            ..self
        }
    }

    /// Replaces the explicit status class (`None` means "derive it from the status code").
    pub(crate) fn with_optional_status_class(self, status_class: Option<&'static str>) -> Self {
        Self {
            status_class,
            ..self
        }
    }

    /// Replaces the timeout flag.
    pub(crate) fn with_timeout(self, timeout: bool) -> Self {
        Self { timeout, ..self }
    }

    /// Replaces the retryable flag.
    pub(crate) fn with_retryable(self, retryable: bool) -> Self {
        Self { retryable, ..self }
    }

    /// Replaces the error source.
    pub(crate) fn with_error_source(self, error_source: Option<String>) -> Self {
        Self {
            error_source,
            ..self
        }
    }

    /// Replaces the raw excerpt.
    pub(crate) fn with_raw_excerpt(self, raw_excerpt: Option<String>) -> Self {
        Self {
            raw_excerpt,
            ..self
        }
    }

    /// Replaces the latency in milliseconds.
    pub(crate) fn with_latency_ms(self, latency_ms: Option<u64>) -> Self {
        Self { latency_ms, ..self }
    }

    /// Replaces the prompt size.
    pub(crate) fn with_prompt_chars(self, prompt_chars: Option<usize>) -> Self {
        Self {
            prompt_chars,
            ..self
        }
    }

    /// Replaces the reference image count.
    pub(crate) fn with_reference_image_count(
        self,
        reference_image_count: Option<usize>,
    ) -> Self {
        Self {
            reference_image_count,
            ..self
        }
    }

    /// Replaces the image model identifier.
    pub(crate) fn with_image_model(self, image_model: Option<&'static str>) -> Self {
        Self {
            image_model,
            ..self
        }
    }
}
/// Explicitly classifies application-level HTTP statuses (image downloads, OSS reads and
/// writes, and similar non-standard cases) so the low-cardinality OTLP label does not
/// mistakenly fall into `transport`.
pub(crate) fn app_error_status_class(status_code: StatusCode) -> &'static str {
    let numeric_code = status_code.as_u16();
    status_class(Some(numeric_code))
}
/// 中文注释:外部供应商失败同时进入 OTLP 和 tracking_event失败审计不能反向阻断主业务错误返回。
pub(crate) async fn record_external_api_failure(state: &AppState, draft: ExternalApiFailureDraft) {
record_external_api_failure_otlp(&draft);
let tracking_event = build_external_api_failure_tracking_draft(&draft);
if let Some(outbox) = state.tracking_outbox() {
match outbox
.enqueue(crate::tracking::build_tracking_event_input(
tracking_event.clone(),
))
.await
{
Ok(crate::tracking_outbox::TrackingOutboxEnqueueOutcome::Enqueued) => {}
Ok(crate::tracking_outbox::TrackingOutboxEnqueueOutcome::Dropped { reason }) => {
tracing::warn!(
provider = draft.provider,
endpoint = %draft.endpoint,
operation = %draft.operation,
failure_stage = draft.failure_stage,
reason,
"外部 API 失败审计写入 outbox 被保护阈值拒绝,回退同步直写 SpacetimeDB"
);
crate::tracking::record_tracking_event_after_success(
state,
&audit_request_context(),
tracking_event,
)
.await;
}
Err(error) => {
tracing::warn!(
provider = draft.provider,
endpoint = %draft.endpoint,
operation = %draft.operation,
failure_stage = draft.failure_stage,
error = %error,
"外部 API 失败审计写入 outbox 失败,回退同步直写 SpacetimeDB"
);
crate::tracking::record_tracking_event_after_success(
state,
&audit_request_context(),
tracking_event,
)
.await;
}
}
return;
}
crate::tracking::record_tracking_event_after_success(
state,
&audit_request_context(),
tracking_event,
)
.await;
}
/// Converts a failure draft into the tracking event that audits it.
///
/// The event is module-scoped, uses the provider as its scope id, and carries the
/// failure details as JSON metadata.
pub(crate) fn build_external_api_failure_tracking_draft(
    failure: &ExternalApiFailureDraft,
) -> TrackingEventDraft {
    let mut event = TrackingEventDraft::new(
        EXTERNAL_API_FAILURE_EVENT_KEY,
        EXTERNAL_API_AUDIT_MODULE_KEY,
    );
    event.scope_kind = RuntimeTrackingScopeKind::Module;
    event.scope_id = failure.provider.to_owned();
    event.metadata = build_external_api_failure_metadata(failure);
    event
}
fn build_external_api_failure_metadata(failure: &ExternalApiFailureDraft) -> Value {
let mut metadata = json!({
"provider": failure.provider,
"endpoint": failure.endpoint,
"operation": failure.operation,
"failureStage": failure.failure_stage,
"statusCode": failure.status_code,
"statusClass": failure.status_class.unwrap_or_else(|| status_class(failure.status_code)),
"timeout": failure.timeout,
"retryable": failure.retryable,
"errorMessage": truncate_field(failure.error_message.as_str(), 1_000),
"occurredAt": current_utc_iso_text(),
});
if let Some(latency_ms) = failure.latency_ms {
metadata["latencyMs"] = json!(latency_ms);
}
if let Some(prompt_chars) = failure.prompt_chars {
metadata["promptChars"] = json!(prompt_chars);
}
if let Some(reference_image_count) = failure.reference_image_count {
metadata["referenceImageCount"] = json!(reference_image_count);
}
if let Some(image_model) = failure.image_model {
metadata["imageModel"] = json!(image_model);
}
if let Some(source) = failure
.error_source
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
metadata["errorSource"] = json!(truncate_field(source, 1_000));
}
if let Some(excerpt) = failure
.raw_excerpt
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
metadata["rawExcerpt"] = json!(truncate_field(excerpt, 800));
}
metadata
}
/// Decides whether a failed external API call is worth retrying.
///
/// Returns `true` when `timeout` or `connect` is set, or when the status code is
/// 429 (Too Many Requests), 408 (Request Timeout), or anything from 500 upwards.
/// A missing status code without either flag is not retryable.
pub(crate) fn is_retryable_external_api_failure(
    status_code: Option<u16>,
    timeout: bool,
    connect: bool,
) -> bool {
    if timeout || connect {
        return true;
    }
    let Some(status) = status_code else {
        return false;
    };
    let throttled = status == StatusCode::TOO_MANY_REQUESTS.as_u16();
    let request_timed_out = status == StatusCode::REQUEST_TIMEOUT.as_u16();
    throttled || request_timed_out || status >= 500
}
/// Reports the failure to telemetry and writes an error-level log entry.
fn record_external_api_failure_otlp(failure: &ExternalApiFailureDraft) {
    // An explicit class wins; otherwise derive it from the status code.
    let resolved_status_class = match failure.status_class {
        Some(explicit) => explicit,
        None => status_class(failure.status_code),
    };

    crate::telemetry::record_external_api_failure(
        failure.provider,
        failure.failure_stage,
        resolved_status_class,
        failure.retryable,
    );

    tracing::error!(
        provider = failure.provider,
        endpoint = %failure.endpoint,
        operation = %failure.operation,
        failure_stage = failure.failure_stage,
        status_code = failure.status_code,
        status_class = resolved_status_class,
        timeout = failure.timeout,
        retryable = failure.retryable,
        latency_ms = failure.latency_ms,
        prompt_chars = failure.prompt_chars,
        reference_image_count = failure.reference_image_count,
        image_model = failure.image_model,
        error = %failure.error_message,
        "外部 API 调用失败"
    );
}
/// Maps an optional HTTP status code onto a low-cardinality class label.
///
/// Returns `"transport"` when there is no status code, `"1xx"` through `"5xx"` for
/// codes 100-599, and `"unknown"` for any other number.
fn status_class(status_code: Option<u16>) -> &'static str {
    let Some(code) = status_code else {
        return "transport";
    };
    // The hundreds digit selects the class; 0-99 and 600+ fall through to "unknown".
    match code / 100 {
        1 => "1xx",
        2 => "2xx",
        3 => "3xx",
        4 => "4xx",
        5 => "5xx",
        _ => "unknown",
    }
}
/// Creates a fresh request context for an audit write, with a unique
/// `external-api-audit-<uuid>` identifier.
fn audit_request_context() -> crate::request_context::RequestContext {
    let audit_id = format!("external-api-audit-{}", Uuid::new_v4());
    let label = String::from("external-api audit");
    // NOTE(review): the meaning of the duration and boolean arguments is defined by
    // `RequestContext::new`, which is not visible here.
    crate::request_context::RequestContext::new(audit_id, label, std::time::Duration::ZERO, false)
}
/// Returns at most the first `max_chars` characters of `value` as an owned string.
///
/// The limit counts Unicode scalar values, not bytes, so multi-byte text is never
/// split in the middle of a character.
fn truncate_field(value: &str, max_chars: usize) -> String {
    // Byte offset where character number `max_chars` starts, or the full length
    // when the text is shorter than the limit.
    let cut = value
        .char_indices()
        .nth(max_chars)
        .map_or(value.len(), |(byte_index, _)| byte_index);
    value[..cut].to_owned()
}
/// Returns the current UTC time as RFC 3339 text, falling back to the Unix epoch
/// (`1970-01-01T00:00:00Z`) when formatting fails.
fn current_utc_iso_text() -> String {
    let now = OffsetDateTime::now_utc();
    match shared_kernel::format_rfc3339(now) {
        Ok(text) => text,
        Err(_) => String::from("1970-01-01T00:00:00Z"),
    }
}
#[cfg(test)]
mod tests {
    use serde_json::Value;

    use super::*;

    /// The tracking draft is module-scoped, keyed by provider, and carries the expected metadata.
    #[test]
    fn external_api_failure_tracking_draft_uses_module_scope_and_safe_metadata() {
        let failure = ExternalApiFailureDraft::new(
            "vector-engine",
            "https://vector.example/v1/images/generations",
            "拼图 UI 背景图生成失败",
            "upstream_status",
            "上游 429",
        )
        .with_status_code(Some(429))
        .with_retryable(true)
        .with_latency_ms(Some(1234))
        .with_prompt_chars(Some(88))
        .with_reference_image_count(Some(2))
        .with_image_model(Some("gpt-image-2-all"));

        let draft = build_external_api_failure_tracking_draft(&failure);

        assert_eq!(draft.event_key, EXTERNAL_API_FAILURE_EVENT_KEY);
        assert_eq!(draft.scope_kind, RuntimeTrackingScopeKind::Module);
        assert_eq!(draft.scope_id, "vector-engine");
        assert_eq!(draft.module_key, Some(EXTERNAL_API_AUDIT_MODULE_KEY));

        let metadata = draft.metadata;
        assert_eq!(metadata["provider"], "vector-engine");
        assert_eq!(metadata["statusCode"], 429);
        assert_eq!(metadata["statusClass"], "4xx");
        assert_eq!(metadata["retryable"], true);
        assert_eq!(metadata["latencyMs"], 1234);
        assert_eq!(metadata["promptChars"], 88);
        assert_eq!(metadata["referenceImageCount"], 2);
        assert_eq!(metadata["imageModel"], "gpt-image-2-all");
        assert!(matches!(metadata["occurredAt"], Value::String(_)));
    }

    /// Timeouts, connection failures, 429 and 5xx are retryable; a plain 400 is not.
    #[test]
    fn retryable_classification_keeps_transport_and_overload_failures_actionable() {
        assert!(is_retryable_external_api_failure(None, true, false));
        assert!(is_retryable_external_api_failure(None, false, true));
        assert!(is_retryable_external_api_failure(Some(429), false, false));
        assert!(is_retryable_external_api_failure(Some(502), false, false));
        assert!(!is_retryable_external_api_failure(Some(400), false, false));
    }

    /// An explicit status class takes precedence over the class implied by the status code.
    #[test]
    fn app_error_status_class_can_override_successful_upstream_status() {
        let explicit_class = app_error_status_class(StatusCode::BAD_GATEWAY);
        let failure = ExternalApiFailureDraft::new(
            "vector-engine",
            "https://cdn.example/generated.png",
            "下载生成图片",
            "image_download",
            "下载生成图片失败",
        )
        .with_status_code(Some(200))
        .with_optional_status_class(Some(explicit_class));

        let draft = build_external_api_failure_tracking_draft(&failure);

        assert_eq!(draft.metadata["statusCode"], 200);
        assert_eq!(draft.metadata["statusClass"], "5xx");
    }
}