diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs b/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs index 30cb47d3c265b..ef7c7c848729f 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs @@ -210,7 +210,11 @@ async fn initial_size_correct_with_multievents() { writer.close(); // Now drop our buffer and reopen it. + // Yield to allow the background finalizer task to observe the closed + // stream and release its Arc (and thus the lock file) before + // we attempt to reopen the buffer. drop(writer); + tokio::task::yield_now().await; let (writer, mut reader, ledger, usage) = create_default_buffer_v2_with_usage::<_, MultiEventRecord>(data_dir).await; drop(writer); diff --git a/src/sources/file.rs b/src/sources/file.rs index 62a95db362f10..df51a5a73788a 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -2462,12 +2462,23 @@ mod tests { inner: impl Future, ) -> Vec { assert_source_compliance(&FILE_SOURCE_TAGS, async move { - let (tx, rx) = if acking_mode == Acks { - let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered); - (tx, rx.boxed()) - } else { - let (tx, rx) = SourceSender::new_test(); - (tx, rx.boxed()) + let (tx, rx) = match acking_mode { + Acks => { + let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered); + (tx, rx.boxed()) + } + Unfinalized => { + // Use Rejected so that events are finalized but checkpoints + // are NOT updated (only Delivered triggers checkpoint updates). + // This avoids a race where the default Delivered status on drop + // could leak checkpoint writes into the next run. + let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Rejected); + (tx, rx.boxed()) + } + NoAcks => { + let (tx, rx) = SourceSender::new_test(); + (tx, rx.boxed()) + } }; let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();