1616 *
1717 */
1818
19- use std:: sync:: Arc ;
20-
2119use actix_web:: http:: header:: ContentType ;
2220use actix_web:: { HttpRequest , HttpResponse } ;
2321use arrow_schema:: Schema ;
@@ -29,6 +27,7 @@ use crate::event::error::EventError;
2927use crate :: event:: format:: EventFormat ;
3028use crate :: event:: { self , format} ;
3129use crate :: handlers:: { PREFIX_META , PREFIX_TAGS , SEPARATOR , STREAM_NAME_HEADER_KEY } ;
30+ use crate :: metadata:: error:: stream_info:: MetadataError ;
3231use crate :: metadata:: STREAM_INFO ;
3332use crate :: utils:: header_parsing:: { collect_labelled_headers, ParseHeaderError } ;
3433
@@ -62,7 +61,8 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
6261}
6362
6463async fn push_logs ( stream_name : String , req : HttpRequest , body : Bytes ) -> Result < ( ) , PostError > {
65- let ( size, rb) = into_event_batch ( req, body, & get_stream_schema ( & stream_name) ) ?;
64+ let schema = STREAM_INFO . schema ( & stream_name) ?;
65+ let ( size, rb) = into_event_batch ( req, body, & schema) ?;
6666
6767 event:: Event {
6868 rb,
@@ -95,12 +95,10 @@ fn into_event_batch(
9595 Ok ( ( size, rb) )
9696}
9797
98- fn get_stream_schema ( stream_name : & str ) -> Arc < Schema > {
99- STREAM_INFO . schema ( stream_name) . unwrap ( )
100- }
101-
10298#[ derive( Debug , thiserror:: Error ) ]
10399pub enum PostError {
100+ #[ error( "{0}" ) ]
101+ StreamNotFound ( #[ from] MetadataError ) ,
104102 #[ error( "Could not deserialize into JSON object, {0}" ) ]
105103 SerdeError ( #[ from] serde_json:: Error ) ,
106104 #[ error( "Header Error: {0}" ) ]
@@ -121,6 +119,7 @@ impl actix_web::ResponseError for PostError {
121119 PostError :: Event ( _) => StatusCode :: INTERNAL_SERVER_ERROR ,
122120 PostError :: Invalid ( _) => StatusCode :: BAD_REQUEST ,
123121 PostError :: CreateStream ( _) => StatusCode :: INTERNAL_SERVER_ERROR ,
122+ PostError :: StreamNotFound ( _) => StatusCode :: NOT_FOUND ,
124123 }
125124 }
126125
0 commit comments