@@ -30,7 +30,10 @@ use std::{collections::HashMap, sync::Arc};
3030use tracing:: error;
3131
3232use super :: { EventFormat , Metadata , Tags } ;
33- use crate :: utils:: { arrow:: get_field, json:: flatten_json_body} ;
33+ use crate :: {
34+ metadata:: SchemaVersion ,
35+ utils:: { arrow:: get_field, json:: flatten_json_body} ,
36+ } ;
3437
3538pub struct Event {
3639 pub data : Value ,
@@ -48,8 +51,9 @@ impl EventFormat for Event {
4851 schema : & HashMap < String , Arc < Field > > ,
4952 static_schema_flag : Option < & String > ,
5053 time_partition : Option < & String > ,
54+ schema_version : SchemaVersion ,
5155 ) -> Result < ( Self :: Data , Vec < Arc < Field > > , bool , Tags , Metadata ) , anyhow:: Error > {
52- let data = flatten_json_body ( & self . data , None , None , None , false ) ?;
56+ let data = flatten_json_body ( self . data , None , None , None , schema_version , false ) ?;
5357 let stream_schema = schema;
5458
5559 // incoming event may be a single json or a json array
@@ -68,43 +72,38 @@ impl EventFormat for Event {
6872 let mut is_first = false ;
6973 let schema = match derive_arrow_schema ( stream_schema, fields) {
7074 Ok ( schema) => schema,
71- Err ( _) => match infer_json_schema_from_iterator ( value_arr. iter ( ) . map ( Ok ) ) {
72- Ok ( mut infer_schema) => {
73- let new_infer_schema = super :: super :: format:: update_field_type_in_schema (
74- Arc :: new ( infer_schema) ,
75- Some ( stream_schema) ,
76- time_partition,
77- Some ( & value_arr) ,
78- ) ;
79- infer_schema = Schema :: new ( new_infer_schema. fields ( ) . clone ( ) ) ;
80- if let Err ( err) = Schema :: try_merge ( vec ! [
81- Schema :: new( stream_schema. values( ) . cloned( ) . collect:: <Fields >( ) ) ,
82- infer_schema. clone( ) ,
83- ] ) {
84- return Err ( anyhow ! ( "Could not merge schema of this event with that of the existing stream. {:?}" , err) ) ;
85- }
86- is_first = true ;
87- infer_schema
88- . fields
89- . iter ( )
90- . filter ( |field| !field. data_type ( ) . is_null ( ) )
91- . cloned ( )
92- . sorted_by ( |a, b| a. name ( ) . cmp ( b. name ( ) ) )
93- . collect ( )
94- }
95- Err ( err) => {
96- return Err ( anyhow ! (
97- "Could not infer schema for this event due to err {:?}" ,
98- err
99- ) )
100- }
101- } ,
75+ Err ( _) => {
76+ let mut infer_schema = infer_json_schema_from_iterator ( value_arr. iter ( ) . map ( Ok ) )
77+ . map_err ( |err| {
78+ anyhow ! ( "Could not infer schema for this event due to err {:?}" , err)
79+ } ) ?;
80+ let new_infer_schema = super :: update_field_type_in_schema (
81+ Arc :: new ( infer_schema) ,
82+ Some ( stream_schema) ,
83+ time_partition,
84+ Some ( & value_arr) ,
85+ schema_version,
86+ ) ;
87+ infer_schema = Schema :: new ( new_infer_schema. fields ( ) . clone ( ) ) ;
88+ Schema :: try_merge ( vec ! [
89+ Schema :: new( stream_schema. values( ) . cloned( ) . collect:: <Fields >( ) ) ,
90+ infer_schema. clone( ) ,
91+ ] ) . map_err ( |err| anyhow ! ( "Could not merge schema of this event with that of the existing stream. {:?}" , err) ) ?;
92+ is_first = true ;
93+ infer_schema
94+ . fields
95+ . iter ( )
96+ . filter ( |field| !field. data_type ( ) . is_null ( ) )
97+ . cloned ( )
98+ . sorted_by ( |a, b| a. name ( ) . cmp ( b. name ( ) ) )
99+ . collect ( )
100+ }
102101 } ;
103102
104103 if static_schema_flag. is_none ( )
105104 && value_arr
106105 . iter ( )
107- . any ( |value| fields_mismatch ( & schema, value) )
106+ . any ( |value| fields_mismatch ( & schema, value, schema_version ) )
108107 {
109108 return Err ( anyhow ! (
110109 "Could not process this event due to mismatch in datatype"
@@ -165,27 +164,30 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a s
165164 Ok ( keys)
166165}
167166
168- fn fields_mismatch ( schema : & [ Arc < Field > ] , body : & Value ) -> bool {
167+ fn fields_mismatch ( schema : & [ Arc < Field > ] , body : & Value , schema_version : SchemaVersion ) -> bool {
169168 for ( name, val) in body. as_object ( ) . expect ( "body is of object variant" ) {
170169 if val. is_null ( ) {
171170 continue ;
172171 }
173172 let Some ( field) = get_field ( schema, name) else {
174173 return true ;
175174 } ;
176- if !valid_type ( field. data_type ( ) , val) {
175+ if !valid_type ( field. data_type ( ) , val, schema_version ) {
177176 return true ;
178177 }
179178 }
180179 false
181180}
182181
183- fn valid_type ( data_type : & DataType , value : & Value ) -> bool {
182+ fn valid_type ( data_type : & DataType , value : & Value , schema_version : SchemaVersion ) -> bool {
184183 match data_type {
185184 DataType :: Boolean => value. is_boolean ( ) ,
186185 DataType :: Int8 | DataType :: Int16 | DataType :: Int32 | DataType :: Int64 => value. is_i64 ( ) ,
187186 DataType :: UInt8 | DataType :: UInt16 | DataType :: UInt32 | DataType :: UInt64 => value. is_u64 ( ) ,
188- DataType :: Float16 | DataType :: Float32 | DataType :: Float64 => value. is_f64 ( ) ,
187+ DataType :: Float16 | DataType :: Float32 => value. is_f64 ( ) ,
188+ // All numbers can be cast as Float64 from schema version v1
189+ DataType :: Float64 if schema_version == SchemaVersion :: V1 => value. is_number ( ) ,
190+ DataType :: Float64 if schema_version != SchemaVersion :: V1 => value. is_f64 ( ) ,
189191 DataType :: Utf8 => value. is_string ( ) ,
190192 DataType :: List ( field) => {
191193 let data_type = field. data_type ( ) ;
@@ -194,7 +196,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
194196 if elem. is_null ( ) {
195197 continue ;
196198 }
197- if !valid_type ( data_type, elem) {
199+ if !valid_type ( data_type, elem, schema_version ) {
198200 return false ;
199201 }
200202 }
@@ -212,7 +214,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
212214 if value. is_null ( ) {
213215 continue ;
214216 }
215- if !valid_type ( field. data_type ( ) , value) {
217+ if !valid_type ( field. data_type ( ) , value, schema_version ) {
216218 return false ;
217219 }
218220 } else {
0 commit comments