Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl Event {
let schema_ref = Arc::new(schema);
// validate schema before processing the event
let Ok(mut event) = self.get_record(schema_ref.clone()) else {
return Err(EventError::SchemaMismatch(self.stream_name.clone()));
return Err(EventError::SchemaMismatch);
};

if event
Expand Down Expand Up @@ -312,12 +312,39 @@ impl Event {

fn get_record(&self, schema: Arc<Schema>) -> Result<RecordBatch, EventError> {
let mut iter = std::iter::once(Ok(self.body.clone()));
if fields_mismatch(&schema, &self.body) {
return Err(EventError::SchemaMismatch);
}
let record = Decoder::new(schema, DecoderOptions::new()).next_batch(&mut iter)?;

record.ok_or(EventError::MissingRecord)
}
}

fn fields_mismatch(schema: &Schema, body: &Value) -> bool {
for (name, val) in body.as_object().expect("body is of object variant") {
let Ok(field) = schema.field_with_name(name) else { return true };

// datatype check only some basic cases
let valid_datatype = match field.data_type() {
DataType::Boolean => val.is_boolean(),
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => val.is_i64(),
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
val.is_u64()
}
DataType::Float16 | DataType::Float32 | DataType::Float64 => val.is_f64(),
DataType::Utf8 => val.is_string(),
_ => false,
};

if !valid_datatype {
return true;
}
}

false
}

fn replace(
schema: Arc<Schema>,
batch: RecordBatch,
Expand Down Expand Up @@ -390,8 +417,8 @@ pub mod error {
Metadata(#[from] MetadataError),
#[error("Stream Writer Failed: {0}")]
Arrow(#[from] ArrowError),
#[error("Schema Mismatch: {0}")]
SchemaMismatch(String),
#[error("Schema Mismatch")]
SchemaMismatch,
#[error("Schema Mismatch: {0}")]
ObjectStorage(#[from] ObjectStorageError),
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use serde_json::{json, Value};

pub fn flatten_json_body(body: &serde_json::Value) -> Result<Value, serde_json::Error> {
let mut flat_value: Value = json!({});
flatten_json::flatten(body, &mut flat_value, None, true, Some("_")).unwrap();
flatten_json::flatten(body, &mut flat_value, None, false, Some("_")).unwrap();
Ok(flat_value)
}

Expand Down