Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ FROM gcr.io/distroless/cc-debian12:latest
WORKDIR /parseable

# Copy the static shell into base image.
COPY --from=builder /bin/sh /bin/sh
COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable

CMD ["parseable"]
CMD ["/usr/bin/parseable"]
2 changes: 1 addition & 1 deletion Dockerfile.debug
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ WORKDIR /parseable

COPY --from=builder /parseable/target/debug/parseable /usr/bin/parseable

CMD ["parseable"]
CMD ["/usr/bin/parseable"]
15 changes: 13 additions & 2 deletions helm/templates/ingestor-statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,19 @@ spec:
{{- toYaml .Values.parseable.securityContext | nindent 8 }}
image: {{ .Values.parseable.image.repository }}:{{ .Values.parseable.image.tag | default .Chart.AppVersion }}
imagePullPolicy: {{ .Values.parseable.image.pullPolicy }}
command: ["/bin/sh", "-c"]
args: ["parseable s3-store --ingestor-endpoint=${HOSTNAME}.{{ include "parseable.fullname" . }}-ingestor-headless.{{ .Release.Namespace }}.svc.cluster.local:{{ .Values.parseable.highAvailability.ingestor.port }}"]
args:
- /usr/bin/parseable
- s3-store
- --ingestor-endpoint=$(HOSTNAME).{{ include "parseable.fullname" . }}-ingestor-headless.{{ .Release.Namespace }}.svc.cluster.local:{{ .Values.parseable.highAvailability.ingestor.port }}
env:
{{- range $key, $value := .Values.parseable.highAvailability.ingestor.env }}
- name: {{ $key }}
value: {{ tpl $value $ | quote }}
- name: HOSTNAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
{{- end }}
{{- range $secret := .Values.parseable.s3ModeSecret }}
{{- range $key := $secret.keys }}
Expand All @@ -62,6 +69,10 @@ spec:
value: "ingest"
ports:
- containerPort: {{ .Values.parseable.highAvailability.ingestor.port }}
{{- with .Values.readinessProbe }}
readinessProbe:
{{ toYaml . | nindent 12 }}
{{- end }}
resources:
{{- toYaml .Values.parseable.highAvailability.ingestor.resources | nindent 12 }}
{{- if .Values.parseable.persistence.ingestor.enabled }}
Expand Down
14 changes: 12 additions & 2 deletions helm/templates/querier-statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ spec:
{{- toYaml .Values.parseable.securityContext | nindent 8 }}
image: {{ .Values.parseable.image.repository }}:{{ .Values.parseable.image.tag | default .Chart.AppVersion }}
imagePullPolicy: {{ .Values.parseable.image.pullPolicy }}
command: ["parseable"]
args: ["s3-store"]
args:
- /usr/bin/parseable
- s3-store
env:
{{- range $key, $value := .Values.parseable.env }}
- name: {{ $key }}
Expand All @@ -57,6 +58,11 @@ spec:
secretKeyRef:
name: {{ $secret.name }}
key: {{ $key }}
- name: HOSTNAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
{{- end }}
{{- end }}
- name: P_MODE
Expand All @@ -67,6 +73,10 @@ spec:
{{- end }}
ports:
- containerPort: 8000
{{- with .Values.readinessProbe }}
readinessProbe:
{{ toYaml . | nindent 12 }}
{{- end }}
resources:
{{- toYaml .Values.parseable.resources | nindent 12 }}
volumeMounts:
Expand Down
13 changes: 11 additions & 2 deletions helm/templates/standalone-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ spec:
# Uncomment to debug
# command: [ "/bin/sh", "-c", "sleep 1000000" ]
{{- if .Values.parseable.local }}
args: ["parseable", "local-store"]
args: ["/usr/bin/parseable", "local-store"]
{{- else }}
args: ["parseable", "s3-store"]
args: ["/usr/bin/parseable", "s3-store"]
{{- end }}
env:
{{- range $key, $value := .Values.parseable.env }}
Expand All @@ -56,6 +56,11 @@ spec:
secretKeyRef:
name: {{ $secret.name }}
key: {{ $key }}
- name: HOSTNAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
{{- end }}
{{- end }}
{{- else}}
Expand All @@ -73,6 +78,10 @@ spec:
{{- end }}
ports:
- containerPort: 8000
{{- with .Values.readinessProbe }}
readinessProbe:
{{ toYaml . | nindent 12 }}
{{- end }}
resources:
{{- toYaml .Values.parseable.resources | nindent 12 }}
volumeMounts:
Expand Down
10 changes: 9 additions & 1 deletion helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ parseable:
## Console (UI) is available on the other service (that points to the query pod)
service:
type: ClusterIP
port: 80
port: 80
readinessProbe:
httpGet:
path: /api/v1/readiness
port: 8000
resources:
limits:
cpu: 500m
Expand Down Expand Up @@ -104,6 +108,10 @@ parseable:
service:
type: ClusterIP
port: 80
readinessProbe:
httpGet:
path: /api/v1/readiness
port: 8000
resources:
limits:
cpu: 500m
Expand Down
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ anyhow = { version = "1.0", features = ["backtrace"] }
argon2 = "0.5.0"
async-trait = "0.1"
base64 = "0.22.0"
lazy_static = "1.4"
bytes = "1.4"
byteorder = "1.4.3"
bzip2 = { version = "*", features = ["static"] }
Expand Down
66 changes: 62 additions & 4 deletions server/src/handlers/http/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,77 @@
*
*/

use crate::option::CONFIG;
use actix_web::http::StatusCode;
use actix_web::HttpResponse;
use lazy_static::lazy_static;
use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{oneshot, Mutex};
use tokio::time::{sleep, Duration};

