@@ -282,7 +282,7 @@ impl ArrowReader {
282282 let delete_row_selection = Self :: build_deletes_row_selection (
283283 record_batch_stream_builder. metadata ( ) . row_groups ( ) ,
284284 & selected_row_group_indices,
285- positional_delete_indexes,
285+ positional_delete_indexes. as_ref ( ) ,
286286 ) ?;
287287
288288 // merge the row selection from the delete files with the row selection
@@ -345,17 +345,19 @@ impl ArrowReader {
345345 /// as having been deleted by a positional delete, taking into account any row groups that have
346346 /// been skipped entirely by the filter predicate
347347 fn build_deletes_row_selection (
348- row_group_metadata : & [ RowGroupMetaData ] ,
348+ row_group_metadata_list : & [ RowGroupMetaData ] ,
349349 selected_row_groups : & Option < Vec < usize > > ,
350- mut positional_deletes : DeleteVector ,
350+ positional_deletes : & DeleteVector ,
351351 ) -> Result < RowSelection > {
352352 let mut results: Vec < RowSelector > = Vec :: new ( ) ;
353353 let mut selected_row_groups_idx = 0 ;
354- let mut current_page_base_idx: u64 = 0 ;
354+ let mut current_row_group_base_idx: u64 = 0 ;
355+ let mut delete_vector_iter = positional_deletes. iter ( ) ;
356+ let mut next_deleted_row_idx_opt = delete_vector_iter. next ( ) ;
355357
356- for ( idx, row_group_metadata) in row_group_metadata . iter ( ) . enumerate ( ) {
357- let page_num_rows = row_group_metadata. num_rows ( ) as u64 ;
358- let next_page_base_idx = current_page_base_idx + page_num_rows ;
358+ for ( idx, row_group_metadata) in row_group_metadata_list . iter ( ) . enumerate ( ) {
359+ let row_group_num_rows = row_group_metadata. num_rows ( ) as u64 ;
360+ let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows ;
359361
360362 // if row group selection is enabled,
361363 if let Some ( selected_row_groups) = selected_row_groups {
@@ -372,36 +374,37 @@ impl ArrowReader {
372374 } else {
373375 // skip over any positional deletes that fall inside the skipped row group so
374376 // that the next pending delete index belongs to a later row group
375- positional_deletes. remove_range ( current_page_base_idx..next_page_base_idx) ;
377+ delete_vector_iter. advance_to ( next_row_group_base_idx) ;
378+ next_deleted_row_idx_opt = delete_vector_iter. next ( ) ;
376379
377380 // still advance the base index past this row group, then skip to the next row group
378381 // in the file
379- current_page_base_idx += page_num_rows ;
382+ current_row_group_base_idx += row_group_num_rows ;
380383 continue ;
381384 }
382385 }
383386
384- let mut next_deleted_row_idx = match positional_deletes . min ( ) {
387+ let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
385388 Some ( next_deleted_row_idx) => {
386- // if the index of the next deleted row is beyond this page , add a selection for
387- // the remainder of this page and skip to the next page
388- if next_deleted_row_idx >= next_page_base_idx {
389- results. push ( RowSelector :: select ( page_num_rows as usize ) ) ;
389+ // if the index of the next deleted row is beyond this row group , add a selection for
390+ // the remainder of this row group and skip to the next row group
391+ if next_deleted_row_idx >= next_row_group_base_idx {
392+ results. push ( RowSelector :: select ( row_group_num_rows as usize ) ) ;
390393 continue ;
391394 }
392395
393396 next_deleted_row_idx
394397 }
395398
396- // If there are no more pos deletes, add a selector for the entirety of this page .
399+ // If there are no more pos deletes, add a selector for the entirety of this row group .
397400 _ => {
398- results. push ( RowSelector :: select ( page_num_rows as usize ) ) ;
401+ results. push ( RowSelector :: select ( row_group_num_rows as usize ) ) ;
399402 continue ;
400403 }
401404 } ;
402405
403- let mut current_idx = current_page_base_idx ;
404- ' chunks: while next_deleted_row_idx < next_page_base_idx {
406+ let mut current_idx = current_row_group_base_idx ;
407+ ' chunks: while next_deleted_row_idx < next_row_group_base_idx {
405408 // `select` all rows that precede the next delete index
406409 if current_idx < next_deleted_row_idx {
407410 let run_length = next_deleted_row_idx - current_idx;
@@ -412,18 +415,18 @@ impl ArrowReader {
412415 // `skip` all consecutive deleted rows in the current row group
413416 let mut run_length = 0 ;
414417 while next_deleted_row_idx == current_idx
415- && next_deleted_row_idx < next_page_base_idx
418+ && next_deleted_row_idx < next_row_group_base_idx
416419 {
417420 run_length += 1 ;
418421 current_idx += 1 ;
419- positional_deletes. remove ( next_deleted_row_idx) ;
420422
421- next_deleted_row_idx = match positional_deletes. min ( ) {
423+ next_deleted_row_idx_opt = delete_vector_iter. next ( ) ;
424+ next_deleted_row_idx = match next_deleted_row_idx_opt {
422425 Some ( next_deleted_row_idx) => next_deleted_row_idx,
423426 _ => {
424427 // We've processed the final positional delete.
425428 // Conclude the skip and then break so that we select the remaining
426- // rows in the page and move on to the next row group
429+ // rows in the row group and move on to the next row group
427430 results. push ( RowSelector :: skip ( run_length) ) ;
428431 break ' chunks;
429432 }
@@ -432,13 +435,13 @@ impl ArrowReader {
432435 results. push ( RowSelector :: skip ( run_length) ) ;
433436 }
434437
435- if current_idx < next_page_base_idx {
438+ if current_idx < next_row_group_base_idx {
436439 results. push ( RowSelector :: select (
437- ( next_page_base_idx - current_idx) as usize ,
440+ ( next_row_group_base_idx - current_idx) as usize ,
438441 ) ) ;
439442 }
440443
441- current_page_base_idx += page_num_rows ;
444+ current_row_group_base_idx += row_group_num_rows ;
442445 }
443446
444447 Ok ( results. into ( ) )
@@ -1375,18 +1378,19 @@ mod tests {
13751378 use arrow_array:: { ArrayRef , RecordBatch , StringArray } ;
13761379 use arrow_schema:: { DataType , Field , Schema as ArrowSchema , TimeUnit } ;
13771380 use futures:: TryStreamExt ;
1381+ use parquet:: arrow:: arrow_reader:: { RowSelection , RowSelector } ;
13781382 use parquet:: arrow:: { ArrowWriter , ProjectionMask } ;
13791383 use parquet:: basic:: Compression ;
1380- use parquet:: file:: properties:: WriterProperties ;
1381- use parquet:: arrow:: arrow_reader:: { RowSelection , RowSelector } ;
13821384 use parquet:: file:: metadata:: { ColumnChunkMetaData , RowGroupMetaData } ;
1385+ use parquet:: file:: properties:: WriterProperties ;
13831386 use parquet:: schema:: parser:: parse_message_type;
1384- use tempfile:: TempDir ;
13851387 use parquet:: schema:: types:: { SchemaDescPtr , SchemaDescriptor } ;
13861388 use roaring:: RoaringTreemap ;
1389+ use tempfile:: TempDir ;
13871390
13881391 use crate :: arrow:: reader:: { CollectFieldIdVisitor , PARQUET_FIELD_ID_META_KEY } ;
13891392 use crate :: arrow:: { ArrowReader , ArrowReaderBuilder } ;
1393+ use crate :: delete_vector:: DeleteVector ;
13901394 use crate :: expr:: visitors:: bound_predicate_visitor:: visit;
13911395 use crate :: expr:: { Bind , Predicate , Reference } ;
13921396 use crate :: io:: FileIO ;
@@ -1733,16 +1737,14 @@ message schema {
17331737 2999 , // single item at end of selected rg3 (1)
17341738 3000 , // single item at start of skipped rg4
17351739 ] ) ;
1736-
1737- let positional_deletes = DeleteVector {
1738- inner : positional_deletes
1739- } ;
1740+
1741+ let positional_deletes = DeleteVector :: new ( positional_deletes) ;
17401742
17411743 // using selected row groups 1 and 3
17421744 let result = ArrowReader :: build_deletes_row_selection (
17431745 & row_groups_metadata,
17441746 & selected_row_groups,
1745- positional_deletes. clone ( ) ,
1747+ & positional_deletes,
17461748 )
17471749 . unwrap ( ) ;
17481750
@@ -1766,7 +1768,7 @@ message schema {
17661768 let result = ArrowReader :: build_deletes_row_selection (
17671769 & row_groups_metadata,
17681770 & None ,
1769- positional_deletes,
1771+ & positional_deletes,
17701772 )
17711773 . unwrap ( ) ;
17721774
0 commit comments