 */

use super::logstream::error::{CreateStreamError, StreamError};
+use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
use super::users::dashboards::DashboardError;
use super::users::filters::FiltersError;
-use super::{kinesis, otel};
+use super::otel;
use crate::event::{
    self,
    error::EventError,
    format::{self, EventFormat},
};
use crate::handlers::{
-    LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
-    STREAM_NAME_HEADER_KEY,
+    LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY,
};
use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
use crate::metadata::{self, STREAM_INFO};
use crate::option::{Mode, CONFIG};
use crate::storage::{LogStream, ObjectStorageError, StreamType};
-use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
-use crate::utils::json::convert_array_to_object;
+use crate::utils::header_parsing::ParseHeaderError;
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
use arrow_array::RecordBatch;
-use arrow_schema::{Field, Schema};
+use arrow_schema::Schema;
use bytes::Bytes;
-use chrono::{DateTime, NaiveDateTime, Utc};
+use chrono::Utc;
use http::StatusCode;
use serde_json::Value;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
use std::sync::Arc;

// Handler for POST /api/v1/ingest
@@ -142,34 +141,6 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
    Ok(HttpResponse::Ok().finish())
}

-async fn flatten_and_push_logs(
-    req: HttpRequest,
-    body: Bytes,
-    stream_name: String,
-) -> Result<(), PostError> {
-    //flatten logs
-    if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
-        let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
-        let log_source: String = log_source.to_str().unwrap().to_owned();
-        match log_source.as_str() {
-            LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
-            LOG_SOURCE_OTEL => {
-                json = otel::flatten_otel_logs(&body);
-            }
-            _ => {
-                log::warn!("Unknown log source: {}", log_source);
-                push_logs(stream_name.to_string(), req.clone(), body).await?;
-            }
-        }
-        for record in json.iter_mut() {
-            let body: Bytes = serde_json::to_vec(record).unwrap().into();
-            push_logs(stream_name.to_string(), req.clone(), body).await?;
-        }
-    } else {
-        push_logs(stream_name.to_string(), req, body).await?;
-    }
-    Ok(())
-}

// Handler for POST /api/v1/logstream/{logstream}
// only ingests events into the specified logstream
@@ -187,11 +158,6 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
        return Err(PostError::StreamNotFound(stream_name));
    }

-    if CONFIG.parseable.mode == Mode::Query {
-        return Err(PostError::Invalid(anyhow::anyhow!(
-            "Ingestion is not allowed in Query mode"
-        )));
-    }
    flatten_and_push_logs(req, body, stream_name).await?;
    Ok(HttpResponse::Ok().finish())
}
@@ -216,209 +182,6 @@ pub async fn push_logs_unchecked(
    Ok(unchecked_event)
}

