|  | 
|  | 1 | +/* | 
|  | 2 | + * Parseable Server (C) 2022 - 2024 Parseable, Inc. | 
|  | 3 | + * | 
|  | 4 | + * This program is free software: you can redistribute it and/or modify | 
|  | 5 | + * it under the terms of the GNU Affero General Public License as | 
|  | 6 | + * published by the Free Software Foundation, either version 3 of the | 
|  | 7 | + * License, or (at your option) any later version. | 
|  | 8 | + * | 
|  | 9 | + * This program is distributed in the hope that it will be useful, | 
|  | 10 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of | 
|  | 11 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
|  | 12 | + * GNU Affero General Public License for more details. | 
|  | 13 | + * | 
|  | 14 | + * You should have received a copy of the GNU Affero General Public License | 
|  | 15 | + * along with this program.  If not, see <http://www.gnu.org/licenses/>. | 
|  | 16 | + * | 
|  | 17 | + */ | 
|  | 18 | + | 
|  | 19 | +use std::time::Duration; | 
|  | 20 | + | 
|  | 21 | +use chrono::{DateTime, Utc}; | 
|  | 22 | +use tonic::async_trait; | 
|  | 23 | +use ulid::Ulid; | 
|  | 24 | + | 
|  | 25 | +use crate::{ | 
|  | 26 | +    alerts::{ | 
|  | 27 | +        AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity, | 
|  | 28 | +        ThresholdConfig, | 
|  | 29 | +        alerts_utils::{evaluate_condition, execute_alert_query, extract_time_range}, | 
|  | 30 | +        is_query_aggregate, | 
|  | 31 | +        target::{self, TARGETS}, | 
|  | 32 | +        traits::AlertTrait, | 
|  | 33 | +    }, | 
|  | 34 | +    handlers::http::query::create_streams_for_distributed, | 
|  | 35 | +    option::Mode, | 
|  | 36 | +    parseable::PARSEABLE, | 
|  | 37 | +    query::resolve_stream_names, | 
|  | 38 | +    rbac::map::SessionKey, | 
|  | 39 | +    utils::user_auth_for_query, | 
|  | 40 | +}; | 
|  | 41 | + | 
|  | 42 | +/// Struct which defines the threshold type alerts | 
|  | 43 | +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] | 
|  | 44 | +pub struct ThresholdAlert { | 
|  | 45 | +    pub version: AlertVersion, | 
|  | 46 | +    #[serde(default)] | 
|  | 47 | +    pub id: Ulid, | 
|  | 48 | +    pub severity: Severity, | 
|  | 49 | +    pub title: String, | 
|  | 50 | +    pub query: String, | 
|  | 51 | +    pub alert_type: AlertType, | 
|  | 52 | +    pub threshold_config: ThresholdConfig, | 
|  | 53 | +    pub eval_config: EvalConfig, | 
|  | 54 | +    pub targets: Vec<Ulid>, | 
|  | 55 | +    // for new alerts, state should be resolved | 
|  | 56 | +    #[serde(default)] | 
|  | 57 | +    pub state: AlertState, | 
|  | 58 | +    pub created: DateTime<Utc>, | 
|  | 59 | +    pub tags: Option<Vec<String>>, | 
|  | 60 | +    pub datasets: Vec<String>, | 
|  | 61 | +} | 
|  | 62 | + | 
|  | 63 | +#[async_trait] | 
|  | 64 | +impl AlertTrait for ThresholdAlert { | 
|  | 65 | +    async fn eval_alert(&self) -> Result<(bool, f64), AlertError> { | 
|  | 66 | +        let time_range = extract_time_range(&self.eval_config)?; | 
|  | 67 | +        let final_value = execute_alert_query(self.get_query(), &time_range).await?; | 
|  | 68 | +        let result = evaluate_condition( | 
|  | 69 | +            &self.threshold_config.operator, | 
|  | 70 | +            final_value, | 
|  | 71 | +            self.threshold_config.value, | 
|  | 72 | +        ); | 
|  | 73 | +        Ok((result, final_value)) | 
|  | 74 | +    } | 
|  | 75 | + | 
|  | 76 | +    async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError> { | 
|  | 77 | +        // validate alert type | 
|  | 78 | +        // Anomaly is only allowed in Prism | 
|  | 79 | +        if self.alert_type.eq(&AlertType::Anomaly) && PARSEABLE.options.mode != Mode::Prism { | 
|  | 80 | +            return Err(AlertError::CustomError( | 
|  | 81 | +                "Anomaly alert is only allowed on Prism mode".into(), | 
|  | 82 | +            )); | 
|  | 83 | +        } | 
|  | 84 | + | 
|  | 85 | +        // validate evalType | 
|  | 86 | +        let eval_frequency = match &self.eval_config { | 
|  | 87 | +            EvalConfig::RollingWindow(rolling_window) => { | 
|  | 88 | +                if humantime::parse_duration(&rolling_window.eval_start).is_err() { | 
|  | 89 | +                    return Err(AlertError::Metadata( | 
|  | 90 | +                        "evalStart should be of type humantime", | 
|  | 91 | +                    )); | 
|  | 92 | +                } | 
|  | 93 | +                rolling_window.eval_frequency | 
|  | 94 | +            } | 
|  | 95 | +        }; | 
|  | 96 | + | 
|  | 97 | +        // validate that target repeat notifs !> eval_frequency | 
|  | 98 | +        for target_id in &self.targets { | 
|  | 99 | +            let target = TARGETS.get_target_by_id(target_id).await?; | 
|  | 100 | +            match &target.notification_config.times { | 
|  | 101 | +                target::Retry::Infinite => {} | 
|  | 102 | +                target::Retry::Finite(repeat) => { | 
|  | 103 | +                    let notif_duration = | 
|  | 104 | +                        Duration::from_secs(60 * target.notification_config.interval) | 
|  | 105 | +                            * *repeat as u32; | 
|  | 106 | +                    if (notif_duration.as_secs_f64()).gt(&((eval_frequency * 60) as f64)) { | 
|  | 107 | +                        return Err(AlertError::Metadata( | 
|  | 108 | +                            "evalFrequency should be greater than target repetition  interval", | 
|  | 109 | +                        )); | 
|  | 110 | +                    } | 
|  | 111 | +                } | 
|  | 112 | +            } | 
|  | 113 | +        } | 
|  | 114 | + | 
|  | 115 | +        // validate that the query is valid | 
|  | 116 | +        if self.query.is_empty() { | 
|  | 117 | +            return Err(AlertError::InvalidAlertQuery); | 
|  | 118 | +        } | 
|  | 119 | + | 
|  | 120 | +        let tables = resolve_stream_names(&self.query)?; | 
|  | 121 | +        if tables.is_empty() { | 
|  | 122 | +            return Err(AlertError::InvalidAlertQuery); | 
|  | 123 | +        } | 
|  | 124 | +        create_streams_for_distributed(tables) | 
|  | 125 | +            .await | 
|  | 126 | +            .map_err(|_| AlertError::InvalidAlertQuery)?; | 
|  | 127 | + | 
|  | 128 | +        // validate that the user has access to the tables mentioned in the query | 
|  | 129 | +        user_auth_for_query(session_key, &self.query).await?; | 
|  | 130 | + | 
|  | 131 | +        // validate that the alert query is valid and can be evaluated | 
|  | 132 | +        if !is_query_aggregate(&self.query).await? { | 
|  | 133 | +            return Err(AlertError::InvalidAlertQuery); | 
|  | 134 | +        } | 
|  | 135 | +        Ok(()) | 
|  | 136 | +    } | 
|  | 137 | + | 
|  | 138 | +    fn get_id(&self) -> &Ulid { | 
|  | 139 | +        &self.id | 
|  | 140 | +    } | 
|  | 141 | + | 
|  | 142 | +    fn get_query(&self) -> &str { | 
|  | 143 | +        &self.query | 
|  | 144 | +    } | 
|  | 145 | + | 
|  | 146 | +    fn get_severity(&self) -> &Severity { | 
|  | 147 | +        &self.severity | 
|  | 148 | +    } | 
|  | 149 | + | 
|  | 150 | +    fn get_title(&self) -> &str { | 
|  | 151 | +        &self.title | 
|  | 152 | +    } | 
|  | 153 | + | 
|  | 154 | +    fn get_alert_type(&self) -> &AlertType { | 
|  | 155 | +        &self.alert_type | 
|  | 156 | +    } | 
|  | 157 | + | 
|  | 158 | +    fn get_threshold_config(&self) -> &ThresholdConfig { | 
|  | 159 | +        &self.threshold_config | 
|  | 160 | +    } | 
|  | 161 | + | 
|  | 162 | +    fn get_eval_config(&self) -> &EvalConfig { | 
|  | 163 | +        &self.eval_config | 
|  | 164 | +    } | 
|  | 165 | + | 
|  | 166 | +    fn get_targets(&self) -> &Vec<Ulid> { | 
|  | 167 | +        &self.targets | 
|  | 168 | +    } | 
|  | 169 | + | 
|  | 170 | +    fn get_state(&self) -> &AlertState { | 
|  | 171 | +        &self.state | 
|  | 172 | +    } | 
|  | 173 | + | 
|  | 174 | +    fn get_eval_frequency(&self) -> u64 { | 
|  | 175 | +        match &self.eval_config { | 
|  | 176 | +            EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_frequency, | 
|  | 177 | +        } | 
|  | 178 | +    } | 
|  | 179 | + | 
|  | 180 | +    fn get_eval_window(&self) -> String { | 
|  | 181 | +        match &self.eval_config { | 
|  | 182 | +            EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_start.clone(), | 
|  | 183 | +        } | 
|  | 184 | +    } | 
|  | 185 | + | 
|  | 186 | +    fn get_created(&self) -> String { | 
|  | 187 | +        self.created.to_string() | 
|  | 188 | +    } | 
|  | 189 | + | 
|  | 190 | +    fn get_tags(&self) -> &Option<Vec<String>> { | 
|  | 191 | +        &self.tags | 
|  | 192 | +    } | 
|  | 193 | + | 
|  | 194 | +    fn get_datasets(&self) -> &Vec<String> { | 
|  | 195 | +        &self.datasets | 
|  | 196 | +    } | 
|  | 197 | + | 
|  | 198 | +    fn to_alert_config(&self) -> AlertConfig { | 
|  | 199 | +        let clone = self.clone(); | 
|  | 200 | +        clone.into() | 
|  | 201 | +    } | 
|  | 202 | + | 
|  | 203 | +    fn clone_box(&self) -> Box<dyn AlertTrait> { | 
|  | 204 | +        Box::new(self.clone()) | 
|  | 205 | +    } | 
|  | 206 | + | 
|  | 207 | +    fn set_state(&mut self, new_state: AlertState) { | 
|  | 208 | +        self.state = new_state | 
|  | 209 | +    } | 
|  | 210 | +} | 
|  | 211 | + | 
|  | 212 | +impl From<AlertConfig> for ThresholdAlert { | 
|  | 213 | +    fn from(value: AlertConfig) -> Self { | 
|  | 214 | +        Self { | 
|  | 215 | +            version: value.version, | 
|  | 216 | +            id: value.id, | 
|  | 217 | +            severity: value.severity, | 
|  | 218 | +            title: value.title, | 
|  | 219 | +            query: value.query, | 
|  | 220 | +            alert_type: value.alert_type, | 
|  | 221 | +            threshold_config: value.threshold_config, | 
|  | 222 | +            eval_config: value.eval_config, | 
|  | 223 | +            targets: value.targets, | 
|  | 224 | +            state: value.state, | 
|  | 225 | +            created: value.created, | 
|  | 226 | +            tags: value.tags, | 
|  | 227 | +            datasets: value.datasets, | 
|  | 228 | +        } | 
|  | 229 | +    } | 
|  | 230 | +} | 
|  | 231 | + | 
|  | 232 | +impl From<ThresholdAlert> for AlertConfig { | 
|  | 233 | +    fn from(val: ThresholdAlert) -> Self { | 
|  | 234 | +        AlertConfig { | 
|  | 235 | +            version: val.version, | 
|  | 236 | +            id: val.id, | 
|  | 237 | +            severity: val.severity, | 
|  | 238 | +            title: val.title, | 
|  | 239 | +            query: val.query, | 
|  | 240 | +            alert_type: val.alert_type, | 
|  | 241 | +            threshold_config: val.threshold_config, | 
|  | 242 | +            eval_config: val.eval_config, | 
|  | 243 | +            targets: val.targets, | 
|  | 244 | +            state: val.state, | 
|  | 245 | +            created: val.created, | 
|  | 246 | +            tags: val.tags, | 
|  | 247 | +            datasets: val.datasets, | 
|  | 248 | +        } | 
|  | 249 | +    } | 
|  | 250 | +} | 
0 commit comments