170 changes: 84 additions & 86 deletions Cargo.lock

Large diffs are not rendered by default.

25 changes: 15 additions & 10 deletions server/Cargo.toml
@@ -15,11 +15,11 @@ actix-cors = "0.6"
 actix-web-prometheus = { version = "0.1" }
 prometheus = { version = "0.13", features = ["process"] }
 anyhow = { version = "1.0", features = ["backtrace"] }
-arrow-schema = { version = "36.0.0", features = ["serde"] }
-arrow-array = { version = "36.0.0" }
-arrow-json = "36.0.0"
-arrow-ipc = "36.0.0"
-arrow-select = "36.0.0"
+arrow-schema = { version = "40.0.0", features = ["serde"] }
+arrow-array = { version = "40.0.0" }
+arrow-json = "40.0.0"
+arrow-ipc = "40.0.0"
+arrow-select = "40.0.0"
 async-trait = "0.1"
 base64 = "0.21"
 bytes = "1.4"
@@ -35,7 +35,7 @@ clap = { version = "4.1", default-features = false, features = [
     "error-context",
 ] }
 crossterm = "0.26"
-datafusion = "22.0.0"
+datafusion = "26.0.0"
 object_store = { version = "0.5.6", features = ["aws", "aws_profile"] }
 derive_more = "0.99"
 env_logger = "0.10"
@@ -49,7 +49,12 @@ sysinfo = "0.28.4"
 hostname = "0.3"
 rand = "0.8"
 relative-path = { version = "1.7", features = ["serde"] }
-reqwest = { version = "0.11", default_features=false, features=["rustls", "json", "hyper-rustls", "tokio-rustls"]}
+reqwest = { version = "0.11", default_features = false, features = [
+    "rustls",
+    "json",
+    "hyper-rustls",
+    "tokio-rustls",
+] }
 rustls = "0.20"
 rustls-pemfile = "1.0"
 semver = "1.0"
@@ -70,10 +75,10 @@ ulid = { version = "1.0", features = ["serde"] }
 hex = "0.4"
 itertools = "0.10"
 xxhash-rust = { version = "0.8", features = ["xxh3"] }
-xz2 = { version = "*", features=["static"] }
-bzip2 = { version = "*", features=["static"] }
+xz2 = { version = "*", features = ["static"] }
+bzip2 = { version = "*", features = ["static"] }
 once_cell = "1.17.1"
-parquet = "36.0.0"
+parquet = "40.0.0"
 pyroscope = { version = "0.5.3", optional = true }
 pyroscope_pprofrs = { version = "0.2", optional = true }
 uptime_lib = "0.2.2"
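Note on the version bump above: between arrow 36 and 40, `Schema` stopped storing `Vec<Field>` and now stores `Fields` (a shared, reference-counted collection of `Arc<Field>`), which is what forces the `Arc<Field>` plumbing through the rest of this diff. A minimal standalone sketch of the new shape (not code from this PR):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Fields, Schema};

fn main() {
    // Fields are now reference-counted; cloning one is a pointer copy.
    let fields: Vec<Arc<Field>> = vec![
        Arc::new(Field::new("id", DataType::Int64, false)),
        Arc::new(Field::new("msg", DataType::Utf8, true)),
    ];

    // Vec<Arc<Field>> collects into the new `Fields` container,
    // which is what Schema::new accepts in arrow 40.
    let schema = Schema::new(fields.into_iter().collect::<Fields>());

    // Iterating schema.fields() yields &Arc<Field>, so clones stay cheap.
    for field in schema.fields() {
        println!("{}: {:?}", field.name(), field.data_type());
    }
}
```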
2 changes: 1 addition & 1 deletion server/src/alerts/mod.rs
@@ -136,7 +136,7 @@ impl Message {
     // checks if message (with a column name) is valid (i.e. the column name is present in the schema)
     pub fn valid(&self, schema: &Schema, column: Option<&str>) -> bool {
         if let Some(col) = column {
-            return get_field(schema, col).is_some();
+            return get_field(&schema.fields, col).is_some();
         }
         true
     }
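The call-site change here implies `utils::arrow::get_field` now takes the field collection rather than a `&Schema` (`&schema.fields` is a `&Fields`, which derefs to `&[Arc<Field>]`). The helper itself isn't shown in this diff; a plausible sketch of its new shape, labeled hypothetical:

```rust
use std::sync::Arc;

use arrow_schema::Field;

// Hypothetical reconstruction of utils::arrow::get_field after this PR;
// the real definition lives in server/src/utils and is not part of this diff.
fn get_field<'a>(fields: &'a [Arc<Field>], name: &str) -> Option<&'a Arc<Field>> {
    fields.iter().find(|field| field.name() == name)
}
```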
8 changes: 4 additions & 4 deletions server/src/event.rs
@@ -21,7 +21,7 @@ pub mod format;
 mod writer;

 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Schema};
+use arrow_schema::{Field, Fields, Schema};
 use itertools::Itertools;

 use std::sync::Arc;
@@ -85,7 +85,7 @@ impl Event {
     }
 }

-pub fn get_schema_key(fields: &[Field]) -> String {
+pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
     // Fields must be sorted
     let mut hasher = xxhash_rust::xxh3::Xxh3::new();
     for field in fields.iter().sorted_by_key(|v| v.name()) {
@@ -102,10 +102,10 @@ pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), Event
         .get_mut(stream_name)
         .expect("map has entry for this stream name")
         .schema;
-    let current_schema = Schema::new(map.values().cloned().collect());
+    let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
     let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
     map.clear();
-    map.extend(schema.fields.into_iter().map(|f| (f.name().clone(), f)));
+    map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
     Ok(())
 }

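The `commit_schema` change is the same migration in miniature: arrow 40's `schema.fields` is a `Fields`, so fields can no longer be moved out with `into_iter()`; the map is rebuilt from `Arc` clones instead. A standalone mirror of the merge-and-reindex step (hypothetical helper, assuming arrow 40):

```rust
use std::{collections::HashMap, sync::Arc};

use arrow_schema::{ArrowError, Field, Fields, Schema};

// Merge an incoming schema into a name -> field map, cloning only Arcs.
fn merge_into_map(
    map: &mut HashMap<String, Arc<Field>>,
    incoming: &Schema,
) -> Result<(), ArrowError> {
    let current = Schema::new(map.values().cloned().collect::<Fields>());
    let merged = Schema::try_merge(vec![current, incoming.clone()])?;
    map.clear();
    // `merged.fields` is a Fields, iterated by reference rather than consumed.
    map.extend(
        merged
            .fields
            .iter()
            .map(|field| (field.name().clone(), field.clone())),
    );
    Ok(())
}
```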
34 changes: 18 additions & 16 deletions server/src/event/format.rs
@@ -32,19 +32,21 @@ pub mod json;

 type Tags = String;
 type Metadata = String;
+type EventSchema = Vec<Arc<Field>>;

 // Global Trait for event format
 // This trait is implemented by all the event formats
 pub trait EventFormat: Sized {
     type Data;

     fn to_data(
         self,
-        schema: &HashMap<String, Field>,
-    ) -> Result<(Self::Data, Schema, bool, Tags, Metadata), AnyError>;
+        schema: HashMap<String, Arc<Field>>,
+    ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
     fn into_recordbatch(
         self,
-        schema: &HashMap<String, Field>,
+        schema: HashMap<String, Arc<Field>>,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let (data, mut schema, is_first, tags, metadata) = self.to_data(schema)?;

@@ -67,36 +69,36 @@ pub trait EventFormat: Sized {
         };

         // add the p_timestamp field to the event schema to the 0th index
-        schema.fields.insert(
+        schema.insert(
             0,
-            Field::new(
+            Arc::new(Field::new(
                 DEFAULT_TIMESTAMP_KEY,
                 DataType::Timestamp(TimeUnit::Millisecond, None),
                 true,
-            ),
+            )),
         );

         // p_tags and p_metadata are added to the end of the schema
-        let tags_index = schema.fields.len();
+        let tags_index = schema.len();
         let metadata_index = tags_index + 1;
-        schema
-            .fields
-            .push(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true));
-        schema
-            .fields
-            .push(Field::new(DEFAULT_METADATA_KEY, DataType::Utf8, true));
+        schema.push(Arc::new(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true)));
+        schema.push(Arc::new(Field::new(
+            DEFAULT_METADATA_KEY,
+            DataType::Utf8,
+            true,
+        )));

         // prepare the record batch and new fields to be added
-        let schema_ref = Arc::new(schema);
-        let rb = Self::decode(data, Arc::clone(&schema_ref))?;
+        let schema = Arc::new(Schema::new(schema));
+        let rb = Self::decode(data, schema.clone())?;
         let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
         let metadata_arr =
             StringArray::from_iter_values(std::iter::repeat(&metadata).take(rb.num_rows()));
         let timestamp_array = get_timestamp_array(rb.num_rows());

         // modify the record batch to add fields to respective indexes
         let rb = utils::arrow::replace_columns(
-            Arc::clone(&schema_ref),
+            Arc::clone(&schema),
             rb,
             &[0, tags_index, metadata_index],
             &[
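In `into_recordbatch` the event schema is now assembled as a plain `Vec<Arc<Field>>` and only wrapped in a `Schema` once, right before decoding. `utils::arrow::replace_columns` is repo code this diff doesn't show; a sketch of what it plausibly does (hypothetical signature, assuming the decoded batch already has a column slot per schema field):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{ArrowError, SchemaRef};

// Hypothetical sketch of utils::arrow::replace_columns: swap the prepared
// arrays (timestamp, tags, metadata) into the batch at the given indexes.
fn replace_columns(
    schema: SchemaRef,
    batch: RecordBatch,
    indexes: &[usize],
    arrays: &[ArrayRef],
) -> Result<RecordBatch, ArrowError> {
    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
    for (&index, array) in indexes.iter().zip(arrays) {
        columns[index] = Arc::clone(array);
    }
    RecordBatch::try_new(schema, columns)
}
```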
42 changes: 22 additions & 20 deletions server/src/event/format/json.rs
@@ -21,19 +21,19 @@

 use anyhow::anyhow;
 use arrow_array::RecordBatch;
-use arrow_json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOptions};
-use arrow_schema::{DataType, Field, Schema};
+use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
+use arrow_schema::{DataType, Field, Fields, Schema};
 use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
 use serde_json::Value;
 use std::{collections::HashMap, sync::Arc};

-use super::EventFormat;
+use super::{EventFormat, Metadata, Tags};
 use crate::utils::{arrow::get_field, json::flatten_json_body};

 pub struct Event {
     pub data: Value,
-    pub tags: String,
-    pub metadata: String,
+    pub tags: Tags,
+    pub metadata: Metadata,
 }

 impl EventFormat for Event {
@@ -43,10 +43,9 @@ impl EventFormat for Event {
     // also extract the arrow schema, tags and metadata from the incoming json
     fn to_data(
         self,
-        schema: &HashMap<String, Field>,
-    ) -> Result<(Self::Data, Schema, bool, String, String), anyhow::Error> {
+        schema: HashMap<String, Arc<Field>>,
+    ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
         let data = flatten_json_body(self.data)?;
-
         let stream_schema = schema;

         // incoming event may be a single json or a json array
@@ -63,18 +62,18 @@ impl EventFormat for Event {
             collect_keys(value_arr.iter()).expect("fields can be collected from array of objects");

         let mut is_first = false;
-        let schema = match derive_arrow_schema(stream_schema, fields) {
+        let schema = match derive_arrow_schema(&stream_schema, fields) {
             Ok(schema) => schema,
             Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
                 Ok(infer_schema) => {
                     if let Err(err) = Schema::try_merge(vec![
-                        Schema::new(stream_schema.values().cloned().collect()),
+                        Schema::new(stream_schema.values().cloned().collect::<Fields>()),
                         infer_schema.clone(),
                     ]) {
                         return Err(anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err));
                     }
                     is_first = true;
-                    infer_schema
+                    infer_schema.fields.iter().cloned().collect()
                 }
                 Err(err) => {
                     return Err(anyhow!(
@@ -100,13 +99,13 @@ impl EventFormat for Event {
     // Convert the Data type (defined above) to arrow record batch
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, anyhow::Error> {
         let array_capacity = round_upto_multiple_of_64(data.len());
-        let value_iter: &mut (dyn Iterator<Item = Value>) = &mut data.into_iter();
+        let mut reader = ReaderBuilder::new(schema)
+            .with_batch_size(array_capacity)
+            .with_coerce_primitive(false)
+            .build_decoder()?;

-        let reader = Decoder::new(
-            schema,
-            DecoderOptions::new().with_batch_size(array_capacity),
-        );
-        match reader.next_batch(&mut value_iter.map(Ok)) {
+        reader.serialize(&data)?;
+        match reader.flush() {
             Ok(Some(recordbatch)) => Ok(recordbatch),
             Err(err) => Err(anyhow!("Failed to create recordbatch due to {:?}", err)),
             Ok(None) => unreachable!("all records are added to one rb"),
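This is the core migration in the file: arrow-json 40 dropped the old row-based `Decoder::new` / `DecoderOptions` / `next_batch` API in favor of a serde-driven decoder built from `ReaderBuilder`. A minimal standalone usage of the new path, matching the calls above:

```rust
use std::sync::Arc;

use arrow_json::reader::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};
use serde_json::{json, Value};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, true),
        Field::new("msg", DataType::Utf8, true),
    ]));

    let rows: Vec<Value> = vec![
        json!({"id": 1, "msg": "hello"}),
        json!({"id": 2, "msg": "world"}),
    ];

    // The new decoder accepts anything implementing serde::Serialize,
    // so a slice of serde_json::Value works directly.
    let mut decoder = ReaderBuilder::new(schema)
        .with_batch_size(rows.len())
        .with_coerce_primitive(false)
        .build_decoder()?;

    decoder.serialize(&rows)?;
    let batch = decoder.flush()?.expect("rows fit in a single batch");
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
```

One buffered `flush()` per `serialize()` call is why the `Ok(None)` arm above is unreachable for a non-empty event.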
@@ … @@

 // Returns arrow schema with the fields that are present in the request body
 // This schema is an input to convert the request body to arrow record batch
-fn derive_arrow_schema(schema: &HashMap<String, Field>, fields: Vec<&str>) -> Result<Schema, ()> {
+fn derive_arrow_schema(
+    schema: &HashMap<String, Arc<Field>>,
+    fields: Vec<&str>,
+) -> Result<Vec<Arc<Field>>, ()> {
     let mut res = Vec::with_capacity(fields.len());
     let fields = fields.into_iter().map(|field_name| schema.get(field_name));
     for field in fields {
         let Some(field) = field else { return Err(()) };
         res.push(field.clone())
     }
-    Ok(Schema::new(res))
+    Ok(res)
 }

 fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a str>, ()> {
@@ -145,7 +147,7 @@
     Ok(keys)
 }

-fn fields_mismatch(schema: &Schema, body: &Value) -> bool {
+fn fields_mismatch(schema: &[Arc<Field>], body: &Value) -> bool {
     for (name, val) in body.as_object().expect("body is of object variant") {
         if val.is_null() {
             continue;