Skip to content

Commit d36bf3f

Browse files
committed
fix: handle edge cases while parsing ingester indentifiers
1 parent 187854e commit d36bf3f

File tree

6 files changed

+42
-14
lines changed

6 files changed

+42
-14
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::handlers::http::logstream::error::StreamError;
2626
use crate::option::CONFIG;
2727

2828
use crate::metrics::prom_utils::Metrics;
29+
use crate::storage::ObjectStorageError;
2930
use actix_web::http::header;
3031
use actix_web::{HttpRequest, Responder};
3132
use http::StatusCode;
@@ -354,21 +355,31 @@ pub async fn remove_ingester(req: HttpRequest) -> Result<impl Responder, PostErr
354355
let domain_name = to_url_string(domain_name);
355356

356357
if check_liveness(&domain_name).await {
357-
return Err(PostError::Invalid(anyhow::anyhow!("Ingester is Online")));
358+
return Err(PostError::Invalid(anyhow::anyhow!("Node Online")));
358359
}
359360

360361
let ingester_meta_filename = ingester_meta_filename(&domain_name);
361362
let object_store = CONFIG.storage().get_object_store();
362363
let msg = match object_store
363-
.delete_ingester_meta(ingester_meta_filename)
364+
.try_delete_ingester_meta(ingester_meta_filename)
364365
.await
365366
{
366367
Ok(_) => {
367-
format!("Ingester {} Removed", domain_name)
368+
format!("Node {} Removed Successfully", domain_name)
369+
}
370+
Err(err) => {
371+
if matches!(err, ObjectStorageError::IoError(_)) {
372+
format!("Node {} Not Found", domain_name)
373+
} else {
374+
format!(
375+
"Error Removing Node {}\n Reason: {}",
376+
domain_name,
377+
err
378+
)
379+
}
368380
}
369-
Err(err) => err.to_string(),
370381
};
371382

372-
log::error!("{}", &msg);
383+
log::info!("{}", &msg);
373384
Ok((msg, StatusCode::OK))
374385
}

server/src/handlers/http/cluster/utils.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,13 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
188188
}
189189

190190
pub async fn check_liveness(domain_name: &str) -> bool {
191-
let uri = Url::parse(&format!("{}liveness", domain_name)).unwrap();
191+
let uri = match Url::parse(&format!("{}liveness", domain_name)) {
192+
Ok(uri) => uri,
193+
Err(err) => {
194+
log::error!("Node Indentifier Failed To Parse: {}", err);
195+
return false;
196+
}
197+
};
192198

193199
let reqw = reqwest::Client::new()
194200
.get(uri)

server/src/handlers/http/modal/query_server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use crate::handlers::http::cluster::utils::check_liveness;
20-
use crate::handlers::http::cluster::{self, get_ingester_info, remove_ingester};
20+
use crate::handlers::http::cluster::{self, get_ingester_info};
2121
use crate::handlers::http::middleware::RouteExt;
2222
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
2323

@@ -156,7 +156,7 @@ impl QueryServer {
156156
web::scope("/{ingester}").service(
157157
web::resource("").route(
158158
web::delete()
159-
.to(remove_ingester)
159+
.to(cluster::remove_ingester)
160160
.authorize(Action::DeleteIngester),
161161
),
162162
),

server/src/storage/localfs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ impl ObjectStorage for LocalFS {
193193
Ok(fs::remove_dir_all(path).await?)
194194
}
195195

196-
async fn delete_ingester_meta(
196+
async fn try_delete_ingester_meta(
197197
&self,
198198
ingester_filename: String,
199199
) -> Result<(), ObjectStorageError> {

server/src/storage/object_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ pub trait ObjectStorage: Sync + 'static {
8282
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
8383
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
8484
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
85-
async fn delete_ingester_meta(
85+
async fn try_delete_ingester_meta(
8686
&self,
8787
ingester_filename: String,
8888
) -> Result<(), ObjectStorageError>;

server/src/storage/s3.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -482,14 +482,25 @@ impl ObjectStorage for S3 {
482482
Ok(())
483483
}
484484

485-
async fn delete_ingester_meta(
485+
async fn try_delete_ingester_meta(
486486
&self,
487487
ingester_filename: String,
488488
) -> Result<(), ObjectStorageError> {
489489
let file = RelativePathBuf::from(&ingester_filename);
490-
self.client.delete(&to_object_store_path(&file)).await?;
491-
492-
Ok(())
490+
match self.client.delete(&to_object_store_path(&file)).await {
491+
Ok(_) => Ok(()),
492+
Err(err) => {
493+
// if the object is not found, it is not an error
494+
// the given url path was incorrect
495+
if matches!(err, object_store::Error::NotFound { .. }) {
496+
log::error!("Node does not exist");
497+
Err(err.into())
498+
} else {
499+
log::error!("Error deleting ingester meta file: {:?}", err);
500+
Err(err.into())
501+
}
502+
}
503+
}
493504
}
494505

495506
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {

0 commit comments

Comments
 (0)