@@ -20,6 +20,7 @@ pub mod utils;
2020
2121use futures:: { future, stream, StreamExt } ;
2222use std:: collections:: HashSet ;
23+ use std:: sync:: Arc ;
2324use std:: time:: Duration ;
2425
2526use actix_web:: http:: header:: { self , HeaderMap } ;
@@ -31,7 +32,7 @@ use clokwerk::{AsyncScheduler, Interval};
3132use http:: { header as http_header, StatusCode } ;
3233use itertools:: Itertools ;
3334use relative_path:: RelativePathBuf ;
34- use serde:: de:: Error ;
35+ use serde:: de:: { DeserializeOwned , Error } ;
3536use serde_json:: error:: Error as SerdeError ;
3637use serde_json:: { to_vec, Value as JsonValue } ;
3738use tracing:: { error, info, warn} ;
@@ -45,7 +46,8 @@ use crate::rbac::role::model::DefaultPrivilege;
4546use crate :: rbac:: user:: User ;
4647use crate :: stats:: Stats ;
4748use crate :: storage:: {
48- ObjectStorageError , ObjectStoreFormat , PARSEABLE_ROOT_DIRECTORY , STREAM_ROOT_DIRECTORY ,
49+ ObjectStorage , ObjectStorageError , ObjectStoreFormat , PARSEABLE_ROOT_DIRECTORY ,
50+ STREAM_ROOT_DIRECTORY ,
4951} ;
5052use crate :: HTTP_CLIENT ;
5153
@@ -642,6 +644,9 @@ async fn fetch_nodes_info<T: Metadata>(
642644 nodes : Vec < T > ,
643645) -> Result < Vec < utils:: ClusterInfo > , StreamError > {
644646 let nodes_len = nodes. len ( ) ;
647+ if nodes_len == 0 {
648+ return Ok ( vec ! [ ] ) ;
649+ }
645650 let results = stream:: iter ( nodes)
646651 . map ( |node| async move { fetch_node_info ( & node) . await } )
647652 . buffer_unordered ( nodes_len) // No concurrency limit
@@ -702,52 +707,71 @@ pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
702707 Ok ( arr)
703708}
704709
705- pub async fn remove_ingestor ( ingestor : Path < String > ) -> Result < impl Responder , PostError > {
706- let domain_name = to_url_string ( ingestor . into_inner ( ) ) ;
710+ pub async fn remove_node ( node_url : Path < String > ) -> Result < impl Responder , PostError > {
711+ let domain_name = to_url_string ( node_url . into_inner ( ) ) ;
707712
708713 if check_liveness ( & domain_name) . await {
709714 return Err ( PostError :: Invalid ( anyhow:: anyhow!(
710- "The ingestor is currently live and cannot be removed"
715+ "The node is currently live and cannot be removed"
711716 ) ) ) ;
712717 }
713718 let object_store = PARSEABLE . storage . get_object_store ( ) ;
714719
715- let ingestor_metadatas = object_store
720+ // Delete ingestor metadata
721+ let removed_ingestor =
722+ remove_node_metadata :: < IngestorMetadata > ( & object_store, & domain_name) . await ?;
723+
724+ // Delete indexer metadata
725+ let removed_indexer =
726+ remove_node_metadata :: < IndexerMetadata > ( & object_store, & domain_name) . await ?;
727+
728+ let msg = if removed_ingestor || removed_indexer {
729+ format ! ( "node {} removed successfully" , domain_name)
730+ } else {
731+ format ! ( "node {} is not found" , domain_name)
732+ } ;
733+
734+ info ! ( "{}" , & msg) ;
735+ Ok ( ( msg, StatusCode :: OK ) )
736+ }
737+
738+ // Helper function to remove a specific type of node metadata
739+ async fn remove_node_metadata < T : Metadata + DeserializeOwned + Default > (
740+ object_store : & Arc < dyn ObjectStorage > ,
741+ domain_name : & str ,
742+ ) -> Result < bool , PostError > {
743+ let node_type = T :: default ( ) . node_type ( ) . to_string ( ) ;
744+
745+ let metadatas = object_store
716746 . get_objects (
717747 Some ( & RelativePathBuf :: from ( PARSEABLE_ROOT_DIRECTORY ) ) ,
718- Box :: new ( |file_name| file_name. starts_with ( "ingestor" ) ) ,
748+ Box :: new ( move |file_name| file_name. starts_with ( & node_type ) ) ,
719749 )
720750 . await ?;
721751
722- let ingestor_metadata = ingestor_metadatas
752+ let node_metadatas = metadatas
723753 . iter ( )
724- . map ( |elem| serde_json:: from_slice :: < IngestorMetadata > ( elem) . unwrap_or_default ( ) )
725- . collect_vec ( ) ;
754+ . filter_map ( |elem| match serde_json:: from_slice :: < T > ( elem) {
755+ Ok ( meta) if meta. domain_name ( ) == domain_name => Some ( meta) ,
756+ _ => None ,
757+ } )
758+ . collect :: < Vec < _ > > ( ) ;
726759
727- let ingestor_metadata = ingestor_metadata
728- . iter ( )
729- . filter ( |elem| elem. domain_name == domain_name)
730- . collect_vec ( ) ;
760+ if node_metadatas. is_empty ( ) {
761+ return Ok ( false ) ;
762+ }
731763
732- let ingestor_meta_filename = ingestor_metadata[ 0 ] . file_path ( ) . to_string ( ) ;
733- let msg = match object_store
734- . try_delete_ingestor_meta ( ingestor_meta_filename)
735- . await
736- {
737- Ok ( _) => {
738- format ! ( "Ingestor {} removed successfully" , domain_name)
739- }
764+ let node_meta_filename = node_metadatas[ 0 ] . file_path ( ) . to_string ( ) ;
765+ match object_store. try_delete_node_meta ( node_meta_filename) . await {
766+ Ok ( _) => Ok ( true ) ,
740767 Err ( err) => {
741768 if matches ! ( err, ObjectStorageError :: IoError ( _) ) {
742- format ! ( "Ingestor {} is not found" , domain_name )
769+ Ok ( false )
743770 } else {
744- format ! ( "Error removing ingestor {} \n Reason: {}" , domain_name , err)
771+ Err ( PostError :: ObjectStorageError ( err) )
745772 }
746773 }
747- } ;
748-
749- info ! ( "{}" , & msg) ;
750- Ok ( ( msg, StatusCode :: OK ) )
774+ }
751775}
752776
753777/// Fetches metrics from a node (ingestor or indexer)
0 commit comments