From bb8b9f5c1b6301f4af7034ef8eb85bec9c1f9893 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 31 Jul 2024 21:08:29 +0100 Subject: [PATCH 1/2] feat: concurrent data file fetches, parallel RecordBatch processing --- crates/iceberg/Cargo.toml | 1 - crates/iceberg/src/arrow/reader.rs | 149 +++++++++++++++++++---------- crates/iceberg/src/scan.rs | 29 ++++-- 3 files changed, 123 insertions(+), 56 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index de5b7cdc5e..8f5cf02d44 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -50,7 +50,6 @@ arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } async-std = { workspace = true, optional = true, features = ["attributes"] } -async-stream = { workspace = true } async-trait = { workspace = true } bimap = { workspace = true } bitvec = { workspace = true } diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 58440bfdf2..6a004438f7 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -27,12 +27,11 @@ use arrow_array::{ArrayRef, BooleanArray, RecordBatch}; use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef}; use arrow_string::like::starts_with; -use async_stream::try_stream; use bytes::Bytes; use fnv::FnvHashSet; +use futures::channel::mpsc::{channel, Sender}; use futures::future::BoxFuture; -use futures::stream::StreamExt; -use futures::{try_join, TryFutureExt}; +use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt}; use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter}; use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY}; @@ -44,7 +43,8 @@ use crate::error::Result; use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; -use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream}; +use crate::runtime::spawn; +use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; use crate::spec::{Datum, Schema}; use crate::{Error, ErrorKind}; @@ -52,17 +52,30 @@ use crate::{Error, ErrorKind}; pub struct ArrowReaderBuilder { batch_size: Option, file_io: FileIO, + concurrency_limit_data_files: usize, } impl ArrowReaderBuilder { /// Create a new ArrowReaderBuilder pub(crate) fn new(file_io: FileIO) -> Self { + let num_cpus = std::thread::available_parallelism() + .expect("failed to get number of CPUs") + .get(); + ArrowReaderBuilder { batch_size: None, file_io, + concurrency_limit_data_files: num_cpus, } } + /// Sets the max number of in flight data files that are being fetched + pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self { + self.concurrency_limit_data_files = val; + + self + } + /// Sets the desired size of batches in the response /// to something other than the default pub fn with_batch_size(mut self, batch_size: usize) -> Self { @@ -75,6 +88,7 @@ impl ArrowReaderBuilder { ArrowReader { batch_size: self.batch_size, file_io: self.file_io, + concurrency_limit_data_files: self.concurrency_limit_data_files, } } } @@ -84,73 +98,113 @@ impl ArrowReaderBuilder { pub struct ArrowReader { batch_size: Option, file_io: FileIO, + + /// the maximum number of data files that can be fetched at the same time + concurrency_limit_data_files: usize, } impl ArrowReader { /// Take a stream of FileScanTasks and reads all the files. /// Returns a stream of Arrow RecordBatches containing the data from the files - pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result { + pub fn read(self, tasks: FileScanTaskStream) -> Result { let file_io = self.file_io.clone(); - - Ok(try_stream! { - while let Some(task_result) = tasks.next().await { - match task_result { - Ok(task) => { - // Collect Parquet column indices from field ids - let mut collector = CollectFieldIdVisitor { - field_ids: HashSet::default(), - }; - if let Some(predicates) = task.predicate() { - visit(&mut collector, predicates)?; + let batch_size = self.batch_size; + let max_concurrent_fetching_datafiles = self.concurrency_limit_data_files; + + let (tx, rx) = channel(10); + let mut channel_for_error = tx.clone(); + + spawn(async move { + let result = tasks + .map(|task| Ok((task, file_io.clone(), tx.clone()))) + .try_for_each_concurrent( + max_concurrent_fetching_datafiles, + |(file_scan_task, file_io, tx)| async move { + match file_scan_task { + Ok(task) => { + let file_path = task.data_file_path().to_string(); + + spawn(async move { + Self::process_file_scan_task(task, batch_size, file_io, tx) + .await + }) + .await + .map_err(|e| e.with_context("file_path", file_path)) + } + Err(err) => Err(err), } + }, + ) + .await; + + if let Err(error) = result { + let _ = channel_for_error.send(Err(error)).await; + } + }); + + return Ok(rx.boxed()); + } + + async fn process_file_scan_task( + task: FileScanTask, + batch_size: Option, + file_io: FileIO, + mut tx: Sender>, + ) -> Result<()> { + // Collect Parquet column indices from field ids + let mut collector = CollectFieldIdVisitor { + field_ids: HashSet::default(), + }; - let parquet_file = file_io - .new_input(task.data_file_path())?; + if let Some(predicates) = task.predicate() { + visit(&mut collector, predicates)?; + } - let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?; - let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader); + let parquet_file = file_io.new_input(task.data_file_path())?; - let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader) - .await?; + let (parquet_metadata, parquet_reader) = + try_join!(parquet_file.metadata(), parquet_file.reader())?; + let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader); - let parquet_schema = batch_stream_builder.parquet_schema(); - let arrow_schema = batch_stream_builder.schema(); - let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(),task.schema(),parquet_schema, arrow_schema)?; - batch_stream_builder = batch_stream_builder.with_projection(projection_mask); + let mut batch_stream_builder = + ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?; - let parquet_schema = batch_stream_builder.parquet_schema(); - let row_filter = self.get_row_filter(task.predicate(),parquet_schema, &collector)?; + let parquet_schema = batch_stream_builder.parquet_schema(); + let arrow_schema = batch_stream_builder.schema(); + let projection_mask = Self::get_arrow_projection_mask( + task.project_field_ids(), + task.schema(), + parquet_schema, + arrow_schema, + )?; + batch_stream_builder = batch_stream_builder.with_projection(projection_mask); - if let Some(row_filter) = row_filter { - batch_stream_builder = batch_stream_builder.with_row_filter(row_filter); - } + let parquet_schema = batch_stream_builder.parquet_schema(); + let row_filter = Self::get_row_filter(task.predicate(), parquet_schema, &collector)?; - if let Some(batch_size) = self.batch_size { - batch_stream_builder = batch_stream_builder.with_batch_size(batch_size); - } + if let Some(row_filter) = row_filter { + batch_stream_builder = batch_stream_builder.with_row_filter(row_filter); + } - let mut batch_stream = batch_stream_builder.build()?; + if let Some(batch_size) = batch_size { + batch_stream_builder = batch_stream_builder.with_batch_size(batch_size); + } - while let Some(batch) = batch_stream.next().await { - yield batch?; - } - } - Err(e) => { - Err(e)? - } - } - } + let mut batch_stream = batch_stream_builder.build()?; + + while let Some(batch) = batch_stream.try_next().await? { + tx.send(Ok(batch)).await? } - .boxed()) + + Ok(()) } fn get_arrow_projection_mask( - &self, field_ids: &[i32], iceberg_schema_of_task: &Schema, parquet_schema: &SchemaDescriptor, arrow_schema: &ArrowSchemaRef, - ) -> crate::Result { + ) -> Result { if field_ids.is_empty() { Ok(ProjectionMask::all()) } else { @@ -216,7 +270,6 @@ impl ArrowReader { } fn get_row_filter( - &self, predicates: Option<&BoundPredicate>, parquet_schema: &SchemaDescriptor, collector: &CollectFieldIdVisitor, diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 6c41ddc914..d8d66d3b40 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -55,8 +55,9 @@ pub struct TableScanBuilder<'a> { batch_size: Option, case_sensitive: bool, filter: Option, - concurrency_limit_manifest_files: usize, + concurrency_limit_data_files: usize, concurrency_limit_manifest_entries: usize, + concurrency_limit_manifest_files: usize, } impl<'a> TableScanBuilder<'a> { @@ -72,8 +73,9 @@ impl<'a> TableScanBuilder<'a> { batch_size: None, case_sensitive: true, filter: None, - concurrency_limit_manifest_files: num_cpus, + concurrency_limit_data_files: num_cpus, concurrency_limit_manifest_entries: num_cpus, + concurrency_limit_manifest_files: num_cpus, } } @@ -124,12 +126,13 @@ impl<'a> TableScanBuilder<'a> { pub fn with_concurrency_limit(mut self, limit: usize) -> Self { self.concurrency_limit_manifest_files = limit; self.concurrency_limit_manifest_entries = limit; + self.concurrency_limit_data_files = limit; self } - /// Sets the manifest file concurrency limit for this scan - pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self { - self.concurrency_limit_manifest_files = limit; + /// Sets the data file concurrency limit for this scan + pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self { + self.concurrency_limit_data_files = limit; self } @@ -139,6 +142,12 @@ impl<'a> TableScanBuilder<'a> { self } + /// Sets the manifest file concurrency limit for this scan + pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self { + self.concurrency_limit_manifest_files = limit; + self + } + /// Build the table scan. pub fn build(self) -> Result { let snapshot = match self.snapshot_id { @@ -244,10 +253,11 @@ impl<'a> TableScanBuilder<'a> { Ok(TableScan { batch_size: self.batch_size, column_names: self.column_names, - concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, file_io: self.table.file_io().clone(), plan_context, + concurrency_limit_data_files: self.concurrency_limit_data_files, concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries, + concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, }) } } @@ -266,6 +276,10 @@ pub struct TableScan { /// The maximum number of [`ManifestEntry`]s that will /// be processed in parallel concurrency_limit_manifest_entries: usize, + + /// The maximum number of [`ManifestEntry`]s that will + /// be processed in parallel + concurrency_limit_data_files: usize, } /// PlanContext wraps a [`SnapshotRef`] alongside all the other @@ -350,7 +364,8 @@ impl TableScan { /// Returns an [`ArrowRecordBatchStream`]. pub async fn to_arrow(&self) -> Result { - let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone()); + let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone()) + .with_data_file_concurrency_limit(self.concurrency_limit_data_files); if let Some(batch_size) = self.batch_size { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); From c9566ec25b471a6187b337f34028d5fd1fd82966 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Tue, 13 Aug 2024 19:41:26 +0100 Subject: [PATCH 2/2] refactor: centralize infallible `available_parallelism` fn. Use better channel size limit in arrow read --- crates/iceberg/src/arrow/reader.rs | 11 ++++---- crates/iceberg/src/lib.rs | 1 + crates/iceberg/src/scan.rs | 5 ++-- crates/iceberg/src/utils.rs | 42 ++++++++++++++++++++++++++++++ 4 files changed, 50 insertions(+), 9 deletions(-) create mode 100644 crates/iceberg/src/utils.rs diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 6a004438f7..ebef735b13 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -46,6 +46,7 @@ use crate::io::{FileIO, FileMetadata, FileRead}; use crate::runtime::spawn; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; use crate::spec::{Datum, Schema}; +use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; /// Builder to create ArrowReader @@ -58,9 +59,7 @@ pub struct ArrowReaderBuilder { impl ArrowReaderBuilder { /// Create a new ArrowReaderBuilder pub(crate) fn new(file_io: FileIO) -> Self { - let num_cpus = std::thread::available_parallelism() - .expect("failed to get number of CPUs") - .get(); + let num_cpus = available_parallelism().get(); ArrowReaderBuilder { batch_size: None, @@ -109,16 +108,16 @@ impl ArrowReader { pub fn read(self, tasks: FileScanTaskStream) -> Result { let file_io = self.file_io.clone(); let batch_size = self.batch_size; - let max_concurrent_fetching_datafiles = self.concurrency_limit_data_files; + let concurrency_limit_data_files = self.concurrency_limit_data_files; - let (tx, rx) = channel(10); + let (tx, rx) = channel(concurrency_limit_data_files); let mut channel_for_error = tx.clone(); spawn(async move { let result = tasks .map(|task| Ok((task, file_io.clone(), tx.clone()))) .try_for_each_concurrent( - max_concurrent_fetching_datafiles, + concurrency_limit_data_files, |(file_scan_task, file_io, tx)| async move { match file_scan_task { Ok(task) => { diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 9682fa1873..79c97ae660 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -47,4 +47,5 @@ pub mod transform; mod runtime; pub mod arrow; +mod utils; pub mod writer; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index d8d66d3b40..8a178dde9b 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -39,6 +39,7 @@ use crate::spec::{ SchemaRef, SnapshotRef, TableMetadataRef, }; use crate::table::Table; +use crate::utils::available_parallelism; use crate::{Error, ErrorKind, Result}; /// A stream of [`FileScanTask`]. @@ -62,9 +63,7 @@ pub struct TableScanBuilder<'a> { impl<'a> TableScanBuilder<'a> { pub(crate) fn new(table: &'a Table) -> Self { - let num_cpus = std::thread::available_parallelism() - .expect("failed to get number of CPUs") - .get(); + let num_cpus = available_parallelism().get(); Self { table, diff --git a/crates/iceberg/src/utils.rs b/crates/iceberg/src/utils.rs new file mode 100644 index 0000000000..70514cccb1 --- /dev/null +++ b/crates/iceberg/src/utils.rs @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::num::NonZero; + +// Use a default value of 1 as the safest option. +// See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations +// for more details. +const DEFAULT_PARALLELISM: usize = 1; + +/// Uses [`std::thread::available_parallelism`] in order to +/// retrieve an estimate of the default amount of parallelism +/// that should be used. Note that [`std::thread::available_parallelism`] +/// returns a `Result` as it can fail, so here we use +/// a default value instead. +/// Note: we don't use a OnceCell or LazyCell here as there +/// are circumstances where the level of available +/// parallelism can change during the lifetime of an executing +/// process, but this should not be called in a hot loop. +pub(crate) fn available_parallelism() -> NonZero { + std::thread::available_parallelism().unwrap_or_else(|_err| { + // Failed to get the level of parallelism. + // TODO: log/trace when this fallback occurs. + + // Using a default value. + NonZero::new(DEFAULT_PARALLELISM).unwrap() + }) +}