From b41bc7d2a871c646490a6f23f76b3be19320fd72 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sun, 13 Aug 2023 12:58:59 +0530 Subject: [PATCH 01/11] Add composite alerts --- Cargo.lock | 17 ++++ server/Cargo.toml | 1 + server/src/alerts/mod.rs | 1 + server/src/alerts/parser.rs | 186 ++++++++++++++++++++++++++++++++++++ server/src/alerts/rule.rs | 161 ++++++++++++++++++++++++++++++- server/src/validator.rs | 1 + 6 files changed, 364 insertions(+), 3 deletions(-) create mode 100644 server/src/alerts/parser.rs diff --git a/Cargo.lock b/Cargo.lock index 52cc45e4a..e4f29241a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2283,6 +2283,12 @@ dependencies = [ "unicase", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.6.2" @@ -2343,6 +2349,16 @@ dependencies = [ "libc", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nom8" version = "0.2.0" @@ -2662,6 +2678,7 @@ dependencies = [ "itertools 0.10.5", "log", "maplit", + "nom", "num_cpus", "object_store", "once_cell", diff --git a/server/Cargo.toml b/server/Cargo.toml index 08e83f136..e49f300a0 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -87,6 +87,7 @@ ulid = { version = "1.0", features = ["serde"] } uptime_lib = "0.2.2" xxhash-rust = { version = "0.8", features = ["xxh3"] } xz2 = { version = "*", features = ["static"] } +nom = "7.1.3" [build-dependencies] diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs index 97458223f..81868d093 100644 --- a/server/src/alerts/mod.rs +++ b/server/src/alerts/mod.rs @@ -26,6 +26,7 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use std::fmt; +pub mod parser; pub mod rule; pub mod target; diff --git a/server/src/alerts/parser.rs b/server/src/alerts/parser.rs new file mode 100644 index 000000000..f24ebe376 --- /dev/null +++ b/server/src/alerts/parser.rs @@ -0,0 +1,186 @@ +use std::str::FromStr; + +use nom::{ + branch::alt, + bytes::complete::{tag, take_until, take_while1}, + character::complete::{char, multispace0, multispace1}, + combinator::map, + sequence::{delimited, separated_pair}, + IResult, +}; + +use super::rule::{ + base::{ + ops::{NumericOperator, StringOperator}, + NumericRule, StringRule, + }, + CompositeRule, +}; + +fn parse_numeric_op(input: &str) -> IResult<&str, NumericOperator> { + alt(( + map(tag("<"), |_| NumericOperator::LessThan), + map(tag(">"), |_| NumericOperator::GreaterThan), + map(tag("<="), |_| NumericOperator::LessThanEquals), + map(tag(">="), |_| NumericOperator::GreaterThanEquals), + map(tag("="), |_| NumericOperator::EqualTo), + map(tag("!="), |_| NumericOperator::NotEqualTo), + ))(input) +} + +fn parse_string_op(input: &str) -> IResult<&str, StringOperator> { + alt(( + map(tag("="), |_| StringOperator::Exact), + map(tag("!="), |_| StringOperator::NotExact), + map(tag("%"), |_| StringOperator::Contains), + map(tag("!%"), |_| StringOperator::NotContains), + map(tag("*="), |_| StringOperator::Regex), + ))(input) +} + +fn parse_numeric_rule(input: &str) -> IResult<&str, CompositeRule> { + let (remaining, key) = map(parse_identifier, |s: &str| s.to_string())(input)?; + let (remaining, op) = delimited(multispace0, parse_numeric_op, multispace0)(remaining)?; + let (remaining, value) = map(take_while1(|c: char| c.is_ascii_digit()), |x| { + str::parse(x).unwrap() + })(remaining)?; + + Ok(( + remaining, + CompositeRule::Numeric(NumericRule { + column: key, + operator: op, + value, + }), + )) +} + +fn parse_string_rule(input: &str) -> IResult<&str, CompositeRule> { + let (remaining, key) = map(parse_identifier, |s: &str| s.to_string())(input)?; + let (remaining, op) = delimited(multispace0, parse_string_op, multispace0)(remaining)?; + let (remaining, value) = map( + delimited(char('"'), take_until("\""), char('"')), + |x: &str| x.to_string(), + )(remaining)?; + + Ok(( + remaining, + CompositeRule::String(StringRule { + column: key, + operator: op, + value, + ignore_case: None, + }), + )) +} + +fn parse_identifier(input: &str) -> IResult<&str, &str> { + take_while1(|c: char| c.is_alphanumeric())(input) +} + +fn parse_unary_expr(input: &str) -> IResult<&str, CompositeRule> { + map(delimited(tag("!("), parse_expression, char(')')), |x| { + CompositeRule::Not(Box::new(x)) + })(input) +} + +fn parse_bracket_expr(input: &str) -> IResult<&str, CompositeRule> { + delimited( + char('('), + delimited(multispace0, parse_expression, multispace0), + char(')'), + )(input) +} + +fn parse_and(input: &str) -> IResult<&str, CompositeRule> { + let (remaining, (lhs, rhs)) = separated_pair( + parse_atom, + delimited(multispace1, tag("and"), multispace1), + parse_term, + )(input)?; + + Ok((remaining, CompositeRule::And(vec![lhs, rhs]))) +} + +fn parse_or(input: &str) -> IResult<&str, CompositeRule> { + let (remaining, (lhs, rhs)) = separated_pair( + parse_term, + delimited(multispace1, tag("or"), multispace1), + parse_expression, + )(input)?; + + Ok((remaining, CompositeRule::Or(vec![lhs, rhs]))) +} + +fn parse_expression(input: &str) -> IResult<&str, CompositeRule> { + alt((parse_or, parse_term))(input) +} +fn parse_term(input: &str) -> IResult<&str, CompositeRule> { + alt((parse_and, parse_atom))(input) +} +fn parse_atom(input: &str) -> IResult<&str, CompositeRule> { + alt(( + alt((parse_numeric_rule, parse_string_rule)), + parse_unary_expr, + parse_bracket_expr, + ))(input) +} + +impl FromStr for CompositeRule { + type Err = Box; + + fn from_str(s: &str) -> Result { + parse_expression(s) + .map(|(_, x)| x) + .map_err(|x| x.to_string().into()) + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use crate::alerts::rule::{ + base::{ + ops::{NumericOperator, StringOperator}, + NumericRule, StringRule, + }, + CompositeRule, + }; + + #[test] + fn test_and_or_not() { + let input = r#"key=500 and key="value" or !(key=300)"#; + let rule = CompositeRule::from_str(input).unwrap(); + + let numeric1 = NumericRule { + column: "key".to_string(), + operator: NumericOperator::EqualTo, + value: serde_json::Number::from(500), + }; + + let string1 = StringRule { + column: "key".to_string(), + operator: StringOperator::Exact, + value: "value".to_string(), + ignore_case: None, + }; + + let numeric3 = NumericRule { + column: "key".to_string(), + operator: NumericOperator::EqualTo, + value: serde_json::Number::from(300), + }; + + assert_eq!( + rule, + CompositeRule::Or(vec![ + CompositeRule::And(vec![ + CompositeRule::Numeric(numeric1), + CompositeRule::String(string1) + ]), + CompositeRule::Not(Box::new(CompositeRule::Numeric(numeric3))) + ]) + ) + } +} diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs index cd8c614b5..69071be35 100644 --- a/server/src/alerts/rule.rs +++ b/server/src/alerts/rule.rs @@ -18,7 +18,17 @@ use arrow_array::{cast::as_string_array, RecordBatch}; use datafusion::arrow::datatypes::{DataType, Schema}; -use std::sync::atomic::{AtomicU32, Ordering}; +use itertools::Itertools; +use serde::{ + de::{MapAccess, Visitor}, + Deserialize, Deserializer, +}; +use std::{ + fmt, + marker::PhantomData, + str::FromStr, + sync::atomic::{AtomicU32, Ordering}, +}; use self::base::{ ops::{NumericOperator, StringOperator}, @@ -32,24 +42,39 @@ use super::AlertState; #[serde(rename_all = "camelCase")] pub enum Rule { Column(ColumnRule), + #[serde(deserialize_with = "string_or_struct")] + Composite(CompositeRule), } impl Rule { pub fn resolves(&self, event: RecordBatch) -> Vec { match self { Rule::Column(rule) => rule.resolves(event), + Rule::Composite(rule) => rule + .resolves(event) + .iter() + .map(|x| { + if *x { + AlertState::SetToFiring + } else { + AlertState::Listening + } + }) + .collect(), } } pub fn valid_for_schema(&self, schema: &Schema) -> bool { match self { Rule::Column(rule) => rule.valid_for_schema(schema), + Rule::Composite(rule) => rule.valid_for_schema(schema), } } pub fn trigger_reason(&self) -> String { match self { Rule::Column(rule) => rule.trigger_reason(), + Rule::Composite(_) => "todo".to_string(), } } } @@ -217,6 +242,136 @@ fn one() -> u32 { 1 } +fn string_or_struct<'de, T, D>(deserializer: D) -> Result +where + T: Deserialize<'de> + FromStr>, + D: Deserializer<'de>, +{ + // This is a Visitor that forwards string types to T's `FromStr` impl and + // forwards map types to T's `Deserialize` impl. The `PhantomData` is to + // keep the compiler from complaining about T being an unused generic type + // parameter. We need T in order to know the Value type for the Visitor + // impl. + struct StringOrStruct(PhantomData T>); + + impl<'de, T> Visitor<'de> for StringOrStruct + where + T: Deserialize<'de> + FromStr>, + { + type Value = T; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("string or map") + } + + fn visit_str(self, value: &str) -> Result + where + E: serde::de::Error, + { + Ok(FromStr::from_str(value).unwrap()) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + // `MapAccessDeserializer` is a wrapper that turns a `MapAccess` + // into a `Deserializer`, allowing it to be used as the input to T's + // `Deserialize` implementation. T then deserializes itself using + // the entries from the map visitor. + Deserialize::deserialize(serde::de::value::MapAccessDeserializer::new(map)) + } + } + + deserializer.deserialize_any(StringOrStruct(PhantomData)) +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub enum CompositeRule { + And(Vec), + Or(Vec), + Not(Box), + Numeric(NumericRule), + String(StringRule), +} + +impl CompositeRule { + fn resolves(&self, event: RecordBatch) -> Vec { + let res = match self { + CompositeRule::And(rules) => { + // get individual evaluation for each subrule + let mut evaluations = rules + .iter() + .map(|x| x.resolves(event.clone())) + .collect_vec(); + // They all must be of same length otherwise some columns was missing in evaluation + let is_same_len = evaluations.iter().map(|x| x.len()).all_equal(); + // if there are more than one rule then we go through all evaluations and compare them side by side + if is_same_len && evaluations.len() > 1 { + (0..evaluations[0].len()) + .map(|idx| evaluations.iter().all(|x| x[idx])) + .collect() + } else if is_same_len && evaluations.len() == 1 { + evaluations.pop().expect("length one") + } else { + vec![] + } + } + CompositeRule::Or(rules) => { + // get individual evaluation for each subrule + let evaluations: Vec> = rules + .iter() + .map(|x| x.resolves(event.clone())) + .collect_vec(); + let mut evaluation_iterators = evaluations.iter().map(|x| x.iter()).collect_vec(); + let mut res = vec![]; + + loop { + let mut continue_iteration = false; + let mut accumulator = false; + for iter in &mut evaluation_iterators { + if let Some(val) = iter.next() { + accumulator = accumulator || *val; + continue_iteration = true + } + } + if !continue_iteration { + break; + } else { + res.push(accumulator) + } + } + + res + } + CompositeRule::Numeric(rule) => { + let Some(column) = event.column_by_name(&rule.column) else { + return Vec::new(); + }; + rule.resolves(column) + } + CompositeRule::String(rule) => { + let Some(column) = event.column_by_name(&rule.column) else { + return Vec::new(); + }; + rule.resolves(as_string_array(column)) + } + CompositeRule::Not(rule) => { + let mut res = rule.resolves(event); + res.iter_mut().for_each(|x| *x = !*x); + res + } + }; + + dbg!(res) + } + + fn valid_for_schema(&self, _: &Schema) -> bool { + true + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct ConsecutiveRepeatState { #[serde(default = "one")] @@ -308,7 +463,7 @@ pub mod base { use self::ops::{NumericOperator, StringOperator}; use regex::Regex; - #[derive(Debug, serde::Serialize, serde::Deserialize)] + #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct NumericRule { pub column: String, @@ -362,7 +517,7 @@ pub mod base { } } - #[derive(Debug, serde::Serialize, serde::Deserialize)] + #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct StringRule { pub column: String, diff --git a/server/src/validator.rs b/server/src/validator.rs index 463a9d3f0..5a84ceddb 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -62,6 +62,7 @@ pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> { } } }, + _ => {} } } Ok(()) From f1bb25a92f947c0b508c9ff9aa8a80790c5465b6 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 15 Aug 2023 14:13:25 +0530 Subject: [PATCH 02/11] Serialize to string --- server/src/alerts/parser.rs | 4 +- server/src/alerts/rule.rs | 175 +++++++++++++++++++++++++----------- 2 files changed, 124 insertions(+), 55 deletions(-) diff --git a/server/src/alerts/parser.rs b/server/src/alerts/parser.rs index f24ebe376..c346a0a7e 100644 --- a/server/src/alerts/parser.rs +++ b/server/src/alerts/parser.rs @@ -32,9 +32,9 @@ fn parse_string_op(input: &str) -> IResult<&str, StringOperator> { alt(( map(tag("="), |_| StringOperator::Exact), map(tag("!="), |_| StringOperator::NotExact), - map(tag("%"), |_| StringOperator::Contains), + map(tag("=%"), |_| StringOperator::Contains), map(tag("!%"), |_| StringOperator::NotContains), - map(tag("*="), |_| StringOperator::Regex), + map(tag("~"), |_| StringOperator::Regex), ))(input) } diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs index 69071be35..bd05a57a7 100644 --- a/server/src/alerts/rule.rs +++ b/server/src/alerts/rule.rs @@ -42,7 +42,7 @@ use super::AlertState; #[serde(rename_all = "camelCase")] pub enum Rule { Column(ColumnRule), - #[serde(deserialize_with = "string_or_struct")] + #[serde(deserialize_with = "string_or_struct", serialize_with = "to_string")] Composite(CompositeRule), } @@ -242,50 +242,6 @@ fn one() -> u32 { 1 } -fn string_or_struct<'de, T, D>(deserializer: D) -> Result -where - T: Deserialize<'de> + FromStr>, - D: Deserializer<'de>, -{ - // This is a Visitor that forwards string types to T's `FromStr` impl and - // forwards map types to T's `Deserialize` impl. The `PhantomData` is to - // keep the compiler from complaining about T being an unused generic type - // parameter. We need T in order to know the Value type for the Visitor - // impl. - struct StringOrStruct(PhantomData T>); - - impl<'de, T> Visitor<'de> for StringOrStruct - where - T: Deserialize<'de> + FromStr>, - { - type Value = T; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("string or map") - } - - fn visit_str(self, value: &str) -> Result - where - E: serde::de::Error, - { - Ok(FromStr::from_str(value).unwrap()) - } - - fn visit_map(self, map: M) -> Result - where - M: MapAccess<'de>, - { - // `MapAccessDeserializer` is a wrapper that turns a `MapAccess` - // into a `Deserializer`, allowing it to be used as the input to T's - // `Deserialize` implementation. T then deserializes itself using - // the entries from the map visitor. - Deserialize::deserialize(serde::de::value::MapAccessDeserializer::new(map)) - } - } - - deserializer.deserialize_any(StringOrStruct(PhantomData)) -} - #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub enum CompositeRule { @@ -372,6 +328,25 @@ impl CompositeRule { } } +impl fmt::Display for CompositeRule { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let v = match self { + CompositeRule::And(rules) => { + let rules_str: Vec = rules.iter().map(|rule| rule.to_string()).collect(); + format!("({})", rules_str.join(" and ")) + } + CompositeRule::Or(rules) => { + let rules_str: Vec = rules.iter().map(|rule| rule.to_string()).collect(); + format!("({})", rules_str.join(" or ")) + } + CompositeRule::Not(rule) => format!("!({})", rule), + CompositeRule::Numeric(numeric_rule) => numeric_rule.to_string(), + CompositeRule::String(string_rule) => string_rule.to_string(), + }; + write!(f, "{}", v) + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct ConsecutiveRepeatState { #[serde(default = "one")] @@ -415,6 +390,57 @@ impl ConsecutiveRepeatState { } } +fn string_or_struct<'de, T, D>(deserializer: D) -> Result +where + T: Deserialize<'de> + FromStr>, + D: Deserializer<'de>, +{ + // This is a Visitor that forwards string types to T's `FromStr` impl and + // forwards map types to T's `Deserialize` impl. The `PhantomData` is to + // keep the compiler from complaining about T being an unused generic type + // parameter. We need T in order to know the Value type for the Visitor + // impl. + struct StringOrStruct(PhantomData T>); + + impl<'de, T> Visitor<'de> for StringOrStruct + where + T: Deserialize<'de> + FromStr>, + { + type Value = T; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("string or map") + } + + fn visit_str(self, value: &str) -> Result + where + E: serde::de::Error, + { + FromStr::from_str(value).map_err(|x| serde::de::Error::custom(x)) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + // `MapAccessDeserializer` is a wrapper that turns a `MapAccess` + // into a `Deserializer`, allowing it to be used as the input to T's + // `Deserialize` implementation. T then deserializes itself using + // the entries from the map visitor. + Deserialize::deserialize(serde::de::value::MapAccessDeserializer::new(map)) + } + } + + deserializer.deserialize_any(StringOrStruct(PhantomData)) +} + +fn to_string(ty: &CompositeRule, serializer: S) -> Result +where + S: serde::Serializer, +{ + serializer.serialize_str(&ty.to_string()) +} + #[cfg(test)] mod tests { use std::sync::atomic::AtomicU32; @@ -453,6 +479,8 @@ mod tests { } pub mod base { + use std::fmt::Display; + use arrow_array::{ cast::as_primitive_array, types::{Float64Type, Int64Type, UInt64Type}, @@ -517,6 +545,12 @@ pub mod base { } } + impl Display for NumericRule { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} {} {}", self.column, self.operator, self.value) + } + } + #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct StringRule { @@ -574,10 +608,21 @@ pub mod base { } } + impl Display for StringRule { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} {} \"{}\"", self.column, self.operator, self.value) + } + } + pub mod ops { - #[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] + use std::fmt::Display; + + #[derive( + Debug, Default, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, + )] #[serde(rename_all = "camelCase")] pub enum NumericOperator { + #[default] #[serde(alias = "=")] EqualTo, #[serde(alias = "!=")] @@ -592,19 +637,33 @@ pub mod base { LessThanEquals, } - impl Default for NumericOperator { - fn default() -> Self { - Self::EqualTo + impl Display for NumericOperator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + NumericOperator::EqualTo => "=", + NumericOperator::NotEqualTo => "!=", + NumericOperator::GreaterThan => ">", + NumericOperator::GreaterThanEquals => ">=", + NumericOperator::LessThan => "<", + NumericOperator::LessThanEquals => "<=", + } + ) } } - #[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] + #[derive( + Debug, Default, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, + )] #[serde(rename_all = "camelCase")] pub enum StringOperator { #[serde(alias = "=")] Exact, #[serde(alias = "!=")] NotExact, + #[default] #[serde(alias = "=%")] Contains, #[serde(alias = "!%")] @@ -613,9 +672,19 @@ pub mod base { Regex, } - impl Default for StringOperator { - fn default() -> Self { - Self::Contains + impl Display for StringOperator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + StringOperator::Exact => "=", + StringOperator::NotExact => "!=", + StringOperator::Contains => "=%", + StringOperator::NotContains => "!%", + StringOperator::Regex => "~", + } + ) } } } From 0e8e53699f2a962fa050916b3ac7e423a8e629b8 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 15 Aug 2023 14:26:46 +0530 Subject: [PATCH 03/11] Fix rule message and validation --- server/src/alerts/rule.rs | 55 +++++++++++++++++++++++---------------- server/src/validator.rs | 7 +++-- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs index bd05a57a7..172337b7a 100644 --- a/server/src/alerts/rule.rs +++ b/server/src/alerts/rule.rs @@ -17,7 +17,7 @@ */ use arrow_array::{cast::as_string_array, RecordBatch}; -use datafusion::arrow::datatypes::{DataType, Schema}; +use datafusion::arrow::datatypes::Schema; use itertools::Itertools; use serde::{ de::{MapAccess, Visitor}, @@ -74,7 +74,7 @@ impl Rule { pub fn trigger_reason(&self) -> String { match self { Rule::Column(rule) => rule.trigger_reason(), - Rule::Composite(_) => "todo".to_string(), + Rule::Composite(rule) => format!("matched rule {}", rule), } } } @@ -98,29 +98,10 @@ impl ColumnRule { match self { Self::ConsecutiveNumeric(ConsecutiveNumericRule { base_rule: rule, .. - }) => match schema.column_with_name(&rule.column) { - Some((_, column)) => matches!( - column.data_type(), - DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Float16 - | DataType::Float32 - | DataType::Float64 - ), - None => false, - }, + }) => rule.valid_for_schema(schema), Self::ConsecutiveString(ConsecutiveStringRule { base_rule: rule, .. - }) => match schema.column_with_name(&rule.column) { - Some((_, column)) => matches!(column.data_type(), DataType::Utf8), - None => false, - }, + }) => rule.valid_for_schema(schema), } } @@ -486,6 +467,7 @@ pub mod base { types::{Float64Type, Int64Type, UInt64Type}, Array, ArrowPrimitiveType, PrimitiveArray, StringArray, }; + use arrow_schema::{DataType, Schema}; use itertools::Itertools; use self::ops::{NumericOperator, StringOperator}; @@ -543,6 +525,26 @@ pub mod base { }) .collect() } + + pub fn valid_for_schema(&self, schema: &Schema) -> bool { + match schema.column_with_name(&self.column) { + Some((_, column)) => matches!( + column.data_type(), + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float16 + | DataType::Float32 + | DataType::Float64 + ), + None => false, + } + } } impl Display for NumericRule { @@ -606,6 +608,13 @@ pub mod base { } } } + + pub fn valid_for_schema(&self, schema: &Schema) -> bool { + match schema.column_with_name(&self.column) { + Some((_, column)) => matches!(column.data_type(), DataType::Utf8), + None => false, + } + } } impl Display for StringRule { diff --git a/server/src/validator.rs b/server/src/validator.rs index 5a84ceddb..1a44a16c2 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -44,8 +44,8 @@ pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> { return Err(AlertValidationError::NoTarget); } - match alert.rule { - Rule::Column(ref column_rule) => match column_rule { + if let Rule::Column(ref column_rule) = alert.rule { + match column_rule { ColumnRule::ConsecutiveNumeric(ConsecutiveNumericRule { base_rule: NumericRule { ref column, .. }, ref state, @@ -61,8 +61,7 @@ pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> { return Err(AlertValidationError::InvalidRuleRepeat); } } - }, - _ => {} + } } } Ok(()) From 0f331727b3cb45252ac617617d7d583870fd11aa Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 15 Aug 2023 15:44:25 +0530 Subject: [PATCH 04/11] Fix --- server/src/alerts/mod.rs | 2 +- server/src/alerts/parser.rs | 56 ++++++++++++++++++++++++++++++++++--- server/src/alerts/rule.rs | 2 -- 3 files changed, 53 insertions(+), 7 deletions(-) diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs index 81868d093..6625b89b6 100644 --- a/server/src/alerts/mod.rs +++ b/server/src/alerts/mod.rs @@ -67,7 +67,7 @@ pub struct Alert { impl Alert { pub fn check_alert(&self, stream_name: &str, events: RecordBatch) { - let resolves = self.rule.resolves(events.clone()); + let resolves = dbg!(self.rule.resolves(events.clone())); for (index, state) in resolves.into_iter().enumerate() { match state { diff --git a/server/src/alerts/parser.rs b/server/src/alerts/parser.rs index c346a0a7e..02d944427 100644 --- a/server/src/alerts/parser.rs +++ b/server/src/alerts/parser.rs @@ -19,21 +19,21 @@ use super::rule::{ fn parse_numeric_op(input: &str) -> IResult<&str, NumericOperator> { alt(( - map(tag("<"), |_| NumericOperator::LessThan), - map(tag(">"), |_| NumericOperator::GreaterThan), map(tag("<="), |_| NumericOperator::LessThanEquals), map(tag(">="), |_| NumericOperator::GreaterThanEquals), - map(tag("="), |_| NumericOperator::EqualTo), map(tag("!="), |_| NumericOperator::NotEqualTo), + map(tag("<"), |_| NumericOperator::LessThan), + map(tag(">"), |_| NumericOperator::GreaterThan), + map(tag("="), |_| NumericOperator::EqualTo), ))(input) } fn parse_string_op(input: &str) -> IResult<&str, StringOperator> { alt(( - map(tag("="), |_| StringOperator::Exact), map(tag("!="), |_| StringOperator::NotExact), map(tag("=%"), |_| StringOperator::Contains), map(tag("!%"), |_| StringOperator::NotContains), + map(tag("="), |_| StringOperator::Exact), map(tag("~"), |_| StringOperator::Regex), ))(input) } @@ -183,4 +183,52 @@ mod tests { ]) ) } + + #[test] + fn test_complex() { + let input = r#"(verb =% "list" or verb =% "get") and (resource = "secret" and username !% "admin")"#; + let rule = CompositeRule::from_str(input).unwrap(); + + let verb_like_list = StringRule { + column: "verb".to_string(), + operator: StringOperator::Contains, + value: "list".to_string(), + ignore_case: None, + }; + + let verb_like_get = StringRule { + column: "verb".to_string(), + operator: StringOperator::Contains, + value: "get".to_string(), + ignore_case: None, + }; + + let resource_exact_secret = StringRule { + column: "resource".to_string(), + operator: StringOperator::Exact, + value: "secret".to_string(), + ignore_case: None, + }; + + let username_notcontains_admin = StringRule { + column: "username".to_string(), + operator: StringOperator::NotContains, + value: "admin".to_string(), + ignore_case: None, + }; + + assert_eq!( + rule, + CompositeRule::And(vec![ + CompositeRule::Or(vec![ + CompositeRule::String(verb_like_list), + CompositeRule::String(verb_like_get) + ]), + CompositeRule::And(vec![ + CompositeRule::String(resource_exact_secret), + CompositeRule::String(username_notcontains_admin) + ]), + ]) + ) + } } diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs index 172337b7a..4608f3f44 100644 --- a/server/src/alerts/rule.rs +++ b/server/src/alerts/rule.rs @@ -300,8 +300,6 @@ impl CompositeRule { res } }; - - dbg!(res) } fn valid_for_schema(&self, _: &Schema) -> bool { From 21008e04c4b96ddc5b4087bf3d58985496c48d1f Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 15 Aug 2023 15:52:41 +0530 Subject: [PATCH 05/11] Fix --- server/src/alerts/rule.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs index 4608f3f44..c171620d3 100644 --- a/server/src/alerts/rule.rs +++ b/server/src/alerts/rule.rs @@ -235,7 +235,7 @@ pub enum CompositeRule { impl CompositeRule { fn resolves(&self, event: RecordBatch) -> Vec { - let res = match self { + match self { CompositeRule::And(rules) => { // get individual evaluation for each subrule let mut evaluations = rules @@ -299,7 +299,7 @@ impl CompositeRule { res.iter_mut().for_each(|x| *x = !*x); res } - }; + } } fn valid_for_schema(&self, _: &Schema) -> bool { From 288f8ee57618f451b1afe86db5f026546495d877 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 15 Aug 2023 16:02:53 +0530 Subject: [PATCH 06/11] Fix --- server/src/alerts/parser.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/alerts/parser.rs b/server/src/alerts/parser.rs index 02d944427..a3bdf7ef1 100644 --- a/server/src/alerts/parser.rs +++ b/server/src/alerts/parser.rs @@ -75,7 +75,7 @@ fn parse_string_rule(input: &str) -> IResult<&str, CompositeRule> { } fn parse_identifier(input: &str) -> IResult<&str, &str> { - take_while1(|c: char| c.is_alphanumeric())(input) + take_while1(|c: char| c.is_alphanumeric() || c == '-' || c == '_' )(input) } fn parse_unary_expr(input: &str) -> IResult<&str, CompositeRule> { From e1a4c24aa21d886bf48de5ebddba3788589a8de5 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 15 Aug 2023 16:20:17 +0530 Subject: [PATCH 07/11] Valid for schema --- server/src/alerts/rule.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs index c171620d3..b907e0cfb 100644 --- a/server/src/alerts/rule.rs +++ b/server/src/alerts/rule.rs @@ -302,8 +302,14 @@ impl CompositeRule { } } - fn valid_for_schema(&self, _: &Schema) -> bool { - true + fn valid_for_schema(&self, schema: &Schema) -> bool { + match self { + CompositeRule::And(rules) => rules.iter().all(|rule| rule.valid_for_schema(schema)), + CompositeRule::Or(rules) => rules.iter().all(|rule| rule.valid_for_schema(schema)), + CompositeRule::Not(rule) => rule.valid_for_schema(schema), + CompositeRule::Numeric(rule) => rule.valid_for_schema(schema), + CompositeRule::String(rule) => rule.valid_for_schema(schema), + } } } From 1220bd3475be07f7ba499076d9b84bbfbf37e7f7 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 15 Aug 2023 16:30:42 +0530 Subject: [PATCH 08/11] add dbg --- server/src/alerts/rule.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs index b907e0cfb..0271ad82f 100644 --- a/server/src/alerts/rule.rs +++ b/server/src/alerts/rule.rs @@ -235,7 +235,7 @@ pub enum CompositeRule { impl CompositeRule { fn resolves(&self, event: RecordBatch) -> Vec { - match self { + let res = match self { CompositeRule::And(rules) => { // get individual evaluation for each subrule let mut evaluations = rules @@ -299,7 +299,12 @@ impl CompositeRule { res.iter_mut().for_each(|x| *x = !*x); res } - } + }; + + let dbg = self.to_string(); + eprintln!("{} {}", dbg, res[0]); + + res } fn valid_for_schema(&self, schema: &Schema) -> bool { From 45a00b0cca41a8848b8641fbbf243a219b00e8cc Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 16 Aug 2023 11:27:38 +0530 Subject: [PATCH 09/11] Remove dbg --- server/src/alerts/mod.rs | 2 +- server/src/alerts/rule.rs | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs index 6625b89b6..81868d093 100644 --- a/server/src/alerts/mod.rs +++ b/server/src/alerts/mod.rs @@ -67,7 +67,7 @@ pub struct Alert { impl Alert { pub fn check_alert(&self, stream_name: &str, events: RecordBatch) { - let resolves = dbg!(self.rule.resolves(events.clone())); + let resolves = self.rule.resolves(events.clone()); for (index, state) in resolves.into_iter().enumerate() { match state { diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs index 0271ad82f..24b222415 100644 --- a/server/src/alerts/rule.rs +++ b/server/src/alerts/rule.rs @@ -301,9 +301,6 @@ impl CompositeRule { } }; - let dbg = self.to_string(); - eprintln!("{} {}", dbg, res[0]); - res } From 90c34d3f85130e239c9ac9b3c6d362422f47270c Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 16 Aug 2023 11:27:59 +0530 Subject: [PATCH 10/11] Fix --- server/src/alerts/parser.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/alerts/parser.rs b/server/src/alerts/parser.rs index a3bdf7ef1..fa5351bdb 100644 --- a/server/src/alerts/parser.rs +++ b/server/src/alerts/parser.rs @@ -75,7 +75,7 @@ fn parse_string_rule(input: &str) -> IResult<&str, CompositeRule> { } fn parse_identifier(input: &str) -> IResult<&str, &str> { - take_while1(|c: char| c.is_alphanumeric() || c == '-' || c == '_' )(input) + take_while1(|c: char| c.is_alphanumeric() || c == '-' || c == '_')(input) } fn parse_unary_expr(input: &str) -> IResult<&str, CompositeRule> { From 84045caf4128cc602c589532b79fb12fa66b6ad4 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 16 Aug 2023 11:32:32 +0530 Subject: [PATCH 11/11] Add header --- server/src/alerts/parser.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/server/src/alerts/parser.rs b/server/src/alerts/parser.rs index fa5351bdb..cd7704b0c 100644 --- a/server/src/alerts/parser.rs +++ b/server/src/alerts/parser.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::str::FromStr; use nom::{