@@ -25,12 +25,13 @@ use datafusion::{
2525 min_max:: { max, min} ,
2626 sum:: sum,
2727 } ,
28+ logical_expr:: { BinaryExpr , Literal , Operator } ,
2829 prelude:: { col, lit, DataFrame , Expr } ,
2930} ;
3031use tracing:: trace;
3132
3233use crate :: {
33- alerts:: AggregateCondition ,
34+ alerts:: LogicalOperator ,
3435 parseable:: PARSEABLE ,
3536 query:: { TableScanVisitor , QUERY_SESSION } ,
3637 rbac:: {
@@ -42,8 +43,8 @@ use crate::{
4243} ;
4344
4445use super :: {
45- AggregateConfig , AggregateOperation , AggregateResult , Aggregations , AlertConfig , AlertError ,
46- AlertOperator , AlertState , ConditionConfig , Conditions , ALERTS ,
46+ AggregateConfig , AggregateFunction , AggregateResult , Aggregates , AlertConfig , AlertError ,
47+ AlertOperator , AlertState , ConditionConfig , Conditions , WhereConfigOperator , ALERTS ,
4748} ;
4849
4950async fn get_tables_from_query ( query : & str ) -> Result < TableScanVisitor , AlertError > {
@@ -102,23 +103,23 @@ pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
102103 trace ! ( "RUNNING EVAL TASK FOR- {alert:?}" ) ;
103104
104105 let query = prepare_query ( alert) . await ?;
105- let base_df = execute_base_query ( & query, & alert. query ) . await ?;
106- let agg_results = evaluate_aggregates ( & alert. aggregate_config , & base_df) . await ?;
107- let final_res = calculate_final_result ( & alert. aggregate_config , & agg_results) ;
106+ let select_query = alert. get_base_query ( ) ;
107+ let base_df = execute_base_query ( & query, & select_query) . await ?;
108+ let agg_results = evaluate_aggregates ( & alert. aggregates , & base_df) . await ?;
109+ let final_res = calculate_final_result ( & alert. aggregates , & agg_results) ;
108110
109111 update_alert_state ( alert, final_res, & agg_results) . await ?;
110112 Ok ( ( ) )
111113}
112114
113115async fn prepare_query ( alert : & AlertConfig ) -> Result < crate :: query:: Query , AlertError > {
114- let ( start_time, end_time) = match & alert. eval_type {
115- super :: EvalConfig :: RollingWindow ( rolling_window) => {
116- ( & rolling_window. eval_start , & rolling_window. eval_end )
117- }
116+ let ( start_time, end_time) = match & alert. eval_config {
117+ super :: EvalConfig :: RollingWindow ( rolling_window) => ( & rolling_window. eval_start , "now" ) ,
118118 } ;
119119
120120 let session_state = QUERY_SESSION . state ( ) ;
121- let raw_logical_plan = session_state. create_logical_plan ( & alert. query ) . await ?;
121+ let select_query = alert. get_base_query ( ) ;
122+ let raw_logical_plan = session_state. create_logical_plan ( & select_query) . await ?;
122123
123124 let time_range = TimeRange :: parse_human_time ( start_time, end_time)
124125 . map_err ( |err| AlertError :: CustomError ( err. to_string ( ) ) ) ?;
@@ -146,15 +147,15 @@ async fn execute_base_query(
146147}
147148
148149async fn evaluate_aggregates (
149- agg_config : & Aggregations ,
150+ agg_config : & Aggregates ,
150151 base_df : & DataFrame ,
151152) -> Result < Vec < AggregateResult > , AlertError > {
152153 let agg_filter_exprs = get_exprs ( agg_config) ;
153154 let mut results = Vec :: new ( ) ;
154155
155156 let conditions = match & agg_config. operator {
156- Some ( _) => & agg_config. aggregate_conditions [ 0 ..2 ] ,
157- None => & agg_config. aggregate_conditions [ 0 ..1 ] ,
157+ Some ( _) => & agg_config. aggregate_config [ 0 ..2 ] ,
158+ None => & agg_config. aggregate_config [ 0 ..1 ] ,
158159 } ;
159160
160161 for ( ( agg_expr, filter) , agg) in agg_filter_exprs. into_iter ( ) . zip ( conditions) {
@@ -186,10 +187,10 @@ async fn evaluate_single_aggregate(
186187 let result = evaluate_condition ( & agg. operator , final_value, agg. value ) ;
187188
188189 let message = if result {
189- agg. condition_config
190+ agg. conditions
190191 . as_ref ( )
191192 . map ( |config| config. generate_filter_message ( ) )
192- . or ( Some ( String :: default ( ) ) )
193+ . or ( None )
193194 } else {
194195 None
195196 } ;
@@ -206,18 +207,17 @@ fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> b
206207 match operator {
207208 AlertOperator :: GreaterThan => actual > expected,
208209 AlertOperator :: LessThan => actual < expected,
209- AlertOperator :: EqualTo => actual == expected,
210- AlertOperator :: NotEqualTo => actual != expected,
211- AlertOperator :: GreaterThanEqualTo => actual >= expected,
212- AlertOperator :: LessThanEqualTo => actual <= expected,
213- _ => unreachable ! ( ) ,
210+ AlertOperator :: Equal => actual == expected,
211+ AlertOperator :: NotEqual => actual != expected,
212+ AlertOperator :: GreaterThanOrEqual => actual >= expected,
213+ AlertOperator :: LessThanOrEqual => actual <= expected,
214214 }
215215}
216216
217- fn calculate_final_result ( agg_config : & Aggregations , results : & [ AggregateResult ] ) -> bool {
217+ fn calculate_final_result ( agg_config : & Aggregates , results : & [ AggregateResult ] ) -> bool {
218218 match & agg_config. operator {
219- Some ( AggregateCondition :: And ) => results. iter ( ) . all ( |r| r. result ) ,
220- Some ( AggregateCondition :: Or ) => results. iter ( ) . any ( |r| r. result ) ,
219+ Some ( LogicalOperator :: And ) => results. iter ( ) . all ( |r| r. result ) ,
220+ Some ( LogicalOperator :: Or ) => results. iter ( ) . any ( |r| r. result ) ,
221221 None => results. first ( ) . is_some_and ( |r| r. result ) ,
222222 }
223223}
@@ -228,8 +228,12 @@ async fn update_alert_state(
228228 agg_results : & [ AggregateResult ] ,
229229) -> Result < ( ) , AlertError > {
230230 if final_res {
231- trace ! ( "ALERT!!!!!!" ) ;
232231 let message = format_alert_message ( agg_results) ;
232+ let message = format ! (
233+ "{message}\n Evaluation Window: {}\n Evaluation Frequency: {}m" ,
234+ alert. get_eval_window( ) ,
235+ alert. get_eval_frequency( )
236+ ) ;
233237 ALERTS
234238 . update_state ( alert. id , AlertState :: Triggered , Some ( message) )
235239 . await
@@ -249,8 +253,8 @@ fn format_alert_message(agg_results: &[AggregateResult]) -> String {
249253 for result in agg_results {
250254 if let Some ( msg) = & result. message {
251255 message. extend ( [ format ! (
252- "| {}({}) WHERE ({}) {} {} (ActualValue : {})| " ,
253- result. config. agg ,
256+ "\n Condition: {}({}) WHERE ({}) {} {}\n ActualValue : {}\n " ,
257+ result. config. aggregate_function ,
254258 result. config. column,
255259 msg,
256260 result. config. operator,
@@ -259,8 +263,8 @@ fn format_alert_message(agg_results: &[AggregateResult]) -> String {
259263 ) ] ) ;
260264 } else {
261265 message. extend ( [ format ! (
262- "| {}({}) {} {} (ActualValue : {}) " ,
263- result. config. agg ,
266+ "\n Condition: {}({}) {} {}\n ActualValue : {}\n " ,
267+ result. config. aggregate_function ,
264268 result. config. column,
265269 result. config. operator,
266270 result. config. value,
@@ -305,17 +309,17 @@ fn get_final_value(aggregated_rows: Vec<RecordBatch>) -> f64 {
305309/// returns a tuple of (aggregate expressions, filter expressions)
306310///
307311/// It calls get_filter_expr() to get filter expressions
308- fn get_exprs ( aggregate_config : & Aggregations ) -> Vec < ( Expr , Option < Expr > ) > {
312+ fn get_exprs ( aggregate_config : & Aggregates ) -> Vec < ( Expr , Option < Expr > ) > {
309313 let mut agg_expr = Vec :: new ( ) ;
310314
311315 match & aggregate_config. operator {
312316 Some ( op) => match op {
313- AggregateCondition :: And | AggregateCondition :: Or => {
314- let agg1 = & aggregate_config. aggregate_conditions [ 0 ] ;
315- let agg2 = & aggregate_config. aggregate_conditions [ 1 ] ;
317+ LogicalOperator :: And | LogicalOperator :: Or => {
318+ let agg1 = & aggregate_config. aggregate_config [ 0 ] ;
319+ let agg2 = & aggregate_config. aggregate_config [ 1 ] ;
316320
317321 for agg in [ agg1, agg2] {
318- let filter_expr = if let Some ( where_clause) = & agg. condition_config {
322+ let filter_expr = if let Some ( where_clause) = & agg. conditions {
319323 let fe = get_filter_expr ( where_clause) ;
320324
321325 trace ! ( "filter_expr-\n {fe:?}" ) ;
@@ -331,9 +335,9 @@ fn get_exprs(aggregate_config: &Aggregations) -> Vec<(Expr, Option<Expr>)> {
331335 }
332336 } ,
333337 None => {
334- let agg = & aggregate_config. aggregate_conditions [ 0 ] ;
338+ let agg = & aggregate_config. aggregate_config [ 0 ] ;
335339
336- let filter_expr = if let Some ( where_clause) = & agg. condition_config {
340+ let filter_expr = if let Some ( where_clause) = & agg. conditions {
337341 let fe = get_filter_expr ( where_clause) ;
338342
339343 trace ! ( "filter_expr-\n {fe:?}" ) ;
@@ -353,23 +357,23 @@ fn get_exprs(aggregate_config: &Aggregations) -> Vec<(Expr, Option<Expr>)> {
353357fn get_filter_expr ( where_clause : & Conditions ) -> Expr {
354358 match & where_clause. operator {
355359 Some ( op) => match op {
356- AggregateCondition :: And => {
360+ LogicalOperator :: And => {
357361 let mut expr = Expr :: Literal ( datafusion:: scalar:: ScalarValue :: Boolean ( Some ( true ) ) ) ;
358362
359- let expr1 = & where_clause. conditions [ 0 ] ;
360- let expr2 = & where_clause. conditions [ 1 ] ;
363+ let expr1 = & where_clause. condition_config [ 0 ] ;
364+ let expr2 = & where_clause. condition_config [ 1 ] ;
361365
362366 for e in [ expr1, expr2] {
363367 let ex = match_alert_operator ( e) ;
364368 expr = expr. and ( ex) ;
365369 }
366370 expr
367371 }
368- AggregateCondition :: Or => {
372+ LogicalOperator :: Or => {
369373 let mut expr = Expr :: Literal ( datafusion:: scalar:: ScalarValue :: Boolean ( Some ( false ) ) ) ;
370374
371- let expr1 = & where_clause. conditions [ 0 ] ;
372- let expr2 = & where_clause. conditions [ 1 ] ;
375+ let expr1 = & where_clause. condition_config [ 0 ] ;
376+ let expr2 = & where_clause. condition_config [ 1 ] ;
373377
374378 for e in [ expr1, expr2] {
375379 let ex = match_alert_operator ( e) ;
@@ -379,30 +383,86 @@ fn get_filter_expr(where_clause: &Conditions) -> Expr {
379383 }
380384 } ,
381385 None => {
382- let expr = & where_clause. conditions [ 0 ] ;
386+ let expr = & where_clause. condition_config [ 0 ] ;
383387 match_alert_operator ( expr)
384388 }
385389 }
386390}
387391
388392fn match_alert_operator ( expr : & ConditionConfig ) -> Expr {
393+ // the form accepts value as a string
394+ // if it can be parsed as a number, then parse it
395+ // else keep it as a string
396+ let value = NumberOrString :: from_string ( expr. value . clone ( ) ) ;
397+
398+ // for maintaining column case
399+ let column = format ! ( r#""{}""# , expr. column) ;
389400 match expr. operator {
390- AlertOperator :: GreaterThan => col ( & expr. column ) . gt ( lit ( & expr. value ) ) ,
391- AlertOperator :: LessThan => col ( & expr. column ) . lt ( lit ( & expr. value ) ) ,
392- AlertOperator :: EqualTo => col ( & expr. column ) . eq ( lit ( & expr. value ) ) ,
393- AlertOperator :: NotEqualTo => col ( & expr. column ) . not_eq ( lit ( & expr. value ) ) ,
394- AlertOperator :: GreaterThanEqualTo => col ( & expr. column ) . gt_eq ( lit ( & expr. value ) ) ,
395- AlertOperator :: LessThanEqualTo => col ( & expr. column ) . lt_eq ( lit ( & expr. value ) ) ,
396- AlertOperator :: Like => col ( & expr. column ) . like ( lit ( & expr. value ) ) ,
397- AlertOperator :: NotLike => col ( & expr. column ) . not_like ( lit ( & expr. value ) ) ,
401+ WhereConfigOperator :: Equal => col ( column) . eq ( lit ( value) ) ,
402+ WhereConfigOperator :: NotEqual => col ( column) . not_eq ( lit ( value) ) ,
403+ WhereConfigOperator :: LessThan => col ( column) . lt ( lit ( value) ) ,
404+ WhereConfigOperator :: GreaterThan => col ( column) . gt ( lit ( value) ) ,
405+ WhereConfigOperator :: LessThanOrEqual => col ( column) . lt_eq ( lit ( value) ) ,
406+ WhereConfigOperator :: GreaterThanOrEqual => col ( column) . gt_eq ( lit ( value) ) ,
407+ WhereConfigOperator :: IsNull => col ( column) . is_null ( ) ,
408+ WhereConfigOperator :: IsNotNull => col ( column) . is_not_null ( ) ,
409+ WhereConfigOperator :: ILike => col ( column) . ilike ( lit ( & expr. value ) ) ,
410+ WhereConfigOperator :: Contains => col ( column) . like ( lit ( & expr. value ) ) ,
411+ WhereConfigOperator :: BeginsWith => Expr :: BinaryExpr ( BinaryExpr :: new (
412+ Box :: new ( col ( column) ) ,
413+ Operator :: RegexIMatch ,
414+ Box :: new ( lit ( format ! ( "^{}" , expr. value) ) ) ,
415+ ) ) ,
416+ WhereConfigOperator :: EndsWith => Expr :: BinaryExpr ( BinaryExpr :: new (
417+ Box :: new ( col ( column) ) ,
418+ Operator :: RegexIMatch ,
419+ Box :: new ( lit ( format ! ( "{}$" , expr. value) ) ) ,
420+ ) ) ,
421+ WhereConfigOperator :: DoesNotContain => col ( column) . not_ilike ( lit ( & expr. value ) ) ,
422+ WhereConfigOperator :: DoesNotBeginWith => Expr :: BinaryExpr ( BinaryExpr :: new (
423+ Box :: new ( col ( column) ) ,
424+ Operator :: RegexNotIMatch ,
425+ Box :: new ( lit ( format ! ( "^{}" , expr. value) ) ) ,
426+ ) ) ,
427+ WhereConfigOperator :: DoesNotEndWith => Expr :: BinaryExpr ( BinaryExpr :: new (
428+ Box :: new ( col ( column) ) ,
429+ Operator :: RegexNotIMatch ,
430+ Box :: new ( lit ( format ! ( "{}$" , expr. value) ) ) ,
431+ ) ) ,
398432 }
399433}
434+
400435fn match_aggregate_operation ( agg : & AggregateConfig ) -> Expr {
401- match agg. agg {
402- AggregateOperation :: Avg => avg ( col ( & agg. column ) ) ,
403- AggregateOperation :: Count => count ( col ( & agg. column ) ) ,
404- AggregateOperation :: Min => min ( col ( & agg. column ) ) ,
405- AggregateOperation :: Max => max ( col ( & agg. column ) ) ,
406- AggregateOperation :: Sum => sum ( col ( & agg. column ) ) ,
436+ // for maintaining column case
437+ let column = format ! ( r#""{}""# , agg. column) ;
438+ match agg. aggregate_function {
439+ AggregateFunction :: Avg => avg ( col ( column) ) ,
440+ AggregateFunction :: Count => count ( col ( column) ) ,
441+ AggregateFunction :: Min => min ( col ( column) ) ,
442+ AggregateFunction :: Max => max ( col ( column) ) ,
443+ AggregateFunction :: Sum => sum ( col ( column) ) ,
444+ }
445+ }
446+
447+ enum NumberOrString {
448+ Number ( f64 ) ,
449+ String ( String ) ,
450+ }
451+
452+ impl Literal for NumberOrString {
453+ fn lit ( & self ) -> Expr {
454+ match self {
455+ NumberOrString :: Number ( expr) => lit ( * expr) ,
456+ NumberOrString :: String ( expr) => lit ( expr. clone ( ) ) ,
457+ }
458+ }
459+ }
460+ impl NumberOrString {
461+ fn from_string ( value : String ) -> Self {
462+ if let Ok ( num) = value. parse :: < f64 > ( ) {
463+ NumberOrString :: Number ( num)
464+ } else {
465+ NumberOrString :: String ( value)
466+ }
407467 }
408468}
0 commit comments