Skip to content

Commit 7cc0c36

Browse files
committed
Return 404 in get_stats if no metrics are present for the logstream. Deleting a logstream also deletes its corresponding stats. (#412)
1 parent 0626261 commit 7cc0c36

File tree

2 files changed

+75
-8
lines changed

2 files changed

+75
-8
lines changed

server/src/handlers/http/logstream.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
4646
objectstore.delete_stream(&stream_name).await?;
4747
metadata::STREAM_INFO.delete_stream(&stream_name);
4848
event::STREAM_WRITERS.delete_stream(&stream_name);
49+
stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| {
50+
log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e)
51+
});
4952

5053
let stream_dir = StorageDir::new(&stream_name);
5154
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
@@ -363,3 +366,30 @@ pub mod error {
363366
}
364367
}
365368
}
369+
370+
#[cfg(test)]
371+
mod tests {
372+
use crate::handlers::http::logstream::error::StreamError;
373+
use crate::handlers::http::logstream::get_stats;
374+
use actix_web::test::TestRequest;
375+
use anyhow::bail;
376+
377+
#[actix_web::test]
378+
#[should_panic]
379+
async fn get_stats_panics_without_logstream() {
380+
let req = TestRequest::default().to_http_request();
381+
let _ = get_stats(req).await;
382+
}
383+
384+
#[actix_web::test]
385+
async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> {
386+
let req = TestRequest::default()
387+
.param("logstream", "test")
388+
.to_http_request();
389+
390+
match get_stats(req).await {
391+
Err(StreamError::StreamNotFound(_)) => Ok(()),
392+
_ => bail!("expected StreamNotFound error"),
393+
}
394+
}
395+
}

server/src/stats.rs

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,62 @@ pub struct Stats {
2727
}
2828

2929
pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option<Stats> {
30+
let event_labels = event_labels(stream_name, format);
31+
let storage_size_labels = storage_size_labels(stream_name);
32+
3033
let events_ingested = EVENTS_INGESTED
31-
.get_metric_with_label_values(&[stream_name, format])
34+
.get_metric_with_label_values(&event_labels)
3235
.ok()?
3336
.get();
3437
let ingestion_size = EVENTS_INGESTED_SIZE
35-
.get_metric_with_label_values(&[stream_name, format])
38+
.get_metric_with_label_values(&event_labels)
3639
.ok()?
3740
.get();
3841
let storage_size = STORAGE_SIZE
39-
.get_metric_with_label_values(&["data", stream_name, "parquet"])
42+
.get_metric_with_label_values(&storage_size_labels)
4043
.ok()?
4144
.get();
4245
// this should be valid for all cases given that gauge must never go negative
4346
let ingestion_size = ingestion_size as u64;
4447
let storage_size = storage_size as u64;
4548

46-
Some(Stats {
47-
events: events_ingested,
48-
ingestion: ingestion_size,
49-
storage: storage_size,
50-
})
49+
// If all metrics are 0, it is presumed that 'stream_name' does not exist and therefore has no stats
50+
if events_ingested == 0 && ingestion_size == 0 && storage_size == 0 {
51+
None
52+
} else {
53+
Some(Stats {
54+
events: events_ingested,
55+
ingestion: ingestion_size,
56+
storage: storage_size,
57+
})
58+
}
59+
}
60+
61+
pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Result<()> {
62+
let event_labels = event_labels(stream_name, format);
63+
let storage_size_labels = storage_size_labels(stream_name);
64+
65+
EVENTS_INGESTED.remove_label_values(&event_labels)?;
66+
EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?;
67+
STORAGE_SIZE.remove_label_values(&storage_size_labels)?;
68+
69+
Ok(())
70+
}
71+
72+
fn event_labels<'a>(stream_name: &'a str, format: &'static str) -> [&'a str; 2] {
73+
[stream_name, format]
74+
}
75+
76+
fn storage_size_labels(stream_name: &str) -> [&str; 3] {
77+
["data", stream_name, "parquet"]
78+
}
79+
80+
#[cfg(test)]
81+
mod tests {
82+
use crate::stats::get_current_stats;
83+
84+
#[test]
85+
fn get_current_stats_returns_none_without_data() {
86+
assert!(get_current_stats("test", "json").is_none())
87+
}
5188
}

0 commit comments

Comments
 (0)