
Commit f2c1143

feat: concurrent data file fetches, parallel RecordBatch processing
1 parent: c696a3f

File tree: 3 files changed (+135 / -66 lines)


crates/iceberg/Cargo.toml

Lines changed: 0 additions & 1 deletion
```diff
@@ -50,7 +50,6 @@ arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 arrow-string = { workspace = true }
 async-std = { workspace = true, optional = true, features = ["attributes"] }
-async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
 bitvec = { workspace = true }
```

crates/iceberg/src/arrow/reader.rs

Lines changed: 116 additions & 57 deletions
```diff
@@ -27,12 +27,11 @@ use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
 use arrow_string::like::starts_with;
-use async_stream::try_stream;
 use bytes::Bytes;
 use fnv::FnvHashSet;
+use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::stream::StreamExt;
-use futures::{try_join, TryFutureExt};
+use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -44,14 +43,31 @@ use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
+use crate::runtime::spawn;
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, Schema};
 use crate::{Error, ErrorKind};
 
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
     file_io: FileIO,
+    config: ArrowReaderConfig,
+}
+
+#[derive(Clone)]
+struct ArrowReaderConfig {
+    /// the maximum number of data files that can be fetched at the same time
+    concurrency_limit_data_files: usize,
+}
+
+impl Default for ArrowReaderConfig {
+    fn default() -> Self {
+        let num_cores = num_cpus::get();
+        Self {
+            concurrency_limit_data_files: num_cores,
+        }
+    }
 }
 
 impl ArrowReaderBuilder {
@@ -60,9 +76,19 @@ impl ArrowReaderBuilder {
         ArrowReaderBuilder {
             batch_size: None,
             file_io,
+            config: ArrowReaderConfig::default(),
         }
     }
 
+    /// Sets the max number of in flight data files that are being fetched
+    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
+        self.config = ArrowReaderConfig {
+            concurrency_limit_data_files: val,
+        };
+
+        self
+    }
+
     /// Sets the desired size of batches in the response
     /// to something other than the default
     pub fn with_batch_size(mut self, batch_size: usize) -> Self {
@@ -75,6 +101,7 @@ impl ArrowReaderBuilder {
         ArrowReader {
             batch_size: self.batch_size,
             file_io: self.file_io,
+            config: self.config,
         }
     }
 }
@@ -84,73 +111,106 @@ impl ArrowReaderBuilder {
 pub struct ArrowReader {
     batch_size: Option<usize>,
     file_io: FileIO,
+    config: ArrowReaderConfig,
 }
 
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
-
-        Ok(try_stream! {
-            while let Some(task_result) = tasks.next().await {
-                match task_result {
-                    Ok(task) => {
-                        // Collect Parquet column indices from field ids
-                        let mut collector = CollectFieldIdVisitor {
-                            field_ids: HashSet::default(),
-                        };
-                        if let Some(predicates) = task.predicate() {
-                            visit(&mut collector, predicates)?;
-                        }
-
-                        let parquet_file = file_io
-                            .new_input(task.data_file_path())?;
-
-                        let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
-                        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
-
-                        let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
-                            .await?;
-
-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let arrow_schema = batch_stream_builder.schema();
-                        let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(),task.schema(),parquet_schema, arrow_schema)?;
-                        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
-
-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let row_filter = self.get_row_filter(task.predicate(),parquet_schema, &collector)?;
-
-                        if let Some(row_filter) = row_filter {
-                            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
-                        }
-
-                        if let Some(batch_size) = self.batch_size {
-                            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
-                        }
-
-                        let mut batch_stream = batch_stream_builder.build()?;
-
-                        while let Some(batch) = batch_stream.next().await {
-                            yield batch?;
-                        }
-                    }
-                    Err(e) => {
-                        Err(e)?
-                    }
-                }
+        let batch_size = self.batch_size;
+        let max_concurrent_fetching_datafiles = self.config.concurrency_limit_data_files;
+
+        let (tx, rx) = channel(0);
+
+        spawn(async move {
+            tasks
+                .map(|task| Ok((task, file_io.clone(), tx.clone())))
+                .try_for_each_concurrent(
+                    max_concurrent_fetching_datafiles,
+                    |(file_scan_task, file_io, tx)| async move {
+                        spawn(async move {
+                            Self::process_file_scan_task(file_scan_task, batch_size, file_io, tx)
+                                .await
+                        })
+                        .await
+                    },
+                )
+                .await
+        });
+
+        return Ok(rx.boxed());
+    }
+
+    async fn process_file_scan_task(
+        task_res: Result<FileScanTask>,
+        batch_size: Option<usize>,
+        file_io: FileIO,
+        mut tx: Sender<Result<RecordBatch>>,
+    ) -> Result<()> {
+        let task = match task_res {
+            Ok(task) => task,
+            Err(err) => {
+                tx.send(Err(err)).await?;
+                return Ok(());
             }
+        };
+
+        // Collect Parquet column indices from field ids
+        let mut collector = CollectFieldIdVisitor {
+            field_ids: HashSet::default(),
+        };
+
+        if let Some(predicates) = task.predicate() {
+            visit(&mut collector, predicates)?;
         }
-        .boxed())
+
+        let parquet_file = file_io.new_input(task.data_file_path())?;
+
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+
+        let mut batch_stream_builder =
+            ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
+
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let arrow_schema = batch_stream_builder.schema();
+        let projection_mask = Self::get_arrow_projection_mask(
+            task.project_field_ids(),
+            task.schema(),
+            parquet_schema,
+            arrow_schema,
+        )?;
+        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
+
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let row_filter = Self::get_row_filter(task.predicate(), parquet_schema, &collector)?;
+
+        if let Some(row_filter) = row_filter {
+            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
+        }
+
+        if let Some(batch_size) = batch_size {
+            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+        }
+
+        let mut batch_stream = batch_stream_builder.build()?;
+
+        while let Some(batch) = batch_stream.try_next().await? {
+            tx.send(Ok(batch)).await?
+        }
+
+        Ok(())
     }
 
     fn get_arrow_projection_mask(
-        &self,
         field_ids: &[i32],
         iceberg_schema_of_task: &Schema,
         parquet_schema: &SchemaDescriptor,
         arrow_schema: &ArrowSchemaRef,
-    ) -> crate::Result<ProjectionMask> {
+    ) -> Result<ProjectionMask> {
         if field_ids.is_empty() {
             Ok(ProjectionMask::all())
         } else {
@@ -216,7 +276,6 @@ impl ArrowReader {
     }
 
     fn get_row_filter(
-        &self,
         predicates: Option<&BoundPredicate>,
         parquet_schema: &SchemaDescriptor,
        collector: &CollectFieldIdVisitor,
```
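The new `read` no longer builds its output with `try_stream!`. Instead it spawns a driver task that walks the `FileScanTaskStream` with `try_for_each_concurrent`, processes each data file in its own spawned task, and pushes every resulting `RecordBatch` (or error) into a bounded `futures::channel::mpsc` channel whose receiver is returned to the caller as the `ArrowRecordBatchStream`. Below is a minimal, self-contained sketch of that fan-out pattern using only the `futures` crate; the `u32` item type, the concurrency limit of 3, and the use of `futures::executor::block_on` with `join!` in place of the crate's internal `runtime::spawn` are illustrative assumptions, not the crate's code:

```rust
use futures::channel::mpsc::{channel, SendError};
use futures::{SinkExt, StreamExt, TryStreamExt};

fn main() {
    futures::executor::block_on(async {
        // Bounded channel: the receiving half is itself a `Stream`, which is
        // what `ArrowReader::read` hands back to the caller in the real code.
        let (tx, rx) = channel::<u32>(0);

        // Stand-in for the incoming `FileScanTaskStream`.
        let inputs = futures::stream::iter(1..=8u32);

        // Producer: drive up to 3 items at a time, sending each result into
        // the channel as soon as it is ready (completion order need not
        // match input order).
        let producer = async move {
            inputs
                .map(|n| Ok::<_, SendError>((n, tx.clone())))
                .try_for_each_concurrent(3, |(n, mut tx)| async move {
                    // Per-item work goes here; in the real code this is
                    // `process_file_scan_task` fetching and decoding a file.
                    tx.send(n * 10).await
                })
                .await
        };

        // Consumer: drain the receiver like any other stream.
        let consumer = async move { rx.collect::<Vec<_>>().await };

        let (sent, results) = futures::join!(producer, consumer);
        sent.expect("channel closed unexpectedly");
        println!("{results:?}");
    });
}
```

Because every item carries its own `Sender` clone, the channel closes automatically once the producer finishes, which is how the output stream terminates without any explicit end-of-stream marker.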

crates/iceberg/src/scan.rs

Lines changed: 19 additions & 8 deletions
```diff
@@ -57,6 +57,9 @@ pub struct TableScanConfig {
     /// The maximum number of [`ManifestEntry`]s that will
     /// be processed in parallel
     concurrency_limit_manifest_entries: usize,
+
+    /// passed down to ArrowReader
+    concurrency_limit_data_files: usize,
 }
 
 impl Default for TableScanConfig {
@@ -65,6 +68,7 @@ impl Default for TableScanConfig {
         Self {
             concurrency_limit_manifest_files: num_cpus,
             concurrency_limit_manifest_entries: num_cpus,
+            concurrency_limit_data_files: num_cpus,
         }
     }
 }
@@ -148,6 +152,7 @@ impl<'a> TableScanBuilder<'a> {
         self.table_scan_config = TableScanConfig {
             concurrency_limit_manifest_files: limit,
             concurrency_limit_manifest_entries: limit,
+            concurrency_limit_data_files: limit,
         };
         self
     }
@@ -156,20 +161,25 @@ impl<'a> TableScanBuilder<'a> {
     pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
         self.table_scan_config = TableScanConfig {
             concurrency_limit_manifest_files: limit,
-            concurrency_limit_manifest_entries: self
-                .table_scan_config
-                .concurrency_limit_manifest_entries,
+            ..self.table_scan_config
         };
         self
     }
 
     /// sets the manifest entry concurrency limit for this scan
-    pub fn with_manifest_entru_concurrency_limit(mut self, limit: usize) -> Self {
+    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
         self.table_scan_config = TableScanConfig {
             concurrency_limit_manifest_entries: limit,
-            concurrency_limit_manifest_files: self
-                .table_scan_config
-                .concurrency_limit_manifest_files,
+            ..self.table_scan_config
+        };
+        self
+    }
+
+    /// sets the data file concurrency limit for this scan
+    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
+        self.table_scan_config = TableScanConfig {
+            concurrency_limit_data_files: limit,
+            ..self.table_scan_config
         };
         self
     }
@@ -368,7 +378,8 @@ impl TableScan {
 
     /// Returns an [`ArrowRecordBatchStream`].
     pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
-        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone());
+        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
+            .with_data_file_concurrency_limit(self.table_scan_config.concurrency_limit_data_files);
 
         if let Some(batch_size) = self.batch_size {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
```
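For callers going through `TableScan`, the new limit is set on the scan builder and forwarded to the `ArrowReaderBuilder` inside `to_arrow`. The following is a hedged usage sketch: it assumes an already-loaded `iceberg::table::Table`, that `Table::scan()`, `select_all()`, and the crate-root `Result` re-export behave as in the crate at the time of this commit, and the limits 4 and 8 are arbitrary illustrative values:

```rust
use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

/// Scan `table` to Arrow, capping data-file fan-out at 4 files in flight.
async fn read_all(table: &Table) -> Result<()> {
    let batch_stream = table
        .scan()
        .select_all()
        // New in this commit; defaults to the number of CPU cores.
        .with_data_file_concurrency_limit(4)
        // Name fixed in this commit (was `with_manifest_entru_concurrency_limit`).
        .with_manifest_entry_concurrency_limit(8)
        .build()?
        .to_arrow()
        .await?;

    // Batches now arrive as soon as any in-flight data file yields one,
    // rather than strictly file by file.
    let batches = batch_stream.try_collect::<Vec<_>>().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```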
