diff --git a/Cargo.lock b/Cargo.lock index 1b38a69c0..ba43846f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2970,7 +2970,7 @@ dependencies = [ [[package]] name = "parseable" -version = "0.4.1" +version = "0.4.2" dependencies = [ "actix-cors", "actix-web", diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index d3c4c99b5..d740b9d9d 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -16,8 +16,6 @@ * */ -use std::sync::Arc; - use actix_web::http::header::ContentType; use actix_web::{HttpRequest, HttpResponse}; use arrow_schema::Schema; @@ -29,6 +27,7 @@ use crate::event::error::EventError; use crate::event::format::EventFormat; use crate::event::{self, format}; use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY}; +use crate::metadata::error::stream_info::MetadataError; use crate::metadata::STREAM_INFO; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; @@ -62,7 +61,8 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { - let (size, rb) = into_event_batch(req, body, &get_stream_schema(&stream_name))?; + let schema = STREAM_INFO.schema(&stream_name)?; + let (size, rb) = into_event_batch(req, body, &schema)?; event::Event { rb, @@ -95,12 +95,10 @@ fn into_event_batch( Ok((size, rb)) } -fn get_stream_schema(stream_name: &str) -> Arc { - STREAM_INFO.schema(stream_name).unwrap() -} - #[derive(Debug, thiserror::Error)] pub enum PostError { + #[error("{0}")] + StreamNotFound(#[from] MetadataError), #[error("Could not deserialize into JSON object, {0}")] SerdeError(#[from] serde_json::Error), #[error("Header Error: {0}")] @@ -121,6 +119,7 @@ impl actix_web::ResponseError for PostError { PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::Invalid(_) => StatusCode::BAD_REQUEST, PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::StreamNotFound(_) => StatusCode::NOT_FOUND, } }