Skip to content

Commit 35b1c95

Browse files
committed
init
1 parent 5308808 commit 35b1c95

File tree

4 files changed

+52
-33
lines changed

4 files changed

+52
-33
lines changed

server/src/handlers/http/health_check.rs

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ use actix_web::middleware::Next;
2525
use actix_web::{Error, HttpResponse};
2626
use lazy_static::lazy_static;
2727
use std::sync::Arc;
28-
use tokio::signal::unix::{signal, SignalKind};
28+
use tokio::signal::ctrl_c;
29+
2930
use tokio::sync::{oneshot, Mutex};
3031

3132
// Create a global variable to store signal status
@@ -52,35 +53,45 @@ pub async fn check_shutdown_middleware(
5253
}
5354

5455
pub async fn handle_signals(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()>>>>) {
55-
let mut sigterm =
56-
signal(SignalKind::terminate()).expect("Failed to set up SIGTERM signal handler");
57-
log::info!("Signal handler task started");
58-
59-
// Block until SIGTERM is received
60-
match sigterm.recv().await {
61-
Some(_) => {
62-
log::info!("Received SIGTERM signal at Readiness Probe Handler");
63-
64-
// Set the shutdown flag to true
65-
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
66-
*shutdown_flag = true;
67-
68-
// Sync to local
69-
crate::event::STREAM_WRITERS.unset_all();
70-
71-
// Trigger graceful shutdown
72-
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
73-
let _ = shutdown_sender.send(());
56+
#[cfg(windows)]
57+
{
58+
tokio::select! {
59+
_ = ctrl_c() => {
60+
log::info!("Received SIGINT signal at Readiness Probe Handler");
61+
shutdown(shutdown_signal).await;
7462
}
7563
}
76-
None => {
77-
log::info!("Signal handler received None, indicating an error or end of stream");
64+
}
65+
#[cfg(unix)]
66+
{
67+
use tokio::signal::unix::{signal, SignalKind};
68+
let mut sigterm = signal(SignalKind::terminate()).unwrap();
69+
tokio::select! {
70+
_ = ctrl_c() => {
71+
log::info!("Received SIGINT signal at Readiness Probe Handler");
72+
shutdown(shutdown_signal).await;
73+
},
74+
_ = sigterm.recv() => {
75+
log::info!("Received SIGTERM signal at Readiness Probe Handler");
76+
shutdown(shutdown_signal).await;
77+
}
7878
}
7979
}
80-
81-
log::info!("Signal handler task completed");
8280
}
8381

82+
async fn shutdown(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()>>>>) {
83+
// Set the shutdown flag to true
84+
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
85+
*shutdown_flag = true;
86+
87+
// Sync to local
88+
crate::event::STREAM_WRITERS.unset_all();
89+
90+
// Trigger graceful shutdown
91+
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
92+
let _ = shutdown_sender.send(());
93+
}
94+
}
8495
pub async fn readiness() -> HttpResponse {
8596
// Check the object store connection
8697
if CONFIG.storage().get_object_store().check().await.is_ok() {

server/src/handlers/http/modal/query_server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ impl ParseableServer for QueryServer {
9191
// Spawn the signal handler task
9292
let signal_task = tokio::spawn(async move {
9393
health_check::handle_signals(shutdown_signal).await;
94-
log::info!("Received shutdown signal, notifying server to shut down...");
94+
log::info!("Received signal, notifying server to shut down...");
9595
});
9696

9797
// Create the HTTP server

server/src/handlers/http/modal/server.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,9 @@ impl ParseableServer for Server {
108108
// Clone the shutdown signal for the signal handler
109109
let shutdown_signal = server_shutdown_signal.clone();
110110

111-
// Spawn the signal handler task
112111
let signal_task = tokio::spawn(async move {
113112
health_check::handle_signals(shutdown_signal).await;
114-
log::info!("Received shutdown signal, notifying server to shut down...");
113+
log::info!("Received signal, notifying server to shut down...");
115114
});
116115

117116
// Create the HTTP server

server/src/query/stream_schema_provider.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -237,12 +237,21 @@ fn partitioned_files(
237237
// object_store::path::Path doesn't automatically deal with Windows path separators
238238
// to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem
239239
// before sending the file path to PartitionedFile
240-
let pf = if CONFIG.storage_name.eq("drive") {
241-
let file_path = object_store::path::Path::from_absolute_path(file_path).unwrap();
242-
PartitionedFile::new(file_path, file.file_size)
243-
} else {
244-
PartitionedFile::new(file_path, file.file_size)
245-
};
240+
let pf;
241+
242+
#[cfg(unix)]
243+
{
244+
pf = PartitionedFile::new(file_path, file.file_size);
245+
}
246+
#[cfg(windows)]
247+
{
248+
pf = if CONFIG.storage_name.eq("drive") {
249+
let file_path = object_store::path::Path::from_absolute_path(file_path).unwrap();
250+
PartitionedFile::new(file_path, file.file_size)
251+
} else {
252+
PartitionedFile::new(file_path, file.file_size)
253+
};
254+
}
246255

247256
partitioned_files[index].push(pf);
248257
columns.into_iter().for_each(|col| {

0 commit comments

Comments
 (0)