Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
objectstore.delete_stream(&stream_name).await?;
metadata::STREAM_INFO.delete_stream(&stream_name);
event::STREAM_WRITERS.delete_stream(&stream_name);
stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| {
log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e)
});

let stream_dir = StorageDir::new(&stream_name);
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
Expand Down Expand Up @@ -229,6 +232,10 @@ pub async fn put_retention(
pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}

let stats = stats::get_current_stats(&stream_name, "json")
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

Expand Down Expand Up @@ -363,3 +370,30 @@ pub mod error {
}
}
}

#[cfg(test)]
mod tests {
use crate::handlers::http::logstream::error::StreamError;
use crate::handlers::http::logstream::get_stats;
use actix_web::test::TestRequest;
use anyhow::bail;

#[actix_web::test]
#[should_panic]
async fn get_stats_panics_without_logstream() {
let req = TestRequest::default().to_http_request();
let _ = get_stats(req).await;
}

#[actix_web::test]
async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> {
let req = TestRequest::default()
.param("logstream", "test")
.to_http_request();

match get_stats(req).await {
Err(StreamError::StreamNotFound(_)) => Ok(()),
_ => bail!("expected StreamNotFound error"),
}
}
}
28 changes: 25 additions & 3 deletions server/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ pub struct Stats {
}

pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option<Stats> {
let event_labels = event_labels(stream_name, format);
let storage_size_labels = storage_size_labels(stream_name);

let events_ingested = EVENTS_INGESTED
.get_metric_with_label_values(&[stream_name, format])
.get_metric_with_label_values(&event_labels)
.ok()?
.get();
let ingestion_size = EVENTS_INGESTED_SIZE
.get_metric_with_label_values(&[stream_name, format])
.get_metric_with_label_values(&event_labels)
.ok()?
.get();
let storage_size = STORAGE_SIZE
.get_metric_with_label_values(&["data", stream_name, "parquet"])
.get_metric_with_label_values(&storage_size_labels)
.ok()?
.get();
// this should be valid for all cases given that gauge must never go negative
Expand All @@ -49,3 +52,22 @@ pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option<Stat
storage: storage_size,
})
}

pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Result<()> {
let event_labels = event_labels(stream_name, format);
let storage_size_labels = storage_size_labels(stream_name);

EVENTS_INGESTED.remove_label_values(&event_labels)?;
EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?;
STORAGE_SIZE.remove_label_values(&storage_size_labels)?;

Ok(())
}

fn event_labels<'a>(stream_name: &'a str, format: &'static str) -> [&'a str; 2] {
[stream_name, format]
}

fn storage_size_labels(stream_name: &str) -> [&str; 3] {
["data", stream_name, "parquet"]
}