@@ -84,7 +84,6 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
8484 loop {
8585 select ! {
8686 _ = & mut cancel_rx => {
87- // actix server finished .. stop other threads and stop the server
8887 remote_sync_inbox. send( ( ) ) . unwrap_or( ( ) ) ;
8988 localsync_inbox. send( ( ) ) . unwrap_or( ( ) ) ;
9089 if let Err ( e) = localsync_handler. await {
@@ -96,12 +95,9 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
9695 return Ok ( ( ) ) ;
9796 } ,
9897 _ = & mut localsync_outbox => {
99- // crash the server if localsync fails for any reason
100- // panic!("Local Sync thread died. Server will fail now!")
10198 return Err ( anyhow:: Error :: msg( "Failed to sync local data to drive. Please restart the Parseable server.\n \n Join us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable" ) )
10299 } ,
103100 _ = & mut remote_sync_outbox => {
104- // remote_sync failed, this is recoverable by just starting remote_sync thread again
105101 if let Err ( e) = remote_sync_handler. await {
106102 error!( "Error joining remote_sync_handler: {e:?}" ) ;
107103 }
@@ -125,16 +121,26 @@ pub fn object_store_sync() -> (
125121
126122 let result = std:: panic:: catch_unwind ( AssertUnwindSafe ( || async move {
127123 let mut sync_interval = interval_at ( next_minute ( ) , STORAGE_UPLOAD_INTERVAL ) ;
128- let mut joinset = JoinSet :: new ( ) ;
129124
130125 loop {
131126 select ! {
132127 _ = sync_interval. tick( ) => {
133128 trace!( "Syncing Parquets to Object Store... " ) ;
134- sync_all_streams( & mut joinset)
135- } ,
136- Some ( res) = joinset. join_next( ) , if !joinset. is_empty( ) => {
137- log_join_result( res, "object store sync" ) ;
129+
130+ // Monitor the duration of sync_all_streams execution
131+ monitor_task_duration(
132+ "object_store_sync_all_streams" ,
133+ Duration :: from_secs( 15 ) ,
134+ || async {
135+ let mut joinset = JoinSet :: new( ) ;
136+ sync_all_streams( & mut joinset) ;
137+
138+ // Wait for all spawned tasks to complete
139+ while let Some ( res) = joinset. join_next( ) . await {
140+ log_join_result( res, "object store sync" ) ;
141+ }
142+ }
143+ ) . await ;
138144 } ,
139145 res = & mut inbox_rx => {
140146 match res {
@@ -147,10 +153,6 @@ pub fn object_store_sync() -> (
147153 }
148154 }
149155 }
150- // Drain remaining joinset tasks
151- while let Some ( res) = joinset. join_next ( ) . await {
152- log_join_result ( res, "object store sync" ) ;
153- }
154156 } ) ) ;
155157
156158 match result {
@@ -184,32 +186,37 @@ pub fn local_sync() -> (
184186
185187 let result = std:: panic:: catch_unwind ( AssertUnwindSafe ( || async move {
186188 let mut sync_interval = interval_at ( next_minute ( ) , LOCAL_SYNC_INTERVAL ) ;
187- let mut joinset = JoinSet :: new ( ) ;
188189
189190 loop {
190191 select ! {
191192 // Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds
192193 _ = sync_interval. tick( ) => {
193- PARSEABLE . streams. flush_and_convert( & mut joinset, false , false )
194+ // Monitor the duration of flush_and_convert execution
195+ monitor_task_duration(
196+ "local_sync_flush_and_convert" ,
197+ Duration :: from_secs( 15 ) ,
198+ || async {
199+ let mut joinset = JoinSet :: new( ) ;
200+ PARSEABLE . streams. flush_and_convert( & mut joinset, false , false ) ;
201+
202+ // Wait for all spawned tasks to complete
203+ while let Some ( res) = joinset. join_next( ) . await {
204+ log_join_result( res, "flush and convert" ) ;
205+ }
206+ }
207+ ) . await ;
194208 } ,
195- // Joins and logs errors in spawned tasks
196- Some ( res) = joinset. join_next( ) , if !joinset. is_empty( ) => {
197- log_join_result( res, "flush and convert" ) ;
198- }
199- res = & mut inbox_rx => { match res{
200- Ok ( _) => break ,
201- Err ( _) => {
202- warn!( "Inbox channel closed unexpectedly" ) ;
203- break ;
204- } }
209+ res = & mut inbox_rx => {
210+ match res {
211+ Ok ( _) => break ,
212+ Err ( _) => {
213+ warn!( "Inbox channel closed unexpectedly" ) ;
214+ break ;
215+ }
216+ }
205217 }
206218 }
207219 }
208-
209- // Drain remaining joinset tasks
210- while let Some ( res) = joinset. join_next ( ) . await {
211- log_join_result ( res, "flush and convert" ) ;
212- }
213220 } ) ) ;
214221
215222 match result {
@@ -228,21 +235,34 @@ pub fn local_sync() -> (
228235 ( handle, outbox_rx, inbox_tx)
229236}
230237
231- // local sync at the start of the server
238+ // local and object store sync at the start of the server
232239pub async fn sync_start ( ) -> anyhow:: Result < ( ) > {
233- let mut local_sync_joinset = JoinSet :: new ( ) ;
234- PARSEABLE
235- . streams
236- . flush_and_convert ( & mut local_sync_joinset, true , false ) ;
237- while let Some ( res) = local_sync_joinset. join_next ( ) . await {
238- log_join_result ( res, "flush and convert" ) ;
239- }
240+ // Monitor local sync duration at startup
241+ monitor_task_duration ( "startup_local_sync" , Duration :: from_secs ( 15 ) , || async {
242+ let mut local_sync_joinset = JoinSet :: new ( ) ;
243+ PARSEABLE
244+ . streams
245+ . flush_and_convert ( & mut local_sync_joinset, true , false ) ;
246+ while let Some ( res) = local_sync_joinset. join_next ( ) . await {
247+ log_join_result ( res, "flush and convert" ) ;
248+ }
249+ } )
250+ . await ;
251+
252+ // Monitor object store sync duration at startup
253+ monitor_task_duration (
254+ "startup_object_store_sync" ,
255+ Duration :: from_secs ( 15 ) ,
256+ || async {
257+ let mut object_store_joinset = JoinSet :: new ( ) ;
258+ sync_all_streams ( & mut object_store_joinset) ;
259+ while let Some ( res) = object_store_joinset. join_next ( ) . await {
260+ log_join_result ( res, "object store sync" ) ;
261+ }
262+ } ,
263+ )
264+ . await ;
240265
241- let mut object_store_joinset = JoinSet :: new ( ) ;
242- sync_all_streams ( & mut object_store_joinset) ;
243- while let Some ( res) = object_store_joinset. join_next ( ) . await {
244- log_join_result ( res, "object store sync" ) ;
245- }
246266 Ok ( ( ) )
247267}
248268
0 commit comments