Skip to content

Commit e42b4f7

Browse files
add node type to cluster info and metrics
1 parent 819dd98 commit e42b4f7

File tree

4 files changed

+57
-42
lines changed

4 files changed

+57
-42
lines changed

src/handlers/http/cluster/mod.rs

Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -564,8 +564,8 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
564564

565565
// Fetch info for both node types concurrently
566566
let (ingestor_infos, indexer_infos) = future::join(
567-
fetch_servers_info(ingestor_metadata),
568-
fetch_servers_info(indexer_metadata),
567+
fetch_nodes_info(ingestor_metadata),
568+
fetch_nodes_info(indexer_metadata),
569569
)
570570
.await;
571571

@@ -577,18 +577,18 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
577577
Ok(actix_web::HttpResponse::Ok().json(infos))
578578
}
579579

580-
/// Fetches info for a single server (ingestor or indexer)
581-
async fn fetch_server_info<T: Metadata>(server: &T) -> Result<utils::ClusterInfo, StreamError> {
580+
/// Fetches info for a single node (ingestor or indexer)
581+
async fn fetch_node_info<T: Metadata>(node: &T) -> Result<utils::ClusterInfo, StreamError> {
582582
let uri = Url::parse(&format!(
583583
"{}{}/about",
584-
server.domain_name(),
584+
node.domain_name(),
585585
base_path_without_preceding_slash()
586586
))
587587
.expect("should always be a valid url");
588588

589589
let resp = HTTP_CLIENT
590590
.get(uri)
591-
.header(header::AUTHORIZATION, server.token().to_owned())
591+
.header(header::AUTHORIZATION, node.token().to_owned())
592592
.header(header::CONTENT_TYPE, "application/json")
593593
.send()
594594
.await;
@@ -597,13 +597,13 @@ async fn fetch_server_info<T: Metadata>(server: &T) -> Result<utils::ClusterInfo
597597
let status = Some(resp.status().to_string());
598598

599599
let resp_data = resp.bytes().await.map_err(|err| {
600-
error!("Fatal: failed to parse server info to bytes: {:?}", err);
600+
error!("Fatal: failed to parse node info to bytes: {:?}", err);
601601
StreamError::Network(err)
602602
})?;
603603

604604
let sp = serde_json::from_slice::<JsonValue>(&resp_data)
605605
.map_err(|err| {
606-
error!("Fatal: failed to parse server info: {:?}", err);
606+
error!("Fatal: failed to parse node info: {:?}", err);
607607
StreamError::SerdeError(err)
608608
})?
609609
.get("staging")
@@ -627,23 +627,24 @@ async fn fetch_server_info<T: Metadata>(server: &T) -> Result<utils::ClusterInfo
627627
};
628628

629629
Ok(utils::ClusterInfo::new(
630-
server.domain_name(),
630+
node.domain_name(),
631631
reachable,
632632
staging_path,
633633
PARSEABLE.storage.get_endpoint(),
634634
error,
635635
status,
636+
node.node_type(),
636637
))
637638
}
638639

