@@ -33,7 +33,9 @@ use bytes::Bytes;
3333use fnv:: FnvHashSet ;
3434use futures:: future:: BoxFuture ;
3535use futures:: { try_join, FutureExt , StreamExt , TryFutureExt , TryStreamExt } ;
36- use parquet:: arrow:: arrow_reader:: { ArrowPredicateFn , ArrowReaderOptions , RowFilter , RowSelection } ;
36+ use parquet:: arrow:: arrow_reader:: {
37+ ArrowPredicateFn , ArrowReaderOptions , RowFilter , RowSelection , RowSelector ,
38+ } ;
3739use parquet:: arrow:: async_reader:: AsyncFileReader ;
3840use parquet:: arrow:: { ParquetRecordBatchStreamBuilder , ProjectionMask , PARQUET_FIELD_ID_META_KEY } ;
3941use parquet:: file:: metadata:: { ParquetMetaData , ParquetMetaDataReader , RowGroupMetaData } ;
@@ -280,7 +282,7 @@ impl ArrowReader {
280282 let delete_row_selection = Self :: build_deletes_row_selection (
281283 record_batch_stream_builder. metadata ( ) . row_groups ( ) ,
282284 & selected_row_group_indices,
283- positional_delete_indexes,
285+ positional_delete_indexes. as_ref ( ) ,
284286 ) ?;
285287
286288 // merge the row selection from the delete files with the row selection
@@ -342,15 +344,109 @@ impl ArrowReader {
342344 /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
343345 /// as having been deleted by a positional delete, taking into account any row groups that have
344346 /// been skipped entirely by the filter predicate
345- #[ allow( unused) ]
346347 fn build_deletes_row_selection (
347- row_group_metadata : & [ RowGroupMetaData ] ,
348+ row_group_metadata_list : & [ RowGroupMetaData ] ,
348349 selected_row_groups : & Option < Vec < usize > > ,
349- mut positional_deletes : DeleteVector ,
350+ positional_deletes : & DeleteVector ,
350351 ) -> Result < RowSelection > {
351- // TODO
352+ let mut results: Vec < RowSelector > = Vec :: new ( ) ;
353+ let mut selected_row_groups_idx = 0 ;
354+ let mut current_row_group_base_idx: u64 = 0 ;
355+ let mut delete_vector_iter = positional_deletes. iter ( ) ;
356+ let mut next_deleted_row_idx_opt = delete_vector_iter. next ( ) ;
357+
358+ for ( idx, row_group_metadata) in row_group_metadata_list. iter ( ) . enumerate ( ) {
359+ let row_group_num_rows = row_group_metadata. num_rows ( ) as u64 ;
360+ let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
361+
362+ // if row group selection is enabled,
363+ if let Some ( selected_row_groups) = selected_row_groups {
364+ // if we've consumed all the selected row groups, we're done
365+ if selected_row_groups_idx == selected_row_groups. len ( ) {
366+ break ;
367+ }
368+
369+ if idx == selected_row_groups[ selected_row_groups_idx] {
370+ // we're in a selected row group. Increment selected_row_groups_idx
371+ // so that next time around the for loop we're looking for the next
372+ // selected row group
373+ selected_row_groups_idx += 1 ;
374+ } else {
375+ // remove any positional deletes from the skipped page so that
376+ // `positional.deletes.min()` can be used
377+ delete_vector_iter. advance_to ( next_row_group_base_idx) ;
378+ next_deleted_row_idx_opt = delete_vector_iter. next ( ) ;
379+
380+ // still increment the current page base index but then skip to the next row group
381+ // in the file
382+ current_row_group_base_idx += row_group_num_rows;
383+ continue ;
384+ }
385+ }
386+
387+ let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
388+ Some ( next_deleted_row_idx) => {
389+ // if the index of the next deleted row is beyond this row group, add a selection for
390+ // the remainder of this row group and skip to the next row group
391+ if next_deleted_row_idx >= next_row_group_base_idx {
392+ results. push ( RowSelector :: select ( row_group_num_rows as usize ) ) ;
393+ continue ;
394+ }
395+
396+ next_deleted_row_idx
397+ }
398+
399+ // If there are no more pos deletes, add a selector for the entirety of this row group.
400+ _ => {
401+ results. push ( RowSelector :: select ( row_group_num_rows as usize ) ) ;
402+ continue ;
403+ }
404+ } ;
405+
406+ let mut current_idx = current_row_group_base_idx;
407+ ' chunks: while next_deleted_row_idx < next_row_group_base_idx {
408+ // `select` all rows that precede the next delete index
409+ if current_idx < next_deleted_row_idx {
410+ let run_length = next_deleted_row_idx - current_idx;
411+ results. push ( RowSelector :: select ( run_length as usize ) ) ;
412+ current_idx += run_length;
413+ }
414+
415+ // `skip` all consecutive deleted rows in the current row group
416+ let mut run_length = 0 ;
417+ while next_deleted_row_idx == current_idx
418+ && next_deleted_row_idx < next_row_group_base_idx
419+ {
420+ run_length += 1 ;
421+ current_idx += 1 ;
422+
423+ next_deleted_row_idx_opt = delete_vector_iter. next ( ) ;
424+ next_deleted_row_idx = match next_deleted_row_idx_opt {
425+ Some ( next_deleted_row_idx) => next_deleted_row_idx,
426+ _ => {
427+ // We've processed the final positional delete.
428+ // Conclude the skip and then break so that we select the remaining
429+ // rows in the row group and move on to the next row group
430+ results. push ( RowSelector :: skip ( run_length) ) ;
431+ break ' chunks;
432+ }
433+ } ;
434+ }
435+ if run_length > 0 {
436+ results. push ( RowSelector :: skip ( run_length) ) ;
437+ }
438+ }
439+
440+ if current_idx < next_row_group_base_idx {
441+ results. push ( RowSelector :: select (
442+ ( next_row_group_base_idx - current_idx) as usize ,
443+ ) ) ;
444+ }
445+
446+ current_row_group_base_idx += row_group_num_rows;
447+ }
352448
353- Ok ( RowSelection :: default ( ) )
449+ Ok ( results . into ( ) )
354450 }
355451
356452 fn build_field_id_set_and_map (
@@ -1284,15 +1380,19 @@ mod tests {
12841380 use arrow_array:: { ArrayRef , RecordBatch , StringArray } ;
12851381 use arrow_schema:: { DataType , Field , Schema as ArrowSchema , TimeUnit } ;
12861382 use futures:: TryStreamExt ;
1383+ use parquet:: arrow:: arrow_reader:: { RowSelection , RowSelector } ;
12871384 use parquet:: arrow:: { ArrowWriter , ProjectionMask } ;
12881385 use parquet:: basic:: Compression ;
1386+ use parquet:: file:: metadata:: { ColumnChunkMetaData , RowGroupMetaData } ;
12891387 use parquet:: file:: properties:: WriterProperties ;
12901388 use parquet:: schema:: parser:: parse_message_type;
1291- use parquet:: schema:: types:: SchemaDescriptor ;
1389+ use parquet:: schema:: types:: { SchemaDescPtr , SchemaDescriptor } ;
1390+ use roaring:: RoaringTreemap ;
12921391 use tempfile:: TempDir ;
12931392
12941393 use crate :: arrow:: reader:: { CollectFieldIdVisitor , PARQUET_FIELD_ID_META_KEY } ;
12951394 use crate :: arrow:: { ArrowReader , ArrowReaderBuilder } ;
1395+ use crate :: delete_vector:: DeleteVector ;
12961396 use crate :: expr:: visitors:: bound_predicate_visitor:: visit;
12971397 use crate :: expr:: { Bind , Predicate , Reference } ;
12981398 use crate :: io:: FileIO ;
@@ -1593,4 +1693,148 @@ message schema {
15931693
15941694 ( file_io, schema, table_location, tmp_dir)
15951695 }
1696+
1697+ #[ test]
1698+ fn test_build_deletes_row_selection ( ) {
1699+ let schema_descr = get_test_schema_descr ( ) ;
1700+
1701+ let mut columns = vec ! [ ] ;
1702+ for ptr in schema_descr. columns ( ) {
1703+ let column = ColumnChunkMetaData :: builder ( ptr. clone ( ) ) . build ( ) . unwrap ( ) ;
1704+ columns. push ( column) ;
1705+ }
1706+
1707+ let row_groups_metadata = vec ! [
1708+ build_test_row_group_meta( schema_descr. clone( ) , columns. clone( ) , 1000 , 0 ) ,
1709+ build_test_row_group_meta( schema_descr. clone( ) , columns. clone( ) , 500 , 1 ) ,
1710+ build_test_row_group_meta( schema_descr. clone( ) , columns. clone( ) , 500 , 2 ) ,
1711+ build_test_row_group_meta( schema_descr. clone( ) , columns. clone( ) , 1000 , 3 ) ,
1712+ build_test_row_group_meta( schema_descr. clone( ) , columns. clone( ) , 500 , 4 ) ,
1713+ ] ;
1714+
1715+ let selected_row_groups = Some ( vec ! [ 1 , 3 ] ) ;
1716+
1717+ /* cases to cover:
1718+ * {skip|select} {first|intermediate|last} {one row|multiple rows} in
1719+ {first|imtermediate|last} {skipped|selected} row group
1720+ * row group selection disabled
1721+ */
1722+
1723+ let positional_deletes = RoaringTreemap :: from_iter ( & [
1724+ 1 , // in skipped rg 0, should be ignored
1725+ 3 , // run of three consecutive items in skipped rg0
1726+ 4 , 5 , 998 , // two consecutive items at end of skipped rg0
1727+ 999 , 1000 , // solitary row at start of selected rg1 (1, 9)
1728+ 1010 , // run of 3 rows in selected rg1
1729+ 1011 , 1012 , // (3, 485)
1730+ 1498 , // run of two items at end of selected rg1
1731+ 1499 , 1500 , // run of two items at start of skipped rg2
1732+ 1501 , 1600 , // should ignore, in skipped rg2
1733+ 1999 , // single row at end of skipped rg2
1734+ 2000 , // run of two items at start of selected rg3
1735+ 2001 , // (4, 98)
1736+ 2100 , // single row in selected row group 3 (1, 99)
1737+ 2200 , // run of 3 consecutive rows in selected row group 3
1738+ 2201 , 2202 , // (3, 796)
1739+ 2999 , // single item at end of selected rg3 (1)
1740+ 3000 , // single item at start of skipped rg4
1741+ ] ) ;
1742+
1743+ let positional_deletes = DeleteVector :: new ( positional_deletes) ;
1744+
1745+ // using selected row groups 1 and 3
1746+ let result = ArrowReader :: build_deletes_row_selection (
1747+ & row_groups_metadata,
1748+ & selected_row_groups,
1749+ & positional_deletes,
1750+ )
1751+ . unwrap ( ) ;
1752+
1753+ let expected = RowSelection :: from ( vec ! [
1754+ RowSelector :: skip( 1 ) ,
1755+ RowSelector :: select( 9 ) ,
1756+ RowSelector :: skip( 3 ) ,
1757+ RowSelector :: select( 485 ) ,
1758+ RowSelector :: skip( 4 ) ,
1759+ RowSelector :: select( 98 ) ,
1760+ RowSelector :: skip( 1 ) ,
1761+ RowSelector :: select( 99 ) ,
1762+ RowSelector :: skip( 3 ) ,
1763+ RowSelector :: select( 796 ) ,
1764+ RowSelector :: skip( 1 ) ,
1765+ ] ) ;
1766+
1767+ assert_eq ! ( result, expected) ;
1768+
1769+ // selecting all row groups
1770+ let result = ArrowReader :: build_deletes_row_selection (
1771+ & row_groups_metadata,
1772+ & None ,
1773+ & positional_deletes,
1774+ )
1775+ . unwrap ( ) ;
1776+
1777+ let expected = RowSelection :: from ( vec ! [
1778+ RowSelector :: select( 1 ) ,
1779+ RowSelector :: skip( 1 ) ,
1780+ RowSelector :: select( 1 ) ,
1781+ RowSelector :: skip( 3 ) ,
1782+ RowSelector :: select( 992 ) ,
1783+ RowSelector :: skip( 3 ) ,
1784+ RowSelector :: select( 9 ) ,
1785+ RowSelector :: skip( 3 ) ,
1786+ RowSelector :: select( 485 ) ,
1787+ RowSelector :: skip( 4 ) ,
1788+ RowSelector :: select( 98 ) ,
1789+ RowSelector :: skip( 1 ) ,
1790+ RowSelector :: select( 398 ) ,
1791+ RowSelector :: skip( 3 ) ,
1792+ RowSelector :: select( 98 ) ,
1793+ RowSelector :: skip( 1 ) ,
1794+ RowSelector :: select( 99 ) ,
1795+ RowSelector :: skip( 3 ) ,
1796+ RowSelector :: select( 796 ) ,
1797+ RowSelector :: skip( 2 ) ,
1798+ RowSelector :: select( 499 ) ,
1799+ ] ) ;
1800+
1801+ assert_eq ! ( result, expected) ;
1802+ }
1803+
1804+ fn build_test_row_group_meta (
1805+ schema_descr : SchemaDescPtr ,
1806+ columns : Vec < ColumnChunkMetaData > ,
1807+ num_rows : i64 ,
1808+ ordinal : i16 ,
1809+ ) -> RowGroupMetaData {
1810+ RowGroupMetaData :: builder ( schema_descr. clone ( ) )
1811+ . set_num_rows ( num_rows)
1812+ . set_total_byte_size ( 2000 )
1813+ . set_column_metadata ( columns)
1814+ . set_ordinal ( ordinal)
1815+ . build ( )
1816+ . unwrap ( )
1817+ }
1818+
1819+ fn get_test_schema_descr ( ) -> SchemaDescPtr {
1820+ use parquet:: schema:: types:: Type as SchemaType ;
1821+
1822+ let schema = SchemaType :: group_type_builder ( "schema" )
1823+ . with_fields ( vec ! [
1824+ Arc :: new(
1825+ SchemaType :: primitive_type_builder( "a" , parquet:: basic:: Type :: INT32 )
1826+ . build( )
1827+ . unwrap( ) ,
1828+ ) ,
1829+ Arc :: new(
1830+ SchemaType :: primitive_type_builder( "b" , parquet:: basic:: Type :: INT32 )
1831+ . build( )
1832+ . unwrap( ) ,
1833+ ) ,
1834+ ] )
1835+ . build ( )
1836+ . unwrap ( ) ;
1837+
1838+ Arc :: new ( SchemaDescriptor :: new ( Arc :: new ( schema) ) )
1839+ }
15961840}
0 commit comments