diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs
index 489e701e8..44819d0e8 100644
--- a/server/src/alerts/rule.rs
+++ b/server/src/alerts/rule.rs
@@ -163,10 +163,14 @@ pub struct ConsecutiveNumericRule {
 
 impl ConsecutiveNumericRule {
     fn resolves(&self, event: &serde_json::Value) -> AlertState {
-        if self.base_rule.resolves(event) {
-            self.state.update_and_fetch_state()
+        if let Some(resolved) = self.base_rule.resolves(event) {
+            if resolved {
+                self.state.update_and_fetch_state()
+            } else {
+                self.state.fetch_state()
+            }
         } else {
-            self.state.fetch_state()
+            self.state.existing_state()
         }
     }
 }
@@ -182,10 +186,14 @@ pub struct ConsecutiveStringRule {
 
 impl ConsecutiveStringRule {
     fn resolves(&self, event: &serde_json::Value) -> AlertState {
-        if self.base_rule.resolves(event) {
-            self.state.update_and_fetch_state()
+        if let Some(resolved) = self.base_rule.resolves(event) {
+            if resolved {
+                self.state.update_and_fetch_state()
+            } else {
+                self.state.fetch_state()
+            }
         } else {
-            self.state.fetch_state()
+            self.state.existing_state()
         }
     }
 }
@@ -211,6 +219,15 @@ impl ConsecutiveRepeatState {
         self._fetch_state(false)
     }
 
+    fn existing_state(&self) -> AlertState {
+        let repeated = self.repeated.load(Ordering::Acquire);
+        if repeated >= self.repeats {
+            AlertState::Firing
+        } else {
+            AlertState::Listening
+        }
+    }
+
     fn _fetch_state(&self, update: bool) -> AlertState {
         let mut repeated = self.repeated.load(Ordering::Acquire);
         let mut state = AlertState::Listening;
@@ -290,13 +307,13 @@ pub mod base {
     }
 
     impl NumericRule {
-        pub fn resolves(&self, event: &serde_json::Value) -> bool {
-            let number = match event.get(&self.column).expect("column exists") {
+        pub fn resolves(&self, event: &serde_json::Value) -> Option<bool> {
+            let number = match event.get(&self.column)? {
                 serde_json::Value::Number(number) => number,
                 _ => unreachable!("right rule is set for right column type"),
             };
 
-            match self.operator {
+            let res = match self.operator {
                 NumericOperator::EqualTo => number == &self.value,
                 NumericOperator::NotEqualTo => number != &self.value,
                 NumericOperator::GreaterThan => {
@@ -311,7 +328,9 @@ pub mod base {
                 NumericOperator::LessThanEquals => {
                     number.as_f64().unwrap() <= self.value.as_f64().unwrap()
                 }
-            }
+            };
+
+            Some(res)
         }
     }
 
@@ -326,13 +345,13 @@ pub mod base {
     }
 
     impl StringRule {
-        pub fn resolves(&self, event: &serde_json::Value) -> bool {
-            let string = match event.get(&self.column).expect("column exists") {
+        pub fn resolves(&self, event: &serde_json::Value) -> Option<bool> {
+            let string = match event.get(&self.column)? {
                 serde_json::Value::String(s) => s,
                 _ => unreachable!("right rule is set for right column type"),
             };
 
-            if self.ignore_case.unwrap_or_default() {
+            let res = if self.ignore_case.unwrap_or_default() {
                 match self.operator {
                     StringOperator::Exact => string.eq_ignore_ascii_case(&self.value),
                     StringOperator::NotExact => !string.eq_ignore_ascii_case(&self.value),
@@ -350,7 +369,9 @@ pub mod base {
                     StringOperator::Contains => string.contains(&self.value),
                     StringOperator::NotContains => !string.contains(&self.value),
                 }
-            }
+            };
+
+            Some(res)
         }
     }
 
diff --git a/server/src/alerts/target.rs b/server/src/alerts/target.rs
index 5373e20f2..511b80446 100644
--- a/server/src/alerts/target.rs
+++ b/server/src/alerts/target.rs
@@ -17,6 +17,7 @@
  */
 
 use std::{
+    ops::RangeTo,
     sync::{Arc, Mutex},
     time::Duration,
 };
@@ -26,6 +27,11 @@ use serde::{Deserialize, Serialize};
 use super::{AlertState, CallableTarget, Context};
 
+enum Retry {
+    Infinity,
+    Range(RangeTo<usize>),
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 #[serde(try_from = "TargetVerifier")]
 pub struct Target {
@@ -45,14 +51,18 @@ impl Target {
         match resolves {
             AlertState::SetToFiring => {
                 state.alert_state = AlertState::Firing;
-
                 if !state.timed_out {
                     // set state
                     state.timed_out = true;
                     state.awaiting_resolve = true;
                     drop(state);
 
-                    self.spawn_timeout_task(timeout, context.clone());
+                    let retry = match self.target {
+                        TargetType::AlertManager(_) => Retry::Infinity,
+                        _ => Retry::Range(..10),
+                    };
+
+                    self.spawn_timeout_task(timeout, context.clone(), retry);
                     call_target(self.target.clone(), context)
                 }
             }
@@ -78,38 +88,53 @@ impl Target {
         }
     }
 
-    fn spawn_timeout_task(&self, timeout: &Timeout, alert_context: Context) {
+    fn spawn_timeout_task(&self, timeout: &Timeout, alert_context: Context, retry: Retry) {
         let state = Arc::clone(&timeout.state);
         let timeout = timeout.timeout;
         let target = self.target.clone();
 
-        actix_web::rt::spawn(async move {
-            const RETRIES: usize = 10;
-            // sleep for timeout period
-            for _ in 0..RETRIES {
+        let sleep_and_check_if_call = move |timeout_state: Arc<Mutex<TimeoutState>>| {
+            async move {
                 tokio::time::sleep(timeout).await;
-                let mut state = state.lock().unwrap();
+                let mut state = timeout_state.lock().unwrap();
 
                 if state.alert_state == AlertState::Firing {
                     // it is still firing .. sleep more and come back
                     state.awaiting_resolve = true;
-
-                    call_target(target.clone(), alert_context.clone())
+                    true
                 } else {
                     state.timed_out = false;
-                    return;
+                    false
                 }
             }
+        };
 
-            // fallback for if this task only observed FIRING on all RETRIES
-            // Stream might be dead and sending too many alerts is not great
-            // Send and alert stating that this alert will only work once it has seen a RESOLVE
-            state.lock().unwrap().timed_out = false;
-            let mut context = alert_context;
-            context.message = format!(
-                "Triggering alert did not resolve itself after {RETRIES} retries, This alert is paused until it resolves",
-            );
-            // Send and exit this task.
-            call_target(target, context);
+        actix_web::rt::spawn(async move {
+            match retry {
+                Retry::Infinity => loop {
+                    let should_call = sleep_and_check_if_call(Arc::clone(&state)).await;
+                    if should_call {
+                        call_target(target.clone(), alert_context.clone())
+                    }
+                },
+                Retry::Range(range) => {
+                    for _ in 0..range.end {
+                        let should_call = sleep_and_check_if_call(Arc::clone(&state)).await;
+                        if should_call {
+                            call_target(target.clone(), alert_context.clone())
+                        }
+                    }
+                    // fallback if this task only observed FIRING on all retries
+                    // Stream might be dead and sending too many alerts is not great
+                    // Send an alert stating that this alert will only work once it has seen a RESOLVE
+                    state.lock().unwrap().timed_out = false;
+                    let mut context = alert_context;
+                    context.message = format!(
+                        "Triggering alert did not resolve itself after {} retries. This alert is paused until it resolves",
+                        range.end);
+                    // Send and exit this task.
+                    call_target(target, context);
+                }
+            }
         });
     }
 }
@@ -156,6 +181,7 @@ pub enum TargetType {
     Slack(SlackWebHook),
     #[serde(rename = "webhook")]
     Other(OtherWebHook),
+    AlertManager(AlertManager),
 }
 
 impl TargetType {
@@ -163,6 +189,7 @@ impl TargetType {
         match self {
             TargetType::Slack(target) => target.call(payload),
             TargetType::Other(target) => target.call(payload),
+            TargetType::AlertManager(target) => target.call(payload),
         }
     }
 }
@@ -226,6 +253,48 @@ impl CallableTarget for OtherWebHook {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct AlertManager {
+    url: String,
+}
+
+impl CallableTarget for AlertManager {
+    fn call(&self, payload: &Context) {
+        let alert = match payload.alert_state {
+            AlertState::SetToFiring => ureq::json!([{
+                "labels": {
+                    "status": "firing",
+                    "alertname": payload.alert_name,
+                    "streamname": payload.stream
+                },
+                "annotations": {
+                    "message": payload.message,
+                    "reason": payload.reason
+                }
+            }]),
+            AlertState::Resolved => ureq::json!([{
+                "labels": {
+                    "status": "resolved",
+                    "alertname": payload.alert_name,
+                    "streamname": payload.stream
+                },
+                "annotations": {
+                    "message": payload.message,
+                    "reason": payload.reason
+                }
+            }]),
+            _ => unreachable!(),
+        };
+
+        if let Err(e) = ureq::post(&self.url)
+            .set("Content-Type", "application/json")
+            .send_json(alert)
+        {
+            log::error!("Couldn't make call to alertmanager, error: {}", e)
+        }
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Timeout {
     #[serde(with = "humantime_serde")]
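
Note: the new AlertManager target simply POSTs to whatever `url` is configured; pointing it at an Alertmanager alerts endpoint such as http://localhost:9093/api/v2/alerts is an assumption about deployment, not something the diff enforces. The request body is the JSON array built by the ureq::json! literal above, e.g. for a firing alert (placeholder values):

    [
      {
        "labels": {
          "status": "firing",
          "alertname": "<alert name>",
          "streamname": "<stream name>"
        },
        "annotations": {
          "message": "<alert message>",
          "reason": "<rule that fired>"
        }
      }
    ]

On retries: Retry::Range(..10) preserves the previous 10-attempt limit for Slack/webhook targets, while AlertManager targets get Retry::Infinity and keep being re-notified for as long as the alert stays in the Firing state.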