From d31e0462a96ca85c42baf335d49b82d7402b21cd Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Thu, 29 Aug 2024 09:55:40 +0530 Subject: [PATCH 01/10] handle sigterm for readiness probe --- Cargo.lock | 1 + server/Cargo.toml | 1 + server/src/handlers/http/health_check.rs | 64 ++++++++++++++++-------- server/src/handlers/http/modal/server.rs | 4 ++ 4 files changed, 48 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f8355e167..6f96b76d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3009,6 +3009,7 @@ dependencies = [ "humantime", "humantime-serde", "itertools 0.13.0", + "lazy_static", "log", "maplit", "mime", diff --git a/server/Cargo.toml b/server/Cargo.toml index 1d5a3d50d..0bc623a79 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -36,6 +36,7 @@ anyhow = { version = "1.0", features = ["backtrace"] } argon2 = "0.5.0" async-trait = "0.1" base64 = "0.22.0" +lazy_static = "1.4" bytes = "1.4" byteorder = "1.4.3" bzip2 = { version = "*", features = ["static"] } diff --git a/server/src/handlers/http/health_check.rs b/server/src/handlers/http/health_check.rs index bb988b933..04795c6f5 100644 --- a/server/src/handlers/http/health_check.rs +++ b/server/src/handlers/http/health_check.rs @@ -1,34 +1,54 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - +use crate::option::CONFIG; use actix_web::http::StatusCode; use actix_web::HttpResponse; +use lazy_static::lazy_static; +use std::sync::Arc; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::Mutex; +use tokio::time::{sleep, Duration}; -use crate::option::CONFIG; +// Create a global variable to store signal status +lazy_static! { + static ref SIGNAL_RECEIVED: Arc> = Arc::new(Mutex::new(false)); +} pub async fn liveness() -> HttpResponse { HttpResponse::new(StatusCode::OK) } +// Initialize the signal handler and handle signals +pub async fn handle_signals() { + let signal_received = SIGNAL_RECEIVED.clone(); + + let mut sigterm = + signal(SignalKind::terminate()).expect("Failed to set up SIGTERM signal handler"); + println!("Signal handler task started"); + loop { + match sigterm.recv().await { + Some(_) => { + println!("Received SIGTERM signal"); + let mut shutdown_flag = signal_received.lock().await; + *shutdown_flag = true; + println!("Current signal flag value: {:?}", *shutdown_flag); + } + None => { + eprintln!("Signal handler received None, indicating an error or end of stream"); + } + } + }; +} + pub async fn readiness() -> HttpResponse { - if CONFIG.storage().get_object_store().check().await.is_ok() { - return HttpResponse::new(StatusCode::OK); + // Check if the application has received a shutdown signal + let shutdown_flag = SIGNAL_RECEIVED.lock().await; + if *shutdown_flag { + return HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE); } - HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE) + // Check the object store connection + if CONFIG.storage().get_object_store().check().await.is_ok() { + HttpResponse::new(StatusCode::OK) + } else { + HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE) + } } diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index e17080e31..fc6c1687f 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -531,6 +531,10 @@ impl Server { FILTERS.load().await?; DASHBOARDS.load().await?; + tokio::spawn(async { + health_check::handle_signals().await; + }); + storage::retention::load_retention_from_global(); let (localsync_handler, mut localsync_outbox, localsync_inbox) = From 6e672078522adb1f878e02daa330c346b6834d0b Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Thu, 29 Aug 2024 16:26:28 +0530 Subject: [PATCH 02/10] update --- server/src/handlers/http/health_check.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/src/handlers/http/health_check.rs b/server/src/handlers/http/health_check.rs index 04795c6f5..dd1a0b39f 100644 --- a/server/src/handlers/http/health_check.rs +++ b/server/src/handlers/http/health_check.rs @@ -2,6 +2,7 @@ use crate::option::CONFIG; use actix_web::http::StatusCode; use actix_web::HttpResponse; use lazy_static::lazy_static; +use std::cmp::min; use std::sync::Arc; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::Mutex; @@ -22,14 +23,16 @@ pub async fn handle_signals() { let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set up SIGTERM signal handler"); - println!("Signal handler task started"); + eprintln!("Signal handler task started"); loop { match sigterm.recv().await { Some(_) => { - println!("Received SIGTERM signal"); + eprintln!("Received SIGTERM signal"); let mut shutdown_flag = signal_received.lock().await; *shutdown_flag = true; - println!("Current signal flag value: {:?}", *shutdown_flag); + eprintln!("Current signal flag value: {:?}", *shutdown_flag); + + } None => { eprintln!("Signal handler received None, indicating an error or end of stream"); From 20dabd7a8010be99e17af3e9440e771bed5131f7 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Thu, 29 Aug 2024 19:49:26 +0530 Subject: [PATCH 03/10] add sigterm to handler --- server/src/handlers/http/health_check.rs | 37 ++++++++++--------- .../src/handlers/http/modal/ingest_server.rs | 15 ++++++++ server/src/handlers/http/modal/server.rs | 20 +++++++--- 3 files changed, 50 insertions(+), 22 deletions(-) diff --git a/server/src/handlers/http/health_check.rs b/server/src/handlers/http/health_check.rs index dd1a0b39f..d0d541e7a 100644 --- a/server/src/handlers/http/health_check.rs +++ b/server/src/handlers/http/health_check.rs @@ -2,7 +2,6 @@ use crate::option::CONFIG; use actix_web::http::StatusCode; use actix_web::HttpResponse; use lazy_static::lazy_static; -use std::cmp::min; use std::sync::Arc; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::Mutex; @@ -23,22 +22,26 @@ pub async fn handle_signals() { let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set up SIGTERM signal handler"); - eprintln!("Signal handler task started"); - loop { - match sigterm.recv().await { - Some(_) => { - eprintln!("Received SIGTERM signal"); - let mut shutdown_flag = signal_received.lock().await; - *shutdown_flag = true; - eprintln!("Current signal flag value: {:?}", *shutdown_flag); - - - } - None => { - eprintln!("Signal handler received None, indicating an error or end of stream"); - } - } - }; + println!("Signal handler task started"); + + // Block until SIGTERM is received + match sigterm.recv().await { + Some(_) => { + println!("Received SIGTERM signal"); + let mut shutdown_flag = signal_received.lock().await; + *shutdown_flag = true; + println!("Current signal flag value: {:?}", *shutdown_flag); + + // Delay to allow readiness probe to return SERVICE_UNAVAILABLE + let _ = sleep(Duration::from_secs(15)).await; + + } + None => { + println!("Signal handler received None, indicating an error or end of stream"); + } + } + + eprintln!("Signal handler task completed"); } pub async fn readiness() -> HttpResponse { diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 68747c0b2..b400b0073 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -34,6 +34,8 @@ use crate::storage::staging; use crate::storage::ObjectStorageError; use crate::storage::PARSEABLE_ROOT_DIRECTORY; use crate::sync; +use crate::handlers::http::health_check; +use std::sync::Arc; use super::server::Server; use super::ssl_acceptor::get_ssl_acceptor; @@ -56,6 +58,7 @@ use bytes::Bytes; use once_cell::sync::Lazy; use relative_path::RelativePathBuf; use serde_json::Value; +use tokio::sync::Notify; /// ! have to use a guard before using it pub static INGESTOR_META: Lazy = @@ -91,6 +94,15 @@ impl ParseableServer for IngestServer { .wrap(cross_origin_config()) }; + let notify_shutdown = Arc::new(Notify::new()); + let notify_shutdown_clone = notify_shutdown.clone(); + + tokio::spawn(async move { + health_check::handle_signals().await; + println!("Received shutdown signal, notifying server to shut down..."); + notify_shutdown_clone.notify_one(); + }); + // concurrent workers equal to number of logical cores let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); @@ -103,6 +115,9 @@ impl ParseableServer for IngestServer { http_server.bind(&CONFIG.parseable.address)?.run().await?; } + notify_shutdown.notified().await; + + Ok(()) } diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index fc6c1687f..8edc0b7ec 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -37,11 +37,12 @@ use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; use std::sync::Arc; +use tokio::sync::Notify; use actix_web::web::resource; use actix_web::Resource; use actix_web::Scope; -use actix_web::{web, App, HttpServer}; +use actix_web::{web, App, HttpServer, Responder}; use actix_web_prometheus::PrometheusMetrics; use actix_web_static_files::ResourceFiles; use async_trait::async_trait; @@ -96,6 +97,16 @@ impl ParseableServer for Server { &CONFIG.parseable.tls_key_path, )?; + + let notify_shutdown = Arc::new(Notify::new()); + let notify_shutdown_clone = notify_shutdown.clone(); + + tokio::spawn(async move { + health_check::handle_signals().await; + println!("Received shutdown signal, notifying server to shut down..."); + notify_shutdown_clone.notify_one(); + }); + // concurrent workers equal to number of cores on the cpu let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); if let Some(config) = ssl { @@ -523,6 +534,7 @@ impl Server { .await?; }; + let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); @@ -531,10 +543,6 @@ impl Server { FILTERS.load().await?; DASHBOARDS.load().await?; - tokio::spawn(async { - health_check::handle_signals().await; - }); - storage::retention::load_retention_from_global(); let (localsync_handler, mut localsync_outbox, localsync_inbox) = @@ -552,6 +560,7 @@ impl Server { let app = self.start(prometheus, CONFIG.parseable.openid.clone()); tokio::pin!(app); + loop { tokio::select! { e = &mut app => { @@ -578,6 +587,7 @@ impl Server { } (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; } + }; } } From ae443361dc5d0003d2722cd28d249e44a2bafd9b Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Sun, 1 Sep 2024 22:40:56 +0530 Subject: [PATCH 04/10] actix to shutdown connections --- Dockerfile | 2 +- Dockerfile.debug | 2 +- server/src/handlers/http/health_check.rs | 28 ++++++++--- .../src/handlers/http/modal/ingest_server.rs | 45 ++++++++++++------ .../src/handlers/http/modal/query_server.rs | 45 +++++++++++++++--- server/src/handlers/http/modal/server.rs | 46 +++++++++++++------ 6 files changed, 127 insertions(+), 41 deletions(-) diff --git a/Dockerfile b/Dockerfile index 9dfcbc1e6..b9f7e36ea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,4 +34,4 @@ WORKDIR /parseable COPY --from=builder /bin/sh /bin/sh COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable -CMD ["parseable"] +CMD ["/usr/bin/parseable"] diff --git a/Dockerfile.debug b/Dockerfile.debug index dd72acc0d..acbc44698 100644 --- a/Dockerfile.debug +++ b/Dockerfile.debug @@ -32,4 +32,4 @@ WORKDIR /parseable COPY --from=builder /parseable/target/debug/parseable /usr/bin/parseable -CMD ["parseable"] +CMD ["/usr/bin/parseable"] diff --git a/server/src/handlers/http/health_check.rs b/server/src/handlers/http/health_check.rs index d0d541e7a..9bd17f916 100644 --- a/server/src/handlers/http/health_check.rs +++ b/server/src/handlers/http/health_check.rs @@ -4,7 +4,7 @@ use actix_web::HttpResponse; use lazy_static::lazy_static; use std::sync::Arc; use tokio::signal::unix::{signal, SignalKind}; -use tokio::sync::Mutex; +use tokio::sync::{oneshot, Mutex}; use tokio::time::{sleep, Duration}; // Create a global variable to store signal status @@ -16,25 +16,39 @@ pub async fn liveness() -> HttpResponse { HttpResponse::new(StatusCode::OK) } -// Initialize the signal handler and handle signals -pub async fn handle_signals() { +pub async fn handle_signals(shutdown_signal: Arc>>>) { let signal_received = SIGNAL_RECEIVED.clone(); let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set up SIGTERM signal handler"); - println!("Signal handler task started"); + log::info!("Signal handler task started"); // Block until SIGTERM is received match sigterm.recv().await { Some(_) => { - println!("Received SIGTERM signal"); + log::info!("Received SIGTERM signal at Readiness Probe Handler"); + + // Set the shutdown flag to true let mut shutdown_flag = signal_received.lock().await; *shutdown_flag = true; - println!("Current signal flag value: {:?}", *shutdown_flag); + + // Trigger graceful shutdown + if let Some(shutdown_sender) = shutdown_signal.lock().await.take() { + let _ = shutdown_sender.send(()); + } // Delay to allow readiness probe to return SERVICE_UNAVAILABLE - let _ = sleep(Duration::from_secs(15)).await; + let _ = sleep(Duration::from_secs(20)).await; + + // Sync to local + crate::event::STREAM_WRITERS.unset_all(); + + // Sync to S3 + if let Err(e) = CONFIG.storage().get_object_store().sync().await { + log::warn!("Failed to sync local data with object store. {:?}", e); + } + log::info!("Local and S3 Sync done, handler SIGTERM completed."); } None => { println!("Signal handler received None, indicating an error or end of stream"); diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index b400b0073..a9746fae8 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -18,6 +18,7 @@ use crate::analytics; use crate::banner; use crate::handlers::airplane; +use crate::handlers::http::health_check; use crate::handlers::http::ingest; use crate::handlers::http::logstream; use crate::handlers::http::middleware::RouteExt; @@ -34,7 +35,7 @@ use crate::storage::staging; use crate::storage::ObjectStorageError; use crate::storage::PARSEABLE_ROOT_DIRECTORY; use crate::sync; -use crate::handlers::http::health_check; + use std::sync::Arc; use super::server::Server; @@ -58,7 +59,7 @@ use bytes::Bytes; use once_cell::sync::Lazy; use relative_path::RelativePathBuf; use serde_json::Value; -use tokio::sync::Notify; +use tokio::sync::{oneshot, Mutex}; /// ! have to use a guard before using it pub static INGESTOR_META: Lazy = @@ -94,29 +95,47 @@ impl ParseableServer for IngestServer { .wrap(cross_origin_config()) }; - let notify_shutdown = Arc::new(Notify::new()); - let notify_shutdown_clone = notify_shutdown.clone(); + // Create a channel to trigger server shutdown + let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); + let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); + // Clone the shutdown signal for the signal handler + let shutdown_signal = server_shutdown_signal.clone(); + + // Spawn the signal handler task tokio::spawn(async move { - health_check::handle_signals().await; + health_check::handle_signals(shutdown_signal).await; println!("Received shutdown signal, notifying server to shut down..."); - notify_shutdown_clone.notify_one(); }); - // concurrent workers equal to number of logical cores - let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); + // Create the HTTP server + let http_server = HttpServer::new(create_app_fn) + .workers(num_cpus::get()) + .shutdown_timeout(60); - if let Some(config) = ssl { + // Start the server with or without TLS + let srv = if let Some(config) = ssl { http_server .bind_rustls_0_22(&CONFIG.parseable.address, config)? .run() - .await?; } else { - http_server.bind(&CONFIG.parseable.address)?.run().await?; - } + http_server.bind(&CONFIG.parseable.address)?.run() + }; + + // Graceful shutdown handling + let srv_handle = srv.handle(); - notify_shutdown.notified().await; + tokio::spawn(async move { + // Wait for the shutdown signal + shutdown_rx.await.ok(); + + // Initiate graceful shutdown + log::info!("Graceful shutdown of HTTP server triggered"); + srv_handle.stop(true).await; + }); + // Await the server to run and handle shutdown + srv.await?; Ok(()) } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 9d6f3fbdf..cc291f1a3 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -18,6 +18,7 @@ use crate::handlers::airplane; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; +use crate::handlers::http::health_check; use crate::handlers::http::logstream::create_internal_stream_if_not_exists; use crate::handlers::http::middleware::RouteExt; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; @@ -32,6 +33,7 @@ use actix_web::web::ServiceConfig; use actix_web::{App, HttpServer}; use async_trait::async_trait; use std::sync::Arc; +use tokio::sync::{oneshot, Mutex}; use crate::option::CONFIG; @@ -74,16 +76,47 @@ impl ParseableServer for QueryServer { .wrap(cross_origin_config()) }; - // concurrent workers equal to number of cores on the cpu - let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); - if let Some(config) = ssl { + // Create a channel to trigger server shutdown + let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); + let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); + + // Clone the shutdown signal for the signal handler + let shutdown_signal = server_shutdown_signal.clone(); + + // Spawn the signal handler task + tokio::spawn(async move { + health_check::handle_signals(shutdown_signal).await; + println!("Received shutdown signal, notifying server to shut down..."); + }); + + // Create the HTTP server + let http_server = HttpServer::new(create_app_fn) + .workers(num_cpus::get()) + .shutdown_timeout(60); + + // Start the server with or without TLS + let srv = if let Some(config) = ssl { http_server .bind_rustls_0_22(&CONFIG.parseable.address, config)? .run() - .await?; } else { - http_server.bind(&CONFIG.parseable.address)?.run().await?; - } + http_server.bind(&CONFIG.parseable.address)?.run() + }; + + // Graceful shutdown handling + let srv_handle = srv.handle(); + + tokio::spawn(async move { + // Wait for the shutdown signal + shutdown_rx.await.ok(); + + // Initiate graceful shutdown + log::info!("Graceful shutdown of HTTP server triggered"); + srv_handle.stop(true).await; + }); + + // Await the server to run and handle shutdown + srv.await?; Ok(()) } diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 8edc0b7ec..4eb104a01 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -37,12 +37,12 @@ use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; use std::sync::Arc; -use tokio::sync::Notify; +use tokio::sync::{oneshot, Mutex}; use actix_web::web::resource; use actix_web::Resource; use actix_web::Scope; -use actix_web::{web, App, HttpServer, Responder}; +use actix_web::{web, App, HttpServer}; use actix_web_prometheus::PrometheusMetrics; use actix_web_static_files::ResourceFiles; use async_trait::async_trait; @@ -97,26 +97,47 @@ impl ParseableServer for Server { &CONFIG.parseable.tls_key_path, )?; + // Create a channel to trigger server shutdown + let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); + let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); - let notify_shutdown = Arc::new(Notify::new()); - let notify_shutdown_clone = notify_shutdown.clone(); + // Clone the shutdown signal for the signal handler + let shutdown_signal = server_shutdown_signal.clone(); + // Spawn the signal handler task tokio::spawn(async move { - health_check::handle_signals().await; + health_check::handle_signals(shutdown_signal).await; println!("Received shutdown signal, notifying server to shut down..."); - notify_shutdown_clone.notify_one(); }); - // concurrent workers equal to number of cores on the cpu - let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); - if let Some(config) = ssl { + // Create the HTTP server + let http_server = HttpServer::new(create_app_fn) + .workers(num_cpus::get()) + .shutdown_timeout(60); + + // Start the server with or without TLS + let srv = if let Some(config) = ssl { http_server .bind_rustls_0_22(&CONFIG.parseable.address, config)? .run() - .await?; } else { - http_server.bind(&CONFIG.parseable.address)?.run().await?; - } + http_server.bind(&CONFIG.parseable.address)?.run() + }; + + // Graceful shutdown handling + let srv_handle = srv.handle(); + + tokio::spawn(async move { + // Wait for the shutdown signal + shutdown_rx.await.ok(); + + // Initiate graceful shutdown + log::info!("Graceful shutdown of HTTP server triggered"); + srv_handle.stop(true).await; + }); + + // Await the server to run and handle shutdown + srv.await?; Ok(()) } @@ -534,7 +555,6 @@ impl Server { .await?; }; - let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); From ddaf19645ac462e08dccdb31d903dbe093f60a45 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Sun, 1 Sep 2024 23:01:44 +0530 Subject: [PATCH 05/10] add license --- server/src/handlers/http/health_check.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/server/src/handlers/http/health_check.rs b/server/src/handlers/http/health_check.rs index 9bd17f916..9d9560404 100644 --- a/server/src/handlers/http/health_check.rs +++ b/server/src/handlers/http/health_check.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use crate::option::CONFIG; use actix_web::http::StatusCode; use actix_web::HttpResponse; From ca63c9ee2b9ebad450bc529d895537fb3342cbcb Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Sun, 1 Sep 2024 23:02:15 +0530 Subject: [PATCH 06/10] 120 seconds --- server/src/handlers/http/modal/query_server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index cc291f1a3..c3e6932fd 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -92,7 +92,7 @@ impl ParseableServer for QueryServer { // Create the HTTP server let http_server = HttpServer::new(create_app_fn) .workers(num_cpus::get()) - .shutdown_timeout(60); + .shutdown_timeout(120); // Start the server with or without TLS let srv = if let Some(config) = ssl { From 245f52a3e7174811fc86d15b8bff1434539ec9e2 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Sun, 1 Sep 2024 23:06:03 +0530 Subject: [PATCH 07/10] add log info --- server/src/handlers/http/health_check.rs | 4 ++-- server/src/handlers/http/modal/query_server.rs | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/handlers/http/health_check.rs b/server/src/handlers/http/health_check.rs index 9d9560404..de4a01ba9 100644 --- a/server/src/handlers/http/health_check.rs +++ b/server/src/handlers/http/health_check.rs @@ -69,11 +69,11 @@ pub async fn handle_signals(shutdown_signal: Arc { - println!("Signal handler received None, indicating an error or end of stream"); + log::info!("Signal handler received None, indicating an error or end of stream"); } } - eprintln!("Signal handler task completed"); + log::info!("Signal handler task completed"); } pub async fn readiness() -> HttpResponse { diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index c3e6932fd..204d0e384 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -86,7 +86,6 @@ impl ParseableServer for QueryServer { // Spawn the signal handler task tokio::spawn(async move { health_check::handle_signals(shutdown_signal).await; - println!("Received shutdown signal, notifying server to shut down..."); }); // Create the HTTP server From fd5d1e0409718b3ae530894ee400fd3a47cdd3be Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Sun, 1 Sep 2024 23:28:16 +0530 Subject: [PATCH 08/10] add probes --- helm/templates/ingestor-statefulset.yaml | 4 ++++ helm/templates/querier-statefulset.yaml | 4 ++++ helm/templates/standalone-deployment.yaml | 6 +++++- helm/values.yaml | 10 +++++++++- 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/helm/templates/ingestor-statefulset.yaml b/helm/templates/ingestor-statefulset.yaml index 29038762c..a2cb58b3b 100644 --- a/helm/templates/ingestor-statefulset.yaml +++ b/helm/templates/ingestor-statefulset.yaml @@ -62,6 +62,10 @@ spec: value: "ingest" ports: - containerPort: {{ .Values.parseable.highAvailability.ingestor.port }} + {{- with .Values.readinessProbe }} + readinessProbe: + {{ toYaml . | nindent 12 }} + {{- end }} resources: {{- toYaml .Values.parseable.highAvailability.ingestor.resources | nindent 12 }} {{- if .Values.parseable.persistence.ingestor.enabled }} diff --git a/helm/templates/querier-statefulset.yaml b/helm/templates/querier-statefulset.yaml index 4e35abf95..c7a1130ad 100644 --- a/helm/templates/querier-statefulset.yaml +++ b/helm/templates/querier-statefulset.yaml @@ -67,6 +67,10 @@ spec: {{- end }} ports: - containerPort: 8000 + {{- with .Values.readinessProbe }} + readinessProbe: + {{ toYaml . | nindent 12 }} + {{- end }} resources: {{- toYaml .Values.parseable.resources | nindent 12 }} volumeMounts: diff --git a/helm/templates/standalone-deployment.yaml b/helm/templates/standalone-deployment.yaml index f7e18e8ac..249834fe4 100644 --- a/helm/templates/standalone-deployment.yaml +++ b/helm/templates/standalone-deployment.yaml @@ -18,7 +18,7 @@ spec: {{- toYaml . | nindent 8 }} {{- end }} labels: - {{- .Values.parseable.podLabels | toYaml | nindent 4 }} + {{- .Values.parseable.podLabels | toYaml | nindent 8 }} {{- include "parseable.labelsSelector" . | nindent 8 }} spec: {{- with .Values.parseable.imagePullSecrets }} @@ -73,6 +73,10 @@ spec: {{- end }} ports: - containerPort: 8000 + {{- with .Values.readinessProbe }} + readinessProbe: + {{ toYaml . | nindent 12 }} + {{- end }} resources: {{- toYaml .Values.parseable.resources | nindent 12 }} volumeMounts: diff --git a/helm/values.yaml b/helm/values.yaml index 6d765563e..ce8f24b8e 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -23,7 +23,11 @@ parseable: ## Console (UI) is available on the other service (that points to the query pod) service: type: ClusterIP - port: 80 + port: 80 + readinessProbe: + httpGet: + path: /api/v1/readiness + port: 8000 resources: limits: cpu: 500m @@ -104,6 +108,10 @@ parseable: service: type: ClusterIP port: 80 + readinessProbe: + httpGet: + path: /api/v1/readiness + port: 8000 resources: limits: cpu: 500m From acabaad36f9bce3b041eeabf9d603e1b85f7a4a0 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Tue, 3 Sep 2024 21:39:33 +0530 Subject: [PATCH 09/10] cli args and hostname --- Dockerfile | 1 - helm/templates/ingestor-statefulset.yaml | 11 +++++++++-- helm/templates/querier-statefulset.yaml | 10 ++++++++-- helm/templates/standalone-deployment.yaml | 9 +++++++-- 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/Dockerfile b/Dockerfile index b9f7e36ea..cfd88348b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -31,7 +31,6 @@ FROM gcr.io/distroless/cc-debian12:latest WORKDIR /parseable # Copy the static shell into base image. -COPY --from=builder /bin/sh /bin/sh COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable CMD ["/usr/bin/parseable"] diff --git a/helm/templates/ingestor-statefulset.yaml b/helm/templates/ingestor-statefulset.yaml index a2cb58b3b..38a7519cc 100644 --- a/helm/templates/ingestor-statefulset.yaml +++ b/helm/templates/ingestor-statefulset.yaml @@ -40,12 +40,19 @@ spec: {{- toYaml .Values.parseable.securityContext | nindent 8 }} image: {{ .Values.parseable.image.repository }}:{{ .Values.parseable.image.tag | default .Chart.AppVersion }} imagePullPolicy: {{ .Values.parseable.image.pullPolicy }} - command: ["/bin/sh", "-c"] - args: ["parseable s3-store --ingestor-endpoint=${HOSTNAME}.{{ include "parseable.fullname" . }}-ingestor-headless.{{ .Release.Namespace }}.svc.cluster.local:{{ .Values.parseable.highAvailability.ingestor.port }}"] + args: + - /use/bin/parseable + - s3-store + - --ingestor-endpoint=$(HOSTNAME).{{ include "parseable.fullname" . }}-ingestor-headless.{{ .Release.Namespace }}.svc.cluster.local:{{ .Values.parseable.highAvailability.ingestor.port }} env: {{- range $key, $value := .Values.parseable.highAvailability.ingestor.env }} - name: {{ $key }} value: {{ tpl $value $ | quote }} + - name: HOSTNAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name {{- end }} {{- range $secret := .Values.parseable.s3ModeSecret }} {{- range $key := $secret.keys }} diff --git a/helm/templates/querier-statefulset.yaml b/helm/templates/querier-statefulset.yaml index c7a1130ad..9aab8f281 100644 --- a/helm/templates/querier-statefulset.yaml +++ b/helm/templates/querier-statefulset.yaml @@ -41,8 +41,9 @@ spec: {{- toYaml .Values.parseable.securityContext | nindent 8 }} image: {{ .Values.parseable.image.repository }}:{{ .Values.parseable.image.tag | default .Chart.AppVersion }} imagePullPolicy: {{ .Values.parseable.image.pullPolicy }} - command: ["parseable"] - args: ["s3-store"] + args: + - /usr/bin/parseable + - s3-store env: {{- range $key, $value := .Values.parseable.env }} - name: {{ $key }} @@ -57,6 +58,11 @@ spec: secretKeyRef: name: {{ $secret.name }} key: {{ $key }} + - name: HOSTNAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name {{- end }} {{- end }} - name: P_MODE diff --git a/helm/templates/standalone-deployment.yaml b/helm/templates/standalone-deployment.yaml index 3e6d1bf2a..860bf1712 100644 --- a/helm/templates/standalone-deployment.yaml +++ b/helm/templates/standalone-deployment.yaml @@ -37,9 +37,9 @@ spec: # Uncomment to debug # command: [ "/bin/sh", "-c", "sleep 1000000" ] {{- if .Values.parseable.local }} - args: ["parseable", "local-store"] + args: ["/usr/bin/parseable", "local-store"] {{- else }} - args: ["parseable", "s3-store"] + args: ["/usr/bin/parseable", "s3-store"] {{- end }} env: {{- range $key, $value := .Values.parseable.env }} @@ -56,6 +56,11 @@ spec: secretKeyRef: name: {{ $secret.name }} key: {{ $key }} + - name: HOSTNAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name {{- end }} {{- end }} {{- else}} From 38f1d1028f04eddc0935f0bc840b9da6592aa26c Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Tue, 3 Sep 2024 21:50:01 +0530 Subject: [PATCH 10/10] fix typo --- helm/templates/ingestor-statefulset.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helm/templates/ingestor-statefulset.yaml b/helm/templates/ingestor-statefulset.yaml index 38a7519cc..3f29637d7 100644 --- a/helm/templates/ingestor-statefulset.yaml +++ b/helm/templates/ingestor-statefulset.yaml @@ -41,7 +41,7 @@ spec: image: {{ .Values.parseable.image.repository }}:{{ .Values.parseable.image.tag | default .Chart.AppVersion }} imagePullPolicy: {{ .Values.parseable.image.pullPolicy }} args: - - /use/bin/parseable + - /usr/bin/parseable - s3-store - --ingestor-endpoint=$(HOSTNAME).{{ include "parseable.fullname" . }}-ingestor-headless.{{ .Release.Namespace }}.svc.cluster.local:{{ .Values.parseable.highAvailability.ingestor.port }} env: