Skip to content

Commit 187854e

Browse files
committed
add delete ingester endpoint
1. Add delete_ingester_meta func in ObjectStorage Trait 2. Update Permission Actions 3. Update PostError 4. Add delete Ingeter Endpoint -> `api/v1/cluster/ingester_ip%3Aingester_port`
1 parent cb0e27a commit 187854e

File tree

8 files changed

+92
-4
lines changed

8 files changed

+92
-4
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@
1818

1919
pub mod utils;
2020

21+
use crate::handlers::http::cluster::utils::{
22+
check_liveness, ingester_meta_filename, to_url_string,
23+
};
2124
use crate::handlers::http::ingest::PostError;
2225
use crate::handlers::http::logstream::error::StreamError;
2326
use crate::option::CONFIG;
2427

2528
use crate::metrics::prom_utils::Metrics;
2629
use actix_web::http::header;
27-
use actix_web::Responder;
30+
use actix_web::{HttpRequest, Responder};
2831
use http::StatusCode;
2932
use itertools::Itertools;
3033
use relative_path::RelativePathBuf;
@@ -345,3 +348,27 @@ pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
345348

346349
Ok(arr)
347350
}
351+
352+
pub async fn remove_ingester(req: HttpRequest) -> Result<impl Responder, PostError> {
353+
let domain_name: String = req.match_info().get("ingester").unwrap().parse().unwrap();
354+
let domain_name = to_url_string(domain_name);
355+
356+
if check_liveness(&domain_name).await {
357+
return Err(PostError::Invalid(anyhow::anyhow!("Ingester is Online")));
358+
}
359+
360+
let ingester_meta_filename = ingester_meta_filename(&domain_name);
361+
let object_store = CONFIG.storage().get_object_store();
362+
let msg = match object_store
363+
.delete_ingester_meta(ingester_meta_filename)
364+
.await
365+
{
366+
Ok(_) => {
367+
format!("Ingester {} Removed", domain_name)
368+
}
369+
Err(err) => err.to_string(),
370+
};
371+
372+
log::error!("{}", &msg);
373+
Ok((msg, StatusCode::OK))
374+
}

server/src/handlers/http/cluster/utils.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,3 +243,25 @@ pub async fn send_stats_request(
243243

244244
Ok(Some(res))
245245
}
246+
247+
/// domain_name needs to be http://ip:port
248+
pub fn ingester_meta_filename(domain_name: &str) -> String {
249+
if domain_name.starts_with("http://") | domain_name.starts_with("https://") {
250+
let url = Url::parse(domain_name).unwrap();
251+
return format!(
252+
"ingester.{}.{}.json",
253+
url.host_str().unwrap(),
254+
url.port().unwrap()
255+
);
256+
}
257+
format!("ingester.{}.json", domain_name)
258+
}
259+
260+
pub fn to_url_string(str: String) -> String {
261+
// if the str is already a url i am guessing that it will end in '/'
262+
if str.starts_with("http://") || str.starts_with("https://") {
263+
return str;
264+
}
265+
266+
format!("http://{}/", str)
267+
}

server/src/handlers/http/ingest.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::handlers::{
3333
};
3434
use crate::metadata::STREAM_INFO;
3535
use crate::option::{Mode, CONFIG};
36+
use crate::storage::ObjectStorageError;
3637
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
3738

