Skip to content

Conversation

@kosiew
Copy link
Contributor

@kosiew kosiew commented Dec 5, 2025

Which issue does this PR close?

Rationale for this change

The spill_pool channel test test_reader_catches_up_to_writer was flaky due to non-deterministic coordination between the reader and writer tasks. The test used time-based sleeps and polling on shared state to infer when the reader had started and when it had processed a batch. Under varying scheduler timing, this could cause the reader to miss events or observe them in a different order, leading to intermittent failures where the recorded event sequence did not match expectations (for example, observing 3 instead of 5 reads).

Since this test verifies the correctness and wakeup behavior of the spill channel used by the spill pool, flakiness here undermines confidence in the spill mechanism and can cause spurious CI failures.

This PR makes the test coordination explicit and deterministic using oneshot channels, and also improves the usage example for the spill channel to show how to run writer and reader concurrently in a robust way.

What changes are included in this PR?

  1. Example: concurrent writer and reader usage

    • Update the spill_pool::channel usage example to:

      • Spawn writer and reader tasks concurrently instead of only spawning the writer.
      • Use writer.push_batch(&batch)? so the example returns a Result and propagates errors correctly.
      • Explicitly drop(writer) at the end of the writer task to finalize the spill file and wake the reader.
      • Use tokio::join! to await both tasks and map join errors into DataFusionError::Execution.
      • Assert that the reader sees all expected batches (batches_read == 5).
    • The updated example better demonstrates the intended concurrent usage pattern of the spill channel and ensures the reader is correctly woken when the writer finishes.

  2. Test: make test_reader_catches_up_to_writer deterministic

    • Introduce two oneshot channels:

      • reader_waiting_tx/rx to signal when the reader has started and is pending on its first next() call.
      • first_read_done_tx/rx to signal when the reader has completed processing the first batch.
    • In the reader task:

      • Record ReadStart and send on reader_waiting_tx before awaiting reader.next().
      • After successfully reading and recording the first batch, send on first_read_done_tx.
      • Then read the second batch as before.
    • In the test body:

      • Wait on reader_waiting_rx instead of sleeping for a fixed duration, ensuring the reader is actually pending before writing the first batch.
      • After the first write, wait on first_read_done_rx before issuing the second write.
    • This establishes a precise and documented sequence of events:

      1. Reader starts and pends on the first next().
      2. First write occurs, waking the reader.
      3. Reader processes the first batch and signals completion.
      4. Second write occurs.
    • With this explicit synchronization, the event ordering in the test is stable and no longer depends on scheduler timing or arbitrary sleeps, eliminating the flakiness.

Are these changes tested?

Yes.

for i in {1..200}; do
  echo "Run #$i started"
  cargo test -p datafusion-physical-plan --profile ci  --doc -q || break
  echo "Run #$i completed"
done
  • The modified test test_reader_catches_up_to_writer continues to run as part of the existing spill_pool test suite, but now uses explicit synchronization instead of timing-based assumptions.

  • The test has been exercised repeatedly to confirm that:

    • The expected read/write event sequence is stable across runs.
    • The intermittent assertion failures (e.g., mismatched read counts such as 3 vs 5) no longer occur.
  • The updated example code compiles and type-checks by returning datafusion_common::Result from both spawned tasks and from the combined tokio::join! result.

Are there any user-facing changes?

There are no behavior changes to the public API or spill pool semantics.

  • The spill channel and spill pool behavior remains the same at runtime.
  • Only the documentation/example and the internal test harness have been updated.
  • No configuration flags or public methods were added, removed, or changed, so there are no breaking changes or documentation requirements beyond what is already updated inline.

LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.

Replace timing-based waits in test_reader_catches_up_to_writer
with oneshot channels to signal when the reader is pending and
after the first read. This change gates writer progress for
deterministic coordination while preserving the expected
five-event ordering by performing writes only after the
corresponding reader signals.
Add contextual comments to test_reader_catches_up_to_writer
to clarify the necessary ordering between reader startup,
initial wake-up, and subsequent writes/reads for
deterministic behavior. This improves code understanding
and maintainability.
@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Dec 5, 2025
@kosiew kosiew changed the title Fix flaky spill_pool channel test and clarify channel usage example Fix flaky SpillPool channel test by synchronizing reader and writer tasks Dec 5, 2025
@kosiew kosiew force-pushed the flakytest-spillpool-19058 branch 2 times, most recently from 3ab1d0c to 8e199da Compare December 6, 2025 03:13
@kosiew kosiew marked this pull request as ready for review December 6, 2025 14:50
@kosiew kosiew force-pushed the flakytest-spillpool-19058 branch 2 times, most recently from fcebe1b to 2184559 Compare December 7, 2025 09:56
@kosiew kosiew force-pushed the flakytest-spillpool-19058 branch from 2184559 to 3a0cd77 Compare December 7, 2025 10:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Flaky test: spill::spill_pool::channel

1 participant