From d791922e9a95e87db88066bc455191840d5d220d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 30 Oct 2025 15:53:38 -0400 Subject: [PATCH 1/3] TEST prefetching Row Groups using next_reader API in parquet-rs --- datafusion/datasource-parquet/src/opener.rs | 122 +++++++++++++++++++- 1 file changed, 119 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 3c905d950a96..1883c2b4e61d 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -26,7 +26,7 @@ use crate::{ use arrow::array::RecordBatch; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::Arc; use std::task::{Context, Poll}; @@ -47,14 +47,17 @@ use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate}; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; +use datafusion_common_runtime::SpawnedTask; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; use futures::{ready, Stream, StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; -use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; -use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, +}; +use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; @@ -449,6 +452,9 @@ impl FileOpener for ParquetOpener { .with_metrics(arrow_reader_metrics.clone()) .build()?; + // eagerly prefetch row groups + let stream = EagerRowGroupPrefetchStream::new(stream); + let files_ranges_pruned_statistics = file_metrics.files_ranges_pruned_statistics.clone(); let predicate_cache_inner_records = @@ -496,6 +502,116 @@ fn copy_arrow_reader_metrics( } } +/// Eagerly prefetches the next RowGroup from the underlying stream +struct EagerRowGroupPrefetchStream { + /// Outstanding prefetch state + state: EagerPrefetchState, + /// Active reader, if any + parquet_record_batch_reader: Option, +} + +struct PrefetchResult { + stream: ParquetRecordBatchStream, + reader: Option, +} + +enum EagerPrefetchState { + /// Trying to open the next RowGroup in a new task + Prefetching(SpawnedTask>>), + Done, +} + +impl EagerPrefetchState +where + T: AsyncFileReader + Unpin + Send + 'static, +{ + /// Begin fetching the next row group, if any + fn next_row_group(mut stream: ParquetRecordBatchStream) -> Self { + let task = SpawnedTask::spawn(async move { + let reader = stream.next_row_group().await?; + let result = PrefetchResult { stream, reader }; + Ok(result) + }); + Self::Prefetching(task) + } +} + +impl EagerRowGroupPrefetchStream +where + T: AsyncFileReader + Unpin + Send + 'static, +{ + pub fn new(stream: ParquetRecordBatchStream) -> Self { + Self { + state: EagerPrefetchState::next_row_group(stream), + parquet_record_batch_reader: None, + } + } +} + +impl Stream for EagerRowGroupPrefetchStream +where + T: AsyncFileReader + Unpin + Send + 'static, +{ + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + // 
If we have an active reader, try to read from it first + if let Some(mut reader) = self.parquet_record_batch_reader.take() { + match reader.next() { + Some(result) => { + // Return the batch + self.parquet_record_batch_reader = Some(reader); + let result = result.map_err(DataFusionError::from); + return Poll::Ready(Some(result)); + } + None => { + // Reader is exhausted, continue to prefetching the next row group + } + } + } + + use futures::Future; + + match &mut self.state { + EagerPrefetchState::Prefetching(handle) => { + // check if the inner is ready + let handle = pin!(handle); + match ready!(handle.poll(cx)) { + Ok(Ok(result)) => { + let PrefetchResult { stream, reader } = result; + // no reader means end of stream + if reader.is_none() { + self.state = EagerPrefetchState::Done; + } else { + // immediately start reading the next row group + self.state = EagerPrefetchState::next_row_group(stream); + } + self.parquet_record_batch_reader = reader; + } + Ok(Err(err)) => { + // error during prefetch, return it to the caller + return Poll::Ready(Some(Err(err))); + } + Err(e) => { + return Poll::Ready(Some(exec_err!( + "Eager prefetch task panicked: {e}" + ))); + } + } + } + EagerPrefetchState::Done => { + // stream is exhausted + return Poll::Ready(None); + } + } + } + } +} + /// Wraps an inner RecordBatchStream and a [`FilePruner`] /// /// This can terminate the scan early when some dynamic filters is updated after From befa9edc9b422b97643aa58f2e773d106df2a892 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 20 Nov 2025 14:20:27 -0500 Subject: [PATCH 2/3] Codex solution --- datafusion/common/src/config.rs | 5 + .../common/src/file_options/parquet_writer.rs | 3 + datafusion/datasource-parquet/src/opener.rs | 520 +++++++++++++++--- datafusion/datasource-parquet/src/source.rs | 6 + .../proto/datafusion_common.proto | 2 + datafusion/proto-common/src/from_proto/mod.rs | 1 + .../proto-common/src/generated/pbjson.rs | 22 + .../proto-common/src/generated/prost.rs | 3 + datafusion/proto-common/src/to_proto/mod.rs | 1 + .../test_files/information_schema.slt | 2 + docs/source/user-guide/configs.md | 1 + 11 files changed, 489 insertions(+), 77 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 212db653f713..0a9a80abc1b3 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -694,6 +694,11 @@ config_namespace! { /// reduce the number of rows decoded. This optimization is sometimes called "late materialization". pub pushdown_filters: bool, default = false + /// (reading) Number of row groups to prefetch while scanning parquet files. + /// Set to 0 to disable prefetching. A value of 1 prefetches the next row + /// group while the current one is being processed. + pub prefetch_row_groups: usize, default = 1 + /// (reading) If true, filter expressions evaluated during the parquet decoding operation /// will be reordered heuristically to minimize the cost of evaluation. 
If false, /// the filters are applied in the same order as written in the query diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 564929c61bab..62f1e57d84be 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -209,6 +209,7 @@ impl ParquetOptions { coerce_int96: _, // not used for writer props skip_arrow_metadata: _, max_predicate_cache_size: _, + prefetch_row_groups: _, } = self; let mut builder = WriterProperties::builder() @@ -469,6 +470,7 @@ mod tests { .maximum_parallel_row_group_writers, maximum_buffered_record_batches_per_stream: defaults .maximum_buffered_record_batches_per_stream, + prefetch_row_groups: defaults.prefetch_row_groups, bloom_filter_on_read: defaults.bloom_filter_on_read, schema_force_view_types: defaults.schema_force_view_types, binary_as_string: defaults.binary_as_string, @@ -583,6 +585,7 @@ mod tests { .maximum_parallel_row_group_writers, maximum_buffered_record_batches_per_stream: global_options_defaults .maximum_buffered_record_batches_per_stream, + prefetch_row_groups: global_options_defaults.prefetch_row_groups, bloom_filter_on_read: global_options_defaults.bloom_filter_on_read, max_predicate_cache_size: global_options_defaults .max_predicate_cache_size, diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 1883c2b4e61d..0fb8fde993c2 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -50,7 +50,7 @@ use datafusion_common::config::EncryptionFactoryOptions; use datafusion_common_runtime::SpawnedTask; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; -use futures::{ready, Stream, StreamExt, TryStreamExt}; +use futures::{ready, Future, Stream, StreamExt}; use itertools::Itertools; use log::debug; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; @@ -59,7 +59,9 @@ use parquet::arrow::arrow_reader::{ }; use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::errors::ParquetError; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; +use tokio::sync::mpsc::{channel, Receiver}; /// Implements [`FileOpener`] for a parquet file pub(super) struct ParquetOpener { @@ -114,6 +116,8 @@ pub(super) struct ParquetOpener { /// Maximum size of the predicate cache, in bytes. If none, uses /// the arrow-rs default. 
pub max_predicate_cache_size: Option, + /// Number of row groups to prefetch while scanning parquet files + pub prefetch_row_groups: usize, } impl FileOpener for ParquetOpener { @@ -165,6 +169,7 @@ impl FileOpener for ParquetOpener { #[cfg(feature = "parquet_encryption")] let encryption_context = self.get_encryption_context(); let max_predicate_cache_size = self.max_predicate_cache_size; + let prefetch_row_groups = self.prefetch_row_groups; Ok(Box::pin(async move { #[cfg(feature = "parquet_encryption")] @@ -452,8 +457,16 @@ impl FileOpener for ParquetOpener { .with_metrics(arrow_reader_metrics.clone()) .build()?; - // eagerly prefetch row groups - let stream = EagerRowGroupPrefetchStream::new(stream); + let stream: Pin< + Box> + Send>, + > = if prefetch_row_groups == 0 { + Box::pin(stream) + } else { + Box::pin(EagerRowGroupPrefetchStream::new( + stream, + prefetch_row_groups, + )) + }; let files_ranges_pruned_statistics = file_metrics.files_ranges_pruned_statistics.clone(); @@ -461,16 +474,18 @@ impl FileOpener for ParquetOpener { file_metrics.predicate_cache_inner_records.clone(); let predicate_cache_records = file_metrics.predicate_cache_records.clone(); - let stream = stream.map_err(DataFusionError::from).map(move |b| { - b.and_then(|b| { - copy_arrow_reader_metrics( - &arrow_reader_metrics, - &predicate_cache_inner_records, - &predicate_cache_records, - ); - schema_mapping.map_batch(b) - }) - }); + let stream = stream + .map(|b| b.map_err(DataFusionError::from)) + .map(move |b| { + b.and_then(|b| { + copy_arrow_reader_metrics( + &arrow_reader_metrics, + &predicate_cache_inner_records, + &predicate_cache_records, + ); + schema_mapping.map_batch(b) + }) + }); if let Some(file_pruner) = file_pruner { Ok(EarlyStoppingStream::new( @@ -502,48 +517,54 @@ fn copy_arrow_reader_metrics( } } -/// Eagerly prefetches the next RowGroup from the underlying stream +/// Eagerly prefetches RowGroups from the underlying stream struct EagerRowGroupPrefetchStream { - /// Outstanding prefetch state - state: EagerPrefetchState, + /// Prefetched row groups + receiver: Receiver>>, + /// Background task that drives prefetching + prefetch_task: Option>>, /// Active reader, if any parquet_record_batch_reader: Option, + /// Keep compiler happy about unused type parameter + _file_reader: std::marker::PhantomData, } -struct PrefetchResult { - stream: ParquetRecordBatchStream, - reader: Option, -} - -enum EagerPrefetchState { - /// Trying to open the next RowGroup in a new task - Prefetching(SpawnedTask>>), - Done, -} - -impl EagerPrefetchState +impl EagerRowGroupPrefetchStream where T: AsyncFileReader + Unpin + Send + 'static, { - /// Begin fetching the next row group, if any - fn next_row_group(mut stream: ParquetRecordBatchStream) -> Self { - let task = SpawnedTask::spawn(async move { - let reader = stream.next_row_group().await?; - let result = PrefetchResult { stream, reader }; - Ok(result) + pub fn new(stream: ParquetRecordBatchStream, prefetch_row_groups: usize) -> Self { + let (sender, receiver) = channel(prefetch_row_groups); + + let prefetch_task = SpawnedTask::spawn(async move { + let mut stream = stream; + loop { + match stream.next_row_group().await { + Ok(Some(reader)) => { + // Stop prefetching if the receiver is dropped + if sender.send(Ok(Some(reader))).await.is_err() { + break; + } + } + Ok(None) => { + // Signal end of stream + let _ = sender.send(Ok(None)).await; + break; + } + Err(err) => { + let _ = sender.send(Err(err)).await; + break; + } + } + } + Ok(()) }); - Self::Prefetching(task) - } -} 
-impl EagerRowGroupPrefetchStream -where - T: AsyncFileReader + Unpin + Send + 'static, -{ - pub fn new(stream: ParquetRecordBatchStream) -> Self { Self { - state: EagerPrefetchState::next_row_group(stream), + receiver, + prefetch_task: Some(prefetch_task), parquet_record_batch_reader: None, + _file_reader: std::marker::PhantomData, } } } @@ -552,7 +573,7 @@ impl Stream for EagerRowGroupPrefetchStream where T: AsyncFileReader + Unpin + Send + 'static, { - type Item = Result; + type Item = parquet::errors::Result; fn poll_next( mut self: Pin<&mut Self>, @@ -565,7 +586,8 @@ where Some(result) => { // Return the batch self.parquet_record_batch_reader = Some(reader); - let result = result.map_err(DataFusionError::from); + let result = result + .map_err(|err| ParquetError::ArrowError(err.to_string())); return Poll::Ready(Some(result)); } None => { @@ -574,39 +596,31 @@ where } } - use futures::Future; - - match &mut self.state { - EagerPrefetchState::Prefetching(handle) => { - // check if the inner is ready - let handle = pin!(handle); - match ready!(handle.poll(cx)) { - Ok(Ok(result)) => { - let PrefetchResult { stream, reader } = result; - // no reader means end of stream - if reader.is_none() { - self.state = EagerPrefetchState::Done; - } else { - // immediately start reading the next row group - self.state = EagerPrefetchState::next_row_group(stream); + match ready!(Pin::new(&mut self.receiver).poll_recv(cx)) { + Some(Ok(Some(reader))) => { + self.parquet_record_batch_reader = Some(reader); + } + Some(Ok(None)) => { + return Poll::Ready(None); + } + Some(Err(err)) => { + return Poll::Ready(Some(Err(err))); + } + None => { + if let Some(handle) = self.prefetch_task.take() { + match ready!(pin!(handle).poll(cx)) { + Ok(Ok(())) => return Poll::Ready(None), + Ok(Err(err)) => return Poll::Ready(Some(Err(err))), + Err(err) => { + return Poll::Ready(Some(Err(ParquetError::General( + format!("Eager prefetch task panicked: {err}"), + )))); } - self.parquet_record_batch_reader = reader; - } - Ok(Err(err)) => { - // error during prefetch, return it to the caller - return Poll::Ready(Some(Err(err))); - } - Err(e) => { - return Poll::Ready(Some(exec_err!( - "Eager prefetch task panicked: {e}" - ))); } + } else { + return Poll::Ready(None); } } - EagerPrefetchState::Done => { - // stream is exhausted - return Poll::Ready(None); - } } } } @@ -870,13 +884,16 @@ fn should_enable_page_index( #[cfg(test)] mod test { + use std::ops::Range; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; + use std::time::Duration; use arrow::{ compute::cast, datatypes::{DataType, Field, Schema, SchemaRef}, }; - use bytes::{BufMut, BytesMut}; + use bytes::{BufMut, Bytes, BytesMut}; use datafusion_common::{ assert_batches_eq, record_batch, stats::Precision, ColumnStatistics, DataFusionError, ScalarValue, Statistics, @@ -895,11 +912,19 @@ mod test { }; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; - use futures::{Stream, StreamExt}; + use futures::{future::BoxFuture, FutureExt, Stream, StreamExt}; use object_store::{memory::InMemory, path::Path, ObjectStore}; + use parquet::arrow::arrow_reader::ArrowReaderOptions; + use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::ArrowWriter; + use parquet::file::properties::WriterProperties; + use parquet::file::reader::FileReader; + use parquet::file::serialized_reader::SerializedFileReader; + use tokio::sync::{mpsc, Semaphore}; - use 
crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory}; + use crate::{ + opener::ParquetOpener, DefaultParquetFileReaderFactory, ParquetFileReaderFactory, + }; async fn count_batches_and_rows( mut stream: std::pin::Pin< @@ -951,6 +976,168 @@ mod test { data_len } + async fn write_parquet_with_properties( + store: Arc, + filename: &str, + batch: arrow::record_batch::RecordBatch, + properties: WriterProperties, + ) -> (usize, Bytes) { + let mut out = BytesMut::new().writer(); + { + let mut writer = + ArrowWriter::try_new(&mut out, batch.schema(), Some(properties)).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + let data = out.into_inner().freeze(); + let data_len = data.len(); + store + .put(&Path::from(filename), data.clone().into()) + .await + .unwrap(); + (data_len, data) + } + + fn row_group_ranges(data: &Bytes) -> Vec> { + let reader = + SerializedFileReader::new(data.clone()).expect("reading parquet metadata"); + reader + .metadata() + .row_groups() + .iter() + .map(|row_group| { + row_group + .columns() + .iter() + .fold((u64::MAX, 0), |(start, end), column| { + let (col_start, len) = column.byte_range(); + (start.min(col_start), end.max(col_start + len)) + }) + }) + .map(|(start, end)| start..end) + .collect() + } + + #[derive(Debug)] + struct RowGroupGateState { + ranges: Vec>, + started: Vec, + permits: Vec, + start_tx: mpsc::UnboundedSender, + } + + impl RowGroupGateState { + fn new(ranges: Vec>, start_tx: mpsc::UnboundedSender) -> Self { + let started = (0..ranges.len()).map(|_| AtomicBool::new(false)).collect(); + let permits = (0..ranges.len()).map(|_| Semaphore::new(0)).collect(); + Self { + ranges, + started, + permits, + start_tx, + } + } + + fn row_group_for_range(&self, range: &Range) -> Option { + self.ranges + .iter() + .position(|row_group_range| row_group_range.contains(&range.start)) + } + + async fn notify_row_group(&self, idx: usize) { + if !self.started[idx].swap(true, Ordering::SeqCst) { + let _ = self.start_tx.send(idx); + // Wait until this row group is allowed to proceed + self.permits[idx].acquire().await.unwrap().forget(); + } + } + } + + #[derive(Debug)] + struct GatedParquetFileReaderFactory { + inner: DefaultParquetFileReaderFactory, + gates: Arc, + } + + impl GatedParquetFileReaderFactory { + fn new(store: Arc, gates: Arc) -> Self { + Self { + inner: DefaultParquetFileReaderFactory::new(store), + gates, + } + } + } + + impl ParquetFileReaderFactory for GatedParquetFileReaderFactory { + fn create_reader( + &self, + partition_index: usize, + partitioned_file: PartitionedFile, + metadata_size_hint: Option, + metrics: &ExecutionPlanMetricsSet, + ) -> datafusion_common::Result> { + let reader = self.inner.create_reader( + partition_index, + partitioned_file, + metadata_size_hint, + metrics, + )?; + Ok(Box::new(GatedAsyncFileReader { + inner: reader, + gates: Arc::clone(&self.gates), + })) + } + } + + struct GatedAsyncFileReader { + inner: Box, + gates: Arc, + } + + impl AsyncFileReader for GatedAsyncFileReader { + fn get_bytes( + &mut self, + range: Range, + ) -> BoxFuture<'_, parquet::errors::Result> { + let gates = Arc::clone(&self.gates); + let inner = &mut self.inner; + async move { + if let Some(idx) = gates.row_group_for_range(&range) { + gates.notify_row_group(idx).await; + } + inner.get_bytes(range).await + } + .boxed() + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> { + let gates = Arc::clone(&self.gates); + let inner = &mut self.inner; + async move { + for 
range in &ranges { + if let Some(idx) = gates.row_group_for_range(range) { + gates.notify_row_group(idx).await; + } + } + inner.get_byte_ranges(ranges).await + } + .boxed() + } + + fn get_metadata<'a>( + &'a mut self, + options: Option<&'a ArrowReaderOptions>, + ) -> BoxFuture< + 'a, + parquet::errors::Result>, + > { + self.inner.get_metadata(options) + } + } + fn make_dynamic_expr(expr: Arc) -> Arc { Arc::new(DynamicFilterPhysicalExpr::new( expr.children().into_iter().map(Arc::clone).collect(), @@ -1014,6 +1201,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + prefetch_row_groups: 1, } }; @@ -1087,6 +1275,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + prefetch_row_groups: 1, } }; @@ -1176,6 +1365,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + prefetch_row_groups: 1, } }; @@ -1268,6 +1458,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + prefetch_row_groups: 1, } }; @@ -1360,6 +1551,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + prefetch_row_groups: 1, } }; @@ -1381,6 +1573,179 @@ mod test { assert_eq!(num_rows, 0); } + #[tokio::test] + async fn test_row_group_prefetch_config_enabled() { + let store = Arc::new(InMemory::new()) as Arc; + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let writer_properties = WriterProperties::builder() + .set_max_row_group_size(1) + .build(); + let (data_size, data) = write_parquet_with_properties( + Arc::clone(&store), + "prefetch_enabled.parquet", + batch.clone(), + writer_properties, + ) + .await; + let ranges = row_group_ranges(&data); + assert!(ranges.len() > 1); + + let schema = batch.schema(); + let file = PartitionedFile::new( + "prefetch_enabled.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + + let (start_tx, mut start_rx) = mpsc::unbounded_channel(); + let gates = Arc::new(RowGroupGateState::new(ranges, start_tx)); + gates.permits[0].add_permits(1); + + let opener = ParquetOpener { + partition_index: 0, + projection: Arc::new([0]), + batch_size: 1024, + limit: None, + predicate: Some(logical2physical(&col("a"), &schema)), + logical_file_schema: schema.clone(), + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new(GatedParquetFileReaderFactory::new( + Arc::clone(&store), + Arc::clone(&gates), + )), + partition_fields: vec![], + pushdown_filters: false, + reorder_filters: false, + enable_page_index: false, + enable_bloom_filter: false, + schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + enable_row_group_stats_pruning: false, + coerce_int96: None, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties: None, + expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, + max_predicate_cache_size: None, + prefetch_row_groups: 1, + }; + + let mut stream = opener.open(file).unwrap().await.unwrap(); + let first_row_group = + tokio::time::timeout(Duration::from_secs(1), start_rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(first_row_group, 0); + let first_batch = stream.next().await.unwrap().unwrap(); + assert_eq!(first_batch.num_rows(), 1); + + let second_row_group = + 
tokio::time::timeout(Duration::from_secs(1), start_rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(second_row_group, 1); + + for permit in gates.permits.iter().skip(1) { + permit.add_permits(1); + } + + let mut remaining = vec![]; + while let Some(batch) = stream.next().await { + remaining.push(batch.unwrap()); + } + assert_eq!(remaining.len(), gates.permits.len() - 1); + } + + #[tokio::test] + async fn test_row_group_prefetch_can_be_disabled() { + let store = Arc::new(InMemory::new()) as Arc; + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let writer_properties = WriterProperties::builder() + .set_max_row_group_size(1) + .build(); + let (data_size, data) = write_parquet_with_properties( + Arc::clone(&store), + "prefetch_disabled.parquet", + batch.clone(), + writer_properties, + ) + .await; + let ranges = row_group_ranges(&data); + assert!(ranges.len() > 1); + + let schema = batch.schema(); + let file = PartitionedFile::new( + "prefetch_disabled.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + + let (start_tx, mut start_rx) = mpsc::unbounded_channel(); + let gates = Arc::new(RowGroupGateState::new(ranges, start_tx)); + gates.permits[0].add_permits(1); + + let opener = ParquetOpener { + partition_index: 0, + projection: Arc::new([0]), + batch_size: 1024, + limit: None, + predicate: Some(logical2physical(&col("a"), &schema)), + logical_file_schema: schema.clone(), + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new(GatedParquetFileReaderFactory::new( + Arc::clone(&store), + Arc::clone(&gates), + )), + partition_fields: vec![], + pushdown_filters: false, + reorder_filters: false, + enable_page_index: false, + enable_bloom_filter: false, + schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + enable_row_group_stats_pruning: false, + coerce_int96: None, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties: None, + expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, + max_predicate_cache_size: None, + prefetch_row_groups: 0, + }; + + let mut stream = opener.open(file).unwrap().await.unwrap(); + let first_row_group = + tokio::time::timeout(Duration::from_secs(1), start_rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(first_row_group, 0); + let first_batch = stream.next().await.unwrap().unwrap(); + assert_eq!(first_batch.num_rows(), 1); + + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(start_rx.try_recv().is_err()); + + for permit in gates.permits.iter().skip(1) { + permit.add_permits(1); + } + + let second_row_group = + tokio::time::timeout(Duration::from_secs(1), start_rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(second_row_group, 1); + + let mut remaining = vec![]; + while let Some(batch) = stream.next().await { + remaining.push(batch.unwrap()); + } + assert_eq!(remaining.len(), gates.permits.len() - 1); + } + fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize { match metrics.sum_by_name(metric_name) { Some(v) => v.as_usize(), @@ -1510,6 +1875,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + prefetch_row_groups: 1, }; let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema); diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 7c07b7b68c35..5b00c8194a2d 100644 --- 
a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -447,6 +447,11 @@ impl ParquetSource { self.table_parquet_options.global.max_predicate_cache_size } + /// Number of row groups to prefetch while scanning parquet files + pub fn prefetch_row_groups(&self) -> usize { + self.table_parquet_options.global.prefetch_row_groups + } + /// Applies schema adapter factory from the FileScanConfig if present. /// /// # Arguments @@ -604,6 +609,7 @@ impl FileSource for ParquetSource { #[cfg(feature = "parquet_encryption")] encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), + prefetch_row_groups: self.prefetch_row_groups(), }) } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 267953556b16..025b73edc5b9 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -585,6 +585,8 @@ message ParquetOptions { oneof max_predicate_cache_size_opt { uint64 max_predicate_cache_size = 33; } + + uint64 prefetch_row_groups = 34; } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 4ede5b970eae..9d34a19eb729 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1005,6 +1005,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + prefetch_row_groups: value.prefetch_row_groups as usize, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index e63f345459b8..fa39b8d1eb96 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5635,6 +5635,9 @@ impl serde::Serialize for ParquetOptions { if self.max_predicate_cache_size_opt.is_some() { len += 1; } + if self.prefetch_row_groups != 0 { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetOptions", len)?; if self.enable_page_index { struct_ser.serialize_field("enablePageIndex", &self.enable_page_index)?; @@ -5692,6 +5695,11 @@ impl serde::Serialize for ParquetOptions { if self.skip_arrow_metadata { struct_ser.serialize_field("skipArrowMetadata", &self.skip_arrow_metadata)?; } + if self.prefetch_row_groups != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("prefetchRowGroups", ToString::to_string(&self.prefetch_row_groups).as_str())?; + } if self.dictionary_page_size_limit != 0 { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -5838,6 +5846,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "binaryAsString", "skip_arrow_metadata", "skipArrowMetadata", + "prefetch_row_groups", + "prefetchRowGroups", "dictionary_page_size_limit", "dictionaryPageSizeLimit", "data_page_row_count_limit", @@ -5886,6 +5896,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { SchemaForceViewTypes, BinaryAsString, SkipArrowMetadata, + PrefetchRowGroups, DictionaryPageSizeLimit, DataPageRowCountLimit, MaxRowGroupSize, @@ -5938,6 +5949,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { 
"schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes), "binaryAsString" | "binary_as_string" => Ok(GeneratedField::BinaryAsString), "skipArrowMetadata" | "skip_arrow_metadata" => Ok(GeneratedField::SkipArrowMetadata), + "prefetchRowGroups" | "prefetch_row_groups" => Ok(GeneratedField::PrefetchRowGroups), "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit), "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), @@ -5988,6 +6000,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut schema_force_view_types__ = None; let mut binary_as_string__ = None; let mut skip_arrow_metadata__ = None; + let mut prefetch_row_groups__ = None; let mut dictionary_page_size_limit__ = None; let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; @@ -6109,6 +6122,14 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } skip_arrow_metadata__ = Some(map_.next_value()?); } + GeneratedField::PrefetchRowGroups => { + if prefetch_row_groups__.is_some() { + return Err(serde::de::Error::duplicate_field("prefetchRowGroups")); + } + prefetch_row_groups__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } GeneratedField::DictionaryPageSizeLimit => { if dictionary_page_size_limit__.is_some() { return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit")); @@ -6224,6 +6245,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { schema_force_view_types: schema_force_view_types__.unwrap_or_default(), binary_as_string: binary_as_string__.unwrap_or_default(), skip_arrow_metadata: skip_arrow_metadata__.unwrap_or_default(), + prefetch_row_groups: prefetch_row_groups__.unwrap_or_default(), dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(), data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index aa7c3d51a9d6..39b4c3d0d3f1 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -798,6 +798,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "30")] pub skip_arrow_metadata: bool, + /// default = 1 + #[prost(uint64, tag = "34")] + pub prefetch_row_groups: u64, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index e9de1d9e9a9e..23f3725bbbef 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -881,6 +881,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + prefetch_row_groups: value.prefetch_row_groups as u64, }) } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index eba527ed2b21..a4ed7760e968 100644 --- 
a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -249,6 +249,7 @@ datafusion.execution.parquet.max_row_group_size 1048576 datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 datafusion.execution.parquet.maximum_parallel_row_group_writers 1 datafusion.execution.parquet.metadata_size_hint 524288 +datafusion.execution.parquet.prefetch_row_groups 1 datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false @@ -371,6 +372,7 @@ datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.metadata_size_hint 524288 (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. +datafusion.execution.parquet.prefetch_row_groups 1 (reading) Number of row groups to prefetch while scanning parquet files. Set to 0 to disable prefetching. datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. 
If false, the filters are applied in the same order as written in the query diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index c3eda544a1de..7bf6b7363ffb 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -84,6 +84,7 @@ The following configuration settings are available: | datafusion.execution.parquet.metadata_size_hint | 524288 | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. | | datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | +| datafusion.execution.parquet.prefetch_row_groups | 1 | (reading) Number of row groups to prefetch while scanning parquet files. Set to 0 to disable prefetching. | | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | From 4f1deda7443e535777ff062a6847f963f15a6d9d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 20 Nov 2025 14:20:55 -0500 Subject: [PATCH 3/3] Update expected --- datafusion/sqllogictest/test_files/information_schema.slt | 2 +- datafusion/sqllogictest/test_files/sort_merge_join.slt | 2 +- datafusion/sqllogictest/test_files/spark/math/csc.slt | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index a4ed7760e968..4eb4529b5213 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -372,7 +372,7 @@ datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. 
You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.metadata_size_hint 524288 (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. -datafusion.execution.parquet.prefetch_row_groups 1 (reading) Number of row groups to prefetch while scanning parquet files. Set to 0 to disable prefetching. +datafusion.execution.parquet.prefetch_row_groups 1 (reading) Number of row groups to prefetch while scanning parquet files. Set to 0 to disable prefetching. A value of 1 prefetches the next row group while the current one is being processed. datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index aa87026c5cf3..5f9276bdb78e 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -939,4 +939,4 @@ SELECT t2.a, t2.b, t2.c FROM t2 WHERE t2.a > 3 OR t2.a IN (SELECT t3.x FROM t3 WHERE t2.b < 150) ---- -4 101 1001 \ No newline at end of file +4 101 1001 diff --git a/datafusion/sqllogictest/test_files/spark/math/csc.slt b/datafusion/sqllogictest/test_files/spark/math/csc.slt index 5eb9f4447280..837704113da4 100644 --- a/datafusion/sqllogictest/test_files/spark/math/csc.slt +++ b/datafusion/sqllogictest/test_files/spark/math/csc.slt @@ -43,4 +43,4 @@ SELECT csc(a) FROM (VALUES (pi()), (-pi()), (pi()/2) , (arrow_cast('NAN','Float3 8165619676597685 -8165619676597685 1 -NaN \ No newline at end of file +NaN
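
For reference, a minimal usage sketch of the option introduced by this series: it adjusts `datafusion.execution.parquet.prefetch_row_groups` through a SQL `SET` statement before scanning a file. The value `2` and the `data.parquet` path are illustrative placeholders, not part of the patch.

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Prefetch two row groups ahead instead of the default of one.
    // The option key comes from this patch; the value 2 is illustrative.
    ctx.sql("SET datafusion.execution.parquet.prefetch_row_groups = 2")
        .await?
        .collect()
        .await?;

    // "data.parquet" is a placeholder path for a multi-row-group file.
    let df = ctx
        .read_parquet("data.parquet", ParquetReadOptions::default())
        .await?;
    df.show().await?;

    Ok(())
}
```

Setting the option to `0` falls back to the non-prefetching stream, matching the `test_row_group_prefetch_can_be_disabled` test above.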