Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions src/correlation/mod.rs → src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,36 +45,32 @@ pub static CORRELATIONS: Lazy<Correlation> = Lazy::new(Correlation::default);
pub struct Correlation(RwLock<Vec<CorrelationConfig>>);

impl Correlation {
pub async fn load(&self) -> Result<(), CorrelationError> {
// lead correlations from storage
//load correlations from storage
pub async fn load(&self) -> anyhow::Result<()> {
let store = CONFIG.storage().get_object_store();
let all_correlations = store.get_correlations().await.unwrap_or_default();

let mut correlations = vec![];
for corr in all_correlations {
if corr.is_empty() {
continue;
}

let correlation: CorrelationConfig = match serde_json::from_slice(&corr) {
Ok(c) => c,
Err(e) => {
error!("Unable to load correlation- {e}");
continue;
}
};

correlations.push(correlation);
}
let all_correlations = store.get_all_correlations().await.unwrap_or_default();

let correlations: Vec<CorrelationConfig> = all_correlations
.into_iter()
.flat_map(|(_, correlations_bytes)| correlations_bytes)
.filter_map(|correlation| {
serde_json::from_slice(&correlation)
.inspect_err(|e| {
error!("Unable to load correlation: {e}");
})
.ok()
})
.collect();

let mut s = self.0.write().await;
s.append(&mut correlations.clone());
s.extend(correlations);
Ok(())
}

pub async fn list_correlations_for_user(
&self,
session_key: &SessionKey,
user_id: &str,
) -> Result<Vec<CorrelationConfig>, CorrelationError> {
let correlations = self.0.read().await.iter().cloned().collect_vec();

Expand All @@ -87,27 +83,29 @@ impl Correlation {
.iter()
.map(|t| t.table_name.clone())
.collect_vec();
if user_auth_for_query(&permissions, tables).is_ok() {
if user_auth_for_query(&permissions, tables).is_ok() && c.user_id == user_id {
user_correlations.push(c);
}
}
Ok(user_correlations)
}

pub async fn get_correlation_by_id(
pub async fn get_correlation(
&self,
correlation_id: &str,
user_id: &str,
) -> Result<CorrelationConfig, CorrelationError> {
let read = self.0.read().await;
let correlation = read.iter().find(|c| c.id == correlation_id).cloned();
let correlation = read
.iter()
.find(|c| c.id == correlation_id && c.user_id == user_id)
.cloned();

if let Some(c) = correlation {
Ok(c)
} else {
Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
correlation.ok_or_else(|| {
CorrelationError::AnyhowError(anyhow::Error::msg(format!(
"Unable to find correlation with ID- {correlation_id}"
))))
}
)))
})
}

pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> {
Expand Down Expand Up @@ -152,6 +150,7 @@ pub struct CorrelationConfig {
pub version: CorrelationVersion,
pub title: String,
pub id: String,
pub user_id: String,
pub table_configs: Vec<TableConfig>,
pub join_config: JoinConfig,
pub filter: Option<FilterQuery>,
Expand All @@ -164,7 +163,6 @@ impl CorrelationConfig {}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CorrelationRequest {
pub version: CorrelationVersion,
pub title: String,
pub table_configs: Vec<TableConfig>,
pub join_config: JoinConfig,
Expand All @@ -176,9 +174,10 @@ pub struct CorrelationRequest {
impl From<CorrelationRequest> for CorrelationConfig {
fn from(val: CorrelationRequest) -> Self {
Self {
version: val.version,
version: CorrelationVersion::V1,
title: val.title,
id: get_hash(Utc::now().timestamp_micros().to_string().as_str()),
user_id: String::default(),
table_configs: val.table_configs,
join_config: val.join_config,
filter: val.filter,
Expand All @@ -189,11 +188,12 @@ impl From<CorrelationRequest> for CorrelationConfig {
}

impl CorrelationRequest {
pub fn generate_correlation_config(self, id: String) -> CorrelationConfig {
pub fn generate_correlation_config(self, id: String, user_id: String) -> CorrelationConfig {
CorrelationConfig {
version: self.version,
version: CorrelationVersion::V1,
title: self.title,
id,
user_id,
table_configs: self.table_configs,
join_config: self.join_config,
filter: self.filter,
Expand Down
76 changes: 53 additions & 23 deletions src/handlers/http/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,48 @@
*/

use actix_web::{web, HttpRequest, HttpResponse, Responder};
use anyhow::Error;
use bytes::Bytes;
use itertools::Itertools;
use relative_path::RelativePathBuf;

use crate::rbac::Users;
use crate::utils::user_auth_for_query;
use crate::{
option::CONFIG, storage::CORRELATIONS_ROOT_DIRECTORY,
utils::actix::extract_session_key_from_req,
};
use crate::storage::object_storage::correlation_path;
use crate::utils::{get_hash, get_user_from_request, user_auth_for_query};
use crate::{option::CONFIG, utils::actix::extract_session_key_from_req};

use crate::correlation::{CorrelationConfig, CorrelationError, CorrelationRequest, CORRELATIONS};

pub async fn list(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
let session_key = extract_session_key_from_req(&req)
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;

let user_id = get_user_from_request(&req)
.map(|s| get_hash(&s.to_string()))
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;

let correlations = CORRELATIONS
.list_correlations_for_user(&session_key)
.list_correlations_for_user(&session_key, &user_id)
.await?;

Ok(web::Json(correlations))
}

pub async fn get(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
let session_key = extract_session_key_from_req(&req)
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;

let user_id = get_user_from_request(&req)
.map(|s| get_hash(&s.to_string()))
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;

let correlation_id = req
.match_info()
.get("correlation_id")
.ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;

let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
let correlation = CORRELATIONS
.get_correlation(correlation_id, &user_id)
.await?;

let permissions = Users.get_permissions(&session_key);

Expand All @@ -68,16 +76,24 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, CorrelationError> {
let session_key = extract_session_key_from_req(&req)
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
let user_id = get_user_from_request(&req)
.map(|s| get_hash(&s.to_string()))
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;

let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;

correlation_request.validate(&session_key).await?;

let correlation: CorrelationConfig = correlation_request.into();
let mut correlation: CorrelationConfig = correlation_request.into();
correlation.user_id.clone_from(&user_id);
let correlation_id = &correlation.id;
let path = correlation_path(&user_id, &format!("{}.json", correlation_id));

// Save to disk
let store = CONFIG.storage().get_object_store();
store.put_correlation(&correlation).await?;
let correlation_bytes = serde_json::to_vec(&correlation)?;
store
.put_object(&path, Bytes::from(correlation_bytes))
.await?;

// Save to memory
CORRELATIONS.update(&correlation).await?;
Expand All @@ -88,14 +104,19 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, Corre
pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, CorrelationError> {
let session_key = extract_session_key_from_req(&req)
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
let user_id = get_user_from_request(&req)
.map(|s| get_hash(&s.to_string()))
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;

let correlation_id = req
.match_info()
.get("correlation_id")
.ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;

// validate whether user has access to this correlation object or not
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
let correlation = CORRELATIONS
.get_correlation(correlation_id, &user_id)
.await?;
let permissions = Users.get_permissions(&session_key);
let tables = &correlation
.table_configs
Expand All @@ -108,11 +129,17 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
correlation_request.validate(&session_key).await?;

let correlation = correlation_request.generate_correlation_config(correlation_id.to_owned());
let correlation =
correlation_request.generate_correlation_config(correlation_id.to_owned(), user_id.clone());

let correlation_id = &correlation.id;
let path = correlation_path(&user_id, &format!("{}.json", correlation_id));

// Save to disk
let store = CONFIG.storage().get_object_store();
store.put_correlation(&correlation).await?;
let correlation_bytes = serde_json::to_vec(&correlation)?;
store
.put_object(&path, Bytes::from(correlation_bytes))
.await?;

// Save to memory
CORRELATIONS.update(&correlation).await?;
Expand All @@ -123,13 +150,18 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
let session_key = extract_session_key_from_req(&req)
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
let user_id = get_user_from_request(&req)
.map(|s| get_hash(&s.to_string()))
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;

let correlation_id = req
.match_info()
.get("correlation_id")
.ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;

let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
let correlation = CORRELATIONS
.get_correlation(correlation_id, &user_id)
.await?;

// validate user's query auth
let permissions = Users.get_permissions(&session_key);
Expand All @@ -141,12 +173,10 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError

user_auth_for_query(&permissions, tables)?;

// Delete from disk
let correlation_id = &correlation.id;
let path = correlation_path(&user_id, &format!("{}.json", correlation_id));

let store = CONFIG.storage().get_object_store();
let path = RelativePathBuf::from_iter([
CORRELATIONS_ROOT_DIRECTORY,
&format!("{}.json", correlation_id),
]);
store.delete_object(&path).await?;

// Delete from memory
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/users/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ pub mod filters;
pub const USERS_ROOT_DIR: &str = ".users";
pub const DASHBOARDS_DIR: &str = "dashboards";
pub const FILTER_DIR: &str = "filters";
pub const CORRELATION_DIR: &str = "correlations";
Loading
Loading