diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 27a7e7ae3c06..67a7ba8dc776 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -67,13 +67,13 @@ pub(crate) mod test_util { .into_iter() .zip(tmp_files.into_iter()) .map(|(batch, mut output)| { - let builder = parquet::file::properties::WriterProperties::builder(); - let props = if multi_page { - builder.set_data_page_row_count_limit(ROWS_PER_PAGE) - } else { - builder + let mut builder = parquet::file::properties::WriterProperties::builder(); + if multi_page { + builder = builder.set_data_page_row_count_limit(ROWS_PER_PAGE) } - .build(); + builder = builder.set_bloom_filter_enabled(true); + + let props = builder.build(); let mut writer = parquet::arrow::ArrowWriter::try_new( &mut output, diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 9e1b2822e854..5c06c3902c1c 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -43,6 +43,7 @@ mod tests { }; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder}; use arrow::record_batch::RecordBatch; + use arrow::util::pretty::pretty_format_batches; use arrow_schema::SchemaRef; use bytes::{BufMut, BytesMut}; use datafusion_common::config::TableParquetOptions; @@ -61,8 +62,9 @@ mod tests { use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{col, lit, when, Expr}; use datafusion_physical_expr::planner::logical2physical; + use datafusion_physical_plan::analyze::AnalyzeExec; + use datafusion_physical_plan::collect; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; - use datafusion_physical_plan::{collect, displayable}; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use chrono::{TimeZone, Utc}; @@ -81,10 +83,10 @@ mod tests { struct RoundTripResult { /// Data that was read back from ParquetFiles batches: Result>, + /// The EXPLAIN ANALYZE output + explain: Result, /// The physical plan that was created (that has statistics, etc) parquet_exec: Arc, - /// The ParquetSource that is used in plan - parquet_source: ParquetSource, } /// round-trip record batches by writing each individual RecordBatch to @@ -137,71 +139,109 @@ mod tests { self.round_trip(batches).await.batches } - /// run the test, returning the `RoundTripResult` - async fn round_trip(self, batches: Vec) -> RoundTripResult { - let Self { - projection, - schema, - predicate, - pushdown_predicate, - page_index_predicate, - } = self; - - let file_schema = match schema { - Some(schema) => schema, - None => Arc::new( - Schema::try_merge( - batches.iter().map(|b| b.schema().as_ref().clone()), - ) - .unwrap(), - ), - }; - // If testing with page_index_predicate, write parquet - // files with multiple pages - let multi_page = page_index_predicate; - let (meta, _files) = store_parquet(batches, multi_page).await.unwrap(); - let file_group = meta.into_iter().map(Into::into).collect(); - + fn build_file_source(&self, file_schema: SchemaRef) -> Arc { // set up predicate (this is normally done by a layer higher up) - let predicate = predicate.map(|p| logical2physical(&p, &file_schema)); + let predicate = self + .predicate + .as_ref() + .map(|p| logical2physical(p, &file_schema)); let mut source = ParquetSource::default(); if let Some(predicate) = predicate 
{ source = source.with_predicate(Arc::clone(&file_schema), predicate); } - if pushdown_predicate { + if self.pushdown_predicate { source = source .with_pushdown_filters(true) .with_reorder_filters(true); } - if page_index_predicate { + if self.page_index_predicate { source = source.with_enable_page_index(true); } + Arc::new(source) + } + + fn build_parquet_exec( + &self, + file_schema: SchemaRef, + file_group: FileGroup, + source: Arc, + ) -> Arc { let base_config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), file_schema, - Arc::new(source.clone()), + source, ) .with_file_group(file_group) - .with_projection(projection) + .with_projection(self.projection.clone()) .build(); + DataSourceExec::from_data_source(base_config) + } + + /// run the test, returning the `RoundTripResult` + async fn round_trip(&self, batches: Vec) -> RoundTripResult { + let file_schema = match &self.schema { + Some(schema) => schema, + None => &Arc::new( + Schema::try_merge( + batches.iter().map(|b| b.schema().as_ref().clone()), + ) + .unwrap(), + ), + }; + let file_schema = Arc::clone(file_schema); + // If testing with page_index_predicate, write parquet + // files with multiple pages + let multi_page = self.page_index_predicate; + let (meta, _files) = store_parquet(batches, multi_page).await.unwrap(); + let file_group: FileGroup = meta.into_iter().map(Into::into).collect(); + + // build a ParquetExec to return the results + let parquet_source = self.build_file_source(file_schema.clone()); + let parquet_exec = self.build_parquet_exec( + file_schema.clone(), + file_group.clone(), + Arc::clone(&parquet_source), + ); + + let analyze_exec = Arc::new(AnalyzeExec::new( + false, + false, + // use a new ParquetSource to avoid sharing execution metrics + self.build_parquet_exec( + file_schema.clone(), + file_group.clone(), + self.build_file_source(file_schema.clone()), + ), + Arc::new(Schema::new(vec![ + Field::new("plan_type", DataType::Utf8, true), + Field::new("plan", DataType::Utf8, true), + ])), + )); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let parquet_exec = DataSourceExec::from_data_source(base_config.clone()); + let batches = collect( + Arc::clone(&parquet_exec) as Arc, + task_ctx.clone(), + ) + .await; + + let explain = collect(analyze_exec, task_ctx.clone()) + .await + .map(|batches| { + let batches = pretty_format_batches(&batches).unwrap(); + format!("{batches}") + }); + RoundTripResult { - batches: collect(parquet_exec.clone(), task_ctx).await, + batches, + explain, parquet_exec, - parquet_source: base_config - .file_source() - .as_any() - .downcast_ref::() - .unwrap() - .clone(), } } } @@ -1375,26 +1415,6 @@ mod tests { create_batch(vec![("c1", c1.clone())]) } - /// Returns a int64 array with contents: - /// "[-1, 1, null, 2, 3, null, null]" - fn int64_batch() -> RecordBatch { - let contents: ArrayRef = Arc::new(Int64Array::from(vec![ - Some(-1), - Some(1), - None, - Some(2), - Some(3), - None, - None, - ])); - - create_batch(vec![ - ("a", contents.clone()), - ("b", contents.clone()), - ("c", contents.clone()), - ]) - } - #[tokio::test] async fn parquet_exec_metrics() { // batch1: c1(string) @@ -1454,110 +1474,17 @@ mod tests { .round_trip(vec![batch1]) .await; - // should have a pruning predicate - let pruning_predicate = rt.parquet_source.pruning_predicate(); - assert!(pruning_predicate.is_some()); - - // convert to explain plan form - let display = displayable(rt.parquet_exec.as_ref()) - .indent(true) - .to_string(); - - assert_contains!( - 
&display, - "pruning_predicate=c1_null_count@2 != row_count@3 AND (c1_min@0 != bar OR bar != c1_max@1)" - ); - - assert_contains!(&display, r#"predicate=c1@0 != bar"#); - - assert_contains!(&display, "projection=[c1]"); - } - - #[tokio::test] - async fn parquet_exec_display_deterministic() { - // batches: a(int64), b(int64), c(int64) - let batches = int64_batch(); - - fn extract_required_guarantees(s: &str) -> Option<&str> { - s.split("required_guarantees=").nth(1) - } - - // Ensuring that the required_guarantees remain consistent across every display plan of the filter conditions - for _ in 0..100 { - // c = 1 AND b = 1 AND a = 1 - let filter0 = col("c") - .eq(lit(1)) - .and(col("b").eq(lit(1))) - .and(col("a").eq(lit(1))); - - let rt0 = RoundTrip::new() - .with_predicate(filter0) - .with_pushdown_predicate() - .round_trip(vec![batches.clone()]) - .await; - - let pruning_predicate = rt0.parquet_source.pruning_predicate(); - assert!(pruning_predicate.is_some()); - - let display0 = displayable(rt0.parquet_exec.as_ref()) - .indent(true) - .to_string(); - - let guarantees0: &str = extract_required_guarantees(&display0) - .expect("Failed to extract required_guarantees"); - // Compare only the required_guarantees part (Because the file_groups part will not be the same) - assert_eq!( - guarantees0.trim(), - "[a in (1), b in (1), c in (1)]", - "required_guarantees don't match" - ); - } + let explain = rt.explain.unwrap(); - // c = 1 AND a = 1 AND b = 1 - let filter1 = col("c") - .eq(lit(1)) - .and(col("a").eq(lit(1))) - .and(col("b").eq(lit(1))); + // check that there was a pruning predicate -> row groups got pruned + assert_contains!(&explain, "predicate=c1@0 != bar"); - let rt1 = RoundTrip::new() - .with_predicate(filter1) - .with_pushdown_predicate() - .round_trip(vec![batches.clone()]) - .await; - - // b = 1 AND a = 1 AND c = 1 - let filter2 = col("b") - .eq(lit(1)) - .and(col("a").eq(lit(1))) - .and(col("c").eq(lit(1))); + // there's a single row group, but we can check that it matched + // if no pruning was done this would be 0 instead of 1 + assert_contains!(&explain, "row_groups_matched_statistics=1"); - let rt2 = RoundTrip::new() - .with_predicate(filter2) - .with_pushdown_predicate() - .round_trip(vec![batches]) - .await; - - // should have a pruning predicate - let pruning_predicate = rt1.parquet_source.pruning_predicate(); - assert!(pruning_predicate.is_some()); - let pruning_predicate = rt2.parquet_source.predicate(); - assert!(pruning_predicate.is_some()); - - // convert to explain plan form - let display1 = displayable(rt1.parquet_exec.as_ref()) - .indent(true) - .to_string(); - let display2 = displayable(rt2.parquet_exec.as_ref()) - .indent(true) - .to_string(); - - let guarantees1 = extract_required_guarantees(&display1) - .expect("Failed to extract required_guarantees"); - let guarantees2 = extract_required_guarantees(&display2) - .expect("Failed to extract required_guarantees"); - - // Compare only the required_guarantees part (Because the predicate part will not be the same) - assert_eq!(guarantees1, guarantees2, "required_guarantees don't match"); + // check the projection + assert_contains!(&explain, "projection=[c1]"); } #[tokio::test] @@ -1581,16 +1508,19 @@ mod tests { .await; // Should not contain a pruning predicate (since nothing can be pruned) - let pruning_predicate = rt.parquet_source.pruning_predicate(); - assert!( - pruning_predicate.is_none(), - "Still had pruning predicate: {pruning_predicate:?}" - ); + let explain = rt.explain.unwrap(); - // but does still has 
a pushdown down predicate
-        let predicate = rt.parquet_source.predicate();
-        let filter_phys = logical2physical(&filter, rt.parquet_exec.schema().as_ref());
-        assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
+        // When both matched and pruned are 0, it means that the pruning predicate
+        // was not used at all.
+        assert_contains!(&explain, "row_groups_matched_statistics=0");
+        assert_contains!(&explain, "row_groups_pruned_statistics=0");
+
+        // But pushdown predicate should be present
+        assert_contains!(
+            &explain,
+            "predicate=CASE WHEN c1@0 != bar THEN true ELSE false END"
+        );
+        assert_contains!(&explain, "pushdown_rows_pruned=5");
     }
 
     #[tokio::test]
@@ -1616,8 +1546,14 @@
             .round_trip(vec![batch1])
             .await;
 
         // Should have a pruning predicate
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
+        let explain = rt.explain.unwrap();
+        assert_contains!(
+            &explain,
+            "predicate=c1@0 = foo AND CASE WHEN c1@0 != bar THEN true ELSE false END"
+        );
+
+        // And bloom filters should have been evaluated
+        assert_contains!(&explain, "row_groups_pruned_bloom_filter=1");
     }
 
     /// Returns the sum of all the metrics with the specified name
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 732fef47d5a7..708a8035a4f7 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -34,13 +34,14 @@
 use arrow::error::ArrowError;
 use datafusion_common::{exec_err, Result};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_optimizer::pruning::PruningPredicate;
-use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
 use futures::{StreamExt, TryStreamExt};
 use log::debug;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::file::metadata::ParquetMetaDataReader;
 
 /// Implements [`FileOpener`] for a parquet file
 pub(super) struct ParquetOpener {
@@ -54,10 +55,6 @@ pub(super) struct ParquetOpener {
     pub limit: Option<usize>,
     /// Optional predicate to apply during the scan
     pub predicate: Option<Arc<dyn PhysicalExpr>>,
-    /// Optional pruning predicate applied to row group statistics
-    pub pruning_predicate: Option<Arc<PruningPredicate>>,
-    /// Optional pruning predicate applied to data page statistics
-    pub page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
     /// Schema of the output table
     pub table_schema: SchemaRef,
     /// Optional hint for how large the initial request to read parquet metadata
@@ -80,6 +77,8 @@ pub(super) struct ParquetOpener {
     pub enable_bloom_filter: bool,
     /// Schema adapter factory
     pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    /// Should row group pruning be applied
+    pub enable_row_group_stats_pruning: bool,
 }
 
 impl FileOpener for ParquetOpener {
@@ -92,7 +91,7 @@ impl FileOpener for ParquetOpener {
         let metadata_size_hint = file_meta.metadata_size_hint.or(self.metadata_size_hint);
 
-        let mut reader: Box<dyn AsyncFileReader> =
+        let mut async_file_reader: Box<dyn AsyncFileReader> =
             self.parquet_file_reader_factory.create_reader(
                 self.partition_index,
                 file_meta,
@@ -109,47 +108,84 @@ impl FileOpener for ParquetOpener {
             .schema_adapter_factory
             .create(projected_schema, Arc::clone(&self.table_schema));
         let predicate = self.predicate.clone();
-        let pruning_predicate = self.pruning_predicate.clone();
-        let page_pruning_predicate = self.page_pruning_predicate.clone();
         let table_schema = Arc::clone(&self.table_schema);
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
-        let enable_page_index = should_enable_page_index(
-            self.enable_page_index,
-            &self.page_pruning_predicate,
-        );
         let enable_bloom_filter = self.enable_bloom_filter;
+        let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
         let limit = self.limit;
 
-        Ok(Box::pin(async move {
-            let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
+        let predicate_creation_errors = MetricBuilder::new(&self.metrics)
+            .global_counter("num_predicate_creation_errors");
+
+        let enable_page_index = self.enable_page_index;
+        Ok(Box::pin(async move {
+            // Don't load the page index yet. Since it is not stored inline in
+            // the footer, loading the page index if it is not needed will do
+            // unnecessary I/O. We decide later if it is needed to evaluate the
+            // pruning predicates. Thus default to not requesting it from the
+            // underlying reader.
+            let mut options = ArrowReaderOptions::new().with_page_index(false);
 
             let mut metadata_timer = file_metrics.metadata_load_time.timer();
-            let metadata =
-                ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
-            let mut schema = Arc::clone(metadata.schema());
-            // read with view types
-            if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &schema)
+            // Begin by loading the metadata from the underlying reader (note
+            // the returned metadata may actually include page indexes as some
+            // readers may return page indexes even when not requested -- for
+            // example when they are cached)
+            let mut reader_metadata =
+                ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
+                    .await?;
+
+            // Note about schemas: we are actually dealing with **3 different schemas** here:
+            // - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
+            // - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
+            // - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
+            let mut physical_file_schema = Arc::clone(reader_metadata.schema());
+
+            // The schema loaded from the file may not be the same as the
+            // desired schema (for example if we want to instruct the parquet
+            // reader to read strings using Utf8View instead). Update if necessary
+            if let Some(merged) =
+                apply_file_schema_type_coercions(&table_schema, &physical_file_schema)
             {
-                schema = Arc::new(merged);
+                physical_file_schema = Arc::new(merged);
+                options = options.with_schema(Arc::clone(&physical_file_schema));
+                reader_metadata = ArrowReaderMetadata::try_new(
+                    Arc::clone(reader_metadata.metadata()),
+                    options.clone(),
+                )?;
             }
 
-            let options = ArrowReaderOptions::new()
-                .with_page_index(enable_page_index)
-                .with_schema(Arc::clone(&schema));
-            let metadata =
-                ArrowReaderMetadata::try_new(Arc::clone(metadata.metadata()), options)?;
+            // Build predicates for this specific file
+            let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
+                &predicate,
+                &physical_file_schema,
+                &predicate_creation_errors,
+            );
 
-            metadata_timer.stop();
+            // The page index is not stored inline in the parquet footer so the
+            // code above may not have read the page index structures yet. If we
+            // need them for reading and they aren't yet loaded, we need to load them now.
+            if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
+                reader_metadata = load_page_index(
+                    reader_metadata,
+                    &mut async_file_reader,
+                    // Since we're manually loading the page index, the option here should not matter, but we pass it in for consistency
+                    options.with_page_index(true),
+                )
+                .await?;
+            }
 
-            let mut builder =
-                ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);
+            metadata_timer.stop();
 
-            let file_schema = Arc::clone(builder.schema());
+            let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
+                async_file_reader,
+                reader_metadata,
+            );
 
             let (schema_mapping, adapted_projections) =
-                schema_adapter.map_schema(&file_schema)?;
+                schema_adapter.map_schema(&physical_file_schema)?;
 
             let mask = ProjectionMask::roots(
                 builder.parquet_schema(),
@@ -160,7 +196,7 @@ impl FileOpener for ParquetOpener {
             if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
                 let row_filter = row_filter::build_row_filter(
                     &predicate,
-                    &file_schema,
+                    &physical_file_schema,
                     &table_schema,
                     builder.metadata(),
                     reorder_predicates,
@@ -197,18 +233,20 @@ impl FileOpener for ParquetOpener {
             }
             // If there is a predicate that can be evaluated against the metadata
             if let Some(predicate) = predicate.as_ref() {
-                row_groups.prune_by_statistics(
-                    &file_schema,
-                    builder.parquet_schema(),
-                    rg_metadata,
-                    predicate,
-                    &file_metrics,
-                );
+                if enable_row_group_stats_pruning {
+                    row_groups.prune_by_statistics(
+                        &physical_file_schema,
+                        builder.parquet_schema(),
+                        rg_metadata,
+                        predicate,
+                        &file_metrics,
+                    );
+                }
 
                 if enable_bloom_filter && !row_groups.is_empty() {
                     row_groups
                         .prune_by_bloom_filters(
-                            &file_schema,
+                            &physical_file_schema,
                             &mut builder,
                             predicate,
                             &file_metrics,
@@ -226,7 +264,7 @@ impl FileOpener for ParquetOpener {
                 if let Some(p) = page_pruning_predicate {
                     access_plan = p.prune_plan_with_page_index(
                         access_plan,
-                        &file_schema,
+                        &physical_file_schema,
                         builder.parquet_schema(),
                         file_metadata.as_ref(),
                         &file_metrics,
@@ -295,3 +333,91 @@ fn create_initial_plan(
     // default to scanning all row groups
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
+
+/// Build a pruning predicate from a predicate expression.
+/// If the predicate cannot be converted to a pruning predicate, or if it would
+/// not prune anything (it is always true), return None.
+/// If there is an error creating the pruning predicate it is recorded by incrementing
+/// the `predicate_creation_errors` counter.
+pub(crate) fn build_pruning_predicate(
+    predicate: Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> Option<Arc<PruningPredicate>> {
+    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+        Ok(pruning_predicate) => {
+            if !pruning_predicate.always_true() {
+                return Some(Arc::new(pruning_predicate));
+            }
+        }
+        Err(e) => {
+            debug!("Could not create pruning predicate for: {e}");
+            predicate_creation_errors.add(1);
+        }
+    }
+    None
+}
+
+/// Build a page pruning predicate from a predicate expression. The returned
+/// [`PagePruningAccessPlanFilter`] is used to prune individual data pages
+/// using the parquet page index, when it is available.
+pub(crate) fn build_page_pruning_predicate(
+    predicate: &Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+) -> Arc<PagePruningAccessPlanFilter> {
+    Arc::new(PagePruningAccessPlanFilter::new(
+        predicate,
+        Arc::clone(file_schema),
+    ))
+}
+
+fn build_pruning_predicates(
+    predicate: &Option<Arc<dyn PhysicalExpr>>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> (
+    Option<Arc<PruningPredicate>>,
+    Option<Arc<PagePruningAccessPlanFilter>>,
+) {
+    let Some(predicate) = predicate.as_ref() else {
+        return (None, None);
+    };
+    let pruning_predicate = build_pruning_predicate(
+        Arc::clone(predicate),
+        file_schema,
+        predicate_creation_errors,
+    );
+    let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
+    (pruning_predicate, Some(page_pruning_predicate))
+}
+
+/// Returns an `ArrowReaderMetadata` with the page index loaded, loading
+/// it from the underlying `AsyncFileReader` if necessary.
+async fn load_page_index<T: AsyncFileReader>(
+    reader_metadata: ArrowReaderMetadata,
+    input: &mut T,
+    options: ArrowReaderOptions,
+) -> Result<ArrowReaderMetadata> {
+    let parquet_metadata = reader_metadata.metadata();
+    let missing_column_index = parquet_metadata.column_index().is_none();
+    let missing_offset_index = parquet_metadata.offset_index().is_none();
+    // You may ask yourself: why are we even checking if the page index is already loaded here?
+    // Didn't we explicitly *not* load it above?
+    // Well it's possible that a custom implementation of `AsyncFileReader` gives you
+    // the page index even if you didn't ask for it (e.g. because it's cached)
+    // so it's important to check that here to avoid extra work.
+    if missing_column_index || missing_offset_index {
+        let m = Arc::try_unwrap(Arc::clone(parquet_metadata))
+            .unwrap_or_else(|e| e.as_ref().clone());
+        let mut reader =
+            ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
+        reader.load_page_index(input).await?;
+        let new_parquet_metadata = reader.finish()?;
+        let new_arrow_reader =
+            ArrowReaderMetadata::try_new(Arc::new(new_parquet_metadata), options)?;
+        Ok(new_arrow_reader)
+    } else {
+        // No need to load the page index again, just return the existing metadata
+        Ok(reader_metadata)
+    }
+}
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index da6bf114d71d..2d2993c29a6f 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -449,7 +449,7 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
 pub fn build_row_filter(
     expr: &Arc<dyn PhysicalExpr>,
-    file_schema: &SchemaRef,
+    physical_file_schema: &SchemaRef,
     table_schema: &SchemaRef,
     metadata: &ParquetMetaData,
     reorder_predicates: bool,
@@ -470,7 +470,7 @@ pub fn build_row_filter(
         .map(|expr| {
             FilterCandidateBuilder::new(
                 Arc::clone(expr),
-                Arc::clone(file_schema),
+                Arc::clone(physical_file_schema),
                 Arc::clone(table_schema),
                 Arc::clone(schema_adapter_factory),
             )
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 66d4d313d5a6..a5629e43636a 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -17,9 +17,12 @@
 //! 
ParquetSource implementation for reading parquet files use std::any::Any; +use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use crate::opener::build_page_pruning_predicate; +use crate::opener::build_pruning_predicate; use crate::opener::ParquetOpener; use crate::page_filter::PagePruningAccessPlanFilter; use crate::DefaultParquetFileReaderFactory; @@ -41,7 +44,6 @@ use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; use datafusion_physical_plan::DisplayFormatType; use itertools::Itertools; -use log::debug; use object_store::ObjectStore; /// Execution plan for reading one or more Parquet files. @@ -316,24 +318,10 @@ impl ParquetSource { conf = conf.with_metrics(metrics); conf.predicate = Some(Arc::clone(&predicate)); - match PruningPredicate::try_new(Arc::clone(&predicate), Arc::clone(&file_schema)) - { - Ok(pruning_predicate) => { - if !pruning_predicate.always_true() { - conf.pruning_predicate = Some(Arc::new(pruning_predicate)); - } - } - Err(e) => { - debug!("Could not create pruning predicate for: {e}"); - predicate_creation_errors.add(1); - } - }; - - let page_pruning_predicate = Arc::new(PagePruningAccessPlanFilter::new( - &predicate, - Arc::clone(&file_schema), - )); - conf.page_pruning_predicate = Some(page_pruning_predicate); + conf.page_pruning_predicate = + Some(build_page_pruning_predicate(&predicate, &file_schema)); + conf.pruning_predicate = + build_pruning_predicate(predicate, &file_schema, &predicate_creation_errors); conf } @@ -348,16 +336,6 @@ impl ParquetSource { self.predicate.as_ref() } - /// Optional reference to this parquet scan's pruning predicate - pub fn pruning_predicate(&self) -> Option<&Arc> { - self.pruning_predicate.as_ref() - } - - /// Optional reference to this parquet scan's page pruning predicate - pub fn page_pruning_predicate(&self) -> Option<&Arc> { - self.page_pruning_predicate.as_ref() - } - /// return the optional file reader factory pub fn parquet_file_reader_factory( &self, @@ -488,8 +466,6 @@ impl FileSource for ParquetSource { .expect("Batch size must set before creating ParquetOpener"), limit: base_config.limit, predicate: self.predicate.clone(), - pruning_predicate: self.pruning_predicate.clone(), - page_pruning_predicate: self.page_pruning_predicate.clone(), table_schema: Arc::clone(&base_config.file_schema), metadata_size_hint: self.metadata_size_hint, metrics: self.metrics().clone(), @@ -498,6 +474,7 @@ impl FileSource for ParquetSource { reorder_filters: self.reorder_filters(), enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), + enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, schema_adapter_factory, }) } @@ -537,11 +514,10 @@ impl FileSource for ParquetSource { .expect("projected_statistics must be set"); // When filters are pushed down, we have no way of knowing the exact statistics. // Note that pruning predicate is also a kind of filter pushdown. - // (bloom filters use `pruning_predicate` too) - if self.pruning_predicate().is_some() - || self.page_pruning_predicate().is_some() - || (self.predicate().is_some() && self.pushdown_filters()) - { + // (bloom filters use `pruning_predicate` too). + // Because filter pushdown may happen dynamically as long as there is a predicate + // if we have *any* predicate applied, we can't guarantee the statistics are exact. 
+ if self.predicate().is_some() { Ok(statistics.to_inexact()) } else { Ok(statistics) @@ -560,7 +536,8 @@ impl FileSource for ParquetSource { .map(|p| format!(", predicate={p}")) .unwrap_or_default(); let pruning_predicate_string = self - .pruning_predicate() + .pruning_predicate + .as_ref() .map(|pre| { let mut guarantees = pre .literal_guarantees() diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 729283289caf..12de84d43589 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -138,6 +138,9 @@ pub struct FileScanConfig { /// Schema before `projection` is applied. It contains the all columns that may /// appear in the files. It does not include table partition columns /// that may be added. + /// Note that this is **not** the schema of the physical files. + /// This is the schema that the physical file schema will be + /// mapped onto, and the schema that the [`DataSourceExec`] will return. pub file_schema: SchemaRef, /// List of files to be processed, grouped into partitions /// @@ -227,6 +230,10 @@ pub struct FileScanConfig { #[derive(Clone)] pub struct FileScanConfigBuilder { object_store_url: ObjectStoreUrl, + /// Table schema before any projections or partition columns are applied. + /// This schema is used to read the files, but is **not** necessarily the schema of the physical files. + /// Rather this is the schema that the physical file schema will be mapped onto, and the schema that the + /// [`DataSourceExec`] will return. file_schema: SchemaRef, file_source: Arc,
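Since this change removes the `ParquetSource::pruning_predicate()` and `page_pruning_predicate()` accessors, code that previously inspected those values can instead observe pruning through the metrics reported by `EXPLAIN ANALYZE`, which is what the updated tests above do. Below is a minimal sketch of that pattern outside the test harness; the table name `t`, the file path `data.parquet`, and the filter column `c1` are illustrative assumptions rather than anything defined in this change.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical parquet file, registered only for illustration.
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;

    // Row group and page pruning now surface as per-file metrics such as
    // `row_groups_matched_statistics`, `row_groups_pruned_statistics`,
    // `row_groups_pruned_bloom_filter` and `pushdown_rows_pruned` in the
    // EXPLAIN ANALYZE output, rather than as a pruning predicate attached
    // to ParquetSource.
    let df = ctx
        .sql("EXPLAIN ANALYZE SELECT * FROM t WHERE c1 <> 'bar'")
        .await?;
    df.show().await?;
    Ok(())
}
```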