1717 */
1818
1919use async_trait:: async_trait;
20+ use datafusion:: arrow:: datatypes:: Schema ;
21+ use regex:: Regex ;
2022use serde:: { Deserialize , Serialize } ;
2123use std:: fmt;
2224
@@ -51,7 +53,8 @@ pub struct Alert {
5153 #[ serde( default = "crate::utils::uid::gen" ) ]
5254 pub id : uid:: Uid ,
5355 pub name : String ,
54- pub message : String ,
56+ #[ serde( flatten) ]
57+ pub message : Message ,
5558 pub rule : Rule ,
5659 pub targets : Vec < Target > ,
5760}
@@ -63,7 +66,7 @@ impl Alert {
6366 match resolves {
6467 AlertState :: Listening | AlertState :: Firing => ( ) ,
6568 alert_state @ ( AlertState :: SetToFiring | AlertState :: Resolved ) => {
66- let context = self . get_context ( stream_name, alert_state, & self . rule ) ;
69+ let context = self . get_context ( stream_name, alert_state, & self . rule , event_json ) ;
6770 ALERTS_STATES
6871 . with_label_values ( & [
6972 context. stream . as_str ( ) ,
@@ -78,7 +81,13 @@ impl Alert {
7881 }
7982 }
8083
81- fn get_context ( & self , stream_name : String , alert_state : AlertState , rule : & Rule ) -> Context {
84+ fn get_context (
85+ & self ,
86+ stream_name : String ,
87+ alert_state : AlertState ,
88+ rule : & Rule ,
89+ event_json : & serde_json:: Value ,
90+ ) -> Context {
8291 let deployment_instance = format ! (
8392 "{}://{}" ,
8493 CONFIG . parseable. get_scheme( ) ,
@@ -102,7 +111,7 @@ impl Alert {
102111 stream_name,
103112 AlertInfo :: new (
104113 self . name . clone ( ) ,
105- self . message . clone ( ) ,
114+ self . message . get ( event_json ) ,
106115 rule. trigger_reason ( ) ,
107116 alert_state,
108117 ) ,
@@ -111,6 +120,49 @@ impl Alert {
111120 )
112121 }
113122}
123+
124+ #[ derive( Debug , Serialize , Deserialize , Clone ) ]
125+ #[ serde( rename_all = "camelCase" ) ]
126+ pub struct Message {
127+ pub message : String ,
128+ }
129+
130+ impl Message {
131+ // checks if message (with a column name) is valid (i.e. the column name is present in the schema)
132+ pub fn valid ( & self , schema : & Schema , column : Option < & str > ) -> bool {
133+ if let Some ( col) = column {
134+ return schema. field_with_name ( col) . is_ok ( ) ;
135+ }
136+ true
137+ }
138+
139+ pub fn extract_column_name ( & self ) -> Option < & str > {
140+ let re = Regex :: new ( r"\{(.*?)\}" ) . unwrap ( ) ;
141+ let tokens: Vec < & str > = re
142+ . captures_iter ( self . message . as_str ( ) )
143+ . map ( |cap| cap. get ( 1 ) . unwrap ( ) . as_str ( ) )
144+ . collect ( ) ;
145+ // the message can have either no column name ({column_name} not present) or one column name
146+ // return Some only if there is exactly one column name present
147+ if tokens. len ( ) == 1 {
148+ return Some ( tokens[ 0 ] ) ;
149+ }
150+ None
151+ }
152+
153+ // returns the message with the column name replaced with the value of the column
154+ fn get ( & self , event_json : & serde_json:: Value ) -> String {
155+ if let Some ( column) = self . extract_column_name ( ) {
156+ if let Some ( value) = event_json. get ( column) {
157+ return self
158+ . message
159+ . replace ( & format ! ( "{{{column}}}" ) , value. to_string ( ) . as_str ( ) ) ;
160+ }
161+ }
162+ self . message . clone ( )
163+ }
164+ }
165+
114166#[ async_trait]
115167pub trait CallableTarget {
116168 async fn call ( & self , payload : & Context ) ;
0 commit comments