From c17557f6cb3a63f85b9551332fe53fd8674263f8 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Thu, 3 Nov 2022 15:45:39 +0530
Subject: [PATCH 1/5] Introduce stats API

Adds a stats implementation based on atomic u64 counters. On each event
ingestion the ingestion counter is updated, and on every sync the parquet
file size is added to the compressed (storage) counter. A call to the
metadata API for stats produces a JSON-serializable `Stats` instance that
is a copy of the in-memory stats counters. Stats are synced on every S3
sync cycle and stored inside the parseable.json file.
---
 server/Cargo.toml                |  1 +
 server/src/event.rs              | 13 ++++--
 server/src/handlers/logstream.rs | 21 ++++++++++
 server/src/main.rs               | 10 +++++
 server/src/metadata.rs           | 47 +++++++--------------
 server/src/s3.rs                 | 50 ++++++++++++++++------
 server/src/stats.rs              | 72 ++++++++++++++++++++++++++++++++
 server/src/storage.rs            | 33 ++++++++++++++-
 8 files changed, 198 insertions(+), 49 deletions(-)
 create mode 100644 server/src/stats.rs

diff --git a/server/Cargo.toml b/server/Cargo.toml
index a379af0bf..449f6b6da 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -67,6 +67,7 @@ zip = { version = "0.6.3", default_features = false, features = ["deflate"] }
 [dev-dependencies]
 maplit = "1.0.2"
 rstest = "0.15.0"
+serde_test = "1.0"
 serial_test = { version = "0.9.0", default-features = false }
 
 [package.metadata.parseable_ui]
diff --git a/server/src/event.rs b/server/src/event.rs
index 6fac8a684..b8b4672b6 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -213,6 +213,11 @@ impl Event {
             self.process_first_event::(event, inferred_schema)?
         };
 
+        metadata::STREAM_INFO.update_stats(
+            &self.stream_name,
+            std::mem::size_of_val(self.body.as_bytes()) as u64,
+        )?;
+
         if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
             log::error!("Error checking for alerts. {:?}", e);
         }
@@ -227,7 +232,7 @@ impl Event {
         &self,
         mut event: json::Reader,
         schema: Schema,
-    ) -> Result<u64, EventError> {
+    ) -> Result<(), EventError> {
         // note for functions _schema_with_map and _set_schema_with_map,
         // these are to be called while holding a write lock specifically.
         // this guarantees two things
@@ -282,7 +287,7 @@ impl Event {
             }
         });
 
-        Ok(0)
+        Ok(())
     }
 }
@@ -291,13 +296,13 @@ impl Event {
     fn process_event(
         &self,
         mut event: json::Reader,
-    ) -> Result<u64, EventError> {
+    ) -> Result<(), EventError> {
         let rb = event.next()?.ok_or(EventError::MissingRecord)?;
         let stream_name = &self.stream_name;
 
         STREAM_WRITERS::append_to_local(stream_name, &rb)?;
 
-        Ok(0)
+        Ok(())
     }
 
     // inferSchema is a constructor to Schema
diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs
index 7784e0d36..3cb10c128 100644
--- a/server/src/handlers/logstream.rs
+++ b/server/src/handlers/logstream.rs
@@ -302,6 +302,27 @@ pub async fn put_alert(req: HttpRequest, body: web::Json<Value>) ->
     .to_http()
 }
 
+pub async fn get_stats(req: HttpRequest) -> HttpResponse {
+    let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
+
+    let stats = metadata::STREAM_INFO
+        .get_stats(&stream_name)
+        .map(|ref stats| serde_json::to_string(stats).expect("stats can serialize to json"));
+
+    match stats {
+        Ok(stats) => response::ServerResponse {
+            msg: stats,
+            code: StatusCode::OK,
+        }
+        .to_http(),
+        Err(e) => response::ServerResponse {
+            msg: format!("Could not return stats due to error: {}", e),
+            code: StatusCode::BAD_REQUEST,
+        }
+        .to_http(),
+    }
+}
+
 fn remove_id_from_alerts(value: &mut Value) {
     if let Some(Value::Array(alerts)) = value.get_mut("alerts") {
         alerts
diff --git a/server/src/main.rs b/server/src/main.rs
index 3b336a90f..37476bf04 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -50,6 +50,7 @@ mod option;
 mod query;
 mod response;
 mod s3;
+mod stats;
 mod storage;
 mod utils;
 mod validator;
@@ -342,6 +343,11 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) {
             web::resource(schema_path("{logstream}"))
                 .route(web::get().to(handlers::logstream::schema)),
         )
+        .service(
+            // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
+            web::resource(stats_path("{logstream}"))
+                .route(web::get().to(handlers::logstream::get_stats)),
+        )
         // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
         .service(web::resource(liveness_path()).route(web::get().to(handlers::liveness)))
         // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes
@@ -399,3 +405,7 @@ fn alert_path(stream_name: &str) -> String {
 fn schema_path(stream_name: &str) -> String {
     format!("{}/schema", logstream_path(stream_name))
 }
+
+fn stats_path(stream_name: &str) -> String {
+    format!("{}/stats", logstream_path(stream_name))
+}
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 89f521d53..a66522fcb 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -18,12 +18,12 @@
 
 use datafusion::arrow::datatypes::Schema;
 use lazy_static::lazy_static;
-use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::sync::RwLock;
 
 use crate::alerts::Alerts;
 use crate::event::Event;
+use crate::stats::{Stats, StatsCounter};
 use crate::storage::ObjectStorage;
 
 use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
@@ -32,25 +32,7 @@ use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
 pub struct LogStreamMetadata {
     pub schema: Option<Schema>,
     pub alerts: Alerts,
-    pub stats: Stats,
-}
-
-#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq, Eq)]
-pub struct Stats {
-    pub size: u64,
-    pub compressed_size: u64,
-    #[serde(skip)]
-    pub prev_compressed: u64,
-}
-
-impl Stats {
-    /// Update stats considering the following facts about params:
-    /// - `size`: The event body's binary size.
-    /// - `compressed_size`: Binary size of parquet file, total compressed_size is this plus size of all past parquet files.
-    pub fn update(&mut self, size: u64, compressed_size: u64) {
-        self.size += size;
-        self.compressed_size = self.prev_compressed + compressed_size;
-    }
+    pub stats: StatsCounter,
 }
 
 lazy_static! {
@@ -138,11 +120,12 @@ impl STREAM_INFO {
         for stream in storage.list_streams().await? {
             let alerts = storage.get_alerts(&stream.name).await?;
             let schema = storage.get_schema(&stream.name).await?;
+            let stats = storage.get_stats(&stream.name).await?;
 
             let metadata = LogStreamMetadata {
                 schema,
                 alerts,
-                ..LogStreamMetadata::default()
+                stats: stats.into(),
             };
 
             let mut map = self.write().expect(LOCK_EXPECT);
@@ -161,22 +144,24 @@
             .collect()
     }
 
-    #[allow(dead_code)]
-    pub fn update_stats(
-        &self,
-        stream_name: &str,
-        size: u64,
-        compressed_size: u64,
-    ) -> Result<(), MetadataError> {
-        let mut map = self.write().expect(LOCK_EXPECT);
+    pub fn update_stats(&self, stream_name: &str, size: u64) -> Result<(), MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
         let stream = map
-            .get_mut(stream_name)
+            .get(stream_name)
             .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?;
 
-        stream.stats.update(size, compressed_size);
+        stream.stats.add_ingestion_size(size);
 
         Ok(())
     }
+
+    pub fn get_stats(&self, stream_name: &str) -> Result<Stats, MetadataError> {
+        self.read()
+            .expect(LOCK_EXPECT)
+            .get(stream_name)
+            .map(|metadata| Stats::from(&metadata.stats))
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))
+    }
 }
 
 pub mod error {
diff --git a/server/src/s3.rs b/server/src/s3.rs
index bf84bb708..1c2fbc5d7 100644
--- a/server/src/s3.rs
+++ b/server/src/s3.rs
@@ -22,14 +22,15 @@ use http::Uri;
 use object_store::aws::AmazonS3Builder;
 use object_store::limit::LimitStore;
 use serde::{Deserialize, Serialize};
+use serde_json::Value;
 use std::fs;
 use std::iter::Iterator;
 use std::sync::Arc;
 
 use crate::alerts::Alerts;
-use crate::metadata::Stats;
 use crate::option::{StorageOpt, CONFIG};
 use crate::query::Query;
+use crate::stats::Stats;
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
 
 // Default object storage currently is DO Spaces bucket
@@ -221,21 +222,27 @@ impl S3 {
             .key(format!("{}/.schema", stream_name))
             .send()
             .await?;
-        // create .parseable.json file in the stream-name prefix.
-        // This indicates the format version for this stream.
-        // This is helpful in case we may change the backend format
-        // in the future
+        self._put_parseable_config(stream_name, format).await?;
+        // Prefix created on S3, now create the directory in
+        // the local storage as well
+        let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));
+
+        Ok(())
+    }
+
+    async fn _put_parseable_config(
+        &self,
+        stream_name: &str,
+        body: Vec<u8>,
+    ) -> Result<(), AwsSdkError> {
         let _resp = self
             .client
             .put_object()
             .bucket(&S3_CONFIG.s3_bucket_name)
             .key(format!("{}/.parseable.json", stream_name))
-            .body(format.into())
+            .body(body.into())
             .send()
             .await?;
-        // Prefix created on S3, now create the directory in
-        // the local storage as well
-        let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));
+
         Ok(())
     }
@@ -290,8 +297,8 @@ impl S3 {
         self._get(stream_name, "alert.json").await
     }
 
-    async fn _get_stats(&self, stream_name: &str) -> Result {
-        self._get(stream_name, "stats.json").await
+    async fn _get_parseable_config(&self, stream_name: &str) -> Result {
+        self._get(stream_name, "parseable.json").await
     }
 
     async fn _get(&self, stream_name: &str, resource: &str) -> Result {
@@ -434,11 +441,30 @@ impl ObjectStorage for S3 {
     }
 
     async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
-        let stats = serde_json::from_slice(&self._get_stats(stream_name).await?)?;
+        let parseable_metadata = self._get_parseable_config(stream_name).await?;
+        let parseable_metadata: Value =
+            serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
+
+        let stats = &parseable_metadata["stats"];
+
+        let stats = serde_json::from_value(stats.clone()).unwrap_or_default();
 
         Ok(stats)
     }
 
+    async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> {
+        let stats = serde_json::to_value(stats).expect("stats are perfectly serializable");
+        let parseable_metadata = self._get_parseable_config(stream_name).await?;
+        let mut parseable_metadata: Value =
+            serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
+
+        parseable_metadata["stats"] = stats;
+
+        self._put_parseable_config(stream_name, parseable_metadata.to_string().into_bytes())
+            .await?;
+
+        Ok(())
+    }
+
     async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
         let streams = self._list_streams().await?;
diff --git a/server/src/stats.rs b/server/src/stats.rs
new file mode 100644
index 000000000..daee0fc2f
--- /dev/null
+++ b/server/src/stats.rs
@@ -0,0 +1,72 @@
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug)]
+pub struct StatsCounter {
+    ingestion_size: AtomicU64,
+    storage_size: AtomicU64,
+}
+
+impl Default for StatsCounter {
+    fn default() -> Self {
+        Self {
+            ingestion_size: AtomicU64::new(0),
+            storage_size: AtomicU64::new(0),
+        }
+    }
+}
+
+impl PartialEq for StatsCounter {
+    fn eq(&self, other: &Self) -> bool {
+        self.ingestion_size() == other.ingestion_size()
+            && self.storage_size() == other.storage_size()
+    }
+}
+
+impl StatsCounter {
+    pub fn new(ingestion_size: u64, storage_size: u64) -> Self {
+        Self {
+            ingestion_size: AtomicU64::new(ingestion_size),
+            storage_size: AtomicU64::new(storage_size),
+        }
+    }
+
+    pub fn ingestion_size(&self) -> u64 {
+        self.ingestion_size.load(Ordering::Relaxed)
+    }
+
+    pub fn storage_size(&self) -> u64 {
+        self.storage_size.load(Ordering::Relaxed)
+    }
+
+    pub fn add_ingestion_size(&self, size: u64) {
+        self.ingestion_size.fetch_add(size, Ordering::AcqRel);
+    }
+
+    pub fn add_storage_size(&self, size: u64) {
+        self.storage_size.fetch_add(size, Ordering::AcqRel);
+    }
+}
+
+/// Helper struct type created by copying stats values from metadata
+#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
+pub struct Stats {
+    ingestion: u64,
+    storage: u64,
+}
+
+impl From<&StatsCounter> for Stats {
+    fn from(stats: &StatsCounter) -> Self {
+        Self {
+            ingestion: stats.ingestion_size(),
+            storage: stats.storage_size(),
+        }
+    }
+}
+
+impl From<Stats> for StatsCounter {
+    fn from(stats: Stats) -> Self {
+        StatsCounter::new(stats.ingestion, stats.storage)
+    }
+}
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 3bc2ee0e0..027bc38d6 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -17,9 +17,10 @@
  */
 
 use crate::alerts::Alerts;
-use crate::metadata::{Stats, STREAM_INFO};
+use crate::metadata::{LOCK_EXPECT, STREAM_INFO};
 use crate::option::CONFIG;
 use crate::query::Query;
+use crate::stats::Stats;
 use crate::{event, utils};
 
 use async_trait::async_trait;
@@ -33,6 +34,7 @@ use datafusion::parquet::errors::ParquetError;
 use datafusion::parquet::file::properties::WriterProperties;
 use serde::Serialize;
 
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fs::{self, File};
 use std::io;
@@ -67,6 +69,7 @@ pub trait ObjectStorage: Sync + 'static {
         stream_name: &str,
         alerts: &Alerts,
     ) -> Result<(), ObjectStorageError>;
+    async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError>;
     async fn get_schema(&self, stream_name: &str) -> Result<Option<Schema>, ObjectStorageError>;
     async fn get_alerts(&self, stream_name: &str) -> Result<Alerts, ObjectStorageError>;
     async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError>;
@@ -125,7 +128,9 @@ pub trait ObjectStorage: Sync + 'static {
 
         let streams = STREAM_INFO.list_streams();
 
-        for stream in streams {
+        let mut stream_stats = HashMap::new();
+
+        for stream in &streams {
             // get dir
             let dir = StorageDir::new(stream.clone());
             // walk dir, find all .tmp files and convert to parquet
@@ -198,6 +203,12 @@ pub trait ObjectStorage: Sync + 'static {
                 let s3_path = format!("{}/{}", stream, file_suffix);
 
                 let _put_parquet_file = self.upload_file(&s3_path, file.to_str().unwrap()).await?;
+
+                stream_stats
+                    .entry(stream)
+                    .and_modify(|size| *size += file.metadata().map(|meta| meta.len()).unwrap_or(0))
+                    .or_insert_with(|| file.metadata().map(|meta| meta.len()).unwrap_or(0));
+
                 if let Err(e) = fs::remove_file(&file) {
                     log::error!(
                         "Error deleting parquet file in path {} due to error [{}]",
@@ -207,6 +218,24 @@ pub trait ObjectStorage: Sync + 'static {
                 }
             }
         }
+
+        for (stream, compressed_size) in stream_stats {
+            let stats = STREAM_INFO
+                .read()
+                .expect(LOCK_EXPECT)
+                .get(stream)
+                .map(|metadata| {
+                    metadata.stats.add_storage_size(compressed_size);
+                    Stats::from(&metadata.stats)
+                });
+
+            if let Some(stats) = stats {
+                if let Err(e) = self.put_stats(stream, &stats).await {
+                    log::warn!("Error updating stats to s3 due to error [{}]", e);
+                }
+            }
+        }
+
         Ok(())
     }
 }
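The heart of this patch is the lock-free counter: `AtomicU64::fetch_add` needs no mutex, so an ingest call can bump a stream's counters while holding only the `STREAM_INFO` read lock (note that `update_stats` now takes `self.read()` instead of `self.write()`). Below is a minimal, self-contained sketch of that property, not the patch's actual code; the `Counters` type, the thread count, and the 128-byte event size are invented for illustration.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Stand-in for the patch's `StatsCounter`: a plain atomic counter
/// that concurrent ingest paths can update without a write lock.
#[derive(Default)]
struct Counters {
    ingestion_size: AtomicU64,
}

impl Counters {
    fn add_ingestion_size(&self, size: u64) {
        // A standalone counter carries no ordering dependency on other
        // data, so Relaxed suffices here; the patch itself uses AcqRel.
        self.ingestion_size.fetch_add(size, Ordering::Relaxed);
    }

    fn ingestion_size(&self) -> u64 {
        self.ingestion_size.load(Ordering::Relaxed)
    }
}

fn main() {
    let counters = Arc::new(Counters::default());

    // Simulate four ingest threads appending 128-byte events to one stream.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let counters = Arc::clone(&counters);
            thread::spawn(move || {
                for _ in 0..1_000 {
                    counters.add_ingestion_size(128);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // No update is lost: 4 threads * 1_000 events * 128 bytes each.
    assert_eq!(counters.ingestion_size(), 4 * 1_000 * 128);
}
```

One design note: `Ordering::AcqRel` on the adds is safe but stronger than a counter that is only ever read as a standalone total strictly needs; `Relaxed`, as in the sketch, is the common choice for statistics counters.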
From 83e6a23e23813128adb2f9befe9398bd8bb08f43 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Thu, 3 Nov 2022 16:35:58 +0530
Subject: [PATCH 2/5] Remove serde_test

---
 server/Cargo.toml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/server/Cargo.toml b/server/Cargo.toml
index 449f6b6da..a379af0bf 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -67,7 +67,6 @@ zip = { version = "0.6.3", default_features = false, features = ["deflate"] }
 [dev-dependencies]
 maplit = "1.0.2"
 rstest = "0.15.0"
-serde_test = "1.0"
 serial_test = { version = "0.9.0", default-features = false }
 
 [package.metadata.parseable_ui]

From 71deab6b6d6d410cd084ee32087e9054123c512a Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Thu, 3 Nov 2022 18:06:51 +0530
Subject: [PATCH 3/5] Make Stats fields pub

---
 server/src/stats.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/src/stats.rs b/server/src/stats.rs
index daee0fc2f..778e68885 100644
--- a/server/src/stats.rs
+++ b/server/src/stats.rs
@@ -52,8 +52,8 @@ impl StatsCounter {
 /// Helper struct type created by copying stats values from metadata
 #[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
 pub struct Stats {
-    ingestion: u64,
-    storage: u64,
+    pub ingestion: u64,
+    pub storage: u64,
 }
 
 impl From<&StatsCounter> for Stats {
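With `ingestion` and `storage` public, handler code can read the fields directly (the next patch formats them into a response). On the persistence side, patch 1's `put_stats`/`get_stats` treat `.parseable.json` as a JSON document whose `stats` key is rewritten in place on every sync cycle. Here is a sketch of that round-trip with the S3 fetch and upload replaced by an in-memory buffer; the `version` field is a made-up stand-in for whatever else the config file holds.

```rust
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};

#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct Stats {
    pub ingestion: u64,
    pub storage: u64,
}

fn main() {
    // Stand-in for the bytes fetched from the stream's .parseable.json object.
    let fetched = json!({ "version": "v1" }).to_string().into_bytes();

    // put_stats: parse the existing config, overwrite only the "stats" key,
    // and write the whole document back.
    let mut config: Value = serde_json::from_slice(&fetched).expect("valid json");
    config["stats"] = serde_json::to_value(Stats { ingestion: 1024, storage: 512 }).unwrap();
    let stored = config.to_string().into_bytes();

    // get_stats: pull the "stats" key back out. A missing or malformed key
    // falls back to Stats::default() via unwrap_or_default.
    let config: Value = serde_json::from_slice(&stored).expect("valid json");
    let stats: Stats = serde_json::from_value(config["stats"].clone()).unwrap_or_default();
    assert_eq!(stats, Stats { ingestion: 1024, storage: 512 });
}
```

The `unwrap_or_default()` on the read path means a stream created before this change, whose config has no `stats` key yet, reports zeroed stats instead of failing.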
From df516231a022b6d6718da415a253e9c764a7f853 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Thu, 3 Nov 2022 18:07:21 +0530
Subject: [PATCH 4/5] Format API response

---
 server/src/handlers/logstream.rs | 42 ++++++++++++++++++++++----------
 1 file changed, 29 insertions(+), 13 deletions(-)

diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs
index 3cb10c128..afb15f7e4 100644
--- a/server/src/handlers/logstream.rs
+++ b/server/src/handlers/logstream.rs
@@ -20,6 +20,7 @@ use std::fs;
 
 use actix_web::http::StatusCode;
 use actix_web::{web, HttpRequest, HttpResponse, Responder};
+use chrono::Utc;
 use serde_json::Value;
 
 use crate::alerts::Alerts;
@@ -305,22 +306,37 @@ pub async fn put_alert(req: HttpRequest, body: web::Json<Value>) ->
 pub async fn get_stats(req: HttpRequest) -> HttpResponse {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
-    let stats = metadata::STREAM_INFO
-        .get_stats(&stream_name)
-        .map(|ref stats| serde_json::to_string(stats).expect("stats can serialize to json"));
-
-    match stats {
-        Ok(stats) => response::ServerResponse {
-            msg: stats,
-            code: StatusCode::OK,
+    let stats = match metadata::STREAM_INFO.get_stats(&stream_name) {
+        Ok(stats) => stats,
+        Err(e) => {
+            return response::ServerResponse {
+                msg: format!("Could not return stats due to error: {}", e),
+                code: StatusCode::BAD_REQUEST,
+            }
+            .to_http()
         }
-        .to_http(),
-        Err(e) => response::ServerResponse {
-            msg: format!("Could not return stats due to error: {}", e),
-            code: StatusCode::BAD_REQUEST,
+    };
+
+    let time = Utc::now();
+
+    let stats = serde_json::json!({
+        "stream": stream_name,
+        "time": time,
+        "ingestion": {
+            "size": format!("{} {}", stats.ingestion, "Bytes"),
+            "format": "json"
+        },
+        "storage": {
+            "size": format!("{} {}", stats.storage, "Bytes"),
+            "format": "parquet"
         }
-        .to_http(),
+    });
+
+    response::ServerResponse {
+        msg: stats.to_string(),
+        code: StatusCode::OK,
     }
+    .to_http()
 }
 
 fn remove_id_from_alerts(value: &mut Value) {
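After this patch the stats route returns a formatted document instead of the bare `Stats` fields. The sketch below reproduces that response shape; the stream name and byte counts are hypothetical, and `to_rfc3339()` stands in for the serde-serialized `DateTime<Utc>` that the handler embeds directly (which relies on chrono's serde support).

```rust
use chrono::Utc;
use serde_json::json;

fn main() {
    // Hypothetical counter values for a stream named "backend".
    let (ingestion, storage) = (2048_u64, 1024_u64);

    // Mirrors the shape built by get_stats in this patch.
    let body = json!({
        "stream": "backend",
        "time": Utc::now().to_rfc3339(),
        "ingestion": { "size": format!("{} Bytes", ingestion), "format": "json" },
        "storage": { "size": format!("{} Bytes", storage), "format": "parquet" }
    });

    // Prints e.g. {"ingestion":{"format":"json","size":"2048 Bytes"},...}
    println!("{}", body);
}
```

Note that sizes are reported as strings such as "2048 Bytes" rather than raw numbers; that is the patch's formatting choice, kept as-is here.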
From 0436b85024d2b907e36da460e76cfc3ef9aa12aa Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Fri, 4 Nov 2022 11:19:59 +0530
Subject: [PATCH 5/5] Convert bool to u32

---
 server/src/utils.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/utils.rs b/server/src/utils.rs
index 8356501e4..ecaa87003 100644
--- a/server/src/utils.rs
+++ b/server/src/utils.rs
@@ -243,7 +243,7 @@ impl TimePeriod {
     pub fn generate_prefixes(&self, prefix: &str) -> Vec<String> {
         let prefix = format!("{}/", prefix);
 
-        let end_minute = self.end.minute() + if self.end.second() > 0 { 1 } else { 0 };
+        let end_minute = self.end.minute() + u32::from(self.end.second() > 0);
 
         self.generate_date_prefixes(
             &prefix,