Skip to content

Commit 0eddbd1

Browse files
berkaysynnadametesynnada
authored andcommitted
Correct schema is used while updating filter expr bounds (#233)
1 parent b0f0ff2 commit 0eddbd1

File tree

4 files changed

+4
-11
lines changed

4 files changed

+4
-11
lines changed

datafusion/physical-plan/src/joins/partitioned_hash_join.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -941,7 +941,6 @@ impl CommonJoinData {
941941
let calculated_necessary_build_side_intervals =
942942
calculate_the_necessary_build_side_range_helper(
943943
&self.filter,
944-
&self.build_buffer.latest_batch,
945944
&mut self.graph,
946945
&mut self.left_sorted_filter_expr,
947946
&mut self.right_sorted_filter_expr,
@@ -1265,7 +1264,6 @@ impl LazyJoinStream for LazyPartitionedHashJoinStream {
12651264
) -> Result<Vec<(PhysicalSortExpr, Interval)>> {
12661265
calculate_the_necessary_build_side_range_helper(
12671266
&self.join_data.filter,
1268-
&self.join_data.build_buffer.latest_batch,
12691267
&mut self.join_data.graph,
12701268
&mut self.join_data.left_sorted_filter_expr,
12711269
&mut self.join_data.right_sorted_filter_expr,
@@ -1851,7 +1849,6 @@ mod fuzzy_tests {
18511849

18521850
#[rstest]
18531851
#[tokio::test(flavor = "multi_thread")]
1854-
#[ignore]
18551852
async fn testing_with_temporal_columns(
18561853
#[values(JoinType::Inner, JoinType::Right)] join_type: JoinType,
18571854
#[values((3, 5), (5, 3))] cardinality: (i32, i32),

datafusion/physical-plan/src/joins/sliding_hash_join.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -960,7 +960,6 @@ impl LazyJoinStream for LazySlidingHashJoinStream {
960960
) -> Result<Vec<(PhysicalSortExpr, Interval)>> {
961961
calculate_the_necessary_build_side_range_helper(
962962
&self.join_data.filter,
963-
&self.join_data.build_buffer.input_buffer,
964963
&mut self.join_data.graph,
965964
&mut self.join_data.left_sorted_filter_expr,
966965
&mut self.join_data.right_sorted_filter_expr,

datafusion/physical-plan/src/joins/sliding_nested_loop_join.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1115,7 +1115,6 @@ impl LazyJoinStream for LazySlidingNestedLoopJoinStream {
11151115
) -> Result<Vec<(PhysicalSortExpr, Interval)>> {
11161116
calculate_the_necessary_build_side_range_helper(
11171117
&self.join_data.filter,
1118-
&self.join_data.build_buffer.input_buffer,
11191118
&mut self.join_data.graph,
11201119
&mut self.join_data.left_sorted_filter_expr,
11211120
&mut self.join_data.right_sorted_filter_expr,

datafusion/physical-plan/src/joins/sliding_window_join_utils.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -905,7 +905,6 @@ pub fn is_batch_suitable_interval_calculation(
905905
/// for each expression.
906906
pub fn calculate_the_necessary_build_side_range_helper(
907907
filter: &JoinFilter,
908-
build_inner_buffer: &RecordBatch,
909908
graph: &mut ExprIntervalGraph,
910909
build_sorted_filter_exprs: &mut [SortedFilterExpr],
911910
probe_sorted_filter_exprs: &mut [SortedFilterExpr],
@@ -915,7 +914,6 @@ pub fn calculate_the_necessary_build_side_range_helper(
915914
// Calculate the interval for the build side filter expression (if present):
916915
update_filter_expr_bounds(
917916
filter,
918-
build_inner_buffer,
919917
build_sorted_filter_exprs,
920918
probe_batch,
921919
probe_sorted_filter_exprs,
@@ -1287,16 +1285,16 @@ pub enum LazyJoinStreamState {
12871285
/// calculates the actual interval for the probe side based on the sort options.
12881286
pub(crate) fn update_filter_expr_bounds(
12891287
filter: &JoinFilter,
1290-
build_inner_buffer: &RecordBatch,
12911288
build_sorted_filter_exprs: &mut [SortedFilterExpr],
12921289
probe_batch: &RecordBatch,
12931290
probe_sorted_filter_exprs: &mut [SortedFilterExpr],
12941291
probe_side: JoinSide,
12951292
) -> Result<()> {
1293+
let schema = filter.schema();
12961294
// Evaluate the build side order expression to get datatype:
12971295
let build_order_datatype = build_sorted_filter_exprs[0]
12981296
.intermediate_batch_filter_expr()
1299-
.data_type(&build_inner_buffer.schema())?;
1297+
.data_type(schema)?;
13001298

13011299
// Create a null interval using the null scalar value:
13021300
let unbounded_interval = Interval::make_unbounded(&build_order_datatype)?;
@@ -1308,14 +1306,14 @@ pub(crate) fn update_filter_expr_bounds(
13081306
});
13091307

13101308
let first_probe_intermediate_batch = get_filter_representation_of_join_side(
1311-
filter.schema(),
1309+
schema,
13121310
&probe_batch.slice(0, 1),
13131311
filter.column_indices(),
13141312
probe_side,
13151313
)?;
13161314

13171315
let last_probe_intermediate_batch = get_filter_representation_of_join_side(
1318-
filter.schema(),
1316+
schema,
13191317
&probe_batch.slice(probe_batch.num_rows() - 1, 1),
13201318
filter.column_indices(),
13211319
probe_side,

0 commit comments

Comments
 (0)