@@ -72,38 +72,32 @@ impl EventFormat for Event {
7272 let mut is_first = false ;
7373 let schema = match derive_arrow_schema ( stream_schema, fields) {
7474 Ok ( schema) => schema,
75- Err ( _) => match infer_json_schema_from_iterator ( value_arr. iter ( ) . map ( Ok ) ) {
76- Ok ( mut infer_schema) => {
77- let new_infer_schema = super :: update_field_type_in_schema (
78- Arc :: new ( infer_schema) ,
79- Some ( stream_schema) ,
80- time_partition,
81- Some ( & value_arr) ,
82- schema_version,
83- ) ;
84- infer_schema = Schema :: new ( new_infer_schema. fields ( ) . clone ( ) ) ;
85- if let Err ( err) = Schema :: try_merge ( vec ! [
86- Schema :: new( stream_schema. values( ) . cloned( ) . collect:: <Fields >( ) ) ,
87- infer_schema. clone( ) ,
88- ] ) {
89- return Err ( anyhow ! ( "Could not merge schema of this event with that of the existing stream. {:?}" , err) ) ;
90- }
91- is_first = true ;
92- infer_schema
93- . fields
94- . iter ( )
95- . filter ( |field| !field. data_type ( ) . is_null ( ) )
96- . cloned ( )
97- . sorted_by ( |a, b| a. name ( ) . cmp ( b. name ( ) ) )
98- . collect ( )
99- }
100- Err ( err) => {
101- return Err ( anyhow ! (
102- "Could not infer schema for this event due to err {:?}" ,
103- err
104- ) )
105- }
106- } ,
75+ Err ( _) => {
76+ let mut infer_schema = infer_json_schema_from_iterator ( value_arr. iter ( ) . map ( Ok ) )
77+ . map_err ( |err| {
78+ anyhow ! ( "Could not infer schema for this event due to err {:?}" , err)
79+ } ) ?;
80+ let new_infer_schema = super :: update_field_type_in_schema (
81+ Arc :: new ( infer_schema) ,
82+ Some ( stream_schema) ,
83+ time_partition,
84+ Some ( & value_arr) ,
85+ schema_version,
86+ ) ;
87+ infer_schema = Schema :: new ( new_infer_schema. fields ( ) . clone ( ) ) ;
88+ Schema :: try_merge ( vec ! [
89+ Schema :: new( stream_schema. values( ) . cloned( ) . collect:: <Fields >( ) ) ,
90+ infer_schema. clone( ) ,
91+ ] ) . map_err ( |err| anyhow ! ( "Could not merge schema of this event with that of the existing stream. {:?}" , err) ) ?;
92+ is_first = true ;
93+ infer_schema
94+ . fields
95+ . iter ( )
96+ . filter ( |field| !field. data_type ( ) . is_null ( ) )
97+ . cloned ( )
98+ . sorted_by ( |a, b| a. name ( ) . cmp ( b. name ( ) ) )
99+ . collect ( )
100+ }
107101 } ;
108102
109103 if static_schema_flag. is_none ( )
0 commit comments