diff --git a/server/src/event.rs b/server/src/event.rs
index 6fac8a684..b8b4672b6 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -213,6 +213,11 @@ impl Event {
             self.process_first_event::(event, inferred_schema)?
         };
 
+        metadata::STREAM_INFO.update_stats(
+            &self.stream_name,
+            std::mem::size_of_val(self.body.as_bytes()) as u64,
+        )?;
+
         if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
             log::error!("Error checking for alerts. {:?}", e);
         }
@@ -227,7 +232,7 @@ impl Event {
         &self,
         mut event: json::Reader,
         schema: Schema,
-    ) -> Result {
+    ) -> Result<(), EventError> {
         // note for functions _schema_with_map and _set_schema_with_map,
         // these are to be called while holding a write lock specifically.
         // this guarantees two things
@@ -282,7 +287,7 @@ impl Event {
             }
         });
 
-        Ok(0)
+        Ok(())
     }
 }
@@ -291,13 +296,13 @@ impl Event {
     fn process_event(
         &self,
         mut event: json::Reader,
-    ) -> Result {
+    ) -> Result<(), EventError> {
         let rb = event.next()?.ok_or(EventError::MissingRecord)?;
         let stream_name = &self.stream_name;
 
         STREAM_WRITERS::append_to_local(stream_name, &rb)?;
 
-        Ok(0)
+        Ok(())
     }
 
     // inferSchema is a constructor to Schema
diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs
index 7784e0d36..afb15f7e4 100644
--- a/server/src/handlers/logstream.rs
+++ b/server/src/handlers/logstream.rs
@@ -20,6 +20,7 @@ use std::fs;
 
 use actix_web::http::StatusCode;
 use actix_web::{web, HttpRequest, HttpResponse, Responder};
+use chrono::Utc;
 use serde_json::Value;
 
 use crate::alerts::Alerts;
@@ -302,6 +303,42 @@ pub async fn put_alert(req: HttpRequest, body: web::Json) ->
     .to_http()
 }
 
+pub async fn get_stats(req: HttpRequest) -> HttpResponse {
+    let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
+
+    let stats = match metadata::STREAM_INFO.get_stats(&stream_name) {
+        Ok(stats) => stats,
+        Err(e) => {
+            return response::ServerResponse {
+                msg: format!("Could not return stats due to error: {}", e),
+                code: StatusCode::BAD_REQUEST,
+            }
+            .to_http()
+        }
+    };
+
+    let time = Utc::now();
+
+    let stats = serde_json::json!({
+        "stream": stream_name,
+        "time": time,
+        "ingestion": {
+            "size": format!("{} {}", stats.ingestion, "Bytes"),
+            "format": "json"
+        },
+        "storage": {
+            "size": format!("{} {}", stats.storage, "Bytes"),
+            "format": "parquet"
+        }
+    });
+
+    response::ServerResponse {
+        msg: stats.to_string(),
+        code: StatusCode::OK,
+    }
+    .to_http()
+}
+
 fn remove_id_from_alerts(value: &mut Value) {
     if let Some(Value::Array(alerts)) = value.get_mut("alerts") {
         alerts
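Note on the ingestion size recorded in event.rs above: size_of_val applied to the &[u8] slice returned by as_bytes() measures the pointed-to slice itself, so the value stored is exactly the body's byte length. A minimal standalone sketch of that equivalence (the sample body is hypothetical, not from the patch):

    fn main() {
        let body = String::from(r#"{"level":"info","msg":"hello"}"#);
        // size_of_val on an unsized [u8] slice is len * size_of::<u8>(),
        // i.e. the number of bytes in the event body.
        assert_eq!(std::mem::size_of_val(body.as_bytes()), body.len());
    }

In other words, self.body.len() as u64 would compute the same number.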
diff --git a/server/src/main.rs b/server/src/main.rs
index 3b336a90f..37476bf04 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -50,6 +50,7 @@ mod option;
 mod query;
 mod response;
 mod s3;
+mod stats;
 mod storage;
 mod utils;
 mod validator;
@@ -342,6 +343,11 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) {
             web::resource(schema_path("{logstream}"))
                 .route(web::get().to(handlers::logstream::schema)),
         )
+        .service(
+            // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
+            web::resource(stats_path("{logstream}"))
+                .route(web::get().to(handlers::logstream::get_stats)),
+        )
         // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
         .service(web::resource(liveness_path()).route(web::get().to(handlers::liveness)))
         // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes
@@ -399,3 +405,7 @@ fn alert_path(stream_name: &str) -> String {
 fn schema_path(stream_name: &str) -> String {
     format!("{}/schema", logstream_path(stream_name))
 }
+
+fn stats_path(stream_name: &str) -> String {
+    format!("{}/stats", logstream_path(stream_name))
+}
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 89f521d53..a66522fcb 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -18,12 +18,12 @@
 use datafusion::arrow::datatypes::Schema;
 use lazy_static::lazy_static;
-use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::sync::RwLock;
 
 use crate::alerts::Alerts;
 use crate::event::Event;
+use crate::stats::{Stats, StatsCounter};
 use crate::storage::ObjectStorage;
 
 use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
@@ -32,25 +32,7 @@ use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
 pub struct LogStreamMetadata {
     pub schema: Option<Schema>,
     pub alerts: Alerts,
-    pub stats: Stats,
-}
-
-#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq, Eq)]
-pub struct Stats {
-    pub size: u64,
-    pub compressed_size: u64,
-    #[serde(skip)]
-    pub prev_compressed: u64,
-}
-
-impl Stats {
-    /// Update stats considering the following facts about params:
-    /// - `size`: The event body's binary size.
-    /// - `compressed_size`: Binary size of parquet file, total compressed_size is this plus size of all past parquet files.
-    pub fn update(&mut self, size: u64, compressed_size: u64) {
-        self.size += size;
-        self.compressed_size = self.prev_compressed + compressed_size;
-    }
+    pub stats: StatsCounter,
 }
 
 lazy_static! {
@@ -138,11 +120,12 @@ impl STREAM_INFO {
         for stream in storage.list_streams().await? {
             let alerts = storage.get_alerts(&stream.name).await?;
             let schema = storage.get_schema(&stream.name).await?;
+            let stats = storage.get_stats(&stream.name).await?;
 
             let metadata = LogStreamMetadata {
                 schema,
                 alerts,
-                ..LogStreamMetadata::default()
+                stats: stats.into(),
             };
 
             let mut map = self.write().expect(LOCK_EXPECT);
@@ -161,22 +144,24 @@ impl STREAM_INFO {
         .collect()
     }
 
-    #[allow(dead_code)]
-    pub fn update_stats(
-        &self,
-        stream_name: &str,
-        size: u64,
-        compressed_size: u64,
-    ) -> Result<(), MetadataError> {
-        let mut map = self.write().expect(LOCK_EXPECT);
+    pub fn update_stats(&self, stream_name: &str, size: u64) -> Result<(), MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
         let stream = map
-            .get_mut(stream_name)
+            .get(stream_name)
            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?;
 
-        stream.stats.update(size, compressed_size);
+        stream.stats.add_ingestion_size(size);
         Ok(())
     }
+
+    pub fn get_stats(&self, stream_name: &str) -> Result<Stats, MetadataError> {
+        self.read()
+            .expect(LOCK_EXPECT)
+            .get(stream_name)
+            .map(|metadata| Stats::from(&metadata.stats))
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))
+    }
 }
 
 pub mod error {
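The switch in metadata.rs from a write lock to a read lock in update_stats works because the per-stream counters are now atomics (see stats.rs below): a shared borrow of the map is enough to bump them, so ingestion no longer serializes on the map's write lock. A minimal sketch of the pattern, with a hypothetical map standing in for STREAM_INFO:

    use std::collections::HashMap;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::RwLock;

    fn main() {
        let map: RwLock<HashMap<String, AtomicU64>> =
            RwLock::new(HashMap::from([("mystream".to_string(), AtomicU64::new(0))]));

        // A read guard suffices; the mutation goes through the atomic,
        // not through the map, so concurrent updaters don't block readers.
        let guard = map.read().unwrap();
        if let Some(counter) = guard.get("mystream") {
            counter.fetch_add(1024, Ordering::AcqRel);
        }
        assert_eq!(guard.get("mystream").unwrap().load(Ordering::Relaxed), 1024);
    }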
diff --git a/server/src/s3.rs b/server/src/s3.rs
index bf84bb708..1c2fbc5d7 100644
--- a/server/src/s3.rs
+++ b/server/src/s3.rs
@@ -22,14 +22,15 @@ use http::Uri;
 use object_store::aws::AmazonS3Builder;
 use object_store::limit::LimitStore;
 use serde::{Deserialize, Serialize};
+use serde_json::Value;
 use std::fs;
 use std::iter::Iterator;
 use std::sync::Arc;
 
 use crate::alerts::Alerts;
-use crate::metadata::Stats;
 use crate::option::{StorageOpt, CONFIG};
 use crate::query::Query;
+use crate::stats::Stats;
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
 
 // Default object storage currently is DO Spaces bucket
@@ -221,21 +222,27 @@ impl S3 {
             .key(format!("{}/.schema", stream_name))
             .send()
             .await?;
-        // create .parseable.json file in the stream-name prefix.
-        // This indicates the format version for this stream.
-        // This is helpful in case we may change the backend format
-        // in the future
+        self._put_parseable_config(stream_name, format).await?;
+        // Prefix created on S3, now create the directory in
+        // the local storage as well
+        let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));
+
+        Ok(())
+    }
+
+    async fn _put_parseable_config(
+        &self,
+        stream_name: &str,
+        body: Vec<u8>,
+    ) -> Result<(), AwsSdkError> {
         let _resp = self
             .client
             .put_object()
             .bucket(&S3_CONFIG.s3_bucket_name)
             .key(format!("{}/.parseable.json", stream_name))
-            .body(format.into())
+            .body(body.into())
             .send()
             .await?;
-        // Prefix created on S3, now create the directory in
-        // the local storage as well
-        let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));
+
         Ok(())
     }
@@ -290,8 +297,8 @@ impl S3 {
         self._get(stream_name, "alert.json").await
     }
 
-    async fn _get_stats(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
-        self._get(stream_name, "stats.json").await
+    async fn _get_parseable_config(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
+        self._get(stream_name, "parseable.json").await
     }
 
     async fn _get(&self, stream_name: &str, resource: &str) -> Result<Bytes, AwsSdkError> {
@@ -434,11 +441,30 @@ impl ObjectStorage for S3 {
     }
 
     async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
-        let stats = serde_json::from_slice(&self._get_stats(stream_name).await?)?;
+        let parseable_metadata = self._get_parseable_config(stream_name).await?;
+        let parseable_metadata: Value =
+            serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
+
+        let stats = &parseable_metadata["stats"];
+
+        let stats = serde_json::from_value(stats.clone()).unwrap_or_default();
 
         Ok(stats)
     }
 
+    async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> {
+        let stats = serde_json::to_value(stats).expect("stats are perfectly serializable");
+        let parseable_metadata = self._get_parseable_config(stream_name).await?;
+        let mut parseable_metadata: Value =
+            serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
+
+        parseable_metadata["stats"] = stats;
+
+        self._put_parseable_config(stream_name, parseable_metadata.to_string().into_bytes())
+            .await?;
+
+        Ok(())
+    }
+
     async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
         let streams = self._list_streams().await?;
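With the s3.rs changes above, stats no longer live in a separate stats.json object; they are folded into the stream's .parseable.json under a "stats" key, and put_stats performs a read-modify-write of that one document. A sketch of the merge semantics with serde_json (the "version" field is hypothetical):

    use serde_json::{json, Value};

    fn main() {
        // Stand-in for the document fetched from a stream's .parseable.json.
        let mut config: Value = json!({ "version": "v1" });

        // Only the "stats" key is created or overwritten; any other
        // per-stream configuration in the document is left untouched.
        config["stats"] = json!({ "ingestion": 2048, "storage": 512 });

        assert_eq!(config["stats"]["storage"], 512);
    }

Note that get_stats falls back to Stats::default() via unwrap_or_default() when the key is absent, which keeps streams written before this change readable.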
diff --git a/server/src/stats.rs b/server/src/stats.rs
new file mode 100644
index 000000000..778e68885
--- /dev/null
+++ b/server/src/stats.rs
@@ -0,0 +1,72 @@
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug)]
+pub struct StatsCounter {
+    ingestion_size: AtomicU64,
+    storage_size: AtomicU64,
+}
+
+impl Default for StatsCounter {
+    fn default() -> Self {
+        Self {
+            ingestion_size: AtomicU64::new(0),
+            storage_size: AtomicU64::new(0),
+        }
+    }
+}
+
+impl PartialEq for StatsCounter {
+    fn eq(&self, other: &Self) -> bool {
+        self.ingestion_size() == other.ingestion_size()
+            && self.storage_size() == other.storage_size()
+    }
+}
+
+impl StatsCounter {
+    pub fn new(ingestion_size: u64, storage_size: u64) -> Self {
+        Self {
+            ingestion_size: AtomicU64::new(ingestion_size),
+            storage_size: AtomicU64::new(storage_size),
+        }
+    }
+
+    pub fn ingestion_size(&self) -> u64 {
+        self.ingestion_size.load(Ordering::Relaxed)
+    }
+
+    pub fn storage_size(&self) -> u64 {
+        self.storage_size.load(Ordering::Relaxed)
+    }
+
+    pub fn add_ingestion_size(&self, size: u64) {
+        self.ingestion_size.fetch_add(size, Ordering::AcqRel);
+    }
+
+    pub fn add_storage_size(&self, size: u64) {
+        self.storage_size.fetch_add(size, Ordering::AcqRel);
+    }
+}
+
+/// Helper struct type created by copying stats values from metadata
+#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
+pub struct Stats {
+    pub ingestion: u64,
+    pub storage: u64,
+}
+
+impl From<&StatsCounter> for Stats {
+    fn from(stats: &StatsCounter) -> Self {
+        Self {
+            ingestion: stats.ingestion_size(),
+            storage: stats.storage_size(),
+        }
+    }
+}
+
+impl From<Stats> for StatsCounter {
+    fn from(stats: Stats) -> Self {
+        StatsCounter::new(stats.ingestion, stats.storage)
+    }
+}
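In the new module above, StatsCounter is the live value shared across threads, while Stats is the plain Copy snapshot used for serialization and API responses; the two From impls convert between them on save and load. A usage sketch, assuming the module is in scope:

    use crate::stats::{Stats, StatsCounter};

    fn demo() {
        let counter = StatsCounter::new(0, 0);
        counter.add_ingestion_size(100);
        counter.add_storage_size(40);

        // A snapshot copies the atomics into a plain serializable struct...
        let snapshot = Stats::from(&counter);
        assert_eq!((snapshot.ingestion, snapshot.storage), (100, 40));

        // ...and a stored snapshot becomes a live counter again on load.
        let restored: StatsCounter = snapshot.into();
        assert_eq!(restored.ingestion_size(), 100);
    }

Since the loads use Ordering::Relaxed, the counters are monotonic tallies with no cross-variable ordering guarantees; Relaxed on the fetch_add side would arguably suffice as well.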
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 3bc2ee0e0..027bc38d6 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -17,9 +17,10 @@
  */
 
 use crate::alerts::Alerts;
-use crate::metadata::{Stats, STREAM_INFO};
+use crate::metadata::{LOCK_EXPECT, STREAM_INFO};
 use crate::option::CONFIG;
 use crate::query::Query;
+use crate::stats::Stats;
 use crate::{event, utils};
 
 use async_trait::async_trait;
@@ -33,6 +34,7 @@ use datafusion::parquet::errors::ParquetError;
 use datafusion::parquet::file::properties::WriterProperties;
 use serde::Serialize;
 
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fs::{self, File};
 use std::io;
@@ -67,6 +69,7 @@ pub trait ObjectStorage: Sync + 'static {
         stream_name: &str,
         alerts: &Alerts,
     ) -> Result<(), ObjectStorageError>;
+    async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError>;
     async fn get_schema(&self, stream_name: &str) -> Result<Option<Schema>, ObjectStorageError>;
     async fn get_alerts(&self, stream_name: &str) -> Result<Alerts, ObjectStorageError>;
     async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError>;
@@ -125,7 +128,9 @@ pub trait ObjectStorage: Sync + 'static {
 
         let streams = STREAM_INFO.list_streams();
 
-        for stream in streams {
+        let mut stream_stats = HashMap::new();
+
+        for stream in &streams {
             // get dir
             let dir = StorageDir::new(stream.clone());
             // walk dir, find all .tmp files and convert to parquet
@@ -198,6 +203,12 @@ pub trait ObjectStorage: Sync + 'static {
                 let s3_path = format!("{}/{}", stream, file_suffix);
 
                 let _put_parquet_file = self.upload_file(&s3_path, file.to_str().unwrap()).await?;
+
+                stream_stats
+                    .entry(stream)
+                    .and_modify(|size| *size += file.metadata().map(|meta| meta.len()).unwrap_or(0))
+                    .or_insert_with(|| file.metadata().map(|meta| meta.len()).unwrap_or(0));
+
                 if let Err(e) = fs::remove_file(&file) {
                     log::error!(
                         "Error deleting parquet file in path {} due to error [{}]",
@@ -207,6 +218,24 @@ pub trait ObjectStorage: Sync + 'static {
                 }
             }
         }
+
+        for (stream, compressed_size) in stream_stats {
+            let stats = STREAM_INFO
+                .read()
+                .expect(LOCK_EXPECT)
+                .get(stream)
+                .map(|metadata| {
+                    metadata.stats.add_storage_size(compressed_size);
+                    Stats::from(&metadata.stats)
+                });
+
+            if let Some(stats) = stats {
+                if let Err(e) = self.put_stats(stream, &stats).await {
+                    log::warn!("Error updating stats to s3 due to error [{}]", e);
+                }
+            }
+        }
+
         Ok(())
     }
 }
diff --git a/server/src/utils.rs b/server/src/utils.rs
index 8356501e4..ecaa87003 100644
--- a/server/src/utils.rs
+++ b/server/src/utils.rs
@@ -243,7 +243,7 @@ impl TimePeriod {
     pub fn generate_prefixes(&self, prefix: &str) -> Vec<String> {
         let prefix = format!("{}/", prefix);
 
-        let end_minute = self.end.minute() + if self.end.second() > 0 { 1 } else { 0 };
+        let end_minute = self.end.minute() + u32::from(self.end.second() > 0);
 
         self.generate_date_prefixes(
             &prefix,
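Two notes on the hunks above. In storage.rs, the entry().and_modify().or_insert_with() chain tallies uploaded parquet bytes per stream; a terser equivalent is or_insert(0) followed by a single addition. In utils.rs, u32::from(bool) is 1 for true and 0 for false, so the new expression is the branch-free form of the old if/else. A sketch of both (sample values hypothetical):

    use std::collections::HashMap;

    fn main() {
        // Per-stream size accumulation, equivalent to the patch's
        // entry().and_modify().or_insert_with() chain.
        let mut stream_stats: HashMap<&str, u64> = HashMap::new();
        for (stream, len) in [("app", 100u64), ("app", 50), ("db", 10)] {
            *stream_stats.entry(stream).or_insert(0) += len;
        }
        assert_eq!(stream_stats["app"], 150);

        // The utils.rs rounding: any nonzero seconds bumps the end minute.
        let (minute, second) = (30u32, 5u32);
        assert_eq!(minute + u32::from(second > 0), 31);
    }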