@@ -32,7 +32,6 @@ use arrow_schema::{Field, Fields, Schema};
3232use chrono:: { NaiveDateTime , Timelike , Utc } ;
3333use derive_more:: { Deref , DerefMut } ;
3434use itertools:: Itertools ;
35- use once_cell:: sync:: Lazy ;
3635use parquet:: {
3736 arrow:: ArrowWriter ,
3837 basic:: Encoding ,
@@ -41,7 +40,6 @@ use parquet::{
4140 schema:: types:: ColumnPath ,
4241} ;
4342use rand:: distributions:: DistString ;
44- use regex:: Regex ;
4543use relative_path:: RelativePathBuf ;
4644use tokio:: task:: JoinSet ;
4745use tracing:: { error, info, trace, warn} ;
@@ -72,41 +70,6 @@ use super::{
7270// ~16K rows is default in-memory limit for each recordbatch
7371const MAX_RECORD_BATCH_SIZE : usize = 16384 ;
7472
75- /// Regex pattern for parsing arrow file names.
76- ///
77- /// # Format
78- /// The expected format is: `<schema_key>.<front_part>.data.arrows`
79- /// where:
80- /// - schema_key: A key that is associated with the timestamp at ingestion, hash of arrow schema and the key-value
81- /// pairs joined by '&' and '=' (e.g., "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2")
82- /// - front_part: Captured for parquet file naming, contains the timestamp associted with current/time-partition
83- /// as well as the custom partitioning key=value pairs (e.g., "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76")
84- ///
85- /// # Limitations
86- /// - Partition keys and values must only contain alphanumeric characters
87- /// - Special characters in partition values will cause the pattern to fail in capturing
88- ///
89- /// # Examples
90- /// Valid: "key1=value1,key2=value2"
91- /// Invalid: "key1=special!value,key2=special#value"
92- static ARROWS_NAME_STRUCTURE : Lazy < Regex > = Lazy :: new ( || {
93- Regex :: new ( r"^[a-zA-Z0-9&=]+\.(?P<front>\S+)\.data\.arrows$" ) . expect ( "Validated regex" )
94- } ) ;
95-
96- /// Returns the filename for parquet if provided arrows file path is valid as per our expectation
97- fn arrow_path_to_parquet ( path : & Path , random_string : & str ) -> Option < PathBuf > {
98- let filename = path. file_name ( ) . unwrap ( ) . to_str ( ) . unwrap ( ) ;
99- let filename = ARROWS_NAME_STRUCTURE
100- . captures ( filename)
101- . and_then ( |c| c. get ( 1 ) ) ?
102- . as_str ( ) ;
103- let filename_with_random_number = format ! ( "{filename}.data.{random_string}.parquet" ) ;
104- let mut parquet_path = path. to_owned ( ) ;
105- parquet_path. set_file_name ( filename_with_random_number) ;
106-
107- Some ( parquet_path)
108- }
109-
11073#[ derive( Debug , thiserror:: Error ) ]
11174#[ error( "Stream not found: {0}" ) ]
11275pub struct StreamNotFound ( pub String ) ;
@@ -228,12 +191,7 @@ impl Stream {
228191 let paths = dir
229192 . flatten ( )
230193 . map ( |file| file. path ( ) )
231- . filter ( |path| {
232- let Some ( file_name) = path. file_name ( ) . and_then ( |f| f. to_str ( ) ) else {
233- return false ;
234- } ;
235- ARROWS_NAME_STRUCTURE . is_match ( file_name)
236- } )
194+ . filter ( |file| file. extension ( ) . is_some_and ( |ext| ext. eq ( "arrows" ) ) )
237195 . sorted_by_key ( |f| f. metadata ( ) . unwrap ( ) . modified ( ) . unwrap ( ) )
238196 . collect ( ) ;
239197
@@ -276,13 +234,12 @@ impl Stream {
276234 & arrow_file_path, self . stream_name
277235 ) ;
278236 remove_file ( & arrow_file_path) . unwrap ( ) ;
279- } else if let Some ( key) = arrow_path_to_parquet ( & arrow_file_path, & random_string) {
237+ } else {
238+ let key = Self :: arrow_path_to_parquet ( & arrow_file_path, & random_string) ;
280239 grouped_arrow_file
281240 . entry ( key)
282241 . or_default ( )
283242 . push ( arrow_file_path) ;
284- } else {
285- warn ! ( "Unexpected arrows file: {}" , arrow_file_path. display( ) ) ;
286243 }
287244 }
288245 grouped_arrow_file
@@ -341,6 +298,17 @@ impl Stream {
341298 }
342299 }
343300
301+ fn arrow_path_to_parquet ( path : & Path , random_string : & str ) -> PathBuf {
302+ let filename = path. file_stem ( ) . unwrap ( ) . to_str ( ) . unwrap ( ) ;
303+ let ( _, filename) = filename. split_once ( '.' ) . unwrap ( ) ;
304+ assert ! ( filename. contains( '.' ) , "contains the delim `.`" ) ;
305+ let filename_with_random_number = format ! ( "{filename}.{random_string}.arrows" ) ;
306+ let mut parquet_path = path. to_owned ( ) ;
307+ parquet_path. set_file_name ( filename_with_random_number) ;
308+ parquet_path. set_extension ( "parquet" ) ;
309+ parquet_path
310+ }
311+
344312 /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
345313 pub fn prepare_parquet ( & self , shutdown_signal : bool ) -> Result < ( ) , StagingError > {
346314 info ! (
0 commit comments