Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11", default_features=false, features=["rustls", "json", "hyper-rustls", "tokio-rustls"]}
rustls = "0.20"
rustls-pemfile = "1.0"
rust-flatten-json = "0.2"
semver = "1.0"
serde = { version = "1.0", features = ["rc"] }
serde_json = "1.0"
Expand Down
15 changes: 4 additions & 11 deletions server/src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ pub mod rule;
pub mod target;

use crate::metrics::ALERTS_STATES;
use crate::storage;
use crate::utils::uid;
use crate::CONFIG;
use crate::{storage, utils};

pub use self::rule::Rule;
use self::target::Target;
Expand Down Expand Up @@ -97,16 +97,9 @@ impl Alert {
let deployment_mode = storage::StorageMetadata::global().mode.to_string();
let additional_labels =
serde_json::to_value(rule).expect("rule is perfectly deserializable");
let mut flatten_additional_labels = serde_json::json!({});
flatten_json::flatten(
&additional_labels,
&mut flatten_additional_labels,
Some("rule".to_string()),
false,
Some("_"),
)
.expect("can be flattened");

let flatten_additional_labels =
utils::json::flatten::flatten_with_parent_prefix(additional_labels, "rule", "_")
.expect("can be flattened");
Context::new(
stream_name,
AlertInfo::new(
Expand Down
63 changes: 48 additions & 15 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ impl Event {
let stream_name = &self.stream_name;
let schema_key = &self.schema_key;

let old = metadata::STREAM_INFO.merged_schema(stream_name)?;
if Schema::try_merge(vec![old, schema.clone()]).is_err() {
return Err(EventError::SchemaMismatch);
};

commit_schema(stream_name, schema_key, Arc::new(schema))?;
self.process_event(event)
}
Expand Down Expand Up @@ -156,27 +161,55 @@ pub fn get_schema_key(body: &Value) -> String {
fn fields_mismatch(schema: &Schema, body: &Value) -> bool {
for (name, val) in body.as_object().expect("body is of object variant") {
let Ok(field) = schema.field_with_name(name) else { return true };

// datatype check only some basic cases
let valid_datatype = match field.data_type() {
DataType::Boolean => val.is_boolean(),
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => val.is_i64(),
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
val.is_u64()
}
DataType::Float16 | DataType::Float32 | DataType::Float64 => val.is_f64(),
DataType::Utf8 => val.is_string(),
_ => false,
};

if !valid_datatype {
if !valid_type(field.data_type(), val) {
return true;
}
}

false
}

/// Returns `true` when the JSON `value` is representable by the Arrow
/// `data_type`.
///
/// Covers the primitive types plus `List` and `Struct` recursively.
/// Any other Arrow type is treated as a logic error upstream, hence the
/// `unreachable!()` in the catch-all arm.
fn valid_type(data_type: &DataType, value: &Value) -> bool {
    match data_type {
        DataType::Boolean => value.is_boolean(),
        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
        DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
        DataType::Utf8 => value.is_string(),
        DataType::List(field) => {
            let elem_type = field.data_type();
            match value {
                // every element must match the declared element type;
                // `all` short-circuits on the first mismatch
                Value::Array(arr) => arr.iter().all(|elem| valid_type(elem_type, elem)),
                // NOTE(review): non-array values are accepted here, matching
                // the original behavior — confirm this leniency is intended
                _ => true,
            }
        }
        DataType::Struct(struct_fields) => {
            if let Value::Object(obj) = value {
                // every key must correspond to a declared field of a
                // matching type; an unknown key is a mismatch
                obj.iter().all(|(key, val)| {
                    struct_fields
                        .iter()
                        .find(|field| field.name() == key)
                        .map_or(false, |field| valid_type(field.data_type(), val))
                })
            } else {
                false
            }
        }
        _ => unreachable!(),
    }
}

fn commit_schema(
stream_name: &str,
schema_key: &str,
Expand Down
7 changes: 5 additions & 2 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn push_logs(
Value::Array(array) => {
for mut body in array {
merge(&mut body, tags_n_metadata.clone().into_iter());
let body = flatten_json_body(&body).unwrap();
let body = flatten_json_body(body).map_err(|_| PostError::FlattenError)?;
let schema_key = event::get_schema_key(&body);

let event = event::Event {
Expand All @@ -93,7 +93,7 @@ async fn push_logs(
}
mut body @ Value::Object(_) => {
merge(&mut body, tags_n_metadata.into_iter());
let body = flatten_json_body(&body).unwrap();
let body = flatten_json_body(body).map_err(|_| PostError::FlattenError)?;
let schema_key = event::get_schema_key(&body);
let event = event::Event {
body,
Expand All @@ -117,6 +117,8 @@ pub enum PostError {
Event(#[from] EventError),
#[error("Invalid Request")]
Invalid,
#[error("failed to flatten the json object")]
FlattenError,
#[error("Failed to create stream due to {0}")]
CreateStream(Box<dyn std::error::Error + Send + Sync>),
}
Expand All @@ -128,6 +130,7 @@ impl actix_web::ResponseError for PostError {
PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::Invalid => StatusCode::BAD_REQUEST,
PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::FlattenError => StatusCode::BAD_REQUEST,
}
}

Expand Down
9 changes: 4 additions & 5 deletions server/src/utils/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
*/

use serde_json;
use serde_json::json;
use serde_json::Value;

pub fn flatten_json_body(body: &serde_json::Value) -> Result<Value, serde_json::Error> {
let mut flat_value: Value = json!({});
flatten_json::flatten(body, &mut flat_value, None, false, Some("_")).unwrap();
Ok(flat_value)
pub mod flatten;

/// Flattens a nested JSON value into a single level, joining nested keys
/// with the `"_"` separator.
///
/// Delegates to `flatten::flatten` in the sibling `flatten` module (not
/// visible here). The `Err(())` case presumably signals a value that
/// cannot be flattened — TODO confirm against the `flatten` module.
pub fn flatten_json_body(body: Value) -> Result<Value, ()> {
    flatten::flatten(body, "_")
}

pub fn merge(value: &mut Value, fields: impl Iterator<Item = (String, Value)>) {
Expand Down
Loading