@@ -67,6 +67,18 @@ use super::{
6767 LogStream , ARROW_FILE_EXTENSION ,
6868} ;
6969
70+ /// Returns the filename for parquet if provided arrows file path is valid as per our expectation
71+ fn arrow_path_to_parquet ( path : & Path , random_string : & str ) -> Option < PathBuf > {
72+ let filename = path. file_stem ( ) ?. to_str ( ) ?;
73+ let ( _, front) = filename. split_once ( '.' ) ?;
74+ assert ! ( front. contains( '.' ) , "contains the delim `.`" ) ;
75+ let filename_with_random_number = format ! ( "{front}.{random_string}.parquet" ) ;
76+ let mut parquet_path = path. to_owned ( ) ;
77+ parquet_path. set_file_name ( filename_with_random_number) ;
78+
79+ Some ( parquet_path)
80+ }
81+
7082#[ derive( Debug , thiserror:: Error ) ]
7183#[ error( "Stream not found: {0}" ) ]
7284pub struct StreamNotFound ( pub String ) ;
@@ -182,7 +194,10 @@ impl Stream {
182194 let paths = dir
183195 . flatten ( )
184196 . map ( |file| file. path ( ) )
185- . filter ( |file| file. extension ( ) . is_some_and ( |ext| ext. eq ( "arrows" ) ) )
197+ . filter ( |file| {
198+ file. extension ( )
199+ . is_some_and ( |ext| ext. eq ( ARROW_FILE_EXTENSION ) )
200+ } )
186201 . sorted_by_key ( |f| f. metadata ( ) . unwrap ( ) . modified ( ) . unwrap ( ) )
187202 . collect ( ) ;
188203
@@ -225,12 +240,13 @@ impl Stream {
225240 & arrow_file_path, self . stream_name
226241 ) ;
227242 remove_file ( & arrow_file_path) . unwrap ( ) ;
228- } else {
229- let key = Self :: arrow_path_to_parquet ( & arrow_file_path, & random_string) ;
243+ } else if let Some ( key) = arrow_path_to_parquet ( & arrow_file_path, & random_string) {
230244 grouped_arrow_file
231245 . entry ( key)
232246 . or_default ( )
233247 . push ( arrow_file_path) ;
248+ } else {
249+ warn ! ( "Unexpected arrows file: {}" , arrow_file_path. display( ) ) ;
234250 }
235251 }
236252 grouped_arrow_file
@@ -289,17 +305,6 @@ impl Stream {
289305 }
290306 }
291307
292- fn arrow_path_to_parquet ( path : & Path , random_string : & str ) -> PathBuf {
293- let filename = path. file_stem ( ) . unwrap ( ) . to_str ( ) . unwrap ( ) ;
294- let ( _, filename) = filename. split_once ( '.' ) . unwrap ( ) ;
295- assert ! ( filename. contains( '.' ) , "contains the delim `.`" ) ;
296- let filename_with_random_number = format ! ( "{filename}.{random_string}.arrows" ) ;
297- let mut parquet_path = path. to_owned ( ) ;
298- parquet_path. set_file_name ( filename_with_random_number) ;
299- parquet_path. set_extension ( "parquet" ) ;
300- parquet_path
301- }
302-
303308 /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
304309 pub fn prepare_parquet ( & self , shutdown_signal : bool ) -> Result < ( ) , StagingError > {
305310 info ! (
@@ -831,7 +836,7 @@ impl Streams {
831836
832837#[ cfg( test) ]
833838mod tests {
834- use std:: { sync:: Barrier , thread:: spawn, time:: Duration } ;
839+ use std:: { io :: Write , sync:: Barrier , thread:: spawn, time:: Duration } ;
835840
836841 use arrow_array:: { Int32Array , StringArray , TimestampMillisecondArray } ;
837842 use arrow_schema:: { DataType , Field , TimeUnit } ;
@@ -1207,6 +1212,57 @@ mod tests {
12071212 assert_eq ! ( staging. arrow_files( ) . len( ) , 1 ) ;
12081213 }
12091214
1215+ fn create_test_file ( dir : & TempDir , filename : & str ) -> PathBuf {
1216+ let file_path = dir. path ( ) . join ( filename) ;
1217+ let mut file = File :: create ( & file_path) . expect ( "Failed to create test file" ) ;
1218+ // Write some dummy content
1219+ file. write_all ( b"test content" )
1220+ . expect ( "Failed to write to test file" ) ;
1221+ file_path
1222+ }
1223+
1224+ #[ test]
1225+ fn test_valid_arrow_path_conversion ( ) {
1226+ let temp_dir = TempDir :: new ( ) . expect ( "Failed to create temp dir" ) ;
1227+ let filename = "12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.arrows" ;
1228+ let file_path = create_test_file ( & temp_dir, filename) ;
1229+ let random_string = "random123" ;
1230+
1231+ let result = arrow_path_to_parquet ( & file_path, random_string) ;
1232+
1233+ assert ! ( result. is_some( ) ) ;
1234+ let parquet_path = result. unwrap ( ) ;
1235+ assert_eq ! (
1236+ parquet_path. file_name( ) . unwrap( ) . to_str( ) . unwrap( ) ,
1237+ "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.random123.parquet"
1238+ ) ;
1239+ }
1240+
1241+ #[ test]
1242+ fn test_complex_path ( ) {
1243+ let temp_dir = TempDir :: new ( ) . expect ( "Failed to create temp dir" ) ;
1244+ let nested_dir = temp_dir. path ( ) . join ( "nested/directory/structure" ) ;
1245+ std:: fs:: create_dir_all ( & nested_dir) . expect ( "Failed to create nested directories" ) ;
1246+
1247+ let filename = "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2.date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.arrows" ;
1248+ let file_path = nested_dir. join ( filename) ;
1249+
1250+ let mut file = File :: create ( & file_path) . expect ( "Failed to create test file" ) ;
1251+ file. write_all ( b"test content" )
1252+ . expect ( "Failed to write to test file" ) ;
1253+
1254+ let random_string = "random456" ;
1255+
1256+ let result = arrow_path_to_parquet ( & file_path, random_string) ;
1257+
1258+ assert ! ( result. is_some( ) ) ;
1259+ let parquet_path = result. unwrap ( ) ;
1260+ assert_eq ! (
1261+ parquet_path. file_name( ) . unwrap( ) . to_str( ) . unwrap( ) ,
1262+ "date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.random456.parquet"
1263+ ) ;
1264+ }
1265+
12101266 #[ test]
12111267 fn get_or_create_returns_existing_stream ( ) {
12121268 let streams = Streams :: default ( ) ;
0 commit comments