From 718bed3f0137dd3e6e0b84873b779ff53b182f78 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Apr 2025 09:30:25 -0500 Subject: [PATCH 01/15] parquet reader: move pruning predicate creation from ParquetSource to ParquetOpener --- .../src/datasource/file_format/parquet.rs | 12 +- .../src/datasource/physical_plan/parquet.rs | 284 +++++++----------- datafusion/datasource-parquet/src/opener.rs | 88 ++++-- datafusion/datasource-parquet/src/source.rs | 67 +---- 4 files changed, 198 insertions(+), 253 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 27a7e7ae3c06..67a7ba8dc776 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -67,13 +67,13 @@ pub(crate) mod test_util { .into_iter() .zip(tmp_files.into_iter()) .map(|(batch, mut output)| { - let builder = parquet::file::properties::WriterProperties::builder(); - let props = if multi_page { - builder.set_data_page_row_count_limit(ROWS_PER_PAGE) - } else { - builder + let mut builder = parquet::file::properties::WriterProperties::builder(); + if multi_page { + builder = builder.set_data_page_row_count_limit(ROWS_PER_PAGE) } - .build(); + builder = builder.set_bloom_filter_enabled(true); + + let props = builder.build(); let mut writer = parquet::arrow::ArrowWriter::try_new( &mut output, diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 9e1b2822e854..624d5d13e22d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -43,6 +43,7 @@ mod tests { }; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder}; use arrow::record_batch::RecordBatch; + use arrow::util::pretty::pretty_format_batches; use arrow_schema::SchemaRef; use bytes::{BufMut, BytesMut}; use datafusion_common::config::TableParquetOptions; @@ -61,8 +62,9 @@ mod tests { use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{col, lit, when, Expr}; use datafusion_physical_expr::planner::logical2physical; + use datafusion_physical_plan::analyze::AnalyzeExec; + use datafusion_physical_plan::collect; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; - use datafusion_physical_plan::{collect, displayable}; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use chrono::{TimeZone, Utc}; @@ -81,10 +83,10 @@ mod tests { struct RoundTripResult { /// Data that was read back from ParquetFiles batches: Result>, + /// The EXPLAIN ANALYZE output + explain: Result, /// The physical plan that was created (that has statistics, etc) parquet_exec: Arc, - /// The ParquetSource that is used in plan - parquet_source: ParquetSource, } /// round-trip record batches by writing each individual RecordBatch to @@ -137,71 +139,109 @@ mod tests { self.round_trip(batches).await.batches } - /// run the test, returning the `RoundTripResult` - async fn round_trip(self, batches: Vec) -> RoundTripResult { - let Self { - projection, - schema, - predicate, - pushdown_predicate, - page_index_predicate, - } = self; - - let file_schema = match schema { - Some(schema) => schema, - None => Arc::new( - Schema::try_merge( - batches.iter().map(|b| b.schema().as_ref().clone()), - ) - .unwrap(), - ), - }; - // If testing with page_index_predicate, write parquet - // 
files with multiple pages - let multi_page = page_index_predicate; - let (meta, _files) = store_parquet(batches, multi_page).await.unwrap(); - let file_group = meta.into_iter().map(Into::into).collect(); - + fn build_file_source(&self, file_schema: SchemaRef) -> Arc { // set up predicate (this is normally done by a layer higher up) - let predicate = predicate.map(|p| logical2physical(&p, &file_schema)); + let predicate = self + .predicate + .as_ref() + .map(|p| logical2physical(p, &file_schema)); let mut source = ParquetSource::default(); if let Some(predicate) = predicate { source = source.with_predicate(Arc::clone(&file_schema), predicate); } - if pushdown_predicate { + if self.pushdown_predicate { source = source .with_pushdown_filters(true) .with_reorder_filters(true); } - if page_index_predicate { + if self.page_index_predicate { source = source.with_enable_page_index(true); } + Arc::new(source) + } + + fn build_parquet_exec( + &self, + file_schema: SchemaRef, + file_group: FileGroup, + source: Arc, + ) -> Arc { let base_config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), file_schema, - Arc::new(source.clone()), + source, ) .with_file_group(file_group) - .with_projection(projection) + .with_projection(self.projection.clone()) .build(); + DataSourceExec::from_data_source(base_config) + } + + /// run the test, returning the `RoundTripResult` + async fn round_trip(&self, batches: Vec) -> RoundTripResult { + let file_schema = match &self.schema { + Some(schema) => schema, + None => &Arc::new( + Schema::try_merge( + batches.iter().map(|b| b.schema().as_ref().clone()), + ) + .unwrap(), + ), + }; + let file_schema = Arc::clone(file_schema); + // If testing with page_index_predicate, write parquet + // files with multiple pages + let multi_page = self.page_index_predicate; + let (meta, _files) = store_parquet(batches, multi_page).await.unwrap(); + let file_group: FileGroup = meta.into_iter().map(Into::into).collect(); + + // build a ParquetExec to return the results + let parquet_source = self.build_file_source(file_schema.clone()); + let parquet_exec = self.build_parquet_exec( + file_schema.clone(), + file_group.clone(), + Arc::clone(&parquet_source), + ); + + let analyze_exec = Arc::new(AnalyzeExec::new( + false, + false, + // use a new ParquetSource to avoid sharing execution metrics + self.build_parquet_exec( + file_schema.clone(), + file_group.clone(), + self.build_file_source(file_schema.clone()), + ), + Arc::new(Schema::new(vec![ + Field::new("plan_type", DataType::Utf8, true), + Field::new("plan", DataType::Utf8, true), + ])), + )); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let parquet_exec = DataSourceExec::from_data_source(base_config.clone()); + let batches = collect( + Arc::clone(&parquet_exec) as Arc, + task_ctx.clone(), + ) + .await; + + let explain = collect(analyze_exec, task_ctx.clone()) + .await + .map(|batches| { + let batches = pretty_format_batches(&batches).unwrap(); + format!("{batches}") + }); + RoundTripResult { - batches: collect(parquet_exec.clone(), task_ctx).await, + batches, + explain, parquet_exec, - parquet_source: base_config - .file_source() - .as_any() - .downcast_ref::() - .unwrap() - .clone(), } } } @@ -1375,26 +1415,6 @@ mod tests { create_batch(vec![("c1", c1.clone())]) } - /// Returns a int64 array with contents: - /// "[-1, 1, null, 2, 3, null, null]" - fn int64_batch() -> RecordBatch { - let contents: ArrayRef = Arc::new(Int64Array::from(vec![ - Some(-1), - Some(1), - None, - Some(2), - 
Some(3), - None, - None, - ])); - - create_batch(vec![ - ("a", contents.clone()), - ("b", contents.clone()), - ("c", contents.clone()), - ]) - } - #[tokio::test] async fn parquet_exec_metrics() { // batch1: c1(string) @@ -1445,8 +1465,8 @@ mod tests { // batch1: c1(string) let batch1 = string_batch(); - // c1 != 'bar' - let filter = col("c1").not_eq(lit("bar")); + // c1 == 'aaa', should prune via stats + let filter = col("c1").eq(lit("aaa")); let rt = RoundTrip::new() .with_predicate(filter) @@ -1454,110 +1474,17 @@ mod tests { .round_trip(vec![batch1]) .await; - // should have a pruning predicate - let pruning_predicate = rt.parquet_source.pruning_predicate(); - assert!(pruning_predicate.is_some()); + let explain = rt.explain.unwrap(); - // convert to explain plan form - let display = displayable(rt.parquet_exec.as_ref()) - .indent(true) - .to_string(); + // check that there was a pruning predicate -> row groups got pruned + assert_contains!(&explain, "predicate=c1@0 = aaa"); - assert_contains!( - &display, - "pruning_predicate=c1_null_count@2 != row_count@3 AND (c1_min@0 != bar OR bar != c1_max@1)" - ); - - assert_contains!(&display, r#"predicate=c1@0 != bar"#); + // there's a single row group, but we can check that it matched + // if no pruning was done this would be 0 instead of 1 + assert_contains!(&explain, "row_groups_matched_statistics=1"); - assert_contains!(&display, "projection=[c1]"); - } - - #[tokio::test] - async fn parquet_exec_display_deterministic() { - // batches: a(int64), b(int64), c(int64) - let batches = int64_batch(); - - fn extract_required_guarantees(s: &str) -> Option<&str> { - s.split("required_guarantees=").nth(1) - } - - // Ensuring that the required_guarantees remain consistent across every display plan of the filter conditions - for _ in 0..100 { - // c = 1 AND b = 1 AND a = 1 - let filter0 = col("c") - .eq(lit(1)) - .and(col("b").eq(lit(1))) - .and(col("a").eq(lit(1))); - - let rt0 = RoundTrip::new() - .with_predicate(filter0) - .with_pushdown_predicate() - .round_trip(vec![batches.clone()]) - .await; - - let pruning_predicate = rt0.parquet_source.pruning_predicate(); - assert!(pruning_predicate.is_some()); - - let display0 = displayable(rt0.parquet_exec.as_ref()) - .indent(true) - .to_string(); - - let guarantees0: &str = extract_required_guarantees(&display0) - .expect("Failed to extract required_guarantees"); - // Compare only the required_guarantees part (Because the file_groups part will not be the same) - assert_eq!( - guarantees0.trim(), - "[a in (1), b in (1), c in (1)]", - "required_guarantees don't match" - ); - } - - // c = 1 AND a = 1 AND b = 1 - let filter1 = col("c") - .eq(lit(1)) - .and(col("a").eq(lit(1))) - .and(col("b").eq(lit(1))); - - let rt1 = RoundTrip::new() - .with_predicate(filter1) - .with_pushdown_predicate() - .round_trip(vec![batches.clone()]) - .await; - - // b = 1 AND a = 1 AND c = 1 - let filter2 = col("b") - .eq(lit(1)) - .and(col("a").eq(lit(1))) - .and(col("c").eq(lit(1))); - - let rt2 = RoundTrip::new() - .with_predicate(filter2) - .with_pushdown_predicate() - .round_trip(vec![batches]) - .await; - - // should have a pruning predicate - let pruning_predicate = rt1.parquet_source.pruning_predicate(); - assert!(pruning_predicate.is_some()); - let pruning_predicate = rt2.parquet_source.predicate(); - assert!(pruning_predicate.is_some()); - - // convert to explain plan form - let display1 = displayable(rt1.parquet_exec.as_ref()) - .indent(true) - .to_string(); - let display2 = displayable(rt2.parquet_exec.as_ref()) - 
.indent(true) - .to_string(); - - let guarantees1 = extract_required_guarantees(&display1) - .expect("Failed to extract required_guarantees"); - let guarantees2 = extract_required_guarantees(&display2) - .expect("Failed to extract required_guarantees"); - - // Compare only the required_guarantees part (Because the predicate part will not be the same) - assert_eq!(guarantees1, guarantees2, "required_guarantees don't match"); + // check the projection + assert_contains!(&explain, "projection=[c1]"); } #[tokio::test] @@ -1581,16 +1508,17 @@ mod tests { .await; // Should not contain a pruning predicate (since nothing can be pruned) - let pruning_predicate = rt.parquet_source.pruning_predicate(); - assert!( - pruning_predicate.is_none(), - "Still had pruning predicate: {pruning_predicate:?}" - ); + let explain = rt.explain.unwrap(); - // but does still has a pushdown down predicate - let predicate = rt.parquet_source.predicate(); - let filter_phys = logical2physical(&filter, rt.parquet_exec.schema().as_ref()); - assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string()); + assert_contains!(&explain, "row_groups_matched_statistics=0"); + assert_contains!(&explain, "row_groups_pruned_statistics=0"); + + // But pushdown predicate should be present + assert_contains!( + &explain, + "predicate=CASE WHEN c1@0 != bar THEN true ELSE false END" + ); + assert_contains!(&explain, "pushdown_rows_pruned=5"); } #[tokio::test] @@ -1616,8 +1544,14 @@ mod tests { .await; // Should have a pruning predicate - let pruning_predicate = rt.parquet_source.pruning_predicate(); - assert!(pruning_predicate.is_some()); + let explain = rt.explain.unwrap(); + assert_contains!( + &explain, + "predicate=c1@0 = foo AND CASE WHEN c1@0 != bar THEN true ELSE false END" + ); + + // And bloom filters should have been evaluated + assert_contains!(&explain, "row_groups_pruned_bloom_filter=1"); } /// Returns the sum of all the metrics with the specified name diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 732fef47d5a7..dee873930838 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -34,7 +34,7 @@ use arrow::error::ArrowError; use datafusion_common::{exec_err, Result}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::pruning::PruningPredicate; -use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder}; use futures::{StreamExt, TryStreamExt}; use log::debug; @@ -54,10 +54,6 @@ pub(super) struct ParquetOpener { pub limit: Option, /// Optional predicate to apply during the scan pub predicate: Option>, - /// Optional pruning predicate applied to row group statistics - pub pruning_predicate: Option>, - /// Optional pruning predicate applied to data page statistics - pub page_pruning_predicate: Option>, /// Schema of the output table pub table_schema: SchemaRef, /// Optional hint for how large the initial request to read parquet metadata @@ -80,6 +76,8 @@ pub(super) struct ParquetOpener { pub enable_bloom_filter: bool, /// Schema adapter factory pub schema_adapter_factory: Arc, + /// Should row group pruning be applied + pub enable_row_group_stats_pruning: bool, } impl FileOpener for ParquetOpener { @@ -109,18 +107,33 @@ impl FileOpener for ParquetOpener { .schema_adapter_factory .create(projected_schema, Arc::clone(&self.table_schema)); let predicate = 
self.predicate.clone(); - let pruning_predicate = self.pruning_predicate.clone(); - let page_pruning_predicate = self.page_pruning_predicate.clone(); let table_schema = Arc::clone(&self.table_schema); let reorder_predicates = self.reorder_filters; let pushdown_filters = self.pushdown_filters; - let enable_page_index = should_enable_page_index( - self.enable_page_index, - &self.page_pruning_predicate, - ); let enable_bloom_filter = self.enable_bloom_filter; + let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning; let limit = self.limit; + let predicate_creation_errors = MetricBuilder::new(&self.metrics) + .global_counter("num_predicate_creation_errors"); + + let (pruning_predicate, page_pruning_predicate) = + if let Some(predicate) = &predicate { + let pruning_predicate = build_pruning_predicate( + Arc::clone(predicate), + &table_schema, + &predicate_creation_errors, + ); + let page_pruning_predicate = + build_page_pruning_predicate(predicate, &table_schema); + (pruning_predicate, Some(page_pruning_predicate)) + } else { + (None, None) + }; + + let enable_page_index = + should_enable_page_index(self.enable_page_index, &page_pruning_predicate); + Ok(Box::pin(async move { let options = ArrowReaderOptions::new().with_page_index(enable_page_index); @@ -197,13 +210,15 @@ impl FileOpener for ParquetOpener { } // If there is a predicate that can be evaluated against the metadata if let Some(predicate) = predicate.as_ref() { - row_groups.prune_by_statistics( - &file_schema, - builder.parquet_schema(), - rg_metadata, - predicate, - &file_metrics, - ); + if enable_row_group_stats_pruning { + row_groups.prune_by_statistics( + &file_schema, + builder.parquet_schema(), + rg_metadata, + predicate, + &file_metrics, + ); + } if enable_bloom_filter && !row_groups.is_empty() { row_groups @@ -295,3 +310,40 @@ fn create_initial_plan( // default to scanning all row groups Ok(ParquetAccessPlan::new_all(row_group_count)) } + +/// Build a pruning predicate from an optional predicate expression. +/// If the predicate is None or the predicate cannot be converted to a pruning +/// predicate, return None. +/// If there is an error creating the pruning predicate it is recorded by incrementing +/// the `predicate_creation_errors` counter. +pub(crate) fn build_pruning_predicate( + predicate: Arc, + file_schema: &SchemaRef, + predicate_creation_errors: &Count, +) -> Option> { + match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) { + Ok(pruning_predicate) => { + if !pruning_predicate.always_true() { + return Some(Arc::new(pruning_predicate)); + } + } + Err(e) => { + debug!("Could not create pruning predicate for: {e}"); + predicate_creation_errors.add(1); + } + } + None +} + +/// Build a page pruning predicate from an optional predicate expression. +/// If the predicate is None or the predicate cannot be converted to a page pruning +/// predicate, return None. +pub(crate) fn build_page_pruning_predicate( + predicate: &Arc, + file_schema: &SchemaRef, +) -> Arc { + Arc::new(PagePruningAccessPlanFilter::new( + predicate, + Arc::clone(file_schema), + )) +} diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 66d4d313d5a6..80f51bcc5c63 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -17,9 +17,12 @@ //! 
ParquetSource implementation for reading parquet files use std::any::Any; +use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use crate::opener::build_page_pruning_predicate; +use crate::opener::build_pruning_predicate; use crate::opener::ParquetOpener; use crate::page_filter::PagePruningAccessPlanFilter; use crate::DefaultParquetFileReaderFactory; @@ -40,8 +43,6 @@ use datafusion_physical_optimizer::pruning::PruningPredicate; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; use datafusion_physical_plan::DisplayFormatType; -use itertools::Itertools; -use log::debug; use object_store::ObjectStore; /// Execution plan for reading one or more Parquet files. @@ -316,24 +317,10 @@ impl ParquetSource { conf = conf.with_metrics(metrics); conf.predicate = Some(Arc::clone(&predicate)); - match PruningPredicate::try_new(Arc::clone(&predicate), Arc::clone(&file_schema)) - { - Ok(pruning_predicate) => { - if !pruning_predicate.always_true() { - conf.pruning_predicate = Some(Arc::new(pruning_predicate)); - } - } - Err(e) => { - debug!("Could not create pruning predicate for: {e}"); - predicate_creation_errors.add(1); - } - }; - - let page_pruning_predicate = Arc::new(PagePruningAccessPlanFilter::new( - &predicate, - Arc::clone(&file_schema), - )); - conf.page_pruning_predicate = Some(page_pruning_predicate); + conf.page_pruning_predicate = + Some(build_page_pruning_predicate(&predicate, &file_schema)); + conf.pruning_predicate = + build_pruning_predicate(predicate, &file_schema, &predicate_creation_errors); conf } @@ -348,16 +335,6 @@ impl ParquetSource { self.predicate.as_ref() } - /// Optional reference to this parquet scan's pruning predicate - pub fn pruning_predicate(&self) -> Option<&Arc> { - self.pruning_predicate.as_ref() - } - - /// Optional reference to this parquet scan's page pruning predicate - pub fn page_pruning_predicate(&self) -> Option<&Arc> { - self.page_pruning_predicate.as_ref() - } - /// return the optional file reader factory pub fn parquet_file_reader_factory( &self, @@ -488,8 +465,6 @@ impl FileSource for ParquetSource { .expect("Batch size must set before creating ParquetOpener"), limit: base_config.limit, predicate: self.predicate.clone(), - pruning_predicate: self.pruning_predicate.clone(), - page_pruning_predicate: self.page_pruning_predicate.clone(), table_schema: Arc::clone(&base_config.file_schema), metadata_size_hint: self.metadata_size_hint, metrics: self.metrics().clone(), @@ -498,6 +473,7 @@ impl FileSource for ParquetSource { reorder_filters: self.reorder_filters(), enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), + enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, schema_adapter_factory, }) } @@ -537,11 +513,10 @@ impl FileSource for ParquetSource { .expect("projected_statistics must be set"); // When filters are pushed down, we have no way of knowing the exact statistics. // Note that pruning predicate is also a kind of filter pushdown. - // (bloom filters use `pruning_predicate` too) - if self.pruning_predicate().is_some() - || self.page_pruning_predicate().is_some() - || (self.predicate().is_some() && self.pushdown_filters()) - { + // (bloom filters use `pruning_predicate` too). + // Because filter pushdown may happen dynamically as long as there is a predicate + // if we have *any* predicate applied, we can't guarantee the statistics are exact. 
+ if self.predicate().is_some() { Ok(statistics.to_inexact()) } else { Ok(statistics) @@ -559,24 +534,8 @@ impl FileSource for ParquetSource { .predicate() .map(|p| format!(", predicate={p}")) .unwrap_or_default(); - let pruning_predicate_string = self - .pruning_predicate() - .map(|pre| { - let mut guarantees = pre - .literal_guarantees() - .iter() - .map(|item| format!("{}", item)) - .collect_vec(); - guarantees.sort(); - format!( - ", pruning_predicate={}, required_guarantees=[{}]", - pre.predicate_expr(), - guarantees.join(", ") - ) - }) - .unwrap_or_default(); - write!(f, "{}{}", predicate_string, pruning_predicate_string) + write!(f, "{}", predicate_string) } DisplayFormatType::TreeRender => { if let Some(predicate) = self.predicate() { From 4655adc0db08667c740424f5b3c70a8bea59790e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Apr 2025 12:06:57 -0500 Subject: [PATCH 02/15] use file schema, avoid loading page index if unecessary --- datafusion/datasource-parquet/src/opener.rs | 119 ++++++++++++------ .../datasource-parquet/src/row_filter.rs | 4 +- datafusion/datasource/src/file_scan_config.rs | 7 ++ 3 files changed, 93 insertions(+), 37 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index dee873930838..5b7a0b5ef925 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -41,6 +41,7 @@ use log::debug; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::file::metadata::ParquetMetaDataReader; /// Implements [`FileOpener`] for a parquet file pub(super) struct ParquetOpener { @@ -117,52 +118,56 @@ impl FileOpener for ParquetOpener { let predicate_creation_errors = MetricBuilder::new(&self.metrics) .global_counter("num_predicate_creation_errors"); - let (pruning_predicate, page_pruning_predicate) = - if let Some(predicate) = &predicate { - let pruning_predicate = build_pruning_predicate( - Arc::clone(predicate), - &table_schema, - &predicate_creation_errors, - ); - let page_pruning_predicate = - build_page_pruning_predicate(predicate, &table_schema); - (pruning_predicate, Some(page_pruning_predicate)) - } else { - (None, None) - }; - - let enable_page_index = - should_enable_page_index(self.enable_page_index, &page_pruning_predicate); + let enable_page_index = self.enable_page_index; Ok(Box::pin(async move { - let options = ArrowReaderOptions::new().with_page_index(enable_page_index); + // Don't load the page index yet - we will decide later if we need it + let options = ArrowReaderOptions::new().with_page_index(false); let mut metadata_timer = file_metrics.metadata_load_time.timer(); - let metadata = + let mut metadata = ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?; - let mut schema = Arc::clone(metadata.schema()); + // Note about schemas: we are actually dealing with **3 different schemas** here: + // - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc. + // - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to. + // - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains. 
+ let mut physical_file_schema = Arc::clone(metadata.schema()); // read with view types - if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &schema) + if let Some(merged) = + apply_file_schema_type_coercions(&table_schema, &physical_file_schema) { - schema = Arc::new(merged); + physical_file_schema = Arc::new(merged); } - let options = ArrowReaderOptions::new() - .with_page_index(enable_page_index) - .with_schema(Arc::clone(&schema)); - let metadata = - ArrowReaderMetadata::try_new(Arc::clone(metadata.metadata()), options)?; + // Build predicates for this specific file + let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates( + &predicate, + &physical_file_schema, + &predicate_creation_errors, + ); + + let enable_page_index = + should_enable_page_index(enable_page_index, &page_pruning_predicate); + + // Now check if we should load the page index + if should_enable_page_index(enable_page_index, &page_pruning_predicate) { + metadata = load_page_index( + metadata, + &mut reader, + // Since we're manually loading the page index the option here should not matter but we pass it in for consistency + ArrowReaderOptions::new().with_page_index(true), + ) + .await?; + } metadata_timer.stop(); let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata); - let file_schema = Arc::clone(builder.schema()); - let (schema_mapping, adapted_projections) = - schema_adapter.map_schema(&file_schema)?; + schema_adapter.map_schema(&physical_file_schema)?; let mask = ProjectionMask::roots( builder.parquet_schema(), @@ -173,7 +178,7 @@ impl FileOpener for ParquetOpener { if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { let row_filter = row_filter::build_row_filter( &predicate, - &file_schema, + &physical_file_schema, &table_schema, builder.metadata(), reorder_predicates, @@ -212,7 +217,7 @@ impl FileOpener for ParquetOpener { if let Some(predicate) = predicate.as_ref() { if enable_row_group_stats_pruning { row_groups.prune_by_statistics( - &file_schema, + &physical_file_schema, builder.parquet_schema(), rg_metadata, predicate, @@ -223,7 +228,7 @@ impl FileOpener for ParquetOpener { if enable_bloom_filter && !row_groups.is_empty() { row_groups .prune_by_bloom_filters( - &file_schema, + &physical_file_schema, &mut builder, predicate, &file_metrics, @@ -241,7 +246,7 @@ impl FileOpener for ParquetOpener { if let Some(p) = page_pruning_predicate { access_plan = p.prune_plan_with_page_index( access_plan, - &file_schema, + &physical_file_schema, builder.parquet_schema(), file_metadata.as_ref(), &file_metrics, @@ -316,7 +321,7 @@ fn create_initial_plan( /// predicate, return None. /// If there is an error creating the pruning predicate it is recorded by incrementing /// the `predicate_creation_errors` counter. -pub(crate) fn build_pruning_predicate( +pub fn build_pruning_predicate( predicate: Arc, file_schema: &SchemaRef, predicate_creation_errors: &Count, @@ -338,7 +343,7 @@ pub(crate) fn build_pruning_predicate( /// Build a page pruning predicate from an optional predicate expression. /// If the predicate is None or the predicate cannot be converted to a page pruning /// predicate, return None. 
-pub(crate) fn build_page_pruning_predicate( +pub fn build_page_pruning_predicate( predicate: &Arc, file_schema: &SchemaRef, ) -> Arc { @@ -347,3 +352,47 @@ pub(crate) fn build_page_pruning_predicate( Arc::clone(file_schema), )) } + +fn build_pruning_predicates( + predicate: &Option>, + file_schema: &SchemaRef, + predicate_creation_errors: &Count, +) -> ( + Option>, + Option>, +) { + let Some(predicate) = predicate.as_ref() else { + return (None, None); + }; + let pruning_predicate = build_pruning_predicate( + Arc::clone(predicate), + file_schema, + predicate_creation_errors, + ); + let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema); + (pruning_predicate, Some(page_pruning_predicate)) +} + +async fn load_page_index( + arrow_reader: ArrowReaderMetadata, + input: &mut T, + options: ArrowReaderOptions, +) -> Result { + let parquet_metadata = arrow_reader.metadata(); + let missing_column_index = parquet_metadata.column_index().is_none(); + let missing_offset_index = parquet_metadata.offset_index().is_none(); + if missing_column_index || missing_offset_index { + let m = Arc::try_unwrap(Arc::clone(&parquet_metadata)) + .unwrap_or_else(|e| e.as_ref().clone()); + let mut reader = + ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true); + reader.load_page_index(input).await?; + let new_parquet_metadata = reader.finish()?; + let new_arrow_reader = + ArrowReaderMetadata::try_new(Arc::new(new_parquet_metadata), options)?; + Ok(new_arrow_reader) + } else { + // No page index, return the original metadata + Ok(arrow_reader) + } +} diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index da6bf114d71d..2d2993c29a6f 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -449,7 +449,7 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result, - file_schema: &SchemaRef, + physical_file_schema: &SchemaRef, table_schema: &SchemaRef, metadata: &ParquetMetaData, reorder_predicates: bool, @@ -470,7 +470,7 @@ pub fn build_row_filter( .map(|expr| { FilterCandidateBuilder::new( Arc::clone(expr), - Arc::clone(file_schema), + Arc::clone(physical_file_schema), Arc::clone(table_schema), Arc::clone(schema_adapter_factory), ) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 729283289caf..12de84d43589 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -138,6 +138,9 @@ pub struct FileScanConfig { /// Schema before `projection` is applied. It contains the all columns that may /// appear in the files. It does not include table partition columns /// that may be added. + /// Note that this is **not** the schema of the physical files. + /// This is the schema that the physical file schema will be + /// mapped onto, and the schema that the [`DataSourceExec`] will return. pub file_schema: SchemaRef, /// List of files to be processed, grouped into partitions /// @@ -227,6 +230,10 @@ pub struct FileScanConfig { #[derive(Clone)] pub struct FileScanConfigBuilder { object_store_url: ObjectStoreUrl, + /// Table schema before any projections or partition columns are applied. + /// This schema is used to read the files, but is **not** necessarily the schema of the physical files. + /// Rather this is the schema that the physical file schema will be mapped onto, and the schema that the + /// [`DataSourceExec`] will return. 
file_schema: SchemaRef, file_source: Arc, From a30b6313969d63f62b4b72f518b8a19d435461a4 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Apr 2025 12:12:06 -0500 Subject: [PATCH 03/15] Add comment --- datafusion/core/src/datasource/physical_plan/parquet.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 624d5d13e22d..5ff35a7c9e36 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1510,6 +1510,8 @@ mod tests { // Should not contain a pruning predicate (since nothing can be pruned) let explain = rt.explain.unwrap(); + // When both matched and pruned are 0, it means that the pruning predicate + // was not used at all. assert_contains!(&explain, "row_groups_matched_statistics=0"); assert_contains!(&explain, "row_groups_pruned_statistics=0"); From 398e7a45a57ceeae4b9877b4fb79fc1d4722856e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Apr 2025 12:36:03 -0500 Subject: [PATCH 04/15] add comment --- datafusion/datasource-parquet/src/opener.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 5b7a0b5ef925..cacf72d060db 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -381,6 +381,11 @@ async fn load_page_index( let parquet_metadata = arrow_reader.metadata(); let missing_column_index = parquet_metadata.column_index().is_none(); let missing_offset_index = parquet_metadata.offset_index().is_none(); + // You may ask yourself: why are we even checking if the page index is already loaded here? + // Didn't we explicitly *not* load it above? + // Well it's possible that a custom implementation of `AsyncFileReader` gives you + // the page index even if you didn't ask for it (e.g. because it's cached) + // so it's important to check that here to avoid extra work. 
if missing_column_index || missing_offset_index { let m = Arc::try_unwrap(Arc::clone(&parquet_metadata)) .unwrap_or_else(|e| e.as_ref().clone()); From 05bdcf8c9af0da86a196f296988b7af336aa980e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Apr 2025 12:16:17 -0500 Subject: [PATCH 05/15] Add comment --- datafusion/datasource-parquet/src/opener.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index cacf72d060db..8ddf32767507 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -397,7 +397,7 @@ async fn load_page_index( ArrowReaderMetadata::try_new(Arc::new(new_parquet_metadata), options)?; Ok(new_arrow_reader) } else { - // No page index, return the original metadata + // No need to load the page index again, just return the existing metadata Ok(arrow_reader) } } From 9a38aec35ed322f66da131603461821d329e1400 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Apr 2025 12:38:18 -0500 Subject: [PATCH 06/15] remove check --- datafusion/datasource-parquet/src/opener.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 8ddf32767507..f1952098b85f 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -147,9 +147,6 @@ impl FileOpener for ParquetOpener { &predicate_creation_errors, ); - let enable_page_index = - should_enable_page_index(enable_page_index, &page_pruning_predicate); - // Now check if we should load the page index if should_enable_page_index(enable_page_index, &page_pruning_predicate) { metadata = load_page_index( From f886645370dbcea17fd385970ddca787e7730a47 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Apr 2025 12:52:16 -0500 Subject: [PATCH 07/15] fix clippy --- datafusion/datasource-parquet/src/opener.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f1952098b85f..3c978fb57a3a 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -384,7 +384,7 @@ async fn load_page_index( // the page index even if you didn't ask for it (e.g. because it's cached) // so it's important to check that here to avoid extra work. 
if missing_column_index || missing_offset_index { - let m = Arc::try_unwrap(Arc::clone(&parquet_metadata)) + let m = Arc::try_unwrap(Arc::clone(parquet_metadata)) .unwrap_or_else(|e| e.as_ref().clone()); let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true); From 2d486d5f0d0130487afa8c153443f501dca9e3f6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Apr 2025 12:57:23 -0500 Subject: [PATCH 08/15] update sqllogictest --- datafusion/sqllogictest/test_files/parquet.slt | 2 +- .../sqllogictest/test_files/parquet_filter_pushdown.slt | 8 ++++---- datafusion/sqllogictest/test_files/repartition_scan.slt | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 2970b2effb3e..14d168d22967 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -625,7 +625,7 @@ physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 LIKE f% 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= g AND f <= column1_max@1, required_guarantees=[] +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f% statement ok drop table foo diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 758113b70835..51213162c0ea 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -85,7 +85,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[] +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 # When filter pushdown *is* enabled, ParquetExec can filter exactly, @@ -113,7 +113,7 @@ physical_plan 03)----CoalesceBatchesExec: target_batch_size=8192 04)------FilterExec: b@1 > 2, projection=[a@0] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 -06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], 
file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[] +06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2 # also test querying on columns that are not in all the files query T @@ -131,7 +131,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2, required_guarantees=[] +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL query I @@ -148,7 +148,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [b@0 ASC NULLS LAST] 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)] +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar ## cleanup statement ok diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index 2b30de572c8c..d4eac3045572 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -61,7 +61,7 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 != 42 -03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), 
required_guarantees=[column1 not in (42)] +03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42 # disable round robin repartitioning statement ok @@ -77,7 +77,7 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 != 42 -03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42 # enable round robin repartitioning again statement ok @@ -102,7 +102,7 @@ physical_plan 02)--SortExec: expr=[column1@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----CoalesceBatchesExec: target_batch_size=8192 04)------FilterExec: column1@0 != 42 -05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..272], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:272..538, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..278], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:278..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..272], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:272..538, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..278], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:278..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42 ## Read the files as though they are ordered @@ -138,7 +138,7 @@ physical_plan 01)SortPreservingMergeExec: [column1@0 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 03)----FilterExec: column1@0 != 42 -04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..269], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..273], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:273..547], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:269..538]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..269], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..273], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:273..547], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:269..538]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], file_type=parquet, predicate=column1@0 != 42 # Cleanup statement ok From bc9af127fb8a171221be232bf76f5a552ef8bfc2 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Apr 2025 16:32:15 -0500 Subject: [PATCH 09/15] restore to explain plans --- datafusion/datasource-parquet/src/source.rs | 20 ++++++++++++++++++- .../sqllogictest/test_files/parquet.slt | 2 +- .../test_files/parquet_filter_pushdown.slt | 8 ++++---- .../test_files/repartition_scan.slt | 8 ++++---- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 80f51bcc5c63..a5629e43636a 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -43,6 +43,7 @@ use datafusion_physical_optimizer::pruning::PruningPredicate; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; use datafusion_physical_plan::DisplayFormatType; +use itertools::Itertools; use object_store::ObjectStore; /// Execution plan for reading one or more Parquet files. 
@@ -534,8 +535,25 @@ impl FileSource for ParquetSource { .predicate() .map(|p| format!(", predicate={p}")) .unwrap_or_default(); + let pruning_predicate_string = self + .pruning_predicate + .as_ref() + .map(|pre| { + let mut guarantees = pre + .literal_guarantees() + .iter() + .map(|item| format!("{}", item)) + .collect_vec(); + guarantees.sort(); + format!( + ", pruning_predicate={}, required_guarantees=[{}]", + pre.predicate_expr(), + guarantees.join(", ") + ) + }) + .unwrap_or_default(); - write!(f, "{}", predicate_string) + write!(f, "{}{}", predicate_string, pruning_predicate_string) } DisplayFormatType::TreeRender => { if let Some(predicate) = self.predicate() { diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 14d168d22967..2970b2effb3e 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -625,7 +625,7 @@ physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 LIKE f% 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f% +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= g AND f <= column1_max@1, required_guarantees=[] statement ok drop table foo diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 51213162c0ea..758113b70835 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -85,7 +85,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[] # When filter pushdown *is* enabled, ParquetExec can filter exactly, @@ -113,7 +113,7 @@ physical_plan 03)----CoalesceBatchesExec: target_batch_size=8192 04)------FilterExec: b@1 > 2, projection=[a@0] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 -06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2 +06)----------DataSourceExec: file_groups={2 
groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[] # also test querying on columns that are not in all the files query T @@ -131,7 +131,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2, required_guarantees=[] query I @@ -148,7 +148,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [b@0 ASC NULLS LAST] 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)] ## cleanup statement ok diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index d4eac3045572..2b30de572c8c 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -61,7 +61,7 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 != 42 -03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42 +03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] # disable round robin repartitioning statement ok @@ -77,7 +77,7 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 != 42 -03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42 +03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] # enable round robin repartitioning again statement ok @@ -102,7 +102,7 @@ physical_plan 02)--SortExec: expr=[column1@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----CoalesceBatchesExec: target_batch_size=8192 04)------FilterExec: column1@0 != 42 -05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..272], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:272..538, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..278], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:278..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42 +05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..272], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:272..538, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..278], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:278..547]]}, projection=[column1], file_type=parquet, 
predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] ## Read the files as though they are ordered @@ -138,7 +138,7 @@ physical_plan 01)SortPreservingMergeExec: [column1@0 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 03)----FilterExec: column1@0 != 42 -04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..269], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..273], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:273..547], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:269..538]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], file_type=parquet, predicate=column1@0 != 42 +04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..269], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..273], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:273..547], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:269..538]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] # Cleanup statement ok From 3fd4b8adf2d92ee92a826c7faa5728b78ca1e099 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Apr 2025 16:42:34 -0500 Subject: [PATCH 10/15] reverted --- datafusion/core/src/datasource/physical_plan/parquet.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 5ff35a7c9e36..5c06c3902c1c 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1465,8 +1465,8 @@ mod tests { // batch1: c1(string) let batch1 = string_batch(); - // c1 == 'aaa', should prune via stats - let filter = col("c1").eq(lit("aaa")); + // c1 != 'bar' + let filter = col("c1").not_eq(lit("bar")); let rt = RoundTrip::new() .with_predicate(filter) @@ -1477,7 +1477,7 @@ mod tests { let explain = rt.explain.unwrap(); // check that there was a pruning predicate -> row groups got pruned - assert_contains!(&explain, "predicate=c1@0 = aaa"); + assert_contains!(&explain, "predicate=c1@0 != bar"); // there's a single row group, but we can check that it matched // if no pruning was done this would be 0 instead of 1 From c193d2e94c16bd9f50de4182be84c7e3eedb5b36 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Apr 2025 17:53:33 -0500 Subject: [PATCH 11/15] modify access --- datafusion/datasource-parquet/src/opener.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 3c978fb57a3a..9ddb8c8332ac 100644 --- a/datafusion/datasource-parquet/src/opener.rs 
+++ b/datafusion/datasource-parquet/src/opener.rs @@ -318,7 +318,7 @@ fn create_initial_plan( /// predicate, return None. /// If there is an error creating the pruning predicate it is recorded by incrementing /// the `predicate_creation_errors` counter. -pub fn build_pruning_predicate( +pub(crate) fn build_pruning_predicate( predicate: Arc, file_schema: &SchemaRef, predicate_creation_errors: &Count, @@ -340,7 +340,7 @@ pub fn build_pruning_predicate( /// Build a page pruning predicate from an optional predicate expression. /// If the predicate is None or the predicate cannot be converted to a page pruning /// predicate, return None. -pub fn build_page_pruning_predicate( +pub(crate) fn build_page_pruning_predicate( predicate: &Arc, file_schema: &SchemaRef, ) -> Arc { From cd6d766ac9d8ee8bb1a27fe5f021ae75cad9912b Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 4 Apr 2025 23:43:25 +0800 Subject: [PATCH 12/15] =?UTF-8?q?Fix=20ArrowReaderOptions=20should=20read?= =?UTF-8?q?=20with=20physical=5Ffile=5Fschema=20so=20we=20do=E2=80=A6=20(#?= =?UTF-8?q?17)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix ArrowReaderOptions should read with physical_file_schema so we don't need to cast back to utf8 * Fix fmt --- datafusion/datasource-parquet/src/opener.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 9ddb8c8332ac..c955633ae671 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -153,7 +153,9 @@ impl FileOpener for ParquetOpener { metadata, &mut reader, // Since we're manually loading the page index the option here should not matter but we pass it in for consistency - ArrowReaderOptions::new().with_page_index(true), + ArrowReaderOptions::new() + .with_page_index(true) + .with_schema(physical_file_schema.clone()), ) .await?; } From ad627d8c17f066733bf0f358cee5658a4fa64fee Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 4 Apr 2025 10:58:40 -0500 Subject: [PATCH 13/15] Update opener.rs --- datafusion/datasource-parquet/src/opener.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index c955633ae671..e7f196158ae9 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -155,7 +155,7 @@ impl FileOpener for ParquetOpener { // Since we're manually loading the page index the option here should not matter but we pass it in for consistency ArrowReaderOptions::new() .with_page_index(true) - .with_schema(physical_file_schema.clone()), + .with_schema(Arc::clone(&physical_file_schema)), ) .await?; } From 34993f21ad8aef6e37aaa2f0f271f61c381d9161 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 4 Apr 2025 13:33:39 -0400 Subject: [PATCH 14/15] Always apply per-file schema during parquet read (#18) --- datafusion/datasource-parquet/src/opener.rs | 61 ++++++++++++++------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index e7f196158ae9..fcd953125997 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -91,7 +91,7 @@ impl FileOpener for ParquetOpener { let metadata_size_hint = 
file_meta.metadata_size_hint.or(self.metadata_size_hint); - let mut reader: Box = + let mut async_file_reader: Box = self.parquet_file_reader_factory.create_reader( self.partition_index, file_meta, @@ -121,23 +121,40 @@ impl FileOpener for ParquetOpener { let enable_page_index = self.enable_page_index; Ok(Box::pin(async move { - // Don't load the page index yet - we will decide later if we need it - let options = ArrowReaderOptions::new().with_page_index(false); - + // Don't load the page index yet. Since it is not stored inline in + // the footer, loading the page index if it is not needed will do + // unecessary I/O. We decide later if it is needed to evaluate the + // pruning predicates. Thus default to not requesting if from the + // underlying reader. + let mut options = ArrowReaderOptions::new().with_page_index(false); let mut metadata_timer = file_metrics.metadata_load_time.timer(); - let mut metadata = - ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?; + + // Begin by loading the metadata from the underlying reader (note + // the returned metadata may actually include page indexes as some + // readers may return page indexes even when not requested -- for + // example when they are cached) + let mut reader_metadata = + ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone()) + .await?; + // Note about schemas: we are actually dealing with **3 different schemas** here: // - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc. // - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to. // - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains. - let mut physical_file_schema = Arc::clone(metadata.schema()); + let mut physical_file_schema = Arc::clone(reader_metadata.schema()); - // read with view types + // The schema loaded from the file may not be the same as the + // desired schema (for example if we want to instruct the parquet + // reader to read strings using Utf8View instead). Update if necessary if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &physical_file_schema) { physical_file_schema = Arc::new(merged); + options = options.with_schema(Arc::clone(&physical_file_schema)); + reader_metadata = ArrowReaderMetadata::try_new( + Arc::clone(reader_metadata.metadata()), + options.clone(), + )?; } // Build predicates for this specific file @@ -147,23 +164,25 @@ impl FileOpener for ParquetOpener { &predicate_creation_errors, ); - // Now check if we should load the page index + // The page index is not stored inline in the parquet footer so the + // code above may not have raed the page index structures yet. If we + // need them for reading and they aren't yet loaded, we need to load them now. 
if should_enable_page_index(enable_page_index, &page_pruning_predicate) { - metadata = load_page_index( - metadata, - &mut reader, + reader_metadata = load_page_index( + reader_metadata, + &mut async_file_reader, // Since we're manually loading the page index the option here should not matter but we pass it in for consistency - ArrowReaderOptions::new() - .with_page_index(true) - .with_schema(Arc::clone(&physical_file_schema)), + options.with_page_index(true), ) .await?; } metadata_timer.stop(); - let mut builder = - ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata); + let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( + async_file_reader, + reader_metadata, + ); let (schema_mapping, adapted_projections) = schema_adapter.map_schema(&physical_file_schema)?; @@ -372,12 +391,14 @@ fn build_pruning_predicates( (pruning_predicate, Some(page_pruning_predicate)) } +/// Returns a `ArrowReaderMetadata` with the page index loaded, loading +/// it from the underlying `AsyncFileReader` if necessary. async fn load_page_index( - arrow_reader: ArrowReaderMetadata, + reader_metadata: ArrowReaderMetadata, input: &mut T, options: ArrowReaderOptions, ) -> Result { - let parquet_metadata = arrow_reader.metadata(); + let parquet_metadata = reader_metadata.metadata(); let missing_column_index = parquet_metadata.column_index().is_none(); let missing_offset_index = parquet_metadata.offset_index().is_none(); // You may ask yourself: why are we even checking if the page index is already loaded here? @@ -397,6 +418,6 @@ async fn load_page_index( Ok(new_arrow_reader) } else { // No need to load the page index again, just return the existing metadata - Ok(arrow_reader) + Ok(reader_metadata) } } From c9aaa3d2e93d937d78de8940233e01007517f3ba Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 5 Apr 2025 00:32:08 -0500 Subject: [PATCH 15/15] Update datafusion/datasource-parquet/src/opener.rs --- datafusion/datasource-parquet/src/opener.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index fcd953125997..708a8035a4f7 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -165,7 +165,7 @@ impl FileOpener for ParquetOpener { ); // The page index is not stored inline in the parquet footer so the - // code above may not have raed the page index structures yet. If we + // code above may not have read the page index structures yet. If we // need them for reading and they aren't yet loaded, we need to load them now. if should_enable_page_index(enable_page_index, &page_pruning_predicate) { reader_metadata = load_page_index(
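
The last few patches combine two behaviors in ParquetOpener: the footer metadata is loaded without the page index (deferring that extra I/O until a pruning predicate actually needs it), and the coerced physical file schema is pushed into ArrowReaderOptions so the decoder produces the desired Arrow types directly instead of casting afterwards. As a standalone illustration of the underlying parquet crate APIs outside DataFusion, here is a minimal sketch; the "data.parquet" path, the tokio runtime, and the Utf8 to Utf8View mapping (standing in for DataFusion's apply_file_schema_type_coercions) are assumptions for the example, and it presumes parquet/arrow versions recent enough to honor view-type schema hints.

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input file; any parquet file with string columns works.
    let mut file = File::open("data.parquet").await?;

    // Load the footer metadata without the page index: the page index is not
    // stored inline in the footer, so requesting it up front costs extra I/O
    // that is wasted when nothing prunes on it.
    let options = ArrowReaderOptions::new().with_page_index(false);
    let reader_metadata =
        ArrowReaderMetadata::load_async(&mut file, options.clone()).await?;

    // Derive a "desired" schema from the physical file schema, e.g. read Utf8
    // columns as Utf8View (an illustrative stand-in for DataFusion's
    // apply_file_schema_type_coercions).
    let coerced_fields: Vec<Field> = reader_metadata
        .schema()
        .fields()
        .iter()
        .map(|f| match f.data_type() {
            DataType::Utf8 => f.as_ref().clone().with_data_type(DataType::Utf8View),
            _ => f.as_ref().clone(),
        })
        .collect();
    let coerced_schema = Arc::new(Schema::new(coerced_fields));

    // Re-derive the reader metadata with the schema hint so decoding produces
    // the coerced types directly, with no cast back after the read.
    let options = options.with_schema(Arc::clone(&coerced_schema));
    let reader_metadata =
        ArrowReaderMetadata::try_new(Arc::clone(reader_metadata.metadata()), options)?;

    // Build the stream from the already-loaded metadata and read the batches.
    let mut stream =
        ParquetRecordBatchStreamBuilder::new_with_metadata(file, reader_metadata)
            .build()?;
    while let Some(batch) = stream.try_next().await? {
        println!("read {} rows", batch.num_rows());
    }
    Ok(())
}

This also mirrors why the "Fix ArrowReaderOptions should read with physical_file_schema" commit passes the schema to the reader: decoding straight into the coerced types removes the need to cast string columns back after the scan.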