From c58f942e08e8ed819fd3b808ef280a0747d3a45e Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 28 Jan 2025 11:08:05 +0530 Subject: [PATCH 01/11] split arrow conversion and sync --- src/handlers/http/modal/ingest_server.rs | 13 ++ src/handlers/http/modal/mod.rs | 9 +- src/handlers/http/modal/server.rs | 14 +- src/lib.rs | 3 +- src/storage/object_storage.rs | 202 ++++++++++++++++++++++- src/storage/staging.rs | 65 +++++++- src/sync.rs | 60 ++++++- src/utils/arrow/merged_reader.rs | 1 + 8 files changed, 349 insertions(+), 18 deletions(-) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 215f79478..1c1b3e34d 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -108,6 +108,8 @@ impl ParseableServer for IngestServer { sync::run_local_sync().await; let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = sync::object_store_sync().await; + let (mut remote_conversion_handler, mut remote_conversion_outbox, mut remote_conversion_inbox) = + sync::arrow_conversion().await; tokio::spawn(airplane::server()); @@ -124,12 +126,16 @@ impl ParseableServer for IngestServer { // actix server finished .. stop other threads and stop the server remote_sync_inbox.send(()).unwrap_or(()); localsync_inbox.send(()).unwrap_or(()); + remote_conversion_inbox.send(()).unwrap_or(()); if let Err(e) = localsync_handler.await { error!("Error joining remote_sync_handler: {:?}", e); } if let Err(e) = remote_sync_handler.await { error!("Error joining remote_sync_handler: {:?}", e); } + if let Err(e) = remote_conversion_handler.await { + error!("Error joining remote_conversion_handler: {:?}", e); + } return e }, _ = &mut localsync_outbox => { @@ -143,6 +149,13 @@ impl ParseableServer for IngestServer { error!("Error joining remote_sync_handler: {:?}", e); } (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + }, + _ = &mut remote_conversion_outbox => { + // remote_conversion failed, this is recoverable by just starting remote_conversion thread again + if let Err(e) = remote_conversion_handler.await { + error!("Error joining remote_conversion_handler: {:?}", e); + } + (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await; } }; diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 89fc7021e..36849666f 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -134,7 +134,14 @@ pub trait ParseableServer { // Perform S3 sync and wait for completion info!("Starting data sync to S3..."); - if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { + + if let Err(e) = CONFIG.storage().get_object_store().conversion(true).await { + warn!("Failed to convert arrow files to parquet. {:?}", e); + } else { + info!("Successfully converted arrow files to parquet."); + } + + if let Err(e) = CONFIG.storage().get_object_store().upload_files_from_staging().await { warn!("Failed to sync local data with object store. 
{:?}", e); } else { info!("Successfully synced all data to S3."); diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index c7ee3963f..2d67a3275 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -130,7 +130,8 @@ impl ParseableServer for Server { sync::run_local_sync().await; let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = sync::object_store_sync().await; - + let (mut remote_conversion_handler, mut remote_conversion_outbox, mut remote_conversion_inbox) = + sync::arrow_conversion().await; if CONFIG.options.send_analytics { analytics::init_analytics_scheduler()?; } @@ -148,12 +149,16 @@ impl ParseableServer for Server { // actix server finished .. stop other threads and stop the server remote_sync_inbox.send(()).unwrap_or(()); localsync_inbox.send(()).unwrap_or(()); + remote_conversion_inbox.send(()).unwrap_or(()); if let Err(e) = localsync_handler.await { error!("Error joining remote_sync_handler: {:?}", e); } if let Err(e) = remote_sync_handler.await { error!("Error joining remote_sync_handler: {:?}", e); } + if let Err(e) = remote_conversion_handler.await { + error!("Error joining remote_conversion_handler: {:?}", e); + } return e }, _ = &mut localsync_outbox => { @@ -167,6 +172,13 @@ impl ParseableServer for Server { error!("Error joining remote_sync_handler: {:?}", e); } (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + }, + _ = &mut remote_conversion_outbox => { + // remote_conversion failed, this is recoverable by just starting remote_conversion thread again + if let Err(e) = remote_conversion_handler.await { + error!("Error joining remote_conversion_handler: {:?}", e); + } + (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await; } }; diff --git a/src/lib.rs b/src/lib.rs index 973406cb5..f630af423 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,7 +58,8 @@ pub use handlers::http::modal::{ use once_cell::sync::Lazy; use reqwest::{Client, ClientBuilder}; -pub const STORAGE_UPLOAD_INTERVAL: u32 = 60; +pub const STORAGE_CONVERSION_INTERVAL: u32 = 60; +pub const STORAGE_UPLOAD_INTERVAL: u32 = 30; // A single HTTP client for all outgoing HTTP requests from the parseable server static HTTP_CLIENT: Lazy = Lazy::new(|| { diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 4dc8603df..a05da4ac9 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -24,10 +24,11 @@ use super::{ LogStream, Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY }; +use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::alerts::AlertConfig; use crate::event::format::LogSource; use crate::handlers::http::modal::ingest_server::INGESTOR_META; -use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; +use crate::handlers::http::users::{CORRELATION_DIR, DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metadata::SchemaVersion; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; use crate::option::Mode; @@ -48,7 +49,7 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; -use tracing::{error, warn}; +use tracing::{error, info, warn}; use 
ulid::Ulid; use std::collections::{BTreeMap, HashSet}; @@ -574,13 +575,17 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(Bytes::new()) } + // sync only takes care of uploads async fn sync(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> { if !Path::new(&CONFIG.staging_dir()).exists() { return Ok(()); } + // get all streams let streams = STREAM_INFO.list_streams(); + // start the sync loop for a stream + // parallelize this for stream in &streams { let time_partition = STREAM_INFO .get_time_partition(stream) @@ -589,6 +594,9 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { .get_custom_partition(stream) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; let dir = StorageDir::new(stream); + + // read arrow files on disk + // convert them to parquet let schema = convert_disk_files_to_parquet( stream, &dir, @@ -607,8 +615,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } } - let parquet_files = dir.parquet_files(); - for file in parquet_files { + let parquet_and_schema_files = dir.parquet_and_schema_files(); + for file in parquet_and_schema_files { let filename = file .file_name() .expect("only parquet files are returned by iterator") @@ -735,6 +743,192 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { // pick a better name fn get_bucket_name(&self) -> String; + + async fn put_correlation( + &self, + correlation: &CorrelationConfig, + ) -> Result<(), ObjectStorageError> { + let path = RelativePathBuf::from_iter([ + CORRELATION_DIR, + &format!("{}.json", correlation.id), + ]); + self.put_object(&path, to_bytes(correlation)).await?; + Ok(()) + } + + async fn get_correlations(&self) -> Result, CorrelationError> { + let correlation_path = RelativePathBuf::from_iter([CORRELATION_DIR]); + let correlation_bytes = self + .get_objects( + Some(&correlation_path), + Box::new(|file_name| file_name.ends_with(".json")), + ) + .await?; + + Ok(correlation_bytes) + } + + async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> { + if !Path::new(&CONFIG.staging_dir()).exists() { + return Ok(()); + } + + // get all streams + let streams = STREAM_INFO.list_streams(); + + for stream in streams { + info!("Starting upload job for stream- {stream}"); + + let custom_partition = STREAM_INFO + .get_custom_partition(&stream) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + + let dir = StorageDir::new(&stream); + let parquet_and_schema_files = dir.parquet_and_schema_files(); + for file in parquet_and_schema_files { + let filename = file + .file_name() + .expect("only parquet files are returned by iterator") + .to_str() + .expect("filename is valid string"); + + if filename.ends_with("parquet") { + let mut file_date_part = filename.split('.').collect::>()[0]; + file_date_part = file_date_part.split('=').collect::>()[1]; + let compressed_size = file.metadata().map_or(0, |meta| meta.len()); + STORAGE_SIZE + .with_label_values(&["data", &stream, "parquet"]) + .add(compressed_size as i64); + EVENTS_STORAGE_SIZE_DATE + .with_label_values(&["data", &stream, "parquet", file_date_part]) + .add(compressed_size as i64); + LIFETIME_EVENTS_STORAGE_SIZE + .with_label_values(&["data", &stream, "parquet"]) + .add(compressed_size as i64); + let mut file_suffix = str::replacen(filename, ".", "/", 3); + + let custom_partition_clone = custom_partition.clone(); + if custom_partition_clone.is_some() { + let custom_partition_fields = custom_partition_clone.unwrap(); + let custom_partition_list = + 
custom_partition_fields.split(',').collect::>(); + file_suffix = str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); + } + + let stream_relative_path = format!("{stream}/{file_suffix}"); + + // Try uploading the file, handle potential errors without breaking the loop + if let Err(e) = self.upload_file(&stream_relative_path, &file).await { + error!("Failed to upload file {}: {:?}", filename, e); + continue; // Skip to the next file + } + + let absolute_path = self + .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) + .to_string(); + let store = CONFIG.storage().get_object_store(); + let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); + catalog::update_snapshot(store, &stream, manifest).await?; + } else { + let schema: Schema = serde_json::from_slice( + &fs::read(file.clone())? + )?; + commit_schema_to_storage(&stream, schema).await?; + } + + let _ = fs::remove_file(file); + } + } + + Ok(()) + } + + async fn conversion( + &self, + shutdown_signal: bool + ) -> Result<(), ObjectStorageError> { + if !Path::new(&CONFIG.staging_dir()).exists() { + return Ok(()); + } + + // get all streams + let streams = STREAM_INFO.list_streams(); + + for stream in &streams { + conversion_for_stream(stream, shutdown_signal)?; + } + Ok(()) + } + +} + +fn conversion_for_stream( + stream: &str, + shutdown_signal: bool +) -> Result<(), ObjectStorageError> { + + info!("Starting conversion job for stream- {stream}"); + + let time_partition = STREAM_INFO + .get_time_partition(stream) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + let custom_partition = STREAM_INFO + .get_custom_partition(stream) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + let dir = StorageDir::new(stream); + + // read arrow files on disk + // convert them to parquet + let schema = convert_disk_files_to_parquet( + stream, + &dir, + time_partition, + custom_partition.clone(), + shutdown_signal, + ) + .map_err(|err| { + warn!("Error while converting arrow to parquet- {err:?}"); + ObjectStorageError::UnhandledError(Box::new(err)) + })?; + + // check if there is already a schema file in staging pertaining to this stream + // if yes, then merge them and save + + if let Some(schema) = schema { + let static_schema_flag = STREAM_INFO + .get_static_schema_flag(stream) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + if !static_schema_flag { + // schema is dynamic, read from staging and merge if present + + // need to add something before .schema to make the file have an extension of type `schema` + let path = RelativePathBuf::from_iter([format!("{stream}.schema")]).to_path(&dir.data_path); + + let staging_schemas = dir.get_schemas_if_present(); + if let Some(mut staging_schemas) = staging_schemas { + warn!("Found {} schemas in staging for stream- {stream}", staging_schemas.len()); + staging_schemas.push(schema); + let merged_schema = Schema::try_merge(staging_schemas) + .map_err(|e|ObjectStorageError::Custom(e.to_string()))?; + + warn!("writing merged schema to path- {path:?}"); + // save the merged schema on staging disk + // the path should be stream/.ingestor.id.schema + fs::write( + path, + to_bytes(&merged_schema) + )?; + } else { + info!("writing single schema to path- {path:?}"); + fs::write( + path, + to_bytes(&schema) + )?; + } + } + } + + Ok(()) } pub async fn commit_schema_to_storage( diff --git a/src/storage/staging.rs b/src/storage/staging.rs index aab6603d9..def2a4f1a 100644 --- 
a/src/storage/staging.rs +++ b/src/storage/staging.rs @@ -50,7 +50,7 @@ use std::{ process, sync::Arc, }; -use tracing::{error, info}; +use tracing::{error, info, warn}; const ARROW_FILE_EXTENSION: &str = "data.arrows"; // const PARQUET_FILE_EXTENSION: &str = "data.parquet"; @@ -136,6 +136,27 @@ impl StorageDir { paths } + #[allow(dead_code)] + pub fn arrow_files_grouped_by_time(&self) -> HashMap> { + // hashmap + let mut grouped_arrow_file: HashMap> = HashMap::new(); + let arrow_files = self.arrow_files(); + for arrow_file_path in arrow_files { + let key = Self::arrow_path_to_parquet(&arrow_file_path, String::default()); + grouped_arrow_file + .entry(key) + .or_default() + .push(arrow_file_path); + } + + grouped_arrow_file + } + + /// Groups arrow files which are to be included in one parquet + /// + /// Excludes the arrow file being written for the current minute (data is still being written to that one) + /// + /// Only includes ones starting from the previous minute pub fn arrow_files_grouped_exclude_time( &self, exclude: NaiveDateTime, @@ -145,6 +166,8 @@ impl StorageDir { let mut grouped_arrow_file: HashMap> = HashMap::new(); let mut arrow_files = self.arrow_files(); + // if the shutdown signal is false i.e. normal condition + // don't keep the ones for the current minute if !shutdown_signal { arrow_files.retain(|path| { !path @@ -176,17 +199,39 @@ impl StorageDir { grouped_arrow_file } - pub fn parquet_files(&self) -> Vec { + pub fn parquet_and_schema_files(&self) -> Vec { let Ok(dir) = self.data_path.read_dir() else { return vec![]; }; dir.flatten() .map(|file| file.path()) - .filter(|file| file.extension().is_some_and(|ext| ext.eq("parquet"))) + .filter(|file| file.extension().is_some_and(|ext| ext.eq("parquet") || ext.eq("schema"))) .collect() } + pub fn get_schemas_if_present(&self) -> Option> { + let Ok(dir) = self.data_path.read_dir() else { + return None; + }; + + let mut schemas: Vec = Vec::new(); + + for file in dir.flatten() { + if let Some(ext) = file.path().extension() { + if ext.eq("schema") { + let schema = match serde_json::from_slice(&std::fs::read(file.path()).unwrap()) { + Ok(schema) => schema, + Err(_) => continue, + }; + schemas.push(schema); + } + } + } + + Some(schemas) + } + fn arrow_path_to_parquet(path: &Path, random_string: String) -> PathBuf { let filename = path.file_stem().unwrap().to_str().unwrap(); let (_, filename) = filename.split_once('.').unwrap(); @@ -207,6 +252,9 @@ impl StorageDir { // data_path.join(dir) // } +/// This function reads arrow files, groups their schemas +/// +/// converts them into parquet files and returns a merged schema pub fn convert_disk_files_to_parquet( stream: &str, dir: &StorageDir, @@ -229,12 +277,13 @@ pub fn convert_disk_files_to_parquet( } // warn!("staging files-\n{staging_files:?}\n"); - for (parquet_path, files) in staging_files { + for (parquet_path, arrow_files) in staging_files { + warn!("parquet_path-\n{parquet_path:?}"); metrics::STAGING_FILES .with_label_values(&[stream]) - .set(files.len() as i64); + .set(arrow_files.len() as i64); - for file in &files { + for file in &arrow_files { let file_size = file.metadata().unwrap().len(); let file_type = file.extension().unwrap().to_str().unwrap(); @@ -243,7 +292,7 @@ pub fn convert_disk_files_to_parquet( .add(file_size as i64); } - let record_reader = MergedReverseRecordReader::try_new(&files).unwrap(); + let record_reader = MergedReverseRecordReader::try_new(&arrow_files).unwrap(); if record_reader.readers.is_empty() { continue; } @@ -281,7 +330,7 @@ pub fn 
convert_disk_files_to_parquet( ); fs::remove_file(parquet_path).unwrap(); } else { - for file in files { + for file in arrow_files { // warn!("file-\n{file:?}\n"); let file_size = file.metadata().unwrap().len(); let file_type = file.extension().unwrap().to_str().unwrap(); diff --git a/src/sync.rs b/src/sync.rs index d843e3a3a..f65114d59 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -25,7 +25,7 @@ use tracing::{error, info, warn}; use crate::alerts::{alerts_utils, AlertConfig, AlertError}; use crate::option::CONFIG; -use crate::{storage, STORAGE_UPLOAD_INTERVAL}; +use crate::{storage, STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL}; pub async fn object_store_sync() -> ( task::JoinHandle<()>, @@ -40,10 +40,64 @@ pub async fn object_store_sync() -> ( let mut scheduler = AsyncScheduler::new(); scheduler .every(STORAGE_UPLOAD_INTERVAL.seconds()) + // .plus(5u32.seconds()) + .run(|| async { + if let Err(e) = CONFIG.storage().get_object_store().upload_files_from_staging().await { + warn!("failed to upload local data with object store. {:?}", e); + } + }); + + let mut inbox_rx = AssertUnwindSafe(inbox_rx); + let mut check_interval = interval(Duration::from_secs(1)); + + loop { + check_interval.tick().await; + scheduler.run_pending().await; + + match inbox_rx.try_recv() { + Ok(_) => break, + Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, + Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { + warn!("Inbox channel closed unexpectedly"); + break; + } + } + } + })); + + match result { + Ok(future) => { + future.await; + } + Err(panic_error) => { + error!("Panic in object store sync task: {:?}", panic_error); + let _ = outbox_tx.send(()); + } + } + + info!("Object store sync task ended"); + }); + + (handle, outbox_rx, inbox_tx) +} + +pub async fn arrow_conversion() -> ( + task::JoinHandle<()>, + oneshot::Receiver<()>, + oneshot::Sender<()>, +) { + let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); + let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); + + let handle = task::spawn(async move { + let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { + let mut scheduler = AsyncScheduler::new(); + scheduler + .every(STORAGE_CONVERSION_INTERVAL.seconds()) .plus(5u32.seconds()) .run(|| async { - if let Err(e) = CONFIG.storage().get_object_store().sync(false).await { - warn!("failed to sync local data with object store. {:?}", e); + if let Err(e) = CONFIG.storage().get_object_store().conversion(false).await { + warn!("failed to convert local arrow data to parquet. 
{:?}", e); } }); diff --git a/src/utils/arrow/merged_reader.rs b/src/utils/arrow/merged_reader.rs index 3248bd37d..45436106e 100644 --- a/src/utils/arrow/merged_reader.rs +++ b/src/utils/arrow/merged_reader.rs @@ -112,6 +112,7 @@ impl MergedReverseRecordReader { .map(move |batch| adapt_batch(&schema, &batch)) } + /// Tries to merge the schemas present in self.readers pub fn merged_schema(&self) -> Schema { Schema::try_merge( self.readers From 46780a775b806b9bc5af29e07e7a594a057d55f0 Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 28 Jan 2025 11:11:41 +0530 Subject: [PATCH 02/11] refactor: linting --- src/handlers/http/modal/ingest_server.rs | 7 ++- src/handlers/http/modal/mod.rs | 9 ++- src/handlers/http/modal/server.rs | 7 ++- src/storage/object_storage.rs | 74 ++++++++++-------------- src/storage/staging.rs | 16 +++-- src/sync.rs | 7 ++- 6 files changed, 64 insertions(+), 56 deletions(-) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 1c1b3e34d..a43bd5d53 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -108,8 +108,11 @@ impl ParseableServer for IngestServer { sync::run_local_sync().await; let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = sync::object_store_sync().await; - let (mut remote_conversion_handler, mut remote_conversion_outbox, mut remote_conversion_inbox) = - sync::arrow_conversion().await; + let ( + mut remote_conversion_handler, + mut remote_conversion_outbox, + mut remote_conversion_inbox, + ) = sync::arrow_conversion().await; tokio::spawn(airplane::server()); diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 36849666f..70819c026 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -134,14 +134,19 @@ pub trait ParseableServer { // Perform S3 sync and wait for completion info!("Starting data sync to S3..."); - + if let Err(e) = CONFIG.storage().get_object_store().conversion(true).await { warn!("Failed to convert arrow files to parquet. {:?}", e); } else { info!("Successfully converted arrow files to parquet."); } - if let Err(e) = CONFIG.storage().get_object_store().upload_files_from_staging().await { + if let Err(e) = CONFIG + .storage() + .get_object_store() + .upload_files_from_staging() + .await + { warn!("Failed to sync local data with object store. 
{:?}", e); } else { info!("Successfully synced all data to S3."); diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 2d67a3275..5454c252e 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -130,8 +130,11 @@ impl ParseableServer for Server { sync::run_local_sync().await; let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = sync::object_store_sync().await; - let (mut remote_conversion_handler, mut remote_conversion_outbox, mut remote_conversion_inbox) = - sync::arrow_conversion().await; + let ( + mut remote_conversion_handler, + mut remote_conversion_outbox, + mut remote_conversion_inbox, + ) = sync::arrow_conversion().await; if CONFIG.options.send_analytics { analytics::init_analytics_scheduler()?; } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index a05da4ac9..fa3f1e44d 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -748,10 +748,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { &self, correlation: &CorrelationConfig, ) -> Result<(), ObjectStorageError> { - let path = RelativePathBuf::from_iter([ - CORRELATION_DIR, - &format!("{}.json", correlation.id), - ]); + let path = + RelativePathBuf::from_iter([CORRELATION_DIR, &format!("{}.json", correlation.id)]); self.put_object(&path, to_bytes(correlation)).await?; Ok(()) } @@ -772,7 +770,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { if !Path::new(&CONFIG.staging_dir()).exists() { return Ok(()); } - + // get all streams let streams = STREAM_INFO.list_streams(); @@ -780,9 +778,9 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { info!("Starting upload job for stream- {stream}"); let custom_partition = STREAM_INFO - .get_custom_partition(&stream) - .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - + .get_custom_partition(&stream) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + let dir = StorageDir::new(&stream); let parquet_and_schema_files = dir.parquet_and_schema_files(); for file in parquet_and_schema_files { @@ -806,67 +804,59 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { .with_label_values(&["data", &stream, "parquet"]) .add(compressed_size as i64); let mut file_suffix = str::replacen(filename, ".", "/", 3); - + let custom_partition_clone = custom_partition.clone(); if custom_partition_clone.is_some() { let custom_partition_fields = custom_partition_clone.unwrap(); let custom_partition_list = custom_partition_fields.split(',').collect::>(); - file_suffix = str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); + file_suffix = + str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); } - + let stream_relative_path = format!("{stream}/{file_suffix}"); - + // Try uploading the file, handle potential errors without breaking the loop if let Err(e) = self.upload_file(&stream_relative_path, &file).await { error!("Failed to upload file {}: {:?}", filename, e); continue; // Skip to the next file } - + let absolute_path = self .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) .to_string(); let store = CONFIG.storage().get_object_store(); - let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); + let manifest = + catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); catalog::update_snapshot(store, &stream, manifest).await?; } else { - let schema: Schema = serde_json::from_slice( - 
&fs::read(file.clone())? - )?; + let schema: Schema = serde_json::from_slice(&fs::read(file.clone())?)?; commit_schema_to_storage(&stream, schema).await?; } let _ = fs::remove_file(file); } } - + Ok(()) } - async fn conversion( - &self, - shutdown_signal: bool - ) -> Result<(), ObjectStorageError> { + async fn conversion(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> { if !Path::new(&CONFIG.staging_dir()).exists() { return Ok(()); } - + // get all streams let streams = STREAM_INFO.list_streams(); - + for stream in &streams { conversion_for_stream(stream, shutdown_signal)?; } Ok(()) } - } -fn conversion_for_stream( - stream: &str, - shutdown_signal: bool -) -> Result<(), ObjectStorageError> { - +fn conversion_for_stream(stream: &str, shutdown_signal: bool) -> Result<(), ObjectStorageError> { info!("Starting conversion job for stream- {stream}"); let time_partition = STREAM_INFO @@ -902,28 +892,26 @@ fn conversion_for_stream( // schema is dynamic, read from staging and merge if present // need to add something before .schema to make the file have an extension of type `schema` - let path = RelativePathBuf::from_iter([format!("{stream}.schema")]).to_path(&dir.data_path); - + let path = + RelativePathBuf::from_iter([format!("{stream}.schema")]).to_path(&dir.data_path); + let staging_schemas = dir.get_schemas_if_present(); if let Some(mut staging_schemas) = staging_schemas { - warn!("Found {} schemas in staging for stream- {stream}", staging_schemas.len()); + warn!( + "Found {} schemas in staging for stream- {stream}", + staging_schemas.len() + ); staging_schemas.push(schema); let merged_schema = Schema::try_merge(staging_schemas) - .map_err(|e|ObjectStorageError::Custom(e.to_string()))?; - + .map_err(|e| ObjectStorageError::Custom(e.to_string()))?; + warn!("writing merged schema to path- {path:?}"); // save the merged schema on staging disk // the path should be stream/.ingestor.id.schema - fs::write( - path, - to_bytes(&merged_schema) - )?; + fs::write(path, to_bytes(&merged_schema))?; } else { info!("writing single schema to path- {path:?}"); - fs::write( - path, - to_bytes(&schema) - )?; + fs::write(path, to_bytes(&schema))?; } } } diff --git a/src/storage/staging.rs b/src/storage/staging.rs index def2a4f1a..f7a7c8540 100644 --- a/src/storage/staging.rs +++ b/src/storage/staging.rs @@ -153,9 +153,9 @@ impl StorageDir { } /// Groups arrow files which are to be included in one parquet - /// + /// /// Excludes the arrow file being written for the current minute (data is still being written to that one) - /// + /// /// Only includes ones starting from the previous minute pub fn arrow_files_grouped_exclude_time( &self, @@ -206,7 +206,10 @@ impl StorageDir { dir.flatten() .map(|file| file.path()) - .filter(|file| file.extension().is_some_and(|ext| ext.eq("parquet") || ext.eq("schema"))) + .filter(|file| { + file.extension() + .is_some_and(|ext| ext.eq("parquet") || ext.eq("schema")) + }) .collect() } @@ -220,7 +223,8 @@ impl StorageDir { for file in dir.flatten() { if let Some(ext) = file.path().extension() { if ext.eq("schema") { - let schema = match serde_json::from_slice(&std::fs::read(file.path()).unwrap()) { + let schema = match serde_json::from_slice(&std::fs::read(file.path()).unwrap()) + { Ok(schema) => schema, Err(_) => continue, }; @@ -228,7 +232,7 @@ impl StorageDir { } } } - + Some(schemas) } @@ -253,7 +257,7 @@ impl StorageDir { // } /// This function reads arrow files, groups their schemas -/// +/// /// converts them into parquet files and returns a merged schema pub 
fn convert_disk_files_to_parquet( stream: &str, diff --git a/src/sync.rs b/src/sync.rs index f65114d59..3a564bca7 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -42,7 +42,12 @@ pub async fn object_store_sync() -> ( .every(STORAGE_UPLOAD_INTERVAL.seconds()) // .plus(5u32.seconds()) .run(|| async { - if let Err(e) = CONFIG.storage().get_object_store().upload_files_from_staging().await { + if let Err(e) = CONFIG + .storage() + .get_object_store() + .upload_files_from_staging() + .await + { warn!("failed to upload local data with object store. {:?}", e); } }); From a9743d59a87d2ce5b111fa6272bbfc038a6ab39f Mon Sep 17 00:00:00 2001 From: anant Date: Thu, 30 Jan 2025 16:31:53 +0530 Subject: [PATCH 03/11] handled return condition Returning None based on length of vec for proper message printing --- src/storage/staging.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/storage/staging.rs b/src/storage/staging.rs index f7a7c8540..099332c5d 100644 --- a/src/storage/staging.rs +++ b/src/storage/staging.rs @@ -50,7 +50,7 @@ use std::{ process, sync::Arc, }; -use tracing::{error, info, warn}; +use tracing::{error, info}; const ARROW_FILE_EXTENSION: &str = "data.arrows"; // const PARQUET_FILE_EXTENSION: &str = "data.parquet"; @@ -233,7 +233,12 @@ impl StorageDir { } } - Some(schemas) + if schemas.len() > 0 { + Some(schemas) + } else { + None + } + } fn arrow_path_to_parquet(path: &Path, random_string: String) -> PathBuf { @@ -282,7 +287,6 @@ pub fn convert_disk_files_to_parquet( // warn!("staging files-\n{staging_files:?}\n"); for (parquet_path, arrow_files) in staging_files { - warn!("parquet_path-\n{parquet_path:?}"); metrics::STAGING_FILES .with_label_values(&[stream]) .set(arrow_files.len() as i64); From 1c26d5b5ba31d4cc5a744d5c829aaf753a175873 Mon Sep 17 00:00:00 2001 From: anant Date: Sun, 2 Feb 2025 13:35:27 +0530 Subject: [PATCH 04/11] add warnings Upload and conversion tasks are handled by a monitor task which checks whether these tasks have crossed a threshold or not. In case either of these tasks crosses a pre-defined threshold, a warning will be printed. 
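
In outline, the wrapper added to src/sync.rs races the wrapped future against a timer and logs once the threshold is crossed. A simplified sketch of that pattern follows (names and the exact warning text are illustrative, not the literal implementation in the diff below):

    use std::time::Duration;
    use tokio::time::{sleep, Instant};
    use tracing::warn;

    // Poll the wrapped future and a `threshold` timer concurrently; warn once
    // if the timer fires first, then keep waiting for the future to finish
    // and report the overrun when it completes.
    async fn monitor<F, T>(task_name: &str, threshold: Duration, fut: F) -> T
    where
        F: std::future::Future<Output = T>,
    {
        tokio::pin!(fut);
        let start = Instant::now();
        let mut warned = false;
        loop {
            tokio::select! {
                _ = sleep(threshold), if !warned => {
                    warn!("Task '{task_name}' is taking longer than expected (threshold: {threshold:?})");
                    warned = true;
                }
                out = &mut fut => {
                    if warned {
                        warn!("Task '{task_name}' took {:?} (threshold: {threshold:?})", start.elapsed());
                    }
                    break out;
                }
            }
        }
    }
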
--- src/sync.rs | 80 +++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 72 insertions(+), 8 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index 3a564bca7..10beb1831 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -18,15 +18,66 @@ use clokwerk::{AsyncScheduler, Job, TimeUnits}; use std::panic::AssertUnwindSafe; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; use tokio::sync::oneshot; -use tokio::task; -use tokio::time::{interval, sleep, Duration}; +use tokio::time::{interval, sleep, Duration, Instant}; +use tokio::{select, task}; use tracing::{error, info, warn}; use crate::alerts::{alerts_utils, AlertConfig, AlertError}; use crate::option::CONFIG; use crate::{storage, STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL}; +pub async fn monitor_task_duration(task_name: &str, threshold: Duration, f: F) -> T +where + F: FnOnce() -> Fut, + Fut: std::future::Future, +{ + let warning_issued = Arc::new(AtomicBool::new(false)); + let warning_issued_clone = warning_issued.clone(); + let warning_issued_clone_select = warning_issued.clone(); + let start_time = Instant::now(); + + // create the main task future + let task_future = f(); + + // create the monitoring task future + let monitor_future = async move { + let mut check_interval = interval(Duration::from_millis(100)); + + loop { + check_interval.tick().await; + let elapsed = start_time.elapsed(); + + if elapsed > threshold + && !warning_issued_clone.load(std::sync::atomic::Ordering::Relaxed) + { + warn!( + "Task '{}' is taking longer than expected: (threshold: {:?})", + task_name, threshold + ); + warning_issued_clone.store(true, std::sync::atomic::Ordering::Relaxed); + } + } + }; + + // run both futures concurrently, but only take the result from the task future + select! { + task_result = task_future => { + if warning_issued_clone_select.load(std::sync::atomic::Ordering::Relaxed) { + let elapsed = start_time.elapsed(); + warn!( + "Task '{}' took longer than expected: {:?} (threshold: {:?})", + task_name, elapsed, threshold + ); + } + task_result + }, + _ = monitor_future => unreachable!(), // monitor future never completes + } +} + pub async fn object_store_sync() -> ( task::JoinHandle<()>, oneshot::Receiver<()>, @@ -42,11 +93,18 @@ pub async fn object_store_sync() -> ( .every(STORAGE_UPLOAD_INTERVAL.seconds()) // .plus(5u32.seconds()) .run(|| async { - if let Err(e) = CONFIG - .storage() - .get_object_store() - .upload_files_from_staging() - .await + if let Err(e) = monitor_task_duration( + "object_store_sync", + Duration::from_secs(15), + || async { + CONFIG + .storage() + .get_object_store() + .upload_files_from_staging() + .await + }, + ) + .await { warn!("failed to upload local data with object store. {:?}", e); } @@ -101,7 +159,13 @@ pub async fn arrow_conversion() -> ( .every(STORAGE_CONVERSION_INTERVAL.seconds()) .plus(5u32.seconds()) .run(|| async { - if let Err(e) = CONFIG.storage().get_object_store().conversion(false).await { + if let Err(e) = monitor_task_duration( + "arrow_conversion", + Duration::from_secs(30), + || async { CONFIG.storage().get_object_store().conversion(false).await }, + ) + .await + { warn!("failed to convert local arrow data to parquet. 
{:?}", e); } }); From e8af0123ed3cff3b192d6a1842f4876647b925d7 Mon Sep 17 00:00:00 2001 From: anant Date: Sun, 2 Feb 2025 17:19:13 +0530 Subject: [PATCH 05/11] parallelize conversion, remove old sync fn --- src/storage/object_storage.rs | 125 ++++++---------------------------- 1 file changed, 21 insertions(+), 104 deletions(-) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index fa3f1e44d..68c606038 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -46,6 +46,8 @@ use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Local, Utc}; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; @@ -575,100 +577,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(Bytes::new()) } - // sync only takes care of uploads - async fn sync(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> { - if !Path::new(&CONFIG.staging_dir()).exists() { - return Ok(()); - } - - // get all streams - let streams = STREAM_INFO.list_streams(); - - // start the sync loop for a stream - // parallelize this - for stream in &streams { - let time_partition = STREAM_INFO - .get_time_partition(stream) - .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - let custom_partition = STREAM_INFO - .get_custom_partition(stream) - .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - let dir = StorageDir::new(stream); - - // read arrow files on disk - // convert them to parquet - let schema = convert_disk_files_to_parquet( - stream, - &dir, - time_partition, - custom_partition.clone(), - shutdown_signal, - ) - .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - - if let Some(schema) = schema { - let static_schema_flag = STREAM_INFO - .get_static_schema_flag(stream) - .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - if !static_schema_flag { - commit_schema_to_storage(stream, schema).await?; - } - } - - let parquet_and_schema_files = dir.parquet_and_schema_files(); - for file in parquet_and_schema_files { - let filename = file - .file_name() - .expect("only parquet files are returned by iterator") - .to_str() - .expect("filename is valid string"); - - let mut file_date_part = filename.split('.').collect::>()[0]; - file_date_part = file_date_part.split('=').collect::>()[1]; - let compressed_size = file.metadata().map_or(0, |meta| meta.len()); - STORAGE_SIZE - .with_label_values(&["data", stream, "parquet"]) - .add(compressed_size as i64); - EVENTS_STORAGE_SIZE_DATE - .with_label_values(&["data", stream, "parquet", file_date_part]) - .add(compressed_size as i64); - LIFETIME_EVENTS_STORAGE_SIZE - .with_label_values(&["data", stream, "parquet"]) - .add(compressed_size as i64); - let mut file_suffix = str::replacen(filename, ".", "/", 3); - - let custom_partition_clone = custom_partition.clone(); - if custom_partition_clone.is_some() { - let custom_partition_fields = custom_partition_clone.unwrap(); - let custom_partition_list = - custom_partition_fields.split(',').collect::>(); - file_suffix = - str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); - } - - let stream_relative_path = format!("{stream}/{file_suffix}"); - - // Try uploading the file, handle potential errors without breaking the loop - if let Err(e) = self.upload_file(&stream_relative_path, 
&file).await { - error!("Failed to upload file {}: {:?}", filename, e); - continue; // Skip to the next file - } - - let absolute_path = self - .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) - .to_string(); - let store = CONFIG.storage().get_object_store(); - let manifest = - catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); - catalog::update_snapshot(store, stream, manifest).await?; - - let _ = fs::remove_file(file); - } - } - - Ok(()) - } - async fn get_stream_meta_from_storage( &self, stream_name: &str, @@ -846,31 +754,40 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { return Ok(()); } - // get all streams - let streams = STREAM_INFO.list_streams(); + let mut conversion_tasks = FuturesUnordered::new(); + for stream in STREAM_INFO.list_streams() { + conversion_tasks.push(conversion_for_stream(stream, shutdown_signal)); + } - for stream in &streams { - conversion_for_stream(stream, shutdown_signal)?; + while let Some(res) = conversion_tasks.next().await { + if let Err(err) = res { + error!("Failed to run conversion task {err:?}"); + return Err(err); + } } + Ok(()) } } -fn conversion_for_stream(stream: &str, shutdown_signal: bool) -> Result<(), ObjectStorageError> { +async fn conversion_for_stream( + stream: String, + shutdown_signal: bool, +) -> Result<(), ObjectStorageError> { info!("Starting conversion job for stream- {stream}"); let time_partition = STREAM_INFO - .get_time_partition(stream) + .get_time_partition(&stream) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; let custom_partition = STREAM_INFO - .get_custom_partition(stream) + .get_custom_partition(&stream) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - let dir = StorageDir::new(stream); + let dir = StorageDir::new(&stream); // read arrow files on disk // convert them to parquet let schema = convert_disk_files_to_parquet( - stream, + &stream, &dir, time_partition, custom_partition.clone(), @@ -886,7 +803,7 @@ fn conversion_for_stream(stream: &str, shutdown_signal: bool) -> Result<(), Obje if let Some(schema) = schema { let static_schema_flag = STREAM_INFO - .get_static_schema_flag(stream) + .get_static_schema_flag(&stream) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; if !static_schema_flag { // schema is dynamic, read from staging and merge if present From cdd863aa69094216bf1f65765d60bed8d954bc88 Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 3 Feb 2025 11:20:19 +0530 Subject: [PATCH 06/11] modified warn statement --- src/handlers/http/logstream.rs | 2 -- src/storage/object_storage.rs | 8 +++----- src/sync.rs | 4 ++-- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 2aa544dbe..8bf24a2ea 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -102,8 +102,6 @@ pub async fn list(req: HttpRequest) -> Result { .unwrap() .into_iter() .filter(|logstream| { - warn!("logstream-\n{logstream:?}"); - Users.authorize(key.clone(), Action::ListStream, Some(logstream), None) == crate::rbac::Response::Authorized }) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 68c606038..afa08978c 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -680,10 +680,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } // get all streams - let streams = STREAM_INFO.list_streams(); - - for stream in streams { - info!("Starting upload job for stream- 
{stream}"); + for stream in STREAM_INFO.list_streams() { + info!("Starting object_store_sync for stream- {stream}"); let custom_partition = STREAM_INFO .get_custom_partition(&stream) @@ -774,7 +772,7 @@ async fn conversion_for_stream( stream: String, shutdown_signal: bool, ) -> Result<(), ObjectStorageError> { - info!("Starting conversion job for stream- {stream}"); + info!("Starting arrow_conversion job for stream- {stream}"); let time_partition = STREAM_INFO .get_time_partition(&stream) diff --git a/src/sync.rs b/src/sync.rs index 10beb1831..521eba9f5 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -54,7 +54,7 @@ where && !warning_issued_clone.load(std::sync::atomic::Ordering::Relaxed) { warn!( - "Task '{}' is taking longer than expected: (threshold: {:?})", + "Task '{}' started at: {start_time:?} is taking longer than expected: (threshold: {:?})", task_name, threshold ); warning_issued_clone.store(true, std::sync::atomic::Ordering::Relaxed); @@ -68,7 +68,7 @@ where if warning_issued_clone_select.load(std::sync::atomic::Ordering::Relaxed) { let elapsed = start_time.elapsed(); warn!( - "Task '{}' took longer than expected: {:?} (threshold: {:?})", + "Task '{}' started at: {start_time:?} took longer than expected: {:?} (threshold: {:?})", task_name, elapsed, threshold ); } From b6c552182c9e3fe68284e871568c599812a06833 Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 3 Feb 2025 11:29:51 +0530 Subject: [PATCH 07/11] refactor: clippy --- src/storage/object_storage.rs | 6 ++++-- src/storage/staging.rs | 3 +-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index afa08978c..c57f6f0af 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -21,11 +21,13 @@ use super::{ ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use super::{ - LogStream, Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY + LogStream, Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, + PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, + STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; -use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::alerts::AlertConfig; +use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::event::format::LogSource; use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::handlers::http::users::{CORRELATION_DIR, DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; diff --git a/src/storage/staging.rs b/src/storage/staging.rs index 099332c5d..332ca5280 100644 --- a/src/storage/staging.rs +++ b/src/storage/staging.rs @@ -233,12 +233,11 @@ impl StorageDir { } } - if schemas.len() > 0 { + if !schemas.is_empty() { Some(schemas) } else { None } - } fn arrow_path_to_parquet(path: &Path, random_string: String) -> PathBuf { From 3a5ee5aa0883db46914b5d2338fb98f5cc920930 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 5 Feb 2025 11:49:47 +0530 Subject: [PATCH 08/11] perf: utilise lesser CPU cycles by actually utilising tokio --- src/sync.rs | 149 +++++++++++++++++++++------------------------------- 1 file changed, 59 insertions(+), 90 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index 35a2d0f4b..56146b2a5 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -17,11 +17,11 @@ */ use clokwerk::{AsyncScheduler, Job, TimeUnits}; +use std::future::Future; use std::panic::AssertUnwindSafe; 
-use std::sync::atomic::AtomicBool; -use std::sync::Arc; +use std::time::Instant; use tokio::sync::oneshot; -use tokio::time::{interval, sleep, Duration, Instant}; +use tokio::time::{sleep, Duration}; use tokio::{select, task}; use tracing::{error, info, warn}; @@ -32,50 +32,32 @@ use crate::{storage, STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL}; pub async fn monitor_task_duration(task_name: &str, threshold: Duration, f: F) -> T where - F: FnOnce() -> Fut, - Fut: std::future::Future, + F: FnOnce() -> Fut + Send + 'static, + Fut: Future + Send, + T: Send + 'static, { - let warning_issued = Arc::new(AtomicBool::new(false)); - let warning_issued_clone = warning_issued.clone(); - let warning_issued_clone_select = warning_issued.clone(); + let mut future = tokio::spawn(async move { f().await }); + let mut warned_once = false; let start_time = Instant::now(); - // create the main task future - let task_future = f(); - - // create the monitoring task future - let monitor_future = async move { - let mut check_interval = interval(threshold); - - loop { - check_interval.tick().await; - let elapsed = start_time.elapsed(); - - if elapsed > threshold - && !warning_issued_clone.load(std::sync::atomic::Ordering::Relaxed) - { + loop { + select! { + _ = sleep(threshold), if !warned_once => { warn!( - "Task '{}' started at: {start_time:?} is taking longer than expected: (threshold: {:?})", - task_name, threshold + "Task '{task_name}' started at: {start_time:?} is taking longer than expected: (threshold: {threshold:?})", ); - warning_issued_clone.store(true, std::sync::atomic::Ordering::Relaxed); + warned_once = true; + }, + res = &mut future => { + if warned_once { + warn!( + "Task '{task_name}' started at: {start_time:?} took longer than expected: {:?} (threshold: {threshold:?})", + start_time.elapsed() + ); + } + break res.expect("Task handle shouldn't error"); } } - }; - - // run both futures concurrently, but only take the result from the task future - select! { - task_result = task_future => { - if warning_issued_clone_select.load(std::sync::atomic::Ordering::Relaxed) { - let elapsed = start_time.elapsed(); - warn!( - "Task '{}' started at: {start_time:?} took longer than expected: {:?} (threshold: {:?})", - task_name, elapsed, threshold - ); - } - task_result - }, - _ = monitor_future => unreachable!(), // monitor future never completes } } @@ -107,23 +89,21 @@ pub async fn object_store_sync() -> ( ) .await { - warn!("failed to upload local data with object store. {:?}", e); + warn!("failed to upload local data with object store. {e:?}"); } }); let mut inbox_rx = AssertUnwindSafe(inbox_rx); - let mut check_interval = interval(Duration::from_secs(1)); loop { - check_interval.tick().await; - scheduler.run_pending().await; - - match inbox_rx.try_recv() { - Ok(_) => break, - Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, - Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { - warn!("Inbox channel closed unexpectedly"); - break; + select! 
{ + _ = scheduler.run_pending() => {}, + res = &mut inbox_rx => {match res{ + Ok(_) => break, + Err(_) => { + warn!("Inbox channel closed unexpectedly"); + break; + }} } } } @@ -134,7 +114,7 @@ pub async fn object_store_sync() -> ( future.await; } Err(panic_error) => { - error!("Panic in object store sync task: {:?}", panic_error); + error!("Panic in object store sync task: {panic_error:?}"); let _ = outbox_tx.send(()); } } @@ -167,23 +147,21 @@ pub async fn arrow_conversion() -> ( ) .await { - warn!("failed to convert local arrow data to parquet. {:?}", e); + warn!("failed to convert local arrow data to parquet. {e:?}"); } }); let mut inbox_rx = AssertUnwindSafe(inbox_rx); - let mut check_interval = interval(Duration::from_secs(1)); loop { - check_interval.tick().await; - scheduler.run_pending().await; - - match inbox_rx.try_recv() { - Ok(_) => break, - Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, - Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { - warn!("Inbox channel closed unexpectedly"); - break; + select! { + _ = scheduler.run_pending() => {}, + res = &mut inbox_rx => {match res{ + Ok(_) => break, + Err(_) => { + warn!("Inbox channel closed unexpectedly"); + break; + }} } } } @@ -194,7 +172,7 @@ pub async fn arrow_conversion() -> ( future.await; } Err(panic_error) => { - error!("Panic in object store sync task: {:?}", panic_error); + error!("Panic in object store sync task: {panic_error:?}"); let _ = outbox_tx.send(()); } } @@ -226,19 +204,14 @@ pub async fn run_local_sync() -> ( }); loop { - // Sleep for 50ms - sleep(Duration::from_millis(50)).await; - - // Run any pending scheduled tasks - scheduler.run_pending().await; - - // Check inbox - match inbox_rx.try_recv() { - Ok(_) => break, - Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, - Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { - warn!("Inbox channel closed unexpectedly"); - break; + select! { + _ = scheduler.run_pending() => {}, + res = &mut inbox_rx => {match res{ + Ok(_) => break, + Err(_) => { + warn!("Inbox channel closed unexpectedly"); + break; + }} } } } @@ -289,20 +262,16 @@ pub async fn schedule_alert_task( } }); let mut inbox_rx = AssertUnwindSafe(inbox_rx); - let mut check_interval = interval(Duration::from_secs(1)); loop { - // Run any pending scheduled tasks - check_interval.tick().await; - scheduler.run_pending().await; - - // Check inbox - match inbox_rx.try_recv() { - Ok(_) => break, - Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, - Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { - warn!("Inbox channel closed unexpectedly"); - break; + select! 
{ + _ = scheduler.run_pending() => {}, + res = &mut inbox_rx => {match res{ + Ok(_) => break, + Err(_) => { + warn!("Inbox channel closed unexpectedly"); + break; + }} } } } @@ -313,7 +282,7 @@ pub async fn schedule_alert_task( future.await; } Err(panic_error) => { - error!("Panic in scheduled alert task: {:?}", panic_error); + error!("Panic in scheduled alert task: {panic_error:?}"); let _ = outbox_tx.send(()); } } From a30da15c67d29bbd6505eee2cdd128d18e50868f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 5 Feb 2025 13:40:41 +0530 Subject: [PATCH 09/11] don't schedule with clokwerk --- src/alerts/mod.rs | 4 +- src/sync.rs | 131 +++++++++++++++++++++++++--------------------- 2 files changed, 72 insertions(+), 63 deletions(-) diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index bd6603088..8f89136ef 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -330,7 +330,7 @@ pub struct RollingWindow { // should always be "now" pub eval_end: String, // x minutes (5m) - pub eval_frequency: u32, + pub eval_frequency: u64, } #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] @@ -641,7 +641,7 @@ impl AlertConfig { columns } - pub fn get_eval_frequency(&self) -> u32 { + pub fn get_eval_frequency(&self) -> u64 { match &self.eval_type { EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_frequency, } diff --git a/src/sync.rs b/src/sync.rs index 56146b2a5..2d6a4dae7 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -16,20 +16,31 @@ * */ -use clokwerk::{AsyncScheduler, Job, TimeUnits}; +use chrono::{TimeDelta, Timelike}; use std::future::Future; use std::panic::AssertUnwindSafe; -use std::time::Instant; use tokio::sync::oneshot; -use tokio::time::{sleep, Duration}; +use tokio::time::{interval_at, sleep, Duration, Instant}; use tokio::{select, task}; -use tracing::{error, info, warn}; +use tracing::{error, info, warn, trace}; use crate::alerts::{alerts_utils, AlertConfig, AlertError}; use crate::option::CONFIG; use crate::staging::STAGING; use crate::{storage, STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL}; +// Calculates the instant that is the start of the next minute +fn next_minute() -> Instant { + let now = chrono::Utc::now(); + let time_till = now + .with_second(0) + .expect("Start of the minute") + .signed_duration_since(now) + + TimeDelta::minutes(1); + + Instant::now() + time_till.to_std().expect("Valid duration") +} + pub async fn monitor_task_duration(task_name: &str, threshold: Duration, f: F) -> T where F: FnOnce() -> Fut + Send + 'static, @@ -71,33 +82,33 @@ pub async fn object_store_sync() -> ( let handle = task::spawn(async move { let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { - let mut scheduler = AsyncScheduler::new(); - scheduler - .every(STORAGE_UPLOAD_INTERVAL.seconds()) - // .plus(5u32.seconds()) - .run(|| async { - if let Err(e) = monitor_task_duration( - "object_store_sync", - Duration::from_secs(15), - || async { - CONFIG - .storage() - .get_object_store() - .upload_files_from_staging() - .await - }, - ) - .await - { - warn!("failed to upload local data with object store. {e:?}"); - } - }); + let mut sync_interval = interval_at( + next_minute(), + Duration::from_secs(STORAGE_UPLOAD_INTERVAL as u64), + ); let mut inbox_rx = AssertUnwindSafe(inbox_rx); loop { select! { - _ = scheduler.run_pending() => {}, + _ = sync_interval.tick() => { + trace!("Syncing Parquets to Object Store... 
"); + if let Err(e) = monitor_task_duration( + "object_store_sync", + Duration::from_secs(15), + || async { + CONFIG + .storage() + .get_object_store() + .upload_files_from_staging() + .await + }, + ) + .await + { + warn!("failed to upload local data with object store. {e:?}"); + } + }, res = &mut inbox_rx => {match res{ Ok(_) => break, Err(_) => { @@ -135,27 +146,26 @@ pub async fn arrow_conversion() -> ( let handle = task::spawn(async move { let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { - let mut scheduler = AsyncScheduler::new(); - scheduler - .every(STORAGE_CONVERSION_INTERVAL.seconds()) - .plus(5u32.seconds()) - .run(|| async { - if let Err(e) = monitor_task_duration( - "arrow_conversion", - Duration::from_secs(30), - || async { CONFIG.storage().get_object_store().conversion(false).await }, - ) - .await - { - warn!("failed to convert local arrow data to parquet. {e:?}"); - } - }); + let mut sync_interval = interval_at( + next_minute() + Duration::from_secs(5), // 5 second delay + Duration::from_secs(STORAGE_CONVERSION_INTERVAL as u64), + ); let mut inbox_rx = AssertUnwindSafe(inbox_rx); loop { select! { - _ = scheduler.run_pending() => {}, + _ = sync_interval.tick() => { + trace!("Converting Arrow to Parquet... "); + if let Err(e) = monitor_task_duration( + "arrow_conversion", + Duration::from_secs(30), + || async { CONFIG.storage().get_object_store().conversion(false).await }, + ).await + { + warn!("failed to convert local arrow data to parquet. {e:?}"); + } + }, res = &mut inbox_rx => {match res{ Ok(_) => break, Err(_) => { @@ -196,16 +206,17 @@ pub async fn run_local_sync() -> ( let mut inbox_rx = inbox_rx; let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { - let mut scheduler = AsyncScheduler::new(); - scheduler - .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds()) - .run(|| async { - STAGING.flush_all(); - }); + let mut sync_interval = interval_at( + next_minute(), + Duration::from_secs(storage::LOCAL_SYNC_INTERVAL), + ); loop { select! { - _ = scheduler.run_pending() => {}, + _ = sync_interval.tick() => { + trace!("Flushing Arrows to disk..."); + STAGING.flush_all(); + }, res = &mut inbox_rx => {match res{ Ok(_) => break, Err(_) => { @@ -234,7 +245,7 @@ pub async fn run_local_sync() -> ( } pub async fn schedule_alert_task( - eval_frequency: u32, + eval_frequency: u64, alert: AlertConfig, ) -> Result< ( @@ -251,21 +262,19 @@ pub async fn schedule_alert_task( info!("new alert task started for {alert:?}"); let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { - let mut scheduler = AsyncScheduler::new(); - scheduler.every((eval_frequency).minutes()).run(move || { - let alert_val = alert.clone(); - async move { - match alerts_utils::evaluate_alert(&alert_val).await { - Ok(_) => {} - Err(err) => error!("Error while evaluation- {err}"), - } - } - }); + let mut sync_interval = + interval_at(next_minute(), Duration::from_secs(eval_frequency * 60)); let mut inbox_rx = AssertUnwindSafe(inbox_rx); loop { select! 
{ - _ = scheduler.run_pending() => {}, + _ = sync_interval.tick() => { + trace!("Flushing stage to disk..."); + match alerts_utils::evaluate_alert(&alert).await { + Ok(_) => {} + Err(err) => error!("Error while evaluation- {err}"), + } + }, res = &mut inbox_rx => {match res{ Ok(_) => break, Err(_) => { From 9b50048c3f514a09bb9f70553df874d725620e56 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 5 Feb 2025 13:49:35 +0530 Subject: [PATCH 10/11] ehance warning message --- src/sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index 2d6a4dae7..16cf448ee 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -22,7 +22,7 @@ use std::panic::AssertUnwindSafe; use tokio::sync::oneshot; use tokio::time::{interval_at, sleep, Duration, Instant}; use tokio::{select, task}; -use tracing::{error, info, warn, trace}; +use tracing::{error, info, trace, warn}; use crate::alerts::{alerts_utils, AlertConfig, AlertError}; use crate::option::CONFIG; @@ -63,7 +63,7 @@ where if warned_once { warn!( "Task '{task_name}' started at: {start_time:?} took longer than expected: {:?} (threshold: {threshold:?})", - start_time.elapsed() + start_time.elapsed() - threshold ); } break res.expect("Task handle shouldn't error"); From 601e04ef2b43cce42a4f7d42117f02f38890f169 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 5 Feb 2025 14:03:13 +0530 Subject: [PATCH 11/11] don't include `start_time` in warning --- src/sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index 16cf448ee..1eceb21bc 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -55,14 +55,14 @@ where select! { _ = sleep(threshold), if !warned_once => { warn!( - "Task '{task_name}' started at: {start_time:?} is taking longer than expected: (threshold: {threshold:?})", + "Task '{task_name}' is taking longer than expected: (threshold: {threshold:?})", ); warned_once = true; }, res = &mut future => { if warned_once { warn!( - "Task '{task_name}' started at: {start_time:?} took longer than expected: {:?} (threshold: {threshold:?})", + "Task '{task_name}' took longer than expected: {:?} (threshold: {threshold:?})", start_time.elapsed() - threshold ); }