use crate::option::CONFIG;
// Create a global variable to store signal status
lazy_static! {
static ref SIGNAL_RECEIVED: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
}

pub async fn liveness() -> HttpResponse {
HttpResponse::new(StatusCode::OK)
}

pub async fn handle_signals(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()>>>>) {
let signal_received = SIGNAL_RECEIVED.clone();

let mut sigterm =
signal(SignalKind::terminate()).expect("Failed to set up SIGTERM signal handler");
log::info!("Signal handler task started");

// Block until SIGTERM is received
match sigterm.recv().await {
Some(_) => {
log::info!("Received SIGTERM signal at Readiness Probe Handler");

// Set the shutdown flag to true
let mut shutdown_flag = signal_received.lock().await;
*shutdown_flag = true;

// Trigger graceful shutdown
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
let _ = shutdown_sender.send(());
}

// Delay to allow readiness probe to return SERVICE_UNAVAILABLE
let _ = sleep(Duration::from_secs(20)).await;

// Sync to local
crate::event::STREAM_WRITERS.unset_all();

// Sync to S3
if let Err(e) = CONFIG.storage().get_object_store().sync().await {
log::warn!("Failed to sync local data with object store. {:?}", e);
}

log::info!("Local and S3 Sync done, handler SIGTERM completed.");
}
None => {
log::info!("Signal handler received None, indicating an error or end of stream");
}
}

log::info!("Signal handler task completed");
}

pub async fn readiness() -> HttpResponse {
if CONFIG.storage().get_object_store().check().await.is_ok() {
return HttpResponse::new(StatusCode::OK);
// Check if the application has received a shutdown signal
let shutdown_flag = SIGNAL_RECEIVED.lock().await;
if *shutdown_flag {
return HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE);
}

HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE)
// Check the object store connection
if CONFIG.storage().get_object_store().check().await.is_ok() {
HttpResponse::new(StatusCode::OK)
} else {
HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE)
}
}
46 changes: 40 additions & 6 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::analytics;
use crate::banner;
use crate::handlers::airplane;
use crate::handlers::http::health_check;
use crate::handlers::http::ingest;
use crate::handlers::http::logstream;
use crate::handlers::http::middleware::RouteExt;
Expand All @@ -35,6 +36,8 @@ use crate::storage::ObjectStorageError;
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
use crate::sync;

use std::sync::Arc;

use super::server::Server;
use super::ssl_acceptor::get_ssl_acceptor;
use super::IngestorMetadata;
Expand All @@ -56,6 +59,7 @@ use bytes::Bytes;
use once_cell::sync::Lazy;
use relative_path::RelativePathBuf;
use serde_json::Value;
use tokio::sync::{oneshot, Mutex};

/// ! have to use a guard before using it
pub static INGESTOR_META: Lazy<IngestorMetadata> =
Expand Down Expand Up @@ -91,17 +95,47 @@ impl ParseableServer for IngestServer {
.wrap(cross_origin_config())
};

// concurrent workers equal to number of logical cores
let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
// Create a channel to trigger server shutdown
let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));

// Clone the shutdown signal for the signal handler
let shutdown_signal = server_shutdown_signal.clone();

// Spawn the signal handler task
tokio::spawn(async move {
health_check::handle_signals(shutdown_signal).await;
println!("Received shutdown signal, notifying server to shut down...");
});

if let Some(config) = ssl {
// Create the HTTP server
let http_server = HttpServer::new(create_app_fn)
.workers(num_cpus::get())
.shutdown_timeout(60);

// Start the server with or without TLS
let srv = if let Some(config) = ssl {
http_server
.bind_rustls_0_22(&CONFIG.parseable.address, config)?
.run()
.await?;
} else {
http_server.bind(&CONFIG.parseable.address)?.run().await?;
}
http_server.bind(&CONFIG.parseable.address)?.run()
};

// Graceful shutdown handling
let srv_handle = srv.handle();

tokio::spawn(async move {
// Wait for the shutdown signal
shutdown_rx.await.ok();

// Initiate graceful shutdown
log::info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
});

// Await the server to run and handle shutdown
srv.await?;

Ok(())
}
Expand Down
44 changes: 38 additions & 6 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use crate::handlers::airplane;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
use crate::handlers::http::health_check;
use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
use crate::handlers::http::middleware::RouteExt;
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
Expand All @@ -32,6 +33,7 @@ use actix_web::web::ServiceConfig;
use actix_web::{App, HttpServer};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};

use crate::option::CONFIG;

Expand Down Expand Up @@ -74,16 +76,46 @@ impl ParseableServer for QueryServer {
.wrap(cross_origin_config())
};

// concurrent workers equal to number of cores on the cpu
let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
if let Some(config) = ssl {
// Create a channel to trigger server shutdown
let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));

// Clone the shutdown signal for the signal handler
let shutdown_signal = server_shutdown_signal.clone();

// Spawn the signal handler task
tokio::spawn(async move {
health_check::handle_signals(shutdown_signal).await;
});

// Create the HTTP server
let http_server = HttpServer::new(create_app_fn)
.workers(num_cpus::get())
.shutdown_timeout(120);

// Start the server with or without TLS
let srv = if let Some(config) = ssl {
http_server
.bind_rustls_0_22(&CONFIG.parseable.address, config)?
.run()
.await?;
} else {
http_server.bind(&CONFIG.parseable.address)?.run().await?;
}
http_server.bind(&CONFIG.parseable.address)?.run()
};

// Graceful shutdown handling
let srv_handle = srv.handle();

tokio::spawn(async move {
// Wait for the shutdown signal
shutdown_rx.await.ok();

// Initiate graceful shutdown
log::info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
});

// Await the server to run and handle shutdown
srv.await?;

Ok(())
}
Expand Down
Loading