Skip to content

Commit 5f00a8a

Browse files
AdheipSinghnikhilsinhaparseable
authored andcommitted
sigterm on ingest and query
1 parent 7a9db04 commit 5f00a8a

File tree

2 files changed

+60
-13
lines changed

2 files changed

+60
-13
lines changed

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,9 @@ impl ParseableServer for IngestServer {
110110
let shutdown_signal = server_shutdown_signal.clone();
111111

112112
// Spawn the signal handler task
113-
tokio::spawn(async move {
113+
let signal_task = tokio::spawn(async move {
114114
health_check::handle_signals(shutdown_signal).await;
115-
println!("Received shutdown signal, notifying server to shut down...");
115+
log::info!("Received shutdown signal, notifying server to shut down...");
116116
});
117117

118118
// Create the HTTP server
@@ -131,18 +131,41 @@ impl ParseableServer for IngestServer {
131131

132132
// Graceful shutdown handling
133133
let srv_handle = srv.handle();
134-
135-
tokio::spawn(async move {
134+
135+
let sync_task = tokio::spawn(async move {
136136
// Wait for the shutdown signal
137-
shutdown_rx.await.ok();
137+
let _ = shutdown_rx.await;
138+
139+
// Perform S3 sync and wait for completion
140+
log::info!("Starting data sync to S3...");
141+
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
142+
log::warn!("Failed to sync local data with object store. {:?}", e);
143+
} else {
144+
log::info!("Successfully synced all data to S3.");
145+
}
138146

139147
// Initiate graceful shutdown
140148
log::info!("Graceful shutdown of HTTP server triggered");
141149
srv_handle.stop(true).await;
142150
});
143151

144-
// Await the server to run and handle shutdown
145-
srv.await?;
152+
// Await the HTTP server to run
153+
let server_result = srv.await;
154+
155+
// Await the signal handler to ensure proper cleanup
156+
if let Err(e) = signal_task.await {
157+
log::error!("Error in signal handler: {:?}", e);
158+
}
159+
160+
// Wait for the sync task to complete before exiting
161+
if let Err(e) = sync_task.await {
162+
log::error!("Error in sync task: {:?}", e);
163+
} else {
164+
log::info!("Sync task completed successfully.");
165+
}
166+
167+
// Return the result of the server
168+
server_result?;
146169

147170
Ok(())
148171
}

server/src/handlers/http/modal/query_server.rs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ impl ParseableServer for QueryServer {
8787
let shutdown_signal = server_shutdown_signal.clone();
8888

8989
// Spawn the signal handler task
90-
tokio::spawn(async move {
90+
let signal_task = tokio::spawn(async move {
9191
health_check::handle_signals(shutdown_signal).await;
92+
log::info!("Received shutdown signal, notifying server to shut down...");
9293
});
9394

9495
// Create the HTTP server
@@ -107,18 +108,41 @@ impl ParseableServer for QueryServer {
107108

108109
// Graceful shutdown handling
109110
let srv_handle = srv.handle();
110-
111-
tokio::spawn(async move {
111+
112+
let sync_task = tokio::spawn(async move {
112113
// Wait for the shutdown signal
113-
shutdown_rx.await.ok();
114+
let _ = shutdown_rx.await;
115+
116+
// Perform S3 sync and wait for completion
117+
log::info!("Starting data sync to S3...");
118+
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
119+
log::warn!("Failed to sync local data with object store. {:?}", e);
120+
} else {
121+
log::info!("Successfully synced all data to S3.");
122+
}
114123

115124
// Initiate graceful shutdown
116125
log::info!("Graceful shutdown of HTTP server triggered");
117126
srv_handle.stop(true).await;
118127
});
119128

120-
// Await the server to run and handle shutdown
121-
srv.await?;
129+
// Await the HTTP server to run
130+
let server_result = srv.await;
131+
132+
// Await the signal handler to ensure proper cleanup
133+
if let Err(e) = signal_task.await {
134+
log::error!("Error in signal handler: {:?}", e);
135+
}
136+
137+
// Wait for the sync task to complete before exiting
138+
if let Err(e) = sync_task.await {
139+
log::error!("Error in sync task: {:?}", e);
140+
} else {
141+
log::info!("Sync task completed successfully.");
142+
}
143+
144+
// Return the result of the server
145+
server_result?;
122146

123147
Ok(())
124148
}

0 commit comments

Comments
 (0)