@@ -390,7 +390,7 @@ pub struct AlertRequest {
390390 #[ serde( default = "Severity::default" ) ]
391391 pub severity : Severity ,
392392 pub title : String ,
393- pub query : String ,
393+ pub stream : String ,
394394 pub alert_type : AlertType ,
395395 pub aggregate_config : Aggregations ,
396396 pub eval_type : EvalConfig ,
@@ -404,7 +404,7 @@ impl From<AlertRequest> for AlertConfig {
404404 id : Ulid :: new ( ) ,
405405 severity : val. severity ,
406406 title : val. title ,
407- query : val. query ,
407+ stream : val. stream ,
408408 alert_type : val. alert_type ,
409409 aggregate_config : val. aggregate_config ,
410410 eval_type : val. eval_type ,
@@ -422,7 +422,7 @@ pub struct AlertConfig {
422422 pub id : Ulid ,
423423 pub severity : Severity ,
424424 pub title : String ,
425- pub query : String ,
425+ pub stream : String ,
426426 pub alert_type : AlertType ,
427427 pub aggregate_config : Aggregations ,
428428 pub eval_type : EvalConfig ,
@@ -435,7 +435,7 @@ pub struct AlertConfig {
435435impl AlertConfig {
436436 pub fn modify ( & mut self , alert : AlertRequest ) {
437437 self . title = alert. title ;
438- self . query = alert. query ;
438+ self . stream = alert. stream ;
439439 self . alert_type = alert. alert_type ;
440440 self . aggregate_config = alert. aggregate_config ;
441441 self . eval_type = alert. eval_type ;
@@ -480,28 +480,14 @@ impl AlertConfig {
480480 self . validate_configs ( ) ?;
481481
482482 let session_state = QUERY_SESSION . state ( ) ;
483- let raw_logical_plan = session_state. create_logical_plan ( & self . query ) . await ?;
483+ let select_query = format ! ( "SELECT * FROM {}" , self . stream) ;
484+
485+ let raw_logical_plan = session_state. create_logical_plan ( & select_query) . await ?;
484486
485487 // create a visitor to extract the table names present in query
486488 let mut visitor = TableScanVisitor :: default ( ) ;
487489 let _ = raw_logical_plan. visit ( & mut visitor) ;
488490
489- let table = visitor. into_inner ( ) . first ( ) . unwrap ( ) . to_owned ( ) ;
490-
491- let lowercase = self . query . split ( & table) . collect_vec ( ) [ 0 ] . to_lowercase ( ) ;
492-
493- if lowercase
494- . strip_prefix ( " " )
495- . unwrap_or ( & lowercase)
496- . strip_suffix ( " " )
497- . unwrap_or ( & lowercase)
498- . ne ( "select * from" )
499- {
500- return Err ( AlertError :: Metadata (
501- "Query needs to be select * from <logstream>" ,
502- ) ) ;
503- }
504-
505491 // TODO: Filter tags should be taken care of!!!
506492 let time_range = TimeRange :: parse_human_time ( "1m" , "now" )
507493 . map_err ( |err| AlertError :: CustomError ( err. to_string ( ) ) ) ?;
@@ -517,7 +503,7 @@ impl AlertConfig {
517503 let Some ( stream_name) = query. first_table_name ( ) else {
518504 return Err ( AlertError :: CustomError ( format ! (
519505 "Table name not found in query- {}" ,
520- self . query
506+ select_query
521507 ) ) ) ;
522508 } ;
523509
@@ -766,8 +752,8 @@ impl Alerts {
766752 let mut alerts: Vec < AlertConfig > = Vec :: new ( ) ;
767753 for ( _, alert) in self . alerts . read ( ) . await . iter ( ) {
768754 // filter based on whether the user can execute this query or not
769- let query = & alert. query ;
770- if user_auth_for_query ( & session, query) . await . is_ok ( ) {
755+ let query = format ! ( "SELECT * from {}" , & alert. stream ) ;
756+ if user_auth_for_query ( & session, & query) . await . is_ok ( ) {
771757 alerts. push ( alert. to_owned ( ) ) ;
772758 }
773759 }
0 commit comments