chore: add structured OSS logs
This commit is contained in:
@@ -12,6 +12,7 @@ serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
time = { workspace = true, features = ["formatting"] }
|
||||
tracing = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["macros", "rt"] }
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
5. 服务端 `PutObject` 上传 helper
|
||||
6. `x-oss-meta-*` 元数据归一化与大小限制校验
|
||||
7. `content-type`、`content-length-range`、`success_action_status` policy 条件生成
|
||||
8. `PostObject` 签名、`GetObject` 读签名、`HEAD Object` 和 `PutObject` 的结构化日志
|
||||
|
||||
当前仍未落地的内容:
|
||||
|
||||
@@ -34,8 +35,9 @@
|
||||
1. 当前产品口径为服务器上传 AI 生成资源、Web 端只负责读取。
|
||||
2. 因此 `STS` 不作为默认上传主链,`api-server` 只暴露禁用式 contract,避免浏览器拿到 OSS 写权限。
|
||||
3. 服务端生成资源应优先复用 `OssClient::put_object`,上传成功后再走对象确认链路写入 `asset_object`。
|
||||
4. 读签名和 `HEAD Object` 的入参必须直接传 object_key,不要把 bucket 名拼进路径;例如 `generated-square-hole-assets/.../image.png` 才是正确入参,`xushi-dev/...` 这类前缀不属于 object_key。
|
||||
5. OSS V4 `x-oss-date` 必须固定为 `yyyyMMdd'T'HHmmss'Z'`,不能依赖 `time::Time::to_string()`;后者在小时小于 10 时可能输出非补零时间,导致签名格式错误。
|
||||
4. 读签名和 `HEAD Object` 的入参必须直接传 object_key,不要把 bucket 名拼进路径;例如 `generated-square-hole-assets/.../image.png` 才是正确入参,`xushi-dev/...` 这类前缀不属于 object_key。
|
||||
5. OSS V4 `x-oss-date` 必须固定为 `yyyyMMdd'T'HHmmss'Z'`,不能依赖 `time::Time::to_string()`;后者在小时小于 10 时可能输出非补零时间,导致签名格式错误。
|
||||
6. 结构化日志只记录 `provider`、`operation`、`bucket`、`endpoint`、`object_key` / `key_prefix`、`access`、`content_type`、`content_length`、`status`、`status_class`、`error_kind` 和 `elapsed_ms` 等排障字段;禁止输出 AccessKey、policy、signature、Authorization header 或完整 signed URL。
|
||||
|
||||
## 3. 边界约束
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{collections::BTreeMap, error::Error, fmt};
|
||||
use std::{collections::BTreeMap, error::Error, fmt, time::Instant};
|
||||
|
||||
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD};
|
||||
use hmac::{Hmac, Mac};
|
||||
@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
|
||||
use serde_json::{Value, json};
|
||||
use sha2::{Digest, Sha256};
|
||||
use time::{Duration, OffsetDateTime, format_description::well_known::Rfc3339};
|
||||
use tracing::{info, warn};
|
||||
|
||||
type HmacSha256 = Hmac<Sha256>;
|
||||
|
||||
@@ -19,6 +20,7 @@ const OSS_V4_ALGORITHM: &str = "OSS4-HMAC-SHA256";
|
||||
const OSS_V4_REQUEST: &str = "aliyun_v4_request";
|
||||
const OSS_V4_SERVICE: &str = "oss";
|
||||
const OSS_UNSIGNED_PAYLOAD: &str = "UNSIGNED-PAYLOAD";
|
||||
const OSS_PROVIDER: &str = "aliyun-oss";
|
||||
|
||||
pub const LEGACY_PUBLIC_PREFIXES: [&str; 13] = [
|
||||
"generated-character-drafts",
|
||||
@@ -369,105 +371,154 @@ impl OssClient {
|
||||
&self,
|
||||
request: OssPostObjectRequest,
|
||||
) -> Result<OssPostObjectResponse, OssError> {
|
||||
let max_size_bytes = request
|
||||
.max_size_bytes
|
||||
.unwrap_or(self.config.default_post_max_size_bytes);
|
||||
let expire_seconds = request
|
||||
.expire_seconds
|
||||
.unwrap_or(self.config.default_post_expire_seconds);
|
||||
let success_action_status = request
|
||||
.success_action_status
|
||||
.unwrap_or(self.config.default_success_action_status);
|
||||
let started_at = Instant::now();
|
||||
let requested_prefix = request.prefix.as_str();
|
||||
let requested_content_type = request
|
||||
.content_type
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let requested_metadata_count = request.metadata.len();
|
||||
|
||||
if max_size_bytes == 0 {
|
||||
return Err(OssError::InvalidRequest(
|
||||
"maxSizeBytes 必须大于 0".to_string(),
|
||||
));
|
||||
let result = (|| {
|
||||
let max_size_bytes = request
|
||||
.max_size_bytes
|
||||
.unwrap_or(self.config.default_post_max_size_bytes);
|
||||
let expire_seconds = request
|
||||
.expire_seconds
|
||||
.unwrap_or(self.config.default_post_expire_seconds);
|
||||
let success_action_status = request
|
||||
.success_action_status
|
||||
.unwrap_or(self.config.default_success_action_status);
|
||||
|
||||
if max_size_bytes == 0 {
|
||||
return Err(OssError::InvalidRequest(
|
||||
"maxSizeBytes 必须大于 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if expire_seconds == 0 {
|
||||
return Err(OssError::InvalidRequest(
|
||||
"expireSeconds 必须大于 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if !(100..=999).contains(&success_action_status) {
|
||||
return Err(OssError::InvalidRequest(
|
||||
"successActionStatus 必须是三位 HTTP 状态码".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let sanitized_segments = request
|
||||
.path_segments
|
||||
.iter()
|
||||
.map(|segment| sanitize_path_segment(segment))
|
||||
.filter(|segment| !segment.is_empty())
|
||||
.collect::<Vec<_>>();
|
||||
let file_name = sanitize_file_name(&request.file_name)?;
|
||||
let object_key = build_object_key(request.prefix, &sanitized_segments, &file_name);
|
||||
let legacy_public_path = format!("/{}", object_key);
|
||||
let content_type = normalize_optional_value(request.content_type);
|
||||
let metadata = normalize_metadata(request.metadata)?;
|
||||
|
||||
let expires_at = OffsetDateTime::now_utc()
|
||||
.checked_add(Duration::seconds(i64::try_from(expire_seconds).map_err(
|
||||
|_| OssError::InvalidRequest("expireSeconds 超出可支持范围".to_string()),
|
||||
)?))
|
||||
.ok_or_else(|| {
|
||||
OssError::InvalidRequest("expireSeconds 计算结果溢出".to_string())
|
||||
})?;
|
||||
let expires_at = expires_at.format(&Rfc3339).map_err(|error| {
|
||||
OssError::SerializePolicy(format!("格式化过期时间失败:{error}"))
|
||||
})?;
|
||||
|
||||
let signed_at = OffsetDateTime::now_utc();
|
||||
let signature_scope = build_v4_signature_scope(&self.config.endpoint, signed_at)?;
|
||||
let signature_date = build_v4_signature_date(signed_at)?;
|
||||
let credential = format!("{}/{}", self.config.access_key_id, signature_scope);
|
||||
let policy_json = build_policy_json(
|
||||
&self.config.bucket,
|
||||
&object_key,
|
||||
&expires_at,
|
||||
max_size_bytes,
|
||||
success_action_status,
|
||||
content_type.as_deref(),
|
||||
&metadata,
|
||||
&credential,
|
||||
&signature_date,
|
||||
);
|
||||
let policy = serde_json::to_string(&policy_json).map_err(|error| {
|
||||
OssError::SerializePolicy(format!("序列化 policy 失败:{error}"))
|
||||
})?;
|
||||
let encoded_policy = BASE64_STANDARD.encode(policy.as_bytes());
|
||||
let signature = sign_v4_content(
|
||||
&self.config.access_key_secret,
|
||||
&signature_scope,
|
||||
&encoded_policy,
|
||||
)?;
|
||||
|
||||
Ok(OssPostObjectResponse {
|
||||
signature_version: "v4",
|
||||
provider: OSS_PROVIDER,
|
||||
bucket: self.config.bucket.clone(),
|
||||
endpoint: self.config.endpoint.clone(),
|
||||
host: self.config.upload_host(),
|
||||
object_key: object_key.clone(),
|
||||
legacy_public_path,
|
||||
content_type: content_type.clone(),
|
||||
access: request.access,
|
||||
key_prefix: build_key_prefix(request.prefix, &sanitized_segments),
|
||||
expires_at,
|
||||
max_size_bytes,
|
||||
success_action_status,
|
||||
form_fields: OssPostObjectFormFields {
|
||||
key: object_key,
|
||||
policy: encoded_policy,
|
||||
signature_version: OSS_V4_ALGORITHM.to_string(),
|
||||
credential,
|
||||
date: signature_date,
|
||||
signature,
|
||||
success_action_status: success_action_status.to_string(),
|
||||
content_type,
|
||||
metadata,
|
||||
},
|
||||
})
|
||||
})();
|
||||
|
||||
match &result {
|
||||
Ok(response) => info!(
|
||||
provider = OSS_PROVIDER,
|
||||
operation = "sign_post_object",
|
||||
bucket = %response.bucket,
|
||||
endpoint = %response.endpoint,
|
||||
object_key = %response.object_key,
|
||||
key_prefix = %response.key_prefix,
|
||||
access = oss_access_label(response.access),
|
||||
content_type = %response.content_type.as_deref().unwrap_or(""),
|
||||
max_size_bytes = response.max_size_bytes,
|
||||
success_action_status = response.success_action_status,
|
||||
metadata_count = response.form_fields.metadata.len(),
|
||||
expires_at = %response.expires_at,
|
||||
elapsed_ms = elapsed_ms(started_at),
|
||||
"OSS PostObject 签名完成"
|
||||
),
|
||||
Err(error) => warn!(
|
||||
provider = OSS_PROVIDER,
|
||||
operation = "sign_post_object",
|
||||
bucket = %self.config.bucket(),
|
||||
endpoint = %self.config.endpoint(),
|
||||
key_prefix = requested_prefix,
|
||||
content_type = %requested_content_type,
|
||||
metadata_count = requested_metadata_count,
|
||||
error_kind = oss_error_kind_label(error),
|
||||
message = %error,
|
||||
elapsed_ms = elapsed_ms(started_at),
|
||||
"OSS PostObject 签名失败"
|
||||
),
|
||||
}
|
||||
|
||||
if expire_seconds == 0 {
|
||||
return Err(OssError::InvalidRequest(
|
||||
"expireSeconds 必须大于 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if !(100..=999).contains(&success_action_status) {
|
||||
return Err(OssError::InvalidRequest(
|
||||
"successActionStatus 必须是三位 HTTP 状态码".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let sanitized_segments = request
|
||||
.path_segments
|
||||
.iter()
|
||||
.map(|segment| sanitize_path_segment(segment))
|
||||
.filter(|segment| !segment.is_empty())
|
||||
.collect::<Vec<_>>();
|
||||
let file_name = sanitize_file_name(&request.file_name)?;
|
||||
let object_key = build_object_key(request.prefix, &sanitized_segments, &file_name);
|
||||
let legacy_public_path = format!("/{}", object_key);
|
||||
let content_type = normalize_optional_value(request.content_type);
|
||||
let metadata = normalize_metadata(request.metadata)?;
|
||||
|
||||
let expires_at = OffsetDateTime::now_utc()
|
||||
.checked_add(Duration::seconds(i64::try_from(expire_seconds).map_err(
|
||||
|_| OssError::InvalidRequest("expireSeconds 超出可支持范围".to_string()),
|
||||
)?))
|
||||
.ok_or_else(|| OssError::InvalidRequest("expireSeconds 计算结果溢出".to_string()))?;
|
||||
let expires_at = expires_at
|
||||
.format(&Rfc3339)
|
||||
.map_err(|error| OssError::SerializePolicy(format!("格式化过期时间失败:{error}")))?;
|
||||
|
||||
let signed_at = OffsetDateTime::now_utc();
|
||||
let signature_scope = build_v4_signature_scope(&self.config.endpoint, signed_at)?;
|
||||
let signature_date = build_v4_signature_date(signed_at)?;
|
||||
let credential = format!("{}/{}", self.config.access_key_id, signature_scope);
|
||||
let policy_json = build_policy_json(
|
||||
&self.config.bucket,
|
||||
&object_key,
|
||||
&expires_at,
|
||||
max_size_bytes,
|
||||
success_action_status,
|
||||
content_type.as_deref(),
|
||||
&metadata,
|
||||
&credential,
|
||||
&signature_date,
|
||||
);
|
||||
let policy = serde_json::to_string(&policy_json)
|
||||
.map_err(|error| OssError::SerializePolicy(format!("序列化 policy 失败:{error}")))?;
|
||||
let encoded_policy = BASE64_STANDARD.encode(policy.as_bytes());
|
||||
let signature = sign_v4_content(
|
||||
&self.config.access_key_secret,
|
||||
&signature_scope,
|
||||
&encoded_policy,
|
||||
)?;
|
||||
|
||||
Ok(OssPostObjectResponse {
|
||||
signature_version: "v4",
|
||||
provider: "aliyun-oss",
|
||||
bucket: self.config.bucket.clone(),
|
||||
endpoint: self.config.endpoint.clone(),
|
||||
host: self.config.upload_host(),
|
||||
object_key: object_key.clone(),
|
||||
legacy_public_path,
|
||||
content_type: content_type.clone(),
|
||||
access: request.access,
|
||||
key_prefix: build_key_prefix(request.prefix, &sanitized_segments),
|
||||
expires_at,
|
||||
max_size_bytes,
|
||||
success_action_status,
|
||||
form_fields: OssPostObjectFormFields {
|
||||
key: object_key,
|
||||
policy: encoded_policy,
|
||||
signature_version: OSS_V4_ALGORITHM.to_string(),
|
||||
credential,
|
||||
date: signature_date,
|
||||
signature,
|
||||
success_action_status: success_action_status.to_string(),
|
||||
content_type,
|
||||
metadata,
|
||||
},
|
||||
})
|
||||
result
|
||||
}
|
||||
|
||||
// 私有 bucket 的对象读取统一走短期签名 URL,避免把长期主凭证下发给浏览器。
|
||||
@@ -475,81 +526,119 @@ impl OssClient {
|
||||
&self,
|
||||
request: OssSignedGetObjectUrlRequest,
|
||||
) -> Result<OssSignedGetObjectUrlResponse, OssError> {
|
||||
let expire_seconds = request
|
||||
.expire_seconds
|
||||
.unwrap_or(self.config.default_read_expire_seconds);
|
||||
let started_at = Instant::now();
|
||||
let requested_object_key = request
|
||||
.object_key
|
||||
.trim()
|
||||
.trim_start_matches('/')
|
||||
.trim()
|
||||
.to_string();
|
||||
|
||||
if expire_seconds == 0 {
|
||||
return Err(OssError::InvalidRequest(
|
||||
"expireSeconds 必须大于 0".to_string(),
|
||||
));
|
||||
let result = (|| {
|
||||
let expire_seconds = request
|
||||
.expire_seconds
|
||||
.unwrap_or(self.config.default_read_expire_seconds);
|
||||
|
||||
if expire_seconds == 0 {
|
||||
return Err(OssError::InvalidRequest(
|
||||
"expireSeconds 必须大于 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let object_key = normalize_object_key(&request.object_key)?;
|
||||
let expires_at = OffsetDateTime::now_utc()
|
||||
.checked_add(Duration::seconds(i64::try_from(expire_seconds).map_err(
|
||||
|_| OssError::InvalidRequest("expireSeconds 超出可支持范围".to_string()),
|
||||
)?))
|
||||
.ok_or_else(|| {
|
||||
OssError::InvalidRequest("expireSeconds 计算结果溢出".to_string())
|
||||
})?;
|
||||
let expires_at_text = expires_at
|
||||
.format(&Rfc3339)
|
||||
.map_err(|error| OssError::Sign(format!("格式化过期时间失败:{error}")))?;
|
||||
|
||||
let signed_at = OffsetDateTime::now_utc();
|
||||
let signed_at_text = build_v4_signature_date(signed_at)?;
|
||||
let signature_scope = build_v4_signature_scope(&self.config.endpoint, signed_at)?;
|
||||
let credential = format!("{}/{}", self.config.access_key_id, signature_scope);
|
||||
let mut query = BTreeMap::from([
|
||||
("x-oss-additional-headers".to_string(), "host".to_string()),
|
||||
(
|
||||
"x-oss-signature-version".to_string(),
|
||||
OSS_V4_ALGORITHM.to_string(),
|
||||
),
|
||||
("x-oss-credential".to_string(), credential),
|
||||
("x-oss-date".to_string(), signed_at_text),
|
||||
("x-oss-expires".to_string(), expire_seconds.to_string()),
|
||||
]);
|
||||
let canonical_uri = build_v4_canonical_uri(&self.config.bucket, Some(&object_key));
|
||||
let object_url_path = format!("/{}", encode_url_path(&object_key));
|
||||
let additional_headers = "host";
|
||||
let canonical_headers =
|
||||
format!("host:{}.{}\n", self.config.bucket(), self.config.endpoint());
|
||||
let canonical_query = build_canonical_query_string(&query);
|
||||
let canonical_request = build_v4_canonical_request(
|
||||
Method::GET.as_str(),
|
||||
&canonical_uri,
|
||||
&canonical_query,
|
||||
&canonical_headers,
|
||||
additional_headers,
|
||||
OSS_UNSIGNED_PAYLOAD,
|
||||
);
|
||||
let string_to_sign = build_v4_string_to_sign(
|
||||
query["x-oss-date"].as_str(),
|
||||
&signature_scope,
|
||||
&canonical_request,
|
||||
);
|
||||
let signature = sign_v4_content(
|
||||
&self.config.access_key_secret,
|
||||
&signature_scope,
|
||||
&string_to_sign,
|
||||
)?;
|
||||
query.insert("x-oss-signature".to_string(), signature);
|
||||
let signed_url = format!(
|
||||
"{}{}?{}",
|
||||
self.config.upload_host(),
|
||||
object_url_path,
|
||||
build_canonical_query_string(&query)
|
||||
);
|
||||
|
||||
Ok(OssSignedGetObjectUrlResponse {
|
||||
provider: OSS_PROVIDER,
|
||||
bucket: self.config.bucket.clone(),
|
||||
endpoint: self.config.endpoint.clone(),
|
||||
host: self.config.upload_host(),
|
||||
object_key,
|
||||
expires_at: expires_at_text,
|
||||
signed_url,
|
||||
})
|
||||
})();
|
||||
|
||||
match &result {
|
||||
Ok(response) => info!(
|
||||
provider = OSS_PROVIDER,
|
||||
operation = "sign_get_object_url",
|
||||
bucket = %response.bucket,
|
||||
endpoint = %response.endpoint,
|
||||
object_key = %response.object_key,
|
||||
expires_at = %response.expires_at,
|
||||
elapsed_ms = elapsed_ms(started_at),
|
||||
"OSS GetObject 读签名完成"
|
||||
),
|
||||
Err(error) => warn!(
|
||||
provider = OSS_PROVIDER,
|
||||
operation = "sign_get_object_url",
|
||||
bucket = %self.config.bucket(),
|
||||
endpoint = %self.config.endpoint(),
|
||||
object_key = %requested_object_key,
|
||||
error_kind = oss_error_kind_label(error),
|
||||
message = %error,
|
||||
elapsed_ms = elapsed_ms(started_at),
|
||||
"OSS GetObject 读签名失败"
|
||||
),
|
||||
}
|
||||
|
||||
let object_key = normalize_object_key(&request.object_key)?;
|
||||
let expires_at = OffsetDateTime::now_utc()
|
||||
.checked_add(Duration::seconds(i64::try_from(expire_seconds).map_err(
|
||||
|_| OssError::InvalidRequest("expireSeconds 超出可支持范围".to_string()),
|
||||
)?))
|
||||
.ok_or_else(|| OssError::InvalidRequest("expireSeconds 计算结果溢出".to_string()))?;
|
||||
let expires_at_text = expires_at
|
||||
.format(&Rfc3339)
|
||||
.map_err(|error| OssError::Sign(format!("格式化过期时间失败:{error}")))?;
|
||||
|
||||
let signed_at = OffsetDateTime::now_utc();
|
||||
let signed_at_text = build_v4_signature_date(signed_at)?;
|
||||
let signature_scope = build_v4_signature_scope(&self.config.endpoint, signed_at)?;
|
||||
let credential = format!("{}/{}", self.config.access_key_id, signature_scope);
|
||||
let mut query = BTreeMap::from([
|
||||
("x-oss-additional-headers".to_string(), "host".to_string()),
|
||||
(
|
||||
"x-oss-signature-version".to_string(),
|
||||
OSS_V4_ALGORITHM.to_string(),
|
||||
),
|
||||
("x-oss-credential".to_string(), credential),
|
||||
("x-oss-date".to_string(), signed_at_text),
|
||||
("x-oss-expires".to_string(), expire_seconds.to_string()),
|
||||
]);
|
||||
let canonical_uri = build_v4_canonical_uri(&self.config.bucket, Some(&object_key));
|
||||
let object_url_path = format!("/{}", encode_url_path(&object_key));
|
||||
let additional_headers = "host";
|
||||
let canonical_headers =
|
||||
format!("host:{}.{}\n", self.config.bucket(), self.config.endpoint());
|
||||
let canonical_query = build_canonical_query_string(&query);
|
||||
let canonical_request = build_v4_canonical_request(
|
||||
Method::GET.as_str(),
|
||||
&canonical_uri,
|
||||
&canonical_query,
|
||||
&canonical_headers,
|
||||
additional_headers,
|
||||
OSS_UNSIGNED_PAYLOAD,
|
||||
);
|
||||
let string_to_sign = build_v4_string_to_sign(
|
||||
query["x-oss-date"].as_str(),
|
||||
&signature_scope,
|
||||
&canonical_request,
|
||||
);
|
||||
let signature = sign_v4_content(
|
||||
&self.config.access_key_secret,
|
||||
&signature_scope,
|
||||
&string_to_sign,
|
||||
)?;
|
||||
query.insert("x-oss-signature".to_string(), signature);
|
||||
let signed_url = format!(
|
||||
"{}{}?{}",
|
||||
self.config.upload_host(),
|
||||
object_url_path,
|
||||
build_canonical_query_string(&query)
|
||||
);
|
||||
|
||||
Ok(OssSignedGetObjectUrlResponse {
|
||||
provider: "aliyun-oss",
|
||||
bucket: self.config.bucket.clone(),
|
||||
endpoint: self.config.endpoint.clone(),
|
||||
host: self.config.upload_host(),
|
||||
object_key,
|
||||
expires_at: expires_at_text,
|
||||
signed_url,
|
||||
})
|
||||
result
|
||||
}
|
||||
|
||||
// 上传完成确认前,服务端必须自己探测一次对象,不能只相信客户端回传的 object_key。
|
||||
@@ -558,59 +647,107 @@ impl OssClient {
|
||||
client: &reqwest::Client,
|
||||
request: OssHeadObjectRequest,
|
||||
) -> Result<OssHeadObjectResponse, OssError> {
|
||||
let object_key = normalize_object_key(&request.object_key)?;
|
||||
let target_url = build_object_url(&self.config.bucket, &self.config.endpoint, &object_key)
|
||||
.map_err(|error| OssError::Request(format!("构造 OSS 对象 URL 失败:{error}")))?;
|
||||
let response = send_signed_request(
|
||||
client,
|
||||
&self.config,
|
||||
Method::HEAD,
|
||||
Some(&object_key),
|
||||
target_url,
|
||||
)
|
||||
.await?;
|
||||
let started_at = Instant::now();
|
||||
let requested_object_key = request
|
||||
.object_key
|
||||
.trim()
|
||||
.trim_start_matches('/')
|
||||
.trim()
|
||||
.to_string();
|
||||
let mut response_status = None;
|
||||
|
||||
if response.status() == reqwest::StatusCode::NOT_FOUND {
|
||||
return Err(OssError::ObjectNotFound(format!(
|
||||
"OSS 对象不存在:{}",
|
||||
request.object_key
|
||||
)));
|
||||
let result = async {
|
||||
let object_key = normalize_object_key(&request.object_key)?;
|
||||
let target_url =
|
||||
build_object_url(&self.config.bucket, &self.config.endpoint, &object_key).map_err(
|
||||
|error| OssError::Request(format!("构造 OSS 对象 URL 失败:{error}")),
|
||||
)?;
|
||||
let response = send_signed_request(
|
||||
client,
|
||||
&self.config,
|
||||
Method::HEAD,
|
||||
Some(&object_key),
|
||||
target_url,
|
||||
)
|
||||
.await?;
|
||||
response_status = Some(response.status().as_u16());
|
||||
|
||||
if response.status() == reqwest::StatusCode::NOT_FOUND {
|
||||
return Err(OssError::ObjectNotFound(format!(
|
||||
"OSS 对象不存在:{}",
|
||||
request.object_key
|
||||
)));
|
||||
}
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Err(OssError::Request(format!(
|
||||
"OSS HEAD Object 失败,状态码:{}",
|
||||
response.status()
|
||||
)));
|
||||
}
|
||||
|
||||
let headers = response.headers();
|
||||
let content_length = headers
|
||||
.get(reqwest::header::CONTENT_LENGTH)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|value| value.parse::<u64>().ok())
|
||||
.unwrap_or(0);
|
||||
let content_type = headers
|
||||
.get(reqwest::header::CONTENT_TYPE)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|value| value.to_string());
|
||||
let etag = headers
|
||||
.get(reqwest::header::ETAG)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|value| value.trim_matches('"').to_string());
|
||||
let last_modified = headers
|
||||
.get(reqwest::header::LAST_MODIFIED)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|value| value.to_string());
|
||||
|
||||
Ok(OssHeadObjectResponse {
|
||||
bucket: self.config.bucket.clone(),
|
||||
object_key,
|
||||
content_length,
|
||||
content_type,
|
||||
etag,
|
||||
last_modified,
|
||||
})
|
||||
}
|
||||
.await;
|
||||
|
||||
match &result {
|
||||
Ok(response) => info!(
|
||||
provider = OSS_PROVIDER,
|
||||
operation = "head_object",
|
||||
bucket = %response.bucket,
|
||||
endpoint = %self.config.endpoint(),
|
||||
object_key = %response.object_key,
|
||||
status = response_status.unwrap_or(reqwest::StatusCode::OK.as_u16()),
|
||||
status_class = http_status_class_from_option(response_status),
|
||||
content_length = response.content_length,
|
||||
content_type = %response.content_type.as_deref().unwrap_or(""),
|
||||
etag_present = response.etag.is_some(),
|
||||
last_modified_present = response.last_modified.is_some(),
|
||||
elapsed_ms = elapsed_ms(started_at),
|
||||
"OSS HEAD Object 完成"
|
||||
),
|
||||
Err(error) => warn!(
|
||||
provider = OSS_PROVIDER,
|
||||
operation = "head_object",
|
||||
bucket = %self.config.bucket(),
|
||||
endpoint = %self.config.endpoint(),
|
||||
object_key = %requested_object_key,
|
||||
status = response_status.unwrap_or_default(),
|
||||
status_class = http_status_class_from_option(response_status),
|
||||
error_kind = oss_error_kind_label(error),
|
||||
message = %error,
|
||||
elapsed_ms = elapsed_ms(started_at),
|
||||
"OSS HEAD Object 失败"
|
||||
),
|
||||
}
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Err(OssError::Request(format!(
|
||||
"OSS HEAD Object 失败,状态码:{}",
|
||||
response.status()
|
||||
)));
|
||||
}
|
||||
|
||||
let headers = response.headers();
|
||||
let content_length = headers
|
||||
.get(reqwest::header::CONTENT_LENGTH)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|value| value.parse::<u64>().ok())
|
||||
.unwrap_or(0);
|
||||
let content_type = headers
|
||||
.get(reqwest::header::CONTENT_TYPE)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|value| value.to_string());
|
||||
let etag = headers
|
||||
.get(reqwest::header::ETAG)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|value| value.trim_matches('"').to_string());
|
||||
let last_modified = headers
|
||||
.get(reqwest::header::LAST_MODIFIED)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|value| value.to_string());
|
||||
|
||||
Ok(OssHeadObjectResponse {
|
||||
bucket: self.config.bucket.clone(),
|
||||
object_key,
|
||||
content_length,
|
||||
content_type,
|
||||
etag,
|
||||
last_modified,
|
||||
})
|
||||
result
|
||||
}
|
||||
|
||||
// AI 生成资源默认由服务端上传 OSS,Web 端只拿签名读地址,不直接持有写权限。
|
||||
@@ -619,73 +756,128 @@ impl OssClient {
|
||||
client: &reqwest::Client,
|
||||
request: OssPutObjectRequest,
|
||||
) -> Result<OssPutObjectResponse, OssError> {
|
||||
if request.body.is_empty() {
|
||||
return Err(OssError::InvalidRequest(
|
||||
"服务端上传对象内容不能为空".to_string(),
|
||||
));
|
||||
let started_at = Instant::now();
|
||||
let requested_prefix = request.prefix.as_str();
|
||||
let requested_content_type = request
|
||||
.content_type
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let requested_content_length = request.body.len();
|
||||
let requested_metadata_count = request.metadata.len();
|
||||
let mut response_status = None;
|
||||
|
||||
let result = async {
|
||||
if request.body.is_empty() {
|
||||
return Err(OssError::InvalidRequest(
|
||||
"服务端上传对象内容不能为空".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let sanitized_segments = request
|
||||
.path_segments
|
||||
.iter()
|
||||
.map(|segment| sanitize_path_segment(segment))
|
||||
.filter(|segment| !segment.is_empty())
|
||||
.collect::<Vec<_>>();
|
||||
let file_name = sanitize_file_name(&request.file_name)?;
|
||||
let object_key = build_object_key(request.prefix, &sanitized_segments, &file_name);
|
||||
let content_type = normalize_optional_value(request.content_type);
|
||||
let metadata = normalize_metadata(request.metadata)?;
|
||||
let target_url =
|
||||
build_object_url(&self.config.bucket, &self.config.endpoint, &object_key).map_err(
|
||||
|error| OssError::Request(format!("构造 OSS 对象 URL 失败:{error}")),
|
||||
)?;
|
||||
let content_length = u64::try_from(request.body.len())
|
||||
.map_err(|_| OssError::InvalidRequest("上传对象大小超出可支持范围".to_string()))?;
|
||||
let builder = signed_request_builder(
|
||||
client,
|
||||
&self.config,
|
||||
Method::PUT,
|
||||
Some(&object_key),
|
||||
target_url,
|
||||
content_type.as_deref(),
|
||||
&metadata,
|
||||
)?
|
||||
.header(reqwest::header::CONTENT_LENGTH, content_length)
|
||||
.body(request.body);
|
||||
|
||||
let response = builder
|
||||
.send()
|
||||
.await
|
||||
.map_err(|error| OssError::Request(format!("请求 OSS 失败:{error}")))?;
|
||||
response_status = Some(response.status().as_u16());
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Err(OssError::Request(format!(
|
||||
"OSS PutObject 失败,状态码:{}",
|
||||
response.status()
|
||||
)));
|
||||
}
|
||||
|
||||
let headers = response.headers();
|
||||
let etag = headers
|
||||
.get(reqwest::header::ETAG)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|value| value.trim_matches('"').to_string());
|
||||
let last_modified = headers
|
||||
.get(reqwest::header::LAST_MODIFIED)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|value| value.to_string());
|
||||
|
||||
Ok(OssPutObjectResponse {
|
||||
provider: OSS_PROVIDER,
|
||||
bucket: self.config.bucket.clone(),
|
||||
endpoint: self.config.endpoint.clone(),
|
||||
host: self.config.upload_host(),
|
||||
legacy_public_path: format!("/{object_key}"),
|
||||
object_key,
|
||||
content_type,
|
||||
content_length,
|
||||
access: request.access,
|
||||
etag,
|
||||
last_modified,
|
||||
})
|
||||
}
|
||||
.await;
|
||||
|
||||
match &result {
|
||||
Ok(response) => info!(
|
||||
provider = OSS_PROVIDER,
|
||||
operation = "put_object",
|
||||
bucket = %response.bucket,
|
||||
endpoint = %response.endpoint,
|
||||
object_key = %response.object_key,
|
||||
access = oss_access_label(response.access),
|
||||
status = response_status.unwrap_or(reqwest::StatusCode::OK.as_u16()),
|
||||
status_class = http_status_class_from_option(response_status),
|
||||
content_length = response.content_length,
|
||||
content_type = %response.content_type.as_deref().unwrap_or(""),
|
||||
etag_present = response.etag.is_some(),
|
||||
last_modified_present = response.last_modified.is_some(),
|
||||
elapsed_ms = elapsed_ms(started_at),
|
||||
"OSS PutObject 上传完成"
|
||||
),
|
||||
Err(error) => warn!(
|
||||
provider = OSS_PROVIDER,
|
||||
operation = "put_object",
|
||||
bucket = %self.config.bucket(),
|
||||
endpoint = %self.config.endpoint(),
|
||||
key_prefix = requested_prefix,
|
||||
content_length = requested_content_length,
|
||||
content_type = %requested_content_type,
|
||||
metadata_count = requested_metadata_count,
|
||||
status = response_status.unwrap_or_default(),
|
||||
status_class = http_status_class_from_option(response_status),
|
||||
error_kind = oss_error_kind_label(error),
|
||||
message = %error,
|
||||
elapsed_ms = elapsed_ms(started_at),
|
||||
"OSS PutObject 上传失败"
|
||||
),
|
||||
}
|
||||
|
||||
let sanitized_segments = request
|
||||
.path_segments
|
||||
.iter()
|
||||
.map(|segment| sanitize_path_segment(segment))
|
||||
.filter(|segment| !segment.is_empty())
|
||||
.collect::<Vec<_>>();
|
||||
let file_name = sanitize_file_name(&request.file_name)?;
|
||||
let object_key = build_object_key(request.prefix, &sanitized_segments, &file_name);
|
||||
let content_type = normalize_optional_value(request.content_type);
|
||||
let metadata = normalize_metadata(request.metadata)?;
|
||||
let target_url = build_object_url(&self.config.bucket, &self.config.endpoint, &object_key)
|
||||
.map_err(|error| OssError::Request(format!("构造 OSS 对象 URL 失败:{error}")))?;
|
||||
let content_length = u64::try_from(request.body.len())
|
||||
.map_err(|_| OssError::InvalidRequest("上传对象大小超出可支持范围".to_string()))?;
|
||||
let builder = signed_request_builder(
|
||||
client,
|
||||
&self.config,
|
||||
Method::PUT,
|
||||
Some(&object_key),
|
||||
target_url,
|
||||
content_type.as_deref(),
|
||||
&metadata,
|
||||
)?
|
||||
.header(reqwest::header::CONTENT_LENGTH, content_length)
|
||||
.body(request.body);
|
||||
|
||||
let response = builder
|
||||
.send()
|
||||
.await
|
||||
.map_err(|error| OssError::Request(format!("请求 OSS 失败:{error}")))?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Err(OssError::Request(format!(
|
||||
"OSS PutObject 失败,状态码:{}",
|
||||
response.status()
|
||||
)));
|
||||
}
|
||||
|
||||
let headers = response.headers();
|
||||
let etag = headers
|
||||
.get(reqwest::header::ETAG)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|value| value.trim_matches('"').to_string());
|
||||
let last_modified = headers
|
||||
.get(reqwest::header::LAST_MODIFIED)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|value| value.to_string());
|
||||
|
||||
Ok(OssPutObjectResponse {
|
||||
provider: "aliyun-oss",
|
||||
bucket: self.config.bucket.clone(),
|
||||
endpoint: self.config.endpoint.clone(),
|
||||
host: self.config.upload_host(),
|
||||
legacy_public_path: format!("/{object_key}"),
|
||||
object_key,
|
||||
content_type,
|
||||
content_length,
|
||||
access: request.access,
|
||||
etag,
|
||||
last_modified,
|
||||
})
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -717,6 +909,43 @@ impl OssError {
|
||||
}
|
||||
}
|
||||
|
||||
fn elapsed_ms(started_at: Instant) -> u64 {
|
||||
started_at.elapsed().as_millis().min(u64::MAX as u128) as u64
|
||||
}
|
||||
|
||||
fn oss_access_label(access: OssObjectAccess) -> &'static str {
|
||||
match access {
|
||||
OssObjectAccess::Public => "public",
|
||||
OssObjectAccess::Private => "private",
|
||||
}
|
||||
}
|
||||
|
||||
fn oss_error_kind_label(error: &OssError) -> &'static str {
|
||||
match error.kind() {
|
||||
OssErrorKind::InvalidConfig => "invalid_config",
|
||||
OssErrorKind::InvalidRequest => "invalid_request",
|
||||
OssErrorKind::ObjectNotFound => "object_not_found",
|
||||
OssErrorKind::Request => "request",
|
||||
OssErrorKind::SerializePolicy => "serialize_policy",
|
||||
OssErrorKind::Sign => "sign",
|
||||
}
|
||||
}
|
||||
|
||||
fn http_status_class_from_option(status: Option<u16>) -> &'static str {
|
||||
status.map(http_status_class).unwrap_or("unknown")
|
||||
}
|
||||
|
||||
fn http_status_class(status: u16) -> &'static str {
|
||||
match status {
|
||||
100..=199 => "1xx",
|
||||
200..=299 => "2xx",
|
||||
300..=399 => "3xx",
|
||||
400..=499 => "4xx",
|
||||
500..=599 => "5xx",
|
||||
_ => "unknown",
|
||||
}
|
||||
}
|
||||
|
||||
fn build_policy_json(
|
||||
bucket: &str,
|
||||
object_key: &str,
|
||||
@@ -1295,6 +1524,18 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn structured_log_labels_are_stable() {
|
||||
assert_eq!(
|
||||
oss_error_kind_label(&OssError::InvalidRequest("bad input".to_string())),
|
||||
"invalid_request"
|
||||
);
|
||||
assert_eq!(oss_access_label(OssObjectAccess::Private), "private");
|
||||
assert_eq!(http_status_class(204), "2xx");
|
||||
assert_eq!(http_status_class(404), "4xx");
|
||||
assert_eq!(http_status_class_from_option(None), "unknown");
|
||||
}
|
||||
|
||||
fn build_client() -> OssClient {
|
||||
OssClient::new(
|
||||
OssConfig::new(
|
||||
|
||||
Reference in New Issue
Block a user