From 6f322fc741a648d02ca086504b8f082dbbc14943 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Thu, 20 Feb 2025 13:31:58 +0530
Subject: [PATCH 1/8] feat: merge finish `.arrows` and convert to `.parquet`

---
 src/lib.rs                         | 11 +++-
 src/parseable/streams.rs           |  6 +--
 src/query/listing_table_builder.rs |  5 +-
 src/storage/mod.rs                 |  8 ---
 src/sync.rs                        | 86 +++++------------------------
 5 files changed, 25 insertions(+), 91 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 91fa0a405..8b4e7dc3e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -59,8 +59,15 @@ use reqwest::{Client, ClientBuilder};
 // It is very unlikely that panic will occur when dealing with locks.
 pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock";
 
-pub const STORAGE_CONVERSION_INTERVAL: u64 = 60;
-pub const STORAGE_UPLOAD_INTERVAL: u64 = 30;
+/// Describes the duration at the end of which in-memory buffers are flushed,
+/// arrows files are "finished" and compacted into parquet files.
+pub const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);
+
+/// Duration used to configure prefix generation.
+pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;
+
+/// Describes the duration at the end of which parquets are pushed into objectstore.
+pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(60);
 
 // A single HTTP client for all outgoing HTTP requests from the parseable server
 static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 7d8decdca..df4bd3cda 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -49,11 +49,9 @@ use crate::{
     metadata::{LogStreamMetadata, SchemaVersion},
     metrics,
     option::Mode,
-    storage::{
-        object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY,
-    },
+    storage::{object_storage::to_bytes, retention::Retention, StreamType},
     utils::minute_to_slot,
-    LOCK_EXPECT,
+    LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
 };
 
 use super::{
diff --git a/src/query/listing_table_builder.rs b/src/query/listing_table_builder.rs
index f8ea4f75c..869dcb1db 100644
--- a/src/query/listing_table_builder.rs
+++ b/src/query/listing_table_builder.rs
@@ -32,9 +32,8 @@ use itertools::Itertools;
 use object_store::{path::Path, ObjectMeta, ObjectStore};
 
 use crate::{
-    event::DEFAULT_TIMESTAMP_KEY,
-    storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY},
-    utils::time::TimeRange,
+    event::DEFAULT_TIMESTAMP_KEY, storage::ObjectStorage, utils::time::TimeRange,
+    OBJECT_STORE_DATA_GRANULARITY,
 };
 
 use super::PartialTimeFilter;
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 3be5bfc37..fb1487fcd 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -57,14 +57,6 @@ pub const SCHEMA_FILE_NAME: &str = ".schema";
 pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts";
 pub const MANIFEST_FILE: &str = "manifest.json";
 
-/// local sync interval to move data.records to /tmp dir of that stream.
-/// 60 sec is a reasonable value.
-pub const LOCAL_SYNC_INTERVAL: u64 = 60;
-
-/// duration used to configure prefix in objectstore and local disk structure
-/// used for storage. Defaults to 1 min.
-pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60;
-
 // max concurrent request allowed for datafusion object store
 const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
 
diff --git a/src/sync.rs b/src/sync.rs
index a05b4ca72..769daefc7 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -26,8 +26,7 @@ use tracing::{error, info, trace, warn};
 
 use crate::alerts::{alerts_utils, AlertConfig, AlertError};
 use crate::parseable::PARSEABLE;
-use crate::storage::LOCAL_SYNC_INTERVAL;
-use crate::{STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL};
+use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL};
 
 // Calculates the instant that is the start of the next minute
 fn next_minute() -> Instant {
@@ -76,27 +75,21 @@ where
 /// `STORAGE_CONVERSION_INTERVAL` seconds and uploads them every `STORAGE_UPLOAD_INTERVAL` seconds.
 #[tokio::main(flavor = "current_thread")]
 pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
-    let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
+    let (localsync_handler, mut localsync_outbox, localsync_inbox) = local_sync();
     let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
         object_store_sync();
-    let (mut remote_conversion_handler, mut remote_conversion_outbox, mut remote_conversion_inbox) =
-        arrow_conversion();
 
     loop {
         select! {
             _ = &mut cancel_rx => {
                 // actix server finished .. stop other threads and stop the server
                 remote_sync_inbox.send(()).unwrap_or(());
                 localsync_inbox.send(()).unwrap_or(());
-                remote_conversion_inbox.send(()).unwrap_or(());
                 if let Err(e) = localsync_handler.await {
                     error!("Error joining remote_sync_handler: {:?}", e);
                 }
                 if let Err(e) = remote_sync_handler.await {
                     error!("Error joining remote_sync_handler: {:?}", e);
                 }
-                if let Err(e) = remote_conversion_handler.await {
-                    error!("Error joining remote_conversion_handler: {:?}", e);
-                }
                 return Ok(());
             },
             _ = &mut localsync_outbox => {
@@ -111,13 +104,6 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
                 }
                 (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync();
             },
-            _ = &mut remote_conversion_outbox => {
-                // remote_conversion failed, this is recoverable by just starting remote_conversion thread again
-                if let Err(e) = remote_conversion_handler.await {
-                    error!("Error joining remote_conversion_handler: {:?}", e);
-                }
-                (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = arrow_conversion();
-            },
         }
     }
 }
@@ -132,8 +118,7 @@ pub fn object_store_sync() -> (
 
     let handle = task::spawn(async move {
         let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
-            let mut sync_interval =
-                interval_at(next_minute(), Duration::from_secs(STORAGE_UPLOAD_INTERVAL));
+            let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL);
 
             let mut inbox_rx = AssertUnwindSafe(inbox_rx);
 
@@ -183,7 +168,8 @@ pub fn object_store_sync() -> (
     (handle, outbox_rx, inbox_tx)
 }
 
-pub fn arrow_conversion() -> (
+/// Flush arrows onto disk and convert them into parquet files
+pub fn local_sync() -> (
     task::JoinHandle<()>,
     oneshot::Receiver<()>,
     oneshot::Sender<()>,
@@ -192,17 +178,18 @@ pub fn arrow_conversion() -> (
     let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
 
     let handle = task::spawn(async move {
-        let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
-            let mut sync_interval = interval_at(
-                next_minute() + Duration::from_secs(5), // 5 second delay
-                Duration::from_secs(STORAGE_CONVERSION_INTERVAL),
-            );
+        info!("Local sync task started");
+        let mut inbox_rx = inbox_rx;
 
-        let mut inbox_rx = AssertUnwindSafe(inbox_rx);
+        let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
+            let mut sync_interval = interval_at(next_minute(), LOCAL_SYNC_INTERVAL);
 
             loop {
                 select! {
                     _ = sync_interval.tick() => {
+                        trace!("Flushing Arrows to disk...");
+                        PARSEABLE.flush_all_streams();
+
                         trace!("Converting Arrow to Parquet... ");
                         if let Err(e) = monitor_task_duration(
                             "arrow_conversion",
                             Duration::from_secs(30),
                             || async { PARSEABLE.streams.prepare_parquet(false) },
                         ).await
                         {
                             warn!("failed to convert local arrow data to parquet. {e:?}");
                         }
                     },
                     res = &mut inbox_rx => {match res{
                         Ok(_) => break,
                         Err(_) => {
                             warn!("Inbox channel closed unexpectedly");
                             break;
                         }}
                     }
                 }
             }
         }));
 
-        match result {
-            Ok(future) => {
-                future.await;
-            }
-            Err(panic_error) => {
-                error!("Panic in object store sync task: {panic_error:?}");
-                let _ = outbox_tx.send(());
-            }
-        }
-
-        info!("Object store sync task ended");
-    });
-
-    (handle, outbox_rx, inbox_tx)
-}
-
-pub fn run_local_sync() -> (
-    task::JoinHandle<()>,
-    oneshot::Receiver<()>,
-    oneshot::Sender<()>,
-) {
-    let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
-    let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
-
-    let handle = task::spawn(async move {
-        info!("Local sync task started");
-        let mut inbox_rx = inbox_rx;
-
-        let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
-            let mut sync_interval =
-                interval_at(next_minute(), Duration::from_secs(LOCAL_SYNC_INTERVAL));
-
-        loop {
-            select! {
-                _ = sync_interval.tick() => {
-                    trace!("Flushing Arrows to disk...");
-                    PARSEABLE.flush_all_streams();
-                },
-                res = &mut inbox_rx => {match res{
-                    Ok(_) => break,
-                    Err(_) => {
-                        warn!("Inbox channel closed unexpectedly");
-                        break;
-                    }}
-                }
-            }
-        }
-    }));
-
         match result {
             Ok(future) => {
                 future.await;

From 281a1a4f133023d672c12eee9a087864c94df57e Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 21 Feb 2025 16:09:03 +0530
Subject: [PATCH 2/8] fix: push every 30s

Signed-off-by: Devdutt Shenoi
---
 src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/lib.rs b/src/lib.rs
index 8b4e7dc3e..2f8eb06ad 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -67,7 +67,7 @@ pub const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);
 pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;
 
 /// Describes the duration at the end of which parquets are pushed into objectstore.
-pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(60);
+pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);
 
 // A single HTTP client for all outgoing HTTP requests from the parseable server
 static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {

From 4a2a659505394850c85eebbbcea29146e565ce0b Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Mon, 24 Feb 2025 20:29:27 +0530
Subject: [PATCH 3/8] refact: spawn conversion tasks to not block schedule

---
 src/sync.rs | 35 +++++++++++++++++++++++------------
 1 file changed, 23 insertions(+), 12 deletions(-)

diff --git a/src/sync.rs b/src/sync.rs
index 769daefc7..60ec7db12 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -20,6 +20,7 @@ use chrono::{TimeDelta, Timelike};
 use std::future::Future;
 use std::panic::AssertUnwindSafe;
 use tokio::sync::oneshot;
+use tokio::task::JoinSet;
 use tokio::time::{interval_at, sleep, Duration, Instant};
 use tokio::{select, task};
 use tracing::{error, info, trace, warn};
@@ -183,23 +184,18 @@ pub fn local_sync() -> (
 
         let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
             let mut sync_interval = interval_at(next_minute(), LOCAL_SYNC_INTERVAL);
+            let mut joinset = JoinSet::new();
 
             loop {
                 select! {
+                    // Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds
                     _ = sync_interval.tick() => {
-                        trace!("Flushing Arrows to disk...");
-                        PARSEABLE.flush_all_streams();
-
-                        trace!("Converting Arrow to Parquet... ");
-                        if let Err(e) = monitor_task_duration(
-                            "arrow_conversion",
-                            Duration::from_secs(30),
-                            || async { PARSEABLE.streams.prepare_parquet(false) },
-                        ).await
-                        {
-                            warn!("failed to convert local arrow data to parquet. {e:?}");
-                        }
+                        joinset.spawn(flush_and_convert());
                     },
+                    // Joins and logs errors in spawned tasks
+                    Some(Err(e)) = joinset.join_next(), if !joinset.is_empty() => {
+                        error!("Issue joining flush+conversion: {e}")
+                    }
                     res = &mut inbox_rx => {match res{
                         Ok(_) => break,
                         Err(_) => {
@@ -273,3 +269,18 @@ pub fn schedule_alert_task(
     });
     Ok(handle)
 }
+
+/// Asynchronously flushes all streams when called, then compacts them into parquet files ready to be pushed onto objectstore
+async fn flush_and_convert() {
+    trace!("Flushing Arrows to disk...");
+    PARSEABLE.flush_all_streams();
+
+    trace!("Converting Arrow to Parquet... ");
+    if let Err(e) = monitor_task_duration("arrow_conversion", Duration::from_secs(30), || async {
+        PARSEABLE.streams.prepare_parquet(false)
+    })
+    .await
+    {
+        warn!("failed to convert local arrow data to parquet. {e:?}");
+    }
+}

From 8219e436aa539498b1e428916a794c0490ce480b Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Tue, 25 Feb 2025 11:44:22 +0530
Subject: [PATCH 4/8] fix: sync with two threads

---
 src/sync.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/sync.rs b/src/sync.rs
index 60ec7db12..278397f50 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -74,7 +74,7 @@ where
 
 /// Flushes arrows onto disk every `LOCAL_SYNC_INTERVAL` seconds, packs arrows into parquet every
 /// `STORAGE_CONVERSION_INTERVAL` seconds and uploads them every `STORAGE_UPLOAD_INTERVAL` seconds.
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
 pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
     let (localsync_handler, mut localsync_outbox, localsync_inbox) = local_sync();
     let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
@@ -86,10 +86,10 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
                 remote_sync_inbox.send(()).unwrap_or(());
                 localsync_inbox.send(()).unwrap_or(());
                 if let Err(e) = localsync_handler.await {
-                    error!("Error joining remote_sync_handler: {:?}", e);
+                    error!("Error joining remote_sync_handler: {e:?}");
                 }
                 if let Err(e) = remote_sync_handler.await {
-                    error!("Error joining remote_sync_handler: {:?}", e);
+                    error!("Error joining remote_sync_handler: {e:?}");
                 }
                 return Ok(());
             },
@@ -101,7 +101,7 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
             _ = &mut remote_sync_outbox => {
                 // remote_sync failed, this is recoverable by just starting remote_sync thread again
                 if let Err(e) = remote_sync_handler.await {
-                    error!("Error joining remote_sync_handler: {:?}", e);
+                    error!("Error joining remote_sync_handler: {e:?}");
                 }
                 (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync();
             },
@@ -212,7 +212,7 @@ pub fn local_sync() -> (
                 future.await;
             }
             Err(panic_error) => {
-                error!("Panic in local sync task: {:?}", panic_error);
+                error!("Panic in local sync task: {panic_error:?}");
             }
         }
 

From 2b3b2d44d86e988ca5b743726c5d1099c91731ca Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Tue, 25 Feb 2025 12:41:41 +0530
Subject: [PATCH 5/8] feat: parallelize flush+conversion per stream

---
 src/handlers/http/health_check.rs | 26 ++++++++++++++++++++++++--
 src/handlers/http/modal/mod.rs    | 20 --------------------
 src/parseable/mod.rs              | 10 ----------
 src/parseable/streams.rs          | 23 ++++++++++++++++-------
 src/sync.rs                       | 25 +++++++------------------
 5 files changed, 47 insertions(+), 57 deletions(-)

diff --git a/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs
index bd79d1019..7bb1fed97 100644
--- a/src/handlers/http/health_check.rs
+++ b/src/handlers/http/health_check.rs
@@ -27,7 +27,8 @@ use actix_web::{
     HttpResponse,
 };
 use http::StatusCode;
-use tokio::sync::Mutex;
+use tokio::{sync::Mutex, task::JoinSet};
+use tracing::{error, info, warn};
 
 use crate::parseable::PARSEABLE;
 
@@ -60,8 +61,29 @@ pub async fn shutdown() {
     let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
     *shutdown_flag = true;
 
+    let mut joinset = JoinSet::new();
+
     // Sync staging
-    PARSEABLE.flush_all_streams();
+    PARSEABLE.streams.flush_and_convert(&mut joinset, true);
+
+    while let Some(res) = joinset.join_next().await {
+        match res {
+            Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
+            Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
+            Err(err) => error!("Failed to join async task: {err}"),
+        }
+    }
+
+    if let Err(e) = PARSEABLE
+        .storage
+        .get_object_store()
+        .upload_files_from_staging()
+        .await
+    {
+        warn!("Failed to sync local data with object store. {:?}", e);
{:?}", e); + } else { + info!("Successfully synced all data to S3."); + } } pub async fn readiness() -> HttpResponse { diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index c0e7278b9..e9aae3e6a 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -136,26 +136,6 @@ pub trait ParseableServer { health_check::shutdown().await; - // Perform S3 sync and wait for completion - info!("Starting data sync to S3..."); - - if let Err(e) = PARSEABLE.streams.prepare_parquet(true) { - warn!("Failed to convert arrow files to parquet. {:?}", e); - } else { - info!("Successfully converted arrow files to parquet."); - } - - if let Err(e) = PARSEABLE - .storage - .get_object_store() - .upload_files_from_staging() - .await - { - warn!("Failed to sync local data with object store. {:?}", e); - } else { - info!("Successfully synced all data to S3."); - } - // Initiate graceful shutdown info!("Graceful shutdown of HTTP server triggered"); srv_handle.stop(true).await; diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 8e485c589..eb4dd6761 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -179,16 +179,6 @@ impl Parseable { .unwrap_or_default()) } - /// Writes all streams in staging onto disk, awaiting conversion into parquet. - /// Deletes all in memory recordbatches, freeing up rows in mem-writer. - pub fn flush_all_streams(&self) { - let streams = self.streams.read().unwrap(); - - for staging in streams.values() { - staging.flush() - } - } - // validate the storage, if the proper path for staging directory is provided // if the proper data directory is provided, or s3 bucket is provided etc pub async fn validate_storage(&self) -> Result, ObjectStorageError> { diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index df4bd3cda..e0e143c34 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -41,6 +41,7 @@ use parquet::{ }; use rand::distributions::DistString; use relative_path::RelativePathBuf; +use tokio::task::JoinSet; use tracing::{error, info, trace, warn}; use crate::{ @@ -651,6 +652,13 @@ impl Stream { pub fn get_stream_type(&self) -> StreamType { self.metadata.read().expect(LOCK_EXPECT).stream_type } + + /// First flushes arrows onto disk and then converts the arrow into parquet + pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> { + self.flush(); + + self.prepare_parquet(shutdown_signal) + } } #[derive(Deref, DerefMut, Default)] @@ -717,8 +725,13 @@ impl Streams { .collect() } - /// Convert arrow files into parquet, preparing it for upload - pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> { + /// Asynchronously flushes arrows and compacts into parquet data on all streams in staging, + /// so that it is ready to be pushed onto objectstore. + pub fn flush_and_convert( + &self, + joinset: &mut JoinSet>, + shutdown_signal: bool, + ) { let streams: Vec> = self .read() .expect(LOCK_EXPECT) @@ -726,12 +739,8 @@ impl Streams { .map(Arc::clone) .collect(); for stream in streams { - stream - .prepare_parquet(shutdown_signal) - .inspect_err(|err| error!("Failed to run conversion task {err:?}"))?; + joinset.spawn(async move { stream.flush_and_convert(shutdown_signal) }); } - - Ok(()) } } diff --git a/src/sync.rs b/src/sync.rs index 278397f50..9df5e59b2 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -190,11 +190,15 @@ pub fn local_sync() -> ( select! 
                     // Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds
                     _ = sync_interval.tick() => {
-                        joinset.spawn(flush_and_convert());
+                        PARSEABLE.streams.flush_and_convert(&mut joinset, false)
                     },
                     // Joins and logs errors in spawned tasks
-                    Some(Err(e)) = joinset.join_next(), if !joinset.is_empty() => {
-                        error!("Issue joining flush+conversion: {e}")
+                    Some(res) = joinset.join_next(), if !joinset.is_empty() => {
+                        match res {
+                            Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
+                            Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
+                            Err(err) => error!("Issue joining flush+conversion task: {err}"),
+                        }
                     }
                     res = &mut inbox_rx => {match res{
                         Ok(_) => break,
                         Err(_) => {
@@ -269,18 +273,3 @@ pub fn schedule_alert_task(
     });
     Ok(handle)
 }
-
-/// Asynchronously flushes all streams when called, then compacts them into parquet files ready to be pushed onto objectstore
-async fn flush_and_convert() {
-    trace!("Flushing Arrows to disk...");
-    PARSEABLE.flush_all_streams();
-
-    trace!("Converting Arrow to Parquet... ");
-    if let Err(e) = monitor_task_duration("arrow_conversion", Duration::from_secs(30), || async {
-        PARSEABLE.streams.prepare_parquet(false)
-    })
-    .await
-    {
-        warn!("failed to convert local arrow data to parquet. {e:?}");
-    }
-}

From 0c789ebd849f7a62526bfde9320eac8519aaa322 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Tue, 25 Feb 2025 12:48:23 +0530
Subject: [PATCH 6/8] don't construct path

---
 src/storage/object_storage.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index a6ba5558e..24ed3222d 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -712,7 +712,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     }
 
     async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> {
-        if !Path::new(&PARSEABLE.options.staging_dir()).exists() {
+        if !PARSEABLE.options.staging_dir().exists() {
             return Ok(());
         }
 

From 71d643d3767f6de96abe53033a4844f8e0bfadf9 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Tue, 25 Feb 2025 12:59:54 +0530
Subject: [PATCH 7/8] fix: time total

---
 src/sync.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sync.rs b/src/sync.rs
index 9df5e59b2..9c701777d 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -63,7 +63,7 @@ where
         if warned_once {
             warn!(
                 "Task '{task_name}' took longer than expected: {:?} (threshold: {threshold:?})",
-                start_time.elapsed() - threshold
+                start_time.elapsed()
             );
         }
         break res.expect("Task handle shouldn't error");

From 00b5b714ebdec7e504f88706e98969ab52134941 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Tue, 25 Feb 2025 13:39:47 +0530
Subject: [PATCH 8/8] localsync

Signed-off-by: Devdutt Shenoi
---
 src/sync.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sync.rs b/src/sync.rs
index 9c701777d..86b489893 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -86,7 +86,7 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
                 remote_sync_inbox.send(()).unwrap_or(());
                 localsync_inbox.send(()).unwrap_or(());
                 if let Err(e) = localsync_handler.await {
-                    error!("Error joining remote_sync_handler: {e:?}");
+                    error!("Error joining localsync_handler: {e:?}");
                 }
                 if let Err(e) = remote_sync_handler.await {
                     error!("Error joining remote_sync_handler: {e:?}");
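
---

Below is a minimal, self-contained sketch of the scheduling pattern this series converges on (patches 3 and 5): `local_sync` ticks on an interval, spawns one flush+conversion task per stream onto a tokio `JoinSet`, and drains results as they complete, so one slow stream cannot block the schedule or its siblings. The `Stream` struct, its `flush_and_convert` body, the `String` error type and the shortened intervals are illustrative stand-ins, not the real Parseable definitions; assumes tokio with the `full` feature set.

use std::{sync::Arc, time::Duration};

use tokio::{select, sync::oneshot, task::JoinSet, time::interval};

struct Stream {
    name: String,
}

impl Stream {
    // Stand-in for the real `Stream::flush_and_convert`: flush in-memory
    // arrows to disk, then convert finished `.arrows` files to `.parquet`.
    fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), String> {
        println!("{}: flush + convert (shutdown = {shutdown_signal})", self.name);
        Ok(())
    }
}

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    let streams = vec![
        Arc::new(Stream { name: "app-logs".to_owned() }),
        Arc::new(Stream { name: "audit".to_owned() }),
    ];

    // Stand-in for the inbox channel `handler` uses to stop the task.
    let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(130)).await;
        let _ = shutdown_tx.send(());
    });

    // Shortened from LOCAL_SYNC_INTERVAL (60s) so the example terminates.
    let mut sync_interval = interval(Duration::from_millis(50));
    let mut joinset = JoinSet::new();

    loop {
        select! {
            // Spawn one flush+conversion task per stream on every tick,
            // instead of converting all streams inline in this loop.
            _ = sync_interval.tick() => {
                for stream in &streams {
                    let stream = Arc::clone(stream);
                    joinset.spawn(async move { stream.flush_and_convert(false) });
                }
            },
            // Drain finished tasks as they complete, logging any failure.
            Some(res) = joinset.join_next(), if !joinset.is_empty() => {
                match res {
                    Ok(Ok(())) => println!("converted arrow files to parquet"),
                    Ok(Err(err)) => eprintln!("conversion failed: {err}"),
                    Err(err) => eprintln!("join error: {err}"),
                }
            },
            _ = &mut shutdown_rx => break,
        }
    }
}

The `if !joinset.is_empty()` guard on the `join_next` arm mirrors patch 3: it keeps the select loop from polling an empty set between ticks.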