重写
This commit is contained in:
254
server-rs/crates/module-assets/src/asset_object_service.rs
Normal file
254
server-rs/crates/module-assets/src/asset_object_service.rs
Normal file
@@ -0,0 +1,254 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
error::Error,
|
||||
fmt,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use platform_oss::{OssClient, OssError, OssHeadObjectRequest};
|
||||
use reqwest::Client;
|
||||
|
||||
use crate::{
|
||||
AssetObjectAccessPolicy, AssetObjectFieldError, AssetObjectRecord, AssetObjectUpsertSnapshot,
|
||||
ConfirmAssetObjectInput, ConfirmAssetObjectResult, INITIAL_ASSET_OBJECT_VERSION,
|
||||
build_asset_object_record, build_asset_object_upsert_input, generate_asset_object_id,
|
||||
normalize_optional_value, validate_asset_object_fields,
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct InMemoryAssetObjectStore {
|
||||
inner: Arc<Mutex<HashMap<(String, String), AssetObjectUpsertSnapshot>>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AssetObjectService {
|
||||
store: InMemoryAssetObjectStore,
|
||||
http_client: Client,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum ConfirmAssetObjectError {
|
||||
BucketMismatch,
|
||||
ContentLengthMismatch,
|
||||
Field(AssetObjectFieldError),
|
||||
Oss(OssError),
|
||||
Store(String),
|
||||
}
|
||||
|
||||
impl Default for InMemoryAssetObjectStore {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InMemoryAssetObjectStore {
|
||||
fn upsert_by_location(
|
||||
&self,
|
||||
record: AssetObjectUpsertSnapshot,
|
||||
) -> Result<AssetObjectRecord, ConfirmAssetObjectError> {
|
||||
let mut state = self
|
||||
.inner
|
||||
.lock()
|
||||
.map_err(|_| ConfirmAssetObjectError::Store("资产对象仓储锁已中毒".to_string()))?;
|
||||
|
||||
let key = (record.bucket.clone(), record.object_key.clone());
|
||||
let next_record = match state.get(&key) {
|
||||
Some(existing) => AssetObjectUpsertSnapshot {
|
||||
asset_object_id: existing.asset_object_id.clone(),
|
||||
created_at_micros: existing.created_at_micros,
|
||||
..record
|
||||
},
|
||||
None => record,
|
||||
};
|
||||
state.insert(key, next_record.clone());
|
||||
|
||||
Ok(build_asset_object_record(next_record))
|
||||
}
|
||||
}
|
||||
|
||||
impl AssetObjectService {
|
||||
pub fn new(store: InMemoryAssetObjectStore) -> Self {
|
||||
Self {
|
||||
store,
|
||||
http_client: Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn confirm_object(
|
||||
&self,
|
||||
oss_client: &OssClient,
|
||||
input: ConfirmAssetObjectInput,
|
||||
) -> Result<ConfirmAssetObjectResult, ConfirmAssetObjectError> {
|
||||
let configured_bucket = oss_client.config_bucket().to_string();
|
||||
let resolved_bucket = input
|
||||
.bucket
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty())
|
||||
.unwrap_or(configured_bucket.as_str())
|
||||
.to_string();
|
||||
|
||||
if resolved_bucket != configured_bucket {
|
||||
return Err(ConfirmAssetObjectError::BucketMismatch);
|
||||
}
|
||||
|
||||
validate_asset_object_fields(
|
||||
&resolved_bucket,
|
||||
&input.object_key,
|
||||
&input.asset_kind,
|
||||
INITIAL_ASSET_OBJECT_VERSION,
|
||||
)
|
||||
.map_err(ConfirmAssetObjectError::Field)?;
|
||||
|
||||
let head = oss_client
|
||||
.head_object(
|
||||
&self.http_client,
|
||||
OssHeadObjectRequest {
|
||||
object_key: input.object_key.clone(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(ConfirmAssetObjectError::Oss)?;
|
||||
|
||||
if let Some(expected_length) = input.content_length
|
||||
&& expected_length != head.content_length
|
||||
{
|
||||
return Err(ConfirmAssetObjectError::ContentLengthMismatch);
|
||||
}
|
||||
|
||||
// 进程内 store 仅保留给无 SpacetimeDB 配置场景的最小 fallback,因此这里继续使用稳定微秒值表达时间。
|
||||
let now_micros = chrono_like_utc_now_micros();
|
||||
let upsert_input = build_asset_object_upsert_input(
|
||||
generate_asset_object_id(now_micros),
|
||||
resolved_bucket,
|
||||
head.object_key,
|
||||
input
|
||||
.access_policy
|
||||
.unwrap_or(AssetObjectAccessPolicy::Private),
|
||||
head.content_type
|
||||
.or_else(|| normalize_optional_value(input.content_type)),
|
||||
head.content_length,
|
||||
normalize_optional_value(input.content_hash),
|
||||
input.asset_kind,
|
||||
input.source_job_id,
|
||||
input.owner_user_id,
|
||||
input.profile_id,
|
||||
input.entity_id,
|
||||
now_micros,
|
||||
)
|
||||
.map_err(ConfirmAssetObjectError::Field)?;
|
||||
|
||||
let record = self.store.upsert_by_location(AssetObjectUpsertSnapshot {
|
||||
asset_object_id: upsert_input.asset_object_id,
|
||||
bucket: upsert_input.bucket,
|
||||
object_key: upsert_input.object_key,
|
||||
access_policy: upsert_input.access_policy,
|
||||
content_type: upsert_input.content_type,
|
||||
content_length: upsert_input.content_length,
|
||||
content_hash: upsert_input.content_hash,
|
||||
version: upsert_input.version,
|
||||
source_job_id: upsert_input.source_job_id,
|
||||
owner_user_id: upsert_input.owner_user_id,
|
||||
profile_id: upsert_input.profile_id,
|
||||
entity_id: upsert_input.entity_id,
|
||||
asset_kind: upsert_input.asset_kind,
|
||||
created_at_micros: now_micros,
|
||||
updated_at_micros: now_micros,
|
||||
})?;
|
||||
|
||||
Ok(ConfirmAssetObjectResult { record })
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ConfirmAssetObjectError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::BucketMismatch => f.write_str("bucket 与当前服务端 OSS bucket 不一致"),
|
||||
Self::ContentLengthMismatch => {
|
||||
f.write_str("客户端声明的 contentLength 与 OSS 实际对象大小不一致")
|
||||
}
|
||||
Self::Field(error) => write!(f, "{error}"),
|
||||
Self::Oss(error) => write!(f, "{error}"),
|
||||
Self::Store(message) => f.write_str(message),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for ConfirmAssetObjectError {}
|
||||
|
||||
impl From<AssetObjectFieldError> for ConfirmAssetObjectError {
|
||||
fn from(value: AssetObjectFieldError) -> Self {
|
||||
Self::Field(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OssError> for ConfirmAssetObjectError {
|
||||
fn from(value: OssError) -> Self {
|
||||
Self::Oss(value)
|
||||
}
|
||||
}
|
||||
|
||||
fn chrono_like_utc_now_micros() -> i64 {
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
let duration = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("system clock should be after unix epoch");
|
||||
i64::try_from(duration.as_micros()).expect("current unix micros should fit in i64")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn in_memory_store_upsert_keeps_same_primary_id_for_same_bucket_and_object_key() {
|
||||
let store = InMemoryAssetObjectStore::default();
|
||||
let first = store
|
||||
.upsert_by_location(AssetObjectUpsertSnapshot {
|
||||
asset_object_id: "assetobj_first".to_string(),
|
||||
bucket: "xushi-dev".to_string(),
|
||||
object_key: "generated-characters/hero/master.png".to_string(),
|
||||
access_policy: AssetObjectAccessPolicy::Private,
|
||||
content_type: Some("image/png".to_string()),
|
||||
content_length: 100,
|
||||
content_hash: None,
|
||||
version: INITIAL_ASSET_OBJECT_VERSION,
|
||||
source_job_id: None,
|
||||
owner_user_id: None,
|
||||
profile_id: None,
|
||||
entity_id: None,
|
||||
asset_kind: "character_visual".to_string(),
|
||||
created_at_micros: 1_000_000,
|
||||
updated_at_micros: 1_000_000,
|
||||
})
|
||||
.expect("first upsert should succeed");
|
||||
|
||||
let second = store
|
||||
.upsert_by_location(AssetObjectUpsertSnapshot {
|
||||
asset_object_id: "assetobj_second".to_string(),
|
||||
bucket: "xushi-dev".to_string(),
|
||||
object_key: "generated-characters/hero/master.png".to_string(),
|
||||
access_policy: AssetObjectAccessPolicy::Private,
|
||||
content_type: Some("image/png".to_string()),
|
||||
content_length: 100,
|
||||
content_hash: None,
|
||||
version: INITIAL_ASSET_OBJECT_VERSION,
|
||||
source_job_id: None,
|
||||
owner_user_id: None,
|
||||
profile_id: None,
|
||||
entity_id: None,
|
||||
asset_kind: "character_visual".to_string(),
|
||||
created_at_micros: 2_000_000,
|
||||
updated_at_micros: 2_000_000,
|
||||
})
|
||||
.expect("second upsert should succeed");
|
||||
|
||||
assert_eq!(first.asset_object_id, "assetobj_first");
|
||||
assert_eq!(second.asset_object_id, "assetobj_first");
|
||||
assert_eq!(second.created_at, "1.000000Z");
|
||||
assert_eq!(second.updated_at, "2.000000Z");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user