1616 *
1717 */
1818
19- use std:: collections:: HashMap ;
20- use std:: sync:: Arc ;
21-
22- use actix_web:: http:: header:: ContentType ;
23- use actix_web:: { HttpRequest , HttpResponse } ;
19+ use actix_web:: { http:: header:: ContentType , HttpRequest , HttpResponse } ;
2420use arrow_schema:: Field ;
2521use bytes:: Bytes ;
2622use http:: StatusCode ;
2723use serde_json:: Value ;
24+ use std:: collections:: { BTreeMap , HashMap } ;
25+ use std:: sync:: Arc ;
2826
2927use crate :: event:: error:: EventError ;
3028use crate :: event:: format:: EventFormat ;
3129use crate :: event:: { self , format} ;
32- use crate :: handlers:: { PREFIX_META , PREFIX_TAGS , SEPARATOR , STREAM_NAME_HEADER_KEY } ;
30+ use crate :: handlers:: {
31+ LOG_SOURCE_KEY , LOG_SOURCE_KINESIS , LOG_SOURCE_OTEL , PREFIX_META , PREFIX_TAGS , SEPARATOR ,
32+ STREAM_NAME_HEADER_KEY ,
33+ } ;
3334use crate :: metadata:: STREAM_INFO ;
3435use crate :: utils:: header_parsing:: { collect_labelled_headers, ParseHeaderError } ;
3536
37+ use super :: kinesis;
3638use super :: logstream:: error:: CreateStreamError ;
3739
3840// Handler for POST /api/v1/ingest
@@ -46,19 +48,48 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
4648 {
4749 let stream_name = stream_name. to_str ( ) . unwrap ( ) . to_owned ( ) ;
4850 create_stream_if_not_exists ( & stream_name) . await ?;
49- push_logs ( stream_name, req, body) . await ?;
51+
52+ flatten_and_push_logs ( req, body, stream_name) . await ?;
5053 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
5154 } else {
5255 Err ( PostError :: Header ( ParseHeaderError :: MissingStreamName ) )
5356 }
5457}
5558
59+ async fn flatten_and_push_logs (
60+ req : HttpRequest ,
61+ body : Bytes ,
62+ stream_name : String ,
63+ ) -> Result < ( ) , PostError > {
64+ //flatten logs
65+ if let Some ( ( _, log_source) ) = req. headers ( ) . iter ( ) . find ( |& ( key, _) | key == LOG_SOURCE_KEY ) {
66+ let mut json: Vec < BTreeMap < String , Value > > = Vec :: new ( ) ;
67+ let log_source: String = log_source. to_str ( ) . unwrap ( ) . to_owned ( ) ;
68+ match log_source. as_str ( ) {
69+ LOG_SOURCE_KINESIS => json = kinesis:: flatten_kinesis_logs ( & body) ,
70+ LOG_SOURCE_OTEL => { }
71+ _ => {
72+ log:: warn!( "Unknown log source: {}" , log_source) ;
73+ push_logs ( stream_name. to_string ( ) , req. clone ( ) , body) . await ?;
74+ }
75+ }
76+ for record in json. iter_mut ( ) {
77+ let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
78+ push_logs ( stream_name. to_string ( ) , req. clone ( ) , body) . await ?;
79+ }
80+ } else {
81+ push_logs ( stream_name. to_string ( ) , req, body) . await ?;
82+ }
83+ Ok ( ( ) )
84+ }
85+
5686// Handler for POST /api/v1/logstream/{logstream}
5787// only ingests events into the specified logstream
5888// fails if the logstream does not exist
5989pub async fn post_event ( req : HttpRequest , body : Bytes ) -> Result < HttpResponse , PostError > {
6090 let stream_name: String = req. match_info ( ) . get ( "logstream" ) . unwrap ( ) . parse ( ) . unwrap ( ) ;
61- push_logs ( stream_name, req, body) . await ?;
91+
92+ flatten_and_push_logs ( req, body, stream_name) . await ?;
6293 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
6394}
6495
0 commit comments