@@ -549,83 +549,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
549549 Ok ( Bytes :: new ( ) )
550550 }
551551
552- async fn sync ( & self , shutdown_signal : bool ) -> Result < ( ) , ObjectStorageError > {
553- if !Path :: new ( & PARSEABLE . staging_dir ( ) ) . exists ( ) {
554- return Ok ( ( ) ) ;
555- }
556-
557- for stream_name in PARSEABLE . streams . list ( ) . iter ( ) {
558- let stream = PARSEABLE . get_or_create_stream ( stream_name) ;
559- let time_partition = stream. get_time_partition ( ) ;
560- let custom_partition = stream. get_custom_partition ( ) ;
561- if let Some ( schema) = stream
562- . convert_disk_files_to_parquet (
563- time_partition. as_ref ( ) ,
564- custom_partition. as_ref ( ) ,
565- shutdown_signal,
566- )
567- . map_err ( |err| ObjectStorageError :: UnhandledError ( Box :: new ( err) ) ) ?
568- {
569- let static_schema_flag = stream. get_static_schema_flag ( ) ;
570- if !static_schema_flag {
571- commit_schema_to_storage ( stream_name, schema) . await ?;
572- }
573- }
574-
575- for file in stream. parquet_files ( ) {
576- let filename = file
577- . file_name ( )
578- . expect ( "only parquet files are returned by iterator" )
579- . to_str ( )
580- . expect ( "filename is valid string" ) ;
581-
582- let mut file_date_part = filename. split ( '.' ) . collect :: < Vec < & str > > ( ) [ 0 ] ;
583- file_date_part = file_date_part. split ( '=' ) . collect :: < Vec < & str > > ( ) [ 1 ] ;
584- let compressed_size = file. metadata ( ) . map_or ( 0 , |meta| meta. len ( ) ) ;
585- STORAGE_SIZE
586- . with_label_values ( & [ "data" , stream_name, "parquet" ] )
587- . add ( compressed_size as i64 ) ;
588- EVENTS_STORAGE_SIZE_DATE
589- . with_label_values ( & [ "data" , stream_name, "parquet" , file_date_part] )
590- . add ( compressed_size as i64 ) ;
591- LIFETIME_EVENTS_STORAGE_SIZE
592- . with_label_values ( & [ "data" , stream_name, "parquet" ] )
593- . add ( compressed_size as i64 ) ;
594- let mut file_suffix = str:: replacen ( filename, "." , "/" , 3 ) ;
595-
596- let custom_partition_clone = custom_partition. clone ( ) ;
597- if custom_partition_clone. is_some ( ) {
598- let custom_partition_fields = custom_partition_clone. unwrap ( ) ;
599- let custom_partition_list =
600- custom_partition_fields. split ( ',' ) . collect :: < Vec < & str > > ( ) ;
601- file_suffix =
602- str:: replacen ( filename, "." , "/" , 3 + custom_partition_list. len ( ) ) ;
603- }
604-
605- let stream_relative_path = format ! ( "{stream_name}/{file_suffix}" ) ;
606-
607- // Try uploading the file, handle potential errors without breaking the loop
608- if let Err ( e) = self . upload_file ( & stream_relative_path, & file) . await {
609- error ! ( "Failed to upload file {}: {:?}" , filename, e) ;
610- continue ; // Skip to the next file
611- }
612-
613- let absolute_path = self
614- . absolute_url ( RelativePath :: from_path ( & stream_relative_path) . unwrap ( ) )
615- . to_string ( ) ;
616- let store = PARSEABLE . storage ( ) . get_object_store ( ) ;
617- let manifest =
618- catalog:: manifest:: create_from_parquet_file ( absolute_path. clone ( ) , & file)
619- . unwrap ( ) ;
620- catalog:: update_snapshot ( store, stream_name, manifest) . await ?;
621-
622- let _ = fs:: remove_file ( file) ;
623- }
624- }
625-
626- Ok ( ( ) )
627- }
628-
629552 async fn get_stream_meta_from_storage (
630553 & self ,
631554 stream_name : & str ,
0 commit comments