@@ -375,12 +375,7 @@ impl TableScan {
375375 // used to stream the results back to the caller
376376 let ( file_scan_task_tx, file_scan_task_rx) = channel ( concurrency_limit_manifest_entries) ;
377377
378- let delete_file_idx_and_tx: Option < ( DeleteFileIndex , Sender < DeleteFileContext > ) > =
379- if self . delete_file_processing_enabled {
380- Some ( DeleteFileIndex :: new ( ) )
381- } else {
382- None
383- } ;
378+ let ( delete_file_idx, delete_file_tx) = DeleteFileIndex :: new ( ) ;
384379
385380 let manifest_list = self . plan_context . get_manifest_list ( ) . await ?;
386381
@@ -390,9 +385,8 @@ impl TableScan {
390385 let manifest_file_contexts = self . plan_context . build_manifest_file_contexts (
391386 manifest_list,
392387 manifest_entry_data_ctx_tx,
393- delete_file_idx_and_tx. as_ref ( ) . map ( |( delete_file_idx, _) | {
394- ( delete_file_idx. clone ( ) , manifest_entry_delete_ctx_tx)
395- } ) ,
388+ delete_file_idx. clone ( ) ,
389+ manifest_entry_delete_ctx_tx,
396390 ) ?;
397391
398392 let mut channel_for_manifest_error = file_scan_task_tx. clone ( ) ;
@@ -411,34 +405,30 @@ impl TableScan {
411405 } ) ;
412406
413407 let mut channel_for_data_manifest_entry_error = file_scan_task_tx. clone ( ) ;
408+ let mut channel_for_delete_manifest_entry_error = file_scan_task_tx. clone ( ) ;
414409
415- if let Some ( ( _, delete_file_tx) ) = delete_file_idx_and_tx {
416- let mut channel_for_delete_manifest_entry_error = file_scan_task_tx. clone ( ) ;
417-
418- // Process the delete file [`ManifestEntry`] stream in parallel
419- spawn ( async move {
420- let result = manifest_entry_delete_ctx_rx
421- . map ( |me_ctx| Ok ( ( me_ctx, delete_file_tx. clone ( ) ) ) )
422- . try_for_each_concurrent (
423- concurrency_limit_manifest_entries,
424- |( manifest_entry_context, tx) | async move {
425- spawn ( async move {
426- Self :: process_delete_manifest_entry ( manifest_entry_context, tx)
427- . await
428- } )
429- . await
430- } ,
431- )
432- . await ;
410+ // Process the delete file [`ManifestEntry`] stream in parallel
411+ spawn ( async move {
412+ let result = manifest_entry_delete_ctx_rx
413+ . map ( |me_ctx| Ok ( ( me_ctx, delete_file_tx. clone ( ) ) ) )
414+ . try_for_each_concurrent (
415+ concurrency_limit_manifest_entries,
416+ |( manifest_entry_context, tx) | async move {
417+ spawn ( async move {
418+ Self :: process_delete_manifest_entry ( manifest_entry_context, tx) . await
419+ } )
420+ . await
421+ } ,
422+ )
423+ . await ;
433424
434- if let Err ( error) = result {
435- let _ = channel_for_delete_manifest_entry_error
436- . send ( Err ( error) )
437- . await ;
438- }
439- } )
440- . await ;
441- }
425+ if let Err ( error) = result {
426+ let _ = channel_for_delete_manifest_entry_error
427+ . send ( Err ( error) )
428+ . await ;
429+ }
430+ } )
431+ . await ;
442432
443433 // Process the data file [`ManifestEntry`] stream in parallel
444434 spawn ( async move {
@@ -460,15 +450,16 @@ impl TableScan {
460450 }
461451 } ) ;
462452
463- return Ok ( file_scan_task_rx. boxed ( ) ) ;
453+ Ok ( file_scan_task_rx. boxed ( ) )
464454 }
465455
466456 /// Returns an [`ArrowRecordBatchStream`].
467457 pub async fn to_arrow ( & self ) -> Result < ArrowRecordBatchStream > {
468458 let mut arrow_reader_builder = ArrowReaderBuilder :: new ( self . file_io . clone ( ) )
469459 . with_data_file_concurrency_limit ( self . concurrency_limit_data_files )
470460 . with_row_group_filtering_enabled ( self . row_group_filtering_enabled )
471- . with_row_selection_enabled ( self . row_selection_enabled ) ;
461+ . with_row_selection_enabled ( self . row_selection_enabled )
462+ . with_delete_file_support_enabled ( self . delete_file_processing_enabled ) ;
472463
473464 if let Some ( batch_size) = self . batch_size {
474465 arrow_reader_builder = arrow_reader_builder. with_batch_size ( batch_size) ;
@@ -608,7 +599,7 @@ struct ManifestFileContext {
608599 object_cache : Arc < ObjectCache > ,
609600 snapshot_schema : SchemaRef ,
610601 expression_evaluator_cache : Arc < ExpressionEvaluatorCache > ,
611- delete_file_index : Option < DeleteFileIndex > ,
602+ delete_file_index : DeleteFileIndex ,
612603}
613604
614605/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -621,7 +612,7 @@ struct ManifestEntryContext {
621612 bound_predicates : Option < Arc < BoundPredicates > > ,
622613 partition_spec_id : i32 ,
623614 snapshot_schema : SchemaRef ,
624- delete_file_index : Option < DeleteFileIndex > ,
615+ delete_file_index : DeleteFileIndex ,
625616}
626617
627618impl ManifestFileContext {
@@ -668,16 +659,13 @@ impl ManifestEntryContext {
668659 /// consume this `ManifestEntryContext`, returning a `FileScanTask`
669660 /// created from it
670661 async fn into_file_scan_task ( self ) -> Result < FileScanTask > {
671- let deletes = if let Some ( delete_file_index) = self . delete_file_index {
672- delete_file_index
673- . get_deletes_for_data_file (
674- self . manifest_entry . data_file ( ) ,
675- self . manifest_entry . sequence_number ( ) ,
676- )
677- . await ?
678- } else {
679- vec ! [ ]
680- } ;
662+ let deletes = self
663+ . delete_file_index
664+ . get_deletes_for_data_file (
665+ self . manifest_entry . data_file ( ) ,
666+ self . manifest_entry . sequence_number ( ) ,
667+ )
668+ . await ?;
681669
682670 Ok ( FileScanTask {
683671 start : 0 ,
@@ -732,24 +720,19 @@ impl PlanContext {
732720 & self ,
733721 manifest_list : Arc < ManifestList > ,
734722 tx_data : Sender < ManifestEntryContext > ,
735- delete_file_idx_and_tx : Option < ( DeleteFileIndex , Sender < ManifestEntryContext > ) > ,
723+ delete_file_idx : DeleteFileIndex ,
724+ delete_file_tx : Sender < ManifestEntryContext > ,
736725 ) -> Result < Box < impl Iterator < Item = Result < ManifestFileContext > > + ' static > > {
737726 let manifest_files = manifest_list. entries ( ) . iter ( ) ;
738727
739728 // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
740729 let mut filtered_mfcs = vec ! [ ] ;
741730
742731 for manifest_file in manifest_files {
743- let ( delete_file_idx, tx) = if manifest_file. content == ManifestContentType :: Deletes {
744- let Some ( ( delete_file_idx, tx) ) = delete_file_idx_and_tx. as_ref ( ) else {
745- continue ;
746- } ;
747- ( Some ( delete_file_idx. clone ( ) ) , tx. clone ( ) )
732+ let tx = if manifest_file. content == ManifestContentType :: Deletes {
733+ delete_file_tx. clone ( )
748734 } else {
749- (
750- delete_file_idx_and_tx. as_ref ( ) . map ( |x| x. 0 . clone ( ) ) ,
751- tx_data. clone ( ) ,
752- )
735+ tx_data. clone ( )
753736 } ;
754737
755738 let partition_bound_predicate = if self . predicate . is_some ( ) {
@@ -777,7 +760,7 @@ impl PlanContext {
777760 manifest_file,
778761 partition_bound_predicate,
779762 tx,
780- delete_file_idx,
763+ delete_file_idx. clone ( ) ,
781764 ) ;
782765
783766 filtered_mfcs. push ( Ok ( mfc) ) ;
@@ -791,7 +774,7 @@ impl PlanContext {
791774 manifest_file : & ManifestFile ,
792775 partition_filter : Option < Arc < BoundPredicate > > ,
793776 sender : Sender < ManifestEntryContext > ,
794- delete_file_index : Option < DeleteFileIndex > ,
777+ delete_file_index : DeleteFileIndex ,
795778 ) -> ManifestFileContext {
796779 let bound_predicates =
797780 if let ( Some ( ref partition_bound_predicate) , Some ( snapshot_bound_predicate) ) =
@@ -1003,23 +986,18 @@ impl ExpressionEvaluatorCache {
1003986 ErrorKind :: Unexpected ,
1004987 "ManifestEvaluatorCache RwLock was poisoned" ,
1005988 )
1006- } )
1007- . unwrap ( )
989+ } ) ?
1008990 . insert (
1009991 spec_id,
1010992 Arc :: new ( ExpressionEvaluator :: new ( partition_filter. clone ( ) ) ) ,
1011993 ) ;
1012994
1013- let read = self
1014- . 0
1015- . read ( )
1016- . map_err ( |_| {
1017- Error :: new (
1018- ErrorKind :: Unexpected ,
1019- "ManifestEvaluatorCache RwLock was poisoned" ,
1020- )
1021- } )
1022- . unwrap ( ) ;
995+ let read = self . 0 . read ( ) . map_err ( |_| {
996+ Error :: new (
997+ ErrorKind :: Unexpected ,
998+ "ManifestEvaluatorCache RwLock was poisoned" ,
999+ )
1000+ } ) ?;
10231001
10241002 Ok ( read. get ( & spec_id) . unwrap ( ) . clone ( ) )
10251003 }
0 commit comments