From 3ce8af2015c336b1f2b51ff830353e501a5e173b Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Mon, 21 Oct 2024 00:00:23 +0530 Subject: [PATCH 1/8] sigterm fixes --- server/src/handlers/http/health_check.rs | 16 ++------- server/src/handlers/http/modal/server.rs | 39 ++++++++++++++++----- server/src/main.rs | 2 +- server/src/storage/object_storage.rs | 44 ++++++++++++++++-------- server/src/storage/staging.rs | 23 ++++++++----- server/src/sync.rs | 2 +- 6 files changed, 79 insertions(+), 47 deletions(-) diff --git a/server/src/handlers/http/health_check.rs b/server/src/handlers/http/health_check.rs index d59f3bec0..9e8e5a69a 100644 --- a/server/src/handlers/http/health_check.rs +++ b/server/src/handlers/http/health_check.rs @@ -66,23 +66,13 @@ pub async fn handle_signals(shutdown_signal: Arc { log::info!("Signal handler received None, indicating an error or end of stream"); diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 51195609b..84ce739e5 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -41,6 +41,7 @@ use crate::users::filters::FILTERS; use actix_web::middleware::from_fn; use std::sync::Arc; use tokio::sync::{oneshot, Mutex}; +use tokio::time::{sleep, Duration}; use actix_web::web::resource; use actix_web::Resource; @@ -65,7 +66,6 @@ use super::generate; use super::ssl_acceptor::get_ssl_acceptor; use super::OpenIdClient; use super::ParseableServer; - #[derive(Default)] pub struct Server; @@ -110,9 +110,9 @@ impl ParseableServer for Server { let shutdown_signal = server_shutdown_signal.clone(); // Spawn the signal handler task - tokio::spawn(async move { + let signal_task = tokio::spawn(async move { health_check::handle_signals(shutdown_signal).await; - println!("Received shutdown signal, notifying server to shut down..."); + log::info!("Received shutdown signal, notifying server to shut down..."); }); // Create the HTTP server @@ -131,18 +131,41 @@ impl ParseableServer for Server { // Graceful shutdown handling let srv_handle = srv.handle(); - - tokio::spawn(async move { + + let sync_task = tokio::spawn(async move { // Wait for the shutdown signal - shutdown_rx.await.ok(); + let _ = shutdown_rx.await; + + // Perform S3 sync and wait for completion + log::info!("Starting data sync to S3..."); + if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { + log::warn!("Failed to sync local data with object store. {:?}", e); + } else { + log::info!("Successfully synced all data to S3."); + } // Initiate graceful shutdown log::info!("Graceful shutdown of HTTP server triggered"); srv_handle.stop(true).await; }); - // Await the server to run and handle shutdown - srv.await?; + // Await the HTTP server to run + let server_result = srv.await; + + // Await the signal handler to ensure proper cleanup + if let Err(e) = signal_task.await { + log::error!("Error in signal handler: {:?}", e); + } + + // Wait for the sync task to complete before exiting + if let Err(e) = sync_task.await { + log::error!("Error in sync task: {:?}", e); + } else { + log::info!("Sync task completed successfully."); + } + + // Return the result of the server + server_result?; Ok(()) } diff --git a/server/src/main.rs b/server/src/main.rs index fca2ca307..cd008211f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -52,7 +52,7 @@ use option::{Mode, CONFIG}; use crate::handlers::http::modal::{ ingest_server::IngestServer, query_server::QueryServer, server::Server, }; -pub const STORAGE_UPLOAD_INTERVAL: u32 = 60; +pub const STORAGE_UPLOAD_INTERVAL: u32 = 600; #[actix_web::main] async fn main() -> anyhow::Result<()> { diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 93b8a5bcd..4f7c4deae 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -426,16 +426,16 @@ pub trait ObjectStorage: Sync + 'static { .await } - async fn sync(&self) -> Result<(), ObjectStorageError> { + async fn sync(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> { if !Path::new(&CONFIG.staging_dir()).exists() { return Ok(()); } - + let streams = STREAM_INFO.list_streams(); - + let cache_manager = LocalCacheManager::global(); let mut cache_updates: HashMap<&String, Vec<_>> = HashMap::new(); - + for stream in &streams { let cache_enabled = STREAM_INFO .get_cache_enabled(stream) @@ -452,9 +452,10 @@ pub trait ObjectStorage: Sync + 'static { &dir, time_partition, custom_partition.clone(), + shutdown_signal, ) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - + if let Some(schema) = schema { let static_schema_flag = STREAM_INFO .get_static_schema_flag(stream) @@ -463,14 +464,18 @@ pub trait ObjectStorage: Sync + 'static { commit_schema_to_storage(stream, schema).await?; } } + let parquet_files = dir.parquet_files(); - for file in parquet_files { let filename = file .file_name() .expect("only parquet files are returned by iterator") .to_str() .expect("filename is valid string"); + + // Log the filename being processed + log::debug!("Processing file: {}", filename); + let mut file_date_part = filename.split('.').collect::>()[0]; file_date_part = file_date_part.split('=').collect::>()[1]; let compressed_size = file.metadata().map_or(0, |meta| meta.len()); @@ -484,7 +489,7 @@ pub trait ObjectStorage: Sync + 'static { .with_label_values(&["data", stream, "parquet"]) .add(compressed_size as i64); let mut file_suffix = str::replacen(filename, ".", "/", 3); - + let custom_partition_clone = custom_partition.clone(); if custom_partition_clone.is_some() { let custom_partition_fields = custom_partition_clone.unwrap(); @@ -493,8 +498,15 @@ pub trait ObjectStorage: Sync + 'static { file_suffix = str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); } + let stream_relative_path = format!("{stream}/{file_suffix}"); - self.upload_file(&stream_relative_path, &file).await?; + + // Try uploading the file, handle potential errors without breaking the loop + if let Err(e) = self.upload_file(&stream_relative_path, &file).await { + log::error!("Failed to upload file {}: {:?}", filename, e); + continue; // Skip to the next file + } + let absolute_path = self .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) .to_string(); @@ -512,28 +524,30 @@ pub trait ObjectStorage: Sync + 'static { } } } - + + // Cache management logic if let Some(manager) = cache_manager { let cache_updates = cache_updates .into_iter() .map(|(key, value)| (key.to_owned(), value)) .collect_vec(); - + tokio::spawn(async move { for (stream, files) in cache_updates { for (storage_path, file) in files { - manager + if let Err(e) = manager .move_to_cache(&stream, storage_path, file.to_owned()) - .await - .unwrap() + .await { + log::error!("Failed to move file to cache: {:?}", e); + } } } }); } - + Ok(()) } - + // pick a better name fn get_bucket_name(&self) -> String; } diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 59c4d6651..0b421fb1e 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -155,17 +155,21 @@ impl StorageDir { &self, exclude: NaiveDateTime, stream: &str, + shutdown_signal: bool, ) -> HashMap> { let mut grouped_arrow_file: HashMap> = HashMap::new(); let mut arrow_files = self.arrow_files(); - arrow_files.retain(|path| { - !path - .file_name() - .unwrap() - .to_str() - .unwrap() - .starts_with(&exclude.format("%Y%m%dT%H%M").to_string()) - }); + + if !shutdown_signal { + arrow_files.retain(|path| { + path.file_name() + .unwrap() + .to_str() + .unwrap() + .starts_with(&exclude.format("%Y%m%dT%H%M").to_string()) + }); + } + let random_string = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15); for arrow_file_path in arrow_files { @@ -223,11 +227,12 @@ pub fn convert_disk_files_to_parquet( dir: &StorageDir, time_partition: Option, custom_partition: Option, + shutdown_signal: bool, ) -> Result, MoveDataError> { let mut schemas = Vec::new(); let time = chrono::Utc::now().naive_utc(); - let staging_files = dir.arrow_files_grouped_exclude_time(time, stream); + let staging_files = dir.arrow_files_grouped_exclude_time(time, stream, shutdown_signal); if staging_files.is_empty() { metrics::STAGING_FILES.with_label_values(&[stream]).set(0); metrics::STORAGE_SIZE diff --git a/server/src/sync.rs b/server/src/sync.rs index 446f53b7e..2367f3de8 100644 --- a/server/src/sync.rs +++ b/server/src/sync.rs @@ -40,7 +40,7 @@ pub async fn object_store_sync() -> ( .every(STORAGE_UPLOAD_INTERVAL.seconds()) .plus(5u32.seconds()) .run(|| async { - if let Err(e) = CONFIG.storage().get_object_store().sync().await { + if let Err(e) = CONFIG.storage().get_object_store().sync(false).await { log::warn!("failed to sync local data with object store. {:?}", e); } }); From 461aa033b25c763e8808357fdeae0152b8089f09 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Mon, 21 Oct 2024 08:18:59 +0530 Subject: [PATCH 2/8] revert upload interval --- server/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main.rs b/server/src/main.rs index cd008211f..fca2ca307 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -52,7 +52,7 @@ use option::{Mode, CONFIG}; use crate::handlers::http::modal::{ ingest_server::IngestServer, query_server::QueryServer, server::Server, }; -pub const STORAGE_UPLOAD_INTERVAL: u32 = 600; +pub const STORAGE_UPLOAD_INTERVAL: u32 = 60; #[actix_web::main] async fn main() -> anyhow::Result<()> { From ca57b7633605dab3e63ace061291daeecd50b83a Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Tue, 22 Oct 2024 15:06:01 +0530 Subject: [PATCH 3/8] sigterm on ingest and query --- .../src/handlers/http/modal/ingest_server.rs | 37 +++++++++++++++---- .../src/handlers/http/modal/query_server.rs | 36 +++++++++++++++--- 2 files changed, 60 insertions(+), 13 deletions(-) diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 27fd99426..245284ce4 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -112,9 +112,9 @@ impl ParseableServer for IngestServer { let shutdown_signal = server_shutdown_signal.clone(); // Spawn the signal handler task - tokio::spawn(async move { + let signal_task = tokio::spawn(async move { health_check::handle_signals(shutdown_signal).await; - println!("Received shutdown signal, notifying server to shut down..."); + log::info!("Received shutdown signal, notifying server to shut down..."); }); // Create the HTTP server @@ -133,18 +133,41 @@ impl ParseableServer for IngestServer { // Graceful shutdown handling let srv_handle = srv.handle(); - - tokio::spawn(async move { + + let sync_task = tokio::spawn(async move { // Wait for the shutdown signal - shutdown_rx.await.ok(); + let _ = shutdown_rx.await; + + // Perform S3 sync and wait for completion + log::info!("Starting data sync to S3..."); + if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { + log::warn!("Failed to sync local data with object store. {:?}", e); + } else { + log::info!("Successfully synced all data to S3."); + } // Initiate graceful shutdown log::info!("Graceful shutdown of HTTP server triggered"); srv_handle.stop(true).await; }); - // Await the server to run and handle shutdown - srv.await?; + // Await the HTTP server to run + let server_result = srv.await; + + // Await the signal handler to ensure proper cleanup + if let Err(e) = signal_task.await { + log::error!("Error in signal handler: {:?}", e); + } + + // Wait for the sync task to complete before exiting + if let Err(e) = sync_task.await { + log::error!("Error in sync task: {:?}", e); + } else { + log::info!("Sync task completed successfully."); + } + + // Return the result of the server + server_result?; Ok(()) } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 394853f30..a276fb436 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -89,8 +89,9 @@ impl ParseableServer for QueryServer { let shutdown_signal = server_shutdown_signal.clone(); // Spawn the signal handler task - tokio::spawn(async move { + let signal_task = tokio::spawn(async move { health_check::handle_signals(shutdown_signal).await; + log::info!("Received shutdown signal, notifying server to shut down..."); }); // Create the HTTP server @@ -109,18 +110,41 @@ impl ParseableServer for QueryServer { // Graceful shutdown handling let srv_handle = srv.handle(); - - tokio::spawn(async move { + + let sync_task = tokio::spawn(async move { // Wait for the shutdown signal - shutdown_rx.await.ok(); + let _ = shutdown_rx.await; + + // Perform S3 sync and wait for completion + log::info!("Starting data sync to S3..."); + if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { + log::warn!("Failed to sync local data with object store. {:?}", e); + } else { + log::info!("Successfully synced all data to S3."); + } // Initiate graceful shutdown log::info!("Graceful shutdown of HTTP server triggered"); srv_handle.stop(true).await; }); - // Await the server to run and handle shutdown - srv.await?; + // Await the HTTP server to run + let server_result = srv.await; + + // Await the signal handler to ensure proper cleanup + if let Err(e) = signal_task.await { + log::error!("Error in signal handler: {:?}", e); + } + + // Wait for the sync task to complete before exiting + if let Err(e) = sync_task.await { + log::error!("Error in sync task: {:?}", e); + } else { + log::info!("Sync task completed successfully."); + } + + // Return the result of the server + server_result?; Ok(()) } From cd06891dc0d4a67f6ff49437bbc733fec39dcb8d Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Tue, 22 Oct 2024 15:15:00 +0530 Subject: [PATCH 4/8] update helm --- helm/templates/ingestor-statefulset.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helm/templates/ingestor-statefulset.yaml b/helm/templates/ingestor-statefulset.yaml index 12e9303f9..91ca793c0 100644 --- a/helm/templates/ingestor-statefulset.yaml +++ b/helm/templates/ingestor-statefulset.yaml @@ -103,4 +103,4 @@ spec: requests: storage: {{ .Values.parseable.persistence.ingestor.size | quote }} {{- end }} -{{- end }} +{{- end }} \ No newline at end of file From 2f67d0ffe39976d5259ab53698b8f43bfd87d79b Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Tue, 22 Oct 2024 15:16:22 +0530 Subject: [PATCH 5/8] fmt code --- .../src/handlers/http/modal/ingest_server.rs | 2 +- .../src/handlers/http/modal/query_server.rs | 2 +- server/src/handlers/http/modal/server.rs | 2 +- server/src/storage/object_storage.rs | 33 ++++++++++--------- 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 245284ce4..f6e54ce8e 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -133,7 +133,7 @@ impl ParseableServer for IngestServer { // Graceful shutdown handling let srv_handle = srv.handle(); - + let sync_task = tokio::spawn(async move { // Wait for the shutdown signal let _ = shutdown_rx.await; diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index a276fb436..302bd977e 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -110,7 +110,7 @@ impl ParseableServer for QueryServer { // Graceful shutdown handling let srv_handle = srv.handle(); - + let sync_task = tokio::spawn(async move { // Wait for the shutdown signal let _ = shutdown_rx.await; diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 84ce739e5..5865ff915 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -131,7 +131,7 @@ impl ParseableServer for Server { // Graceful shutdown handling let srv_handle = srv.handle(); - + let sync_task = tokio::spawn(async move { // Wait for the shutdown signal let _ = shutdown_rx.await; diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 4f7c4deae..14c1dc264 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -430,12 +430,12 @@ pub trait ObjectStorage: Sync + 'static { if !Path::new(&CONFIG.staging_dir()).exists() { return Ok(()); } - + let streams = STREAM_INFO.list_streams(); - + let cache_manager = LocalCacheManager::global(); let mut cache_updates: HashMap<&String, Vec<_>> = HashMap::new(); - + for stream in &streams { let cache_enabled = STREAM_INFO .get_cache_enabled(stream) @@ -455,7 +455,7 @@ pub trait ObjectStorage: Sync + 'static { shutdown_signal, ) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - + if let Some(schema) = schema { let static_schema_flag = STREAM_INFO .get_static_schema_flag(stream) @@ -464,7 +464,7 @@ pub trait ObjectStorage: Sync + 'static { commit_schema_to_storage(stream, schema).await?; } } - + let parquet_files = dir.parquet_files(); for file in parquet_files { let filename = file @@ -472,10 +472,10 @@ pub trait ObjectStorage: Sync + 'static { .expect("only parquet files are returned by iterator") .to_str() .expect("filename is valid string"); - + // Log the filename being processed log::debug!("Processing file: {}", filename); - + let mut file_date_part = filename.split('.').collect::>()[0]; file_date_part = file_date_part.split('=').collect::>()[1]; let compressed_size = file.metadata().map_or(0, |meta| meta.len()); @@ -489,7 +489,7 @@ pub trait ObjectStorage: Sync + 'static { .with_label_values(&["data", stream, "parquet"]) .add(compressed_size as i64); let mut file_suffix = str::replacen(filename, ".", "/", 3); - + let custom_partition_clone = custom_partition.clone(); if custom_partition_clone.is_some() { let custom_partition_fields = custom_partition_clone.unwrap(); @@ -498,15 +498,15 @@ pub trait ObjectStorage: Sync + 'static { file_suffix = str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); } - + let stream_relative_path = format!("{stream}/{file_suffix}"); - + // Try uploading the file, handle potential errors without breaking the loop if let Err(e) = self.upload_file(&stream_relative_path, &file).await { log::error!("Failed to upload file {}: {:?}", filename, e); continue; // Skip to the next file } - + let absolute_path = self .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) .to_string(); @@ -524,30 +524,31 @@ pub trait ObjectStorage: Sync + 'static { } } } - + // Cache management logic if let Some(manager) = cache_manager { let cache_updates = cache_updates .into_iter() .map(|(key, value)| (key.to_owned(), value)) .collect_vec(); - + tokio::spawn(async move { for (stream, files) in cache_updates { for (storage_path, file) in files { if let Err(e) = manager .move_to_cache(&stream, storage_path, file.to_owned()) - .await { + .await + { log::error!("Failed to move file to cache: {:?}", e); } } } }); } - + Ok(()) } - + // pick a better name fn get_bucket_name(&self) -> String; } From 49d5dd43788047579125dc4f197b9c3bbf6ce143 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 24 Oct 2024 16:51:09 +0530 Subject: [PATCH 6/8] fixes 1. sync to storage issue - for regular scenario 2. server accepting request even after SIGTERM signal received - ingest handlers to check signal --- server/src/handlers/http/health_check.rs | 3 +-- server/src/handlers/http/ingest.rs | 22 ++++++++++++++++++++++ server/src/handlers/http/modal/server.rs | 1 - server/src/storage/staging.rs | 3 ++- 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/server/src/handlers/http/health_check.rs b/server/src/handlers/http/health_check.rs index 9e8e5a69a..957139657 100644 --- a/server/src/handlers/http/health_check.rs +++ b/server/src/handlers/http/health_check.rs @@ -27,11 +27,10 @@ use lazy_static::lazy_static; use std::sync::Arc; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::{oneshot, Mutex}; -use tokio::time::{sleep, Duration}; // Create a global variable to store signal status lazy_static! { - static ref SIGNAL_RECEIVED: Arc> = Arc::new(Mutex::new(false)); + pub static ref SIGNAL_RECEIVED: Arc> = Arc::new(Mutex::new(false)); } pub async fn liveness() -> HttpResponse { diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 7e36a4d8a..9502973c8 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -16,6 +16,7 @@ * */ +use super::health_check::SIGNAL_RECEIVED; use super::logstream::error::{CreateStreamError, StreamError}; use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs}; use super::otel; @@ -47,6 +48,13 @@ use std::sync::Arc; // ingests events by extracting stream name from header // creates if stream does not exist pub async fn ingest(req: HttpRequest, body: Bytes) -> Result { + // Check if the application has received a shutdown signal + let shutdown_flag = SIGNAL_RECEIVED.lock().await; + if *shutdown_flag { + return Err(PostError::CustomError( + "Server is shutting down".to_string(), + )); + } if let Some((_, stream_name)) = req .headers() .iter() @@ -107,6 +115,13 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< // ingests events by extracting stream name from header // creates if stream does not exist pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result { + // Check if the application has received a shutdown signal + let shutdown_flag = SIGNAL_RECEIVED.lock().await; + if *shutdown_flag { + return Err(PostError::CustomError( + "Server is shutting down".to_string(), + )); + } if let Some((_, stream_name)) = req .headers() .iter() @@ -143,6 +158,13 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result Result { + // Check if the application has received a shutdown signal + let shutdown_flag = SIGNAL_RECEIVED.lock().await; + if *shutdown_flag { + return Err(PostError::CustomError( + "Server is shutting down".to_string(), + )); + } let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let internal_stream_names = STREAM_INFO.list_internal_streams(); if internal_stream_names.contains(&stream_name) { diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 5865ff915..ba1ab055c 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -41,7 +41,6 @@ use crate::users::filters::FILTERS; use actix_web::middleware::from_fn; use std::sync::Arc; use tokio::sync::{oneshot, Mutex}; -use tokio::time::{sleep, Duration}; use actix_web::web::resource; use actix_web::Resource; diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 0b421fb1e..4f9fdd22b 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -162,7 +162,8 @@ impl StorageDir { if !shutdown_signal { arrow_files.retain(|path| { - path.file_name() + !path + .file_name() .unwrap() .to_str() .unwrap() From 17e049644d5b7c416fb8e03dac5c1c7f8884fea6 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 25 Oct 2024 14:14:39 +0530 Subject: [PATCH 7/8] removed SIGTERM check from ingest handlers --- Cargo.lock | 63 ++++++++++++------------------ server/src/handlers/http/ingest.rs | 22 ----------- 2 files changed, 26 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a79a475f..6badbe695 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,9 +36,9 @@ dependencies = [ [[package]] name = "actix-http" -version = "3.6.0" +version = "3.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d223b13fd481fc0d1f83bb12659ae774d9e3601814c68a0bc539731698cca743" +checksum = "d48f96fc3003717aeb9856ca3d02a8c7de502667ad76eeacd830b48d2e91fac4" dependencies = [ "actix-codec", "actix-rt", @@ -46,9 +46,9 @@ dependencies = [ "actix-tls", "actix-utils", "ahash", - "base64 0.21.7", + "base64 0.22.0", "bitflags 2.5.0", - "brotli 3.5.0", + "brotli", "bytes", "bytestring", "derive_more", @@ -86,13 +86,15 @@ dependencies = [ [[package]] name = "actix-router" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d22475596539443685426b6bdadb926ad0ecaefdfc5fb05e5e3441f15463c511" +checksum = "13d324164c51f63867b57e73ba5936ea151b8a41a1d23d1031eeb9f70d0236f8" dependencies = [ "bytestring", + "cfg-if", "http 0.2.12", "regex", + "regex-lite", "serde", "tracing", ] @@ -137,9 +139,9 @@ dependencies = [ [[package]] name = "actix-tls" -version = "3.3.0" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4cce60a2f2b477bc72e5cde0af1812a6e82d8fd85b5570a5dcf2a5bf2c5be5f" +checksum = "ac453898d866cdbecdbc2334fe1738c747b4eba14a677261f2b768ba05329389" dependencies = [ "actix-rt", "actix-service", @@ -166,9 +168,9 @@ dependencies = [ [[package]] name = "actix-web" -version = "4.5.1" +version = "4.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a6556ddebb638c2358714d853257ed226ece6023ef9364f23f0c70737ea984" +checksum = "9180d76e5cc7ccbc4d60a506f2c727730b154010262df5b910eb17dbe4b8cb38" dependencies = [ "actix-codec", "actix-http", @@ -189,6 +191,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", + "impl-more", "itoa", "language-tags", "log", @@ -196,6 +199,7 @@ dependencies = [ "once_cell", "pin-project-lite", "regex", + "regex-lite", "serde", "serde_json", "serde_urlencoded", @@ -207,9 +211,9 @@ dependencies = [ [[package]] name = "actix-web-codegen" -version = "4.2.2" +version = "4.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1f50ebbb30eca122b188319a4398b3f7bb4a8cdf50ecfb73bfc6a3c3ce54f5" +checksum = "f591380e2e68490b5dfaf1dd1aa0ebe78d84ba7067078512b4ea6e4492d622b8" dependencies = [ "actix-router", "proc-macro2", @@ -888,17 +892,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "brotli" -version = "3.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", - "brotli-decompressor 2.5.1", -] - [[package]] name = "brotli" version = "6.0.0" @@ -907,17 +900,7 @@ checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", - "brotli-decompressor 4.0.1", -] - -[[package]] -name = "brotli-decompressor" -version = "2.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", + "brotli-decompressor", ] [[package]] @@ -3073,7 +3056,7 @@ dependencies = [ "arrow-schema", "arrow-select", "base64 0.22.0", - "brotli 6.0.0", + "brotli", "bytes", "chrono", "flate2", @@ -3448,7 +3431,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.12.1", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -3468,7 +3451,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.79", @@ -3663,6 +3646,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.8.5" diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 9502973c8..7e36a4d8a 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -16,7 +16,6 @@ * */ -use super::health_check::SIGNAL_RECEIVED; use super::logstream::error::{CreateStreamError, StreamError}; use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs}; use super::otel; @@ -48,13 +47,6 @@ use std::sync::Arc; // ingests events by extracting stream name from header // creates if stream does not exist pub async fn ingest(req: HttpRequest, body: Bytes) -> Result { - // Check if the application has received a shutdown signal - let shutdown_flag = SIGNAL_RECEIVED.lock().await; - if *shutdown_flag { - return Err(PostError::CustomError( - "Server is shutting down".to_string(), - )); - } if let Some((_, stream_name)) = req .headers() .iter() @@ -115,13 +107,6 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< // ingests events by extracting stream name from header // creates if stream does not exist pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result { - // Check if the application has received a shutdown signal - let shutdown_flag = SIGNAL_RECEIVED.lock().await; - if *shutdown_flag { - return Err(PostError::CustomError( - "Server is shutting down".to_string(), - )); - } if let Some((_, stream_name)) = req .headers() .iter() @@ -158,13 +143,6 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result Result { - // Check if the application has received a shutdown signal - let shutdown_flag = SIGNAL_RECEIVED.lock().await; - if *shutdown_flag { - return Err(PostError::CustomError( - "Server is shutting down".to_string(), - )); - } let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let internal_stream_names = STREAM_INFO.list_internal_streams(); if internal_stream_names.contains(&stream_name) { From c47b5471fdaf3d0b9ff6f296706b18073823008d Mon Sep 17 00:00:00 2001 From: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com> Date: Fri, 25 Oct 2024 15:40:39 +0530 Subject: [PATCH 8/8] Update object_storage.rs removed unnecessary log::debug statement --- server/src/storage/object_storage.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 14c1dc264..7e2f7f609 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -473,9 +473,6 @@ pub trait ObjectStorage: Sync + 'static { .to_str() .expect("filename is valid string"); - // Log the filename being processed - log::debug!("Processing file: {}", filename); - let mut file_date_part = filename.split('.').collect::>()[0]; file_date_part = file_date_part.split('=').collect::>()[1]; let compressed_size = file.metadata().map_or(0, |meta| meta.len());