@@ -47,6 +47,8 @@ use async_trait::async_trait;
4747use bytes:: Bytes ;
4848use chrono:: { DateTime , Local , Utc } ;
4949use datafusion:: { datasource:: listing:: ListingTableUrl , execution:: runtime_env:: RuntimeEnvBuilder } ;
50+ use futures:: stream:: FuturesUnordered ;
51+ use futures:: StreamExt ;
5052use once_cell:: sync:: OnceCell ;
5153use relative_path:: RelativePath ;
5254use relative_path:: RelativePathBuf ;
@@ -573,100 +575,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
573575 Ok ( Bytes :: new ( ) )
574576 }
575577
576- // sync only takes care of uploads
577- async fn sync ( & self , shutdown_signal : bool ) -> Result < ( ) , ObjectStorageError > {
578- if !Path :: new ( & CONFIG . staging_dir ( ) ) . exists ( ) {
579- return Ok ( ( ) ) ;
580- }
581-
582- // get all streams
583- let streams = STREAM_INFO . list_streams ( ) ;
584-
585- // start the sync loop for a stream
586- // parallelize this
587- for stream in & streams {
588- let time_partition = STREAM_INFO
589- . get_time_partition ( stream)
590- . map_err ( |err| ObjectStorageError :: UnhandledError ( Box :: new ( err) ) ) ?;
591- let custom_partition = STREAM_INFO
592- . get_custom_partition ( stream)
593- . map_err ( |err| ObjectStorageError :: UnhandledError ( Box :: new ( err) ) ) ?;
594- let dir = StorageDir :: new ( stream) ;
595-
596- // read arrow files on disk
597- // convert them to parquet
598- let schema = convert_disk_files_to_parquet (
599- stream,
600- & dir,
601- time_partition,
602- custom_partition. clone ( ) ,
603- shutdown_signal,
604- )
605- . map_err ( |err| ObjectStorageError :: UnhandledError ( Box :: new ( err) ) ) ?;
606-
607- if let Some ( schema) = schema {
608- let static_schema_flag = STREAM_INFO
609- . get_static_schema_flag ( stream)
610- . map_err ( |err| ObjectStorageError :: UnhandledError ( Box :: new ( err) ) ) ?;
611- if !static_schema_flag {
612- commit_schema_to_storage ( stream, schema) . await ?;
613- }
614- }
615-
616- let parquet_and_schema_files = dir. parquet_and_schema_files ( ) ;
617- for file in parquet_and_schema_files {
618- let filename = file
619- . file_name ( )
620- . expect ( "only parquet files are returned by iterator" )
621- . to_str ( )
622- . expect ( "filename is valid string" ) ;
623-
624- let mut file_date_part = filename. split ( '.' ) . collect :: < Vec < & str > > ( ) [ 0 ] ;
625- file_date_part = file_date_part. split ( '=' ) . collect :: < Vec < & str > > ( ) [ 1 ] ;
626- let compressed_size = file. metadata ( ) . map_or ( 0 , |meta| meta. len ( ) ) ;
627- STORAGE_SIZE
628- . with_label_values ( & [ "data" , stream, "parquet" ] )
629- . add ( compressed_size as i64 ) ;
630- EVENTS_STORAGE_SIZE_DATE
631- . with_label_values ( & [ "data" , stream, "parquet" , file_date_part] )
632- . add ( compressed_size as i64 ) ;
633- LIFETIME_EVENTS_STORAGE_SIZE
634- . with_label_values ( & [ "data" , stream, "parquet" ] )
635- . add ( compressed_size as i64 ) ;
636- let mut file_suffix = str:: replacen ( filename, "." , "/" , 3 ) ;
637-
638- let custom_partition_clone = custom_partition. clone ( ) ;
639- if custom_partition_clone. is_some ( ) {
640- let custom_partition_fields = custom_partition_clone. unwrap ( ) ;
641- let custom_partition_list =
642- custom_partition_fields. split ( ',' ) . collect :: < Vec < & str > > ( ) ;
643- file_suffix =
644- str:: replacen ( filename, "." , "/" , 3 + custom_partition_list. len ( ) ) ;
645- }
646-
647- let stream_relative_path = format ! ( "{stream}/{file_suffix}" ) ;
648-
649- // Try uploading the file, handle potential errors without breaking the loop
650- if let Err ( e) = self . upload_file ( & stream_relative_path, & file) . await {
651- error ! ( "Failed to upload file {}: {:?}" , filename, e) ;
652- continue ; // Skip to the next file
653- }
654-
655- let absolute_path = self
656- . absolute_url ( RelativePath :: from_path ( & stream_relative_path) . unwrap ( ) )
657- . to_string ( ) ;
658- let store = CONFIG . storage ( ) . get_object_store ( ) ;
659- let manifest =
660- catalog:: create_from_parquet_file ( absolute_path. clone ( ) , & file) . unwrap ( ) ;
661- catalog:: update_snapshot ( store, stream, manifest) . await ?;
662-
663- let _ = fs:: remove_file ( file) ;
664- }
665- }
666-
667- Ok ( ( ) )
668- }
669-
670578 async fn get_stream_meta_from_storage (
671579 & self ,
672580 stream_name : & str ,
@@ -844,31 +752,40 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
844752 return Ok ( ( ) ) ;
845753 }
846754
847- // get all streams
848- let streams = STREAM_INFO . list_streams ( ) ;
755+ let mut conversion_tasks = FuturesUnordered :: new ( ) ;
756+ for stream in STREAM_INFO . list_streams ( ) {
757+ conversion_tasks. push ( conversion_for_stream ( stream, shutdown_signal) ) ;
758+ }
849759
850- for stream in & streams {
851- conversion_for_stream ( stream, shutdown_signal) ?;
760+ while let Some ( res) = conversion_tasks. next ( ) . await {
761+ if let Err ( err) = res {
762+ error ! ( "Failed to run conversion task {err:?}" ) ;
763+ return Err ( err) ;
764+ }
852765 }
766+
853767 Ok ( ( ) )
854768 }
855769}
856770
857- fn conversion_for_stream ( stream : & str , shutdown_signal : bool ) -> Result < ( ) , ObjectStorageError > {
771+ async fn conversion_for_stream (
772+ stream : String ,
773+ shutdown_signal : bool ,
774+ ) -> Result < ( ) , ObjectStorageError > {
858775 info ! ( "Starting conversion job for stream- {stream}" ) ;
859776
860777 let time_partition = STREAM_INFO
861- . get_time_partition ( stream)
778+ . get_time_partition ( & stream)
862779 . map_err ( |err| ObjectStorageError :: UnhandledError ( Box :: new ( err) ) ) ?;
863780 let custom_partition = STREAM_INFO
864- . get_custom_partition ( stream)
781+ . get_custom_partition ( & stream)
865782 . map_err ( |err| ObjectStorageError :: UnhandledError ( Box :: new ( err) ) ) ?;
866- let dir = StorageDir :: new ( stream) ;
783+ let dir = StorageDir :: new ( & stream) ;
867784
868785 // read arrow files on disk
869786 // convert them to parquet
870787 let schema = convert_disk_files_to_parquet (
871- stream,
788+ & stream,
872789 & dir,
873790 time_partition,
874791 custom_partition. clone ( ) ,
@@ -884,7 +801,7 @@ fn conversion_for_stream(stream: &str, shutdown_signal: bool) -> Result<(), Obje
884801
885802 if let Some ( schema) = schema {
886803 let static_schema_flag = STREAM_INFO
887- . get_static_schema_flag ( stream)
804+ . get_static_schema_flag ( & stream)
888805 . map_err ( |err| ObjectStorageError :: UnhandledError ( Box :: new ( err) ) ) ?;
889806 if !static_schema_flag {
890807 // schema is dynamic, read from staging and merge if present
0 commit comments