diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 747139233..54c829b0d 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -41,10 +41,7 @@ use super::modal::IngesterMetadata; pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> { let ingester_infos = get_ingester_info().await.map_err(|err| { log::error!("Fatal: failed to get ingester info: {:?}", err); - StreamError::Custom { - msg: format!("failed to get ingester info\n{:?}", err), - status: StatusCode::INTERNAL_SERVER_ERROR, - } + StreamError::Anyhow(err) })?; let mut errored = false; @@ -96,10 +93,7 @@ pub async fn fetch_stats_from_ingesters( let ingester_infos = get_ingester_info().await.map_err(|err| { log::error!("Fatal: failed to get ingester info: {:?}", err); - StreamError::Custom { - msg: format!("failed to get ingester info\n{:?}", err), - status: StatusCode::INTERNAL_SERVER_ERROR, - } + StreamError::Anyhow(err) })?; for ingester in ingester_infos { @@ -163,13 +157,7 @@ async fn send_stream_sync_request( ingester.domain_name, err ); - StreamError::Custom { - msg: format!( - "failed to forward create stream request to ingester: {}\n Error: {:?}", - ingester.domain_name, err - ), - status: StatusCode::INTERNAL_SERVER_ERROR, - } + StreamError::Network(err) })?; if !res.status().is_success() { @@ -178,14 +166,7 @@ async fn send_stream_sync_request( ingester.domain_name, res ); - return Err(StreamError::Custom { - msg: format!( - "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", - ingester.domain_name, - res.text().await.unwrap_or_default() - ), - status: StatusCode::INTERNAL_SERVER_ERROR, - }); + return Err(StreamError::Network(res.error_for_status().unwrap_err())); } Ok(()) @@ -214,13 +195,7 @@ async fn send_stream_rollback_request( ingester.domain_name, err ); - StreamError::Custom { - msg: format!( - "failed to rollback stream creation: {}\n Error: {:?}", - ingester.domain_name, err - ), - status: StatusCode::INTERNAL_SERVER_ERROR, - } + StreamError::Network(err) })?; // if the response is not successful, log the error and return a custom error @@ -247,10 +222,7 @@ async fn send_stream_rollback_request( pub async fn get_cluster_info() -> Result { let ingester_infos = get_ingester_info().await.map_err(|err| { log::error!("Fatal: failed to get ingester info: {:?}", err); - StreamError::Custom { - msg: format!("failed to get ingester info\n{:?}", err), - status: StatusCode::INTERNAL_SERVER_ERROR, - } + StreamError::Anyhow(err) })?; let mut infos = vec![]; @@ -275,19 +247,13 @@ pub async fn get_cluster_info() -> Result { let resp_data = resp.bytes().await.map_err(|err| { log::error!("Fatal: failed to parse ingester info to bytes: {:?}", err); - StreamError::Custom { - msg: format!("failed to parse ingester info to bytes: {:?}", err), - status: StatusCode::INTERNAL_SERVER_ERROR, - } + StreamError::Network(err) })?; let sp = serde_json::from_slice::(&resp_data) .map_err(|err| { log::error!("Fatal: failed to parse ingester info: {:?}", err); - StreamError::Custom { - msg: format!("failed to parse ingester info: {:?}", err), - status: StatusCode::INTERNAL_SERVER_ERROR, - } + StreamError::ResponseError(err) })? .get("staging") .unwrap() @@ -321,7 +287,7 @@ pub async fn get_cluster_info() -> Result { pub async fn get_cluster_metrics() -> Result { let ingester_metadata = get_ingester_info().await.map_err(|err| { log::error!("Fatal: failed to get ingester info: {:?}", err); - PostError::CustomError(err.to_string()) + PostError::Invalid(err) })?; let mut dresses = vec![]; @@ -341,10 +307,7 @@ pub async fn get_cluster_metrics() -> Result { .await; if let Ok(res) = res { - let text = res - .text() - .await - .map_err(|err| PostError::CustomError(err.to_string()))?; + let text = res.text().await.map_err(PostError::NetworkError)?; let lines: Vec> = text.lines().map(|line| Ok(line.to_owned())).collect_vec(); diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs index dc436f4ee..870aaa850 100644 --- a/server/src/handlers/http/cluster/utils.rs +++ b/server/src/handlers/http/cluster/utils.rs @@ -123,7 +123,7 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { .iter() .map(|x| x.creation_time.parse::>().unwrap()) .min() - .unwrap(); // should never be None + .unwrap(); // should never be None // get the stream name let stream_name = stats[0].stream.clone(); @@ -138,7 +138,7 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { None => Utc::now(), // current time ie the max time }) .min() - .unwrap(); // should never be None + .unwrap(); // should never be None let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now); @@ -222,13 +222,7 @@ pub async fn send_stats_request( err ); - StreamError::Custom { - msg: format!( - "failed to fetch stats from ingester: {}\n Error: {:?}", - ingester.domain_name, err - ), - status: StatusCode::INTERNAL_SERVER_ERROR, - } + StreamError::Network(err) })?; if !res.status().is_success() { diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index d9751fefd..bba472a81 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -172,6 +172,8 @@ pub enum PostError { CreateStream(#[from] CreateStreamError), #[error("Error: {0}")] CustomError(String), + #[error("Error: {0}")] + NetworkError(#[from] reqwest::Error), } impl actix_web::ResponseError for PostError { @@ -187,6 +189,7 @@ impl actix_web::ResponseError for PostError { PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::StreamNotFound(_) => StatusCode::NOT_FOUND, PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 3329f011a..64851ed68 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -391,6 +391,15 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> Ok(()) } +fn classify_json_error(kind: serde_json::error::Category) -> StatusCode { + match kind { + serde_json::error::Category::Io => StatusCode::INTERNAL_SERVER_ERROR, + serde_json::error::Category::Syntax => StatusCode::BAD_REQUEST, + serde_json::error::Category::Data => StatusCode::INTERNAL_SERVER_ERROR, + serde_json::error::Category::Eof => StatusCode::BAD_REQUEST, + } +} + pub mod error { use actix_web::http::header::ContentType; @@ -402,6 +411,8 @@ pub mod error { validator::error::{AlertValidationError, StreamNameValidationError}, }; + use super::classify_json_error; + #[derive(Debug, thiserror::Error)] pub enum CreateStreamError { #[error("Stream name validation failed due to {0}")] @@ -446,6 +457,12 @@ pub mod error { InvalidRetentionConfig(serde_json::Error), #[error("{msg}")] Custom { msg: String, status: StatusCode }, + #[error("Error: {0}")] + Anyhow(#[from] anyhow::Error), + #[error("Network Error: {0}")] + Network(#[from] reqwest::Error), + #[error("Error: {0}")] + ResponseError(#[from] serde_json::Error), } impl actix_web::ResponseError for StreamError { @@ -468,6 +485,11 @@ pub mod error { StreamError::InvalidAlert(_) => StatusCode::BAD_REQUEST, StreamError::InvalidAlertMessage(_, _) => StatusCode::BAD_REQUEST, StreamError::InvalidRetentionConfig(_) => StatusCode::BAD_REQUEST, + StreamError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, + StreamError::Network(err) => { + err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) + } + StreamError::ResponseError(err) => classify_json_error(err.classify()), } } diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 5f539ce49..6a3fefd0a 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -30,6 +30,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Instant; +use crate::event::error::EventError; use crate::handlers::http::fetch_schema; use crate::event::commit_schema; @@ -41,6 +42,7 @@ use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; use crate::response::QueryResponse; use crate::storage::object_storage::commit_schema_to_storage; +use crate::storage::ObjectStorageError; use crate::utils::actix::extract_session_key_from_req; use super::send_query_request_to_ingester; @@ -76,11 +78,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result), + #[error("Error: {0}")] + PathError(relative_path::FromPathError), #[allow(dead_code)] #[error("Authentication Error: {0}")] diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index bd717a21a..7b403095a 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -434,9 +434,8 @@ impl ObjectStorage for S3 { let byts = self .get_object( - RelativePath::from_path(meta.location.as_ref()).map_err(|err| { - ObjectStorageError::Custom(format!("Error while getting files: {:}", err)) - })?, + RelativePath::from_path(meta.location.as_ref()) + .map_err(ObjectStorageError::PathError)?, ) .await?;