Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions server/src/handlers/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::alerts::Alerts;
use crate::event;
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::storage::retention::{self, Retention};
use crate::storage::{LogStream, StorageDir};
use crate::{metadata, validator};

Expand Down Expand Up @@ -173,6 +174,36 @@ pub async fn put_alert(
))
}

/// PUT handler: set the retention configuration for a log stream.
///
/// Rejects a malformed JSON body with `InvalidRetentionConfig`, refuses to
/// act on a stream that has not been initialized, then persists the config
/// to object storage and (re)arms the in-process retention scheduler.
pub async fn put_retention(
    req: HttpRequest,
    body: web::Json<serde_json::Value>,
) -> Result<impl Responder, StreamError> {
    // The "logstream" path segment is guaranteed present by the route definition.
    let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

    // Validate the payload before touching any state, so a bad body is the
    // first error reported.
    let retention: Retention = serde_json::from_value(body.into_inner())
        .map_err(StreamError::InvalidRetentionConfig)?;

    if !STREAM_INFO.stream_initialized(&stream_name)? {
        return Err(StreamError::UninitializedLogstream);
    }

    // Persist alongside the stream's metadata, then schedule cleanup with
    // the freshly accepted rules.
    let store = CONFIG.storage().get_object_store();
    store.put_retention(&stream_name, &retention).await?;
    retention::init_scheduler(&stream_name, retention);

    Ok((
        format!("set retention configuration for log stream {stream_name}"),
        StatusCode::OK,
    ))
}

pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

Expand Down Expand Up @@ -270,6 +301,8 @@ pub mod error {
AlertValidation(#[from] AlertValidationError),
#[error("alert - \"{0}\" is invalid, please check if alert is valid according to this stream's schema and try again")]
InvalidAlert(String),
#[error("failed to set retention configuration due to err: {0}")]
InvalidRetentionConfig(serde_json::Error),
#[error("{msg}")]
Custom { msg: String, status: StatusCode },
}
Expand All @@ -286,6 +319,7 @@ pub mod error {
StreamError::BadAlertJson { .. } => StatusCode::BAD_REQUEST,
StreamError::AlertValidation(_) => StatusCode::BAD_REQUEST,
StreamError::InvalidAlert(_) => StatusCode::BAD_REQUEST,
StreamError::InvalidRetentionConfig(_) => StatusCode::BAD_REQUEST,
}
}

