@@ -21,7 +21,6 @@ use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::FileScanTask;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
 use crate::spec::{Datum, SchemaRef};
 use crate::Result;
@@ -30,13 +29,12 @@ use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
 use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
+use async_stream::try_stream;
 use bytes::Bytes;
 use fnv::FnvHashSet;
-use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
 use futures::stream::StreamExt;
 use futures::{try_join, TryFutureExt};
-use futures::{SinkExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -46,15 +44,11 @@ use std::collections::{HashMap, HashSet};
 use std::ops::Range;
 use std::str::FromStr;
 use std::sync::Arc;
-use tokio::spawn;
-
-const CHANNEL_BUFFER_SIZE: usize = 10;
-const CONCURRENCY_LIMIT_TASKS: usize = 10;
 
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
-    field_ids: Arc<Vec<usize>>,
+    field_ids: Vec<usize>,
     file_io: FileIO,
     schema: SchemaRef,
     predicate: Option<BoundPredicate>,
@@ -65,7 +59,7 @@ impl ArrowReaderBuilder {
     pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
         ArrowReaderBuilder {
             batch_size: None,
-            field_ids: Arc::new(vec![]),
+            field_ids: vec![],
             file_io,
             schema,
             predicate: None,
@@ -81,10 +75,7 @@ impl ArrowReaderBuilder {
 
     /// Sets the desired column projection with a list of field ids.
     pub fn with_field_ids(mut self, field_ids: impl IntoIterator<Item = usize>) -> Self {
-        let field_ids = field_ids.into_iter().collect();
-        let field_ids_arc = Arc::new(field_ids);
-        self.field_ids = field_ids_arc;
-
+        self.field_ids = field_ids.into_iter().collect();
         self
     }
 
@@ -109,7 +100,7 @@ impl ArrowReaderBuilder {
 /// Reads data from Parquet files
 pub struct ArrowReader {
     batch_size: Option<usize>,
-    field_ids: Arc<Vec<usize>>,
+    field_ids: Vec<usize>,
     #[allow(dead_code)]
     schema: SchemaRef,
     file_io: FileIO,
@@ -119,9 +110,7 @@ pub struct ArrowReader {
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
-        let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
-
+    pub fn read(self, mut tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         // Collect Parquet column indices from field ids
         let mut collector = CollectFieldIdVisitor {
             field_ids: HashSet::default(),
@@ -130,116 +119,44 @@ impl ArrowReader {
             visit(&mut collector, predicates)?;
         }
 
-        let tasks = tasks.map(move |task| self.build_file_scan_task_context(task, sender.clone()));
-
-        spawn(async move {
-            tasks
-                .try_for_each_concurrent(CONCURRENCY_LIMIT_TASKS, Self::process_file_scan_task)
-                .await
-        });
-
-        Ok(receiver.boxed())
-    }
-
-    fn build_file_scan_task_context(
-        &self,
-        task: Result<FileScanTask>,
-        sender: Sender<Result<RecordBatch>>,
-    ) -> Result<FileScanTaskContext> {
-        Ok(FileScanTaskContext::new(
-            task?,
-            self.file_io.clone(),
-            sender,
-            self.batch_size,
-            self.field_ids.clone(),
-            self.schema.clone(),
-            self.predicate.clone(),
-        ))
-    }
-
-    async fn process_file_scan_task(mut context: FileScanTaskContext) -> Result<()> {
-        let file_scan_task = context.take_task();
-
-        // Collect Parquet column indices from field ids
-        let mut collector = CollectFieldIdVisitor {
-            field_ids: HashSet::default(),
-        };
-        if let Some(predicate) = &context.predicate {
-            visit(&mut collector, predicate)?;
-        }
-
-        let parquet_file = context
-            .file_io
-            .new_input(file_scan_task.data().data_file().file_path())?;
-        let (parquet_metadata, parquet_reader) =
-            try_join!(parquet_file.metadata(), parquet_file.reader())?;
-        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
-
-        let mut batch_stream_builder =
-            ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
+        Ok(try_stream! {
+            while let Some(Ok(task)) = tasks.next().await {
 
-        let parquet_schema = batch_stream_builder.parquet_schema();
-        let arrow_schema = batch_stream_builder.schema();
+                let parquet_file = self
+                    .file_io
+                    .new_input(task.data().data_file().file_path())?;
+                let (parquet_metadata, parquet_reader) =
+                    try_join!(parquet_file.metadata(), parquet_file.reader())?;
+                let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
 
-        let projection_mask = context.get_arrow_projection_mask(parquet_schema, arrow_schema)?;
-        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
+                let mut batch_stream_builder =
+                    ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
 
-        let parquet_schema = batch_stream_builder.parquet_schema();
-        let row_filter = context.get_row_filter(parquet_schema, &collector)?;
-
-        if let Some(row_filter) = row_filter {
-            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
-        }
+                let parquet_schema = batch_stream_builder.parquet_schema();
+                let arrow_schema = batch_stream_builder.schema();
 
-        if let Some(batch_size) = context.batch_size {
-            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
-        }
+                let projection_mask = self.get_arrow_projection_mask(parquet_schema, arrow_schema)?;
+                batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
 
-        let mut batch_stream = batch_stream_builder.build()?;
+                let parquet_schema = batch_stream_builder.parquet_schema();
+                let row_filter = self.get_row_filter(parquet_schema, &collector)?;
 
-        while let Some(batch) = batch_stream.next().await {
-            context.sender.send(Ok(batch?)).await?;
-        }
+                if let Some(row_filter) = row_filter {
+                    batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
+                }
 
-        Ok(())
-    }
-}
+                if let Some(batch_size) = self.batch_size {
+                    batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+                }
 
-struct FileScanTaskContext {
-    file_scan_task: Option<FileScanTask>,
-    file_io: FileIO,
-    sender: Sender<Result<RecordBatch>>,
-    batch_size: Option<usize>,
-    field_ids: Arc<Vec<usize>>,
-    schema: SchemaRef,
-    predicate: Option<BoundPredicate>,
-}
+                let mut batch_stream = batch_stream_builder.build()?;
 
-impl FileScanTaskContext {
-    fn new(
-        file_scan_task: FileScanTask,
-        file_io: FileIO,
-        sender: Sender<Result<RecordBatch>>,
-        batch_size: Option<usize>,
-        field_ids: Arc<Vec<usize>>,
-        schema: SchemaRef,
-        predicate: Option<BoundPredicate>,
-    ) -> Self {
-        FileScanTaskContext {
-            file_scan_task: Some(file_scan_task),
-            file_io,
-            sender,
-            batch_size,
-            field_ids,
-            schema,
-            predicate,
+                while let Some(batch) = batch_stream.next().await {
+                    yield batch?;
+                }
+            }
         }
-    }
-
-    fn take_task(&mut self) -> FileScanTask {
-        let mut result = None;
-        std::mem::swap(&mut self.file_scan_task, &mut result);
-        result.unwrap()
+        .boxed())
     }
 
     fn get_arrow_projection_mask(
@@ -297,8 +214,8 @@ impl FileScanTaskContext {
         }
 
         let mut indices = vec![];
-        for field_id in self.field_ids.as_ref() {
-            if let Some(col_idx) = column_map.get(&(*field_id as i32)) {
+        for &field_id in &self.field_ids {
+            if let Some(col_idx) = column_map.get(&(field_id as i32)) {
                 indices.push(*col_idx);
             } else {
                 return Err(Error::new(
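
The core of this change is replacing the bounded-channel-plus-`tokio::spawn` fan-out with an async generator: `async_stream::try_stream!` turns the per-task loop into a `Stream` of `Result` items that is lazily driven by the consumer, so `yield` and `?` replace the explicit `sender.send(...)` plumbing. Below is a minimal, self-contained sketch of that pattern; `load`, the `u32` ids, and the `std::io::Error` error type are illustrative stand-ins, not part of the iceberg-rust API, and it assumes the `async-stream`, `futures`, and `tokio` crates are available.

```rust
// Minimal sketch of the `try_stream!` pattern used in `ArrowReader::read`.
use async_stream::try_stream;
use futures::stream::{BoxStream, StreamExt, TryStreamExt};

type Result<T> = std::result::Result<T, std::io::Error>;

// Stand-in for a fallible per-task step (e.g. opening one Parquet file).
async fn load(id: u32) -> Result<String> {
    Ok(format!("batch-{id}"))
}

// `try_stream!` builds an `impl Stream<Item = Result<String>>`; a `?` inside
// the block terminates the stream by emitting the error as the final item.
fn read_all(ids: Vec<u32>) -> BoxStream<'static, Result<String>> {
    try_stream! {
        for id in ids {
            let batch = load(id).await?;
            yield batch; // takes the place of the old `sender.send(Ok(batch))`
        }
    }
    .boxed()
}

#[tokio::main]
async fn main() -> Result<()> {
    // The consumer drives the generator; nothing runs until the stream is polled.
    let batches: Vec<String> = read_all(vec![1, 2, 3]).try_collect().await?;
    println!("{batches:?}");
    Ok(())
}
```

One consequence of this shape, visible in the diff, is that tasks are now processed sequentially as the stream is polled, whereas the removed code ran up to `CONCURRENCY_LIMIT_TASKS` file-scan tasks concurrently via `try_for_each_concurrent`.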