255 lines
8.6 KiB
Rust
255 lines
8.6 KiB
Rust
use std::{
|
||
collections::HashMap,
|
||
error::Error,
|
||
fmt,
|
||
sync::{Arc, Mutex},
|
||
};
|
||
|
||
use platform_oss::{OssClient, OssError, OssHeadObjectRequest};
|
||
use reqwest::Client;
|
||
|
||
use crate::{
|
||
AssetObjectAccessPolicy, AssetObjectFieldError, AssetObjectRecord, AssetObjectUpsertSnapshot,
|
||
ConfirmAssetObjectInput, ConfirmAssetObjectResult, INITIAL_ASSET_OBJECT_VERSION,
|
||
build_asset_object_record, build_asset_object_upsert_input, generate_asset_object_id,
|
||
normalize_optional_value, validate_asset_object_fields,
|
||
};
|
||
|
||
#[derive(Clone, Debug)]
|
||
pub struct InMemoryAssetObjectStore {
|
||
inner: Arc<Mutex<HashMap<(String, String), AssetObjectUpsertSnapshot>>>,
|
||
}
|
||
|
||
#[derive(Clone, Debug)]
|
||
pub struct AssetObjectService {
|
||
store: InMemoryAssetObjectStore,
|
||
http_client: Client,
|
||
}
|
||
|
||
#[derive(Debug, PartialEq, Eq)]
|
||
pub enum ConfirmAssetObjectError {
|
||
BucketMismatch,
|
||
ContentLengthMismatch,
|
||
Field(AssetObjectFieldError),
|
||
Oss(OssError),
|
||
Store(String),
|
||
}
|
||
|
||
impl Default for InMemoryAssetObjectStore {
|
||
fn default() -> Self {
|
||
Self {
|
||
inner: Arc::new(Mutex::new(HashMap::new())),
|
||
}
|
||
}
|
||
}
|
||
|
||
impl InMemoryAssetObjectStore {
|
||
fn upsert_by_location(
|
||
&self,
|
||
record: AssetObjectUpsertSnapshot,
|
||
) -> Result<AssetObjectRecord, ConfirmAssetObjectError> {
|
||
let mut state = self
|
||
.inner
|
||
.lock()
|
||
.map_err(|_| ConfirmAssetObjectError::Store("资产对象仓储锁已中毒".to_string()))?;
|
||
|
||
let key = (record.bucket.clone(), record.object_key.clone());
|
||
let next_record = match state.get(&key) {
|
||
Some(existing) => AssetObjectUpsertSnapshot {
|
||
asset_object_id: existing.asset_object_id.clone(),
|
||
created_at_micros: existing.created_at_micros,
|
||
..record
|
||
},
|
||
None => record,
|
||
};
|
||
state.insert(key, next_record.clone());
|
||
|
||
Ok(build_asset_object_record(next_record))
|
||
}
|
||
}
|
||
|
||
impl AssetObjectService {
|
||
pub fn new(store: InMemoryAssetObjectStore) -> Self {
|
||
Self {
|
||
store,
|
||
http_client: Client::new(),
|
||
}
|
||
}
|
||
|
||
pub async fn confirm_object(
|
||
&self,
|
||
oss_client: &OssClient,
|
||
input: ConfirmAssetObjectInput,
|
||
) -> Result<ConfirmAssetObjectResult, ConfirmAssetObjectError> {
|
||
let configured_bucket = oss_client.config_bucket().to_string();
|
||
let resolved_bucket = input
|
||
.bucket
|
||
.as_deref()
|
||
.map(str::trim)
|
||
.filter(|value| !value.is_empty())
|
||
.unwrap_or(configured_bucket.as_str())
|
||
.to_string();
|
||
|
||
if resolved_bucket != configured_bucket {
|
||
return Err(ConfirmAssetObjectError::BucketMismatch);
|
||
}
|
||
|
||
validate_asset_object_fields(
|
||
&resolved_bucket,
|
||
&input.object_key,
|
||
&input.asset_kind,
|
||
INITIAL_ASSET_OBJECT_VERSION,
|
||
)
|
||
.map_err(ConfirmAssetObjectError::Field)?;
|
||
|
||
let head = oss_client
|
||
.head_object(
|
||
&self.http_client,
|
||
OssHeadObjectRequest {
|
||
object_key: input.object_key.clone(),
|
||
},
|
||
)
|
||
.await
|
||
.map_err(ConfirmAssetObjectError::Oss)?;
|
||
|
||
if let Some(expected_length) = input.content_length
|
||
&& expected_length != head.content_length
|
||
{
|
||
return Err(ConfirmAssetObjectError::ContentLengthMismatch);
|
||
}
|
||
|
||
// 进程内 store 仅保留给无 SpacetimeDB 配置场景的最小 fallback,因此这里继续使用稳定微秒值表达时间。
|
||
let now_micros = chrono_like_utc_now_micros();
|
||
let upsert_input = build_asset_object_upsert_input(
|
||
generate_asset_object_id(now_micros),
|
||
resolved_bucket,
|
||
head.object_key,
|
||
input
|
||
.access_policy
|
||
.unwrap_or(AssetObjectAccessPolicy::Private),
|
||
head.content_type
|
||
.or_else(|| normalize_optional_value(input.content_type)),
|
||
head.content_length,
|
||
normalize_optional_value(input.content_hash),
|
||
input.asset_kind,
|
||
input.source_job_id,
|
||
input.owner_user_id,
|
||
input.profile_id,
|
||
input.entity_id,
|
||
now_micros,
|
||
)
|
||
.map_err(ConfirmAssetObjectError::Field)?;
|
||
|
||
let record = self.store.upsert_by_location(AssetObjectUpsertSnapshot {
|
||
asset_object_id: upsert_input.asset_object_id,
|
||
bucket: upsert_input.bucket,
|
||
object_key: upsert_input.object_key,
|
||
access_policy: upsert_input.access_policy,
|
||
content_type: upsert_input.content_type,
|
||
content_length: upsert_input.content_length,
|
||
content_hash: upsert_input.content_hash,
|
||
version: upsert_input.version,
|
||
source_job_id: upsert_input.source_job_id,
|
||
owner_user_id: upsert_input.owner_user_id,
|
||
profile_id: upsert_input.profile_id,
|
||
entity_id: upsert_input.entity_id,
|
||
asset_kind: upsert_input.asset_kind,
|
||
created_at_micros: now_micros,
|
||
updated_at_micros: now_micros,
|
||
})?;
|
||
|
||
Ok(ConfirmAssetObjectResult { record })
|
||
}
|
||
}
|
||
|
||
impl fmt::Display for ConfirmAssetObjectError {
|
||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||
match self {
|
||
Self::BucketMismatch => f.write_str("bucket 与当前服务端 OSS bucket 不一致"),
|
||
Self::ContentLengthMismatch => {
|
||
f.write_str("客户端声明的 contentLength 与 OSS 实际对象大小不一致")
|
||
}
|
||
Self::Field(error) => write!(f, "{error}"),
|
||
Self::Oss(error) => write!(f, "{error}"),
|
||
Self::Store(message) => f.write_str(message),
|
||
}
|
||
}
|
||
}
|
||
|
||
// Marks the error as a standard `Error`; all trait methods keep their defaults
// (no `source` override), so messages come solely from the `Display` impl.
impl Error for ConfirmAssetObjectError {}
|
||
|
||
impl From<AssetObjectFieldError> for ConfirmAssetObjectError {
|
||
fn from(value: AssetObjectFieldError) -> Self {
|
||
Self::Field(value)
|
||
}
|
||
}
|
||
|
||
impl From<OssError> for ConfirmAssetObjectError {
|
||
fn from(value: OssError) -> Self {
|
||
Self::Oss(value)
|
||
}
|
||
}
|
||
|
||
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch, or if the
/// microsecond count does not fit in an `i64`.
fn chrono_like_utc_now_micros() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let micros_since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock should be after unix epoch")
        .as_micros();

    // `as_micros` yields `u128`; narrow it with a checked conversion, not a cast.
    i64::try_from(micros_since_epoch).expect("current unix micros should fit in i64")
}
|
||
|
||
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a snapshot at the shared test location, varying only the id and timestamps.
    fn snapshot_at_shared_location(
        asset_object_id: &str,
        timestamp_micros: i64,
    ) -> AssetObjectUpsertSnapshot {
        AssetObjectUpsertSnapshot {
            asset_object_id: asset_object_id.to_string(),
            bucket: "xushi-dev".to_string(),
            object_key: "generated-characters/hero/master.png".to_string(),
            access_policy: AssetObjectAccessPolicy::Private,
            content_type: Some("image/png".to_string()),
            content_length: 100,
            content_hash: None,
            version: INITIAL_ASSET_OBJECT_VERSION,
            source_job_id: None,
            owner_user_id: None,
            profile_id: None,
            entity_id: None,
            asset_kind: "character_visual".to_string(),
            created_at_micros: timestamp_micros,
            updated_at_micros: timestamp_micros,
        }
    }

    /// A second upsert at the same `(bucket, object_key)` keeps the first id and
    /// creation time while taking the new update time.
    #[test]
    fn in_memory_store_upsert_keeps_same_primary_id_for_same_bucket_and_object_key() {
        let store = InMemoryAssetObjectStore::default();

        let first = store
            .upsert_by_location(snapshot_at_shared_location("assetobj_first", 1_000_000))
            .expect("first upsert should succeed");
        let second = store
            .upsert_by_location(snapshot_at_shared_location("assetobj_second", 2_000_000))
            .expect("second upsert should succeed");

        assert_eq!(first.asset_object_id, "assetobj_first");
        assert_eq!(second.asset_object_id, "assetobj_first");
        assert_eq!(second.created_at, "1.000000Z");
        assert_eq!(second.updated_at, "2.000000Z");
    }
}
|