@@ -77,3 +77,150 @@ impl MergedRecordReader {
7777 . map ( move |batch| adapt_batch ( & schema, & batch) )
7878 }
7979}
80+
81+ #[ cfg( test) ]
82+ mod tests {
83+ use std:: {
84+ io:: { BufReader , Cursor , Read , Seek } ,
85+ sync:: Arc ,
86+ } ;
87+
88+ use arrow_array:: {
89+ cast:: AsArray , types:: Int64Type , Array , Float64Array , Int64Array , RecordBatch , StringArray ,
90+ } ;
91+ use arrow_ipc:: {
92+ reader:: StreamReader ,
93+ writer:: {
94+ write_message, DictionaryTracker , IpcDataGenerator , IpcWriteOptions , StreamWriter ,
95+ } ,
96+ } ;
97+
98+ fn rb ( rows : usize ) -> RecordBatch {
99+ let array1: Arc < dyn Array > = Arc :: new ( Int64Array :: from_iter ( 0 ..( rows as i64 ) ) ) ;
100+ let array2: Arc < dyn Array > = Arc :: new ( Float64Array :: from_iter ( ( 0 ..rows) . map ( |x| x as f64 ) ) ) ;
101+ let array3: Arc < dyn Array > = Arc :: new ( StringArray :: from_iter (
102+ ( 0 ..rows) . map ( |x| Some ( format ! ( "str {}" , x) ) ) ,
103+ ) ) ;
104+
105+ RecordBatch :: try_from_iter_with_nullable ( [
106+ ( "a" , array1, true ) ,
107+ ( "b" , array2, true ) ,
108+ ( "c" , array3, true ) ,
109+ ] )
110+ . unwrap ( )
111+ }
112+
113+ fn write_mem ( rbs : & [ RecordBatch ] ) -> Vec < u8 > {
114+ let buf = Vec :: new ( ) ;
115+ let mut writer = StreamWriter :: try_new ( buf, & rbs[ 0 ] . schema ( ) ) . unwrap ( ) ;
116+
117+ for rb in rbs {
118+ writer. write ( rb) . unwrap ( )
119+ }
120+
121+ writer. into_inner ( ) . unwrap ( )
122+ }
123+
124+ fn get_reverse_reader < T : Read + Seek > ( reader : T ) -> StreamReader < BufReader < T > > {
125+ StreamReader :: try_new ( BufReader :: new ( reader) , None ) . unwrap ( )
126+ }
127+
128+ #[ test]
129+ fn test_empty_row ( ) {
130+ let rb = rb ( 0 ) ;
131+ let buf = write_mem ( & [ rb] ) ;
132+ let reader = Cursor :: new ( buf) ;
133+ let mut reader = get_reverse_reader ( reader) ;
134+ let rb = reader. next ( ) . unwrap ( ) . unwrap ( ) ;
135+ assert_eq ! ( rb. num_rows( ) , 0 ) ;
136+ }
137+
138+ #[ test]
139+ fn test_one_row ( ) {
140+ let rb = rb ( 1 ) ;
141+ let buf = write_mem ( & [ rb] ) ;
142+ let reader = Cursor :: new ( buf) ;
143+ let mut reader = get_reverse_reader ( reader) ;
144+ let rb = reader. next ( ) . unwrap ( ) . unwrap ( ) ;
145+ assert_eq ! ( rb. num_rows( ) , 1 ) ;
146+ }
147+
148+ #[ test]
149+ fn test_multiple_row_multiple_rbs ( ) {
150+ let buf = write_mem ( & [ rb ( 1 ) , rb ( 2 ) , rb ( 3 ) ] ) ;
151+ let reader = Cursor :: new ( buf) ;
152+ let mut reader = get_reverse_reader ( reader) ;
153+ let rb = reader. next ( ) . unwrap ( ) . unwrap ( ) ;
154+ assert_eq ! ( rb. num_rows( ) , 1 ) ;
155+ let col1_val: Vec < i64 > = rb
156+ . column ( 0 )
157+ . as_primitive :: < Int64Type > ( )
158+ . iter ( )
159+ . flatten ( )
160+ . collect ( ) ;
161+ assert_eq ! ( col1_val, vec![ 0 ] ) ;
162+
163+ let rb = reader. next ( ) . unwrap ( ) . unwrap ( ) ;
164+ assert_eq ! ( rb. num_rows( ) , 2 ) ;
165+ let col1_val: Vec < i64 > = rb
166+ . column ( 0 )
167+ . as_primitive :: < Int64Type > ( )
168+ . iter ( )
169+ . flatten ( )
170+ . collect ( ) ;
171+ assert_eq ! ( col1_val, vec![ 0 , 1 ] ) ;
172+
173+ let rb = reader. next ( ) . unwrap ( ) . unwrap ( ) ;
174+ assert_eq ! ( rb. num_rows( ) , 3 ) ;
175+ let col1_val: Vec < i64 > = rb
176+ . column ( 0 )
177+ . as_primitive :: < Int64Type > ( )
178+ . iter ( )
179+ . flatten ( )
180+ . collect ( ) ;
181+ assert_eq ! ( col1_val, vec![ 0 , 1 , 2 ] ) ;
182+ }
183+
184+ #[ test]
185+ fn manual_write ( ) {
186+ let error_on_replacement = true ;
187+ let options = IpcWriteOptions :: default ( ) ;
188+ let mut dictionary_tracker = DictionaryTracker :: new ( error_on_replacement) ;
189+ let data_gen = IpcDataGenerator { } ;
190+
191+ let mut buf = Vec :: new ( ) ;
192+ let rb1 = rb ( 1 ) ;
193+
194+ let schema = data_gen. schema_to_bytes_with_dictionary_tracker (
195+ & rb1. schema ( ) ,
196+ & mut dictionary_tracker,
197+ & options,
198+ ) ;
199+ write_message ( & mut buf, schema, & options) . unwrap ( ) ;
200+
201+ for i in ( 1 ..=3 ) . cycle ( ) . skip ( 1 ) . take ( 10000 ) {
202+ let ( _, encoded_message) = data_gen
203+ . encoded_batch ( & rb ( i) , & mut dictionary_tracker, & options)
204+ . unwrap ( ) ;
205+ write_message ( & mut buf, encoded_message, & options) . unwrap ( ) ;
206+ }
207+
208+ let schema = data_gen. schema_to_bytes_with_dictionary_tracker (
209+ & rb1. schema ( ) ,
210+ & mut dictionary_tracker,
211+ & options,
212+ ) ;
213+ write_message ( & mut buf, schema, & options) . unwrap ( ) ;
214+
215+ let buf = Cursor :: new ( buf) ;
216+ let reader = get_reverse_reader ( buf) . flatten ( ) ;
217+
218+ let mut sum = 0 ;
219+ for rb in reader {
220+ sum += 1 ;
221+ assert ! ( rb. num_rows( ) > 0 ) ;
222+ }
223+
224+ assert_eq ! ( sum, 10000 ) ;
225+ }
226+ }
0 commit comments