From 11c46042cae44ac0449816db0e0d1a1bba0f8106 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 5 Jan 2023 15:08:13 +0530 Subject: [PATCH 1/5] Add schema mismatch check --- server/src/event.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index 7bf8b2a78..d5b6ffa25 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -28,7 +28,7 @@ use datafusion::arrow::json::reader::{infer_json_schema_from_iterator, Decoder, use datafusion::arrow::record_batch::RecordBatch; use lazy_static::lazy_static; use serde_json::Value; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fs::OpenOptions; use std::ops::{Deref, DerefMut}; use std::sync::Arc; @@ -179,7 +179,7 @@ impl Event { let schema_ref = Arc::new(schema); // validate schema before processing the event let Ok(mut event) = self.get_record(schema_ref.clone()) else { - return Err(EventError::SchemaMismatch(self.stream_name.clone())); + return Err(EventError::SchemaMismatch); }; if event @@ -312,12 +312,28 @@ impl Event { fn get_record(&self, schema: Arc) -> Result { let mut iter = std::iter::once(Ok(self.body.clone())); + if fields_mismatch(&schema, &self.body) { + return Err(EventError::SchemaMismatch); + } let record = Decoder::new(schema, DecoderOptions::new()).next_batch(&mut iter)?; record.ok_or(EventError::MissingRecord) } } +fn fields_mismatch(schema: &Schema, body: &Value) -> bool { + let field: HashSet<&str> = schema + .fields() + .into_iter() + .map(|f| f.name().as_str()) + .collect(); + + body.as_object() + .expect("body is of object variant") + .keys() + .any(|key| !field.contains(key.as_str())) +} + fn replace( schema: Arc, batch: RecordBatch, @@ -390,8 +406,8 @@ pub mod error { Metadata(#[from] MetadataError), #[error("Stream Writer Failed: {0}")] Arrow(#[from] ArrowError), - #[error("Schema Mismatch: {0}")] - SchemaMismatch(String), + #[error("Schema Mismatch")] + SchemaMismatch, #[error("Schema Mismatch: {0}")] ObjectStorage(#[from] ObjectStorageError), } From bb2c2b215b22397194fabb0b0b2e851eb798c0ab Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 5 Jan 2023 15:32:43 +0530 Subject: [PATCH 2/5] Fix --- server/src/event.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index d5b6ffa25..ac932856a 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -322,11 +322,7 @@ impl Event { } fn fields_mismatch(schema: &Schema, body: &Value) -> bool { - let field: HashSet<&str> = schema - .fields() - .into_iter() - .map(|f| f.name().as_str()) - .collect(); + let field: HashSet<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); body.as_object() .expect("body is of object variant") From 9d782b8072451d58d161f3072ca210413a8b6a05 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sat, 7 Jan 2023 13:11:53 +0530 Subject: [PATCH 3/5] check Datatype --- server/src/event.rs | 27 +++++++++++++++++++++------ server/src/utils.rs | 2 +- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index ac932856a..c619ae1ab 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -28,7 +28,7 @@ use datafusion::arrow::json::reader::{infer_json_schema_from_iterator, Decoder, use datafusion::arrow::record_batch::RecordBatch; use lazy_static::lazy_static; use serde_json::Value; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::fs::OpenOptions; use std::ops::{Deref, DerefMut}; use std::sync::Arc; @@ -322,12 +322,27 @@ impl Event { } fn fields_mismatch(schema: &Schema, body: &Value) -> bool { - let field: HashSet<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + for (name, val) in body.as_object().expect("body is of object variant") { + let Ok( field) = schema.field_with_name(&name) else { return false }; + + // datatype check only some basic cases + let valid_datatype = match field.data_type() { + DataType::Boolean => val.is_boolean(), + DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => val.is_i64(), + DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => { + val.is_u64() + } + DataType::Float16 | DataType::Float32 | DataType::Float64 => val.is_f64(), + DataType::Utf8 => val.is_string(), + _ => false, + }; + + if !valid_datatype { + return true; + } + } - body.as_object() - .expect("body is of object variant") - .keys() - .any(|key| !field.contains(key.as_str())) + false } fn replace( diff --git a/server/src/utils.rs b/server/src/utils.rs index 97b800a73..cabd71ff9 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -21,7 +21,7 @@ use serde_json::{json, Value}; pub fn flatten_json_body(body: &serde_json::Value) -> Result { let mut flat_value: Value = json!({}); - flatten_json::flatten(body, &mut flat_value, None, true, Some("_")).unwrap(); + flatten_json::flatten(body, &mut flat_value, None, false, Some("_")).unwrap(); Ok(flat_value) } From 4addf8e43cdd50ddf08b32f76dd5255bd3697323 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sat, 7 Jan 2023 13:19:31 +0530 Subject: [PATCH 4/5] Fix --- server/src/event.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/event.rs b/server/src/event.rs index c619ae1ab..c888a6ddc 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -323,7 +323,7 @@ impl Event { fn fields_mismatch(schema: &Schema, body: &Value) -> bool { for (name, val) in body.as_object().expect("body is of object variant") { - let Ok( field) = schema.field_with_name(&name) else { return false }; + let Ok(field) = schema.field_with_name(name) else { return false }; // datatype check only some basic cases let valid_datatype = match field.data_type() { From 08bab764de77a51ba48f6868eb4b957e73e19b71 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sat, 7 Jan 2023 15:33:39 +0530 Subject: [PATCH 5/5] Fix --- server/src/event.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/event.rs b/server/src/event.rs index c888a6ddc..1dcd10bf0 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -323,7 +323,7 @@ impl Event { fn fields_mismatch(schema: &Schema, body: &Value) -> bool { for (name, val) in body.as_object().expect("body is of object variant") { - let Ok(field) = schema.field_with_name(name) else { return false }; + let Ok(field) = schema.field_with_name(name) else { return true }; // datatype check only some basic cases let valid_datatype = match field.data_type() {