Skip to content

Commit 0c9aea4

Browse files
committed
Pass filter pushdown through cooperative exec
1 parent 7543850 commit 0c9aea4

File tree

3 files changed

+26
-2
lines changed

3 files changed

+26
-2
lines changed

datafusion/physical-plan/src/coop.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
//! The optimizer rule currently checks the plan for exchange-like operators and leave operators
6666
//! that report [`SchedulingType::NonCooperative`] in their [plan properties](ExecutionPlan::properties).
6767
68+
use datafusion_common::config::ConfigOptions;
69+
use datafusion_physical_expr::PhysicalExpr;
6870
#[cfg(datafusion_coop = "tokio_fallback")]
6971
use futures::Future;
7072
use std::any::Any;
@@ -73,6 +75,10 @@ use std::sync::Arc;
7375
use std::task::{Context, Poll};
7476

7577
use crate::execution_plan::CardinalityEffect::{self, Equal};
78+
use crate::filter_pushdown::{
79+
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
80+
FilterPushdownPropagation,
81+
};
7682
use crate::{
7783
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
7884
SendableRecordBatchStream,
@@ -289,6 +295,24 @@ impl ExecutionPlan for CooperativeExec {
289295
fn cardinality_effect(&self) -> CardinalityEffect {
290296
Equal
291297
}
298+
299+
fn gather_filters_for_pushdown(
300+
&self,
301+
_phase: FilterPushdownPhase,
302+
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
303+
_config: &ConfigOptions,
304+
) -> Result<FilterDescription> {
305+
FilterDescription::from_children(parent_filters, &self.children())
306+
}
307+
308+
fn handle_child_pushdown_result(
309+
&self,
310+
_phase: FilterPushdownPhase,
311+
child_pushdown_result: ChildPushdownResult,
312+
_config: &ConfigOptions,
313+
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
314+
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
315+
}
292316
}
293317

294318
/// Creates a [`CooperativeStream`] wrapper around the given [`RecordBatchStream`].

datafusion/sqllogictest/test_files/explain_tree.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ physical_plan
392392
44)-----------------------------│ -------------------- ││ -------------------- │
393393
45)-----------------------------│ files: 1 ││ partition_count(in->out): │
394394
46)-----------------------------│ format: parquet ││ 1 -> 4 │
395-
47)-----------------------------│ ││ │
395+
47)-----------------------------│ predicate: true ││ │
396396
48)-----------------------------│ ││ partitioning_scheme: │
397397
49)-----------------------------│ ││ RoundRobinBatch(4) │
398398
50)-----------------------------└───────────────────────────┘└─────────────┬─────────────┘

datafusion/sqllogictest/test_files/topk.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa
372372
physical_plan
373373
01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST]
374374
02)--ProjectionExec: expr=[number@0 as number, letter@1 as letter, age@2 as age, number@0 as column4, letter@1 as column5]
375-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet
375+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
376376

377377
# Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age)
378378
query TT

0 commit comments

Comments
 (0)