Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 10 additions & 47 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ use super::modal::IngesterMetadata;
pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
let ingester_infos = get_ingester_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingester info: {:?}", err);
StreamError::Custom {
msg: format!("failed to get ingester info\n{:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Anyhow(err)
})?;

let mut errored = false;
Expand Down Expand Up @@ -96,10 +93,7 @@ pub async fn fetch_stats_from_ingesters(

let ingester_infos = get_ingester_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingester info: {:?}", err);
StreamError::Custom {
msg: format!("failed to get ingester info\n{:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Anyhow(err)
})?;

for ingester in ingester_infos {
Expand Down Expand Up @@ -163,13 +157,7 @@ async fn send_stream_sync_request(
ingester.domain_name,
err
);
StreamError::Custom {
msg: format!(
"failed to forward create stream request to ingester: {}\n Error: {:?}",
ingester.domain_name, err
),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Network(err)
})?;

if !res.status().is_success() {
Expand All @@ -178,14 +166,7 @@ async fn send_stream_sync_request(
ingester.domain_name,
res
);
return Err(StreamError::Custom {
msg: format!(
"failed to forward create stream request to ingester: {}\nResponse Returned: {:?}",
ingester.domain_name,
res.text().await.unwrap_or_default()
),
status: StatusCode::INTERNAL_SERVER_ERROR,
});
return Err(StreamError::Network(res.error_for_status().unwrap_err()));
}

Ok(())
Expand Down Expand Up @@ -214,13 +195,7 @@ async fn send_stream_rollback_request(
ingester.domain_name,
err
);
StreamError::Custom {
msg: format!(
"failed to rollback stream creation: {}\n Error: {:?}",
ingester.domain_name, err
),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Network(err)
})?;

