diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index bd6603088..8f89136ef 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -330,7 +330,7 @@ pub struct RollingWindow { // should always be "now" pub eval_end: String, // x minutes (5m) - pub eval_frequency: u32, + pub eval_frequency: u64, } #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] @@ -641,7 +641,7 @@ impl AlertConfig { columns } - pub fn get_eval_frequency(&self) -> u32 { + pub fn get_eval_frequency(&self) -> u64 { match &self.eval_type { EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_frequency, } diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 90e8f863f..753bdde87 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -103,8 +103,6 @@ pub async fn list(req: HttpRequest) -> Result { .unwrap() .into_iter() .filter(|logstream| { - warn!("logstream-\n{logstream:?}"); - Users.authorize(key.clone(), Action::ListStream, Some(logstream), None) == crate::rbac::Response::Authorized }) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 4cdfb6428..8f0c92ca0 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -203,6 +203,11 @@ impl ParseableServer for IngestServer { sync::run_local_sync().await; let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = sync::object_store_sync().await; + let ( + mut remote_conversion_handler, + mut remote_conversion_outbox, + mut remote_conversion_inbox, + ) = sync::arrow_conversion().await; tokio::spawn(airplane::server()); @@ -219,12 +224,16 @@ impl ParseableServer for IngestServer { // actix server finished .. stop other threads and stop the server remote_sync_inbox.send(()).unwrap_or(()); localsync_inbox.send(()).unwrap_or(()); + remote_conversion_inbox.send(()).unwrap_or(()); if let Err(e) = localsync_handler.await { error!("Error joining remote_sync_handler: {:?}", e); } if let Err(e) = remote_sync_handler.await { error!("Error joining remote_sync_handler: {:?}", e); } + if let Err(e) = remote_conversion_handler.await { + error!("Error joining remote_conversion_handler: {:?}", e); + } return e }, _ = &mut localsync_outbox => { @@ -238,6 +247,13 @@ impl ParseableServer for IngestServer { error!("Error joining remote_sync_handler: {:?}", e); } (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + }, + _ = &mut remote_conversion_outbox => { + // remote_conversion failed, this is recoverable by just starting remote_conversion thread again + if let Err(e) = remote_conversion_handler.await { + error!("Error joining remote_conversion_handler: {:?}", e); + } + (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await; } } diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index f791d16b2..bf62a1885 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -139,7 +139,19 @@ pub trait ParseableServer { // Perform S3 sync and wait for completion info!("Starting data sync to S3..."); - if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { + + if let Err(e) = CONFIG.storage().get_object_store().conversion(true).await { + warn!("Failed to convert arrow files to parquet. 
{:?}", e); + } else { + info!("Successfully converted arrow files to parquet."); + } + + if let Err(e) = CONFIG + .storage() + .get_object_store() + .upload_files_from_staging() + .await + { warn!("Failed to sync local data with object store. {:?}", e); } else { info!("Successfully synced all data to S3."); diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 0b66a743a..ad6be59c0 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -134,7 +134,11 @@ impl ParseableServer for Server { sync::run_local_sync().await; let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = sync::object_store_sync().await; - + let ( + mut remote_conversion_handler, + mut remote_conversion_outbox, + mut remote_conversion_inbox, + ) = sync::arrow_conversion().await; if CONFIG.options.send_analytics { analytics::init_analytics_scheduler()?; } @@ -152,12 +156,16 @@ impl ParseableServer for Server { // actix server finished .. stop other threads and stop the server remote_sync_inbox.send(()).unwrap_or(()); localsync_inbox.send(()).unwrap_or(()); + remote_conversion_inbox.send(()).unwrap_or(()); if let Err(e) = localsync_handler.await { error!("Error joining remote_sync_handler: {:?}", e); } if let Err(e) = remote_sync_handler.await { error!("Error joining remote_sync_handler: {:?}", e); } + if let Err(e) = remote_conversion_handler.await { + error!("Error joining remote_conversion_handler: {:?}", e); + } return e }, _ = &mut localsync_outbox => { @@ -171,6 +179,13 @@ impl ParseableServer for Server { error!("Error joining remote_sync_handler: {:?}", e); } (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + }, + _ = &mut remote_conversion_outbox => { + // remote_conversion failed, this is recoverable by just starting remote_conversion thread again + if let Err(e) = remote_conversion_handler.await { + error!("Error joining remote_conversion_handler: {:?}", e); + } + (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await; } }; diff --git a/src/lib.rs b/src/lib.rs index 2a20b2bb0..0eb4b0a02 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,7 +56,8 @@ pub use handlers::http::modal::{ use once_cell::sync::Lazy; use reqwest::{Client, ClientBuilder}; -pub const STORAGE_UPLOAD_INTERVAL: u32 = 60; +pub const STORAGE_CONVERSION_INTERVAL: u32 = 60; +pub const STORAGE_UPLOAD_INTERVAL: u32 = 30; // A single HTTP client for all outgoing HTTP requests from the parseable server static HTTP_CLIENT: Lazy = Lazy::new(|| { diff --git a/src/staging/streams.rs b/src/staging/streams.rs index 967f6d682..e950ed6dc 100644 --- a/src/staging/streams.rs +++ b/src/staging/streams.rs @@ -19,7 +19,7 @@ use std::{ collections::HashMap, - fs::{remove_file, OpenOptions}, + fs::{remove_file, File, OpenOptions}, path::{Path, PathBuf}, process, sync::{Arc, Mutex, RwLock}, @@ -165,6 +165,11 @@ impl<'a> Stream<'a> { paths } + /// Groups arrow files which are to be included in one parquet + /// + /// Excludes the arrow file being written for the current minute (data is still being written to that one) + /// + /// Only includes ones starting from the previous minute pub fn arrow_files_grouped_exclude_time( &self, exclude: NaiveDateTime, @@ -173,6 +178,8 @@ impl<'a> Stream<'a> { let mut grouped_arrow_file: HashMap> = HashMap::new(); let mut arrow_files = self.arrow_files(); + // if the shutdown signal is false i.e. 
normal condition + // don't keep the ones for the current minute if !shutdown_signal { arrow_files.retain(|path| { !path @@ -215,6 +222,45 @@ impl<'a> Stream<'a> { .collect() } + pub fn schema_files(&self) -> Vec { + let Ok(dir) = self.data_path.read_dir() else { + return vec![]; + }; + + dir.flatten() + .map(|file| file.path()) + .filter(|file| file.extension().is_some_and(|ext| ext.eq("schema"))) + .collect() + } + + pub fn get_schemas_if_present(&self) -> Option> { + let Ok(dir) = self.data_path.read_dir() else { + return None; + }; + + let mut schemas: Vec = Vec::new(); + + for file in dir.flatten() { + if let Some(ext) = file.path().extension() { + if ext.eq("schema") { + let file = File::open(file.path()).expect("Schema File should exist"); + + let schema = match serde_json::from_reader(file) { + Ok(schema) => schema, + Err(_) => continue, + }; + schemas.push(schema); + } + } + } + + if !schemas.is_empty() { + Some(schemas) + } else { + None + } + } + fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf { let filename = path.file_stem().unwrap().to_str().unwrap(); let (_, filename) = filename.split_once('.').unwrap(); @@ -249,6 +295,9 @@ impl<'a> Stream<'a> { } } + /// This function reads arrow files, groups their schemas + /// + /// converts them into parquet files and returns a merged schema pub fn convert_disk_files_to_parquet( &self, time_partition: Option<&String>, @@ -272,12 +321,12 @@ impl<'a> Stream<'a> { } // warn!("staging files-\n{staging_files:?}\n"); - for (parquet_path, files) in staging_files { + for (parquet_path, arrow_files) in staging_files { metrics::STAGING_FILES .with_label_values(&[&self.stream_name]) - .set(files.len() as i64); + .set(arrow_files.len() as i64); - for file in &files { + for file in &arrow_files { let file_size = file.metadata().unwrap().len(); let file_type = file.extension().unwrap().to_str().unwrap(); @@ -286,7 +335,7 @@ impl<'a> Stream<'a> { .add(file_size as i64); } - let record_reader = MergedReverseRecordReader::try_new(&files); + let record_reader = MergedReverseRecordReader::try_new(&arrow_files); if record_reader.readers.is_empty() { continue; } @@ -319,7 +368,7 @@ impl<'a> Stream<'a> { ); remove_file(parquet_path).unwrap(); } else { - for file in files { + for file in arrow_files { // warn!("file-\n{file:?}\n"); let file_size = file.metadata().unwrap().len(); let file_type = file.extension().unwrap().to_str().unwrap(); diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index aaa628364..8994f06a4 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -24,9 +24,10 @@ use super::{ }; use crate::alerts::AlertConfig; +use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::event::format::LogSource; use crate::handlers::http::modal::ingest_server::INGESTOR_META; -use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; +use crate::handlers::http::users::{CORRELATION_DIR, DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metadata::SchemaVersion; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; use crate::option::Mode; @@ -45,14 +46,17 @@ use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Local, Utc}; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; -use tracing::{error, 
warn}; +use tracing::{error, info, warn}; use ulid::Ulid; use std::collections::{BTreeMap, HashSet}; use std::fmt::Debug; +use std::fs::File; use std::num::NonZeroU32; use std::{ collections::HashMap, @@ -574,91 +578,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(Bytes::new()) } - async fn sync(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> { - if !Path::new(&CONFIG.staging_dir()).exists() { - return Ok(()); - } - - let streams = STREAM_INFO.list_streams(); - - for stream in &streams { - let time_partition = STREAM_INFO - .get_time_partition(stream) - .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - let custom_partition = STREAM_INFO - .get_custom_partition(stream) - .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - let staging = STAGING.get_or_create_stream(stream); - let schema = staging - .convert_disk_files_to_parquet( - time_partition.as_ref(), - custom_partition.as_ref(), - shutdown_signal, - ) - .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - - if let Some(schema) = schema { - let static_schema_flag = STREAM_INFO - .get_static_schema_flag(stream) - .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - if !static_schema_flag { - commit_schema_to_storage(stream, schema).await?; - } - } - - for file in staging.parquet_files() { - let filename = file - .file_name() - .expect("only parquet files are returned by iterator") - .to_str() - .expect("filename is valid string"); - - let mut file_date_part = filename.split('.').collect::>()[0]; - file_date_part = file_date_part.split('=').collect::>()[1]; - let compressed_size = file.metadata().map_or(0, |meta| meta.len()); - STORAGE_SIZE - .with_label_values(&["data", stream, "parquet"]) - .add(compressed_size as i64); - EVENTS_STORAGE_SIZE_DATE - .with_label_values(&["data", stream, "parquet", file_date_part]) - .add(compressed_size as i64); - LIFETIME_EVENTS_STORAGE_SIZE - .with_label_values(&["data", stream, "parquet"]) - .add(compressed_size as i64); - let mut file_suffix = str::replacen(filename, ".", "/", 3); - - let custom_partition_clone = custom_partition.clone(); - if custom_partition_clone.is_some() { - let custom_partition_fields = custom_partition_clone.unwrap(); - let custom_partition_list = - custom_partition_fields.split(',').collect::>(); - file_suffix = - str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); - } - - let stream_relative_path = format!("{stream}/{file_suffix}"); - - // Try uploading the file, handle potential errors without breaking the loop - if let Err(e) = self.upload_file(&stream_relative_path, &file).await { - error!("Failed to upload file {}: {:?}", filename, e); - continue; // Skip to the next file - } - - let absolute_path = self - .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) - .to_string(); - let store = CONFIG.storage().get_object_store(); - let manifest = - catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); - catalog::update_snapshot(store, stream, manifest).await?; - - let _ = fs::remove_file(file); - } - } - - Ok(()) - } - async fn get_stream_meta_from_storage( &self, stream_name: &str, @@ -733,6 +652,187 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { // pick a better name fn get_bucket_name(&self) -> String; + + async fn put_correlation( + &self, + correlation: &CorrelationConfig, + ) -> Result<(), ObjectStorageError> { + let path = + RelativePathBuf::from_iter([CORRELATION_DIR, &format!("{}.json", 
correlation.id)]); + self.put_object(&path, to_bytes(correlation)).await?; + Ok(()) + } + + async fn get_correlations(&self) -> Result, CorrelationError> { + let correlation_path = RelativePathBuf::from_iter([CORRELATION_DIR]); + let correlation_bytes = self + .get_objects( + Some(&correlation_path), + Box::new(|file_name| file_name.ends_with(".json")), + ) + .await?; + + Ok(correlation_bytes) + } + + async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> { + if !Path::new(&CONFIG.staging_dir()).exists() { + return Ok(()); + } + + // get all streams + for stream in STREAM_INFO.list_streams() { + info!("Starting object_store_sync for stream- {stream}"); + + let custom_partition = STREAM_INFO + .get_custom_partition(&stream) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + + let stage = STAGING.get_or_create_stream(&stream); + for file in stage.parquet_files() { + let filename = file + .file_name() + .expect("only parquet files are returned by iterator") + .to_str() + .expect("filename is valid string"); + + let mut file_date_part = filename.split('.').collect::>()[0]; + file_date_part = file_date_part.split('=').collect::>()[1]; + let compressed_size = file.metadata().map_or(0, |meta| meta.len()); + STORAGE_SIZE + .with_label_values(&["data", &stream, "parquet"]) + .add(compressed_size as i64); + EVENTS_STORAGE_SIZE_DATE + .with_label_values(&["data", &stream, "parquet", file_date_part]) + .add(compressed_size as i64); + LIFETIME_EVENTS_STORAGE_SIZE + .with_label_values(&["data", &stream, "parquet"]) + .add(compressed_size as i64); + let mut file_suffix = str::replacen(filename, ".", "/", 3); + + let custom_partition_clone = custom_partition.clone(); + if custom_partition_clone.is_some() { + let custom_partition_fields = custom_partition_clone.unwrap(); + let custom_partition_list = + custom_partition_fields.split(',').collect::>(); + file_suffix = + str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); + } + + let stream_relative_path = format!("{stream}/{file_suffix}"); + + // Try uploading the file, handle potential errors without breaking the loop + if let Err(e) = self.upload_file(&stream_relative_path, &file).await { + error!("Failed to upload file {}: {:?}", filename, e); + continue; // Skip to the next file + } + + let absolute_path = self + .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) + .to_string(); + let store = CONFIG.storage().get_object_store(); + let manifest = + catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); + catalog::update_snapshot(store, &stream, manifest).await?; + + let _ = fs::remove_file(file); + } + + for path in stage.schema_files() { + let file = File::open(&path)?; + let schema: Schema = serde_json::from_reader(file)?; + commit_schema_to_storage(&stream, schema).await?; + let _ = fs::remove_file(path); + } + } + + Ok(()) + } + + async fn conversion(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> { + if !Path::new(&CONFIG.staging_dir()).exists() { + return Ok(()); + } + + let mut conversion_tasks = FuturesUnordered::new(); + for stream in STREAM_INFO.list_streams() { + conversion_tasks.push(conversion_for_stream(stream, shutdown_signal)); + } + + while let Some(res) = conversion_tasks.next().await { + if let Err(err) = res { + error!("Failed to run conversion task {err:?}"); + return Err(err); + } + } + + Ok(()) + } +} + +async fn conversion_for_stream( + stream: String, + shutdown_signal: bool, +) -> Result<(), ObjectStorageError> { + 
info!("Starting arrow_conversion job for stream- {stream}"); + + let time_partition = STREAM_INFO + .get_time_partition(&stream) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + let custom_partition = STREAM_INFO + .get_custom_partition(&stream) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + let stage = STAGING.get_or_create_stream(&stream); + + // read arrow files on disk + // convert them to parquet + let schema = stage + .convert_disk_files_to_parquet( + time_partition.as_ref(), + custom_partition.as_ref(), + shutdown_signal, + ) + .map_err(|err| { + warn!("Error while converting arrow to parquet- {err:?}"); + ObjectStorageError::UnhandledError(Box::new(err)) + })?; + + // check if there is already a schema file in staging pertaining to this stream + // if yes, then merge them and save + + if let Some(schema) = schema { + let static_schema_flag = STREAM_INFO + .get_static_schema_flag(&stream) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + if !static_schema_flag { + // schema is dynamic, read from staging and merge if present + + // need to add something before .schema to make the file have an extension of type `schema` + let path = + RelativePathBuf::from_iter([format!("{stream}.schema")]).to_path(&stage.data_path); + + let staging_schemas = stage.get_schemas_if_present(); + if let Some(mut staging_schemas) = staging_schemas { + warn!( + "Found {} schemas in staging for stream- {stream}", + staging_schemas.len() + ); + staging_schemas.push(schema); + let merged_schema = Schema::try_merge(staging_schemas) + .map_err(|e| ObjectStorageError::Custom(e.to_string()))?; + + warn!("writing merged schema to path- {path:?}"); + // save the merged schema on staging disk + // the path should be stream/.ingestor.id.schema + fs::write(path, to_bytes(&merged_schema))?; + } else { + info!("writing single schema to path- {path:?}"); + fs::write(path, to_bytes(&schema))?; + } + } + } + + Ok(()) } pub async fn commit_schema_to_storage( diff --git a/src/sync.rs b/src/sync.rs index 24e8be765..1eceb21bc 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -16,17 +16,61 @@ * */ -use clokwerk::{AsyncScheduler, Job, TimeUnits}; +use chrono::{TimeDelta, Timelike}; +use std::future::Future; use std::panic::AssertUnwindSafe; use tokio::sync::oneshot; -use tokio::task; -use tokio::time::{interval, sleep, Duration}; -use tracing::{error, info, warn}; +use tokio::time::{interval_at, sleep, Duration, Instant}; +use tokio::{select, task}; +use tracing::{error, info, trace, warn}; use crate::alerts::{alerts_utils, AlertConfig, AlertError}; use crate::option::CONFIG; use crate::staging::STAGING; -use crate::{storage, STORAGE_UPLOAD_INTERVAL}; +use crate::{storage, STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL}; + +// Calculates the instant that is the start of the next minute +fn next_minute() -> Instant { + let now = chrono::Utc::now(); + let time_till = now + .with_second(0) + .expect("Start of the minute") + .signed_duration_since(now) + + TimeDelta::minutes(1); + + Instant::now() + time_till.to_std().expect("Valid duration") +} + +pub async fn monitor_task_duration(task_name: &str, threshold: Duration, f: F) -> T +where + F: FnOnce() -> Fut + Send + 'static, + Fut: Future + Send, + T: Send + 'static, +{ + let mut future = tokio::spawn(async move { f().await }); + let mut warned_once = false; + let start_time = Instant::now(); + + loop { + select! 
{ + _ = sleep(threshold), if !warned_once => { + warn!( + "Task '{task_name}' is taking longer than expected: (threshold: {threshold:?})", + ); + warned_once = true; + }, + res = &mut future => { + if warned_once { + warn!( + "Task '{task_name}' took longer than expected: {:?} (threshold: {threshold:?})", + start_time.elapsed() - threshold + ); + } + break res.expect("Task handle shouldn't error"); + } + } + } +} pub async fn object_store_sync() -> ( task::JoinHandle<()>, @@ -38,29 +82,96 @@ pub async fn object_store_sync() -> ( let handle = task::spawn(async move { let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { - let mut scheduler = AsyncScheduler::new(); - scheduler - .every(STORAGE_UPLOAD_INTERVAL.seconds()) - .plus(5u32.seconds()) - .run(|| async { - if let Err(e) = CONFIG.storage().get_object_store().sync(false).await { - warn!("failed to sync local data with object store. {:?}", e); + let mut sync_interval = interval_at( + next_minute(), + Duration::from_secs(STORAGE_UPLOAD_INTERVAL as u64), + ); + + let mut inbox_rx = AssertUnwindSafe(inbox_rx); + + loop { + select! { + _ = sync_interval.tick() => { + trace!("Syncing Parquets to Object Store... "); + if let Err(e) = monitor_task_duration( + "object_store_sync", + Duration::from_secs(15), + || async { + CONFIG + .storage() + .get_object_store() + .upload_files_from_staging() + .await + }, + ) + .await + { + warn!("failed to upload local data with object store. {e:?}"); + } + }, + res = &mut inbox_rx => {match res{ + Ok(_) => break, + Err(_) => { + warn!("Inbox channel closed unexpectedly"); + break; + }} } - }); + } + } + })); + + match result { + Ok(future) => { + future.await; + } + Err(panic_error) => { + error!("Panic in object store sync task: {panic_error:?}"); + let _ = outbox_tx.send(()); + } + } + + info!("Object store sync task ended"); + }); + + (handle, outbox_rx, inbox_tx) +} + +pub async fn arrow_conversion() -> ( + task::JoinHandle<()>, + oneshot::Receiver<()>, + oneshot::Sender<()>, +) { + let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); + let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); + + let handle = task::spawn(async move { + let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { + let mut sync_interval = interval_at( + next_minute() + Duration::from_secs(5), // 5 second delay + Duration::from_secs(STORAGE_CONVERSION_INTERVAL as u64), + ); let mut inbox_rx = AssertUnwindSafe(inbox_rx); - let mut check_interval = interval(Duration::from_secs(1)); loop { - check_interval.tick().await; - scheduler.run_pending().await; - - match inbox_rx.try_recv() { - Ok(_) => break, - Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, - Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { - warn!("Inbox channel closed unexpectedly"); - break; + select! { + _ = sync_interval.tick() => { + trace!("Converting Arrow to Parquet... "); + if let Err(e) = monitor_task_duration( + "arrow_conversion", + Duration::from_secs(30), + || async { CONFIG.storage().get_object_store().conversion(false).await }, + ).await + { + warn!("failed to convert local arrow data to parquet. 
{e:?}"); + } + }, + res = &mut inbox_rx => {match res{ + Ok(_) => break, + Err(_) => { + warn!("Inbox channel closed unexpectedly"); + break; + }} } } } @@ -71,7 +182,7 @@ pub async fn object_store_sync() -> ( future.await; } Err(panic_error) => { - error!("Panic in object store sync task: {:?}", panic_error); + error!("Panic in object store sync task: {panic_error:?}"); let _ = outbox_tx.send(()); } } @@ -95,27 +206,23 @@ pub async fn run_local_sync() -> ( let mut inbox_rx = inbox_rx; let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { - let mut scheduler = AsyncScheduler::new(); - scheduler - .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds()) - .run(|| async { - STAGING.flush_all(); - }); + let mut sync_interval = interval_at( + next_minute(), + Duration::from_secs(storage::LOCAL_SYNC_INTERVAL), + ); loop { - // Sleep for 50ms - sleep(Duration::from_millis(50)).await; - - // Run any pending scheduled tasks - scheduler.run_pending().await; - - // Check inbox - match inbox_rx.try_recv() { - Ok(_) => break, - Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, - Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { - warn!("Inbox channel closed unexpectedly"); - break; + select! { + _ = sync_interval.tick() => { + trace!("Flushing Arrows to disk..."); + STAGING.flush_all(); + }, + res = &mut inbox_rx => {match res{ + Ok(_) => break, + Err(_) => { + warn!("Inbox channel closed unexpectedly"); + break; + }} } } } @@ -138,7 +245,7 @@ pub async fn run_local_sync() -> ( } pub async fn schedule_alert_task( - eval_frequency: u32, + eval_frequency: u64, alert: AlertConfig, ) -> Result< ( @@ -155,31 +262,25 @@ pub async fn schedule_alert_task( info!("new alert task started for {alert:?}"); let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { - let mut scheduler = AsyncScheduler::new(); - scheduler.every((eval_frequency).minutes()).run(move || { - let alert_val = alert.clone(); - async move { - match alerts_utils::evaluate_alert(&alert_val).await { - Ok(_) => {} - Err(err) => error!("Error while evaluation- {err}"), - } - } - }); + let mut sync_interval = + interval_at(next_minute(), Duration::from_secs(eval_frequency * 60)); let mut inbox_rx = AssertUnwindSafe(inbox_rx); - let mut check_interval = interval(Duration::from_secs(1)); loop { - // Run any pending scheduled tasks - check_interval.tick().await; - scheduler.run_pending().await; - - // Check inbox - match inbox_rx.try_recv() { - Ok(_) => break, - Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, - Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { - warn!("Inbox channel closed unexpectedly"); - break; + select! { + _ = sync_interval.tick() => { + trace!("Flushing stage to disk..."); + match alerts_utils::evaluate_alert(&alert).await { + Ok(_) => {} + Err(err) => error!("Error while evaluation- {err}"), + } + }, + res = &mut inbox_rx => {match res{ + Ok(_) => break, + Err(_) => { + warn!("Inbox channel closed unexpectedly"); + break; + }} } } } @@ -190,7 +291,7 @@ pub async fn schedule_alert_task( future.await; } Err(panic_error) => { - error!("Panic in scheduled alert task: {:?}", panic_error); + error!("Panic in scheduled alert task: {panic_error:?}"); let _ = outbox_tx.send(()); } }