Skip to content

Commit d8d15cf

Browse files
AdheipSinghnikhilsinhaparseable
authored andcommitted
sigterm fixes
1 parent ad977fd commit d8d15cf

File tree

6 files changed

+79
-47
lines changed

6 files changed

+79
-47
lines changed

server/src/handlers/http/health_check.rs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,23 +50,13 @@ pub async fn handle_signals(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()
5050
let mut shutdown_flag = signal_received.lock().await;
5151
*shutdown_flag = true;
5252

53-
// Trigger graceful shutdown
54-
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
55-
let _ = shutdown_sender.send(());
56-
}
57-
58-
// Delay to allow readiness probe to return SERVICE_UNAVAILABLE
59-
let _ = sleep(Duration::from_secs(20)).await;
60-
6153
// Sync to local
6254
crate::event::STREAM_WRITERS.unset_all();
6355

64-
// Sync to S3
65-
if let Err(e) = CONFIG.storage().get_object_store().sync().await {
66-
log::warn!("Failed to sync local data with object store. {:?}", e);
56+
// Trigger graceful shutdown
57+
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
58+
let _ = shutdown_sender.send(());
6759
}
68-
69-
log::info!("Local and S3 Sync done, handler SIGTERM completed.");
7060
}
7161
None => {
7262
log::info!("Signal handler received None, indicating an error or end of stream");

server/src/handlers/http/modal/server.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use crate::users::dashboards::DASHBOARDS;
4040
use crate::users::filters::FILTERS;
4141
use std::sync::Arc;
4242
use tokio::sync::{oneshot, Mutex};
43+
use tokio::time::{sleep, Duration};
4344

4445
use actix_web::web::resource;
4546
use actix_web::Resource;
@@ -64,7 +65,6 @@ use super::generate;
6465
use super::ssl_acceptor::get_ssl_acceptor;
6566
use super::OpenIdClient;
6667
use super::ParseableServer;
67-
6868
#[derive(Default)]
6969
pub struct Server;
7070

@@ -108,9 +108,9 @@ impl ParseableServer for Server {
108108
let shutdown_signal = server_shutdown_signal.clone();
109109

110110
// Spawn the signal handler task
111-
tokio::spawn(async move {
111+
let signal_task = tokio::spawn(async move {
112112
health_check::handle_signals(shutdown_signal).await;
113-
println!("Received shutdown signal, notifying server to shut down...");
113+
log::info!("Received shutdown signal, notifying server to shut down...");
114114
});
115115

116116
// Create the HTTP server
@@ -129,18 +129,41 @@ impl ParseableServer for Server {
129129

130130
// Graceful shutdown handling
131131
let srv_handle = srv.handle();
132-
133-
tokio::spawn(async move {
132+
133+
let sync_task = tokio::spawn(async move {
134134
// Wait for the shutdown signal
135-
shutdown_rx.await.ok();
135+
let _ = shutdown_rx.await;
136+
137+
// Perform S3 sync and wait for completion
138+
log::info!("Starting data sync to S3...");
139+
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
140+
log::warn!("Failed to sync local data with object store. {:?}", e);
141+
} else {
142+
log::info!("Successfully synced all data to S3.");
143+
}
136144

137145
// Initiate graceful shutdown
138146
log::info!("Graceful shutdown of HTTP server triggered");
139147
srv_handle.stop(true).await;
140148
});
141149

142-
// Await the server to run and handle shutdown
143-
srv.await?;
150+
// Await the HTTP server to run
151+
let server_result = srv.await;
152+
153+
// Await the signal handler to ensure proper cleanup
154+
if let Err(e) = signal_task.await {
155+
log::error!("Error in signal handler: {:?}", e);
156+
}
157+
158+
// Wait for the sync task to complete before exiting
159+
if let Err(e) = sync_task.await {
160+
log::error!("Error in sync task: {:?}", e);
161+
} else {
162+
log::info!("Sync task completed successfully.");
163+
}
164+
165+
// Return the result of the server
166+
server_result?;
144167

145168
Ok(())
146169
}

server/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use option::{Mode, CONFIG};
5252
use crate::handlers::http::modal::{
5353
ingest_server::IngestServer, query_server::QueryServer, server::Server,
5454
};
55-
pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;
55+
pub const STORAGE_UPLOAD_INTERVAL: u32 = 600;
5656

5757
#[actix_web::main]
5858
async fn main() -> anyhow::Result<()> {

server/src/storage/object_storage.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -426,16 +426,16 @@ pub trait ObjectStorage: Sync + 'static {
426426
.await
427427
}
428428

429-
async fn sync(&self) -> Result<(), ObjectStorageError> {
429+
async fn sync(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> {
430430
if !Path::new(&CONFIG.staging_dir()).exists() {
431431
return Ok(());
432432
}
433-
433+
434434
let streams = STREAM_INFO.list_streams();
435-
435+
436436
let cache_manager = LocalCacheManager::global();
437437
let mut cache_updates: HashMap<&String, Vec<_>> = HashMap::new();
438-
438+
439439
for stream in &streams {
440440
let cache_enabled = STREAM_INFO
441441
.get_cache_enabled(stream)
@@ -452,9 +452,10 @@ pub trait ObjectStorage: Sync + 'static {
452452
&dir,
453453
time_partition,
454454
custom_partition.clone(),
455+
shutdown_signal,
455456
)
456457
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
457-
458+
458459
if let Some(schema) = schema {
459460
let static_schema_flag = STREAM_INFO
460461
.get_static_schema_flag(stream)
@@ -463,14 +464,18 @@ pub trait ObjectStorage: Sync + 'static {
463464
commit_schema_to_storage(stream, schema).await?;
464465
}
465466
}
467+
466468
let parquet_files = dir.parquet_files();
467-
468469
for file in parquet_files {
469470
let filename = file
470471
.file_name()
471472
.expect("only parquet files are returned by iterator")
472473
.to_str()
473474
.expect("filename is valid string");
475+
476+
// Log the filename being processed
477+
log::debug!("Processing file: {}", filename);
478+
474479
let mut file_date_part = filename.split('.').collect::<Vec<&str>>()[0];
475480
file_date_part = file_date_part.split('=').collect::<Vec<&str>>()[1];
476481
let compressed_size = file.metadata().map_or(0, |meta| meta.len());
@@ -484,7 +489,7 @@ pub trait ObjectStorage: Sync + 'static {
484489
.with_label_values(&["data", stream, "parquet"])
485490
.add(compressed_size as i64);
486491
let mut file_suffix = str::replacen(filename, ".", "/", 3);
487-
492+
488493
let custom_partition_clone = custom_partition.clone();
489494
if custom_partition_clone.is_some() {
490495
let custom_partition_fields = custom_partition_clone.unwrap();
@@ -493,8 +498,15 @@ pub trait ObjectStorage: Sync + 'static {
493498
file_suffix =
494499
str::replacen(filename, ".", "/", 3 + custom_partition_list.len());
495500
}
501+
496502
let stream_relative_path = format!("{stream}/{file_suffix}");
497-
self.upload_file(&stream_relative_path, &file).await?;
503+
504+
// Try uploading the file, handle potential errors without breaking the loop
505+
if let Err(e) = self.upload_file(&stream_relative_path, &file).await {
506+
log::error!("Failed to upload file {}: {:?}", filename, e);
507+
continue; // Skip to the next file
508+
}
509+
498510
let absolute_path = self
499511
.absolute_url(RelativePath::from_path(&stream_relative_path).unwrap())
500512
.to_string();
@@ -512,28 +524,30 @@ pub trait ObjectStorage: Sync + 'static {
512524
}
513525
}
514526
}
515-
527+
528+
// Cache management logic
516529
if let Some(manager) = cache_manager {
517530
let cache_updates = cache_updates
518531
.into_iter()
519532
.map(|(key, value)| (key.to_owned(), value))
520533
.collect_vec();
521-
534+
522535
tokio::spawn(async move {
523536
for (stream, files) in cache_updates {
524537
for (storage_path, file) in files {
525-
manager
538+
if let Err(e) = manager
526539
.move_to_cache(&stream, storage_path, file.to_owned())
527-
.await
528-
.unwrap()
540+
.await {
541+
log::error!("Failed to move file to cache: {:?}", e);
542+
}
529543
}
530544
}
531545
});
532546
}
533-
547+
534548
Ok(())
535549
}
536-
550+
537551
// pick a better name
538552
fn get_bucket_name(&self) -> String;
539553
}

server/src/storage/staging.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -155,17 +155,21 @@ impl StorageDir {
155155
&self,
156156
exclude: NaiveDateTime,
157157
stream: &str,
158+
shutdown_signal: bool,
158159
) -> HashMap<PathBuf, Vec<PathBuf>> {
159160
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
160161
let mut arrow_files = self.arrow_files();
161-
arrow_files.retain(|path| {
162-
!path
163-
.file_name()
164-
.unwrap()
165-
.to_str()
166-
.unwrap()
167-
.starts_with(&exclude.format("%Y%m%dT%H%M").to_string())
168-
});
162+
163+
if !shutdown_signal {
164+
arrow_files.retain(|path| {
165+
path.file_name()
166+
.unwrap()
167+
.to_str()
168+
.unwrap()
169+
.starts_with(&exclude.format("%Y%m%dT%H%M").to_string())
170+
});
171+
}
172+
169173
let random_string =
170174
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15);
171175
for arrow_file_path in arrow_files {
@@ -223,11 +227,12 @@ pub fn convert_disk_files_to_parquet(
223227
dir: &StorageDir,
224228
time_partition: Option<String>,
225229
custom_partition: Option<String>,
230+
shutdown_signal: bool,
226231
) -> Result<Option<Schema>, MoveDataError> {
227232
let mut schemas = Vec::new();
228233

229234
let time = chrono::Utc::now().naive_utc();
230-
let staging_files = dir.arrow_files_grouped_exclude_time(time, stream);
235+
let staging_files = dir.arrow_files_grouped_exclude_time(time, stream, shutdown_signal);
231236
if staging_files.is_empty() {
232237
metrics::STAGING_FILES.with_label_values(&[stream]).set(0);
233238
metrics::STORAGE_SIZE

server/src/sync.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ pub async fn object_store_sync() -> (
4040
.every(STORAGE_UPLOAD_INTERVAL.seconds())
4141
.plus(5u32.seconds())
4242
.run(|| async {
43-
if let Err(e) = CONFIG.storage().get_object_store().sync().await {
43+
if let Err(e) = CONFIG.storage().get_object_store().sync(false).await {
4444
log::warn!("failed to sync local data with object store. {:?}", e);
4545
}
4646
});

0 commit comments

Comments
 (0)