639-
/// Fetches info for multiple servers in parallel
640-
async fn fetch_servers_info<T: Metadata>(
641-
servers: Vec<T>,
640+
/// Fetches info for multiple nodes in parallel
641+
async fn fetch_nodes_info<T: Metadata>(
642+
nodes: Vec<T>,
642643
) -> Result<Vec<utils::ClusterInfo>, StreamError> {
643-
let servers_len = servers.len();
644-
let results = stream::iter(servers)
645-
.map(|server| async move { fetch_server_info(&server).await })
646-
.buffer_unordered(servers_len) // No concurrency limit
644+
let nodes_len = nodes.len();
645+
let results = stream::iter(nodes)
646+
.map(|node| async move { fetch_node_info(&node).await })
647+
.buffer_unordered(nodes_len) // No concurrency limit
647648
.collect::<Vec<_>>()
648649
.await;
649650

@@ -749,31 +750,29 @@ pub async fn remove_ingestor(ingestor: Path<String>) -> Result<impl Responder, P
749750
Ok((msg, StatusCode::OK))
750751
}
751752

752-
/// Fetches metrics from a server (ingestor or indexer)
753-
async fn fetch_server_metrics<T>(server: &T) -> Result<Option<Metrics>, PostError>
753+
/// Fetches metrics from a node (ingestor or indexer)
754+
async fn fetch_node_metrics<T>(node: &T) -> Result<Option<Metrics>, PostError>
754755
where
755756
T: Metadata + Send + Sync + 'static,
756757
{
757758
// Format the metrics URL
758759
let uri = Url::parse(&format!(
759760
"{}{}/metrics",
760-
server.domain_name(),
761+
node.domain_name(),
761762
base_path_without_preceding_slash()
762763
))
763-
.map_err(|err| {
764-
PostError::Invalid(anyhow::anyhow!("Invalid URL in server metadata: {}", err))
765-
})?;
764+
.map_err(|err| PostError::Invalid(anyhow::anyhow!("Invalid URL in node metadata: {}", err)))?;
766765

767-
// Check if the server is live
768-
if !check_liveness(server.domain_name()).await {
769-
warn!("Server {} is not live", server.domain_name());
766+
// Check if the node is live
767+
if !check_liveness(node.domain_name()).await {
768+
warn!("node {} is not live", node.domain_name());
770769
return Ok(None);
771770
}
772771

773772
// Fetch metrics
774773
let res = HTTP_CLIENT
775774
.get(uri)
776-
.header(header::AUTHORIZATION, server.token())
775+
.header(header::AUTHORIZATION, node.token())
777776
.header(header::CONTENT_TYPE, "application/json")
778777
.send()
779778
.await;
@@ -788,43 +787,43 @@ where
788787
.map_err(|err| PostError::CustomError(err.to_string()))?
789788
.samples;
790789

791-
let metrics = Metrics::from_prometheus_samples(sample, server)
790+
let metrics = Metrics::from_prometheus_samples(sample, node)
792791
.await
793792
.map_err(|err| {
794-
error!("Fatal: failed to get server metrics: {:?}", err);
793+
error!("Fatal: failed to get node metrics: {:?}", err);
795794
PostError::Invalid(err.into())
796795
})?;
797796

798797
Ok(Some(metrics))
799798
}
800799
Err(_) => {
801800
warn!(
802-
"Failed to fetch metrics from server: {}\n",
803-
server.domain_name()
801+
"Failed to fetch metrics from node: {}\n",
802+
node.domain_name()
804803
);
805804
Ok(None)
806805
}
807806
}
808807
}
809808

