@@ -32,7 +32,7 @@ use fnv::FnvHashSet;
 use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
 use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
-use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter};
+use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
 use parquet::file::metadata::ParquetMetaData;
@@ -41,6 +41,7 @@ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
+use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
@@ -56,6 +57,7 @@ pub struct ArrowReaderBuilder {
     file_io: FileIO,
     concurrency_limit_data_files: usize,
     row_group_filtering_enabled: bool,
+    row_selection_enabled: bool,
 }
 
 impl ArrowReaderBuilder {
@@ -68,6 +70,7 @@ impl ArrowReaderBuilder {
             file_io,
             concurrency_limit_data_files: num_cpus,
             row_group_filtering_enabled: true,
+            row_selection_enabled: false,
         }
     }
 
@@ -90,13 +93,20 @@ impl ArrowReaderBuilder {
         self
     }
 
+    /// Determines whether to enable row selection.
+    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
+        self.row_selection_enabled = row_selection_enabled;
+        self
+    }
+
     /// Build the ArrowReader.
     pub fn build(self) -> ArrowReader {
         ArrowReader {
             batch_size: self.batch_size,
             file_io: self.file_io,
             concurrency_limit_data_files: self.concurrency_limit_data_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
+            row_selection_enabled: self.row_selection_enabled,
         }
     }
 }
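
Note: opting in from a caller might look like the sketch below. Only `with_row_selection_enabled` and `build` appear in this diff; the `ArrowReaderBuilder::new(file_io)` constructor shape and the wrapping function are assumptions for illustration.

// Hypothetical call site, not part of this commit.
fn build_reader(file_io: FileIO) -> ArrowReader {
    ArrowReaderBuilder::new(file_io) // constructor arguments assumed, not shown in this diff
        // Page-index-based row selection stays off unless explicitly enabled.
        .with_row_selection_enabled(true)
        .build()
}
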
@@ -111,6 +121,7 @@ pub struct ArrowReader {
     concurrency_limit_data_files: usize,
 
     row_group_filtering_enabled: bool,
+    row_selection_enabled: bool,
 }
 
 impl ArrowReader {
@@ -121,6 +132,7 @@ impl ArrowReader {
         let batch_size = self.batch_size;
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
+        let row_selection_enabled = self.row_selection_enabled;
 
         let (tx, rx) = channel(concurrency_limit_data_files);
         let mut channel_for_error = tx.clone();
@@ -142,6 +154,7 @@ impl ArrowReader {
                     file_io,
                     tx,
                     row_group_filtering_enabled,
+                    row_selection_enabled,
                 )
                 .await
             })
@@ -168,6 +181,7 @@ impl ArrowReader {
         file_io: FileIO,
         mut tx: Sender<Result<RecordBatch>>,
         row_group_filtering_enabled: bool,
+        row_selection_enabled: bool,
     ) -> Result<()> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
@@ -176,11 +190,12 @@ impl ArrowReader {
             try_join!(parquet_file.metadata(), parquet_file.reader())?;
         let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
 
+        let should_load_page_index = row_selection_enabled && task.predicate().is_some();
+
         // Start creating the record batch stream, which wraps the parquet file reader
         let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
             parquet_file_reader,
-            // Page index will be required in upcoming row selection PR
-            ArrowReaderOptions::new().with_page_index(false),
+            ArrowReaderOptions::new().with_page_index(should_load_page_index),
         )
         .await?;
 
@@ -224,6 +239,19 @@ impl ArrowReader {
             selected_row_groups = Some(result);
         }
 
+        if row_selection_enabled {
+            let row_selection = Self::get_row_selection(
+                predicate,
+                record_batch_stream_builder.metadata(),
+                &selected_row_groups,
+                &field_id_map,
+                task.schema(),
+            )?;
+
+            record_batch_stream_builder =
+                record_batch_stream_builder.with_row_selection(row_selection);
+        }
+
         if let Some(selected_row_groups) = selected_row_groups {
             record_batch_stream_builder =
                 record_batch_stream_builder.with_row_groups(selected_row_groups);
@@ -377,6 +405,67 @@ impl ArrowReader {
 
         Ok(results)
     }
+
+    fn get_row_selection(
+        predicate: &BoundPredicate,
+        parquet_metadata: &Arc<ParquetMetaData>,
+        selected_row_groups: &Option<Vec<usize>>,
+        field_id_map: &HashMap<i32, usize>,
+        snapshot_schema: &Schema,
+    ) -> Result<RowSelection> {
+        let Some(column_index) = parquet_metadata.column_index() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Parquet file metadata does not contain a column index",
+            ));
+        };
+
+        let Some(offset_index) = parquet_metadata.offset_index() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Parquet file metadata does not contain an offset index",
+            ));
+        };
+
+        let mut selected_row_groups_idx = 0;
+
+        let page_index = column_index
+            .iter()
+            .enumerate()
+            .zip(offset_index)
+            .zip(parquet_metadata.row_groups());
+
+        let mut results = Vec::new();
+        for (((idx, column_index), offset_index), row_group_metadata) in page_index {
+            if let Some(selected_row_groups) = selected_row_groups {
+                // skip row groups that aren't present in selected_row_groups
+                if idx == selected_row_groups[selected_row_groups_idx] {
+                    selected_row_groups_idx += 1;
+                } else {
+                    continue;
+                }
+            }
+
+            let selections_for_page = PageIndexEvaluator::eval(
+                predicate,
+                column_index,
+                offset_index,
+                row_group_metadata,
+                field_id_map,
+                snapshot_schema,
+            )?;
+
+            results.push(selections_for_page);
+
+            if let Some(selected_row_groups) = selected_row_groups {
+                if selected_row_groups_idx == selected_row_groups.len() {
+                    break;
+                }
+            }
+        }
+
+        Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
+    }
 }
 
 /// Build the map of parquet field id to Parquet column index in the schema.
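
Note on the final `results.into_iter().flatten().collect::<Vec<_>>().into()` step in `get_row_selection`: the per-row-group selector lists are concatenated in row-group order, and the combined `RowSelection` is interpreted against the row groups that are actually read. A minimal sketch of that combination step, assuming the parquet crate's `RowSelector`/`RowSelection` API and made-up selector values:

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // First selected row group: keep the first 100 rows, skip the remaining 200.
    let rg0 = vec![RowSelector::select(100), RowSelector::skip(200)];
    // Next selected row group: keep all 50 of its rows.
    let rg1 = vec![RowSelector::select(50)];

    // Concatenating the selector lists mirrors the flatten/collect/into step above.
    let selection: RowSelection = [rg0, rg1].into_iter().flatten().collect::<Vec<_>>().into();
    assert_eq!(selection.row_count(), 150); // total rows selected across both groups
}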