From e0484f5bf04925f64b1e9a44a84b5c5c1489a519 Mon Sep 17 00:00:00 2001 From: Vitalii Parfonov Date: Wed, 22 Apr 2026 08:40:21 +0300 Subject: [PATCH 1/2] fix(tests): fix flaky file_start_position_server_restart_unfinalized test Backport upstream fix (vectordotdev/vector#24957) for a race condition in run_file_source test helper. When using Unfinalized acking mode, the default Delivered status on drop leaked checkpoint writes, causing the second run to skip re-reading the file. Use EventStatus::Rejected for Unfinalized mode so events are finalized but checkpoints are NOT updated, eliminating the race condition. Co-Authored-By: Claude Opus 4.6 --- src/sources/file.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/sources/file.rs b/src/sources/file.rs index 62a95db362f10..df51a5a73788a 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -2462,12 +2462,23 @@ mod tests { inner: impl Future, ) -> Vec { assert_source_compliance(&FILE_SOURCE_TAGS, async move { - let (tx, rx) = if acking_mode == Acks { - let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered); - (tx, rx.boxed()) - } else { - let (tx, rx) = SourceSender::new_test(); - (tx, rx.boxed()) + let (tx, rx) = match acking_mode { + Acks => { + let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered); + (tx, rx.boxed()) + } + Unfinalized => { + // Use Rejected so that events are finalized but checkpoints + // are NOT updated (only Delivered triggers checkpoint updates). + // This avoids a race where the default Delivered status on drop + // could leak checkpoint writes into the next run. + let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Rejected); + (tx, rx.boxed()) + } + NoAcks => { + let (tx, rx) = SourceSender::new_test(); + (tx, rx.boxed()) + } }; let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired(); From 9e12cdbfbc95cf39cf4c8b72fe8f06e247fce5ed Mon Sep 17 00:00:00 2001 From: Vitalii Parfonov Date: Wed, 22 Apr 2026 11:07:13 +0300 Subject: [PATCH 2/2] fix(tests): fix flaky initial_size_correct_with_multievents test Add yield_now() after dropping the buffer writer to allow the background finalizer task to release its Arc and the lock file before attempting to reopen the buffer. Without this, a race condition can cause LedgerLockAlreadyHeld errors when the finalizer task hasn't exited before the new buffer is created. Co-Authored-By: Claude Opus 4.6 --- lib/vector-buffers/src/variants/disk_v2/tests/basic.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs b/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs index 30cb47d3c265b..ef7c7c848729f 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs @@ -210,7 +210,11 @@ async fn initial_size_correct_with_multievents() { writer.close(); // Now drop our buffer and reopen it. + // Yield to allow the background finalizer task to observe the closed + // stream and release its Arc (and thus the lock file) before + // we attempt to reopen the buffer. drop(writer); + tokio::task::yield_now().await; let (writer, mut reader, ledger, usage) = create_default_buffer_v2_with_usage::<_, MultiEventRecord>(data_dir).await; drop(writer);