From c3bdb12b47b33477b87ca1182226fefa4b191630 Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Tue, 17 Jun 2025 19:32:40 +0530 Subject: [PATCH 01/18] feat: add resource utilization middleware to monitor CPU and memory usage --- src/handlers/http/mod.rs | 1 + src/handlers/http/modal/mod.rs | 3 +- src/handlers/http/resource_check.rs | 76 +++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 src/handlers/http/resource_check.rs diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 874a2aed5..6b0e71cbd 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -36,6 +36,7 @@ pub mod cluster; pub mod correlation; pub mod health_check; pub mod ingest; +pub mod resource_check; mod kinesis; pub mod llm; pub mod logstream; diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 3504ef0fa..b186106b0 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -45,7 +45,7 @@ use crate::{ utils::get_node_id, }; -use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION}; +use super::{audit, cross_origin_config, health_check, resource_check, API_BASE_PATH, API_VERSION}; pub mod ingest; pub mod ingest_server; @@ -113,6 +113,7 @@ pub trait ParseableServer { .wrap(prometheus.clone()) .configure(|config| Self::configure_routes(config, oidc_client.clone())) .wrap(from_fn(health_check::check_shutdown_middleware)) + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) .wrap(from_fn(audit::audit_log_middleware)) .wrap(actix_web::middleware::Logger::default()) .wrap(actix_web::middleware::Compress::default()) diff --git a/src/handlers/http/resource_check.rs b/src/handlers/http/resource_check.rs new file mode 100644 index 000000000..2fa3dae3d --- /dev/null +++ b/src/handlers/http/resource_check.rs @@ -0,0 +1,76 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +use actix_web::{ + body::MessageBody, + dev::{ServiceRequest, ServiceResponse}, + error::Error, + error::ErrorServiceUnavailable, + middleware::Next, +}; +use sysinfo::System; +use tracing::warn; + +const CPU_UTILIZATION_THRESHOLD: f32 = 50.0; +const MEMORY_UTILIZATION_THRESHOLD: f32 = 50.0; + +/// Middleware to check system resource utilization before processing requests +/// Returns 503 Service Unavailable if CPU or memory usage exceeds thresholds +pub async fn check_resource_utilization_middleware( + req: ServiceRequest, + next: Next, +) -> Result, Error> { + + let mut sys = System::new_all(); + sys.refresh_cpu_usage(); + sys.refresh_memory(); + + let used_memory = sys.used_memory() as f32; + let total_memory = sys.total_memory() as f32; + + // Check memory utilization + if total_memory > 0.0 { + let memory_usage = (used_memory / total_memory) * 100.0; + if memory_usage > MEMORY_UTILIZATION_THRESHOLD { + let error_msg = format!("Memory is over-utilized: {:.1}%", memory_usage); + warn!( + "Rejecting request to {} due to high memory usage: {:.1}% (threshold: {:.1}%)", + req.path(), + memory_usage, + MEMORY_UTILIZATION_THRESHOLD + ); + return Err(ErrorServiceUnavailable(error_msg)); + } + } + + // Check CPU utilization + let cpu_usage = sys.global_cpu_usage(); + if cpu_usage > CPU_UTILIZATION_THRESHOLD { + let 
error_msg = format!("CPU is over-utilized: {:.1}%", cpu_usage); + warn!( + "Rejecting request to {} due to high CPU usage: {:.1}% (threshold: {:.1}%)", + req.path(), + cpu_usage, + CPU_UTILIZATION_THRESHOLD + ); + return Err(ErrorServiceUnavailable(error_msg)); + } + + // Continue processing the request if resource utilization is within limits + next.call(req).await +} From 4fb2da7052c3590f43cfaab57e288954dd9e823a Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Tue, 17 Jun 2025 19:45:08 +0530 Subject: [PATCH 02/18] refactor: changed thresholds to 90% --- src/handlers/http/resource_check.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/handlers/http/resource_check.rs b/src/handlers/http/resource_check.rs index 2fa3dae3d..36eed29df 100644 --- a/src/handlers/http/resource_check.rs +++ b/src/handlers/http/resource_check.rs @@ -26,8 +26,8 @@ use actix_web::{ use sysinfo::System; use tracing::warn; -const CPU_UTILIZATION_THRESHOLD: f32 = 50.0; -const MEMORY_UTILIZATION_THRESHOLD: f32 = 50.0; +const CPU_UTILIZATION_THRESHOLD: f32 = 90.0; +const MEMORY_UTILIZATION_THRESHOLD: f32 = 90.0; /// Middleware to check system resource utilization before processing requests /// Returns 503 Service Unavailable if CPU or memory usage exceeds thresholds From 5ce1ab96c709a316eb30bb718119af651f87f5b1 Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Wed, 18 Jun 2025 11:29:48 +0530 Subject: [PATCH 03/18] added resource monitoring with configurable CPU and memory thresholds --- src/cli.rs | 19 ++++ src/handlers/http/modal/ingest_server.rs | 8 +- src/handlers/http/modal/server.rs | 7 ++ src/handlers/http/resource_check.rs | 123 ++++++++++++++++------- src/option.rs | 12 +++ 5 files changed, 132 insertions(+), 37 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 26cab2e95..33589d8b6 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -317,6 +317,25 @@ pub struct Options { )] pub parquet_compression: Compression, + // Resource monitoring + #[arg( + long, + env = 
"P_CPU_THRESHOLD", + default_value = "80.0", + value_parser = validation::validate_percentage, + help = "CPU utilization threshold percentage (0.0-100.0) for resource monitoring" + )] + pub cpu_utilization_threshold: f32, + + #[arg( + long, + env = "P_MEMORY_THRESHOLD", + default_value = "80.0", + value_parser = validation::validate_percentage, + help = "Memory utilization threshold percentage (0.0-100.0) for resource monitoring" + )] + pub memory_utilization_threshold: f32, + // Integration features #[arg( long, diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 8a8e1d1b1..7f3d51411 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -39,7 +39,7 @@ use crate::{ http::{ base_path, ingest, logstream, middleware::{DisAllowRootUser, RouteExt}, - role, + resource_check, role, }, }, migration, @@ -126,12 +126,18 @@ impl ParseableServer for IngestServer { let (cancel_tx, cancel_rx) = oneshot::channel(); thread::spawn(|| sync::handler(cancel_rx)); + // Start resource monitor + let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel(); + resource_check::spawn_resource_monitor(resource_shutdown_rx); + tokio::spawn(airplane::server()); // Ingestors shouldn't have to deal with OpenId auth flow let result = self.start(shutdown_rx, prometheus.clone(), None).await; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); + // Shutdown resource monitor + let _ = resource_shutdown_tx.send(()); if let Err(join_err) = startup_sync_handle.await { tracing::warn!("startup sync task panicked: {join_err}"); } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index d22e5de02..d7dfef240 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -25,6 +25,7 @@ use crate::handlers::http::alerts; use crate::handlers::http::base_path; use crate::handlers::http::health_check; use 
crate::handlers::http::prism_base_path; +use crate::handlers::http::resource_check; use crate::handlers::http::query; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; @@ -138,6 +139,10 @@ impl ParseableServer for Server { let (cancel_tx, cancel_rx) = oneshot::channel(); thread::spawn(|| sync::handler(cancel_rx)); + // Start resource monitor + let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel(); + resource_check::spawn_resource_monitor(resource_shutdown_rx); + if PARSEABLE.options.send_analytics { analytics::init_analytics_scheduler()?; } @@ -150,6 +155,8 @@ impl ParseableServer for Server { .await; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); + // Shutdown resource monitor + let _ = resource_shutdown_tx.send(()); if let Err(join_err) = startup_sync_handle.await { tracing::warn!("startup sync task panicked: {join_err}"); } diff --git a/src/handlers/http/resource_check.rs b/src/handlers/http/resource_check.rs index 36eed29df..9071b2d0b 100644 --- a/src/handlers/http/resource_check.rs +++ b/src/handlers/http/resource_check.rs @@ -23,51 +23,102 @@ use actix_web::{ error::ErrorServiceUnavailable, middleware::Next, }; -use sysinfo::System; -use tracing::warn; +use tokio::{select, time::{interval, Duration}}; +use tokio::sync::RwLock; +use tracing::{warn, trace, info}; -const CPU_UTILIZATION_THRESHOLD: f32 = 90.0; -const MEMORY_UTILIZATION_THRESHOLD: f32 = 90.0; +use crate::analytics::{SYS_INFO, refresh_sys_info}; +use crate::parseable::PARSEABLE; + +static RESOURCE_CHECK_ENABLED: RwLock = RwLock::const_new(true); + +/// Spawn a background task to monitor system resources +pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { + tokio::spawn(async move { + let mut check_interval = interval(Duration::from_secs(30)); + let mut shutdown_rx = shutdown_rx; + + let cpu_threshold = PARSEABLE.options.cpu_utilization_threshold; + let memory_threshold = 
PARSEABLE.options.memory_utilization_threshold; + + info!("Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}%", + cpu_threshold, memory_threshold); + loop { + select! { + _ = check_interval.tick() => { + trace!("Checking system resource utilization..."); + + refresh_sys_info(); + let (used_memory, total_memory, cpu_usage) = { + let sys = SYS_INFO.lock().unwrap(); + let used_memory = sys.used_memory() as f32; + let total_memory = sys.total_memory() as f32; + let cpu_usage = sys.global_cpu_usage(); + (used_memory, total_memory, cpu_usage) + }; + + let mut resource_ok = true; + + // Calculate memory usage percentage + let memory_usage = if total_memory > 0.0 { + (used_memory / total_memory) * 100.0 + } else { + 0.0 + }; + + // Log current resource usage every few checks for debugging + info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)", + cpu_usage, memory_usage, + used_memory / 1024.0 / 1024.0 / 1024.0, + total_memory / 1024.0 / 1024.0 / 1024.0); + + // Check memory utilization + if memory_usage > memory_threshold { + warn!("High memory usage detected: {:.1}% (threshold: {:.1}%)", + memory_usage, memory_threshold); + resource_ok = false; + } + + // Check CPU utilization + if cpu_usage > cpu_threshold { + warn!("High CPU usage detected: {:.1}% (threshold: {:.1}%)", + cpu_usage, cpu_threshold); + resource_ok = false; + } + + let previous_state = *RESOURCE_CHECK_ENABLED.read().await; + *RESOURCE_CHECK_ENABLED.write().await = resource_ok; + + // Log state changes + if previous_state != resource_ok { + if resource_ok { + info!("Resource utilization back to normal - requests will be accepted"); + } else { + warn!("Resource utilization too high - requests will be rejected"); + } + } + }, + _ = &mut shutdown_rx => { + trace!("Resource monitor shutting down"); + break; + } + } + } + }); +} /// Middleware to check system resource utilization before processing requests -/// Returns 503 Service Unavailable if CPU or memory usage 
exceeds thresholds +/// Returns 503 Service Unavailable if resources are over-utilized pub async fn check_resource_utilization_middleware( req: ServiceRequest, next: Next, ) -> Result, Error> { - let mut sys = System::new_all(); - sys.refresh_cpu_usage(); - sys.refresh_memory(); + let resource_ok = *RESOURCE_CHECK_ENABLED.read().await; - let used_memory = sys.used_memory() as f32; - let total_memory = sys.total_memory() as f32; - - // Check memory utilization - if total_memory > 0.0 { - let memory_usage = (used_memory / total_memory) * 100.0; - if memory_usage > MEMORY_UTILIZATION_THRESHOLD { - let error_msg = format!("Memory is over-utilized: {:.1}%", memory_usage); - warn!( - "Rejecting request to {} due to high memory usage: {:.1}% (threshold: {:.1}%)", - req.path(), - memory_usage, - MEMORY_UTILIZATION_THRESHOLD - ); - return Err(ErrorServiceUnavailable(error_msg)); - } - } - - // Check CPU utilization - let cpu_usage = sys.global_cpu_usage(); - if cpu_usage > CPU_UTILIZATION_THRESHOLD { - let error_msg = format!("CPU is over-utilized: {:.1}%", cpu_usage); - warn!( - "Rejecting request to {} due to high CPU usage: {:.1}% (threshold: {:.1}%)", - req.path(), - cpu_usage, - CPU_UTILIZATION_THRESHOLD - ); + if !resource_ok { + let error_msg = "Server resources over-utilized"; + warn!("Rejecting request to {} due to resource constraints", req.path()); return Err(ErrorServiceUnavailable(error_msg)); } diff --git a/src/option.rs b/src/option.rs index db9c94097..f2a9af119 100644 --- a/src/option.rs +++ b/src/option.rs @@ -175,6 +175,18 @@ pub mod validation { } } + pub fn validate_percentage(percentage: &str) -> Result { + if let Ok(percentage) = percentage.parse::() { + if (0.0..=100.0).contains(&percentage) { + Ok(percentage) + } else { + Err("Invalid percentage value. It should be between 0.0 and 100.0".to_string()) + } + } else { + Err("Invalid percentage value. 
It should be a decimal number like 80.0".to_string()) + } + } + pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result { if let Ok(size) = s.parse::() { if (1..=DATASET_FIELD_COUNT_LIMIT).contains(&size) { From b58212f65ff9e3bfc1f7ddc44377654bce8e82cc Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Wed, 18 Jun 2025 14:32:08 +0530 Subject: [PATCH 04/18] fix: use blocking task for resource usage retrieval and update memory units to GiB --- src/handlers/http/resource_check.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/handlers/http/resource_check.rs b/src/handlers/http/resource_check.rs index 9071b2d0b..dc0878c46 100644 --- a/src/handlers/http/resource_check.rs +++ b/src/handlers/http/resource_check.rs @@ -49,13 +49,13 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { trace!("Checking system resource utilization..."); refresh_sys_info(); - let (used_memory, total_memory, cpu_usage) = { + let (used_memory, total_memory, cpu_usage) = tokio::task::spawn_blocking(|| { let sys = SYS_INFO.lock().unwrap(); let used_memory = sys.used_memory() as f32; let total_memory = sys.total_memory() as f32; let cpu_usage = sys.global_cpu_usage(); (used_memory, total_memory, cpu_usage) - }; + }).await.unwrap(); let mut resource_ok = true; @@ -67,7 +67,7 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { }; // Log current resource usage every few checks for debugging - info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)", + info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GiB/{:.1}GiB)", cpu_usage, memory_usage, used_memory / 1024.0 / 1024.0 / 1024.0, total_memory / 1024.0 / 1024.0 / 1024.0); From f6a25170393eeb4fc87c64e2ab5e76406cbb72bf Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Wed, 18 Jun 2025 14:36:35 +0530 Subject: [PATCH 05/18] fix: update memory unit logging from GiB to GB for consistency --- 
src/handlers/http/resource_check.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/http/resource_check.rs b/src/handlers/http/resource_check.rs index dc0878c46..b1b1e9373 100644 --- a/src/handlers/http/resource_check.rs +++ b/src/handlers/http/resource_check.rs @@ -67,7 +67,7 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { }; // Log current resource usage every few checks for debugging - info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GiB/{:.1}GiB)", + info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)", cpu_usage, memory_usage, used_memory / 1024.0 / 1024.0 / 1024.0, total_memory / 1024.0 / 1024.0 / 1024.0); From 891e881ae72694c39e604602dee149c84baa8dc2 Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Wed, 18 Jun 2025 20:56:34 +0530 Subject: [PATCH 06/18] refactor: replace RwLock with LazyLock and AtomicBool for speeeedddd --- src/handlers/http/resource_check.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/handlers/http/resource_check.rs b/src/handlers/http/resource_check.rs index b1b1e9373..0b15bb176 100644 --- a/src/handlers/http/resource_check.rs +++ b/src/handlers/http/resource_check.rs @@ -16,6 +16,8 @@ * */ +use std::sync::{atomic::AtomicBool, Arc, LazyLock}; + use actix_web::{ body::MessageBody, dev::{ServiceRequest, ServiceResponse}, @@ -24,13 +26,12 @@ use actix_web::{ middleware::Next, }; use tokio::{select, time::{interval, Duration}}; -use tokio::sync::RwLock; use tracing::{warn, trace, info}; use crate::analytics::{SYS_INFO, refresh_sys_info}; use crate::parseable::PARSEABLE; -static RESOURCE_CHECK_ENABLED: RwLock = RwLock::const_new(true); +static RESOURCE_CHECK_ENABLED:LazyLock> = LazyLock::new(|| Arc::new(AtomicBool::new(false))); /// Spawn a background task to monitor system resources pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { @@ -86,9 +87,9 @@ pub fn 
spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { resource_ok = false; } - let previous_state = *RESOURCE_CHECK_ENABLED.read().await; - *RESOURCE_CHECK_ENABLED.write().await = resource_ok; - + let previous_state = RESOURCE_CHECK_ENABLED.load(std::sync::atomic::Ordering::SeqCst); + RESOURCE_CHECK_ENABLED.store(resource_ok, std::sync::atomic::Ordering::SeqCst); + // Log state changes if previous_state != resource_ok { if resource_ok { @@ -114,8 +115,8 @@ pub async fn check_resource_utilization_middleware( next: Next, ) -> Result, Error> { - let resource_ok = *RESOURCE_CHECK_ENABLED.read().await; - + let resource_ok = RESOURCE_CHECK_ENABLED.load(std::sync::atomic::Ordering::SeqCst); + if !resource_ok { let error_msg = "Server resources over-utilized"; warn!("Rejecting request to {} due to resource constraints", req.path()); From f90714defa7598958aee0c467392776a0072b4ab Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Wed, 18 Jun 2025 23:04:00 +0530 Subject: [PATCH 07/18] fix: add resource utilization middleware to ONLY the ingest routes --- src/handlers/http/modal/ingest_server.rs | 14 +++++++++++--- src/handlers/http/modal/mod.rs | 3 +-- src/handlers/http/modal/server.rs | 14 +++++++++++--- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 7f3d51411..5ab39ca70 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -21,6 +21,7 @@ use std::thread; use actix_web::web; use actix_web::Scope; +use actix_web::middleware::from_fn; use actix_web_prometheus::PrometheusMetrics; use async_trait::async_trait; use base64::Engine; @@ -67,7 +68,10 @@ impl ParseableServer for IngestServer { .service( // Base path "{url}/api/v1" web::scope(&base_path()) - .service(Server::get_ingest_factory()) + .service( + Server::get_ingest_factory() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) 
.service(Self::logstream_api()) .service(Server::get_about_factory()) .service(Self::analytics_factory()) @@ -77,7 +81,10 @@ impl ParseableServer for IngestServer { .service(Server::get_metrics_webscope()) .service(Server::get_readiness_factory()), ) - .service(Server::get_ingest_otel_factory()); + .service( + Server::get_ingest_otel_factory() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ); } async fn load_metadata(&self) -> anyhow::Result> { @@ -229,7 +236,8 @@ impl IngestServer { web::post() .to(ingest::post_event) .authorize_for_stream(Action::Ingest), - ), + ) + .wrap(from_fn(resource_check::check_resource_utilization_middleware)), ) .service( web::resource("/sync") diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index b186106b0..3504ef0fa 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -45,7 +45,7 @@ use crate::{ utils::get_node_id, }; -use super::{audit, cross_origin_config, health_check, resource_check, API_BASE_PATH, API_VERSION}; +use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION}; pub mod ingest; pub mod ingest_server; @@ -113,7 +113,6 @@ pub trait ParseableServer { .wrap(prometheus.clone()) .configure(|config| Self::configure_routes(config, oidc_client.clone())) .wrap(from_fn(health_check::check_shutdown_middleware)) - .wrap(from_fn(resource_check::check_resource_utilization_middleware)) .wrap(from_fn(audit::audit_log_middleware)) .wrap(actix_web::middleware::Logger::default()) .wrap(actix_web::middleware::Compress::default()) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index d7dfef240..80457e198 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -38,6 +38,7 @@ use crate::sync::sync_start; use actix_web::web; use actix_web::web::resource; +use actix_web::middleware::from_fn; use actix_web::Resource; use actix_web::Scope; use 
actix_web_prometheus::PrometheusMetrics; @@ -73,7 +74,10 @@ impl ParseableServer for Server { web::scope(&base_path()) .service(Self::get_correlation_webscope()) .service(Self::get_query_factory()) - .service(Self::get_ingest_factory()) + .service( + Self::get_ingest_factory() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) .service(Self::get_liveness_factory()) .service(Self::get_readiness_factory()) .service(Self::get_about_factory()) @@ -96,7 +100,10 @@ impl ParseableServer for Server { .service(Server::get_prism_logstream()) .service(Server::get_prism_datasets()), ) - .service(Self::get_ingest_otel_factory()) + .service( + Self::get_ingest_otel_factory() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) .service(Self::get_generated()); } @@ -374,7 +381,8 @@ impl Server { .to(logstream::delete) .authorize_for_stream(Action::DeleteStream), ) - .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) + .wrap(from_fn(resource_check::check_resource_utilization_middleware)), ) .service( // GET "/logstream/{logstream}/info" ==> Get info for given log stream From cd8c029ca0415541e29690b4f0b4989e531d93c4 Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Thu, 19 Jun 2025 14:12:31 +0530 Subject: [PATCH 08/18] Added resource check interval option and updated the resource monitoring interval to be taken from the cli via `--resource-check-interval` flag. Also wrapped the middleware around cpu intensive query endpoints available in `query.rs`. 
--- src/cli.rs | 9 +++++++++ src/handlers/http/modal/query_server.rs | 19 ++++++++++++++++--- src/handlers/http/modal/server.rs | 10 ++++++++-- src/handlers/http/resource_check.rs | 3 ++- src/option.rs | 7 +++++++ 5 files changed, 42 insertions(+), 6 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 33589d8b6..04bfb06fc 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -318,6 +318,15 @@ pub struct Options { pub parquet_compression: Compression, // Resource monitoring + #[arg( + long, + env = "P_RESOURCE_CHECK_INTERVAL", + default_value = "30", + value_parser = validation::validate_seconds, + help = "Resource monitoring check interval in seconds" + )] + pub resource_check_interval: u64, + #[arg( long, env = "P_CPU_THRESHOLD", diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index b43fa68a9..c4eb8dbc3 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -22,13 +22,14 @@ use std::thread; use crate::handlers::airplane; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt}; -use crate::handlers::http::{base_path, prism_base_path}; +use crate::handlers::http::{base_path, prism_base_path, resource_check}; use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::handlers::http::{rbac, role}; use crate::hottier::HotTierManager; use crate::rbac::role::Action; use crate::sync::sync_start; use crate::{analytics, migration, storage, sync}; +use actix_web::middleware::from_fn; use actix_web::web::{resource, ServiceConfig}; use actix_web::{web, Scope}; use actix_web_prometheus::PrometheusMetrics; @@ -53,7 +54,10 @@ impl ParseableServer for QueryServer { .service( web::scope(&base_path()) .service(Server::get_correlation_webscope()) - .service(Server::get_query_factory()) + .service( + Server::get_query_factory() + 
.wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) .service(Server::get_liveness_factory()) .service(Server::get_readiness_factory()) .service(Server::get_about_factory()) @@ -66,7 +70,10 @@ impl ParseableServer for QueryServer { .service(Server::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()) .service(Server::get_roles_webscope()) - .service(Server::get_counts_webscope()) + .service( + Server::get_counts_webscope() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) .service(Server::get_metrics_webscope()) .service(Server::get_alerts_webscope()) .service(Self::get_cluster_web_scope()), @@ -143,6 +150,10 @@ impl ParseableServer for QueryServer { let (cancel_tx, cancel_rx) = oneshot::channel(); thread::spawn(|| sync::handler(cancel_rx)); + // Start resource monitor + let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel(); + resource_check::spawn_resource_monitor(resource_shutdown_rx); + tokio::spawn(airplane::server()); let result = self @@ -150,6 +161,8 @@ impl ParseableServer for QueryServer { .await?; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); + // Shutdown resource monitor + let _ = resource_shutdown_tx.send(()); if let Err(join_err) = startup_sync_handle.await { tracing::warn!("startup sync task panicked: {join_err}"); } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 80457e198..c6403028c 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -73,7 +73,10 @@ impl ParseableServer for Server { .service( web::scope(&base_path()) .service(Self::get_correlation_webscope()) - .service(Self::get_query_factory()) + .service( + Self::get_query_factory() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) .service( Self::get_ingest_factory() .wrap(from_fn(resource_check::check_resource_utilization_middleware)) @@ -90,7 +93,10 @@ impl ParseableServer 
for Server { .service(Self::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()) .service(Self::get_roles_webscope()) - .service(Self::get_counts_webscope()) + .service( + Self::get_counts_webscope() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) .service(Self::get_alerts_webscope()) .service(Self::get_metrics_webscope()), ) diff --git a/src/handlers/http/resource_check.rs b/src/handlers/http/resource_check.rs index 0b15bb176..e1f08285f 100644 --- a/src/handlers/http/resource_check.rs +++ b/src/handlers/http/resource_check.rs @@ -36,7 +36,8 @@ static RESOURCE_CHECK_ENABLED:LazyLock> = LazyLock::new(|| Arc:: /// Spawn a background task to monitor system resources pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) { tokio::spawn(async move { - let mut check_interval = interval(Duration::from_secs(30)); + let resource_check_interval = PARSEABLE.options.resource_check_interval; + let mut check_interval = interval(Duration::from_secs(resource_check_interval)); let mut shutdown_rx = shutdown_rx; let cpu_threshold = PARSEABLE.options.cpu_utilization_threshold; diff --git a/src/option.rs b/src/option.rs index f2a9af119..7378deb8a 100644 --- a/src/option.rs +++ b/src/option.rs @@ -187,6 +187,13 @@ pub mod validation { } } + pub fn validate_seconds(s: &str) -> Result { + if let Ok(seconds) = s.parse::() { + Ok(seconds) + } else { + Err("Invalid value for seconds. It should be a positive integer".to_string()) + } + } pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result { if let Ok(size) = s.parse::() { if (1..=DATASET_FIELD_COUNT_LIMIT).contains(&size) { From 4578d0c7fe7db4c6a3cbd2fb8af72db61ce3d93f Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Thu, 19 Jun 2025 14:13:48 +0530 Subject: [PATCH 09/18] refactor: changed the default interval to 15 seconds. 
--- src/cli.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cli.rs b/src/cli.rs index 04bfb06fc..104fec2cc 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -321,7 +321,7 @@ pub struct Options { #[arg( long, env = "P_RESOURCE_CHECK_INTERVAL", - default_value = "30", + default_value = "15", value_parser = validation::validate_seconds, help = "Resource monitoring check interval in seconds" )] From 53a4532dd2738e74bf2e58428f0d9d9e75fc7793 Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Thu, 19 Jun 2025 14:27:41 +0530 Subject: [PATCH 10/18] refactor: remove resource monitor initialization from ingest, query, and server handlers --- src/handlers/http/modal/ingest_server.rs | 6 ------ src/handlers/http/modal/mod.rs | 9 ++++++++- src/handlers/http/modal/query_server.rs | 6 ------ src/handlers/http/modal/server.rs | 6 ------ 4 files changed, 8 insertions(+), 19 deletions(-) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 5ab39ca70..c202a92c6 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -133,18 +133,12 @@ impl ParseableServer for IngestServer { let (cancel_tx, cancel_rx) = oneshot::channel(); thread::spawn(|| sync::handler(cancel_rx)); - // Start resource monitor - let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel(); - resource_check::spawn_resource_monitor(resource_shutdown_rx); - tokio::spawn(airplane::server()); // Ingestors shouldn't have to deal with OpenId auth flow let result = self.start(shutdown_rx, prometheus.clone(), None).await; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); - // Shutdown resource monitor - let _ = resource_shutdown_tx.send(()); if let Err(join_err) = startup_sync_handle.await { tracing::warn!("startup sync task panicked: {join_err}"); } diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 3504ef0fa..1019d5d8a 100644 --- 
a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -45,7 +45,7 @@ use crate::{ utils::get_node_id, }; -use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION}; +use super::{audit, cross_origin_config, health_check, resource_check, API_BASE_PATH, API_VERSION}; pub mod ingest; pub mod ingest_server; @@ -107,6 +107,10 @@ pub trait ParseableServer { &PARSEABLE.options.trusted_ca_certs_path, )?; + // Start resource monitor + let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel(); + resource_check::spawn_resource_monitor(resource_shutdown_rx); + // fn that creates the app let create_app_fn = move || { App::new() @@ -142,6 +146,9 @@ pub trait ParseableServer { health_check::shutdown().await; + // Shutdown resource monitor + let _ = resource_shutdown_tx.send(()); + // Initiate graceful shutdown info!("Graceful shutdown of HTTP server triggered"); srv_handle.stop(true).await; diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index c4eb8dbc3..75b10ce7b 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -150,10 +150,6 @@ impl ParseableServer for QueryServer { let (cancel_tx, cancel_rx) = oneshot::channel(); thread::spawn(|| sync::handler(cancel_rx)); - // Start resource monitor - let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel(); - resource_check::spawn_resource_monitor(resource_shutdown_rx); - tokio::spawn(airplane::server()); let result = self @@ -161,8 +157,6 @@ impl ParseableServer for QueryServer { .await?; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); - // Shutdown resource monitor - let _ = resource_shutdown_tx.send(()); if let Err(join_err) = startup_sync_handle.await { tracing::warn!("startup sync task panicked: {join_err}"); } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index c6403028c..13058e951 100644 --- 
a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -152,10 +152,6 @@ impl ParseableServer for Server { let (cancel_tx, cancel_rx) = oneshot::channel(); thread::spawn(|| sync::handler(cancel_rx)); - // Start resource monitor - let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel(); - resource_check::spawn_resource_monitor(resource_shutdown_rx); - if PARSEABLE.options.send_analytics { analytics::init_analytics_scheduler()?; } @@ -168,8 +164,6 @@ impl ParseableServer for Server { .await; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); - // Shutdown resource monitor - let _ = resource_shutdown_tx.send(()); if let Err(join_err) = startup_sync_handle.await { tracing::warn!("startup sync task panicked: {join_err}"); } From 88b4596e6a67d1c248a62b5fd09784d91c45ca28 Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Thu, 19 Jun 2025 16:37:22 +0530 Subject: [PATCH 11/18] refactor: add resource utilization middleware to logstream routes, deleted middleware from the DELETE request --- src/handlers/http/modal/server.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 13058e951..4664c4a41 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -367,13 +367,15 @@ impl Server { .route( web::put() .to(logstream::put_stream) - .authorize_for_stream(Action::CreateStream), + .authorize_for_stream(Action::CreateStream) + .wrap(from_fn(resource_check::check_resource_utilization_middleware)), ) // POST "/logstream/{logstream}" ==> Post logs to given log stream .route( web::post() .to(ingest::post_event) - .authorize_for_stream(Action::Ingest), + .authorize_for_stream(Action::Ingest) + .wrap(from_fn(resource_check::check_resource_utilization_middleware)), ) // DELETE "/logstream/{logstream}" ==> Delete log stream .route( @@ -381,9 +383,8 @@ impl Server { .to(logstream::delete) 
.authorize_for_stream(Action::DeleteStream), ) - .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) - .wrap(from_fn(resource_check::check_resource_utilization_middleware)), - ) + .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + ) .service( // GET "/logstream/{logstream}/info" ==> Get info for given log stream web::resource("/info").route( From 5b142b70e65a44a72a9a8b906f4fb028d19155b3 Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Thu, 19 Jun 2025 16:44:31 +0530 Subject: [PATCH 12/18] refactor: removed resource_check from the PUT stream. --- src/handlers/http/modal/server.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 4664c4a41..ece5374de 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -368,7 +368,6 @@ impl Server { web::put() .to(logstream::put_stream) .authorize_for_stream(Action::CreateStream) - .wrap(from_fn(resource_check::check_resource_utilization_middleware)), ) // POST "/logstream/{logstream}" ==> Post logs to given log stream .route( From da3583a18bf105531adbd0daa92febdc364b3584 Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Fri, 20 Jun 2025 16:58:00 +0530 Subject: [PATCH 13/18] refactor: simplify uptime retrieval in Report::new method --- src/analytics.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/analytics.rs b/src/analytics.rs index 8bafe8a95..eaa5458a2 100644 --- a/src/analytics.rs +++ b/src/analytics.rs @@ -95,9 +95,8 @@ pub struct Report { impl Report { pub async fn new() -> anyhow::Result { let mut upt: f64 = 0.0; - if let Ok(uptime) = uptime_lib::get() { - upt = uptime.as_secs_f64(); - } + let uptime = uptime_lib::get().unwrap(); + upt = uptime.as_secs_f64(); refresh_sys_info(); let mut os_version = "Unknown".to_string(); From 305ad2580b5f217c877b96ae7d9e865fa8433e24 Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Fri, 20 Jun 2025 17:03:07 +0530 
Subject: [PATCH 14/18] refactor: glazing clippy --- src/analytics.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/analytics.rs b/src/analytics.rs index eaa5458a2..e57ebabef 100644 --- a/src/analytics.rs +++ b/src/analytics.rs @@ -94,9 +94,8 @@ pub struct Report { impl Report { pub async fn new() -> anyhow::Result { - let mut upt: f64 = 0.0; let uptime = uptime_lib::get().unwrap(); - upt = uptime.as_secs_f64(); + let upt = uptime.as_secs_f64(); refresh_sys_info(); let mut os_version = "Unknown".to_string(); From 1469507dd361c1abc39809a8b623b5e9667aebd1 Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Sat, 21 Jun 2025 02:19:29 +0530 Subject: [PATCH 15/18] empty push From b083724d766116b7e34082cb1e4aa173d3b9ba5d Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Sat, 21 Jun 2025 02:27:59 +0530 Subject: [PATCH 16/18] nit: checking if the build runs --- src/analytics.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/analytics.rs b/src/analytics.rs index e57ebabef..8bafe8a95 100644 --- a/src/analytics.rs +++ b/src/analytics.rs @@ -94,8 +94,10 @@ pub struct Report { impl Report { pub async fn new() -> anyhow::Result { - let uptime = uptime_lib::get().unwrap(); - let upt = uptime.as_secs_f64(); + let mut upt: f64 = 0.0; + if let Ok(uptime) = uptime_lib::get() { + upt = uptime.as_secs_f64(); + } refresh_sys_info(); let mut os_version = "Unknown".to_string(); From 63e7a58affcdded5b6bc12e346582887aa53a00f Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Sat, 21 Jun 2025 02:45:13 +0530 Subject: [PATCH 17/18] version change to not use cache for this build --- .github/workflows/build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index ff088f11e..096bff9d3 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -41,7 +41,7 @@ jobs: ~/.cargo/registry ~/.cargo/git target - key: ${{ runner.os }}-cargo-${{ matrix.target 
}}-default-${{ hashFiles('**/Cargo.lock') }} + key: ${{ runner.os }}-cargo-${{ matrix.target }}-default-v2-${{ hashFiles('**/Cargo.lock') }} - name: Build uses: actions-rs/cargo@v1 with: From 4f1ae5ce2392b50a4259ae9e4925de64961b0b57 Mon Sep 17 00:00:00 2001 From: vkhinvasara Date: Sat, 21 Jun 2025 12:21:31 +0530 Subject: [PATCH 18/18] fix: update cache key format in build workflow --- .github/workflows/build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 096bff9d3..ff088f11e 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -41,7 +41,7 @@ jobs: ~/.cargo/registry ~/.cargo/git target - key: ${{ runner.os }}-cargo-${{ matrix.target }}-default-v2-${{ hashFiles('**/Cargo.lock') }} + key: ${{ runner.os }}-cargo-${{ matrix.target }}-default-${{ hashFiles('**/Cargo.lock') }} - name: Build uses: actions-rs/cargo@v1 with: