diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs new file mode 100644 index 000000000..26793deb5 --- /dev/null +++ b/server/src/alerts/mod.rs @@ -0,0 +1,96 @@ +/* + * Parseable Server (C) 2022 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +pub mod rule; +pub mod target; + +pub use self::rule::Rule; +use self::target::Target; +use crate::event::Event; + +#[derive(Default, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Alerts { + pub alerts: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Alert { + #[serde(default = "crate::utils::uuid::gen")] + pub id: Uuid, + pub name: String, + pub message: String, + pub rule: Rule, + pub targets: Vec, +} + +impl Alert { + pub async fn check_alert(&self, event: &Event) -> Result<(), ()> { + let event_json: serde_json::Value = serde_json::from_str(&event.body).map_err(|_| ())?; + + if self.rule.resolves(&event_json) { + log::info!("Alert triggered for stream {}", self.name); + for target in self.targets.clone() { + let context = Context::new( + event.stream_name.clone(), + self.name.clone(), + self.message.clone(), + self.rule.trigger_reason(), + ); + actix_web::rt::spawn(async move { + target.call(&context); + }); + } + } + + Ok(()) + } +} + +pub trait CallableTarget { + fn call(&self, payload: &Context); +} + +pub struct Context { + stream: String, + alert_name: String, + message: String, + reason: String, +} + +impl Context { + pub fn new(stream: String, alert_name: String, message: String, reason: String) -> Self { + Self { + stream, + alert_name, + message, + reason, + } + } + + fn default_alert_string(&self) -> String { + format!( + "{} triggered on {}\nMessage: {}\nFailing Condition: {}", + self.alert_name, self.stream, self.message, self.reason + ) + } +} diff --git a/server/src/alerts.rs b/server/src/alerts/rule.rs similarity index 53% rename from server/src/alerts.rs rename to server/src/alerts/rule.rs index 96c3f1cf7..7128bcb65 100644 --- a/server/src/alerts.rs +++ b/server/src/alerts/rule.rs @@ -16,54 +16,8 @@ * */ -use std::sync::atomic::{AtomicU32, Ordering}; - use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use crate::event::Event; - -#[derive(Default, Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Alerts { - pub alerts: Vec, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Alert { - #[serde(default = "crate::utils::uuid::gen")] - pub id: Uuid, - pub name: String, - pub message: String, - pub rule: Rule, - pub targets: Vec, -} - -impl Alert { - // TODO: spawn async tasks to call webhooks if alert rules are met - // This is done to ensure that threads aren't blocked by calls to the webhook - pub async fn check_alert(&self, event: &Event) -> Result<(), ()> { - let event_json: serde_json::Value = serde_json::from_str(&event.body).map_err(|_| ())?; - - if self.rule.resolves(&event_json) { - log::info!("Alert triggered for stream {}", self.name); - for target in self.targets.clone() { - let context = Context::new( - event.stream_name.clone(), - self.name.clone(), - self.message.clone(), - self.rule.trigger_reason(), - ); - actix_web::rt::spawn(async move { - target.call(&context); - }); - } - } - - Ok(()) - } -} +use std::sync::atomic::{AtomicU32, Ordering}; #[derive(Debug, Serialize, Deserialize)] #[serde(untagged)] @@ -72,7 +26,7 @@ pub enum Rule { } impl Rule { - fn resolves(&self, event: &serde_json::Value) -> bool { + pub(super) fn resolves(&self, event: &serde_json::Value) -> bool { match self { Rule::Numeric(rule) => rule.resolves(event), } @@ -100,7 +54,7 @@ impl Rule { } } - pub fn trigger_reason(&self) -> String { + pub(super) fn trigger_reason(&self) -> String { match self { Rule::Numeric(NumericRule { column, @@ -138,6 +92,8 @@ impl Rule { } } +// Rules for alerts + #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct NumericRule { @@ -193,6 +149,8 @@ impl NumericRule { } } +// Operator for comparing values + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum NumericOperator { @@ -215,120 +173,3 @@ impl Default for NumericOperator { Self::EqualTo } } - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -#[serde(tag = "type")] -pub enum Target { - Slack(targets::slack::SlackWebHook), - #[serde(alias = "webhook")] - Other(targets::other::OtherWebHook), -} - -impl Target { - pub fn call(&self, payload: &Context) { - match self { - Target::Slack(target) => target.call(payload), - Target::Other(target) => target.call(payload), - } - } -} - -pub trait CallableTarget { - fn call(&self, payload: &Context); -} - -pub mod targets { - pub mod slack { - use serde::{Deserialize, Serialize}; - - use crate::alerts::{CallableTarget, Context}; - - #[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] - pub struct SlackWebHook { - #[serde(rename = "server_url")] - server_url: String, - } - - impl CallableTarget for SlackWebHook { - fn call(&self, payload: &Context) { - if let Err(e) = ureq::post(&self.server_url) - .set("Content-Type", "application/json") - .send_json(ureq::json!({ "text": payload.default_alert_string() })) - { - log::error!("Couldn't make call to webhook, error: {}", e) - } - } - } - } - - pub mod other { - use serde::{Deserialize, Serialize}; - - use crate::alerts::{CallableTarget, Context}; - - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] - #[serde(untagged)] - pub enum OtherWebHook { - ApiKey { - #[serde(rename = "server_url")] - server_url: String, - #[serde(rename = "api_key")] - api_key: String, - }, - Simple { - #[serde(rename = "server_url")] - server_url: String, - }, - } - - impl CallableTarget for OtherWebHook { - fn call(&self, payload: &Context) { - let res = match self { - OtherWebHook::Simple { server_url } => ureq::post(server_url) - .set("Content-Type", "text/plain; charset=iso-8859-1") - .send_string(&payload.default_alert_string()), - OtherWebHook::ApiKey { - server_url, - api_key, - } => ureq::post(server_url) - .set("Content-Type", "text/plain; charset=iso-8859-1") - .set("X-API-Key", api_key) - .send_string(&payload.default_alert_string()), - }; - - if let Err(e) = res { - log::error!("Couldn't make call to webhook, error: {}", e) - } - } - } - } -} - -pub struct Context { - stream: String, - alert_name: String, - message: String, - reason: String, -} - -impl Context { - pub fn new(stream: String, alert_name: String, message: String, reason: String) -> Self { - Self { - stream, - alert_name, - message, - reason, - } - } - - // Triggered on - // Message: Ting - // Failing Condition: Status column was equal to 500, 5 times - fn default_alert_string(&self) -> String { - format!( - "{} triggered on {}\nMessage: {}\nFailing Condition: {}", - self.alert_name, self.stream, self.message, self.reason - ) - } -} diff --git a/server/src/alerts/target.rs b/server/src/alerts/target.rs new file mode 100644 index 000000000..ff23dd9bb --- /dev/null +++ b/server/src/alerts/target.rs @@ -0,0 +1,92 @@ +/* + * Parseable Server (C) 2022 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use serde::{Deserialize, Serialize}; + +use super::{CallableTarget, Context}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "type")] +pub enum Target { + Slack(SlackWebHook), + #[serde(alias = "webhook")] + Other(OtherWebHook), +} + +impl Target { + pub fn call(&self, payload: &Context) { + match self { + Target::Slack(target) => target.call(payload), + Target::Other(target) => target.call(payload), + } + } +} + +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct SlackWebHook { + #[serde(rename = "server_url")] + server_url: String, +} + +impl CallableTarget for SlackWebHook { + fn call(&self, payload: &Context) { + if let Err(e) = ureq::post(&self.server_url) + .set("Content-Type", "application/json") + .send_json(ureq::json!({ "text": payload.default_alert_string() })) + { + log::error!("Couldn't make call to webhook, error: {}", e) + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum OtherWebHook { + ApiKey { + #[serde(rename = "server_url")] + server_url: String, + #[serde(rename = "api_key")] + api_key: String, + }, + Simple { + #[serde(rename = "server_url")] + server_url: String, + }, +} + +impl CallableTarget for OtherWebHook { + fn call(&self, payload: &Context) { + let res = match self { + OtherWebHook::Simple { server_url } => ureq::post(server_url) + .set("Content-Type", "text/plain; charset=iso-8859-1") + .send_string(&payload.default_alert_string()), + OtherWebHook::ApiKey { + server_url, + api_key, + } => ureq::post(server_url) + .set("Content-Type", "text/plain; charset=iso-8859-1") + .set("X-API-Key", api_key) + .send_string(&payload.default_alert_string()), + }; + + if let Err(e) = res { + log::error!("Couldn't make call to webhook, error: {}", e) + } + } +}