Skip to content

Commit 55248ec

Browse files
trueleonitisht
andauthored
Refactor Event Ingestion into separate format packages (#359)
* Add EventFormat Trait * Add support for batching array of object * Derive sub schema for event instead of fetching from hashed key * Defer update of schema until sync process * Change alert to use record batch instead * Update stats from bytes and number of objects in event. Signed-off-by: Nitish Tiwari <[email protected]> Co-authored-by: Nitish Tiwari <[email protected]>
1 parent 7f52b5b commit 55248ec

29 files changed

+1777
-816
lines changed

Cargo.lock

Lines changed: 290 additions & 108 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/Cargo.toml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ actix-cors = "0.6"
1313
actix-web-prometheus = { version = "0.1" }
1414
prometheus = { version = "0.13", features = ["process"] }
1515
anyhow = { version = "1.0", features = ["backtrace"] }
16-
arrow-schema = { version = "31.0", features = ["serde"] }
16+
arrow-schema = { version = "34.0.0", features = ["serde"] }
17+
arrow-array = { version = "34.0.0" }
18+
arrow-json = "34.0.0"
19+
arrow-ipc = "34.0.0"
1720
async-trait = "0.1"
1821
base64 = "0.21"
1922
bytes = "1.4"
@@ -29,7 +32,7 @@ clap = { version = "4.1", default-features = false, features = [
2932
"error-context",
3033
] }
3134
crossterm = "0.26"
32-
datafusion = "17"
35+
datafusion = "21.0.0"
3336
object_store = { version = "0.5.6", features = ["aws", "aws_profile"] }
3437
derive_more = "0.99"
3538
env_logger = "0.10"
@@ -60,14 +63,15 @@ tokio = { version = "1.25", default-features = false, features = [
6063
] }
6164
clokwerk = "0.4"
6265
actix-web-static-files = "4.0"
63-
static-files = "0.2"
66+
static-files = "0.2"
6467
ulid = { version = "1.0", features = ["serde"] }
6568
hex = "0.4"
6669
itertools = "0.10"
6770
xxhash-rust = { version = "0.8", features = ["xxh3"] }
6871
xz2 = { version = "*", features=["static"] }
6972
bzip2 = { version = "*", features=["static"] }
7073
once_cell = "1.17.1"
74+
parquet = "34.0.0"
7175
pyroscope = { version = "0.5.3", optional = true }
7276
pyroscope_pprofrs = { version = "0.2", optional = true }
7377
uptime_lib = "0.2.2"

server/src/alerts/mod.rs

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616
*
1717
*/
1818

19+
use arrow_array::cast::as_string_array;
20+
use arrow_array::RecordBatch;
21+
use arrow_schema::DataType;
1922
use async_trait::async_trait;
23+
use datafusion::arrow::compute::kernels::cast;
2024
use datafusion::arrow::datatypes::Schema;
2125
use regex::Regex;
2226
use serde::{Deserialize, Serialize};
@@ -33,21 +37,21 @@ use crate::{storage, utils};
3337
pub use self::rule::Rule;
3438
use self::target::Target;
3539

36-
#[derive(Default, Debug, Serialize, Deserialize)]
40+
#[derive(Default, Debug, serde::Serialize, serde::Deserialize)]
3741
#[serde(rename_all = "camelCase")]
3842
pub struct Alerts {
3943
pub version: AlertVerison,
4044
pub alerts: Vec<Alert>,
4145
}
4246

43-
#[derive(Default, Debug, Serialize, Deserialize)]
47+
#[derive(Default, Debug, serde::Serialize, serde::Deserialize)]
4448
#[serde(rename_all = "lowercase")]
4549
pub enum AlertVerison {
4650
#[default]
4751
V1,
4852
}
4953

50-
#[derive(Debug, Serialize, Deserialize)]
54+
#[derive(Debug, serde::Serialize, serde::Deserialize)]
5155
#[serde(rename_all = "camelCase")]
5256
pub struct Alert {
5357
#[serde(default = "crate::utils::uid::gen")]
@@ -60,22 +64,29 @@ pub struct Alert {
6064
}
6165

6266
impl Alert {
63-
pub fn check_alert(&self, stream_name: String, event_json: &serde_json::Value) {
64-
let resolves = self.rule.resolves(event_json);
67+
pub fn check_alert(&self, stream_name: String, events: RecordBatch) {
68+
let resolves = self.rule.resolves(events.clone());
6569

66-
match resolves {
67-
AlertState::Listening | AlertState::Firing => (),
68-
alert_state @ (AlertState::SetToFiring | AlertState::Resolved) => {
69-
let context = self.get_context(stream_name, alert_state, &self.rule, event_json);
70-
ALERTS_STATES
71-
.with_label_values(&[
72-
context.stream.as_str(),
73-
context.alert_info.alert_name.as_str(),
74-
context.alert_info.alert_state.to_string().as_str(),
75-
])
76-
.inc();
77-
for target in &self.targets {
78-
target.call(context.clone());
70+
for (index, state) in resolves.into_iter().enumerate() {
71+
match state {
72+
AlertState::Listening | AlertState::Firing => (),
73+
alert_state @ (AlertState::SetToFiring | AlertState::Resolved) => {
74+
let context = self.get_context(
75+
stream_name.clone(),
76+
alert_state,
77+
&self.rule,
78+
events.slice(index, 1),
79+
);
80+
ALERTS_STATES
81+
.with_label_values(&[
82+
context.stream.as_str(),
83+
context.alert_info.alert_name.as_str(),
84+
context.alert_info.alert_state.to_string().as_str(),
85+
])
86+
.inc();
87+
for target in &self.targets {
88+
target.call(context.clone());
89+
}
7990
}
8091
}
8192
}
@@ -86,7 +97,7 @@ impl Alert {
8697
stream_name: String,
8798
alert_state: AlertState,
8899
rule: &Rule,
89-
event_json: &serde_json::Value,
100+
event_row: RecordBatch,
90101
) -> Context {
91102
let deployment_instance = format!(
92103
"{}://{}",
@@ -104,7 +115,7 @@ impl Alert {
104115
stream_name,
105116
AlertInfo::new(
106117
self.name.clone(),
107-
self.message.get(event_json),
118+
self.message.get(event_row),
108119
rule.trigger_reason(),
109120
alert_state,
110121
),
@@ -144,9 +155,12 @@ impl Message {
144155
}
145156

146157
// returns the message with the column name replaced with the value of the column
147-
fn get(&self, event_json: &serde_json::Value) -> String {
158+
fn get(&self, event: RecordBatch) -> String {
148159
if let Some(column) = self.extract_column_name() {
149-
if let Some(value) = event_json.get(column) {
160+
if let Some(value) = event.column_by_name(column) {
161+
let arr = cast(value, &DataType::Utf8).unwrap();
162+
let value = as_string_array(&arr).value(0);
163+
150164
return self
151165
.message
152166
.replace(&format!("{{{column}}}"), value.to_string().as_str());

0 commit comments

Comments
 (0)