use std::{ fmt, path::{Path, PathBuf}, sync::Arc, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use module_runtime::RuntimeTrackingEventInput; use serde::{Deserialize, Serialize}; use spacetime_client::{SpacetimeClient, SpacetimeClientError}; use tokio::{ fs::{self, File, OpenOptions}, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, sync::{Mutex, Notify}, time::sleep, }; use tracing::{debug, warn}; use crate::config::AppConfig; const ACTIVE_FILE_NAME: &str = "active.ndjson"; const SEALED_FILE_PREFIX: &str = "sealed-"; const CORRUPT_FILE_PREFIX: &str = "corrupt-"; const SEALED_FILE_EXTENSION: &str = ".ndjson"; #[derive(Clone)] pub struct TrackingOutbox { dir: PathBuf, batch_size: usize, flush_interval: Duration, max_bytes: u64, spacetime_client: SpacetimeClient, inner: Arc>, flush_notify: Arc, } struct TrackingOutboxInner { initialized: bool, active_file: Option, active_count: usize, active_bytes: u64, total_bytes: u64, last_sealed_at: Instant, } #[derive(Debug)] pub enum TrackingOutboxEnqueueOutcome { Enqueued, Dropped { reason: &'static str }, } #[derive(Debug)] pub enum TrackingOutboxError { Io(std::io::Error), Json(serde_json::Error), Spacetime(SpacetimeClientError), } #[derive(Clone, Debug, Deserialize, Serialize)] struct TrackingOutboxRecord { event: RuntimeTrackingEventInput, } impl TrackingOutbox { pub fn from_config(config: &AppConfig, spacetime_client: SpacetimeClient) -> Option> { if !config.tracking_outbox_enabled { return None; } let total_bytes = directory_size_if_exists(&config.tracking_outbox_dir).unwrap_or(0); let outbox = Self { dir: config.tracking_outbox_dir.clone(), batch_size: config.tracking_outbox_batch_size.max(1), flush_interval: config.tracking_outbox_flush_interval, max_bytes: config.tracking_outbox_max_bytes, spacetime_client, inner: Arc::new(Mutex::new(TrackingOutboxInner { initialized: false, active_file: None, active_count: 0, active_bytes: 0, total_bytes, last_sealed_at: Instant::now(), })), flush_notify: Arc::new(Notify::new()), }; crate::telemetry::update_tracking_outbox_pending_bytes(total_bytes); Some(Arc::new(outbox)) } pub async fn enqueue( &self, event: RuntimeTrackingEventInput, ) -> Result { let record = TrackingOutboxRecord { event }; let mut line = serde_json::to_vec(&record)?; line.push(b'\n'); let line_bytes = line.len().min(u64::MAX as usize) as u64; let mut inner = self.inner.lock().await; self.ensure_initialized_locked(&mut inner).await?; if inner.total_bytes.saturating_add(line_bytes) > self.max_bytes { crate::telemetry::record_tracking_outbox_dropped("max_bytes"); return Ok(TrackingOutboxEnqueueOutcome::Dropped { reason: "max_bytes", }); } let active_path = self.active_path(); if inner.active_file.is_none() { inner.active_file = Some( OpenOptions::new() .create(true) .append(true) .open(&active_path) .await?, ); } let file = inner .active_file .as_mut() .expect("active file should be open before append"); file.write_all(&line).await?; inner.active_count = inner.active_count.saturating_add(1); inner.active_bytes = inner.active_bytes.saturating_add(line_bytes); inner.total_bytes = inner.total_bytes.saturating_add(line_bytes); crate::telemetry::record_tracking_outbox_enqueued(); crate::telemetry::update_tracking_outbox_pending_bytes(inner.total_bytes); if inner.active_count >= self.batch_size { self.seal_active_locked(&mut inner, "batch_size").await?; self.flush_notify.notify_one(); } Ok(TrackingOutboxEnqueueOutcome::Enqueued) } pub fn spawn_worker(self: Arc) { tokio::spawn(async move { loop { tokio::select! { _ = sleep(self.flush_interval) => { if let Err(error) = self.seal_active_if_due().await { warn!(error = %error, "tracking outbox 定时封存 active 文件失败"); } if let Err(error) = self.flush_sealed_files_once().await { warn!(error = %error, "tracking outbox 批量写入 SpacetimeDB 失败,将保留 sealed 文件等待重试"); } } _ = self.flush_notify.notified() => { if let Err(error) = self.flush_sealed_files_once().await { warn!(error = %error, "tracking outbox 批量写入 SpacetimeDB 失败,将保留 sealed 文件等待重试"); } } } } }); } async fn seal_active_if_due(&self) -> Result<(), TrackingOutboxError> { let mut inner = self.inner.lock().await; self.ensure_initialized_locked(&mut inner).await?; if inner.active_count == 0 || inner.last_sealed_at.elapsed() < self.flush_interval { return Ok(()); } self.seal_active_locked(&mut inner, "flush_interval").await } async fn flush_sealed_files_once(&self) -> Result<(), TrackingOutboxError> { self.ensure_initialized().await?; let sealed_files = self.list_sealed_files().await?; crate::telemetry::update_tracking_outbox_pending_files(sealed_files.len()); for path in sealed_files { let started_at = Instant::now(); let metadata = fs::metadata(&path).await?; let file_bytes = metadata.len(); let events = match read_outbox_events(&path).await { Ok(events) => events, Err(error) if error.is_data_corruption() => { let corrupt_path = self.corrupt_path_for(&path); fs::rename(&path, &corrupt_path).await?; self.subtract_total_bytes(file_bytes).await; crate::telemetry::record_tracking_outbox_corrupt_file(); warn!( error = %error, source = %path.display(), target = %corrupt_path.display(), "tracking outbox sealed 文件含无法解析的记录,已隔离并继续处理后续文件" ); continue; } Err(error) => return Err(error), }; if events.is_empty() { fs::remove_file(&path).await?; self.subtract_total_bytes(file_bytes).await; continue; } match self.spacetime_client.record_tracking_events(events).await { Ok(accepted_count) => { fs::remove_file(&path).await?; self.subtract_total_bytes(file_bytes).await; crate::telemetry::record_tracking_outbox_flush( started_at.elapsed(), accepted_count, file_bytes, false, ); debug!( accepted_count, file_bytes, path = %path.display(), "tracking outbox sealed 文件已批量入库并删除" ); } Err(error) => { crate::telemetry::record_tracking_outbox_flush( started_at.elapsed(), 0, file_bytes, true, ); return Err(TrackingOutboxError::Spacetime(error)); } } } Ok(()) } async fn ensure_initialized(&self) -> Result<(), TrackingOutboxError> { let mut inner = self.inner.lock().await; self.ensure_initialized_locked(&mut inner).await } async fn ensure_initialized_locked( &self, inner: &mut TrackingOutboxInner, ) -> Result<(), TrackingOutboxError> { if inner.initialized { return Ok(()); } fs::create_dir_all(&self.dir).await?; self.seal_existing_active_file().await?; inner.total_bytes = directory_size(&self.dir).await?; inner.initialized = true; inner.last_sealed_at = Instant::now(); crate::telemetry::update_tracking_outbox_pending_bytes(inner.total_bytes); Ok(()) } async fn seal_active_locked( &self, inner: &mut TrackingOutboxInner, reason: &'static str, ) -> Result<(), TrackingOutboxError> { if inner.active_count == 0 && inner.active_bytes == 0 { return Ok(()); } if let Some(mut file) = inner.active_file.take() { file.flush().await?; file.sync_data().await?; drop(file); } let active_path = self.active_path(); match fs::metadata(&active_path).await { Ok(metadata) if metadata.len() > 0 => { let sealed_path = self.next_sealed_path(); fs::rename(&active_path, &sealed_path).await?; crate::telemetry::record_tracking_outbox_sealed(reason); debug!( reason, event_count = inner.active_count, file_bytes = metadata.len(), path = %sealed_path.display(), "tracking outbox active 文件已封存" ); } Ok(_) => { let _ = fs::remove_file(&active_path).await; } Err(error) if error.kind() == std::io::ErrorKind::NotFound => {} Err(error) => return Err(error.into()), } inner.active_count = 0; inner.active_bytes = 0; inner.last_sealed_at = Instant::now(); Ok(()) } async fn seal_existing_active_file(&self) -> Result<(), TrackingOutboxError> { let active_path = self.active_path(); match fs::metadata(&active_path).await { Ok(metadata) if metadata.len() > 0 => { fs::rename(&active_path, self.next_sealed_path()).await?; crate::telemetry::record_tracking_outbox_sealed("startup"); } Ok(_) => { let _ = fs::remove_file(&active_path).await; } Err(error) if error.kind() == std::io::ErrorKind::NotFound => {} Err(error) => return Err(error.into()), } Ok(()) } async fn list_sealed_files(&self) -> Result, TrackingOutboxError> { let mut entries = fs::read_dir(&self.dir).await?; let mut files = Vec::new(); while let Some(entry) = entries.next_entry().await? { let path = entry.path(); let Some(name) = path.file_name().and_then(|value| value.to_str()) else { continue; }; if name.starts_with(SEALED_FILE_PREFIX) && name.ends_with(SEALED_FILE_EXTENSION) { files.push(path); } } files.sort(); Ok(files) } async fn subtract_total_bytes(&self, bytes: u64) { let mut inner = self.inner.lock().await; inner.total_bytes = inner.total_bytes.saturating_sub(bytes); crate::telemetry::update_tracking_outbox_pending_bytes(inner.total_bytes); } fn active_path(&self) -> PathBuf { self.dir.join(ACTIVE_FILE_NAME) } fn next_sealed_path(&self) -> PathBuf { self.dir.join(format!( "{SEALED_FILE_PREFIX}{}-{uuid}{SEALED_FILE_EXTENSION}", current_unix_micros(), uuid = uuid::Uuid::new_v4() )) } fn corrupt_path_for(&self, path: &Path) -> PathBuf { let name = path .file_name() .and_then(|value| value.to_str()) .unwrap_or("unknown.ndjson"); self.dir.join(format!( "{CORRUPT_FILE_PREFIX}{}-{uuid}-{name}", current_unix_micros(), uuid = uuid::Uuid::new_v4() )) } } impl fmt::Debug for TrackingOutbox { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("TrackingOutbox") .field("dir", &self.dir) .field("batch_size", &self.batch_size) .field("flush_interval", &self.flush_interval) .field("max_bytes", &self.max_bytes) .finish() } } impl fmt::Display for TrackingOutboxError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Io(error) => write!(f, "{error}"), Self::Json(error) => write!(f, "{error}"), Self::Spacetime(error) => write!(f, "{error}"), } } } impl From for TrackingOutboxError { fn from(value: std::io::Error) -> Self { Self::Io(value) } } impl From for TrackingOutboxError { fn from(value: serde_json::Error) -> Self { Self::Json(value) } } impl TrackingOutboxError { fn is_data_corruption(&self) -> bool { matches!(self, Self::Json(_)) } } async fn read_outbox_events( path: &Path, ) -> Result, TrackingOutboxError> { let file = File::open(path).await?; let mut lines = BufReader::new(file).lines(); let mut events = Vec::new(); while let Some(line) = lines.next_line().await? { if line.trim().is_empty() { continue; } let record = serde_json::from_str::(&line)?; events.push(record.event); } Ok(events) } async fn directory_size(path: &Path) -> Result { let mut total = 0u64; let mut entries = fs::read_dir(path).await?; while let Some(entry) = entries.next_entry().await? { if !is_pending_outbox_file_name(&entry.file_name()) { continue; } let metadata = entry.metadata().await?; if metadata.is_file() { total = total.saturating_add(metadata.len()); } } Ok(total) } fn directory_size_if_exists(path: &Path) -> Result { if !path.is_dir() { return Ok(0); } let mut total = 0u64; for entry in std::fs::read_dir(path)? { let entry = entry?; if !is_pending_outbox_file_name(&entry.file_name()) { continue; } let metadata = entry.metadata()?; if metadata.is_file() { total = total.saturating_add(metadata.len()); } } Ok(total) } fn current_unix_micros() -> u128 { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_micros() } fn is_pending_outbox_file_name(name: &std::ffi::OsStr) -> bool { name.to_str().is_some_and(|value| { value == ACTIVE_FILE_NAME || (value.starts_with(SEALED_FILE_PREFIX) && value.ends_with(SEALED_FILE_EXTENSION)) }) } #[cfg(test)] mod tests { use super::*; fn sample_event(event_id: &str) -> RuntimeTrackingEventInput { RuntimeTrackingEventInput { event_id: event_id.to_string(), event_key: "puzzle_route_success".to_string(), scope_kind: module_runtime::RuntimeTrackingScopeKind::Site, scope_id: "site".to_string(), user_id: None, owner_user_id: None, profile_id: None, module_key: Some("puzzle".to_string()), metadata_json: "{}".to_string(), occurred_at_micros: 1_713_680_000_000_000, } } fn test_dir(name: &str) -> PathBuf { let dir = std::env::temp_dir().join(format!( "genarrative-tracking-outbox-{name}-{}", current_unix_micros() )); let _ = std::fs::remove_dir_all(&dir); dir } fn test_outbox(dir: PathBuf, batch_size: usize, max_bytes: u64) -> Arc { let config = AppConfig { tracking_outbox_dir: dir, tracking_outbox_batch_size: batch_size, tracking_outbox_max_bytes: max_bytes, tracking_outbox_flush_interval: Duration::from_secs(60), ..AppConfig::default() }; TrackingOutbox::from_config( &config, SpacetimeClient::new(spacetime_client::SpacetimeClientConfig { server_url: "http://127.0.0.1:1".to_string(), database: "missing".to_string(), token: None, pool_size: 1, procedure_timeout: Duration::from_millis(10), }), ) .expect("outbox should be enabled") } #[tokio::test] async fn enqueue_seals_active_file_when_batch_size_reached_and_rotates_active() { let dir = test_dir("batch"); let outbox = test_outbox(dir.clone(), 2, 1024 * 1024); outbox.enqueue(sample_event("event-1")).await.unwrap(); outbox.enqueue(sample_event("event-2")).await.unwrap(); assert!(!dir.join(ACTIVE_FILE_NAME).exists()); let sealed_count = std::fs::read_dir(&dir) .unwrap() .filter_map(Result::ok) .filter(|entry| { entry .file_name() .to_str() .is_some_and(|name| name.starts_with(SEALED_FILE_PREFIX)) }) .count(); assert_eq!(sealed_count, 1); outbox.enqueue(sample_event("event-3")).await.unwrap(); let active_contents = std::fs::read_to_string(dir.join(ACTIVE_FILE_NAME)).unwrap(); assert!(active_contents.contains("event-3")); let sealed_count_after_rotate = std::fs::read_dir(&dir) .unwrap() .filter_map(Result::ok) .filter(|entry| { entry .file_name() .to_str() .is_some_and(|name| name.starts_with(SEALED_FILE_PREFIX)) }) .count(); assert_eq!(sealed_count_after_rotate, 1); let _ = std::fs::remove_dir_all(dir); } #[tokio::test] async fn enqueue_drops_when_outbox_exceeds_max_bytes() { let dir = test_dir("max-bytes"); let outbox = test_outbox(dir.clone(), 500, 1); let outcome = outbox.enqueue(sample_event("event-1")).await.unwrap(); assert!(matches!( outcome, TrackingOutboxEnqueueOutcome::Dropped { reason: "max_bytes" } )); assert!(!dir.join(ACTIVE_FILE_NAME).exists()); let _ = std::fs::remove_dir_all(dir); } #[tokio::test] async fn flush_quarantines_corrupt_sealed_file() { let dir = test_dir("corrupt"); std::fs::create_dir_all(&dir).unwrap(); let sealed_path = dir.join(format!("{SEALED_FILE_PREFIX}bad{SEALED_FILE_EXTENSION}")); std::fs::write(&sealed_path, b"{not-json}\n").unwrap(); let outbox = test_outbox(dir.clone(), 500, 1024 * 1024); outbox.flush_sealed_files_once().await.unwrap(); assert!(!sealed_path.exists()); let corrupt_count = std::fs::read_dir(&dir) .unwrap() .filter_map(Result::ok) .filter(|entry| { entry .file_name() .to_str() .is_some_and(|name| name.starts_with(CORRUPT_FILE_PREFIX)) }) .count(); assert_eq!(corrupt_count, 1); let _ = std::fs::remove_dir_all(dir); } #[test] fn directory_size_excludes_quarantined_corrupt_files() { let dir = test_dir("directory-size"); std::fs::create_dir_all(&dir).unwrap(); std::fs::write(dir.join(ACTIVE_FILE_NAME), b"active").unwrap(); std::fs::write( dir.join(format!("{SEALED_FILE_PREFIX}one{SEALED_FILE_EXTENSION}")), b"sealed", ) .unwrap(); std::fs::write( dir.join(format!("{CORRUPT_FILE_PREFIX}one{SEALED_FILE_EXTENSION}")), b"corrupt", ) .unwrap(); let total = directory_size_if_exists(&dir).unwrap(); assert_eq!(total, 12); let _ = std::fs::remove_dir_all(dir); } }