1616 *
1717 */
1818
19- use std:: collections:: HashMap ;
20- use std:: sync:: Arc ;
21-
22- use actix_web:: http:: header:: ContentType ;
23- use actix_web:: { HttpRequest , HttpResponse } ;
19+ use actix_web:: { http:: header:: ContentType , HttpRequest , HttpResponse } ;
2420use arrow_schema:: Field ;
2521use bytes:: Bytes ;
2622use http:: StatusCode ;
2723use serde_json:: Value ;
24+ use std:: collections:: { BTreeMap , HashMap } ;
25+ use std:: sync:: Arc ;
2826
2927use crate :: event:: error:: EventError ;
3028use crate :: event:: format:: EventFormat ;
3129use crate :: event:: { self , format} ;
32- use crate :: handlers:: { PREFIX_META , PREFIX_TAGS , SEPARATOR , STREAM_NAME_HEADER_KEY } ;
30+ use crate :: handlers:: {
31+ LOG_SOURCE_KEY , LOG_SOURCE_VALUE_FOR_KINEIS , LOG_SOURCE_VALUE_FOR_OTEL , PREFIX_META ,
32+ PREFIX_TAGS , SEPARATOR , STREAM_NAME_HEADER_KEY ,
33+ } ;
3334use crate :: metadata:: STREAM_INFO ;
3435use crate :: utils:: header_parsing:: { collect_labelled_headers, ParseHeaderError } ;
3536
37+ use super :: kinesis;
3638use super :: logstream:: error:: CreateStreamError ;
3739
3840// Handler for POST /api/v1/ingest
@@ -46,19 +48,45 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
4648 {
4749 let stream_name = stream_name. to_str ( ) . unwrap ( ) . to_owned ( ) ;
4850 create_stream_if_not_exists ( & stream_name) . await ?;
49- push_logs ( stream_name, req, body) . await ?;
51+
52+ flatten_and_push_logs ( req, body, stream_name) . await ?;
5053 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
5154 } else {
5255 Err ( PostError :: Header ( ParseHeaderError :: MissingStreamName ) )
5356 }
5457}
5558
59+ async fn flatten_and_push_logs (
60+ req : HttpRequest ,
61+ body : Bytes ,
62+ stream_name : String ,
63+ ) -> Result < ( ) , PostError > {
64+ //flatten logs
65+ if let Some ( ( _, log_source) ) = req. headers ( ) . iter ( ) . find ( |& ( key, _) | key == LOG_SOURCE_KEY ) {
66+ let mut json: Vec < BTreeMap < String , Value > > = Vec :: new ( ) ;
67+ let log_source: String = log_source. to_str ( ) . unwrap ( ) . to_owned ( ) ;
68+ match log_source. as_str ( ) {
69+ LOG_SOURCE_VALUE_FOR_KINEIS => json = kinesis:: flatten_kinesis_logs ( & body) ,
70+ LOG_SOURCE_VALUE_FOR_OTEL => { }
71+ _ => { }
72+ }
73+ for record in json. iter_mut ( ) {
74+ let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
75+ push_logs ( stream_name. to_string ( ) , req. clone ( ) , body) . await ?;
76+ }
77+ } else {
78+ push_logs ( stream_name. to_string ( ) , req, body) . await ?;
79+ }
80+ Ok ( ( ) )
81+ }
82+
5683// Handler for POST /api/v1/logstream/{logstream}
5784// only ingests events into the specified logstream
5885// fails if the logstream does not exist
5986pub async fn post_event ( req : HttpRequest , body : Bytes ) -> Result < HttpResponse , PostError > {
6087 let stream_name: String = req. match_info ( ) . get ( "logstream" ) . unwrap ( ) . parse ( ) . unwrap ( ) ;
61- push_logs ( stream_name, req, body) . await ?;
88+
89+ flatten_and_push_logs ( req, body, stream_name) . await ?;
6290 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
6391}
6492
0 commit comments