3839
use super::logstream::error::CreateStreamError;
@@ -174,6 +175,8 @@ pub enum PostError {
174175
CustomError(String),
175176
#[error("Error: {0}")]
176177
NetworkError(#[from] reqwest::Error),
178+
#[error("ObjectStorageError: {0}")]
179+
ObjectStorageError(#[from] ObjectStorageError),
177180
}
178181

179182
impl actix_web::ResponseError for PostError {
@@ -190,6 +193,7 @@ impl actix_web::ResponseError for PostError {
190193
PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
191194
PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR,
192195
PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR,
196+
PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
193197
}
194198
}
195199

server/src/handlers/http/modal/query_server.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use crate::handlers::http::cluster::utils::check_liveness;
20-
use crate::handlers::http::cluster::{self, get_ingester_info};
20+
use crate::handlers::http::cluster::{self, get_ingester_info, remove_ingester};
2121
use crate::handlers::http::middleware::RouteExt;
2222
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
2323

@@ -136,19 +136,31 @@ impl QueryServer {
136136
fn get_cluster_info_web_scope() -> actix_web::Scope {
137137
web::scope("/cluster")
138138
.service(
139+
// GET "/cluster/info" ==> Get info of the cluster
139140
web::resource("/info").route(
140141
web::get()
141142
.to(cluster::get_cluster_info)
142143
.authorize(Action::ListCluster),
143144
),
144145
)
146+
// GET "/cluster/metrics" ==> Get metrics of the cluster
145147
.service(
146148
web::resource("/metrics").route(
147149
web::get()
148150
.to(cluster::get_cluster_metrics)
149151
.authorize(Action::ListClusterMetrics),
150152
),
151153
)
154+
// DELETE "/cluster/{ingester_domain:port}" ==> Delete an ingester from the cluster
155+
.service(
156+
web::scope("/{ingester}").service(
157+
web::resource("").route(
158+
web::delete()
159+
.to(remove_ingester)
160+
.authorize(Action::DeleteIngester),
161+
),
162+
),
163+
)
152164
}
153165

154166
/// initialize the server, run migrations as needed and start the server

server/src/rbac/role.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ pub enum Action {
4646
QueryLLM,
4747
ListCluster,
4848
ListClusterMetrics,
49+
DeleteIngester,
4950
All,
5051
}
5152

@@ -112,6 +113,7 @@ impl RoleBuilder {
112113
| Action::All => Permission::Stream(action, self.stream.clone().unwrap()),
113114
Action::ListCluster => Permission::Unit(action),
114115
Action::ListClusterMetrics => Permission::Unit(action),
116+
Action::DeleteIngester => Permission::Unit(action),
115117
};
116118
perms.push(perm);
117119
}

server/src/storage/localfs.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,14 @@ impl ObjectStorage for LocalFS {
193193
Ok(fs::remove_dir_all(path).await?)
194194
}
195195

196+
async fn delete_ingester_meta(
197+
&self,
198+
ingester_filename: String,
199+
) -> Result<(), ObjectStorageError> {
200+
let path = self.root.join(ingester_filename);
201+
Ok(fs::remove_file(path).await?)
202+
}
203+
196204
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
197205
let ignore_dir = &["lost+found"];
198206
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);

server/src/storage/object_storage.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ pub trait ObjectStorage: Sync + 'static {
8282
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
8383
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
8484
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
85-
85+
async fn delete_ingester_meta(
86+
&self,
87+
ingester_filename: String,
88+
) -> Result<(), ObjectStorageError>;
8689
/// Returns the amount of time taken by the `ObjectStore` to perform a get
8790
/// call.
8891
async fn get_latency(&self) -> Duration {

server/src/storage/s3.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum};
2929
use object_store::limit::LimitStore;
3030
use object_store::path::Path as StorePath;
3131
use object_store::{ClientOptions, ObjectStore};
32-
use relative_path::RelativePath;
32+
use relative_path::{RelativePath, RelativePathBuf};
3333
use tokio::fs::OpenOptions;
3434
use tokio::io::{AsyncReadExt, AsyncWriteExt};
3535

@@ -482,6 +482,16 @@ impl ObjectStorage for S3 {
482482
Ok(())
483483
}
484484

485+
async fn delete_ingester_meta(
486+
&self,
487+
ingester_filename: String,
488+
) -> Result<(), ObjectStorageError> {
489+
let file = RelativePathBuf::from(&ingester_filename);
490+
self.client.delete(&to_object_store_path(&file)).await?;
491+
492+
Ok(())
493+
}
494+
485495
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
486496
let streams = self._list_streams().await?;
487497

0 commit comments

Comments
 (0)