 
 use std::{
     collections::HashMap,
-    fs::{remove_file, OpenOptions},
+    fs::{remove_file, File, OpenOptions},
     path::{Path, PathBuf},
     process,
     sync::{Arc, Mutex, RwLock},
@@ -165,6 +165,11 @@ impl<'a> Stream<'a> {
         paths
     }
 
+    /// Groups arrow files that are to be included in one parquet file
+    ///
+    /// Excludes the arrow file being written for the current minute (data is still being written to that one)
+    ///
+    /// Only includes files from the previous minute and older
     pub fn arrow_files_grouped_exclude_time(
         &self,
         exclude: NaiveDateTime,
@@ -173,6 +178,8 @@ impl<'a> Stream<'a> {
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
         let mut arrow_files = self.arrow_files();
 
+        // If the shutdown signal is false (i.e. normal operation),
+        // don't keep the files for the current minute
         if !shutdown_signal {
             arrow_files.retain(|path| {
                 !path
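Aside (not part of the diff): the retain above drops the arrow file for the minute that is still being written to. A minimal sketch of the idea, assuming the minute timestamp is embedded in the staged file name; the actual naming scheme is defined by the staging code and may differ.

use std::path::PathBuf;

// Illustrative helper only: keep staged arrow files whose name does NOT
// contain the current-minute timestamp. Hypothetical, for explanation.
fn exclude_current_minute(paths: Vec<PathBuf>, current_minute: &str) -> Vec<PathBuf> {
    paths
        .into_iter()
        .filter(|path| {
            path.file_name()
                .and_then(|name| name.to_str())
                .is_some_and(|name| !name.contains(current_minute))
        })
        .collect()
}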
@@ -215,6 +222,45 @@ impl<'a> Stream<'a> {
             .collect()
     }
 
+    pub fn schema_files(&self) -> Vec<PathBuf> {
+        let Ok(dir) = self.data_path.read_dir() else {
+            return vec![];
+        };
+
+        dir.flatten()
+            .map(|file| file.path())
+            .filter(|file| file.extension().is_some_and(|ext| ext.eq("schema")))
+            .collect()
+    }
+
+    pub fn get_schemas_if_present(&self) -> Option<Vec<Schema>> {
+        let Ok(dir) = self.data_path.read_dir() else {
+            return None;
+        };
+
+        let mut schemas: Vec<Schema> = Vec::new();
+
+        for file in dir.flatten() {
+            if let Some(ext) = file.path().extension() {
+                if ext.eq("schema") {
+                    let file = File::open(file.path()).expect("Schema File should exist");
+
+                    let schema = match serde_json::from_reader(file) {
+                        Ok(schema) => schema,
+                        Err(_) => continue,
+                    };
+                    schemas.push(schema);
+                }
+            }
+        }
+
+        if !schemas.is_empty() {
+            Some(schemas)
+        } else {
+            None
+        }
+    }
+
     fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf {
         let filename = path.file_stem().unwrap().to_str().unwrap();
         let (_, filename) = filename.split_once('.').unwrap();
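Aside (not part of the diff): a caller could combine the schemas returned by `get_schemas_if_present` into one merged schema. A minimal sketch, assuming `Schema` here is `arrow_schema::Schema`; `merged_staged_schema` is a hypothetical helper, not something added by this change.

use arrow_schema::Schema;

// Hypothetical caller: merge whatever staged .schema files exist into one schema.
fn merged_staged_schema(stream: &Stream<'_>) -> Option<Schema> {
    let schemas = stream.get_schemas_if_present()?;
    // Schema::try_merge unifies the fields of all collected schemas.
    Schema::try_merge(schemas).ok()
}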
@@ -249,6 +295,9 @@ impl<'a> Stream<'a> {
         }
     }
 
+    /// Reads arrow files, groups their schemas,
+    ///
+    /// converts them into parquet files, and returns a merged schema
     pub fn convert_disk_files_to_parquet(
         &self,
         time_partition: Option<&String>,
@@ -272,12 +321,12 @@ impl<'a> Stream<'a> {
         }
 
         // warn!("staging files-\n{staging_files:?}\n");
-        for (parquet_path, files) in staging_files {
+        for (parquet_path, arrow_files) in staging_files {
             metrics::STAGING_FILES
                 .with_label_values(&[&self.stream_name])
-                .set(files.len() as i64);
+                .set(arrow_files.len() as i64);
 
-            for file in &files {
+            for file in &arrow_files {
                 let file_size = file.metadata().unwrap().len();
                 let file_type = file.extension().unwrap().to_str().unwrap();
 
@@ -286,7 +335,7 @@ impl<'a> Stream<'a> {
                     .add(file_size as i64);
             }
 
-            let record_reader = MergedReverseRecordReader::try_new(&files);
+            let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
                 continue;
             }
@@ -319,7 +368,7 @@ impl<'a> Stream<'a> {
                 );
                 remove_file(parquet_path).unwrap();
             } else {
-                for file in files {
+                for file in arrow_files {
                     // warn!("file-\n{file:?}\n");
                     let file_size = file.metadata().unwrap().len();
                     let file_type = file.extension().unwrap().to_str().unwrap();
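Aside (not part of the diff): the conversion described in the doc comment above ends with each group of arrow files being written out as one parquet file. A minimal sketch of that write step using the `parquet` crate's `ArrowWriter`; the function name and error handling are illustrative, not the code used by `convert_disk_files_to_parquet`.

use std::{fs::File, path::Path, sync::Arc};

use arrow_array::RecordBatch;
use arrow_schema::Schema;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

// Illustrative helper: persist record batches sharing a merged schema as one parquet file.
fn write_batches_to_parquet(
    path: &Path,
    schema: Arc<Schema>,
    batches: &[RecordBatch],
) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::create(path)?;
    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    for batch in batches {
        writer.write(batch)?;
    }
    // close() writes the parquet footer; without it the file is unreadable.
    writer.close()?;
    Ok(())
}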