diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 8026626e7..959eb2ed0 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -30,7 +30,7 @@ mod kinesis; pub(crate) mod llm; pub(crate) mod logstream; pub(crate) mod middleware; -pub(crate) mod modal; +pub mod modal; pub(crate) mod oidc; mod otel; pub(crate) mod query; @@ -108,9 +108,10 @@ pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result anyhow::Result { Ok(arr) } + +pub async fn remove_ingester(req: HttpRequest) -> Result { + let domain_name: String = req.match_info().get("ingester").unwrap().parse().unwrap(); + let domain_name = to_url_string(domain_name); + + if check_liveness(&domain_name).await { + return Err(PostError::Invalid(anyhow::anyhow!("Node Online"))); + } + + let ingester_meta_filename = ingester_meta_filename(&domain_name); + let object_store = CONFIG.storage().get_object_store(); + let msg = match object_store + .try_delete_ingester_meta(ingester_meta_filename) + .await + { + Ok(_) => { + format!("Node {} Removed Successfully", domain_name) + } + Err(err) => { + if matches!(err, ObjectStorageError::IoError(_)) { + format!("Node {} Not Found", domain_name) + } else { + format!("Error Removing Node {}\n Reason: {}", domain_name, err) + } + } + }; + + log::info!("{}", &msg); + Ok((msg, StatusCode::OK)) +} diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs index 870aaa850..a2887f814 100644 --- a/server/src/handlers/http/cluster/utils.rs +++ b/server/src/handlers/http/cluster/utils.rs @@ -188,7 +188,13 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { } pub async fn check_liveness(domain_name: &str) -> bool { - let uri = Url::parse(&format!("{}liveness", domain_name)).unwrap(); + let uri = match Url::parse(&format!("{}liveness", domain_name)) { + Ok(uri) => uri, + Err(err) => { + log::error!("Node Indentifier Failed To Parse: {}", err); + return false; + } + }; let reqw = reqwest::Client::new() .get(uri) @@ -243,3 +249,25 @@ pub async fn send_stats_request( Ok(Some(res)) } + +/// domain_name needs to be http://ip:port +pub fn ingester_meta_filename(domain_name: &str) -> String { + if domain_name.starts_with("http://") | domain_name.starts_with("https://") { + let url = Url::parse(domain_name).unwrap(); + return format!( + "ingester.{}.{}.json", + url.host_str().unwrap(), + url.port().unwrap() + ); + } + format!("ingester.{}.json", domain_name) +} + +pub fn to_url_string(str: String) -> String { + // if the str is already a url i am guessing that it will end in '/' + if str.starts_with("http://") || str.starts_with("https://") { + return str; + } + + format!("http://{}/", str) +} diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index bba472a81..36e4bf996 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -33,6 +33,7 @@ use crate::handlers::{ }; use crate::metadata::STREAM_INFO; use crate::option::{Mode, CONFIG}; +use crate::storage::ObjectStorageError; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; use super::logstream::error::CreateStreamError; @@ -174,6 +175,8 @@ pub enum PostError { CustomError(String), #[error("Error: {0}")] NetworkError(#[from] reqwest::Error), + #[error("ObjectStorageError: {0}")] + ObjectStorageError(#[from] ObjectStorageError), } impl actix_web::ResponseError for PostError { @@ -190,6 +193,7 @@ impl actix_web::ResponseError for PostError { PostError::StreamNotFound(_) => StatusCode::NOT_FOUND, PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index fef714cba..825808566 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -136,12 +136,14 @@ impl QueryServer { fn get_cluster_info_web_scope() -> actix_web::Scope { web::scope("/cluster") .service( + // GET "/cluster/info" ==> Get info of the cluster web::resource("/info").route( web::get() .to(cluster::get_cluster_info) .authorize(Action::ListCluster), ), ) + // GET "/cluster/metrics" ==> Get metrics of the cluster .service( web::resource("/metrics").route( web::get() @@ -149,6 +151,16 @@ impl QueryServer { .authorize(Action::ListClusterMetrics), ), ) + // DELETE "/cluster/{ingester_domain:port}" ==> Delete an ingester from the cluster + .service( + web::scope("/{ingester}").service( + web::resource("").route( + web::delete() + .to(cluster::remove_ingester) + .authorize(Action::DeleteIngester), + ), + ), + ) } /// initialize the server, run migrations as needed and start the server diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index cbffd92b5..3bbb01b23 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -46,6 +46,7 @@ pub enum Action { QueryLLM, ListCluster, ListClusterMetrics, + DeleteIngester, All, } @@ -112,6 +113,7 @@ impl RoleBuilder { | Action::All => Permission::Stream(action, self.stream.clone().unwrap()), Action::ListCluster => Permission::Unit(action), Action::ListClusterMetrics => Permission::Unit(action), + Action::DeleteIngester => Permission::Unit(action), }; perms.push(perm); } diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 0ee026789..0bfd26b7d 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -193,6 +193,14 @@ impl ObjectStorage for LocalFS { Ok(fs::remove_dir_all(path).await?) } + async fn try_delete_ingester_meta( + &self, + ingester_filename: String, + ) -> Result<(), ObjectStorageError> { + let path = self.root.join(ingester_filename); + Ok(fs::remove_file(path).await?) + } + async fn list_streams(&self) -> Result, ObjectStorageError> { let ignore_dir = &["lost+found"]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 49209dbb0..9f54c7a30 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -82,7 +82,10 @@ pub trait ObjectStorage: Sync + 'static { async fn list_dirs(&self) -> Result, ObjectStorageError>; async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; - + async fn try_delete_ingester_meta( + &self, + ingester_filename: String, + ) -> Result<(), ObjectStorageError>; /// Returns the amount of time taken by the `ObjectStore` to perform a get /// call. async fn get_latency(&self) -> Duration { diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 7b403095a..3db6aaf23 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -29,7 +29,7 @@ use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum}; use object_store::limit::LimitStore; use object_store::path::Path as StorePath; use object_store::{ClientOptions, ObjectStore}; -use relative_path::RelativePath; +use relative_path::{RelativePath, RelativePathBuf}; use tokio::fs::OpenOptions; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -482,6 +482,27 @@ impl ObjectStorage for S3 { Ok(()) } + async fn try_delete_ingester_meta( + &self, + ingester_filename: String, + ) -> Result<(), ObjectStorageError> { + let file = RelativePathBuf::from(&ingester_filename); + match self.client.delete(&to_object_store_path(&file)).await { + Ok(_) => Ok(()), + Err(err) => { + // if the object is not found, it is not an error + // the given url path was incorrect + if matches!(err, object_store::Error::NotFound { .. }) { + log::error!("Node does not exist"); + Err(err.into()) + } else { + log::error!("Error deleting ingester meta file: {:?}", err); + Err(err.into()) + } + } + } + } + async fn list_streams(&self) -> Result, ObjectStorageError> { let streams = self._list_streams().await?;