-async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
-    let time_partition = STREAM_INFO.get_time_partition(&stream_name)?;
-    let time_partition_limit = STREAM_INFO.get_time_partition_limit(&stream_name)?;
-    let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name)?;
-    let custom_partition = STREAM_INFO.get_custom_partition(&stream_name)?;
-    let body_val: Value = serde_json::from_slice(&body)?;
-    let size: usize = body.len();
-    let mut parsed_timestamp = Utc::now().naive_utc();
-    if time_partition.is_none() {
-        if custom_partition.is_none() {
-            let size = size as u64;
-            create_process_record_batch(
-                stream_name.clone(),
-                req.clone(),
-                body_val.clone(),
-                static_schema_flag.clone(),
-                None,
-                parsed_timestamp,
-                HashMap::new(),
-                size,
-            )
-            .await?;
-        } else {
-            let data =
-                convert_array_to_object(body_val.clone(), None, None, custom_partition.clone())?;
-            let custom_partition = custom_partition.unwrap();
-            let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
-
-            for value in data {
-                let custom_partition_values =
-                    get_custom_partition_values(&value, &custom_partition_list);
-
-                let size = value.to_string().into_bytes().len() as u64;
-                create_process_record_batch(
-                    stream_name.clone(),
-                    req.clone(),
-                    value.clone(),
-                    static_schema_flag.clone(),
-                    None,
-                    parsed_timestamp,
-                    custom_partition_values.clone(),
-                    size,
-                )
-                .await?;
-            }
-        }
-    } else if custom_partition.is_none() {
-        let data = convert_array_to_object(
-            body_val.clone(),
-            time_partition.clone(),
-            time_partition_limit,
-            None,
-        )?;
-        for value in data {
-            parsed_timestamp = get_parsed_timestamp(&value, &time_partition);
-            let size = value.to_string().into_bytes().len() as u64;
-            create_process_record_batch(
-                stream_name.clone(),
-                req.clone(),
-                value.clone(),
-                static_schema_flag.clone(),
-                time_partition.clone(),
-                parsed_timestamp,
-                HashMap::new(),
-                size,
-            )
-            .await?;
-        }
-    } else {
-        let data = convert_array_to_object(
-            body_val.clone(),
-            time_partition.clone(),
-            time_partition_limit,
-            custom_partition.clone(),
-        )?;
-        let custom_partition = custom_partition.unwrap();
-        let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
-
-        for value in data {
-            let custom_partition_values =
-                get_custom_partition_values(&value, &custom_partition_list);
-
-            parsed_timestamp = get_parsed_timestamp(&value, &time_partition);
-            let size = value.to_string().into_bytes().len() as u64;
-            create_process_record_batch(
-                stream_name.clone(),
-                req.clone(),
-                value.clone(),
-                static_schema_flag.clone(),
-                time_partition.clone(),
-                parsed_timestamp,
-                custom_partition_values.clone(),
-                size,
-            )
-            .await?;
-        }
-    }
-
-    Ok(())
-}
-
-fn get_parsed_timestamp(body: &Value, time_partition: &Option<String>) -> NaiveDateTime {
-    let body_timestamp = body.get(time_partition.clone().unwrap().to_string());
-    let parsed_timestamp = body_timestamp
-        .unwrap()
-        .to_owned()
-        .as_str()
-        .unwrap()
-        .parse::<DateTime<Utc>>()
-        .unwrap()
-        .naive_utc();
-    parsed_timestamp
-}
-
-fn get_custom_partition_values(
-    body: &Value,
-    custom_partition_list: &[&str],
-) -> HashMap<String, String> {
-    let mut custom_partition_values: HashMap<String, String> = HashMap::new();
-    for custom_partition_field in custom_partition_list {
-        let custom_partition_value = body.get(custom_partition_field.trim()).unwrap().to_owned();
-        let custom_partition_value = match custom_partition_value.clone() {
-            e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
-            Value::String(s) => s,
-            _ => "".to_string(),
-        };
-        custom_partition_values.insert(
-            custom_partition_field.trim().to_string(),
-            custom_partition_value,
-        );
-    }
-    custom_partition_values
-}
-
-#[allow(clippy::too_many_arguments)]
-async fn create_process_record_batch(
-    stream_name: String,
-    req: HttpRequest,
-    value: Value,
-    static_schema_flag: Option<String>,
-    time_partition: Option<String>,
-    parsed_timestamp: NaiveDateTime,
-    custom_partition_values: HashMap<String, String>,
-    origin_size: u64,
-) -> Result<(), PostError> {
-    let (rb, is_first_event) = get_stream_schema(
-        stream_name.clone(),
-        req.clone(),
-        value.clone(),
-        static_schema_flag.clone(),
-        time_partition.clone(),
-    )?;
-    event::Event {
-        rb,
-        stream_name: stream_name.clone(),
-        origin_format: "json",
-        origin_size,
-        is_first_event,
-        parsed_timestamp,
-        time_partition: time_partition.clone(),
-        custom_partition_values: custom_partition_values.clone(),
-        stream_type: StreamType::UserDefined,
-    }
-    .process()
-    .await?;
-
-    Ok(())
-}
-
-fn get_stream_schema(
-    stream_name: String,
-    req: HttpRequest,
-    body: Value,
-    static_schema_flag: Option<String>,
-    time_partition: Option<String>,
-) -> Result<(arrow_array::RecordBatch, bool), PostError> {
-    let hash_map = STREAM_INFO.read().unwrap();
-    let schema = hash_map
-        .get(&stream_name)
-        .ok_or(PostError::StreamNotFound(stream_name))?
-        .schema
-        .clone();
-    into_event_batch(req, body, schema, static_schema_flag, time_partition)
-}
-
-fn into_event_batch(
-    req: HttpRequest,
-    body: Value,
-    schema: HashMap<String, Arc<Field>>,
-    static_schema_flag: Option<String>,
-    time_partition: Option<String>,
-) -> Result<(arrow_array::RecordBatch, bool), PostError> {
-    let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?;
-    let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?;
-    let event = format::json::Event {
-        data: body,
-        tags,
-        metadata,
-    };
-    let (rb, is_first) = event.into_recordbatch(schema, static_schema_flag, time_partition)?;
-    Ok((rb, is_first))
-}
-
// Check if the stream exists and create a new stream if doesn't exist
pub async fn create_stream_if_not_exists(
    stream_name: &str,
@@ -547,11 +310,9 @@ mod tests {

    use crate::{
        event,
-        handlers::{PREFIX_META, PREFIX_TAGS},
+        handlers::{http::modal::utils::ingest_utils::into_event_batch, PREFIX_META, PREFIX_TAGS},
    };

-    use super::into_event_batch;
-
    trait TestExt {
        fn as_int64_arr(&self) -> &Int64Array;
        fn as_float64_arr(&self) -> &Float64Array;