@@ -27,12 +27,11 @@ use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
 use arrow_string::like::starts_with;
-use async_stream::try_stream;
 use bytes::Bytes;
 use fnv::FnvHashSet;
+use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::stream::StreamExt;
-use futures::{try_join, TryFutureExt};
+use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -44,14 +43,36 @@ use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
+use crate::runtime::spawn;
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, Schema};
 use crate::{Error, ErrorKind};

+const DEFAULT_CONCURRENCY_LIMIT_DATA_FILES: usize = 512;
+const DEFAULT_CPU_MULTIPLIER_QUEUE_LIMIT_DATA_FILES: f32 = 1f32;
+
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
     file_io: FileIO,
+    config: ArrowReaderConfig,
+}
+
+#[derive(Clone)]
+struct ArrowReaderConfig {
+    max_concurrent_fetching_datafiles: usize,
+    max_data_file_task_processing_parallelism: usize,
+}
+
+impl Default for ArrowReaderConfig {
+    fn default() -> Self {
+        Self {
+            max_concurrent_fetching_datafiles: DEFAULT_CONCURRENCY_LIMIT_DATA_FILES,
+            max_data_file_task_processing_parallelism: (num_cpus::get() as f32
+                * DEFAULT_CPU_MULTIPLIER_QUEUE_LIMIT_DATA_FILES)
+                as usize,
+        }
+    }
 }

 impl ArrowReaderBuilder {
@@ -60,9 +81,32 @@ impl ArrowReaderBuilder {
         ArrowReaderBuilder {
             batch_size: None,
             file_io,
+            config: ArrowReaderConfig::default(),
         }
     }

+    /// Sets the max number of in flight data files that are being fetched
+    pub fn with_max_concurrent_fetching_datafiles(mut self, val: usize) -> Self {
+        self.config = ArrowReaderConfig {
+            max_concurrent_fetching_datafiles: val,
+            max_data_file_task_processing_parallelism: self
+                .config
+                .max_data_file_task_processing_parallelism,
+        };
+
+        self
+    }
+
+    /// Sets the max number of data files that can be processed in parallel
+    pub fn with_data_file_processing_parallelism(mut self, val: usize) -> Self {
+        self.config = ArrowReaderConfig {
+            max_concurrent_fetching_datafiles: self.config.max_concurrent_fetching_datafiles,
+            max_data_file_task_processing_parallelism: val,
+        };
+
+        self
+    }
+
     /// Sets the desired size of batches in the response
     /// to something other than the default
     pub fn with_batch_size(mut self, batch_size: usize) -> Self {
@@ -75,6 +119,7 @@ impl ArrowReaderBuilder {
         ArrowReader {
             batch_size: self.batch_size,
             file_io: self.file_io,
+            config: self.config,
         }
     }
 }
@@ -84,73 +129,106 @@ impl ArrowReaderBuilder {
 pub struct ArrowReader {
     batch_size: Option<usize>,
     file_io: FileIO,
+    config: ArrowReaderConfig,
 }

 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
+        let batch_size = self.batch_size;
+        let max_concurrent_fetching_datafiles = self.config.max_concurrent_fetching_datafiles;
+
+        let (tx, rx) = channel(self.config.max_data_file_task_processing_parallelism);
+
+        spawn(async move {
+            tasks
+                .map(|task| Ok((task, file_io.clone(), tx.clone())))
+                .try_for_each_concurrent(
+                    max_concurrent_fetching_datafiles,
+                    |(file_scan_task, file_io, tx)| async move {
+                        spawn(async move {
+                            Self::process_file_scan_task(file_scan_task, batch_size, file_io, tx)
+                                .await
+                        })
+                        .await
+                    },
+                )
+                .await
+        });
+
+        return Ok(rx.boxed());
+    }

-        Ok(try_stream! {
-            while let Some(task_result) = tasks.next().await {
-                match task_result {
-                    Ok(task) => {
-                        // Collect Parquet column indices from field ids
-                        let mut collector = CollectFieldIdVisitor {
-                            field_ids: HashSet::default(),
-                        };
-                        if let Some(predicates) = task.predicate() {
-                            visit(&mut collector, predicates)?;
-                        }
-
-                        let parquet_file = file_io
-                            .new_input(task.data_file_path())?;
-
-                        let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
-                        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
-
-                        let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
-                            .await?;
-
-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let arrow_schema = batch_stream_builder.schema();
-                        let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(), task.schema(), parquet_schema, arrow_schema)?;
-                        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
-
-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let row_filter = self.get_row_filter(task.predicate(), parquet_schema, &collector)?;
-
-                        if let Some(row_filter) = row_filter {
-                            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
-                        }
-
-                        if let Some(batch_size) = self.batch_size {
-                            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
-                        }
-
-                        let mut batch_stream = batch_stream_builder.build()?;
-
-                        while let Some(batch) = batch_stream.next().await {
-                            yield batch?;
-                        }
-                    }
-                    Err(e) => {
-                        Err(e)?
-                    }
-                }
+    async fn process_file_scan_task(
+        task_res: Result<FileScanTask>,
+        batch_size: Option<usize>,
+        file_io: FileIO,
+        mut tx: Sender<Result<RecordBatch>>,
+    ) -> Result<()> {
+        let task = match task_res {
+            Ok(task) => task,
+            Err(err) => {
+                tx.send(Err(err)).await?;
+                return Ok(());
             }
+        };
+
+        // Collect Parquet column indices from field ids
+        let mut collector = CollectFieldIdVisitor {
+            field_ids: HashSet::default(),
+        };
+
+        if let Some(predicates) = task.predicate() {
+            visit(&mut collector, predicates)?;
+        }
+
+        let parquet_file = file_io.new_input(task.data_file_path())?;
+
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+
+        let mut batch_stream_builder =
+            ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
+
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let arrow_schema = batch_stream_builder.schema();
+        let projection_mask = Self::get_arrow_projection_mask(
+            task.project_field_ids(),
+            task.schema(),
+            parquet_schema,
+            arrow_schema,
+        )?;
+        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
+
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let row_filter = Self::get_row_filter(task.predicate(), parquet_schema, &collector)?;
+
+        if let Some(row_filter) = row_filter {
+            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
+        }
+
+        if let Some(batch_size) = batch_size {
+            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+        }
         }
-        .boxed())
+
+        let mut batch_stream = batch_stream_builder.build()?;
+
+        while let Some(batch) = batch_stream.try_next().await? {
+            tx.send(Ok(batch)).await?
+        }
+
+        Ok(())
     }

     fn get_arrow_projection_mask(
-        &self,
         field_ids: &[i32],
         iceberg_schema_of_task: &Schema,
         parquet_schema: &SchemaDescriptor,
         arrow_schema: &ArrowSchemaRef,
-    ) -> crate::Result<ProjectionMask> {
+    ) -> Result<ProjectionMask> {
         if field_ids.is_empty() {
             Ok(ProjectionMask::all())
         } else {
@@ -216,7 +294,6 @@ impl ArrowReader {
     }

     fn get_row_filter(
-        &self,
         predicates: Option<&BoundPredicate>,
         parquet_schema: &SchemaDescriptor,
         collector: &CollectFieldIdVisitor,
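For context, here is a rough usage sketch of the two new concurrency knobs. It assumes an `ArrowReaderBuilder` has already been obtained (its constructor may be crate-internal, so `builder` is just a placeholder here), and that `file_scan_task_stream` is a `FileScanTaskStream` produced elsewhere, e.g. by a table scan:

```rust
// Sketch only: `builder` and `file_scan_task_stream` are assumed to exist;
// they are not constructed in this snippet.
let reader = builder
    // Limit how many data files may be fetched concurrently (defaults to 512).
    .with_max_concurrent_fetching_datafiles(64)
    // Limit how many file scan tasks are processed in parallel
    // (defaults to the number of CPUs).
    .with_data_file_processing_parallelism(8)
    .with_batch_size(1024)
    .build();

// `read` now spawns the work onto the runtime and immediately returns a
// stream of `RecordBatch` results backed by a bounded mpsc channel.
let record_batches: ArrowRecordBatchStream = reader.read(file_scan_task_stream)?;
```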