Skip to content

Commit 7bd8102

Browse files
committed
removed unwraps in favor of error propagation
1 parent 8c05958 commit 7bd8102

File tree

13 files changed

+75
-43
lines changed

13 files changed

+75
-43
lines changed

server/src/analytics.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ pub struct Report {
7474
}
7575

7676
impl Report {
77-
pub async fn new() -> Self {
77+
pub async fn new() -> anyhow::Result<Self> {
7878
let mut upt: f64 = 0.0;
7979
if let Ok(uptime) = uptime_lib::get() {
8080
upt = uptime.as_secs_f64();
@@ -91,9 +91,9 @@ impl Report {
9191
cpu_count = info.cpus().len();
9292
mem_total = info.total_memory();
9393
}
94-
let ingestor_metrics = fetch_ingestors_metrics().await;
94+
let ingestor_metrics = fetch_ingestors_metrics().await?;
9595

96-
Self {
96+
Ok(Self {
9797
deployment_id: storage::StorageMetadata::global().deployment_id,
9898
uptime: upt,
9999
report_created_at: Utc::now(),
@@ -113,7 +113,7 @@ impl Report {
113113
total_json_bytes: ingestor_metrics.4,
114114
total_parquet_bytes: ingestor_metrics.5,
115115
metrics: build_metrics().await,
116-
}
116+
})
117117
}
118118

119119
pub async fn send(&self) {
@@ -148,7 +148,7 @@ fn total_event_stats() -> (u64, u64, u64) {
148148
(total_events, total_json_bytes, total_parquet_bytes)
149149
}
150150

151-
async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) {
151+
async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64, u64)> {
152152
let event_stats = total_event_stats();
153153
let mut node_metrics =
154154
NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2);
@@ -181,24 +181,24 @@ async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) {
181181
.header(header::CONTENT_TYPE, "application/json")
182182
.send()
183183
.await
184-
.unwrap(); // should respond
184+
.expect("should respond");
185185

186-
let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await.unwrap()).unwrap();
186+
let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await?)?;
187187
vec.push(data);
188188
active_ingestors += 1;
189189
}
190190

191191
node_metrics.accumulate(&mut vec);
192192
}
193193

194-
(
194+
Ok((
195195
active_ingestors,
196196
offline_ingestors,
197197
node_metrics.stream_count,
198198
node_metrics.total_events_count,
199199
node_metrics.total_json_bytes,
200200
node_metrics.total_parquet_bytes,
201-
)
201+
))
202202
}
203203

204204
async fn build_metrics() -> HashMap<String, Value> {
@@ -220,14 +220,23 @@ async fn build_metrics() -> HashMap<String, Value> {
220220
metrics
221221
}
222222

223-
pub fn init_analytics_scheduler() {
223+
pub fn init_analytics_scheduler() -> anyhow::Result<()> {
224224
log::info!("Setting up schedular for anonymous user analytics");
225225

226226
let mut scheduler = AsyncScheduler::new();
227227
scheduler
228228
.every(ANALYTICS_SEND_INTERVAL_SECONDS)
229229
.run(move || async {
230-
Report::new().await.send().await;
230+
Report::new()
231+
.await
232+
.unwrap_or_else(|err| {
233+
// panicking because separate thread
234+
// TODO: a better way to handle this
235+
log::error!("Error while sending analytics: {}", err.to_string());
236+
panic!("{}", err.to_string());
237+
})
238+
.send()
239+
.await;
231240
});
232241

233242
tokio::spawn(async move {
@@ -236,6 +245,8 @@ pub fn init_analytics_scheduler() {
236245
tokio::time::sleep(Duration::from_secs(10)).await;
237246
}
238247
});
248+
249+
Ok(())
239250
}
240251

241252
#[derive(Serialize, Deserialize, Default, Debug)]

server/src/handlers/http.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
8585
.await?
8686
.iter()
8787
// we should be able to unwrap as we know the data is valid schema
88-
.map(|byte_obj| serde_json::from_slice(byte_obj).unwrap())
88+
.map(|byte_obj| serde_json::from_slice(byte_obj).expect("data is valid json"))
8989
.collect_vec();
9090

9191
let new_schema = Schema::try_merge(res)?;
@@ -97,7 +97,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
9797
pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
9898
// send the query request to the ingestor
9999
let mut res = vec![];
100-
let ima = get_ingestor_info().await.unwrap();
100+
let ima = get_ingestor_info().await?;
101101

102102
for im in ima.iter() {
103103
let uri = format!(

server/src/handlers/http/cluster/mod.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use chrono::Utc;
3737
use http::StatusCode;
3838
use itertools::Itertools;
3939
use relative_path::RelativePathBuf;
40+
use serde::de::Error;
41+
use serde_json::error::Error as SerdeError;
4042
use serde_json::Value as JsonValue;
4143
use url::Url;
4244

@@ -262,9 +264,13 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
262264
StreamError::SerdeError(err)
263265
})?
264266
.get("staging")
265-
.unwrap()
267+
.ok_or(StreamError::SerdeError(SerdeError::missing_field(
268+
"staging",
269+
)))?
266270
.as_str()
267-
.unwrap()
271+
.ok_or(StreamError::SerdeError(SerdeError::custom(
272+
"staging path not a string/ not provided",
273+
)))?
268274
.to_string();
269275

270276
(true, sp, None, status)
@@ -304,7 +310,9 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
304310
&ingestor.domain_name,
305311
base_path_without_preceding_slash()
306312
))
307-
.unwrap();
313+
.map_err(|err| {
314+
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
315+
})?;
308316

