Skip to content

Commit 2e64a1e

Browse files
committed
feat: allow pushdown of dynamic filters with expr having partition cols
1 parent e323357 commit 2e64a1e

File tree

2 files changed

+14
-2
lines changed

2 files changed

+14
-2
lines changed

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -463,7 +463,8 @@ impl FileFormat for ParquetFormat {
463 463
metadata_size_hint = Some(metadata);
464 464
}
465 465

466-
let mut source = ParquetSource::new(self.options.clone());
466+
let mut source = ParquetSource::new(self.options.clone())
467+
.with_table_partition_cols(conf.table_partition_cols.clone());
467 468

468 469
// Use the CachedParquetFileReaderFactory
469 470
let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

datafusion/datasource-parquet/src/source.rs

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,8 @@ use crate::opener::ParquetOpener;
26 26
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
27 27
use crate::DefaultParquetFileReaderFactory;
28 28
use crate::ParquetFileReaderFactory;
29+
use arrow::datatypes::Field;
30+
use arrow::datatypes::Schema;
29 31
use datafusion_common::config::ConfigOptions;
30 32
#[cfg(feature = "parquet_encryption")]
31 33
use datafusion_common::config::EncryptionFactoryOptions;
@@ -43,6 +45,7 @@ use datafusion_datasource::file_scan_config::FileScanConfig;
43 45
use datafusion_physical_expr::conjunction;
44 46
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
45 47
use datafusion_physical_expr_common::physical_expr::fmt_sql;
48+
use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
46 49
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
47 50
use datafusion_physical_plan::filter_pushdown::PushedDown;
48 51
use datafusion_physical_plan::filter_pushdown::{
@@ -287,6 +290,7 @@ pub struct ParquetSource {
287 290
/// Optional hint for the size of the parquet metadata
288 291
pub(crate) metadata_size_hint: Option<usize>,
289 292
pub(crate) projected_statistics: Option<Statistics>,
293+
pub(crate) table_partition_cols: Vec<Arc<Field>>,
290 294
#[cfg(feature = "parquet_encryption")]
291 295
pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
292 296
}
@@ -330,6 +334,11 @@ impl ParquetSource {
330 334
self
331 335
}
332 336

337+
pub fn with_table_partition_cols(mut self, partition_cols: Vec<Arc<Field>>) -> Self {
338+
self.table_partition_cols = partition_cols;
339+
self
340+
}
341+
333 342
/// Options passed to the parquet reader for this scan
334 343
pub fn table_parquet_options(&self) -> &TableParquetOptions {
335 344
&self.table_parquet_options
@@ -720,7 +729,9 @@ impl FileSource for ParquetSource {
720 729
let filters: Vec<PushedDownPredicate> = filters
721 730
.into_iter()
722 731
.map(|filter| {
723-
if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) {
732+
if is_dynamic_physical_expr(&filter) {
733+
PushedDownPredicate::supported(filter)
734+
} else if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) {
724 735
PushedDownPredicate::supported(filter)
725 736
} else {
726 737
PushedDownPredicate::unsupported(filter)

0 commit comments

Comments
 (0)