Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions server/src/alerts/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,14 @@ pub struct ConsecutiveNumericRule {

impl ConsecutiveNumericRule {
fn resolves(&self, event: &serde_json::Value) -> AlertState {
if self.base_rule.resolves(event) {
self.state.update_and_fetch_state()
if let Some(resolved) = self.base_rule.resolves(event) {
if resolved {
self.state.update_and_fetch_state()
} else {
self.state.fetch_state()
}
} else {
self.state.fetch_state()
self.state.existing_state()
}
}
}
Expand All @@ -182,10 +186,14 @@ pub struct ConsecutiveStringRule {

impl ConsecutiveStringRule {
fn resolves(&self, event: &serde_json::Value) -> AlertState {
if self.base_rule.resolves(event) {
self.state.update_and_fetch_state()
if let Some(resolved) = self.base_rule.resolves(event) {
if resolved {
self.state.update_and_fetch_state()
} else {
self.state.fetch_state()
}
} else {
self.state.fetch_state()
self.state.existing_state()
}
}
}
Expand All @@ -211,6 +219,15 @@ impl ConsecutiveRepeatState {
self._fetch_state(false)
}

/// Reports the alert state implied by the current repeat counter.
///
/// The counter is only read (with `Acquire` ordering); nothing is
/// written, so calling this never advances or resets the repeat count.
///
/// Returns `AlertState::Firing` once the counter has reached
/// `self.repeats`, and `AlertState::Listening` otherwise.
fn existing_state(&self) -> AlertState {
    let seen = self.repeated.load(Ordering::Acquire);

    // Below the threshold: not enough repeats observed yet.
    if seen < self.repeats {
        return AlertState::Listening;
    }

    AlertState::Firing
}

fn _fetch_state(&self, update: bool) -> AlertState {
let mut repeated = self.repeated.load(Ordering::Acquire);
let mut state = AlertState::Listening;
Expand Down Expand Up @@ -290,13 +307,13 @@ pub mod base {
}

impl NumericRule {
pub fn resolves(&self, event: &serde_json::Value) -> bool {
let number = match event.get(&self.column).expect("column exists") {
pub fn resolves(&self, event: &serde_json::Value) -> Option<bool> {
let number = match event.get(&self.column)? {
serde_json::Value::Number(number) => number,
_ => unreachable!("right rule is set for right column type"),
};

match self.operator {
let res = match self.operator {
NumericOperator::EqualTo => number == &self.value,
NumericOperator::NotEqualTo => number != &self.value,
NumericOperator::GreaterThan => {
Expand All @@ -311,7 +328,9 @@ pub mod base {
NumericOperator::LessThanEquals => {
number.as_f64().unwrap() <= self.value.as_f64().unwrap()
}
}
};

Some(res)
}
}

Expand All @@ -326,13 +345,13 @@ pub mod base {
}

impl StringRule {
pub fn resolves(&self, event: &serde_json::Value) -> bool {
let string = match event.get(&self.column).expect("column exists") {
pub fn resolves(&self, event: &serde_json::Value) -> Option<bool> {
let string = match event.get(&self.column)? {
serde_json::Value::String(s) => s,
_ => unreachable!("right rule is set for right column type"),
};

if self.ignore_case.unwrap_or_default() {
let res = if self.ignore_case.unwrap_or_default() {
match self.operator {
StringOperator::Exact => string.eq_ignore_ascii_case(&self.value),
StringOperator::NotExact => !string.eq_ignore_ascii_case(&self.value),
Expand All @@ -350,7 +369,9 @@ pub mod base {
StringOperator::Contains => string.contains(&self.value),
StringOperator::NotContains => !string.contains(&self.value),
}
}
};

Some(res)
}
}

Expand Down
111 changes: 90 additions & 21 deletions server/src/alerts/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

use std::{
ops::RangeTo,
sync::{Arc, Mutex},
time::Duration,
};
Expand All @@ -26,6 +27,11 @@ use serde::{Deserialize, Serialize};

use super::{AlertState, CallableTarget, Context};

/// Retry policy handed to the timeout task that re-checks a firing alert.
enum Retry {
/// Keep re-checking in an endless loop; no upper bound on attempts.
Infinity,
/// Re-check a bounded number of times. Only the range's `end` is read,
/// and it is used as the number of attempts (e.g. `..10` means 10).
Range(RangeTo<usize>),
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(try_from = "TargetVerifier")]
Expand All @@ -45,14 +51,18 @@ impl Target {
match resolves {
AlertState::SetToFiring => {
state.alert_state = AlertState::Firing;

if !state.timed_out {
// set state
state.timed_out = true;
state.awaiting_resolve = true;
drop(state);

self.spawn_timeout_task(timeout, context.clone());
let retry = match self.target {
TargetType::AlertManager(_) => Retry::Infinity,
_ => Retry::Range(..10),
};

self.spawn_timeout_task(timeout, context.clone(), retry);
call_target(self.target.clone(), context)
}
}
Expand All @@ -78,38 +88,53 @@ impl Target {
}
}

fn spawn_timeout_task(&self, timeout: &Timeout, alert_context: Context) {
fn spawn_timeout_task(&self, timeout: &Timeout, alert_context: Context, retry: Retry) {
let state = Arc::clone(&timeout.state);
let timeout = timeout.timeout;
let target = self.target.clone();

actix_web::rt::spawn(async move {
const RETRIES: usize = 10;
// sleep for timeout period
for _ in 0..RETRIES {
let sleep_and_check_if_call = move |timeout_state: Arc<Mutex<TimeoutState>>| {
async move {
tokio::time::sleep(timeout).await;
let mut state = state.lock().unwrap();
let mut state = timeout_state.lock().unwrap();
if state.alert_state == AlertState::Firing {
// it is still firing .. sleep more and come back
state.awaiting_resolve = true;

call_target(target.clone(), alert_context.clone())
true
} else {
state.timed_out = false;
return;
false
}
}
};

// Fallback for when this task observed FIRING on every one of the RETRIES.
// The stream might be dead, and sending too many alerts is not great.
// Send an alert stating that this alert will only work again once it has seen a RESOLVE.
state.lock().unwrap().timed_out = false;
let mut context = alert_context;
context.message = format!(
"Triggering alert did not resolve itself after {RETRIES} retries, This alert is paused until it resolves",
);
// Send and exit this task.
call_target(target, context);
actix_web::rt::spawn(async move {
match retry {
Retry::Infinity => loop {
let should_call = sleep_and_check_if_call(Arc::clone(&state)).await;
if should_call {
call_target(target.clone(), alert_context.clone())
}
},
Retry::Range(range) => {
for _ in 0..range.end {
let should_call = sleep_and_check_if_call(Arc::clone(&state)).await;
if should_call {
call_target(target.clone(), alert_context.clone())
}
}
// Fallback for when this task observed FIRING on every one of the RETRIES.
// The stream might be dead, and sending too many alerts is not great.
// Send an alert stating that this alert will only work again once it has seen a RESOLVE.
state.lock().unwrap().timed_out = false;
let mut context = alert_context;
context.message = format!(
"Triggering alert did not resolve itself after {} retries, This alert is paused until it resolves",
range.end);
// Send and exit this task.
call_target(target, context);
}
}
});
}
}
Expand Down Expand Up @@ -156,13 +181,15 @@ pub enum TargetType {
Slack(SlackWebHook),
#[serde(rename = "webhook")]
Other(OtherWebHook),
AlertManager(AlertManager),
}

impl TargetType {
pub fn call(&self, payload: &Context) {
match self {
TargetType::Slack(target) => target.call(payload),
TargetType::Other(target) => target.call(payload),
TargetType::AlertManager(target) => target.call(payload),
}
}
}
Expand Down Expand Up @@ -226,6 +253,48 @@ impl CallableTarget for OtherWebHook {
}
}

/// Alert target described by a single URL.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AlertManager {
/// Address that `call` sends its JSON `POST` request to.
url: String,
}

impl CallableTarget for AlertManager {
    /// Posts a one-element JSON array describing the alert to `self.url`.
    ///
    /// The element carries `labels` (`status`, `alertname`, `streamname`)
    /// and `annotations` (`message`, `reason`) taken from `payload`.
    /// `status` is `"firing"` for `AlertState::SetToFiring` and
    /// `"resolved"` for `AlertState::Resolved`.
    ///
    /// A failed request is logged and otherwise ignored.
    ///
    /// # Panics
    ///
    /// Panics via `unreachable!()` if `payload.alert_state` is any state
    /// other than `SetToFiring` or `Resolved`.
    fn call(&self, payload: &Context) {
        // The two supported states differ only in the `status` label, so
        // pick that first and build the body once.
        let status = match payload.alert_state {
            AlertState::SetToFiring => "firing",
            AlertState::Resolved => "resolved",
            _ => unreachable!(),
        };

        let body = ureq::json!([{
            "labels": {
                "status": status,
                "alertname": payload.alert_name,
                "streamname": payload.stream
            },
            "annotations": {
                "message": payload.message,
                "reason": payload.reason
            }
        }]);

        if let Err(e) = ureq::post(&self.url)
            .set("Content-Type", "application/json")
            .send_json(body)
        {
            log::error!("Couldn't make call to alertmanager, error: {}", e)
        }
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Timeout {
#[serde(with = "humantime_serde")]
Expand Down