diff --git a/Cargo.lock b/Cargo.lock index 7e2a141cf..a08f65638 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2812,6 +2812,7 @@ dependencies = [ "prometheus", "rand", "relative-path", + "reqwest", "rstest", "rust-flatten-json", "rustls", @@ -4012,8 +4013,6 @@ dependencies = [ "log", "once_cell", "rustls", - "serde", - "serde_json", "url", "webpki", "webpki-roots", diff --git a/server/Cargo.toml b/server/Cargo.toml index f05393437..910716cf2 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -48,6 +48,7 @@ os_info = "3.6" hostname = "0.3" rand = "0.8" relative-path = { version = "1.7", features = ["serde"] } +reqwest = { version = "0.11", default_features=false, features=["rustls", "json", "hyper-rustls", "tokio-rustls"]} rustls = "0.20" rustls-pemfile = "1.0" rust-flatten-json = "0.2" @@ -66,7 +67,6 @@ clokwerk = "0.4" actix-web-static-files = "4.0" static-files = "0.2" ulid = { version = "1.0", features = ["serde"] } -ureq = { version = "2.6", features = ["json"] } hex = "0.4" itertools = "0.10" xxhash-rust = { version = "0.8", features = ["xxh3"] } diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs index 5f346d780..c07887448 100644 --- a/server/src/alerts/mod.rs +++ b/server/src/alerts/mod.rs @@ -16,6 +16,7 @@ * */ +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::fmt; @@ -77,9 +78,9 @@ impl Alert { ) } } - +#[async_trait] pub trait CallableTarget { - fn call(&self, payload: &Context); + async fn call(&self, payload: &Context); } #[derive(Debug, Clone)] diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs index 44819d0e8..82b68f89d 100644 --- a/server/src/alerts/rule.rs +++ b/server/src/alerts/rule.rs @@ -113,7 +113,6 @@ impl ColumnRule { }) => format!( "{} column was {} {}, {} times", column, - value, match operator { NumericOperator::EqualTo => "equal to", NumericOperator::NotEqualTo => " not equal to", @@ -122,6 +121,7 @@ impl ColumnRule { NumericOperator::LessThan => "less than", NumericOperator::LessThanEquals => "less than or equal to", }, + value, repeats ), Self::ConsecutiveString(ConsecutiveStringRule { @@ -137,13 +137,13 @@ impl ColumnRule { }) => format!( "{} column {} {}, {} times", column, - value, match operator { StringOperator::Exact => "was equal to", StringOperator::NotExact => "was not equal to", StringOperator::Contains => "contained", StringOperator::NotContains => "did not contain", }, + value, repeats ), } diff --git a/server/src/alerts/target.rs b/server/src/alerts/target.rs index 511b80446..8b7ca6fb9 100644 --- a/server/src/alerts/target.rs +++ b/server/src/alerts/target.rs @@ -17,80 +17,79 @@ */ use std::{ - ops::RangeTo, + collections::HashMap, sync::{Arc, Mutex}, time::Duration, }; +use async_trait::async_trait; +use base64::Engine; +use chrono::Utc; +use http::{header::AUTHORIZATION, HeaderMap, HeaderValue}; use humantime_serde::re::humantime; +use reqwest::ClientBuilder; use serde::{Deserialize, Serialize}; use super::{AlertState, CallableTarget, Context}; -enum Retry { - Infinity, - Range(RangeTo), +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +#[serde(untagged)] +pub enum Retry { + Infinite, + Finite(usize), } #[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] +#[serde(rename_all = "lowercase")] #[serde(try_from = "TargetVerifier")] pub struct Target { #[serde(flatten)] pub target: TargetType, - #[serde(flatten)] - pub timeout: Option, + #[serde(default, rename = "repeat")] + pub timeout: Timeout, } impl Target { pub fn call(&self, context: Context) { - if let Some(ref timeout) = self.timeout { - let resolves = context.alert_state; - let mut state = timeout.state.lock().unwrap(); - - match resolves { - AlertState::SetToFiring => { - state.alert_state = AlertState::Firing; - if !state.timed_out { - // set state - state.timed_out = true; - state.awaiting_resolve = true; - drop(state); - - let retry = match self.target { - TargetType::AlertManager(_) => Retry::Infinity, - _ => Retry::Range(..10), - }; - - self.spawn_timeout_task(timeout, context.clone(), retry); - call_target(self.target.clone(), context) - } + let timeout = &self.timeout; + let resolves = context.alert_state; + let mut state = timeout.state.lock().unwrap(); + + match resolves { + AlertState::SetToFiring => { + state.alert_state = AlertState::Firing; + if !state.timed_out { + // set state + state.timed_out = true; + state.awaiting_resolve = true; + drop(state); + self.spawn_timeout_task(timeout, context.clone()); + call_target(self.target.clone(), context) } - AlertState::Resolved => { - state.alert_state = AlertState::Listening; - if state.timed_out { - // if in timeout and resolve came in, only process if it's the first one ( awaiting resolve ) - if state.awaiting_resolve { - state.awaiting_resolve = false; - } else { - // no further resolve will be considered in timeout period - return; - } + } + AlertState::Resolved => { + state.alert_state = AlertState::Listening; + if state.timed_out { + // if in timeout and resolve came in, only process if it's the first one ( awaiting resolve ) + if state.awaiting_resolve { + state.awaiting_resolve = false; + } else { + // no further resolve will be considered in timeout period + return; } - - call_target(self.target.clone(), context); } - _ => unreachable!(), + + call_target(self.target.clone(), context); } - } else { - // Without timeout there is no alert state to consider other than the one returned on `resolves` - call_target(self.target.clone(), context); + _ => unreachable!(), } } - fn spawn_timeout_task(&self, timeout: &Timeout, alert_context: Context, retry: Retry) { - let state = Arc::clone(&timeout.state); - let timeout = timeout.timeout; + fn spawn_timeout_task(&self, repeat: &Timeout, alert_context: Context) { + let state = Arc::clone(&repeat.state); + let retry = repeat.times; + let timeout = repeat.interval; let target = self.target.clone(); let sleep_and_check_if_call = move |timeout_state: Arc>| { @@ -110,14 +109,14 @@ impl Target { actix_web::rt::spawn(async move { match retry { - Retry::Infinity => loop { + Retry::Infinite => loop { let should_call = sleep_and_check_if_call(Arc::clone(&state)).await; if should_call { call_target(target.clone(), alert_context.clone()) } }, - Retry::Range(range) => { - for _ in 0..range.end { + Retry::Finite(times) => { + for _ in 0..times { let should_call = sleep_and_check_if_call(Arc::clone(&state)).await; if should_call { call_target(target.clone(), alert_context.clone()) @@ -129,8 +128,7 @@ impl Target { state.lock().unwrap().timed_out = false; let mut context = alert_context; context.message = format!( - "Triggering alert did not resolve itself after {} retries, This alert is paused until it resolves", - range.end); + "Triggering alert did not resolve itself after {times} retries, This alert is paused until it resolves"); // Send and exit this task. call_target(target, context); } @@ -140,42 +138,61 @@ impl Target { } fn call_target(target: TargetType, context: Context) { - actix_web::rt::spawn(async move { - target.call(&context); - }); + actix_web::rt::spawn(async move { target.call(&context).await }); +} + +#[derive(Debug, Deserialize)] +pub struct RepeatVerifier { + interval: Option, + times: Option, } #[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] +#[serde(rename_all = "lowercase")] pub struct TargetVerifier { #[serde(flatten)] pub target: TargetType, - #[serde(alias = "repeat")] - pub timeout: Option, + #[serde(default)] + pub repeat: Option, } impl TryFrom for Target { - type Error = humantime::DurationError; + type Error = String; fn try_from(value: TargetVerifier) -> Result { - let timeout = value - .timeout - .map(|ref dur| humantime::parse_duration(dur)) - .transpose()?; + let mut timeout = Timeout::default(); + + // Default is Infinite in case of alertmanager + if matches!(value.target, TargetType::AlertManager(_)) { + timeout.times = Retry::Infinite + } + + if let Some(repeat_config) = value.repeat { + let interval = repeat_config + .interval + .map(|ref interval| humantime::parse_duration(interval)) + .transpose() + .map_err(|err| err.to_string())?; + + if let Some(interval) = interval { + timeout.interval = interval + } + + if let Some(times) = repeat_config.times { + timeout.times = Retry::Finite(times) + } + } Ok(Target { target: value.target, - timeout: timeout.map(|duration| Timeout { - timeout: duration, - state: Default::default(), - }), + timeout, }) } } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -#[serde(tag = "type", content = "config")] +#[serde(rename_all = "lowercase")] +#[serde(tag = "type")] #[serde(deny_unknown_fields)] pub enum TargetType { Slack(SlackWebHook), @@ -185,69 +202,80 @@ pub enum TargetType { } impl TargetType { - pub fn call(&self, payload: &Context) { + pub async fn call(&self, payload: &Context) { match self { - TargetType::Slack(target) => target.call(payload), - TargetType::Other(target) => target.call(payload), - TargetType::AlertManager(target) => target.call(payload), + TargetType::Slack(target) => target.call(payload).await, + TargetType::Other(target) => target.call(payload).await, + TargetType::AlertManager(target) => target.call(payload).await, } } } +fn default_client_builder() -> ClientBuilder { + ClientBuilder::new() +} + #[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct SlackWebHook { - url: String, + endpoint: String, } +#[async_trait] impl CallableTarget for SlackWebHook { - fn call(&self, payload: &Context) { + async fn call(&self, payload: &Context) { + let client = default_client_builder() + .build() + .expect("Client can be constructed on this system"); + let alert = match payload.alert_state { - AlertState::SetToFiring => ureq::json!({ "text": payload.default_alert_string() }), - AlertState::Resolved => ureq::json!({ "text": payload.default_resolved_string() }), + AlertState::SetToFiring => { + serde_json::json!({ "text": payload.default_alert_string() }) + } + AlertState::Resolved => { + serde_json::json!({ "text": payload.default_resolved_string() }) + } _ => unreachable!(), }; - if let Err(e) = ureq::post(&self.url) - .set("Content-Type", "application/json") - .send_json(alert) - { + if let Err(e) = client.post(&self.endpoint).json(&alert).send().await { log::error!("Couldn't make call to webhook, error: {}", e) } } } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum OtherWebHook { - #[serde(rename_all = "camelCase")] - ApiKey { - url: String, - api_key: String, - }, - Simple { - url: String, - }, +#[serde(rename_all = "snake_case")] +pub struct OtherWebHook { + endpoint: String, + #[serde(default)] + headers: HashMap, + #[serde(default)] + skip_tls_check: bool, } +#[async_trait] impl CallableTarget for OtherWebHook { - fn call(&self, payload: &Context) { + async fn call(&self, payload: &Context) { + let mut builder = default_client_builder(); + if self.skip_tls_check { + builder = builder.danger_accept_invalid_certs(true) + } + + let client = builder + .build() + .expect("Client can be constructed on this system"); + let alert = match payload.alert_state { AlertState::SetToFiring => payload.default_alert_string(), AlertState::Resolved => payload.default_resolved_string(), _ => unreachable!(), }; - let res = match self { - OtherWebHook::Simple { url } => ureq::post(url) - .set("Content-Type", "text/plain; charset=iso-8859-1") - .send_string(&alert), - OtherWebHook::ApiKey { url, api_key } => ureq::post(url) - .set("Content-Type", "text/plain; charset=iso-8859-1") - .set("X-API-Key", api_key) - .send_string(&alert), - }; + let request = client + .post(&self.endpoint) + .headers((&self.headers).try_into().expect("valid_headers")); - if let Err(e) = res { + if let Err(e) = request.body(alert).send().await { log::error!("Couldn't make call to webhook, error: {}", e) } } @@ -255,41 +283,61 @@ impl CallableTarget for OtherWebHook { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct AlertManager { - url: String, + endpoint: String, + #[serde(default)] + skip_tls_check: bool, + #[serde(flatten)] + auth: Option, } +#[async_trait] impl CallableTarget for AlertManager { - fn call(&self, payload: &Context) { - let alert = match payload.alert_state { - AlertState::SetToFiring => ureq::json!([{ - "labels": { - "status": "firing", - "alertname": payload.alert_name, - "streamname": payload.stream - }, - "annotations": { - "message": payload.message, - "reason": payload.reason - } - }]), - AlertState::Resolved => ureq::json!([{ - "labels": { - "status": "resolved", - "alertname": payload.alert_name, - "streamname": payload.stream - }, - "annotations": { - "message": payload.message, - "reason": payload.reason - } - }]), + async fn call(&self, payload: &Context) { + let mut builder = default_client_builder(); + + if self.skip_tls_check { + builder = builder.danger_accept_invalid_certs(true) + } + + if let Some(Auth { username, password }) = &self.auth { + let basic_auth_value = "Basic ".to_string() + + &base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}")); + let headers = HeaderMap::from_iter([( + AUTHORIZATION, + HeaderValue::try_from(basic_auth_value).expect("valid value"), + )]); + builder = builder.default_headers(headers) + } + + let client = builder + .build() + .expect("Client can be constructed on this system"); + + let mut alert = serde_json::json!([{ + "labels": { + "alertname": payload.alert_name, + "stream": payload.stream, + }, + "annotations": { + "message": payload.message, + "reason": payload.reason + } + }]); + + // fill in status label accordingly + match payload.alert_state { + AlertState::SetToFiring => alert[0]["labels"]["status"] = "firing".into(), + AlertState::Resolved => { + let alert = &mut alert[0]; + alert["labels"]["status"] = "resolved".into(); + alert["endsAt"] = Utc::now() + .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) + .into(); + } _ => unreachable!(), }; - if let Err(e) = ureq::post(&self.url) - .set("Content-Type", "application/json") - .send_json(alert) - { + if let Err(e) = client.post(&self.endpoint).json(&alert).send().await { log::error!("Couldn't make call to alertmanager, error: {}", e) } } @@ -298,15 +346,31 @@ impl CallableTarget for AlertManager { #[derive(Debug, Serialize, Deserialize)] pub struct Timeout { #[serde(with = "humantime_serde")] - #[serde(rename = "repeat")] - pub timeout: Duration, + pub interval: Duration, + pub times: Retry, #[serde(skip)] pub state: Arc>, } +impl Default for Timeout { + fn default() -> Self { + Self { + interval: Duration::from_secs(200), + times: Retry::Finite(5), + state: Arc::>::default(), + } + } +} + #[derive(Debug, Default, Clone, Copy)] pub struct TimeoutState { pub alert_state: AlertState, pub timed_out: bool, pub awaiting_resolve: bool, } + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Auth { + username: String, + password: String, +} diff --git a/server/src/banner.rs b/server/src/banner.rs index 40a275baf..3a2c4fb15 100644 --- a/server/src/banner.rs +++ b/server/src/banner.rs @@ -22,12 +22,12 @@ use crossterm::style::Stylize; use crate::utils::uid::Uid; use crate::{option::Config, storage::StorageMetadata}; -pub fn print(config: &Config, meta: StorageMetadata) { +pub async fn print(config: &Config, meta: StorageMetadata) { print_ascii_art(); let scheme = config.parseable.get_scheme(); status_info(config, &scheme, meta.deployment_id); storage_info(config); - about::print(config, meta); + about::print(config, meta).await; println!(); } @@ -149,11 +149,11 @@ pub mod about { eprint!("{}", fmt_latest_version.red()); } - pub fn print(config: &Config, meta: StorageMetadata) { + pub async fn print(config: &Config, meta: StorageMetadata) { // print current version let current = current(); let latest_release = if config.parseable.check_update { - update::get_latest(&meta).ok() + update::get_latest(&meta).await.ok() } else { None }; diff --git a/server/src/main.rs b/server/src/main.rs index 3c260a803..017e0834f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -69,7 +69,7 @@ async fn main() -> anyhow::Result<()> { CONFIG.validate_staging()?; CONFIG.validate_storage(&*storage).await; let metadata = storage::resolve_parseable_metadata().await?; - banner::print(&CONFIG, metadata); + banner::print(&CONFIG, metadata).await; let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index f33381976..be1c3a728 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -181,7 +181,14 @@ pub trait ObjectStorage: Sync + 'static { async fn get_alerts(&self, stream_name: &str) -> Result { match self.get_object(&alert_json_path(stream_name)).await { - Ok(alerts) => Ok(serde_json::from_slice(&alerts).unwrap_or_default()), + Ok(alerts) => { + if let Ok(alerts) = serde_json::from_slice(&alerts) { + Ok(alerts) + } else { + log::error!("Incompatible alerts found for stream - {stream_name}. Refer https://www.parseable.io/docs/alerts for correct alert config."); + Ok(Alerts::default()) + } + } Err(e) => match e { ObjectStorageError::NoSuchKey(_) => Ok(Alerts::default()), e => Err(e), diff --git a/server/src/utils/update.rs b/server/src/utils/update.rs index 9a0c7be89..bda41c9cc 100644 --- a/server/src/utils/update.rs +++ b/server/src/utils/update.rs @@ -66,16 +66,19 @@ fn user_agent(uid: &Ulid) -> String { ) } -pub fn get_latest(meta: &StorageMetadata) -> Result { - let agent = ureq::builder() - .user_agent(user_agent(&meta.deployment_id).as_str()) +pub async fn get_latest(meta: &StorageMetadata) -> Result { + let agent = reqwest::ClientBuilder::new() + .user_agent(user_agent(&meta.deployment_id)) .timeout(Duration::from_secs(8)) - .build(); + .build() + .expect("client can be built on this system"); let json: serde_json::Value = agent .get("https://download.parseable.io/latest-version") - .call()? - .into_json()?; + .send() + .await? + .json() + .await?; let version = json["tag_name"] .as_str()