From fad876cb78d5ae5e42423a577eb2d69f0df39d0e Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 27 Mar 2024 12:37:51 +0530 Subject: [PATCH] feat: add cluster/metrics endpoint 1. add: new endpoint /cluster/metrics 2. add: prometheus metrics parser 3. chore: code cleanup --- Cargo.lock | 22 + server/Cargo.toml | 1 + server/src/handlers/http.rs | 7 +- server/src/handlers/http/cluster/mod.rs | 384 ++++++++++++ server/src/handlers/http/cluster/utils.rs | 249 ++++++++ server/src/handlers/http/ingest.rs | 3 + server/src/handlers/http/logstream.rs | 15 +- .../src/handlers/http/modal/query_server.rs | 571 +----------------- server/src/metrics/mod.rs | 1 + server/src/metrics/prom_utils.rs | 87 +++ server/src/rbac/role.rs | 2 + 11 files changed, 784 insertions(+), 558 deletions(-) create mode 100644 server/src/handlers/http/cluster/mod.rs create mode 100644 server/src/handlers/http/cluster/utils.rs create mode 100644 server/src/metrics/prom_utils.rs diff --git a/Cargo.lock b/Cargo.lock index eb2586b60..13c3a2665 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2107,6 +2107,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.8" @@ -2734,6 +2743,7 @@ dependencies = [ "parquet", "path-clean", "prometheus", + "prometheus-parse", "prost", "prost-build", "rand", @@ -2982,6 +2992,18 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prometheus-parse" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "811031bea65e5a401fb2e1f37d802cca6601e204ac463809a3189352d13b78a5" +dependencies = [ + "chrono", + "itertools 0.12.1", + "once_cell", + "regex", +] + [[package]] name = "prost" version = "0.12.3" diff --git a/server/Cargo.toml b/server/Cargo.toml index 0af2b43cb..a456f6995 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -104,6 +104,7 @@ serde_repr = "0.1.17" hashlru = { version = "0.11.0", features = ["serde"] } path-clean = "1.0.1" prost = "0.12.3" +prometheus-parse = "0.2.5" [build-dependencies] cargo_toml = "0.15" diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 7068a7b73..8026626e7 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -20,9 +20,10 @@ use actix_cors::Cors; use arrow_schema::Schema; use serde_json::Value; -use self::{modal::query_server::QueryServer, query::Query}; +use self::{cluster::get_ingester_info, query::Query}; pub(crate) mod about; +pub mod cluster; pub(crate) mod health_check; pub(crate) mod ingest; mod kinesis; @@ -62,7 +63,7 @@ pub fn base_path_without_preceding_slash() -> String { pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<Schema> { let mut res = vec![]; - let ima = QueryServer::get_ingester_info().await.unwrap(); + let ima = get_ingester_info().await.unwrap(); for im in ima { let uri = format!( @@ -92,7 +93,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<Schema> pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> { // send the query request to the ingester let mut res = vec![]; - let ima = QueryServer::get_ingester_info().await.unwrap(); + let ima = get_ingester_info().await.unwrap(); for im in ima.iter() { let uri = format!( diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs new file mode 100644 index 000000000..747139233 --- /dev/null +++
b/server/src/handlers/http/cluster/mod.rs @@ -0,0 +1,384 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + */ + +pub mod utils; + +use crate::handlers::http::ingest::PostError; +use crate::handlers::http::logstream::error::StreamError; +use crate::option::CONFIG; + +use crate::metrics::prom_utils::Metrics; +use actix_web::http::header; +use actix_web::Responder; +use http::StatusCode; +use itertools::Itertools; +use relative_path::RelativePathBuf; +use serde_json::Value as JsonValue; +use url::Url; + +type IngesterMetadataArr = Vec<IngesterMetadata>; + +use super::base_path_without_preceding_slash; + +use super::modal::IngesterMetadata; + +// forward the request to all ingesters to keep them in sync +pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> { + let ingester_infos = get_ingester_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingester info: {:?}", err); + StreamError::Custom { + msg: format!("failed to get ingester info\n{:?}", err), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + let mut errored = false; + for ingester in ingester_infos.iter() { + let url = format!( + "{}{}/logstream/{}", + ingester.domain_name, + base_path_without_preceding_slash(), + stream_name + ); + + match send_stream_sync_request(&url, ingester.clone()).await { + Ok(_) => continue, + Err(_) => { + errored = true; + break; + } + } + } + + if errored { + for ingester in ingester_infos { + let url = format!( + "{}{}/logstream/{}", + ingester.domain_name, + base_path_without_preceding_slash(), + stream_name + ); + + // roll back the stream creation + send_stream_rollback_request(&url, ingester.clone()).await?; + } + + // surface a single aggregate error once the rollback pass is done + return Err(StreamError::Custom { + msg: "Failed to sync stream with ingesters".to_string(), + status: StatusCode::INTERNAL_SERVER_ERROR, + }); + } + + Ok(()) +} + +/// get the cumulative stats from all ingesters +pub async fn fetch_stats_from_ingesters( + stream_name: &str, +) -> Result<Vec<utils::QueriedStats>, StreamError> { + let mut stats = Vec::new(); + + let ingester_infos = get_ingester_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingester info: {:?}", err); + StreamError::Custom { + msg: format!("failed to get ingester info\n{:?}", err), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + for ingester in ingester_infos { + let url = format!( + "{}{}/logstream/{}/stats", + ingester.domain_name, + base_path_without_preceding_slash(), + stream_name + ); + + match utils::send_stats_request(&url, ingester.clone()).await { + Ok(Some(res)) => { + match serde_json::from_str::<utils::QueriedStats>(&res.text().await.unwrap()) { + Ok(stat) => stats.push(stat), + Err(err) => { + log::error!( + "Could not parse stats from ingester: {}\n Error: {:?}", + ingester.domain_name, + err + ); + continue; + } + } + } + Ok(None) => { + log::error!("Ingester at {} is not reachable",
&ingester.domain_name); + continue; + } + Err(err) => { + log::error!( + "Fatal: failed to fetch stats from ingester: {}\n Error: {:?}", + ingester.domain_name, + err + ); + return Err(err); + } + } + } + + Ok(stats) +} + +async fn send_stream_sync_request( + url: &str, + ingester: IngesterMetadata, +) -> Result<(), StreamError> { + if !utils::check_liveness(&ingester.domain_name).await { + return Ok(()); + } + + let client = reqwest::Client::new(); + let res = client + .put(url) + .header(header::CONTENT_TYPE, "application/json") + .header(header::AUTHORIZATION, ingester.token) + .send() + .await + .map_err(|err| { + log::error!( + "Fatal: failed to forward create stream request to ingester: {}\n Error: {:?}", + ingester.domain_name, + err + ); + StreamError::Custom { + msg: format!( + "failed to forward create stream request to ingester: {}\n Error: {:?}", + ingester.domain_name, err + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + if !res.status().is_success() { + log::error!( + "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", + ingester.domain_name, + res + ); + return Err(StreamError::Custom { + msg: format!( + "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", + ingester.domain_name, + res.text().await.unwrap_or_default() + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + }); + } + + Ok(()) +} + +/// send a rollback request to all ingesters +async fn send_stream_rollback_request( + url: &str, + ingester: IngesterMetadata, +) -> Result<(), StreamError> { + if !utils::check_liveness(&ingester.domain_name).await { + return Ok(()); + } + + let client = reqwest::Client::new(); + let resp = client + .delete(url) + .header(header::CONTENT_TYPE, "application/json") + .header(header::AUTHORIZATION, ingester.token) + .send() + .await + .map_err(|err| { + // log the error and return a custom error + log::error!( + "Fatal: failed to rollback stream creation: {}\n Error: {:?}", + ingester.domain_name, + err + ); + StreamError::Custom { + msg: format!( + "failed to rollback stream creation: {}\n Error: {:?}", + ingester.domain_name, err + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + // if the rollback response is not successful, log and surface the error: + // the cluster is left in an inconsistent state and needs operator attention + if !resp.status().is_success() { + log::error!( + "failed to rollback stream creation: {}\nResponse Returned: {:?}", + ingester.domain_name, + resp + ); + return Err(StreamError::Custom { + msg: format!( + "failed to rollback stream creation: {}\nResponse Returned: {:?}", + ingester.domain_name, + resp.text().await.unwrap_or_default() + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + }); + } + + Ok(()) +} + +pub async fn get_cluster_info() -> Result<impl Responder, StreamError> { + let ingester_infos = get_ingester_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingester info: {:?}", err); + StreamError::Custom { + msg: format!("failed to get ingester info\n{:?}", err), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + let mut infos = vec![]; + + for ingester in ingester_infos { + let uri = Url::parse(&format!( + "{}{}/about", + ingester.domain_name, + base_path_without_preceding_slash() + )) + .expect("should always be a valid url"); + + let resp = reqwest::Client::new() + .get(uri) + .header(header::AUTHORIZATION, ingester.token.clone()) + .header(header::CONTENT_TYPE, "application/json") + .send() + .await; + + let (reachable, staging_path, error, status) = if
let Ok(resp) = resp { + let status = Some(resp.status().to_string()); + + let resp_data = resp.bytes().await.map_err(|err| { + log::error!("Fatal: failed to parse ingester info to bytes: {:?}", err); + StreamError::Custom { + msg: format!("failed to parse ingester info to bytes: {:?}", err), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + let sp = serde_json::from_slice::<JsonValue>(&resp_data) + .map_err(|err| { + log::error!("Fatal: failed to parse ingester info: {:?}", err); + StreamError::Custom { + msg: format!("failed to parse ingester info: {:?}", err), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })? + .get("staging") + .unwrap() + .as_str() + .unwrap() + .to_string(); + + (true, sp, None, status) + } else { + ( + false, + "".to_owned(), + resp.as_ref().err().map(|e| e.to_string()), + resp.unwrap_err().status().map(|s| s.to_string()), + ) + }; + + infos.push(utils::ClusterInfo::new( + &ingester.domain_name, + reachable, + staging_path, + CONFIG.storage().get_endpoint(), + error, + status, + )); + } + + Ok(actix_web::HttpResponse::Ok().json(infos)) +} + +pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> { + let ingester_metadata = get_ingester_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingester info: {:?}", err); + PostError::CustomError(err.to_string()) + })?; + + let mut all_metrics = vec![]; + + for ingester in ingester_metadata { + let uri = Url::parse(&format!( + "{}{}/metrics", + &ingester.domain_name, + base_path_without_preceding_slash() + )) + .unwrap(); + + let res = reqwest::Client::new() + .get(uri) + .header(header::CONTENT_TYPE, "application/json") + .send() + .await; + + if let Ok(res) = res { + let text = res + .text() + .await + .map_err(|err| PostError::CustomError(err.to_string()))?; + let lines: Vec<Result<String, std::io::Error>> = + text.lines().map(|line| Ok(line.to_owned())).collect_vec(); + + let samples = prometheus_parse::Scrape::parse(lines.into_iter()) + .map_err(|err| PostError::CustomError(err.to_string()))? + .samples; + + all_metrics.push(Metrics::from_prometheus_samples( + samples, + ingester.domain_name, + )); + } else { + log::warn!( + "Failed to fetch metrics from ingester: {}", + ingester.domain_name + ); + } + } + + Ok(actix_web::HttpResponse::Ok().json(all_metrics)) +} + +// read the ingester metadata objects from the object store and return them as an IngesterMetadataArr +pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> { + let store = CONFIG.storage().get_object_store(); + + let root_path = RelativePathBuf::from(""); + let arr = store + .get_objects(Some(&root_path)) + .await? + .iter() + // note: a malformed metadata object silently falls back to a default IngesterMetadata here; consider validating instead + .map(|x| serde_json::from_slice::<IngesterMetadata>(x).unwrap_or_default()) + .collect_vec(); + + Ok(arr) +} diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs new file mode 100644 index 000000000..6380b0639 --- /dev/null +++ b/server/src/handlers/http/cluster/utils.rs @@ -0,0 +1,249 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details.
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + */ + +use crate::handlers::http::{logstream::error::StreamError, modal::IngesterMetadata}; +use actix_web::http::header; +use chrono::{DateTime, Utc}; +use http::StatusCode; +use itertools::Itertools; +use reqwest::Response; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct QueriedStats { + pub stream: String, + pub creation_time: String, + pub first_event_at: Option<String>, + pub time: DateTime<Utc>, + pub ingestion: IngestionStats, + pub storage: StorageStats, +} + +impl QueriedStats { + pub fn new( + stream: &str, + creation_time: &str, + first_event_at: Option<String>, + time: DateTime<Utc>, + ingestion: IngestionStats, + storage: StorageStats, + ) -> Self { + Self { + stream: stream.to_string(), + creation_time: creation_time.to_string(), + first_event_at, + time, + ingestion, + storage, + } + } +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct ClusterInfo { + domain_name: String, + reachable: bool, + staging_path: String, + storage_path: String, + error: Option<String>, // error message if the ingester is not reachable + status: Option<String>, // status message if the ingester is reachable +} + +impl ClusterInfo { + pub fn new( + domain_name: &str, + reachable: bool, + staging_path: String, + storage_path: String, + error: Option<String>, + status: Option<String>, + ) -> Self { + Self { + domain_name: domain_name.to_string(), + reachable, + staging_path, + storage_path, + error, + status, + } + } +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct IngestionStats { + pub count: u64, + pub size: String, + pub format: String, +} + +impl IngestionStats { + pub fn new(count: u64, size: String, format: &str) -> Self { + Self { + count, + size, + format: format.to_string(), + } + } +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct StorageStats { + size: String, + format: String, +} + +impl StorageStats { + pub fn new(size: String, format: &str) -> Self { + Self { + size, + format: format.to_string(), + } + } +} + +pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats { + // get the actual creation time + let min_creation_time = stats + .iter() + .map(|x| x.creation_time.parse::<DateTime<Utc>>().unwrap()) + .min() + .unwrap_or_default(); + + // get the stream name + let stream_name = stats[0].stream.clone(); + + // get the first event at + let min_first_event_at = stats + .iter() + .map(|x| match x.first_event_at.as_ref() { + Some(fea) => fea.parse::<DateTime<Utc>>().unwrap_or_default(), + None => Utc::now(), + }) + .min() + .unwrap_or_else(Utc::now); + + let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now); + + let cumulative_ingestion = + stats + .iter() + .map(|x| &x.ingestion) + .fold(IngestionStats::default(), |acc, x| IngestionStats { + count: acc.count + x.count, + size: format!( + "{} Bytes", + acc.size.split(' ').collect_vec()[0] + .parse::<u64>() + .unwrap_or_default() + + x.size.split(' ').collect_vec()[0] + .parse::<u64>() + .unwrap_or_default() + ), + format: x.format.clone(), + }); + + let cumulative_storage = + stats + .iter() + .map(|x| &x.storage) + .fold(StorageStats::default(), |acc, x| StorageStats { + size: format!( + "{} Bytes", + acc.size.split(' ').collect_vec()[0] + .parse::<u64>() + .unwrap_or_default() + + x.size.split(' ').collect_vec()[0] + .parse::<u64>() + .unwrap_or_default() + ), + format: x.format.clone(), + }); + + QueriedStats::new( + &stream_name, + &min_creation_time.to_string(), +
Some(min_first_event_at.to_string()), + min_time, + cumulative_ingestion, + cumulative_storage, + ) +} + +pub async fn check_liveness(domain_name: &str) -> bool { + let uri = Url::parse(&format!("{}liveness", domain_name)).unwrap(); + + let reqw = reqwest::Client::new() + .get(uri) + .header(header::CONTENT_TYPE, "application/json") + .send() + .await; + + reqw.is_ok() +} + +/// send a request to the ingester to fetch its stats +pub async fn send_stats_request( + url: &str, + ingester: IngesterMetadata, +) -> Result<Option<Response>, StreamError> { + if !check_liveness(&ingester.domain_name).await { + return Ok(None); + } + + let client = reqwest::Client::new(); + let res = client + .get(url) + .header(header::CONTENT_TYPE, "application/json") + .header(header::AUTHORIZATION, ingester.token) + .send() + .await + .map_err(|err| { + log::error!( + "Fatal: failed to fetch stats from ingester: {}\n Error: {:?}", + ingester.domain_name, + err + ); + + StreamError::Custom { + msg: format!( + "failed to fetch stats from ingester: {}\n Error: {:?}", + ingester.domain_name, err + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + if !res.status().is_success() { + log::error!( + "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", + ingester.domain_name, + res + ); + return Err(StreamError::Custom { + msg: format!( + "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", + ingester.domain_name, + res.text().await.unwrap_or_default() + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + }); + } + + Ok(Some(res)) +} diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index a5ed7978f..d9751fefd 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -170,6 +170,8 @@ pub enum PostError { Invalid(#[from] anyhow::Error), #[error("{0}")] CreateStream(#[from] CreateStreamError), + #[error("Error: {0}")] + CustomError(String), } impl actix_web::ResponseError for PostError { @@ -184,6 +186,7 @@ impl actix_web::ResponseError for PostError { } PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::StreamNotFound(_) => StatusCode::NOT_FOUND, + PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 327b6fd98..6c77559a8 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -33,7 +33,8 @@ use crate::{metadata, validator}; use self::error::{CreateStreamError, StreamError}; -use super::modal::query_server::{self, IngestionStats, QueriedStats, QueryServer, StorageStats}; +use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; +use super::cluster::{fetch_stats_from_ingesters, sync_streams_with_ingesters}; pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -123,7 +124,7 @@ pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError> }); } if CONFIG.parseable.mode == Mode::Query { - query_server::QueryServer::sync_streams_with_ingesters(&stream_name).await?; + sync_streams_with_ingesters(&stream_name).await?; } create_stream(stream_name.clone()).await?; @@ -284,8 +285,8 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; - let ingestor_stats = if CONFIG.parseable.mode == Mode::Query { -
Some(query_server::QueryServer::fetch_stats_from_ingesters(&stream_name).await?) + let ingester_stats = if CONFIG.parseable.mode == Mode::Query { + Some(fetch_stats_from_ingesters(&stream_name).await?) } else { None }; @@ -336,9 +337,9 @@ pub async fn get_stats(req: HttpRequest) -> Result ) } }; - let stats = if let Some(mut ingestor_stats) = ingestor_stats { - ingestor_stats.push(stats); - QueryServer::merge_quried_stats(ingestor_stats) + let stats = if let Some(mut ingester_stats) = ingester_stats { + ingester_stats.push(stats); + merge_quried_stats(ingester_stats) } else { stats }; diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 6eec4078b..9efca36ed 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -16,35 +16,24 @@ * */ -use crate::handlers::http::logstream::error::StreamError; +use crate::handlers::http::cluster::utils::check_liveness; +use crate::handlers::http::cluster::{self, get_ingester_info}; use crate::handlers::http::middleware::RouteExt; -use crate::handlers::http::{ - base_path, base_path_without_preceding_slash, cross_origin_config, API_BASE_PATH, API_VERSION, -}; +use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; + use crate::rbac::role::Action; use crate::{analytics, banner, metadata, metrics, migration, rbac, storage}; -use actix_web::http::header; +use actix_web::web; use actix_web::web::ServiceConfig; -use actix_web::{web, Responder}; use actix_web::{App, HttpServer}; use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use http::StatusCode; -use itertools::Itertools; -use relative_path::RelativePathBuf; -use reqwest::Response; -use serde::{Deserialize, Serialize}; -use serde_json::Value as JsonValue; use std::sync::Arc; -use url::Url; use crate::option::CONFIG; use super::server::Server; use super::ssl_acceptor::get_ssl_acceptor; -use super::{IngesterMetadata, OpenIdClient, ParseableServer}; - -type IngesterMetadataArr = Vec; +use super::{OpenIdClient, ParseableServer}; #[derive(Default, Debug)] pub struct QueryServer; @@ -56,11 +45,11 @@ impl ParseableServer for QueryServer { prometheus: actix_web_prometheus::PrometheusMetrics, oidc_client: Option, ) -> anyhow::Result<()> { - let data = Self::get_ingester_info().await?; + let data = get_ingester_info().await?; // on subsequent runs, the qurier should check if the ingester is up and running or not for ingester in data.iter() { - if !Self::check_liveness(&ingester.domain_name).await { + if !check_liveness(&ingester.domain_name).await { eprintln!("Ingester at {} is not reachable", &ingester.domain_name); } else { println!("Ingester at {} is up and running", &ingester.domain_name); @@ -144,117 +133,21 @@ impl QueryServer { } fn get_cluster_info_web_scope() -> actix_web::Scope { - web::scope("/cluster").service( - web::resource("/info").route( - web::get() - .to(Self::get_cluster_info) - .authorize(Action::ListCluster), - ), - ) - } - - // update the .query.json file and return the new IngesterMetadataArr - pub async fn get_ingester_info() -> anyhow::Result { - let store = CONFIG.storage().get_object_store(); - - let root_path = RelativePathBuf::from(""); - let arr = store - .get_objects(Some(&root_path)) - .await? 
- .iter() - // this unwrap will most definateley shoot me in the foot later - .map(|x| serde_json::from_slice::(x).unwrap_or_default()) - .collect_vec(); - - // TODO: add validation logic here - // validate the ingester metadata - Ok(arr) - } - - pub async fn check_liveness(domain_name: &str) -> bool { - let uri = Url::parse(&format!("{}liveness", domain_name)).unwrap(); - - let reqw = reqwest::Client::new() - .get(uri) - .header(header::CONTENT_TYPE, "application/json") - .send() - .await; - - reqw.is_ok() - } - - async fn get_cluster_info() -> Result { - let ingester_infos = Self::get_ingester_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingester info: {:?}", err); - StreamError::Custom { - msg: format!("failed to get ingester info\n{:?}", err), - status: StatusCode::INTERNAL_SERVER_ERROR, - } - })?; - - let mut infos = vec![]; - - for ingester in ingester_infos { - let uri = Url::parse(&format!( - "{}{}/about", - ingester.domain_name, - base_path_without_preceding_slash() - )) - .expect("should always be a valid url"); - - let resp = reqwest::Client::new() - .get(uri) - .header(header::AUTHORIZATION, ingester.token.clone()) - .header(header::CONTENT_TYPE, "application/json") - .send() - .await; - - let (reachable, staging_path, error, status) = if let Ok(resp) = resp { - let status = Some(resp.status().to_string()); - - let resp_data = resp.bytes().await.map_err(|err| { - log::error!("Fatal: failed to parse ingester info to bytes: {:?}", err); - StreamError::Custom { - msg: format!("failed to parse ingester info to bytes: {:?}", err), - status: StatusCode::INTERNAL_SERVER_ERROR, - } - })?; - - let sp = serde_json::from_slice::(&resp_data) - .map_err(|err| { - log::error!("Fatal: failed to parse ingester info: {:?}", err); - StreamError::Custom { - msg: format!("failed to parse ingester info: {:?}", err), - status: StatusCode::INTERNAL_SERVER_ERROR, - } - })? 
- .get("staging") - .unwrap() - .as_str() - .unwrap() - .to_string(); - - (true, sp, None, status) - } else { - ( - false, - "".to_owned(), - resp.as_ref().err().map(|e| e.to_string()), - resp.unwrap_err().status().map(|s| s.to_string()), - ) - }; - - infos.push(ClusterInfo::new( - &ingester.domain_name, - reachable, - staging_path, - CONFIG.storage().get_endpoint(), - error, - status, - )); - } - - Ok(actix_web::HttpResponse::Ok().json(infos)) + web::scope("/cluster") + .service( + web::resource("/info").route( + web::get() + .to(cluster::get_cluster_info) + .authorize(Action::ListCluster), + ), + ) + .service( + web::resource("/metrics").route( + web::get() + .to(cluster::get_cluster_metrics) + .authorize(Action::ListClusterMetrics), + ), + ) } /// initialize the server, run migrations as needed and start the server @@ -262,8 +155,6 @@ impl QueryServer { migration::run_metadata_migration(&CONFIG).await?; let metadata = storage::resolve_parseable_metadata().await?; - // do not commit the below line - tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?; banner::print(&CONFIG, &metadata).await; @@ -300,420 +191,4 @@ impl QueryServer { Ok(()) } - - // forward the request to all ingesters to keep them in sync - pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> { - let ingester_infos = Self::get_ingester_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingester info: {:?}", err); - StreamError::Custom { - msg: format!("failed to get ingester info\n{:?}", err), - status: StatusCode::INTERNAL_SERVER_ERROR, - } - })?; - - let mut errored = false; - for ingester in ingester_infos.iter() { - let url = format!( - "{}{}/logstream/{}", - ingester.domain_name, - base_path_without_preceding_slash(), - stream_name - ); - - match Self::send_stream_sync_request(&url, ingester.clone()).await { - Ok(_) => continue, - Err(_) => { - errored = true; - break; - } - } - } - - if errored { - for ingester in ingester_infos { - let url = format!( - "{}{}/logstream/{}", - ingester.domain_name, - base_path_without_preceding_slash(), - stream_name - ); - - // roll back the stream creation - Self::send_stream_rollback_request(&url, ingester.clone()).await?; - } - - // this might be a bit too much - return Err(StreamError::Custom { - msg: "Failed to sync stream with ingesters".to_string(), - status: StatusCode::INTERNAL_SERVER_ERROR, - }); - } - - Ok(()) - } - - /// get the cumulative stats from all ingesters - pub async fn fetch_stats_from_ingesters( - stream_name: &str, - ) -> Result, StreamError> { - let mut stats = Vec::new(); - - let ingester_infos = Self::get_ingester_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingester info: {:?}", err); - StreamError::Custom { - msg: format!("failed to get ingester info\n{:?}", err), - status: StatusCode::INTERNAL_SERVER_ERROR, - } - })?; - - for ingester in ingester_infos { - let url = format!( - "{}{}/logstream/{}/stats", - ingester.domain_name, - base_path_without_preceding_slash(), - stream_name - ); - - match Self::send_stats_request(&url, ingester.clone()).await { - Ok(Some(res)) => { - match serde_json::from_str::(&res.text().await.unwrap()) { - Ok(stat) => stats.push(stat), - Err(err) => { - log::error!( - "Could not parse stats from ingester: {}\n Error: {:?}", - ingester.domain_name, - err - ); - continue; - } - } - } - Ok(None) => { - log::error!("Ingester at {} is not reachable", &ingester.domain_name); - continue; - } - Err(err) => { - log::error!( - "Fatal: failed to fetch 
stats from ingester: {}\n Error: {:?}", - ingester.domain_name, - err - ); - return Err(err); - } - } - } - - Ok(stats) - } - - /// send a request to the ingester to fetch its stats - async fn send_stats_request( - url: &str, - ingester: IngesterMetadata, - ) -> Result, StreamError> { - if !Self::check_liveness(&ingester.domain_name).await { - return Ok(None); - } - - let client = reqwest::Client::new(); - let res = client - .get(url) - .header(header::CONTENT_TYPE, "application/json") - .header(header::AUTHORIZATION, ingester.token) - .send() - .await - .map_err(|err| { - log::error!( - "Fatal: failed to fetch stats from ingester: {}\n Error: {:?}", - ingester.domain_name, - err - ); - - StreamError::Custom { - msg: format!( - "failed to fetch stats from ingester: {}\n Error: {:?}", - ingester.domain_name, err - ), - status: StatusCode::INTERNAL_SERVER_ERROR, - } - })?; - - if !res.status().is_success() { - log::error!( - "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", - ingester.domain_name, - res - ); - return Err(StreamError::Custom { - msg: format!( - "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", - ingester.domain_name,res.text().await.unwrap_or_default() - ), - status: StatusCode::INTERNAL_SERVER_ERROR, - }); - } - - Ok(Some(res)) - } - - async fn send_stream_sync_request( - url: &str, - ingester: IngesterMetadata, - ) -> Result<(), StreamError> { - if !Self::check_liveness(&ingester.domain_name).await { - return Ok(()); - } - - let client = reqwest::Client::new(); - let res = client - .put(url) - .header(header::CONTENT_TYPE, "application/json") - .header(header::AUTHORIZATION, ingester.token) - .send() - .await - .map_err(|err| { - log::error!( - "Fatal: failed to forward create stream request to ingester: {}\n Error: {:?}", - ingester.domain_name, - err - ); - StreamError::Custom { - msg: format!( - "failed to forward create stream request to ingester: {}\n Error: {:?}", - ingester.domain_name, err - ), - status: StatusCode::INTERNAL_SERVER_ERROR, - } - })?; - - if !res.status().is_success() { - log::error!( - "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", - ingester.domain_name, - res - ); - return Err(StreamError::Custom { - msg: format!( - "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", - ingester.domain_name,res.text().await.unwrap_or_default() - ), - status: StatusCode::INTERNAL_SERVER_ERROR, - }); - } - - Ok(()) - } - - /// send a rollback request to all ingesters - async fn send_stream_rollback_request( - url: &str, - ingester: IngesterMetadata, - ) -> Result<(), StreamError> { - if !Self::check_liveness(&ingester.domain_name).await { - return Ok(()); - } - - let client = reqwest::Client::new(); - let resp = client - .delete(url) - .header(header::CONTENT_TYPE, "application/json") - .header(header::AUTHORIZATION, ingester.token) - .send() - .await - .map_err(|err| { - // log the error and return a custom error - log::error!( - "Fatal: failed to rollback stream creation: {}\n Error: {:?}", - ingester.domain_name, - err - ); - StreamError::Custom { - msg: format!( - "failed to rollback stream creation: {}\n Error: {:?}", - ingester.domain_name, err - ), - status: StatusCode::INTERNAL_SERVER_ERROR, - } - })?; - - // if the response is not successful, log the error and return a custom error - // this could be a bit too much, but we need to be sure it covers all cases - if !resp.status().is_success() { - log::error!( - "failed to 
rollback stream creation: {}\nResponse Returned: {:?}", - ingester.domain_name, - resp - ); - return Err(StreamError::Custom { - msg: format!( - "failed to rollback stream creation: {}\nResponse Returned: {:?}", - ingester.domain_name, - resp.text().await.unwrap_or_default() - ), - status: StatusCode::INTERNAL_SERVER_ERROR, - }); - } - - Ok(()) - } - - pub fn merge_quried_stats(stats: Vec) -> QueriedStats { - // get the actual creation time - let min_creation_time = stats - .iter() - .map(|x| x.creation_time.parse::>().unwrap()) - .min() - .unwrap_or_default(); - - // get the stream name - let stream_name = stats[0].stream.clone(); - - // get the first event at - let min_first_event_at = stats - .iter() - .map(|x| match x.first_event_at.as_ref() { - Some(fea) => fea.parse::>().unwrap_or_default(), - None => Utc::now(), - }) - .min() - .unwrap_or_else(Utc::now); - - let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now); - - let cumulative_ingestion = - stats - .iter() - .map(|x| &x.ingestion) - .fold(IngestionStats::default(), |acc, x| IngestionStats { - count: acc.count + x.count, - size: format!( - "{} Bytes", - acc.size.split(' ').collect_vec()[0] - .parse::() - .unwrap_or_default() - + x.size.split(' ').collect_vec()[0] - .parse::() - .unwrap_or_default() - ), - format: x.format.clone(), - }); - - let cumulative_storage = - stats - .iter() - .map(|x| &x.storage) - .fold(StorageStats::default(), |acc, x| StorageStats { - size: format!( - "{} Bytes", - acc.size.split(' ').collect_vec()[0] - .parse::() - .unwrap_or_default() - + x.size.split(' ').collect_vec()[0] - .parse::() - .unwrap_or_default() - ), - format: x.format.clone(), - }); - - QueriedStats::new( - &stream_name, - &min_creation_time.to_string(), - Some(min_first_event_at.to_string()), - min_time, - cumulative_ingestion, - cumulative_storage, - ) - } -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct QueriedStats { - pub stream: String, - pub creation_time: String, - pub first_event_at: Option, - pub time: DateTime, - pub ingestion: IngestionStats, - pub storage: StorageStats, -} - -impl QueriedStats { - pub fn new( - stream: &str, - creation_time: &str, - first_event_at: Option, - time: DateTime, - ingestion: IngestionStats, - storage: StorageStats, - ) -> Self { - Self { - stream: stream.to_string(), - creation_time: creation_time.to_string(), - first_event_at, - time, - ingestion, - storage, - } - } -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct IngestionStats { - pub count: u64, - pub size: String, - pub format: String, -} - -impl IngestionStats { - pub fn new(count: u64, size: String, format: &str) -> Self { - Self { - count, - size, - format: format.to_string(), - } - } -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct StorageStats { - size: String, - format: String, -} - -impl StorageStats { - pub fn new(size: String, format: &str) -> Self { - Self { - size, - format: format.to_string(), - } - } -} - -#[derive(Debug, Default, Serialize, Deserialize)] -struct ClusterInfo { - domain_name: String, - reachable: bool, - staging_path: String, - storage_path: String, - error: Option, // error message if the ingester is not reachable - status: Option, // status message if the ingester is reachable -} - -impl ClusterInfo { - fn new( - domain_name: &str, - reachable: bool, - staging_path: String, - storage_path: String, - error: Option, - status: Option, - ) -> Self { - Self { - domain_name: domain_name.to_string(), - reachable, - staging_path, - 
storage_path, - error, - status, - } - } } diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs index 513bf2540..3e337123d 100644 --- a/server/src/metrics/mod.rs +++ b/server/src/metrics/mod.rs @@ -16,6 +16,7 @@ * */ +pub mod prom_utils; pub mod storage; use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder}; diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs new file mode 100644 index 000000000..72442a96c --- /dev/null +++ b/server/src/metrics/prom_utils.rs @@ -0,0 +1,87 @@ +use prometheus_parse::Sample as PromSample; +use prometheus_parse::Value as PromValue; +use serde::Serialize; +use serde_json::Error as JsonError; +use serde_json::Value as JsonValue; + +use crate::handlers::http::modal::server::Server; + +#[derive(Debug, Serialize, Clone)] +pub struct Metrics { + address: String, + parseable_events_ingested: f64, // all streams + parseable_staging_files: f64, + process_resident_memory_bytes: f64, + parseable_storage_size: StorageMetrics, +} + +#[derive(Debug, Serialize, Default, Clone)] +struct StorageMetrics { + staging: f64, + data: f64, +} + +impl Default for Metrics { + fn default() -> Self { + let socket = Server::get_server_address(); + let address = format!("http://{}:{}", socket.ip(), socket.port()); + Metrics { + address, + parseable_events_ingested: 0.0, + parseable_staging_files: 0.0, + process_resident_memory_bytes: 0.0, + parseable_storage_size: StorageMetrics::default(), + } + } +} + +impl Metrics { + fn new(address: String) -> Self { + Metrics { + address, + parseable_events_ingested: 0.0, + parseable_staging_files: 0.0, + process_resident_memory_bytes: 0.0, + parseable_storage_size: StorageMetrics::default(), + } + } +} + +impl Metrics { + pub fn from_prometheus_samples(samples: Vec<PromSample>, address: String) -> Self { + let mut metrics = Metrics::new(address); + + for sample in samples { + if sample.metric == "parseable_events_ingested" { + if let PromValue::Counter(val) = sample.value { + metrics.parseable_events_ingested += val; + } + } else if sample.metric == "parseable_staging_files" { + if let PromValue::Gauge(val) = sample.value { + metrics.parseable_staging_files += val; + } + } else if sample.metric == "process_resident_memory_bytes" { + if let PromValue::Gauge(val) = sample.value { + metrics.process_resident_memory_bytes += val; + } + } else if sample.metric == "parseable_storage_size" { + if sample.labels.get("type").unwrap() == "data" { + if let PromValue::Gauge(val) = sample.value { + metrics.parseable_storage_size.data += val; + } + } else if sample.labels.get("type").unwrap() == "staging" { + if let PromValue::Gauge(val) = sample.value { + metrics.parseable_storage_size.staging += val; + } + } + } + } + + metrics + } + + #[allow(unused)] + pub fn to_json(&self) -> Result<JsonValue, JsonError> { + serde_json::to_value(self) + } +} diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index bc78b80c2..cbffd92b5 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -45,6 +45,7 @@ pub enum Action { GetAbout, QueryLLM, ListCluster, + ListClusterMetrics, All, } @@ -110,6 +111,7 @@ impl RoleBuilder { | Action::GetAlert | Action::All => Permission::Stream(action, self.stream.clone().unwrap()), Action::ListCluster => Permission::Unit(action), + Action::ListClusterMetrics => Permission::Unit(action), }; perms.push(perm); }
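Note for reviewers: to sanity-check the new parser without a running cluster, the sketch below feeds a hand-written Prometheus exposition through prometheus-parse the same way get_cluster_metrics does. The metric values are made up for illustration; only the pipeline (text lines -> Scrape::parse -> samples) is taken from the patch.

// Sketch: parse a Prometheus text exposition into samples, mirroring the
// text.lines() -> Scrape::parse() pipeline used by get_cluster_metrics.
// The exposition string is illustrative data, not real server output.
fn main() -> std::io::Result<()> {
    let exposition = r#"# TYPE parseable_events_ingested counter
parseable_events_ingested 42
# TYPE parseable_staging_files gauge
parseable_staging_files 3
# TYPE parseable_storage_size gauge
parseable_storage_size{type="data"} 1024
"#;
    // Scrape::parse expects an iterator of io::Result<String>, hence the Ok(...) map.
    let lines = exposition.lines().map(|line| Ok(line.to_owned()));
    let scrape = prometheus_parse::Scrape::parse(lines)?;
    for sample in &scrape.samples {
        println!("{} {:?} => {:?}", sample.metric, sample.labels, sample.value);
    }
    Ok(())
}

Each sample then lands in one of the fields of Metrics::from_prometheus_samples, keyed on sample.metric and, for parseable_storage_size, on the "type" label.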
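The size arithmetic in merge_quried_stats round-trips through the display strings held in IngestionStats::size and StorageStats::size ("N Bytes"): parse the numeric prefix, sum, re-render. A minimal sketch of that logic; parse_bytes is a hypothetical helper name for illustration, the patch inlines the same steps in its fold closures.

// Sketch of the "N Bytes" merge used when folding QueriedStats.
fn parse_bytes(size: &str) -> u64 {
    // take the numeric prefix before the first space; malformed input becomes 0,
    // matching the unwrap_or_default() behaviour in the patch
    size.split(' ')
        .next()
        .and_then(|n| n.parse::<u64>().ok())
        .unwrap_or_default()
}

fn merge_sizes(a: &str, b: &str) -> String {
    format!("{} Bytes", parse_bytes(a) + parse_bytes(b))
}

fn main() {
    assert_eq!(merge_sizes("1024 Bytes", "2048 Bytes"), "3072 Bytes");
    assert_eq!(merge_sizes("oops", "512 Bytes"), "512 Bytes");
}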
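End to end, the route registered under the /cluster scope exposes the aggregated metrics over HTTP. A hedged client sketch follows; the host, port, and admin/admin credentials are local-default assumptions for illustration, not something this patch fixes.

// Sketch: call the new /cluster/metrics endpoint from a client.
// Assumes a query node on localhost:8000 with the api/v1 base path.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let resp = reqwest::Client::new()
        .get("http://localhost:8000/api/v1/cluster/metrics")
        .basic_auth("admin", Some("admin"))
        .send()
        .await?;
    // One Metrics object per reachable ingester; unreachable ingesters
    // are only logged server-side and omitted from the response.
    let body: serde_json::Value = resp.json().await?;
    println!("{body:#}");
    Ok(())
}

The caller also needs a role carrying the new Action::ListClusterMetrics permission, which the RoleBuilder change at the end of the patch maps to a unit permission like ListCluster.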