810-
/// Fetches metrics from multiple servers in parallel
811-
async fn fetch_servers_metrics<T>(servers: Vec<T>) -> Result<Vec<Metrics>, PostError>
809+
/// Fetches metrics from multiple nodes in parallel
810+
async fn fetch_nodes_metrics<T>(nodes: Vec<T>) -> Result<Vec<Metrics>, PostError>
812811
where
813812
T: Metadata + Send + Sync + 'static,
814813
{
815-
let servers_len = servers.len();
816-
let results = stream::iter(servers)
817-
.map(|server| async move { fetch_server_metrics(&server).await })
818-
.buffer_unordered(servers_len) // No concurrency limit
814+
let nodes_len = nodes.len();
815+
let results = stream::iter(nodes)
816+
.map(|node| async move { fetch_node_metrics(&node).await })
817+
.buffer_unordered(nodes_len) // No concurrency limit
819818
.collect::<Vec<_>>()
820819
.await;
821820

822821
// Process results
823822
let mut metrics = Vec::new();
824823
for result in results {
825824
match result {
826-
Ok(Some(server_metrics)) => metrics.push(server_metrics),
827-
Ok(None) => {} // server was not live or metrics couldn't be fetched
825+
Ok(Some(node_metrics)) => metrics.push(node_metrics),
826+
Ok(None) => {} // node was not live or metrics couldn't be fetched
828827
Err(err) => return Err(err),
829828
}
830829
}
@@ -852,8 +851,8 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
852851

853852
// Fetch metrics from ingestors and indexers concurrently
854853
let (ingestor_metrics, indexer_metrics) = future::join(
855-
fetch_servers_metrics(ingestor_metadata),
856-
fetch_servers_metrics(indexer_metadata),
854+
fetch_nodes_metrics(ingestor_metadata),
855+
fetch_nodes_metrics(indexer_metadata),
857856
)
858857
.await;
859858

src/handlers/http/cluster/utils.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ pub struct ClusterInfo {
5555
storage_path: String,
5656
error: Option<String>, // error message if the ingestor is not reachable
5757
status: Option<String>, // status message if the ingestor is reachable
58+
node_type: String,
5859
}
5960

6061
impl ClusterInfo {
@@ -65,6 +66,7 @@ impl ClusterInfo {
6566
storage_path: String,
6667
error: Option<String>,
6768
status: Option<String>,
69+
node_type: &str,
6870
) -> Self {
6971
Self {
7072
domain_name: domain_name.to_string(),
@@ -73,6 +75,7 @@ impl ClusterInfo {
7375
storage_path,
7476
error,
7577
status,
78+
node_type: node_type.to_string(),
7679
}
7780
}
7881
}

src/handlers/http/modal/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,7 @@ impl IndexerMetadata {
552552
pub trait Metadata {
553553
fn domain_name(&self) -> &str;
554554
fn token(&self) -> &str;
555+
fn node_type(&self) -> &str;
555556
}
556557

557558
impl Metadata for IngestorMetadata {
@@ -562,6 +563,9 @@ impl Metadata for IngestorMetadata {
562563
fn token(&self) -> &str {
563564
&self.token
564565
}
566+
fn node_type(&self) -> &str {
567+
"ingestor"
568+
}
565569
}
566570

567571
impl Metadata for IndexerMetadata {
@@ -572,6 +576,9 @@ impl Metadata for IndexerMetadata {
572576
fn token(&self) -> &str {
573577
&self.token
574578
}
579+
fn node_type(&self) -> &str {
580+
"indexer"
581+
}
575582
}
576583
#[cfg(test)]
577584
mod test {

src/metrics/prom_utils.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use url::Url;
3737
#[derive(Debug, Serialize, Clone)]
3838
pub struct Metrics {
3939
address: String,
40+
node_type: String,
4041
parseable_events_ingested: f64, // all streams
4142
parseable_events_ingested_size: f64,
4243
parseable_lifetime_events_ingested: f64, // all streams
@@ -72,6 +73,7 @@ impl Default for Metrics {
7273
);
7374
Metrics {
7475
address,
76+
node_type: "ingestor".to_string(),
7577
parseable_events_ingested: 0.0,
7678
parseable_events_ingested_size: 0.0,
7779
parseable_staging_files: 0.0,
@@ -92,9 +94,10 @@ impl Default for Metrics {
9294
}
9395

9496
impl Metrics {
95-
fn new(address: String) -> Self {
97+
fn new(address: String, node_type: String) -> Self {
9698
Metrics {
9799
address,
100+
node_type,
98101
parseable_events_ingested: 0.0,
99102
parseable_events_ingested_size: 0.0,
100103
parseable_staging_files: 0.0,
@@ -160,7 +163,10 @@ impl Metrics {
160163
samples: Vec<PromSample>,
161164
metadata: &T,
162165
) -> Result<Self, PostError> {
163-
let mut prom_dress = Metrics::new(metadata.domain_name().to_string());
166+
let mut prom_dress = Metrics::new(
167+
metadata.domain_name().to_string(),
168+
metadata.node_type().to_string(),
169+
);
164170
for sample in samples {
165171
if let PromValue::Gauge(val) = sample.value {
166172
match sample.metric.as_str() {

0 commit comments

Comments
 (0)