@@ -23,13 +23,12 @@ use anyhow::anyhow;
2323use arrow_schema:: Field ;
2424use bytes:: Bytes ;
2525use chrono:: { DateTime , NaiveDateTime , Utc } ;
26- use itertools:: Itertools ;
2726use serde_json:: Value ;
2827
2928use crate :: {
3029 event:: {
30+ self ,
3131 format:: { self , EventFormat } ,
32- Event ,
3332 } ,
3433 handlers:: {
3534 http:: { ingest:: PostError , kinesis} ,
@@ -73,61 +72,174 @@ pub async fn push_logs(
7372 let custom_partition = STREAM_INFO . get_custom_partition ( stream_name) ?;
7473 let schema_version = STREAM_INFO . get_schema_version ( stream_name) ?;
7574 let body_val: Value = serde_json:: from_slice ( body) ?;
76- let data = convert_array_to_object (
77- body_val,
78- time_partition. as_ref ( ) ,
79- time_partition_limit,
80- custom_partition. as_ref ( ) ,
81- schema_version,
82- ) ?;
8375
84- for value in data {
85- let origin_size = serde_json:: to_vec ( & value) . unwrap ( ) . len ( ) as u64 ; // string length need not be the same as byte length
86- let parsed_timestamp = match time_partition. as_ref ( ) {
87- Some ( time_partition) => get_parsed_timestamp ( & value, time_partition) ?,
88- _ => Utc :: now ( ) . naive_utc ( ) ,
89- } ;
90- let custom_partition_values = match custom_partition. as_ref ( ) {
91- Some ( custom_partition) => {
92- let custom_partitions = custom_partition. split ( ',' ) . collect_vec ( ) ;
93- get_custom_partition_values ( & value, & custom_partitions)
76+ let size: usize = body. len ( ) ;
77+ let mut parsed_timestamp = Utc :: now ( ) . naive_utc ( ) ;
78+ if time_partition. is_none ( ) {
79+ if custom_partition. is_none ( ) {
80+ let size = size as u64 ;
81+ create_process_record_batch (
82+ stream_name,
83+ req,
84+ body_val,
85+ static_schema_flag. as_ref ( ) ,
86+ None ,
87+ parsed_timestamp,
88+ & HashMap :: new ( ) ,
89+ size,
90+ schema_version,
91+ )
92+ . await ?;
93+ } else {
94+ let data = convert_array_to_object (
95+ body_val,
96+ None ,
97+ None ,
98+ custom_partition. as_ref ( ) ,
99+ schema_version,
100+ ) ?;
101+ let custom_partition = custom_partition. unwrap ( ) ;
102+ let custom_partition_list = custom_partition. split ( ',' ) . collect :: < Vec < & str > > ( ) ;
103+
104+ for value in data {
105+ let custom_partition_values =
106+ get_custom_partition_values ( & value, & custom_partition_list) ;
107+
108+ let size = value. to_string ( ) . into_bytes ( ) . len ( ) as u64 ;
109+ create_process_record_batch (
110+ stream_name,
111+ req,
112+ value,
113+ static_schema_flag. as_ref ( ) ,
114+ None ,
115+ parsed_timestamp,
116+ & custom_partition_values,
117+ size,
118+ schema_version,
119+ )
120+ . await ?;
94121 }
95- None => HashMap :: new ( ) ,
96- } ;
97- let schema = STREAM_INFO
98- . read ( )
99- . unwrap ( )
100- . get ( stream_name)
101- . ok_or ( PostError :: StreamNotFound ( stream_name. to_owned ( ) ) ) ?
102- . schema
103- . clone ( ) ;
104- let ( rb, is_first_event) = into_event_batch (
105- req,
106- & value,
107- schema,
108- static_schema_flag. as_ref ( ) ,
122+ }
123+ } else if custom_partition. is_none ( ) {
124+ let data = convert_array_to_object (
125+ body_val,
126+ time_partition. as_ref ( ) ,
127+ time_partition_limit,
128+ None ,
129+ schema_version,
130+ ) ?;
131+ for value in data {
132+ parsed_timestamp = get_parsed_timestamp ( & value, time_partition. as_ref ( ) . unwrap ( ) ) ?;
133+ let size = value. to_string ( ) . into_bytes ( ) . len ( ) as u64 ;
134+ create_process_record_batch (
135+ stream_name,
136+ req,
137+ value,
138+ static_schema_flag. as_ref ( ) ,
139+ time_partition. as_ref ( ) ,
140+ parsed_timestamp,
141+ & HashMap :: new ( ) ,
142+ size,
143+ schema_version,
144+ )
145+ . await ?;
146+ }
147+ } else {
148+ let data = convert_array_to_object (
149+ body_val,
109150 time_partition. as_ref ( ) ,
151+ time_partition_limit,
152+ custom_partition. as_ref ( ) ,
110153 schema_version,
111154 ) ?;
155+ let custom_partition = custom_partition. unwrap ( ) ;
156+ let custom_partition_list = custom_partition. split ( ',' ) . collect :: < Vec < & str > > ( ) ;
112157
113- Event {
114- rb,
115- stream_name : stream_name. to_owned ( ) ,
116- origin_format : "json" ,
117- origin_size,
118- is_first_event,
119- parsed_timestamp,
120- time_partition : time_partition. clone ( ) ,
121- custom_partition_values,
122- stream_type : StreamType :: UserDefined ,
158+ for value in data {
159+ let custom_partition_values =
160+ get_custom_partition_values ( & value, & custom_partition_list) ;
161+
162+ parsed_timestamp = get_parsed_timestamp ( & value, time_partition. as_ref ( ) . unwrap ( ) ) ?;
163+ let size = value. to_string ( ) . into_bytes ( ) . len ( ) as u64 ;
164+ create_process_record_batch (
165+ stream_name,
166+ req,
167+ value,
168+ static_schema_flag. as_ref ( ) ,
169+ time_partition. as_ref ( ) ,
170+ parsed_timestamp,
171+ & custom_partition_values,
172+ size,
173+ schema_version,
174+ )
175+ . await ?;
123176 }
124- . process ( )
125- . await ?;
126177 }
127178
128179 Ok ( ( ) )
129180}
130181
182+ #[ allow( clippy:: too_many_arguments) ]
183+ pub async fn create_process_record_batch (
184+ stream_name : & str ,
185+ req : & HttpRequest ,
186+ value : Value ,
187+ static_schema_flag : Option < & String > ,
188+ time_partition : Option < & String > ,
189+ parsed_timestamp : NaiveDateTime ,
190+ custom_partition_values : & HashMap < String , String > ,
191+ origin_size : u64 ,
192+ schema_version : SchemaVersion ,
193+ ) -> Result < ( ) , PostError > {
194+ let ( rb, is_first_event) = get_stream_schema (
195+ stream_name,
196+ req,
197+ & value,
198+ static_schema_flag,
199+ time_partition,
200+ schema_version,
201+ ) ?;
202+ event:: Event {
203+ rb,
204+ stream_name : stream_name. to_owned ( ) ,
205+ origin_format : "json" ,
206+ origin_size,
207+ is_first_event,
208+ parsed_timestamp,
209+ time_partition : time_partition. cloned ( ) ,
210+ custom_partition_values : custom_partition_values. clone ( ) ,
211+ stream_type : StreamType :: UserDefined ,
212+ }
213+ . process ( )
214+ . await ?;
215+
216+ Ok ( ( ) )
217+ }
218+
219+ pub fn get_stream_schema (
220+ stream_name : & str ,
221+ req : & HttpRequest ,
222+ body : & Value ,
223+ static_schema_flag : Option < & String > ,
224+ time_partition : Option < & String > ,
225+ schema_version : SchemaVersion ,
226+ ) -> Result < ( arrow_array:: RecordBatch , bool ) , PostError > {
227+ let hash_map = STREAM_INFO . read ( ) . unwrap ( ) ;
228+ let schema = hash_map
229+ . get ( stream_name)
230+ . ok_or ( PostError :: StreamNotFound ( stream_name. to_owned ( ) ) ) ?
231+ . schema
232+ . clone ( ) ;
233+ into_event_batch (
234+ req,
235+ body,
236+ schema,
237+ static_schema_flag,
238+ time_partition,
239+ schema_version,
240+ )
241+ }
242+
131243pub fn into_event_batch (
132244 req : & HttpRequest ,
133245 body : & Value ,
0 commit comments