@@ -46,6 +46,8 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Local, Utc};
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use once_cell::sync::OnceCell;
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
@@ -575,100 +577,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         Ok(Bytes::new())
     }
 
-    // sync only takes care of uploads
-    async fn sync(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> {
-        if !Path::new(&CONFIG.staging_dir()).exists() {
-            return Ok(());
-        }
-
-        // get all streams
-        let streams = STREAM_INFO.list_streams();
-
-        // start the sync loop for a stream
-        // parallelize this
-        for stream in &streams {
-            let time_partition = STREAM_INFO
-                .get_time_partition(stream)
-                .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
-            let custom_partition = STREAM_INFO
-                .get_custom_partition(stream)
-                .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
-            let dir = StorageDir::new(stream);
-
-            // read arrow files on disk
-            // convert them to parquet
-            let schema = convert_disk_files_to_parquet(
-                stream,
-                &dir,
-                time_partition,
-                custom_partition.clone(),
-                shutdown_signal,
-            )
-            .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
-
-            if let Some(schema) = schema {
-                let static_schema_flag = STREAM_INFO
-                    .get_static_schema_flag(stream)
-                    .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
-                if !static_schema_flag {
-                    commit_schema_to_storage(stream, schema).await?;
-                }
-            }
-
-            let parquet_and_schema_files = dir.parquet_and_schema_files();
-            for file in parquet_and_schema_files {
-                let filename = file
-                    .file_name()
-                    .expect("only parquet files are returned by iterator")
-                    .to_str()
-                    .expect("filename is valid string");
-
-                let mut file_date_part = filename.split('.').collect::<Vec<&str>>()[0];
-                file_date_part = file_date_part.split('=').collect::<Vec<&str>>()[1];
-                let compressed_size = file.metadata().map_or(0, |meta| meta.len());
-                STORAGE_SIZE
-                    .with_label_values(&["data", stream, "parquet"])
-                    .add(compressed_size as i64);
-                EVENTS_STORAGE_SIZE_DATE
-                    .with_label_values(&["data", stream, "parquet", file_date_part])
-                    .add(compressed_size as i64);
-                LIFETIME_EVENTS_STORAGE_SIZE
-                    .with_label_values(&["data", stream, "parquet"])
-                    .add(compressed_size as i64);
-                let mut file_suffix = str::replacen(filename, ".", "/", 3);
-
-                let custom_partition_clone = custom_partition.clone();
-                if custom_partition_clone.is_some() {
-                    let custom_partition_fields = custom_partition_clone.unwrap();
-                    let custom_partition_list =
-                        custom_partition_fields.split(',').collect::<Vec<&str>>();
-                    file_suffix =
-                        str::replacen(filename, ".", "/", 3 + custom_partition_list.len());
-                }
-
-                let stream_relative_path = format!("{stream}/{file_suffix}");
-
-                // Try uploading the file, handle potential errors without breaking the loop
-                if let Err(e) = self.upload_file(&stream_relative_path, &file).await {
-                    error!("Failed to upload file {}: {:?}", filename, e);
-                    continue; // Skip to the next file
-                }
-
-                let absolute_path = self
-                    .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap())
-                    .to_string();
-                let store = CONFIG.storage().get_object_store();
-                let manifest =
-                    catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap();
-                catalog::update_snapshot(store, stream, manifest).await?;
-
-                let _ = fs::remove_file(file);
-            }
-        }
-
-        Ok(())
-    }
-
     async fn get_stream_meta_from_storage(
         &self,
         stream_name: &str,
@@ -846,31 +754,40 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
             return Ok(());
         }
 
-        // get all streams
-        let streams = STREAM_INFO.list_streams();
+        let mut conversion_tasks = FuturesUnordered::new();
+        for stream in STREAM_INFO.list_streams() {
+            conversion_tasks.push(conversion_for_stream(stream, shutdown_signal));
+        }
 
-        for stream in &streams {
-            conversion_for_stream(stream, shutdown_signal)?;
+        while let Some(res) = conversion_tasks.next().await {
+            if let Err(err) = res {
+                error!("Failed to run conversion task {err:?}");
+                return Err(err);
+            }
         }
+
         Ok(())
     }
 }
 
-fn conversion_for_stream(stream: &str, shutdown_signal: bool) -> Result<(), ObjectStorageError> {
+async fn conversion_for_stream(
+    stream: String,
+    shutdown_signal: bool,
+) -> Result<(), ObjectStorageError> {
     info!("Starting conversion job for stream- {stream}");
 
     let time_partition = STREAM_INFO
-        .get_time_partition(stream)
+        .get_time_partition(&stream)
         .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
     let custom_partition = STREAM_INFO
-        .get_custom_partition(stream)
+        .get_custom_partition(&stream)
         .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
-    let dir = StorageDir::new(stream);
+    let dir = StorageDir::new(&stream);
 
     // read arrow files on disk
     // convert them to parquet
     let schema = convert_disk_files_to_parquet(
-        stream,
+        &stream,
         &dir,
         time_partition,
         custom_partition.clone(),
@@ -886,7 +803,7 @@ fn conversion_for_stream(stream: &str, shutdown_signal: bool) -> Result<(), Obje
 
     if let Some(schema) = schema {
         let static_schema_flag = STREAM_INFO
-            .get_static_schema_flag(stream)
+            .get_static_schema_flag(&stream)
             .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
         if !static_schema_flag {
             // schema is dynamic, read from staging and merge if present
0 commit comments