@@ -44,11 +44,10 @@ use datafusion_physical_expr::{
4444 aggregate:: AggregateExprBuilder , conjunction, Partitioning ,
4545} ;
4646use datafusion_physical_expr_common:: physical_expr:: fmt_sql;
47- use datafusion_physical_optimizer:: push_down_filter :: PushdownFilter ;
47+ use datafusion_physical_optimizer:: filter_pushdown :: FilterPushdown ;
4848use datafusion_physical_optimizer:: PhysicalOptimizerRule ;
4949use datafusion_physical_plan:: filter_pushdown:: {
50- filter_pushdown_not_supported, FilterDescription , FilterPushdownResult ,
51- FilterPushdownSupport ,
50+ FilterPushdownPropagation , PredicateSupports ,
5251} ;
5352use datafusion_physical_plan:: {
5453 aggregates:: { AggregateExec , AggregateMode , PhysicalGroupBy } ,
@@ -154,29 +153,24 @@ impl FileSource for TestSource {
154153
155154 fn try_pushdown_filters (
156155 & self ,
157- mut fd : FilterDescription ,
156+ mut filters : Vec < Arc < dyn PhysicalExpr > > ,
158157 config : & ConfigOptions ,
159- ) -> Result < FilterPushdownResult < Arc < dyn FileSource > > > {
158+ ) -> Result < FilterPushdownPropagation < Arc < dyn FileSource > > > {
160159 if self . support && config. execution . parquet . pushdown_filters {
161160 if let Some ( internal) = self . predicate . as_ref ( ) {
162- fd . filters . push ( Arc :: clone ( internal) ) ;
161+ filters. push ( Arc :: clone ( internal) ) ;
163162 }
164- let all_filters = fd. take_description ( ) ;
165-
166- Ok ( FilterPushdownResult {
167- support : FilterPushdownSupport :: Supported {
168- child_descriptions : vec ! [ ] ,
169- op : Arc :: new ( TestSource {
170- support : true ,
171- predicate : Some ( conjunction ( all_filters) ) ,
172- statistics : self . statistics . clone ( ) , // should be updated in reality
173- } ) ,
174- revisit : false ,
175- } ,
176- remaining_description : FilterDescription :: empty ( ) ,
163+ let new_node = Arc :: new ( TestSource {
164+ support : true ,
165+ predicate : Some ( conjunction ( filters. clone ( ) ) ) ,
166+ statistics : self . statistics . clone ( ) , // should be updated in reality
167+ } ) ;
168+ Ok ( FilterPushdownPropagation {
169+ filters : PredicateSupports :: all_supported ( filters) ,
170+ updated_node : Some ( new_node) ,
177171 } )
178172 } else {
179- Ok ( filter_pushdown_not_supported ( fd ) )
173+ Ok ( FilterPushdownPropagation :: unsupported ( filters ) )
180174 }
181175 }
182176}
@@ -201,7 +195,7 @@ fn test_pushdown_into_scan() {
201195
202196 // expect the predicate to be pushed down into the DataSource
203197 insta:: assert_snapshot!(
204- OptimizationTest :: new( plan, PushdownFilter { } , true ) ,
198+ OptimizationTest :: new( plan, FilterPushdown { } , true ) ,
205199 @r"
206200 OptimizationTest:
207201 input:
@@ -225,7 +219,7 @@ fn test_pushdown_into_scan_with_config_options() {
225219 insta:: assert_snapshot!(
226220 OptimizationTest :: new(
227221 Arc :: clone( & plan) ,
228- PushdownFilter { } ,
222+ FilterPushdown { } ,
229223 false
230224 ) ,
231225 @r"
@@ -244,7 +238,7 @@ fn test_pushdown_into_scan_with_config_options() {
244238 insta:: assert_snapshot!(
245239 OptimizationTest :: new(
246240 plan,
247- PushdownFilter { } ,
241+ FilterPushdown { } ,
248242 true
249243 ) ,
250244 @r"
@@ -269,7 +263,7 @@ fn test_filter_collapse() {
269263 let plan = Arc :: new ( FilterExec :: try_new ( predicate2, filter1) . unwrap ( ) ) ;
270264
271265 insta:: assert_snapshot!(
272- OptimizationTest :: new( plan, PushdownFilter { } , true ) ,
266+ OptimizationTest :: new( plan, FilterPushdown { } , true ) ,
273267 @r"
274268 OptimizationTest:
275269 input:
@@ -278,7 +272,7 @@ fn test_filter_collapse() {
278272 - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
279273 output:
280274 Ok:
281- - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
275+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
282276 "
283277 ) ;
284278}
@@ -288,25 +282,28 @@ fn test_filter_with_projection() {
288282 let scan = test_scan ( true ) ;
289283 let projection = vec ! [ 1 , 0 ] ;
290284 let predicate = col_lit_predicate ( "a" , "foo" , schema ( ) ) ;
291- let plan = Arc :: new (
292- FilterExec :: try_new ( predicate, Arc :: clone ( & scan) )
285+ let filter = Arc :: new (
286+ FilterExec :: try_new ( Arc :: clone ( & predicate) , Arc :: clone ( & scan) )
293287 . unwrap ( )
294288 . with_projection ( Some ( projection) )
295289 . unwrap ( ) ,
296290 ) ;
291+ let predicate = col_lit_predicate ( "b" , "bar" , & filter. schema ( ) ) ;
292+ let plan = Arc :: new ( FilterExec :: try_new ( predicate, filter) . unwrap ( ) ) ;
297293
298294 // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec
299295 insta:: assert_snapshot!(
300- OptimizationTest :: new( plan, PushdownFilter { } , true ) ,
296+ OptimizationTest :: new( plan, FilterPushdown { } , true ) ,
301297 @r"
302298 OptimizationTest:
303299 input:
304- - FilterExec: a@0 = foo, projection=[b@1, a@0]
305- - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
300+ - FilterExec: b@0 = bar
301+ - FilterExec: a@0 = foo, projection=[b@1, a@0]
302+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
306303 output:
307304 Ok:
308305 - ProjectionExec: expr=[b@1 as b, a@0 as a]
309- - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
306+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
310307 " ,
311308 ) ;
312309
@@ -320,7 +317,7 @@ fn test_filter_with_projection() {
320317 . unwrap ( ) ,
321318 ) ;
322319 insta:: assert_snapshot!(
323- OptimizationTest :: new( plan, PushdownFilter { } , true ) ,
320+ OptimizationTest :: new( plan, FilterPushdown { } , true ) ,
324321 @r"
325322 OptimizationTest:
326323 input:
@@ -349,7 +346,7 @@ fn test_push_down_through_transparent_nodes() {
349346
350347 // expect the predicate to be pushed down into the DataSource
351348 insta:: assert_snapshot!(
352- OptimizationTest :: new( plan, PushdownFilter { } , true ) ,
349+ OptimizationTest :: new( plan, FilterPushdown { } , true ) ,
353350 @r"
354351 OptimizationTest:
355352 input:
@@ -362,7 +359,7 @@ fn test_push_down_through_transparent_nodes() {
362359 Ok:
363360 - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=0
364361 - CoalesceBatchesExec: target_batch_size=1
365- - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
362+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
366363 "
367364 ) ;
368365}
@@ -413,7 +410,7 @@ fn test_no_pushdown_through_aggregates() {
413410
414411 // expect the predicate to be pushed down into the DataSource
415412 insta:: assert_snapshot!(
416- OptimizationTest :: new( plan, PushdownFilter { } , true ) ,
413+ OptimizationTest :: new( plan, FilterPushdown { } , true ) ,
417414 @r"
418415 OptimizationTest:
419416 input:
0 commit comments