feat(api-server): default otlp and async tracking outbox
This commit is contained in:
@@ -11,7 +11,7 @@ use spacetime_client::{SpacetimeClient, SpacetimeClientError};
|
||||
use tokio::{
|
||||
fs::{self, File, OpenOptions},
|
||||
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
|
||||
sync::Mutex,
|
||||
sync::{Mutex, Notify},
|
||||
time::sleep,
|
||||
};
|
||||
use tracing::{debug, warn};
|
||||
@@ -31,6 +31,7 @@ pub struct TrackingOutbox {
|
||||
max_bytes: u64,
|
||||
spacetime_client: SpacetimeClient,
|
||||
inner: Arc<Mutex<TrackingOutboxInner>>,
|
||||
flush_notify: Arc<Notify>,
|
||||
}
|
||||
|
||||
struct TrackingOutboxInner {
|
||||
@@ -81,6 +82,7 @@ impl TrackingOutbox {
|
||||
total_bytes,
|
||||
last_sealed_at: Instant::now(),
|
||||
})),
|
||||
flush_notify: Arc::new(Notify::new()),
|
||||
};
|
||||
crate::telemetry::update_tracking_outbox_pending_bytes(total_bytes);
|
||||
Some(Arc::new(outbox))
|
||||
@@ -129,6 +131,7 @@ impl TrackingOutbox {
|
||||
|
||||
if inner.active_count >= self.batch_size {
|
||||
self.seal_active_locked(&mut inner, "batch_size").await?;
|
||||
self.flush_notify.notify_one();
|
||||
}
|
||||
|
||||
Ok(TrackingOutboxEnqueueOutcome::Enqueued)
|
||||
@@ -137,12 +140,20 @@ impl TrackingOutbox {
|
||||
pub fn spawn_worker(self: Arc<Self>) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
sleep(self.flush_interval).await;
|
||||
if let Err(error) = self.seal_active_if_due().await {
|
||||
warn!(error = %error, "tracking outbox 定时封存 active 文件失败");
|
||||
}
|
||||
if let Err(error) = self.flush_sealed_files_once().await {
|
||||
warn!(error = %error, "tracking outbox 批量写入 SpacetimeDB 失败,将保留 sealed 文件等待重试");
|
||||
tokio::select! {
|
||||
_ = sleep(self.flush_interval) => {
|
||||
if let Err(error) = self.seal_active_if_due().await {
|
||||
warn!(error = %error, "tracking outbox 定时封存 active 文件失败");
|
||||
}
|
||||
if let Err(error) = self.flush_sealed_files_once().await {
|
||||
warn!(error = %error, "tracking outbox 批量写入 SpacetimeDB 失败,将保留 sealed 文件等待重试");
|
||||
}
|
||||
}
|
||||
_ = self.flush_notify.notified() => {
|
||||
if let Err(error) = self.flush_sealed_files_once().await {
|
||||
warn!(error = %error, "tracking outbox 批量写入 SpacetimeDB 失败,将保留 sealed 文件等待重试");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -502,7 +513,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enqueue_seals_active_file_when_batch_size_reached() {
|
||||
async fn enqueue_seals_active_file_when_batch_size_reached_and_rotates_active() {
|
||||
let dir = test_dir("batch");
|
||||
let outbox = test_outbox(dir.clone(), 2, 1024 * 1024);
|
||||
|
||||
@@ -522,6 +533,22 @@ mod tests {
|
||||
.count();
|
||||
assert_eq!(sealed_count, 1);
|
||||
|
||||
outbox.enqueue(sample_event("event-3")).await.unwrap();
|
||||
|
||||
let active_contents = std::fs::read_to_string(dir.join(ACTIVE_FILE_NAME)).unwrap();
|
||||
assert!(active_contents.contains("event-3"));
|
||||
let sealed_count_after_rotate = std::fs::read_dir(&dir)
|
||||
.unwrap()
|
||||
.filter_map(Result::ok)
|
||||
.filter(|entry| {
|
||||
entry
|
||||
.file_name()
|
||||
.to_str()
|
||||
.is_some_and(|name| name.starts_with(SEALED_FILE_PREFIX))
|
||||
})
|
||||
.count();
|
||||
assert_eq!(sealed_count_after_rotate, 1);
|
||||
|
||||
let _ = std::fs::remove_dir_all(dir);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user