Files
Genarrative/server-rs/crates/module-assets/src/asset_object_service.rs
kdletters cbc27bad4a
Some checks failed
CI / verify (push) Has been cancelled
init with react+axum+spacetimedb
2026-04-26 18:06:23 +08:00

255 lines
8.6 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::{
collections::HashMap,
error::Error,
fmt,
sync::{Arc, Mutex},
};
use platform_oss::{OssClient, OssError, OssHeadObjectRequest};
use reqwest::Client;
use crate::{
AssetObjectAccessPolicy, AssetObjectFieldError, AssetObjectRecord, AssetObjectUpsertSnapshot,
ConfirmAssetObjectInput, ConfirmAssetObjectResult, INITIAL_ASSET_OBJECT_VERSION,
build_asset_object_record, build_asset_object_upsert_input, generate_asset_object_id,
normalize_optional_value, validate_asset_object_fields,
};
/// Process-local store for asset object snapshots.
///
/// Snapshots are kept in a `HashMap` keyed by `(bucket, object_key)` behind an
/// `Arc<Mutex<..>>`, so every clone of the store refers to the same map.
#[derive(Debug, Clone)]
pub struct InMemoryAssetObjectStore {
    /// Shared map from `(bucket, object_key)` to the most recently stored snapshot.
    inner: Arc<Mutex<HashMap<(String, String), AssetObjectUpsertSnapshot>>>,
}
/// Service that confirms objects against OSS and records them in the in-memory store.
#[derive(Debug, Clone)]
pub struct AssetObjectService {
    /// Store that receives the snapshot of every confirmed object.
    store: InMemoryAssetObjectStore,
    /// HTTP client passed to the OSS client when issuing head-object requests.
    http_client: Client,
}
/// Reasons why confirming an asset object can fail.
#[derive(Debug, Eq, PartialEq)]
pub enum ConfirmAssetObjectError {
    /// The requested bucket differs from the bucket configured on the OSS client.
    BucketMismatch,
    /// The caller-declared content length differs from the length reported by OSS.
    ContentLengthMismatch,
    /// Field validation or upsert-input construction rejected the data.
    Field(AssetObjectFieldError),
    /// The OSS head-object request failed.
    Oss(OssError),
    /// The store could not be used; carries a human-readable message.
    Store(String),
}
impl Default for InMemoryAssetObjectStore {
    /// Creates a store whose shared map starts out empty.
    fn default() -> Self {
        let snapshots = HashMap::new();
        let inner = Arc::new(Mutex::new(snapshots));
        Self { inner }
    }
}
impl InMemoryAssetObjectStore {
    /// Inserts or replaces the snapshot stored for `(record.bucket, record.object_key)`.
    ///
    /// When a snapshot already exists for that location, its `asset_object_id` and
    /// `created_at_micros` are carried over; every other field comes from `record`.
    /// The stored snapshot is then converted with `build_asset_object_record` and returned.
    ///
    /// # Errors
    ///
    /// Returns [`ConfirmAssetObjectError::Store`] when the internal mutex is poisoned.
    fn upsert_by_location(
        &self,
        record: AssetObjectUpsertSnapshot,
    ) -> Result<AssetObjectRecord, ConfirmAssetObjectError> {
        let Ok(mut guard) = self.inner.lock() else {
            return Err(ConfirmAssetObjectError::Store(
                "资产对象仓储锁已中毒".to_string(),
            ));
        };

        let mut merged = record;
        let location = (merged.bucket.clone(), merged.object_key.clone());

        // Identity and creation time stay with the first snapshot seen for a location.
        if let Some(previous) = guard.get(&location) {
            merged.asset_object_id = previous.asset_object_id.clone();
            merged.created_at_micros = previous.created_at_micros;
        }

        guard.insert(location, merged.clone());
        Ok(build_asset_object_record(merged))
    }
}
impl AssetObjectService {
    /// Builds a service around `store` with a freshly created HTTP client.
    pub fn new(store: InMemoryAssetObjectStore) -> Self {
        let http_client = Client::new();
        Self { store, http_client }
    }

