推进 SpacetimeDB adapter 与 client 收口

This commit is contained in:
2026-04-29 16:53:54 +08:00
parent f82775b852
commit 62934b0809
17 changed files with 1023 additions and 597 deletions

View File

@@ -66,7 +66,6 @@ fn upsert_asset_entity_binding(
return Err("asset_entity_binding.asset_object_id 对应的 asset_object 不存在".to_string());
}
let updated_at = Timestamp::from_micros_since_unix_epoch(input.updated_at_micros);
// 首版绑定按 entity_kind + entity_id + slot 幂等定位,后续访问量明确后再改为组合索引扫描。
let current = ctx.db.asset_entity_binding().iter().find(|row| {
row.entity_kind == input.entity_kind
@@ -80,7 +79,7 @@ fn upsert_asset_entity_binding(
.asset_entity_binding()
.binding_id()
.delete(&existing.binding_id);
let row = AssetEntityBinding {
let snapshot = AssetEntityBindingSnapshot {
binding_id: existing.binding_id.clone(),
asset_object_id: input.asset_object_id.clone(),
entity_kind: input.entity_kind.clone(),
@@ -89,27 +88,16 @@ fn upsert_asset_entity_binding(
asset_kind: input.asset_kind.clone(),
owner_user_id: input.owner_user_id.clone(),
profile_id: input.profile_id.clone(),
created_at: existing.created_at,
updated_at,
};
ctx.db.asset_entity_binding().insert(row);
AssetEntityBindingSnapshot {
binding_id: existing.binding_id,
asset_object_id: input.asset_object_id,
entity_kind: input.entity_kind,
entity_id: input.entity_id,
slot: input.slot,
asset_kind: input.asset_kind,
owner_user_id: input.owner_user_id,
profile_id: input.profile_id,
created_at_micros: existing.created_at.to_micros_since_unix_epoch(),
updated_at_micros: input.updated_at_micros,
}
};
ctx.db
.asset_entity_binding()
.insert(build_asset_entity_binding_row(&snapshot));
snapshot
}
None => {
let created_at = updated_at;
let row = AssetEntityBinding {
let snapshot = AssetEntityBindingSnapshot {
binding_id: input.binding_id.clone(),
asset_object_id: input.asset_object_id.clone(),
entity_kind: input.entity_kind.clone(),
@@ -118,25 +106,30 @@ fn upsert_asset_entity_binding(
asset_kind: input.asset_kind.clone(),
owner_user_id: input.owner_user_id.clone(),
profile_id: input.profile_id.clone(),
created_at,
updated_at,
};
ctx.db.asset_entity_binding().insert(row);
AssetEntityBindingSnapshot {
binding_id: input.binding_id,
asset_object_id: input.asset_object_id,
entity_kind: input.entity_kind,
entity_id: input.entity_id,
slot: input.slot,
asset_kind: input.asset_kind,
owner_user_id: input.owner_user_id,
profile_id: input.profile_id,
created_at_micros: input.updated_at_micros,
updated_at_micros: input.updated_at_micros,
}
};
ctx.db
.asset_entity_binding()
.insert(build_asset_entity_binding_row(&snapshot));
snapshot
}
};
Ok(snapshot)
}
fn build_asset_entity_binding_row(snapshot: &AssetEntityBindingSnapshot) -> AssetEntityBinding {
AssetEntityBinding {
binding_id: snapshot.binding_id.clone(),
asset_object_id: snapshot.asset_object_id.clone(),
entity_kind: snapshot.entity_kind.clone(),
entity_id: snapshot.entity_id.clone(),
slot: snapshot.slot.clone(),
asset_kind: snapshot.asset_kind.clone(),
owner_user_id: snapshot.owner_user_id.clone(),
profile_id: snapshot.profile_id.clone(),
created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros),
updated_at: Timestamp::from_micros_since_unix_epoch(snapshot.updated_at_micros),
}
}

View File

@@ -91,7 +91,6 @@ pub(crate) fn upsert_asset_object(
)
.map_err(|error| error.to_string())?;
let updated_at = Timestamp::from_micros_since_unix_epoch(input.updated_at_micros);
// 这里先保持最小可发布实现:查重语义已经冻结,后续再把实现优化回组合索引扫描。
let current = ctx
.db
@@ -105,7 +104,7 @@ pub(crate) fn upsert_asset_object(
.asset_object()
.asset_object_id()
.delete(&existing.asset_object_id);
let row = AssetObject {
let snapshot = AssetObjectUpsertSnapshot {
asset_object_id: existing.asset_object_id.clone(),
bucket: input.bucket.clone(),
object_key: input.object_key.clone(),
@@ -119,32 +118,16 @@ pub(crate) fn upsert_asset_object(
profile_id: input.profile_id.clone(),
entity_id: input.entity_id.clone(),
asset_kind: input.asset_kind.clone(),
created_at: existing.created_at,
updated_at,
};
ctx.db.asset_object().insert(row);
AssetObjectUpsertSnapshot {
asset_object_id: existing.asset_object_id,
bucket: input.bucket,
object_key: input.object_key,
access_policy: input.access_policy,
content_type: input.content_type,
content_length: input.content_length,
content_hash: input.content_hash,
version: input.version,
source_job_id: input.source_job_id,
owner_user_id: input.owner_user_id,
profile_id: input.profile_id,
entity_id: input.entity_id,
asset_kind: input.asset_kind,
created_at_micros: existing.created_at.to_micros_since_unix_epoch(),
updated_at_micros: input.updated_at_micros,
}
};
ctx.db
.asset_object()
.insert(build_asset_object_row(&snapshot));
snapshot
}
None => {
let created_at = updated_at;
let row = AssetObject {
let snapshot = AssetObjectUpsertSnapshot {
asset_object_id: input.asset_object_id.clone(),
bucket: input.bucket.clone(),
object_key: input.object_key.clone(),
@@ -158,28 +141,13 @@ pub(crate) fn upsert_asset_object(
profile_id: input.profile_id.clone(),
entity_id: input.entity_id.clone(),
asset_kind: input.asset_kind.clone(),
created_at,
updated_at,
};
ctx.db.asset_object().insert(row);
AssetObjectUpsertSnapshot {
asset_object_id: input.asset_object_id,
bucket: input.bucket,
object_key: input.object_key,
access_policy: input.access_policy,
content_type: input.content_type,
content_length: input.content_length,
content_hash: input.content_hash,
version: input.version,
source_job_id: input.source_job_id,
owner_user_id: input.owner_user_id,
profile_id: input.profile_id,
entity_id: input.entity_id,
asset_kind: input.asset_kind,
created_at_micros: input.updated_at_micros,
updated_at_micros: input.updated_at_micros,
}
};
ctx.db
.asset_object()
.insert(build_asset_object_row(&snapshot));
snapshot
}
};
@@ -244,3 +212,23 @@ fn object_key_to_legacy_image_src(object_key: &str) -> String {
}
format!("/{normalized}")
}
fn build_asset_object_row(snapshot: &AssetObjectUpsertSnapshot) -> AssetObject {
AssetObject {
asset_object_id: snapshot.asset_object_id.clone(),
bucket: snapshot.bucket.clone(),
object_key: snapshot.object_key.clone(),
access_policy: snapshot.access_policy,
content_type: snapshot.content_type.clone(),
content_length: snapshot.content_length,
content_hash: snapshot.content_hash.clone(),
version: snapshot.version,
source_job_id: snapshot.source_job_id.clone(),
owner_user_id: snapshot.owner_user_id.clone(),
profile_id: snapshot.profile_id.clone(),
entity_id: snapshot.entity_id.clone(),
asset_kind: snapshot.asset_kind.clone(),
created_at: Timestamp::from_micros_since_unix_epoch(snapshot.created_at_micros),
updated_at: Timestamp::from_micros_since_unix_epoch(snapshot.updated_at_micros),
}
}