
Commit c8c73db

feat: concurrent data file fetches, parallel RecordBatch processing
1 parent 80c1399 commit c8c73db

3 files changed (+137 −56 lines)

crates/iceberg/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -50,7 +50,6 @@ arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 arrow-string = { workspace = true }
 async-std = { workspace = true, optional = true, features = ["attributes"] }
-async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
 bitvec = { workspace = true }
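
The async-stream dependency can be dropped because the reader below no longer builds its output with the `try_stream!` macro: it pushes results into a bounded `futures::channel::mpsc` channel and returns the receiver, which already implements `Stream`. A minimal sketch of that substitution, using only the `futures` crate (all names here are illustrative, not taken from the crate):

```rust
use futures::channel::mpsc::channel;
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt};

// Illustrative stand-in for the reader's per-batch result type.
type Item = Result<i32, String>;

fn make_stream() -> BoxStream<'static, Item> {
    let (mut tx, rx) = channel::<Item>(10);
    // A producer fills the channel; a thread drives it here for brevity,
    // where the real reader spawns an async task instead.
    std::thread::spawn(move || {
        futures::executor::block_on(async move {
            for i in 0..3 {
                let _ = tx.send(Ok(i)).await;
            }
        });
    });
    // The Receiver is itself a Stream, so no generator macro is needed.
    rx.boxed()
}

fn main() {
    let items = futures::executor::block_on(make_stream().collect::<Vec<_>>());
    assert_eq!(items.len(), 3);
}
```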

crates/iceberg/src/arrow/reader.rs

Lines changed: 115 additions & 48 deletions
@@ -27,12 +27,11 @@ use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
 use arrow_string::like::starts_with;
-use async_stream::try_stream;
 use bytes::Bytes;
 use fnv::FnvHashSet;
+use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::stream::StreamExt;
-use futures::{try_join, TryFutureExt};
+use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -44,14 +43,34 @@ use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
+use crate::runtime::spawn;
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, Schema};
 use crate::{Error, ErrorKind};
 
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
     file_io: FileIO,
+    config: ArrowReaderConfig,
+}
+
+#[derive(Clone)]
+struct ArrowReaderConfig {
+    /// the maximum number of data files that can be fetched at the same time
+    concurrency_limit_data_files: usize,
+}
+
+impl Default for ArrowReaderConfig {
+    fn default() -> Self {
+        let num_cpus = std::thread::available_parallelism()
+            .expect("failed to get number of CPUs")
+            .get();
+
+        Self {
+            concurrency_limit_data_files: num_cpus,
+        }
+    }
 }
 
 impl ArrowReaderBuilder {
@@ -60,9 +79,19 @@ impl ArrowReaderBuilder {
         ArrowReaderBuilder {
             batch_size: None,
             file_io,
+            config: ArrowReaderConfig::default(),
         }
     }
 
+    /// Sets the max number of in flight data files that are being fetched
+    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
+        self.config = ArrowReaderConfig {
+            concurrency_limit_data_files: val,
+        };
+
+        self
+    }
+
     /// Sets the desired size of batches in the response
     /// to something other than the default
     pub fn with_batch_size(mut self, batch_size: usize) -> Self {
@@ -75,6 +104,7 @@ impl ArrowReaderBuilder {
         ArrowReader {
             batch_size: self.batch_size,
             file_io: self.file_io,
+            config: self.config,
         }
     }
 }
@@ -84,73 +114,111 @@ impl ArrowReaderBuilder {
 pub struct ArrowReader {
     batch_size: Option<usize>,
     file_io: FileIO,
+    config: ArrowReaderConfig,
 }
 
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
-
-        Ok(try_stream! {
-            while let Some(task_result) = tasks.next().await {
-                match task_result {
-                    Ok(task) => {
-                        // Collect Parquet column indices from field ids
-                        let mut collector = CollectFieldIdVisitor {
-                            field_ids: HashSet::default(),
-                        };
-                        if let Some(predicates) = task.predicate() {
-                            visit(&mut collector, predicates)?;
+        let batch_size = self.batch_size;
+        let max_concurrent_fetching_datafiles = self.config.concurrency_limit_data_files;
+
+        let (tx, rx) = channel(10);
+        let mut channel_for_error = tx.clone();
+
+        spawn(async move {
+            let result = tasks
+                .map(|task| Ok((task, file_io.clone(), tx.clone())))
+                .try_for_each_concurrent(
+                    max_concurrent_fetching_datafiles,
+                    |(file_scan_task, file_io, tx)| async move {
+                        match file_scan_task {
+                            Ok(task) => {
+                                let file_path = task.data_file_path().to_string();
+
+                                spawn(async move {
+                                    Self::process_file_scan_task(task, batch_size, file_io, tx)
+                                        .await
+                                })
+                                .await
+                                .map_err(|e| e.with_context("file_path", file_path))
+                            }
+                            Err(err) => Err(err),
                         }
+                    },
+                )
+                .await;
 
-                        let parquet_file = file_io
-                            .new_input(task.data_file_path())?;
+            if let Err(error) = result {
+                let _ = channel_for_error.send(Err(error)).await;
+            }
+        });
 
-                        let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
-                        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+        return Ok(rx.boxed());
+    }
 
-                        let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
-                            .await?;
+    async fn process_file_scan_task(
+        task: FileScanTask,
+        batch_size: Option<usize>,
+        file_io: FileIO,
+        mut tx: Sender<Result<RecordBatch>>,
+    ) -> Result<()> {
+        // Collect Parquet column indices from field ids
+        let mut collector = CollectFieldIdVisitor {
+            field_ids: HashSet::default(),
+        };
 
-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let arrow_schema = batch_stream_builder.schema();
-                        let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(),task.schema(),parquet_schema, arrow_schema)?;
-                        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
+        if let Some(predicates) = task.predicate() {
+            visit(&mut collector, predicates)?;
+        }
 
-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let row_filter = self.get_row_filter(task.predicate(),parquet_schema, &collector)?;
+        let parquet_file = file_io.new_input(task.data_file_path())?;
 
-                        if let Some(row_filter) = row_filter {
-                            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
-                        }
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
 
-                        if let Some(batch_size) = self.batch_size {
-                            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
-                        }
+        let mut batch_stream_builder =
+            ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
 
-                        let mut batch_stream = batch_stream_builder.build()?;
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let arrow_schema = batch_stream_builder.schema();
+        let projection_mask = Self::get_arrow_projection_mask(
+            task.project_field_ids(),
+            task.schema(),
+            parquet_schema,
+            arrow_schema,
+        )?;
+        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
 
-                        while let Some(batch) = batch_stream.next().await {
-                            yield batch?;
-                        }
-                    }
-                    Err(e) => {
-                        Err(e)?
-                    }
-                }
-            }
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let row_filter = Self::get_row_filter(task.predicate(), parquet_schema, &collector)?;
+
+        if let Some(row_filter) = row_filter {
+            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
+        }
+
+        if let Some(batch_size) = batch_size {
+            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+        }
+
+        let mut batch_stream = batch_stream_builder.build()?;
+
+        while let Some(batch) = batch_stream.try_next().await? {
+            tx.send(Ok(batch)).await?
         }
-        .boxed())
+
+        Ok(())
     }
 
     fn get_arrow_projection_mask(
-        &self,
         field_ids: &[i32],
         iceberg_schema_of_task: &Schema,
         parquet_schema: &SchemaDescriptor,
         arrow_schema: &ArrowSchemaRef,
-    ) -> crate::Result<ProjectionMask> {
+    ) -> Result<ProjectionMask> {
         if field_ids.is_empty() {
             Ok(ProjectionMask::all())
         } else {
@@ -216,7 +284,6 @@ impl ArrowReader {
     }
 
     fn get_row_filter(
-        &self,
         predicates: Option<&BoundPredicate>,
         parquet_schema: &SchemaDescriptor,
         collector: &CollectFieldIdVisitor,
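
The new `read` above is a fan-out/fan-in pipeline: a spawned task walks the `FileScanTaskStream` with `try_for_each_concurrent`, bounded by `concurrency_limit_data_files`; each `process_file_scan_task` call streams its `RecordBatch`es into the shared channel, and the channel's receiver is handed back to the caller as the `ArrowRecordBatchStream`. A self-contained sketch of the same pattern with plain `futures` types (the task and batch types, the limit, and the buffer size are placeholders, not the crate's):

```rust
use futures::channel::mpsc::channel;
use futures::{stream, SinkExt, StreamExt, TryStreamExt};

fn main() {
    futures::executor::block_on(async {
        let concurrency_limit: usize = 4; // stand-in for concurrency_limit_data_files
        let (tx, rx) = channel::<Result<u64, String>>(10);

        // Producer: handle at most `concurrency_limit` "scan tasks" at once and
        // forward every result into the channel (fan-out, then fan-in).
        let producer = async move {
            let result = stream::iter(0u64..8)
                .map(|task| Ok::<_, String>((task, tx.clone())))
                .try_for_each_concurrent(concurrency_limit, |(task, mut tx)| async move {
                    // Stand-in for process_file_scan_task: do the work, emit a "batch".
                    tx.send(Ok(task * 2)).await.map_err(|e| e.to_string())
                })
                .await;
            drop(tx); // close the channel so the consumer's stream terminates
            result
        };

        // Consumer: the receiver half is the stream handed back to the caller.
        let consumer = rx.try_collect::<Vec<u64>>();

        let (produced, batches) = futures::join!(producer, consumer);
        produced.expect("producer failed");
        assert_eq!(batches.expect("a task failed").len(), 8);
    });
}
```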

crates/iceberg/src/scan.rs

Lines changed: 22 additions & 7 deletions
@@ -55,8 +55,9 @@ pub struct TableScanBuilder<'a> {
     batch_size: Option<usize>,
     case_sensitive: bool,
     filter: Option<Predicate>,
-    concurrency_limit_manifest_files: usize,
+    concurrency_limit_data_files: usize,
     concurrency_limit_manifest_entries: usize,
+    concurrency_limit_manifest_files: usize,
 }
 
 impl<'a> TableScanBuilder<'a> {
@@ -72,8 +73,9 @@ impl<'a> TableScanBuilder<'a> {
             batch_size: None,
             case_sensitive: true,
             filter: None,
-            concurrency_limit_manifest_files: num_cpus,
+            concurrency_limit_data_files: num_cpus,
             concurrency_limit_manifest_entries: num_cpus,
+            concurrency_limit_manifest_files: num_cpus,
         }
     }
 
@@ -124,12 +126,13 @@ impl<'a> TableScanBuilder<'a> {
     pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
         self.concurrency_limit_manifest_files = limit;
         self.concurrency_limit_manifest_entries = limit;
+        self.concurrency_limit_data_files = limit;
         self
     }
 
-    /// Sets the manifest file concurrency limit for this scan
-    pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
-        self.concurrency_limit_manifest_files = limit;
+    /// Sets the data file concurrency limit for this scan
+    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_data_files = limit;
         self
     }
 
@@ -139,6 +142,12 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Sets the manifest file concurrency limit for this scan
+    pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_manifest_files = limit;
+        self
+    }
+
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
         let snapshot = match self.snapshot_id {
@@ -244,10 +253,11 @@ impl<'a> TableScanBuilder<'a> {
         Ok(TableScan {
             batch_size: self.batch_size,
             column_names: self.column_names,
-            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
             file_io: self.table.file_io().clone(),
             plan_context,
+            concurrency_limit_data_files: self.concurrency_limit_data_files,
             concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
+            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
         })
     }
 }
@@ -266,6 +276,10 @@ pub struct TableScan {
     /// The maximum number of [`ManifestEntry`]s that will
     /// be processed in parallel
     concurrency_limit_manifest_entries: usize,
+
+    /// The maximum number of data files that will
+    /// be fetched and processed in parallel
+    concurrency_limit_data_files: usize,
 }
 
 /// PlanContext wraps a [`SnapshotRef`] alongside all the other
@@ -338,7 +352,8 @@ impl TableScan {
 
     /// Returns an [`ArrowRecordBatchStream`].
     pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
-        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone());
+        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
+            .with_data_file_concurrency_limit(self.concurrency_limit_data_files);
 
         if let Some(batch_size) = self.batch_size {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
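
With the builder changes above, the limit set on `TableScanBuilder` is carried on `TableScan` and forwarded to `ArrowReaderBuilder` inside `to_arrow`. A hypothetical end-to-end usage sketch (it assumes a `Table` obtained from a catalog and the crate's `Table::scan()` entry point; only the builder methods shown in this diff are taken from the commit):

```rust
use futures::TryStreamExt;
use iceberg::table::Table;

async fn read_table(table: &Table) -> iceberg::Result<()> {
    let scan = table
        .scan()
        // fetch and decode at most 8 data files concurrently
        // (the default is std::thread::available_parallelism())
        .with_data_file_concurrency_limit(8)
        .build()?;

    // ArrowRecordBatchStream yields Result<RecordBatch>, so TryStreamExt applies.
    let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```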
