server/src/analytics.rs (2 additions & 2 deletions)
@@ -18,9 +18,9 @@
  */
 
 use crate::about::{current, platform};
-use crate::metadata;
 use crate::option::CONFIG;
 use crate::storage;
+use crate::{metadata, stats};
 
 use chrono::{DateTime, Utc};
 use clokwerk::{AsyncScheduler, Interval};
@@ -113,7 +113,7 @@ fn total_event_stats() -> (u64, u64, u64) {
     let mut total_json_bytes: u64 = 0;
 
     for stream in metadata::STREAM_INFO.list_streams() {
-        let stats = metadata::STREAM_INFO.get_stats(&stream).unwrap();
+        let Some(stats) = stats::get_current_stats(&stream, "json") else {continue;};
         total_events += stats.events;
         total_parquet_bytes += stats.storage;
         total_json_bytes += stats.ingestion;
server/src/handlers/http/logstream.rs (3 additions & 5 deletions)
@@ -24,11 +24,11 @@ use chrono::Utc;
 use serde_json::Value;
 
 use crate::alerts::Alerts;
-use crate::event;
 use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
 use crate::storage::retention::{self, Retention};
 use crate::storage::{LogStream, StorageDir};
+use crate::{event, stats};
 use crate::{metadata, validator};
 
 use self::error::StreamError;
@@ -233,10 +233,8 @@ pub async fn put_retention(
 pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
-    let stats = match metadata::STREAM_INFO.get_stats(&stream_name) {
-        Ok(stats) => stats,
-        Err(_) => return Err(StreamError::StreamNotFound(stream_name)),
-    };
+    let stats = stats::get_current_stats(&stream_name, "json")
+        .ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
 
     let time = Utc::now();
 
server/src/main.rs (1 addition & 1 deletion)
@@ -83,7 +83,7 @@ async fn main() -> anyhow::Result<()> {
     // track all parquet files already in the data directory
     storage::retention::load_retention_from_global().await;
     // load data from stats back to prometheus metrics
-    metrics::load_from_global_stats();
+    metrics::load_from_stats_from_storage().await;
 
     let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
     let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
server/src/metadata.rs (2 additions & 26 deletions)
@@ -24,7 +24,6 @@ use std::sync::{Arc, RwLock};
 
 use crate::alerts::Alerts;
 use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE};
-use crate::stats::{Stats, StatsCounter};
 use crate::storage::{ObjectStorage, StorageDir};
 use crate::utils::arrow::MergedRecordReader;
 
@@ -42,15 +41,13 @@ pub struct StreamInfo(RwLock<HashMap<String, LogStreamMetadata>>);
 pub struct LogStreamMetadata {
     pub schema: Arc<Schema>,
     pub alerts: Alerts,
-    pub stats: StatsCounter,
 }
 
 impl Default for LogStreamMetadata {
     fn default() -> Self {
         Self {
             schema: Arc::new(Schema::empty()),
             alerts: Alerts::default(),
-            stats: StatsCounter::default(),
         }
     }
 }
@@ -132,15 +129,10 @@ impl StreamInfo {
         for stream in storage.list_streams().await? {
             let alerts = storage.get_alerts(&stream.name).await?;
             let schema = storage.get_schema(&stream.name).await?;
-            let stats = storage.get_stats(&stream.name).await?;
 
             let schema = Arc::new(update_schema_from_staging(&stream.name, schema));
 
-            let metadata = LogStreamMetadata {
-                schema,
-                alerts,
-                stats: stats.into(),
-            };
+            let metadata = LogStreamMetadata { schema, alerts };
 
             let mut map = self.write().expect(LOCK_EXPECT);
 
@@ -165,30 +157,14 @@ impl StreamInfo {
         size: u64,
         num_rows: u64,
     ) -> Result<(), MetadataError> {
-        let map = self.read().expect(LOCK_EXPECT);
-        let stream = map
-            .get(stream_name)
-            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?;
-
-        stream.stats.add_ingestion_size(size);
-        stream.stats.increase_event_by_n(num_rows);
         EVENTS_INGESTED
             .with_label_values(&[stream_name, origin])
-            .inc();
+            .inc_by(num_rows);
         EVENTS_INGESTED_SIZE
             .with_label_values(&[stream_name, origin])
             .add(size as i64);
 
         Ok(())
     }
-
-    pub fn get_stats(&self, stream_name: &str) -> Result<Stats, MetadataError> {
-        self.read()
-            .expect(LOCK_EXPECT)
-            .get(stream_name)
-            .map(|metadata| Stats::from(&metadata.stats))
-            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))
-    }
 }
 
 fn update_schema_from_staging(stream_name: &str, current_schema: Schema) -> Schema {
server/src/metrics/mod.rs (9 additions & 3 deletions)
@@ -22,7 +22,7 @@ use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
 use once_cell::sync::Lazy;
 use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry};
 
-use crate::{handlers::http::metrics_path, metadata::STREAM_INFO};
+use crate::{handlers::http::metrics_path, metadata::STREAM_INFO, option::CONFIG};
 
 pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");
 
@@ -122,9 +122,15 @@ fn prom_process_metrics(metrics: &PrometheusMetrics) {
 #[cfg(not(target_os = "linux"))]
 fn prom_process_metrics(_metrics: &PrometheusMetrics) {}
 
-pub fn load_from_global_stats() {
+pub async fn load_from_stats_from_storage() {
     for stream_name in STREAM_INFO.list_streams() {
-        let stats = STREAM_INFO.get_stats(&stream_name).expect("stream exists");
+        let stats = CONFIG
+            .storage()
+            .get_object_store()
+            .get_stats(&stream_name)
+            .await
+            .expect("stats are loaded properly");
 
         EVENTS_INGESTED
             .with_label_values(&[&stream_name, "json"])
             .inc_by(stats.events);
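
For context, the metric vecs this function repopulates are defined in this same module. The definitions below are a sketch reconstructed from the imports and call sites in this diff, not the file's actual code: the metric names, help strings, and label names (`stream`/`format`, plus a leading `type` label on `STORAGE_SIZE` to match the `&["data", stream, "parquet"]` usage) are assumptions, and registration onto the shared `Registry` is omitted.

```rust
use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, IntGaugeVec, Opts};

// Assumed definitions; only the types (counter vs gauge) and label arity
// are pinned down by how the diff uses them.
pub static EVENTS_INGESTED: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE),
        &["stream", "format"], // matches .with_label_values(&[stream_name, "json"])
    )
    .expect("metric can be created")
});

pub static EVENTS_INGESTED_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
    IntGaugeVec::new(
        Opts::new("events_ingested_size", "Ingested bytes").namespace(METRICS_NAMESPACE),
        &["stream", "format"], // gauge: the diff calls .add(size as i64)
    )
    .expect("metric can be created")
});

pub static STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
    IntGaugeVec::new(
        Opts::new("storage_size", "Stored bytes").namespace(METRICS_NAMESPACE),
        &["type", "stream", "format"], // matches &["data", stream, "parquet"]
    )
    .expect("metric can be created")
});
```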
server/src/stats.rs (23 additions & 73 deletions)
@@ -16,65 +16,7 @@
  *
  */
 
-use std::sync::atomic::{AtomicU64, Ordering};
-
-#[derive(Debug)]
-pub struct StatsCounter {
-    pub events_ingested: AtomicU64,
-    ingestion_size: AtomicU64,
-    storage_size: AtomicU64,
-}
-
-impl Default for StatsCounter {
-    fn default() -> Self {
-        Self {
-            events_ingested: AtomicU64::new(0),
-            ingestion_size: AtomicU64::new(0),
-            storage_size: AtomicU64::new(0),
-        }
-    }
-}
-
-impl PartialEq for StatsCounter {
-    fn eq(&self, other: &Self) -> bool {
-        self.ingestion_size() == other.ingestion_size()
-            && self.storage_size() == other.storage_size()
-    }
-}
-
-impl StatsCounter {
-    pub fn new(ingestion_size: u64, storage_size: u64, event_ingested: u64) -> Self {
-        Self {
-            ingestion_size: ingestion_size.into(),
-            storage_size: storage_size.into(),
-            events_ingested: event_ingested.into(),
-        }
-    }
-
-    pub fn events_ingested(&self) -> u64 {
-        self.events_ingested.load(Ordering::Relaxed)
-    }
-
-    pub fn ingestion_size(&self) -> u64 {
-        self.ingestion_size.load(Ordering::Relaxed)
-    }
-
-    pub fn storage_size(&self) -> u64 {
-        self.storage_size.load(Ordering::Relaxed)
-    }
-
-    pub fn add_ingestion_size(&self, size: u64) {
-        self.ingestion_size.fetch_add(size, Ordering::AcqRel);
-    }
-
-    pub fn add_storage_size(&self, size: u64) {
-        self.storage_size.fetch_add(size, Ordering::AcqRel);
-    }
-
-    pub fn increase_event_by_n(&self, n: u64) {
-        self.events_ingested.fetch_add(n, Ordering::AcqRel);
-    }
-}
+use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE, STORAGE_SIZE};
 
 /// Helper struct type created by copying stats values from metadata
 #[derive(Debug, Default, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)]
@@ -84,18 +26,26 @@ pub struct Stats {
     pub storage: u64,
 }
 
-impl From<&StatsCounter> for Stats {
-    fn from(stats: &StatsCounter) -> Self {
-        Self {
-            events: stats.events_ingested(),
-            ingestion: stats.ingestion_size(),
-            storage: stats.storage_size(),
-        }
-    }
-}
-
-impl From<Stats> for StatsCounter {
-    fn from(stats: Stats) -> Self {
-        StatsCounter::new(stats.ingestion, stats.storage, stats.events)
-    }
+pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option<Stats> {
+    let events_ingested = EVENTS_INGESTED
+        .get_metric_with_label_values(&[stream_name, format])
+        .ok()?
+        .get();
+    let ingestion_size = EVENTS_INGESTED_SIZE
+        .get_metric_with_label_values(&[stream_name, format])
+        .ok()?
+        .get();
+    let storage_size = STORAGE_SIZE
+        .get_metric_with_label_values(&["data", stream_name, "parquet"])
+        .ok()?
+        .get();
+    // this should be valid for all cases given that gauge must never go negative
+    let ingestion_size = ingestion_size as u64;
+    let storage_size = storage_size as u64;
+
+    Some(Stats {
+        events: events_ingested,
+        ingestion: ingestion_size,
+        storage: storage_size,
+    })
 }
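
A quick usage sketch of the new function (the `report` helper below is hypothetical, not code from this PR). One subtlety of the prometheus crate: `get_metric_with_label_values` creates the child series on first access, so `None` here signals a label-arity mismatch rather than an unknown stream; a stream that has never ingested simply reads as zeros.

```rust
// Hypothetical caller: report a stream's live stats from the metric registry.
fn report(stream_name: &str) {
    match crate::stats::get_current_stats(stream_name, "json") {
        Some(stats) => println!(
            "{stream_name}: {} events, {} bytes ingested, {} bytes stored",
            stats.events, stats.ingestion, stats.storage
        ),
        None => println!("{stream_name}: stats unavailable (label mismatch)"),
    }
}
```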
server/src/storage/object_storage.rs (5 additions & 9 deletions)
@@ -26,7 +26,7 @@ use crate::{
     metadata::STREAM_INFO,
     metrics::{storage::StorageMetrics, STORAGE_SIZE},
     option::CONFIG,
-    stats::Stats,
+    stats::{self, Stats},
 };
 
 use actix_web_prometheus::PrometheusMetrics;
@@ -286,14 +286,10 @@ pub trait ObjectStorage: Sync + 'static {
         }
 
         for (stream, compressed_size) in stream_stats {
-            let stats = STREAM_INFO.read().unwrap().get(stream).map(|metadata| {
-                metadata.stats.add_storage_size(compressed_size);
-                STORAGE_SIZE
-                    .with_label_values(&["data", stream, "parquet"])
-                    .add(compressed_size as i64);
-                Stats::from(&metadata.stats)
-            });
-
+            STORAGE_SIZE
+                .with_label_values(&["data", stream, "parquet"])
+                .add(compressed_size as i64);
+            let stats = stats::get_current_stats(stream, "json");
             if let Some(stats) = stats {
                 if let Err(e) = self.put_stats(stream, &stats).await {
                     log::warn!("Error updating stats to objectstore due to error [{}]", e);