@@ -23,7 +23,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 
 use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
-use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
+use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
 use arrow_string::like::starts_with;
@@ -32,7 +32,7 @@ use fnv::FnvHashSet;
 use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
 use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
-use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
+use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
 use parquet::file::metadata::ParquetMetaData;
@@ -41,6 +41,7 @@ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
+use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
 use crate::runtime::spawn;
@@ -54,6 +55,7 @@ pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
     file_io: FileIO,
     concurrency_limit_data_files: usize,
+    row_group_filtering_enabled: bool,
 }
 
 impl ArrowReaderBuilder {
@@ -65,13 +67,13 @@ impl ArrowReaderBuilder {
             batch_size: None,
             file_io,
             concurrency_limit_data_files: num_cpus,
+            row_group_filtering_enabled: true,
         }
     }
 
     /// Sets the max number of in flight data files that are being fetched
     pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
         self.concurrency_limit_data_files = val;
-
         self
     }
 
@@ -82,12 +84,19 @@ impl ArrowReaderBuilder {
         self
     }
 
+    /// Determines whether to enable row group filtering.
+    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
+        self.row_group_filtering_enabled = row_group_filtering_enabled;
+        self
+    }
+
     /// Build the ArrowReader.
     pub fn build(self) -> ArrowReader {
         ArrowReader {
             batch_size: self.batch_size,
             file_io: self.file_io,
             concurrency_limit_data_files: self.concurrency_limit_data_files,
+            row_group_filtering_enabled: self.row_group_filtering_enabled,
         }
     }
 }
@@ -100,6 +109,8 @@ pub struct ArrowReader {
 
     /// the maximum number of data files that can be fetched at the same time
     concurrency_limit_data_files: usize,
+
+    row_group_filtering_enabled: bool,
 }
 
 impl ArrowReader {
@@ -109,6 +120,7 @@ impl ArrowReader {
         let file_io = self.file_io.clone();
         let batch_size = self.batch_size;
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
+        let row_group_filtering_enabled = self.row_group_filtering_enabled;
 
         let (tx, rx) = channel(concurrency_limit_data_files);
         let mut channel_for_error = tx.clone();
@@ -124,8 +136,14 @@ impl ArrowReader {
                 let file_path = task.data_file_path().to_string();
 
                 spawn(async move {
-                    Self::process_file_scan_task(task, batch_size, file_io, tx)
-                        .await
+                    Self::process_file_scan_task(
+                        task,
+                        batch_size,
+                        file_io,
+                        tx,
+                        row_group_filtering_enabled,
+                    )
+                    .await
                 })
                 .await
                 .map_err(|e| e.with_context("file_path", file_path))
@@ -149,55 +167,95 @@ impl ArrowReader {
         batch_size: Option<usize>,
         file_io: FileIO,
         mut tx: Sender<Result<RecordBatch>>,
+        row_group_filtering_enabled: bool,
     ) -> Result<()> {
-        // Collect Parquet column indices from field ids
-        let mut collector = CollectFieldIdVisitor {
-            field_ids: HashSet::default(),
-        };
-
-        if let Some(predicates) = task.predicate() {
-            visit(&mut collector, predicates)?;
-        }
-
+        // Get the metadata for the Parquet file we need to read and build
+        // a reader for the data within
         let parquet_file = file_io.new_input(task.data_file_path())?;
-
         let (parquet_metadata, parquet_reader) =
             try_join!(parquet_file.metadata(), parquet_file.reader())?;
-        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
 
-        let mut batch_stream_builder =
-            ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
+        // Start creating the record batch stream, which wraps the parquet file reader
+        let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
+            parquet_file_reader,
+            // Page index will be required in upcoming row selection PR
+            ArrowReaderOptions::new().with_page_index(false),
+        )
+        .await?;
 
-        let parquet_schema = batch_stream_builder.parquet_schema();
-        let arrow_schema = batch_stream_builder.schema();
+        // Create a projection mask for the batch stream to select which columns in the
+        // Parquet file that we want in the response
         let projection_mask = Self::get_arrow_projection_mask(
             task.project_field_ids(),
             task.schema(),
-            parquet_schema,
-            arrow_schema,
+            record_batch_stream_builder.parquet_schema(),
+            record_batch_stream_builder.schema(),
         )?;
-        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
-
-        let parquet_schema = batch_stream_builder.parquet_schema();
-        let row_filter = Self::get_row_filter(task.predicate(), parquet_schema, &collector)?;
-
-        if let Some(row_filter) = row_filter {
-            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
-        }
+        record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);
 
         if let Some(batch_size) = batch_size {
-            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
         }
 
-        let mut batch_stream = batch_stream_builder.build()?;
+        if let Some(predicate) = task.predicate() {
+            let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
+                record_batch_stream_builder.parquet_schema(),
+                predicate,
+            )?;
+
+            let row_filter = Self::get_row_filter(
+                predicate,
+                record_batch_stream_builder.parquet_schema(),
+                &iceberg_field_ids,
+                &field_id_map,
+            )?;
+            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
+
+            let mut selected_row_groups = None;
+            if row_group_filtering_enabled {
+                let result = Self::get_selected_row_group_indices(
+                    predicate,
+                    record_batch_stream_builder.metadata(),
+                    &field_id_map,
+                    task.schema(),
+                )?;
+
+                selected_row_groups = Some(result);
+            }
+
+            if let Some(selected_row_groups) = selected_row_groups {
+                record_batch_stream_builder =
+                    record_batch_stream_builder.with_row_groups(selected_row_groups);
+            }
+        }
 
-        while let Some(batch) = batch_stream.try_next().await? {
+        // Build the batch stream and send all the RecordBatches that it generates
+        // to the requester.
+        let mut record_batch_stream = record_batch_stream_builder.build()?;
+        while let Some(batch) = record_batch_stream.try_next().await? {
             tx.send(Ok(batch)).await?
         }
 
         Ok(())
     }
 
+    fn build_field_id_set_and_map(
+        parquet_schema: &SchemaDescriptor,
+        predicate: &BoundPredicate,
+    ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
+        // Collects all Iceberg field IDs referenced in the filter predicate
+        let mut collector = CollectFieldIdVisitor {
+            field_ids: HashSet::default(),
+        };
+        visit(&mut collector, predicate)?;
+
+        let iceberg_field_ids = collector.field_ids();
+        let field_id_map = build_field_id_map(parquet_schema)?;
+
+        Ok((iceberg_field_ids, field_id_map))
+    }
+
     fn get_arrow_projection_mask(
         field_ids: &[i32],
         iceberg_schema_of_task: &Schema,
@@ -269,43 +327,59 @@ impl ArrowReader {
     }
 
     fn get_row_filter(
-        predicates: Option<&BoundPredicate>,
+        predicates: &BoundPredicate,
         parquet_schema: &SchemaDescriptor,
-        collector: &CollectFieldIdVisitor,
-    ) -> Result<Option<RowFilter>> {
-        if let Some(predicates) = predicates {
-            let field_id_map = build_field_id_map(parquet_schema)?;
-
-            // Collect Parquet column indices from field ids.
-            // If the field id is not found in Parquet schema, it will be ignored due to schema evolution.
-            let mut column_indices = collector
-                .field_ids
-                .iter()
-                .filter_map(|field_id| field_id_map.get(field_id).cloned())
-                .collect::<Vec<_>>();
-
-            column_indices.sort();
-
-            // The converter that converts `BoundPredicates` to `ArrowPredicates`
-            let mut converter = PredicateConverter {
-                parquet_schema,
-                column_map: &field_id_map,
-                column_indices: &column_indices,
-            };
-
-            // After collecting required leaf column indices used in the predicate,
-            // creates the projection mask for the Arrow predicates.
-            let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
-            let predicate_func = visit(&mut converter, predicates)?;
-            let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
-            Ok(Some(RowFilter::new(vec![Box::new(arrow_predicate)])))
-        } else {
-            Ok(None)
+        iceberg_field_ids: &HashSet<i32>,
+        field_id_map: &HashMap<i32, usize>,
+    ) -> Result<RowFilter> {
+        // Collect Parquet column indices from field ids.
+        // If the field id is not found in Parquet schema, it will be ignored due to schema evolution.
+        let mut column_indices = iceberg_field_ids
+            .iter()
+            .filter_map(|field_id| field_id_map.get(field_id).cloned())
+            .collect::<Vec<_>>();
+        column_indices.sort();
+
+        // The converter that converts `BoundPredicates` to `ArrowPredicates`
+        let mut converter = PredicateConverter {
+            parquet_schema,
+            column_map: field_id_map,
+            column_indices: &column_indices,
+        };
+
+        // After collecting required leaf column indices used in the predicate,
+        // creates the projection mask for the Arrow predicates.
+        let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
+        let predicate_func = visit(&mut converter, predicates)?;
+        let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
+        Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
+    }
+
+    fn get_selected_row_group_indices(
+        predicate: &BoundPredicate,
+        parquet_metadata: &Arc<ParquetMetaData>,
+        field_id_map: &HashMap<i32, usize>,
+        snapshot_schema: &Schema,
+    ) -> Result<Vec<usize>> {
+        let row_groups_metadata = parquet_metadata.row_groups();
+        let mut results = Vec::with_capacity(row_groups_metadata.len());
+
+        for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
+            if RowGroupMetricsEvaluator::eval(
+                predicate,
+                row_group_metadata,
+                field_id_map,
+                snapshot_schema,
+            )? {
+                results.push(idx);
+            }
         }
+
+        Ok(results)
     }
 }
 
-/// Build the map of field id to Parquet column index in the schema.
+/// Build the map of parquet field id to Parquet column index in the schema.
 fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> {
     let mut column_map = HashMap::new();
     for (idx, field) in parquet_schema.columns().iter().enumerate() {
@@ -345,6 +419,12 @@ struct CollectFieldIdVisitor {
     field_ids: HashSet<i32>,
 }
 
+impl CollectFieldIdVisitor {
+    fn field_ids(self) -> HashSet<i32> {
+        self.field_ids
+    }
+}
+
 impl BoundPredicateVisitor for CollectFieldIdVisitor {
     type T = ();
 
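For reference, a minimal usage sketch of the new builder option. Everything here except the methods shown in this diff is assumed: `builder` (an existing `ArrowReaderBuilder`) and `tasks` (the file scan task stream) are taken to come from the enclosing scan API, and the reading entry point is assumed to be named `read`.

    // Sketch only: `builder` and `tasks` are hypothetical values from the scan API.
    let reader = builder
        .with_data_file_concurrency_limit(4)      // cap the number of in-flight data files
        .with_row_group_filtering_enabled(false)  // opt out of the new row group pruning (defaults to true)
        .build();
    // Each Parquet file scan task is processed concurrently; the resulting
    // RecordBatches are delivered back to the caller as Result<RecordBatch> values.
    let batches = reader.read(tasks);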