|
16 | 16 | * |
17 | 17 | */ |
18 | 18 |
|
| 19 | +use super::health_check::SIGNAL_RECEIVED; |
19 | 20 | use super::logstream::error::{CreateStreamError, StreamError}; |
20 | 21 | use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs}; |
21 | 22 | use super::otel; |
@@ -47,6 +48,13 @@ use std::sync::Arc; |
47 | 48 | // ingests events by extracting stream name from header |
48 | 49 | // creates if stream does not exist |
49 | 50 | pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> { |
| 51 | + // Check if the application has received a shutdown signal |
| 52 | + let shutdown_flag = SIGNAL_RECEIVED.lock().await; |
| 53 | + if *shutdown_flag { |
| 54 | + return Err(PostError::CustomError( |
| 55 | + "Server is shutting down".to_string(), |
| 56 | + )); |
| 57 | + } |
50 | 58 | if let Some((_, stream_name)) = req |
51 | 59 | .headers() |
52 | 60 | .iter() |
@@ -107,6 +115,13 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< |
107 | 115 | // ingests events by extracting stream name from header |
108 | 116 | // creates if stream does not exist |
109 | 117 | pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> { |
| 118 | + // Check if the application has received a shutdown signal |
| 119 | + let shutdown_flag = SIGNAL_RECEIVED.lock().await; |
| 120 | + if *shutdown_flag { |
| 121 | + return Err(PostError::CustomError( |
| 122 | + "Server is shutting down".to_string(), |
| 123 | + )); |
| 124 | + } |
110 | 125 | if let Some((_, stream_name)) = req |
111 | 126 | .headers() |
112 | 127 | .iter() |
@@ -143,6 +158,13 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo |
143 | 158 | // only ingests events into the specified logstream |
144 | 159 | // fails if the logstream does not exist |
145 | 160 | pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> { |
| 161 | + // Check if the application has received a shutdown signal |
| 162 | + let shutdown_flag = SIGNAL_RECEIVED.lock().await; |
| 163 | + if *shutdown_flag { |
| 164 | + return Err(PostError::CustomError( |
| 165 | + "Server is shutting down".to_string(), |
| 166 | + )); |
| 167 | + } |
146 | 168 | let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); |
147 | 169 | let internal_stream_names = STREAM_INFO.list_internal_streams(); |
148 | 170 | if internal_stream_names.contains(&stream_name) { |
|
0 commit comments