diff --git a/Cargo.lock b/Cargo.lock
index ce6f23575..16e427c8c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3043,7 +3043,6 @@ dependencies = [
  "http",
  "humantime-serde",
  "itertools",
- "lazy_static",
  "log",
  "maplit",
  "num_cpus",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index d6dc95dde..faa543e0f 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -40,7 +40,6 @@ futures = "0.3"
 fs_extra = "1.3"
 http = "0.2"
 humantime-serde = "1.1"
-lazy_static = "1.4"
 log = "0.4"
 num_cpus = "1.15"
 sysinfo = "0.28.4"
diff --git a/server/src/analytics.rs b/server/src/analytics.rs
index 9ea992821..9a91fb226 100644
--- a/server/src/analytics.rs
+++ b/server/src/analytics.rs
@@ -24,7 +24,7 @@ use crate::storage;

 use chrono::{DateTime, Utc};
 use clokwerk::{AsyncScheduler, Interval};
-use lazy_static::lazy_static;
+use once_cell::sync::Lazy;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use std::collections::HashMap;
@@ -36,9 +36,7 @@ use ulid::Ulid;
 const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80";
 const ANALYTICS_SEND_INTERVAL_SECONDS: Interval = clokwerk::Interval::Hours(1);

-lazy_static! {
-    pub static ref SYS_INFO: Mutex<System> = Mutex::new(System::new_all());
-}
+pub static SYS_INFO: Lazy<Mutex<System>> = Lazy::new(|| Mutex::new(System::new_all()));

 pub fn refresh_sys_info() {
     let mut sys_info = SYS_INFO.lock().unwrap();
diff --git a/server/src/event.rs b/server/src/event.rs
index da52c1b7d..64494d98b 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -91,7 +91,7 @@ impl Event {
     // event process all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
     fn process_event(&self, schema_key: &str) -> Result<(), EventError> {
-        STREAM_WRITERS::append_to_local(&self.stream_name, schema_key, &self.rb)?;
+        STREAM_WRITERS.append_to_local(&self.stream_name, schema_key, &self.rb)?;
         Ok(())
     }
 }
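The change to analytics.rs above is the template for the whole patch: a `lazy_static!` block becomes an ordinary typed `static` wrapped in `once_cell::sync::Lazy`, which still initializes exactly once, on first access, thread-safely, but without the macro layer. A minimal before/after sketch (the `COUNTER` global is illustrative, not part of this patch):

    use once_cell::sync::Lazy;
    use std::sync::Mutex;

    // Before, with the macro:
    //     lazy_static! { pub static ref COUNTER: Mutex<u64> = Mutex::new(0); }
    // After: a plain static with an explicit type, visible to rustfmt,
    // rustdoc, and IDEs as ordinary Rust.
    pub static COUNTER: Lazy<Mutex<u64>> = Lazy::new(|| Mutex::new(0));

    fn main() {
        *COUNTER.lock().unwrap() += 1;
        assert_eq!(*COUNTER.lock().unwrap(), 1);
    }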
diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index bada41f55..05bdbbf0c 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -19,11 +19,13 @@

 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
-use lazy_static::lazy_static;
+use once_cell::sync::Lazy;
 use std::borrow::Borrow;
 use std::collections::HashMap;
+use std::fmt::{self, Debug, Formatter};
 use std::fs::{File, OpenOptions};
 use std::io::Write;
+use std::ops::{Deref, DerefMut};
 use std::sync::{Mutex, RwLock};

 use crate::storage::StorageDir;
@@ -33,21 +35,44 @@ use self::errors::StreamWriterError;
 type ArrowWriter<T> = StreamWriter<T>;
 type LocalWriter<T> = Mutex<Option<ArrowWriter<T>>>;

-lazy_static! {
-    #[derive(Default)]
-    pub static ref STREAM_WRITERS: RwLock<WriterTable<String, String, File>> = RwLock::new(WriterTable::new());
+pub static STREAM_WRITERS: Lazy<InnerStreamWriter> =
+    Lazy::new(|| InnerStreamWriter(RwLock::new(WriterTable::new())));
+
+/*
+    A wrapper type around the global writer table so that
+    methods can be implemented on the static.
+*/
+pub struct InnerStreamWriter(RwLock<WriterTable<String, String, File>>);
+
+impl Deref for InnerStreamWriter {
+    type Target = RwLock<WriterTable<String, String, File>>;
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+impl DerefMut for InnerStreamWriter {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+/*
+    Manually implementing Debug for this type,
+    since the types it wraps do not implement it.
+*/
+impl Debug for InnerStreamWriter {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.write_str("InnerStreamWriter { __private_field: () }")
+    }
 }

-impl STREAM_WRITERS {
+impl InnerStreamWriter {
     // append to a existing stream
     pub fn append_to_local(
+        &self,
         stream: &str,
         schema_key: &str,
         record: &RecordBatch,
     ) -> Result<(), StreamWriterError> {
-        let hashmap_guard = STREAM_WRITERS
-            .read()
-            .map_err(|_| StreamWriterError::RwPoisoned)?;
+        let hashmap_guard = self.read().map_err(|_| StreamWriterError::RwPoisoned)?;

         match hashmap_guard.get(stream, schema_key) {
             Some(localwriter) => {
@@ -71,7 +96,7 @@ impl STREAM_WRITERS {
             None => {
                 // this requires mutable borrow of the map so we drop this read lock and wait for write lock
                 drop(hashmap_guard);
-                STREAM_WRITERS::create_entry(stream.to_owned(), schema_key.to_owned(), record)?;
+                self.create_entry(stream.to_owned(), schema_key.to_owned(), record)?;
             }
         };
         Ok(())
@@ -80,13 +105,12 @@ impl STREAM_WRITERS {
     // create a new entry with new stream_writer
     // Only create entry for valid streams
     fn create_entry(
+        &self,
         stream: String,
         schema_key: String,
         record: &RecordBatch,
     ) -> Result<(), StreamWriterError> {
-        let mut hashmap_guard = STREAM_WRITERS
-            .write()
-            .map_err(|_| StreamWriterError::RwPoisoned)?;
+        let mut hashmap_guard = self.write().map_err(|_| StreamWriterError::RwPoisoned)?;

         let writer = init_new_stream_writer_file(&stream, &schema_key, record)?;

@@ -95,14 +119,12 @@ impl STREAM_WRITERS {
         Ok(())
     }

-    pub fn delete_stream(stream: &str) {
-        STREAM_WRITERS.write().unwrap().delete_stream(stream);
+    pub fn delete_stream(&self, stream: &str) {
+        self.write().unwrap().delete_stream(stream);
     }

-    pub fn unset_all() -> Result<(), StreamWriterError> {
-        let table = STREAM_WRITERS
-            .read()
-            .map_err(|_| StreamWriterError::RwPoisoned)?;
+    pub fn unset_all(&self) -> Result<(), StreamWriterError> {
+        let table = self.read().map_err(|_| StreamWriterError::RwPoisoned)?;

         for writer in table.iter() {
             if let Some(mut streamwriter) = writer
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index cd42e3e62..69f08adff 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -45,7 +45,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     objectstore.delete_stream(&stream_name).await?;

     metadata::STREAM_INFO.delete_stream(&stream_name);
-    event::STREAM_WRITERS::delete_stream(&stream_name);
+    event::STREAM_WRITERS.delete_stream(&stream_name);

     let stream_dir = StorageDir::new(&stream_name);
     if fs::remove_dir_all(&stream_dir.data_path).is_err() {
diff --git a/server/src/main.rs b/server/src/main.rs
index dde50ac7c..90762cb8e 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -194,7 +194,7 @@ fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<(
     scheduler
         .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds())
         .run(move || {
-            if let Err(e) = crate::event::STREAM_WRITERS::unset_all() {
+            if let Err(e) = crate::event::STREAM_WRITERS.unset_all() {
                 log::warn!("failed to sync local data. {:?}", e);
             }
         });
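The `InnerStreamWriter` newtype above exists because `lazy_static!` secretly generates a struct named after the static, which the old code exploited with `impl STREAM_WRITERS`; `once_cell` generates no such type, so the patch declares one explicitly and turns the associated functions into `&self` methods. The `Deref` impl is what keeps existing `.read()`/`.write()` call sites compiling unchanged. A self-contained sketch of the same pattern, using illustrative names (`Table`, `TABLE`) rather than the patch's own:

    use once_cell::sync::Lazy;
    use std::collections::HashMap;
    use std::ops::Deref;
    use std::sync::RwLock;

    pub struct Table(RwLock<HashMap<String, u64>>);

    // Deref lets callers keep treating the wrapper as the RwLock it holds.
    impl Deref for Table {
        type Target = RwLock<HashMap<String, u64>>;
        fn deref(&self) -> &Self::Target {
            &self.0
        }
    }

    impl Table {
        // A &self method replaces the old associated function, so the
        // call site changes from Table::insert(..) to TABLE.insert(..).
        pub fn insert(&self, key: &str, value: u64) {
            self.write().unwrap().insert(key.to_owned(), value);
        }
    }

    pub static TABLE: Lazy<Table> = Lazy::new(|| Table(RwLock::new(HashMap::new())));

    fn main() {
        TABLE.insert("stream", 1);
        assert_eq!(TABLE.read().unwrap()["stream"], 1);
    }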
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 0d3387f39..2306ce6e9 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -17,7 +17,7 @@
  */

 use arrow_schema::Schema;
-use lazy_static::lazy_static;
+use once_cell::sync::Lazy;
 use std::collections::HashMap;
 use std::sync::{Arc, RwLock};

@@ -28,14 +28,14 @@ use crate::stats::{Stats, StatsCounter};
 use crate::storage::{MergedRecordReader, ObjectStorage, StorageDir};

 use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
+use derive_more::{Deref, DerefMut};

 // TODO: make return type be of 'static lifetime instead of cloning
-lazy_static! {
-    #[derive(Debug)]
-    // A read-write lock to allow multiple reads while and isolated write
-    pub static ref STREAM_INFO: RwLock<HashMap<String, LogStreamMetadata>> =
-        RwLock::new(HashMap::new());
-}
+// A read-write lock to allow multiple reads with an isolated write
+pub static STREAM_INFO: Lazy<StreamInfo> = Lazy::new(StreamInfo::default);
+
+#[derive(Debug, Deref, DerefMut, Default)]
+pub struct StreamInfo(RwLock<HashMap<String, LogStreamMetadata>>);

 #[derive(Debug)]
 pub struct LogStreamMetadata {
@@ -63,7 +63,7 @@ pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding
 // 3. When a stream is deleted (remove the entry from the map)
 // 4. When first event is sent to stream (update the schema)
 // 5. When set alert API is called (update the alert)
-impl STREAM_INFO {
+impl StreamInfo {
     pub async fn check_alerts(&self, event: &Event) -> Result<(), CheckAlertError> {
         let map = self.read().expect(LOCK_EXPECT);
         let meta = map
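metadata.rs needs the same newtype treatment, but `RwLock<HashMap<..>>` already implements `Debug` and `Default`, so the `Deref`/`DerefMut` boilerplate can be derived via derive_more instead of hand-written as in writer.rs. A compilable sketch of that shape, with an illustrative `Inner` type:

    use derive_more::{Deref, DerefMut};
    use std::collections::HashMap;
    use std::sync::RwLock;

    // derive_more generates Deref/DerefMut to the single wrapped field.
    #[derive(Debug, Deref, DerefMut, Default)]
    pub struct Inner(RwLock<HashMap<String, String>>);

    fn main() {
        let info = Inner::default();
        info.write().unwrap().insert("key".into(), "value".into());
        println!("{:?}", info);
    }

The contrast with `InnerStreamWriter` is the design point: there, the wrapped writer type is not `Debug`, so `Debug` had to be written by hand; here, everything derives.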
diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs
index 820d8f0f9..c871cdd65 100644
--- a/server/src/metrics/mod.rs
+++ b/server/src/metrics/mod.rs
@@ -19,46 +19,61 @@
 pub mod storage;

 use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
-use lazy_static::lazy_static;
+use once_cell::sync::Lazy;
 use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry};

 use crate::{handlers::http::metrics_path, metadata::STREAM_INFO};

 pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");

-lazy_static! {
-    pub static ref EVENTS_INGESTED: IntCounterVec = IntCounterVec::new(
+pub static EVENTS_INGESTED: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
         Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE),
-        &["stream", "format"]
+        &["stream", "format"],
     )
-    .expect("metric can be created");
-    pub static ref EVENTS_INGESTED_SIZE: IntGaugeVec = IntGaugeVec::new(
+    .expect("metric can be created")
+});
+
+pub static EVENTS_INGESTED_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
         Opts::new("events_ingested_size", "Events ingested size bytes")
             .namespace(METRICS_NAMESPACE),
-        &["stream", "format"]
+        &["stream", "format"],
     )
-    .expect("metric can be created");
-    pub static ref STORAGE_SIZE: IntGaugeVec = IntGaugeVec::new(
+    .expect("metric can be created")
+});
+
+pub static STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
         Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE),
-        &["type", "stream", "format"]
+        &["type", "stream", "format"],
     )
-    .expect("metric can be created");
-    pub static ref STAGING_FILES: IntGaugeVec = IntGaugeVec::new(
+    .expect("metric can be created")
+});
+
+pub static STAGING_FILES: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
         Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE),
-        &["stream"]
+        &["stream"],
     )
-    .expect("metric can be created");
-    pub static ref QUERY_EXECUTE_TIME: HistogramVec = HistogramVec::new(
+    .expect("metric can be created")
+});
+
+pub static QUERY_EXECUTE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+    HistogramVec::new(
         HistogramOpts::new("query_execute_time", "Query execute time").namespace(METRICS_NAMESPACE),
-        &["stream"]
+        &["stream"],
     )
-    .expect("metric can be created");
-    pub static ref ALERTS_STATES: IntCounterVec = IntCounterVec::new(
+    .expect("metric can be created")
+});
+
+pub static ALERTS_STATES: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
         Opts::new("alerts_states", "Alerts States").namespace(METRICS_NAMESPACE),
-        &["stream", "name", "state"]
+        &["stream", "name", "state"],
     )
-    .expect("metric can be created");
-}
+    .expect("metric can be created")
+});

 fn custom_metrics(registry: &Registry) {
     registry
diff --git a/server/src/metrics/storage.rs b/server/src/metrics/storage.rs
index d8ee85ef9..6d122ba39 100644
--- a/server/src/metrics/storage.rs
+++ b/server/src/metrics/storage.rs
@@ -24,19 +24,19 @@ pub trait StorageMetrics {
 pub mod localfs {
     use crate::{metrics::METRICS_NAMESPACE, storage::FSConfig};
-    use lazy_static::lazy_static;
+    use once_cell::sync::Lazy;
     use prometheus::{HistogramOpts, HistogramVec};

     use super::StorageMetrics;

-    lazy_static! {
-        pub static ref REQUEST_RESPONSE_TIME: HistogramVec = HistogramVec::new(
+    pub static REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+        HistogramVec::new(
             HistogramOpts::new("local_fs_response_time", "FileSystem Request Latency")
                 .namespace(METRICS_NAMESPACE),
-            &["method", "status"]
+            &["method", "status"],
         )
-        .expect("metric can be created");
-    }
+        .expect("metric can be created")
+    });

     impl StorageMetrics for FSConfig {
         fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
@@ -50,19 +50,19 @@ pub mod localfs {
 pub mod s3 {
     use crate::{metrics::METRICS_NAMESPACE, storage::S3Config};
-    use lazy_static::lazy_static;
+    use once_cell::sync::Lazy;
     use prometheus::{HistogramOpts, HistogramVec};

     use super::StorageMetrics;

-    lazy_static! {
-        pub static ref REQUEST_RESPONSE_TIME: HistogramVec = HistogramVec::new(
+    pub static REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+        HistogramVec::new(
             HistogramOpts::new("s3_response_time", "S3 Request Latency")
                 .namespace(METRICS_NAMESPACE),
-            &["method", "status"]
+            &["method", "status"],
         )
-        .expect("metric can be created");
-    }
+        .expect("metric can be created")
+    });

     impl StorageMetrics for S3Config {
         fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
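Metric behavior is unchanged by the conversion: each static is still built lazily on first touch, and `.expect("metric can be created")` fires at that point rather than at startup, so the statics must be registered (as `custom_metrics` presumably does) before they show up in scrape output. A usage sketch under those assumptions; the `EVENTS` static and the registry wiring are illustrative, not this patch's code:

    use once_cell::sync::Lazy;
    use prometheus::{IntCounterVec, Opts, Registry};

    pub static EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
        IntCounterVec::new(
            Opts::new("events_ingested", "Events ingested").namespace("parseable"),
            &["stream", "format"],
        )
        .expect("metric can be created")
    });

    fn main() {
        let registry = Registry::new();
        // Registering forces the Lazy to initialize; the clone shares the
        // same underlying metric, so later increments reach the registry.
        registry
            .register(Box::new(EVENTS.clone()))
            .expect("metric can be registered");

        EVENTS.with_label_values(&["app_logs", "json"]).inc();
    }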
diff --git a/server/src/option.rs b/server/src/option.rs
index b43890824..f87c68d48 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -19,17 +19,16 @@

 use clap::error::ErrorKind;
 use clap::{command, value_parser, Arg, Args, Command, FromArgMatches};
+use once_cell::sync::Lazy;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;

 use crate::storage::{FSConfig, ObjectStorageProvider, S3Config, LOCAL_SYNC_INTERVAL};
 use crate::utils::validate_path_is_writeable;

-lazy_static::lazy_static! {
-    #[derive(Debug)]
-    pub static ref CONFIG: Arc<Config> = Arc::new(Config::new());
-}
+pub static CONFIG: Lazy<Arc<Config>> = Lazy::new(|| Arc::new(Config::new()));

+#[derive(Debug)]
 pub struct Config {
     pub parseable: Server,
     storage: Arc<dyn ObjectStorageProvider + Send + Sync>,
diff --git a/server/src/storage.rs b/server/src/storage.rs
index cbe47f213..f0bf16e64 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -25,14 +25,15 @@ use crate::utils;

 use chrono::{Local, NaiveDateTime, Timelike, Utc};
 use datafusion::arrow::error::ArrowError;
-use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::parquet::errors::ParquetError;
-use lazy_static::lazy_static;
+use derive_more::{Deref, DerefMut};
+use once_cell::sync::Lazy;

 use std::collections::HashMap;
+use std::fmt::Debug;
 use std::fs::create_dir_all;
 use std::path::{Path, PathBuf};
-use std::sync::{Arc, Mutex};
+use std::sync::Mutex;

 mod file_link;
 mod localfs;
@@ -182,12 +183,13 @@ async fn create_remote_metadata(metadata: &StorageMetadata) -> Result<(), Object
     client.put_metadata(metadata).await
 }

-lazy_static! {
-    pub static ref CACHED_FILES: Mutex<FileTable<FileLink>> = Mutex::new(FileTable::new());
-    pub static ref STORAGE_RUNTIME: Arc<RuntimeEnv> = CONFIG.storage().get_datafusion_runtime();
-}
+pub static CACHED_FILES: Lazy<CachedFiles> =
+    Lazy::new(|| CachedFiles(Mutex::new(FileTable::new())));
+
+#[derive(Debug, Deref, DerefMut)]
+pub struct CachedFiles(Mutex<FileTable<FileLink>>);

-impl CACHED_FILES {
+impl CachedFiles {
     pub fn track_parquet(&self) {
         let mut table = self.lock().expect("no poisoning");
         STREAM_INFO
diff --git a/server/src/storage/file_link.rs b/server/src/storage/file_link.rs
index 90d2176a5..ebf38977f 100644
--- a/server/src/storage/file_link.rs
+++ b/server/src/storage/file_link.rs
@@ -70,6 +70,7 @@ impl Link for FileLink {
     }
 }

+#[derive(Debug)]
 pub struct FileTable<L: Link + Default> {
     inner: HashMap<PathBuf, L>,
 }
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 4b74e9077..e498d960a 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -61,7 +61,7 @@ pub(super) const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
 const SCHEMA_FILE_NAME: &str = ".schema";
 const ALERT_FILE_NAME: &str = ".alert.json";

-pub trait ObjectStorageProvider: StorageMetrics {
+pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug {
     fn get_datafusion_runtime(&self) -> Arc<RuntimeEnv>;
     fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send>;
     fn get_endpoint(&self) -> String;
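The `+ std::fmt::Debug` supertrait in this last hunk is what allows option.rs to move `#[derive(Debug)]` onto `Config` itself: the derive requires every field to be `Debug`, and a `dyn Trait` field only qualifies when the trait obliges all implementors to be `Debug`. A minimal sketch with illustrative types:

    use std::fmt::Debug;
    use std::sync::Arc;

    // Without the Debug supertrait, #[derive(Debug)] on Holder below
    // fails: `dyn Provider` would not implement Debug.
    trait Provider: Debug {
        fn endpoint(&self) -> String;
    }

    #[derive(Debug)]
    struct S3;

    impl Provider for S3 {
        fn endpoint(&self) -> String {
            "https://s3.example".to_owned()
        }
    }

    #[derive(Debug)]
    struct Holder {
        storage: Arc<dyn Provider>,
    }

    fn main() {
        let h = Holder { storage: Arc::new(S3) };
        println!("{:?}", h); // prints: Holder { storage: S3 }
    }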