1616 *
1717 *
1818 */
19-
20- use std:: {
21- collections:: { HashMap , HashSet } ,
22- fs:: { remove_file, write, File , OpenOptions } ,
23- num:: NonZeroU32 ,
24- path:: { Path , PathBuf } ,
25- process,
26- sync:: { Arc , Mutex , RwLock } ,
27- time:: { Instant , SystemTime , UNIX_EPOCH } ,
28- } ;
29-
30- use arrow_array:: { Array , Float64Array , Int64Array , NullArray , StringArray } ;
19+ use arrow_array:: { Array , Date32Array , Float64Array , Int64Array , NullArray , StringArray } ;
3120use arrow_array:: { BooleanArray , RecordBatch , TimestampMillisecondArray } ;
3221use arrow_schema:: { Field , Fields , Schema } ;
3322use chrono:: { DateTime , NaiveDateTime , Timelike , Utc } ;
3423use datafusion:: { datasource:: MemTable , prelude:: SessionContext } ;
3524use derive_more:: { Deref , DerefMut } ;
36- use futures :: stream :: { FuturesUnordered , StreamExt } ;
25+ use futures_util :: StreamExt ;
3726use itertools:: Itertools ;
3827use parquet:: {
3928 arrow:: ArrowWriter ,
@@ -45,6 +34,15 @@ use parquet::{
4534use rand:: distributions:: DistString ;
4635use relative_path:: RelativePathBuf ;
4736use serde:: Serialize ;
37+ use std:: {
38+ collections:: { HashMap , HashSet } ,
39+ fs:: { remove_file, write, File , OpenOptions } ,
40+ num:: NonZeroU32 ,
41+ path:: { Path , PathBuf } ,
42+ process,
43+ sync:: { Arc , Mutex , RwLock } ,
44+ time:: { Instant , SystemTime , UNIX_EPOCH } ,
45+ } ;
4846use tokio:: task:: JoinSet ;
4947use tracing:: { error, info, trace, warn} ;
5048
@@ -76,6 +74,8 @@ use super::{
7674 LogStream , ARROW_FILE_EXTENSION ,
7775} ;
7876
77+ const MAX_CONCURRENT_FIELD_STATS : usize = 10 ;
78+
7979#[ derive( Serialize , Debug ) ]
8080struct DistinctStat {
8181 distinct_value : String ,
@@ -355,7 +355,7 @@ impl Stream {
355355 // if yes, then merge them and save
356356
357357 if let Some ( mut schema) = schema {
358- if ! & self . stream_name . contains ( INTERNAL_STREAM_NAME ) {
358+ if self . get_stream_type ( ) != StreamType :: Internal {
359359 if let Err ( err) = self . calculate_field_stats ( rbs, schema. clone ( ) . into ( ) ) . await {
360360 warn ! (
361361 "Error calculating field stats for stream {}: {}" ,
@@ -826,19 +826,25 @@ impl Stream {
826826 ctx : & SessionContext ,
827827 schema : & Arc < Schema > ,
828828 ) -> Vec < FieldStat > {
829- let field_futures = schema. fields ( ) . iter ( ) . map ( |field| {
829+ // Collect field names into an owned Vec<String> to avoid lifetime issues
830+ let field_names: Vec < String > = schema
831+ . fields ( )
832+ . iter ( )
833+ . map ( |field| field. name ( ) . clone ( ) )
834+ . collect ( ) ;
835+
836+ let field_futures = field_names. into_iter ( ) . map ( |field_name| {
830837 let ctx = ctx. clone ( ) ;
831838 let stream_name = self . stream_name . clone ( ) ;
832- let field_name = field. name ( ) . clone ( ) ;
833839 async move { Self :: calculate_single_field_stats ( ctx, stream_name, field_name) . await }
834840 } ) ;
835841
836- FuturesUnordered :: from_iter ( field_futures)
842+ futures:: stream:: iter ( field_futures)
843+ . buffer_unordered ( MAX_CONCURRENT_FIELD_STATS )
837844 . filter_map ( |x| async { x } )
838845 . collect :: < Vec < _ > > ( )
839846 . await
840847 }
841-
842848 async fn calculate_single_field_stats (
843849 ctx : SessionContext ,
844850 stream_name : String ,
@@ -876,11 +882,12 @@ impl Stream {
876882 async fn query_single_i64 ( ctx : & SessionContext , sql : & str ) -> Option < i64 > {
877883 let df = ctx. sql ( sql) . await . ok ( ) ?;
878884 let batches = df. collect ( ) . await . ok ( ) ?;
879- let array = batches
880- . first ( ) ?
881- . column ( 0 )
882- . as_any ( )
883- . downcast_ref :: < arrow_array:: Int64Array > ( ) ?;
885+ let batch = batches. first ( ) ?;
886+ if batch. num_rows ( ) == 0 {
887+ return None ;
888+ }
889+ let array = batch. column ( 0 ) . as_any ( ) . downcast_ref :: < Int64Array > ( ) ?;
890+
884891 Some ( array. value ( 0 ) )
885892 }
886893
@@ -899,11 +906,17 @@ impl Stream {
899906 DateTime :: from_timestamp_millis ( timestamp)
900907 . map ( |dt| dt. to_string ( ) )
901908 . unwrap_or_else ( || "INVALID_TIMESTAMP" . to_string ( ) )
909+ } else if let Some ( arr) = array. as_any ( ) . downcast_ref :: < Date32Array > ( ) {
910+ return arr. value ( idx) . to_string ( ) ;
902911 } else if let Some ( arr) = array. as_any ( ) . downcast_ref :: < BooleanArray > ( ) {
903912 arr. value ( idx) . to_string ( )
904913 } else if array. as_any ( ) . downcast_ref :: < NullArray > ( ) . is_some ( ) {
905914 "NULL" . to_string ( )
906915 } else {
916+ warn ! (
917+ "Unsupported array type for statistics: {:?}" ,
918+ array. data_type( )
919+ ) ;
907920 "UNSUPPORTED" . to_string ( )
908921 }
909922 }
@@ -914,7 +927,8 @@ impl Stream {
914927 field_name : & str ,
915928 ) -> Vec < DistinctStat > {
916929 let sql = format ! (
917- "select count(*) as distinct_count, \" {field_name}\" from \" {stream_name}\" where \" {field_name}\" is not null group by \" {field_name}\" order by distinct_count desc limit 50"
930+ "select count(*) as distinct_count, \" {field_name}\" from \" {stream_name}\" where \" {field_name}\" is not null group by \" {field_name}\" order by distinct_count desc limit {}" ,
931+ PARSEABLE . options. max_field_statistics
918932 ) ;
919933 let mut distinct_stats = Vec :: new ( ) ;
920934 if let Ok ( df) = ctx. sql ( & sql) . await {
0 commit comments