Skip to content

Commit d481323

Browse files
authored
cleanup map_err (#724)
1 parent 5e124da commit d481323

File tree

7 files changed

+50
-64
lines changed

7 files changed

+50
-64
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 10 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,7 @@ use super::modal::IngesterMetadata;
4141
pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
4242
let ingester_infos = get_ingester_info().await.map_err(|err| {
4343
log::error!("Fatal: failed to get ingester info: {:?}", err);
44-
StreamError::Custom {
45-
msg: format!("failed to get ingester info\n{:?}", err),
46-
status: StatusCode::INTERNAL_SERVER_ERROR,
47-
}
44+
StreamError::Anyhow(err)
4845
})?;
4946

5047
let mut errored = false;
@@ -96,10 +93,7 @@ pub async fn fetch_stats_from_ingesters(
9693

9794
let ingester_infos = get_ingester_info().await.map_err(|err| {
9895
log::error!("Fatal: failed to get ingester info: {:?}", err);
99-
StreamError::Custom {
100-
msg: format!("failed to get ingester info\n{:?}", err),
101-
status: StatusCode::INTERNAL_SERVER_ERROR,
102-
}
96+
StreamError::Anyhow(err)
10397
})?;
10498

10599
for ingester in ingester_infos {
@@ -163,13 +157,7 @@ async fn send_stream_sync_request(
163157
ingester.domain_name,
164158
err
165159
);
166-
StreamError::Custom {
167-
msg: format!(
168-
"failed to forward create stream request to ingester: {}\n Error: {:?}",
169-
ingester.domain_name, err
170-
),
171-
status: StatusCode::INTERNAL_SERVER_ERROR,
172-
}
160+
StreamError::Network(err)
173161
})?;
174162

175163
if !res.status().is_success() {
@@ -178,14 +166,7 @@ async fn send_stream_sync_request(
178166
ingester.domain_name,
179167
res
180168
);
181-
return Err(StreamError::Custom {
182-
msg: format!(
183-
"failed to forward create stream request to ingester: {}\nResponse Returned: {:?}",
184-
ingester.domain_name,
185-
res.text().await.unwrap_or_default()
186-
),
187-
status: StatusCode::INTERNAL_SERVER_ERROR,
188-
});
169+
return Err(StreamError::Network(res.error_for_status().unwrap_err()));
189170
}
190171

191172
Ok(())
@@ -214,13 +195,7 @@ async fn send_stream_rollback_request(
214195
ingester.domain_name,
215196
err
216197
);
217-
StreamError::Custom {
218-
msg: format!(
219-
"failed to rollback stream creation: {}\n Error: {:?}",
220-
ingester.domain_name, err
221-
),
222-
status: StatusCode::INTERNAL_SERVER_ERROR,
223-
}
198+
StreamError::Network(err)
224199
})?;
225200

226201
// if the response is not successful, log the error and return a custom error
@@ -247,10 +222,7 @@ async fn send_stream_rollback_request(
247222
pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
248223
let ingester_infos = get_ingester_info().await.map_err(|err| {
249224
log::error!("Fatal: failed to get ingester info: {:?}", err);
250-
StreamError::Custom {
251-
msg: format!("failed to get ingester info\n{:?}", err),
252-
status: StatusCode::INTERNAL_SERVER_ERROR,
253-
}
225+
StreamError::Anyhow(err)
254226
})?;
255227

256228
let mut infos = vec![];
@@ -275,19 +247,13 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
275247

276248
let resp_data = resp.bytes().await.map_err(|err| {
277249
log::error!("Fatal: failed to parse ingester info to bytes: {:?}", err);
278-
StreamError::Custom {
279-
msg: format!("failed to parse ingester info to bytes: {:?}", err),
280-
status: StatusCode::INTERNAL_SERVER_ERROR,
281-
}
250+
StreamError::Network(err)
282251
})?;
283252

284253
let sp = serde_json::from_slice::<JsonValue>(&resp_data)
285254
.map_err(|err| {
286255
log::error!("Fatal: failed to parse ingester info: {:?}", err);
287-
StreamError::Custom {
288-
msg: format!("failed to parse ingester info: {:?}", err),
289-
status: StatusCode::INTERNAL_SERVER_ERROR,
290-
}
256+
StreamError::ResponseError(err)
291257
})?
292258
.get("staging")
293259
.unwrap()
@@ -321,7 +287,7 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
321287
pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
322288
let ingester_metadata = get_ingester_info().await.map_err(|err| {
323289
log::error!("Fatal: failed to get ingester info: {:?}", err);
324-
PostError::CustomError(err.to_string())
290+
PostError::Invalid(err)
325291
})?;
326292

327293
let mut dresses = vec![];
@@ -341,10 +307,7 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
341307
.await;
342308

343309
if let Ok(res) = res {
344-
let text = res
345-
.text()
346-
.await
347-
.map_err(|err| PostError::CustomError(err.to_string()))?;
310+
let text = res.text().await.map_err(PostError::NetworkError)?;
348311
let lines: Vec<Result<String, std::io::Error>> =
349312
text.lines().map(|line| Ok(line.to_owned())).collect_vec();
350313

server/src/handlers/http/cluster/utils.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
123123
.iter()
124124
.map(|x| x.creation_time.parse::<DateTime<Utc>>().unwrap())
125125
.min()
126-
.unwrap(); // should never be None
126+
.unwrap(); // should never be None
127127

128128
// get the stream name
129129
let stream_name = stats[0].stream.clone();
@@ -138,7 +138,7 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
138138
None => Utc::now(), // current time ie the max time
139139
})
140140
.min()
141-
.unwrap(); // should never be None
141+
.unwrap(); // should never be None
142142

143143
let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);
144144

