diff --git a/server/src/event.rs b/server/src/event.rs
index 7bf8b2a78..1dcd10bf0 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -179,7 +179,7 @@ impl Event {
         let schema_ref = Arc::new(schema);
         // validate schema before processing the event
         let Ok(mut event) = self.get_record(schema_ref.clone()) else {
-            return Err(EventError::SchemaMismatch(self.stream_name.clone()));
+            return Err(EventError::SchemaMismatch);
         };
 
         if event
@@ -312,12 +312,55 @@ impl Event {
 
     fn get_record(&self, schema: Arc) -> Result {
+        // Check the body first so a rejected event is never cloned.
+        if fields_mismatch(&schema, &self.body) {
+            return Err(EventError::SchemaMismatch);
+        }
         let mut iter = std::iter::once(Ok(self.body.clone()));
         let record = Decoder::new(schema, DecoderOptions::new()).next_batch(&mut iter)?;
 
         record.ok_or(EventError::MissingRecord)
     }
 }
 
+/// Reports whether `body` has a key or value that `schema` cannot hold.
+///
+/// Returns `true` (a mismatch) when `body` is not a JSON object, when a key
+/// has no field of the same name in `schema`, when a null value targets a
+/// non-nullable field, or when a non-null value fails the JSON type check for
+/// the field's data type. Only boolean, integer, float and UTF-8 columns are
+/// checked; a non-null value for any other data type counts as a mismatch.
+fn fields_mismatch(schema: &Schema, body: &Value) -> bool {
+    // Report a mismatch rather than panicking on a non-object body.
+    let Some(fields) = body.as_object() else {
+        return true;
+    };
+
+    fields.iter().any(|(name, val)| {
+        let Ok(field) = schema.field_with_name(name) else {
+            return true;
+        };
+
+        // JSON null carries no type, so it fits any field that allows nulls.
+        if val.is_null() {
+            return !field.is_nullable();
+        }
+
+        // datatype check only some basic cases
+        let valid_datatype = match field.data_type() {
+            DataType::Boolean => val.is_boolean(),
+            DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => val.is_i64(),
+            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
+                val.is_u64()
+            }
+            DataType::Float16 | DataType::Float32 | DataType::Float64 => val.is_f64(),
+            DataType::Utf8 => val.is_string(),
+            _ => false,
+        };
+
+        !valid_datatype
+    })
+}
+
 fn replace(
     schema: Arc,
     batch: RecordBatch,
@@ -390,8 +433,8 @@ pub mod error {
         Metadata(#[from] MetadataError),
         #[error("Stream Writer Failed: {0}")]
         Arrow(#[from] ArrowError),
-        #[error("Schema Mismatch: {0}")]
-        SchemaMismatch(String),
+        #[error("Schema Mismatch")]
+        SchemaMismatch,
         #[error("Schema Mismatch: {0}")]
         ObjectStorage(#[from] ObjectStorageError),
     }
diff --git a/server/src/utils.rs b/server/src/utils.rs
index 97b800a73..cabd71ff9 100644
--- a/server/src/utils.rs
+++ b/server/src/utils.rs
@@ -21,7 +21,7 @@ use serde_json::{json, Value};
 
 pub fn flatten_json_body(body: &serde_json::Value) -> Result {
     let mut flat_value: Value = json!({});
-    flatten_json::flatten(body, &mut flat_value, None, true, Some("_")).unwrap();
+    flatten_json::flatten(body, &mut flat_value, None, false, Some("_")).unwrap();
     Ok(flat_value)
 }
 