Skip to content

Commit 1c26d5b

Browse files
committed
add warnings
Upload and conversion tasks are handled by a monitor task which checks whether these tasks have crossed a threshold or not. In case either of these tasks crosses a pre-defined threshold, a warning will be printed.
1 parent a9743d5 commit 1c26d5b

File tree

1 file changed

+72
-8
lines changed

1 file changed

+72
-8
lines changed

src/sync.rs

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,66 @@
1818

1919
use clokwerk::{AsyncScheduler, Job, TimeUnits};
2020
use std::panic::AssertUnwindSafe;
21+
use std::sync::atomic::AtomicBool;
22+
use std::sync::Arc;
2123
use tokio::sync::oneshot;
22-
use tokio::task;
23-
use tokio::time::{interval, sleep, Duration};
24+
use tokio::time::{interval, sleep, Duration, Instant};
25+
use tokio::{select, task};
2426
use tracing::{error, info, warn};
2527

2628
use crate::alerts::{alerts_utils, AlertConfig, AlertError};
2729
use crate::option::CONFIG;
2830
use crate::{storage, STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL};
2931

32+
pub async fn monitor_task_duration<F, Fut, T>(task_name: &str, threshold: Duration, f: F) -> T
33+
where
34+
F: FnOnce() -> Fut,
35+
Fut: std::future::Future<Output = T>,
36+
{
37+
let warning_issued = Arc::new(AtomicBool::new(false));
38+
let warning_issued_clone = warning_issued.clone();
39+
let warning_issued_clone_select = warning_issued.clone();
40+
let start_time = Instant::now();
41+
42+
// create the main task future
43+
let task_future = f();
44+
45+
// create the monitoring task future
46+
let monitor_future = async move {
47+
let mut check_interval = interval(Duration::from_millis(100));
48+
49+
loop {
50+
check_interval.tick().await;
51+
let elapsed = start_time.elapsed();
52+
53+
if elapsed > threshold
54+
&& !warning_issued_clone.load(std::sync::atomic::Ordering::Relaxed)
55+
{
56+
warn!(
57+
"Task '{}' is taking longer than expected: (threshold: {:?})",
58+
task_name, threshold
59+
);
60+
warning_issued_clone.store(true, std::sync::atomic::Ordering::Relaxed);
61+
}
62+
}
63+
};
64+
65+
// run both futures concurrently, but only take the result from the task future
66+
select! {
67+
task_result = task_future => {
68+
if warning_issued_clone_select.load(std::sync::atomic::Ordering::Relaxed) {
69+
let elapsed = start_time.elapsed();
70+
warn!(
71+
"Task '{}' took longer than expected: {:?} (threshold: {:?})",
72+
task_name, elapsed, threshold
73+
);
74+
}
75+
task_result
76+
},
77+
_ = monitor_future => unreachable!(), // monitor future never completes
78+
}
79+
}
80+
3081
pub async fn object_store_sync() -> (
3182
task::JoinHandle<()>,
3283
oneshot::Receiver<()>,
@@ -42,11 +93,18 @@ pub async fn object_store_sync() -> (
4293
.every(STORAGE_UPLOAD_INTERVAL.seconds())
4394
// .plus(5u32.seconds())
4495
.run(|| async {
45-
if let Err(e) = CONFIG
46-
.storage()
47-
.get_object_store()
48-
.upload_files_from_staging()
49-
.await
96+
if let Err(e) = monitor_task_duration(
97+
"object_store_sync",
98+
Duration::from_secs(15),
99+
|| async {
100+
CONFIG
101+
.storage()
102+
.get_object_store()
103+
.upload_files_from_staging()
104+
.await
105+
},
106+
)
107+
.await
50108
{
51109
warn!("failed to upload local data with object store. {:?}", e);
52110
}
@@ -101,7 +159,13 @@ pub async fn arrow_conversion() -> (
101159
.every(STORAGE_CONVERSION_INTERVAL.seconds())
102160
.plus(5u32.seconds())
103161
.run(|| async {
104-
if let Err(e) = CONFIG.storage().get_object_store().conversion(false).await {
162+
if let Err(e) = monitor_task_duration(
163+
"arrow_conversion",
164+
Duration::from_secs(30),
165+
|| async { CONFIG.storage().get_object_store().conversion(false).await },
166+
)
167+
.await
168+
{
105169
warn!("failed to convert local arrow data to parquet. {:?}", e);
106170
}
107171
});

0 commit comments

Comments
 (0)