diff --git a/server/src/event.rs b/server/src/event.rs index b0c757050..b1e8037df 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -172,7 +172,6 @@ impl Event { let inferred_schema = self.infer_schema()?; let event = self.get_reader(inferred_schema.clone()); - let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?; if let Some(existing_schema) = stream_schema { @@ -211,7 +210,7 @@ impl Event { // note for functions _schema_with_map and _set_schema_with_map, // these are to be called while holding a write lock specifically. // this guarantees two things - // - no other metadata operation can happen inbetween + // - no other metadata operation can happen in between // - map always have an entry for this stream let stream_name = &self.stream_name; diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index 84327feb8..c44950a0f 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -32,7 +32,7 @@ use self::error::{PostError, QueryError}; const PREFIX_TAGS: &str = "x-p-tag-"; const PREFIX_META: &str = "x-p-meta-"; -const STREAM_NAME_HEADER_KEY: &str = "x-p-stream-name"; +const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const SEPARATOR: char = '^'; pub async fn query(_req: HttpRequest, json: web::Json) -> Result { @@ -49,6 +49,9 @@ pub async fn query(_req: HttpRequest, json: web::Json) -> Result, @@ -58,21 +61,24 @@ pub async fn ingest( .iter() .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) { - push_logs(stream_name.to_str().unwrap().to_owned(), req, body).await?; - + let str_name = stream_name.to_str().unwrap().to_owned(); + super::logstream::create_stream_if_not_exists(str_name.clone()).await; + push_logs(str_name, req, body).await?; Ok(HttpResponse::Ok().finish()) } else { Err(PostError::Header(ParseHeaderError::MissingStreamName)) } } +// Handler for POST /api/v1/logstream/{logstream} +// only ingests events into the specified logstream +// fails if the logstream does not exist pub async fn post_event( req: HttpRequest, body: web::Json, ) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); push_logs(stream_name, req, body).await?; - Ok(HttpResponse::Ok().finish()) } diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index 91a842156..59bfce4c4 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -164,50 +164,9 @@ pub async fn get_alert(req: HttpRequest) -> HttpResponse { .to_http() } -pub async fn put(req: HttpRequest) -> HttpResponse { +pub async fn put_stream(req: HttpRequest) -> HttpResponse { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - - // fail to proceed if there is an error in log stream name validation - if let Err(e) = validator::stream_name(&stream_name) { - return response::ServerResponse { - msg: format!("failed to create log stream due to err: {}", e), - code: StatusCode::BAD_REQUEST, - } - .to_http(); - } - - let s3 = S3::new(); - - // Proceed to create log stream if it doesn't exist - if s3.get_schema(&stream_name).await.is_err() { - // Fail if unable to create log stream on object store backend - if let Err(e) = s3.create_stream(&stream_name).await { - return response::ServerResponse { - msg: format!( - "failed to create log stream {} due to err: {}", - stream_name, e - ), - code: StatusCode::INTERNAL_SERVER_ERROR, - } - .to_http(); - } - metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default()); - return response::ServerResponse { - msg: format!("created log stream {}", stream_name), - code: StatusCode::OK, - } - .to_http(); - } - - // Error if the log stream already exists - response::ServerResponse { - msg: format!( - "log stream {} already exists, please create a new log stream with unique name", - stream_name - ), - code: StatusCode::BAD_REQUEST, - } - .to_http() + create_stream_if_not_exists(stream_name).await } pub async fn put_alert(req: HttpRequest, body: web::Json) -> HttpResponse { @@ -349,3 +308,47 @@ fn remove_id_from_alerts(value: &mut Value) { }); } } + +// Check if the stream exists and create a new stream if doesn't exist +pub async fn create_stream_if_not_exists(stream_name: String) -> HttpResponse { + if metadata::STREAM_INFO.stream_exists(stream_name.as_str()) { + // Error if the log stream already exists + response::ServerResponse { + msg: format!( + "log stream {} already exists, please create a new log stream with unique name", + stream_name + ), + code: StatusCode::BAD_REQUEST, + } + .to_http(); + } + + // fail to proceed if invalid stream name + if let Err(e) = validator::stream_name(&stream_name) { + response::ServerResponse { + msg: format!("failed to create log stream due to err: {}", e), + code: StatusCode::BAD_REQUEST, + } + .to_http(); + } + + // Proceed to create log stream if it doesn't exist + let s3 = S3::new(); + if let Err(e) = s3.create_stream(&stream_name).await { + // Fail if unable to create log stream on object store backend + response::ServerResponse { + msg: format!( + "failed to create log stream {} due to err: {}", + stream_name, e + ), + code: StatusCode::INTERNAL_SERVER_ERROR, + } + .to_http(); + } + metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default()); + response::ServerResponse { + msg: format!("created log stream {}", stream_name), + code: StatusCode::OK, + } + .to_http() +} diff --git a/server/src/main.rs b/server/src/main.rs index 47e3e7b23..b0c8aeacf 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -265,7 +265,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { // logstream API web::resource(logstream_path("{logstream}")) // PUT "/logstream/{logstream}" ==> Create log stream - .route(web::put().to(handlers::logstream::put)) + .route(web::put().to(handlers::logstream::put_stream)) // POST "/logstream/{logstream}" ==> Post logs to given log stream .route(web::post().to(handlers::event::post_event)) // DELETE "/logstream/{logstream}" ==> Delete log stream diff --git a/server/src/metadata.rs b/server/src/metadata.rs index a66522fcb..0a39bfa22 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -80,6 +80,11 @@ impl STREAM_INFO { }) } + pub fn stream_exists(&self, stream_name: &str) -> bool { + let map = self.read().expect(LOCK_EXPECT); + map.contains_key(stream_name) + } + pub fn schema(&self, stream_name: &str) -> Result, MetadataError> { let map = self.read().expect(LOCK_EXPECT); map.get(stream_name) @@ -112,7 +117,7 @@ impl STREAM_INFO { } pub async fn load(&self, storage: &impl ObjectStorage) -> Result<(), LoadError> { - // When loading streams this funtion will assume list_streams only returns valid streams. + // When loading streams this function will assume list_streams only returns valid streams. // a valid stream would have a .schema file. // .schema file could be empty in that case it will be treated as an uninitialized stream. // return error in case of an error from object storage itself. diff --git a/server/src/stats.rs b/server/src/stats.rs index 778e68885..87f78b381 100644 --- a/server/src/stats.rs +++ b/server/src/stats.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::sync::atomic::{AtomicU64, Ordering}; use serde::{Deserialize, Serialize}; diff --git a/server/src/utils.rs b/server/src/utils.rs index 2ef6e1e2e..80edde5c9 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -35,7 +35,7 @@ pub fn merge(value: Value, fields: HashMap) -> Value { for (k, v) in fields { match m.get_mut(&k) { Some(val) => { - let mut final_val = String::new(); + let mut final_val = String::default(); final_val.push_str(val.as_str().unwrap()); final_val.push(','); final_val.push_str(&v); @@ -103,7 +103,7 @@ pub mod header_parsing { SeperatorInKey(char), #[error("A value passed in header contains reserved char {0}")] SeperatorInValue(char), - #[error("Stream name not found in header")] + #[error("Stream name not found in header [x-p-stream]")] MissingStreamName, }