From 1f76e8b3b7d268c293f1c7a66a8d3344b0f2bc83 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 3 Jan 2023 11:57:34 +0530 Subject: [PATCH 1/2] Refactor handlers * Use StreamError type for error propogation * Handlers should now return json with appropriate content-type --- server/src/handlers/event.rs | 11 +- server/src/handlers/logstream.rs | 356 +++++++++++++------------------ server/src/response.rs | 31 +-- 3 files changed, 162 insertions(+), 236 deletions(-) diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index 8d9aa5f8c..af85d6f60 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -59,9 +59,11 @@ pub async fn ingest( .iter() .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) { - let str_name = stream_name.to_str().unwrap().to_owned(); - super::logstream::create_stream_if_not_exists(str_name.clone()).await; - push_logs(str_name, req, body).await?; + let stream_name = stream_name.to_str().unwrap().to_owned(); + if let Err(e) = super::logstream::create_stream_if_not_exists(&stream_name).await { + return Err(PostError::CreateStream(e.into())); + } + push_logs(stream_name, req, body).await?; Ok(HttpResponse::Ok().finish()) } else { Err(PostError::Header(ParseHeaderError::MissingStreamName)) @@ -167,6 +169,8 @@ pub mod error { Event(#[from] EventError), #[error("Invalid Request")] Invalid, + #[error("Failed to create stream due to {0}")] + CreateStream(Box), } impl actix_web::ResponseError for PostError { @@ -175,6 +179,7 @@ pub mod error { PostError::Header(_) => StatusCode::BAD_REQUEST, PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::Invalid => StatusCode::BAD_REQUEST, + PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index 1d6d4dba1..3f5034d55 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -19,48 +19,29 @@ use std::fs; use actix_web::http::StatusCode; -use actix_web::{web, HttpRequest, HttpResponse, Responder}; +use actix_web::{web, HttpRequest, Responder}; use chrono::Utc; use serde_json::Value; use crate::alerts::Alerts; +use crate::event; use crate::option::CONFIG; -use crate::storage::StorageDir; -use crate::{event, response}; +use crate::storage::{ObjectStorageError, StorageDir}; use crate::{metadata, validator}; -pub async fn delete(req: HttpRequest) -> HttpResponse { +use self::error::StreamError; + +pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if let Err(e) = validator::stream_name(&stream_name) { - // fail to proceed if there is an error in log stream name validation - return response::ServerResponse { - msg: format!("failed to get log stream schema due to err: {}", e), - code: StatusCode::BAD_REQUEST, - } - .to_http(); - } + validator::stream_name(&stream_name)?; let objectstore = CONFIG.storage().get_object_store(); if objectstore.get_schema(&stream_name).await.is_err() { - return response::ServerResponse { - msg: format!("log stream {} does not exist", stream_name), - code: StatusCode::BAD_REQUEST, - } - .to_http(); - } - - if let Err(e) = objectstore.delete_stream(&stream_name).await { - return response::ServerResponse { - msg: format!( - "failed to delete log stream {} due to err: {}", - stream_name, e - ), - code: StatusCode::INTERNAL_SERVER_ERROR, - } - .to_http(); + return Err(StreamError::StreamNotFound(stream_name.to_string())); } + objectstore.delete_stream(&stream_name).await?; metadata::STREAM_INFO.delete_stream(&stream_name); if event::STREAM_WRITERS::delete_entry(&stream_name).is_err() { @@ -79,63 +60,42 @@ pub async fn delete(req: HttpRequest) -> HttpResponse { ) } - response::ServerResponse { - msg: format!("log stream {} deleted", stream_name), - code: StatusCode::OK, - } - .to_http() + Ok(( + format!("log stream {} deleted", stream_name), + StatusCode::OK, + )) } pub async fn list(_: HttpRequest) -> impl Responder { - response::list_response( - CONFIG - .storage() - .get_object_store() - .list_streams() - .await - .unwrap(), - ) + let body = CONFIG + .storage() + .get_object_store() + .list_streams() + .await + .unwrap(); + web::Json(body) } -pub async fn schema(req: HttpRequest) -> HttpResponse { +pub async fn schema(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); match metadata::STREAM_INFO.schema(&stream_name) { - Ok(schema) => response::ServerResponse { - msg: schema - .and_then(|ref schema| serde_json::to_string(schema).ok()) - .unwrap_or_default(), - code: StatusCode::OK, - } - .to_http(), + Ok(schema) => Ok((web::Json(schema), StatusCode::OK)), Err(_) => match CONFIG .storage() .get_object_store() .get_schema(&stream_name) .await { - Ok(None) => response::ServerResponse { - msg: "log stream is not initialized, please post an event before fetching schema" - .to_string(), - code: StatusCode::BAD_REQUEST, - } - .to_http(), - Ok(Some(ref schema)) => response::ServerResponse { - msg: serde_json::to_string(schema).unwrap(), - code: StatusCode::OK, - } - .to_http(), - Err(_) => response::ServerResponse { - msg: "failed to get log stream schema, because log stream doesn't exist" - .to_string(), - code: StatusCode::BAD_REQUEST, - } - .to_http(), + Ok(Some(schema)) => Ok((web::Json(Some(schema)), StatusCode::OK)), + Ok(None) => Err(StreamError::UninitializedLogstream), + Err(ObjectStorageError::NoSuchKey(_)) => Err(StreamError::StreamNotFound(stream_name)), + Err(err) => Err(err.into()), }, } } -pub async fn get_alert(req: HttpRequest) -> HttpResponse { +pub async fn get_alert(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let alerts = metadata::STREAM_INFO @@ -148,45 +108,49 @@ pub async fn get_alert(req: HttpRequest) -> HttpResponse { let mut alerts = match alerts { Some(alerts) => alerts, - None => match CONFIG - .storage() - .get_object_store() - .get_alerts(&stream_name) - .await - { - Ok(alerts) if alerts.alerts.is_empty() => { - return response::ServerResponse { - msg: "alert configuration not set for log stream {}".to_string(), - code: StatusCode::BAD_REQUEST, - } - .to_http() - } - Ok(alerts) => serde_json::to_value(alerts).expect("alerts can serialize to valid json"), - Err(_) => { - return response::ServerResponse { - msg: "alert doesn't exist".to_string(), - code: StatusCode::BAD_REQUEST, - } - .to_http() + None => { + let alerts = CONFIG + .storage() + .get_object_store() + .get_alerts(&stream_name) + .await?; + + if alerts.alerts.is_empty() { + return Err(StreamError::NoAlertsSet); } - }, + + serde_json::to_value(alerts).expect("alerts can serialize to valid json") + } }; remove_id_from_alerts(&mut alerts); - response::ServerResponse { - msg: alerts.to_string(), - code: StatusCode::OK, - } - .to_http() + Ok((web::Json(alerts), StatusCode::OK)) } -pub async fn put_stream(req: HttpRequest) -> HttpResponse { +pub async fn put_stream(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - create_stream_if_not_exists(stream_name).await + + if metadata::STREAM_INFO.stream_exists(&stream_name) { + // Error if the log stream already exists + return Err(StreamError::Custom { + msg: format!( + "log stream {} already exists, please create a new log stream with unique name", + stream_name + ), + status: StatusCode::BAD_REQUEST, + }); + } else { + create_stream(stream_name).await?; + } + + Ok((format!("log stream created"), StatusCode::OK)) } -pub async fn put_alert(req: HttpRequest, body: web::Json) -> HttpResponse { +pub async fn put_alert( + req: HttpRequest, + body: web::Json, +) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let mut body = body.into_inner(); @@ -194,28 +158,15 @@ pub async fn put_alert(req: HttpRequest, body: web::Json) -> let alerts: Alerts = match serde_json::from_value(body) { Ok(alerts) => alerts, - Err(e) => { - return response::ServerResponse { - msg: format!( - "failed to set alert configuration for log stream {} due to err: {}", - stream_name, e - ), - code: StatusCode::BAD_REQUEST, - } - .to_http() + Err(err) => { + return Err(StreamError::BadAlertJson { + stream: stream_name, + err, + }) } }; - if let Err(e) = validator::alert(&alerts) { - return response::ServerResponse { - msg: format!( - "failed to set alert configuration for log stream {} due to err: {}", - stream_name, e - ), - code: StatusCode::BAD_REQUEST, - } - .to_http(); - } + validator::alert(&alerts)?; match metadata::STREAM_INFO.schema(&stream_name) { Ok(Some(schema)) => { @@ -225,77 +176,35 @@ pub async fn put_alert(req: HttpRequest, body: web::Json) -> .find(|alert| !alert.rule.valid_for_schema(&schema)); if let Some(alert) = invalid_alert { - return response::ServerResponse { - msg: - format!("alert - \"{}\" is invalid, please check if alert is valid according to this stream's schema and try again", alert.name), - code: StatusCode::BAD_REQUEST, - } - .to_http(); + return Err(StreamError::InvalidAlert(alert.name.to_string())); } } - Ok(None) => { - return response::ServerResponse { - msg: "log stream is not initialized, please post an event before setting up alerts" - .to_string(), - code: StatusCode::BAD_REQUEST, - } - .to_http() - } - Err(_) => { - return response::ServerResponse { - msg: "log stream is not found".to_string(), - code: StatusCode::BAD_REQUEST, - } - .to_http() - } + Ok(None) => return Err(StreamError::UninitializedLogstream), + Err(_) => return Err(StreamError::StreamNotFound(stream_name)), } - if let Err(e) = CONFIG + CONFIG .storage() .get_object_store() .put_alerts(&stream_name, &alerts) - .await - { - return response::ServerResponse { - msg: format!( - "failed to set alert configuration for log stream {} due to err: {}", - stream_name, e - ), - code: StatusCode::INTERNAL_SERVER_ERROR, - } - .to_http(); - } + .await?; - if let Err(e) = metadata::STREAM_INFO.set_alert(&stream_name, alerts) { - return response::ServerResponse { - msg: format!( - "failed to set alert configuration for log stream {} due to err: {}", - stream_name, e - ), - code: StatusCode::INTERNAL_SERVER_ERROR, - } - .to_http(); - } + metadata::STREAM_INFO + .set_alert(&stream_name, alerts) + .expect("alerts set on existing stream"); - response::ServerResponse { - msg: format!("set alert configuration for log stream {}", stream_name), - code: StatusCode::OK, - } - .to_http() + Ok(( + format!("set alert configuration for log stream {}", stream_name), + StatusCode::OK, + )) } -pub async fn get_stats(req: HttpRequest) -> HttpResponse { +pub async fn get_stats(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let stats = match metadata::STREAM_INFO.get_stats(&stream_name) { Ok(stats) => stats, - Err(e) => { - return response::ServerResponse { - msg: format!("Could not return stats due to error: {}", e), - code: StatusCode::BAD_REQUEST, - } - .to_http() - } + Err(_) => return Err(StreamError::StreamNotFound(stream_name)), }; let time = Utc::now(); @@ -313,11 +222,7 @@ pub async fn get_stats(req: HttpRequest) -> HttpResponse { } }); - response::ServerResponse { - msg: stats.to_string(), - code: StatusCode::OK, - } - .to_http() + Ok((web::Json(stats), StatusCode::OK)) } fn remove_id_from_alerts(value: &mut Value) { @@ -332,45 +237,90 @@ fn remove_id_from_alerts(value: &mut Value) { } // Check if the stream exists and create a new stream if doesn't exist -pub async fn create_stream_if_not_exists(stream_name: String) -> HttpResponse { - if metadata::STREAM_INFO.stream_exists(stream_name.as_str()) { - // Error if the log stream already exists - response::ServerResponse { - msg: format!( - "log stream {} already exists, please create a new log stream with unique name", - stream_name - ), - code: StatusCode::BAD_REQUEST, - } - .to_http(); +pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), StreamError> { + if metadata::STREAM_INFO.stream_exists(stream_name) { + return Ok(()); } + create_stream(stream_name.to_string()).await +} + +pub async fn create_stream(stream_name: String) -> Result<(), StreamError> { // fail to proceed if invalid stream name - if let Err(e) = validator::stream_name(&stream_name) { - response::ServerResponse { - msg: format!("failed to create log stream due to err: {}", e), - code: StatusCode::BAD_REQUEST, - } - .to_http(); - } + validator::stream_name(&stream_name)?; // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); if let Err(e) = storage.create_stream(&stream_name).await { // Fail if unable to create log stream on object store backend - response::ServerResponse { + return Err(StreamError::Custom { msg: format!( "failed to create log stream {} due to err: {}", - stream_name, e + stream_name, + e.to_string() ), - code: StatusCode::INTERNAL_SERVER_ERROR, - } - .to_http(); + status: StatusCode::INTERNAL_SERVER_ERROR, + }); } metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default()); - response::ServerResponse { - msg: format!("created log stream {}", stream_name), - code: StatusCode::OK, + + Ok(()) +} + +pub mod error { + + use actix_web::http::header::ContentType; + use http::StatusCode; + + use crate::{ + storage::ObjectStorageError, + validator::error::{AlertValidationError, StreamNameValidationError}, + }; + + #[derive(Debug, thiserror::Error)] + pub enum StreamError { + #[error("Stream name validation failed due to {0}")] + StreamNameValidation(#[from] StreamNameValidationError), + #[error("Log stream {0} does not exist")] + StreamNotFound(String), + #[error("Log stream is not initialized, send an event to this logstream and try again")] + UninitializedLogstream, + #[error("Storage Error {0}")] + Storage(#[from] ObjectStorageError), + #[error("No alerts configured for this stream")] + NoAlertsSet, + #[error("failed to set alert configuration for log stream {stream} due to err: {err}")] + BadAlertJson { + stream: String, + err: serde_json::Error, + }, + #[error("Alert validation failed due to {0}")] + AlertValidation(#[from] AlertValidationError), + #[error("alert - \"{0}\" is invalid, please check if alert is valid according to this stream's schema and try again")] + InvalidAlert(String), + #[error("{msg}")] + Custom { msg: String, status: StatusCode }, + } + + impl actix_web::ResponseError for StreamError { + fn status_code(&self) -> http::StatusCode { + match self { + StreamError::StreamNameValidation(_) => StatusCode::BAD_REQUEST, + StreamError::StreamNotFound(_) => StatusCode::NOT_FOUND, + StreamError::Custom { status, .. } => *status, + StreamError::UninitializedLogstream => StatusCode::METHOD_NOT_ALLOWED, + StreamError::Storage(_) => StatusCode::INTERNAL_SERVER_ERROR, + StreamError::NoAlertsSet => StatusCode::NOT_FOUND, + StreamError::BadAlertJson { .. } => StatusCode::BAD_REQUEST, + StreamError::AlertValidation(_) => StatusCode::BAD_REQUEST, + StreamError::InvalidAlert(_) => StatusCode::BAD_REQUEST, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } } - .to_http() } diff --git a/server/src/response.rs b/server/src/response.rs index 2a1af671b..16a14d12e 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -17,31 +17,9 @@ */ use actix_web::http::StatusCode; -use actix_web::{error, web, HttpResponse, HttpResponseBuilder, Responder}; +use actix_web::{HttpResponse, HttpResponseBuilder}; use datafusion::arrow::json; use datafusion::arrow::record_batch::RecordBatch; -use derive_more::{Display, Error}; - -use crate::storage; - -pub struct ServerResponse { - pub code: StatusCode, - pub msg: String, -} - -impl ServerResponse { - pub fn to_http(&self) -> HttpResponse { - log::info!("{}", self.msg); - - HttpResponseBuilder::new(self.code) - .content_type("text") - .body(self.msg.to_string()) - } -} - -pub fn list_response(body: Vec) -> impl Responder { - web::Json(body) -} pub struct QueryResponse { pub code: StatusCode, @@ -70,10 +48,3 @@ impl From> for QueryResponse { } } } - -#[derive(Debug, Display, Error)] -pub struct EventError { - pub msg: String, -} - -impl error::ResponseError for EventError {} From 263f36f41e150f503c21cff09970a241c082b5c5 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 4 Jan 2023 12:38:31 +0530 Subject: [PATCH 2/2] Fix --- server/src/handlers/logstream.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index 3f5034d55..706f611b8 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -144,7 +144,7 @@ pub async fn put_stream(req: HttpRequest) -> Result create_stream(stream_name).await?; } - Ok((format!("log stream created"), StatusCode::OK)) + Ok(("log stream created", StatusCode::OK)) } pub async fn put_alert( @@ -256,8 +256,7 @@ pub async fn create_stream(stream_name: String) -> Result<(), StreamError> { return Err(StreamError::Custom { msg: format!( "failed to create log stream {} due to err: {}", - stream_name, - e.to_string() + stream_name, e ), status: StatusCode::INTERNAL_SERVER_ERROR, });