13 changes: 9 additions & 4 deletions server/src/event.rs
@@ -213,6 +213,11 @@ impl Event {
self.process_first_event::<s3::S3, _>(event, inferred_schema)?
};

metadata::STREAM_INFO.update_stats(
&self.stream_name,
std::mem::size_of_val(self.body.as_bytes()) as u64,
)?;

if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
log::error!("Error checking for alerts. {:?}", e);
}
@@ -227,7 +232,7 @@ impl Event {
&self,
mut event: json::Reader<R>,
schema: Schema,
) -> Result<u64, EventError> {
) -> Result<(), EventError> {
// note for functions _schema_with_map and _set_schema_with_map,
// these are to be called while holding a write lock specifically.
// this guarantees two things
@@ -282,7 +287,7 @@ impl Event {
}
});

Ok(0)
Ok(())
}
}

@@ -291,13 +296,13 @@ impl Event {
fn process_event<R: std::io::Read>(
&self,
mut event: json::Reader<R>,
) -> Result<u64, EventError> {
) -> Result<(), EventError> {
let rb = event.next()?.ok_or(EventError::MissingRecord)?;
let stream_name = &self.stream_name;

STREAM_WRITERS::append_to_local(stream_name, &rb)?;

Ok(0)
Ok(())
}

// inferSchema is a constructor to Schema
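A note on the size calculation above: for a string body, std::mem::size_of_val on the backing byte slice is simply its length in bytes. A minimal standalone sketch:

fn main() {
    let body = String::from(r#"{"level":"info","msg":"hello"}"#);
    // size_of_val on an unsized [u8] slice returns len * size_of::<u8>(),
    // i.e. the same value as body.len().
    let size = std::mem::size_of_val(body.as_bytes()) as u64;
    assert_eq!(size, body.len() as u64);
}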
37 changes: 37 additions & 0 deletions server/src/handlers/logstream.rs
@@ -20,6 +20,7 @@ use std::fs

use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, HttpResponse, Responder};
use chrono::Utc;
use serde_json::Value;

use crate::alerts::Alerts;
@@ -302,6 +303,42 @@ pub async fn put_alert(req: HttpRequest, body: web::Json<serde_json::Value>) ->
.to_http()
}

pub async fn get_stats(req: HttpRequest) -> HttpResponse {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

let stats = match metadata::STREAM_INFO.get_stats(&stream_name) {
Ok(stats) => stats,
Err(e) => {
return response::ServerResponse {
msg: format!("Could not return stats due to error: {}", e),
code: StatusCode::BAD_REQUEST,
}
.to_http()
}
};

let time = Utc::now();

let stats = serde_json::json!({
"stream": stream_name,
"time": time,
"ingestion": {
"size": format!("{} {}", stats.ingestion, "Bytes"),
"format": "json"
},
"storage": {
"size": format!("{} {}", stats.storage, "Bytes"),
"format": "parquet"
}
});

response::ServerResponse {
msg: stats.to_string(),
code: StatusCode::OK,
}
.to_http()
}

fn remove_id_from_alerts(value: &mut Value) {
if let Some(Value::Array(alerts)) = value.get_mut("alerts") {
alerts
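For reference, a successful GET on the new stats endpoint produces a body shaped like this (all values illustrative):

{
    "stream": "backend",
    "time": "2022-11-01T12:00:00.000000000Z",
    "ingestion": {
        "size": "1024 Bytes",
        "format": "json"
    },
    "storage": {
        "size": "512 Bytes",
        "format": "parquet"
    }
}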
10 changes: 10 additions & 0 deletions server/src/main.rs
@@ -50,6 +50,7 @@ mod option;
mod query;
mod response;
mod s3;
mod stats;
mod storage;
mod utils;
mod validator;
@@ -342,6 +343,11 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) {
web::resource(schema_path("{logstream}"))
.route(web::get().to(handlers::logstream::schema)),
)
.service(
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
web::resource(stats_path("{logstream}"))
.route(web::get().to(handlers::logstream::get_stats)),
)
// GET "/liveness" ==> Livenss check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
.service(web::resource(liveness_path()).route(web::get().to(handlers::liveness)))
// GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes
@@ -399,3 +405,7 @@ fn alert_path(stream_name: &str) -> String {
fn schema_path(stream_name: &str) -> String {
format!("{}/schema", logstream_path(stream_name))
}

fn stats_path(stream_name: &str) -> String {
format!("{}/stats", logstream_path(stream_name))
}
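The new helper composes with logstream_path, which is not shown in this diff; a sketch assuming it returns "/logstream/{name}" as the route comments suggest:

// Assumed shape of the existing helper, per the route comments above.
fn logstream_path(stream_name: &str) -> String {
    format!("/logstream/{}", stream_name)
}

fn stats_path(stream_name: &str) -> String {
    format!("{}/stats", logstream_path(stream_name))
}

fn main() {
    assert_eq!(stats_path("backend"), "/logstream/backend/stats");
}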
47 changes: 16 additions & 31 deletions server/src/metadata.rs
@@ -18,12 +18,12 @@

use datafusion::arrow::datatypes::Schema;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::RwLock;

use crate::alerts::Alerts;
use crate::event::Event;
use crate::stats::{Stats, StatsCounter};
use crate::storage::ObjectStorage;

use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
@@ -32,25 +32,7 @@ use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
pub struct LogStreamMetadata {
pub schema: Option<Schema>,
pub alerts: Alerts,
pub stats: Stats,
}

#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq, Eq)]
pub struct Stats {
pub size: u64,
pub compressed_size: u64,
#[serde(skip)]
pub prev_compressed: u64,
}

impl Stats {
/// Update stats considering the following facts about params:
/// - `size`: The event body's binary size.
/// - `compressed_size`: Binary size of parquet file, total compressed_size is this plus size of all past parquet files.
pub fn update(&mut self, size: u64, compressed_size: u64) {
self.size += size;
self.compressed_size = self.prev_compressed + compressed_size;
}
pub stats: StatsCounter,
}

lazy_static! {
@@ -138,11 +120,12 @@ impl STREAM_INFO {
for stream in storage.list_streams().await? {
let alerts = storage.get_alerts(&stream.name).await?;
let schema = storage.get_schema(&stream.name).await?;
let stats = storage.get_stats(&stream.name).await?;

let metadata = LogStreamMetadata {
schema,
alerts,
..LogStreamMetadata::default()
stats: stats.into(),
};

let mut map = self.write().expect(LOCK_EXPECT);
@@ -161,22 +144,24 @@ impl STREAM_INFO {
.collect()
}

#[allow(dead_code)]
pub fn update_stats(
&self,
stream_name: &str,
size: u64,
compressed_size: u64,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
pub fn update_stats(&self, stream_name: &str, size: u64) -> Result<(), MetadataError> {
let map = self.read().expect(LOCK_EXPECT);
let stream = map
.get_mut(stream_name)
.get(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?;

stream.stats.update(size, compressed_size);
stream.stats.add_ingestion_size(size);

Ok(())
}

pub fn get_stats(&self, stream_name: &str) -> Result<Stats, MetadataError> {
self.read()
.expect(LOCK_EXPECT)
.get(stream_name)
.map(|metadata| Stats::from(&metadata.stats))
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))
}
}

pub mod error {
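Note that update_stats now takes a read lock where it previously needed a write lock: StatsCounter is built on atomics, so it can be mutated through a shared reference. A standalone sketch of the pattern (toy map, not the actual STREAM_INFO type):

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;

fn main() {
    let map: RwLock<HashMap<String, AtomicU64>> = RwLock::new(HashMap::new());
    map.write().unwrap().insert("backend".to_owned(), AtomicU64::new(0));

    // A shared (read) lock suffices to bump the counter, because
    // AtomicU64 provides interior mutability; ingestion no longer
    // serializes behind a write lock just to update stats.
    let guard = map.read().unwrap();
    guard.get("backend").unwrap().fetch_add(1024, Ordering::AcqRel);
    assert_eq!(guard.get("backend").unwrap().load(Ordering::Relaxed), 1024);
}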
50 changes: 38 additions & 12 deletions server/src/s3.rs
@@ -22,14 +22,15 @@ use http::Uri;
use object_store::aws::AmazonS3Builder;
use object_store::limit::LimitStore;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fs;
use std::iter::Iterator;
use std::sync::Arc;

use crate::alerts::Alerts;
use crate::metadata::Stats;
use crate::option::{StorageOpt, CONFIG};
use crate::query::Query;
use crate::stats::Stats;
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};

// Default object storage currently is DO Spaces bucket
@@ -221,21 +222,27 @@ impl S3 {
.key(format!("{}/.schema", stream_name))
.send()
.await?;
// create .parseable.json file in the stream-name prefix.
// This indicates the format version for this stream.
// This is helpful in case we may change the backend format
// in the future
self._put_parseable_config(stream_name, format).await?;
// Prefix created on S3, now create the directory in
// the local storage as well
let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));
Ok(())
}

async fn _put_parseable_config(
&self,
stream_name: &str,
body: Vec<u8>,
) -> Result<(), AwsSdkError> {
let _resp = self
.client
.put_object()
.bucket(&S3_CONFIG.s3_bucket_name)
.key(format!("{}/.parseable.json", stream_name))
.body(format.into())
.body(body.into())
.send()
.await?;
// Prefix created on S3, now create the directory in
// the local storage as well
let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));

Ok(())
}

@@ -290,8 +297,8 @@ impl S3 {
self._get(stream_name, "alert.json").await
}

async fn _get_stats(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
self._get(stream_name, "stats.json").await
async fn _get_parseable_config(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
self._get(stream_name, "parseable.json").await
}

async fn _get(&self, stream_name: &str, resource: &str) -> Result<Bytes, AwsSdkError> {
@@ -434,11 +441,30 @@ impl ObjectStorage for S3 {
}

async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
let stats = serde_json::from_slice(&self._get_stats(stream_name).await?)?;
let parseable_metadata = self._get_parseable_config(stream_name).await?;
let parseable_metadata: Value =
serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");

let stats = &parseable_metadata["stats"];

let stats = serde_json::from_value(stats.clone()).unwrap_or_default();

Ok(stats)
}

async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> {
let stats = serde_json::to_value(stats).expect("stats are perfectly serializable");
let parseable_metadata = self._get_parseable_config(stream_name).await?;
let mut parseable_metadata: Value =
serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");

parseable_metadata["stats"] = stats;

self._put_parseable_config(stream_name, parseable_metadata.to_string().into_bytes())
.await?;
Ok(())
}

async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
let streams = self._list_streams().await?;

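Stats are no longer kept in a separate stats.json; get_stats and put_stats now read and write a "stats" key inside the stream's .parseable.json. A sketch of that round trip (every field except "stats" is an assumed placeholder):

use serde_json::{json, Value};

fn main() {
    // Illustrative contents of {stream}/.parseable.json; "version" stands
    // in for whatever format metadata the file actually holds.
    let mut parseable_metadata: Value = json!({ "version": "v1" });

    // put_stats: merge the serialized stats into the config document.
    parseable_metadata["stats"] = json!({ "ingestion": 1024, "storage": 512 });

    // get_stats: read the "stats" key back, falling back to defaults
    // if the key is missing or malformed.
    let stats = parseable_metadata["stats"].clone();
    assert_eq!(stats["ingestion"], 1024);
}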
72 changes: 72 additions & 0 deletions server/src/stats.rs
@@ -0,0 +1,72 @@
use std::sync::atomic::{AtomicU64, Ordering};

use serde::{Deserialize, Serialize};

#[derive(Debug)]
pub struct StatsCounter {
ingestion_size: AtomicU64,
storage_size: AtomicU64,
}

impl Default for StatsCounter {
fn default() -> Self {
Self {
ingestion_size: AtomicU64::new(0),
storage_size: AtomicU64::new(0),
}
}
}

impl PartialEq for StatsCounter {
fn eq(&self, other: &Self) -> bool {
self.ingestion_size() == other.ingestion_size()
&& self.storage_size() == other.storage_size()
}
}

impl StatsCounter {
pub fn new(ingestion_size: u64, storage_size: u64) -> Self {
Self {
ingestion_size: AtomicU64::new(ingestion_size),
storage_size: AtomicU64::new(storage_size),
}
}

pub fn ingestion_size(&self) -> u64 {
self.ingestion_size.load(Ordering::Relaxed)
}

pub fn storage_size(&self) -> u64 {
self.storage_size.load(Ordering::Relaxed)
}

pub fn add_ingestion_size(&self, size: u64) {
self.ingestion_size.fetch_add(size, Ordering::AcqRel);
}

pub fn add_storage_size(&self, size: u64) {
self.storage_size.fetch_add(size, Ordering::AcqRel);
}
}

/// Helper struct type created by copying stats values from metadata
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct Stats {
pub ingestion: u64,
pub storage: u64,
}

impl From<&StatsCounter> for Stats {
fn from(stats: &StatsCounter) -> Self {
Self {
ingestion: stats.ingestion_size(),
storage: stats.storage_size(),
}
}
}

impl From<Stats> for StatsCounter {
fn from(stats: Stats) -> Self {
StatsCounter::new(stats.ingestion, stats.storage)
}
}
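A minimal usage sketch for the new module (assuming StatsCounter and Stats are in scope): live counters mutate through shared references and are snapshotted into the plain, serializable Stats for responses and persistence.

fn main() {
    let counter = StatsCounter::default();
    counter.add_ingestion_size(1024);
    counter.add_storage_size(512);

    // Copy the live atomics into the plain struct used for the JSON
    // response and for writing back to object storage.
    let snapshot = Stats::from(&counter);
    assert_eq!(snapshot, Stats { ingestion: 1024, storage: 512 });

    // And back again, e.g. when loading persisted stats at startup.
    let _restored = StatsCounter::from(snapshot);
}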