From 274bb5d4886b8de881eead2059fd22b24593c103 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 7 Mar 2023 18:10:49 +0530 Subject: [PATCH] Add retention mode * Add API for setting up retention * Support for both local mode and s3 mode --- server/src/handlers/logstream.rs | 34 ++++ server/src/main.rs | 11 +- server/src/storage.rs | 1 + server/src/storage/localfs.rs | 31 ++++ server/src/storage/object_storage.rs | 41 ++++- server/src/storage/retention.rs | 251 +++++++++++++++++++++++++++ server/src/storage/s3.rs | 69 ++++++++ 7 files changed, 435 insertions(+), 3 deletions(-) create mode 100644 server/src/storage/retention.rs diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index 57345c09e..ca050fe47 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -27,6 +27,7 @@ use crate::alerts::Alerts; use crate::event; use crate::metadata::STREAM_INFO; use crate::option::CONFIG; +use crate::storage::retention::{self, Retention}; use crate::storage::{LogStream, StorageDir}; use crate::{metadata, validator}; @@ -173,6 +174,36 @@ pub async fn put_alert( )) } +pub async fn put_retention( + req: HttpRequest, + body: web::Json, +) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let body = body.into_inner(); + + let retention: Retention = match serde_json::from_value(body) { + Ok(retention) => retention, + Err(err) => return Err(StreamError::InvalidRetentionConfig(err)), + }; + + if !STREAM_INFO.stream_initialized(&stream_name)? 
{ return Err(StreamError::UninitializedLogstream); } + + CONFIG + .storage() + .get_object_store() + .put_retention(&stream_name, &retention) + .await?; + + retention::init_scheduler(&stream_name, retention); + + Ok(( + format!("set retention configuration for log stream {stream_name}"), + StatusCode::OK, + )) +} + pub async fn get_stats(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -270,6 +301,8 @@ pub mod error { AlertValidation(#[from] AlertValidationError), #[error("alert - \"{0}\" is invalid, please check if alert is valid according to this stream's schema and try again")] InvalidAlert(String), + #[error("failed to set retention configuration due to err: {0}")] + InvalidRetentionConfig(serde_json::Error), #[error("{msg}")] Custom { msg: String, status: StatusCode }, } @@ -286,6 +319,7 @@ pub mod error { StreamError::BadAlertJson { .. } => StatusCode::BAD_REQUEST, StreamError::AlertValidation(_) => StatusCode::BAD_REQUEST, StreamError::InvalidAlert(_) => StatusCode::BAD_REQUEST, + StreamError::InvalidRetentionConfig(_) => StatusCode::BAD_REQUEST, } } diff --git a/server/src/main.rs b/server/src/main.rs index 7c97cb0bc..769e029ea 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -100,7 +100,7 @@ async fn main() -> anyhow::Result<()> { // track all parquet files already in the data directory storage::CACHED_FILES.track_parquet(); - + storage::retention::load_retention_from_global().await; // load data from stats back to prometheus metrics metrics::load_from_global_stats(); @@ -342,6 +342,11 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { web::resource(stats_path("{logstream}")) .route(web::get().to(handlers::logstream::get_stats)), ) + .service( + // PUT "/logstream/{logstream}/retention" ==> Set retention config for given log stream + web::resource(retention_path("{logstream}")) + .route(web::put().to(handlers::logstream::put_retention)), + ) // GET "/liveness" ==> Liveness check 
as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command .service(web::resource(liveness_path()).route(web::get().to(handlers::liveness))) // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes @@ -412,3 +417,7 @@ fn schema_path(stream_name: &str) -> String { fn stats_path(stream_name: &str) -> String { format!("{}/stats", logstream_path(stream_name)) } + +fn retention_path(stream_name: &str) -> String { + format!("{}/retention", logstream_path(stream_name)) +} diff --git a/server/src/storage.rs b/server/src/storage.rs index 24f140276..6eaf3af11 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -38,6 +38,7 @@ use std::sync::{Arc, Mutex}; mod file_link; mod localfs; mod object_storage; +pub mod retention; mod s3; mod store_metadata; diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 555a215b6..ce1407e5b 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -142,6 +142,12 @@ impl ObjectStorage for LocalFS { res.map_err(Into::into) } + async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { + let path = self.path_in_root(path); + tokio::fs::remove_dir_all(path).await?; + Ok(()) + } + async fn check(&self) -> Result<(), ObjectStorageError> { fs::create_dir_all(&self.root).await?; validate_path_is_writeable(&self.root) @@ -173,6 +179,16 @@ impl ObjectStorage for LocalFS { Ok(logstreams) } + async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError> { + let path = self.root.join(stream_name); + let directories = ReadDirStream::new(fs::read_dir(&path).await?); + let entries: Vec = directories.try_collect().await?; + let entries = entries.into_iter().map(dir_name); + let dates: Vec<_> = FuturesUnordered::from_iter(entries).try_collect().await?; + + 
Ok(dates.into_iter().flatten().collect()) + } + async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { let op = CopyOptions { overwrite: true, @@ -255,6 +271,21 @@ async fn dir_with_stream( } } +async fn dir_name(entry: DirEntry) -> Result, ObjectStorageError> { + if entry.file_type().await?.is_dir() { + let dir_name = entry + .path() + .file_name() + .expect("valid path") + .to_str() + .expect("valid unicode") + .to_owned(); + Ok(Some(dir_name)) + } else { + Ok(None) + } +} + impl From for ObjectStorageError { fn from(e: fs_extra::error::Error) -> Self { ObjectStorageError::UnhandledError(Box::new(e)) diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 8b210e3d9..f4013d5ac 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -17,8 +17,8 @@ */ use super::{ - file_link::CacheState, LogStream, MoveDataError, ObjectStorageError, ObjectStoreFormat, - Permisssion, StorageDir, StorageMetadata, CACHED_FILES, + file_link::CacheState, retention::Retention, LogStream, MoveDataError, ObjectStorageError, + ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, CACHED_FILES, }; use crate::{ alerts::Alerts, @@ -77,9 +77,11 @@ pub trait ObjectStorage: Sync + 'static { path: &RelativePath, resource: Bytes, ) -> Result<(), ObjectStorageError>; + async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError>; async fn check(&self) -> Result<(), ObjectStorageError>; async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>; async fn list_streams(&self) -> Result, ObjectStorageError>; + async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; fn query_table( &self, @@ -142,6 +144,23 @@ pub trait ObjectStorage: Sync + 'static { self.put_object(&path, to_bytes(&stream_metadata)).await } + async fn put_retention( 
+ &self, + stream_name: &str, + retention: &Retention, + ) -> Result<(), ObjectStorageError> { + let path = stream_json_path(stream_name); + let stream_metadata = self.get_object(&path).await?; + let stats = + serde_json::to_value(retention).expect("retention tasks are perfectly serializable"); + let mut stream_metadata: serde_json::Value = + serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); + + stream_metadata["retention"] = stats; + + self.put_object(&path, to_bytes(&stream_metadata)).await + } + async fn put_metadata( &self, parseable_metadata: &StorageMetadata, @@ -216,6 +235,24 @@ pub trait ObjectStorage: Sync + 'static { Ok(stats) } + async fn get_retention(&self, stream_name: &str) -> Result { + let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?; + let stream_metadata: Value = + serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); + + let retention = stream_metadata + .as_object() + .expect("is object") + .get("retention") + .cloned(); + + if let Some(retention) = retention { + Ok(serde_json::from_value(retention).unwrap()) + } else { + Ok(Retention::default()) + } + } + async fn get_metadata(&self) -> Result, ObjectStorageError> { let parseable_metadata: Option = match self.get_object(&parseable_json_path()).await { diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs new file mode 100644 index 000000000..8ce387138 --- /dev/null +++ b/server/src/storage/retention.rs @@ -0,0 +1,251 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::hash::Hash; +use std::num::NonZeroU32; +use std::sync::Mutex; +use std::thread; +use std::time::Duration; + +use clokwerk::AsyncScheduler; +use clokwerk::Job; +use clokwerk::TimeUnits; +use derive_more::Display; +use once_cell::sync::Lazy; + +use crate::metadata::STREAM_INFO; +use crate::option::CONFIG; + +type SchedulerHandle = thread::JoinHandle<()>; + +static SCHEDULER_HANDLER: Lazy>> = Lazy::new(|| Mutex::new(None)); + +fn async_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .thread_name("retention-task-thread") + .enable_all() + .build() + .unwrap() +} + +pub async fn load_retention_from_global() { + log::info!("loading retention for all streams"); + for stream in STREAM_INFO.list_streams() { + let res = CONFIG + .storage() + .get_object_store() + .get_retention(&stream) + .await; + match res { + Ok(config) => { + if config.tasks.is_empty() { + log::info!("skipping loading retention for {stream}"); + continue; + } + init_scheduler(&stream, config) + } + Err(err) => log::warn!("failed to load retention config for {stream} due to {err:?}"), + } + } +} + +pub fn init_scheduler(stream: &str, config: Retention) { + log::info!("Setting up scheduler for {stream}"); + let mut scheduler = AsyncScheduler::new(); + for Task { action, days, .. 
} in config.tasks.into_iter() { + let func = match action { + Action::Delete => { + let stream = stream.to_string(); + move || action::delete(stream.clone(), u32::from(days)) + } + }; + + scheduler + .every(u32::from(days).days()) + .at("00:00") + .run(func); + } + + let handler = thread::spawn(|| { + let rt = async_runtime(); + rt.block_on(async move { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + scheduler.run_pending().await; + } + }); + }); + + *SCHEDULER_HANDLER.lock().unwrap() = Some(handler); + log::info!("Scheduler is initialized") +} + +#[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)] +#[serde(try_from = "Vec")] +#[serde(into = "Vec")] +pub struct Retention { + tasks: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct Task { + description: String, + action: Action, + days: NonZeroU32, +} + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, Display, serde::Serialize, serde::Deserialize, +)] +#[serde(rename_all = "lowercase")] +enum Action { + Delete, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct TaskView { + description: String, + action: Action, + duration: String, +} + +impl TryFrom> for Retention { + type Error = String; + + fn try_from(task_view: Vec) -> Result { + let mut set = Vec::with_capacity(2); + let mut tasks = Vec::new(); + + for task in task_view { + let duration = task.duration; + if !duration.ends_with('d') { + return Err("missing 'd' suffix for duration value".to_string()); + } + let Ok(days) = duration[0..duration.len() - 1].parse() else { return Err("could not convert duration to an unsigned number".to_string());}; + + if set.contains(&task.action) { + return Err(format!( + "Configuration contains two task both of action \"{}\"", + task.action + )); + } else { + set.push(task.action) + } + + tasks.push(Task { + description: task.description, + action: task.action, + days, + }) + } + + Ok(Retention { tasks }) + 
} +} + +impl From for Vec { + fn from(value: Retention) -> Self { + value + .tasks + .into_iter() + .map(|task| { + let duration = format!("{}d", task.days); + TaskView { + description: task.description, + action: task.action, + duration, + } + }) + .collect() + } +} + +mod action { + use chrono::{Days, NaiveDate, Utc}; + use futures::{stream::FuturesUnordered, StreamExt}; + use itertools::Itertools; + use relative_path::RelativePathBuf; + + use crate::option::CONFIG; + + pub(super) async fn delete(stream_name: String, days: u32) { + log::info!("running retention task - delete"); + let retain_until = get_retain_until(Utc::now().date_naive(), days as u64); + + let Ok(dates) = CONFIG.storage().get_object_store().list_dates(&stream_name) + .await else { return }; + + let dates_to_delete = dates + .into_iter() + .filter(|date| string_to_date(date) < retain_until) + .collect_vec(); + + let delete_tasks = FuturesUnordered::new(); + for date in dates_to_delete { + let path = RelativePathBuf::from_iter([&stream_name, &date]); + delete_tasks.push(async move { + CONFIG + .storage() + .get_object_store() + .delete_prefix(&path) + .await + }); + } + + let res: Vec<_> = delete_tasks.collect().await; + + for res in res { + if let Err(err) = res { + log::error!("Failed to run delete task {err:?}") + } + } + } + + fn get_retain_until(current_date: NaiveDate, days: u64) -> NaiveDate { + current_date - Days::new(days) + } + + fn string_to_date(date: &str) -> NaiveDate { + let year = date[5..9].parse().unwrap(); + let month = date[10..12].parse().unwrap(); + let day = date[13..15].parse().unwrap(); + + NaiveDate::from_ymd_opt(year, month, day).unwrap() + } + + #[cfg(test)] + mod tests { + use chrono::{Datelike, NaiveDate}; + + use super::get_retain_until; + use super::string_to_date; + + #[test] + fn test_time_from_string() { + let value = "date=2000-01-01"; + let time = string_to_date(value); + assert_eq!(time, NaiveDate::from_ymd_opt(2000, 1, 1).unwrap()); + } + #[test] + fn 
test_retain_day() { + let current_date = NaiveDate::from_ymd_opt(2000, 1, 2).unwrap(); + let date = get_retain_until(current_date, 1); + assert_eq!(date.day(), 1) + } + } +} diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 9a454bb4f..d44a1a436 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -255,6 +255,36 @@ impl S3 { Ok(()) } + async fn _delete_prefix(&self, path: &RelativePath) -> Result<(), AwsSdkError> { + let mut pages = self + .client + .list_objects_v2() + .bucket(&self.bucket) + .prefix(path.as_str()) + .into_paginator() + .send(); + + let mut delete_objects: Vec = vec![]; + while let Some(page) = pages.next().await { + let page = page?; + for obj in page.contents.unwrap() { + let obj_id = ObjectIdentifier::builder().set_key(obj.key).build(); + delete_objects.push(obj_id); + } + } + + let delete = Delete::builder().set_objects(Some(delete_objects)).build(); + + self.client + .delete_objects() + .bucket(&self.bucket) + .delete(delete) + .send() + .await?; + + Ok(()) + } + async fn _list_streams(&self) -> Result, AwsSdkError> { let resp = self .client @@ -299,6 +329,33 @@ impl S3 { .collect_vec()) } + async fn _list_dates(&self, stream: &str) -> Result, AwsSdkError> { + let prefix = format!("{stream}/"); + let resp = self + .client + .list_objects_v2() + .bucket(&self.bucket) + .prefix(&prefix) + .delimiter('/') + .send() + .await?; + + let common_prefixes = resp.common_prefixes().unwrap_or_default(); + + // return prefixes at the root level + let dates: Vec<_> = common_prefixes + .iter() + .filter_map(CommonPrefix::prefix) + .filter_map(|name| { + name.strip_suffix('/') + .and_then(|name| name.strip_prefix(&prefix)) + }) + .map(String::from) + .collect(); + + Ok(dates) + } + async fn _upload_file( &self, key: &str, @@ -353,6 +410,12 @@ impl ObjectStorage for S3 { Ok(()) } + async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { + self._delete_prefix(path).await?; + + Ok(()) + } + 
async fn check(&self) -> Result<(), ObjectStorageError> { self.client .head_bucket() @@ -375,6 +438,12 @@ impl ObjectStorage for S3 { Ok(streams) } + async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError> { + let streams = self._list_dates(stream_name).await?; + + Ok(streams) + } + async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { let hash = if self.set_content_md5 { let mut file = std::fs::File::open(path)?;