diff --git a/src/correlation/mod.rs b/src/correlation.rs similarity index 87% rename from src/correlation/mod.rs rename to src/correlation.rs index 2befb5f07..cf2c7bd82 100644 --- a/src/correlation/mod.rs +++ b/src/correlation.rs @@ -45,36 +45,32 @@ pub static CORRELATIONS: Lazy = Lazy::new(Correlation::default); pub struct Correlation(RwLock>); impl Correlation { - pub async fn load(&self) -> Result<(), CorrelationError> { - // lead correlations from storage + //load correlations from storage + pub async fn load(&self) -> anyhow::Result<()> { let store = CONFIG.storage().get_object_store(); - let all_correlations = store.get_correlations().await.unwrap_or_default(); - - let mut correlations = vec![]; - for corr in all_correlations { - if corr.is_empty() { - continue; - } - - let correlation: CorrelationConfig = match serde_json::from_slice(&corr) { - Ok(c) => c, - Err(e) => { - error!("Unable to load correlation- {e}"); - continue; - } - }; - - correlations.push(correlation); - } + let all_correlations = store.get_all_correlations().await.unwrap_or_default(); + + let correlations: Vec = all_correlations + .into_iter() + .flat_map(|(_, correlations_bytes)| correlations_bytes) + .filter_map(|correlation| { + serde_json::from_slice(&correlation) + .inspect_err(|e| { + error!("Unable to load correlation: {e}"); + }) + .ok() + }) + .collect(); let mut s = self.0.write().await; - s.append(&mut correlations.clone()); + s.extend(correlations); Ok(()) } pub async fn list_correlations_for_user( &self, session_key: &SessionKey, + user_id: &str, ) -> Result, CorrelationError> { let correlations = self.0.read().await.iter().cloned().collect_vec(); @@ -87,27 +83,29 @@ impl Correlation { .iter() .map(|t| t.table_name.clone()) .collect_vec(); - if user_auth_for_query(&permissions, tables).is_ok() { + if user_auth_for_query(&permissions, tables).is_ok() && c.user_id == user_id { user_correlations.push(c); } } Ok(user_correlations) } - pub async fn get_correlation_by_id( + pub async fn get_correlation( &self, correlation_id: &str, + user_id: &str, ) -> Result { let read = self.0.read().await; - let correlation = read.iter().find(|c| c.id == correlation_id).cloned(); + let correlation = read + .iter() + .find(|c| c.id == correlation_id && c.user_id == user_id) + .cloned(); - if let Some(c) = correlation { - Ok(c) - } else { - Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( + correlation.ok_or_else(|| { + CorrelationError::AnyhowError(anyhow::Error::msg(format!( "Unable to find correlation with ID- {correlation_id}" - )))) - } + ))) + }) } pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { @@ -152,6 +150,7 @@ pub struct CorrelationConfig { pub version: CorrelationVersion, pub title: String, pub id: String, + pub user_id: String, pub table_configs: Vec, pub join_config: JoinConfig, pub filter: Option, @@ -164,7 +163,6 @@ impl CorrelationConfig {} #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct CorrelationRequest { - pub version: CorrelationVersion, pub title: String, pub table_configs: Vec, pub join_config: JoinConfig, @@ -176,9 +174,10 @@ pub struct CorrelationRequest { impl From for CorrelationConfig { fn from(val: CorrelationRequest) -> Self { Self { - version: val.version, + version: CorrelationVersion::V1, title: val.title, id: get_hash(Utc::now().timestamp_micros().to_string().as_str()), + user_id: String::default(), table_configs: val.table_configs, join_config: val.join_config, filter: val.filter, @@ -189,11 +188,12 @@ impl From for CorrelationConfig { } impl CorrelationRequest { - pub fn generate_correlation_config(self, id: String) -> CorrelationConfig { + pub fn generate_correlation_config(self, id: String, user_id: String) -> CorrelationConfig { CorrelationConfig { - version: self.version, + version: CorrelationVersion::V1, title: self.title, id, + user_id, table_configs: self.table_configs, join_config: self.join_config, filter: self.filter, diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs index 549783178..08a9b13d2 100644 --- a/src/handlers/http/correlation.rs +++ b/src/handlers/http/correlation.rs @@ -17,25 +17,27 @@ */ use actix_web::{web, HttpRequest, HttpResponse, Responder}; +use anyhow::Error; use bytes::Bytes; use itertools::Itertools; -use relative_path::RelativePathBuf; use crate::rbac::Users; -use crate::utils::user_auth_for_query; -use crate::{ - option::CONFIG, storage::CORRELATIONS_ROOT_DIRECTORY, - utils::actix::extract_session_key_from_req, -}; +use crate::storage::object_storage::correlation_path; +use crate::utils::{get_hash, get_user_from_request, user_auth_for_query}; +use crate::{option::CONFIG, utils::actix::extract_session_key_from_req}; use crate::correlation::{CorrelationConfig, CorrelationError, CorrelationRequest, CORRELATIONS}; pub async fn list(req: HttpRequest) -> Result { let session_key = extract_session_key_from_req(&req) - .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; + + let user_id = get_user_from_request(&req) + .map(|s| get_hash(&s.to_string())) + .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; let correlations = CORRELATIONS - .list_correlations_for_user(&session_key) + .list_correlations_for_user(&session_key, &user_id) .await?; Ok(web::Json(correlations)) @@ -43,14 +45,20 @@ pub async fn list(req: HttpRequest) -> Result pub async fn get(req: HttpRequest) -> Result { let session_key = extract_session_key_from_req(&req) - .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; + + let user_id = get_user_from_request(&req) + .map(|s| get_hash(&s.to_string())) + .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; let correlation_id = req .match_info() .get("correlation_id") .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; - let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?; + let correlation = CORRELATIONS + .get_correlation(correlation_id, &user_id) + .await?; let permissions = Users.get_permissions(&session_key); @@ -68,16 +76,24 @@ pub async fn get(req: HttpRequest) -> Result { pub async fn post(req: HttpRequest, body: Bytes) -> Result { let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + let user_id = get_user_from_request(&req) + .map(|s| get_hash(&s.to_string())) + .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?; correlation_request.validate(&session_key).await?; - let correlation: CorrelationConfig = correlation_request.into(); + let mut correlation: CorrelationConfig = correlation_request.into(); + correlation.user_id.clone_from(&user_id); + let correlation_id = &correlation.id; + let path = correlation_path(&user_id, &format!("{}.json", correlation_id)); - // Save to disk let store = CONFIG.storage().get_object_store(); - store.put_correlation(&correlation).await?; + let correlation_bytes = serde_json::to_vec(&correlation)?; + store + .put_object(&path, Bytes::from(correlation_bytes)) + .await?; // Save to memory CORRELATIONS.update(&correlation).await?; @@ -88,6 +104,9 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result Result { let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + let user_id = get_user_from_request(&req) + .map(|s| get_hash(&s.to_string())) + .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; let correlation_id = req .match_info() @@ -95,7 +114,9 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result Result Result Result { let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + let user_id = get_user_from_request(&req) + .map(|s| get_hash(&s.to_string())) + .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; let correlation_id = req .match_info() .get("correlation_id") .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; - let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?; + let correlation = CORRELATIONS + .get_correlation(correlation_id, &user_id) + .await?; // validate user's query auth let permissions = Users.get_permissions(&session_key); @@ -141,12 +173,10 @@ pub async fn delete(req: HttpRequest) -> Result>(); for user in users { - let user_dashboard_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "dashboards" - )); + let user_dashboard_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards")); let dashboards_path = RelativePathBuf::from(&user_dashboard_path); let dashboard_bytes = self .get_objects( @@ -716,10 +714,8 @@ impl ObjectStorage for BlobStore { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_filters_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "filters" - )); + let user_filters_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",)); let resp = self .client .list_with_delimiter(Some(&user_filters_path)) @@ -747,6 +743,43 @@ impl ObjectStorage for BlobStore { Ok(filters) } + ///fetch all correlations uploaded in object store + /// return the correlation file path and all correlation json bytes for each file path + async fn get_all_correlations( + &self, + ) -> Result>, ObjectStorageError> { + let mut correlations: HashMap> = HashMap::new(); + let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR); + let resp = self + .client + .list_with_delimiter(Some(&users_root_path)) + .await?; + + let users = resp + .common_prefixes + .iter() + .flat_map(|path| path.parts()) + .filter(|name| name.as_ref() != USERS_ROOT_DIR) + .map(|name| name.as_ref().to_string()) + .collect::>(); + for user in users { + let user_correlation_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations")); + let correlations_path = RelativePathBuf::from(&user_correlation_path); + let correlation_bytes = self + .get_objects( + Some(&correlations_path), + Box::new(|file_name| file_name.ends_with(".json")), + ) + .await?; + + correlations + .entry(correlations_path) + .or_default() + .extend(correlation_bytes); + } + Ok(correlations) + } fn get_bucket_name(&self) -> String { self.container.clone() } diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 64ae1f9a2..5d8041fc2 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -27,7 +27,7 @@ use async_trait::async_trait; use bytes::Bytes; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; use fs_extra::file::CopyOptions; -use futures::{stream::FuturesUnordered, TryStreamExt}; +use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; use relative_path::{RelativePath, RelativePathBuf}; use tokio::fs::{self, DirEntry}; use tokio_stream::wrappers::ReadDirStream; @@ -39,9 +39,8 @@ use crate::{ }; use super::{ - LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, - CORRELATIONS_ROOT_DIRECTORY, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, - STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, + SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; #[derive(Debug, Clone, clap::Args)] @@ -297,12 +296,7 @@ impl ObjectStorage for LocalFS { } async fn list_streams(&self) -> Result, ObjectStorageError> { - let ignore_dir = &[ - "lost+found", - PARSEABLE_ROOT_DIRECTORY, - USERS_ROOT_DIR, - CORRELATIONS_ROOT_DIRECTORY, - ]; + let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY, USERS_ROOT_DIR]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?; let entries = entries @@ -322,11 +316,7 @@ impl ObjectStorage for LocalFS { } async fn list_old_streams(&self) -> Result, ObjectStorageError> { - let ignore_dir = &[ - "lost+found", - PARSEABLE_ROOT_DIRECTORY, - CORRELATIONS_ROOT_DIRECTORY, - ]; + let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?; let entries = entries @@ -433,6 +423,37 @@ impl ObjectStorage for LocalFS { Ok(filters) } + ///fetch all correlations stored in disk + /// return the correlation file path and all correlation json bytes for each file path + async fn get_all_correlations( + &self, + ) -> Result>, ObjectStorageError> { + let mut correlations: HashMap> = HashMap::new(); + let users_root_path = self.root.join(USERS_ROOT_DIR); + let mut directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?); + while let Some(user) = directories.next().await { + let user = user?; + if !user.path().is_dir() { + continue; + } + let correlations_path = users_root_path.join(user.path()).join("correlations"); + let mut files = ReadDirStream::new(fs::read_dir(&correlations_path).await?); + while let Some(correlation) = files.next().await { + let correlation_absolute_path = correlation?.path(); + let file = fs::read(correlation_absolute_path.clone()).await?; + let correlation_relative_path = correlation_absolute_path + .strip_prefix(self.root.as_path()) + .unwrap(); + + correlations + .entry(RelativePathBuf::from_path(correlation_relative_path).unwrap()) + .or_default() + .push(file.into()); + } + } + Ok(correlations) + } + async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError> { let path = self.root.join(stream_name); let directories = ReadDirStream::new(fs::read_dir(&path).await?); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 21c651aff..454a17ebe 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -56,7 +56,6 @@ pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable"; pub const SCHEMA_FILE_NAME: &str = ".schema"; pub const ALERT_FILE_NAME: &str = ".alert.json"; pub const MANIFEST_FILE: &str = "manifest.json"; -pub const CORRELATIONS_ROOT_DIRECTORY: &str = ".correlations"; /// local sync interval to move data.records to /tmp dir of that stream. /// 60 sec is a reasonable value. diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 35aaf6434..05c046179 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -21,15 +21,13 @@ use super::{ ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use super::{ - Owner, ALERT_FILE_NAME, CORRELATIONS_ROOT_DIRECTORY, MANIFEST_FILE, - PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, - STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + Owner, ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, + SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; -use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::event::format::LogSource; use crate::handlers::http::modal::ingest_server::INGESTOR_META; -use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; +use crate::handlers::http::users::{CORRELATION_DIR, DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metadata::SchemaVersion; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; use crate::option::Mode; @@ -102,6 +100,9 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { async fn get_all_dashboards( &self, ) -> Result>, ObjectStorageError>; + async fn get_all_correlations( + &self, + ) -> Result>, ObjectStorageError>; async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; async fn list_manifest_files( &self, @@ -624,30 +625,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { // pick a better name fn get_bucket_name(&self) -> String; - - async fn put_correlation( - &self, - correlation: &CorrelationConfig, - ) -> Result<(), ObjectStorageError> { - let path = RelativePathBuf::from_iter([ - CORRELATIONS_ROOT_DIRECTORY, - &format!("{}.json", correlation.id), - ]); - self.put_object(&path, to_bytes(correlation)).await?; - Ok(()) - } - - async fn get_correlations(&self) -> Result, CorrelationError> { - let correlation_path = RelativePathBuf::from_iter([CORRELATIONS_ROOT_DIRECTORY]); - let correlation_bytes = self - .get_objects( - Some(&correlation_path), - Box::new(|file_name| file_name.ends_with(".json")), - ) - .await?; - - Ok(correlation_bytes) - } } pub async fn commit_schema_to_storage( @@ -721,6 +698,15 @@ pub fn filter_path(user_id: &str, stream_name: &str, filter_file_name: &str) -> ]) } +pub fn correlation_path(user_id: &str, correlation_file_name: &str) -> RelativePathBuf { + RelativePathBuf::from_iter([ + USERS_ROOT_DIR, + user_id, + CORRELATION_DIR, + correlation_file_name, + ]) +} + /// path will be ".parseable/.parsable.json" #[inline(always)] pub fn parseable_json_path() -> RelativePathBuf { diff --git a/src/storage/s3.rs b/src/storage/s3.rs index ccf1be172..e115dd0bf 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -811,10 +811,8 @@ impl ObjectStorage for S3 { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_dashboard_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "dashboards" - )); + let user_dashboard_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards")); let dashboards_path = RelativePathBuf::from(&user_dashboard_path); let dashboard_bytes = self .get_objects( @@ -849,10 +847,8 @@ impl ObjectStorage for S3 { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_filters_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "filters" - )); + let user_filters_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",)); let resp = self .client .list_with_delimiter(Some(&user_filters_path)) @@ -880,6 +876,44 @@ impl ObjectStorage for S3 { Ok(filters) } + ///fetch all correlations stored in object store + /// return the correlation file path and all correlation json bytes for each file path + async fn get_all_correlations( + &self, + ) -> Result>, ObjectStorageError> { + let mut correlations: HashMap> = HashMap::new(); + let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR); + let resp = self + .client + .list_with_delimiter(Some(&users_root_path)) + .await?; + + let users = resp + .common_prefixes + .iter() + .flat_map(|path| path.parts()) + .filter(|name| name.as_ref() != USERS_ROOT_DIR) + .map(|name| name.as_ref().to_string()) + .collect::>(); + for user in users { + let user_correlation_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations",)); + let correlations_path = RelativePathBuf::from(&user_correlation_path); + let correlation_bytes = self + .get_objects( + Some(&correlations_path), + Box::new(|file_name| file_name.ends_with(".json")), + ) + .await?; + + correlations + .entry(correlations_path) + .or_default() + .extend(correlation_bytes); + } + Ok(correlations) + } + fn get_bucket_name(&self) -> String { self.bucket.clone() }