@@ -100,38 +100,32 @@ pub trait ObjectStorage: Sync + 'static {
100100 continue ;
101101 }
102102
103- match sync. move_local_to_temp ( ) {
104- Ok ( parquet_size) => {
105- if let Err ( e) = STREAM_INFO . update_stats ( & stream, 0 , parquet_size) {
106- log:: error!( "Couldn't update stream stats. {:?}" , e) ;
107- }
108- }
109- Err ( e) => {
110- log:: error!(
111- "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]" ,
112- sync. dir. data_path. to_string_lossy( ) ,
113- sync. dir. temp_dir. to_string_lossy( ) ,
114- e
115- ) ;
116- continue ;
117- }
103+ if let Err ( e) = sync. move_local_to_temp ( ) {
104+ log:: error!(
105+ "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]" ,
106+ sync. dir. data_path. to_string_lossy( ) ,
107+ sync. dir. temp_dir. to_string_lossy( ) ,
108+ e
109+ ) ;
110+ continue ;
118111 }
119112 }
120113
121114 Ok ( ( ) )
122115 }
123116
124- async fn s3_sync ( & self ) -> Result < ( ) , ObjectStorageError > {
117+ async fn s3_sync ( & self ) -> Result < ( ) , MoveDataError > {
125118 if !Path :: new ( & CONFIG . parseable . local_disk_path ) . exists ( ) {
126119 return Ok ( ( ) ) ;
127120 }
128121
129122 let streams = STREAM_INFO . list_streams ( ) ;
130123
131124 for stream in streams {
125+ // get dir
132126 let dir = StorageDir :: new ( stream. clone ( ) ) ;
133-
134- for file in WalkDir :: new ( dir. temp_dir )
127+ // walk dir, find all .tmp files and convert to parquet
128+ for file in WalkDir :: new ( & dir. temp_dir )
135129 . min_depth ( 1 )
136130 . max_depth ( 1 )
137131 . into_iter ( )
@@ -144,7 +138,55 @@ pub trait ObjectStorage: Sync + 'static {
144138 None => false ,
145139 } ;
146140
147- !is_tmp
141+ is_tmp
142+ } )
143+ {
144+ let record_tmp_file = file. file_name ( ) . unwrap ( ) . to_str ( ) . unwrap ( ) ;
145+ let file = File :: open ( & file) . map_err ( |_| MoveDataError :: Open ) ?;
146+ let reader = StreamReader :: try_new ( file, None ) ?;
147+ let schema = reader. schema ( ) ;
148+ let records = reader. filter_map ( |record| match record {
149+ Ok ( record) => Some ( record) ,
150+ Err ( e) => {
151+ log:: warn!( "error when reading from arrow stream {:?}" , e) ;
152+ None
153+ }
154+ } ) ;
155+
156+ let parquet_path = dir. temp_dir . join (
157+ record_tmp_file
158+ . strip_suffix ( ".tmp" )
159+ . expect ( "file has a .tmp extention" ) ,
160+ ) ;
161+ let parquet_file =
162+ fs:: File :: create ( & parquet_path) . map_err ( |_| MoveDataError :: Create ) ?;
163+ let props = WriterProperties :: builder ( ) . build ( ) ;
164+ let mut writer = ArrowWriter :: try_new ( parquet_file, schema, Some ( props) ) ?;
165+
166+ for ref record in records {
167+ writer. write ( record) ?;
168+ }
169+
170+ writer. close ( ) ?;
171+
172+ fs:: remove_file ( dir. temp_dir . join ( record_tmp_file) )
173+ . map_err ( |_| MoveDataError :: Delete ) ?;
174+ }
175+
176+ for file in WalkDir :: new ( dir. temp_dir )
177+ . min_depth ( 1 )
178+ . max_depth ( 1 )
179+ . into_iter ( )
180+ . filter_map ( |file| file. ok ( ) )
181+ . map ( |file| file. path ( ) . to_path_buf ( ) )
182+ . filter ( |file| file. is_file ( ) )
183+ . filter ( |file| {
184+ let is_parquet = match file. extension ( ) {
185+ Some ( ext) => ext. eq_ignore_ascii_case ( "parquet" ) ,
186+ None => false ,
187+ } ;
188+
189+ is_parquet
148190 } )
149191 {
150192 let filename = file. file_name ( ) . unwrap ( ) . to_str ( ) . unwrap ( ) ;
@@ -195,40 +237,11 @@ impl StorageDir {
195237 fs:: create_dir_all ( & self . temp_dir )
196238 }
197239
198- pub fn move_local_to_temp ( & self , filename : String ) -> Result < u64 , MoveDataError > {
199- let record_tmp_file_path = self . temp_dir . join ( filename. clone ( ) + ".tmp" ) ;
200- fs:: rename ( self . data_path . join ( "data.records" ) , & record_tmp_file_path)
201- . map_err ( |_| MoveDataError :: Rename ) ?;
240+ pub fn move_local_to_temp ( & self , filename : String ) -> io:: Result < ( ) > {
241+ let record_tmp_file_path = self . temp_dir . join ( filename + ".tmp" ) ;
242+ fs:: rename ( self . data_path . join ( "data.records" ) , & record_tmp_file_path) ?;
202243 event:: STREAM_WRITERS :: unset_entry ( & self . stream_name ) . unwrap ( ) ;
203- let file = File :: open ( & record_tmp_file_path) . map_err ( |_| MoveDataError :: Open ) ?;
204- let reader = StreamReader :: try_new ( file, None ) ?;
205- let schema = reader. schema ( ) ;
206- let records = reader. filter_map ( |record| match record {
207- Ok ( record) => Some ( record) ,
208- Err ( e) => {
209- log:: warn!( "error when reading from arrow stream {:?}" , e) ;
210- None
211- }
212- } ) ;
213-
214- let parquet_path = self . temp_dir . join ( filename) ;
215- let parquet_file = fs:: File :: create ( & parquet_path) . map_err ( |_| MoveDataError :: Create ) ?;
216- let props = WriterProperties :: builder ( ) . build ( ) ;
217- let mut writer = ArrowWriter :: try_new ( parquet_file, schema, Some ( props) ) ?;
218-
219- for ref record in records {
220- writer. write ( record) ?;
221- }
222-
223- writer. close ( ) ?;
224-
225- fs:: remove_file ( record_tmp_file_path) . map_err ( |_| MoveDataError :: Delete ) ?;
226-
227- let compressed_size = fs:: metadata ( parquet_path)
228- . map_err ( |_| MoveDataError :: Metadata ) ?
229- . len ( ) ;
230-
231- Ok ( compressed_size)
244+ Ok ( ( ) )
232245 }
233246
234247 pub fn local_data_exists ( & self ) -> bool {
@@ -238,20 +251,18 @@ impl StorageDir {
238251
239252#[ derive( Debug , thiserror:: Error ) ]
240253pub enum MoveDataError {
241- #[ error( "Failed to rename file" ) ]
242- Rename ,
243254 #[ error( "Unable to Open file after moving" ) ]
244255 Open ,
245256 #[ error( "Unable to create recordbatch stream" ) ]
246257 Arrow ( #[ from] ArrowError ) ,
247258 #[ error( "Could not generate parquet file" ) ]
248259 Parquet ( #[ from] ParquetError ) ,
260+ #[ error( "Object Storage Error {0}" ) ]
261+ ObjectStorag ( #[ from] ObjectStorageError ) ,
249262 #[ error( "Could not generate parquet file" ) ]
250263 Create ,
251264 #[ error( "Could not delete temp arrow file" ) ]
252265 Delete ,
253- #[ error( "Could not fetch metadata of moved parquet file" ) ]
254- Metadata ,
255266}
256267
257268struct StorageSync {
@@ -266,7 +277,7 @@ impl StorageSync {
266277 Self { dir, time }
267278 }
268279
269- fn move_local_to_temp ( & self ) -> Result < u64 , MoveDataError > {
280+ fn move_local_to_temp ( & self ) -> io :: Result < ( ) > {
270281 let time = self . time - Duration :: minutes ( OBJECT_STORE_DATA_GRANULARITY as i64 ) ;
271282 let uri = utils:: date_to_prefix ( time. date ( ) )
272283 + & utils:: hour_to_prefix ( time. hour ( ) )
0 commit comments