Skip to content

Commit cea3ada

Browse files
authored
fix determinism of HashJoinExec bounds filter creation (#17280)
1 parent 1529dbd commit cea3ada

File tree

2 files changed

+17
-6
lines changed
  • datafusion

2 files changed

+17
-6
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -950,7 +950,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
950950
use datafusion_common::JoinType;
951951
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
952952

953-
// Rouugh plan we're trying to recreate:
953+
// Rough sketch of the MRE we're trying to recreate:
954954
// COPY (select i as k from generate_series(1, 10000000) as t(i))
955955
// TO 'test_files/scratch/push_down_filter/t1.parquet'
956956
// STORED AS PARQUET;

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ use datafusion_physical_expr_common::datum::compare_op_for_nested;
9292
use ahash::RandomState;
9393
use datafusion_physical_expr_common::physical_expr::fmt_sql;
9494
use futures::{ready, Stream, StreamExt, TryStreamExt};
95+
use itertools::Itertools;
9596
use parking_lot::Mutex;
9697

9798
/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
@@ -118,14 +119,19 @@ impl ColumnBounds {
118119
/// This contains the min/max values computed from one partition's build-side data.
119120
#[derive(Debug, Clone)]
120121
struct PartitionBounds {
122+
/// Partition identifier for debugging and determinism (not strictly necessary)
123+
partition: usize,
121124
/// Min/max bounds for each join key column in this partition.
122125
/// Index corresponds to the join key expression index.
123126
column_bounds: Vec<ColumnBounds>,
124127
}
125128

126129
impl PartitionBounds {
127-
fn new(column_bounds: Vec<ColumnBounds>) -> Self {
128-
Self { column_bounds }
130+
fn new(partition: usize, column_bounds: Vec<ColumnBounds>) -> Self {
131+
Self {
132+
partition,
133+
column_bounds,
134+
}
129135
}
130136

131137
fn len(&self) -> usize {
@@ -258,7 +264,7 @@ impl SharedBoundsAccumulator {
258264
// Create a predicate for each partition
259265
let mut partition_predicates = Vec::with_capacity(bounds.len());
260266

261-
for partition_bounds in bounds.iter() {
267+
for partition_bounds in bounds.iter().sorted_by_key(|b| b.partition) {
262268
// Create range predicates for each join key in this partition
263269
let mut column_predicates = Vec::with_capacity(partition_bounds.len());
264270

@@ -320,14 +326,15 @@ impl SharedBoundsAccumulator {
320326
/// * `Result<()>` - Ok if successful, Err if filter update failed
321327
fn report_partition_bounds(
322328
&self,
329+
partition: usize,
323330
partition_bounds: Option<Vec<ColumnBounds>>,
324331
) -> Result<()> {
325332
let mut inner = self.inner.lock();
326333

327334
// Store bounds in the accumulator - this runs once per partition
328335
if let Some(bounds) = partition_bounds {
329336
// Only push actual bounds if they exist
330-
inner.bounds.push(PartitionBounds::new(bounds));
337+
inner.bounds.push(PartitionBounds::new(partition, bounds));
331338
}
332339

333340
// Increment the completion counter
@@ -1254,6 +1261,7 @@ impl ExecutionPlan for HashJoinExec {
12541261
.collect::<Vec<_>>();
12551262

12561263
Ok(Box::pin(HashJoinStream {
1264+
partition,
12571265
schema: self.schema(),
12581266
on_right,
12591267
filter: self.filter.clone(),
@@ -1769,6 +1777,8 @@ impl ProcessProbeBatchState {
17691777
/// 2. Streams [RecordBatch]es as they arrive from the right input (probe) and joins
17701778
/// them with the contents of the hash table
17711779
struct HashJoinStream {
1780+
/// Partition identifier for debugging and determinism
1781+
partition: usize,
17721782
/// Input schema
17731783
schema: Arc<Schema>,
17741784
/// equijoin columns from the right (probe side)
@@ -1993,7 +2003,8 @@ impl HashJoinStream {
19932003
// Dynamic filter coordination between partitions:
19942004
// Report bounds to the accumulator which will handle synchronization and filter updates
19952005
if let Some(ref bounds_accumulator) = self.bounds_accumulator {
1996-
bounds_accumulator.report_partition_bounds(left_data.bounds.clone())?;
2006+
bounds_accumulator
2007+
.report_partition_bounds(self.partition, left_data.bounds.clone())?;
19972008
}
19982009

19992010
self.state = HashJoinStreamState::FetchProbeBatch;

0 commit comments

Comments
 (0)