From cb0e27afa380d7db3abb03eb108cc216c925b277 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 28 Mar 2024 13:23:17 +0530 Subject: [PATCH 1/3] fix: query fail when an Ingest Server is down --- server/src/handlers/http.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 8026626e7..959eb2ed0 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -30,7 +30,7 @@ mod kinesis; pub(crate) mod llm; pub(crate) mod logstream; pub(crate) mod middleware; -pub(crate) mod modal; +pub mod modal; pub(crate) mod oidc; mod otel; pub(crate) mod query; @@ -108,9 +108,10 @@ pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result Date: Thu, 28 Mar 2024 13:27:11 +0530 Subject: [PATCH 2/3] add delete ingester endpoint 1. Add delete_ingester_meta func in ObjectStorage Trait 2. Update Permission Actions 3. Update PostError 4. Add delete Ingeter Endpoint -> `api/v1/cluster/ingester_ip%3Aingester_port` --- server/src/handlers/http/cluster/mod.rs | 29 ++++++++++++++++++- server/src/handlers/http/cluster/utils.rs | 22 ++++++++++++++ server/src/handlers/http/ingest.rs | 4 +++ .../src/handlers/http/modal/query_server.rs | 14 ++++++++- server/src/rbac/role.rs | 2 ++ server/src/storage/localfs.rs | 8 +++++ server/src/storage/object_storage.rs | 5 +++- server/src/storage/s3.rs | 12 +++++++- 8 files changed, 92 insertions(+), 4 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 54c829b0d..4f14d2996 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -18,13 +18,16 @@ pub mod utils; +use crate::handlers::http::cluster::utils::{ + check_liveness, ingester_meta_filename, to_url_string, +}; use crate::handlers::http::ingest::PostError; use crate::handlers::http::logstream::error::StreamError; use crate::option::CONFIG; use crate::metrics::prom_utils::Metrics; use actix_web::http::header; -use actix_web::Responder; +use actix_web::{HttpRequest, Responder}; use http::StatusCode; use itertools::Itertools; use relative_path::RelativePathBuf; @@ -345,3 +348,27 @@ pub async fn get_ingester_info() -> anyhow::Result { Ok(arr) } + +pub async fn remove_ingester(req: HttpRequest) -> Result { + let domain_name: String = req.match_info().get("ingester").unwrap().parse().unwrap(); + let domain_name = to_url_string(domain_name); + + if check_liveness(&domain_name).await { + return Err(PostError::Invalid(anyhow::anyhow!("Ingester is Online"))); + } + + let ingester_meta_filename = ingester_meta_filename(&domain_name); + let object_store = CONFIG.storage().get_object_store(); + let msg = match object_store + .delete_ingester_meta(ingester_meta_filename) + .await + { + Ok(_) => { + format!("Ingester {} Removed", domain_name) + } + Err(err) => err.to_string(), + }; + + log::error!("{}", &msg); + Ok((msg, StatusCode::OK)) +} diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs index 870aaa850..47155b428 100644 --- a/server/src/handlers/http/cluster/utils.rs +++ b/server/src/handlers/http/cluster/utils.rs @@ -243,3 +243,25 @@ pub async fn send_stats_request( Ok(Some(res)) } + +/// domain_name needs to be http://ip:port +pub fn ingester_meta_filename(domain_name: &str) -> String { + if domain_name.starts_with("http://") | domain_name.starts_with("https://") { + let url = Url::parse(domain_name).unwrap(); + return format!( + "ingester.{}.{}.json", + url.host_str().unwrap(), + url.port().unwrap() + ); + } + format!("ingester.{}.json", domain_name) +} + +pub fn to_url_string(str: String) -> String { + // if the str is already a url i am guessing that it will end in '/' + if str.starts_with("http://") || str.starts_with("https://") { + return str; + } + + format!("http://{}/", str) +} diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index bba472a81..36e4bf996 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -33,6 +33,7 @@ use crate::handlers::{ }; use crate::metadata::STREAM_INFO; use crate::option::{Mode, CONFIG}; +use crate::storage::ObjectStorageError; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; use super::logstream::error::CreateStreamError; @@ -174,6 +175,8 @@ pub enum PostError { CustomError(String), #[error("Error: {0}")] NetworkError(#[from] reqwest::Error), + #[error("ObjectStorageError: {0}")] + ObjectStorageError(#[from] ObjectStorageError), } impl actix_web::ResponseError for PostError { @@ -190,6 +193,7 @@ impl actix_web::ResponseError for PostError { PostError::StreamNotFound(_) => StatusCode::NOT_FOUND, PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index fef714cba..d2ba49ad0 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -17,7 +17,7 @@ */ use crate::handlers::http::cluster::utils::check_liveness; -use crate::handlers::http::cluster::{self, get_ingester_info}; +use crate::handlers::http::cluster::{self, get_ingester_info, remove_ingester}; use crate::handlers::http::middleware::RouteExt; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; @@ -136,12 +136,14 @@ impl QueryServer { fn get_cluster_info_web_scope() -> actix_web::Scope { web::scope("/cluster") .service( + // GET "/cluster/info" ==> Get info of the cluster web::resource("/info").route( web::get() .to(cluster::get_cluster_info) .authorize(Action::ListCluster), ), ) + // GET "/cluster/metrics" ==> Get metrics of the cluster .service( web::resource("/metrics").route( web::get() @@ -149,6 +151,16 @@ impl QueryServer { .authorize(Action::ListClusterMetrics), ), ) + // DELETE "/cluster/{ingester_domain:port}" ==> Delete an ingester from the cluster + .service( + web::scope("/{ingester}").service( + web::resource("").route( + web::delete() + .to(remove_ingester) + .authorize(Action::DeleteIngester), + ), + ), + ) } /// initialize the server, run migrations as needed and start the server diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index cbffd92b5..3bbb01b23 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -46,6 +46,7 @@ pub enum Action { QueryLLM, ListCluster, ListClusterMetrics, + DeleteIngester, All, } @@ -112,6 +113,7 @@ impl RoleBuilder { | Action::All => Permission::Stream(action, self.stream.clone().unwrap()), Action::ListCluster => Permission::Unit(action), Action::ListClusterMetrics => Permission::Unit(action), + Action::DeleteIngester => Permission::Unit(action), }; perms.push(perm); } diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 0ee026789..a818f66cd 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -193,6 +193,14 @@ impl ObjectStorage for LocalFS { Ok(fs::remove_dir_all(path).await?) } + async fn delete_ingester_meta( + &self, + ingester_filename: String, + ) -> Result<(), ObjectStorageError> { + let path = self.root.join(ingester_filename); + Ok(fs::remove_file(path).await?) + } + async fn list_streams(&self) -> Result, ObjectStorageError> { let ignore_dir = &["lost+found"]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 49209dbb0..6fb7ad94b 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -82,7 +82,10 @@ pub trait ObjectStorage: Sync + 'static { async fn list_dirs(&self) -> Result, ObjectStorageError>; async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; - + async fn delete_ingester_meta( + &self, + ingester_filename: String, + ) -> Result<(), ObjectStorageError>; /// Returns the amount of time taken by the `ObjectStore` to perform a get /// call. async fn get_latency(&self) -> Duration { diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 7b403095a..7d4f66064 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -29,7 +29,7 @@ use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum}; use object_store::limit::LimitStore; use object_store::path::Path as StorePath; use object_store::{ClientOptions, ObjectStore}; -use relative_path::RelativePath; +use relative_path::{RelativePath, RelativePathBuf}; use tokio::fs::OpenOptions; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -482,6 +482,16 @@ impl ObjectStorage for S3 { Ok(()) } + async fn delete_ingester_meta( + &self, + ingester_filename: String, + ) -> Result<(), ObjectStorageError> { + let file = RelativePathBuf::from(&ingester_filename); + self.client.delete(&to_object_store_path(&file)).await?; + + Ok(()) + } + async fn list_streams(&self) -> Result, ObjectStorageError> { let streams = self._list_streams().await?; From 564c20c2a4417d6a8e298fd9364af6870eb6b890 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Fri, 29 Mar 2024 15:14:12 +0530 Subject: [PATCH 3/3] fix: handle edge cases while parsing ingester indentifiers --- server/src/handlers/http/cluster/mod.rs | 17 ++++++++++++----- server/src/handlers/http/cluster/utils.rs | 8 +++++++- .../src/handlers/http/modal/query_server.rs | 4 ++-- server/src/storage/localfs.rs | 2 +- server/src/storage/object_storage.rs | 2 +- server/src/storage/s3.rs | 19 +++++++++++++++---- 6 files changed, 38 insertions(+), 14 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 4f14d2996..d9e30c0b9 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -26,6 +26,7 @@ use crate::handlers::http::logstream::error::StreamError; use crate::option::CONFIG; use crate::metrics::prom_utils::Metrics; +use crate::storage::ObjectStorageError; use actix_web::http::header; use actix_web::{HttpRequest, Responder}; use http::StatusCode; @@ -354,21 +355,27 @@ pub async fn remove_ingester(req: HttpRequest) -> Result { - format!("Ingester {} Removed", domain_name) + format!("Node {} Removed Successfully", domain_name) + } + Err(err) => { + if matches!(err, ObjectStorageError::IoError(_)) { + format!("Node {} Not Found", domain_name) + } else { + format!("Error Removing Node {}\n Reason: {}", domain_name, err) + } } - Err(err) => err.to_string(), }; - log::error!("{}", &msg); + log::info!("{}", &msg); Ok((msg, StatusCode::OK)) } diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs index 47155b428..a2887f814 100644 --- a/server/src/handlers/http/cluster/utils.rs +++ b/server/src/handlers/http/cluster/utils.rs @@ -188,7 +188,13 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { } pub async fn check_liveness(domain_name: &str) -> bool { - let uri = Url::parse(&format!("{}liveness", domain_name)).unwrap(); + let uri = match Url::parse(&format!("{}liveness", domain_name)) { + Ok(uri) => uri, + Err(err) => { + log::error!("Node Indentifier Failed To Parse: {}", err); + return false; + } + }; let reqw = reqwest::Client::new() .get(uri) diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index d2ba49ad0..825808566 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -17,7 +17,7 @@ */ use crate::handlers::http::cluster::utils::check_liveness; -use crate::handlers::http::cluster::{self, get_ingester_info, remove_ingester}; +use crate::handlers::http::cluster::{self, get_ingester_info}; use crate::handlers::http::middleware::RouteExt; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; @@ -156,7 +156,7 @@ impl QueryServer { web::scope("/{ingester}").service( web::resource("").route( web::delete() - .to(remove_ingester) + .to(cluster::remove_ingester) .authorize(Action::DeleteIngester), ), ), diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index a818f66cd..0bfd26b7d 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -193,7 +193,7 @@ impl ObjectStorage for LocalFS { Ok(fs::remove_dir_all(path).await?) } - async fn delete_ingester_meta( + async fn try_delete_ingester_meta( &self, ingester_filename: String, ) -> Result<(), ObjectStorageError> { diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 6fb7ad94b..9f54c7a30 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -82,7 +82,7 @@ pub trait ObjectStorage: Sync + 'static { async fn list_dirs(&self) -> Result, ObjectStorageError>; async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; - async fn delete_ingester_meta( + async fn try_delete_ingester_meta( &self, ingester_filename: String, ) -> Result<(), ObjectStorageError>; diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 7d4f66064..3db6aaf23 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -482,14 +482,25 @@ impl ObjectStorage for S3 { Ok(()) } - async fn delete_ingester_meta( + async fn try_delete_ingester_meta( &self, ingester_filename: String, ) -> Result<(), ObjectStorageError> { let file = RelativePathBuf::from(&ingester_filename); - self.client.delete(&to_object_store_path(&file)).await?; - - Ok(()) + match self.client.delete(&to_object_store_path(&file)).await { + Ok(_) => Ok(()), + Err(err) => { + // if the object is not found, it is not an error + // the given url path was incorrect + if matches!(err, object_store::Error::NotFound { .. }) { + log::error!("Node does not exist"); + Err(err.into()) + } else { + log::error!("Error deleting ingester meta file: {:?}", err); + Err(err.into()) + } + } + } } async fn list_streams(&self) -> Result, ObjectStorageError> {