refactor: modularize api server assets and handlers

This commit is contained in:
2026-05-14 22:54:52 +08:00
parent 4ba1ebbbdf
commit 1b54db4f92
47 changed files with 8081 additions and 6142 deletions

View File

@@ -17,8 +17,7 @@ use module_assets::{
};
use platform_llm::{LlmMessage, LlmTextRequest};
use platform_oss::{
LegacyAssetPrefix, OssHeadObjectRequest, OssObjectAccess, OssPutObjectRequest,
OssSignedGetObjectUrlRequest,
LegacyAssetPrefix, OssHeadObjectRequest, OssObjectAccess, OssSignedGetObjectUrlRequest,
};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value, json};
@@ -26,6 +25,11 @@ use spacetime_client::SpacetimeClientError;
use tokio::time::sleep;
use webp::Encoder as WebpEncoder;
use crate::generated_image_assets::{
GeneratedImageAssetAdapter, GeneratedImageAssetDataUrl,
adapter::GeneratedImageAssetAdapterMetadata, adapter::GeneratedImageAssetPersistInput,
normalize_generated_image_asset_mime,
};
use crate::{
api_response::json_success_body,
asset_billing::{execute_billable_asset_operation, execute_billable_asset_operation_with_cost},
@@ -1084,482 +1088,15 @@ pub async fn generate_custom_world_opening_cg(
))
}
async fn persist_custom_world_asset(
state: &AppState,
owner_user_id: &str,
upload: PreparedAssetUpload,
mut response: GeneratedAssetResponse,
) -> Result<GeneratedAssetResponse, AppError> {
let oss_client = state.oss_client().ok_or_else(|| {
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_details(json!({
"provider": "aliyun-oss",
"reason": "OSS 未完成环境变量配置",
}))
})?;
let http_client = reqwest::Client::new();
let put_result = oss_client
.put_object(
&http_client,
OssPutObjectRequest {
prefix: upload.prefix,
path_segments: upload.path_segments,
file_name: upload.file_name,
content_type: Some(upload.content_type.clone()),
access: OssObjectAccess::Private,
metadata: build_asset_metadata(
upload.asset_kind,
owner_user_id,
upload.profile_id.as_deref(),
upload.entity_kind,
upload.entity_id.as_str(),
upload.slot,
),
body: upload.body,
},
)
.await
.map_err(map_custom_world_asset_oss_error)?;
// custom world 图片链正式改为 OSS 真值确认,不再把 put_object 返回值直接当成唯一对象真相。
let head = oss_client
.head_object(
&http_client,
OssHeadObjectRequest {
object_key: put_result.object_key.clone(),
},
)
.await
.map_err(map_custom_world_asset_oss_error)?;
let now_micros = current_utc_micros();
let asset_object = state
.spacetime_client()
.confirm_asset_object(
build_asset_object_upsert_input(
generate_asset_object_id(now_micros),
head.bucket,
head.object_key,
AssetObjectAccessPolicy::Private,
head.content_type.or(Some(upload.content_type)),
head.content_length,
head.etag,
upload.asset_kind.to_string(),
upload.source_job_id,
Some(owner_user_id.to_string()),
upload.profile_id.clone(),
Some(upload.entity_id.clone()),
now_micros,
)
.map_err(map_asset_object_prepare_error)?,
)
.await
.map_err(map_custom_world_asset_spacetime_error)?;
state
.spacetime_client()
.bind_asset_object_to_entity(
build_asset_entity_binding_input(
generate_asset_binding_id(now_micros),
asset_object.asset_object_id,
upload.entity_kind.to_string(),
upload.entity_id,
upload.slot.to_string(),
upload.asset_kind.to_string(),
Some(owner_user_id.to_string()),
upload.profile_id,
now_micros,
)
.map_err(map_asset_binding_prepare_error)?,
)
.await
.map_err(map_custom_world_asset_spacetime_error)?;
response.image_src = put_result.legacy_public_path;
Ok(response)
}
mod assets;
async fn generate_opening_cg_storyboard(
state: &AppState,
owner_user_id: &str,
http_client: &reqwest::Client,
settings: &crate::openai_image_generation::OpenAiImageSettings,
normalized: &NormalizedOpeningCgRequest,
reference_images: &[String],
) -> Result<GeneratedOpeningCgStoryboard, AppError> {
let generated = create_openai_image_generation(
http_client,
settings,
normalized.storyboard_prompt.as_str(),
None,
OPENING_CG_STORYBOARD_IMAGE_SIZE,
1,
reference_images,
"开局 CG 故事板生成失败",
)
.await?;
let downloaded = generated
.images
.into_iter()
.next()
.map(downloaded_openai_to_custom_world_image)
.ok_or_else(|| {
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "vector-engine",
"message": "开局 CG 故事板生成成功但未返回图片。",
}))
})?;
let asset_id = format!("opening-cg-storyboard-{}", current_utc_millis());
let upload = PreparedAssetUpload {
prefix: LegacyAssetPrefix::CustomWorldScenes,
path_segments: vec![
sanitize_storage_segment(
normalized
.profile_id
.as_deref()
.unwrap_or(normalized.world_name.as_str()),
"world",
),
"opening-cg".to_string(),
sanitize_storage_segment(normalized.opening_cg_id.as_str(), "opening-cg"),
],
file_name: format!("storyboard.{}", downloaded.extension),
content_type: downloaded.mime_type,
body: downloaded.bytes,
asset_kind: OPENING_CG_STORYBOARD_ASSET_KIND,
entity_kind: OPENING_CG_ENTITY_KIND,
entity_id: normalized
.profile_id
.clone()
.unwrap_or_else(|| normalized.world_name.clone()),
profile_id: normalized.profile_id.clone(),
slot: OPENING_CG_STORYBOARD_SLOT,
source_job_id: Some(generated.task_id.clone()),
};
let asset = persist_custom_world_asset(
state,
owner_user_id,
upload,
GeneratedAssetResponse {
image_src: String::new(),
asset_id: asset_id.clone(),
source_type: "generated".to_string(),
model: Some(GPT_IMAGE_2_MODEL.to_string()),
size: Some(OPENING_CG_STORYBOARD_IMAGE_SIZE.to_string()),
task_id: Some(generated.task_id.clone()),
prompt: Some(normalized.storyboard_prompt.clone()),
actual_prompt: generated.actual_prompt,
},
)
.await?;
use assets::persist_custom_world_asset;
Ok(GeneratedOpeningCgStoryboard {
image_src: asset.image_src,
asset_id,
})
}
mod opening_cg;
async fn generate_opening_cg_video(
state: &AppState,
owner_user_id: &str,
http_client: &reqwest::Client,
settings: &ArkVideoSettings,
normalized: &NormalizedOpeningCgRequest,
storyboard_reference_data_url: &str,
) -> Result<GeneratedOpeningCgVideo, AppError> {
let upstream_task_id = create_ark_storyboard_to_video_task(
http_client,
settings,
normalized.video_prompt.as_str(),
storyboard_reference_data_url,
)
.await?;
let video_url =
wait_for_ark_content_generation_task(http_client, settings, upstream_task_id.as_str())
.await?;
let downloaded =
download_generated_video(http_client, video_url.as_str(), "下载开局 CG 视频失败").await?;
let asset_id = format!("opening-cg-video-{}", current_utc_millis());
let video_src = persist_opening_cg_video_asset(
state,
owner_user_id,
normalized,
asset_id.as_str(),
Some(upstream_task_id.clone()),
downloaded,
)
.await?;
Ok(GeneratedOpeningCgVideo {
video_src,
asset_id,
})
}
async fn persist_opening_cg_video_asset(
state: &AppState,
owner_user_id: &str,
normalized: &NormalizedOpeningCgRequest,
asset_id: &str,
source_job_id: Option<String>,
video: DownloadedRemoteVideo,
) -> Result<String, AppError> {
let upload = PreparedAssetUpload {
prefix: LegacyAssetPrefix::CustomWorldScenes,
path_segments: vec![
sanitize_storage_segment(
normalized
.profile_id
.as_deref()
.unwrap_or(normalized.world_name.as_str()),
"world",
),
"opening-cg".to_string(),
sanitize_storage_segment(normalized.opening_cg_id.as_str(), "opening-cg"),
],
file_name: format!("opening.{}", video.extension),
content_type: video.mime_type,
body: video.bytes,
asset_kind: OPENING_CG_VIDEO_ASSET_KIND,
entity_kind: OPENING_CG_ENTITY_KIND,
entity_id: normalized
.profile_id
.clone()
.unwrap_or_else(|| normalized.world_name.clone()),
profile_id: normalized.profile_id.clone(),
slot: OPENING_CG_VIDEO_SLOT,
source_job_id,
};
let asset = persist_custom_world_asset(
state,
owner_user_id,
upload,
GeneratedAssetResponse {
image_src: String::new(),
asset_id: asset_id.to_string(),
source_type: "generated".to_string(),
model: Some("ark-seedance".to_string()),
size: Some(format!(
"{}:{}:{}s",
OPENING_CG_VIDEO_RESOLUTION,
OPENING_CG_VIDEO_RATIO,
OPENING_CG_VIDEO_DURATION_SECONDS
)),
task_id: None,
prompt: Some(normalized.video_prompt.clone()),
actual_prompt: None,
},
)
.await?;
Ok(asset.image_src)
}
async fn create_ark_storyboard_to_video_task(
http_client: &reqwest::Client,
settings: &ArkVideoSettings,
prompt: &str,
storyboard_reference_data_url: &str,
) -> Result<String, AppError> {
let response = http_client
.post(format!("{}/contents/generations/tasks", settings.base_url))
.header(
reqwest::header::AUTHORIZATION,
format!("Bearer {}", settings.api_key),
)
.header(reqwest::header::CONTENT_TYPE, "application/json")
.json(&json!({
"model": settings.model,
"content": [
{
"type": "text",
"text": prompt,
},
{
"type": "image_url",
"image_url": {
"url": storyboard_reference_data_url,
},
"role": "reference_image",
}
],
"resolution": OPENING_CG_VIDEO_RESOLUTION,
"ratio": OPENING_CG_VIDEO_RATIO,
"duration": OPENING_CG_VIDEO_DURATION_SECONDS,
"watermark": false,
"audio": true,
"generate_audio": true,
"web_search": true,
"enable_web_search": true,
}))
.send()
.await
.map_err(|error| {
map_ark_video_request_error(format!("请求 Seedance 视频服务失败:{error}"))
})?;
let status = response.status();
let text = response.text().await.map_err(|error| {
map_ark_video_request_error(format!("读取 Seedance 视频任务响应失败:{error}"))
})?;
if !status.is_success() {
return Err(parse_ark_video_upstream_error(
text.as_str(),
"创建开局 CG 视频任务失败。",
));
}
let payload = parse_ark_video_json_payload(text.as_str(), "创建开局 CG 视频任务失败。")?;
extract_ark_task_id(&payload.payload).ok_or_else(|| {
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "ark",
"message": "开局 CG 视频任务未返回任务 id。",
}))
})
}
async fn wait_for_ark_content_generation_task(
http_client: &reqwest::Client,
settings: &ArkVideoSettings,
task_id: &str,
) -> Result<String, AppError> {
let deadline = Instant::now() + Duration::from_millis(settings.request_timeout_ms);
while Instant::now() < deadline {
let response = http_client
.get(format!(
"{}/contents/generations/tasks/{}",
settings.base_url, task_id
))
.header(
reqwest::header::AUTHORIZATION,
format!("Bearer {}", settings.api_key),
)
.send()
.await
.map_err(|error| {
map_ark_video_request_error(format!("查询 Seedance 视频任务失败:{error}"))
})?;
let status = response.status();
let text = response.text().await.map_err(|error| {
map_ark_video_request_error(format!("读取 Seedance 视频任务响应失败:{error}"))
})?;
if !status.is_success() {
return Err(parse_ark_video_upstream_error(
text.as_str(),
"查询开局 CG 视频任务失败。",
));
}
let payload = parse_ark_video_json_payload(text.as_str(), "查询开局 CG 视频任务失败。")?;
if let Some(video_url) = extract_video_url(&payload.payload) {
return Ok(video_url);
}
let normalized_status = normalize_generation_task_status(
extract_generation_task_status(&payload.payload).as_str(),
);
if is_completed_generation_task_status(normalized_status.as_str()) {
return Err(
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "ark",
"message": "开局 CG 视频任务完成但没有返回 video_url。",
"taskId": task_id,
})),
);
}
if is_failed_generation_task_status(normalized_status.as_str()) {
return Err(parse_ark_video_upstream_error(
text.as_str(),
"开局 CG 视频任务执行失败。",
));
}
sleep(Duration::from_millis(ARK_VIDEO_TASK_POLL_INTERVAL_MS)).await;
}
Err(
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "ark",
"message": "开局 CG 视频生成超时,请稍后重试。",
"taskId": task_id,
})),
)
}
async fn download_generated_video(
http_client: &reqwest::Client,
video_url: &str,
fallback_message: &str,
) -> Result<DownloadedRemoteVideo, AppError> {
let response = http_client
.get(video_url)
.send()
.await
.map_err(|error| map_ark_video_request_error(format!("{fallback_message}{error}")))?;
let status = response.status();
let content_type = response
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.unwrap_or("video/mp4")
.to_string();
let body = response
.bytes()
.await
.map_err(|error| map_ark_video_request_error(format!("{fallback_message}{error}")))?;
if !status.is_success() {
return Err(
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "ark",
"message": fallback_message,
"status": status.as_u16(),
})),
);
}
let normalized_mime_type = normalize_downloaded_video_mime_type(content_type.as_str());
Ok(DownloadedRemoteVideo {
extension: video_mime_to_extension(normalized_mime_type.as_str()).to_string(),
mime_type: normalized_mime_type,
bytes: body.to_vec(),
})
}
fn build_asset_metadata(
asset_kind: &str,
owner_user_id: &str,
profile_id: Option<&str>,
entity_kind: &str,
entity_id: &str,
slot: &str,
) -> BTreeMap<String, String> {
let mut metadata = BTreeMap::from([
("asset_kind".to_string(), asset_kind.to_string()),
("owner_user_id".to_string(), owner_user_id.to_string()),
("entity_kind".to_string(), entity_kind.to_string()),
("entity_id".to_string(), entity_id.to_string()),
("slot".to_string(), slot.to_string()),
]);
if let Some(profile_id) = profile_id {
metadata.insert("profile_id".to_string(), profile_id.to_string());
}
metadata
}
fn map_asset_object_prepare_error(error: AssetObjectFieldError) -> AppError {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "asset-object",
"message": error.to_string(),
}))
}
fn map_asset_binding_prepare_error(error: AssetObjectFieldError) -> AppError {
AppError::from_status(StatusCode::BAD_REQUEST).with_details(json!({
"provider": "asset-entity-binding",
"message": error.to_string(),
}))
}
fn map_custom_world_asset_spacetime_error(error: SpacetimeClientError) -> AppError {
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(json!({
"provider": "spacetimedb",
"message": error.to_string(),
}))
}
fn map_custom_world_asset_oss_error(error: platform_oss::OssError) -> AppError {
map_oss_error(error, "aliyun-oss")
}
use opening_cg::{
generate_opening_cg_storyboard, generate_opening_cg_video, map_custom_world_asset_oss_error,
};
async fn generate_entity_with_fallback(state: &AppState, profile: &Value, kind: &str) -> Value {
let fallback = build_entity_fallback(profile, kind);