From e60a884ce082891fda9d6950bc81515db7e139a0 Mon Sep 17 00:00:00 2001 From: anant Date: Wed, 25 Dec 2024 00:09:16 +0530 Subject: [PATCH 1/5] Feature: Backend for Correlation --- src/cli.rs | 4 +- src/correlation/correlation_utils.rs | 77 ++++++++++ src/correlation/http_handlers.rs | 149 ++++++++++++++++++ src/correlation/mod.rs | 193 ++++++++++++++++++++++++ src/handlers/http/modal/query_server.rs | 5 + src/handlers/http/modal/server.rs | 41 +++++ src/handlers/http/query.rs | 2 +- src/lib.rs | 1 + src/rbac/role.rs | 20 +++ src/storage/mod.rs | 1 + src/storage/object_storage.rs | 31 +++- 11 files changed, 520 insertions(+), 4 deletions(-) create mode 100644 src/correlation/correlation_utils.rs create mode 100644 src/correlation/http_handlers.rs create mode 100644 src/correlation/mod.rs diff --git a/src/cli.rs b/src/cli.rs index 38648c7ad..1205fdfb2 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -529,7 +529,9 @@ impl FromArgMatches for Cli { self.kafka_host = m.get_one::(Self::KAFKA_HOST).cloned(); self.kafka_group = m.get_one::(Self::KAFKA_GROUP).cloned(); self.kafka_client_id = m.get_one::(Self::KAFKA_CLIENT_ID).cloned(); - self.kafka_security_protocol = m.get_one::(Self::KAFKA_SECURITY_PROTOCOL).cloned(); + self.kafka_security_protocol = m + .get_one::(Self::KAFKA_SECURITY_PROTOCOL) + .cloned(); self.kafka_partitions = m.get_one::(Self::KAFKA_PARTITIONS).cloned(); self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); diff --git a/src/correlation/correlation_utils.rs b/src/correlation/correlation_utils.rs new file mode 100644 index 000000000..ec9d25c5c --- /dev/null +++ b/src/correlation/correlation_utils.rs @@ -0,0 +1,77 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use datafusion::common::tree_node::TreeNode; + +use crate::{ + query::{TableScanVisitor, QUERY_SESSION}, + rbac::{ + map::SessionKey, + role::{Action, Permission}, + Users, + }, +}; + +use super::CorrelationError; + +async fn get_tables_from_query(query: &str) -> Result { + let session_state = QUERY_SESSION.state(); + let raw_logical_plan = session_state + .create_logical_plan(query) + .await + .map_err(|err| CorrelationError::AnyhowError(err.into()))?; + + let mut visitor = TableScanVisitor::default(); + let _ = raw_logical_plan.visit(&mut visitor); + Ok(visitor) +} + +pub async fn user_auth_for_query( + session_key: &SessionKey, + query: &str, +) -> Result<(), CorrelationError> { + let tables = get_tables_from_query(query).await?; + let permissions = Users.get_permissions(session_key); + + for table_name in tables.into_inner().iter() { + let mut authorized = false; + + // in permission check if user can run query on the stream. + // also while iterating add any filter tags for this stream + for permission in permissions.iter() { + match permission { + Permission::Stream(Action::All, _) => { + authorized = true; + break; + } + Permission::StreamWithTag(Action::Query, ref stream, _) + if stream == table_name || stream == "*" => + { + authorized = true; + } + _ => (), + } + } + + if !authorized { + return Err(CorrelationError::Unauthorized); + } + } + + Ok(()) +} diff --git a/src/correlation/http_handlers.rs b/src/correlation/http_handlers.rs new file mode 100644 index 000000000..a3ac05359 --- /dev/null +++ b/src/correlation/http_handlers.rs @@ -0,0 +1,149 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::{web, HttpRequest, Responder}; +use bytes::Bytes; +use relative_path::RelativePathBuf; + +use crate::{ + option::CONFIG, + storage::{CORRELATION_DIRECTORY, PARSEABLE_ROOT_DIRECTORY}, + utils::{actix::extract_session_key_from_req, uid::Uid}, +}; + +use super::{ + correlation_utils::user_auth_for_query, CorrelationConfig, CorrelationError, + CorrelationRequest, CORRELATIONS, +}; + +pub async fn list(req: HttpRequest) -> Result { + let session_key = extract_session_key_from_req(&req) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + + let correlations = CORRELATIONS + .list_correlations_for_user(&session_key) + .await?; + + Ok(web::Json(correlations)) +} + +pub async fn get(req: HttpRequest) -> Result { + let session_key = extract_session_key_from_req(&req) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + + let correlation_id = req + .match_info() + .get("correlation_id") + .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; + + let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?; + + if user_auth_for_query(&session_key, &correlation.query) + .await + .is_ok() + { + Ok(web::Json(correlation)) + } else { + Err(CorrelationError::Unauthorized) + } +} + +pub async fn post(req: HttpRequest, body: Bytes) -> Result { + let session_key = extract_session_key_from_req(&req) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + + let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?; + let correlation: CorrelationConfig = correlation_request.into(); + + // validate user's query auth + user_auth_for_query(&session_key, &correlation.query).await?; + + // Save to disk + let store = CONFIG.storage().get_object_store(); + store.put_correlation(&correlation).await?; + + // Save to memory + CORRELATIONS.update(&correlation).await?; + + Ok(format!( + "Saved correlation with ID- {}", + correlation.id.to_string() + )) +} + +pub async fn modify(req: HttpRequest, body: Bytes) -> Result { + let session_key = extract_session_key_from_req(&req) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + + let correlation_id = req + .match_info() + .get("correlation_id") + .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; + + let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?; + + // validate user's query auth + user_auth_for_query(&session_key, &correlation_request.query).await?; + + let correlation: CorrelationConfig = CorrelationConfig { + version: correlation_request.version, + id: Uid::from_string(correlation_id) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?, + query: correlation_request.query, + }; + + // Save to disk + let store = CONFIG.storage().get_object_store(); + store.put_correlation(&correlation).await?; + + // Save to memory + CORRELATIONS.update(&correlation).await?; + + Ok(format!( + "Modified correlation with ID- {}", + correlation.id.to_string() + )) +} + +pub async fn delete(req: HttpRequest) -> Result { + let session_key = extract_session_key_from_req(&req) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + + let correlation_id = req + .match_info() + .get("correlation_id") + .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; + + let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?; + + // validate user's query auth + user_auth_for_query(&session_key, &correlation.query).await?; + + // Delete from disk + let store = CONFIG.storage().get_object_store(); + let path = RelativePathBuf::from_iter([ + PARSEABLE_ROOT_DIRECTORY, + CORRELATION_DIRECTORY, + &format!("{}", correlation.id), + ]); + store.delete_object(&path).await?; + + // Delete from memory + CORRELATIONS.delete(correlation_id).await?; + Ok(format!("Deleted correlation with ID- {correlation_id}")) +} diff --git a/src/correlation/mod.rs b/src/correlation/mod.rs new file mode 100644 index 000000000..35f990ce4 --- /dev/null +++ b/src/correlation/mod.rs @@ -0,0 +1,193 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::http::header::ContentType; +use correlation_utils::user_auth_for_query; +use http::StatusCode; +use itertools::Itertools; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; +use serde_json::Error as SerdeError; +use tokio::sync::RwLock; +use tracing::{trace, warn}; + +use crate::{ + handlers::http::rbac::RBACError, option::CONFIG, rbac::map::SessionKey, + storage::ObjectStorageError, utils::uid::Uid, +}; + +pub mod correlation_utils; +pub mod http_handlers; + +pub static CORRELATIONS: Lazy = Lazy::new(Correlation::default); + +#[derive(Debug, Default)] +pub struct Correlation(RwLock>); + +impl Correlation { + pub async fn load(&self) -> Result<(), CorrelationError> { + // lead correlations from storage + let store = CONFIG.storage().get_object_store(); + let all_correlations = store.get_correlations().await.unwrap_or_default(); + + let mut correlations = vec![]; + for corr in all_correlations { + if corr.is_empty() { + continue; + } + + let correlation: CorrelationConfig = serde_json::from_slice(&corr)?; + + correlations.push(correlation); + } + + let mut s = self.0.write().await; + s.append(&mut correlations.clone()); + Ok(()) + } + + pub async fn list_correlations_for_user( + &self, + session_key: &SessionKey, + ) -> Result, CorrelationError> { + let correlations = self.0.read().await.iter().cloned().collect_vec(); + + let mut user_correlations = vec![]; + for c in correlations { + if user_auth_for_query(session_key, &c.query).await.is_ok() { + user_correlations.push(c); + } + } + Ok(user_correlations) + } + + pub async fn get_correlation_by_id( + &self, + correlation_id: &str, + ) -> Result { + let read = self.0.read().await; + let correlation = read + .iter() + .find(|c| c.id.to_string() == correlation_id) + .cloned(); + + if let Some(c) = correlation { + Ok(c) + } else { + Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( + "Unable to find correlation with ID- {correlation_id}" + )))) + } + } + + pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { + // save to memory + let mut s = self.0.write().await; + s.retain(|c| c.id != correlation.id); + s.push(correlation.clone()); + Ok(()) + } + + pub async fn delete(&self, correlation_id: &str) -> Result<(), CorrelationError> { + // now delete from memory + let read_access = self.0.read().await; + + let index = read_access + .iter() + .enumerate() + .find(|(_, c)| c.id.to_string() == correlation_id) + .to_owned(); + + if let Some((index, _)) = index { + // drop the read access in order to get exclusive write access + drop(read_access); + self.0.write().await.remove(index); + trace!("removed correlation from memory"); + } else { + warn!("Correlation ID- {correlation_id} not found in memory!"); + } + Ok(()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum CorrelationVersion { + V1, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CorrelationConfig { + pub version: CorrelationVersion, + pub id: Uid, + pub query: String, +} + +impl CorrelationConfig {} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CorrelationRequest { + pub version: CorrelationVersion, + pub query: String, +} + +impl From for CorrelationConfig { + fn from(val: CorrelationRequest) -> Self { + Self { + version: val.version, + id: crate::utils::uid::gen(), + query: val.query, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum CorrelationError { + #[error("Failed to connect to storage: {0}")] + ObjectStorage(#[from] ObjectStorageError), + #[error("Serde Error: {0}")] + Serde(#[from] SerdeError), + #[error("Cannot perform this operation: {0}")] + Metadata(&'static str), + #[error("User does not exist")] + UserDoesNotExist(#[from] RBACError), + #[error("Error: {0}")] + AnyhowError(#[from] anyhow::Error), + #[error("Unauthorized")] + Unauthorized, +} + +impl actix_web::ResponseError for CorrelationError { + fn status_code(&self) -> http::StatusCode { + match self { + Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Serde(_) => StatusCode::BAD_REQUEST, + Self::Metadata(_) => StatusCode::BAD_REQUEST, + Self::UserDoesNotExist(_) => StatusCode::NOT_FOUND, + Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Unauthorized => StatusCode::BAD_REQUEST, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 792bb6571..32bf8e2d5 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -16,6 +16,7 @@ * */ +use crate::correlation::CORRELATIONS; use crate::handlers::airplane; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::logstream::create_internal_stream_if_not_exists; @@ -50,6 +51,7 @@ impl ParseableServer for QueryServer { config .service( web::scope(&base_path()) + .service(Server::get_correlation_webscope()) .service(Server::get_query_factory()) .service(Server::get_trino_factory()) .service(Server::get_cache_webscope()) @@ -94,6 +96,9 @@ impl ParseableServer for QueryServer { //create internal stream at server start create_internal_stream_if_not_exists().await?; + if let Err(e) = CORRELATIONS.load().await { + error!("{e}"); + } FILTERS.load().await?; DASHBOARDS.load().await?; // track all parquet files already in the data directory diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 6c0ec9fd8..7646ae95e 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -17,6 +17,8 @@ */ use crate::analytics; +use crate::correlation; +use crate::correlation::CORRELATIONS; use crate::handlers; use crate::handlers::http::about; use crate::handlers::http::base_path; @@ -67,6 +69,7 @@ impl ParseableServer for Server { config .service( web::scope(&base_path()) + .service(Self::get_correlation_webscope()) .service(Self::get_query_factory()) .service(Self::get_trino_factory()) .service(Self::get_cache_webscope()) @@ -102,6 +105,9 @@ impl ParseableServer for Server { migration::run_migration(&CONFIG).await?; + if let Err(e) = CORRELATIONS.load().await { + error!("{e}"); + } FILTERS.load().await?; DASHBOARDS.load().await?; @@ -172,6 +178,41 @@ impl Server { ) } + pub fn get_correlation_webscope() -> Scope { + web::scope("/correlation") + .service( + web::resource("") + .route( + web::get() + .to(correlation::http_handlers::list) + .authorize(Action::GetCorrelation), + ) + .route( + web::post() + .to(correlation::http_handlers::post) + .authorize(Action::CreateCorrelation), + ), + ) + .service( + web::resource("/correlation/{correlation_id}") + .route( + web::get() + .to(correlation::http_handlers::get) + .authorize(Action::GetCorrelation), + ) + .route( + web::put() + .to(correlation::http_handlers::modify) + .authorize(Action::PutCorrelation), + ) + .route( + web::delete() + .to(correlation::http_handlers::delete) + .authorize(Action::DeleteCorrelation), + ), + ) + } + // get the dashboards web scope pub fn get_dashboards_webscope() -> Scope { web::scope("/dashboards") diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 27414b9d0..895032d20 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -52,7 +52,7 @@ use crate::utils::time::{TimeParseError, TimeRange}; use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage; /// Query Request through http endpoint. -#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct Query { pub query: String, diff --git a/src/lib.rs b/src/lib.rs index 7a85f54e7..5c8e09274 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,7 @@ pub mod analytics; pub mod banner; mod catalog; mod cli; +pub mod correlation; mod event; pub mod handlers; pub mod hottier; diff --git a/src/rbac/role.rs b/src/rbac/role.rs index f94c8f171..f383345ad 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -62,6 +62,10 @@ pub enum Action { DeleteFilter, Login, Metrics, + GetCorrelation, + CreateCorrelation, + DeleteCorrelation, + PutCorrelation, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -120,6 +124,10 @@ impl RoleBuilder { | Action::ListStream | Action::ListCluster | Action::ListClusterMetrics + | Action::CreateCorrelation + | Action::DeleteCorrelation + | Action::GetCorrelation + | Action::PutCorrelation | Action::Deleteingestor | Action::PutHotTierEnabled | Action::GetHotTierEnabled @@ -208,6 +216,10 @@ pub mod model { Action::DeleteStream, Action::ListStream, Action::GetStreamInfo, + Action::CreateCorrelation, + Action::DeleteCorrelation, + Action::GetCorrelation, + Action::PutCorrelation, Action::DetectSchema, Action::GetSchema, Action::GetStats, @@ -250,6 +262,10 @@ pub mod model { Action::PutHotTierEnabled, Action::GetHotTierEnabled, Action::DeleteHotTierEnabled, + Action::CreateCorrelation, + Action::DeleteCorrelation, + Action::GetCorrelation, + Action::PutCorrelation, Action::ListDashboard, Action::GetDashboard, Action::CreateDashboard, @@ -282,6 +298,10 @@ pub mod model { Action::GetFilter, Action::CreateFilter, Action::DeleteFilter, + Action::CreateCorrelation, + Action::DeleteCorrelation, + Action::GetCorrelation, + Action::PutCorrelation, Action::ListDashboard, Action::GetDashboard, Action::CreateDashboard, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a018c2b1c..8275b238a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -51,6 +51,7 @@ pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable"; pub const SCHEMA_FILE_NAME: &str = ".schema"; pub const ALERT_FILE_NAME: &str = ".alert.json"; pub const MANIFEST_FILE: &str = "manifest.json"; +pub const CORRELATION_DIRECTORY: &str = ".correlations"; /// local sync interval to move data.records to /tmp dir of that stream. /// 60 sec is a reasonable value. diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 34a1bb631..4ec34834b 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -21,10 +21,11 @@ use super::{ ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use super::{ - ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, - SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + ALERT_FILE_NAME, CORRELATION_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, + PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; +use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; @@ -631,6 +632,32 @@ pub trait ObjectStorage: Send + Sync + 'static { // pick a better name fn get_bucket_name(&self) -> String; + + async fn put_correlation( + &self, + correlation: &CorrelationConfig, + ) -> Result<(), ObjectStorageError> { + let path = RelativePathBuf::from_iter([ + PARSEABLE_ROOT_DIRECTORY, + CORRELATION_DIRECTORY, + &format!("{}", correlation.id), + ]); + self.put_object(&path, to_bytes(correlation)).await?; + Ok(()) + } + + async fn get_correlations(&self) -> Result, CorrelationError> { + let correlation_path = + RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, CORRELATION_DIRECTORY]); + let correlation_bytes = self + .get_objects( + Some(&correlation_path), + Box::new(|file_name| file_name.ends_with(".json")), + ) + .await?; + + Ok(correlation_bytes) + } } pub async fn commit_schema_to_storage( From 54d76b9bfe086ddfcf7e8d93d312b0ae5ac17cf7 Mon Sep 17 00:00:00 2001 From: anant Date: Thu, 26 Dec 2024 13:50:44 +0530 Subject: [PATCH 2/5] refactor: modified struct --- src/correlation/correlation_utils.rs | 33 ++----- src/correlation/http_handlers.rs | 39 ++++---- src/correlation/mod.rs | 127 ++++++++++++++++++++++++--- src/handlers/http/modal/server.rs | 2 +- src/storage/object_storage.rs | 2 +- 5 files changed, 140 insertions(+), 63 deletions(-) diff --git a/src/correlation/correlation_utils.rs b/src/correlation/correlation_utils.rs index ec9d25c5c..0975836bf 100644 --- a/src/correlation/correlation_utils.rs +++ b/src/correlation/correlation_utils.rs @@ -16,39 +16,24 @@ * */ -use datafusion::common::tree_node::TreeNode; +use itertools::Itertools; -use crate::{ - query::{TableScanVisitor, QUERY_SESSION}, - rbac::{ - map::SessionKey, - role::{Action, Permission}, - Users, - }, +use crate::rbac::{ + map::SessionKey, + role::{Action, Permission}, + Users, }; -use super::CorrelationError; - -async fn get_tables_from_query(query: &str) -> Result { - let session_state = QUERY_SESSION.state(); - let raw_logical_plan = session_state - .create_logical_plan(query) - .await - .map_err(|err| CorrelationError::AnyhowError(err.into()))?; - - let mut visitor = TableScanVisitor::default(); - let _ = raw_logical_plan.visit(&mut visitor); - Ok(visitor) -} +use super::{CorrelationError, TableConfig}; pub async fn user_auth_for_query( session_key: &SessionKey, - query: &str, + table_configs: &[TableConfig], ) -> Result<(), CorrelationError> { - let tables = get_tables_from_query(query).await?; + let tables = table_configs.iter().map(|t| &t.table_name).collect_vec(); let permissions = Users.get_permissions(session_key); - for table_name in tables.into_inner().iter() { + for table_name in tables { let mut authorized = false; // in permission check if user can run query on the stream. diff --git a/src/correlation/http_handlers.rs b/src/correlation/http_handlers.rs index a3ac05359..d4d3986a7 100644 --- a/src/correlation/http_handlers.rs +++ b/src/correlation/http_handlers.rs @@ -23,7 +23,7 @@ use relative_path::RelativePathBuf; use crate::{ option::CONFIG, storage::{CORRELATION_DIRECTORY, PARSEABLE_ROOT_DIRECTORY}, - utils::{actix::extract_session_key_from_req, uid::Uid}, + utils::actix::extract_session_key_from_req, }; use super::{ @@ -53,7 +53,7 @@ pub async fn get(req: HttpRequest) -> Result { let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?; - if user_auth_for_query(&session_key, &correlation.query) + if user_auth_for_query(&session_key, &correlation.table_configs) .await .is_ok() { @@ -68,10 +68,10 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result Result Result { @@ -95,17 +92,14 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result Result Result { @@ -132,14 +123,14 @@ pub async fn delete(req: HttpRequest) -> Result Result { let read = self.0.read().await; - let correlation = read - .iter() - .find(|c| c.id.to_string() == correlation_id) - .cloned(); + let correlation = read.iter().find(|c| c.id == correlation_id).cloned(); if let Some(c) = correlation { Ok(c) @@ -110,7 +114,7 @@ impl Correlation { let index = read_access .iter() .enumerate() - .find(|(_, c)| c.id.to_string() == correlation_id) + .find(|(_, c)| c.id == correlation_id) .to_owned(); if let Some((index, _)) = index { @@ -126,6 +130,7 @@ impl Correlation { } #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub enum CorrelationVersion { V1, } @@ -134,8 +139,12 @@ pub enum CorrelationVersion { #[serde(rename_all = "camelCase")] pub struct CorrelationConfig { pub version: CorrelationVersion, - pub id: Uid, - pub query: String, + pub id: String, + pub table_configs: Vec, + pub join_config: JoinConfig, + pub filter: Option, + pub start_time: Option, + pub end_time: Option, } impl CorrelationConfig {} @@ -144,16 +153,89 @@ impl CorrelationConfig {} #[serde(rename_all = "camelCase")] pub struct CorrelationRequest { pub version: CorrelationVersion, - pub query: String, + pub table_configs: Vec, + pub join_config: JoinConfig, + pub filter: Option, + pub start_time: Option, + pub end_time: Option, } impl From for CorrelationConfig { fn from(val: CorrelationRequest) -> Self { Self { version: val.version, - id: crate::utils::uid::gen(), - query: val.query, + id: get_hash(Utc::now().timestamp_micros().to_string().as_str()), + table_configs: val.table_configs, + join_config: val.join_config, + filter: val.filter, + start_time: val.start_time, + end_time: val.end_time, + } + } +} + +impl CorrelationRequest { + pub fn generate_correlation_config(self, id: String) -> CorrelationConfig { + CorrelationConfig { + version: self.version, + id, + table_configs: self.table_configs, + join_config: self.join_config, + filter: self.filter, + start_time: self.start_time, + end_time: self.end_time, + } + } + + /// This function will validate the TableConfigs, JoinConfig, and user auth + pub async fn validate(&self, session_key: &SessionKey) -> Result<(), CorrelationError> { + let ctx = &QUERY_SESSION; + + let h1: HashSet<&String> = self.table_configs.iter().map(|t| &t.table_name).collect(); + let h2 = HashSet::from([&self.join_config.table_one, &self.join_config.table_two]); + + // check if table config tables are the same + if h1.len() != 2 { + return Err(CorrelationError::Metadata( + "Must provide config for two unique tables", + )); + } + + // check that the tables mentioned in join config are + // the same as those in table config + if h1 != h2 { + return Err(CorrelationError::Metadata( + "Must provide same tables for join config and table config", + )); + } + + // check if user has access to table + user_auth_for_query(session_key, &self.table_configs).await?; + + // to validate table config, we need to check whether the mentioned fields + // are present in the table or not + for table_config in self.table_configs.iter() { + // table config check + let df = ctx.table(&table_config.table_name).await?; + + let mut selected_fields = table_config + .selected_fields + .iter() + .map(|c| c.as_str()) + .collect_vec(); + let join_field = if table_config.table_name == self.join_config.table_one { + &self.join_config.field_one + } else { + &self.join_config.field_two + }; + + selected_fields.push(join_field.as_str()); + + // if this errors out then the table config is incorrect or join config is incorrect + df.select_columns(selected_fields.as_slice())?; } + + Ok(()) } } @@ -171,6 +253,8 @@ pub enum CorrelationError { AnyhowError(#[from] anyhow::Error), #[error("Unauthorized")] Unauthorized, + #[error("DataFusion Error: {0}")] + DataFusion(#[from] DataFusionError), } impl actix_web::ResponseError for CorrelationError { @@ -182,6 +266,7 @@ impl actix_web::ResponseError for CorrelationError { Self::UserDoesNotExist(_) => StatusCode::NOT_FOUND, Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::Unauthorized => StatusCode::BAD_REQUEST, + Self::DataFusion(_) => StatusCode::INTERNAL_SERVER_ERROR, } } @@ -191,3 +276,19 @@ impl actix_web::ResponseError for CorrelationError { .body(self.to_string()) } } + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TableConfig { + pub selected_fields: Vec, + pub table_name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct JoinConfig { + pub table_one: String, + pub field_one: String, + pub table_two: String, + pub field_two: String, +} diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 7646ae95e..d3925ba9d 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -194,7 +194,7 @@ impl Server { ), ) .service( - web::resource("/correlation/{correlation_id}") + web::resource("/{correlation_id}") .route( web::get() .to(correlation::http_handlers::get) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 4ec34834b..7c2b0084f 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -640,7 +640,7 @@ pub trait ObjectStorage: Send + Sync + 'static { let path = RelativePathBuf::from_iter([ PARSEABLE_ROOT_DIRECTORY, CORRELATION_DIRECTORY, - &format!("{}", correlation.id), + &format!("{}.json", correlation.id), ]); self.put_object(&path, to_bytes(correlation)).await?; Ok(()) From 4154bbc41cc265ef82bed76ea39cc3642ae89e6b Mon Sep 17 00:00:00 2001 From: anant Date: Fri, 27 Dec 2024 08:10:16 +0530 Subject: [PATCH 3/5] refactor: Restructured the correlation module --- src/correlation/mod.rs | 48 +++++++++++++------ .../http/correlation.rs} | 2 +- src/handlers/http/mod.rs | 1 + src/handlers/http/modal/server.rs | 11 ++--- 4 files changed, 41 insertions(+), 21 deletions(-) rename src/{correlation/http_handlers.rs => handlers/http/correlation.rs} (99%) diff --git a/src/correlation/mod.rs b/src/correlation/mod.rs index d2dbdefdd..a64bfbfb4 100644 --- a/src/correlation/mod.rs +++ b/src/correlation/mod.rs @@ -28,7 +28,7 @@ use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use serde_json::Error as SerdeError; use tokio::sync::RwLock; -use tracing::{trace, warn}; +use tracing::{error, trace, warn}; use crate::{ handlers::http::rbac::RBACError, option::CONFIG, query::QUERY_SESSION, rbac::map::SessionKey, @@ -36,7 +36,6 @@ use crate::{ }; pub mod correlation_utils; -pub mod http_handlers; pub static CORRELATIONS: Lazy = Lazy::new(Correlation::default); @@ -55,7 +54,13 @@ impl Correlation { continue; } - let correlation: CorrelationConfig = serde_json::from_slice(&corr)?; + let correlation: CorrelationConfig = match serde_json::from_slice(&corr) { + Ok(c) => c, + Err(e) => { + error!("Unable to load correlation- {e}"); + continue; + }, + }; correlations.push(correlation); } @@ -192,7 +197,12 @@ impl CorrelationRequest { let ctx = &QUERY_SESSION; let h1: HashSet<&String> = self.table_configs.iter().map(|t| &t.table_name).collect(); - let h2 = HashSet::from([&self.join_config.table_one, &self.join_config.table_two]); + let h2: HashSet<&String> = self + .join_config + .join_conditions + .iter() + .map(|j| &j.table_name) + .collect(); // check if table config tables are the same if h1.len() != 2 { @@ -223,13 +233,19 @@ impl CorrelationRequest { .iter() .map(|c| c.as_str()) .collect_vec(); - let join_field = if table_config.table_name == self.join_config.table_one { - &self.join_config.field_one - } else { - &self.join_config.field_two - }; - selected_fields.push(join_field.as_str()); + // unwrap because we have determined that the tables in table config are the same as those in join config + let condition = self + .join_config + .join_conditions + .iter() + .find(|j| j.table_name == table_config.table_name) + .unwrap(); + let join_field = condition.field.as_str(); + + if !selected_fields.contains(&join_field) { + selected_fields.push(join_field); + } // if this errors out then the table config is incorrect or join config is incorrect df.select_columns(selected_fields.as_slice())?; @@ -284,11 +300,15 @@ pub struct TableConfig { pub table_name: String, } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct JoinCondition { + pub table_name: String, + pub field: String, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct JoinConfig { - pub table_one: String, - pub field_one: String, - pub table_two: String, - pub field_two: String, + pub join_conditions: Vec, } diff --git a/src/correlation/http_handlers.rs b/src/handlers/http/correlation.rs similarity index 99% rename from src/correlation/http_handlers.rs rename to src/handlers/http/correlation.rs index d4d3986a7..4778286b0 100644 --- a/src/correlation/http_handlers.rs +++ b/src/handlers/http/correlation.rs @@ -26,7 +26,7 @@ use crate::{ utils::actix::extract_session_key_from_req, }; -use super::{ +use crate::correlation::{ correlation_utils::user_auth_for_query, CorrelationConfig, CorrelationError, CorrelationRequest, CORRELATIONS, }; diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index f627b613a..8fabc8f1f 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -29,6 +29,7 @@ use self::{cluster::get_ingestor_info, query::Query}; pub mod about; pub mod cluster; +pub mod correlation; pub mod health_check; pub mod ingest; mod kinesis; diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index d3925ba9d..d50edb96e 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -17,7 +17,6 @@ */ use crate::analytics; -use crate::correlation; use crate::correlation::CORRELATIONS; use crate::handlers; use crate::handlers::http::about; @@ -184,12 +183,12 @@ impl Server { web::resource("") .route( web::get() - .to(correlation::http_handlers::list) + .to(http::correlation::list) .authorize(Action::GetCorrelation), ) .route( web::post() - .to(correlation::http_handlers::post) + .to(http::correlation::post) .authorize(Action::CreateCorrelation), ), ) @@ -197,17 +196,17 @@ impl Server { web::resource("/{correlation_id}") .route( web::get() - .to(correlation::http_handlers::get) + .to(http::correlation::get) .authorize(Action::GetCorrelation), ) .route( web::put() - .to(correlation::http_handlers::modify) + .to(http::correlation::modify) .authorize(Action::PutCorrelation), ) .route( web::delete() - .to(correlation::http_handlers::delete) + .to(http::correlation::delete) .authorize(Action::DeleteCorrelation), ), ) From 23f1b847dbf2c700942ea39812b19c00722a7ee0 Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 30 Dec 2024 15:39:28 +0530 Subject: [PATCH 4/5] Updated base directory path --- src/correlation/mod.rs | 2 +- src/handlers/http/correlation.rs | 10 ++-------- src/storage/object_storage.rs | 4 +--- 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/src/correlation/mod.rs b/src/correlation/mod.rs index a64bfbfb4..a5022407a 100644 --- a/src/correlation/mod.rs +++ b/src/correlation/mod.rs @@ -59,7 +59,7 @@ impl Correlation { Err(e) => { error!("Unable to load correlation- {e}"); continue; - }, + } }; correlations.push(correlation); diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs index 4778286b0..1fd1ffb3b 100644 --- a/src/handlers/http/correlation.rs +++ b/src/handlers/http/correlation.rs @@ -21,9 +21,7 @@ use bytes::Bytes; use relative_path::RelativePathBuf; use crate::{ - option::CONFIG, - storage::{CORRELATION_DIRECTORY, PARSEABLE_ROOT_DIRECTORY}, - utils::actix::extract_session_key_from_req, + option::CONFIG, storage::CORRELATION_DIRECTORY, utils::actix::extract_session_key_from_req, }; use crate::correlation::{ @@ -127,11 +125,7 @@ pub async fn delete(req: HttpRequest) -> Result Result<(), ObjectStorageError> { let path = RelativePathBuf::from_iter([ - PARSEABLE_ROOT_DIRECTORY, CORRELATION_DIRECTORY, &format!("{}.json", correlation.id), ]); @@ -640,8 +639,7 @@ pub trait ObjectStorage: Send + Sync + 'static { } async fn get_correlations(&self) -> Result, CorrelationError> { - let correlation_path = - RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, CORRELATION_DIRECTORY]); + let correlation_path = RelativePathBuf::from_iter([CORRELATION_DIRECTORY]); let correlation_bytes = self .get_objects( Some(&correlation_path), From 8266092753eb4e798e1f36f5bec6f7b8d3ca4ad5 Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 31 Dec 2024 11:10:53 +0530 Subject: [PATCH 5/5] updated stream check for local file storage --- src/handlers/http/correlation.rs | 6 ++++-- src/storage/localfs.rs | 18 ++++++++++++++---- src/storage/mod.rs | 2 +- src/storage/object_storage.rs | 6 +++--- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs index 1fd1ffb3b..dc5799c01 100644 --- a/src/handlers/http/correlation.rs +++ b/src/handlers/http/correlation.rs @@ -21,7 +21,8 @@ use bytes::Bytes; use relative_path::RelativePathBuf; use crate::{ - option::CONFIG, storage::CORRELATION_DIRECTORY, utils::actix::extract_session_key_from_req, + option::CONFIG, storage::CORRELATIONS_ROOT_DIRECTORY, + utils::actix::extract_session_key_from_req, }; use crate::correlation::{ @@ -125,7 +126,8 @@ pub async fn delete(req: HttpRequest) -> Result Result, ObjectStorageError> { - let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY, USERS_ROOT_DIR]; + let ignore_dir = &[ + "lost+found", + PARSEABLE_ROOT_DIRECTORY, + USERS_ROOT_DIR, + CORRELATIONS_ROOT_DIRECTORY, + ]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?; let entries = entries @@ -315,7 +321,11 @@ impl ObjectStorage for LocalFS { } async fn list_old_streams(&self) -> Result, ObjectStorageError> { - let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY]; + let ignore_dir = &[ + "lost+found", + PARSEABLE_ROOT_DIRECTORY, + CORRELATIONS_ROOT_DIRECTORY, + ]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?; let entries = entries diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 8275b238a..9b4c6163b 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -51,7 +51,7 @@ pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable"; pub const SCHEMA_FILE_NAME: &str = ".schema"; pub const ALERT_FILE_NAME: &str = ".alert.json"; pub const MANIFEST_FILE: &str = "manifest.json"; -pub const CORRELATION_DIRECTORY: &str = ".correlations"; +pub const CORRELATIONS_ROOT_DIRECTORY: &str = ".correlations"; /// local sync interval to move data.records to /tmp dir of that stream. /// 60 sec is a reasonable value. diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 220b995c4..b5b1e74f5 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -21,7 +21,7 @@ use super::{ ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use super::{ - ALERT_FILE_NAME, CORRELATION_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, + ALERT_FILE_NAME, CORRELATIONS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; @@ -631,7 +631,7 @@ pub trait ObjectStorage: Send + Sync + 'static { correlation: &CorrelationConfig, ) -> Result<(), ObjectStorageError> { let path = RelativePathBuf::from_iter([ - CORRELATION_DIRECTORY, + CORRELATIONS_ROOT_DIRECTORY, &format!("{}.json", correlation.id), ]); self.put_object(&path, to_bytes(correlation)).await?; @@ -639,7 +639,7 @@ pub trait ObjectStorage: Send + Sync + 'static { } async fn get_correlations(&self) -> Result, CorrelationError> { - let correlation_path = RelativePathBuf::from_iter([CORRELATION_DIRECTORY]); + let correlation_path = RelativePathBuf::from_iter([CORRELATIONS_ROOT_DIRECTORY]); let correlation_bytes = self .get_objects( Some(&correlation_path),