@@ -355,6 +355,7 @@ impl Stream {
355355 // if yes, then merge them and save
356356
357357 if let Some ( mut schema) = schema {
358+ // calculate field stats for all user-defined streams
358359 if self . get_stream_type ( ) != StreamType :: Internal {
359360 if let Err ( err) = self . calculate_field_stats ( rbs, schema. clone ( ) . into ( ) ) . await {
360361 warn ! (
@@ -519,6 +520,7 @@ impl Stream {
519520 let mut writer = ArrowWriter :: try_new ( & mut part_file, schema. clone ( ) , Some ( props) ) ?;
520521 for ref record in record_reader. merged_iter ( schema, time_partition. cloned ( ) ) {
521522 writer. write ( record) ?;
523+ // Collect record batches for finding statistics later
522524 record_batches. push ( record. clone ( ) ) ;
523525 }
524526 writer. close ( ) ?;
@@ -780,6 +782,9 @@ impl Stream {
780782 Ok ( ( ) )
781783 }
782784
785+ /// Calculates field statistics for the stream and pushes them to the internal stats dataset.
786+ /// This function creates a new internal stream for stats if it doesn't exist.
787+ /// It collects statistics for each field in the stream.
783788 async fn calculate_field_stats (
784789 & self ,
785790 record_batches : Vec < RecordBatch > ,
@@ -821,6 +826,9 @@ impl Stream {
821826 Ok ( ( ) )
822827 }
823828
829+ /// Collects statistics for all fields in the stream.
830+ /// Returns a vector of `FieldStat` for each field with non-zero count.
831+ /// Uses `buffer_unordered` to run up to `MAX_CONCURRENT_FIELD_STATS` queries concurrently.
824832 async fn collect_all_field_stats (
825833 & self ,
826834 ctx : & SessionContext ,
@@ -841,10 +849,13 @@ impl Stream {
841849
842850 futures:: stream:: iter ( field_futures)
843851 . buffer_unordered ( MAX_CONCURRENT_FIELD_STATS )
844- . filter_map ( |x| async { x } )
852+ . filter_map ( std :: future :: ready )
845853 . collect :: < Vec < _ > > ( )
846854 . await
847855 }
856+
857+ /// Calculates statistics for a single field in the stream.
858+ /// Returns `None` if the count query returns 0.
848859 async fn calculate_single_field_stats (
849860 ctx : SessionContext ,
850861 stream_name : String ,
@@ -879,6 +890,9 @@ impl Stream {
879890 } )
880891 }
881892
893+ /// Queries a single integer value from the DataFusion context.
894+ /// Returns `None` if the query fails or returns no rows.
895+ /// This is used to fetch the record count and the distinct count of a field.
882896 async fn query_single_i64 ( ctx : & SessionContext , sql : & str ) -> Option < i64 > {
883897 let df = ctx. sql ( sql) . await . ok ( ) ?;
884898 let batches = df. collect ( ) . await . ok ( ) ?;
@@ -891,6 +905,8 @@ impl Stream {
891905 Some ( array. value ( 0 ) )
892906 }
893907
908+ /// Helper function to format an Arrow value at a given index into a string.
909+ /// Handles null values and different data types like String, Int64, Float64, Timestamp, Date32, and Boolean.
894910 fn format_arrow_value ( array : & dyn Array , idx : usize ) -> String {
895911 if array. is_null ( idx) {
896912 return "NULL" . to_string ( ) ;
@@ -921,6 +937,9 @@ impl Stream {
921937 }
922938 }
923939
940+ /// Fetches the distinct values of a field in the stream along with their counts.
941+ /// Returns a vector of `DistinctStat` containing distinct values and their counts.
942+ /// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`.
924943 async fn query_distinct_stats (
925944 ctx : & SessionContext ,
926945 stream_name : & str ,
0 commit comments