diff --git a/Cargo.lock b/Cargo.lock index f8355e167..6f96b76d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3009,6 +3009,7 @@ dependencies = [ "humantime", "humantime-serde", "itertools 0.13.0", + "lazy_static", "log", "maplit", "mime", diff --git a/Dockerfile b/Dockerfile index 9dfcbc1e6..cfd88348b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -31,7 +31,6 @@ FROM gcr.io/distroless/cc-debian12:latest WORKDIR /parseable # Copy the static shell into base image. -COPY --from=builder /bin/sh /bin/sh COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable -CMD ["parseable"] +CMD ["/usr/bin/parseable"] diff --git a/Dockerfile.debug b/Dockerfile.debug index dd72acc0d..acbc44698 100644 --- a/Dockerfile.debug +++ b/Dockerfile.debug @@ -32,4 +32,4 @@ WORKDIR /parseable COPY --from=builder /parseable/target/debug/parseable /usr/bin/parseable -CMD ["parseable"] +CMD ["/usr/bin/parseable"] diff --git a/helm/templates/ingestor-statefulset.yaml b/helm/templates/ingestor-statefulset.yaml index 29038762c..3f29637d7 100644 --- a/helm/templates/ingestor-statefulset.yaml +++ b/helm/templates/ingestor-statefulset.yaml @@ -40,12 +40,19 @@ spec: {{- toYaml .Values.parseable.securityContext | nindent 8 }} image: {{ .Values.parseable.image.repository }}:{{ .Values.parseable.image.tag | default .Chart.AppVersion }} imagePullPolicy: {{ .Values.parseable.image.pullPolicy }} - command: ["/bin/sh", "-c"] - args: ["parseable s3-store --ingestor-endpoint=${HOSTNAME}.{{ include "parseable.fullname" . }}-ingestor-headless.{{ .Release.Namespace }}.svc.cluster.local:{{ .Values.parseable.highAvailability.ingestor.port }}"] + args: + - /usr/bin/parseable + - s3-store + - --ingestor-endpoint=$(HOSTNAME).{{ include "parseable.fullname" . 
}}-ingestor-headless.{{ .Release.Namespace }}.svc.cluster.local:{{ .Values.parseable.highAvailability.ingestor.port }} env: {{- range $key, $value := .Values.parseable.highAvailability.ingestor.env }} - name: {{ $key }} value: {{ tpl $value $ | quote }} {{- end }} + - name: HOSTNAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name {{- range $secret := .Values.parseable.s3ModeSecret }} {{- range $key := $secret.keys }} @@ -62,6 +69,10 @@ spec: value: "ingest" ports: - containerPort: {{ .Values.parseable.highAvailability.ingestor.port }} + {{- with .Values.readinessProbe }} + readinessProbe: + {{ toYaml . | nindent 12 }} + {{- end }} resources: {{- toYaml .Values.parseable.highAvailability.ingestor.resources | nindent 12 }} {{- if .Values.parseable.persistence.ingestor.enabled }} diff --git a/helm/templates/querier-statefulset.yaml b/helm/templates/querier-statefulset.yaml index 4e35abf95..9aab8f281 100644 --- a/helm/templates/querier-statefulset.yaml +++ b/helm/templates/querier-statefulset.yaml @@ -41,8 +41,9 @@ spec: {{- toYaml .Values.parseable.securityContext | nindent 8 }} image: {{ .Values.parseable.image.repository }}:{{ .Values.parseable.image.tag | default .Chart.AppVersion }} imagePullPolicy: {{ .Values.parseable.image.pullPolicy }} - command: ["parseable"] - args: ["s3-store"] + args: + - /usr/bin/parseable + - s3-store env: {{- range $key, $value := .Values.parseable.env }} - name: {{ $key }} @@ -57,6 +58,11 @@ spec: secretKeyRef: name: {{ $secret.name }} key: {{ $key }} {{- end }} {{- end }} + - name: HOSTNAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name - name: P_MODE @@ -67,6 +73,10 @@ spec: {{- end }} ports: - containerPort: 8000 + {{- with .Values.readinessProbe }} + readinessProbe: + {{ toYaml . 
| nindent 12 }} + {{- end }} resources: {{- toYaml .Values.parseable.resources | nindent 12 }} volumeMounts: diff --git a/helm/templates/standalone-deployment.yaml b/helm/templates/standalone-deployment.yaml index abb552ba6..860bf1712 100644 --- a/helm/templates/standalone-deployment.yaml +++ b/helm/templates/standalone-deployment.yaml @@ -37,9 +37,9 @@ spec: # Uncomment to debug # command: [ "/bin/sh", "-c", "sleep 1000000" ] {{- if .Values.parseable.local }} - args: ["parseable", "local-store"] + args: ["/usr/bin/parseable", "local-store"] {{- else }} - args: ["parseable", "s3-store"] + args: ["/usr/bin/parseable", "s3-store"] {{- end }} env: {{- range $key, $value := .Values.parseable.env }} @@ -56,6 +56,11 @@ spec: secretKeyRef: name: {{ $secret.name }} key: {{ $key }} + - name: HOSTNAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name {{- end }} {{- end }} {{- else}} @@ -73,6 +78,10 @@ spec: {{- end }} ports: - containerPort: 8000 + {{- with .Values.readinessProbe }} + readinessProbe: + {{ toYaml . 
| nindent 12 }} + {{- end }} resources: {{- toYaml .Values.parseable.resources | nindent 12 }} volumeMounts: diff --git a/helm/values.yaml b/helm/values.yaml index 6d765563e..ce8f24b8e 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -23,7 +23,11 @@ parseable: ## Console (UI) is available on the other service (that points to the query pod) service: type: ClusterIP - port: 80 + port: 80 + readinessProbe: + httpGet: + path: /api/v1/readiness + port: 8000 resources: limits: cpu: 500m @@ -104,6 +108,10 @@ parseable: service: type: ClusterIP port: 80 + readinessProbe: + httpGet: + path: /api/v1/readiness + port: 8000 resources: limits: cpu: 500m diff --git a/server/Cargo.toml b/server/Cargo.toml index 1d5a3d50d..0bc623a79 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -36,6 +36,7 @@ anyhow = { version = "1.0", features = ["backtrace"] } argon2 = "0.5.0" async-trait = "0.1" base64 = "0.22.0" +lazy_static = "1.4" bytes = "1.4" byteorder = "1.4.3" bzip2 = { version = "*", features = ["static"] } diff --git a/server/src/handlers/http/health_check.rs b/server/src/handlers/http/health_check.rs index bb988b933..de4a01ba9 100644 --- a/server/src/handlers/http/health_check.rs +++ b/server/src/handlers/http/health_check.rs @@ -16,19 +16,77 @@ * */ +use crate::option::CONFIG; use actix_web::http::StatusCode; use actix_web::HttpResponse; +use lazy_static::lazy_static; +use std::sync::Arc; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::{oneshot, Mutex}; +use tokio::time::{sleep, Duration}; -use crate::option::CONFIG; +// Create a global variable to store signal status +lazy_static! 
{ + static ref SIGNAL_RECEIVED: Arc<Mutex<bool>> = Arc::new(Mutex::new(false)); +} pub async fn liveness() -> HttpResponse { HttpResponse::new(StatusCode::OK) } +pub async fn handle_signals(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()>>>>) { + let signal_received = SIGNAL_RECEIVED.clone(); + + let mut sigterm = + signal(SignalKind::terminate()).expect("Failed to set up SIGTERM signal handler"); + log::info!("Signal handler task started"); + + // Block until SIGTERM is received + match sigterm.recv().await { + Some(_) => { + log::info!("Received SIGTERM signal at Readiness Probe Handler"); + + // Set the shutdown flag. The guard is a temporary dropped at the end of this + // statement, so readiness() is not blocked during the delay and sync below. + *signal_received.lock().await = true; + + // Trigger graceful shutdown + if let Some(shutdown_sender) = shutdown_signal.lock().await.take() { + let _ = shutdown_sender.send(()); + } + + // Delay to allow readiness probe to return SERVICE_UNAVAILABLE + let _ = sleep(Duration::from_secs(20)).await; + + // Sync to local + crate::event::STREAM_WRITERS.unset_all(); + + // Sync to S3 + if let Err(e) = CONFIG.storage().get_object_store().sync().await { + log::warn!("Failed to sync local data with object store. 
{:?}", e); + } + + log::info!("Local and S3 Sync done, handler SIGTERM completed."); + } + None => { + log::info!("Signal handler received None, indicating an error or end of stream"); + } + } + + log::info!("Signal handler task completed"); +} + pub async fn readiness() -> HttpResponse { - if CONFIG.storage().get_object_store().check().await.is_ok() { - return HttpResponse::new(StatusCode::OK); + // Report not-ready once a shutdown signal has been received. The lock guard is a + // temporary released after the flag is read, so probes are not serialized on check(). + if *SIGNAL_RECEIVED.lock().await { + return HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE); } - HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE) + // Check the object store connection + if CONFIG.storage().get_object_store().check().await.is_ok() { + HttpResponse::new(StatusCode::OK) + } else { + HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE) + } } diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 68747c0b2..a9746fae8 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -18,6 +18,7 @@ use crate::analytics; use crate::banner; use crate::handlers::airplane; +use crate::handlers::http::health_check; use crate::handlers::http::ingest; use crate::handlers::http::logstream; use crate::handlers::http::middleware::RouteExt; @@ -35,6 +36,8 @@ use crate::storage::ObjectStorageError; use crate::storage::PARSEABLE_ROOT_DIRECTORY; use crate::sync; +use std::sync::Arc; + use super::server::Server; use super::ssl_acceptor::get_ssl_acceptor; use super::IngestorMetadata; @@ -56,6 +59,7 @@ use bytes::Bytes; use once_cell::sync::Lazy; use relative_path::RelativePathBuf; use serde_json::Value; +use tokio::sync::{oneshot, Mutex}; /// ! 
have to use a guard before using it pub static INGESTOR_META: Lazy = @@ -91,17 +95,47 @@ impl ParseableServer for IngestServer { .wrap(cross_origin_config()) }; - // concurrent workers equal to number of logical cores - let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); + // Create a channel to trigger server shutdown + let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); + let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); + + // Clone the shutdown signal for the signal handler + let shutdown_signal = server_shutdown_signal.clone(); + + // Spawn the signal handler task + tokio::spawn(async move { + health_check::handle_signals(shutdown_signal).await; + println!("Received shutdown signal, notifying server to shut down..."); + }); - if let Some(config) = ssl { + // Create the HTTP server + let http_server = HttpServer::new(create_app_fn) + .workers(num_cpus::get()) + .shutdown_timeout(60); + + // Start the server with or without TLS + let srv = if let Some(config) = ssl { http_server .bind_rustls_0_22(&CONFIG.parseable.address, config)? 
.run() - .await?; } else { - http_server.bind(&CONFIG.parseable.address)?.run().await?; - } + http_server.bind(&CONFIG.parseable.address)?.run() + }; + + // Graceful shutdown handling + let srv_handle = srv.handle(); + + tokio::spawn(async move { + // Wait for the shutdown signal + shutdown_rx.await.ok(); + + // Initiate graceful shutdown + log::info!("Graceful shutdown of HTTP server triggered"); + srv_handle.stop(true).await; + }); + + // Await the server to run and handle shutdown + srv.await?; Ok(()) } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 9d6f3fbdf..204d0e384 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -18,6 +18,7 @@ use crate::handlers::airplane; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; +use crate::handlers::http::health_check; use crate::handlers::http::logstream::create_internal_stream_if_not_exists; use crate::handlers::http::middleware::RouteExt; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; @@ -32,6 +33,7 @@ use actix_web::web::ServiceConfig; use actix_web::{App, HttpServer}; use async_trait::async_trait; use std::sync::Arc; +use tokio::sync::{oneshot, Mutex}; use crate::option::CONFIG; @@ -74,16 +76,46 @@ impl ParseableServer for QueryServer { .wrap(cross_origin_config()) }; - // concurrent workers equal to number of cores on the cpu - let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); - if let Some(config) = ssl { + // Create a channel to trigger server shutdown + let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); + let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); + + // Clone the shutdown signal for the signal handler + let shutdown_signal = server_shutdown_signal.clone(); + + // Spawn the signal handler task + tokio::spawn(async move { + 
health_check::handle_signals(shutdown_signal).await; + }); + + // Create the HTTP server + let http_server = HttpServer::new(create_app_fn) + .workers(num_cpus::get()) + .shutdown_timeout(120); + + // Start the server with or without TLS + let srv = if let Some(config) = ssl { http_server .bind_rustls_0_22(&CONFIG.parseable.address, config)? .run() - .await?; } else { - http_server.bind(&CONFIG.parseable.address)?.run().await?; - } + http_server.bind(&CONFIG.parseable.address)?.run() + }; + + // Graceful shutdown handling + let srv_handle = srv.handle(); + + tokio::spawn(async move { + // Wait for the shutdown signal + shutdown_rx.await.ok(); + + // Initiate graceful shutdown + log::info!("Graceful shutdown of HTTP server triggered"); + srv_handle.stop(true).await; + }); + + // Await the server to run and handle shutdown + srv.await?; Ok(()) } diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index e17080e31..4eb104a01 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -37,6 +37,7 @@ use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; use std::sync::Arc; +use tokio::sync::{oneshot, Mutex}; use actix_web::web::resource; use actix_web::Resource; @@ -96,16 +97,47 @@ impl ParseableServer for Server { &CONFIG.parseable.tls_key_path, )?; - // concurrent workers equal to number of cores on the cpu - let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); - if let Some(config) = ssl { + // Create a channel to trigger server shutdown + let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); + let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); + + // Clone the shutdown signal for the signal handler + let shutdown_signal = server_shutdown_signal.clone(); + + // Spawn the signal handler task + tokio::spawn(async move { + health_check::handle_signals(shutdown_signal).await; + 
println!("Received shutdown signal, notifying server to shut down..."); + }); + + // Create the HTTP server + let http_server = HttpServer::new(create_app_fn) + .workers(num_cpus::get()) + .shutdown_timeout(60); + + // Start the server with or without TLS + let srv = if let Some(config) = ssl { http_server .bind_rustls_0_22(&CONFIG.parseable.address, config)? .run() - .await?; } else { - http_server.bind(&CONFIG.parseable.address)?.run().await?; - } + http_server.bind(&CONFIG.parseable.address)?.run() + }; + + // Graceful shutdown handling + let srv_handle = srv.handle(); + + tokio::spawn(async move { + // Wait for the shutdown signal + shutdown_rx.await.ok(); + + // Initiate graceful shutdown + log::info!("Graceful shutdown of HTTP server triggered"); + srv_handle.stop(true).await; + }); + + // Await the server to run and handle shutdown + srv.await?; Ok(()) } @@ -548,6 +580,7 @@ impl Server { let app = self.start(prometheus, CONFIG.parseable.openid.clone()); tokio::pin!(app); + loop { tokio::select! { e = &mut app => { @@ -574,6 +607,7 @@ impl Server { } (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; } + }; } }