@@ -321,14 +321,24 @@ fn find_limit_and_type(
321321
322322#[ cfg( test) ]
323323mod tests {
324- use std:: { io:: Cursor , sync:: Arc } ;
324+ use std:: {
325+ fs:: File ,
326+ io:: { self , Cursor , Read } ,
327+ path:: Path ,
328+ sync:: Arc ,
329+ } ;
325330
326331 use arrow_array:: {
327- cast:: AsArray , types:: Int64Type , Array , Float64Array , Int64Array , RecordBatch , StringArray ,
332+ cast:: AsArray , types:: Int64Type , Array , Float64Array , Int32Array , Int64Array , RecordBatch ,
333+ StringArray ,
328334 } ;
329335 use arrow_ipc:: writer:: {
330336 write_message, DictionaryTracker , IpcDataGenerator , IpcWriteOptions , StreamWriter ,
331337 } ;
338+ use arrow_schema:: { DataType , Field , Schema } ;
339+ use temp_dir:: TempDir ;
340+
341+ use crate :: parseable:: staging:: reader:: { MergedReverseRecordReader , OffsetReader } ;
332342
333343 use super :: get_reverse_reader;
334344
@@ -442,4 +452,230 @@ mod tests {
442452
443453 assert_eq ! ( sum, 10000 ) ;
444454 }
455+
456+ // Helper function to create test record batches
457+ fn create_test_batches ( schema : & Arc < Schema > , count : usize ) -> Vec < RecordBatch > {
458+ let mut batches = Vec :: with_capacity ( count) ;
459+
460+ for batch_num in 1 ..=count as i32 {
461+ let id_array = Int32Array :: from_iter ( batch_num * 10 ..batch_num * 10 + 1 ) ;
462+ let name_array = StringArray :: from ( vec ! [
463+ format!( "Name {batch_num}-1" ) ,
464+ format!( "Name {batch_num}-2" ) ,
465+ ] ) ;
466+
467+ let batch = RecordBatch :: try_new (
468+ schema. clone ( ) ,
469+ vec ! [ Arc :: new( id_array) , Arc :: new( name_array) ] ,
470+ )
471+ . expect ( "Failed to create test batch" ) ;
472+
473+ batches. push ( batch) ;
474+ }
475+
476+ batches
477+ }
478+
479+ // Helper function to write batches to a file
480+ fn write_test_batches (
481+ path : & Path ,
482+ schema : & Arc < Schema > ,
483+ batches : & [ RecordBatch ] ,
484+ ) -> io:: Result < ( ) > {
485+ let file = File :: create ( path) ?;
486+ let mut writer =
487+ StreamWriter :: try_new ( file, schema) . expect ( "Failed to create StreamWriter" ) ;
488+
489+ for batch in batches {
490+ writer. write ( batch) . expect ( "Failed to write batch" ) ;
491+ }
492+
493+ writer. finish ( ) . expect ( "Failed to finalize writer" ) ;
494+ Ok ( ( ) )
495+ }
496+
/// Checks that `OffsetReader` serves each `(offset, size)` section with its own
/// `read` call, in list order, and then reports end of data.
#[test]
fn test_offset_reader() {
    // In-memory stand-in for a file: bytes 1 through 10.
    let source = Cursor::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

    // (offset, size) pairs: bytes at indices 2..=4 and then 7..=8.
    let mut reader = OffsetReader::new(source, vec![(2, 3), (7, 2)]);
    let mut buf = [0u8; 10];

    // One read per section, each yielding exactly that section's bytes.
    for expected in [&[3u8, 4, 5][..], &[8, 9][..]] {
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, expected.len());
        assert_eq!(&buf[..n], expected);
    }

    // Every section is consumed, so the next read yields nothing.
    assert_eq!(reader.read(&mut buf).unwrap(), 0);
}
523+
/// Writes three batches to one file and checks the order in which
/// `MergedReverseRecordReader` hands them back.
///
/// The test expects the batches last-to-first (3, 2, 1) and the rows inside
/// each batch reversed as well.
#[test]
fn test_merged_reverse_record_reader() -> io::Result<()> {
    let tmp = TempDir::new().unwrap();
    let arrow_file = tmp.path().join("test.arrow");

    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // Three batches on disk, written in ascending order.
    let written = create_test_batches(&schema, 3);
    write_test_batches(&arrow_file, &schema, &written)?;

    let mut merged = MergedReverseRecordReader::try_new(&[arrow_file]).merged_iter(schema, None);

    // Expected ids per batch: batch 3 first, and within each batch the higher
    // id first (the effect of reversing each record batch).
    for expected_ids in [[31, 30], [21, 20], [11, 10]] {
        let batch = merged.next().expect("Failed to read batch");
        assert_eq!(batch.num_rows(), 2);

        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(ids.value(0), expected_ids[0]);
        assert_eq!(ids.value(1), expected_ids[1]);
    }

    // Nothing left once all three batches are consumed.
    assert!(merged.next().is_none());

    Ok(())
}
585+
/// With no sections to read, `OffsetReader` should return zero bytes on the
/// first read even though the underlying source holds data.
#[test]
fn test_empty_offset_list() {
    let source = Cursor::new(vec![1, 2, 3, 4, 5]);
    let mut reader = OffsetReader::new(source, vec![]);
    let mut scratch = [0u8; 10];

    assert_eq!(reader.read(&mut scratch).unwrap(), 0);
}
599+
/// Reads one 5-byte section through a 3-byte buffer and checks that the section
/// arrives across successive reads without losing or repeating bytes.
#[test]
fn test_partial_reads() {
    let source = Cursor::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

    // One section: 5 bytes starting at index 2, i.e. the values 3, 4, 5, 6, 7.
    let mut reader = OffsetReader::new(source, vec![(2, 5)]);

    // Deliberately smaller than the section so it cannot be read in one call.
    let mut small_buf = [0u8; 3];

    // A full buffer first, then the two bytes that remain.
    for expected in [&[3u8, 4, 5][..], &[6, 7][..]] {
        let n = reader.read(&mut small_buf).unwrap();
        assert_eq!(n, expected.len());
        assert_eq!(&small_buf[..n], expected);
    }

    // The section is consumed, so the next read yields nothing.
    assert_eq!(reader.read(&mut small_buf).unwrap(), 0);
}
626+
/// Round-trips a file holding one single-row batch through
/// `MergedReverseRecordReader` and checks that exactly that batch comes back.
#[test]
fn test_get_reverse_reader_single_message() -> io::Result<()> {
    let tmp = TempDir::new().unwrap();
    let arrow_file = tmp.path().join("test_single.arrow");

    // One non-nullable Int32 column named "id".
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

    // The only batch: one row with id 42.
    let ids = Int32Array::from(vec![42]);
    let only_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ids)])
        .expect("Failed to create batch");

    write_test_batches(&arrow_file, &schema, &[only_batch])?;

    let mut merged = MergedReverseRecordReader::try_new(&[arrow_file]).merged_iter(schema, None);

    // The single batch is returned with its value intact.
    let read_back = merged.next().expect("Failed to read batch");
    let read_ids = read_back
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(read_ids.value(0), 42);

    // Nothing follows it.
    assert!(merged.next().is_none());

    Ok(())
}
659+
/// Reads one 8000-byte section in a single call and checks both the byte count
/// and the contents.
#[test]
fn test_large_buffer_resizing() {
    // 10000 bytes, all set to 1.
    let source = Cursor::new(vec![1; 10000]);

    // One large section: 8000 bytes starting at offset 1000.
    let mut reader = OffsetReader::new(source, vec![(1000, 8000)]);
    let mut dest = [0u8; 10000];

    // The whole section should arrive in one read.
    let n = reader.read(&mut dest).unwrap();
    assert_eq!(n, 8000);

    // Every byte that was read must be 1.
    assert!(dest[..n].iter().all(|&byte| byte == 1));
}
445681}
0 commit comments