From 77de6bacd91315812e65997cb84b1302d68b966c Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Mon, 12 Dec 2022 12:34:43 +0530 Subject: [PATCH 1/2] Add support for automatic stream creation When the user specifies x-p-stream in the header and the stream doesn't exist, the server used to fail with a "stream doesn't exist" error. In highly automated environments like Kubernetes, it is important to create the stream dynamically so that Parseable can ingest the log data quickly without much user involvement. --- server/src/event.rs | 3 +- server/src/handlers/event.rs | 14 +++-- server/src/handlers/logstream.rs | 89 +++++++++++++++++--------------- server/src/main.rs | 2 +- server/src/metadata.rs | 7 ++- server/src/stats.rs | 18 +++++++ server/src/utils.rs | 2 +- 7 files changed, 83 insertions(+), 52 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index b0c757050..b1e8037df 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -172,7 +172,6 @@ impl Event { let inferred_schema = self.infer_schema()?; let event = self.get_reader(inferred_schema.clone()); - let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?; if let Some(existing_schema) = stream_schema { @@ -211,7 +210,7 @@ impl Event { // note for functions _schema_with_map and _set_schema_with_map, // these are to be called while holding a write lock specifically. 
// this guarantees two things - // - no other metadata operation can happen inbetween + // - no other metadata operation can happen in between // - map always have an entry for this stream let stream_name = &self.stream_name; diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index 84327feb8..c44950a0f 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -32,7 +32,7 @@ use self::error::{PostError, QueryError}; const PREFIX_TAGS: &str = "x-p-tag-"; const PREFIX_META: &str = "x-p-meta-"; -const STREAM_NAME_HEADER_KEY: &str = "x-p-stream-name"; +const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const SEPARATOR: char = '^'; pub async fn query(_req: HttpRequest, json: web::Json) -> Result { @@ -49,6 +49,9 @@ pub async fn query(_req: HttpRequest, json: web::Json) -> Result, @@ -58,21 +61,24 @@ pub async fn ingest( .iter() .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) { - push_logs(stream_name.to_str().unwrap().to_owned(), req, body).await?; - + let str_name = stream_name.to_str().unwrap().to_owned(); + super::logstream::create_stream_if_not_exists(str_name.clone()).await; + push_logs(str_name, req, body).await?; Ok(HttpResponse::Ok().finish()) } else { Err(PostError::Header(ParseHeaderError::MissingStreamName)) } } +// Handler for POST /api/v1/logstream/{logstream} +// only ingests events into the specified logstream +// fails if the logstream does not exist pub async fn post_event( req: HttpRequest, body: web::Json, ) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); push_logs(stream_name, req, body).await?; - Ok(HttpResponse::Ok().finish()) } diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index 91a842156..59bfce4c4 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -164,50 +164,9 @@ pub async fn get_alert(req: HttpRequest) -> HttpResponse { .to_http() } -pub async fn put(req: HttpRequest) -> 
HttpResponse { +pub async fn put_stream(req: HttpRequest) -> HttpResponse { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - - // fail to proceed if there is an error in log stream name validation - if let Err(e) = validator::stream_name(&stream_name) { - return response::ServerResponse { - msg: format!("failed to create log stream due to err: {}", e), - code: StatusCode::BAD_REQUEST, - } - .to_http(); - } - - let s3 = S3::new(); - - // Proceed to create log stream if it doesn't exist - if s3.get_schema(&stream_name).await.is_err() { - // Fail if unable to create log stream on object store backend - if let Err(e) = s3.create_stream(&stream_name).await { - return response::ServerResponse { - msg: format!( - "failed to create log stream {} due to err: {}", - stream_name, e - ), - code: StatusCode::INTERNAL_SERVER_ERROR, - } - .to_http(); - } - metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default()); - return response::ServerResponse { - msg: format!("created log stream {}", stream_name), - code: StatusCode::OK, - } - .to_http(); - } - - // Error if the log stream already exists - response::ServerResponse { - msg: format!( - "log stream {} already exists, please create a new log stream with unique name", - stream_name - ), - code: StatusCode::BAD_REQUEST, - } - .to_http() + create_stream_if_not_exists(stream_name).await } pub async fn put_alert(req: HttpRequest, body: web::Json) -> HttpResponse { @@ -349,3 +308,47 @@ fn remove_id_from_alerts(value: &mut Value) { }); } } + +// Create a new stream if it doesn't exist. Returns early with an error response if the stream already exists, the name is invalid or the object store fails +pub async fn create_stream_if_not_exists(stream_name: String) -> HttpResponse { + if metadata::STREAM_INFO.stream_exists(stream_name.as_str()) { + // Error if the log stream already exists; return here so the existing stream is not re-created below + return response::ServerResponse { + msg: format!( + "log stream {} already exists, please create a new log stream with unique name", + stream_name + ), + code: 
StatusCode::BAD_REQUEST, + } + .to_http(); + } + + // fail to proceed if invalid stream name + if let Err(e) = validator::stream_name(&stream_name) { + return response::ServerResponse { + msg: format!("failed to create log stream due to err: {}", e), + code: StatusCode::BAD_REQUEST, + } + .to_http(); + } + + // Proceed to create log stream if it doesn't exist + let s3 = S3::new(); + if let Err(e) = s3.create_stream(&stream_name).await { + // Fail if unable to create log stream on object store backend + return response::ServerResponse { + msg: format!( + "failed to create log stream {} due to err: {}", + stream_name, e + ), + code: StatusCode::INTERNAL_SERVER_ERROR, + } + .to_http(); + } + metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default()); + response::ServerResponse { + msg: format!("created log stream {}", stream_name), + code: StatusCode::OK, + } + .to_http() +} diff --git a/server/src/main.rs b/server/src/main.rs index 47e3e7b23..b0c8aeacf 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -265,7 +265,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { // logstream API web::resource(logstream_path("{logstream}")) // PUT "/logstream/{logstream}" ==> Create log stream - .route(web::put().to(handlers::logstream::put)) + .route(web::put().to(handlers::logstream::put_stream)) // POST "/logstream/{logstream}" ==> Post logs to given log stream .route(web::post().to(handlers::event::post_event)) // DELETE "/logstream/{logstream}" ==> Delete log stream diff --git a/server/src/metadata.rs b/server/src/metadata.rs index a66522fcb..0a39bfa22 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -80,6 +80,11 @@ impl STREAM_INFO { }) } + pub fn stream_exists(&self, stream_name: &str) -> bool { + let map = self.read().expect(LOCK_EXPECT); + map.contains_key(stream_name) + } + pub fn schema(&self, stream_name: &str) -> Result, MetadataError> { let map = self.read().expect(LOCK_EXPECT); map.get(stream_name) @@ -112,7 +117,7 @@ 
impl STREAM_INFO { } pub async fn load(&self, storage: &impl ObjectStorage) -> Result<(), LoadError> { - // When loading streams this funtion will assume list_streams only returns valid streams. + // When loading streams this function will assume list_streams only returns valid streams. // a valid stream would have a .schema file. // .schema file could be empty in that case it will be treated as an uninitialized stream. // return error in case of an error from object storage itself. diff --git a/server/src/stats.rs b/server/src/stats.rs index 778e68885..87f78b381 100644 --- a/server/src/stats.rs +++ b/server/src/stats.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + */ + use std::sync::atomic::{AtomicU64, Ordering}; use serde::{Deserialize, Serialize}; diff --git a/server/src/utils.rs b/server/src/utils.rs index 2ef6e1e2e..5bf9e57d2 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -103,7 +103,7 @@ pub mod header_parsing { SeperatorInKey(char), #[error("A value passed in header contains reserved char {0}")] SeperatorInValue(char), - #[error("Stream name not found in header")] + #[error("Stream name not found in header [x-p-stream]")] MissingStreamName, } From 251c4bbb50ae3c3ea2683cdec4af99afa1c4f45c Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Mon, 12 Dec 2022 12:47:07 +0530 Subject: [PATCH 2/2] fixes --- server/src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/utils.rs b/server/src/utils.rs index 5bf9e57d2..80edde5c9 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -35,7 +35,7 @@ pub fn merge(value: Value, fields: HashMap) -> Value { for (k, v) in fields { match m.get_mut(&k) { Some(val) => { - let mut final_val = String::new(); + let mut final_val = String::default(); final_val.push_str(val.as_str().unwrap()); final_val.push(','); final_val.push_str(&v);