Skip to content

Commit f4ad0ad

Browse files
artech-git and trueleo authored
lazy_static init method substituted with once_cell lazy type (#363)
* Utilise Lazy type for init wherever possible * For types where static initialization demands some implementation, a child struct is created to accommodate the values of the inner type, and the traits Deref, DerefMut & Debug are explicitly implemented Co-authored-by: Satyam Singh <[email protected]>
1 parent 5fa2c49 commit f4ad0ad

File tree

14 files changed

+116
-81
lines changed

14 files changed

+116
-81
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ futures = "0.3"
4040
fs_extra = "1.3"
4141
http = "0.2"
4242
humantime-serde = "1.1"
43-
lazy_static = "1.4"
4443
log = "0.4"
4544
num_cpus = "1.15"
4645
sysinfo = "0.28.4"

server/src/analytics.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::storage;
2424

2525
use chrono::{DateTime, Utc};
2626
use clokwerk::{AsyncScheduler, Interval};
27-
use lazy_static::lazy_static;
27+
use once_cell::sync::Lazy;
2828
use serde::{Deserialize, Serialize};
2929
use serde_json::Value;
3030
use std::collections::HashMap;
@@ -36,9 +36,7 @@ use ulid::Ulid;
3636
const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80";
3737
const ANALYTICS_SEND_INTERVAL_SECONDS: Interval = clokwerk::Interval::Hours(1);
3838

39-
lazy_static! {
40-
pub static ref SYS_INFO: Mutex<System> = Mutex::new(System::new_all());
41-
}
39+
pub static SYS_INFO: Lazy<Mutex<System>> = Lazy::new(|| Mutex::new(System::new_all()));
4240

4341
pub fn refresh_sys_info() {
4442
let mut sys_info = SYS_INFO.lock().unwrap();

server/src/event.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ impl Event {
9191
// event process all events after the 1st event. Concatenates record batches
9292
// and puts them in memory store for each event.
9393
fn process_event(&self, schema_key: &str) -> Result<(), EventError> {
94-
STREAM_WRITERS::append_to_local(&self.stream_name, schema_key, &self.rb)?;
94+
STREAM_WRITERS.append_to_local(&self.stream_name, schema_key, &self.rb)?;
9595
Ok(())
9696
}
9797
}

server/src/event/writer.rs

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
use arrow_array::RecordBatch;
2121
use arrow_ipc::writer::StreamWriter;
22-
use lazy_static::lazy_static;
22+
use once_cell::sync::Lazy;
2323
use std::borrow::Borrow;
2424
use std::collections::HashMap;
25+
use std::fmt::{self, Debug, Formatter};
2526
use std::fs::{File, OpenOptions};
2627
use std::io::Write;
28+
use std::ops::{Deref, DerefMut};
2729
use std::sync::{Mutex, RwLock};
2830

2931
use crate::storage::StorageDir;
@@ -33,21 +35,44 @@ use self::errors::StreamWriterError;
3335
type ArrowWriter<T> = StreamWriter<T>;
3436
type LocalWriter<T> = Mutex<Option<ArrowWriter<T>>>;
3537

36-
lazy_static! {
37-
#[derive(Default)]
38-
pub static ref STREAM_WRITERS: RwLock<WriterTable<String, String, File>> = RwLock::new(WriterTable::new());
38+
pub static STREAM_WRITERS: Lazy<InnerStreamWriter> =
39+
Lazy::new(|| InnerStreamWriter(RwLock::new(WriterTable::new())));
40+
41+
/*
42+
A wrapper type for global struct to implement methods over
43+
*/
44+
pub struct InnerStreamWriter(RwLock<WriterTable<String, String, File>>);
45+
46+
impl Deref for InnerStreamWriter {
47+
type Target = RwLock<WriterTable<String, String, File>>;
48+
fn deref(&self) -> &Self::Target {
49+
&self.0
50+
}
51+
}
52+
impl DerefMut for InnerStreamWriter {
53+
fn deref_mut(&mut self) -> &mut Self::Target {
54+
&mut self.0
55+
}
56+
}
57+
/*
58+
Manually implmenting for the Type
59+
since it depends on the types which are missing it
60+
*/
61+
impl Debug for InnerStreamWriter {
62+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
63+
f.write_str("InnerStreamWriter { __private_field: () }")
64+
}
3965
}
4066

41-
impl STREAM_WRITERS {
67+
impl InnerStreamWriter {
4268
// append to a existing stream
4369
pub fn append_to_local(
70+
&self,
4471
stream: &str,
4572
schema_key: &str,
4673
record: &RecordBatch,
4774
) -> Result<(), StreamWriterError> {
48-
let hashmap_guard = STREAM_WRITERS
49-
.read()
50-
.map_err(|_| StreamWriterError::RwPoisoned)?;
75+
let hashmap_guard = self.read().map_err(|_| StreamWriterError::RwPoisoned)?;
5176

5277
match hashmap_guard.get(stream, schema_key) {
5378
Some(localwriter) => {
@@ -71,7 +96,7 @@ impl STREAM_WRITERS {
7196
None => {
7297
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
7398
drop(hashmap_guard);
74-
STREAM_WRITERS::create_entry(stream.to_owned(), schema_key.to_owned(), record)?;
99+
self.create_entry(stream.to_owned(), schema_key.to_owned(), record)?;
75100
}
76101
};
77102
Ok(())
@@ -80,13 +105,12 @@ impl STREAM_WRITERS {
80105
// create a new entry with new stream_writer
81106
// Only create entry for valid streams
82107
fn create_entry(
108+
&self,
83109
stream: String,
84110
schema_key: String,
85111
record: &RecordBatch,
86112
) -> Result<(), StreamWriterError> {
87-
let mut hashmap_guard = STREAM_WRITERS
88-
.write()
89-
.map_err(|_| StreamWriterError::RwPoisoned)?;
113+
let mut hashmap_guard = self.write().map_err(|_| StreamWriterError::RwPoisoned)?;
90114

91115
let writer = init_new_stream_writer_file(&stream, &schema_key, record)?;
92116

@@ -95,14 +119,12 @@ impl STREAM_WRITERS {
95119
Ok(())
96120
}
97121

98-
pub fn delete_stream(stream: &str) {
99-
STREAM_WRITERS.write().unwrap().delete_stream(stream);
122+
pub fn delete_stream(&self, stream: &str) {
123+
self.write().unwrap().delete_stream(stream);
100124
}
101125

102-
pub fn unset_all() -> Result<(), StreamWriterError> {
103-
let table = STREAM_WRITERS
104-
.read()
105-
.map_err(|_| StreamWriterError::RwPoisoned)?;
126+
pub fn unset_all(&self) -> Result<(), StreamWriterError> {
127+
let table = self.read().map_err(|_| StreamWriterError::RwPoisoned)?;
106128

107129
for writer in table.iter() {
108130
if let Some(mut streamwriter) = writer

server/src/handlers/http/logstream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
4545

4646
objectstore.delete_stream(&stream_name).await?;
4747
metadata::STREAM_INFO.delete_stream(&stream_name);
48-
event::STREAM_WRITERS::delete_stream(&stream_name);
48+
event::STREAM_WRITERS.delete_stream(&stream_name);
4949

5050
let stream_dir = StorageDir::new(&stream_name);
5151
if fs::remove_dir_all(&stream_dir.data_path).is_err() {

server/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<(
194194
scheduler
195195
.every((storage::LOCAL_SYNC_INTERVAL as u32).seconds())
196196
.run(move || {
197-
if let Err(e) = crate::event::STREAM_WRITERS::unset_all() {
197+
if let Err(e) = crate::event::STREAM_WRITERS.unset_all() {
198198
log::warn!("failed to sync local data. {:?}", e);
199199
}
200200
});

server/src/metadata.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use arrow_schema::Schema;
20-
use lazy_static::lazy_static;
20+
use once_cell::sync::Lazy;
2121
use std::collections::HashMap;
2222
use std::sync::{Arc, RwLock};
2323

@@ -28,14 +28,14 @@ use crate::stats::{Stats, StatsCounter};
2828
use crate::storage::{MergedRecordReader, ObjectStorage, StorageDir};
2929

3030
use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
31+
use derive_more::{Deref, DerefMut};
3132

3233
// TODO: make return type be of 'static lifetime instead of cloning
33-
lazy_static! {
34-
#[derive(Debug)]
35-
// A read-write lock to allow multiple reads while and isolated write
36-
pub static ref STREAM_INFO: RwLock<HashMap<String, LogStreamMetadata>> =
37-
RwLock::new(HashMap::new());
38-
}
34+
// A read-write lock to allow multiple reads while and isolated write
35+
pub static STREAM_INFO: Lazy<StreamInfo> = Lazy::new(StreamInfo::default);
36+
37+
#[derive(Debug, Deref, DerefMut, Default)]
38+
pub struct StreamInfo(RwLock<HashMap<String, LogStreamMetadata>>);
3939

4040
#[derive(Debug)]
4141
pub struct LogStreamMetadata {
@@ -63,7 +63,7 @@ pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding
6363
// 3. When a stream is deleted (remove the entry from the map)
6464
// 4. When first event is sent to stream (update the schema)
6565
// 5. When set alert API is called (update the alert)
66-
impl STREAM_INFO {
66+
impl StreamInfo {
6767
pub async fn check_alerts(&self, event: &Event) -> Result<(), CheckAlertError> {
6868
let map = self.read().expect(LOCK_EXPECT);
6969
let meta = map

server/src/metrics/mod.rs

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,46 +19,61 @@
1919
pub mod storage;
2020

2121
use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
22-
use lazy_static::lazy_static;
22+
use once_cell::sync::Lazy;
2323
use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry};
2424

2525
use crate::{handlers::http::metrics_path, metadata::STREAM_INFO};
2626

2727
pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");
2828

29-
lazy_static! {
30-
pub static ref EVENTS_INGESTED: IntCounterVec = IntCounterVec::new(
29+
pub static EVENTS_INGESTED: Lazy<IntCounterVec> = Lazy::new(|| {
30+
IntCounterVec::new(
3131
Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE),
32-
&["stream", "format"]
32+
&["stream", "format"],
3333
)
34-
.expect("metric can be created");
35-
pub static ref EVENTS_INGESTED_SIZE: IntGaugeVec = IntGaugeVec::new(
34+
.expect("metric can be created")
35+
});
36+
37+
pub static EVENTS_INGESTED_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
38+
IntGaugeVec::new(
3639
Opts::new("events_ingested_size", "Events ingested size bytes")
3740
.namespace(METRICS_NAMESPACE),
38-
&["stream", "format"]
41+
&["stream", "format"],
3942
)
40-
.expect("metric can be created");
41-
pub static ref STORAGE_SIZE: IntGaugeVec = IntGaugeVec::new(
43+
.expect("metric can be created")
44+
});
45+
46+
pub static STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
47+
IntGaugeVec::new(
4248
Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE),
43-
&["type", "stream", "format"]
49+
&["type", "stream", "format"],
4450
)
45-
.expect("metric can be created");
46-
pub static ref STAGING_FILES: IntGaugeVec = IntGaugeVec::new(
51+
.expect("metric can be created")
52+
});
53+
54+
pub static STAGING_FILES: Lazy<IntGaugeVec> = Lazy::new(|| {
55+
IntGaugeVec::new(
4756
Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE),
48-
&["stream"]
57+
&["stream"],
4958
)
50-
.expect("metric can be created");
51-
pub static ref QUERY_EXECUTE_TIME: HistogramVec = HistogramVec::new(
59+
.expect("metric can be created")
60+
});
61+
62+
pub static QUERY_EXECUTE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
63+
HistogramVec::new(
5264
HistogramOpts::new("query_execute_time", "Query execute time").namespace(METRICS_NAMESPACE),
53-
&["stream"]
65+
&["stream"],
5466
)
55-
.expect("metric can be created");
56-
pub static ref ALERTS_STATES: IntCounterVec = IntCounterVec::new(
67+
.expect("metric can be created")
68+
});
69+
70+
pub static ALERTS_STATES: Lazy<IntCounterVec> = Lazy::new(|| {
71+
IntCounterVec::new(
5772
Opts::new("alerts_states", "Alerts States").namespace(METRICS_NAMESPACE),
58-
&["stream", "name", "state"]
73+
&["stream", "name", "state"],
5974
)
60-
.expect("metric can be created");
61-
}
75+
.expect("metric can be created")
76+
});
6277

6378
fn custom_metrics(registry: &Registry) {
6479
registry

server/src/metrics/storage.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,19 @@ pub trait StorageMetrics {
2424

2525
pub mod localfs {
2626
use crate::{metrics::METRICS_NAMESPACE, storage::FSConfig};
27-
use lazy_static::lazy_static;
27+
use once_cell::sync::Lazy;
2828
use prometheus::{HistogramOpts, HistogramVec};
2929

3030
use super::StorageMetrics;
3131

32-
lazy_static! {
33-
pub static ref REQUEST_RESPONSE_TIME: HistogramVec = HistogramVec::new(
32+
pub static REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
33+
HistogramVec::new(
3434
HistogramOpts::new("local_fs_response_time", "FileSystem Request Latency")
3535
.namespace(METRICS_NAMESPACE),
36-
&["method", "status"]
36+
&["method", "status"],
3737
)
38-
.expect("metric can be created");
39-
}
38+
.expect("metric can be created")
39+
});
4040

4141
impl StorageMetrics for FSConfig {
4242
fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
@@ -50,19 +50,19 @@ pub mod localfs {
5050

5151
pub mod s3 {
5252
use crate::{metrics::METRICS_NAMESPACE, storage::S3Config};
53-
use lazy_static::lazy_static;
53+
use once_cell::sync::Lazy;
5454
use prometheus::{HistogramOpts, HistogramVec};
5555

5656
use super::StorageMetrics;
5757

58-
lazy_static! {
59-
pub static ref REQUEST_RESPONSE_TIME: HistogramVec = HistogramVec::new(
58+
pub static REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
59+
HistogramVec::new(
6060
HistogramOpts::new("s3_response_time", "S3 Request Latency")
6161
.namespace(METRICS_NAMESPACE),
62-
&["method", "status"]
62+
&["method", "status"],
6363
)
64-
.expect("metric can be created");
65-
}
64+
.expect("metric can be created")
65+
});
6666

6767
impl StorageMetrics for S3Config {
6868
fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {

0 commit comments

Comments (0)