feat: add graceful api shutdown readiness
This commit is contained in:
@@ -159,6 +159,16 @@ impl TrackingOutbox {
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn flush_for_shutdown(&self) -> Result<(), TrackingOutboxError> {
|
||||
{
|
||||
let mut inner = self.inner.lock().await;
|
||||
self.ensure_initialized_locked(&mut inner).await?;
|
||||
self.seal_active_locked(&mut inner, "shutdown").await?;
|
||||
}
|
||||
|
||||
self.flush_sealed_files_once().await
|
||||
}
|
||||
|
||||
async fn seal_active_if_due(&self) -> Result<(), TrackingOutboxError> {
|
||||
let mut inner = self.inner.lock().await;
|
||||
self.ensure_initialized_locked(&mut inner).await?;
|
||||
@@ -176,7 +186,11 @@ impl TrackingOutbox {
|
||||
crate::telemetry::update_tracking_outbox_pending_files(sealed_files.len());
|
||||
for path in sealed_files {
|
||||
let started_at = Instant::now();
|
||||
let metadata = fs::metadata(&path).await?;
|
||||
let metadata = match fs::metadata(&path).await {
|
||||
Ok(metadata) => metadata,
|
||||
Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue,
|
||||
Err(error) => return Err(error.into()),
|
||||
};
|
||||
let file_bytes = metadata.len();
|
||||
let events = match read_outbox_events(&path).await {
|
||||
Ok(events) => events,
|
||||
@@ -203,7 +217,11 @@ impl TrackingOutbox {
|
||||
|
||||
match self.spacetime_client.record_tracking_events(events).await {
|
||||
Ok(accepted_count) => {
|
||||
fs::remove_file(&path).await?;
|
||||
match fs::remove_file(&path).await {
|
||||
Ok(()) => {}
|
||||
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(error) => return Err(error.into()),
|
||||
}
|
||||
self.subtract_total_bytes(file_bytes).await;
|
||||
crate::telemetry::record_tracking_outbox_flush(
|
||||
started_at.elapsed(),
|
||||
@@ -596,6 +614,34 @@ mod tests {
|
||||
let _ = std::fs::remove_dir_all(dir);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shutdown_flush_seals_active_file_for_later_retry() {
|
||||
let dir = test_dir("shutdown");
|
||||
let outbox = test_outbox(dir.clone(), 500, 1024 * 1024);
|
||||
|
||||
outbox.enqueue(sample_event("event-1")).await.unwrap();
|
||||
let result = outbox.flush_for_shutdown().await;
|
||||
|
||||
assert!(
|
||||
matches!(result, Err(TrackingOutboxError::Spacetime(_))),
|
||||
"missing test SpacetimeDB should keep sealed file for retry"
|
||||
);
|
||||
assert!(!dir.join(ACTIVE_FILE_NAME).exists());
|
||||
let sealed_count = std::fs::read_dir(&dir)
|
||||
.unwrap()
|
||||
.filter_map(Result::ok)
|
||||
.filter(|entry| {
|
||||
entry
|
||||
.file_name()
|
||||
.to_str()
|
||||
.is_some_and(|name| name.starts_with(SEALED_FILE_PREFIX))
|
||||
})
|
||||
.count();
|
||||
assert_eq!(sealed_count, 1);
|
||||
|
||||
let _ = std::fs::remove_dir_all(dir);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn directory_size_excludes_quarantined_corrupt_files() {
|
||||
let dir = test_dir("directory-size");
|
||||
|
||||
Reference in New Issue
Block a user