@@ -29,12 +29,12 @@ use actix_web::{
2929use http:: StatusCode ;
3030use once_cell:: sync:: Lazy ;
3131use tokio:: { sync:: Mutex , task:: JoinSet } ;
32- use tracing:: { error, info, warn } ;
32+ use tracing:: { error, info} ;
3333
34- use crate :: parseable:: PARSEABLE ;
34+ use crate :: { parseable:: PARSEABLE , storage :: object_storage :: sync_all_streams } ;
3535
3636// Create a global variable to store signal status
37- static SIGNAL_RECEIVED : Lazy < Arc < Mutex < bool > > > = Lazy :: new ( || Arc :: new ( Mutex :: new ( false ) ) ) ;
37+ pub static SIGNAL_RECEIVED : Lazy < Arc < Mutex < bool > > > = Lazy :: new ( || Arc :: new ( Mutex :: new ( false ) ) ) ;
3838
3939pub async fn liveness ( ) -> HttpResponse {
4040 HttpResponse :: new ( StatusCode :: OK )
@@ -60,28 +60,33 @@ pub async fn shutdown() {
6060 let mut shutdown_flag = SIGNAL_RECEIVED . lock ( ) . await ;
6161 * shutdown_flag = true ;
6262
63- let mut joinset = JoinSet :: new ( ) ;
63+ //sleep for 5 secs to allow any ongoing requests to finish
64+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 5 ) ) . await ;
65+ let mut local_sync_joinset = JoinSet :: new ( ) ;
6466
6567 // Sync staging
66- PARSEABLE . streams . flush_and_convert ( & mut joinset, true ) ;
68+ PARSEABLE
69+ . streams
70+ . flush_and_convert ( & mut local_sync_joinset, false , true ) ;
6771
68- while let Some ( res) = joinset . join_next ( ) . await {
72+ while let Some ( res) = local_sync_joinset . join_next ( ) . await {
6973 match res {
7074 Ok ( Ok ( _) ) => info ! ( "Successfully converted arrow files to parquet." ) ,
71- Ok ( Err ( err) ) => warn ! ( "Failed to convert arrow files to parquet. {err:?}" ) ,
75+ Ok ( Err ( err) ) => error ! ( "Failed to convert arrow files to parquet. {err:?}" ) ,
7276 Err ( err) => error ! ( "Failed to join async task: {err}" ) ,
7377 }
7478 }
7579
76- if let Err ( e) = PARSEABLE
77- . storage
78- . get_object_store ( )
79- . upload_files_from_staging ( )
80- . await
81- {
82- warn ! ( "Failed to sync local data with object store. {:?}" , e) ;
83- } else {
84- info ! ( "Successfully synced all data to S3." ) ;
80+ // Sync object store
81+ let mut object_store_joinset = JoinSet :: new ( ) ;
82+ sync_all_streams ( & mut object_store_joinset) ;
83+
84+ while let Some ( res) = object_store_joinset. join_next ( ) . await {
85+ match res {
86+ Ok ( Ok ( _) ) => info ! ( "Successfully synced all data to S3." ) ,
87+ Ok ( Err ( err) ) => error ! ( "Failed to sync local data with object store. {err:?}" ) ,
88+ Err ( err) => error ! ( "Failed to join async task: {err}" ) ,
89+ }
8590 }
8691}
8792
0 commit comments