合并 master 并保留拼消消体验路径

合入 master 的平台 UI 与后端更新。

解决拼消消创作页、结果页和运行态冲突。

保留拼消消发布前检查、背景图命名、拖拽边界与补牌动画修复。
This commit is contained in:
2026-06-12 16:38:23 +08:00
396 changed files with 43653 additions and 13291 deletions

View File

@@ -4,7 +4,11 @@ use axum::http::StatusCode;
use serde_json::json;
use spacetime_client::SpacetimeClientError;
use crate::{http_error::AppError, state::AppState};
use crate::{
http_error::AppError,
state::AppState,
wallet_refund_outbox::{WalletRefundOutboxEnqueueOutcome, WalletRefundOutboxRecord},
};
pub(crate) const ASSET_OPERATION_POINTS_COST: u64 = 1;
@@ -90,22 +94,11 @@ async fn consume_asset_operation_points(
.await
{
Ok(_) => Ok(true),
Err(error) if should_skip_asset_operation_billing_for_connectivity(&error) => {
// 中文注释:外部生图不应被 Maincloud 钱包短暂 503 阻断;此时跳过扣费,让业务链路继续,避免用户重复点击。
tracing::warn!(
owner_user_id,
asset_kind,
asset_id,
error = %error,
"资产操作泥点预扣因 SpacetimeDB 连接不可用而降级跳过"
);
Ok(false)
}
Err(error) => Err(map_asset_operation_wallet_error(error)),
}
}
/// 外部生成或发布 mutation 失败后补偿退款;退款失败只记日志,避免覆盖原始业务错误。
/// 外部生成或发布 mutation 失败后补偿退款;立即退款失败会进入 outbox,避免覆盖原始业务错误。
async fn refund_asset_operation_points(
state: &AppState,
owner_user_id: &str,
@@ -117,22 +110,74 @@ async fn refund_asset_operation_points(
"asset_operation_refund:{}:{}:{}",
owner_user_id, asset_kind, asset_id
);
let created_at_micros = current_utc_micros();
if let Err(error) = state
.spacetime_client()
.refund_profile_wallet_points(
owner_user_id.to_string(),
points_cost,
ledger_id,
current_utc_micros(),
ledger_id.clone(),
created_at_micros,
)
.await
{
let refund_error = error.to_string();
if let Some(outbox) = state.wallet_refund_outbox() {
match outbox
.enqueue(WalletRefundOutboxRecord {
owner_user_id: owner_user_id.to_string(),
amount: points_cost,
ledger_id: ledger_id.clone(),
created_at_micros,
asset_kind: asset_kind.to_string(),
asset_id: asset_id.to_string(),
})
.await
{
Ok(WalletRefundOutboxEnqueueOutcome::Enqueued) => {
tracing::warn!(
owner_user_id,
asset_kind,
asset_id,
ledger_id,
error = %refund_error,
"资产操作失败后的泥点退款立即执行失败,已写入 wallet refund outbox"
);
return;
}
Ok(WalletRefundOutboxEnqueueOutcome::Dropped { reason }) => {
tracing::error!(
owner_user_id,
asset_kind,
asset_id,
ledger_id,
reason,
error = %refund_error,
"资产操作失败后的泥点退款立即执行失败,且 wallet refund outbox 因容量限制丢弃"
);
return;
}
Err(outbox_error) => {
tracing::error!(
owner_user_id,
asset_kind,
asset_id,
ledger_id,
refund_error = %refund_error,
outbox_error = %outbox_error,
"资产操作失败后的泥点退款立即执行失败,且写入 wallet refund outbox 失败"
);
return;
}
}
}
tracing::error!(
owner_user_id,
asset_kind,
asset_id,
error = %error,
"资产操作失败后的泥点退款失败"
ledger_id,
error = %refund_error,
"资产操作失败后的泥点退款失败,且 wallet refund outbox 未启用"
);
}
}
@@ -185,7 +230,7 @@ mod tests {
use super::*;
#[test]
fn asset_operation_billing_skips_spacetime_connectivity_errors() {
fn asset_operation_connectivity_errors_are_classified_for_non_billing_fallbacks() {
assert_eq!(ASSET_OPERATION_POINTS_COST, 1);
assert!(should_skip_asset_operation_billing_for_connectivity(
&SpacetimeClientError::ConnectDropped

View File

@@ -306,11 +306,12 @@ pub async fn generate_bark_battle_image_asset(
.filter(|value| !value.is_empty())
.map(ToString::to_string);
let points_cost = resolve_bark_battle_image_asset_points_cost(&state, &payload).await;
let billing_asset_id = request_context.request_id().to_string();
let result = execute_billable_asset_operation_with_cost(
&state,
&owner_user_id,
bark_battle_slot_asset_kind(&slot),
asset_id.as_str(),
billing_asset_id.as_str(),
points_cost,
async {
generate_and_persist_bark_battle_image_asset(

View File

@@ -722,7 +722,7 @@ pub async fn execute_big_fish_action(
"big_fish_publish_game" => Some("big_fish_publish_game"),
_ => None,
};
let billing_asset_id = format!("{session_id}:{now}");
let billing_asset_id = format!("{}:{}:{}", session_id, action, request_context.request_id());
let session_operation = async {
match action.as_str() {
"big_fish_compile_draft" => {

View File

@@ -32,6 +32,11 @@ pub struct AppConfig {
pub tracking_outbox_batch_size: usize,
pub tracking_outbox_flush_interval: Duration,
pub tracking_outbox_max_bytes: u64,
pub wallet_refund_outbox_enabled: bool,
pub wallet_refund_outbox_dir: PathBuf,
pub wallet_refund_outbox_batch_size: usize,
pub wallet_refund_outbox_flush_interval: Duration,
pub wallet_refund_outbox_max_bytes: u64,
pub log_filter: String,
pub otel_enabled: bool,
pub admin_username: Option<String>,
@@ -183,6 +188,11 @@ impl Default for AppConfig {
tracking_outbox_batch_size: 500,
tracking_outbox_flush_interval: Duration::from_millis(1_000),
tracking_outbox_max_bytes: 256 * 1024 * 1024,
wallet_refund_outbox_enabled: true,
wallet_refund_outbox_dir: PathBuf::from("server-rs/.data/wallet-refund-outbox"),
wallet_refund_outbox_batch_size: 100,
wallet_refund_outbox_flush_interval: Duration::from_millis(1_000),
wallet_refund_outbox_max_bytes: 64 * 1024 * 1024,
log_filter: "info,tower_http=info".to_string(),
otel_enabled: false,
admin_username: None,
@@ -409,6 +419,27 @@ impl AppConfig {
{
config.tracking_outbox_max_bytes = max_bytes;
}
if let Some(enabled) = read_first_bool_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED"]) {
config.wallet_refund_outbox_enabled = enabled;
}
if let Some(dir) = read_first_non_empty_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_DIR"]) {
config.wallet_refund_outbox_dir = PathBuf::from(dir);
}
if let Some(batch_size) =
read_first_usize_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE"])
{
config.wallet_refund_outbox_batch_size = batch_size;
}
if let Some(flush_interval_ms) =
read_first_positive_u64_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS"])
{
config.wallet_refund_outbox_flush_interval = Duration::from_millis(flush_interval_ms);
}
if let Some(max_bytes) =
read_first_positive_u64_env(&["GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES"])
{
config.wallet_refund_outbox_max_bytes = max_bytes;
}
if let Some(otel_enabled) = read_first_bool_env(&["GENARRATIVE_OTEL_ENABLED"]) {
config.otel_enabled = otel_enabled;
}
@@ -1380,6 +1411,11 @@ mod tests {
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_DIR");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES");
std::env::remove_var("GENARRATIVE_OTEL_ENABLED");
std::env::set_var("GENARRATIVE_API_LISTEN_BACKLOG", "2048");
std::env::set_var("GENARRATIVE_API_WORKER_THREADS", "6");
@@ -1396,6 +1432,14 @@ mod tests {
std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE", "250");
std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS", "2000");
std::env::set_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES", "1048576");
std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED", "false");
std::env::set_var(
"GENARRATIVE_WALLET_REFUND_OUTBOX_DIR",
"/tmp/genarrative-wallet-refund-outbox",
);
std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE", "50");
std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS", "3000");
std::env::set_var("GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES", "524288");
std::env::set_var("GENARRATIVE_OTEL_ENABLED", "true");
}
@@ -1421,6 +1465,17 @@ mod tests {
std::time::Duration::from_millis(2_000)
);
assert_eq!(config.tracking_outbox_max_bytes, 1_048_576);
assert!(!config.wallet_refund_outbox_enabled);
assert_eq!(
config.wallet_refund_outbox_dir,
std::path::PathBuf::from("/tmp/genarrative-wallet-refund-outbox")
);
assert_eq!(config.wallet_refund_outbox_batch_size, 50);
assert_eq!(
config.wallet_refund_outbox_flush_interval,
std::time::Duration::from_millis(3_000)
);
assert_eq!(config.wallet_refund_outbox_max_bytes, 524_288);
assert!(config.otel_enabled);
unsafe {
@@ -1436,6 +1491,11 @@ mod tests {
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_BATCH_SIZE");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_FLUSH_INTERVAL_MS");
std::env::remove_var("GENARRATIVE_TRACKING_OUTBOX_MAX_BYTES");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_ENABLED");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_DIR");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_BATCH_SIZE");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_FLUSH_INTERVAL_MS");
std::env::remove_var("GENARRATIVE_WALLET_REFUND_OUTBOX_MAX_BYTES");
std::env::remove_var("GENARRATIVE_OTEL_ENABLED");
}
}

View File

@@ -547,11 +547,12 @@ pub async fn generate_custom_world_scene_image(
require_openai_image_settings(&state)
.map_err(|error| custom_world_ai_error_response(&request_context, error))?;
let asset_id = format!("custom-scene-{}", current_utc_millis());
let billing_asset_id = request_context.request_id().to_string();
let asset = execute_billable_asset_operation(
&state,
&owner_user_id,
"scene_image",
asset_id.as_str(),
billing_asset_id.as_str(),
async {
let settings = require_openai_image_settings(&state)?.with_external_api_audit_context(
&request_context,
@@ -806,11 +807,12 @@ pub async fn generate_custom_world_cover_image(
require_dashscope_settings(&state)
.map_err(|error| custom_world_ai_error_response(&request_context, error))?;
let asset_id = format!("custom-cover-{}", current_utc_millis());
let billing_asset_id = request_context.request_id().to_string();
let asset = execute_billable_asset_operation(
&state,
&owner_user_id,
"custom_world_cover",
asset_id.as_str(),
billing_asset_id.as_str(),
async {
let settings = require_dashscope_settings(&state)?;
let http_client = build_dashscope_http_client(&settings)?;
@@ -1011,11 +1013,12 @@ pub async fn generate_custom_world_opening_cg(
.map_err(|error| custom_world_ai_error_response(&request_context, error))?;
let opening_cg_id = normalized.opening_cg_id.clone();
let billing_asset_id = request_context.request_id().to_string();
let generated = execute_billable_asset_operation_with_cost(
&state,
&owner_user_id,
"custom_world_opening_cg",
opening_cg_id.as_str(),
billing_asset_id.as_str(),
OPENING_CG_POINTS_COST,
async {
let image_settings = require_openai_image_settings(&state)?

View File

@@ -89,6 +89,7 @@ mod tracking_outbox;
mod vector_engine_audio_generation;
mod visual_novel;
mod volcengine_speech;
mod wallet_refund_outbox;
mod wechat;
mod wooden_fish;
mod work_author;
@@ -115,6 +116,7 @@ use crate::{
config::AppConfig,
state::{AppState, AppStateInitError},
tracking_outbox::TrackingOutbox,
wallet_refund_outbox::WalletRefundOutbox,
};
const API_SERVER_STARTUP_STACK_SIZE_BYTES: usize = 32 * 1024 * 1024;
@@ -125,6 +127,7 @@ const AUTH_STORE_STARTUP_RETRY_INTERVAL: Duration = Duration::from_secs(5);
struct ShutdownContext {
app_state: Option<AppState>,
tracking_outbox: Option<Arc<TrackingOutbox>>,
wallet_refund_outbox: Option<Arc<WalletRefundOutbox>>,
outbox_flush_timeout: Duration,
}
@@ -178,11 +181,16 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
if let Some(outbox) = tracking_outbox.clone() {
outbox.spawn_worker();
}
let wallet_refund_outbox = state.wallet_refund_outbox();
if let Some(outbox) = wallet_refund_outbox.clone() {
outbox.spawn_worker();
}
(
build_router(state.clone()),
ShutdownContext {
app_state: Some(state),
tracking_outbox,
wallet_refund_outbox,
outbox_flush_timeout,
},
)
@@ -192,6 +200,7 @@ async fn run_server(config: AppConfig) -> Result<(), io::Error> {
ShutdownContext {
app_state: None,
tracking_outbox: None,
wallet_refund_outbox: None,
outbox_flush_timeout,
},
),
@@ -271,12 +280,8 @@ async fn finalize_shutdown(context: ShutdownContext) {
state.mark_not_ready();
}
let Some(outbox) = context.tracking_outbox else {
return;
};
if context.outbox_flush_timeout.is_zero() {
warn!("api-server 退出时 tracking outbox flush timeout 为 0跳过主动 flush");
warn!("api-server 退出时 outbox flush timeout 为 0跳过主动 flush");
return;
}
@@ -284,22 +289,45 @@ async fn finalize_shutdown(context: ShutdownContext) {
.outbox_flush_timeout
.as_millis()
.min(u128::from(u64::MAX)) as u64;
info!(timeout_ms, "api-server 退出前封存并 flush tracking outbox");
match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await {
Ok(Ok(())) => {
info!("api-server 退出前 tracking outbox flush 完成");
if let Some(outbox) = context.tracking_outbox {
info!(timeout_ms, "api-server 退出前封存并 flush tracking outbox");
match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await {
Ok(Ok(())) => {
info!("api-server 退出前 tracking outbox flush 完成");
}
Ok(Err(error)) => {
warn!(
error = %error,
"api-server 退出前 tracking outbox flush 未完成,已保留本地文件等待下次启动重试"
);
}
Err(_) => {
warn!(
timeout_ms,
"api-server 退出前 tracking outbox flush 超时,已保留本地文件等待下次启动重试"
);
}
}
Ok(Err(error)) => {
warn!(
error = %error,
"api-server 退出前 tracking outbox flush 未完成,已保留本地文件等待下次启动重试"
);
}
Err(_) => {
warn!(
timeout_ms,
"api-server 退出前 tracking outbox flush 超时,已保留本地文件等待下次启动重试"
);
}
if let Some(outbox) = context.wallet_refund_outbox {
info!(timeout_ms, "api-server 退出前 flush wallet refund outbox");
match timeout(context.outbox_flush_timeout, outbox.flush_for_shutdown()).await {
Ok(Ok(())) => {
info!("api-server 退出前 wallet refund outbox flush 完成");
}
Ok(Err(error)) => {
warn!(
error = %error,
"api-server 退出前 wallet refund outbox flush 未完成,已保留本地文件等待下次启动重试"
);
}
Err(_) => {
warn!(
timeout_ms,
"api-server 退出前 wallet refund outbox flush 超时,已保留本地文件等待下次启动重试"
);
}
}
}
}

View File

@@ -1,4 +1,4 @@
use std::{
use std::{
collections::BTreeMap,
convert::Infallible,
future::Future,
@@ -65,10 +65,7 @@ use spacetime_client::{
use crate::{
api_response::json_success_body,
asset_billing::{
execute_billable_asset_operation_with_cost, map_asset_operation_wallet_error,
should_skip_asset_operation_billing_for_connectivity,
},
asset_billing::{execute_billable_asset_operation_with_cost, map_asset_operation_wallet_error},
auth::{AuthenticatedAccessToken, RuntimePrincipal},
config::AppConfig,
generated_asset_sheets::apply_generated_asset_sheet_green_screen_alpha,
@@ -354,13 +351,6 @@ impl Match3DItemAssetsGenerationPlan {
Self::Replace(plan) => plan.requested_item_names.len(),
}
}
fn billing_fingerprint_source(&self) -> String {
match self {
Self::Append(plan) => format!("append:{}", plan.requested_item_names.join("|")),
Self::Replace(plan) => format!("replace:{}", plan.requested_item_names.join("|")),
}
}
}
fn serialize_match3d_generated_item_assets(assets: &[Match3DGeneratedItemAsset]) -> Option<String> {

View File

@@ -162,7 +162,12 @@ pub(super) async fn compile_match3d_draft_for_session(
let initial_tags = requested_tags
.clone()
.unwrap_or_else(|| fallback_work_metadata.tags.clone());
let billing_asset_id = format!("{}:{}:{}", session_id, profile_id, current_utc_micros());
let billing_asset_id = format!(
"{}:{}:{}",
session_id,
profile_id,
request_context.request_id()
);
let points_cost = crate::creation_entry_config::resolve_creation_entry_mud_point_cost(
state,
"match3d",
@@ -514,15 +519,6 @@ async fn consume_match3d_draft_generation_points(
.await
{
Ok(_) => Ok(true),
Err(error) if should_skip_asset_operation_billing_for_connectivity(&error) => {
tracing::warn!(
owner_user_id,
billing_asset_id,
error = %error,
"抓大鹅草稿泥点预扣因 SpacetimeDB 连接不可用而降级跳过"
);
Ok(false)
}
Err(error) => Err(match3d_error_response(
request_context,
MATCH3D_AGENT_PROVIDER,

View File

@@ -751,7 +751,6 @@ pub async fn generate_match3d_background_image_for_work(
)?;
let prompt = normalize_match3d_background_prompt(payload.prompt.as_str());
ensure_non_empty(&request_context, MATCH3D_WORKS_PROVIDER, &prompt, "prompt")?;
let prompt_fingerprint = build_match3d_prompt_fingerprint(prompt.as_str());
let context =
load_match3d_work_asset_context(&state, &request_context, &authenticated, &profile_id)
@@ -763,7 +762,12 @@ pub async fn generate_match3d_background_image_for_work(
config,
assets,
} = context;
let billing_asset_id = format!("{}:{}:{}", session_id, profile_id, prompt_fingerprint);
let billing_asset_id = format!(
"{}:{}:{}",
session_id,
profile_id,
request_context.request_id()
);
let (generated_background, generated_assets) = execute_billable_asset_operation_with_cost(
&state,
owner_user_id.as_str(),
@@ -860,7 +864,6 @@ pub async fn generate_match3d_container_image_for_work(
)?;
let prompt = normalize_match3d_background_prompt(payload.prompt.as_str());
ensure_non_empty(&request_context, MATCH3D_WORKS_PROVIDER, &prompt, "prompt")?;
let prompt_fingerprint = build_match3d_prompt_fingerprint(prompt.as_str());
let context =
load_match3d_work_asset_context(&state, &request_context, &authenticated, &profile_id)
@@ -874,7 +877,9 @@ pub async fn generate_match3d_container_image_for_work(
} = context;
let billing_asset_id = format!(
"{}:{}:{}:container",
session_id, profile_id, prompt_fingerprint
session_id,
profile_id,
request_context.request_id()
);
let (generated_background, generated_assets) = execute_billable_asset_operation_with_cost(
&state,
@@ -1017,7 +1022,7 @@ pub async fn generate_match3d_item_assets_for_work(
session_id,
profile_id,
billed_item_count,
build_match3d_prompt_fingerprint(generation_plan.billing_fingerprint_source().as_str())
request_context.request_id()
);
let generated_assets = execute_billable_asset_operation_with_cost(
&state,

View File

@@ -1208,14 +1208,6 @@ pub(super) fn normalize_match3d_background_prompt(raw: &str) -> String {
.to_string()
}
pub(super) fn build_match3d_prompt_fingerprint(value: &str) -> String {
let mut hash = 0u32;
for character in value.chars() {
hash = hash.wrapping_mul(31).wrapping_add(character as u32);
}
format!("{hash:08x}")
}
pub(super) fn build_fallback_match3d_background_prompt(config: &Match3DConfigJson) -> String {
let theme = config.theme_text.trim();
let normalized_theme = if theme.is_empty() { "抓大鹅" } else { theme };

View File

@@ -1,6 +1,5 @@
use std::{
collections::{BTreeMap, HashSet},
sync::{Mutex, OnceLock},
collections::BTreeMap,
time::{Instant, SystemTime, UNIX_EPOCH},
};
@@ -56,17 +55,19 @@ use spacetime_client::{
PuzzleAgentMessageRecord, PuzzleAgentMessageSubmitRecordInput,
PuzzleAgentSessionCreateRecordInput, PuzzleAgentSessionRecord,
PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord, PuzzleAnchorPackRecord,
PuzzleAudioAssetRecord, PuzzleCreatorIntentRecord, PuzzleDraftCompileFailureRecordInput,
PuzzleDraftLevelRecord, PuzzleFormDraftRecord, PuzzleFormDraftSaveRecordInput,
PuzzleGeneratedImageCandidateRecord, PuzzleGeneratedImagesSaveRecordInput,
PuzzleLeaderboardEntryRecord, PuzzleLeaderboardSubmitRecordInput, PuzzlePublishRecordInput,
PuzzleRecommendedNextWorkRecord, PuzzleResultDraftRecord, PuzzleResultPreviewBlockerRecord,
PuzzleResultPreviewFindingRecord, PuzzleResultPreviewRecord, PuzzleRunDragRecordInput,
PuzzleRunPauseRecordInput, PuzzleRunPropRecordInput, PuzzleRunRecord,
PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput, PuzzleSelectCoverImageRecordInput,
PuzzleUiBackgroundSaveRecordInput, PuzzleWorkLikeReportRecordInput,
PuzzleWorkPointIncentiveClaimRecordInput, PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput,
PuzzleWorkUpsertRecordInput, SpacetimeClientError,
PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput,
PuzzleBackgroundCompileTaskReleaseRecordInput, PuzzleCreatorIntentRecord,
PuzzleDraftCompileFailureRecordInput, PuzzleDraftLevelRecord, PuzzleFormDraftRecord,
PuzzleFormDraftSaveRecordInput, PuzzleGeneratedImageCandidateRecord,
PuzzleGeneratedImagesSaveRecordInput, PuzzleLeaderboardEntryRecord,
PuzzleLeaderboardSubmitRecordInput, PuzzlePublishRecordInput, PuzzleRecommendedNextWorkRecord,
PuzzleResultDraftRecord, PuzzleResultPreviewBlockerRecord, PuzzleResultPreviewFindingRecord,
PuzzleResultPreviewRecord, PuzzleRunDragRecordInput, PuzzleRunPauseRecordInput,
PuzzleRunPropRecordInput, PuzzleRunRecord, PuzzleRunStartRecordInput, PuzzleRunSwapRecordInput,
PuzzleSelectCoverImageRecordInput, PuzzleUiBackgroundSaveRecordInput,
PuzzleWorkLikeReportRecordInput, PuzzleWorkPointIncentiveClaimRecordInput,
PuzzleWorkProfileRecord, PuzzleWorkRemixRecordInput, PuzzleWorkUpsertRecordInput,
SpacetimeClientError,
};
use std::convert::Infallible;
@@ -134,38 +135,51 @@ const PUZZLE_UI_BACKGROUND_PROMPT_FALLBACK_MARKER: &str =
const PUZZLE_VECTOR_ENGINE_SQUARE_IMAGE_SIZE: &str = "1024x1024";
const PUZZLE_VECTOR_ENGINE_PORTRAIT_IMAGE_SIZE: &str = "1024x1536";
static PUZZLE_BACKGROUND_COMPILE_TASKS: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
fn puzzle_background_compile_tasks() -> &'static Mutex<HashSet<String>> {
PUZZLE_BACKGROUND_COMPILE_TASKS.get_or_init(|| Mutex::new(HashSet::new()))
fn build_puzzle_background_compile_task_id(session_id: &str) -> String {
format!("puzzle_initial_background:{session_id}")
}
fn try_register_puzzle_background_compile_task(session_id: &str) -> bool {
match puzzle_background_compile_tasks().lock() {
Ok(mut tasks) => tasks.insert(session_id.to_string()),
Err(error) => {
fn build_puzzle_background_compile_claim_id(task_id: &str, request_id: &str) -> String {
format!("{task_id}:{request_id}")
}
async fn release_claimed_puzzle_background_compile_task(
state: &PuzzleApiState,
task_id: &str,
claim_id: &str,
session_id: &str,
owner_user_id: &str,
) {
let result = state
.spacetime_client()
.release_puzzle_background_compile_task(PuzzleBackgroundCompileTaskReleaseRecordInput {
task_id: task_id.to_string(),
claim_id: claim_id.to_string(),
session_id: session_id.to_string(),
owner_user_id: owner_user_id.to_string(),
})
.await;
match result {
Ok(true) => {}
Ok(false) => {
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
task_id,
claim_id,
session_id,
error = %error,
"拼图后台生成任务注册表锁已损坏,允许本次任务继续"
owner_user_id,
"拼图首图后台生成任务释放未命中当前 claim"
);
true
}
}
}
fn unregister_puzzle_background_compile_task(session_id: &str) {
match puzzle_background_compile_tasks().lock() {
Ok(mut tasks) => {
tasks.remove(session_id);
}
Err(error) => {
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
task_id,
claim_id,
session_id,
owner_user_id,
error = %error,
"拼图后台生成任务注册表解锁失败,忽略清理"
"拼图首图后台生成任务释放失败"
);
}
}

View File

@@ -588,7 +588,7 @@ pub async fn execute_puzzle_agent_action(
let owner_user_id = authenticated.claims().user_id().to_string();
let now = current_utc_micros();
let action = payload.action.trim().to_string();
let billing_asset_id = format!("{session_id}:{now}");
let billing_asset_id = format!("{}:{}:{}", session_id, action, request_context.request_id());
let mut operation_consumed_points = 0;
tracing::info!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
@@ -695,156 +695,207 @@ pub async fn execute_puzzle_agent_action(
Err(response) => return Err(response),
};
let session = if ai_redraw {
if !try_register_puzzle_background_compile_task(&compile_session_id) {
tracing::info!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %compile_session_id,
owner_user_id = %owner_user_id,
"拼图首图后台生成任务已存在,本次 action 直接返回生成中会话"
);
state
.spacetime_client()
.get_puzzle_agent_session(compile_session_id.clone(), owner_user_id.clone())
.await
.map(mark_puzzle_initial_generation_started_snapshot)
.map_err(map_puzzle_client_error)
} else {
let compiled_session = state
.spacetime_client()
.compile_puzzle_agent_draft(
compile_session_id.clone(),
owner_user_id.clone(),
now,
)
.await
.map_err(map_puzzle_compile_error);
match compiled_session {
Ok(compiled_session) => {
let response_session = mark_puzzle_initial_generation_started_snapshot(
compiled_session.clone(),
);
let background_state = state.clone();
let background_request_context = request_context.clone();
let background_session_id = compile_session_id.clone();
let background_owner_user_id = owner_user_id.clone();
let background_prompt_text = prompt_text.map(str::to_string);
let background_reference_image_src =
primary_reference_image_src.map(str::to_string);
let background_image_model = payload.image_model.clone();
let background_points_cost = puzzle_draft_generation_points_cost;
let background_work_name = compiled_session
.draft
.as_ref()
.map(|draft| draft.work_title.clone());
let background_billing_asset_id =
format!("{background_session_id}:compile_puzzle_draft");
tokio::spawn(async move {
let operation_owner_user_id = background_owner_user_id.clone();
let background_root_state = background_state.root_state().clone();
let operation_state = background_state.clone();
let result = execute_billable_asset_operation_with_cost(
&background_root_state,
&background_owner_user_id,
"puzzle_initial_image",
&background_billing_asset_id,
background_points_cost,
async move {
generate_puzzle_initial_cover_from_compiled_session(
&operation_state,
&background_request_context,
compiled_session,
operation_owner_user_id,
background_prompt_text.as_deref(),
background_reference_image_src.as_deref(),
background_image_model.as_deref(),
current_utc_micros(),
)
.await
},
)
.await;
match result {
Ok(session) => {
send_generation_result_subscribe_message_after_completion(
&background_root_state,
GenerationResultSubscribeMessage {
owner_user_id: background_owner_user_id.clone(),
task_name: Some("拼图".to_string()),
work_name: session
.draft
.as_ref()
.map(|draft| draft.work_title.clone()),
status:
GenerationResultSubscribeMessageStatus::Succeeded,
consumed_points: background_points_cost,
completed_at_micros: current_utc_micros(),
page: Some("/pages/web-view/index".to_string()),
},
)
.await;
tracing::info!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %session.session_id,
owner_user_id = %background_owner_user_id,
"拼图首图后台生成任务完成"
);
}
Err(error) => {
let error_message = error.body_text();
let failed_at_micros = current_utc_micros();
let failure_result = background_state
.spacetime_client()
.mark_puzzle_draft_generation_failed(
PuzzleDraftCompileFailureRecordInput {
session_id: background_session_id.clone(),
owner_user_id: background_owner_user_id.clone(),
error_message: error_message.clone(),
failed_at_micros,
},
let background_task_id =
build_puzzle_background_compile_task_id(&compile_session_id);
let background_claim_id = build_puzzle_background_compile_claim_id(
&background_task_id,
request_context.request_id(),
);
let claim_result = state
.spacetime_client()
.claim_puzzle_background_compile_task(
PuzzleBackgroundCompileTaskClaimRecordInput {
task_id: background_task_id.clone(),
claim_id: background_claim_id.clone(),
session_id: compile_session_id.clone(),
owner_user_id: owner_user_id.clone(),
claimed_at_micros: current_utc_micros(),
},
)
.await
.map_err(map_puzzle_client_error);
match claim_result {
Ok(false) => {
tracing::info!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %compile_session_id,
owner_user_id = %owner_user_id,
task_id = %background_task_id,
"拼图首图后台生成任务已存在,本次 action 直接返回生成中会话"
);
state
.spacetime_client()
.get_puzzle_agent_session(
compile_session_id.clone(),
owner_user_id.clone(),
)
.await
.map(mark_puzzle_initial_generation_started_snapshot)
.map_err(map_puzzle_client_error)
}
Ok(true) => {
let compiled_session = state
.spacetime_client()
.compile_puzzle_agent_draft(
compile_session_id.clone(),
owner_user_id.clone(),
now,
)
.await
.map_err(map_puzzle_compile_error);
match compiled_session {
Ok(compiled_session) => {
let response_session =
mark_puzzle_initial_generation_started_snapshot(
compiled_session.clone(),
);
let background_state = state.clone();
let background_request_context = request_context.clone();
let background_session_id = compile_session_id.clone();
let background_owner_user_id = owner_user_id.clone();
let background_task_id = background_task_id.clone();
let background_claim_id = background_claim_id.clone();
let background_prompt_text = prompt_text.map(str::to_string);
let background_reference_image_src =
primary_reference_image_src.map(str::to_string);
let background_image_model = payload.image_model.clone();
let background_points_cost = puzzle_draft_generation_points_cost;
let background_work_name = compiled_session
.draft
.as_ref()
.map(|draft| draft.work_title.clone());
let background_billing_asset_id = format!(
"{background_session_id}:compile_puzzle_draft:{}",
background_request_context.request_id()
);
tokio::spawn(async move {
let operation_owner_user_id = background_owner_user_id.clone();
let background_root_state =
background_state.root_state().clone();
let operation_state = background_state.clone();
let result = execute_billable_asset_operation_with_cost(
&background_root_state,
&background_owner_user_id,
"puzzle_initial_image",
&background_billing_asset_id,
background_points_cost,
async move {
generate_puzzle_initial_cover_from_compiled_session(
&operation_state,
&background_request_context,
compiled_session,
operation_owner_user_id,
background_prompt_text.as_deref(),
background_reference_image_src.as_deref(),
background_image_model.as_deref(),
current_utc_micros(),
)
.await;
if let Err(mark_error) = failure_result {
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %background_session_id,
owner_user_id = %background_owner_user_id,
message = %mark_error,
"拼图首图后台生成失败态回写失败"
);
} else {
.await
},
)
.await;
match result {
Ok(session) => {
send_generation_result_subscribe_message_after_completion(
&background_root_state,
GenerationResultSubscribeMessage {
owner_user_id: background_owner_user_id.clone(),
task_name: Some("拼图".to_string()),
work_name: background_work_name.clone(),
work_name: session
.draft
.as_ref()
.map(|draft| draft.work_title.clone()),
status:
GenerationResultSubscribeMessageStatus::Failed,
consumed_points: 0,
completed_at_micros: failed_at_micros,
GenerationResultSubscribeMessageStatus::Succeeded,
consumed_points: background_points_cost,
completed_at_micros: current_utc_micros(),
page: Some("/pages/web-view/index".to_string()),
},
)
.await;
tracing::info!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %session.session_id,
owner_user_id = %background_owner_user_id,
"拼图首图后台生成任务完成"
);
}
Err(error) => {
let error_message = error.body_text();
let failed_at_micros = current_utc_micros();
let failure_result = background_state
.spacetime_client()
.mark_puzzle_draft_generation_failed(
PuzzleDraftCompileFailureRecordInput {
session_id: background_session_id.clone(),
owner_user_id: background_owner_user_id
.clone(),
error_message: error_message.clone(),
failed_at_micros,
},
)
.await;
if let Err(mark_error) = failure_result {
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %background_session_id,
owner_user_id = %background_owner_user_id,
message = %mark_error,
"拼图首图后台生成失败态回写失败"
);
} else {
send_generation_result_subscribe_message_after_completion(
&background_root_state,
GenerationResultSubscribeMessage {
owner_user_id: background_owner_user_id
.clone(),
task_name: Some("拼图".to_string()),
work_name: background_work_name.clone(),
status:
GenerationResultSubscribeMessageStatus::Failed,
consumed_points: 0,
completed_at_micros: failed_at_micros,
page: Some(
"/pages/web-view/index".to_string(),
),
},
)
.await;
}
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %background_session_id,
owner_user_id = %background_owner_user_id,
message = %error_message,
"拼图首图后台生成任务失败"
);
}
tracing::warn!(
provider = PUZZLE_AGENT_API_BASE_PROVIDER,
session_id = %background_session_id,
owner_user_id = %background_owner_user_id,
message = %error_message,
"拼图首图后台生成任务失败"
);
}
}
unregister_puzzle_background_compile_task(&background_session_id);
});
Ok(response_session)
}
Err(error) => {
unregister_puzzle_background_compile_task(&compile_session_id);
Err(error)
release_claimed_puzzle_background_compile_task(
&background_state,
&background_task_id,
&background_claim_id,
&background_session_id,
&background_owner_user_id,
)
.await;
});
Ok(response_session)
}
Err(error) => {
release_claimed_puzzle_background_compile_task(
&state,
&background_task_id,
&background_claim_id,
&compile_session_id,
&owner_user_id,
)
.await;
Err(error)
}
}
}
Err(error) => Err(error),
}
} else {
compile_puzzle_draft_with_uploaded_cover(
@@ -2231,7 +2282,7 @@ pub async fn use_puzzle_runtime_prop(
}
};
let should_sync_freeze_boundary = matches!(prop_kind.as_str(), "freezeTime" | "freeze_time");
let billing_asset_id = format!("{}:{}:{}", run_id, prop_kind, current_utc_micros());
let billing_asset_id = format!("{}:{}:{}", run_id, prop_kind, request_context.request_id());
let reducer_owner_user_id = owner_user_id.clone();
let reducer_run_id = run_id.clone();
let fallback_run_id = run_id.clone();

View File

@@ -41,6 +41,7 @@ use tracing::{info, warn};
use crate::config::AppConfig;
use crate::puzzle_gallery_cache::PuzzleGalleryCache;
use crate::tracking_outbox::TrackingOutbox;
use crate::wallet_refund_outbox::WalletRefundOutbox;
use crate::wechat::pay::{build_wechat_pay_config, map_wechat_pay_init_error};
use crate::wechat::provider::build_wechat_provider;
use crate::work_author::{
@@ -263,6 +264,7 @@ pub struct AppStateInner {
spacetime_client: SpacetimeClient,
puzzle_gallery_cache: PuzzleGalleryCache,
tracking_outbox: Option<Arc<TrackingOutbox>>,
wallet_refund_outbox: Option<Arc<WalletRefundOutbox>>,
llm_client: Option<LlmClient>,
creative_agent_gpt5_client: Option<LlmClient>,
creative_agent_executor: Arc<MockLangChainRustAgentExecutor>,
@@ -406,6 +408,8 @@ impl AppState {
procedure_timeout: config.spacetime_procedure_timeout,
});
let tracking_outbox = TrackingOutbox::from_config(&config, spacetime_client.clone());
let wallet_refund_outbox =
WalletRefundOutbox::from_config(&config, spacetime_client.clone());
let llm_client = build_llm_client(&config)?;
let creative_agent_gpt5_client = build_creative_agent_gpt5_client(&config)?;
let http_request_permit_pools = HttpRequestPermitPools::from_config(&config);
@@ -441,6 +445,7 @@ impl AppState {
spacetime_client,
puzzle_gallery_cache: PuzzleGalleryCache::new(),
tracking_outbox,
wallet_refund_outbox,
llm_client,
creative_agent_gpt5_client,
creative_agent_executor: Arc::new(MockLangChainRustAgentExecutor),
@@ -922,6 +927,10 @@ impl AppState {
self.tracking_outbox.clone()
}
pub fn wallet_refund_outbox(&self) -> Option<Arc<WalletRefundOutbox>> {
self.wallet_refund_outbox.clone()
}
pub fn llm_client(&self) -> Option<&LlmClient> {
self.llm_client.as_ref()
}

View File

@@ -0,0 +1,463 @@
use std::{
fmt,
path::{Path, PathBuf},
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use spacetime_client::{SpacetimeClient, SpacetimeClientError};
use tokio::{
fs::{self, File, OpenOptions},
io::{AsyncReadExt, AsyncWriteExt},
sync::{Mutex, Notify},
time::sleep,
};
use tracing::{debug, warn};
use crate::config::AppConfig;
const PENDING_FILE_PREFIX: &str = "refund-";
const CORRUPT_FILE_PREFIX: &str = "corrupt-";
const TEMP_FILE_PREFIX: &str = "tmp-";
const OUTBOX_FILE_EXTENSION: &str = ".json";
#[derive(Clone)]
pub struct WalletRefundOutbox {
dir: PathBuf,
batch_size: usize,
flush_interval: Duration,
max_bytes: u64,
spacetime_client: SpacetimeClient,
enqueue_lock: Arc<Mutex<()>>,
flush_notify: Arc<Notify>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct WalletRefundOutboxRecord {
pub owner_user_id: String,
pub amount: u64,
pub ledger_id: String,
pub created_at_micros: i64,
pub asset_kind: String,
pub asset_id: String,
}
#[derive(Debug)]
pub enum WalletRefundOutboxEnqueueOutcome {
Enqueued,
Dropped { reason: &'static str },
}
#[derive(Debug)]
pub enum WalletRefundOutboxError {
Io(std::io::Error),
Json(serde_json::Error),
Spacetime(SpacetimeClientError),
}
impl WalletRefundOutbox {
pub fn from_config(config: &AppConfig, spacetime_client: SpacetimeClient) -> Option<Arc<Self>> {
if !config.wallet_refund_outbox_enabled {
return None;
}
Some(Arc::new(Self {
dir: config.wallet_refund_outbox_dir.clone(),
batch_size: config.wallet_refund_outbox_batch_size.max(1),
flush_interval: config.wallet_refund_outbox_flush_interval,
max_bytes: config.wallet_refund_outbox_max_bytes,
spacetime_client,
enqueue_lock: Arc::new(Mutex::new(())),
flush_notify: Arc::new(Notify::new()),
}))
}
pub async fn enqueue(
&self,
record: WalletRefundOutboxRecord,
) -> Result<WalletRefundOutboxEnqueueOutcome, WalletRefundOutboxError> {
let _guard = self.enqueue_lock.lock().await;
fs::create_dir_all(&self.dir).await?;
let pending_path = self.pending_path_for_ledger(&record.ledger_id);
if fs::metadata(&pending_path).await.is_ok() {
self.flush_notify.notify_one();
return Ok(WalletRefundOutboxEnqueueOutcome::Enqueued);
}
let bytes = serde_json::to_vec(&record)?;
let line_bytes = bytes.len().min(u64::MAX as usize) as u64;
let current_bytes = directory_size_if_exists(&self.dir).unwrap_or(0);
if current_bytes.saturating_add(line_bytes) > self.max_bytes {
return Ok(WalletRefundOutboxEnqueueOutcome::Dropped {
reason: "max_bytes",
});
}
let temp_path = self.temp_path();
let mut file = OpenOptions::new()
.create_new(true)
.write(true)
.open(&temp_path)
.await?;
file.write_all(&bytes).await?;
file.flush().await?;
file.sync_data().await?;
drop(file);
if fs::metadata(&pending_path).await.is_ok() {
let _ = fs::remove_file(&temp_path).await;
self.flush_notify.notify_one();
return Ok(WalletRefundOutboxEnqueueOutcome::Enqueued);
}
fs::rename(&temp_path, &pending_path).await?;
sync_directory_metadata(&self.dir).await?;
self.flush_notify.notify_one();
Ok(WalletRefundOutboxEnqueueOutcome::Enqueued)
}
pub fn spawn_worker(self: Arc<Self>) {
tokio::spawn(async move {
loop {
tokio::select! {
_ = sleep(self.flush_interval) => {
if let Err(error) = self.flush_pending_files_once().await {
warn!(error = %error, "wallet refund outbox 重放退款失败,将保留文件等待重试");
}
}
_ = self.flush_notify.notified() => {
if let Err(error) = self.flush_pending_files_once().await {
warn!(error = %error, "wallet refund outbox 主动重放退款失败,将保留文件等待重试");
}
}
}
}
});
}
pub async fn flush_for_shutdown(&self) -> Result<(), WalletRefundOutboxError> {
self.flush_pending_files_once().await
}
async fn flush_pending_files_once(&self) -> Result<(), WalletRefundOutboxError> {
fs::create_dir_all(&self.dir).await?;
let pending_files = self.list_pending_files().await?;
for path in pending_files.into_iter().take(self.batch_size) {
let record = match read_refund_record(&path).await {
Ok(record) => record,
Err(error) if error.is_data_corruption() => {
let corrupt_path = self.corrupt_path_for(&path);
fs::rename(&path, &corrupt_path).await?;
sync_directory_metadata(&self.dir).await?;
warn!(
error = %error,
source = %path.display(),
target = %corrupt_path.display(),
"wallet refund outbox 文件无法解析,已隔离"
);
continue;
}
Err(error) => return Err(error),
};
match self
.spacetime_client
.refund_profile_wallet_points(
record.owner_user_id.clone(),
record.amount,
record.ledger_id.clone(),
record.created_at_micros,
)
.await
{
Ok(_) => {
match fs::remove_file(&path).await {
Ok(()) => {}
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
Err(error) => return Err(error.into()),
}
sync_directory_metadata(&self.dir).await?;
debug!(
ledger_id = %record.ledger_id,
owner_user_id = %record.owner_user_id,
asset_kind = %record.asset_kind,
asset_id = %record.asset_id,
path = %path.display(),
"wallet refund outbox 退款已重放并删除文件"
);
}
Err(error) => return Err(WalletRefundOutboxError::Spacetime(error)),
}
}
Ok(())
}
async fn list_pending_files(&self) -> Result<Vec<PathBuf>, WalletRefundOutboxError> {
let mut entries = fs::read_dir(&self.dir).await?;
let mut files = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
let Some(name) = path.file_name().and_then(|value| value.to_str()) else {
continue;
};
if name.starts_with(PENDING_FILE_PREFIX) && name.ends_with(OUTBOX_FILE_EXTENSION) {
files.push(path);
}
}
files.sort();
Ok(files)
}
fn pending_path_for_ledger(&self, ledger_id: &str) -> PathBuf {
self.dir.join(format!(
"{PENDING_FILE_PREFIX}{}{OUTBOX_FILE_EXTENSION}",
ledger_id_hash(ledger_id)
))
}
fn temp_path(&self) -> PathBuf {
self.dir.join(format!(
"{TEMP_FILE_PREFIX}{}-{uuid}{OUTBOX_FILE_EXTENSION}",
current_unix_micros(),
uuid = uuid::Uuid::new_v4()
))
}
fn corrupt_path_for(&self, path: &Path) -> PathBuf {
let name = path
.file_name()
.and_then(|value| value.to_str())
.unwrap_or("unknown.json");
self.dir.join(format!(
"{CORRUPT_FILE_PREFIX}{}-{uuid}-{name}",
current_unix_micros(),
uuid = uuid::Uuid::new_v4()
))
}
}
impl fmt::Debug for WalletRefundOutbox {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WalletRefundOutbox")
.field("dir", &self.dir)
.field("batch_size", &self.batch_size)
.field("flush_interval", &self.flush_interval)
.field("max_bytes", &self.max_bytes)
.finish()
}
}
impl fmt::Display for WalletRefundOutboxError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Io(error) => write!(f, "{error}"),
Self::Json(error) => write!(f, "{error}"),
Self::Spacetime(error) => write!(f, "{error}"),
}
}
}
impl From<std::io::Error> for WalletRefundOutboxError {
fn from(value: std::io::Error) -> Self {
Self::Io(value)
}
}
impl From<serde_json::Error> for WalletRefundOutboxError {
fn from(value: serde_json::Error) -> Self {
Self::Json(value)
}
}
impl WalletRefundOutboxError {
fn is_data_corruption(&self) -> bool {
matches!(self, Self::Json(_))
}
}
async fn read_refund_record(
path: &Path,
) -> Result<WalletRefundOutboxRecord, WalletRefundOutboxError> {
let mut file = File::open(path).await?;
let mut bytes = Vec::new();
file.read_to_end(&mut bytes).await?;
Ok(serde_json::from_slice::<WalletRefundOutboxRecord>(&bytes)?)
}
fn directory_size_if_exists(path: &Path) -> Result<u64, std::io::Error> {
if !path.is_dir() {
return Ok(0);
}
let mut total = 0u64;
for entry in std::fs::read_dir(path)? {
let entry = entry?;
if !is_pending_outbox_file_name(&entry.file_name()) {
continue;
}
let metadata = entry.metadata()?;
if metadata.is_file() {
total = total.saturating_add(metadata.len());
}
}
Ok(total)
}
fn current_unix_micros() -> u128 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_micros()
}
fn ledger_id_hash(ledger_id: &str) -> String {
hex::encode(Sha256::digest(ledger_id.as_bytes()))
}
fn is_pending_outbox_file_name(name: &std::ffi::OsStr) -> bool {
name.to_str().is_some_and(|value| {
value.starts_with(PENDING_FILE_PREFIX) && value.ends_with(OUTBOX_FILE_EXTENSION)
})
}
async fn sync_directory_metadata(path: &Path) -> Result<(), WalletRefundOutboxError> {
let path = path.to_path_buf();
tokio::task::spawn_blocking(move || {
let dir = std::fs::File::open(path)?;
dir.sync_all()
})
.await
.map_err(|error| std::io::Error::new(std::io::ErrorKind::Other, error.to_string()))??;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_record(ledger_id: &str) -> WalletRefundOutboxRecord {
WalletRefundOutboxRecord {
owner_user_id: "user-1".to_string(),
amount: 2,
ledger_id: ledger_id.to_string(),
created_at_micros: 1_713_680_000_000_000,
asset_kind: "puzzle_initial_image".to_string(),
asset_id: "asset-1".to_string(),
}
}
fn test_dir(name: &str) -> PathBuf {
let dir = std::env::temp_dir().join(format!(
"genarrative-wallet-refund-outbox-{name}-{}",
current_unix_micros()
));
let _ = std::fs::remove_dir_all(&dir);
dir
}
fn test_outbox(dir: PathBuf, max_bytes: u64) -> Arc<WalletRefundOutbox> {
let config = AppConfig {
wallet_refund_outbox_dir: dir,
wallet_refund_outbox_batch_size: 500,
wallet_refund_outbox_flush_interval: Duration::from_secs(60),
wallet_refund_outbox_max_bytes: max_bytes,
..AppConfig::default()
};
WalletRefundOutbox::from_config(
&config,
SpacetimeClient::new(spacetime_client::SpacetimeClientConfig {
server_url: "http://127.0.0.1:1".to_string(),
database: "missing".to_string(),
token: None,
pool_size: 1,
procedure_timeout: Duration::from_millis(10),
}),
)
.expect("outbox should be enabled")
}
#[tokio::test]
async fn enqueue_is_idempotent_per_ledger_id() {
let dir = test_dir("idempotent");
let outbox = test_outbox(dir.clone(), 1024 * 1024);
outbox.enqueue(sample_record("ledger-1")).await.unwrap();
outbox.enqueue(sample_record("ledger-1")).await.unwrap();
let pending_count = std::fs::read_dir(&dir)
.unwrap()
.filter_map(Result::ok)
.filter(|entry| is_pending_outbox_file_name(&entry.file_name()))
.count();
assert_eq!(pending_count, 1);
let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn enqueue_drops_when_outbox_exceeds_max_bytes() {
let dir = test_dir("max-bytes");
let outbox = test_outbox(dir.clone(), 1);
let outcome = outbox.enqueue(sample_record("ledger-1")).await.unwrap();
assert!(matches!(
outcome,
WalletRefundOutboxEnqueueOutcome::Dropped {
reason: "max_bytes"
}
));
assert!(!dir.exists() || std::fs::read_dir(&dir).unwrap().next().is_none());
let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn flush_quarantines_corrupt_file() {
let dir = test_dir("corrupt");
std::fs::create_dir_all(&dir).unwrap();
let pending_path = dir.join(format!("{PENDING_FILE_PREFIX}bad{OUTBOX_FILE_EXTENSION}"));
std::fs::write(&pending_path, b"{not-json}").unwrap();
let outbox = test_outbox(dir.clone(), 1024 * 1024);
outbox.flush_pending_files_once().await.unwrap();
assert!(!pending_path.exists());
let corrupt_count = std::fs::read_dir(&dir)
.unwrap()
.filter_map(Result::ok)
.filter(|entry| {
entry
.file_name()
.to_str()
.is_some_and(|name| name.starts_with(CORRUPT_FILE_PREFIX))
})
.count();
assert_eq!(corrupt_count, 1);
let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn shutdown_flush_keeps_file_when_spacetime_is_unavailable() {
let dir = test_dir("shutdown");
let outbox = test_outbox(dir.clone(), 1024 * 1024);
outbox.enqueue(sample_record("ledger-1")).await.unwrap();
let result = outbox.flush_for_shutdown().await;
assert!(
matches!(result, Err(WalletRefundOutboxError::Spacetime(_))),
"missing test SpacetimeDB should keep refund file for retry"
);
let pending_count = std::fs::read_dir(&dir)
.unwrap()
.filter_map(Result::ok)
.filter(|entry| is_pending_outbox_file_name(&entry.file_name()))
.count();
assert_eq!(pending_count, 1);
let _ = std::fs::remove_dir_all(dir);
}
}

View File

@@ -20,6 +20,14 @@ pub struct PuzzleAgentSessionProcedureResult {
pub error_message: Option<String>,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PuzzleBackgroundCompileTaskProcedureResult {
pub ok: bool,
pub claimed: bool,
pub error_message: Option<String>,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PuzzleWorksProcedureResult {

View File

@@ -68,6 +68,25 @@ pub struct PuzzleDraftCompileInput {
pub compiled_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PuzzleBackgroundCompileTaskClaimInput {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
pub claimed_at_micros: i64,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PuzzleBackgroundCompileTaskReleaseInput {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
}
#[cfg_attr(feature = "spacetime-types", derive(SpacetimeType))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PuzzleDraftCompileFailureInput {

View File

@@ -0,0 +1,13 @@
[package]
name = "server-manager-panel"
edition.workspace = true
version.workspace = true
license.workspace = true
[dependencies]
eframe = { version = "0.33", default-features = false, features = [
"default_fonts",
"glow",
"wayland",
"x11",
] }

View File

@@ -0,0 +1,577 @@
use eframe::egui;
use crate::health::{
DiskSnapshot, HealthLevel, MemorySnapshot, ProbeSnapshot, ServerHealthReport, ServiceSnapshot,
};
use crate::remote::{
RemoteEvent, RemoteReceiver, RemoteSender, ServiceAction, channel, spawn_health_check,
spawn_service_action,
};
use crate::ssh_config::{SshAlias, discover_ssh_aliases};
const DEFAULT_MANAGED_SERVICES: &[&str] = &[
"genarrative-api.service",
"spacetimedb.service",
"nginx.service",
"genarrative-health-patrol.timer",
"genarrative-database-backup.timer",
];
#[derive(Debug)]
pub struct ServerManagerApp {
servers: Vec<ServerState>,
selected_alias: Option<String>,
sidebar_collapsed: bool,
tx: RemoteSender,
rx: RemoteReceiver,
pending_confirmation: Option<ServiceConfirmation>,
custom_service_name: String,
}
impl Default for ServerManagerApp {
fn default() -> Self {
let (tx, rx) = channel();
let aliases = discover_ssh_aliases();
let selected_alias = aliases.first().map(|alias| alias.name.clone());
Self {
servers: aliases.into_iter().map(ServerState::new).collect(),
selected_alias,
sidebar_collapsed: false,
tx,
rx,
pending_confirmation: None,
custom_service_name: String::new(),
}
}
}
impl eframe::App for ServerManagerApp {
fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
self.drain_remote_events(ctx);
self.render_confirm_dialog(ctx);
egui::TopBottomPanel::top("top_bar").show(ctx, |ui| {
ui.horizontal(|ui| {
if ui
.button(if self.sidebar_collapsed {
"展开侧栏"
} else {
"收起侧栏"
})
.clicked()
{
self.sidebar_collapsed = !self.sidebar_collapsed;
}
if ui.button("重新读取 SSH alias").clicked() {
self.reload_aliases();
}
if let Some(alias) = self.selected_alias.clone() {
if ui.button("刷新当前服务器").clicked() {
self.refresh_server(&alias);
}
}
ui.separator();
ui.label("本地 SSH alias 管理");
});
});
if !self.sidebar_collapsed {
egui::SidePanel::left("server_sidebar")
.resizable(true)
.default_width(260.0)
.width_range(180.0..=360.0)
.show(ctx, |ui| self.render_sidebar(ui));
}
egui::CentralPanel::default().show(ctx, |ui| {
if self.servers.is_empty() {
self.render_empty_state(ui);
return;
}
let Some(alias) = self.selected_alias.clone() else {
self.render_empty_state(ui);
return;
};
if let Some(index) = self.server_index(&alias) {
self.render_server_detail(ui, index);
} else {
ui.label("请选择服务器");
}
});
}
}
impl ServerManagerApp {
fn drain_remote_events(&mut self, ctx: &egui::Context) {
while let Ok(event) = self.rx.try_recv() {
match event {
RemoteEvent::Health { alias, result } => {
if let Some(server) = self.server_mut(&alias) {
server.loading = false;
match result {
Ok(report) => {
server.error = None;
server.report = Some(report);
}
Err(error) => {
server.error = Some(error);
}
}
}
}
RemoteEvent::ServiceAction {
alias,
service,
action,
result,
} => {
if let Some(server) = self.server_mut(&alias) {
server.action_in_progress = None;
server.action_log = Some(format!(
"{} {}: {}\n{}{}",
action.label(),
service,
result.summary,
result.stdout,
result.stderr
));
server.loading = true;
spawn_health_check(alias, self.tx.clone());
}
}
}
ctx.request_repaint();
}
}
fn render_sidebar(&mut self, ui: &mut egui::Ui) {
ui.heading("服务器");
ui.add_space(8.0);
let mut refresh_alias: Option<String> = None;
for server in &mut self.servers {
let selected = self.selected_alias.as_deref() == Some(server.alias.name.as_str());
let response = ui.selectable_label(selected, server_label(server));
if response.clicked() {
self.selected_alias = Some(server.alias.name.clone());
}
response.on_hover_text(server.alias.source.display().to_string());
ui.horizontal(|ui| {
let status = server.status();
ui.colored_label(level_color(status), status.label());
if server.loading {
ui.spinner();
}
if ui.small_button("刷新").clicked() {
refresh_alias = Some(server.alias.name.clone());
}
});
ui.add_space(6.0);
}
if let Some(alias) = refresh_alias {
self.refresh_server(&alias);
}
}
fn render_empty_state(&mut self, ui: &mut egui::Ui) {
ui.vertical_centered(|ui| {
ui.heading("未发现 SSH alias");
ui.label("请在 ~/.ssh/config 中配置 Host alias 后重新读取。");
if ui.button("重新读取").clicked() {
self.reload_aliases();
}
});
}
fn render_server_detail(&mut self, ui: &mut egui::Ui, index: usize) {
let alias = self.servers[index].alias.name.clone();
let status = self.servers[index].status();
let loading = self.servers[index].loading;
let report = self.servers[index].report.clone();
let error = self.servers[index].error.clone();
let action_log = self.servers[index].action_log.clone();
ui.horizontal(|ui| {
ui.heading(&alias);
ui.colored_label(level_color(status), status.label());
if loading {
ui.spinner();
}
});
ui.add_space(8.0);
if let Some(error) = error {
ui.colored_label(warning_color(), format!("SSH 巡检失败:{error}"));
ui.add_space(8.0);
}
if let Some(report) = report {
self.render_report(ui, &alias, &report);
} else {
ui.label("尚未执行巡检。");
}
ui.add_space(12.0);
self.render_service_controls(ui, &alias, index);
if let Some(log) = action_log {
ui.add_space(12.0);
egui::CollapsingHeader::new("最近一次服务操作输出")
.default_open(true)
.show(ui, |ui| {
ui.add(
egui::TextEdit::multiline(&mut log.clone())
.font(egui::TextStyle::Monospace)
.desired_rows(8)
.interactive(false),
);
});
}
}
fn render_report(&self, ui: &mut egui::Ui, alias: &str, report: &ServerHealthReport) {
egui::ScrollArea::vertical().show(ui, |ui| {
ui.horizontal_wrapped(|ui| {
info_chip(ui, "主机", value_or_dash(&report.host.hostname));
info_chip(ui, "内核", value_or_dash(&report.host.kernel));
info_chip(ui, "运行时间", value_or_dash(&report.host.uptime));
info_chip(ui, "检查时间", value_or_dash(&report.checked_at));
});
ui.add_space(10.0);
egui::CollapsingHeader::new("硬件状态")
.default_open(true)
.show(ui, |ui| {
ui.horizontal_wrapped(|ui| {
info_chip(ui, "CPU", value_or_dash(&report.hardware.cpu_model));
info_chip(ui, "核心", value_or_dash(&report.hardware.cpu_cores));
info_chip(ui, "负载", value_or_dash(&report.hardware.load_average));
});
ui.add_space(6.0);
memory_row(ui, "内存", &report.hardware.memory);
memory_row(ui, "Swap", &report.hardware.swap);
ui.add_space(6.0);
for disk in &report.hardware.disks {
disk_row(ui, disk);
}
ui.add_space(6.0);
for sensor in &report.hardware.sensors {
ui.label(sensor);
}
});
egui::CollapsingHeader::new("服务状态")
.default_open(true)
.show(ui, |ui| {
egui::Grid::new(format!("{alias}_services"))
.striped(true)
.show(ui, |ui| {
ui.strong("服务");
ui.strong("状态");
ui.strong("子状态");
ui.strong("Unit");
ui.end_row();
for service in &report.services {
service_row(ui, service);
}
});
});
egui::CollapsingHeader::new("HTTP 探测")
.default_open(true)
.show(ui, |ui| {
egui::Grid::new(format!("{alias}_probes"))
.striped(true)
.show(ui, |ui| {
ui.strong("探测");
ui.strong("状态码");
ui.strong("耗时");
ui.strong("目标");
ui.end_row();
for probe in &report.probes {
probe_row(ui, probe);
}
});
});
if let Some(patrol) = &report.health_patrol {
egui::CollapsingHeader::new("生产健康巡检")
.default_open(true)
.show(ui, |ui| {
ui.horizontal(|ui| {
ui.colored_label(level_color(patrol.level), &patrol.status);
ui.label(value_or_dash(&patrol.checked_at));
ui.label(value_or_dash(&patrol.summary));
});
});
}
egui::CollapsingHeader::new("原始巡检输出").show(ui, |ui| {
ui.add(
egui::TextEdit::multiline(&mut report.raw_output.clone())
.font(egui::TextStyle::Monospace)
.desired_rows(12)
.interactive(false),
);
});
});
}
fn render_service_controls(&mut self, ui: &mut egui::Ui, alias: &str, index: usize) {
ui.heading("服务控制");
ui.add_space(4.0);
let action_in_progress = self.servers[index].action_in_progress.clone();
for service in DEFAULT_MANAGED_SERVICES {
ui.horizontal(|ui| {
ui.label(*service);
for action in [
ServiceAction::Start,
ServiceAction::Stop,
ServiceAction::Restart,
] {
let disabled = action_in_progress.is_some();
if ui
.add_enabled(!disabled, egui::Button::new(action.label()))
.clicked()
{
self.pending_confirmation = Some(ServiceConfirmation {
alias: alias.to_owned(),
service: (*service).to_owned(),
action,
});
}
}
});
}
ui.add_space(8.0);
ui.horizontal(|ui| {
ui.label("其他 unit");
ui.text_edit_singleline(&mut self.custom_service_name);
if ui.button("启动").clicked() {
self.confirm_custom_service(alias, ServiceAction::Start);
}
if ui.button("关闭").clicked() {
self.confirm_custom_service(alias, ServiceAction::Stop);
}
if ui.button("重启").clicked() {
self.confirm_custom_service(alias, ServiceAction::Restart);
}
});
if let Some(action) = action_in_progress {
ui.label(format!("正在执行:{action}"));
}
}
fn render_confirm_dialog(&mut self, ctx: &egui::Context) {
let Some(confirmation) = self.pending_confirmation.clone() else {
return;
};
egui::Window::new("确认服务操作")
.collapsible(false)
.resizable(false)
.show(ctx, |ui| {
ui.label(format!(
"确认在 {}{} {}",
confirmation.alias,
confirmation.action.label(),
confirmation.service
));
ui.add_space(8.0);
ui.horizontal(|ui| {
if ui.button("确认").clicked() {
self.execute_service_action(&confirmation);
self.pending_confirmation = None;
}
if ui.button("取消").clicked() {
self.pending_confirmation = None;
}
});
});
}
fn reload_aliases(&mut self) {
let aliases = discover_ssh_aliases();
self.servers = aliases.into_iter().map(ServerState::new).collect();
self.selected_alias = self.servers.first().map(|server| server.alias.name.clone());
}
fn refresh_server(&mut self, alias: &str) {
if let Some(server) = self.server_mut(alias) {
server.loading = true;
server.error = None;
}
spawn_health_check(alias.to_owned(), self.tx.clone());
}
fn confirm_custom_service(&mut self, alias: &str, action: ServiceAction) {
let service = self.custom_service_name.trim();
if service.is_empty() {
return;
}
self.pending_confirmation = Some(ServiceConfirmation {
alias: alias.to_owned(),
service: service.to_owned(),
action,
});
}
fn execute_service_action(&mut self, confirmation: &ServiceConfirmation) {
if let Some(server) = self.server_mut(&confirmation.alias) {
server.action_in_progress = Some(format!(
"{} {}",
confirmation.action.label(),
confirmation.service
));
server.action_log = None;
}
spawn_service_action(
confirmation.alias.clone(),
confirmation.service.clone(),
confirmation.action,
self.tx.clone(),
);
}
fn server_index(&self, alias: &str) -> Option<usize> {
self.servers
.iter()
.position(|server| server.alias.name == alias)
}
fn server_mut(&mut self, alias: &str) -> Option<&mut ServerState> {
self.servers
.iter_mut()
.find(|server| server.alias.name == alias)
}
}
#[derive(Debug, Clone)]
struct ServiceConfirmation {
alias: String,
service: String,
action: ServiceAction,
}
#[derive(Debug)]
struct ServerState {
alias: SshAlias,
report: Option<ServerHealthReport>,
loading: bool,
error: Option<String>,
action_in_progress: Option<String>,
action_log: Option<String>,
}
impl ServerState {
fn new(alias: SshAlias) -> Self {
Self {
alias,
report: None,
loading: false,
error: None,
action_in_progress: None,
action_log: None,
}
}
fn status(&self) -> HealthLevel {
if self.error.is_some() {
HealthLevel::Critical
} else {
self.report
.as_ref()
.map(|report| report.status)
.unwrap_or(HealthLevel::Unknown)
}
}
}
fn server_label(server: &ServerState) -> String {
let prefix = match server.status() {
HealthLevel::Ok => "[OK]",
HealthLevel::Warning => "[!]",
HealthLevel::Critical => "[X]",
HealthLevel::Unknown => "[?]",
};
format!("{prefix} {}", server.alias.name)
}
fn service_row(ui: &mut egui::Ui, service: &ServiceSnapshot) {
ui.label(&service.name);
ui.colored_label(level_color(service.level), &service.active);
ui.label(&service.sub);
ui.label(&service.unit_file);
ui.end_row();
}
fn probe_row(ui: &mut egui::Ui, probe: &ProbeSnapshot) {
ui.label(&probe.name);
ui.colored_label(level_color(probe.level), &probe.http_code);
ui.label(
probe
.elapsed_ms
.map(|elapsed| format!("{elapsed}ms"))
.unwrap_or_else(|| "-".to_owned()),
);
ui.label(&probe.target);
ui.end_row();
}
fn memory_row(ui: &mut egui::Ui, label: &str, memory: &MemorySnapshot) {
let percent = memory.used_percent.unwrap_or_default();
ui.horizontal(|ui| {
ui.label(label);
ui.add(egui::ProgressBar::new(f32::from(percent) / 100.0).text(format!("{percent}%")));
ui.label(format!(
"已用 {} / 总计 {},可用 {}",
value_or_dash(&memory.used),
value_or_dash(&memory.total),
value_or_dash(&memory.available)
));
});
}
fn disk_row(ui: &mut egui::Ui, disk: &DiskSnapshot) {
let percent = disk.used_percent.unwrap_or_default();
ui.horizontal(|ui| {
ui.label(&disk.mount);
ui.add(egui::ProgressBar::new(f32::from(percent) / 100.0).text(format!("{percent}%")));
ui.label(format!(
"{} 已用 {} / {},可用 {}",
disk.filesystem, disk.used, disk.size, disk.available
));
});
}
fn info_chip(ui: &mut egui::Ui, label: &str, value: &str) {
ui.group(|ui| {
ui.vertical(|ui| {
ui.small(label);
ui.label(value);
});
});
}
fn value_or_dash(value: &str) -> &str {
if value.trim().is_empty() { "-" } else { value }
}
fn level_color(level: HealthLevel) -> egui::Color32 {
match level {
HealthLevel::Ok => egui::Color32::from_rgb(38, 166, 91),
HealthLevel::Warning => egui::Color32::from_rgb(214, 137, 16),
HealthLevel::Critical => egui::Color32::from_rgb(205, 66, 70),
HealthLevel::Unknown => egui::Color32::from_rgb(120, 126, 136),
}
}
fn warning_color() -> egui::Color32 {
egui::Color32::from_rgb(205, 66, 70)
}

View File

@@ -0,0 +1,128 @@
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::Arc;
use eframe::egui::{FontData, FontDefinitions, FontFamily};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CjkFontCandidate {
pub path: PathBuf,
pub index: u32,
}
pub fn install_cjk_font(ctx: &eframe::egui::Context) -> Option<CjkFontCandidate> {
let candidate = find_cjk_font_candidate()?;
let bytes = std::fs::read(&candidate.path).ok()?;
let mut font_data = FontData::from_owned(bytes);
font_data.index = candidate.index;
let mut definitions = FontDefinitions::default();
definitions
.font_data
.insert("genarrative-cjk".to_owned(), Arc::new(font_data));
// 中文注释:作为 fallback 注入,保留 egui 默认拉丁/图标字体,同时补齐中文 glyph。
for family in [FontFamily::Proportional, FontFamily::Monospace] {
definitions
.families
.entry(family)
.or_default()
.push("genarrative-cjk".to_owned());
}
ctx.set_fonts(definitions);
Some(candidate)
}
pub fn find_cjk_font_candidate() -> Option<CjkFontCandidate> {
if let Ok(path) = std::env::var("GENARRATIVE_SERVER_PANEL_CJK_FONT") {
if let Some(candidate) = parse_font_spec(&path) {
return Some(candidate);
}
}
const KNOWN_PATHS: &[(&str, u32)] = &[
("/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc", 2),
("/usr/share/fonts/opentype/noto/NotoSansCJK-Medium.ttc", 2),
("/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc", 0),
("/usr/share/fonts/truetype/wqy/wqy-microhei.ttc", 0),
(
"/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf",
0,
),
(
"/home/dsk/.local/share/fonts/genarrative-cjk/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
0,
),
];
for (path, index) in KNOWN_PATHS {
if Path::new(path).is_file() {
return Some(CjkFontCandidate {
path: PathBuf::from(path),
index: *index,
});
}
}
for family in [
"Noto Sans CJK SC",
"WenQuanYi Zen Hei",
"Droid Sans Fallback",
] {
if let Some(candidate) = find_with_fc_match(family) {
return Some(candidate);
}
}
None
}
fn parse_font_spec(raw: &str) -> Option<CjkFontCandidate> {
let trimmed = raw.trim();
if trimmed.is_empty() {
return None;
}
let (path, index) = trimmed
.rsplit_once('|')
.and_then(|(path, index)| Some((path, index.parse().ok()?)))
.unwrap_or((trimmed, 0));
let path = PathBuf::from(path);
path.is_file().then_some(CjkFontCandidate { path, index })
}
fn find_with_fc_match(family: &str) -> Option<CjkFontCandidate> {
let output = Command::new("fc-match")
.arg("-f")
.arg("%{file}|%{index}\n")
.arg(family)
.output()
.ok()?;
if !output.status.success() {
return None;
}
let stdout = String::from_utf8_lossy(&output.stdout);
stdout.lines().find_map(parse_font_spec)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parses_font_path_with_index() {
let candidate = parse_font_spec("/tmp/missing-font.ttc|2");
assert_eq!(candidate, None);
}
#[test]
fn finds_existing_system_cjk_font() {
let candidate = find_cjk_font_candidate();
assert!(
candidate
.as_ref()
.is_some_and(|candidate| candidate.path.is_file()),
"expected at least one CJK font on this development host"
);
}
}

View File

@@ -0,0 +1,474 @@
use std::collections::BTreeMap;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthLevel {
Unknown,
Ok,
Warning,
Critical,
}
impl HealthLevel {
pub fn label(self) -> &'static str {
match self {
HealthLevel::Unknown => "未知",
HealthLevel::Ok => "正常",
HealthLevel::Warning => "警告",
HealthLevel::Critical => "异常",
}
}
pub fn rank(self) -> u8 {
match self {
HealthLevel::Unknown => 1,
HealthLevel::Ok => 0,
HealthLevel::Warning => 2,
HealthLevel::Critical => 3,
}
}
}
#[derive(Debug, Clone)]
pub struct ServerHealthReport {
pub status: HealthLevel,
pub checked_at: String,
pub host: HostSnapshot,
pub hardware: HardwareSnapshot,
pub services: Vec<ServiceSnapshot>,
pub probes: Vec<ProbeSnapshot>,
pub health_patrol: Option<HealthPatrolSnapshot>,
pub raw_output: String,
}
#[derive(Debug, Clone, Default)]
pub struct HostSnapshot {
pub hostname: String,
pub kernel: String,
pub uptime: String,
}
#[derive(Debug, Clone, Default)]
pub struct HardwareSnapshot {
pub cpu_model: String,
pub cpu_cores: String,
pub load_average: String,
pub memory: MemorySnapshot,
pub swap: MemorySnapshot,
pub disks: Vec<DiskSnapshot>,
pub sensors: Vec<String>,
}
#[derive(Debug, Clone, Default)]
pub struct MemorySnapshot {
pub total: String,
pub used: String,
pub free: String,
pub available: String,
pub used_percent: Option<u8>,
}
#[derive(Debug, Clone, Default)]
pub struct DiskSnapshot {
pub mount: String,
pub filesystem: String,
pub size: String,
pub used: String,
pub available: String,
pub used_percent: Option<u8>,
}
#[derive(Debug, Clone)]
pub struct ServiceSnapshot {
pub name: String,
pub active: String,
pub sub: String,
pub unit_file: String,
pub level: HealthLevel,
}
#[derive(Debug, Clone)]
pub struct ProbeSnapshot {
pub name: String,
pub target: String,
pub http_code: String,
pub elapsed_ms: Option<u64>,
pub level: HealthLevel,
}
#[derive(Debug, Clone)]
pub struct HealthPatrolSnapshot {
pub status: String,
pub checked_at: String,
pub summary: String,
pub level: HealthLevel,
}
pub fn parse_health_report(raw_output: &str) -> ServerHealthReport {
let mut sections: BTreeMap<String, Vec<String>> = BTreeMap::new();
let mut current = String::new();
for line in raw_output.lines() {
if let Some(name) = parse_section_marker(line) {
current = name.to_owned();
sections.entry(current.clone()).or_default();
} else if !current.is_empty() {
sections
.entry(current.clone())
.or_default()
.push(line.to_owned());
}
}
let mut report = ServerHealthReport {
status: HealthLevel::Unknown,
checked_at: section_value(&sections, "checked_at").unwrap_or_default(),
host: parse_host(&sections),
hardware: parse_hardware(&sections),
services: parse_services(&sections),
probes: parse_probes(&sections),
health_patrol: parse_health_patrol(&sections),
raw_output: raw_output.to_owned(),
};
report.status = summarize_report(&report);
report
}
pub fn summarize_report(report: &ServerHealthReport) -> HealthLevel {
let mut status = HealthLevel::Ok;
for level in report
.services
.iter()
.map(|service| service.level)
.chain(report.probes.iter().map(|probe| probe.level))
.chain(report.health_patrol.iter().map(|patrol| patrol.level))
{
if level.rank() > status.rank() {
status = level;
}
}
if let Some(used_percent) = report.hardware.memory.used_percent {
let memory_level = if used_percent >= 95 {
HealthLevel::Critical
} else if used_percent >= 85 {
HealthLevel::Warning
} else {
HealthLevel::Ok
};
if memory_level.rank() > status.rank() {
status = memory_level;
}
}
for disk in &report.hardware.disks {
let disk_level = match disk.used_percent {
Some(percent) if percent >= 95 => HealthLevel::Critical,
Some(percent) if percent >= 85 => HealthLevel::Warning,
_ => HealthLevel::Ok,
};
if disk_level.rank() > status.rank() {
status = disk_level;
}
}
status
}
fn parse_section_marker(line: &str) -> Option<&str> {
line.strip_prefix("==GENARRATIVE_PANEL:")
.and_then(|rest| rest.strip_suffix("=="))
}
fn section_value(sections: &BTreeMap<String, Vec<String>>, name: &str) -> Option<String> {
sections.get(name).and_then(|lines| {
lines
.iter()
.map(|line| line.trim())
.find(|line| !line.is_empty())
.map(str::to_owned)
})
}
fn parse_host(sections: &BTreeMap<String, Vec<String>>) -> HostSnapshot {
HostSnapshot {
hostname: section_value(sections, "hostname").unwrap_or_default(),
kernel: section_value(sections, "kernel").unwrap_or_default(),
uptime: section_value(sections, "uptime").unwrap_or_default(),
}
}
fn parse_hardware(sections: &BTreeMap<String, Vec<String>>) -> HardwareSnapshot {
HardwareSnapshot {
cpu_model: section_value(sections, "cpu_model").unwrap_or_default(),
cpu_cores: section_value(sections, "cpu_cores").unwrap_or_default(),
load_average: section_value(sections, "load_average").unwrap_or_default(),
memory: parse_memory(section_value(sections, "memory").as_deref()),
swap: parse_memory(section_value(sections, "swap").as_deref()),
disks: parse_disks(sections),
sensors: sections.get("sensors").cloned().unwrap_or_default(),
}
}
fn parse_memory(value: Option<&str>) -> MemorySnapshot {
let Some(value) = value else {
return MemorySnapshot::default();
};
let parts: Vec<&str> = value.split('|').collect();
MemorySnapshot {
total: parts.first().copied().unwrap_or_default().to_owned(),
used: parts.get(1).copied().unwrap_or_default().to_owned(),
free: parts.get(2).copied().unwrap_or_default().to_owned(),
available: parts.get(3).copied().unwrap_or_default().to_owned(),
used_percent: parts.get(4).and_then(|value| parse_percent(value)),
}
}
fn parse_disks(sections: &BTreeMap<String, Vec<String>>) -> Vec<DiskSnapshot> {
sections
.get("disks")
.into_iter()
.flatten()
.filter_map(|line| {
let parts: Vec<&str> = line.split('|').collect();
(parts.len() >= 6).then(|| DiskSnapshot {
filesystem: parts[0].to_owned(),
size: parts[1].to_owned(),
used: parts[2].to_owned(),
available: parts[3].to_owned(),
used_percent: parse_percent(parts[4]),
mount: parts[5].to_owned(),
})
})
.collect()
}
fn parse_services(sections: &BTreeMap<String, Vec<String>>) -> Vec<ServiceSnapshot> {
sections
.get("services")
.into_iter()
.flatten()
.filter_map(|line| {
let parts: Vec<&str> = line.split('|').collect();
(parts.len() >= 4).then(|| {
let active = parts[1].to_owned();
let sub = parts[2].to_owned();
let level = if active == "active" {
HealthLevel::Ok
} else if active == "unknown" || active == "inactive" {
HealthLevel::Warning
} else {
HealthLevel::Critical
};
ServiceSnapshot {
name: parts[0].to_owned(),
active,
sub,
unit_file: parts[3].to_owned(),
level,
}
})
})
.collect()
}
fn parse_probes(sections: &BTreeMap<String, Vec<String>>) -> Vec<ProbeSnapshot> {
sections
.get("probes")
.into_iter()
.flatten()
.filter_map(|line| {
let parts: Vec<&str> = line.split('|').collect();
(parts.len() >= 4).then(|| {
let http_code = parts[2].to_owned();
let elapsed_ms = parts[3].parse().ok();
let level = if http_code.starts_with('2') {
HealthLevel::Ok
} else if http_code == "000" {
HealthLevel::Critical
} else {
HealthLevel::Critical
};
ProbeSnapshot {
name: parts[0].to_owned(),
target: parts[1].to_owned(),
http_code,
elapsed_ms,
level,
}
})
})
.collect()
}
fn parse_health_patrol(sections: &BTreeMap<String, Vec<String>>) -> Option<HealthPatrolSnapshot> {
let line = section_value(sections, "health_patrol")?;
let parts: Vec<&str> = line.split('|').collect();
let status = parts.first().copied().unwrap_or_default().to_owned();
let level = match status.as_str() {
"OK" => HealthLevel::Ok,
"WARNING" => HealthLevel::Warning,
"CRITICAL" => HealthLevel::Critical,
_ => HealthLevel::Unknown,
};
Some(HealthPatrolSnapshot {
status,
checked_at: parts.get(1).copied().unwrap_or_default().to_owned(),
summary: parts.get(2).copied().unwrap_or_default().to_owned(),
level,
})
}
fn parse_percent(value: &str) -> Option<u8> {
value.trim_end_matches('%').parse().ok()
}
pub const HEALTH_SCRIPT: &str = r#"set -eu
print_section() {
printf '==GENARRATIVE_PANEL:%s==\n' "$1"
}
print_section checked_at
date -Is 2>/dev/null || date
print_section hostname
hostname 2>/dev/null || true
print_section kernel
uname -srmo 2>/dev/null || uname -a 2>/dev/null || true
print_section uptime
uptime -p 2>/dev/null || uptime 2>/dev/null || true
print_section cpu_model
awk -F: '/model name/ {gsub(/^[ \t]+/, "", $2); print $2; exit}' /proc/cpuinfo 2>/dev/null || true
print_section cpu_cores
nproc 2>/dev/null || getconf _NPROCESSORS_ONLN 2>/dev/null || true
print_section load_average
cat /proc/loadavg 2>/dev/null | awk '{print $1" "$2" "$3}' || true
print_section memory
awk '
/^MemTotal:/ {total=$2}
/^MemFree:/ {free=$2}
/^MemAvailable:/ {available=$2}
END {
if (total > 0) {
used = total - free
percent = int((used * 100 + total / 2) / total)
printf "%.1f GiB|%.1f GiB|%.1f GiB|%.1f GiB|%d%%\n", total/1048576, used/1048576, free/1048576, available/1048576, percent
}
}
' /proc/meminfo 2>/dev/null || true
print_section swap
awk '
/^SwapTotal:/ {total=$2}
/^SwapFree:/ {free=$2}
END {
if (total > 0) {
used = total - free
percent = int((used * 100 + total / 2) / total)
printf "%.1f GiB|%.1f GiB|%.1f GiB|%.1f GiB|%d%%\n", total/1048576, used/1048576, free/1048576, free/1048576, percent
} else {
print "0 GiB|0 GiB|0 GiB|0 GiB|0%"
}
}
' /proc/meminfo 2>/dev/null || true
print_section disks
for mount in / /var /opt /stdb /data; do
if [ -e "$mount" ]; then
df -hP "$mount" 2>/dev/null | awk 'NR == 2 {print $1"|"$2"|"$3"|"$4"|"$5"|"$6}'
fi
done | awk '!seen[$6]++'
print_section sensors
if command -v sensors >/dev/null 2>&1; then
sensors 2>/dev/null | sed -n '1,20p'
else
echo "sensors 未安装"
fi
print_section services
for service in genarrative-api.service spacetimedb.service nginx.service genarrative-health-patrol.timer genarrative-database-backup.timer; do
active=$(systemctl is-active "$service" 2>/dev/null || true)
sub=$(systemctl show "$service" -p SubState --value 2>/dev/null || true)
unit_file=$(systemctl show "$service" -p UnitFileState --value 2>/dev/null || true)
[ -n "$active" ] || active="unknown"
[ -n "$sub" ] || sub="unknown"
[ -n "$unit_file" ] || unit_file="unknown"
printf '%s|%s|%s|%s\n' "$service" "$active" "$sub" "$unit_file"
done
print_section probes
probe() {
name="$1"
url="$2"
tmp=$(mktemp)
code=$(curl -fsS -m 5 -o /dev/null -w '%{http_code}|%{time_total}' "$url" 2>"$tmp" || true)
if [ -z "$code" ]; then
code="000|0"
fi
http_code=${code%%|*}
time_total=${code#*|}
elapsed_ms=$(awk "BEGIN {printf \"%d\", $time_total * 1000}")
printf '%s|%s|%s|%s\n' "$name" "$url" "$http_code" "$elapsed_ms"
rm -f "$tmp"
}
probe "api:/healthz" "http://127.0.0.1:8082/healthz"
probe "api:/readyz" "http://127.0.0.1:8082/readyz"
probe "spacetimedb:/v1/ping" "http://127.0.0.1:3101/v1/ping"
probe "public:/api/creation-entry/config" "http://127.0.0.1:8082/api/creation-entry/config"
probe "public:/api/runtime/puzzle/gallery" "http://127.0.0.1:8082/api/runtime/puzzle/gallery"
print_section health_patrol
if [ -r /var/lib/genarrative/health-patrol/status.json ]; then
node -e '
const fs = require("fs");
const payload = JSON.parse(fs.readFileSync("/var/lib/genarrative/health-patrol/status.json", "utf8"));
const status = payload.status || "UNKNOWN";
const checkedAt = payload.checkedAt || "";
const checks = Array.isArray(payload.checks) ? payload.checks : [];
const summary = checks.filter((check) => check.status && check.status !== "OK").slice(0, 3).map((check) => `${check.name}:${check.status}`).join(",");
console.log(`${status}|${checkedAt}|${summary}`);
' 2>/dev/null || echo "UNKNOWN||状态文件解析失败"
else
echo "UNKNOWN||未找到 /var/lib/genarrative/health-patrol/status.json"
fi
"#;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parses_report_sections() {
let report = parse_health_report(
r#"==GENARRATIVE_PANEL:checked_at==
2026-06-11T12:00:00+08:00
==GENARRATIVE_PANEL:hostname==
release
==GENARRATIVE_PANEL:memory==
2.0 GiB|1.0 GiB|1.0 GiB|1.0 GiB|50%
==GENARRATIVE_PANEL:disks==
/dev/sda1|40G|20G|20G|50%|/
==GENARRATIVE_PANEL:services==
genarrative-api.service|active|running|enabled
spacetimedb.service|failed|failed|enabled
==GENARRATIVE_PANEL:probes==
api:/readyz|http://127.0.0.1:8082/readyz|200|18
==GENARRATIVE_PANEL:health_patrol==
WARNING|2026-06-11T11:59:00Z|journal:WARNING
"#,
);
assert_eq!(report.host.hostname, "release");
assert_eq!(report.hardware.memory.used_percent, Some(50));
assert_eq!(report.services.len(), 2);
assert_eq!(report.probes[0].http_code, "200");
assert_eq!(report.status, HealthLevel::Critical);
}
}

View File

@@ -0,0 +1,5 @@
pub mod app;
pub mod fonts;
pub mod health;
pub mod remote;
pub mod ssh_config;

View File

@@ -0,0 +1,21 @@
use eframe::egui;
use server_manager_panel::app::ServerManagerApp;
use server_manager_panel::fonts::install_cjk_font;
fn main() -> eframe::Result<()> {
let native_options = eframe::NativeOptions {
viewport: egui::ViewportBuilder::default()
.with_inner_size([1180.0, 760.0])
.with_min_inner_size([920.0, 620.0]),
..Default::default()
};
eframe::run_native(
"Genarrative 服务器管理面板",
native_options,
Box::new(|cc| {
install_cjk_font(&cc.egui_ctx);
Ok(Box::new(ServerManagerApp::default()))
}),
)
}

View File

@@ -0,0 +1,231 @@
use std::io::Write;
use std::process::{Command, Stdio};
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
use crate::health::{HEALTH_SCRIPT, ServerHealthReport, parse_health_report};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServiceAction {
Start,
Stop,
Restart,
}
impl ServiceAction {
pub fn as_systemctl_arg(self) -> &'static str {
match self {
ServiceAction::Start => "start",
ServiceAction::Stop => "stop",
ServiceAction::Restart => "restart",
}
}
pub fn label(self) -> &'static str {
match self {
ServiceAction::Start => "启动",
ServiceAction::Stop => "关闭",
ServiceAction::Restart => "重启",
}
}
}
#[derive(Debug, Clone)]
pub struct RemoteCommandResult {
pub success: bool,
pub summary: String,
pub stdout: String,
pub stderr: String,
}
#[derive(Debug)]
pub enum RemoteEvent {
Health {
alias: String,
result: Result<ServerHealthReport, String>,
},
ServiceAction {
alias: String,
service: String,
action: ServiceAction,
result: RemoteCommandResult,
},
}
pub type RemoteSender = mpsc::Sender<RemoteEvent>;
pub type RemoteReceiver = mpsc::Receiver<RemoteEvent>;
pub fn channel() -> (RemoteSender, RemoteReceiver) {
mpsc::channel()
}
pub fn spawn_health_check(alias: String, tx: RemoteSender) {
thread::spawn(move || {
let result =
run_ssh_script(&alias, HEALTH_SCRIPT, Duration::from_secs(20)).and_then(|output| {
if output.success {
Ok(parse_health_report(&output.stdout))
} else {
Err(format_remote_error(&output))
}
});
let _ = tx.send(RemoteEvent::Health { alias, result });
});
}
pub fn spawn_service_action(
alias: String,
service: String,
action: ServiceAction,
tx: RemoteSender,
) {
thread::spawn(move || {
let result = if is_safe_service_name(&service) {
run_ssh_script(
&alias,
&build_service_action_script(&service, action),
Duration::from_secs(20),
)
.unwrap_or_else(|error| RemoteCommandResult {
success: false,
summary: error,
stdout: String::new(),
stderr: String::new(),
})
} else {
RemoteCommandResult {
success: false,
summary: "服务名包含不允许的字符".to_owned(),
stdout: String::new(),
stderr: String::new(),
}
};
let _ = tx.send(RemoteEvent::ServiceAction {
alias,
service,
action,
result,
});
});
}
pub fn is_safe_service_name(service: &str) -> bool {
!service.is_empty()
&& service.len() <= 128
&& service.bytes().all(|byte| {
matches!(
byte,
b'a'..=b'z'
| b'A'..=b'Z'
| b'0'..=b'9'
| b'.'
| b'_'
| b'-'
| b'@'
| b':'
)
})
}
fn build_service_action_script(service: &str, action: ServiceAction) -> String {
format!(
r#"set -eu
service='{service}'
action='{action}'
if [ "$(id -u)" = "0" ]; then
systemctl "$action" "$service"
else
sudo -n systemctl "$action" "$service"
fi
systemctl is-active "$service" || true
systemctl status "$service" --no-pager -l -n 12 || true
"#,
service = service,
action = action.as_systemctl_arg()
)
}
fn run_ssh_script(
alias: &str,
script: &str,
timeout: Duration,
) -> Result<RemoteCommandResult, String> {
let started = Instant::now();
let mut child = Command::new("ssh")
.arg("-o")
.arg("BatchMode=yes")
.arg("-o")
.arg("ConnectTimeout=8")
.arg(alias)
.arg("sh")
.arg("-s")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|error| format!("无法启动 ssh: {error}"))?;
{
// 中文注释:写完脚本后必须关闭 stdin让远端 `sh -s` 收到 EOF 并开始退出。
let Some(mut stdin) = child.stdin.take() else {
return Err("无法写入 ssh stdin".to_owned());
};
stdin
.write_all(script.as_bytes())
.map_err(|error| format!("写入远端脚本失败: {error}"))?;
}
loop {
match child.try_wait() {
Ok(Some(_status)) => {
let output = child
.wait_with_output()
.map_err(|error| format!("读取 ssh 输出失败: {error}"))?;
let success = output.status.success();
return Ok(RemoteCommandResult {
success,
summary: if success {
"执行成功".to_owned()
} else {
format!("ssh 退出码 {:?}", output.status.code())
},
stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
});
}
Ok(None) if started.elapsed() >= timeout => {
let _ = child.kill();
let _ = child.wait();
return Err(format!("ssh 执行超过 {}", timeout.as_secs()));
}
Ok(None) => thread::sleep(Duration::from_millis(80)),
Err(error) => return Err(format!("等待 ssh 进程失败: {error}")),
}
}
}
fn format_remote_error(result: &RemoteCommandResult) -> String {
let stderr = result.stderr.trim();
let stdout = result.stdout.trim();
if !stderr.is_empty() {
format!("{}: {}", result.summary, stderr)
} else if !stdout.is_empty() {
format!("{}: {}", result.summary, stdout)
} else {
result.summary.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn allows_systemd_unit_names_only() {
assert!(is_safe_service_name("genarrative-api.service"));
assert!(is_safe_service_name("worker@1.service"));
assert!(!is_safe_service_name("api.service;rm -rf /"));
assert!(!is_safe_service_name(""));
}
}

View File

@@ -0,0 +1,143 @@
use std::collections::HashSet;
use std::fs;
use std::path::{Path, PathBuf};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SshAlias {
pub name: String,
pub source: PathBuf,
}
pub fn discover_ssh_aliases() -> Vec<SshAlias> {
let Some(home) = std::env::var_os("HOME") else {
return Vec::new();
};
let config_path = PathBuf::from(home).join(".ssh/config");
discover_from_file(&config_path)
}
pub fn discover_from_file(path: &Path) -> Vec<SshAlias> {
let mut visited = HashSet::new();
let mut aliases = Vec::new();
discover_inner(path, &mut visited, &mut aliases);
dedupe_aliases(aliases)
}
fn discover_inner(path: &Path, visited: &mut HashSet<PathBuf>, aliases: &mut Vec<SshAlias>) {
let Ok(canonical) = path.canonicalize() else {
return;
};
if !visited.insert(canonical.clone()) {
return;
}
let Ok(content) = fs::read_to_string(&canonical) else {
return;
};
for line in content.lines() {
let trimmed = trim_comment(line);
let mut parts = trimmed.split_whitespace();
let Some(keyword) = parts.next() else {
continue;
};
if keyword.eq_ignore_ascii_case("host") {
aliases.extend(parts.filter_map(|name| {
is_concrete_alias(name).then(|| SshAlias {
name: name.to_owned(),
source: canonical.clone(),
})
}));
} else if keyword.eq_ignore_ascii_case("include") {
for include in parts {
for include_path in expand_include_path(include, canonical.parent()) {
discover_inner(&include_path, visited, aliases);
}
}
}
}
}
fn dedupe_aliases(aliases: Vec<SshAlias>) -> Vec<SshAlias> {
let mut seen = HashSet::new();
let mut deduped = Vec::new();
for alias in aliases {
if seen.insert(alias.name.clone()) {
deduped.push(alias);
}
}
deduped
}
fn trim_comment(line: &str) -> &str {
line.split('#').next().unwrap_or("").trim()
}
fn is_concrete_alias(value: &str) -> bool {
!value.is_empty()
&& !value.starts_with('-')
&& !value.starts_with('!')
&& !value.contains('*')
&& !value.contains('?')
&& !value.contains('%')
&& !value.contains('/')
}
fn expand_include_path(raw: &str, parent: Option<&Path>) -> Vec<PathBuf> {
if raw.contains('*') || raw.contains('?') {
// 中文注释SSH Include 支持复杂 glob面板只解析普通文件避免误扫过大目录。
return Vec::new();
}
let expanded = if let Some(rest) = raw.strip_prefix("~/") {
std::env::var_os("HOME")
.map(PathBuf::from)
.map(|home| home.join(rest))
} else {
let path = PathBuf::from(raw);
if path.is_absolute() {
Some(path)
} else {
parent.map(|base| base.join(path))
}
};
expanded.into_iter().collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn host_parser_ignores_wildcards_and_negations() {
let mut aliases = Vec::new();
let source = PathBuf::from("/tmp/config");
for line in [
"Host dev release *.internal !blocked",
"Host github.com",
"Host ?pattern",
"Host -bad",
] {
let trimmed = trim_comment(line);
let mut parts = trimmed.split_whitespace();
let keyword = parts.next().unwrap();
if keyword.eq_ignore_ascii_case("host") {
aliases.extend(parts.filter_map(|name| {
is_concrete_alias(name).then(|| SshAlias {
name: name.to_owned(),
source: source.clone(),
})
}));
}
}
let names: Vec<_> = dedupe_aliases(aliases)
.into_iter()
.map(|alias| alias.name)
.collect();
assert_eq!(names, ["dev", "release", "github.com"]);
}
#[test]
fn comment_trimming_keeps_plain_aliases() {
assert_eq!(trim_comment(" Host dev # release host "), "Host dev");
}
}

View File

@@ -51,7 +51,8 @@ pub use mapper::{
PublicWorkGalleryEntryRecord, PuzzleAgentMessageFinalizeRecordInput, PuzzleAgentMessageRecord,
PuzzleAgentMessageSubmitRecordInput, PuzzleAgentSessionCreateRecordInput,
PuzzleAgentSessionRecord, PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord,
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBoardRecord, PuzzleCellPositionRecord,
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput,
PuzzleBackgroundCompileTaskReleaseRecordInput, PuzzleBoardRecord, PuzzleCellPositionRecord,
PuzzleClearActionRequest, PuzzleClearActionResponse, PuzzleClearActionType,
PuzzleClearBoardCell, PuzzleClearBoardSnapshot, PuzzleClearCardAsset, PuzzleClearDraftResponse,
PuzzleClearGenerationStatus, PuzzleClearImageAsset, PuzzleClearNextLevelRequest,
@@ -348,7 +349,7 @@ type ProcedureResultSender<T> =
type ReducerResultSender = Arc<Mutex<Option<oneshot::Sender<Result<(), SpacetimeClientError>>>>>;
struct SpacetimeConnectionPool {
slots: Vec<tokio::sync::Mutex<PooledConnectionSlot>>,
slots: Vec<PooledConnectionSlot>,
permits: Arc<Semaphore>,
}
@@ -371,8 +372,10 @@ impl SpacetimeStageError {
}
struct PooledConnectionSlot {
connection: Option<PooledConnection>,
in_use: bool,
// 槽位占用标记独立成原子量:抢占/复位不依赖锁,租约 Drop 兜底可以同步完成。
in_use: AtomicBool,
// in_use=true 的持有者独占本槽连接,正常情况下锁上不会有竞争。
connection: tokio::sync::Mutex<Option<PooledConnection>>,
}
struct PooledConnection {
@@ -385,9 +388,28 @@ struct PooledConnection {
struct PooledConnectionLease {
slot_index: usize,
connection: Option<PooledConnection>,
pool: Arc<SpacetimeConnectionPool>,
_permit: OwnedSemaphorePermit,
}
impl Drop for PooledConnectionLease {
// 租约 Drop 兜底:请求 future 被取消(如客户端断开导致 handler 被丢弃)时,
// 也必须归还连接并复位槽位,否则槽位会永久停留在 in_use 状态、连接池逐渐耗尽。
fn drop(&mut self) {
let slot = &self.pool.slots[self.slot_index];
if let Some(connection) = self.connection.take() {
if !connection.is_broken() {
if let Ok(mut slot_connection) = slot.connection.try_lock() {
*slot_connection = Some(connection);
}
// try_lock 理论上不会失败in_use 持有者独占);万一失败只丢弃连接,不丢槽位。
}
}
slot.in_use.store(false, Ordering::Release);
// _permit 随 Drop 自动归还信号量。
}
}
impl SpacetimeClient {
pub fn new(config: SpacetimeClientConfig) -> Self {
let pool_size = config.pool_size.max(1) as usize;
@@ -400,11 +422,9 @@ impl SpacetimeClient {
..config
};
let slots = (0..pool_size)
.map(|_| {
tokio::sync::Mutex::new(PooledConnectionSlot {
connection: None,
in_use: false,
})
.map(|_| PooledConnectionSlot {
in_use: AtomicBool::new(false),
connection: tokio::sync::Mutex::new(None),
})
.collect::<Vec<_>>();
let pool = Arc::new(SpacetimeConnectionPool {
@@ -678,42 +698,49 @@ impl SpacetimeClient {
)
})?;
loop {
for (slot_index, slot) in self.pool.slots.iter().enumerate() {
if let Ok(mut slot_guard) = slot.try_lock() {
if slot_guard.in_use {
continue;
}
let reusable_connection = slot_guard
.connection
.take()
.filter(|connection| !connection.is_broken());
slot_guard.in_use = true;
drop(slot_guard);
// 持有 permit 即保证最多 pool_size 个并发持有者,必然能抢到一个空闲槽位;
// CAS 抢占后立即构造租约,后续任何失败/取消都由租约 Drop 兜底复位槽位。
let slot_index = self
.pool
.slots
.iter()
.position(|slot| {
slot.in_use
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
})
.ok_or_else(|| {
SpacetimeStageError::new(
SpacetimeClientStage::PoolAcquire,
SpacetimeClientError::Runtime(
"SpacetimeDB 连接池 permit 与槽位状态不一致".to_string(),
),
)
})?;
let connection = if let Some(connection) = reusable_connection {
connection
} else {
match self.build_pooled_connection(operation_timeout).await {
Ok(connection) => connection,
Err(error) => {
let mut slot_guard = self.pool.slots[slot_index].lock().await;
slot_guard.in_use = false;
return Err(error);
}
}
};
let mut lease = PooledConnectionLease {
slot_index,
connection: None,
pool: self.pool.clone(),
_permit: permit,
};
return Ok(PooledConnectionLease {
slot_index,
connection: Some(connection),
_permit: permit,
});
}
}
let reusable_connection = self.pool.slots[slot_index]
.connection
.lock()
.await
.take()
.filter(|connection| !connection.is_broken());
tokio::task::yield_now().await;
}
let connection = if let Some(connection) = reusable_connection {
connection
} else {
// 建连失败时直接返回错误,槽位与 permit 由 lease Drop 自动归还。
self.build_pooled_connection(operation_timeout).await?
};
lease.connection = Some(connection);
Ok(lease)
}
async fn build_pooled_connection(
@@ -911,18 +938,10 @@ impl SpacetimeClient {
Ok(subscription)
}
async fn release_connection(&self, mut lease: PooledConnectionLease) {
let mut slot_guard = self.pool.slots[lease.slot_index].lock().await;
slot_guard.in_use = false;
let Some(connection) = lease.connection.take() else {
slot_guard.connection = None;
return;
};
if connection.is_broken() {
slot_guard.connection = None;
} else {
slot_guard.connection = Some(connection);
}
async fn release_connection(&self, lease: PooledConnectionLease) {
// 显式归还与“请求被取消”的隐式归还共用同一套租约 Drop 兜底逻辑,
// 保证任何路径下槽位与 permit 都会复位,连接池不会被慢慢泄漏占满。
drop(lease);
}
// 超时后必须统一归还租约;若连接已先一步断开则回传断线,否则标记坏连接并回传超时。
@@ -1127,4 +1146,78 @@ mod tests {
SpacetimeClientError::Runtime(_)
));
}
fn test_client(pool_size: u32, procedure_timeout: Duration) -> SpacetimeClient {
SpacetimeClient::new(SpacetimeClientConfig {
// 指向本机不可达端口:测试只验证连接池行为,不需要真实 SpacetimeDB。
server_url: "http://127.0.0.1:9".to_string(),
database: "pool-test".to_string(),
token: None,
pool_size,
procedure_timeout,
})
}
/// 复现线上故障机制:修复前请求 future 被取消时租约不会归还,槽位永久停留在 in_use
/// 后续 acquire 拿着 permit 空转挂死。修复后租约 Drop 必须同时复位槽位与 permit。
#[tokio::test]
async fn dropped_lease_releases_slot_and_permit() {
let client = test_client(1, Duration::from_millis(200));
let permit = client
.pool
.permits
.clone()
.acquire_owned()
.await
.expect("permit should acquire");
client.pool.slots[0].in_use.store(true, Ordering::SeqCst);
assert_eq!(client.pool.permits.available_permits(), 0);
// 模拟请求被取消:租约未经过 release_connection 直接被 Drop。
let lease = PooledConnectionLease {
slot_index: 0,
connection: None,
pool: client.pool.clone(),
_permit: permit,
};
drop(lease);
assert!(
!client.pool.slots[0].in_use.load(Ordering::SeqCst),
"租约 Drop 后槽位必须复位,否则连接池会被泄漏占满"
);
assert_eq!(
client.pool.permits.available_permits(),
1,
"租约 Drop 后 permit 必须归还"
);
}
/// 池内 permit 全部被占用持续在途请求acquire 必须在超时窗口内返回
/// pool_acquire 超时,而不是无限等待。
#[tokio::test]
async fn acquire_times_out_at_pool_acquire_when_pool_is_busy() {
let client = test_client(1, Duration::from_millis(200));
let _held_permit = client
.pool
.permits
.clone()
.acquire_owned()
.await
.expect("permit should acquire");
let result = tokio::time::timeout(
Duration::from_secs(5),
client.acquire_connection_with_timeout(Duration::from_millis(200)),
)
.await
.expect("acquire 必须在超时窗口内返回,而不是空转挂死");
let error = match result {
Ok(_) => panic!("池占满时应返回 pool_acquire 超时"),
Err(error) => error,
};
assert_eq!(error.stage, SpacetimeClientStage::PoolAcquire);
assert!(matches!(error.error, SpacetimeClientError::Timeout));
}
}

View File

@@ -101,7 +101,8 @@ pub use self::puzzle::{
PuzzleAgentMessageFinalizeRecordInput, PuzzleAgentMessageRecord,
PuzzleAgentMessageSubmitRecordInput, PuzzleAgentSessionCreateRecordInput,
PuzzleAgentSessionRecord, PuzzleAgentSuggestedActionRecord, PuzzleAnchorItemRecord,
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBoardRecord, PuzzleCellPositionRecord,
PuzzleAnchorPackRecord, PuzzleAudioAssetRecord, PuzzleBackgroundCompileTaskClaimRecordInput,
PuzzleBackgroundCompileTaskReleaseRecordInput, PuzzleBoardRecord, PuzzleCellPositionRecord,
PuzzleCreatorIntentRecord, PuzzleDraftCompileFailureRecordInput, PuzzleDraftLevelRecord,
PuzzleFormDraftRecord, PuzzleFormDraftSaveRecordInput, PuzzleGalleryCardRecord,
PuzzleGeneratedImageCandidateRecord, PuzzleGeneratedImagesSaveRecordInput,
@@ -199,10 +200,10 @@ pub(crate) use self::public_work::{
map_public_work_gallery_entry, map_public_work_gallery_entry_to_detail_entry,
};
pub(crate) use self::puzzle::{
map_puzzle_agent_session_procedure_result, map_puzzle_gallery_card_view_row,
map_puzzle_run_procedure_result, map_puzzle_work_procedure_result,
map_puzzle_works_procedure_result, map_runtime_profile_wallet_ledger_source_type_back,
parse_puzzle_agent_stage_record,
map_puzzle_agent_session_procedure_result, map_puzzle_background_compile_task_procedure_result,
map_puzzle_gallery_card_view_row, map_puzzle_run_procedure_result,
map_puzzle_work_procedure_result, map_puzzle_works_procedure_result,
map_runtime_profile_wallet_ledger_source_type_back, parse_puzzle_agent_stage_record,
};
pub(crate) use self::puzzle_clear::{
map_puzzle_clear_agent_session_procedure_result, map_puzzle_clear_gallery_card_view_row,

View File

@@ -13,6 +13,16 @@ pub(crate) fn map_puzzle_agent_session_procedure_result(
Ok(map_puzzle_agent_session_snapshot(session))
}
pub(crate) fn map_puzzle_background_compile_task_procedure_result(
result: PuzzleBackgroundCompileTaskProcedureResult,
) -> Result<bool, SpacetimeClientError> {
if !result.ok {
return Err(SpacetimeClientError::procedure_failed(result.error_message));
}
Ok(result.claimed)
}
pub(crate) fn map_puzzle_work_procedure_result(
result: PuzzleWorkProcedureResult,
) -> Result<PuzzleWorkProfileRecord, SpacetimeClientError> {
@@ -614,6 +624,23 @@ pub struct PuzzleFormDraftSaveRecordInput {
pub saved_at_micros: i64,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PuzzleBackgroundCompileTaskClaimRecordInput {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
pub claimed_at_micros: i64,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PuzzleBackgroundCompileTaskReleaseRecordInput {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PuzzleAgentMessageSubmitRecordInput {
pub session_id: String,

View File

@@ -204,6 +204,7 @@ pub mod chapter_progression_table;
pub mod chapter_progression_type;
pub mod checkpoint_wooden_fish_run_procedure;
pub mod claim_profile_task_reward_and_return_procedure;
pub mod claim_puzzle_background_compile_task_procedure;
pub mod claim_puzzle_work_point_incentive_procedure;
pub mod clear_database_migration_import_chunks_procedure;
pub mod clear_platform_browse_history_and_return_procedure;
@@ -628,6 +629,11 @@ pub mod puzzle_anchor_item_type;
pub mod puzzle_anchor_pack_type;
pub mod puzzle_anchor_status_type;
pub mod puzzle_audio_asset_type;
pub mod puzzle_background_compile_task_claim_input_type;
pub mod puzzle_background_compile_task_procedure_result_type;
pub mod puzzle_background_compile_task_release_input_type;
pub mod puzzle_background_compile_task_row_type;
pub mod puzzle_background_compile_task_table;
pub mod puzzle_board_snapshot_type;
pub mod puzzle_cell_position_type;
pub mod puzzle_clear_agent_session_create_input_type;
@@ -766,6 +772,7 @@ pub mod redeem_profile_reward_code_procedure;
pub mod refresh_session_table;
pub mod refresh_session_type;
pub mod refund_profile_wallet_points_and_return_procedure;
pub mod release_puzzle_background_compile_task_procedure;
pub mod remix_big_fish_work_procedure;
pub mod remix_custom_world_profile_procedure;
pub mod remix_puzzle_work_procedure;
@@ -1312,6 +1319,7 @@ pub use chapter_progression_table::*;
pub use chapter_progression_type::ChapterProgression;
pub use checkpoint_wooden_fish_run_procedure::checkpoint_wooden_fish_run;
pub use claim_profile_task_reward_and_return_procedure::claim_profile_task_reward_and_return;
pub use claim_puzzle_background_compile_task_procedure::claim_puzzle_background_compile_task;
pub use claim_puzzle_work_point_incentive_procedure::claim_puzzle_work_point_incentive;
pub use clear_database_migration_import_chunks_procedure::clear_database_migration_import_chunks;
pub use clear_platform_browse_history_and_return_procedure::clear_platform_browse_history_and_return;
@@ -1736,6 +1744,11 @@ pub use puzzle_anchor_item_type::PuzzleAnchorItem;
pub use puzzle_anchor_pack_type::PuzzleAnchorPack;
pub use puzzle_anchor_status_type::PuzzleAnchorStatus;
pub use puzzle_audio_asset_type::PuzzleAudioAsset;
pub use puzzle_background_compile_task_claim_input_type::PuzzleBackgroundCompileTaskClaimInput;
pub use puzzle_background_compile_task_procedure_result_type::PuzzleBackgroundCompileTaskProcedureResult;
pub use puzzle_background_compile_task_release_input_type::PuzzleBackgroundCompileTaskReleaseInput;
pub use puzzle_background_compile_task_row_type::PuzzleBackgroundCompileTaskRow;
pub use puzzle_background_compile_task_table::*;
pub use puzzle_board_snapshot_type::PuzzleBoardSnapshot;
pub use puzzle_cell_position_type::PuzzleCellPosition;
pub use puzzle_clear_agent_session_create_input_type::PuzzleClearAgentSessionCreateInput;
@@ -1874,6 +1887,7 @@ pub use redeem_profile_reward_code_procedure::redeem_profile_reward_code;
pub use refresh_session_table::*;
pub use refresh_session_type::RefreshSession;
pub use refund_profile_wallet_points_and_return_procedure::refund_profile_wallet_points_and_return;
pub use release_puzzle_background_compile_task_procedure::release_puzzle_background_compile_task;
pub use remix_big_fish_work_procedure::remix_big_fish_work;
pub use remix_custom_world_profile_procedure::remix_custom_world_profile;
pub use remix_puzzle_work_procedure::remix_puzzle_work;
@@ -2569,6 +2583,7 @@ pub struct DbUpdate {
public_work_play_daily_stat: __sdk::TableUpdate<PublicWorkPlayDailyStat>,
puzzle_agent_message: __sdk::TableUpdate<PuzzleAgentMessageRow>,
puzzle_agent_session: __sdk::TableUpdate<PuzzleAgentSessionRow>,
puzzle_background_compile_task: __sdk::TableUpdate<PuzzleBackgroundCompileTaskRow>,
puzzle_clear_agent_session: __sdk::TableUpdate<PuzzleClearAgentSessionRow>,
puzzle_clear_event: __sdk::TableUpdate<PuzzleClearEventRow>,
puzzle_clear_gallery_card_view: __sdk::TableUpdate<PuzzleClearGalleryCardViewRow>,
@@ -2854,6 +2869,11 @@ impl TryFrom<__ws::v2::TransactionUpdate> for DbUpdate {
"puzzle_agent_session" => db_update.puzzle_agent_session.append(
puzzle_agent_session_table::parse_table_update(table_update)?,
),
"puzzle_background_compile_task" => {
db_update.puzzle_background_compile_task.append(
puzzle_background_compile_task_table::parse_table_update(table_update)?,
)
}
"puzzle_clear_agent_session" => db_update.puzzle_clear_agent_session.append(
puzzle_clear_agent_session_table::parse_table_update(table_update)?,
),
@@ -3373,6 +3393,12 @@ impl __sdk::DbUpdate for DbUpdate {
&self.puzzle_agent_session,
)
.with_updates_by_pk(|row| &row.session_id);
diff.puzzle_background_compile_task = cache
.apply_diff_to_table::<PuzzleBackgroundCompileTaskRow>(
"puzzle_background_compile_task",
&self.puzzle_background_compile_task,
)
.with_updates_by_pk(|row| &row.task_id);
diff.puzzle_clear_agent_session = cache
.apply_diff_to_table::<PuzzleClearAgentSessionRow>(
"puzzle_clear_agent_session",
@@ -3828,6 +3854,9 @@ impl __sdk::DbUpdate for DbUpdate {
"puzzle_agent_session" => db_update
.puzzle_agent_session
.append(__sdk::parse_row_list_as_inserts(table_rows.rows)?),
"puzzle_background_compile_task" => db_update
.puzzle_background_compile_task
.append(__sdk::parse_row_list_as_inserts(table_rows.rows)?),
"puzzle_clear_agent_session" => db_update
.puzzle_clear_agent_session
.append(__sdk::parse_row_list_as_inserts(table_rows.rows)?),
@@ -4192,6 +4221,9 @@ impl __sdk::DbUpdate for DbUpdate {
"puzzle_agent_session" => db_update
.puzzle_agent_session
.append(__sdk::parse_row_list_as_deletes(table_rows.rows)?),
"puzzle_background_compile_task" => db_update
.puzzle_background_compile_task
.append(__sdk::parse_row_list_as_deletes(table_rows.rows)?),
"puzzle_clear_agent_session" => db_update
.puzzle_clear_agent_session
.append(__sdk::parse_row_list_as_deletes(table_rows.rows)?),
@@ -4410,6 +4442,7 @@ pub struct AppliedDiff<'r> {
public_work_play_daily_stat: __sdk::TableAppliedDiff<'r, PublicWorkPlayDailyStat>,
puzzle_agent_message: __sdk::TableAppliedDiff<'r, PuzzleAgentMessageRow>,
puzzle_agent_session: __sdk::TableAppliedDiff<'r, PuzzleAgentSessionRow>,
puzzle_background_compile_task: __sdk::TableAppliedDiff<'r, PuzzleBackgroundCompileTaskRow>,
puzzle_clear_agent_session: __sdk::TableAppliedDiff<'r, PuzzleClearAgentSessionRow>,
puzzle_clear_event: __sdk::TableAppliedDiff<'r, PuzzleClearEventRow>,
puzzle_clear_gallery_card_view: __sdk::TableAppliedDiff<'r, PuzzleClearGalleryCardViewRow>,
@@ -4829,6 +4862,11 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
&self.puzzle_agent_session,
event,
);
callbacks.invoke_table_row_callbacks::<PuzzleBackgroundCompileTaskRow>(
"puzzle_background_compile_task",
&self.puzzle_background_compile_task,
event,
);
callbacks.invoke_table_row_callbacks::<PuzzleClearAgentSessionRow>(
"puzzle_clear_agent_session",
&self.puzzle_clear_agent_session,
@@ -5766,6 +5804,7 @@ impl __sdk::SpacetimeModule for RemoteModule {
public_work_play_daily_stat_table::register_table(client_cache);
puzzle_agent_message_table::register_table(client_cache);
puzzle_agent_session_table::register_table(client_cache);
puzzle_background_compile_task_table::register_table(client_cache);
puzzle_clear_agent_session_table::register_table(client_cache);
puzzle_clear_event_table::register_table(client_cache);
puzzle_clear_gallery_card_view_table::register_table(client_cache);
@@ -5885,6 +5924,7 @@ impl __sdk::SpacetimeModule for RemoteModule {
"public_work_play_daily_stat",
"puzzle_agent_message",
"puzzle_agent_session",
"puzzle_background_compile_task",
"puzzle_clear_agent_session",
"puzzle_clear_event",
"puzzle_clear_gallery_card_view",

View File

@@ -0,0 +1,59 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
use super::puzzle_background_compile_task_claim_input_type::PuzzleBackgroundCompileTaskClaimInput;
use super::puzzle_background_compile_task_procedure_result_type::PuzzleBackgroundCompileTaskProcedureResult;
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
struct ClaimPuzzleBackgroundCompileTaskArgs {
pub input: PuzzleBackgroundCompileTaskClaimInput,
}
impl __sdk::InModule for ClaimPuzzleBackgroundCompileTaskArgs {
type Module = super::RemoteModule;
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the procedure `claim_puzzle_background_compile_task`.
///
/// Implemented for [`super::RemoteProcedures`].
pub trait claim_puzzle_background_compile_task {
fn claim_puzzle_background_compile_task(&self, input: PuzzleBackgroundCompileTaskClaimInput) {
self.claim_puzzle_background_compile_task_then(input, |_, _| {});
}
fn claim_puzzle_background_compile_task_then(
&self,
input: PuzzleBackgroundCompileTaskClaimInput,
__callback: impl FnOnce(
&super::ProcedureEventContext,
Result<PuzzleBackgroundCompileTaskProcedureResult, __sdk::InternalError>,
) + Send
+ 'static,
);
}
impl claim_puzzle_background_compile_task for super::RemoteProcedures {
fn claim_puzzle_background_compile_task_then(
&self,
input: PuzzleBackgroundCompileTaskClaimInput,
__callback: impl FnOnce(
&super::ProcedureEventContext,
Result<PuzzleBackgroundCompileTaskProcedureResult, __sdk::InternalError>,
) + Send
+ 'static,
) {
self.imp
.invoke_procedure_with_callback::<_, PuzzleBackgroundCompileTaskProcedureResult>(
"claim_puzzle_background_compile_task",
ClaimPuzzleBackgroundCompileTaskArgs { input },
__callback,
);
}
}

View File

@@ -0,0 +1,19 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct PuzzleBackgroundCompileTaskClaimInput {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
pub claimed_at_micros: i64,
}
impl __sdk::InModule for PuzzleBackgroundCompileTaskClaimInput {
type Module = super::RemoteModule;
}

View File

@@ -0,0 +1,17 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct PuzzleBackgroundCompileTaskProcedureResult {
pub ok: bool,
pub claimed: bool,
pub error_message: Option<String>,
}
impl __sdk::InModule for PuzzleBackgroundCompileTaskProcedureResult {
type Module = super::RemoteModule;
}

View File

@@ -0,0 +1,18 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct PuzzleBackgroundCompileTaskReleaseInput {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
}
impl __sdk::InModule for PuzzleBackgroundCompileTaskReleaseInput {
type Module = super::RemoteModule;
}

View File

@@ -0,0 +1,66 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct PuzzleBackgroundCompileTaskRow {
pub task_id: String,
pub claim_id: String,
pub session_id: String,
pub owner_user_id: String,
pub created_at: __sdk::Timestamp,
pub updated_at: __sdk::Timestamp,
}
impl __sdk::InModule for PuzzleBackgroundCompileTaskRow {
type Module = super::RemoteModule;
}
/// Column accessor struct for the table `PuzzleBackgroundCompileTaskRow`.
///
/// Provides typed access to columns for query building.
pub struct PuzzleBackgroundCompileTaskRowCols {
pub task_id: __sdk::__query_builder::Col<PuzzleBackgroundCompileTaskRow, String>,
pub claim_id: __sdk::__query_builder::Col<PuzzleBackgroundCompileTaskRow, String>,
pub session_id: __sdk::__query_builder::Col<PuzzleBackgroundCompileTaskRow, String>,
pub owner_user_id: __sdk::__query_builder::Col<PuzzleBackgroundCompileTaskRow, String>,
pub created_at: __sdk::__query_builder::Col<PuzzleBackgroundCompileTaskRow, __sdk::Timestamp>,
pub updated_at: __sdk::__query_builder::Col<PuzzleBackgroundCompileTaskRow, __sdk::Timestamp>,
}
impl __sdk::__query_builder::HasCols for PuzzleBackgroundCompileTaskRow {
type Cols = PuzzleBackgroundCompileTaskRowCols;
fn cols(table_name: &'static str) -> Self::Cols {
PuzzleBackgroundCompileTaskRowCols {
task_id: __sdk::__query_builder::Col::new(table_name, "task_id"),
claim_id: __sdk::__query_builder::Col::new(table_name, "claim_id"),
session_id: __sdk::__query_builder::Col::new(table_name, "session_id"),
owner_user_id: __sdk::__query_builder::Col::new(table_name, "owner_user_id"),
created_at: __sdk::__query_builder::Col::new(table_name, "created_at"),
updated_at: __sdk::__query_builder::Col::new(table_name, "updated_at"),
}
}
}
/// Indexed column accessor struct for the table `PuzzleBackgroundCompileTaskRow`.
///
/// Provides typed access to indexed columns for query building.
pub struct PuzzleBackgroundCompileTaskRowIxCols {
pub session_id: __sdk::__query_builder::IxCol<PuzzleBackgroundCompileTaskRow, String>,
pub task_id: __sdk::__query_builder::IxCol<PuzzleBackgroundCompileTaskRow, String>,
}
impl __sdk::__query_builder::HasIxCols for PuzzleBackgroundCompileTaskRow {
type IxCols = PuzzleBackgroundCompileTaskRowIxCols;
fn ix_cols(table_name: &'static str) -> Self::IxCols {
PuzzleBackgroundCompileTaskRowIxCols {
session_id: __sdk::__query_builder::IxCol::new(table_name, "session_id"),
task_id: __sdk::__query_builder::IxCol::new(table_name, "task_id"),
}
}
}
impl __sdk::__query_builder::CanBeLookupTable for PuzzleBackgroundCompileTaskRow {}

View File

@@ -0,0 +1,169 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use super::puzzle_background_compile_task_row_type::PuzzleBackgroundCompileTaskRow;
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
/// Table handle for the table `puzzle_background_compile_task`.
///
/// Obtain a handle from the [`PuzzleBackgroundCompileTaskTableAccess::puzzle_background_compile_task`] method on [`super::RemoteTables`],
/// like `ctx.db.puzzle_background_compile_task()`.
///
/// Users are encouraged not to explicitly reference this type,
/// but to directly chain method calls,
/// like `ctx.db.puzzle_background_compile_task().on_insert(...)`.
pub struct PuzzleBackgroundCompileTaskTableHandle<'ctx> {
imp: __sdk::TableHandle<PuzzleBackgroundCompileTaskRow>,
ctx: std::marker::PhantomData<&'ctx super::RemoteTables>,
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the table `puzzle_background_compile_task`.
///
/// Implemented for [`super::RemoteTables`].
pub trait PuzzleBackgroundCompileTaskTableAccess {
#[allow(non_snake_case)]
/// Obtain a [`PuzzleBackgroundCompileTaskTableHandle`], which mediates access to the table `puzzle_background_compile_task`.
fn puzzle_background_compile_task(&self) -> PuzzleBackgroundCompileTaskTableHandle<'_>;
}
impl PuzzleBackgroundCompileTaskTableAccess for super::RemoteTables {
fn puzzle_background_compile_task(&self) -> PuzzleBackgroundCompileTaskTableHandle<'_> {
PuzzleBackgroundCompileTaskTableHandle {
imp: self
.imp
.get_table::<PuzzleBackgroundCompileTaskRow>("puzzle_background_compile_task"),
ctx: std::marker::PhantomData,
}
}
}
pub struct PuzzleBackgroundCompileTaskInsertCallbackId(__sdk::CallbackId);
pub struct PuzzleBackgroundCompileTaskDeleteCallbackId(__sdk::CallbackId);
impl<'ctx> __sdk::Table for PuzzleBackgroundCompileTaskTableHandle<'ctx> {
type Row = PuzzleBackgroundCompileTaskRow;
type EventContext = super::EventContext;
fn count(&self) -> u64 {
self.imp.count()
}
fn iter(&self) -> impl Iterator<Item = PuzzleBackgroundCompileTaskRow> + '_ {
self.imp.iter()
}
type InsertCallbackId = PuzzleBackgroundCompileTaskInsertCallbackId;
fn on_insert(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static,
) -> PuzzleBackgroundCompileTaskInsertCallbackId {
PuzzleBackgroundCompileTaskInsertCallbackId(self.imp.on_insert(Box::new(callback)))
}
fn remove_on_insert(&self, callback: PuzzleBackgroundCompileTaskInsertCallbackId) {
self.imp.remove_on_insert(callback.0)
}
type DeleteCallbackId = PuzzleBackgroundCompileTaskDeleteCallbackId;
fn on_delete(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static,
) -> PuzzleBackgroundCompileTaskDeleteCallbackId {
PuzzleBackgroundCompileTaskDeleteCallbackId(self.imp.on_delete(Box::new(callback)))
}
fn remove_on_delete(&self, callback: PuzzleBackgroundCompileTaskDeleteCallbackId) {
self.imp.remove_on_delete(callback.0)
}
}
pub struct PuzzleBackgroundCompileTaskUpdateCallbackId(__sdk::CallbackId);
impl<'ctx> __sdk::TableWithPrimaryKey for PuzzleBackgroundCompileTaskTableHandle<'ctx> {
type UpdateCallbackId = PuzzleBackgroundCompileTaskUpdateCallbackId;
fn on_update(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row, &Self::Row) + Send + 'static,
) -> PuzzleBackgroundCompileTaskUpdateCallbackId {
PuzzleBackgroundCompileTaskUpdateCallbackId(self.imp.on_update(Box::new(callback)))
}
fn remove_on_update(&self, callback: PuzzleBackgroundCompileTaskUpdateCallbackId) {
self.imp.remove_on_update(callback.0)
}
}
/// Access to the `task_id` unique index on the table `puzzle_background_compile_task`,
/// which allows point queries on the field of the same name
/// via the [`PuzzleBackgroundCompileTaskTaskIdUnique::find`] method.
///
/// Users are encouraged not to explicitly reference this type,
/// but to directly chain method calls,
/// like `ctx.db.puzzle_background_compile_task().task_id().find(...)`.
pub struct PuzzleBackgroundCompileTaskTaskIdUnique<'ctx> {
imp: __sdk::UniqueConstraintHandle<PuzzleBackgroundCompileTaskRow, String>,
phantom: std::marker::PhantomData<&'ctx super::RemoteTables>,
}
impl<'ctx> PuzzleBackgroundCompileTaskTableHandle<'ctx> {
/// Get a handle on the `task_id` unique index on the table `puzzle_background_compile_task`.
pub fn task_id(&self) -> PuzzleBackgroundCompileTaskTaskIdUnique<'ctx> {
PuzzleBackgroundCompileTaskTaskIdUnique {
imp: self.imp.get_unique_constraint::<String>("task_id"),
phantom: std::marker::PhantomData,
}
}
}
impl<'ctx> PuzzleBackgroundCompileTaskTaskIdUnique<'ctx> {
/// Find the subscribed row whose `task_id` column value is equal to `col_val`,
/// if such a row is present in the client cache.
pub fn find(&self, col_val: &String) -> Option<PuzzleBackgroundCompileTaskRow> {
self.imp.find(col_val)
}
}
#[doc(hidden)]
pub(super) fn register_table(client_cache: &mut __sdk::ClientCache<super::RemoteModule>) {
let _table = client_cache
.get_or_make_table::<PuzzleBackgroundCompileTaskRow>("puzzle_background_compile_task");
_table.add_unique_constraint::<String>("task_id", |row| &row.task_id);
}
#[doc(hidden)]
pub(super) fn parse_table_update(
raw_updates: __ws::v2::TableUpdate,
) -> __sdk::Result<__sdk::TableUpdate<PuzzleBackgroundCompileTaskRow>> {
__sdk::TableUpdate::parse_table_update(raw_updates).map_err(|e| {
__sdk::InternalError::failed_parse(
"TableUpdate<PuzzleBackgroundCompileTaskRow>",
"TableUpdate",
)
.with_cause(e)
.into()
})
}
#[allow(non_camel_case_types)]
/// Extension trait for query builder access to the table `PuzzleBackgroundCompileTaskRow`.
///
/// Implemented for [`__sdk::QueryTableAccessor`].
pub trait puzzle_background_compile_taskQueryTableAccess {
#[allow(non_snake_case)]
/// Get a query builder for the table `PuzzleBackgroundCompileTaskRow`.
fn puzzle_background_compile_task(
&self,
) -> __sdk::__query_builder::Table<PuzzleBackgroundCompileTaskRow>;
}
impl puzzle_background_compile_taskQueryTableAccess for __sdk::QueryTableAccessor {
fn puzzle_background_compile_task(
&self,
) -> __sdk::__query_builder::Table<PuzzleBackgroundCompileTaskRow> {
__sdk::__query_builder::Table::new("puzzle_background_compile_task")
}
}

View File

@@ -0,0 +1,62 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
use super::puzzle_background_compile_task_procedure_result_type::PuzzleBackgroundCompileTaskProcedureResult;
use super::puzzle_background_compile_task_release_input_type::PuzzleBackgroundCompileTaskReleaseInput;
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
struct ReleasePuzzleBackgroundCompileTaskArgs {
pub input: PuzzleBackgroundCompileTaskReleaseInput,
}
impl __sdk::InModule for ReleasePuzzleBackgroundCompileTaskArgs {
type Module = super::RemoteModule;
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the procedure `release_puzzle_background_compile_task`.
///
/// Implemented for [`super::RemoteProcedures`].
pub trait release_puzzle_background_compile_task {
fn release_puzzle_background_compile_task(
&self,
input: PuzzleBackgroundCompileTaskReleaseInput,
) {
self.release_puzzle_background_compile_task_then(input, |_, _| {});
}
fn release_puzzle_background_compile_task_then(
&self,
input: PuzzleBackgroundCompileTaskReleaseInput,
__callback: impl FnOnce(
&super::ProcedureEventContext,
Result<PuzzleBackgroundCompileTaskProcedureResult, __sdk::InternalError>,
) + Send
+ 'static,
);
}
impl release_puzzle_background_compile_task for super::RemoteProcedures {
fn release_puzzle_background_compile_task_then(
&self,
input: PuzzleBackgroundCompileTaskReleaseInput,
__callback: impl FnOnce(
&super::ProcedureEventContext,
Result<PuzzleBackgroundCompileTaskProcedureResult, __sdk::InternalError>,
) + Send
+ 'static,
) {
self.imp
.invoke_procedure_with_callback::<_, PuzzleBackgroundCompileTaskProcedureResult>(
"release_puzzle_background_compile_task",
ReleasePuzzleBackgroundCompileTaskArgs { input },
__callback,
);
}
}

View File

@@ -1,8 +1,10 @@
use super::*;
use crate::mapper::*;
use crate::module_bindings::claim_puzzle_background_compile_task_procedure::claim_puzzle_background_compile_task;
use crate::module_bindings::claim_puzzle_work_point_incentive_procedure::claim_puzzle_work_point_incentive;
use crate::module_bindings::delete_puzzle_work_procedure::delete_puzzle_work;
use crate::module_bindings::record_puzzle_work_like_procedure::record_puzzle_work_like;
use crate::module_bindings::release_puzzle_background_compile_task_procedure::release_puzzle_background_compile_task;
use crate::module_bindings::remix_puzzle_work_procedure::remix_puzzle_work;
use crate::module_bindings::save_puzzle_ui_background_procedure::save_puzzle_ui_background;
@@ -194,6 +196,67 @@ impl SpacetimeClient {
.await
}
pub async fn claim_puzzle_background_compile_task(
&self,
input: PuzzleBackgroundCompileTaskClaimRecordInput,
) -> Result<bool, SpacetimeClientError> {
let procedure_input = PuzzleBackgroundCompileTaskClaimInput {
task_id: input.task_id,
claim_id: input.claim_id,
session_id: input.session_id,
owner_user_id: input.owner_user_id,
claimed_at_micros: input.claimed_at_micros,
};
self.call_after_connect(
"claim_puzzle_background_compile_task",
move |connection, sender| {
connection
.procedures()
.claim_puzzle_background_compile_task_then(
procedure_input,
move |_, result| {
let mapped = result
.map_err(SpacetimeClientError::from_sdk_error)
.and_then(map_puzzle_background_compile_task_procedure_result);
send_once(&sender, mapped);
},
);
},
)
.await
}
pub async fn release_puzzle_background_compile_task(
&self,
input: PuzzleBackgroundCompileTaskReleaseRecordInput,
) -> Result<bool, SpacetimeClientError> {
let procedure_input = PuzzleBackgroundCompileTaskReleaseInput {
task_id: input.task_id,
claim_id: input.claim_id,
session_id: input.session_id,
owner_user_id: input.owner_user_id,
};
self.call_after_connect(
"release_puzzle_background_compile_task",
move |connection, sender| {
connection
.procedures()
.release_puzzle_background_compile_task_then(
procedure_input,
move |_, result| {
let mapped = result
.map_err(SpacetimeClientError::from_sdk_error)
.and_then(map_puzzle_background_compile_task_procedure_result);
send_once(&sender, mapped);
},
);
},
)
.await
}
pub async fn save_puzzle_generated_images(
&self,
input: PuzzleGeneratedImagesSaveRecordInput,

View File

@@ -20,8 +20,8 @@ use crate::match3d::tables::{
match_3_d_work_profile, match3d_agent_message, match3d_agent_session, match3d_runtime_run,
};
use crate::puzzle::{
puzzle_agent_message, puzzle_agent_session, puzzle_event, puzzle_leaderboard_entry,
puzzle_runtime_run, puzzle_work_profile,
puzzle_agent_message, puzzle_agent_session, puzzle_background_compile_task, puzzle_event,
puzzle_leaderboard_entry, puzzle_runtime_run, puzzle_work_profile,
};
use crate::puzzle_clear::tables::{
puzzle_clear_agent_session, puzzle_clear_event, puzzle_clear_runtime_run,
@@ -229,6 +229,7 @@ macro_rules! migration_tables {
asset_entity_binding,
asset_event,
puzzle_agent_session,
puzzle_background_compile_task,
puzzle_agent_message,
puzzle_work_profile,
puzzle_event,

View File

@@ -10,14 +10,16 @@ use module_puzzle::{
PUZZLE_NEXT_LEVEL_MODE_SIMILAR_WORKS, PuzzleAgentMessageFinalizeInput, PuzzleAgentMessageKind,
PuzzleAgentMessageRole, PuzzleAgentMessageSnapshot, PuzzleAgentSessionCreateInput,
PuzzleAgentSessionGetInput, PuzzleAgentSessionProcedureResult, PuzzleAgentSessionSnapshot,
PuzzleAgentStage, PuzzleAnchorPack, PuzzleDraftCompileFailureInput, PuzzleDraftCompileInput,
PuzzleFormDraftSaveInput, PuzzleGeneratedImageCandidate, PuzzleGeneratedImagesSaveInput,
PuzzleLeaderboardEntry, PuzzleLeaderboardSubmitInput, PuzzlePublicationStatus,
PuzzlePublishInput, PuzzleRecommendedNextWork, PuzzleResultDraft, PuzzleRunDragInput,
PuzzleRunGetInput, PuzzleRunNextLevelInput, PuzzleRunPauseInput, PuzzleRunProcedureResult,
PuzzleRunPropInput, PuzzleRunSnapshot, PuzzleRunStartInput, PuzzleRunSwapInput,
PuzzleRuntimeLevelStatus, PuzzleSelectCoverImageInput, PuzzleUiBackgroundSaveInput,
PuzzleWorkDeleteInput, PuzzleWorkGetInput, PuzzleWorkLikeRecordInput as PuzzleWorkLikeInput,
PuzzleAgentStage, PuzzleAnchorPack, PuzzleBackgroundCompileTaskClaimInput,
PuzzleBackgroundCompileTaskProcedureResult, PuzzleBackgroundCompileTaskReleaseInput,
PuzzleDraftCompileFailureInput, PuzzleDraftCompileInput, PuzzleFormDraftSaveInput,
PuzzleGeneratedImageCandidate, PuzzleGeneratedImagesSaveInput, PuzzleLeaderboardEntry,
PuzzleLeaderboardSubmitInput, PuzzlePublicationStatus, PuzzlePublishInput,
PuzzleRecommendedNextWork, PuzzleResultDraft, PuzzleRunDragInput, PuzzleRunGetInput,
PuzzleRunNextLevelInput, PuzzleRunPauseInput, PuzzleRunProcedureResult, PuzzleRunPropInput,
PuzzleRunSnapshot, PuzzleRunStartInput, PuzzleRunSwapInput, PuzzleRuntimeLevelStatus,
PuzzleSelectCoverImageInput, PuzzleUiBackgroundSaveInput, PuzzleWorkDeleteInput,
PuzzleWorkGetInput, PuzzleWorkLikeRecordInput as PuzzleWorkLikeInput,
PuzzleWorkPointIncentiveClaimInput, PuzzleWorkProcedureResult, PuzzleWorkProfile,
PuzzleWorkRemixInput, PuzzleWorkUpsertInput, PuzzleWorksListInput, PuzzleWorksProcedureResult,
apply_publish_overrides_to_draft, apply_selected_candidate, build_form_draft_from_seed,
@@ -38,6 +40,7 @@ use spacetimedb::{
use crate::auth::user_account;
const PUZZLE_POINT_INCENTIVE_DEFAULT_U64: u64 = 0;
const PUZZLE_BACKGROUND_COMPILE_TASK_LEASE_MICROS: i64 = 30 * 60 * 1_000_000;
const WORK_VISIBLE_DEFAULT: bool = true;
/// 拼图 Agent session 真相表。
@@ -62,6 +65,22 @@ pub struct PuzzleAgentSessionRow {
updated_at: Timestamp,
}
/// 拼图首图后台编译活动任务表。
/// 中文注释:该表只保存跨 api-server 实例互斥 claim不表达最终生成结果。
#[spacetimedb::table(
accessor = puzzle_background_compile_task,
index(accessor = by_puzzle_background_compile_task_session_id, btree(columns = [session_id]))
)]
pub struct PuzzleBackgroundCompileTaskRow {
#[primary_key]
task_id: String,
claim_id: String,
session_id: String,
owner_user_id: String,
created_at: Timestamp,
updated_at: Timestamp,
}
/// 拼图 Agent 消息真相表。
#[spacetimedb::table(
accessor = puzzle_agent_message,
@@ -388,6 +407,44 @@ pub fn mark_puzzle_draft_generation_failed(
}
}
#[spacetimedb::procedure]
pub fn claim_puzzle_background_compile_task(
ctx: &mut ProcedureContext,
input: PuzzleBackgroundCompileTaskClaimInput,
) -> PuzzleBackgroundCompileTaskProcedureResult {
match ctx.try_with_tx(|tx| claim_puzzle_background_compile_task_tx(tx, input.clone())) {
Ok(claimed) => PuzzleBackgroundCompileTaskProcedureResult {
ok: true,
claimed,
error_message: None,
},
Err(message) => PuzzleBackgroundCompileTaskProcedureResult {
ok: false,
claimed: false,
error_message: Some(message),
},
}
}
#[spacetimedb::procedure]
pub fn release_puzzle_background_compile_task(
ctx: &mut ProcedureContext,
input: PuzzleBackgroundCompileTaskReleaseInput,
) -> PuzzleBackgroundCompileTaskProcedureResult {
match ctx.try_with_tx(|tx| release_puzzle_background_compile_task_tx(tx, input.clone())) {
Ok(released) => PuzzleBackgroundCompileTaskProcedureResult {
ok: true,
claimed: released,
error_message: None,
},
Err(message) => PuzzleBackgroundCompileTaskProcedureResult {
ok: false,
claimed: false,
error_message: Some(message),
},
}
}
/// 保存拼图入口表单草稿。
/// 中文注释:该 procedure 只更新 session 与创作中心草稿卡,不触发图片生成或发布校验。
#[spacetimedb::procedure]
@@ -1024,6 +1081,84 @@ fn compile_puzzle_agent_draft_tx(
)
}
fn claim_puzzle_background_compile_task_tx(
ctx: &TxContext,
input: PuzzleBackgroundCompileTaskClaimInput,
) -> Result<bool, String> {
let task_id = normalize_required_puzzle_task_field(&input.task_id, "拼图后台任务 ID")?;
let claim_id = normalize_required_puzzle_task_field(&input.claim_id, "拼图后台任务 claim ID")?;
let session_id = normalize_required_puzzle_task_field(&input.session_id, "拼图 session ID")?;
let owner_user_id = normalize_required_puzzle_task_field(&input.owner_user_id, "拼图用户 ID")?;
let claimed_at = Timestamp::from_micros_since_unix_epoch(input.claimed_at_micros);
get_owned_session_row(ctx, &session_id, &owner_user_id)?;
if let Some(existing) = ctx
.db
.puzzle_background_compile_task()
.task_id()
.find(&task_id)
{
if !is_stale_puzzle_background_compile_task(&existing, input.claimed_at_micros) {
return Ok(false);
}
ctx.db
.puzzle_background_compile_task()
.task_id()
.delete(&task_id);
}
ctx.db
.puzzle_background_compile_task()
.insert(PuzzleBackgroundCompileTaskRow {
task_id,
claim_id,
session_id,
owner_user_id,
created_at: claimed_at,
updated_at: claimed_at,
});
Ok(true)
}
fn release_puzzle_background_compile_task_tx(
ctx: &TxContext,
input: PuzzleBackgroundCompileTaskReleaseInput,
) -> Result<bool, String> {
let task_id = normalize_required_puzzle_task_field(&input.task_id, "拼图后台任务 ID")?;
let claim_id = normalize_required_puzzle_task_field(&input.claim_id, "拼图后台任务 claim ID")?;
let session_id = normalize_required_puzzle_task_field(&input.session_id, "拼图 session ID")?;
let owner_user_id = normalize_required_puzzle_task_field(&input.owner_user_id, "拼图用户 ID")?;
let Some(row) = ctx
.db
.puzzle_background_compile_task()
.task_id()
.find(&task_id)
else {
return Ok(false);
};
if row.session_id != session_id || row.owner_user_id != owner_user_id {
return Err("无权释放该拼图后台任务".to_string());
}
if row.claim_id != claim_id {
return Ok(false);
}
ctx.db
.puzzle_background_compile_task()
.task_id()
.delete(&task_id);
Ok(true)
}
fn is_stale_puzzle_background_compile_task(
row: &PuzzleBackgroundCompileTaskRow,
now_micros: i64,
) -> bool {
now_micros.saturating_sub(row.updated_at.to_micros_since_unix_epoch())
>= PUZZLE_BACKGROUND_COMPILE_TASK_LEASE_MICROS
}
fn mark_puzzle_draft_generation_failed_tx(
ctx: &TxContext,
input: PuzzleDraftCompileFailureInput,
@@ -2950,6 +3085,14 @@ fn get_owned_session_row(
Ok(row)
}
fn normalize_required_puzzle_task_field(value: &str, field_name: &str) -> Result<String, String> {
let normalized = value.trim();
if normalized.is_empty() {
return Err(format!("{field_name} 不能为空"));
}
Ok(normalized.to_string())
}
fn get_owned_run_row(
ctx: &TxContext,
run_id: &str,