@@ -25,13 +25,14 @@ use crate::event;
2525use crate :: query:: Query ;
2626use crate :: response:: QueryResponse ;
2727use crate :: s3:: S3 ;
28- use crate :: utils:: header_parsing:: collect_labelled_headers;
28+ use crate :: utils:: header_parsing:: { collect_labelled_headers, ParseHeaderError } ;
2929use crate :: utils:: { self , flatten_json_body, merge} ;
3030
3131use self :: error:: { PostError , QueryError } ;
3232
3333const PREFIX_TAGS : & str = "x-p-tag-" ;
3434const PREFIX_META : & str = "x-p-meta-" ;
35+ const STREAM_NAME_HEADER_KEY : & str = "x-p-stream-name" ;
3536const SEPARATOR : char = '^' ;
3637
3738pub async fn query ( _req : HttpRequest , json : web:: Json < Value > ) -> Result < HttpResponse , QueryError > {
@@ -48,12 +49,30 @@ pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> Result<HttpResp
4849 . map_err ( |e| e. into ( ) )
4950}
5051
52+ pub async fn ingest (
53+ req : HttpRequest ,
54+ body : web:: Json < serde_json:: Value > ,
55+ ) -> Result < HttpResponse , PostError > {
56+ if let Some ( ( _, stream_name) ) = req. headers ( ) . iter ( ) . find ( |& ( key, _) | key == STREAM_NAME_HEADER_KEY ) {
57+ push_logs ( stream_name. to_str ( ) . unwrap ( ) . to_owned ( ) , req, body) . await ?;
58+
59+ Ok ( HttpResponse :: Ok ( ) . finish ( ) )
60+ } else {
61+ Err ( PostError :: Header ( ParseHeaderError :: MissingStreamName ) )
62+ }
63+ }
64+
5165pub async fn post_event (
5266 req : HttpRequest ,
5367 body : web:: Json < serde_json:: Value > ,
5468) -> Result < HttpResponse , PostError > {
5569 let stream_name: String = req. match_info ( ) . get ( "logstream" ) . unwrap ( ) . parse ( ) . unwrap ( ) ;
70+ push_logs ( stream_name, req, body) . await ?;
71+
72+ Ok ( HttpResponse :: Ok ( ) . finish ( ) )
73+ }
5674
75+ async fn push_logs ( stream_name : String , req : HttpRequest , body : web:: Json < serde_json:: Value > ) -> Result < ( ) , PostError > {
5776 let tags = HashMap :: from ( [ (
5877 "p_tags" . to_string ( ) ,
5978 collect_labelled_headers ( & req, PREFIX_TAGS , SEPARATOR ) ?,
@@ -89,7 +108,7 @@ pub async fn post_event(
89108 event. process ( ) . await ?;
90109 }
91110
92- Ok ( HttpResponse :: Ok ( ) . finish ( ) )
111+ Ok ( ( ) )
93112}
94113
95114pub mod error {
0 commit comments