@@ -26,7 +26,7 @@ use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 use parquet::file::properties::WriterProperties;
 use parquet::file::reader::SerializedFileReader;
 use std::fs;
-use std::io::{BufReader, Cursor, Seek, SeekFrom, Write};
+use std::io::BufReader;
 use std::sync::Arc;

 use crate::metadata;
@@ -59,49 +59,68 @@ impl Event {
         &self,
         storage: &impl ObjectStorage,
     ) -> Result<response::EventResponse, Error> {
-        let schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
-        if schema.is_empty() {
-            self.first_event(storage).await
+        let Schema {
+            arrow_schema,
+            string_schema,
+        } = self.infer_schema().map_err(|e| {
+            error!("Failed to infer schema for event. {:?}", e);
+            e
+        })?;
+
+        let event = self.get_reader(arrow_schema);
+        let size = self.body_size();
+
+        let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
+        let is_first_event = stream_schema.is_empty();
+        // if the stream schema is empty then this is the first event.
+        let compressed_size = if is_first_event {
+            // process first event and store schema in object store
+            self.process_first_event(event, string_schema.clone(), storage)
+                .await?
         } else {
-            self.event()
+            // validate schema before processing the event
+            if stream_schema != string_schema {
+                return Err(Error::SchemaMismatch(self.stream_name.clone()));
+            } else {
+                self.process_event(event)?
+            }
+        };
+
+        if let Err(e) = metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
+        {
+            error!("Couldn't update stream stats. {:?}", e);
         }
+
+        let msg = if is_first_event {
+            format!(
+                "Initial Event received for log stream {}, schema uploaded successfully",
+                &self.stream_name,
+            )
+        } else {
+            format!("Event received for log stream {}", &self.stream_name)
+        };
+
+        Ok(response::EventResponse { msg })
     }

     // This is called when the first event of a log stream is received. The first event is
     // special because we parse this event to generate the schema for the log stream. This
     // schema is then enforced on the rest of the events sent to this log stream.
-    async fn first_event(
+    async fn process_first_event<R: std::io::Read>(
         &self,
+        mut event: json::Reader<R>,
+        string_schema: String,
         storage: &impl ObjectStorage,
-    ) -> Result<response::EventResponse, Error> {
-        let mut c = Cursor::new(Vec::new());
-        let reader = self.body.as_bytes();
-        let size = reader.len() as u64;
-
-        c.write_all(reader)?;
-        c.seek(SeekFrom::Start(0))?;
-        let buf_reader = BufReader::new(reader);
-
-        let options = json::reader::DecoderOptions::new().with_batch_size(1024);
-        let mut event = json::Reader::new(
-            buf_reader,
-            Arc::new(self.infer_schema()?.arrow_schema),
-            options,
-        );
+    ) -> Result<u64, Error> {
         let rb = event.next()?.ok_or(Error::MissingRecord)?;

         // Store record batch to Parquet file on local cache
         let compressed_size = self.convert_arrow_parquet(rb)?;
-        if let Err(e) = metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
-        {
-            error!("Couldn't update stream stats. {:?}", e);
-        }

         // Put the inferred schema to object store
-        let schema = self.infer_schema()?.string_schema;
         let stream_name = &self.stream_name;
         storage
-            .put_schema(stream_name.clone(), schema.clone())
+            .put_schema(stream_name.clone(), string_schema.clone())
             .await
             .map_err(|e| response::EventError {
                 msg: format!(
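The refactor above leans on arrow's JSON reader: `infer_schema` wraps schema inference over the raw body, and `get_reader` (added in the last hunk below) builds a `json::Reader` over the same bytes. A minimal standalone sketch of that flow, assuming the arrow-era API this file builds against; `decode_event` and the error message are illustrative, not code from this repository:

```rust
use std::io::BufReader;
use std::sync::Arc;

use arrow::error::{ArrowError, Result};
use arrow::json;
use arrow::json::reader::infer_json_schema;
use arrow::record_batch::RecordBatch;

// Illustrative helper (not from the codebase): infer a schema from a JSON
// body, then decode the same bytes into a RecordBatch with that schema.
fn decode_event(body: &str) -> Result<RecordBatch> {
    // Scan the JSON body once to infer an Arrow schema from its fields.
    let mut buf_reader = BufReader::new(body.as_bytes());
    let inferred = infer_json_schema(&mut buf_reader, None)?;

    // Decode with the inferred schema enforced, mirroring get_reader's
    // 1024-row batch size.
    let options = json::reader::DecoderOptions::new().with_batch_size(1024);
    let mut reader = json::Reader::new(body.as_bytes(), Arc::new(inferred), options);
    reader
        .next()?
        .ok_or_else(|| ArrowError::JsonError("body contained no records".to_string()))
}
```

Because the reader consumes `&[u8]` directly, the `Cursor`/`Seek`/`Write` shuffle dropped in the first hunk is unnecessary.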
@@ -110,81 +129,42 @@ impl Event {
                 ),
             })?;

+        // set the schema in memory for this stream
         metadata::STREAM_INFO
-            .set_schema(stream_name.to_string(), schema)
+            .set_schema(self.stream_name.clone(), string_schema)
             .map_err(|e| response::EventError {
                 msg: format!(
                     "Failed to set schema for log stream {} due to err: {}",
-                    stream_name, e
+                    &self.stream_name, e
                 ),
             })?;

-        Ok(response::EventResponse {
-            msg: format!(
-                "Initial Event received for log stream {}, schema uploaded successfully",
-                self.stream_name
-            ),
-        })
+        Ok(compressed_size)
     }

     // event processes all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
-    fn event(&self) -> Result<response::EventResponse, Error> {
-        let mut c = Cursor::new(Vec::new());
-        let reader = self.body.as_bytes();
-        let size = reader.len() as u64;
-
-        c.write_all(reader)?;
-        c.seek(SeekFrom::Start(0))?;
-
-        match self.infer_schema() {
-            Ok(event_schema) => {
-                let options = json::reader::DecoderOptions::new().with_batch_size(1024);
-                let mut event = json::Reader::new(
-                    self.body.as_bytes(),
-                    Arc::new(event_schema.arrow_schema),
-                    options,
-                );
-
-                // validate schema before attempting to append to parquet file
-                let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
-                if stream_schema != event_schema.string_schema {
-                    return Err(Error::SchemaMismatch(self.stream_name.clone()));
-                }
-
-                let next_event_rb = event.next()?.ok_or(Error::MissingRecord)?;
-
-                let compressed_size = match self.convert_parquet_rb_reader() {
-                    Ok(mut arrow_reader) => {
-                        let mut total_size = 0;
-                        let rb = arrow_reader.get_record_reader(2048).unwrap();
-                        for prev_rb in rb {
-                            let new_rb = RecordBatch::concat(
-                                &std::sync::Arc::new(arrow_reader.get_schema().unwrap()),
-                                &[next_event_rb.clone(), prev_rb.unwrap()],
-                            )?;
-                            total_size += self.convert_arrow_parquet(new_rb)?;
-                        }
-
-                        total_size
-                    }
-                    Err(_) => self.convert_arrow_parquet(next_event_rb)?,
-                };
-                if let Err(e) =
-                    metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
-                {
-                    error!("Couldn't update stream stats. {:?}", e);
+    fn process_event<R: std::io::Read>(&self, mut event: json::Reader<R>) -> Result<u64, Error> {
+        let next_event_rb = event.next()?.ok_or(Error::MissingRecord)?;
+
+        let compressed_size = match self.convert_parquet_rb_reader() {
+            Ok(mut arrow_reader) => {
+                let mut total_size = 0;
+                let rb = arrow_reader.get_record_reader(2048).unwrap();
+                for prev_rb in rb {
+                    let new_rb = RecordBatch::concat(
+                        &std::sync::Arc::new(arrow_reader.get_schema().unwrap()),
+                        &[next_event_rb.clone(), prev_rb.unwrap()],
+                    )?;
+                    total_size += self.convert_arrow_parquet(new_rb)?;
                 }

-                Ok(response::EventResponse {
-                    msg: format!("Event received for log stream {}", &self.stream_name),
-                })
-            }
-            Err(e) => {
-                error!("Failed to infer schema for event. {:?}", e);
-                Err(e)
+                total_size
             }
-        }
+            Err(_) => self.convert_arrow_parquet(next_event_rb)?,
+        };
+
+        Ok(compressed_size)
     }

     // infer_schema is a constructor for Schema
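`process_event` appends by reading the cached Parquet file back through `convert_parquet_rb_reader` and concatenating each stored batch with the incoming one. `RecordBatch::concat` requires every input to share one schema, which is why the schema check happens before this point. A minimal sketch of that concatenation step, assuming the same arrow version; the column name and values are illustrative:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

// Illustrative helper (not from the codebase): merge a new batch with a
// previously stored one under a shared schema.
fn concat_with_previous() -> Result<RecordBatch> {
    // A single-column schema shared by both batches.
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));

    let next_rb = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![3])) as ArrayRef],
    )?;
    let prev_rb = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef],
    )?;

    // Rows are appended in argument order; a schema mismatch between the
    // inputs would surface here as an error.
    RecordBatch::concat(&schema, &[next_rb, prev_rb])
}
```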
@@ -201,6 +181,18 @@ impl Event {
         })
     }

+    fn get_reader(&self, arrow_schema: arrow::datatypes::Schema) -> json::Reader<&[u8]> {
+        json::Reader::new(
+            self.body.as_bytes(),
+            Arc::new(arrow_schema),
+            json::reader::DecoderOptions::new().with_batch_size(1024),
+        )
+    }
+
+    fn body_size(&self) -> u64 {
+        self.body.as_bytes().len() as u64
+    }
+
     // convert arrow record batch to parquet
     // and write it to local cache path as a data.parquet file.
     fn convert_arrow_parquet(&self, rb: RecordBatch) -> Result<u64, Error> {
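`convert_arrow_parquet` itself is unchanged, so its body is not shown in this diff; both paths call it to persist a batch locally and report the compressed size back to `update_stats`. A minimal sketch of such a conversion using parquet's `ArrowWriter`, under the same crate versions as the imports above; the function name and path handling are illustrative, not the project's actual implementation:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::errors::{ParquetError, Result};
use parquet::file::properties::WriterProperties;

// Illustrative helper (not from the codebase): write one RecordBatch to a
// Parquet file and return the on-disk (compressed) size.
fn write_batch_to_parquet(rb: &RecordBatch, path: &str) -> Result<u64> {
    let file = File::create(path).map_err(|e| ParquetError::General(e.to_string()))?;

    // One writer per file; WriterProperties controls compression and encoding.
    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(file, rb.schema(), Some(props))?;
    writer.write(rb)?;
    writer.close()?;

    // The compressed size is what update_stats records for the stream.
    let size = std::fs::metadata(path)
        .map_err(|e| ParquetError::General(e.to_string()))?
        .len();
    Ok(size)
}
```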