@@ -430,12 +430,12 @@ pub trait ObjectStorage: Sync + 'static {
430430 if !Path :: new ( & CONFIG . staging_dir ( ) ) . exists ( ) {
431431 return Ok ( ( ) ) ;
432432 }
433-
433+
434434 let streams = STREAM_INFO . list_streams ( ) ;
435-
435+
436436 let cache_manager = LocalCacheManager :: global ( ) ;
437437 let mut cache_updates: HashMap < & String , Vec < _ > > = HashMap :: new ( ) ;
438-
438+
439439 for stream in & streams {
440440 let cache_enabled = STREAM_INFO
441441 . get_cache_enabled ( stream)
@@ -455,7 +455,7 @@ pub trait ObjectStorage: Sync + 'static {
455455 shutdown_signal,
456456 )
457457 . map_err ( |err| ObjectStorageError :: UnhandledError ( Box :: new ( err) ) ) ?;
458-
458+
459459 if let Some ( schema) = schema {
460460 let static_schema_flag = STREAM_INFO
461461 . get_static_schema_flag ( stream)
@@ -464,18 +464,18 @@ pub trait ObjectStorage: Sync + 'static {
464464 commit_schema_to_storage ( stream, schema) . await ?;
465465 }
466466 }
467-
467+
468468 let parquet_files = dir. parquet_files ( ) ;
469469 for file in parquet_files {
470470 let filename = file
471471 . file_name ( )
472472 . expect ( "only parquet files are returned by iterator" )
473473 . to_str ( )
474474 . expect ( "filename is valid string" ) ;
475-
475+
476476 // Log the filename being processed
477477 log:: debug!( "Processing file: {}" , filename) ;
478-
478+
479479 let mut file_date_part = filename. split ( '.' ) . collect :: < Vec < & str > > ( ) [ 0 ] ;
480480 file_date_part = file_date_part. split ( '=' ) . collect :: < Vec < & str > > ( ) [ 1 ] ;
481481 let compressed_size = file. metadata ( ) . map_or ( 0 , |meta| meta. len ( ) ) ;
@@ -489,7 +489,7 @@ pub trait ObjectStorage: Sync + 'static {
489489 . with_label_values ( & [ "data" , stream, "parquet" ] )
490490 . add ( compressed_size as i64 ) ;
491491 let mut file_suffix = str:: replacen ( filename, "." , "/" , 3 ) ;
492-
492+
493493 let custom_partition_clone = custom_partition. clone ( ) ;
494494 if custom_partition_clone. is_some ( ) {
495495 let custom_partition_fields = custom_partition_clone. unwrap ( ) ;
@@ -498,15 +498,15 @@ pub trait ObjectStorage: Sync + 'static {
498498 file_suffix =
499499 str:: replacen ( filename, "." , "/" , 3 + custom_partition_list. len ( ) ) ;
500500 }
501-
501+
502502 let stream_relative_path = format ! ( "{stream}/{file_suffix}" ) ;
503-
503+
504504 // Try uploading the file, handle potential errors without breaking the loop
505505 if let Err ( e) = self . upload_file ( & stream_relative_path, & file) . await {
506506 log:: error!( "Failed to upload file {}: {:?}" , filename, e) ;
507507 continue ; // Skip to the next file
508508 }
509-
509+
510510 let absolute_path = self
511511 . absolute_url ( RelativePath :: from_path ( & stream_relative_path) . unwrap ( ) )
512512 . to_string ( ) ;
@@ -524,30 +524,31 @@ pub trait ObjectStorage: Sync + 'static {
524524 }
525525 }
526526 }
527-
527+
528528 // Cache management logic
529529 if let Some ( manager) = cache_manager {
530530 let cache_updates = cache_updates
531531 . into_iter ( )
532532 . map ( |( key, value) | ( key. to_owned ( ) , value) )
533533 . collect_vec ( ) ;
534-
534+
535535 tokio:: spawn ( async move {
536536 for ( stream, files) in cache_updates {
537537 for ( storage_path, file) in files {
538538 if let Err ( e) = manager
539539 . move_to_cache ( & stream, storage_path, file. to_owned ( ) )
540- . await {
540+ . await
541+ {
541542 log:: error!( "Failed to move file to cache: {:?}" , e) ;
542543 }
543544 }
544545 }
545546 } ) ;
546547 }
547-
548+
548549 Ok ( ( ) )
549550 }
550-
551+
551552 // pick a better name
552553 fn get_bucket_name ( & self ) -> String ;
553554}
0 commit comments