309317
let res = reqwest::Client::new()
310318
.get(uri)

server/src/handlers/http/logstream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
169169
}
170170

171171
if !body.is_empty() && static_schema_flag == "true" {
172-
let static_schema: StaticSchema = serde_json::from_slice(&body).unwrap();
172+
let static_schema: StaticSchema = serde_json::from_slice(&body)?;
173173
let parsed_schema = convert_static_schema_to_arrow_schema(static_schema);
174174
if let Ok(parsed_schema) = parsed_schema {
175175
schema = parsed_schema;
@@ -357,7 +357,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
357357
None
358358
};
359359

360-
let hash_map = STREAM_INFO.read().unwrap();
360+
let hash_map = STREAM_INFO.read().expect("Readable");
361361
let stream_meta = &hash_map
362362
.get(&stream_name)
363363
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
@@ -396,7 +396,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
396396
stats
397397
};
398398

399-
let stats = serde_json::to_value(stats).unwrap();
399+
let stats = serde_json::to_value(stats)?;
400400

401401
Ok((web::Json(stats), StatusCode::OK))
402402
}

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use actix_web::body::MessageBody;
4242
use actix_web::Scope;
4343
use actix_web::{web, App, HttpServer};
4444
use actix_web_prometheus::PrometheusMetrics;
45+
use anyhow::anyhow;
4546
use async_trait::async_trait;
4647
use base64::Engine;
4748
use itertools::Itertools;
@@ -202,12 +203,11 @@ impl IngestServer {
202203
return Ok(());
203204
};
204205

205-
let resource = serde_json::to_string(&resource)
206-
.unwrap()
206+
let resource = serde_json::to_string(&resource)?
207207
.try_into_bytes()
208-
.unwrap();
208+
.map_err(|err| anyhow!(err))?;
209209

210-
store.put_object(&path, resource.clone()).await?;
210+
store.put_object(&path, resource).await?;
211211

212212
Ok(())
213213
}

server/src/handlers/http/modal/query_server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl QueryServer {
180180
// all internal data structures populated now.
181181
// start the analytics scheduler if enabled
182182
if CONFIG.parseable.send_analytics {
183-
analytics::init_analytics_scheduler();
183+
analytics::init_analytics_scheduler()?;
184184
}
185185

186186
self.start(prometheus, CONFIG.parseable.openid.clone())

server/src/handlers/http/modal/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ impl Server {
443443
sync::object_store_sync();
444444

445445
if CONFIG.parseable.send_analytics {
446-
analytics::init_analytics_scheduler();
446+
analytics::init_analytics_scheduler()?;
447447
}
448448

449449
tokio::spawn(handlers::livetail::server());

server/src/handlers/http/query.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,12 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
7070
// create a visitor to extract the table name
7171
let mut visitor = TableScanVisitor::default();
7272
let _ = raw_logical_plan.visit(&mut visitor);
73-
let table_name = visitor.into_inner().pop().unwrap();
73+
let table_name = visitor
74+
.into_inner()
75+
.pop()
76+
.ok_or(QueryError::MalformedQuery(
77+
"No table found from sql".to_string(),
78+
))?;
7479

7580
if CONFIG.parseable.mode == Mode::Query {
7681
if let Ok(new_schema) = fetch_schema(&table_name).await {
@@ -278,6 +283,8 @@ pub enum QueryError {
278283
ObjectStorage(#[from] ObjectStorageError),
279284
#[error("Evern Error: {0}")]
280285
EventError(#[from] EventError),
286+
#[error("Error: {0}")]
287+
MalformedQuery(String),
281288
}
282289

283290
impl actix_web::ResponseError for QueryError {

server/src/handlers/livetail.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ fn extract_cookie(header: &MetadataMap) -> Option<Cookie> {
270270
.iter()
271271
.filter_map(|value| value.to_str().ok())
272272
.flat_map(Cookie::split_parse)
273-
.map(|value| value.unwrap())
273+
.map(|value| value.expect("cookie is parseable"))
274274
.collect();
275275

276276
cookies

server/src/metrics/prom_utils.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ struct StorageMetrics {
2222

2323
impl Default for Metrics {
2424
fn default() -> Self {
25-
let socket = get_url();
25+
let url = get_url();
2626
let address = format!(
2727
"http://{}:{}",
28-
socket.domain().unwrap(),
29-
socket.port().unwrap_or_default()
28+
url.domain()
29+
.unwrap_or(url.host_str().expect("should have a host")),
30+
url.port().unwrap_or_default()
3031
);
3132
Metrics {
3233
address,
@@ -68,11 +69,11 @@ impl Metrics {
6869
prom_dress.process_resident_memory_bytes += val;
6970
}
7071
} else if sample.metric == "parseable_storage_size" {
71-
if sample.labels.get("type").unwrap() == "data" {
72+
if sample.labels.get("type").expect("type is present") == "data" {
7273
if let PromValue::Gauge(val) = sample.value {
7374
prom_dress.parseable_storage_size.data += val;
7475
}
75-
} else if sample.labels.get("type").unwrap() == "staging" {
76+
} else if sample.labels.get("type").expect("type is present") == "staging" {
7677
if let PromValue::Gauge(val) = sample.value {
7778
prom_dress.parseable_storage_size.staging += val;
7879
}

0 commit comments

Comments
 (0)