From e61a5adf08759bb57d53e60e0754ff44ae3699bf Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 25 Aug 2025 17:41:14 -0500 Subject: [PATCH 1/8] fix equivalnece properties calculation in DataSourceExec similar to #17077 --- datafusion/datasource-parquet/src/source.rs | 4 ++ datafusion/datasource/src/file.rs | 3 ++ datafusion/datasource/src/file_scan_config.rs | 30 ++++++++++++-- datafusion/datasource/src/source.rs | 32 +-------------- .../src/equivalence/properties/mod.rs | 41 +++++++++---------- 5 files changed, 54 insertions(+), 56 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 1277ec52adf7..6f4dd39f29f5 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -587,6 +587,10 @@ impl FileSource for ParquetSource { self } + fn filter(&self) -> Option> { + self.predicate.clone() + } + fn with_batch_size(&self, batch_size: usize) -> Arc { let mut conf = self.clone(); conf.batch_size = Some(batch_size); diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 29fa38a8ee36..7656262a0a05 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -69,6 +69,9 @@ pub trait FileSource: Send + Sync { fn with_projection(&self, config: &FileScanConfig) -> Arc; /// Initialize new instance with projected statistics fn with_statistics(&self, statistics: Statistics) -> Arc; + fn filter(&self) -> Option> { + None + } /// Return execution plan metrics fn metrics(&self) -> &ExecutionPlanMetricsSet; /// Return projected statistics diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 7088f811bbce..f6ff4119db67 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -52,18 +52,20 @@ use datafusion_common::{ use datafusion_execution::{ object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext, }; -use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::{expressions::Column, utils::reassign_predicate_columns}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; -use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::{ display::{display_orderings, ProjectSchemaDisplay}, metrics::ExecutionPlanMetricsSet, projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec}, DisplayAs, DisplayFormatType, ExecutionPlan, }; +use datafusion_physical_plan::{ + filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation, +}; use datafusion_physical_plan::coop::cooperative; use datafusion_physical_plan::execution_plan::SchedulingType; @@ -577,8 +579,17 @@ impl DataSource for FileScanConfig { fn eq_properties(&self) -> EquivalenceProperties { let (schema, constraints, _, orderings) = self.project(); - EquivalenceProperties::new_with_orderings(schema, orderings) - .with_constraints(constraints) + let mut eq_properties = + EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings) + .with_constraints(constraints); + if let Some(filter) = self.file_source.filter() { + let filter = reassign_predicate_columns(filter, &schema, false).expect( + "reassign_predicate_columns should succeed as schema is derived from projection", + ); + Self::add_filter_equivalence_info(filter, &mut eq_properties) + .unwrap_or_else(|e| warn!("Failed to add filter equivalence info: {e}")); + } + eq_properties } fn scheduling_type(&self) -> SchedulingType { @@ -724,6 +735,17 @@ impl FileScanConfig { )) } + fn add_filter_equivalence_info( + filter: Arc, + eq_properties: &mut EquivalenceProperties, + ) -> Result<()> { + let (equal_pairs, _) = collect_columns_from_predicate(&filter); + for (lhs, rhs) in equal_pairs { + eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))? + } + Ok(()) + } + pub fn projected_constraints(&self) -> Constraints { let indexes = self.projection_indices(); self.constraints.project(&indexes).unwrap_or_default() diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 153d03b3ab49..60be39bc637d 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -39,11 +39,8 @@ use crate::file_scan_config::FileScanConfig; use datafusion_common::config::ConfigOptions; use datafusion_common::{Constraints, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr::{ - conjunction, EquivalenceProperties, Partitioning, PhysicalExpr, -}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_plan::filter::collect_columns_from_predicate; use datafusion_physical_plan::filter_pushdown::{ ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown, }; @@ -375,21 +372,10 @@ impl ExecutionPlan for DataSourceExec { Some(data_source) => { let mut new_node = self.clone(); new_node.data_source = data_source; + // Re-compute properties since we have new filters which will impact equivalence info new_node.cache = Self::compute_properties(Arc::clone(&new_node.data_source)); - // Recompute equivalence info using new filters - let filter = conjunction( - res.filters - .iter() - .zip(parent_filters) - .filter_map(|(s, f)| match s { - PushedDown::Yes => Some(f), - PushedDown::No => None, - }) - .collect_vec(), - ); - new_node = new_node.add_filter_equivalence_info(filter)?; Ok(FilterPushdownPropagation { filters: res.filters, updated_node: Some(Arc::new(new_node)), @@ -437,20 +423,6 @@ impl DataSourceExec { self } - /// Add filters' equivalence info - fn add_filter_equivalence_info( - mut self, - filter: Arc, - ) -> Result { - let (equal_pairs, _) = collect_columns_from_predicate(&filter); - for (lhs, rhs) in equal_pairs { - self.cache - .eq_properties - .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))? - } - Ok(self) - } - fn compute_properties(data_source: Arc) -> PlanProperties { PlanProperties::new( data_source.eq_properties(), diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index 6d18d34ca4de..822b4f5f7e64 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -326,6 +326,22 @@ impl EquivalenceProperties { self.add_orderings(std::iter::once(ordering)); } + fn update_oeq_cache(&mut self) -> Result<()> { + // Renormalize orderings if the equivalence group changes: + let normal_cls = mem::take(&mut self.oeq_cache.normal_cls); + let normal_orderings = normal_cls + .into_iter() + .map(|o| self.eq_group.normalize_sort_exprs(o)); + self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings); + self.oeq_cache.update_map(); + // Discover any new orderings based on the new equivalence classes: + let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect(); + for expr in leading_exprs { + self.discover_new_orderings(expr)?; + } + Ok(()) + } + /// Incorporates the given equivalence group to into the existing /// equivalence group within. pub fn add_equivalence_group( @@ -334,19 +350,7 @@ impl EquivalenceProperties { ) -> Result<()> { if !other_eq_group.is_empty() { self.eq_group.extend(other_eq_group); - // Renormalize orderings if the equivalence group changes: - let normal_cls = mem::take(&mut self.oeq_cache.normal_cls); - let normal_orderings = normal_cls - .into_iter() - .map(|o| self.eq_group.normalize_sort_exprs(o)); - self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings); - self.oeq_cache.update_map(); - // Discover any new orderings based on the new equivalence classes: - let leading_exprs: Vec<_> = - self.oeq_cache.leading_map.keys().cloned().collect(); - for expr in leading_exprs { - self.discover_new_orderings(expr)?; - } + self.update_oeq_cache()?; } Ok(()) } @@ -373,16 +377,9 @@ impl EquivalenceProperties { ) -> Result<()> { // Add equal expressions to the state: if self.eq_group.add_equal_conditions(Arc::clone(&left), right) { - // Renormalize orderings if the equivalence group changes: - let normal_cls = mem::take(&mut self.oeq_cache.normal_cls); - let normal_orderings = normal_cls - .into_iter() - .map(|o| self.eq_group.normalize_sort_exprs(o)); - self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings); - self.oeq_cache.update_map(); - // Discover any new orderings: - self.discover_new_orderings(left)?; + self.update_oeq_cache()?; } + self.update_oeq_cache()?; Ok(()) } From 34c660c62349dbbab0f82aea895d13a7068f969a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 25 Aug 2025 22:08:48 -0500 Subject: [PATCH 2/8] fix test --- .../physical_optimizer/filter_pushdown/mod.rs | 2 +- datafusion/datasource/src/file_scan_config.rs | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 6752bc30bc3c..1deb9ac960b4 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -595,7 +595,7 @@ fn test_no_pushdown_through_aggregates() { Ok: - FilterExec: b@1 = bar - CoalesceBatchesExec: target_batch_size=100 - - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0]) + - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt] - CoalesceBatchesExec: target_batch_size=10 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo " diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index f6ff4119db67..e0eaf99eb979 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -583,11 +583,15 @@ impl DataSource for FileScanConfig { EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings) .with_constraints(constraints); if let Some(filter) = self.file_source.filter() { - let filter = reassign_predicate_columns(filter, &schema, false).expect( - "reassign_predicate_columns should succeed as schema is derived from projection", - ); - Self::add_filter_equivalence_info(filter, &mut eq_properties) - .unwrap_or_else(|e| warn!("Failed to add filter equivalence info: {e}")); + match reassign_predicate_columns(filter, &schema, false) { + Ok(filter) => { + match Self::add_filter_equivalence_info(filter, &mut eq_properties) { + Ok(()) => {} + Err(e) => warn!("Failed to add filter equivalence info: {e}"), + } + } + Err(e) => warn!("Failed to add filter equivalence info: {e}"), + }; } eq_properties } From 8a819bc28954eaba5c97e087aceec94a7bcbd709 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 25 Aug 2025 22:14:30 -0500 Subject: [PATCH 3/8] Fix more --- datafusion/datasource/src/file_scan_config.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index e0eaf99eb979..2c0412855d84 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -580,17 +580,17 @@ impl DataSource for FileScanConfig { fn eq_properties(&self) -> EquivalenceProperties { let (schema, constraints, _, orderings) = self.project(); let mut eq_properties = - EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings) + EquivalenceProperties::new_with_orderings(schema, orderings) .with_constraints(constraints); if let Some(filter) = self.file_source.filter() { - match reassign_predicate_columns(filter, &schema, false) { + match reassign_predicate_columns(filter, &self.file_schema, false) { Ok(filter) => { match Self::add_filter_equivalence_info(filter, &mut eq_properties) { Ok(()) => {} Err(e) => warn!("Failed to add filter equivalence info: {e}"), } } - Err(e) => warn!("Failed to add filter equivalence info: {e}"), + Err(e) => warn!("Failed to reassign predicate columns: {e}"), }; } eq_properties From 0f35b6ca7c9c7407d39812339d4a74bcfb4df8a0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 25 Aug 2025 23:47:24 -0500 Subject: [PATCH 4/8] reassing --- datafusion/datasource/src/file_scan_config.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 2c0412855d84..b77f4a26f1b0 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -580,17 +580,26 @@ impl DataSource for FileScanConfig { fn eq_properties(&self) -> EquivalenceProperties { let (schema, constraints, _, orderings) = self.project(); let mut eq_properties = - EquivalenceProperties::new_with_orderings(schema, orderings) + EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings) .with_constraints(constraints); if let Some(filter) = self.file_source.filter() { - match reassign_predicate_columns(filter, &self.file_schema, false) { + // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with + match reassign_predicate_columns(filter, &schema, true) { Ok(filter) => { match Self::add_filter_equivalence_info(filter, &mut eq_properties) { Ok(()) => {} - Err(e) => warn!("Failed to add filter equivalence info: {e}"), + Err(e) => { + warn!("Failed to add filter equivalence info: {e}"); + #[cfg(debug_assertions)] + panic!("Failed to add filter equivalence info: {e}"); + }, } } - Err(e) => warn!("Failed to reassign predicate columns: {e}"), + Err(e) => { + warn!("Failed to reassign predicate columns: {e}"); + #[cfg(debug_assertions)] + panic!("Failed to reassign predicate columns: {e}"); + }, }; } eq_properties From 513736e4de466c34675a95560e4868513fdcfd2e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 25 Aug 2025 23:58:18 -0500 Subject: [PATCH 5/8] fix --- datafusion/datasource/src/file.rs | 1 + datafusion/datasource/src/file_scan_config.rs | 7 ++++--- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 7656262a0a05..7a2cf403fd8d 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -69,6 +69,7 @@ pub trait FileSource: Send + Sync { fn with_projection(&self, config: &FileScanConfig) -> Arc; /// Initialize new instance with projected statistics fn with_statistics(&self, statistics: Statistics) -> Arc; + /// Returns the filter expression that will be applied during the file scan. fn filter(&self) -> Option> { None } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index b77f4a26f1b0..47f03cbb1bfe 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -583,7 +583,8 @@ impl DataSource for FileScanConfig { EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings) .with_constraints(constraints); if let Some(filter) = self.file_source.filter() { - // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with + // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with. + // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence. match reassign_predicate_columns(filter, &schema, true) { Ok(filter) => { match Self::add_filter_equivalence_info(filter, &mut eq_properties) { @@ -592,14 +593,14 @@ impl DataSource for FileScanConfig { warn!("Failed to add filter equivalence info: {e}"); #[cfg(debug_assertions)] panic!("Failed to add filter equivalence info: {e}"); - }, + } } } Err(e) => { warn!("Failed to reassign predicate columns: {e}"); #[cfg(debug_assertions)] panic!("Failed to reassign predicate columns: {e}"); - }, + } }; } eq_properties diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 86ad54d3f1eb..44e842e9356f 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1000,7 +1000,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { self: Arc, _children: Vec>, ) -> Result> { - todo!() + Ok(self) } fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { From 02b2ebd4a2331b16d12c44b23bcce4e10e3de29e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 26 Aug 2025 00:02:56 -0500 Subject: [PATCH 6/8] Add test --- .../sqllogictest/test_files/parquet_filter_pushdown.slt | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 6adf379b2171..6dc2c264aeb8 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -575,3 +575,12 @@ WHERE trace_id = '00000000000000000000000000000002' ORDER BY start_timestamp, trace_id; ---- staging + +query P +SELECT start_timestamp +FROM t1 +WHERE trace_id = '00000000000000000000000000000002' AND deployment_environment = 'staging' +ORDER BY start_timestamp, trace_id +LIMIT 1; +---- +2024-10-01T00:00:00Z From 2a804951ac241ed320c360a8c69418fe452c2574 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 27 Aug 2025 06:23:35 -0500 Subject: [PATCH 7/8] Fix test code --- .../core/tests/physical_optimizer/filter_pushdown/mod.rs | 2 +- .../core/tests/physical_optimizer/filter_pushdown/util.rs | 4 ++++ datafusion/datasource/src/source.rs | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 1deb9ac960b4..6752bc30bc3c 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -595,7 +595,7 @@ fn test_no_pushdown_through_aggregates() { Ok: - FilterExec: b@1 = bar - CoalesceBatchesExec: target_batch_size=100 - - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt] + - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0]) - CoalesceBatchesExec: target_batch_size=10 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo " diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index acb2b808ef8f..64cee011cca2 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -142,6 +142,10 @@ impl FileSource for TestSource { }) } + fn filter(&self) -> Option> { + self.predicate.clone() + } + fn as_any(&self) -> &dyn Any { todo!("should not be called") } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 60be39bc637d..f9e464c0529f 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -359,6 +359,7 @@ impl ExecutionPlan for DataSourceExec { child_pushdown_result: ChildPushdownResult, config: &ConfigOptions, ) -> Result>> { + println!("pushing down {:?}", child_pushdown_result); // Push any remaining filters into our data source let parent_filters = child_pushdown_result .parent_filters From 315aad1524e37a89595be5b02a2527e5f0cc9a58 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 27 Aug 2025 06:24:41 -0500 Subject: [PATCH 8/8] remove print --- datafusion/datasource/src/source.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index f9e464c0529f..60be39bc637d 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -359,7 +359,6 @@ impl ExecutionPlan for DataSourceExec { child_pushdown_result: ChildPushdownResult, config: &ConfigOptions, ) -> Result>> { - println!("pushing down {:?}", child_pushdown_result); // Push any remaining filters into our data source let parent_filters = child_pushdown_result .parent_filters