diff --git a/server/Cargo.toml b/server/Cargo.toml
index d22af6427..8da2242e5 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -49,6 +49,7 @@ clokwerk = "0.4.0-rc1"
actix-web-static-files = "4.0"
static-files = "0.2.1"
walkdir = "2"
+ureq = "2.5.0"
[build-dependencies]
static-files = "0.2.1"
diff --git a/server/src/alerts.rs b/server/src/alerts.rs
new file mode 100644
index 000000000..13ae789a9
--- /dev/null
+++ b/server/src/alerts.rs
@@ -0,0 +1,134 @@
+/*
+ * Parseable Server (C) 2022 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use log::{error, info};
+use serde::{Deserialize, Serialize};
+
+use crate::error::Error;
+
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Alerts {
+ pub alerts: Vec,
+}
+
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Alert {
+ pub name: String,
+ pub message: String,
+ pub rule: Rule,
+ pub targets: Vec,
+}
+
+impl Alert {
+ // TODO: spawn async tasks to call webhooks if alert rules are met
+ // This is done to ensure that threads aren't blocked by calls to the webhook
+ pub async fn check_alert(&mut self, event: &serde_json::Value) -> Result<(), Error> {
+ if self.rule.resolves(event).await {
+ info!("Alert triggered; name: {}", self.name);
+ for target in self.targets.clone() {
+ let msg = self.message.clone();
+ actix_web::rt::spawn(async move {
+ target.call(&msg);
+ });
+ }
+ }
+
+ Ok(())
+ }
+}
+
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Rule {
+ pub field: String,
+ /// Field that determines what comparison operator is to be used
+ #[serde(default)]
+ pub operator: Operator,
+ pub value: serde_json::Value,
+ pub repeats: u32,
+ #[serde(skip)]
+ repeated: u32,
+ pub within: String,
+}
+
+impl Rule {
+ // TODO: utilise `within` to set a range for validity of rule to trigger alert
+ pub async fn resolves(&mut self, event: &serde_json::Value) -> bool {
+ let comparison = match self.operator {
+ Operator::EqualTo => event.get(&self.field).unwrap() == &self.value,
+ // TODO: currently this is a hack, ensure checks are performed in the right way
+ Operator::GreaterThan => {
+ event.get(&self.field).unwrap().as_f64().unwrap() > (self.value).as_f64().unwrap()
+ }
+ Operator::LessThan => {
+ event.get(&self.field).unwrap().as_f64().unwrap() < (self.value).as_f64().unwrap()
+ }
+ };
+
+ // If truthy, increment count of repeated
+ if comparison {
+ self.repeated += 1;
+ }
+
+ // If enough repetitions made, return true
+ if self.repeated >= self.repeats {
+ self.repeated = 0;
+ return true;
+ }
+
+ false
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum Operator {
+ EqualTo,
+ GreaterThan,
+ LessThan,
+}
+
+impl Default for Operator {
+ fn default() -> Self {
+ Self::EqualTo
+ }
+}
+
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Target {
+ pub name: String,
+ #[serde(rename = "server_url")]
+ pub server_url: String,
+ #[serde(rename = "api_key")]
+ pub api_key: String,
+}
+
+impl Target {
+ pub fn call(&self, msg: &str) {
+ if let Err(e) = ureq::post(&self.server_url)
+ .set("Content-Type", "text/plain; charset=iso-8859-1")
+ .set("X-API-Key", &self.api_key)
+ .send_string(msg)
+ {
+ error!("Couldn't make call to webhook, error: {}", e)
+ }
+ }
+}
diff --git a/server/src/event.rs b/server/src/event.rs
index cb36fa893..0b323c4f8 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -35,6 +35,7 @@ use crate::response;
use crate::storage::ObjectStorage;
use crate::Error;
+#[derive(Clone)]
pub struct Event {
pub body: String,
pub stream_name: String,
@@ -91,6 +92,10 @@ impl Event {
error!("Couldn't update stream stats. {:?}", e);
}
+ if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
+ error!("Error checking for alerts. {:?}", e);
+ }
+
let msg = if is_first_event {
format!(
"Intial Event recieved for log stream {}, schema uploaded successfully",
diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs
index d77e74953..9a11afcc4 100644
--- a/server/src/handlers/logstream.rs
+++ b/server/src/handlers/logstream.rs
@@ -19,11 +19,11 @@
use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, HttpResponse, Responder};
-use crate::metadata;
+use crate::alerts::Alerts;
use crate::response;
use crate::s3::S3;
use crate::storage::ObjectStorage;
-use crate::validator;
+use crate::{metadata, validator};
pub async fn delete(req: HttpRequest) -> HttpResponse {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
@@ -116,26 +116,23 @@ pub async fn schema(req: HttpRequest) -> HttpResponse {
pub async fn get_alert(req: HttpRequest) -> HttpResponse {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
- match metadata::STREAM_INFO.alert(&stream_name) {
- Ok(alert) => response::ServerResponse {
- msg: alert,
+ match metadata::STREAM_INFO.alert(stream_name.clone()) {
+ Ok(alerts) => response::ServerResponse {
+ msg: serde_json::to_string(&alerts).unwrap(),
code: StatusCode::OK,
}
.to_http(),
- Err(_) => match S3::new().get_alert(&stream_name).await {
- Ok(alert) if alert.is_empty() => response::ServerResponse {
+ Err(_) => match S3::new().get_alerts(&stream_name).await {
+ Ok(alerts) if alerts.alerts.is_empty() => response::ServerResponse {
msg: "alert configuration not set for log stream {}".to_string(),
code: StatusCode::BAD_REQUEST,
}
.to_http(),
- Ok(alert) => {
- let buf = alert.as_ref();
- response::ServerResponse {
- msg: String::from_utf8(buf.to_vec()).unwrap(),
- code: StatusCode::OK,
- }
- .to_http()
+ Ok(alerts) => response::ServerResponse {
+ msg: serde_json::to_string(&alerts).unwrap(),
+ code: StatusCode::OK,
}
+ .to_http(),
Err(_) => response::ServerResponse {
msg: "alert doesn't exist".to_string(),
code: StatusCode::BAD_REQUEST,
@@ -164,7 +161,7 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
if let Err(e) = metadata::STREAM_INFO.add_stream(
stream_name.to_string(),
"".to_string(),
- "".to_string(),
+ Default::default(),
) {
return response::ServerResponse {
msg: format!(
@@ -208,47 +205,56 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
pub async fn put_alert(req: HttpRequest, body: web::Json) -> HttpResponse {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
- let alert_config = body.clone();
- match validator::alert(serde_json::to_string(&body.as_object()).unwrap()) {
- Ok(_) => match S3::new()
- .create_alert(&stream_name, alert_config.to_string())
- .await
- {
- Ok(_) => {
- if let Err(e) =
- metadata::STREAM_INFO.set_alert(stream_name.clone(), alert_config.to_string())
- {
- return response::ServerResponse {
- msg: format!(
- "failed to set alert configuration for log stream {} due to err: {}",
- stream_name, e
- ),
- code: StatusCode::INTERNAL_SERVER_ERROR,
- }
- .to_http();
- }
- response::ServerResponse {
- msg: format!("set alert configuration for log stream {}", stream_name),
- code: StatusCode::OK,
- }
- .to_http()
- }
- Err(e) => response::ServerResponse {
+ let alerts: Alerts = match serde_json::from_value(body.into_inner()) {
+ Ok(alerts) => alerts,
+ Err(e) => {
+ return response::ServerResponse {
msg: format!(
"failed to set alert configuration for log stream {} due to err: {}",
stream_name, e
),
- code: StatusCode::INTERNAL_SERVER_ERROR,
+ code: StatusCode::BAD_REQUEST,
}
- .to_http(),
- },
- Err(e) => response::ServerResponse {
+ .to_http()
+ }
+ };
+
+ if let Err(e) = validator::alert(serde_json::to_string(&alerts).unwrap()) {
+ return response::ServerResponse {
msg: format!(
"failed to set alert configuration for log stream {} due to err: {}",
stream_name, e
),
code: StatusCode::BAD_REQUEST,
}
- .to_http(),
+ .to_http();
+ }
+
+ if let Err(e) = S3::new().put_alerts(&stream_name, alerts.clone()).await {
+ return response::ServerResponse {
+ msg: format!(
+ "failed to set alert configuration for log stream {} due to err: {}",
+ stream_name, e
+ ),
+ code: StatusCode::INTERNAL_SERVER_ERROR,
+ }
+ .to_http();
+ }
+
+ if let Err(e) = metadata::STREAM_INFO.set_alert(stream_name.to_string(), alerts) {
+ return response::ServerResponse {
+ msg: format!(
+ "failed to set alert configuration for log stream {} due to err: {}",
+ stream_name, e
+ ),
+ code: StatusCode::INTERNAL_SERVER_ERROR,
+ }
+ .to_http();
+ }
+
+ response::ServerResponse {
+ msg: format!("set alert configuration for log stream {}", stream_name),
+ code: StatusCode::OK,
}
+ .to_http()
}
diff --git a/server/src/main.rs b/server/src/main.rs
index e1a8d347e..573a34f60 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -34,6 +34,7 @@ use std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
+mod alerts;
mod banner;
mod error;
mod event;
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index b35f07804..5d8fe8223 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -18,17 +18,20 @@
use bytes::Bytes;
use lazy_static::lazy_static;
+use log::error;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::RwLock;
+use crate::alerts::Alerts;
use crate::error::Error;
+use crate::event::Event;
use crate::storage::ObjectStorage;
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct LogStreamMetadata {
pub schema: String,
- pub alert_config: String,
+ pub alerts: Alerts,
pub stats: Stats,
}
@@ -65,9 +68,26 @@ lazy_static! {
// 5. When set alert API is called (update the alert)
#[allow(clippy::all)]
impl STREAM_INFO {
+ pub async fn check_alerts(&self, event: &Event) -> Result<(), Error> {
+ let mut map = self.write().unwrap();
+ let meta = map
+ .get_mut(&event.stream_name)
+ .ok_or(Error::StreamMetaNotFound(event.stream_name.to_owned()))?;
+
+ let event: serde_json::Value = serde_json::from_str(&event.body)?;
+
+ for alert in meta.alerts.alerts.iter_mut() {
+ if let Err(e) = alert.check_alert(&event).await {
+ error!("Error while parsing event against alerts: {}", e);
+ }
+ }
+
+ Ok(())
+ }
+
pub fn set_schema(&self, stream_name: String, schema: String) -> Result<(), Error> {
- let alert_config = self.alert(&stream_name)?;
- self.add_stream(stream_name, schema, alert_config)
+ let alerts = self.alert(stream_name.clone())?;
+ self.add_stream(stream_name, schema, alerts)
}
pub fn schema(&self, stream_name: &str) -> Result {
@@ -79,30 +99,30 @@ impl STREAM_INFO {
Ok(meta.schema.clone())
}
- pub fn set_alert(&self, stream_name: String, alert_config: String) -> Result<(), Error> {
+ pub fn set_alert(&self, stream_name: String, alerts: Alerts) -> Result<(), Error> {
let schema = self.schema(&stream_name)?;
- self.add_stream(stream_name, schema, alert_config)
+ self.add_stream(stream_name, schema, alerts)
}
- pub fn alert(&self, stream_name: &str) -> Result {
+ pub fn alert(&self, stream_name: String) -> Result {
let map = self.read().unwrap();
let meta = map
- .get(stream_name)
+ .get(&stream_name)
.ok_or(Error::StreamMetaNotFound(stream_name.to_owned()))?;
- Ok(meta.alert_config.clone())
+ Ok(meta.alerts.clone())
}
pub fn add_stream(
&self,
stream_name: String,
schema: String,
- alert_config: String,
+ alerts: Alerts,
) -> Result<(), Error> {
let mut map = self.write().unwrap();
let metadata = LogStreamMetadata {
schema,
- alert_config,
+ alerts,
..Default::default()
};
// TODO: Add check to confirm data insertion
@@ -125,23 +145,19 @@ impl STREAM_INFO {
// to load the stream metadata based on whatever is available.
//
// TODO: ignore failure(s) if any and skip to next stream
- let alert_config = storage
- .get_alert(&stream.name)
+ let alerts = storage
+ .get_alerts(&stream.name)
.await
- .map_err(|e| e.into())
- .and_then(parse_string)
- .map_err(|_| Error::AlertNotInStore(stream.name.to_owned()));
-
+ .map_err(|_| Error::AlertNotInStore(stream.name.to_owned()))?;
let schema = storage
.get_schema(&stream.name)
.await
.map_err(|e| e.into())
.and_then(parse_string)
- .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()));
-
+ .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()))?;
let metadata = LogStreamMetadata {
- schema: schema.unwrap_or_default(),
- alert_config: alert_config.unwrap_or_default(),
+ schema,
+ alerts,
..Default::default()
};
@@ -222,24 +238,21 @@ mod tests {
}
#[rstest]
- #[case::stream_schema_alert("teststream", "schema", "alert_config")]
- #[case::stream_only("teststream", "", "")]
+ #[case::stream_schema_alert("teststream", "schema")]
+ #[case::stream_only("teststream", "")]
#[serial]
- fn test_add_stream(
- #[case] stream_name: String,
- #[case] schema: String,
- #[case] alert_config: String,
- ) {
+ fn test_add_stream(#[case] stream_name: String, #[case] schema: String) {
+ let alerts = Alerts { alerts: vec![] };
clear_map();
STREAM_INFO
- .add_stream(stream_name.clone(), schema.clone(), alert_config.clone())
+ .add_stream(stream_name.clone(), schema.clone(), alerts.clone())
.unwrap();
let left = STREAM_INFO.read().unwrap().clone();
let right = hashmap! {
stream_name => LogStreamMetadata {
- schema: schema,
- alert_config: alert_config,
+ schema,
+ alerts,
..Default::default()
}
};
@@ -252,7 +265,11 @@ mod tests {
fn test_delete_stream(#[case] stream_name: String) {
clear_map();
STREAM_INFO
- .add_stream(stream_name.clone(), "".to_string(), "".to_string())
+ .add_stream(
+ stream_name.clone(),
+ "".to_string(),
+ Alerts { alerts: vec![] },
+ )
.unwrap();
STREAM_INFO.delete_stream(&stream_name).unwrap();
diff --git a/server/src/s3.rs b/server/src/s3.rs
index 47da6724b..103a165b1 100644
--- a/server/src/s3.rs
+++ b/server/src/s3.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
use structopt::StructOpt;
use tokio_stream::StreamExt;
+use crate::alerts::Alerts;
use crate::metadata::Stats;
use crate::option::{StorageOpt, CONFIG};
use crate::query::Query;
@@ -199,13 +200,13 @@ impl S3 {
Ok(())
}
- async fn _create_alert(&self, stream_name: &str, body: String) -> Result<(), AwsSdkError> {
+ async fn _put_alerts(&self, stream_name: &str, body: Vec) -> Result<(), AwsSdkError> {
let _resp = self
.client
.put_object()
.bucket(&S3_CONFIG.s3_bucket_name)
.key(format!("{}/.alert.json", stream_name))
- .body(body.into_bytes().into())
+ .body(body.into())
.send()
.await?;
@@ -328,12 +329,13 @@ impl ObjectStorage for S3 {
Ok(())
}
- async fn create_alert(
+ async fn put_alerts(
&self,
stream_name: &str,
- body: String,
+ alerts: Alerts,
) -> Result<(), ObjectStorageError> {
- self._create_alert(stream_name, body).await?;
+ let body = serde_json::to_vec(&alerts)?;
+ self._put_alerts(stream_name, body).await?;
Ok(())
}
@@ -344,10 +346,11 @@ impl ObjectStorage for S3 {
Ok(body_bytes)
}
- async fn get_alert(&self, stream_name: &str) -> Result {
+ async fn get_alerts(&self, stream_name: &str) -> Result {
let body_bytes = self._alert_exists(stream_name).await?;
+ let alerts = serde_json::from_slice(&body_bytes)?;
- Ok(body_bytes)
+ Ok(alerts)
}
async fn get_stats(&self, stream_name: &str) -> Result {
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 7dc8c0183..01a579bc8 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -16,6 +16,7 @@
*
*/
+use crate::alerts::Alerts;
use crate::metadata::Stats;
use crate::option::CONFIG;
use crate::query::Query;
@@ -51,10 +52,11 @@ pub trait ObjectStorage: Sync + 'static {
-> Result<(), ObjectStorageError>;
async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
- async fn create_alert(&self, stream_name: &str, body: String)
+
+ async fn put_alerts(&self, stream_name: &str, alerts: Alerts)
-> Result<(), ObjectStorageError>;
async fn get_schema(&self, stream_name: &str) -> Result;
- async fn get_alert(&self, stream_name: &str) -> Result;
+ async fn get_alerts(&self, stream_name: &str) -> Result;
async fn get_stats(&self, stream_name: &str) -> Result;
async fn list_streams(&self) -> Result, ObjectStorageError>;
async fn upload_file(&self, key: &str, path: &str) -> Result<(), ObjectStorageError>;
diff --git a/server/src/validator.rs b/server/src/validator.rs
index 440886c04..800c5606e 100644
--- a/server/src/validator.rs
+++ b/server/src/validator.rs
@@ -17,9 +17,9 @@
*/
use chrono::{DateTime, Utc};
-use serde_derive::Deserialize;
-use serde_derive::Serialize;
+use serde_json::json;
+use crate::alerts::Alerts;
use crate::query::Query;
use crate::Error;
@@ -28,40 +28,6 @@ const DENIED_NAMES: &[&str] = &[
"select", "from", "where", "group", "by", "order", "limit", "offset", "join", "and",
];
-#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct Alerts {
- pub alerts: Vec,
-}
-
-#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct Alert {
- pub name: String,
- pub message: String,
- pub rule: Rule,
- pub target: Vec,
-}
-
-#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct Rule {
- pub field: String,
- pub contains: String,
- pub repeats: u32,
- pub within: String,
-}
-
-#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct Target {
- pub name: String,
- #[serde(rename = "server_url")]
- pub server_url: String,
- #[serde(rename = "api_key")]
- pub api_key: String,
-}
-
pub fn alert(body: String) -> Result<(), Error> {
let alerts: Alerts = serde_json::from_str(body.as_str())?;
for alert in alerts.alerts {
@@ -75,8 +41,10 @@ pub fn alert(body: String) -> Result<(), Error> {
"alert message cannot be empty".to_string(),
));
}
- if alert.rule.contains.is_empty() {
- return Err(Error::InvalidAlert("rule.contains must be set".to_string()));
+ if alert.rule.value == json!(null) {
+ return Err(Error::InvalidAlert(
+ "rule.value cannot be empty".to_string(),
+ ));
}
if alert.rule.field.is_empty() {
return Err(Error::InvalidAlert("rule.field must be set".to_string()));
@@ -89,7 +57,7 @@ pub fn alert(body: String) -> Result<(), Error> {
"rule.repeats can't be set to 0".to_string(),
));
}
- if alert.target.is_empty() {
+ if alert.targets.is_empty() {
return Err(Error::InvalidAlert(
"alert must have at least one target".to_string(),
));