@@ -34,12 +34,15 @@ use itertools::Itertools;
3434use parquet:: {
3535 arrow:: ArrowWriter ,
3636 basic:: Encoding ,
37- file:: properties:: { WriterProperties , WriterPropertiesBuilder } ,
37+ file:: {
38+ properties:: { WriterProperties , WriterPropertiesBuilder } ,
39+ FOOTER_SIZE ,
40+ } ,
3841 format:: SortingColumn ,
3942 schema:: types:: ColumnPath ,
4043} ;
4144use rand:: distributions:: DistString ;
42- use tracing:: error;
45+ use tracing:: { error, trace } ;
4346
4447use crate :: {
4548 cli:: Options ,
@@ -218,7 +221,10 @@ impl<'a> Stream<'a> {
218221
219222 dir. flatten ( )
220223 . map ( |file| file. path ( ) )
221- . filter ( |file| file. extension ( ) . is_some_and ( |ext| ext. eq ( "parquet" ) ) )
224+ . filter ( |file| {
225+ file. extension ( ) . is_some_and ( |ext| ext. eq ( "parquet" ) )
226+ && std:: fs:: metadata ( file) . is_ok_and ( |meta| meta. len ( ) > FOOTER_SIZE as u64 )
227+ } )
222228 . collect ( )
223229 }
224230
@@ -350,24 +356,32 @@ impl<'a> Stream<'a> {
350356 . build ( ) ;
351357 schemas. push ( merged_schema. clone ( ) ) ;
352358 let schema = Arc :: new ( merged_schema) ;
353- let parquet_file = OpenOptions :: new ( )
359+ let mut part_path = parquet_path. to_owned ( ) ;
360+ part_path. set_extension ( "part" ) ;
361+ let mut part_file = OpenOptions :: new ( )
354362 . create ( true )
355363 . append ( true )
356- . open ( & parquet_path )
364+ . open ( & part_path )
357365 . map_err ( |_| StagingError :: Create ) ?;
358- let mut writer = ArrowWriter :: try_new ( & parquet_file , schema. clone ( ) , Some ( props) ) ?;
366+ let mut writer = ArrowWriter :: try_new ( & mut part_file , schema. clone ( ) , Some ( props) ) ?;
359367 for ref record in record_reader. merged_iter ( schema, time_partition. cloned ( ) ) {
360368 writer. write ( record) ?;
361369 }
362-
363370 writer. close ( ) ?;
364- if parquet_file. metadata ( ) . unwrap ( ) . len ( ) < parquet:: file:: FOOTER_SIZE as u64 {
371+
372+ if part_file. metadata ( ) . unwrap ( ) . len ( ) < parquet:: file:: FOOTER_SIZE as u64 {
365373 error ! (
366374 "Invalid parquet file {:?} detected for stream {}, removing it" ,
367- & parquet_path , & self . stream_name
375+ & part_path , & self . stream_name
368376 ) ;
369- remove_file ( parquet_path ) . unwrap ( ) ;
377+ remove_file ( part_path ) . unwrap ( ) ;
370378 } else {
379+ trace ! ( "Parquet file successfully constructed" ) ;
380+ if let Err ( e) = std:: fs:: rename ( & part_path, & parquet_path) {
381+ error ! (
382+ "Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"
383+ ) ;
384+ }
371385 for file in arrow_files {
372386 // warn!("file-\n{file:?}\n");
373387 let file_size = file. metadata ( ) . unwrap ( ) . len ( ) ;
0 commit comments