// if the response is not successful, log the error and return a custom error
Expand All @@ -247,10 +222,7 @@ async fn send_stream_rollback_request(
pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
let ingester_infos = get_ingester_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingester info: {:?}", err);
StreamError::Custom {
msg: format!("failed to get ingester info\n{:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Anyhow(err)
})?;

let mut infos = vec![];
Expand All @@ -275,19 +247,13 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {

let resp_data = resp.bytes().await.map_err(|err| {
log::error!("Fatal: failed to parse ingester info to bytes: {:?}", err);
StreamError::Custom {
msg: format!("failed to parse ingester info to bytes: {:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Network(err)
})?;

let sp = serde_json::from_slice::<JsonValue>(&resp_data)
.map_err(|err| {
log::error!("Fatal: failed to parse ingester info: {:?}", err);
StreamError::Custom {
msg: format!("failed to parse ingester info: {:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::ResponseError(err)
})?
.get("staging")
.unwrap()
Expand Down Expand Up @@ -321,7 +287,7 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
let ingester_metadata = get_ingester_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingester info: {:?}", err);
PostError::CustomError(err.to_string())
PostError::Invalid(err)
})?;

let mut dresses = vec![];
Expand All @@ -341,10 +307,7 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
.await;

if let Ok(res) = res {
let text = res
.text()
.await
.map_err(|err| PostError::CustomError(err.to_string()))?;
let text = res.text().await.map_err(PostError::NetworkError)?;
let lines: Vec<Result<String, std::io::Error>> =
text.lines().map(|line| Ok(line.to_owned())).collect_vec();

Expand Down
12 changes: 3 additions & 9 deletions server/src/handlers/http/cluster/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
.iter()
.map(|x| x.creation_time.parse::<DateTime<Utc>>().unwrap())
.min()
.unwrap(); // should never be None
.unwrap(); // should never be None

// get the stream name
let stream_name = stats[0].stream.clone();
Expand All @@ -138,7 +138,7 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
None => Utc::now(), // current time ie the max time
})
.min()
.unwrap(); // should never be None
.unwrap(); // should never be None

let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);

Expand Down Expand Up @@ -222,13 +222,7 @@ pub async fn send_stats_request(
err
);

StreamError::Custom {
msg: format!(
"failed to fetch stats from ingester: {}\n Error: {:?}",
ingester.domain_name, err
),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Network(err)
})?;

if !res.status().is_success() {
Expand Down
3 changes: 3 additions & 0 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ pub enum PostError {
CreateStream(#[from] CreateStreamError),
#[error("Error: {0}")]
CustomError(String),
#[error("Error: {0}")]
NetworkError(#[from] reqwest::Error),
}

impl actix_web::ResponseError for PostError {
Expand All @@ -187,6 +189,7 @@ impl actix_web::ResponseError for PostError {
PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand Down
22 changes: 22 additions & 0 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,15 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
Ok(())
}

fn classify_json_error(kind: serde_json::error::Category) -> StatusCode {
match kind {
serde_json::error::Category::Io => StatusCode::INTERNAL_SERVER_ERROR,
serde_json::error::Category::Syntax => StatusCode::BAD_REQUEST,
serde_json::error::Category::Data => StatusCode::INTERNAL_SERVER_ERROR,
serde_json::error::Category::Eof => StatusCode::BAD_REQUEST,
}
}

pub mod error {

use actix_web::http::header::ContentType;
Expand All @@ -402,6 +411,8 @@ pub mod error {
validator::error::{AlertValidationError, StreamNameValidationError},
};

use super::classify_json_error;

#[derive(Debug, thiserror::Error)]
pub enum CreateStreamError {
#[error("Stream name validation failed due to {0}")]
Expand Down Expand Up @@ -446,6 +457,12 @@ pub mod error {
InvalidRetentionConfig(serde_json::Error),
#[error("{msg}")]
Custom { msg: String, status: StatusCode },
#[error("Error: {0}")]
Anyhow(#[from] anyhow::Error),
#[error("Network Error: {0}")]
Network(#[from] reqwest::Error),
#[error("Error: {0}")]
ResponseError(#[from] serde_json::Error),
}

impl actix_web::ResponseError for StreamError {
Expand All @@ -468,6 +485,11 @@ pub mod error {
StreamError::InvalidAlert(_) => StatusCode::BAD_REQUEST,
StreamError::InvalidAlertMessage(_, _) => StatusCode::BAD_REQUEST,
StreamError::InvalidRetentionConfig(_) => StatusCode::BAD_REQUEST,
StreamError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
StreamError::Network(err) => {
err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
}
StreamError::ResponseError(err) => classify_json_error(err.classify()),
}
}

Expand Down
13 changes: 8 additions & 5 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;

use crate::event::commit_schema;
Expand All @@ -41,6 +42,7 @@ use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use crate::response::QueryResponse;
use crate::storage::object_storage::commit_schema_to_storage;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;

use super::send_query_request_to_ingester;
Expand Down Expand Up @@ -76,11 +78,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
if let Ok(new_schema) = fetch_schema(&table_name).await {
commit_schema_to_storage(&table_name, new_schema.clone())
.await
.map_err(|err| {
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
})?;
commit_schema(&table_name, Arc::new(new_schema))
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
.map_err(QueryError::ObjectStorage)?;
commit_schema(&table_name, Arc::new(new_schema)).map_err(QueryError::EventError)?;
}
}

Expand Down Expand Up @@ -291,6 +290,10 @@ pub enum QueryError {
Execute(#[from] ExecuteError),
#[error("Error: {0}")]
Custom(String),
#[error("ObjectStorage Error: {0}")]
ObjectStorage(#[from] ObjectStorageError),
#[error("Evern Error: {0}")]
EventError(#[from] EventError),
}

impl actix_web::ResponseError for QueryError {
Expand Down
2 changes: 2 additions & 0 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ pub enum ObjectStorageError {

#[error("Unhandled Error: {0}")]
UnhandledError(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("Error: {0}")]
PathError(relative_path::FromPathError),

#[allow(dead_code)]
#[error("Authentication Error: {0}")]
Expand Down
5 changes: 2 additions & 3 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,8 @@ impl ObjectStorage for S3 {

let byts = self
.get_object(
RelativePath::from_path(meta.location.as_ref()).map_err(|err| {
ObjectStorageError::Custom(format!("Error while getting files: {:}", err))
})?,
RelativePath::from_path(meta.location.as_ref())
.map_err(ObjectStorageError::PathError)?,
)
.await?;

Expand Down