@@ -21,10 +21,8 @@ use arrow_array::cast::AsArray;
2121use arrow_array:: Array ;
2222use arrow_array:: { RecordBatch , RecordBatchReader } ;
2323use arrow_schema:: { ArrowError , DataType as ArrowType , Schema , SchemaRef } ;
24- use arrow_select:: filter:: prep_null_mask_filter;
2524pub use filter:: { ArrowPredicate , ArrowPredicateFn , RowFilter } ;
2625pub use selection:: { RowSelection , RowSelector } ;
27- use std:: collections:: VecDeque ;
2826use std:: sync:: Arc ;
2927
3028pub use crate :: arrow:: array_reader:: RowGroups ;
@@ -39,7 +37,10 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
3937use crate :: file:: reader:: { ChunkReader , SerializedPageReader } ;
4038use crate :: schema:: types:: SchemaDescriptor ;
4139
40+ use read_plan:: { ReadPlan , ReadPlanBuilder } ;
41+
4242mod filter;
43+ pub ( crate ) mod read_plan;
4344mod selection;
4445pub mod statistics;
4546
@@ -679,38 +680,33 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
679680 } ;
680681
681682 let mut filter = self . filter ;
682- let mut selection = self . selection ;
683+ let mut plan_builder = ReadPlanBuilder :: new ( batch_size ) . with_selection ( self . selection ) ;
683684
685+ // Update selection based on any filters
684686 if let Some ( filter) = filter. as_mut ( ) {
685687 for predicate in filter. predicates . iter_mut ( ) {
686- if !selects_any ( selection. as_ref ( ) ) {
688+ // break early if we have already ruled out all rows
689+ if !plan_builder. selects_any ( ) {
687690 break ;
688691 }
689692
693+ // TODO move this into the read_plan
690694 let array_reader =
691695 build_array_reader ( self . fields . as_deref ( ) , predicate. projection ( ) , & reader) ?;
692696
693- selection = Some ( evaluate_predicate (
694- batch_size,
695- array_reader,
696- selection,
697- predicate. as_mut ( ) ,
698- ) ?) ;
697+ plan_builder = plan_builder. with_predicate ( array_reader, predicate. as_mut ( ) ) ?;
699698 }
700699 }
701700
702701 let array_reader = build_array_reader ( self . fields . as_deref ( ) , & self . projection , & reader) ?;
702+ let read_plan = plan_builder
703+ . limited ( reader. num_rows ( ) )
704+ . with_offset ( self . offset )
705+ . with_limit ( self . limit )
706+ . build_limited ( )
707+ . build ( ) ;
703708
704- // If selection is empty, truncate
705- if !selects_any ( selection. as_ref ( ) ) {
706- selection = Some ( RowSelection :: from ( vec ! [ ] ) ) ;
707- }
708-
709- Ok ( ParquetRecordBatchReader :: new (
710- batch_size,
711- array_reader,
712- apply_range ( selection, reader. num_rows ( ) , self . offset , self . limit ) ,
713- ) )
709+ Ok ( ParquetRecordBatchReader :: new ( array_reader, read_plan) )
714710 }
715711}
716712
@@ -789,20 +785,20 @@ impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
789785/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
790786/// read from a parquet data source
791787pub struct ParquetRecordBatchReader {
792- batch_size : usize ,
793788 array_reader : Box < dyn ArrayReader > ,
794789 schema : SchemaRef ,
795- selection : Option < VecDeque < RowSelector > > ,
790+ read_plan : ReadPlan ,
796791}
797792
798793impl Iterator for ParquetRecordBatchReader {
799794 type Item = Result < RecordBatch , ArrowError > ;
800795
801796 fn next ( & mut self ) -> Option < Self :: Item > {
802797 let mut read_records = 0 ;
803- match self . selection . as_mut ( ) {
798+ let batch_size = self . batch_size ( ) ;
799+ match self . read_plan . selection_mut ( ) {
804800 Some ( selection) => {
805- while read_records < self . batch_size && !selection. is_empty ( ) {
801+ while read_records < batch_size && !selection. is_empty ( ) {
806802 let front = selection. pop_front ( ) . unwrap ( ) ;
807803 if front. skip {
808804 let skipped = match self . array_reader . skip_records ( front. row_count ) {
@@ -828,7 +824,7 @@ impl Iterator for ParquetRecordBatchReader {
828824 }
829825
830826 // try to read record
831- let need_read = self . batch_size - read_records;
827+ let need_read = batch_size - read_records;
832828 let to_read = match front. row_count . checked_sub ( need_read) {
833829 Some ( remaining) if remaining != 0 => {
834830 // if page row count less than batch_size we must set batch size to page row count.
@@ -846,7 +842,7 @@ impl Iterator for ParquetRecordBatchReader {
846842 }
847843 }
848844 None => {
849- if let Err ( error) = self . array_reader . read_records ( self . batch_size ) {
845+ if let Err ( error) = self . array_reader . read_records ( self . batch_size ( ) ) {
850846 return Some ( Err ( error. into ( ) ) ) ;
851847 }
852848 }
@@ -903,116 +899,37 @@ impl ParquetRecordBatchReader {
903899 let array_reader =
904900 build_array_reader ( levels. levels . as_ref ( ) , & ProjectionMask :: all ( ) , row_groups) ?;
905901
902+ let read_plan = ReadPlanBuilder :: new ( batch_size)
903+ . with_selection ( selection)
904+ . build ( ) ;
905+
906906 Ok ( Self {
907- batch_size,
908907 array_reader,
909908 schema : Arc :: new ( Schema :: new ( levels. fields . clone ( ) ) ) ,
910- selection : selection . map ( |s| s . trim ( ) . into ( ) ) ,
909+ read_plan ,
911910 } )
912911 }
913912
914913 /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
915914 /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
916915 /// all rows will be returned
917- pub ( crate ) fn new (
918- batch_size : usize ,
919- array_reader : Box < dyn ArrayReader > ,
920- selection : Option < RowSelection > ,
921- ) -> Self {
916+ pub ( crate ) fn new ( array_reader : Box < dyn ArrayReader > , read_plan : ReadPlan ) -> Self {
922917 let schema = match array_reader. get_data_type ( ) {
923918 ArrowType :: Struct ( ref fields) => Schema :: new ( fields. clone ( ) ) ,
924919 _ => unreachable ! ( "Struct array reader's data type is not struct!" ) ,
925920 } ;
926921
927922 Self {
928- batch_size,
929923 array_reader,
930924 schema : Arc :: new ( schema) ,
931- selection : selection . map ( |s| s . trim ( ) . into ( ) ) ,
925+ read_plan ,
932926 }
933927 }
934- }
935928
936- /// Returns `true` if `selection` is `None` or selects some rows
937- pub ( crate ) fn selects_any ( selection : Option < & RowSelection > ) -> bool {
938- selection. map ( |x| x. selects_any ( ) ) . unwrap_or ( true )
939- }
940-
941- /// Applies an optional offset and limit to an optional [`RowSelection`]
942- pub ( crate ) fn apply_range (
943- mut selection : Option < RowSelection > ,
944- row_count : usize ,
945- offset : Option < usize > ,
946- limit : Option < usize > ,
947- ) -> Option < RowSelection > {
948- // If an offset is defined, apply it to the `selection`
949- if let Some ( offset) = offset {
950- selection = Some ( match row_count. checked_sub ( offset) {
951- None => RowSelection :: from ( vec ! [ ] ) ,
952- Some ( remaining) => selection
953- . map ( |selection| selection. offset ( offset) )
954- . unwrap_or_else ( || {
955- RowSelection :: from ( vec ! [
956- RowSelector :: skip( offset) ,
957- RowSelector :: select( remaining) ,
958- ] )
959- } ) ,
960- } ) ;
961- }
962-
963- // If a limit is defined, apply it to the final `selection`
964- if let Some ( limit) = limit {
965- selection = Some (
966- selection
967- . map ( |selection| selection. limit ( limit) )
968- . unwrap_or_else ( || {
969- RowSelection :: from ( vec ! [ RowSelector :: select( limit. min( row_count) ) ] )
970- } ) ,
971- ) ;
972- }
973- selection
974- }
975-
976- /// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
977- /// which rows to return.
978- ///
979- /// `input_selection`: Optional pre-existing selection. If `Some`, then the
980- /// final [`RowSelection`] will be the conjunction of it and the rows selected
981- /// by `predicate`.
982- ///
983- /// Note: A pre-existing selection may come from evaluating a previous predicate
984- /// or if the [`ParquetRecordBatchReader`] specified an explicit
985- /// [`RowSelection`] in addition to one or more predicates.
986- pub ( crate ) fn evaluate_predicate (
987- batch_size : usize ,
988- array_reader : Box < dyn ArrayReader > ,
989- input_selection : Option < RowSelection > ,
990- predicate : & mut dyn ArrowPredicate ,
991- ) -> Result < RowSelection > {
992- let reader = ParquetRecordBatchReader :: new ( batch_size, array_reader, input_selection. clone ( ) ) ;
993- let mut filters = vec ! [ ] ;
994- for maybe_batch in reader {
995- let maybe_batch = maybe_batch?;
996- let input_rows = maybe_batch. num_rows ( ) ;
997- let filter = predicate. evaluate ( maybe_batch) ?;
998- // Since user supplied predicate, check error here to catch bugs quickly
999- if filter. len ( ) != input_rows {
1000- return Err ( arrow_err ! (
1001- "ArrowPredicate predicate returned {} rows, expected {input_rows}" ,
1002- filter. len( )
1003- ) ) ;
1004- }
1005- match filter. null_count ( ) {
1006- 0 => filters. push ( filter) ,
1007- _ => filters. push ( prep_null_mask_filter ( & filter) ) ,
1008- } ;
929+ #[ inline( always) ]
930+ pub ( crate ) fn batch_size ( & self ) -> usize {
931+ self . read_plan . batch_size ( )
1009932 }
1010-
1011- let raw = RowSelection :: from_filters ( & filters) ;
1012- Ok ( match input_selection {
1013- Some ( selection) => selection. and_then ( & raw ) ,
1014- None => raw,
1015- } )
1016933}
1017934
1018935#[ cfg( test) ]
@@ -3991,7 +3908,7 @@ mod tests {
39913908 . build ( )
39923909 . unwrap ( ) ;
39933910 assert_ne ! ( 1024 , num_rows) ;
3994- assert_eq ! ( reader. batch_size, num_rows as usize ) ;
3911+ assert_eq ! ( reader. read_plan . batch_size( ) , num_rows as usize ) ;
39953912 }
39963913
39973914 #[ test]
0 commit comments