diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index 1b6556258..84327feb8 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -25,13 +25,14 @@ use crate::event; use crate::query::Query; use crate::response::QueryResponse; use crate::s3::S3; -use crate::utils::header_parsing::collect_labelled_headers; +use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; use crate::utils::{self, flatten_json_body, merge}; use self::error::{PostError, QueryError}; const PREFIX_TAGS: &str = "x-p-tag-"; const PREFIX_META: &str = "x-p-meta-"; +const STREAM_NAME_HEADER_KEY: &str = "x-p-stream-name"; const SEPARATOR: char = '^'; pub async fn query(_req: HttpRequest, json: web::Json) -> Result { @@ -48,12 +49,38 @@ pub async fn query(_req: HttpRequest, json: web::Json) -> Result, +) -> Result { + if let Some((_, stream_name)) = req + .headers() + .iter() + .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) + { + push_logs(stream_name.to_str().unwrap().to_owned(), req, body).await?; + + Ok(HttpResponse::Ok().finish()) + } else { + Err(PostError::Header(ParseHeaderError::MissingStreamName)) + } +} + pub async fn post_event( req: HttpRequest, body: web::Json, ) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + push_logs(stream_name, req, body).await?; + Ok(HttpResponse::Ok().finish()) +} + +async fn push_logs( + stream_name: String, + req: HttpRequest, + body: web::Json, +) -> Result<(), PostError> { let tags = HashMap::from([( "p_tags".to_string(), collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?, @@ -89,7 +116,7 @@ pub async fn post_event( event.process().await?; } - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub mod error { diff --git a/server/src/main.rs b/server/src/main.rs index 5410b6fd9..47e3e7b23 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -259,6 +259,8 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { web::scope(&base_path()) // POST "/query" ==> Get results of the SQL query passed in request body .service(web::resource(query_path()).route(web::post().to(handlers::event::query))) + // POST "/ingest" ==> Post logs to given log stream based on header + .service(web::resource(ingest_path()).route(web::post().to(handlers::event::ingest))) .service( // logstream API web::resource(logstream_path("{logstream}")) @@ -341,6 +343,10 @@ fn query_path() -> String { "/query".to_string() } +fn ingest_path() -> String { + "/ingest".to_string() +} + fn alert_path(stream_name: &str) -> String { format!("{}/alert", logstream_path(stream_name)) } diff --git a/server/src/utils.rs b/server/src/utils.rs index 0a94bbd77..2ef6e1e2e 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -103,6 +103,8 @@ pub mod header_parsing { SeperatorInKey(char), #[error("A value passed in header contains reserved char {0}")] SeperatorInValue(char), + #[error("Stream name not found in header")] + MissingStreamName, } impl ResponseError for ParseHeaderError {