@@ -186,15 +186,15 @@ impl FileOpener for ParquetOpener {
186186 if let Ok ( rewritten) = filter_rewriter
187187 . rewrite ( Arc :: clone ( & file_schema) , original_predicate)
188188 {
189- let mut filter_schema_builder = FilterSchemaBuilder :: new (
190- & file_schema,
191- & table_schema,
192- ) ;
189+ let mut filter_schema_builder =
190+ FilterSchemaBuilder :: new ( & file_schema, & table_schema) ;
193191 rewritten. visit ( & mut filter_schema_builder) ?;
194192 filter_schema = filter_schema_builder. build ( ) ;
195193 // If we rewrote the filter we need to recompute the pruning predicates to match the new filter.
196- page_pruning_predicate =
197- Some ( build_page_pruning_predicate ( & rewritten, & filter_schema) ) ;
194+ page_pruning_predicate = Some ( build_page_pruning_predicate (
195+ & rewritten,
196+ & filter_schema,
197+ ) ) ;
198198 pruning_predicate = build_pruning_predicate (
199199 rewritten. clone ( ) ,
200200 & filter_schema,
@@ -409,24 +409,33 @@ impl<'schema> FilterSchemaBuilder<'schema> {
409409 }
410410 }
411411
412- fn sort_fields ( fields : & mut Vec < Arc < Field > > , table_schema : & Schema , file_schema : & Schema ) {
412+ fn sort_fields (
413+ fields : & mut Vec < Arc < Field > > ,
414+ table_schema : & Schema ,
415+ file_schema : & Schema ,
416+ ) {
413417 fields. sort_by_key ( |f| f. name ( ) . to_string ( ) ) ;
414418 fields. dedup_by_key ( |f| f. name ( ) . to_string ( ) ) ;
415419 fields. sort_by_key ( |f| {
416- let table_schema_index = table_schema. index_of ( f. name ( ) ) . unwrap_or ( std:: usize:: MAX ) ;
417- let file_schema_index = file_schema. index_of ( f. name ( ) ) . unwrap_or ( std:: usize:: MAX ) ;
420+ let table_schema_index =
421+ table_schema. index_of ( f. name ( ) ) . unwrap_or ( std:: usize:: MAX ) ;
422+ let file_schema_index =
423+ file_schema. index_of ( f. name ( ) ) . unwrap_or ( std:: usize:: MAX ) ;
418424 ( table_schema_index, file_schema_index)
419425 } ) ;
420426 }
421427
422428 fn build ( self ) -> SchemaRef {
423429 let mut fields = self . filter_schema_fields . into_iter ( ) . collect :: < Vec < _ > > ( ) ;
424- FilterSchemaBuilder :: sort_fields ( & mut fields, self . table_schema , self . file_schema ) ;
430+ FilterSchemaBuilder :: sort_fields (
431+ & mut fields,
432+ self . table_schema ,
433+ self . file_schema ,
434+ ) ;
425435 Arc :: new ( Schema :: new ( fields) )
426436 }
427437}
428438
429-
430439impl < ' node > TreeNodeVisitor < ' node > for FilterSchemaBuilder < ' _ > {
431440 type Node = Arc < dyn PhysicalExpr > ;
432441
@@ -441,19 +450,33 @@ impl<'node> TreeNodeVisitor<'node> for FilterSchemaBuilder<'_> {
441450 self . filter_schema_fields . insert ( Arc :: new ( field. clone ( ) ) ) ;
442451 } else {
443452 // valid fields are the table schema's fields + the file schema's fields, preferring the table schema's fields when there is a conflict
444- let mut valid_fields = self . table_schema . fields ( ) . iter ( ) . chain ( self . file_schema . fields ( ) . iter ( ) ) . cloned ( ) . collect :: < Vec < _ > > ( ) ;
445- FilterSchemaBuilder :: sort_fields ( & mut valid_fields, self . table_schema , self . file_schema ) ;
446- let valid_fields = valid_fields. into_iter ( ) . map ( |f| datafusion_common:: Column :: new_unqualified ( f. name ( ) ) ) . collect ( ) ;
453+ let mut valid_fields = self
454+ . table_schema
455+ . fields ( )
456+ . iter ( )
457+ . chain ( self . file_schema . fields ( ) . iter ( ) )
458+ . cloned ( )
459+ . collect :: < Vec < _ > > ( ) ;
460+ FilterSchemaBuilder :: sort_fields (
461+ & mut valid_fields,
462+ self . table_schema ,
463+ self . file_schema ,
464+ ) ;
465+ let valid_fields = valid_fields
466+ . into_iter ( )
467+ . map ( |f| datafusion_common:: Column :: new_unqualified ( f. name ( ) ) )
468+ . collect ( ) ;
447469 let field = datafusion_common:: Column :: new_unqualified ( column. name ( ) ) ;
448- return Err (
449- datafusion_common:: DataFusionError :: SchemaError (
450- SchemaError :: FieldNotFound { field : Box :: new ( field) , valid_fields } ,
451- Box :: new ( None ) ,
452- )
453- )
470+ return Err ( datafusion_common:: DataFusionError :: SchemaError (
471+ SchemaError :: FieldNotFound {
472+ field : Box :: new ( field) ,
473+ valid_fields,
474+ } ,
475+ Box :: new ( None ) ,
476+ ) ) ;
454477 }
455478 }
456479
457480 Ok ( TreeNodeRecursion :: Continue )
458481 }
459- }
482+ }
0 commit comments