 
 use anyhow::anyhow;
 use arrow_array::RecordBatch;
-use arrow_json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOptions};
-use arrow_schema::{DataType, Field, Schema};
+use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
+use arrow_schema::{DataType, Field, Fields, Schema};
 use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
 use serde_json::Value;
 use std::{collections::HashMap, sync::Arc};
 
-use super::EventFormat;
+use super::{EventFormat, Metadata, Tags};
 use crate::utils::{arrow::get_field, json::flatten_json_body};
 
 pub struct Event {
     pub data: Value,
-    pub tags: String,
-    pub metadata: String,
+    pub tags: Tags,
+    pub metadata: Metadata,
 }
 
 impl EventFormat for Event {
@@ -43,10 +43,9 @@ impl EventFormat for Event {
     // also extract the arrow schema, tags and metadata from the incoming json
     fn to_data(
         self,
-        schema: &HashMap<String, Field>,
-    ) -> Result<(Self::Data, Schema, bool, String, String), anyhow::Error> {
+        schema: HashMap<String, Arc<Field>>,
+    ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
         let data = flatten_json_body(self.data)?;
-
         let stream_schema = schema;
 
         // incoming event may be a single json or a json array
@@ -63,18 +62,18 @@ impl EventFormat for Event {
             collect_keys(value_arr.iter()).expect("fields can be collected from array of objects");
 
         let mut is_first = false;
-        let schema = match derive_arrow_schema(stream_schema, fields) {
+        let schema = match derive_arrow_schema(&stream_schema, fields) {
             Ok(schema) => schema,
             Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
                 Ok(infer_schema) => {
                     if let Err(err) = Schema::try_merge(vec![
-                        Schema::new(stream_schema.values().cloned().collect()),
+                        Schema::new(stream_schema.values().cloned().collect::<Fields>()),
                         infer_schema.clone(),
                     ]) {
                         return Err(anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err));
                     }
                     is_first = true;
-                    infer_schema
+                    infer_schema.fields.iter().cloned().collect()
                 }
                 Err(err) => {
                     return Err(anyhow!(
@@ -100,13 +99,13 @@ impl EventFormat for Event {
     // Convert the Data type (defined above) to arrow record batch
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, anyhow::Error> {
         let array_capacity = round_upto_multiple_of_64(data.len());
-        let value_iter: &mut (dyn Iterator<Item = Value>) = &mut data.into_iter();
+        let mut reader = ReaderBuilder::new(schema)
+            .with_batch_size(array_capacity)
+            .with_coerce_primitive(false)
+            .build_decoder()?;
 
-        let reader = Decoder::new(
-            schema,
-            DecoderOptions::new().with_batch_size(array_capacity),
-        );
-        match reader.next_batch(&mut value_iter.map(Ok)) {
+        reader.serialize(&data)?;
+        match reader.flush() {
             Ok(Some(recordbatch)) => Ok(recordbatch),
             Err(err) => Err(anyhow!("Failed to create recordbatch due to {:?}", err)),
             Ok(None) => unreachable!("all records are added to one rb"),
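(For reference: the hunk above swaps the removed `Decoder`/`DecoderOptions` API for arrow-json's builder-based decoder. Below is a minimal, self-contained sketch of that flow, assuming an arrow-json release that exposes `ReaderBuilder::build_decoder`, `Decoder::serialize`, and `Decoder::flush`; the schema and rows are illustrative, not taken from this file.)

```rust
use std::sync::Arc;

use arrow_json::reader::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Target schema for the record batch (field names are made up).
    let schema = Arc::new(Schema::new(vec![
        Field::new("device", DataType::Utf8, true),
        Field::new("value", DataType::Int64, true),
    ]));

    let rows = vec![
        json!({"device": "a", "value": 1}),
        json!({"device": "b", "value": 2}),
    ];

    // Build a decoder in place of the removed Decoder/DecoderOptions pair.
    let mut decoder = ReaderBuilder::new(schema)
        .with_batch_size(rows.len())
        .with_coerce_primitive(false)
        .build_decoder()?;

    // Feed the serde_json values in, then flush a single RecordBatch.
    decoder.serialize(&rows)?;
    let batch = decoder.flush()?.expect("all rows fit in one batch");
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
```

Serializing the `serde_json::Value` rows directly replaces the old pattern of feeding an `Iterator<Item = Result<Value>>` into `next_batch`.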
@@ -116,14 +115,17 @@ impl EventFormat for Event {
 
 // Returns arrow schema with the fields that are present in the request body
 // This schema is an input to convert the request body to arrow record batch
-fn derive_arrow_schema(schema: &HashMap<String, Field>, fields: Vec<&str>) -> Result<Schema, ()> {
+fn derive_arrow_schema(
+    schema: &HashMap<String, Arc<Field>>,
+    fields: Vec<&str>,
+) -> Result<Vec<Arc<Field>>, ()> {
     let mut res = Vec::with_capacity(fields.len());
     let fields = fields.into_iter().map(|field_name| schema.get(field_name));
     for field in fields {
         let Some(field) = field else { return Err(()) };
         res.push(field.clone())
     }
-    Ok(Schema::new(res))
+    Ok(res)
 }
 
 fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a str>, ()> {
@@ -145,7 +147,7 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a s
     Ok(keys)
 }
 
-fn fields_mismatch(schema: &Schema, body: &Value) -> bool {
+fn fields_mismatch(schema: &[Arc<Field>], body: &Value) -> bool {
     for (name, val) in body.as_object().expect("body is of object variant") {
         if val.is_null() {
             continue;
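(A second hedged sketch, covering the schema path that the updated `to_data` and `derive_arrow_schema` rely on: infer a schema from incoming JSON values, confirm it merges with the existing stream schema via `Schema::try_merge`, and collect fields into the new `Vec<Arc<Field>>` shape. The stream schema and field names below are illustrative assumptions.)

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_json::reader::infer_json_schema_from_iterator;
use arrow_schema::{DataType, Field, Fields, Schema};
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Existing stream schema keyed by field name, mirroring the new
    // HashMap<String, Arc<Field>> shape used by to_data() (names are made up).
    let mut stream_schema: HashMap<String, Arc<Field>> = HashMap::new();
    stream_schema.insert(
        "device".to_owned(),
        Arc::new(Field::new("device", DataType::Utf8, true)),
    );

    // Incoming events carry a field the stream has not seen before.
    let events = vec![json!({"device": "a", "temp": 21.5})];

    // Infer a schema from the events and verify it merges with the stream's.
    let inferred = infer_json_schema_from_iterator(events.iter().map(Ok))?;
    let merged = Schema::try_merge(vec![
        Schema::new(stream_schema.values().cloned().collect::<Fields>()),
        inferred.clone(),
    ])?;

    // The per-event field list, in the Vec<Arc<Field>> form to_data() now returns.
    let event_fields: Vec<Arc<Field>> = inferred.fields.iter().cloned().collect();
    println!("merged: {merged:?}, event fields: {}", event_fields.len());
    Ok(())
}
```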