From 429ab172729233ea7449fee4e78a03a1726f51cc Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 9 Jan 2023 19:30:55 +0530 Subject: [PATCH] Add events ingested counter in stats --- server/src/handlers/logstream.rs | 1 + server/src/metadata.rs | 1 + server/src/stats.rs | 21 +++++++++++++++++---- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index 706f611b8..d9cdd45e9 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -213,6 +213,7 @@ pub async fn get_stats(req: HttpRequest) -> Result "stream": stream_name, "time": time, "ingestion": { + "count": stats.events, "size": format!("{} {}", stats.ingestion, "Bytes"), "format": "json" }, diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 161228ddf..075755bbd 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -154,6 +154,7 @@ impl STREAM_INFO { .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?; stream.stats.add_ingestion_size(size); + stream.stats.increase_event_by_one(); Ok(()) } diff --git a/server/src/stats.rs b/server/src/stats.rs index 0a20572a5..35b020d33 100644 --- a/server/src/stats.rs +++ b/server/src/stats.rs @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug)] pub struct StatsCounter { + events_ingested: AtomicU64, ingestion_size: AtomicU64, storage_size: AtomicU64, } @@ -29,6 +30,7 @@ pub struct StatsCounter { impl Default for StatsCounter { fn default() -> Self { Self { + events_ingested: AtomicU64::new(0), ingestion_size: AtomicU64::new(0), storage_size: AtomicU64::new(0), } @@ -43,13 +45,18 @@ impl PartialEq for StatsCounter { } impl StatsCounter { - pub fn new(ingestion_size: u64, storage_size: u64) -> Self { + pub fn new(ingestion_size: u64, storage_size: u64, event_ingested: u64) -> Self { Self { - ingestion_size: AtomicU64::new(ingestion_size), - storage_size: AtomicU64::new(storage_size), + ingestion_size: ingestion_size.into(), + storage_size: storage_size.into(), + events_ingested: event_ingested.into(), } } + pub fn events_ingested(&self) -> u64 { + self.events_ingested.load(Ordering::Relaxed) + } + pub fn ingestion_size(&self) -> u64 { self.ingestion_size.load(Ordering::Relaxed) } @@ -65,11 +72,16 @@ impl StatsCounter { pub fn add_storage_size(&self, size: u64) { self.storage_size.fetch_add(size, Ordering::AcqRel); } + + pub fn increase_event_by_one(&self) { + self.events_ingested.fetch_add(1, Ordering::Relaxed); + } } /// Helper struct type created by copying stats values from metadata #[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] pub struct Stats { + pub events: u64, pub ingestion: u64, pub storage: u64, } @@ -77,6 +89,7 @@ pub struct Stats { impl From<&StatsCounter> for Stats { fn from(stats: &StatsCounter) -> Self { Self { + events: stats.events_ingested(), ingestion: stats.ingestion_size(), storage: stats.storage_size(), } @@ -85,6 +98,6 @@ impl From<&StatsCounter> for Stats { impl From for StatsCounter { fn from(stats: Stats) -> Self { - StatsCounter::new(stats.ingestion, stats.storage) + StatsCounter::new(stats.ingestion, stats.storage, stats.events) } }