diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 212db653f713..0a9a80abc1b3 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -694,6 +694,11 @@ config_namespace! { /// reduce the number of rows decoded. This optimization is sometimes called "late materialization". pub pushdown_filters: bool, default = false + /// (reading) Number of row groups to prefetch while scanning parquet files. + /// Set to 0 to disable prefetching. A value of 1 prefetches the next row + /// group while the current one is being processed. + pub prefetch_row_groups: usize, default = 1 + /// (reading) If true, filter expressions evaluated during the parquet decoding operation /// will be reordered heuristically to minimize the cost of evaluation. If false, /// the filters are applied in the same order as written in the query diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 564929c61bab..62f1e57d84be 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -209,6 +209,7 @@ impl ParquetOptions { coerce_int96: _, // not used for writer props skip_arrow_metadata: _, max_predicate_cache_size: _, + prefetch_row_groups: _, } = self; let mut builder = WriterProperties::builder() @@ -469,6 +470,7 @@ mod tests { .maximum_parallel_row_group_writers, maximum_buffered_record_batches_per_stream: defaults .maximum_buffered_record_batches_per_stream, + prefetch_row_groups: defaults.prefetch_row_groups, bloom_filter_on_read: defaults.bloom_filter_on_read, schema_force_view_types: defaults.schema_force_view_types, binary_as_string: defaults.binary_as_string, @@ -583,6 +585,7 @@ mod tests { .maximum_parallel_row_group_writers, maximum_buffered_record_batches_per_stream: global_options_defaults .maximum_buffered_record_batches_per_stream, + prefetch_row_groups: global_options_defaults.prefetch_row_groups, bloom_filter_on_read: global_options_defaults.bloom_filter_on_read, max_predicate_cache_size: global_options_defaults .max_predicate_cache_size, diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 3c905d950a96..26c007fe3c6b 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -17,6 +17,8 @@ //! [`ParquetOpener`] for opening Parquet files +mod prefetch; + use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_group_filter::RowGroupAccessPlanFilter; use crate::{ @@ -45,10 +47,12 @@ use datafusion_physical_plan::metrics::{ }; use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate}; +use crate::opener::prefetch::EagerRowGroupPrefetchStream; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; +use futures::stream::BoxStream; use futures::{ready, Stream, StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; @@ -111,6 +115,8 @@ pub(super) struct ParquetOpener { /// Maximum size of the predicate cache, in bytes. If none, uses /// the arrow-rs default. 
pub max_predicate_cache_size: Option, + /// Number of row groups to prefetch + pub prefetch_row_groups: usize, } impl FileOpener for ParquetOpener { @@ -162,6 +168,7 @@ impl FileOpener for ParquetOpener { #[cfg(feature = "parquet_encryption")] let encryption_context = self.get_encryption_context(); let max_predicate_cache_size = self.max_predicate_cache_size; + let prefetch_row_groups = self.prefetch_row_groups; Ok(Box::pin(async move { #[cfg(feature = "parquet_encryption")] @@ -449,6 +456,12 @@ impl FileOpener for ParquetOpener { .with_metrics(arrow_reader_metrics.clone()) .build()?; + let stream: BoxStream> = if prefetch_row_groups == 0 { + stream.map_err(DataFusionError::from).boxed() + } else { + EagerRowGroupPrefetchStream::new(stream, prefetch_row_groups).boxed() + }; + let files_ranges_pruned_statistics = file_metrics.files_ranges_pruned_statistics.clone(); let predicate_cache_inner_records = @@ -754,13 +767,16 @@ fn should_enable_page_index( #[cfg(test)] mod test { + use std::ops::Range; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; + use std::time::Duration; use arrow::{ compute::cast, datatypes::{DataType, Field, Schema, SchemaRef}, }; - use bytes::{BufMut, BytesMut}; + use bytes::{BufMut, Bytes, BytesMut}; use datafusion_common::{ assert_batches_eq, record_batch, stats::Precision, ColumnStatistics, DataFusionError, ScalarValue, Statistics, @@ -779,11 +795,19 @@ mod test { }; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; - use futures::{Stream, StreamExt}; + use futures::{future::BoxFuture, FutureExt, Stream, StreamExt}; use object_store::{memory::InMemory, path::Path, ObjectStore}; + use parquet::arrow::arrow_reader::ArrowReaderOptions; + use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::ArrowWriter; + use parquet::file::properties::WriterProperties; + use parquet::file::reader::FileReader; + use parquet::file::serialized_reader::SerializedFileReader; + use tokio::sync::{mpsc, Semaphore}; - use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory}; + use crate::{ + opener::ParquetOpener, DefaultParquetFileReaderFactory, ParquetFileReaderFactory, + }; async fn count_batches_and_rows( mut stream: std::pin::Pin< @@ -835,6 +859,168 @@ mod test { data_len } + async fn write_parquet_with_properties( + store: Arc, + filename: &str, + batch: arrow::record_batch::RecordBatch, + properties: WriterProperties, + ) -> (usize, Bytes) { + let mut out = BytesMut::new().writer(); + { + let mut writer = + ArrowWriter::try_new(&mut out, batch.schema(), Some(properties)).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + let data = out.into_inner().freeze(); + let data_len = data.len(); + store + .put(&Path::from(filename), data.clone().into()) + .await + .unwrap(); + (data_len, data) + } + + fn row_group_ranges(data: &Bytes) -> Vec> { + let reader = + SerializedFileReader::new(data.clone()).expect("reading parquet metadata"); + reader + .metadata() + .row_groups() + .iter() + .map(|row_group| { + row_group + .columns() + .iter() + .fold((u64::MAX, 0), |(start, end), column| { + let (col_start, len) = column.byte_range(); + (start.min(col_start), end.max(col_start + len)) + }) + }) + .map(|(start, end)| start..end) + .collect() + } + + #[derive(Debug)] + struct RowGroupGateState { + ranges: Vec>, + started: Vec, + permits: Vec, + start_tx: mpsc::UnboundedSender, + } + + impl RowGroupGateState { + fn new(ranges: Vec>, 
start_tx: mpsc::UnboundedSender) -> Self { + let started = (0..ranges.len()).map(|_| AtomicBool::new(false)).collect(); + let permits = (0..ranges.len()).map(|_| Semaphore::new(0)).collect(); + Self { + ranges, + started, + permits, + start_tx, + } + } + + fn row_group_for_range(&self, range: &Range) -> Option { + self.ranges + .iter() + .position(|row_group_range| row_group_range.contains(&range.start)) + } + + async fn notify_row_group(&self, idx: usize) { + if !self.started[idx].swap(true, Ordering::SeqCst) { + let _ = self.start_tx.send(idx); + // Wait until this row group is allowed to proceed + self.permits[idx].acquire().await.unwrap().forget(); + } + } + } + + #[derive(Debug)] + struct GatedParquetFileReaderFactory { + inner: DefaultParquetFileReaderFactory, + gates: Arc, + } + + impl GatedParquetFileReaderFactory { + fn new(store: Arc, gates: Arc) -> Self { + Self { + inner: DefaultParquetFileReaderFactory::new(store), + gates, + } + } + } + + impl ParquetFileReaderFactory for GatedParquetFileReaderFactory { + fn create_reader( + &self, + partition_index: usize, + partitioned_file: PartitionedFile, + metadata_size_hint: Option, + metrics: &ExecutionPlanMetricsSet, + ) -> datafusion_common::Result> { + let reader = self.inner.create_reader( + partition_index, + partitioned_file, + metadata_size_hint, + metrics, + )?; + Ok(Box::new(GatedAsyncFileReader { + inner: reader, + gates: Arc::clone(&self.gates), + })) + } + } + + struct GatedAsyncFileReader { + inner: Box, + gates: Arc, + } + + impl AsyncFileReader for GatedAsyncFileReader { + fn get_bytes( + &mut self, + range: Range, + ) -> BoxFuture<'_, parquet::errors::Result> { + let gates = Arc::clone(&self.gates); + let inner = &mut self.inner; + async move { + if let Some(idx) = gates.row_group_for_range(&range) { + gates.notify_row_group(idx).await; + } + inner.get_bytes(range).await + } + .boxed() + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> { + let gates = Arc::clone(&self.gates); + let inner = &mut self.inner; + async move { + for range in &ranges { + if let Some(idx) = gates.row_group_for_range(range) { + gates.notify_row_group(idx).await; + } + } + inner.get_byte_ranges(ranges).await + } + .boxed() + } + + fn get_metadata<'a>( + &'a mut self, + options: Option<&'a ArrowReaderOptions>, + ) -> BoxFuture< + 'a, + parquet::errors::Result>, + > { + self.inner.get_metadata(options) + } + } + fn make_dynamic_expr(expr: Arc) -> Arc { Arc::new(DynamicFilterPhysicalExpr::new( expr.children().into_iter().map(Arc::clone).collect(), @@ -898,6 +1084,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + prefetch_row_groups: 1, } }; @@ -971,6 +1158,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + prefetch_row_groups: 1, } }; @@ -1060,6 +1248,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + prefetch_row_groups: 1, } }; @@ -1152,6 +1341,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + prefetch_row_groups: 1, } }; @@ -1244,6 +1434,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + prefetch_row_groups: 1, } }; @@ -1265,6 +1456,179 @@ mod test { assert_eq!(num_rows, 0); } + #[tokio::test] + async fn test_row_group_prefetch_config_enabled() { + let store = 
Arc::new(InMemory::new()) as Arc; + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let writer_properties = WriterProperties::builder() + .set_max_row_group_size(1) + .build(); + let (data_size, data) = write_parquet_with_properties( + Arc::clone(&store), + "prefetch_enabled.parquet", + batch.clone(), + writer_properties, + ) + .await; + let ranges = row_group_ranges(&data); + assert!(ranges.len() > 1); + + let schema = batch.schema(); + let file = PartitionedFile::new( + "prefetch_enabled.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + + let (start_tx, mut start_rx) = mpsc::unbounded_channel(); + let gates = Arc::new(RowGroupGateState::new(ranges, start_tx)); + gates.permits[0].add_permits(1); + + let opener = ParquetOpener { + partition_index: 0, + projection: Arc::new([0]), + batch_size: 1024, + limit: None, + predicate: Some(logical2physical(&col("a"), &schema)), + logical_file_schema: schema.clone(), + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new(GatedParquetFileReaderFactory::new( + Arc::clone(&store), + Arc::clone(&gates), + )), + partition_fields: vec![], + pushdown_filters: false, + reorder_filters: false, + enable_page_index: false, + enable_bloom_filter: false, + schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + enable_row_group_stats_pruning: false, + coerce_int96: None, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties: None, + expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, + max_predicate_cache_size: None, + prefetch_row_groups: 1, + }; + + let mut stream = opener.open(file).unwrap().await.unwrap(); + let first_row_group = + tokio::time::timeout(Duration::from_secs(1), start_rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(first_row_group, 0); + let first_batch = stream.next().await.unwrap().unwrap(); + assert_eq!(first_batch.num_rows(), 1); + + let second_row_group = + tokio::time::timeout(Duration::from_secs(1), start_rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(second_row_group, 1); + + for permit in gates.permits.iter().skip(1) { + permit.add_permits(1); + } + + let mut remaining = vec![]; + while let Some(batch) = stream.next().await { + remaining.push(batch.unwrap()); + } + assert_eq!(remaining.len(), gates.permits.len() - 1); + } + + #[tokio::test] + async fn test_row_group_prefetch_can_be_disabled() { + let store = Arc::new(InMemory::new()) as Arc; + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let writer_properties = WriterProperties::builder() + .set_max_row_group_size(1) + .build(); + let (data_size, data) = write_parquet_with_properties( + Arc::clone(&store), + "prefetch_disabled.parquet", + batch.clone(), + writer_properties, + ) + .await; + let ranges = row_group_ranges(&data); + assert!(ranges.len() > 1); + + let schema = batch.schema(); + let file = PartitionedFile::new( + "prefetch_disabled.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + + let (start_tx, mut start_rx) = mpsc::unbounded_channel(); + let gates = Arc::new(RowGroupGateState::new(ranges, start_tx)); + gates.permits[0].add_permits(1); + + let opener = ParquetOpener { + partition_index: 0, + projection: Arc::new([0]), + batch_size: 1024, + limit: None, + predicate: Some(logical2physical(&col("a"), &schema)), + logical_file_schema: schema.clone(), + metadata_size_hint: None, + 
metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new(GatedParquetFileReaderFactory::new( + Arc::clone(&store), + Arc::clone(&gates), + )), + partition_fields: vec![], + pushdown_filters: false, + reorder_filters: false, + enable_page_index: false, + enable_bloom_filter: false, + schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + enable_row_group_stats_pruning: false, + coerce_int96: None, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties: None, + expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, + max_predicate_cache_size: None, + prefetch_row_groups: 0, + }; + + let mut stream = opener.open(file).unwrap().await.unwrap(); + let first_row_group = + tokio::time::timeout(Duration::from_secs(1), start_rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(first_row_group, 0); + let first_batch = stream.next().await.unwrap().unwrap(); + assert_eq!(first_batch.num_rows(), 1); + + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(start_rx.try_recv().is_err()); + + for permit in gates.permits.iter().skip(1) { + permit.add_permits(1); + } + + let second_row_group = + tokio::time::timeout(Duration::from_secs(1), start_rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(second_row_group, 1); + + let mut remaining = vec![]; + while let Some(batch) = stream.next().await { + remaining.push(batch.unwrap()); + } + assert_eq!(remaining.len(), gates.permits.len() - 1); + } + fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize { match metrics.sum_by_name(metric_name) { Some(v) => v.as_usize(), @@ -1394,6 +1758,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + prefetch_row_groups: 1, }; let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema); diff --git a/datafusion/datasource-parquet/src/opener/prefetch.rs b/datafusion/datasource-parquet/src/opener/prefetch.rs new file mode 100644 index 000000000000..b72062619590 --- /dev/null +++ b/datafusion/datasource-parquet/src/opener/prefetch.rs @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`EagerRowGroupPrefetchStream`] prefetches Parquet RowGroups in the background. 
+ +use arrow::array::RecordBatch; +use datafusion_common::{exec_err, DataFusionError}; +use datafusion_common_runtime::SpawnedTask; +use futures::{ready, Future, Stream}; +use parquet::arrow::arrow_reader::ParquetRecordBatchReader; +use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream}; +use parquet::errors::ParquetError; +use std::pin::pin; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::{channel, Receiver}; + +/// Eagerly prefetches RowGroups from the underlying stream +pub(crate) struct EagerRowGroupPrefetchStream { + /// Channel receiver for prefetched row groups + receiver: Receiver< + datafusion_common::Result, ParquetError>, + >, + /// Background task that drives prefetching + prefetch_task: Option>>, + /// Active reader, if any + parquet_record_batch_reader: Option, +} + +impl EagerRowGroupPrefetchStream { + /// Create a new prefetching stream, that prefetches up to `prefetch_row_groups` at once + pub fn new(stream: ParquetRecordBatchStream, prefetch_row_groups: usize) -> Self + where + T: AsyncFileReader + Unpin + Send + 'static, + { + let (sender, receiver) = channel(prefetch_row_groups); + + let prefetch_task = SpawnedTask::spawn(async move { + let mut stream = stream; + loop { + match stream.next_row_group().await { + Ok(Some(reader)) => { + // Stop prefetching if the receiver is dropped + if sender.send(Ok(Some(reader))).await.is_err() { + break; + } + } + Ok(None) => { + // Send the error, ignore errors if the receiver is dropped as + // there is nowhere to send the errors to + let _ = sender.send(Ok(None)).await; + break; + } + Err(err) => { + // Send the error, ignore errors if the receiver is dropped as + // there is nowhere to send the errors to + let _ = sender.send(Err(err)).await; + break; + } + } + } + Ok(()) + }); + + Self { + receiver, + prefetch_task: Some(prefetch_task), + parquet_record_batch_reader: None, + } + } +} + +impl Stream for EagerRowGroupPrefetchStream { + type Item = datafusion_common::Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + // If we have an active reader, try to read from it first + if let Some(mut reader) = self.parquet_record_batch_reader.take() { + match reader.next() { + Some(result) => { + // Return the reader to self for the next poll + self.parquet_record_batch_reader = Some(reader); + let result = result.map_err(DataFusionError::from); + return Poll::Ready(Some(result)); + } + None => { + // Reader is exhausted, continue to prefetching the next row group + } + } + } + + match ready!(Pin::new(&mut self.receiver).poll_recv(cx)) { + Some(Ok(Some(reader))) => { + self.parquet_record_batch_reader = Some(reader); + } + Some(Ok(None)) => { + return Poll::Ready(None); + } + Some(Err(err)) => { + return Poll::Ready(Some(Err(DataFusionError::from(err)))); + } + // end of stream from producer task + None => { + if let Some(handle) = self.prefetch_task.take() { + match ready!(pin!(handle).poll(cx)) { + Ok(Ok(())) => return Poll::Ready(None), + Ok(Err(err)) => { + return Poll::Ready(Some(Err(DataFusionError::from(err)))) + } + Err(err) => { + return Poll::Ready(Some(exec_err!( + "Eager prefetch task panicked: {err}" + ))); + } + } + } else { + return Poll::Ready(None); + } + } + } + } + } +} diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index da7bc125d2f6..8fe8972c2436 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -445,6 
+445,11 @@ impl ParquetSource { self.table_parquet_options.global.max_predicate_cache_size } + /// Number of row groups to prefetch + pub fn prefetch_row_groups(&self) -> usize { + self.table_parquet_options.global.prefetch_row_groups + } + /// Applies schema adapter factory from the FileScanConfig if present. /// /// # Arguments @@ -602,6 +607,7 @@ impl FileSource for ParquetSource { #[cfg(feature = "parquet_encryption")] encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), + prefetch_row_groups: self.prefetch_row_groups(), }) } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 267953556b16..025b73edc5b9 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -585,6 +585,8 @@ message ParquetOptions { oneof max_predicate_cache_size_opt { uint64 max_predicate_cache_size = 33; } + + uint64 prefetch_row_groups = 34; } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 4ede5b970eae..9d34a19eb729 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1005,6 +1005,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + prefetch_row_groups: value.prefetch_row_groups as usize, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index e63f345459b8..db70f5e4a8a3 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5602,6 +5602,9 @@ impl serde::Serialize for ParquetOptions { if !self.created_by.is_empty() { len += 1; } + if self.prefetch_row_groups != 0 { + len += 1; + } if self.metadata_size_hint_opt.is_some() { len += 1; } @@ -5710,6 +5713,11 @@ impl serde::Serialize for ParquetOptions { if !self.created_by.is_empty() { struct_ser.serialize_field("createdBy", &self.created_by)?; } + if self.prefetch_row_groups != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("prefetchRowGroups", ToString::to_string(&self.prefetch_row_groups).as_str())?; + } if let Some(v) = self.metadata_size_hint_opt.as_ref() { match v { parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => { @@ -5846,6 +5854,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "maxRowGroupSize", "created_by", "createdBy", + "prefetch_row_groups", + "prefetchRowGroups", "metadata_size_hint", "metadataSizeHint", "compression", @@ -5890,6 +5900,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { DataPageRowCountLimit, MaxRowGroupSize, CreatedBy, + PrefetchRowGroups, MetadataSizeHint, Compression, DictionaryEnabled, @@ -5942,6 +5953,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), "createdBy" | "created_by" => Ok(GeneratedField::CreatedBy), + "prefetchRowGroups" | "prefetch_row_groups" => Ok(GeneratedField::PrefetchRowGroups), "metadataSizeHint" | "metadata_size_hint" => 
Ok(GeneratedField::MetadataSizeHint), "compression" => Ok(GeneratedField::Compression), "dictionaryEnabled" | "dictionary_enabled" => Ok(GeneratedField::DictionaryEnabled), @@ -5992,6 +6004,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; let mut created_by__ = None; + let mut prefetch_row_groups__ = None; let mut metadata_size_hint_opt__ = None; let mut compression_opt__ = None; let mut dictionary_enabled_opt__ = None; @@ -6139,6 +6152,14 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } created_by__ = Some(map_.next_value()?); } + GeneratedField::PrefetchRowGroups => { + if prefetch_row_groups__.is_some() { + return Err(serde::de::Error::duplicate_field("prefetchRowGroups")); + } + prefetch_row_groups__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } GeneratedField::MetadataSizeHint => { if metadata_size_hint_opt__.is_some() { return Err(serde::de::Error::duplicate_field("metadataSizeHint")); @@ -6228,6 +6249,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), created_by: created_by__.unwrap_or_default(), + prefetch_row_groups: prefetch_row_groups__.unwrap_or_default(), metadata_size_hint_opt: metadata_size_hint_opt__, compression_opt: compression_opt__, dictionary_enabled_opt: dictionary_enabled_opt__, diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index aa7c3d51a9d6..e6236793e48b 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -806,6 +806,8 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, + #[prost(uint64, tag = "34")] + pub prefetch_row_groups: u64, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index e9de1d9e9a9e..23f3725bbbef 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -881,6 +881,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + prefetch_row_groups: value.prefetch_row_groups as u64, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index aa7c3d51a9d6..e6236793e48b 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -806,6 +806,8 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, + #[prost(uint64, tag = "34")] + pub prefetch_row_groups: u64, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, diff --git 
a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index d32bfb22ffdd..7e06ad21910d 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -411,6 +411,7 @@ mod parquet { allow_single_file_parallelism: global_options.global.allow_single_file_parallelism, maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64, maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64, + prefetch_row_groups: global_options.global.prefetch_row_groups as u64, schema_force_view_types: global_options.global.schema_force_view_types, binary_as_string: global_options.global.binary_as_string, skip_arrow_metadata: global_options.global.skip_arrow_metadata, @@ -507,6 +508,7 @@ mod parquet { allow_single_file_parallelism: proto.allow_single_file_parallelism, maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize, maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize, + prefetch_row_groups: proto.prefetch_row_groups as usize, schema_force_view_types: proto.schema_force_view_types, binary_as_string: proto.binary_as_string, skip_arrow_metadata: proto.skip_arrow_metadata, diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index eba527ed2b21..4eb4529b5213 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -249,6 +249,7 @@ datafusion.execution.parquet.max_row_group_size 1048576 datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 datafusion.execution.parquet.maximum_parallel_row_group_writers 1 datafusion.execution.parquet.metadata_size_hint 524288 +datafusion.execution.parquet.prefetch_row_groups 1 datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false @@ -371,6 +372,7 @@ datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.metadata_size_hint 524288 (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. 
If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. +datafusion.execution.parquet.prefetch_row_groups 1 (reading) Number of row groups to prefetch while scanning parquet files. Set to 0 to disable prefetching. A value of 1 prefetches the next row group while the current one is being processed. datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index aa87026c5cf3..5f9276bdb78e 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -939,4 +939,4 @@ SELECT t2.a, t2.b, t2.c FROM t2 WHERE t2.a > 3 OR t2.a IN (SELECT t3.x FROM t3 WHERE t2.b < 150) ---- -4 101 1001 \ No newline at end of file +4 101 1001 diff --git a/datafusion/sqllogictest/test_files/spark/math/csc.slt b/datafusion/sqllogictest/test_files/spark/math/csc.slt index 5eb9f4447280..837704113da4 100644 --- a/datafusion/sqllogictest/test_files/spark/math/csc.slt +++ b/datafusion/sqllogictest/test_files/spark/math/csc.slt @@ -43,4 +43,4 @@ SELECT csc(a) FROM (VALUES (pi()), (-pi()), (pi()/2) , (arrow_cast('NAN','Float3 8165619676597685 -8165619676597685 1 -NaN \ No newline at end of file +NaN diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index c3eda544a1de..7bf6b7363ffb 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -84,6 +84,7 @@ The following configuration settings are available: | datafusion.execution.parquet.metadata_size_hint | 524288 | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. | | datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. 
If false, the filters are applied in the same order as written in the query | +| datafusion.execution.parquet.prefetch_row_groups | 1 | (reading) Number of row groups to prefetch while scanning parquet files. Set to 0 to disable prefetching. A value of 1 prefetches the next row group while the current one is being processed. | | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. |
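
Not part of the patch: a minimal usage sketch showing how the new option could be toggled from user code, assuming a DataFusion build that includes this change and the existing `SessionConfig`/`SessionContext` APIs. The file path `data.parquet` is illustrative only.

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Prefetch two row groups ahead of the one currently being decoded.
    let config = SessionConfig::new()
        .set_usize("datafusion.execution.parquet.prefetch_row_groups", 2);
    let ctx = SessionContext::new_with_config(config);

    // The option can also be changed per session with SQL;
    // a value of 0 falls back to the non-prefetching reader.
    ctx.sql("SET datafusion.execution.parquet.prefetch_row_groups = 0")
        .await?;

    // `data.parquet` is a placeholder path for this sketch.
    let df = ctx
        .read_parquet("data.parquet", ParquetReadOptions::default())
        .await?;
    df.show().await?;
    Ok(())
}
```

Design note: `EagerRowGroupPrefetchStream::new` creates a bounded channel with capacity `prefetch_row_groups`, so the background task can only decode that many row groups ahead of the consumer before `send` blocks, which is what keeps read-ahead (and its memory use) bounded.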