Expand Down
11 changes: 10 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async fn main() -> anyhow::Result<()> {

// track all parquet files already in the data directory
storage::CACHED_FILES.track_parquet();

storage::retention::load_retention_from_global().await;
// load data from stats back to prometheus metrics
metrics::load_from_global_stats();

Expand Down Expand Up @@ -342,6 +342,11 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) {
web::resource(stats_path("{logstream}"))
.route(web::get().to(handlers::logstream::get_stats)),
)
.service(
// PUT "/logstream/{logstream}/retention" ==> Set retention configuration for given log stream
web::resource(retention_path("{logstream}"))
.route(web::put().to(handlers::logstream::put_retention)),
)
// GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
.service(web::resource(liveness_path()).route(web::get().to(handlers::liveness)))
// GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes
Expand Down Expand Up @@ -412,3 +417,7 @@ fn schema_path(stream_name: &str) -> String {
/// Route path for the stats endpoint of a given log stream.
fn stats_path(stream_name: &str) -> String {
    let mut path = logstream_path(stream_name);
    path.push_str("/stats");
    path
}

/// Route path for the retention endpoint of a given log stream.
fn retention_path(stream_name: &str) -> String {
    let mut path = logstream_path(stream_name);
    path.push_str("/retention");
    path
}
1 change: 1 addition & 0 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use std::sync::{Arc, Mutex};
mod file_link;
mod localfs;
mod object_storage;
pub mod retention;
mod s3;
mod store_metadata;

Expand Down
31 changes: 31 additions & 0 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ impl ObjectStorage for LocalFS {
res.map_err(Into::into)
}

async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError> {
let path = self.path_in_root(path);
tokio::fs::remove_dir_all(path).await?;
Ok(())
}

async fn check(&self) -> Result<(), ObjectStorageError> {
fs::create_dir_all(&self.root).await?;
validate_path_is_writeable(&self.root)
Expand Down Expand Up @@ -173,6 +179,16 @@ impl ObjectStorage for LocalFS {
Ok(logstreams)
}

    /// List the names of the date directories under a stream's root on the
    /// local filesystem; non-directory entries are skipped (via `dir_name`
    /// returning `None`).
    ///
    /// NOTE(review): results are collected from a `FuturesUnordered`, so the
    /// order of returned dates is not deterministic — confirm callers do not
    /// rely on ordering.
    async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
        let path = self.root.join(stream_name);
        // Gather all directory entries eagerly first...
        let directories = ReadDirStream::new(fs::read_dir(&path).await?);
        let entries: Vec<DirEntry> = directories.try_collect().await?;
        // ...then resolve each entry's name concurrently; `flatten` below
        // drops the `None`s produced for non-directory entries.
        let entries = entries.into_iter().map(dir_name);
        let dates: Vec<_> = FuturesUnordered::from_iter(entries).try_collect().await?;

        Ok(dates.into_iter().flatten().collect())
    }

async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
let op = CopyOptions {
overwrite: true,
Expand Down Expand Up @@ -255,6 +271,21 @@ async fn dir_with_stream(
}
}

/// Return the file name of `entry` if it is a directory, `None` otherwise.
///
/// Helper for `list_dates`: keeps only the date-named directories under a
/// stream's root.
async fn dir_name(entry: DirEntry) -> Result<Option<String>, ObjectStorageError> {
    if entry.file_type().await?.is_dir() {
        // Ask the entry for its name directly instead of going through
        // `entry.path().file_name()`: this skips building a full `PathBuf`
        // and eliminates the `expect("valid path")` panic path entirely.
        let name = entry.file_name();
        let dir_name = name
            .to_str()
            .expect("valid unicode") // non-UTF-8 dir names are a broken invariant here
            .to_owned();
        Ok(Some(dir_name))
    } else {
        Ok(None)
    }
}

impl From<fs_extra::error::Error> for ObjectStorageError {
fn from(e: fs_extra::error::Error) -> Self {
ObjectStorageError::UnhandledError(Box::new(e))
Expand Down
41 changes: 39 additions & 2 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/

use super::{
file_link::CacheState, LogStream, MoveDataError, ObjectStorageError, ObjectStoreFormat,
Permisssion, StorageDir, StorageMetadata, CACHED_FILES,
file_link::CacheState, retention::Retention, LogStream, MoveDataError, ObjectStorageError,
ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, CACHED_FILES,
};
use crate::{
alerts::Alerts,
Expand Down Expand Up @@ -77,9 +77,11 @@ pub trait ObjectStorage: Sync + 'static {
path: &RelativePath,
resource: Bytes,
) -> Result<(), ObjectStorageError>;
async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError>;
async fn check(&self) -> Result<(), ObjectStorageError>;
async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
fn query_table(
&self,
Expand Down Expand Up @@ -142,6 +144,23 @@ pub trait ObjectStorage: Sync + 'static {
self.put_object(&path, to_bytes(&stream_metadata)).await
}

async fn put_retention(
&self,
stream_name: &str,
retention: &Retention,
) -> Result<(), ObjectStorageError> {
let path = stream_json_path(stream_name);
let stream_metadata = self.get_object(&path).await?;
let stats =
serde_json::to_value(retention).expect("rentention tasks are perfectly serializable");
let mut stream_metadata: serde_json::Value =
serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");

stream_metadata["retention"] = stats;

self.put_object(&path, to_bytes(&stream_metadata)).await
}

async fn put_metadata(
&self,
parseable_metadata: &StorageMetadata,
Expand Down Expand Up @@ -216,6 +235,24 @@ pub trait ObjectStorage: Sync + 'static {
Ok(stats)
}

async fn get_retention(&self, stream_name: &str) -> Result<Retention, ObjectStorageError> {
let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?;
let stream_metadata: Value =
serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");

let retention = stream_metadata
.as_object()
.expect("is object")
.get("retention")
.cloned();

if let Some(retention) = retention {
Ok(serde_json::from_value(retention).unwrap())
} else {
Ok(Retention::default())
}
}

async fn get_metadata(&self) -> Result<Option<StorageMetadata>, ObjectStorageError> {
let parseable_metadata: Option<StorageMetadata> =
match self.get_object(&parseable_json_path()).await {
Expand Down
Loading