diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index a4f9b800c..8b62d8f18 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -46,6 +46,9 @@ pub async fn delete(req: HttpRequest) -> Result { objectstore.delete_stream(&stream_name).await?; metadata::STREAM_INFO.delete_stream(&stream_name); event::STREAM_WRITERS.delete_stream(&stream_name); + stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| { + log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e) + }); let stream_dir = StorageDir::new(&stream_name); if fs::remove_dir_all(&stream_dir.data_path).is_err() { @@ -229,6 +232,10 @@ pub async fn put_retention( pub async fn get_stats(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; @@ -363,3 +370,30 @@ pub mod error { } } } + +#[cfg(test)] +mod tests { + use crate::handlers::http::logstream::error::StreamError; + use crate::handlers::http::logstream::get_stats; + use actix_web::test::TestRequest; + use anyhow::bail; + + #[actix_web::test] + #[should_panic] + async fn get_stats_panics_without_logstream() { + let req = TestRequest::default().to_http_request(); + let _ = get_stats(req).await; + } + + #[actix_web::test] + async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> { + let req = TestRequest::default() + .param("logstream", "test") + .to_http_request(); + + match get_stats(req).await { + Err(StreamError::StreamNotFound(_)) => Ok(()), + _ => bail!("expected StreamNotFound error"), + } + } +} diff --git a/server/src/stats.rs b/server/src/stats.rs index 0953bf640..c3588e931 100644 --- a/server/src/stats.rs +++ b/server/src/stats.rs @@ -27,16 +27,19 @@ pub struct Stats { } pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option { + let event_labels = event_labels(stream_name, format); + let storage_size_labels = storage_size_labels(stream_name); + let events_ingested = EVENTS_INGESTED - .get_metric_with_label_values(&[stream_name, format]) + .get_metric_with_label_values(&event_labels) .ok()? .get(); let ingestion_size = EVENTS_INGESTED_SIZE - .get_metric_with_label_values(&[stream_name, format]) + .get_metric_with_label_values(&event_labels) .ok()? .get(); let storage_size = STORAGE_SIZE - .get_metric_with_label_values(&["data", stream_name, "parquet"]) + .get_metric_with_label_values(&storage_size_labels) .ok()? .get(); // this should be valid for all cases given that gauge must never go negative @@ -49,3 +52,22 @@ pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option prometheus::Result<()> { + let event_labels = event_labels(stream_name, format); + let storage_size_labels = storage_size_labels(stream_name); + + EVENTS_INGESTED.remove_label_values(&event_labels)?; + EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?; + STORAGE_SIZE.remove_label_values(&storage_size_labels)?; + + Ok(()) +} + +fn event_labels<'a>(stream_name: &'a str, format: &'static str) -> [&'a str; 2] { + [stream_name, format] +} + +fn storage_size_labels(stream_name: &str) -> [&str; 3] { + ["data", stream_name, "parquet"] +}