@@ -27,7 +27,7 @@ use once_cell::sync::Lazy;
2727use serde_json:: Error as SerdeError ;
2828use std:: collections:: { HashMap , HashSet } ;
2929use std:: fmt:: { self , Display } ;
30- use tokio:: sync:: oneshot:: { Receiver , Sender } ;
30+ use tokio:: sync:: oneshot:: { self , Receiver , Sender } ;
3131use tokio:: sync:: RwLock ;
3232use tokio:: task:: JoinHandle ;
3333use tracing:: { trace, warn} ;
@@ -36,7 +36,7 @@ use ulid::Ulid;
3636pub mod alerts_utils;
3737pub mod target;
3838
39- use crate :: option :: CONFIG ;
39+ use crate :: parseable :: PARSEABLE ;
4040use crate :: query:: { TableScanVisitor , QUERY_SESSION } ;
4141use crate :: rbac:: map:: SessionKey ;
4242use crate :: storage;
@@ -650,8 +650,8 @@ impl AlertConfig {
650650 fn get_context ( & self ) -> Context {
651651 let deployment_instance = format ! (
652652 "{}://{}" ,
653- CONFIG . options. get_scheme( ) ,
654- CONFIG . options. address
653+ PARSEABLE . options. get_scheme( ) ,
654+ PARSEABLE . options. address
655655 ) ;
656656 let deployment_id = storage:: StorageMetadata :: global ( ) . deployment_id ;
657657 let deployment_mode = storage:: StorageMetadata :: global ( ) . mode . to_string ( ) ;
@@ -730,13 +730,20 @@ impl Alerts {
730730 /// Loads alerts from disk, blocks
731731 pub async fn load ( & self ) -> Result < ( ) , AlertError > {
732732 let mut map = self . alerts . write ( ) . await ;
733- let store = CONFIG . storage ( ) . get_object_store ( ) ;
733+ let store = PARSEABLE . storage . get_object_store ( ) ;
734734
735735 for alert in store. get_alerts ( ) . await . unwrap_or_default ( ) {
736- let ( handle, rx, tx) =
737- schedule_alert_task ( alert. get_eval_frequency ( ) , alert. clone ( ) ) . await ?;
738-
739- self . update_task ( alert. id , handle, rx, tx) . await ;
736+ let ( outbox_tx, outbox_rx) = oneshot:: channel :: < ( ) > ( ) ;
737+ let ( inbox_tx, inbox_rx) = oneshot:: channel :: < ( ) > ( ) ;
738+ let handle = schedule_alert_task (
739+ alert. get_eval_frequency ( ) ,
740+ alert. clone ( ) ,
741+ inbox_rx,
742+ outbox_tx,
743+ ) ?;
744+
745+ self . update_task ( alert. id , handle, outbox_rx, inbox_tx)
746+ . await ;
740747
741748 map. insert ( alert. id , alert) ;
742749 }
@@ -785,7 +792,7 @@ impl Alerts {
785792 new_state : AlertState ,
786793 trigger_notif : Option < String > ,
787794 ) -> Result < ( ) , AlertError > {
788- let store = CONFIG . storage ( ) . get_object_store ( ) ;
795+ let store = PARSEABLE . storage . get_object_store ( ) ;
789796
790797 // read and modify alert
791798 let mut alert = self . get_alert_by_id ( alert_id) . await ?;
0 commit comments