@@ -67,12 +67,16 @@ use super::{
 };

 /// Returns the filename for parquet if provided arrows file path is valid as per our expectation
-fn arrow_path_to_parquet(staging_path: &Path, path: &Path, random_string: &str) -> Option<PathBuf> {
+fn arrow_path_to_parquet(
+    stream_staging_path: &Path,
+    path: &Path,
+    random_string: &str,
+) -> Option<PathBuf> {
     let filename = path.file_stem()?.to_str()?;
     let (_, front) = filename.split_once('.')?;
     assert!(front.contains('.'), "contains the delim `.`");
     let filename_with_random_number = format!("{front}.{random_string}.parquet");
-    let mut parquet_path = staging_path.to_owned();
+    let mut parquet_path = stream_staging_path.to_owned();
     parquet_path.push(filename_with_random_number);
     Some(parquet_path)
 }
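For context, the transformation this helper performs drops everything up to the first `.` in the arrow file's stem and appends a random suffix plus the `.parquet` extension. A minimal standalone sketch of that split-and-format behavior; the example filename layout (`<writer-id>.` prefix) is an assumption for illustration, only the splitting logic comes from the diff:

```rust
use std::path::{Path, PathBuf};

fn to_parquet_name(arrow_path: &Path, random_string: &str) -> Option<PathBuf> {
    // file_stem() drops the trailing `.arrows` extension.
    let stem = arrow_path.file_stem()?.to_str()?;
    // Discard the prefix before the first `.`; `front` keeps the rest.
    let (_, front) = stem.split_once('.')?;
    Some(PathBuf::from(format!("{front}.{random_string}.parquet")))
}

fn main() {
    // Hypothetical arrow filename; the real prefix layout may differ.
    let arrow = Path::new("ingestor0.date=2024-01-01.hour=00.minute=00.arrows");
    // Prints: Some("date=2024-01-01.hour=00.minute=00.abc123.parquet")
    println!("{:?}", to_parquet_name(arrow, "abc123"));
}
```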
@@ -345,9 +349,10 @@ impl Stream {
         arrow_files.retain(|path| {
             let creation = path
                 .metadata()
-                .expect("Arrow file should exist on disk")
-                .created()
-                .expect("Creation time should be accessible");
+                .ok()
+                .and_then(|meta| meta.created().or_else(|_| meta.modified()).ok())
+                .expect("Arrow file should have a valid creation or modified time");
+
             // Compare if creation time is actually from previous minute
             minute_from_system_time(creation) < minute_from_system_time(exclude)
         });
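The fallback above matters because `std::fs::Metadata::created()` returns an error on platforms and filesystems that do not record a birth time, so chaining in the modification time keeps retention working there. A minimal sketch of the same fallback chain (the probed path is just a placeholder):

```rust
use std::fs::Metadata;
use std::time::SystemTime;

/// Prefer the birth time; fall back to mtime where `created()` is unsupported.
fn creation_or_modified(meta: &Metadata) -> Option<SystemTime> {
    meta.created().or_else(|_| meta.modified()).ok()
}

fn main() -> std::io::Result<()> {
    let meta = std::fs::metadata("Cargo.toml")?; // placeholder path
    println!("{:?}", creation_or_modified(&meta));
    Ok(())
}
```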
@@ -594,7 +599,7 @@ impl Stream {
             .values()
             .map(|v| {
                 v.iter()
-                    .map(|file| file.metadata().unwrap().len())
+                    .filter_map(|file| file.metadata().ok().map(|meta| meta.len()))
                     .sum::<u64>()
             })
             .sum::<u64>();
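Replacing `unwrap()` with `filter_map` here means a staged file that disappears between the directory listing and the `metadata()` call (for example, deleted by a concurrent conversion) is simply skipped instead of panicking the task. A standalone sketch of the same accounting, with placeholder file names:

```rust
use std::path::PathBuf;

fn total_size(files: &[PathBuf]) -> u64 {
    files
        .iter()
        // Files that vanished or are unreadable contribute nothing.
        .filter_map(|file| file.metadata().ok().map(|meta| meta.len()))
        .sum()
}

fn main() {
    let files = vec![
        PathBuf::from("Cargo.toml"),           // placeholder existing file
        PathBuf::from("vanished.data.arrows"), // hypothetical missing file
    ];
    println!("{} bytes", total_size(&files));
}
```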
@@ -624,92 +629,129 @@ impl Stream {
             return Ok(None);
         }

-        //find sum of arrow files in staging directory for a stream
         self.update_staging_metrics(&staging_files);

-        // warn!("staging files-\n{staging_files:?}\n");
         for (parquet_path, arrow_files) in staging_files {
             let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
                 continue;
             }
             let merged_schema = record_reader.merged_schema();
-
             let props = self.parquet_writer_props(&merged_schema, time_partition, custom_partition);
             schemas.push(merged_schema.clone());
             let schema = Arc::new(merged_schema);
-            let mut part_path = parquet_path.to_owned();
-            part_path.set_extension("part");
-            let mut part_file = OpenOptions::new()
-                .create(true)
-                .append(true)
-                .open(&part_path)
-                .map_err(|_| StagingError::Create)?;
-            let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?;
-            for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
-                writer.write(record)?;
-            }
-            writer.close()?;

-            if part_file.metadata().expect("File was just created").len()
-                < parquet::file::FOOTER_SIZE as u64
-            {
-                error!(
-                    "Invalid parquet file {part_path:?} detected for stream {}, removing it",
-                    &self.stream_name
-                );
-                remove_file(part_path).expect("File should be removable if it is invalid");
+            let part_path = parquet_path.with_extension("part");
+            if !self.write_parquet_part_file(
+                &part_path,
+                record_reader,
+                &schema,
+                &props,
+                time_partition,
+            )? {
                 continue;
             }
-            trace!("Parquet file successfully constructed");

-            if let Err(e) = std::fs::rename(&part_path, &parquet_path) {
+            if let Err(e) = self.finalize_parquet_file(&part_path, &parquet_path) {
                 error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}");
             } else {
-                // delete the files that were grouped to create parquet file
-                for (i, file) in arrow_files.iter().enumerate() {
-                    match file.metadata() {
-                        Ok(meta) => {
-                            let file_size = meta.len();
-                            match remove_file(file) {
-                                Ok(_) => {
-                                    metrics::STORAGE_SIZE
-                                        .with_label_values(&[
-                                            "staging",
-                                            &self.stream_name,
-                                            ARROW_FILE_EXTENSION,
-                                        ])
-                                        .sub(file_size as i64);
-                                }
-                                Err(e) => {
-                                    warn!("Failed to delete file {}: {e}", file.display());
-                                }
-                            }
+                self.cleanup_arrow_files_and_dir(&arrow_files);
+            }
+        }
+
+        if schemas.is_empty() {
+            return Ok(None);
+        }
+
+        Ok(Some(Schema::try_merge(schemas).unwrap()))
+    }
+
+    fn write_parquet_part_file(
+        &self,
+        part_path: &Path,
+        record_reader: MergedReverseRecordReader,
+        schema: &Arc<Schema>,
+        props: &WriterProperties,
+        time_partition: Option<&String>,
+    ) -> Result<bool, StagingError> {
+        let mut part_file = OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(part_path)
+            .map_err(|_| StagingError::Create)?;
+        let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?;
+        for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) {
+            writer.write(record)?;
+        }
+        writer.close()?;
+
+        if part_file.metadata().expect("File was just created").len()
+            < parquet::file::FOOTER_SIZE as u64
+        {
+            error!(
+                "Invalid parquet file {part_path:?} detected for stream {}, removing it",
+                &self.stream_name
+            );
+            remove_file(part_path).expect("File should be removable if it is invalid");
+            return Ok(false);
+        }
+        trace!("Parquet file successfully constructed");
+        Ok(true)
+    }
+
+    fn finalize_parquet_file(&self, part_path: &Path, parquet_path: &Path) -> std::io::Result<()> {
+        std::fs::rename(part_path, parquet_path)
+    }
+
+    fn cleanup_arrow_files_and_dir(&self, arrow_files: &[PathBuf]) {
+        for (i, file) in arrow_files.iter().enumerate() {
+            match file.metadata() {
+                Ok(meta) => {
+                    let file_size = meta.len();
+                    match remove_file(file) {
+                        Ok(_) => {
+                            metrics::STORAGE_SIZE
+                                .with_label_values(&[
+                                    "staging",
+                                    &self.stream_name,
+                                    ARROW_FILE_EXTENSION,
+                                ])
+                                .sub(file_size as i64);
                         }
-                        Err(err) => {
-                            warn!("File ({}) not found; Error = {err}", file.display());
+                        Err(e) => {
+                            warn!("Failed to delete file {}: {e}", file.display());
                         }
                     }
+                }
+                Err(err) => {
+                    warn!("File ({}) not found; Error = {err}", file.display());
+                }
+            }

-                    // After deleting the last file, try to remove the inprocess directory
-                    if i == arrow_files.len() - 1 {
-                        if let Some(parent_dir) = file.parent() {
-                            if let Err(err) = fs::remove_dir(parent_dir) {
-                                warn!(
-                                    "Failed to remove inprocess directory {}: {err}",
-                                    parent_dir.display()
-                                );
+            // After deleting the last file, try to remove the inprocess directory if empty
+            if i == arrow_files.len() - 1 {
+                if let Some(parent_dir) = file.parent() {
+                    match fs::read_dir(parent_dir) {
+                        Ok(mut entries) => {
+                            if entries.next().is_none() {
+                                if let Err(err) = fs::remove_dir(parent_dir) {
+                                    warn!(
+                                        "Failed to remove inprocess directory {}: {err}",
+                                        parent_dir.display()
+                                    );
+                                }
                             }
                         }
+                        Err(err) => {
+                            warn!(
+                                "Failed to read inprocess directory {}: {err}",
+                                parent_dir.display()
+                            );
+                        }
                     }
                 }
             }
         }
-        if schemas.is_empty() {
-            return Ok(None);
-        }
-
-        Ok(Some(Schema::try_merge(schemas).unwrap()))
     }

     pub fn updated_schema(&self, current_schema: Schema) -> Schema {
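The key behavioral change in `cleanup_arrow_files_and_dir` above is that the inprocess directory is now removed only when a `read_dir` probe shows it is empty, instead of attempting `remove_dir` unconditionally and warning on every non-empty directory. A minimal sketch of that check-then-remove pattern (note it is inherently racy: a file can appear between the probe and the removal, matching the warn-only handling above):

```rust
use std::{fs, io, path::Path};

/// Remove `dir` only if a quick probe finds no entries.
fn remove_dir_if_empty(dir: &Path) -> io::Result<()> {
    let mut entries = fs::read_dir(dir)?;
    if entries.next().is_none() {
        fs::remove_dir(dir)?; // fails if something raced in; caller decides
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let dir = Path::new("inprocess-demo"); // hypothetical scratch directory
    fs::create_dir_all(dir)?;
    remove_dir_if_empty(dir)?; // empty, so it is removed
    assert!(!dir.exists());
    Ok(())
}
```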