@@ -28,9 +28,8 @@ use std::{
 };
 
 use arrow_array::RecordBatch;
-use arrow_ipc::writer::StreamWriter;
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike};
+use chrono::{NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
@@ -55,14 +54,14 @@ use crate::{
     metrics,
     option::Mode,
     storage::{object_storage::to_bytes, retention::Retention, StreamType},
-    utils::time::Minute,
+    utils::time::{Minute, TimeRange},
     LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
 };
 
 use super::{
     staging::{
         reader::{MergedRecordReader, MergedReverseRecordReader},
-        writer::Writer,
+        writer::{DiskWriter, Writer},
         StagingError,
     },
     LogStream, ARROW_FILE_EXTENSION,
@@ -123,29 +122,26 @@ impl Stream {
     ) -> Result<(), StagingError> {
         let mut guard = self.writer.lock().unwrap();
         if self.options.mode != Mode::Query || stream_type == StreamType::Internal {
-            match guard.disk.get_mut(schema_key) {
+            let filename =
+                self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values);
+            match guard.disk.get_mut(&filename) {
                 Some(writer) => {
                     writer.write(record)?;
                 }
                 None => {
                     // entry is not present thus we create it
-                    let file_path = self.path_by_current_time(
-                        schema_key,
-                        parsed_timestamp,
-                        custom_partition_values,
-                    );
                     std::fs::create_dir_all(&self.data_path)?;
 
-                    let file = OpenOptions::new()
-                        .create(true)
-                        .append(true)
-                        .open(&file_path)?;
-
-                    let mut writer = StreamWriter::try_new(file, &record.schema())
+                    let range = TimeRange::granularity_range(
+                        parsed_timestamp.and_local_timezone(Utc).unwrap(),
+                        OBJECT_STORE_DATA_GRANULARITY,
+                    );
+                    let file_path = self.data_path.join(&filename);
+                    let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
                         .expect("File and RecordBatch both are checked");
 
                     writer.write(record)?;
-                    guard.disk.insert(schema_key.to_owned(), writer);
+                    guard.disk.insert(filename, writer);
                 }
             };
         }
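
For orientation (not part of the diff): a minimal sketch of what the `DiskWriter` used above might look like, assuming it pairs `arrow_ipc`'s `StreamWriter` with the `TimeRange` its filename encodes and finalizes the file when dropped. Every name and signature below is an assumption for illustration; the real definitions live in `staging::writer` and `utils::time`.

```rust
use std::{fs::File, path::PathBuf};

use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{ArrowError, Schema};
use chrono::{DateTime, Utc};

/// Assumed shape of `utils::time::TimeRange`: a half-open UTC interval.
pub struct TimeRange {
    pub start: DateTime<Utc>,
    pub end: DateTime<Utc>,
}

pub struct DiskWriter {
    inner: StreamWriter<File>,
    /// The granularity slot this file covers (e.g. one minute of data).
    range: TimeRange,
}

impl DiskWriter {
    pub fn try_new(path: PathBuf, schema: &Schema, range: TimeRange) -> Result<Self, ArrowError> {
        let file = File::options().create(true).append(true).open(path)?;
        Ok(Self {
            inner: StreamWriter::try_new(file, schema)?,
            range,
        })
    }

    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.inner.write(batch)
    }

    /// True while "now" still falls inside the slot this file covers.
    pub fn is_current(&self) -> bool {
        let now = Utc::now();
        self.range.start <= now && now < self.range.end
    }
}

impl Drop for DiskWriter {
    fn drop(&mut self) {
        // Finishing on drop writes the IPC stream footer, which is what lets
        // `flush` below simply evict stale writers from the map.
        let _ = self.inner.finish();
    }
}
```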
@@ -155,17 +151,17 @@ impl Stream {
         Ok(())
     }
 
-    pub fn path_by_current_time(
+    pub fn filename_by_partition(
         &self,
         stream_hash: &str,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
-    ) -> PathBuf {
+    ) -> String {
         let mut hostname = hostname::get().unwrap().into_string().unwrap();
         if let Some(id) = &self.ingestor_id {
             hostname.push_str(id);
         }
-        let filename = format!(
+        format!(
             "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
@@ -175,8 +171,7 @@ impl Stream {
                 .sorted_by_key(|v| v.0)
                 .map(|(key, value)| format!("{key}={value}."))
                 .join("")
-        );
-        self.data_path.join(filename)
+        )
     }
 
     pub fn arrow_files(&self) -> Vec<PathBuf> {
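
Because disk writers are now keyed by this string, a concrete illustration may help. Hypothetical values throughout: the hash, hostname, and `arrows` extension are made up, and a one-minute granularity slot is assumed.

```rust
use std::collections::HashMap;

use chrono::NaiveDate;

// Hypothetical call against a `Stream` instance:
let filename = stream.filename_by_partition(
    "abc123",
    NaiveDate::from_ymd_opt(2024, 5, 6)
        .unwrap()
        .and_hms_opt(10, 15, 0)
        .unwrap(),
    &HashMap::from([("env".to_owned(), "prod".to_owned())]),
);
// Comes out roughly as:
// "abc123.date=2024-05-06.hour=10.minute=15.env=prod.myhost.data.arrows"
```

Custom partition keys are sorted before joining, so the same batch always maps to the same writer entry.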
@@ -366,19 +361,12 @@ impl Stream {
         self.writer.lock().unwrap().mem.clear();
     }
 
-    pub fn flush(&self) {
-        let mut disk_writers = {
-            let mut writer = self.writer.lock().unwrap();
-            // Flush memory
-            writer.mem.clear();
-            // Take schema -> disk writer mapping
-            std::mem::take(&mut writer.disk)
-        };
-
-        // Flush disk
-        for writer in disk_writers.values_mut() {
-            _ = writer.finish();
-        }
+    pub fn flush(&self, forced: bool) {
+        let mut writer = self.writer.lock().unwrap();
+        // Flush memory
+        writer.mem.clear();
+        // Drop stale filename -> disk writer entries; dropping a DiskWriter
+        // triggers its flush to disk
+        writer.disk.retain(|_, w| !forced && w.is_current());
     }
 
     fn parquet_writer_props(
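
Spelled out, the `retain` above is equivalent to the sketch below: with `forced` set (shutdown), every writer is evicted; otherwise only writers whose time slot has passed are, and per the comment in the diff, dropping a writer is what flushes its file.

```rust
// Expanded equivalent of `writer.disk.retain(|_, w| !forced && w.is_current())`,
// assuming `disk` maps filename `String`s to `DiskWriter`s.
let stale: Vec<String> = writer
    .disk
    .iter()
    .filter(|(_, w)| forced || !w.is_current())
    .map(|(name, _)| name.clone())
    .collect();
for name in stale {
    // Removing the entry drops the DiskWriter, finalizing its file on disk.
    writer.disk.remove(&name);
}
```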
@@ -734,7 +722,7 @@ impl Stream {
     /// First flushes arrows onto disk and then converts the arrow into parquet
     pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
         let start_flush = Instant::now();
-        self.flush();
+        self.flush(shutdown_signal);
         trace!(
             "Flushing stream ({}) took: {}s",
             self.stream_name,
@@ -958,18 +946,18 @@ mod tests {
             None,
         );
 
-        let expected_path = staging.data_path.join(format!(
+        let expected = format!(
             "{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
             hostname::get().unwrap().into_string().unwrap()
-        ));
+        );
 
-        let generated_path =
-            staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
+        let generated =
+            staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values);
 
-        assert_eq!(generated_path, expected_path);
+        assert_eq!(generated, expected);
     }
 
     #[test]
@@ -992,18 +980,18 @@ mod tests {
             None,
         );
 
-        let expected_path = staging.data_path.join(format!(
+        let expected = format!(
             "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
             hostname::get().unwrap().into_string().unwrap()
-        ));
+        );
 
-        let generated_path =
-            staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
+        let generated =
+            staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values);
 
-        assert_eq!(generated_path, expected_path);
+        assert_eq!(generated, expected);
     }
 
     #[test]
@@ -1059,7 +1047,7 @@ mod tests {
             StreamType::UserDefined,
         )
         .unwrap();
-        staging.flush();
+        staging.flush(true);
     }
 
     #[test]