
Commit d2b8da5 ("fix")
Parent: 481f7f9

File tree: 1 file changed (+25 −16)

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 25 additions & 16 deletions
@@ -122,10 +122,14 @@ const HASH_JOIN_SEED: RandomState =
 /// All fields use atomic operations or mutexes to ensure correct coordination between concurrent
 /// partition executions.
 struct SharedBoundsAccumulator {
-    /// Bounds from completed partitions
+    /// Bounds from completed partitions.
     bounds: Mutex<Vec<Vec<(ScalarValue, ScalarValue)>>>,
-    /// Total number of partitions (needed to know when we are doing building the build-side)
-    total_partitions: AtomicUsize,
+    /// Number of partitions that have reported completion.
+    completed_partitions: AtomicUsize,
+    /// Total number of partitions.
+    /// Need to know this so that we can update the dynamic filter once we are done
+    /// building *all* of the hash tables.
+    total_partitions: usize,
 }

 impl SharedBoundsAccumulator {
@@ -173,7 +177,8 @@ impl SharedBoundsAccumulator {
         };
         Self {
             bounds: Mutex::new(Vec::new()),
-            total_partitions: AtomicUsize::new(expected_calls),
+            completed_partitions: AtomicUsize::new(0),
+            total_partitions: expected_calls,
         }
     }

@@ -1821,25 +1826,29 @@ impl HashJoinStream {
         // Note: In CollectLeft mode, multiple partitions may access the SAME build data
         // (shared via OnceFut), but each partition must report separately to ensure proper
         // coordination across all output partitions.
-        if let (Some(dynamic_filter), Some(bounds)) =
-            (&self.dynamic_filter, &left_data.bounds)
-        {
+        //
+        // The consequences of not doing this syncronization properly would be that a filter
+        // with incomplete bounds would be pushed down resulting in incorrect results (missing rows).
+        if let Some(dynamic_filter) = &self.dynamic_filter {
             // Store bounds in the accumulator - this runs once per partition
-            // Troubleshooting: If bounds are empty/None, check collect_left_input
-            // was called with should_compute_bounds=true
-            let mut bounds_guard = self.bounds_accumulator.bounds.lock();
-            bounds_guard.push(bounds.clone());
-            let completed = bounds_guard.len();
-            let total_partitions = self
+            if let Some(bounds) = &left_data.bounds {
+                // Only push actual bounds if they exist
+                self.bounds_accumulator.bounds.lock().push(bounds.clone());
+            }
+
+            // Atomically increment the completion counter
+            // Even empty partitions must report to ensure proper termination
+            let completed = self
                 .bounds_accumulator
-                .total_partitions
-                .load(Ordering::SeqCst);
+                .completed_partitions
+                .fetch_add(1, Ordering::SeqCst)
+                + 1;
+            let total_partitions = self.bounds_accumulator.total_partitions;

             // Critical synchronization point: Only the last partition updates the filter
             // Troubleshooting: If you see "completed > total_partitions", check partition
             // count calculation in try_new() - it may not match actual execution calls
             if completed == total_partitions {
-                drop(bounds_guard); // Release lock before merging
                 if let Some(merged_bounds) = self.bounds_accumulator.merge_bounds() {
                     let filter_expr = self.create_filter_from_bounds(merged_bounds)?;
                     dynamic_filter.update(filter_expr)?;

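For context, below is a minimal standalone sketch of the coordination pattern the new code adopts: each partition optionally pushes its bounds under a lock, every partition (even one with no bounds) increments an atomic completion counter, and whichever caller observes the final count performs the merge exactly once. This is not the DataFusion implementation itself; the `report_partition` method, the plain (i64, i64) bounds in place of `ScalarValue`, the `std::sync::Mutex` (instead of the lock type used in the crate), and the `println!` standing in for the dynamic-filter update are all simplifications for illustration.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::thread;

    /// Simplified stand-in for the accumulator in the diff: bounds are plain
    /// (min, max) i64 pairs instead of `ScalarValue`s, the lock is a std
    /// `Mutex` (so `lock()` needs `unwrap()`), and the dynamic-filter update
    /// is replaced by a `println!`.
    struct SharedBoundsAccumulator {
        bounds: Mutex<Vec<(i64, i64)>>,
        completed_partitions: AtomicUsize,
        total_partitions: usize,
    }

    impl SharedBoundsAccumulator {
        fn new(total_partitions: usize) -> Self {
            Self {
                bounds: Mutex::new(Vec::new()),
                completed_partitions: AtomicUsize::new(0),
                total_partitions,
            }
        }

        /// Called once per partition. `bounds` is `None` for a partition that
        /// produced no bounds; it still must report so termination is detected.
        fn report_partition(&self, bounds: Option<(i64, i64)>) {
            if let Some(b) = bounds {
                // Only push actual bounds if they exist.
                self.bounds.lock().unwrap().push(b);
            }
            // Atomically count this partition as done, whether or not it had bounds.
            let completed = self.completed_partitions.fetch_add(1, Ordering::SeqCst) + 1;
            if completed == self.total_partitions {
                // Exactly one caller observes the final count; it merges the
                // collected bounds and would update the filter here.
                let all = self.bounds.lock().unwrap();
                let merged = all.iter().fold(None, |acc: Option<(i64, i64)>, &(lo, hi)| {
                    Some(match acc {
                        None => (lo, hi),
                        Some((alo, ahi)) => (alo.min(lo), ahi.max(hi)),
                    })
                });
                println!("all partitions reported, merged bounds = {merged:?}");
            }
        }
    }

    fn main() {
        let acc = Arc::new(SharedBoundsAccumulator::new(3));
        // One partition reports no bounds at all; it still counts toward completion.
        let inputs = [Some((5, 40)), None, Some((-3, 12))];
        let handles: Vec<_> = inputs
            .into_iter()
            .map(|b| {
                let acc = Arc::clone(&acc);
                thread::spawn(move || acc.report_partition(b))
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
    }

The point of the design change mirrored here is that the completion count is tracked separately from the collected bounds: a partition that pushes nothing still increments the counter, so the last reporter always fires and the filter is never updated from an incomplete set of bounds.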
0 commit comments