From cb350f885aadf5f8a69dfc30aa47f25b65652d0d Mon Sep 17 00:00:00 2001 From: artech-git <45896902+artech-git@users.noreply.github.com> Date: Wed, 12 Apr 2023 17:16:20 +0530 Subject: [PATCH 1/6] lazy_static init method substituted with once_cell lazy type 1.utilizing Lazy type for init wherever possible 2.for more types where static types demand some implementation a child struct is created to accomodate the values of the inner and type and traits Deref, DerefMut & Debug are explicitly implemented --- server/src/analytics.rs | 6 +-- server/src/event.rs | 2 +- server/src/event/writer.rs | 51 ++++++++++++++----- server/src/handlers/http/logstream.rs | 2 +- server/src/main.rs | 2 +- server/src/metadata.rs | 27 +++++++--- server/src/metrics/mod.rs | 71 ++++++++++++++------------- server/src/metrics/storage.rs | 34 ++++++------- server/src/option.rs | 7 ++- server/src/storage.rs | 34 +++++++++++-- server/src/storage/object_storage.rs | 2 +- 11 files changed, 150 insertions(+), 88 deletions(-) diff --git a/server/src/analytics.rs b/server/src/analytics.rs index 9ea992821..bab9e88b5 100644 --- a/server/src/analytics.rs +++ b/server/src/analytics.rs @@ -24,7 +24,7 @@ use crate::storage; use chrono::{DateTime, Utc}; use clokwerk::{AsyncScheduler, Interval}; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; @@ -36,9 +36,7 @@ use ulid::Ulid; const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80"; const ANALYTICS_SEND_INTERVAL_SECONDS: Interval = clokwerk::Interval::Hours(1); -lazy_static! { - pub static ref SYS_INFO: Mutex = Mutex::new(System::new_all()); -} +pub static SYS_INFO: Lazy> = Lazy::new( || Mutex::new(System::new_all())); pub fn refresh_sys_info() { let mut sys_info = SYS_INFO.lock().unwrap(); diff --git a/server/src/event.rs b/server/src/event.rs index da52c1b7d..64494d98b 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -91,7 +91,7 @@ impl Event { // event process all events after the 1st event. Concatenates record batches // and puts them in memory store for each event. fn process_event(&self, schema_key: &str) -> Result<(), EventError> { - STREAM_WRITERS::append_to_local(&self.stream_name, schema_key, &self.rb)?; + STREAM_WRITERS.append_to_local(&self.stream_name, schema_key, &self.rb)?; Ok(()) } } diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index bada41f55..0b1603526 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -19,12 +19,14 @@ use arrow_array::RecordBatch; use arrow_ipc::writer::StreamWriter; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use std::borrow::Borrow; use std::collections::HashMap; use std::fs::{File, OpenOptions}; use std::io::Write; use std::sync::{Mutex, RwLock}; +use std::ops::{Deref, DerefMut}; +use std::fmt::{self, Debug, Formatter}; use crate::storage::StorageDir; @@ -33,19 +35,43 @@ use self::errors::StreamWriterError; type ArrowWriter = StreamWriter; type LocalWriter = Mutex>>; -lazy_static! { - #[derive(Default)] - pub static ref STREAM_WRITERS: RwLock> = RwLock::new(WriterTable::new()); +pub static STREAM_WRITERS: Lazy = Lazy::new( || InnerStreamWriter(RwLock::new(WriterTable::new()))); + +/* + A wrapper type for global struct to implement methods over +*/ +pub struct InnerStreamWriter(RwLock>); + +impl Deref for InnerStreamWriter { + type Target = RwLock>; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl DerefMut for InnerStreamWriter { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} +/* + Manually implmenting for the Type + since it depends on the types which are missing it + */ +impl Debug for InnerStreamWriter { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.write_str("InnerStreamWriter { __private_field: () }") + } } -impl STREAM_WRITERS { +impl InnerStreamWriter { // append to a existing stream pub fn append_to_local( + &self, stream: &str, schema_key: &str, record: &RecordBatch, ) -> Result<(), StreamWriterError> { - let hashmap_guard = STREAM_WRITERS + let hashmap_guard = self .read() .map_err(|_| StreamWriterError::RwPoisoned)?; @@ -71,7 +97,7 @@ impl STREAM_WRITERS { None => { // this requires mutable borrow of the map so we drop this read lock and wait for write lock drop(hashmap_guard); - STREAM_WRITERS::create_entry(stream.to_owned(), schema_key.to_owned(), record)?; + self.create_entry(stream.to_owned(), schema_key.to_owned(), record)?; } }; Ok(()) @@ -80,11 +106,12 @@ impl STREAM_WRITERS { // create a new entry with new stream_writer // Only create entry for valid streams fn create_entry( + &self, stream: String, schema_key: String, record: &RecordBatch, ) -> Result<(), StreamWriterError> { - let mut hashmap_guard = STREAM_WRITERS + let mut hashmap_guard = self .write() .map_err(|_| StreamWriterError::RwPoisoned)?; @@ -95,12 +122,12 @@ impl STREAM_WRITERS { Ok(()) } - pub fn delete_stream(stream: &str) { - STREAM_WRITERS.write().unwrap().delete_stream(stream); + pub fn delete_stream(&self, stream: &str) { + self.write().unwrap().delete_stream(stream); } - pub fn unset_all() -> Result<(), StreamWriterError> { - let table = STREAM_WRITERS + pub fn unset_all(&self) -> Result<(), StreamWriterError> { + let table = self .read() .map_err(|_| StreamWriterError::RwPoisoned)?; diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index cd42e3e62..69f08adff 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -45,7 +45,7 @@ pub async fn delete(req: HttpRequest) -> Result { objectstore.delete_stream(&stream_name).await?; metadata::STREAM_INFO.delete_stream(&stream_name); - event::STREAM_WRITERS::delete_stream(&stream_name); + event::STREAM_WRITERS.delete_stream(&stream_name); let stream_dir = StorageDir::new(&stream_name); if fs::remove_dir_all(&stream_dir.data_path).is_err() { diff --git a/server/src/main.rs b/server/src/main.rs index dde50ac7c..90762cb8e 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -194,7 +194,7 @@ fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<( scheduler .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds()) .run(move || { - if let Err(e) = crate::event::STREAM_WRITERS::unset_all() { + if let Err(e) = crate::event::STREAM_WRITERS.unset_all() { log::warn!("failed to sync local data. {:?}", e); } }); diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 0d3387f39..fe5b07a96 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -17,9 +17,10 @@ */ use arrow_schema::Schema; -use lazy_static::lazy_static; use std::collections::HashMap; +use std::ops::{Deref, DerefMut}; use std::sync::{Arc, RwLock}; +use once_cell::sync::Lazy; use crate::alerts::Alerts; use crate::event::Event; @@ -30,11 +31,23 @@ use crate::storage::{MergedRecordReader, ObjectStorage, StorageDir}; use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; // TODO: make return type be of 'static lifetime instead of cloning -lazy_static! { - #[derive(Debug)] - // A read-write lock to allow multiple reads while and isolated write - pub static ref STREAM_INFO: RwLock> = - RwLock::new(HashMap::new()); +// A read-write lock to allow multiple reads while and isolated write +pub static STREAM_INFO: Lazy = + Lazy::new( || StreamType(RwLock::new(HashMap::new()))); + +#[derive(Debug)] +pub struct StreamType(RwLock>); + +impl Deref for StreamType { + type Target = RwLock>; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl DerefMut for StreamType { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } } #[derive(Debug)] @@ -63,7 +76,7 @@ pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding // 3. When a stream is deleted (remove the entry from the map) // 4. When first event is sent to stream (update the schema) // 5. When set alert API is called (update the alert) -impl STREAM_INFO { +impl StreamType { pub async fn check_alerts(&self, event: &Event) -> Result<(), CheckAlertError> { let map = self.read().expect(LOCK_EXPECT); let meta = map diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs index 820d8f0f9..1595c450d 100644 --- a/server/src/metrics/mod.rs +++ b/server/src/metrics/mod.rs @@ -19,46 +19,49 @@ pub mod storage; use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder}; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry}; use crate::{handlers::http::metrics_path, metadata::STREAM_INFO}; pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME"); -lazy_static! { - pub static ref EVENTS_INGESTED: IntCounterVec = IntCounterVec::new( - Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE), - &["stream", "format"] - ) - .expect("metric can be created"); - pub static ref EVENTS_INGESTED_SIZE: IntGaugeVec = IntGaugeVec::new( - Opts::new("events_ingested_size", "Events ingested size bytes") - .namespace(METRICS_NAMESPACE), - &["stream", "format"] - ) - .expect("metric can be created"); - pub static ref STORAGE_SIZE: IntGaugeVec = IntGaugeVec::new( - Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE), - &["type", "stream", "format"] - ) - .expect("metric can be created"); - pub static ref STAGING_FILES: IntGaugeVec = IntGaugeVec::new( - Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE), - &["stream"] - ) - .expect("metric can be created"); - pub static ref QUERY_EXECUTE_TIME: HistogramVec = HistogramVec::new( - HistogramOpts::new("query_execute_time", "Query execute time").namespace(METRICS_NAMESPACE), - &["stream"] - ) - .expect("metric can be created"); - pub static ref ALERTS_STATES: IntCounterVec = IntCounterVec::new( - Opts::new("alerts_states", "Alerts States").namespace(METRICS_NAMESPACE), - &["stream", "name", "state"] - ) - .expect("metric can be created"); -} +pub static EVENTS_INGESTED: Lazy = Lazy::new( || IntCounterVec::new( + Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE), + &["stream", "format"] +) +.expect("metric can be created")); + +pub static EVENTS_INGESTED_SIZE: Lazy = Lazy::new(|| IntGaugeVec::new( + Opts::new("events_ingested_size", "Events ingested size bytes") + .namespace(METRICS_NAMESPACE), + &["stream", "format"] +) +.expect("metric can be created")); + +pub static STORAGE_SIZE: Lazy = Lazy::new(|| IntGaugeVec::new( + Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE), + &["type", "stream", "format"] +) +.expect("metric can be created")); + +pub static STAGING_FILES: Lazy = Lazy::new(|| IntGaugeVec::new( + Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE), + &["stream"] +) +.expect("metric can be created")); + +pub static QUERY_EXECUTE_TIME: Lazy = Lazy::new(|| HistogramVec::new( + HistogramOpts::new("query_execute_time", "Query execute time").namespace(METRICS_NAMESPACE), + &["stream"] +) +.expect("metric can be created")); + +pub static ALERTS_STATES: Lazy = Lazy::new( || IntCounterVec::new( + Opts::new("alerts_states", "Alerts States").namespace(METRICS_NAMESPACE), + &["stream", "name", "state"] +) +.expect("metric can be created")); fn custom_metrics(registry: &Registry) { registry diff --git a/server/src/metrics/storage.rs b/server/src/metrics/storage.rs index d8ee85ef9..879a01477 100644 --- a/server/src/metrics/storage.rs +++ b/server/src/metrics/storage.rs @@ -24,19 +24,18 @@ pub trait StorageMetrics { pub mod localfs { use crate::{metrics::METRICS_NAMESPACE, storage::FSConfig}; - use lazy_static::lazy_static; + use once_cell::sync::Lazy; use prometheus::{HistogramOpts, HistogramVec}; use super::StorageMetrics; - lazy_static! { - pub static ref REQUEST_RESPONSE_TIME: HistogramVec = HistogramVec::new( - HistogramOpts::new("local_fs_response_time", "FileSystem Request Latency") - .namespace(METRICS_NAMESPACE), - &["method", "status"] - ) - .expect("metric can be created"); - } + pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new( || HistogramVec::new( + HistogramOpts::new("local_fs_response_time", "FileSystem Request Latency") + .namespace(METRICS_NAMESPACE), + &["method", "status"] + ) + .expect("metric can be created")); + impl StorageMetrics for FSConfig { fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { @@ -50,19 +49,18 @@ pub mod localfs { pub mod s3 { use crate::{metrics::METRICS_NAMESPACE, storage::S3Config}; - use lazy_static::lazy_static; + use once_cell::sync::Lazy; use prometheus::{HistogramOpts, HistogramVec}; use super::StorageMetrics; - lazy_static! { - pub static ref REQUEST_RESPONSE_TIME: HistogramVec = HistogramVec::new( - HistogramOpts::new("s3_response_time", "S3 Request Latency") - .namespace(METRICS_NAMESPACE), - &["method", "status"] - ) - .expect("metric can be created"); - } + pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new( || HistogramVec::new( + HistogramOpts::new("s3_response_time", "S3 Request Latency") + .namespace(METRICS_NAMESPACE), + &["method", "status"] + ) + .expect("metric can be created")); + impl StorageMetrics for S3Config { fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { diff --git a/server/src/option.rs b/server/src/option.rs index b43890824..d0a69e3a0 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -21,15 +21,14 @@ use clap::{command, value_parser, Arg, Args, Command, FromArgMatches}; use std::path::{Path, PathBuf}; use std::sync::Arc; +use once_cell::sync::Lazy; use crate::storage::{FSConfig, ObjectStorageProvider, S3Config, LOCAL_SYNC_INTERVAL}; use crate::utils::validate_path_is_writeable; -lazy_static::lazy_static! { - #[derive(Debug)] - pub static ref CONFIG: Arc = Arc::new(Config::new()); -} +pub static CONFIG: Lazy> = Lazy::new( || Arc::new(Config::new())); +#[derive(Debug)] pub struct Config { pub parseable: Server, storage: Arc, diff --git a/server/src/storage.rs b/server/src/storage.rs index cbe47f213..4aa34ad90 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -27,12 +27,14 @@ use chrono::{Local, NaiveDateTime, Timelike, Utc}; use datafusion::arrow::error::ArrowError; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::parquet::errors::ParquetError; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use std::collections::HashMap; +use std::fmt::Formatter; use std::fs::create_dir_all; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; +use std::ops::{Deref, DerefMut}; mod file_link; mod localfs; @@ -182,12 +184,34 @@ async fn create_remote_metadata(metadata: &StorageMetadata) -> Result<(), Object client.put_metadata(metadata).await } -lazy_static! { - pub static ref CACHED_FILES: Mutex> = Mutex::new(FileTable::new()); - pub static ref STORAGE_RUNTIME: Arc = CONFIG.storage().get_datafusion_runtime(); +pub static CACHED_FILES: Lazy = Lazy::new(|| CachedFilesInnerType(Mutex::new(FileTable::new())) ); + +pub struct CachedFilesInnerType(Mutex>); + +impl Deref for CachedFilesInnerType { + + type Target = Mutex>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for CachedFilesInnerType { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Debug for CachedFilesInnerType { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.write_str("CachedFilesInnerType { __private_field: () }") + } } -impl CACHED_FILES { +pub static STORAGE_RUNTIME: Lazy> = Lazy::new( || CONFIG.storage().get_datafusion_runtime()); + +impl CachedFilesInnerType { pub fn track_parquet(&self) { let mut table = self.lock().expect("no poisoning"); STREAM_INFO diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 4b74e9077..e498d960a 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -61,7 +61,7 @@ pub(super) const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json"; const SCHEMA_FILE_NAME: &str = ".schema"; const ALERT_FILE_NAME: &str = ".alert.json"; -pub trait ObjectStorageProvider: StorageMetrics { +pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug { fn get_datafusion_runtime(&self) -> Arc; fn get_object_store(&self) -> Arc; fn get_endpoint(&self) -> String; From 4520f1f693560a46db22ed1665240c7265db07d6 Mon Sep 17 00:00:00 2001 From: artech-git <45896902+artech-git@users.noreply.github.com> Date: Wed, 12 Apr 2023 17:23:28 +0530 Subject: [PATCH 2/6] lazy_static crate dependency removed --- Cargo.lock | 1 - server/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce6f23575..16e427c8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3043,7 +3043,6 @@ dependencies = [ "http", "humantime-serde", "itertools", - "lazy_static", "log", "maplit", "num_cpus", diff --git a/server/Cargo.toml b/server/Cargo.toml index d6dc95dde..faa543e0f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -40,7 +40,6 @@ futures = "0.3" fs_extra = "1.3" http = "0.2" humantime-serde = "1.1" -lazy_static = "1.4" log = "0.4" num_cpus = "1.15" sysinfo = "0.28.4" From 59037aa1ee7a9e301ecf568647b7b3ff7f18dabe Mon Sep 17 00:00:00 2001 From: artech-git <45896902+artech-git@users.noreply.github.com> Date: Wed, 12 Apr 2023 18:06:04 +0530 Subject: [PATCH 3/6] [FIX] Missing Traits and Types imported --- server/src/storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/storage.rs b/server/src/storage.rs index 4aa34ad90..19c2899c4 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -30,7 +30,7 @@ use datafusion::parquet::errors::ParquetError; use once_cell::sync::Lazy; use std::collections::HashMap; -use std::fmt::Formatter; +use std::fmt::{self, Debug, Formatter}; use std::fs::create_dir_all; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; From 32208a6f8ac01971936c7fc76a465e9c1483f7fb Mon Sep 17 00:00:00 2001 From: artech-git <45896902+artech-git@users.noreply.github.com> Date: Wed, 12 Apr 2023 18:07:33 +0530 Subject: [PATCH 4/6] [FIX] Cargo formatting applied --- server/src/analytics.rs | 2 +- server/src/event/writer.rs | 29 +++++------- server/src/metadata.rs | 9 ++-- server/src/metrics/mod.rs | 84 ++++++++++++++++++++--------------- server/src/metrics/storage.rs | 30 +++++++------ server/src/option.rs | 4 +- server/src/storage.rs | 9 ++-- 7 files changed, 88 insertions(+), 79 deletions(-) diff --git a/server/src/analytics.rs b/server/src/analytics.rs index bab9e88b5..9a91fb226 100644 --- a/server/src/analytics.rs +++ b/server/src/analytics.rs @@ -36,7 +36,7 @@ use ulid::Ulid; const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80"; const ANALYTICS_SEND_INTERVAL_SECONDS: Interval = clokwerk::Interval::Hours(1); -pub static SYS_INFO: Lazy> = Lazy::new( || Mutex::new(System::new_all())); +pub static SYS_INFO: Lazy> = Lazy::new(|| Mutex::new(System::new_all())); pub fn refresh_sys_info() { let mut sys_info = SYS_INFO.lock().unwrap(); diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 0b1603526..05bdbbf0c 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -22,11 +22,11 @@ use arrow_ipc::writer::StreamWriter; use once_cell::sync::Lazy; use std::borrow::Borrow; use std::collections::HashMap; +use std::fmt::{self, Debug, Formatter}; use std::fs::{File, OpenOptions}; use std::io::Write; -use std::sync::{Mutex, RwLock}; use std::ops::{Deref, DerefMut}; -use std::fmt::{self, Debug, Formatter}; +use std::sync::{Mutex, RwLock}; use crate::storage::StorageDir; @@ -35,16 +35,17 @@ use self::errors::StreamWriterError; type ArrowWriter = StreamWriter; type LocalWriter = Mutex>>; -pub static STREAM_WRITERS: Lazy = Lazy::new( || InnerStreamWriter(RwLock::new(WriterTable::new()))); +pub static STREAM_WRITERS: Lazy = + Lazy::new(|| InnerStreamWriter(RwLock::new(WriterTable::new()))); /* - A wrapper type for global struct to implement methods over + A wrapper type for global struct to implement methods over */ pub struct InnerStreamWriter(RwLock>); impl Deref for InnerStreamWriter { type Target = RwLock>; - fn deref(&self) -> &Self::Target { + fn deref(&self) -> &Self::Target { &self.0 } } @@ -54,9 +55,9 @@ impl DerefMut for InnerStreamWriter { } } /* - Manually implmenting for the Type - since it depends on the types which are missing it - */ + Manually implmenting for the Type + since it depends on the types which are missing it +*/ impl Debug for InnerStreamWriter { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.write_str("InnerStreamWriter { __private_field: () }") @@ -71,9 +72,7 @@ impl InnerStreamWriter { schema_key: &str, record: &RecordBatch, ) -> Result<(), StreamWriterError> { - let hashmap_guard = self - .read() - .map_err(|_| StreamWriterError::RwPoisoned)?; + let hashmap_guard = self.read().map_err(|_| StreamWriterError::RwPoisoned)?; match hashmap_guard.get(stream, schema_key) { Some(localwriter) => { @@ -111,9 +110,7 @@ impl InnerStreamWriter { schema_key: String, record: &RecordBatch, ) -> Result<(), StreamWriterError> { - let mut hashmap_guard = self - .write() - .map_err(|_| StreamWriterError::RwPoisoned)?; + let mut hashmap_guard = self.write().map_err(|_| StreamWriterError::RwPoisoned)?; let writer = init_new_stream_writer_file(&stream, &schema_key, record)?; @@ -127,9 +124,7 @@ impl InnerStreamWriter { } pub fn unset_all(&self) -> Result<(), StreamWriterError> { - let table = self - .read() - .map_err(|_| StreamWriterError::RwPoisoned)?; + let table = self.read().map_err(|_| StreamWriterError::RwPoisoned)?; for writer in table.iter() { if let Some(mut streamwriter) = writer diff --git a/server/src/metadata.rs b/server/src/metadata.rs index fe5b07a96..7d876144d 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -17,10 +17,10 @@ */ use arrow_schema::Schema; +use once_cell::sync::Lazy; use std::collections::HashMap; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, RwLock}; -use once_cell::sync::Lazy; use crate::alerts::Alerts; use crate::event::Event; @@ -32,19 +32,18 @@ use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; // TODO: make return type be of 'static lifetime instead of cloning // A read-write lock to allow multiple reads while and isolated write -pub static STREAM_INFO: Lazy = - Lazy::new( || StreamType(RwLock::new(HashMap::new()))); +pub static STREAM_INFO: Lazy = Lazy::new(|| StreamType(RwLock::new(HashMap::new()))); #[derive(Debug)] pub struct StreamType(RwLock>); -impl Deref for StreamType { +impl Deref for StreamType { type Target = RwLock>; fn deref(&self) -> &Self::Target { &self.0 } } -impl DerefMut for StreamType { +impl DerefMut for StreamType { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs index 1595c450d..c871cdd65 100644 --- a/server/src/metrics/mod.rs +++ b/server/src/metrics/mod.rs @@ -26,42 +26,54 @@ use crate::{handlers::http::metrics_path, metadata::STREAM_INFO}; pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME"); -pub static EVENTS_INGESTED: Lazy = Lazy::new( || IntCounterVec::new( - Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE), - &["stream", "format"] -) -.expect("metric can be created")); - -pub static EVENTS_INGESTED_SIZE: Lazy = Lazy::new(|| IntGaugeVec::new( - Opts::new("events_ingested_size", "Events ingested size bytes") - .namespace(METRICS_NAMESPACE), - &["stream", "format"] -) -.expect("metric can be created")); - -pub static STORAGE_SIZE: Lazy = Lazy::new(|| IntGaugeVec::new( - Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE), - &["type", "stream", "format"] -) -.expect("metric can be created")); - -pub static STAGING_FILES: Lazy = Lazy::new(|| IntGaugeVec::new( - Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE), - &["stream"] -) -.expect("metric can be created")); - -pub static QUERY_EXECUTE_TIME: Lazy = Lazy::new(|| HistogramVec::new( - HistogramOpts::new("query_execute_time", "Query execute time").namespace(METRICS_NAMESPACE), - &["stream"] -) -.expect("metric can be created")); - -pub static ALERTS_STATES: Lazy = Lazy::new( || IntCounterVec::new( - Opts::new("alerts_states", "Alerts States").namespace(METRICS_NAMESPACE), - &["stream", "name", "state"] -) -.expect("metric can be created")); +pub static EVENTS_INGESTED: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + +pub static EVENTS_INGESTED_SIZE: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("events_ingested_size", "Events ingested size bytes") + .namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + +pub static STORAGE_SIZE: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE), + &["type", "stream", "format"], + ) + .expect("metric can be created") +}); + +pub static STAGING_FILES: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE), + &["stream"], + ) + .expect("metric can be created") +}); + +pub static QUERY_EXECUTE_TIME: Lazy = Lazy::new(|| { + HistogramVec::new( + HistogramOpts::new("query_execute_time", "Query execute time").namespace(METRICS_NAMESPACE), + &["stream"], + ) + .expect("metric can be created") +}); + +pub static ALERTS_STATES: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new("alerts_states", "Alerts States").namespace(METRICS_NAMESPACE), + &["stream", "name", "state"], + ) + .expect("metric can be created") +}); fn custom_metrics(registry: &Registry) { registry diff --git a/server/src/metrics/storage.rs b/server/src/metrics/storage.rs index 879a01477..6d122ba39 100644 --- a/server/src/metrics/storage.rs +++ b/server/src/metrics/storage.rs @@ -29,13 +29,14 @@ pub mod localfs { use super::StorageMetrics; - pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new( || HistogramVec::new( - HistogramOpts::new("local_fs_response_time", "FileSystem Request Latency") - .namespace(METRICS_NAMESPACE), - &["method", "status"] - ) - .expect("metric can be created")); - + pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { + HistogramVec::new( + HistogramOpts::new("local_fs_response_time", "FileSystem Request Latency") + .namespace(METRICS_NAMESPACE), + &["method", "status"], + ) + .expect("metric can be created") + }); impl StorageMetrics for FSConfig { fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { @@ -54,13 +55,14 @@ pub mod s3 { use super::StorageMetrics; - pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new( || HistogramVec::new( - HistogramOpts::new("s3_response_time", "S3 Request Latency") - .namespace(METRICS_NAMESPACE), - &["method", "status"] - ) - .expect("metric can be created")); - + pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { + HistogramVec::new( + HistogramOpts::new("s3_response_time", "S3 Request Latency") + .namespace(METRICS_NAMESPACE), + &["method", "status"], + ) + .expect("metric can be created") + }); impl StorageMetrics for S3Config { fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { diff --git a/server/src/option.rs b/server/src/option.rs index d0a69e3a0..f87c68d48 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -19,14 +19,14 @@ use clap::error::ErrorKind; use clap::{command, value_parser, Arg, Args, Command, FromArgMatches}; +use once_cell::sync::Lazy; use std::path::{Path, PathBuf}; use std::sync::Arc; -use once_cell::sync::Lazy; use crate::storage::{FSConfig, ObjectStorageProvider, S3Config, LOCAL_SYNC_INTERVAL}; use crate::utils::validate_path_is_writeable; -pub static CONFIG: Lazy> = Lazy::new( || Arc::new(Config::new())); +pub static CONFIG: Lazy> = Lazy::new(|| Arc::new(Config::new())); #[derive(Debug)] pub struct Config { diff --git a/server/src/storage.rs b/server/src/storage.rs index 19c2899c4..9452eb8ef 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -32,9 +32,9 @@ use once_cell::sync::Lazy; use std::collections::HashMap; use std::fmt::{self, Debug, Formatter}; use std::fs::create_dir_all; +use std::ops::{Deref, DerefMut}; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; -use std::ops::{Deref, DerefMut}; mod file_link; mod localfs; @@ -184,12 +184,12 @@ async fn create_remote_metadata(metadata: &StorageMetadata) -> Result<(), Object client.put_metadata(metadata).await } -pub static CACHED_FILES: Lazy = Lazy::new(|| CachedFilesInnerType(Mutex::new(FileTable::new())) ); +pub static CACHED_FILES: Lazy = + Lazy::new(|| CachedFilesInnerType(Mutex::new(FileTable::new()))); pub struct CachedFilesInnerType(Mutex>); impl Deref for CachedFilesInnerType { - type Target = Mutex>; fn deref(&self) -> &Self::Target { @@ -209,7 +209,8 @@ impl Debug for CachedFilesInnerType { } } -pub static STORAGE_RUNTIME: Lazy> = Lazy::new( || CONFIG.storage().get_datafusion_runtime()); +pub static STORAGE_RUNTIME: Lazy> = + Lazy::new(|| CONFIG.storage().get_datafusion_runtime()); impl CachedFilesInnerType { pub fn track_parquet(&self) { From 441a272324d478c1d3a5d964aac19ae9fe7742b5 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 12 Apr 2023 19:32:56 +0530 Subject: [PATCH 5/6] use derive_more instead of manual impl --- server/src/metadata.rs | 22 +++++-------------- server/src/storage.rs | 39 +++++++-------------------------- server/src/storage/file_link.rs | 1 + 3 files changed, 14 insertions(+), 48 deletions(-) diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 7d876144d..2306ce6e9 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -19,7 +19,6 @@ use arrow_schema::Schema; use once_cell::sync::Lazy; use std::collections::HashMap; -use std::ops::{Deref, DerefMut}; use std::sync::{Arc, RwLock}; use crate::alerts::Alerts; @@ -29,25 +28,14 @@ use crate::stats::{Stats, StatsCounter}; use crate::storage::{MergedRecordReader, ObjectStorage, StorageDir}; use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; +use derive_more::{Deref, DerefMut}; // TODO: make return type be of 'static lifetime instead of cloning // A read-write lock to allow multiple reads while and isolated write -pub static STREAM_INFO: Lazy = Lazy::new(|| StreamType(RwLock::new(HashMap::new()))); +pub static STREAM_INFO: Lazy = Lazy::new(StreamInfo::default); -#[derive(Debug)] -pub struct StreamType(RwLock>); - -impl Deref for StreamType { - type Target = RwLock>; - fn deref(&self) -> &Self::Target { - &self.0 - } -} -impl DerefMut for StreamType { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} +#[derive(Debug, Deref, DerefMut, Default)] +pub struct StreamInfo(RwLock>); #[derive(Debug)] pub struct LogStreamMetadata { @@ -75,7 +63,7 @@ pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding // 3. When a stream is deleted (remove the entry from the map) // 4. When first event is sent to stream (update the schema) // 5. When set alert API is called (update the alert) -impl StreamType { +impl StreamInfo { pub async fn check_alerts(&self, event: &Event) -> Result<(), CheckAlertError> { let map = self.read().expect(LOCK_EXPECT); let meta = map diff --git a/server/src/storage.rs b/server/src/storage.rs index 9452eb8ef..f0bf16e64 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -25,16 +25,15 @@ use crate::utils; use chrono::{Local, NaiveDateTime, Timelike, Utc}; use datafusion::arrow::error::ArrowError; -use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::parquet::errors::ParquetError; +use derive_more::{Deref, DerefMut}; use once_cell::sync::Lazy; use std::collections::HashMap; -use std::fmt::{self, Debug, Formatter}; +use std::fmt::Debug; use std::fs::create_dir_all; -use std::ops::{Deref, DerefMut}; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; mod file_link; mod localfs; @@ -184,35 +183,13 @@ async fn create_remote_metadata(metadata: &StorageMetadata) -> Result<(), Object client.put_metadata(metadata).await } -pub static CACHED_FILES: Lazy = - Lazy::new(|| CachedFilesInnerType(Mutex::new(FileTable::new()))); +pub static CACHED_FILES: Lazy = + Lazy::new(|| CachedFiles(Mutex::new(FileTable::new()))); -pub struct CachedFilesInnerType(Mutex>); +#[derive(Debug, Deref, DerefMut)] +pub struct CachedFiles(Mutex>); -impl Deref for CachedFilesInnerType { - type Target = Mutex>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for CachedFilesInnerType { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl Debug for CachedFilesInnerType { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.write_str("CachedFilesInnerType { __private_field: () }") - } -} - -pub static STORAGE_RUNTIME: Lazy> = - Lazy::new(|| CONFIG.storage().get_datafusion_runtime()); - -impl CachedFilesInnerType { +impl CachedFiles { pub fn track_parquet(&self) { let mut table = self.lock().expect("no poisoning"); STREAM_INFO diff --git a/server/src/storage/file_link.rs b/server/src/storage/file_link.rs index 90d2176a5..ebf38977f 100644 --- a/server/src/storage/file_link.rs +++ b/server/src/storage/file_link.rs @@ -70,6 +70,7 @@ impl Link for FileLink { } } +#[derive(Debug)] pub struct FileTable { inner: HashMap, } From c7e2f1609a127d2b71541bda9a75add65c70c59d Mon Sep 17 00:00:00 2001 From: artech-git <45896902+artech-git@users.noreply.github.com> Date: Wed, 12 Apr 2023 20:09:20 +0530 Subject: [PATCH 6/6] [FIX] STORAGE_RUNTIME removed for clippy warning --- server/src/storage.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/server/src/storage.rs b/server/src/storage.rs index 9452eb8ef..882e31ad9 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -25,7 +25,6 @@ use crate::utils; use chrono::{Local, NaiveDateTime, Timelike, Utc}; use datafusion::arrow::error::ArrowError; -use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::parquet::errors::ParquetError; use once_cell::sync::Lazy; @@ -34,7 +33,7 @@ use std::fmt::{self, Debug, Formatter}; use std::fs::create_dir_all; use std::ops::{Deref, DerefMut}; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; mod file_link; mod localfs; @@ -209,9 +208,6 @@ impl Debug for CachedFilesInnerType { } } -pub static STORAGE_RUNTIME: Lazy> = - Lazy::new(|| CONFIG.storage().get_datafusion_runtime()); - impl CachedFilesInnerType { pub fn track_parquet(&self) { let mut table = self.lock().expect("no poisoning");