    /// Confirms an object via an OSS head request and upserts its record into the store.
    ///
    /// Steps, in order:
    /// 1. Resolve the bucket: a missing or blank `input.bucket` falls back to the bucket
    ///    configured on `oss_client`; any other value must equal the configured bucket.
    /// 2. Validate bucket, object key and asset kind with `validate_asset_object_fields`,
    ///    using `INITIAL_ASSET_OBJECT_VERSION`.
    /// 3. Issue a head-object request for `input.object_key`.
    /// 4. If `input.content_length` is set, require it to equal the length reported by OSS.
    /// 5. Build the upsert input (object key, content length and, when present, content
    ///    type are taken from the head response) and upsert it by location.
    ///
    /// # Errors
    ///
    /// * [`ConfirmAssetObjectError::BucketMismatch`] when the resolved bucket differs from
    ///   the configured one.
    /// * [`ConfirmAssetObjectError::Field`] when validation or upsert-input construction fails.
    /// * [`ConfirmAssetObjectError::Oss`] when the head-object request fails.
    /// * [`ConfirmAssetObjectError::ContentLengthMismatch`] when the declared length differs
    ///   from the length reported by OSS.
    /// * [`ConfirmAssetObjectError::Store`] when the store rejects the upsert.
    pub async fn confirm_object(
        &self,
        oss_client: &OssClient,
        input: ConfirmAssetObjectInput,
    ) -> Result<ConfirmAssetObjectResult, ConfirmAssetObjectError> {
        let configured_bucket = oss_client.config_bucket().to_string();
        let resolved_bucket = match input.bucket.as_deref().map(str::trim) {
            Some(requested) if !requested.is_empty() => requested.to_string(),
            _ => configured_bucket.clone(),
        };
        if resolved_bucket != configured_bucket {
            return Err(ConfirmAssetObjectError::BucketMismatch);
        }

        // Error conversion goes through the `From` impls on `ConfirmAssetObjectError`.
        validate_asset_object_fields(
            &resolved_bucket,
            &input.object_key,
            &input.asset_kind,
            INITIAL_ASSET_OBJECT_VERSION,
        )?;

        let head_request = OssHeadObjectRequest {
            object_key: input.object_key.clone(),
        };
        let head = oss_client
            .head_object(&self.http_client, head_request)
            .await?;

        match input.content_length {
            Some(declared_length) if declared_length != head.content_length => {
                return Err(ConfirmAssetObjectError::ContentLengthMismatch);
            }
            _ => {}
        }

        // The in-process store is kept only as a minimal fallback for setups without
        // SpacetimeDB configuration, so time is still expressed as a stable microsecond value.
        let now_micros = chrono_like_utc_now_micros();
        let asset_object_id = generate_asset_object_id(now_micros);
        let access_policy = input
            .access_policy
            .unwrap_or(AssetObjectAccessPolicy::Private);
        // The content type reported by OSS wins; the caller-supplied one is only a fallback.
        let content_type = head
            .content_type
            .or_else(|| normalize_optional_value(input.content_type));
        let content_hash = normalize_optional_value(input.content_hash);

        let upsert_input = build_asset_object_upsert_input(
            asset_object_id,
            resolved_bucket,
            head.object_key,
            access_policy,
            content_type,
            head.content_length,
            content_hash,
            input.asset_kind,
            input.source_job_id,
            input.owner_user_id,
            input.profile_id,
            input.entity_id,
            now_micros,
        )?;

        let snapshot = AssetObjectUpsertSnapshot {
            asset_object_id: upsert_input.asset_object_id,
            bucket: upsert_input.bucket,
            object_key: upsert_input.object_key,
            access_policy: upsert_input.access_policy,
            content_type: upsert_input.content_type,
            content_length: upsert_input.content_length,
            content_hash: upsert_input.content_hash,
            version: upsert_input.version,
            source_job_id: upsert_input.source_job_id,
            owner_user_id: upsert_input.owner_user_id,
            profile_id: upsert_input.profile_id,
            entity_id: upsert_input.entity_id,
            asset_kind: upsert_input.asset_kind,
            created_at_micros: now_micros,
            updated_at_micros: now_micros,
        };
        let record = self.store.upsert_by_location(snapshot)?;
        Ok(ConfirmAssetObjectResult { record })
    }
}
impl fmt::Display for ConfirmAssetObjectError {
    /// Writes the user-facing message for each variant.
    ///
    /// Wrapped `Field` and `Oss` errors are rendered through their own `Display`
    /// output; the remaining variants write a plain message string.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message: &str = match self {
            Self::Field(error) => return write!(f, "{error}"),
            Self::Oss(error) => return write!(f, "{error}"),
            Self::BucketMismatch => "bucket 与当前服务端 OSS bucket 不一致",
            Self::ContentLengthMismatch => {
                "客户端声明的 contentLength 与 OSS 实际对象大小不一致"
            }
            Self::Store(message) => message.as_str(),
        };
        f.write_str(message)
    }
}
// Marks the type as a standard `Error`; the empty body relies on the trait's default methods.
impl Error for ConfirmAssetObjectError {}
impl From<AssetObjectFieldError> for ConfirmAssetObjectError {
fn from(value: AssetObjectFieldError) -> Self {
Self::Field(value)
}
}
impl From<OssError> for ConfirmAssetObjectError {
fn from(value: OssError) -> Self {
Self::Oss(value)
}
}
/// Returns the current system time as whole microseconds since the Unix epoch.
///
/// # Panics
///
/// Panics when the system clock reads earlier than the Unix epoch, or when the
/// microsecond count does not fit in an `i64`.
fn chrono_like_utc_now_micros() -> i64 {
    use std::time::UNIX_EPOCH;

    // `elapsed` measures from the epoch to the current system time.
    let since_epoch = UNIX_EPOCH
        .elapsed()
        .expect("system clock should be after unix epoch");
    let micros: u128 = since_epoch.as_micros();
    i64::try_from(micros).expect("current unix micros should fit in i64")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a snapshot at the fixed test location; only the id and both timestamps vary.
    fn snapshot_at(asset_object_id: &str, micros: i64) -> AssetObjectUpsertSnapshot {
        AssetObjectUpsertSnapshot {
            asset_object_id: asset_object_id.to_string(),
            bucket: "xushi-dev".to_string(),
            object_key: "generated-characters/hero/master.png".to_string(),
            access_policy: AssetObjectAccessPolicy::Private,
            content_type: Some("image/png".to_string()),
            content_length: 100,
            content_hash: None,
            version: INITIAL_ASSET_OBJECT_VERSION,
            source_job_id: None,
            owner_user_id: None,
            profile_id: None,
            entity_id: None,
            asset_kind: "character_visual".to_string(),
            created_at_micros: micros,
            updated_at_micros: micros,
        }
    }

    /// A second upsert to the same location keeps the first id and creation time
    /// while taking the update time from the newer snapshot.
    #[test]
    fn in_memory_store_upsert_keeps_same_primary_id_for_same_bucket_and_object_key() {
        let store = InMemoryAssetObjectStore::default();

        let first = store
            .upsert_by_location(snapshot_at("assetobj_first", 1_000_000))
            .expect("first upsert should succeed");
        let second = store
            .upsert_by_location(snapshot_at("assetobj_second", 2_000_000))
            .expect("second upsert should succeed");

        assert_eq!(first.asset_object_id, "assetobj_first");
        assert_eq!(second.asset_object_id, "assetobj_first");
        assert_eq!(second.created_at, "1.000000Z");
        assert_eq!(second.updated_at, "2.000000Z");
    }
}