@@ -788,14 +788,16 @@ impl DataSink for IcebergDataSink {
             write_parquet_partitioned(table, data.map_err(Into::into), self.0.branch.as_deref())
                 .await?;
 
+        let count = metadata_files.iter().map(|x| x.record_count()).fold(0, |acc, x| acc + x);
+
         table
             .new_transaction(self.0.branch.as_deref())
             .append_data(metadata_files)
             .commit()
             .await
             .map_err(DataFusionIcebergError::from)?;
 
-        Ok(0)
+        Ok(count as u64)
     }
     fn metrics(&self) -> Option<MetricsSet> {
         None
@@ -1196,7 +1198,7 @@ mod tests {
 
         ctx.register_table("orders", table.clone()).unwrap();
 
-        ctx.sql(
+        let res = ctx.sql(
             "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
             (1, 1, 1, '2020-01-01', 1),
             (2, 2, 1, '2020-01-01', 1),
@@ -1211,6 +1213,14 @@ mod tests {
             .await
             .expect("Failed to insert values into table");
 
+        let count = res[0]
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap()
+            .values()[0];
+        assert_eq!(count, 6);
+
         let batches = ctx
             .sql("select product_id, sum(amount) from orders where customer_id = 1 group by product_id;")
             .await
0 commit comments