@@ -27,10 +27,13 @@ use std::{
     time::{Instant, SystemTime, UNIX_EPOCH},
 };
 
-use arrow_array::RecordBatch;
+use arrow_array::{Array, Float64Array, Int64Array, NullArray, StringArray};
+use arrow_array::{BooleanArray, RecordBatch, TimestampMillisecondArray};
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike, Utc};
+use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
+use datafusion::{datasource::MemTable, prelude::SessionContext};
 use derive_more::{Deref, DerefMut};
+use futures::stream::{FuturesUnordered, StreamExt};
 use itertools::Itertools;
 use parquet::{
     arrow::ArrowWriter,
@@ -41,6 +44,7 @@ use parquet::{
 };
 use rand::distributions::DistString;
 use relative_path::RelativePathBuf;
+use serde::Serialize;
 use tokio::task::JoinSet;
 use tracing::{error, info, trace, warn};
 
@@ -50,9 +54,14 @@ use crate::{
         format::{LogSource, LogSourceEntry},
         DEFAULT_TIMESTAMP_KEY,
     },
+    handlers::http::{
+        cluster::INTERNAL_STREAM_NAME, ingest::PostError,
+        modal::utils::ingest_utils::flatten_and_push_logs,
+    },
     metadata::{LogStreamMetadata, SchemaVersion},
     metrics,
     option::Mode,
+    parseable::PARSEABLE,
     storage::{object_storage::to_bytes, retention::Retention, StreamType},
     utils::time::{Minute, TimeRange},
     LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
@@ -67,6 +76,26 @@ use super::{
     LogStream, ARROW_FILE_EXTENSION,
 };
 
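+/// A single distinct value of a field and the number of rows that contain it.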
+#[derive(Serialize, Debug)]
+struct DistinctStat {
+    distinct_value: String,
+    count: i64,
+}
+
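+/// Summary statistics for one field: non-null row count, distinct count, and top distinct values.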
+#[derive(Serialize, Debug)]
+struct FieldStat {
+    field_name: String,
+    count: i64,
+    distinct_count: i64,
+    distinct_stats: Vec<DistinctStat>,
+}
+
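+/// Per-dataset field statistics, serialized to JSON and ingested into the dataset's internal stats stream.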
+#[derive(Serialize, Debug)]
+struct DatasetStats {
+    dataset_name: String,
+    field_stats: Vec<FieldStat>,
+}
+
 /// Returns the filename for parquet if provided arrows file path is valid as per our expectation
 fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option<PathBuf> {
     let filename = path.file_stem()?.to_str()?;
@@ -114,7 +143,7 @@ impl Stream {
         let data_path = options.local_stream_data_path(&stream_name);
 
         Arc::new(Self {
-            stream_name,
+            stream_name: stream_name.clone(),
             metadata: RwLock::new(metadata),
             data_path,
             options,
@@ -306,7 +335,7 @@ impl Stream {
     }
 
     /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
-    pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+    pub async fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
         info!(
             "Starting arrow_conversion job for stream- {}",
             self.stream_name
@@ -317,18 +346,23 @@
 
         // read arrow files on disk
         // convert them to parquet
-        let schema = self
-            .convert_disk_files_to_parquet(
-                time_partition.as_ref(),
-                custom_partition.as_ref(),
-                shutdown_signal,
-            )
-            .inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?;
-
+        let (schema, rbs) = self.convert_disk_files_to_parquet(
+            time_partition.as_ref(),
+            custom_partition.as_ref(),
+            shutdown_signal,
+        )?;
         // check if there is already a schema file in staging pertaining to this stream
         // if yes, then merge them and save
 
         if let Some(mut schema) = schema {
+            if !self.stream_name.contains(INTERNAL_STREAM_NAME) {
+                if let Err(err) = self.calculate_field_stats(rbs, schema.clone().into()).await {
+                    warn!(
+                        "Error calculating field stats for stream {}: {}",
+                        self.stream_name, err
+                    );
+                }
+            }
             let static_schema_flag = self.get_static_schema_flag();
             if !static_schema_flag {
                 // schema is dynamic, read from staging and merge if present
@@ -429,7 +463,7 @@ impl Stream {
         time_partition: Option<&String>,
         custom_partition: Option<&String>,
         shutdown_signal: bool,
-    ) -> Result<Option<Schema>, StagingError> {
+    ) -> Result<(Option<Schema>, Vec<RecordBatch>), StagingError> {
         let mut schemas = Vec::new();
 
         let now = SystemTime::now();
@@ -464,8 +498,7 @@
         metrics::STORAGE_SIZE
             .with_label_values(&["staging", &self.stream_name, "arrows"])
             .set(total_arrow_files_size as i64);
-
-        // warn!("staging files-\n{staging_files:?}\n");
+        let mut record_batches = Vec::new();
         for (parquet_path, arrow_files) in staging_files {
             let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
@@ -486,6 +519,7 @@
             let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?;
             for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
                 writer.write(record)?;
+                record_batches.push(record.clone());
             }
             writer.close()?;
 
@@ -525,10 +559,10 @@
         }
 
         if schemas.is_empty() {
-            return Ok(None);
+            return Ok((None, record_batches));
         }
 
-        Ok(Some(Schema::try_merge(schemas).unwrap()))
+        Ok((Some(Schema::try_merge(schemas)?), record_batches))
     }
 
     pub fn updated_schema(&self, current_schema: Schema) -> Schema {
@@ -725,7 +759,7 @@ impl Stream {
     }
 
     /// First flushes arrows onto disk and then converts the arrow into parquet
-    pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+    pub async fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
         let start_flush = Instant::now();
         self.flush(shutdown_signal);
         trace!(
@@ -735,7 +769,8 @@
         );
 
         let start_convert = Instant::now();
-        self.prepare_parquet(shutdown_signal)?;
+
+        self.prepare_parquet(shutdown_signal).await?;
         trace!(
             "Converting arrows to parquet on stream ({}) took: {}s",
             self.stream_name,
@@ -744,6 +779,165 @@
 
         Ok(())
     }
+
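+    /// Registers the freshly converted record batches as an in-memory DataFusion table,
+    /// computes per-field statistics over them, and ingests the serialized result into
+    /// the dataset's companion internal stream.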
+    async fn calculate_field_stats(
+        &self,
+        record_batches: Vec<RecordBatch>,
+        schema: Arc<Schema>,
+    ) -> Result<(), PostError> {
+        let dataset_meta = format!("{}_{INTERNAL_STREAM_NAME}", &self.stream_name);
+        let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new());
+        PARSEABLE
+            .create_stream_if_not_exists(
+                &dataset_meta,
+                StreamType::Internal,
+                vec![log_source_entry],
+            )
+            .await?;
+        let mem_table = MemTable::try_new(schema.clone(), vec![record_batches])
+            .map_err(|e| PostError::Invalid(e.into()))?;
+        let ctx = SessionContext::new();
+        ctx.register_table(&self.stream_name, Arc::new(mem_table))
+            .map_err(|e| PostError::Invalid(e.into()))?;
+
+        let field_stats = self.collect_all_field_stats(&ctx, &schema).await;
+
+        let stats = DatasetStats {
+            dataset_name: self.stream_name.clone(),
+            field_stats,
+        };
+        if stats.field_stats.is_empty() {
+            return Ok(());
+        }
+        let stats_value = serde_json::to_value(&stats).map_err(|e| PostError::Invalid(e.into()))?;
+
+        flatten_and_push_logs(
+            stats_value,
+            &dataset_meta,
+            &LogSource::Json,
+            &HashMap::new(),
+        )
+        .await?;
+        Ok(())
+    }
+
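+    /// Computes stats for all schema fields concurrently, dropping fields that produce
+    /// no stats (e.g. columns with no non-null values).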
+    async fn collect_all_field_stats(
+        &self,
+        ctx: &SessionContext,
+        schema: &Arc<Schema>,
+    ) -> Vec<FieldStat> {
+        let field_futures = schema.fields().iter().map(|field| {
+            let ctx = ctx.clone();
+            let stream_name = self.stream_name.clone();
+            let field_name = field.name().clone();
+            async move { Self::calculate_single_field_stats(ctx, stream_name, field_name).await }
+        });
+
+        FuturesUnordered::from_iter(field_futures)
+            .filter_map(|x| async { x })
+            .collect::<Vec<_>>()
+            .await
+    }
+
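+    /// Computes non-null count, distinct count, and most frequent values for one field.
+    /// Returns `None` if the field has no non-null rows or a query fails.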
+    async fn calculate_single_field_stats(
+        ctx: SessionContext,
+        stream_name: String,
+        field_name: String,
+    ) -> Option<FieldStat> {
+        let count = Self::query_single_i64(
+            &ctx,
+            &format!(
+                "select count(\"{field_name}\") as count from \"{stream_name}\" where \"{field_name}\" is not null"
+            ),
+        )
+        .await?;
+        if count == 0 {
+            return None;
+        }
+
+        let distinct_count = Self::query_single_i64(
+            &ctx,
+            &format!(
+                "select COUNT(DISTINCT \"{field_name}\") as distinct_count from \"{stream_name}\""
+            ),
+        )
+        .await?;
+
+        let distinct_stats = Self::query_distinct_stats(&ctx, &stream_name, &field_name).await;
+
+        Some(FieldStat {
+            field_name,
+            count,
+            distinct_count,
+            distinct_stats,
+        })
+    }
+
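+    /// Runs a query expected to yield a single Int64 value (such as a COUNT) and extracts it.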
+    async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option<i64> {
+        let df = ctx.sql(sql).await.ok()?;
+        let batches = df.collect().await.ok()?;
+        let array = batches
+            .first()?
+            .column(0)
+            .as_any()
+            .downcast_ref::<arrow_array::Int64Array>()?;
+        Some(array.value(0))
+    }
+
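+    /// Renders one array element as a string, covering the primitive types produced by
+    /// the stats queries; anything else is reported as "UNSUPPORTED".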
+    fn format_arrow_value(array: &dyn Array, idx: usize) -> String {
+        if array.is_null(idx) {
+            return "NULL".to_string();
+        }
+        if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
+            arr.value(idx).to_string()
+        } else if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
+            arr.value(idx).to_string()
+        } else if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
+            arr.value(idx).to_string()
+        } else if let Some(arr) = array.as_any().downcast_ref::<TimestampMillisecondArray>() {
+            let timestamp = arr.value(idx);
+            DateTime::from_timestamp_millis(timestamp)
+                .map(|dt| dt.to_string())
+                .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string())
+        } else if let Some(arr) = array.as_any().downcast_ref::<BooleanArray>() {
+            arr.value(idx).to_string()
+        } else if array.as_any().downcast_ref::<NullArray>().is_some() {
+            "NULL".to_string()
+        } else {
+            "UNSUPPORTED".to_string()
+        }
+    }
+
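+    /// Returns up to the 50 most frequent distinct values of a field, with their counts,
+    /// ordered by descending frequency.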
+    async fn query_distinct_stats(
+        ctx: &SessionContext,
+        stream_name: &str,
+        field_name: &str,
+    ) -> Vec<DistinctStat> {
+        let sql = format!(
+            "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit 50"
+        );
+        let mut distinct_stats = Vec::new();
+        if let Ok(df) = ctx.sql(&sql).await {
+            if let Ok(batches) = df.collect().await {
+                for rb in batches {
+                    let counts = rb
+                        .column(0)
+                        .as_any()
+                        .downcast_ref::<Int64Array>()
+                        .expect("Counts should be Int64Array");
+                    let values = rb.column(1).as_ref();
+                    for i in 0..rb.num_rows() {
+                        let value = Self::format_arrow_value(values, i);
+                        distinct_stats.push(DistinctStat {
+                            distinct_value: value,
+                            count: counts.value(i),
+                        });
+                    }
+                }
+            }
+        }
+        distinct_stats
+    }
 }
 
 #[derive(Deref, DerefMut, Default)]
@@ -829,7 +1023,7 @@ impl Streams {
             .map(Arc::clone)
             .collect();
         for stream in streams {
-            joinset.spawn(async move { stream.flush_and_convert(shutdown_signal) });
+            joinset.spawn(async move { stream.flush_and_convert(shutdown_signal).await });
         }
     }
 }
@@ -1019,7 +1213,7 @@ mod tests {
             None,
         )
        .convert_disk_files_to_parquet(None, None, false)?;
-        assert!(result.is_none());
+        assert!(result.0.is_none());
         // Verify metrics were set to 0
         let staging_files = metrics::STAGING_FILES.with_label_values(&[&stream]).get();
         assert_eq!(staging_files, 0);
@@ -1100,8 +1294,8 @@
             .convert_disk_files_to_parquet(None, None, true)
             .unwrap();
 
-        assert!(result.is_some());
-        let result_schema = result.unwrap();
+        assert!(result.0.is_some());
+        let result_schema = result.0.unwrap();
         assert_eq!(result_schema.fields().len(), 3);
 
         // Verify parquet files were created and the arrow files deleted
@@ -1149,8 +1343,8 @@
             .convert_disk_files_to_parquet(None, None, true)
             .unwrap();
 
-        assert!(result.is_some());
-        let result_schema = result.unwrap();
+        assert!(result.0.is_some());
+        let result_schema = result.0.unwrap();
         assert_eq!(result_schema.fields().len(), 3);
 
         // Verify parquet files were created and the arrow files deleted
@@ -1203,8 +1397,8 @@
             .convert_disk_files_to_parquet(None, None, false)
             .unwrap();
 
-        assert!(result.is_some());
-        let result_schema = result.unwrap();
+        assert!(result.0.is_some());
+        let result_schema = result.0.unwrap();
         assert_eq!(result_schema.fields().len(), 3);
 
         // Verify parquet files were created and the arrow file left