@@ -222,13 +222,7 @@ pub async fn send_stats_request(
222222
err
223223
);
224224

225-
StreamError::Custom {
226-
msg: format!(
227-
"failed to fetch stats from ingester: {}\n Error: {:?}",
228-
ingester.domain_name, err
229-
),
230-
status: StatusCode::INTERNAL_SERVER_ERROR,
231-
}
225+
StreamError::Network(err)
232226
})?;
233227

234228
if !res.status().is_success() {

server/src/handlers/http/ingest.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ pub enum PostError {
172172
CreateStream(#[from] CreateStreamError),
173173
#[error("Error: {0}")]
174174
CustomError(String),
175+
#[error("Error: {0}")]
176+
NetworkError(#[from] reqwest::Error),
175177
}
176178

177179
impl actix_web::ResponseError for PostError {
@@ -187,6 +189,7 @@ impl actix_web::ResponseError for PostError {
187189
PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
188190
PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
189191
PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR,
192+
PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR,
190193
}
191194
}
192195

server/src/handlers/http/logstream.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,15 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
391391
Ok(())
392392
}
393393

394+
fn classify_json_error(kind: serde_json::error::Category) -> StatusCode {
395+
match kind {
396+
serde_json::error::Category::Io => StatusCode::INTERNAL_SERVER_ERROR,
397+
serde_json::error::Category::Syntax => StatusCode::BAD_REQUEST,
398+
serde_json::error::Category::Data => StatusCode::INTERNAL_SERVER_ERROR,
399+
serde_json::error::Category::Eof => StatusCode::BAD_REQUEST,
400+
}
401+
}
402+
394403
pub mod error {
395404

396405
use actix_web::http::header::ContentType;
@@ -402,6 +411,8 @@ pub mod error {
402411
validator::error::{AlertValidationError, StreamNameValidationError},
403412
};
404413

414+
use super::classify_json_error;
415+
405416
#[derive(Debug, thiserror::Error)]
406417
pub enum CreateStreamError {
407418
#[error("Stream name validation failed due to {0}")]
@@ -446,6 +457,12 @@ pub mod error {
446457
InvalidRetentionConfig(serde_json::Error),
447458
#[error("{msg}")]
448459
Custom { msg: String, status: StatusCode },
460+
#[error("Error: {0}")]
461+
Anyhow(#[from] anyhow::Error),
462+
#[error("Network Error: {0}")]
463+
Network(#[from] reqwest::Error),
464+
#[error("Error: {0}")]
465+
ResponseError(#[from] serde_json::Error),
449466
}
450467

451468
impl actix_web::ResponseError for StreamError {
@@ -468,6 +485,11 @@ pub mod error {
468485
StreamError::InvalidAlert(_) => StatusCode::BAD_REQUEST,
469486
StreamError::InvalidAlertMessage(_, _) => StatusCode::BAD_REQUEST,
470487
StreamError::InvalidRetentionConfig(_) => StatusCode::BAD_REQUEST,
488+
StreamError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
489+
StreamError::Network(err) => {
490+
err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
491+
}
492+
StreamError::ResponseError(err) => classify_json_error(err.classify()),
471493
}
472494
}
473495

server/src/handlers/http/query.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use std::pin::Pin;
3030
use std::sync::Arc;
3131
use std::time::Instant;
3232

33+
use crate::event::error::EventError;
3334
use crate::handlers::http::fetch_schema;
3435

3536
use crate::event::commit_schema;
@@ -41,6 +42,7 @@ use crate::rbac::role::{Action, Permission};
4142
use crate::rbac::Users;
4243
use crate::response::QueryResponse;
4344
use crate::storage::object_storage::commit_schema_to_storage;
45+
use crate::storage::ObjectStorageError;
4446
use crate::utils::actix::extract_session_key_from_req;
4547

4648
use super::send_query_request_to_ingester;
@@ -76,11 +78,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
7678
if let Ok(new_schema) = fetch_schema(&table_name).await {
7779
commit_schema_to_storage(&table_name, new_schema.clone())
7880
.await
79-
.map_err(|err| {
80-
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
81-
})?;
82-
commit_schema(&table_name, Arc::new(new_schema))
83-
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
81+
.map_err(QueryError::ObjectStorage)?;
82+
commit_schema(&table_name, Arc::new(new_schema)).map_err(QueryError::EventError)?;
8483
}
8584
}
8685

@@ -291,6 +290,10 @@ pub enum QueryError {
291290
Execute(#[from] ExecuteError),
292291
#[error("Error: {0}")]
293292
Custom(String),
293+
#[error("ObjectStorage Error: {0}")]
294+
ObjectStorage(#[from] ObjectStorageError),
295+
#[error("Evern Error: {0}")]
296+
EventError(#[from] EventError),
294297
}
295298

296299
impl actix_web::ResponseError for QueryError {

server/src/storage.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ pub enum ObjectStorageError {
173173

174174
#[error("Unhandled Error: {0}")]
175175
UnhandledError(Box<dyn std::error::Error + Send + Sync + 'static>),
176+
#[error("Error: {0}")]
177+
PathError(relative_path::FromPathError),
176178

177179
#[allow(dead_code)]
178180
#[error("Authentication Error: {0}")]

server/src/storage/s3.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -434,9 +434,8 @@ impl ObjectStorage for S3 {
434434

435435
let byts = self
436436
.get_object(
437-
RelativePath::from_path(meta.location.as_ref()).map_err(|err| {
438-
ObjectStorageError::Custom(format!("Error while getting files: {:}", err))
439-
})?,
437+
RelativePath::from_path(meta.location.as_ref())
438+
.map_err(ObjectStorageError::PathError)?,
440439
)
441440
.await?;
442441

0 commit comments

Comments
 (0)