1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion server/Cargo.toml
@@ -40,7 +40,6 @@ futures = "0.3"
fs_extra = "1.3"
http = "0.2"
humantime-serde = "1.1"
lazy_static = "1.4"
log = "0.4"
num_cpus = "1.15"
sysinfo = "0.28.4"
6 changes: 2 additions & 4 deletions server/src/analytics.rs
@@ -24,7 +24,7 @@ use crate::storage;

use chrono::{DateTime, Utc};
use clokwerk::{AsyncScheduler, Interval};
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
@@ -36,9 +36,7 @@ use ulid::Ulid;
const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80";
const ANALYTICS_SEND_INTERVAL_SECONDS: Interval = clokwerk::Interval::Hours(1);

lazy_static! {
pub static ref SYS_INFO: Mutex<System> = Mutex::new(System::new_all());
}
pub static SYS_INFO: Lazy<Mutex<System>> = Lazy::new(|| Mutex::new(System::new_all()));

pub fn refresh_sys_info() {
let mut sys_info = SYS_INFO.lock().unwrap();
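This is the core pattern of the PR: the `lazy_static!` macro is replaced with `once_cell::sync::Lazy`, an ordinary static whose initializer closure runs on first access. A minimal standalone sketch of the before/after shapes (illustrative only, not the project's code):

use once_cell::sync::Lazy;
use std::sync::Mutex;

// Before (macro-based):
//     lazy_static! { pub static ref SYS: Mutex<u64> = Mutex::new(0); }
// After: a plain static; the closure runs exactly once, on first deref.
pub static SYS: Lazy<Mutex<u64>> = Lazy::new(|| Mutex::new(0));

fn main() {
    *SYS.lock().unwrap() += 1; // first access triggers initialization
    assert_eq!(*SYS.lock().unwrap(), 1);
}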
2 changes: 1 addition & 1 deletion server/src/event.rs
@@ -91,7 +91,7 @@ impl Event {
// process_event handles all events after the first one. It concatenates record batches
// and puts them in the in-memory store for each event.
fn process_event(&self, schema_key: &str) -> Result<(), EventError> {
STREAM_WRITERS::append_to_local(&self.stream_name, schema_key, &self.rb)?;
STREAM_WRITERS.append_to_local(&self.stream_name, schema_key, &self.rb)?;
Ok(())
}
}
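The call-site change from `STREAM_WRITERS::append_to_local(...)` to `STREAM_WRITERS.append_to_local(...)` falls out of the migration: `lazy_static!` generates a hidden struct type named after the static (hence the old `impl STREAM_WRITERS` blocks and associated-function calls via `::`), while a `once_cell` static is an ordinary value, so methods on the wrapper type are called with `.`. A hedged sketch with invented names:

use once_cell::sync::Lazy;
use std::sync::RwLock;

// Hypothetical stand-in for InnerStreamWriter: methods take &self and live
// on the wrapper type, so call sites use method syntax on the static value.
pub struct Writers(RwLock<Vec<String>>);

impl Writers {
    pub fn append(&self, entry: &str) {
        self.0.write().unwrap().push(entry.to_owned());
    }
}

pub static WRITERS: Lazy<Writers> = Lazy::new(|| Writers(RwLock::new(Vec::new())));

fn main() {
    WRITERS.append("demo"); // `Lazy` derefs to `Writers`, then the method resolves
}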
58 changes: 40 additions & 18 deletions server/src/event/writer.rs
@@ -19,11 +19,13 @@

use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt::{self, Debug, Formatter};
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::ops::{Deref, DerefMut};
use std::sync::{Mutex, RwLock};

use crate::storage::StorageDir;
@@ -33,21 +35,44 @@ use self::errors::StreamWriterError;
type ArrowWriter<T> = StreamWriter<T>;
type LocalWriter<T> = Mutex<Option<ArrowWriter<T>>>;

lazy_static! {
#[derive(Default)]
pub static ref STREAM_WRITERS: RwLock<WriterTable<String, String, File>> = RwLock::new(WriterTable::new());
pub static STREAM_WRITERS: Lazy<InnerStreamWriter> =
Lazy::new(|| InnerStreamWriter(RwLock::new(WriterTable::new())));

/*
A wrapper type around the global writer table so that methods can be implemented on it
*/
pub struct InnerStreamWriter(RwLock<WriterTable<String, String, File>>);

impl Deref for InnerStreamWriter {
type Target = RwLock<WriterTable<String, String, File>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for InnerStreamWriter {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/*
Manually implementing Debug for this type,
since the types it wraps do not implement it
*/
impl Debug for InnerStreamWriter {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_str("InnerStreamWriter { __private_field: () }")
}
}

impl STREAM_WRITERS {
impl InnerStreamWriter {
// append to an existing stream
pub fn append_to_local(
&self,
stream: &str,
schema_key: &str,
record: &RecordBatch,
) -> Result<(), StreamWriterError> {
let hashmap_guard = STREAM_WRITERS
.read()
.map_err(|_| StreamWriterError::RwPoisoned)?;
let hashmap_guard = self.read().map_err(|_| StreamWriterError::RwPoisoned)?;

match hashmap_guard.get(stream, schema_key) {
Some(localwriter) => {
@@ -71,7 +96,7 @@ impl STREAM_WRITERS {
None => {
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
drop(hashmap_guard);
STREAM_WRITERS::create_entry(stream.to_owned(), schema_key.to_owned(), record)?;
self.create_entry(stream.to_owned(), schema_key.to_owned(), record)?;
}
};
Ok(())
@@ -80,13 +105,12 @@
// create a new entry with new stream_writer
// Only create entry for valid streams
fn create_entry(
&self,
stream: String,
schema_key: String,
record: &RecordBatch,
) -> Result<(), StreamWriterError> {
let mut hashmap_guard = STREAM_WRITERS
.write()
.map_err(|_| StreamWriterError::RwPoisoned)?;
let mut hashmap_guard = self.write().map_err(|_| StreamWriterError::RwPoisoned)?;

let writer = init_new_stream_writer_file(&stream, &schema_key, record)?;

@@ -95,14 +119,12 @@
Ok(())
}

pub fn delete_stream(stream: &str) {
STREAM_WRITERS.write().unwrap().delete_stream(stream);
pub fn delete_stream(&self, stream: &str) {
self.write().unwrap().delete_stream(stream);
}

pub fn unset_all() -> Result<(), StreamWriterError> {
let table = STREAM_WRITERS
.read()
.map_err(|_| StreamWriterError::RwPoisoned)?;
pub fn unset_all(&self) -> Result<(), StreamWriterError> {
let table = self.read().map_err(|_| StreamWriterError::RwPoisoned)?;

for writer in table.iter() {
if let Some(mut streamwriter) = writer
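In `append_to_local` above, the fast path holds only the read lock, and the slow path (`create_entry`) retakes the map as a write lock; the read guard must be dropped first, or the thread would block on itself. A self-contained sketch of that lock-upgrade pattern (illustrative types; a racing thread may insert between the two locks, so the slow path should tolerate an existing entry — the hunk does not show whether `create_entry` re-checks):

use std::collections::HashMap;
use std::sync::RwLock;

fn get_or_insert(table: &RwLock<HashMap<String, u64>>, key: &str) -> u64 {
    // Fast path: shared read lock; readers do not block each other.
    if let Some(v) = table.read().unwrap().get(key) {
        return *v;
    }
    // The read guard is dropped by now, so acquiring the write lock is safe.
    let mut map = table.write().unwrap();
    // entry() tolerates a concurrent insert made between the two locks.
    *map.entry(key.to_owned()).or_insert(0)
}

fn main() {
    let table = RwLock::new(HashMap::new());
    assert_eq!(get_or_insert(&table, "stream"), 0);
}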
2 changes: 1 addition & 1 deletion server/src/handlers/http/logstream.rs
@@ -45,7 +45,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {

objectstore.delete_stream(&stream_name).await?;
metadata::STREAM_INFO.delete_stream(&stream_name);
event::STREAM_WRITERS::delete_stream(&stream_name);
event::STREAM_WRITERS.delete_stream(&stream_name);

let stream_dir = StorageDir::new(&stream_name);
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
2 changes: 1 addition & 1 deletion server/src/main.rs
@@ -194,7 +194,7 @@ fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<(
scheduler
.every((storage::LOCAL_SYNC_INTERVAL as u32).seconds())
.run(move || {
if let Err(e) = crate::event::STREAM_WRITERS::unset_all() {
if let Err(e) = crate::event::STREAM_WRITERS.unset_all() {
log::warn!("failed to sync local data. {:?}", e);
}
});
16 changes: 8 additions & 8 deletions server/src/metadata.rs
@@ -17,7 +17,7 @@
*/

use arrow_schema::Schema;
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

@@ -28,14 +28,14 @@ use crate::stats::{Stats, StatsCounter};
use crate::storage::{MergedRecordReader, ObjectStorage, StorageDir};

use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
use derive_more::{Deref, DerefMut};

// TODO: make return type be of 'static lifetime instead of cloning
lazy_static! {
#[derive(Debug)]
// A read-write lock to allow multiple reads while and isolated write
pub static ref STREAM_INFO: RwLock<HashMap<String, LogStreamMetadata>> =
RwLock::new(HashMap::new());
}
// A read-write lock to allow multiple concurrent reads while writes are isolated
pub static STREAM_INFO: Lazy<StreamInfo> = Lazy::new(StreamInfo::default);

#[derive(Debug, Deref, DerefMut, Default)]
pub struct StreamInfo(RwLock<HashMap<String, LogStreamMetadata>>);

#[derive(Debug)]
pub struct LogStreamMetadata {
@@ -63,7 +63,7 @@ pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding
// 3. When a stream is deleted (remove the entry from the map)
// 4. When first event is sent to stream (update the schema)
// 5. When set alert API is called (update the alert)
impl STREAM_INFO {
impl StreamInfo {
pub async fn check_alerts(&self, event: &Event) -> Result<(), CheckAlertError> {
let map = self.read().expect(LOCK_EXPECT);
let meta = map
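Unlike `InnerStreamWriter`, which implements `Deref`/`DerefMut` by hand, `StreamInfo` derives them from the `derive_more` crate, so existing call sites such as `STREAM_INFO.read()` keep working unchanged. A minimal sketch of the same shape, assuming a `derive_more` dependency in Cargo.toml:

use derive_more::{Deref, DerefMut};
use std::collections::HashMap;
use std::sync::RwLock;

// Newtype over the locked map; Deref/DerefMut forward to the inner RwLock.
#[derive(Debug, Deref, DerefMut, Default)]
pub struct StreamInfo(RwLock<HashMap<String, String>>);

fn main() {
    let info = StreamInfo::default();
    info.write().unwrap().insert("stream".into(), "meta".into());
    assert!(info.read().unwrap().contains_key("stream")); // deref'd RwLock methods
}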
57 changes: 36 additions & 21 deletions server/src/metrics/mod.rs
@@ -19,46 +19,61 @@
pub mod storage;

use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry};

use crate::{handlers::http::metrics_path, metadata::STREAM_INFO};

pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");

lazy_static! {
pub static ref EVENTS_INGESTED: IntCounterVec = IntCounterVec::new(
pub static EVENTS_INGESTED: Lazy<IntCounterVec> = Lazy::new(|| {
IntCounterVec::new(
Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE),
&["stream", "format"]
&["stream", "format"],
)
.expect("metric can be created");
pub static ref EVENTS_INGESTED_SIZE: IntGaugeVec = IntGaugeVec::new(
.expect("metric can be created")
});

pub static EVENTS_INGESTED_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new("events_ingested_size", "Events ingested size bytes")
.namespace(METRICS_NAMESPACE),
&["stream", "format"]
&["stream", "format"],
)
.expect("metric can be created");
pub static ref STORAGE_SIZE: IntGaugeVec = IntGaugeVec::new(
.expect("metric can be created")
});

pub static STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE),
&["type", "stream", "format"]
&["type", "stream", "format"],
)
.expect("metric can be created");
pub static ref STAGING_FILES: IntGaugeVec = IntGaugeVec::new(
.expect("metric can be created")
});

pub static STAGING_FILES: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE),
&["stream"]
&["stream"],
)
.expect("metric can be created");
pub static ref QUERY_EXECUTE_TIME: HistogramVec = HistogramVec::new(
.expect("metric can be created")
});

pub static QUERY_EXECUTE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
HistogramVec::new(
HistogramOpts::new("query_execute_time", "Query execute time").namespace(METRICS_NAMESPACE),
&["stream"]
&["stream"],
)
.expect("metric can be created");
pub static ref ALERTS_STATES: IntCounterVec = IntCounterVec::new(
.expect("metric can be created")
});

pub static ALERTS_STATES: Lazy<IntCounterVec> = Lazy::new(|| {
IntCounterVec::new(
Opts::new("alerts_states", "Alerts States").namespace(METRICS_NAMESPACE),
&["stream", "name", "state"]
&["stream", "name", "state"],
)
.expect("metric can be created");
}
.expect("metric can be created")
});

fn custom_metrics(registry: &Registry) {
registry
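Each metric is now its own `Lazy` static built from the same `Opts` and label set as before, and `custom_metrics` (partially visible above) registers them with the Prometheus registry at startup. A small self-contained sketch of that flow, with a placeholder namespace:

use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, Opts, Registry};

pub static EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new("events_ingested", "Events ingested").namespace("demo"),
        &["stream", "format"],
    )
    .expect("metric can be created")
});

fn main() {
    let registry = Registry::new();
    // Registering a clone is the usual prometheus-rs pattern: the vec shares
    // its underlying state, so increments are visible through gather().
    registry
        .register(Box::new(EVENTS.clone()))
        .expect("metric can be registered");
    EVENTS.with_label_values(&["s1", "json"]).inc();
    assert_eq!(registry.gather().len(), 1);
}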
24 changes: 12 additions & 12 deletions server/src/metrics/storage.rs
@@ -24,19 +24,19 @@ pub trait StorageMetrics {

pub mod localfs {
use crate::{metrics::METRICS_NAMESPACE, storage::FSConfig};
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use prometheus::{HistogramOpts, HistogramVec};

use super::StorageMetrics;

lazy_static! {
pub static ref REQUEST_RESPONSE_TIME: HistogramVec = HistogramVec::new(
pub static REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
HistogramVec::new(
HistogramOpts::new("local_fs_response_time", "FileSystem Request Latency")
.namespace(METRICS_NAMESPACE),
&["method", "status"]
&["method", "status"],
)
.expect("metric can be created");
}
.expect("metric can be created")
});

impl StorageMetrics for FSConfig {
fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
@@ -50,19 +50,19 @@ pub mod localfs {

pub mod s3 {
use crate::{metrics::METRICS_NAMESPACE, storage::S3Config};
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use prometheus::{HistogramOpts, HistogramVec};

use super::StorageMetrics;

lazy_static! {
pub static ref REQUEST_RESPONSE_TIME: HistogramVec = HistogramVec::new(
pub static REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
HistogramVec::new(
HistogramOpts::new("s3_response_time", "S3 Request Latency")
.namespace(METRICS_NAMESPACE),
&["method", "status"]
&["method", "status"],
)
.expect("metric can be created");
}
.expect("metric can be created")
});

impl StorageMetrics for S3Config {
fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
7 changes: 3 additions & 4 deletions server/src/option.rs
@@ -19,17 +19,16 @@
use clap::error::ErrorKind;
use clap::{command, value_parser, Arg, Args, Command, FromArgMatches};

use once_cell::sync::Lazy;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::storage::{FSConfig, ObjectStorageProvider, S3Config, LOCAL_SYNC_INTERVAL};
use crate::utils::validate_path_is_writeable;

lazy_static::lazy_static! {
#[derive(Debug)]
pub static ref CONFIG: Arc<Config> = Arc::new(Config::new());
}
pub static CONFIG: Lazy<Arc<Config>> = Lazy::new(|| Arc::new(Config::new()));

#[derive(Debug)]
pub struct Config {
pub parseable: Server,
storage: Arc<dyn ObjectStorageProvider + Send + Sync>,
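With `CONFIG` as a `Lazy<Arc<Config>>`, the configuration is built exactly once, on first access, and handing it to other tasks is a cheap refcount bump. A usage sketch under those assumptions (hypothetical `Config` field):

use once_cell::sync::Lazy;
use std::sync::Arc;

#[derive(Debug)]
pub struct Config {
    pub port: u16,
}

// Built on first deref; Arc makes sharing across threads and tasks cheap.
pub static CONFIG: Lazy<Arc<Config>> = Lazy::new(|| Arc::new(Config { port: 8000 }));

fn main() {
    let for_worker = Arc::clone(&CONFIG); // deref coercion: &Lazy<Arc<_>> -> &Arc<_>
    println!("listening on port {}", for_worker.port);
}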