From 6d716ce7a85852a2f0a237d55b9e46fc4e82c657 Mon Sep 17 00:00:00 2001 From: anant Date: Fri, 4 Jul 2025 16:33:51 +0530 Subject: [PATCH 1/9] feat: Introduce User Groups to Parseable - Added support for user groups - Migrated `PUT /user/{username}/role` to `PATCH /user/{username}/role/add` and `PATCH /user/{username}/role/remove` --- src/handlers/http/cluster/mod.rs | 13 +- .../http/modal/ingest/ingestor_rbac.rs | 80 ++++- src/handlers/http/modal/ingest_server.rs | 18 +- src/handlers/http/modal/query/querier_rbac.rs | 235 ++++++++++++++- src/handlers/http/modal/query_server.rs | 33 +- src/handlers/http/modal/server.rs | 32 +- src/handlers/http/oidc.rs | 2 +- src/handlers/http/rbac.rs | 285 +++++++++++++++++- src/handlers/http/role.rs | 42 ++- src/migration/metadata_migration.rs | 23 ++ src/migration/mod.rs | 31 +- src/rbac/map.rs | 74 ++++- src/rbac/mod.rs | 46 ++- src/rbac/role.rs | 89 ++++-- src/rbac/user.rs | 163 +++++++++- src/rbac/utils.rs | 29 +- src/storage/store_metadata.rs | 11 +- src/utils/mod.rs | 3 +- 18 files changed, 1095 insertions(+), 114 deletions(-) diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 6fb787af4..6eab244bc 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -160,7 +160,13 @@ pub async fn sync_streams_with_ingestors( pub async fn sync_users_with_roles_with_ingestors( username: &str, role: &HashSet, + operation: &str, ) -> Result<(), RBACError> { + match operation { + "add" | "remove" => {} + _ => return Err(RBACError::InvalidSyncOperation(operation.to_string())), + } + let role_data = to_vec(&role.clone()).map_err(|err| { error!("Fatal: failed to serialize role: {:?}", err); RBACError::SerdeError(err) @@ -168,12 +174,15 @@ pub async fn sync_users_with_roles_with_ingestors( let username = username.to_owned(); + let op = operation.to_string(); + for_each_live_ingestor(move |ingestor| { let url = format!( - "{}{}/user/{}/role/sync", + "{}{}/user/{}/role/sync/{}", ingestor.domain_name, base_path_without_preceding_slash(), - username + username, + op ); let role_data = role_data.clone(); diff --git a/src/handlers/http/modal/ingest/ingestor_rbac.rs b/src/handlers/http/modal/ingest/ingestor_rbac.rs index f25abe688..dc93af60f 100644 --- a/src/handlers/http/modal/ingest/ingestor_rbac.rs +++ b/src/handlers/http/modal/ingest/ingestor_rbac.rs @@ -48,7 +48,7 @@ pub async fn post_user( let _ = storage::put_staging_metadata(&metadata); let created_role = user.roles.clone(); Users.put_user(user.clone()); - Users.put_role(&username, created_role.clone()); + Users.add_roles(&username, created_role.clone()); } Ok(generated_password) @@ -73,14 +73,45 @@ pub async fn delete_user(username: web::Path) -> Result Put roles for user -// Put roles for given user -pub async fn put_role( +// // Handler PUT /user/{username}/roles => Put roles for user +// // Put roles for given user +// pub async fn put_role( +// username: web::Path, +// role: web::Json>, +// ) -> Result { +// let username = username.into_inner(); +// let role = role.into_inner(); + +// if !Users.contains(&username) { +// return Err(RBACError::UserDoesNotExist); +// }; +// // update parseable.json first +// let mut metadata = get_metadata().await?; +// if let Some(user) = metadata +// .users +// .iter_mut() +// .find(|user| user.username() == username) +// { +// user.roles.clone_from(&role); +// } else { +// // should be unreachable given state is always consistent +// return Err(RBACError::UserDoesNotExist); +// } + +// let _ = storage::put_staging_metadata(&metadata); +// // update in mem table +// Users.add_roles(&username.clone(), role.clone()); + +// Ok(format!("Roles updated successfully for {username}")) +// } + +// Handler PATCH /user/{username}/role/sync/add => Add roles to a user +pub async fn add_roles_to_user( username: web::Path, - role: web::Json>, + roles_to_add: web::Json>, ) -> Result { let username = username.into_inner(); - let role = role.into_inner(); + let roles_to_add = roles_to_add.into_inner(); if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); @@ -92,7 +123,7 @@ pub async fn put_role( .iter_mut() .find(|user| user.username() == username) { - user.roles.clone_from(&role); + user.roles.extend(roles_to_add.clone()); } else { // should be unreachable given state is always consistent return Err(RBACError::UserDoesNotExist); @@ -100,7 +131,40 @@ pub async fn put_role( let _ = storage::put_staging_metadata(&metadata); // update in mem table - Users.put_role(&username.clone(), role.clone()); + Users.add_roles(&username.clone(), roles_to_add.clone()); + + Ok(format!("Roles updated successfully for {username}")) +} + +// Handler PATCH /user/{username}/role/sync/add => Add roles to a user +pub async fn remove_roles_from_user( + username: web::Path, + roles_to_remove: web::Json>, +) -> Result { + let username = username.into_inner(); + let roles_to_remove = roles_to_remove.into_inner(); + + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + // update parseable.json first + let mut metadata = get_metadata().await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.username() == username) + { + let diff: HashSet = + HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned()); + user.roles = diff; + } else { + // should be unreachable given state is always consistent + return Err(RBACError::UserDoesNotExist); + } + + let _ = storage::put_staging_metadata(&metadata); + // update in mem table + Users.remove_roles(&username.clone(), roles_to_remove.clone()); Ok(format!("Roles updated successfully for {username}")) } diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index c0564449c..3057e3a83 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -198,11 +198,21 @@ impl IngestServer { .wrap(DisAllowRootUser), ) .service( - web::resource("/{username}/role/sync") - // PUT /user/{username}/roles => Put roles for user + web::resource("/{username}/role/sync/add") + // PATCH /user/{username}/role/sync/add => Add roles to a user .route( - web::put() - .to(ingestor_rbac::put_role) + web::patch() + .to(ingestor_rbac::add_roles_to_user) + .authorize(Action::PutUserRoles) + .wrap(DisAllowRootUser), + ), + ) + .service( + web::resource("/{username}/role/sync/remove") + // PATCH /user/{username}/role/sync/remove => Remove roles from a user + .route( + web::patch() + .to(ingestor_rbac::remove_roles_from_user) .authorize(Action::PutUserRoles) .wrap(DisAllowRootUser), ), diff --git a/src/handlers/http/modal/query/querier_rbac.rs b/src/handlers/http/modal/query/querier_rbac.rs index ae2af1c2d..59d4bee99 100644 --- a/src/handlers/http/modal/query/querier_rbac.rs +++ b/src/handlers/http/modal/query/querier_rbac.rs @@ -30,13 +30,65 @@ use crate::{ modal::utils::rbac_utils::{get_metadata, put_metadata}, rbac::RBACError, }, - rbac::{user, Users}, + rbac::{ + map::{mut_users, roles, write_user_groups}, + user, Users, + }, validator, }; -// async aware lock for updating storage metadata and user map atomicically +// async aware lock for updating storage metadata and user map atomically static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); +// // Handler for POST /api/v1/user/{username} +// // Creates a new user by username if it does not exists +// pub async fn post_user( +// username: web::Path, +// body: Option>, +// ) -> Result { +// let username = username.into_inner(); + +// let mut metadata = get_metadata().await?; + +// validator::user_name(&username)?; +// let roles: HashSet = if let Some(body) = body { +// serde_json::from_value(body.into_inner())? +// } else { +// return Err(RBACError::RoleValidationError); +// }; + +// if roles.is_empty() { +// return Err(RBACError::RoleValidationError); +// } +// let _ = UPDATE_LOCK.lock().await; +// if Users.contains(&username) +// || metadata +// .users +// .iter() +// .any(|user| user.username() == username) +// { +// return Err(RBACError::UserExists); +// } + +// let (user, password) = user::User::new_basic(username.clone()); + +// metadata.users.push(user.clone()); + +// put_metadata(&metadata).await?; +// let created_role = roles.clone(); +// Users.put_user(user.clone()); + +// sync_user_creation_with_ingestors(user, &Some(roles)).await?; + +// put_role( +// web::Path::::from(username.clone()), +// web::Json(created_role), +// ) +// .await?; + +// Ok(password) +// } + // Handler for POST /api/v1/user/{username} // Creates a new user by username if it does not exists pub async fn post_user( @@ -48,14 +100,27 @@ pub async fn post_user( let mut metadata = get_metadata().await?; validator::user_name(&username)?; - let roles: HashSet = if let Some(body) = body { + let user_roles: HashSet = if let Some(body) = body { serde_json::from_value(body.into_inner())? } else { return Err(RBACError::RoleValidationError); }; - if roles.is_empty() { + if user_roles.is_empty() { return Err(RBACError::RoleValidationError); + } else { + let mut non_existant_roles = Vec::new(); + user_roles + .iter() + .map(|r| { + if !roles().contains_key(r) { + non_existant_roles.push(r.clone()); + } + }) + .for_each(drop); + if !non_existant_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existant_roles)); + } } let _ = UPDATE_LOCK.lock().await; if Users.contains(&username) @@ -72,12 +137,12 @@ pub async fn post_user( metadata.users.push(user.clone()); put_metadata(&metadata).await?; - let created_role = roles.clone(); + let created_role = user_roles.clone(); Users.put_user(user.clone()); - sync_user_creation_with_ingestors(user, &Some(roles)).await?; + sync_user_creation_with_ingestors(user, &Some(user_roles)).await?; - put_role( + add_roles_to_user( web::Path::::from(username.clone()), web::Json(created_role), ) @@ -98,6 +163,40 @@ pub async fn delete_user(username: web::Path) -> Result) -> Result Put roles for user -// Put roles for given user -pub async fn put_role( +// // Handler PUT /user/{username}/roles => Put roles for user +// // Put roles for given user +// pub async fn put_role( +// username: web::Path, +// role: web::Json>, +// ) -> Result { +// let username = username.into_inner(); +// let role = role.into_inner(); + +// if !Users.contains(&username) { +// return Err(RBACError::UserDoesNotExist); +// }; +// // update parseable.json first +// let mut metadata = get_metadata().await?; +// if let Some(user) = metadata +// .users +// .iter_mut() +// .find(|user| user.username() == username) +// { +// user.roles.clone_from(&role); +// } else { +// // should be unreachable given state is always consistent +// return Err(RBACError::UserDoesNotExist); +// } + +// put_metadata(&metadata).await?; +// // update in mem table +// Users.put_role(&username.clone(), role.clone()); + +// sync_users_with_roles_with_ingestors(&username, &role).await?; + +// Ok(format!("Roles updated successfully for {username}")) +// } + +// Handler PATCH /user/{username}/role/add => Add roles to a user +pub async fn add_roles_to_user( + username: web::Path, + roles_to_add: web::Json>, +) -> Result { + let username = username.into_inner(); + let roles_to_add = roles_to_add.into_inner(); + + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + + let mut non_existant_roles = Vec::new(); + + // check if the role exists + roles_to_add.iter().for_each(|r| { + if roles().get(r).is_none() { + non_existant_roles.push(r.clone()); + } + }); + + if !non_existant_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existant_roles)); + } + + // update parseable.json first + let mut metadata = get_metadata().await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.username() == username) + { + user.roles.extend(roles_to_add.clone()); + } else { + // should be unreachable given state is always consistent + return Err(RBACError::UserDoesNotExist); + } + + put_metadata(&metadata).await?; + // update in mem table + Users.add_roles(&username.clone(), roles_to_add.clone()); + + sync_users_with_roles_with_ingestors(&username, &roles_to_add, "add").await?; + + Ok(format!("Roles updated successfully for {username}")) +} + +// Handler PATCH /user/{username}/role/remove => Remove roles from a user +pub async fn remove_roles_from_user( username: web::Path, - role: web::Json>, + roles_to_remove: web::Json>, ) -> Result { let username = username.into_inner(); - let role = role.into_inner(); + let roles_to_remove = roles_to_remove.into_inner(); if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); }; + + let mut non_existant_roles = Vec::new(); + + // check if the role exists + roles_to_remove.iter().for_each(|r| { + if roles().get(r).is_none() { + non_existant_roles.push(r.clone()); + } + }); + + if !non_existant_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existant_roles)); + } + + // check for role not present with user + let user_roles: HashSet = HashSet::from_iter(Users.get_role(&username)); + let roles_not_with_user: HashSet = + HashSet::from_iter(roles_to_remove.difference(&user_roles).cloned()); + if !roles_not_with_user.is_empty() { + return Err(RBACError::RolesNotAssigned(Vec::from_iter( + roles_not_with_user, + ))); + } + // update parseable.json first let mut metadata = get_metadata().await?; if let Some(user) = metadata @@ -126,7 +329,9 @@ pub async fn put_role( .iter_mut() .find(|user| user.username() == username) { - user.roles.clone_from(&role); + let diff: HashSet = + HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned()); + user.roles = diff; } else { // should be unreachable given state is always consistent return Err(RBACError::UserDoesNotExist); @@ -134,9 +339,9 @@ pub async fn put_role( put_metadata(&metadata).await?; // update in mem table - Users.put_role(&username.clone(), role.clone()); + Users.remove_roles(&username.clone(), roles_to_remove.clone()); - sync_users_with_roles_with_ingestors(&username, &role).await?; + sync_users_with_roles_with_ingestors(&username, &roles_to_remove, "remove").await?; Ok(format!("Roles updated successfully for {username}")) } diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 595c52833..d2f027535 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -94,9 +94,8 @@ impl ParseableServer for QueryServer { )); } - let parseable_json = PARSEABLE.validate_storage().await?; - migration::run_metadata_migration(&PARSEABLE, &parseable_json).await?; - + let mut parseable_json = PARSEABLE.validate_storage().await?; + migration::run_metadata_migration(&PARSEABLE, &mut parseable_json).await?; Ok(parseable_json) } @@ -209,18 +208,30 @@ impl QueryServer { .wrap(DisAllowRootUser), ) .service( - web::resource("/{username}/role") - // PUT /user/{username}/roles => Put roles for user + web::resource("/{username}/role").route( + web::get() + .to(rbac::get_role) + .authorize_for_user(Action::GetUserRoles), + ), + ) + .service( + web::resource("/{username}/role/add") + // PATCH /user/{username}/role/add => Add roles to a user .route( - web::put() - .to(querier_rbac::put_role) + web::patch() + .to(rbac::add_roles_to_user) .authorize(Action::PutUserRoles) .wrap(DisAllowRootUser), - ) + ), + ) + .service( + web::resource("/{username}/role/remove") + // PATCH /user/{username}/role/remove => Remove roles from a user .route( - web::get() - .to(rbac::get_role) - .authorize_for_user(Action::GetUserRoles), + web::patch() + .to(rbac::remove_roles_from_user) + .authorize(Action::PutUserRoles) + .wrap(DisAllowRootUser), ), ) .service( diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 575f2925d..a65915156 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -114,8 +114,8 @@ impl ParseableServer for Server { async fn load_metadata(&self) -> anyhow::Result> { //TODO: removed file migration //deprecated support for deployments < v1.0.0 - let parseable_json = PARSEABLE.validate_storage().await?; - migration::run_metadata_migration(&PARSEABLE, &parseable_json).await?; + let mut parseable_json = PARSEABLE.validate_storage().await?; + migration::run_metadata_migration(&PARSEABLE, &mut parseable_json).await?; Ok(parseable_json) } @@ -622,18 +622,30 @@ impl Server { .wrap(DisAllowRootUser), ) .service( - web::resource("/{username}/role") - // PUT /user/{username}/roles => Put roles for user + web::resource("/{username}/role").route( + web::get() + .to(http::rbac::get_role) + .authorize_for_user(Action::GetUserRoles), + ), + ) + .service( + web::resource("/{username}/role/add") + // PATCH /user/{username}/role/add => Add roles to a user .route( - web::put() - .to(http::rbac::put_role) + web::patch() + .to(http::rbac::add_roles_to_user) .authorize(Action::PutUserRoles) .wrap(DisAllowRootUser), - ) + ), + ) + .service( + web::resource("/{username}/role/remove") + // PATCH /user/{username}/role/remove => Remove roles from a user .route( - web::get() - .to(http::rbac::get_role) - .authorize_for_user(Action::GetUserRoles), + web::patch() + .to(http::rbac::remove_roles_from_user) + .authorize(Action::PutUserRoles) + .wrap(DisAllowRootUser), ), ) .service( diff --git a/src/handlers/http/oidc.rs b/src/handlers/http/oidc.rs index 13e65a4e1..54baec3ac 100644 --- a/src/handlers/http/oidc.rs +++ b/src/handlers/http/oidc.rs @@ -344,7 +344,7 @@ async fn update_user_if_changed( group: HashSet, user_info: user::UserInfo, ) -> Result { - let User { ty, roles } = &mut user; + let User { ty, roles, .. } = &mut user; let UserType::OAuth(oauth_user) = ty else { unreachable!() }; diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index 3870c88a9..a7e43fc7a 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -19,7 +19,14 @@ use std::collections::{HashMap, HashSet}; use crate::{ - rbac::{self, map::roles, role::model::DefaultPrivilege, user, utils::to_prism_user, Users}, + rbac::{ + self, + map::{mut_users, read_user_groups, roles, write_user_groups}, + role::model::DefaultPrivilege, + user, + utils::to_prism_user, + Users, + }, storage::ObjectStorageError, validator::{self, error::UsernameValidationError}, }; @@ -30,6 +37,8 @@ use actix_web::{ }; use http::StatusCode; use itertools::Itertools; +use serde::Serialize; +use serde_json::json; use tokio::sync::Mutex; use super::modal::utils::rbac_utils::{get_metadata, put_metadata}; @@ -97,14 +106,27 @@ pub async fn post_user( let mut metadata = get_metadata().await?; validator::user_name(&username)?; - let roles: HashSet = if let Some(body) = body { + let user_roles: HashSet = if let Some(body) = body { serde_json::from_value(body.into_inner())? } else { return Err(RBACError::RoleValidationError); }; - if roles.is_empty() { + if user_roles.is_empty() { return Err(RBACError::RoleValidationError); + } else { + let mut non_existant_roles = Vec::new(); + user_roles + .iter() + .map(|r| { + if !roles().contains_key(r) { + non_existant_roles.push(r.clone()); + } + }) + .for_each(drop); + if !non_existant_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existant_roles)); + } } let _ = UPDATE_LOCK.lock().await; if Users.contains(&username) @@ -121,10 +143,10 @@ pub async fn post_user( metadata.users.push(user.clone()); put_metadata(&metadata).await?; - let created_role = roles.clone(); + let created_role = user_roles.clone(); Users.put_user(user.clone()); - put_role( + add_roles_to_user( web::Path::::from(username.clone()), web::Json(created_role), ) @@ -170,7 +192,7 @@ pub async fn get_role(username: web::Path) -> Result> = Users + let direct_roles: HashMap> = Users .get_role(&username) .iter() .filter_map(|role_name| { @@ -180,6 +202,26 @@ pub async fn get_role(username: web::Path) -> Result>> = HashMap::new(); + // user might be part of some user groups, fetch the roles from there as well + for user_group in Users.get_user_groups(&username) { + if let Some(group) = read_user_groups().get(&user_group) { + let ug_roles: HashMap> = group + .roles + .iter() + .filter_map(|role_name| { + roles() + .get(role_name) + .map(|role| (role_name.to_owned(), role.clone())) + }) + .collect(); + group_roles.insert(group.name.clone(), ug_roles); + } + } + let res = RolesResponse { + direct_roles, + group_roles, + }; Ok(web::Json(res)) } @@ -195,6 +237,40 @@ pub async fn delete_user(username: web::Path) -> Result) -> Result Put roles for user -// Put roles for given user -pub async fn put_role( +// // Handler PUT /user/{username}/roles => Put roles for user +// // Put roles for given user +// pub async fn put_role( +// username: web::Path, +// role: web::Json>, +// ) -> Result { +// let username = username.into_inner(); +// let role = role.into_inner(); + +// if !Users.contains(&username) { +// return Err(RBACError::UserDoesNotExist); +// }; +// // update parseable.json first +// let mut metadata = get_metadata().await?; +// if let Some(user) = metadata +// .users +// .iter_mut() +// .find(|user| user.username() == username) +// { +// user.roles.clone_from(&role); +// } else { +// // should be unreachable given state is always consistent +// return Err(RBACError::UserDoesNotExist); +// } + +// put_metadata(&metadata).await?; +// // update in mem table +// Users.add_roles(&username.clone(), role.clone()); + +// Ok(format!("Roles updated successfully for {username}")) +// } + +// Handler PATCH /user/{username}/role/add => Add roles to a user +pub async fn add_roles_to_user( + username: web::Path, + roles_to_add: web::Json>, +) -> Result { + let username = username.into_inner(); + let roles_to_add = roles_to_add.into_inner(); + + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + + let mut non_existant_roles = Vec::new(); + + // check if the role exists + roles_to_add.iter().for_each(|r| { + if roles().get(r).is_none() { + non_existant_roles.push(r.clone()); + } + }); + + if !non_existant_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existant_roles)); + } + + // update parseable.json first + let mut metadata = get_metadata().await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.username() == username) + { + user.roles.extend(roles_to_add.clone()); + } else { + // should be unreachable given state is always consistent + return Err(RBACError::UserDoesNotExist); + } + + put_metadata(&metadata).await?; + // update in mem table + Users.add_roles(&username.clone(), roles_to_add); + + Ok(format!("Roles updated successfully for {username}")) +} + +// Handler PATCH /user/{username}/role/remove => Remove roles from a user +pub async fn remove_roles_from_user( username: web::Path, - role: web::Json>, + roles_to_remove: web::Json>, ) -> Result { let username = username.into_inner(); - let role = role.into_inner(); + let roles_to_remove = roles_to_remove.into_inner(); if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); }; + + let mut non_existant_roles = Vec::new(); + + // check if the role exists + roles_to_remove.iter().for_each(|r| { + if roles().get(r).is_none() { + non_existant_roles.push(r.clone()); + } + }); + + if !non_existant_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existant_roles)); + } + + // check for role not present with user + let user_roles: HashSet = HashSet::from_iter(Users.get_role(&username)); + let roles_not_with_user: HashSet = + HashSet::from_iter(roles_to_remove.difference(&user_roles).cloned()); + if !roles_not_with_user.is_empty() { + return Err(RBACError::RolesNotAssigned(Vec::from_iter( + roles_not_with_user, + ))); + } + // update parseable.json first let mut metadata = get_metadata().await?; if let Some(user) = metadata @@ -221,7 +397,9 @@ pub async fn put_role( .iter_mut() .find(|user| user.username() == username) { - user.roles.clone_from(&role); + let diff: HashSet = + HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned()); + user.roles = diff; } else { // should be unreachable given state is always consistent return Err(RBACError::UserDoesNotExist); @@ -229,11 +407,40 @@ pub async fn put_role( put_metadata(&metadata).await?; // update in mem table - Users.put_role(&username.clone(), role.clone()); + Users.remove_roles(&username.clone(), roles_to_remove); Ok(format!("Roles updated successfully for {username}")) } +#[derive(Debug, Serialize)] +#[serde(rename = "camelCase")] +pub struct InvalidUserGroupError { + pub valid_name: bool, + pub non_existant_roles: Vec, + pub non_existant_users: Vec, + pub roles_not_in_group: Vec, + pub users_not_in_group: Vec, + pub comments: String, +} + +// impl Display for InvalidUserGroupRequestStruct { +// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +// if !self.invalid_name { +// write!( +// f, +// "Invalid user group request- {{invalidName: {}\nnonExistantRoles: {:?}\nnonExistantUsers: {:?}\nThe name should follow this regex- ^[A-Za-z0-9_-]+$}}", +// self.invalid_name, self.non_existant_roles, self.non_existant_users +// ) +// } else { +// write!( +// f, +// "Invalid user group request- {{nonExistantRoles: {:?}\nnonExistantUsers: {:?}}}", +// self.non_existant_roles, self.non_existant_users +// ) +// } +// } +// } + #[derive(Debug, thiserror::Error)] pub enum RBACError { #[error("User exists already")] @@ -252,6 +459,20 @@ pub enum RBACError { Anyhow(#[from] anyhow::Error), #[error("User cannot be created without a role")] RoleValidationError, + #[error("User group already exists: '{0}'")] + UserGroupExists(String), + #[error("UserGroup does not exist: {0}")] + UserGroupDoesNotExist(String), + #[error("Invalid Roles: {0:?}")] + RolesDoNotExist(Vec), + #[error("Roles have not been assigned: {0:?}")] + RolesNotAssigned(Vec), + #[error("{0:?}")] + InvalidUserGroupRequest(Box), + #[error("{0}")] + InvalidSyncOperation(String), + #[error("User group still being used by users: {0}")] + UserGroupNotEmpty(String), } impl actix_web::ResponseError for RBACError { @@ -265,12 +486,44 @@ impl actix_web::ResponseError for RBACError { Self::Network(_) => StatusCode::BAD_GATEWAY, Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::RoleValidationError => StatusCode::BAD_REQUEST, + Self::UserGroupExists(_) => StatusCode::BAD_REQUEST, + Self::UserGroupDoesNotExist(_) => StatusCode::BAD_REQUEST, + Self::RolesDoNotExist(_) => StatusCode::BAD_REQUEST, + Self::RolesNotAssigned(_) => StatusCode::BAD_REQUEST, + Self::InvalidUserGroupRequest(_) => StatusCode::BAD_REQUEST, + Self::InvalidSyncOperation(_) => StatusCode::BAD_REQUEST, + Self::UserGroupNotEmpty(_) => StatusCode::BAD_REQUEST, } } fn error_response(&self) -> actix_web::HttpResponse { - actix_web::HttpResponse::build(self.status_code()) - .insert_header(ContentType::plaintext()) - .body(self.to_string()) + match self { + RBACError::RolesNotAssigned(obj) => actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .json(json!({ + "roles_not_assigned": obj + })), + RBACError::RolesDoNotExist(obj) => actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .json(json!({ + "non_existant_roles": obj + })), + RBACError::InvalidUserGroupRequest(obj) => { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .json(obj) + } + _ => actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()), + } } } + +#[derive(Serialize)] +#[serde(rename = "camelCase")] +pub struct RolesResponse { + #[serde(rename = "roles")] + pub direct_roles: HashMap>, + pub group_roles: HashMap>>, +} diff --git a/src/handlers/http/role.rs b/src/handlers/http/role.rs index c649ac248..7e7e303a5 100644 --- a/src/handlers/http/role.rs +++ b/src/handlers/http/role.rs @@ -16,17 +16,20 @@ * */ +use std::collections::HashSet; + use actix_web::{ http::header::ContentType, web::{self, Json}, HttpResponse, Responder, }; use http::StatusCode; +use itertools::Itertools; use crate::{ parseable::PARSEABLE, rbac::{ - map::{mut_roles, DEFAULT_ROLE}, + map::{mut_roles, read_user_groups, write_user_groups, DEFAULT_ROLE}, role::model::DefaultPrivilege, }, storage::{self, ObjectStorageError, StorageMetadata}, @@ -73,7 +76,7 @@ pub async fn list_roles() -> Result { Ok(web::Json(roles)) } -// Handler for DELETE /api/v1/role/{username} +// Handler for DELETE /api/v1/role/{name} // Delete existing role pub async fn delete(name: web::Path) -> Result { let name = name.into_inner(); @@ -84,6 +87,41 @@ pub async fn delete(name: web::Path) -> Result JsonValue { storage_metadata } +pub fn v5_v6(mut storage_metadata: JsonValue) -> JsonValue { + let metadata = storage_metadata.as_object_mut().unwrap(); + metadata.remove_entry("version"); + metadata.insert("version".to_string(), JsonValue::String("v6".to_string())); + + // If user_groups is missing, add an empty array or your default structure + if !metadata.contains_key("user_groups") { + metadata.insert("user_groups".to_string(), JsonValue::Array(vec![])); + } + + // introduce user groups entry for all users + let users = metadata.get_mut("users").unwrap().as_array_mut().unwrap(); + for user in users.iter_mut() { + if !user.as_object_mut().unwrap().contains_key("user_groups") { + user.as_object_mut() + .unwrap() + .insert("user_groups".to_string(), JsonValue::Array(vec![])); + } + } + + storage_metadata +} + /// Remove the querier endpoint and auth token from the storage metadata pub fn remove_querier_metadata(mut storage_metadata: JsonValue) -> JsonValue { let metadata = storage_metadata.as_object_mut().unwrap(); diff --git a/src/migration/mod.rs b/src/migration/mod.rs index d11ae3f79..24d67f2a4 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -45,7 +45,7 @@ use crate::{ /// This is a one time migration pub async fn run_metadata_migration( config: &Parseable, - parseable_json: &Option, + parseable_json: &mut Option, ) -> anyhow::Result<()> { let object_store = config.storage.get_object_store(); let mut storage_metadata: Option = None; @@ -62,45 +62,54 @@ pub async fn run_metadata_migration( .and_then(|version| version.as_str()) } + warn!(verion=?get_version(storage_metadata.as_ref().unwrap())); // if storage metadata is none do nothing if let Some(storage_metadata) = storage_metadata { match get_version(&storage_metadata) { Some("v1") => { - //migrate to latest version - //remove querier endpooint and token from storage metadata let mut metadata = metadata_migration::v1_v3(storage_metadata); metadata = metadata_migration::v3_v4(metadata); metadata = metadata_migration::v4_v5(metadata); + metadata = metadata_migration::v5_v6(metadata); metadata = metadata_migration::remove_querier_metadata(metadata); + let _metadata: Bytes = serde_json::to_vec(&metadata)?.into(); + *parseable_json = Some(_metadata); put_remote_metadata(&*object_store, &metadata).await?; } Some("v2") => { - //migrate to latest version - //remove querier endpooint and token from storage metadata let mut metadata = metadata_migration::v2_v3(storage_metadata); metadata = metadata_migration::v3_v4(metadata); metadata = metadata_migration::v4_v5(metadata); + metadata = metadata_migration::v5_v6(metadata); metadata = metadata_migration::remove_querier_metadata(metadata); + let _metadata: Bytes = serde_json::to_vec(&metadata)?.into(); + *parseable_json = Some(_metadata); put_remote_metadata(&*object_store, &metadata).await?; } Some("v3") => { - //migrate to latest version - //remove querier endpooint and token from storage metadata let mut metadata = metadata_migration::v3_v4(storage_metadata); metadata = metadata_migration::v4_v5(metadata); + metadata = metadata_migration::v5_v6(metadata); metadata = metadata_migration::remove_querier_metadata(metadata); + let _metadata: Bytes = serde_json::to_vec(&metadata)?.into(); + *parseable_json = Some(_metadata); put_remote_metadata(&*object_store, &metadata).await?; } Some("v4") => { - //migrate to latest version - //remove querier endpooint and token from storage metadata let mut metadata = metadata_migration::v4_v5(storage_metadata); - metadata = metadata_migration::v4_v5(metadata); + metadata = metadata_migration::v5_v6(metadata); metadata = metadata_migration::remove_querier_metadata(metadata); + let _metadata: Bytes = serde_json::to_vec(&metadata)?.into(); + *parseable_json = Some(_metadata); + put_remote_metadata(&*object_store, &metadata).await?; + } + Some("v5") => { + let metadata = metadata_migration::v5_v6(storage_metadata); + let _metadata: Bytes = serde_json::to_vec(&metadata)?.into(); + *parseable_json = Some(_metadata); put_remote_metadata(&*object_store, &metadata).await?; } _ => { - //remove querier endpooint and token from storage metadata let metadata = metadata_migration::remove_querier_metadata(storage_metadata); put_remote_metadata(&*object_store, &metadata).await?; } diff --git a/src/rbac/map.rs b/src/rbac/map.rs index 02adc3463..039b70602 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -16,8 +16,9 @@ * */ -use crate::rbac::user::User; +use crate::rbac::user::{User, UserGroup}; use crate::{parseable::PARSEABLE, storage::StorageMetadata}; +use std::collections::HashSet; use std::{collections::HashMap, sync::Mutex}; use super::Response; @@ -35,6 +36,23 @@ pub static USERS: OnceCell> = OnceCell::new(); pub static ROLES: OnceCell> = OnceCell::new(); pub static DEFAULT_ROLE: Lazy>> = Lazy::new(|| Mutex::new(None)); pub static SESSIONS: OnceCell> = OnceCell::new(); +pub static USER_GROUPS: OnceCell> = OnceCell::new(); + +pub fn read_user_groups() -> RwLockReadGuard<'static, UserGroups> { + USER_GROUPS + .get() + .expect("UserGroups map not created") + .read() + .expect("UserGroups map is poisoned") +} + +pub fn write_user_groups() -> RwLockWriteGuard<'static, UserGroups> { + USER_GROUPS + .get() + .expect("UserGroups map not created") + .write() + .expect("UserGroups map is poisoned") +} pub fn users() -> RwLockReadGuard<'static, Users> { USERS @@ -90,6 +108,7 @@ pub fn mut_sessions() -> RwLockWriteGuard<'static, Sessions> { // as users authenticate pub fn init(metadata: &StorageMetadata) { let users = metadata.users.clone(); + let user_groups = metadata.user_groups.clone(); let mut roles = metadata.roles.clone(); DEFAULT_ROLE @@ -122,6 +141,9 @@ pub fn init(metadata: &StorageMetadata) { SESSIONS .set(RwLock::new(sessions)) .expect("map is only set once"); + USER_GROUPS + .set(RwLock::new(UserGroups::from(user_groups))) + .expect("Unable to create UserGroups map from storage"); } // A session is loosly active mapping to permissions @@ -203,6 +225,33 @@ impl Sessions { context_user: Option<&str>, ) -> Option { self.active_sessions.get(key).map(|(username, perms)| { + // if user is a part of any user groups, then add permissions + let perms: HashSet = + if !users().0.get(username).unwrap().user_groups.is_empty() { + let groups = users().0.get(username).unwrap().user_groups.clone(); + let all_groups_roles = groups + .iter() + .filter(|id| (read_user_groups().0.contains_key(*id))) + .map(|id| read_user_groups().0.get(id).unwrap().roles.clone()) + .reduce(|mut acc, e| { + acc.extend(e); + acc + }) + .unwrap_or_default(); + let mut privilege_list = Vec::new(); + all_groups_roles + .iter() + .filter_map(|role| roles().get(role).cloned()) + .for_each(|privileges| privilege_list.extend(privileges)); + + let mut perms = HashSet::from_iter(perms.clone()); + for privs in privilege_list { + perms.extend(RoleBuilder::from(&privs).build()) + } + perms + } else { + HashSet::from_iter(perms.clone()) + }; if perms.iter().any(|user_perm| { match *user_perm { // if any action is ALL then we we authorize @@ -257,3 +306,26 @@ impl From> for Users { map } } + +// Map of [user group ID --> UserGroup] +// This map is populated at startup with the list of user groups from parseable.json file +#[derive(Debug, Default, Clone, derive_more::Deref, derive_more::DerefMut)] +pub struct UserGroups(HashMap); + +impl UserGroups { + pub fn insert(&mut self, user_group: UserGroup) { + self.0.insert(user_group.name.clone(), user_group); + } +} + +impl From> for UserGroups { + fn from(user_groups: Vec) -> Self { + let mut map = Self::default(); + map.extend( + user_groups + .into_iter() + .map(|group| (group.name.to_owned(), group)), + ); + map + } +} diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index 28ead768b..cc7bb2326 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -29,7 +29,7 @@ use role::model::DefaultPrivilege; use serde::Serialize; use url::Url; -use crate::rbac::map::{mut_sessions, mut_users, sessions, users}; +use crate::rbac::map::{mut_sessions, mut_users, read_user_groups, roles, sessions, users}; use crate::rbac::role::Action; use crate::rbac::user::User; @@ -54,6 +54,13 @@ impl Users { mut_users().insert(user); } + pub fn get_user_groups(&self, username: &str) -> HashSet { + users() + .get(username) + .map(|user| user.user_groups.clone()) + .unwrap_or_default() + } + pub fn get_user(&self, username: &str) -> Option { users().get(username).cloned() } @@ -90,9 +97,17 @@ impl Users { }; } - pub fn put_role(&self, username: &str, roles: HashSet) { + pub fn add_roles(&self, username: &str, roles: HashSet) { + if let Some(user) = mut_users().get_mut(username) { + user.roles.extend(roles); + mut_sessions().remove_user(username) + }; + } + + pub fn remove_roles(&self, username: &str, roles: HashSet) { if let Some(user) = mut_users().get_mut(username) { - user.roles = roles; + let diff = HashSet::from_iter(user.roles.difference(&roles).cloned()); + user.roles = diff; mut_sessions().remove_user(username) }; } @@ -102,7 +117,23 @@ impl Users { } pub fn get_permissions(&self, session: &SessionKey) -> Vec { - sessions().get(session).cloned().unwrap_or_default() + let mut permissions = sessions().get(session).cloned().unwrap_or_default(); + + let username = self.get_username_from_session(session).unwrap(); + let user_groups = self.get_user_groups(&username); + for group in user_groups { + if let Some(group) = read_user_groups().get(&group) { + let group_roles = &group.roles; + for role in group_roles { + if let Some(privelege_list) = roles().get(role) { + for privelege in privelege_list { + permissions.extend(RoleBuilder::from(privelege).build()); + } + } + } + } + } + permissions.into_iter().collect_vec() } pub fn session_exists(&self, session: &SessionKey) -> bool { @@ -174,6 +205,7 @@ impl Users { /// /// TODO: rename this after deprecating the older struct #[derive(Debug, Serialize, Clone)] +#[serde(rename = "camelCase")] pub struct UsersPrism { // username pub id: String, @@ -183,8 +215,12 @@ pub struct UsersPrism { pub email: Option, // picture only if oauth pub picture: Option, - // roles for the user + // roles given directly to the user pub roles: HashMap>, + // roles inherited by the user from their usergroups + pub group_roles: HashMap>>, + // user groups + pub user_groups: HashSet, } fn roles_to_permission(roles: Vec) -> Vec { diff --git a/src/rbac/role.rs b/src/rbac/role.rs index f0546aaaa..c9ba138ff 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -20,6 +20,10 @@ // Represents actions that corresponds to an api #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum Action { + CreateUserGroup, + GetUserGroup, + ModifyUserGroup, + DeleteUserGroup, Ingest, Query, CreateStream, @@ -47,10 +51,10 @@ pub enum Action { DeleteRole, ListRole, GetAbout, - QueryLLM, AddLLM, DeleteLLM, GetLLM, + QueryLLM, ListLLM, ListCluster, ListClusterMetrics, @@ -78,6 +82,7 @@ pub enum Permission { Unit(Action), Stream(Action, String), StreamWithTag(Action, String, Option), + Resource(Action, Option, Option), SelfUser, } @@ -87,6 +92,8 @@ pub struct RoleBuilder { actions: Vec, stream: Option, tag: Option, + resource_id: Option, + resource_type: Option, } // R x P @@ -101,6 +108,12 @@ impl RoleBuilder { self } + pub fn with_resource(mut self, resource_id: String, resource_type: String) -> Self { + self.resource_id = Some(resource_id); + self.resource_type = Some(resource_type); + self + } + pub fn build(self) -> Vec { let mut perms = Vec::new(); for action in self.actions { @@ -118,11 +131,6 @@ impl RoleBuilder { | Action::GetUserRoles | Action::DeleteUser | Action::GetAbout - | Action::QueryLLM - | Action::AddLLM - | Action::DeleteLLM - | Action::GetLLM - | Action::ListLLM | Action::PutRole | Action::GetRole | Action::DeleteRole @@ -151,7 +159,20 @@ impl RoleBuilder { | Action::PutAlert | Action::GetAlert | Action::DeleteAlert + | Action::CreateUserGroup + | Action::GetUserGroup + | Action::DeleteUserGroup + | Action::ModifyUserGroup | Action::GetAnalytics => Permission::Unit(action), + Action::QueryLLM + | Action::AddLLM + | Action::DeleteLLM + | Action::GetLLM + | Action::ListLLM => Permission::Resource( + action, + self.resource_type.clone(), + self.resource_id.clone(), + ), Action::Ingest | Action::ListStream | Action::GetSchema @@ -179,9 +200,20 @@ pub mod model { pub enum DefaultPrivilege { Admin, Editor, - Writer { stream: String }, - Ingestor { stream: String }, - Reader { stream: String, tag: Option }, + Writer { + stream: String, + }, + Ingestor { + stream: String, + }, + Reader { + stream: String, + tag: Option, + }, + Resource { + resource_id: String, + resource_type: String, + }, } impl From<&DefaultPrivilege> for RoleBuilder { @@ -202,6 +234,11 @@ pub mod model { DefaultPrivilege::Ingestor { stream } => { ingest_perm_builder().with_stream(stream.to_owned()) } + DefaultPrivilege::Resource { + resource_id, + resource_type, + } => resource_perm_builder() + .with_resource(resource_id.clone(), resource_type.clone()), } } } @@ -211,6 +248,8 @@ pub mod model { actions: vec![Action::All], stream: Some("*".to_string()), tag: None, + resource_type: Some("*".to_string()), + resource_id: Some("*".to_string()), } } @@ -241,11 +280,11 @@ pub mod model { Action::PutAlert, Action::GetAlert, Action::DeleteAlert, - Action::QueryLLM, - Action::GetLLM, - Action::ListLLM, Action::AddLLM, Action::DeleteLLM, + Action::GetLLM, + Action::QueryLLM, + Action::ListLLM, Action::CreateFilter, Action::ListFilter, Action::GetFilter, @@ -258,6 +297,8 @@ pub mod model { ], stream: Some("*".to_string()), tag: None, + resource_id: None, + resource_type: None, } } @@ -287,11 +328,9 @@ pub mod model { Action::CreateDashboard, Action::DeleteDashboard, Action::Ingest, - Action::QueryLLM, Action::GetLLM, + Action::QueryLLM, Action::ListLLM, - Action::AddLLM, - Action::DeleteLLM, Action::GetStreamInfo, Action::GetFilter, Action::ListFilter, @@ -301,6 +340,8 @@ pub mod model { ], stream: None, tag: None, + resource_id: None, + resource_type: None, } } @@ -313,11 +354,9 @@ pub mod model { Action::ListStream, Action::GetSchema, Action::GetStats, - Action::QueryLLM, Action::GetLLM, + Action::QueryLLM, Action::ListLLM, - Action::AddLLM, - Action::DeleteLLM, Action::ListFilter, Action::GetFilter, Action::CreateFilter, @@ -337,6 +376,18 @@ pub mod model { ], stream: None, tag: None, + resource_id: None, + resource_type: None, + } + } + + fn resource_perm_builder() -> RoleBuilder { + RoleBuilder { + actions: vec![Action::GetLLM, Action::ListLLM, Action::QueryLLM], + stream: None, + tag: None, + resource_id: None, + resource_type: None, } } @@ -345,6 +396,8 @@ pub mod model { actions: vec![Action::Ingest], stream: None, tag: None, + resource_id: None, + resource_type: None, } } } diff --git a/src/rbac/user.rs b/src/rbac/user.rs index 45fc50d79..6dc5070e5 100644 --- a/src/rbac/user.rs +++ b/src/rbac/user.rs @@ -25,7 +25,14 @@ use argon2::{ use rand::distributions::{Alphanumeric, DistString}; -use crate::parseable::PARSEABLE; +use crate::{ + handlers::http::{ + modal::utils::rbac_utils::{get_metadata, put_metadata}, + rbac::{InvalidUserGroupError, RBACError}, + }, + parseable::PARSEABLE, + rbac::map::{mut_users, read_user_groups, roles, users}, +}; #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(untagged)] @@ -39,6 +46,7 @@ pub struct User { #[serde(flatten)] pub ty: UserType, pub roles: HashSet, + pub user_groups: HashSet, } impl User { @@ -52,6 +60,7 @@ impl User { password_hash: hash, }), roles: HashSet::new(), + user_groups: HashSet::new(), }, password, ) @@ -64,6 +73,7 @@ impl User { user_info, }), roles, + user_groups: HashSet::new(), } } @@ -147,6 +157,7 @@ pub fn get_admin_user() -> User { password_hash: hashcode, }), roles: ["admin".to_string()].into(), + user_groups: HashSet::new(), } } @@ -185,3 +196,153 @@ impl From for UserInfo { } } } + +/// Logically speaking, UserGroup is a collection of roles and is applied to a collection of users. +/// +/// The users present in a group inherit all the roles present in the group for as long as they are a part of the group. +#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct UserGroup { + pub name: String, + // #[serde(default = "crate::utils::uid::gen")] + // pub id: Ulid, + pub roles: HashSet, + pub users: HashSet, +} + +fn is_valid_group_name(name: &str) -> bool { + let re = regex::Regex::new(r"^[A-Za-z0-9_-]+$").unwrap(); + re.is_match(name) +} + +impl UserGroup { + pub fn validate(&self) -> Result<(), RBACError> { + let valid_name = is_valid_group_name(&self.name); + + if read_user_groups().contains_key(&self.name) { + return Err(RBACError::UserGroupExists(self.name.clone())); + } + let mut non_existant_roles = Vec::new(); + if !self.roles.is_empty() { + // validate that the roles exist + for role in &self.roles { + if !roles().contains_key(role) { + non_existant_roles.push(role.clone()); + } + } + } + let mut non_existant_users = Vec::new(); + if !self.users.is_empty() { + // validate that the users exist + for user in &self.users { + if !users().contains_key(user) { + non_existant_users.push(user.clone()); + } + } + } + + if !non_existant_roles.is_empty() || !non_existant_users.is_empty() || !valid_name { + let comments = if !valid_name { + "The name should follow this regex- `^[A-Za-z0-9_-]+$`".to_string() + } else { + "".to_string() + }; + Err(RBACError::InvalidUserGroupRequest(Box::new( + InvalidUserGroupError { + valid_name, + non_existant_roles, + non_existant_users, + roles_not_in_group: vec![], + users_not_in_group: vec![], + comments, + }, + ))) + } else { + Ok(()) + } + } + pub fn new(name: String, roles: HashSet, users: HashSet) -> Self { + UserGroup { name, roles, users } + } + + pub async fn add_roles(&mut self, roles: HashSet) -> Result<(), RBACError> { + self.roles.extend(roles.clone()); + let new_roles = self.roles.clone(); + let mut metadata = get_metadata().await?; + metadata + .user_groups + .iter_mut() + .filter(|ug| ug.name == self.name) + .map(|ug| ug.roles.clone_from(&new_roles)) + .for_each(drop); + put_metadata(&metadata).await?; + Ok(()) + } + + pub async fn add_users(&mut self, users: HashSet) -> Result<(), RBACError> { + // ensure that the users add the user group to their map + let mut metadata = get_metadata().await?; + + users + .iter() + .map(|user| { + if let Some(user) = mut_users().get_mut(user) { + user.user_groups.insert(self.name.clone()); + metadata.users.retain(|u| u.username() != user.username()); + metadata.users.push(user.clone()); + } + }) + .for_each(drop); + put_metadata(&metadata).await?; + self.users.extend(users); + Ok(()) + } + + pub fn remove_roles(&mut self, roles: HashSet) -> Result<(), RBACError> { + let old_roles = &self.roles; + let new_roles = HashSet::from_iter(self.roles.difference(&roles).cloned()); + + if old_roles.eq(&new_roles) { + return Ok(()); + } + self.roles.clone_from(&new_roles); + + Ok(()) + } + + pub fn remove_users(&mut self, users: HashSet) -> Result<(), RBACError> { + let old_users = &self.users; + let new_users = HashSet::from_iter(self.users.difference(&users).cloned()); + + if old_users.eq(&new_users) { + return Ok(()); + } + self.users.clone_from(&new_users); + + Ok(()) + } + + pub async fn update_in_metadata(&self) -> Result<(), RBACError> { + let mut metadata = get_metadata().await?; + metadata.user_groups.retain(|x| x.name != self.name); + metadata.user_groups.push(self.clone()); + put_metadata(&metadata).await?; + Ok(()) + } + + // // are these methods even needed?? + // pub fn group_name(&self) -> String { + // self.name.clone() + // } + + // pub fn group_id(&self) -> Ulid { + // self.id + // } + + // pub fn group_roles(&self) -> HashSet { + // self.roles.clone() + // } + + // pub fn group_users(&self) -> HashSet { + // self.users.clone() + // } +} diff --git a/src/rbac/utils.rs b/src/rbac/utils.rs index a52d03868..df9dd52bf 100644 --- a/src/rbac/utils.rs +++ b/src/rbac/utils.rs @@ -15,11 +15,11 @@ * along with this program. If not, see . * */ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use url::Url; -use crate::parseable::PARSEABLE; +use crate::{parseable::PARSEABLE, rbac::map::read_user_groups}; use super::{ map::roles, @@ -38,7 +38,7 @@ pub fn to_prism_user(user: &User) -> UsersPrism { oauth.user_info.picture.clone(), ), }; - let roles: HashMap> = Users + let direct_roles: HashMap> = Users .get_role(id) .iter() .filter_map(|role_name| { @@ -48,12 +48,33 @@ pub fn to_prism_user(user: &User) -> UsersPrism { }) .collect(); + let mut group_roles: HashMap>> = HashMap::new(); + let mut user_groups = HashSet::new(); + // user might be part of some user groups, fetch the roles from there as well + for user_group in Users.get_user_groups(user.username()) { + if let Some(group) = read_user_groups().get(&user_group) { + let ug_roles: HashMap> = group + .roles + .iter() + .filter_map(|role_name| { + roles() + .get(role_name) + .map(|role| (role_name.to_owned(), role.clone())) + }) + .collect(); + group_roles.insert(group.name.clone(), ug_roles); + user_groups.insert(user_group); + } + } + UsersPrism { id: id.into(), method: method.into(), email: mask_pii_string(email), picture: mask_pii_url(picture), - roles, + roles: direct_roles, + group_roles, + user_groups, } } diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index 93a3ca76c..31f5283b4 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -30,7 +30,10 @@ use std::io; use crate::{ option::Mode, parseable::{JOIN_COMMUNITY, PARSEABLE}, - rbac::{role::model::DefaultPrivilege, user::User}, + rbac::{ + role::model::DefaultPrivilege, + user::{User, UserGroup}, + }, storage::ObjectStorageError, utils::uid, }; @@ -39,7 +42,7 @@ use super::PARSEABLE_METADATA_FILE_NAME; // Expose some static variables for internal usage pub static STORAGE_METADATA: OnceCell = OnceCell::new(); -pub const CURRENT_STORAGE_METADATA_VERSION: &str = "v4"; +pub const CURRENT_STORAGE_METADATA_VERSION: &str = "v6"; // For use in global static #[derive(Debug, PartialEq, Eq)] pub struct StaticStorageMetadata { @@ -57,6 +60,7 @@ pub struct StorageMetadata { #[serde(default = "crate::utils::uid::gen")] pub deployment_id: uid::Uid, pub users: Vec, + pub user_groups: Vec, pub streams: Vec, pub server_mode: Mode, #[serde(default)] @@ -75,6 +79,7 @@ impl Default for StorageMetadata { deployment_id: uid::gen(), server_mode: PARSEABLE.options.mode, users: Vec::new(), + user_groups: Vec::new(), streams: Vec::new(), roles: HashMap::default(), default_role: None, @@ -86,7 +91,7 @@ impl StorageMetadata { pub fn global() -> &'static StaticStorageMetadata { STORAGE_METADATA .get() - .expect("gloabal static is initialized") + .expect("global static is initialized") } pub fn set_global(self) { diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 71c07f5dc..a606b585a 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -93,9 +93,8 @@ pub async fn user_auth_for_query( session_key: &SessionKey, query: &str, ) -> Result<(), actix_web::error::Error> { - let tables = get_tables_from_query(query).await?; + let tables = get_tables_from_query(query).await?.into_inner(); let permissions = Users.get_permissions(session_key); - let tables = tables.into_inner(); user_auth_for_datasets(&permissions, &tables) } From 90d1ed3f43ea667cfe6ad4e34ffbc1107d48bfb6 Mon Sep 17 00:00:00 2001 From: anant Date: Fri, 4 Jul 2025 16:54:30 +0530 Subject: [PATCH 2/9] updates: coderabbit suggestions --- src/handlers/http/modal/query/querier_rbac.rs | 83 ------------------- src/handlers/http/rbac.rs | 32 ------- src/rbac/map.rs | 52 ++++++------ 3 files changed, 26 insertions(+), 141 deletions(-) diff --git a/src/handlers/http/modal/query/querier_rbac.rs b/src/handlers/http/modal/query/querier_rbac.rs index 59d4bee99..d09d87adf 100644 --- a/src/handlers/http/modal/query/querier_rbac.rs +++ b/src/handlers/http/modal/query/querier_rbac.rs @@ -40,55 +40,6 @@ use crate::{ // async aware lock for updating storage metadata and user map atomically static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); -// // Handler for POST /api/v1/user/{username} -// // Creates a new user by username if it does not exists -// pub async fn post_user( -// username: web::Path, -// body: Option>, -// ) -> Result { -// let username = username.into_inner(); - -// let mut metadata = get_metadata().await?; - -// validator::user_name(&username)?; -// let roles: HashSet = if let Some(body) = body { -// serde_json::from_value(body.into_inner())? -// } else { -// return Err(RBACError::RoleValidationError); -// }; - -// if roles.is_empty() { -// return Err(RBACError::RoleValidationError); -// } -// let _ = UPDATE_LOCK.lock().await; -// if Users.contains(&username) -// || metadata -// .users -// .iter() -// .any(|user| user.username() == username) -// { -// return Err(RBACError::UserExists); -// } - -// let (user, password) = user::User::new_basic(username.clone()); - -// metadata.users.push(user.clone()); - -// put_metadata(&metadata).await?; -// let created_role = roles.clone(); -// Users.put_user(user.clone()); - -// sync_user_creation_with_ingestors(user, &Some(roles)).await?; - -// put_role( -// web::Path::::from(username.clone()), -// web::Json(created_role), -// ) -// .await?; - -// Ok(password) -// } - // Handler for POST /api/v1/user/{username} // Creates a new user by username if it does not exists pub async fn post_user( @@ -206,40 +157,6 @@ pub async fn delete_user(username: web::Path) -> Result Put roles for user -// // Put roles for given user -// pub async fn put_role( -// username: web::Path, -// role: web::Json>, -// ) -> Result { -// let username = username.into_inner(); -// let role = role.into_inner(); - -// if !Users.contains(&username) { -// return Err(RBACError::UserDoesNotExist); -// }; -// // update parseable.json first -// let mut metadata = get_metadata().await?; -// if let Some(user) = metadata -// .users -// .iter_mut() -// .find(|user| user.username() == username) -// { -// user.roles.clone_from(&role); -// } else { -// // should be unreachable given state is always consistent -// return Err(RBACError::UserDoesNotExist); -// } - -// put_metadata(&metadata).await?; -// // update in mem table -// Users.put_role(&username.clone(), role.clone()); - -// sync_users_with_roles_with_ingestors(&username, &role).await?; - -// Ok(format!("Roles updated successfully for {username}")) -// } - // Handler PATCH /user/{username}/role/add => Add roles to a user pub async fn add_roles_to_user( username: web::Path, diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index a7e43fc7a..fc735504f 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -278,38 +278,6 @@ pub async fn delete_user(username: web::Path) -> Result Put roles for user -// // Put roles for given user -// pub async fn put_role( -// username: web::Path, -// role: web::Json>, -// ) -> Result { -// let username = username.into_inner(); -// let role = role.into_inner(); - -// if !Users.contains(&username) { -// return Err(RBACError::UserDoesNotExist); -// }; -// // update parseable.json first -// let mut metadata = get_metadata().await?; -// if let Some(user) = metadata -// .users -// .iter_mut() -// .find(|user| user.username() == username) -// { -// user.roles.clone_from(&role); -// } else { -// // should be unreachable given state is always consistent -// return Err(RBACError::UserDoesNotExist); -// } - -// put_metadata(&metadata).await?; -// // update in mem table -// Users.add_roles(&username.clone(), role.clone()); - -// Ok(format!("Roles updated successfully for {username}")) -// } - // Handler PATCH /user/{username}/role/add => Add roles to a user pub async fn add_roles_to_user( username: web::Path, diff --git a/src/rbac/map.rs b/src/rbac/map.rs index 039b70602..d2992a02b 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -226,32 +226,32 @@ impl Sessions { ) -> Option { self.active_sessions.get(key).map(|(username, perms)| { // if user is a part of any user groups, then add permissions - let perms: HashSet = - if !users().0.get(username).unwrap().user_groups.is_empty() { - let groups = users().0.get(username).unwrap().user_groups.clone(); - let all_groups_roles = groups - .iter() - .filter(|id| (read_user_groups().0.contains_key(*id))) - .map(|id| read_user_groups().0.get(id).unwrap().roles.clone()) - .reduce(|mut acc, e| { - acc.extend(e); - acc - }) - .unwrap_or_default(); - let mut privilege_list = Vec::new(); - all_groups_roles - .iter() - .filter_map(|role| roles().get(role).cloned()) - .for_each(|privileges| privilege_list.extend(privileges)); - - let mut perms = HashSet::from_iter(perms.clone()); - for privs in privilege_list { - perms.extend(RoleBuilder::from(&privs).build()) - } - perms - } else { - HashSet::from_iter(perms.clone()) - }; + let perms: HashSet = if let Some(user) = users().0.get(username) { + let all_groups_roles = user + .user_groups + .iter() + .filter(|id| (read_user_groups().0.contains_key(*id))) + .map(|id| read_user_groups().0.get(id).unwrap().roles.clone()) + .reduce(|mut acc, e| { + acc.extend(e); + acc + }) + .unwrap_or_default(); + + let mut privilege_list = Vec::new(); + all_groups_roles + .iter() + .filter_map(|role| roles().get(role).cloned()) + .for_each(|privileges| privilege_list.extend(privileges)); + + let mut perms = HashSet::from_iter(perms.clone()); + for privs in privilege_list { + perms.extend(RoleBuilder::from(&privs).build()) + } + perms + } else { + HashSet::from_iter(perms.clone()) + }; if perms.iter().any(|user_perm| { match *user_perm { // if any action is ALL then we we authorize From 4d3fa6a2490cb5fbac1cb11db481a3af2e958503 Mon Sep 17 00:00:00 2001 From: anant Date: Fri, 4 Jul 2025 18:31:39 +0530 Subject: [PATCH 3/9] feat: Introduce resources - Introduce resource types (stream, llm, all) for privileges --- src/handlers/http/modal/query/querier_rbac.rs | 18 +- src/handlers/http/rbac.rs | 18 +- src/migration/mod.rs | 1 - src/rbac/map.rs | 29 +-- src/rbac/role.rs | 169 +++++++++--------- src/utils/mod.rs | 15 +- 6 files changed, 115 insertions(+), 135 deletions(-) diff --git a/src/handlers/http/modal/query/querier_rbac.rs b/src/handlers/http/modal/query/querier_rbac.rs index d09d87adf..8b0f81551 100644 --- a/src/handlers/http/modal/query/querier_rbac.rs +++ b/src/handlers/http/modal/query/querier_rbac.rs @@ -31,7 +31,7 @@ use crate::{ rbac::RBACError, }, rbac::{ - map::{mut_users, roles, write_user_groups}, + map::{roles, write_user_groups}, user, Users, }, validator, @@ -127,22 +127,6 @@ pub async fn delete_user(username: web::Path) -> Result) -> Result, + context_resource: Option<&str>, context_user: Option<&str>, ) -> Option { self.active_sessions.get(key).map(|(username, perms)| { @@ -256,15 +257,23 @@ impl Sessions { match *user_perm { // if any action is ALL then we we authorize Permission::Unit(action) => action == required_action || action == Action::All, - Permission::Stream(action, ref stream) - | Permission::StreamWithTag(action, ref stream, _) => { - let ok_stream = if let Some(context_stream) = context_stream { - stream == context_stream || stream == "*" - } else { - // if no stream to match then stream check is not needed - true - }; - (action == required_action || action == Action::All) && ok_stream + Permission::Resource(action, ref resource_type) => { + match resource_type { + ParseableResourceType::Stream(resource_id) + | ParseableResourceType::Llm(resource_id) => { + let ok_resource = if let Some(context_resource_id) = context_resource { + resource_id == context_resource_id || resource_id == "*" + } else { + // if no resource to match then resource check is not needed + // WHEN IS THIS VALID?? + true + }; + (action == required_action || action == Action::All) && ok_resource + } + ParseableResourceType::All => { + action == required_action || action == Action::All + }, + } } Permission::SelfUser if required_action == Action::GetUserRoles => { context_user.map(|x| x == username).unwrap_or_default() diff --git a/src/rbac/role.rs b/src/rbac/role.rs index c9ba138ff..3d3fd0bf8 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -77,12 +77,22 @@ pub enum Action { PutCorrelation, } +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +pub enum ParseableResourceType { + #[serde(rename="stream")] + Stream(String), + #[serde(rename="llm")] + Llm(String), + #[serde(rename="all")] + All +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Permission { Unit(Action), - Stream(Action, String), - StreamWithTag(Action, String, Option), - Resource(Action, Option, Option), + // Stream(Action, String), + // StreamWithTag(Action, String, Option), + Resource(Action, ParseableResourceType), SelfUser, } @@ -90,39 +100,44 @@ pub enum Permission { #[derive(Debug, Default)] pub struct RoleBuilder { actions: Vec, - stream: Option, - tag: Option, - resource_id: Option, - resource_type: Option, + // stream: Option, + // tag: Option, + // resource_id: Option, + resource_type: Option, } // R x P impl RoleBuilder { - pub fn with_stream(mut self, stream: String) -> Self { - self.stream = Some(stream); + pub fn with_resource( + mut self, + resource_type: ParseableResourceType, + // resource_id: String, + ) -> Self { + self.resource_type = Some(resource_type); + // self.resource_id = Some(resource_id); self } - pub fn with_tag(mut self, tag: String) -> Self { - self.tag = Some(tag); - self - } + // pub fn with_stream(mut self, stream: String) -> Self { + // self.stream = Some(stream); + // self + // } - pub fn with_resource(mut self, resource_id: String, resource_type: String) -> Self { - self.resource_id = Some(resource_id); - self.resource_type = Some(resource_type); - self - } + // pub fn with_tag(mut self, tag: String) -> Self { + // self.tag = Some(tag); + // self + // } + + // pub fn with_resource(mut self, resource_id: String, resource_type: ParseableResourceType) -> Self { + // self.resource_id = Some(resource_id); + // self.resource_type = Some(resource_type); + // self + // } pub fn build(self) -> Vec { let mut perms = Vec::new(); for action in self.actions { let perm = match action { - Action::Query => Permission::StreamWithTag( - action, - self.stream.clone().unwrap(), - self.tag.clone(), - ), Action::Login | Action::Metrics | Action::PutUser @@ -164,23 +179,24 @@ impl RoleBuilder { | Action::DeleteUserGroup | Action::ModifyUserGroup | Action::GetAnalytics => Permission::Unit(action), - Action::QueryLLM + Action::Query + | Action::QueryLLM | Action::AddLLM | Action::DeleteLLM | Action::GetLLM - | Action::ListLLM => Permission::Resource( - action, - self.resource_type.clone(), - self.resource_id.clone(), - ), - Action::Ingest + | Action::ListLLM + | Action::Ingest | Action::ListStream | Action::GetSchema | Action::DetectSchema | Action::GetStats | Action::GetRetention | Action::PutRetention - | Action::All => Permission::Stream(action, self.stream.clone().unwrap()), + | Action::All => Permission::Resource( + action, + self.resource_type.clone().unwrap(), + // self.resource_id.clone().unwrap(), + ), }; perms.push(perm); } @@ -193,26 +209,25 @@ impl RoleBuilder { // we can put same model in the backend // user -> Vec pub mod model { + use crate::rbac::role::ParseableResourceType; + use super::{Action, RoleBuilder}; #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Hash)] - #[serde(tag = "privilege", content = "resource", rename_all = "lowercase")] + #[serde(tag = "privilege", rename_all = "lowercase")] pub enum DefaultPrivilege { Admin, Editor, Writer { - stream: String, + resource: ParseableResourceType, + // resource_id: String, }, Ingestor { - stream: String, + resource: ParseableResourceType, }, Reader { - stream: String, - tag: Option, - }, - Resource { - resource_id: String, - resource_type: String, + resource: ParseableResourceType, + // resource_id: String, }, } @@ -221,24 +236,18 @@ pub mod model { match value { DefaultPrivilege::Admin => admin_perm_builder(), DefaultPrivilege::Editor => editor_perm_builder(), - DefaultPrivilege::Writer { stream } => { - writer_perm_builder().with_stream(stream.to_owned()) - } - DefaultPrivilege::Reader { stream, tag } => { - let mut reader = reader_perm_builder().with_stream(stream.to_owned()); - if let Some(tag) = tag { - reader = reader.with_tag(tag.to_owned()) - } - reader - } - DefaultPrivilege::Ingestor { stream } => { - ingest_perm_builder().with_stream(stream.to_owned()) - } - DefaultPrivilege::Resource { - resource_id, - resource_type, - } => resource_perm_builder() - .with_resource(resource_id.clone(), resource_type.clone()), + DefaultPrivilege::Writer { + resource, + // resource_id, + } => writer_perm_builder() + .with_resource(resource.to_owned()), + DefaultPrivilege::Reader { + resource, + // resource_id, + } => reader_perm_builder() + .with_resource(resource.to_owned()), + DefaultPrivilege::Ingestor { resource } => ingest_perm_builder() + .with_resource(resource.to_owned()), } } } @@ -246,10 +255,10 @@ pub mod model { fn admin_perm_builder() -> RoleBuilder { RoleBuilder { actions: vec![Action::All], - stream: Some("*".to_string()), - tag: None, - resource_type: Some("*".to_string()), - resource_id: Some("*".to_string()), + // stream: Some("*".to_string()), + // tag: None, + resource_type: Some(ParseableResourceType::All), + // resource_id: Some("*".to_string()), } } @@ -295,10 +304,10 @@ pub mod model { Action::DeleteDashboard, Action::GetUserRoles, ], - stream: Some("*".to_string()), - tag: None, - resource_id: None, - resource_type: None, + // stream: Some("*".to_string()), + // tag: None, + // resource_id: Some("*".to_string()), + resource_type: Some(ParseableResourceType::All), } } @@ -338,9 +347,9 @@ pub mod model { Action::DeleteFilter, Action::GetUserRoles, ], - stream: None, - tag: None, - resource_id: None, + // stream: None, + // tag: None, + // resource_id: None, resource_type: None, } } @@ -374,19 +383,9 @@ pub mod model { Action::GetUserRoles, Action::GetAlert, ], - stream: None, - tag: None, - resource_id: None, - resource_type: None, - } - } - - fn resource_perm_builder() -> RoleBuilder { - RoleBuilder { - actions: vec![Action::GetLLM, Action::ListLLM, Action::QueryLLM], - stream: None, - tag: None, - resource_id: None, + // stream: None, + // tag: None, + // resource_id: None, resource_type: None, } } @@ -394,9 +393,9 @@ pub mod model { fn ingest_perm_builder() -> RoleBuilder { RoleBuilder { actions: vec![Action::Ingest], - stream: None, - tag: None, - resource_id: None, + // stream: None, + // tag: None, + // resource_id: None, resource_type: None, } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index a606b585a..fd0ec88e8 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -109,14 +109,19 @@ pub fn user_auth_for_datasets( // also while iterating add any filter tags for this stream for permission in permissions.iter() { match permission { - Permission::Stream(Action::All, _) => { + Permission::Resource(Action::All, _) => { authorized = true; break; } - Permission::StreamWithTag(Action::Query, ref stream, _) - if stream == table_name || stream == "*" => - { - authorized = true; + Permission::Resource(Action::Query, ref resource_type) => { + match resource_type { + crate::rbac::role::ParseableResourceType::Stream(stream) => { + if stream == table_name || stream == "*" { + authorized = true; + } + } + _ => {}, + } } _ => (), } From b8f13be141717010075dd1b3334fd51d11fb8784 Mon Sep 17 00:00:00 2001 From: anant Date: Fri, 4 Jul 2025 19:35:13 +0530 Subject: [PATCH 4/9] updates for new privileges - roles don't need any migration - auth flow modified to account for resource type --- src/handlers/http/middleware.rs | 20 +++++++---- src/handlers/http/modal/ingest_server.rs | 10 +++--- src/handlers/http/modal/query_server.rs | 22 ++++++------- src/handlers/http/modal/server.rs | 42 ++++++++++++------------ src/handlers/http/rbac.rs | 3 ++ src/rbac/map.rs | 17 +++++----- src/rbac/role.rs | 19 +++++------ src/rbac/user.rs | 31 +++-------------- src/utils/mod.rs | 14 ++++---- 9 files changed, 82 insertions(+), 96 deletions(-) diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 4c1e92f39..acb1ec645 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -59,7 +59,7 @@ pub struct CommonAttributes { pub trait RouteExt { fn authorize(self, action: Action) -> Self; - fn authorize_for_stream(self, action: Action) -> Self; + fn authorize_for_resource(self, action: Action) -> Self; fn authorize_for_user(self, action: Action) -> Self; } @@ -71,10 +71,10 @@ impl RouteExt for Route { }) } - fn authorize_for_stream(self, action: Action) -> Self { + fn authorize_for_resource(self, action: Action) -> Self { self.wrap(Auth { action, - method: auth_stream_context, + method: auth_resource_context, }) } @@ -182,18 +182,26 @@ pub fn auth_no_context(req: &mut ServiceRequest, action: Action) -> Result Result { let creds = extract_session_key(req); + let usergroup = req.match_info().get("usergroup"); + let llmid = req.match_info().get("llmid"); let mut stream = req.match_info().get("logstream"); - if stream.is_none() { + if let Some(usergroup) = usergroup { + creds.map(|key| Users.authorize(key, action, Some(usergroup), None)) + } else if let Some(llmid) = llmid { + creds.map(|key| Users.authorize(key, action, Some(llmid), None)) + } else if let Some(stream) = stream { + creds.map(|key| Users.authorize(key, action, Some(stream), None)) + } else { if let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) { stream = Some(stream_name.to_str().unwrap()); } + creds.map(|key| Users.authorize(key, action, stream, None)) } - creds.map(|key| Users.authorize(key, action, stream, None)) } pub fn auth_user_context( diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 3057e3a83..1ecc5dff8 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -237,7 +237,7 @@ impl IngestServer { .route( web::post() .to(ingest::post_event) - .authorize_for_stream(Action::Ingest), + .authorize_for_resource(Action::Ingest), ) .wrap(from_fn( resource_check::check_resource_utilization_middleware, @@ -255,7 +255,7 @@ impl IngestServer { .route( web::put() .to(ingestor_logstream::put_stream) - .authorize_for_stream(Action::CreateStream), + .authorize_for_resource(Action::CreateStream), ), ) .service( @@ -263,7 +263,7 @@ impl IngestServer { web::resource("/info").route( web::get() .to(logstream::get_stream_info) - .authorize_for_stream(Action::GetStreamInfo), + .authorize_for_resource(Action::GetStreamInfo), ), ) .service( @@ -271,7 +271,7 @@ impl IngestServer { web::resource("/stats").route( web::get() .to(logstream::get_stats) - .authorize_for_stream(Action::GetStats), + .authorize_for_resource(Action::GetStats), ), ) .service( @@ -279,7 +279,7 @@ impl IngestServer { web::resource("/cleanup").route( web::post() .to(ingestor_logstream::retention_cleanup) - .authorize_for_stream(Action::PutRetention), + .authorize_for_resource(Action::PutRetention), ), ), ), diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index d2f027535..335e29896 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -273,19 +273,19 @@ impl QueryServer { .route( web::put() .to(querier_logstream::put_stream) - .authorize_for_stream(Action::CreateStream), + .authorize_for_resource(Action::CreateStream), ) // POST "/logstream/{logstream}" ==> Post logs to given log stream .route( web::post() .to(querier_ingest::post_event) - .authorize_for_stream(Action::Ingest), + .authorize_for_resource(Action::Ingest), ) // DELETE "/logstream/{logstream}" ==> Delete log stream .route( web::delete() .to(querier_logstream::delete) - .authorize_for_stream(Action::DeleteStream), + .authorize_for_resource(Action::DeleteStream), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) @@ -294,7 +294,7 @@ impl QueryServer { web::resource("/info").route( web::get() .to(logstream::get_stream_info) - .authorize_for_stream(Action::GetStreamInfo), + .authorize_for_resource(Action::GetStreamInfo), ), ) .service( @@ -302,7 +302,7 @@ impl QueryServer { web::resource("/schema").route( web::get() .to(logstream::get_schema) - .authorize_for_stream(Action::GetSchema), + .authorize_for_resource(Action::GetSchema), ), ) .service( @@ -310,7 +310,7 @@ impl QueryServer { web::resource("/stats").route( web::get() .to(querier_logstream::get_stats) - .authorize_for_stream(Action::GetStats), + .authorize_for_resource(Action::GetStats), ), ) .service( @@ -319,13 +319,13 @@ impl QueryServer { .route( web::put() .to(logstream::put_retention) - .authorize_for_stream(Action::PutRetention), + .authorize_for_resource(Action::PutRetention), ) // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream .route( web::get() .to(logstream::get_retention) - .authorize_for_stream(Action::GetRetention), + .authorize_for_resource(Action::GetRetention), ), ) .service( @@ -334,17 +334,17 @@ impl QueryServer { .route( web::put() .to(logstream::put_stream_hot_tier) - .authorize_for_stream(Action::PutHotTierEnabled), + .authorize_for_resource(Action::PutHotTierEnabled), ) .route( web::get() .to(logstream::get_stream_hot_tier) - .authorize_for_stream(Action::GetHotTierEnabled), + .authorize_for_resource(Action::GetHotTierEnabled), ) .route( web::delete() .to(logstream::delete_stream_hot_tier) - .authorize_for_stream(Action::DeleteHotTierEnabled), + .authorize_for_resource(Action::DeleteHotTierEnabled), ), ), ) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index a65915156..615af1d8e 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -182,9 +182,9 @@ impl Server { web::resource("/info").route( web::get() .to(http::prism_logstream::get_info) - .authorize_for_stream(Action::GetStreamInfo) - .authorize_for_stream(Action::GetStats) - .authorize_for_stream(Action::GetRetention), + .authorize_for_resource(Action::GetStreamInfo) + .authorize_for_resource(Action::GetStats) + .authorize_for_resource(Action::GetRetention), ), ), ) @@ -195,9 +195,9 @@ impl Server { "", web::post() .to(http::prism_logstream::post_datasets) - .authorize_for_stream(Action::GetStreamInfo) - .authorize_for_stream(Action::GetStats) - .authorize_for_stream(Action::GetRetention), + .authorize_for_resource(Action::GetStreamInfo) + .authorize_for_resource(Action::GetStats) + .authorize_for_resource(Action::GetRetention), ) } @@ -408,13 +408,13 @@ impl Server { .route( web::put() .to(logstream::put_stream) - .authorize_for_stream(Action::CreateStream), + .authorize_for_resource(Action::CreateStream), ) // POST "/logstream/{logstream}" ==> Post logs to given log stream .route( web::post() .to(ingest::post_event) - .authorize_for_stream(Action::Ingest) + .authorize_for_resource(Action::Ingest) .wrap(from_fn( resource_check::check_resource_utilization_middleware, )), @@ -423,7 +423,7 @@ impl Server { .route( web::delete() .to(logstream::delete) - .authorize_for_stream(Action::DeleteStream), + .authorize_for_resource(Action::DeleteStream), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) @@ -432,7 +432,7 @@ impl Server { web::resource("/info").route( web::get() .to(logstream::get_stream_info) - .authorize_for_stream(Action::GetStreamInfo), + .authorize_for_resource(Action::GetStreamInfo), ), ) .service( @@ -440,7 +440,7 @@ impl Server { web::resource("/schema").route( web::get() .to(logstream::get_schema) - .authorize_for_stream(Action::GetSchema), + .authorize_for_resource(Action::GetSchema), ), ) .service( @@ -448,7 +448,7 @@ impl Server { web::resource("/stats").route( web::get() .to(logstream::get_stats) - .authorize_for_stream(Action::GetStats), + .authorize_for_resource(Action::GetStats), ), ) .service( @@ -457,13 +457,13 @@ impl Server { .route( web::put() .to(logstream::put_retention) - .authorize_for_stream(Action::PutRetention), + .authorize_for_resource(Action::PutRetention), ) // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream .route( web::get() .to(logstream::get_retention) - .authorize_for_stream(Action::GetRetention), + .authorize_for_resource(Action::GetRetention), ), ) .service( @@ -472,17 +472,17 @@ impl Server { .route( web::put() .to(logstream::put_stream_hot_tier) - .authorize_for_stream(Action::PutHotTierEnabled), + .authorize_for_resource(Action::PutHotTierEnabled), ) .route( web::get() .to(logstream::get_stream_hot_tier) - .authorize_for_stream(Action::GetHotTierEnabled), + .authorize_for_resource(Action::GetHotTierEnabled), ) .route( web::delete() .to(logstream::delete_stream_hot_tier) - .authorize_for_stream(Action::DeleteHotTierEnabled), + .authorize_for_resource(Action::DeleteHotTierEnabled), ), ), ) @@ -494,7 +494,7 @@ impl Server { .route( web::post() .to(ingest::ingest) - .authorize_for_stream(Action::Ingest), + .authorize_for_resource(Action::Ingest), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) } @@ -507,7 +507,7 @@ impl Server { .route( web::post() .to(ingest::handle_otel_logs_ingestion) - .authorize_for_stream(Action::Ingest), + .authorize_for_resource(Action::Ingest), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) @@ -516,7 +516,7 @@ impl Server { .route( web::post() .to(ingest::handle_otel_metrics_ingestion) - .authorize_for_stream(Action::Ingest), + .authorize_for_resource(Action::Ingest), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) @@ -525,7 +525,7 @@ impl Server { .route( web::post() .to(ingest::handle_otel_traces_ingestion) - .authorize_for_stream(Action::Ingest), + .authorize_for_resource(Action::Ingest), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index bb92d6661..02d71611d 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -425,6 +425,8 @@ pub enum RBACError { InvalidSyncOperation(String), #[error("User group still being used by users: {0}")] UserGroupNotEmpty(String), + #[error("Resource in use: {0}")] + ResourceInUse(String), } impl actix_web::ResponseError for RBACError { @@ -445,6 +447,7 @@ impl actix_web::ResponseError for RBACError { Self::InvalidUserGroupRequest(_) => StatusCode::BAD_REQUEST, Self::InvalidSyncOperation(_) => StatusCode::BAD_REQUEST, Self::UserGroupNotEmpty(_) => StatusCode::BAD_REQUEST, + Self::ResourceInUse(_) => StatusCode::BAD_REQUEST, } } diff --git a/src/rbac/map.rs b/src/rbac/map.rs index 49458abdc..2864716ae 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -261,18 +261,19 @@ impl Sessions { match resource_type { ParseableResourceType::Stream(resource_id) | ParseableResourceType::Llm(resource_id) => { - let ok_resource = if let Some(context_resource_id) = context_resource { - resource_id == context_resource_id || resource_id == "*" - } else { - // if no resource to match then resource check is not needed - // WHEN IS THIS VALID?? - true - }; + let ok_resource = + if let Some(context_resource_id) = context_resource { + resource_id == context_resource_id || resource_id == "*" + } else { + // if no resource to match then resource check is not needed + // WHEN IS THIS VALID?? + true + }; (action == required_action || action == Action::All) && ok_resource } ParseableResourceType::All => { action == required_action || action == Action::All - }, + } } } Permission::SelfUser if required_action == Action::GetUserRoles => { diff --git a/src/rbac/role.rs b/src/rbac/role.rs index 3d3fd0bf8..b14b5dc2a 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -79,12 +79,12 @@ pub enum Action { #[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub enum ParseableResourceType { - #[serde(rename="stream")] + #[serde(rename = "stream")] Stream(String), - #[serde(rename="llm")] + #[serde(rename = "llm")] Llm(String), - #[serde(rename="all")] - All + #[serde(rename = "all")] + All, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -239,15 +239,14 @@ pub mod model { DefaultPrivilege::Writer { resource, // resource_id, - } => writer_perm_builder() - .with_resource(resource.to_owned()), + } => writer_perm_builder().with_resource(resource.to_owned()), DefaultPrivilege::Reader { resource, // resource_id, - } => reader_perm_builder() - .with_resource(resource.to_owned()), - DefaultPrivilege::Ingestor { resource } => ingest_perm_builder() - .with_resource(resource.to_owned()), + } => reader_perm_builder().with_resource(resource.to_owned()), + DefaultPrivilege::Ingestor { resource } => { + ingest_perm_builder().with_resource(resource.to_owned()) + } } } } diff --git a/src/rbac/user.rs b/src/rbac/user.rs index 6dc5070e5..0a3aaaddb 100644 --- a/src/rbac/user.rs +++ b/src/rbac/user.rs @@ -31,7 +31,7 @@ use crate::{ rbac::{InvalidUserGroupError, RBACError}, }, parseable::PARSEABLE, - rbac::map::{mut_users, read_user_groups, roles, users}, + rbac::map::{read_user_groups, roles, users}, }; #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -264,35 +264,12 @@ impl UserGroup { UserGroup { name, roles, users } } - pub async fn add_roles(&mut self, roles: HashSet) -> Result<(), RBACError> { - self.roles.extend(roles.clone()); - let new_roles = self.roles.clone(); - let mut metadata = get_metadata().await?; - metadata - .user_groups - .iter_mut() - .filter(|ug| ug.name == self.name) - .map(|ug| ug.roles.clone_from(&new_roles)) - .for_each(drop); - put_metadata(&metadata).await?; + pub fn add_roles(&mut self, roles: HashSet) -> Result<(), RBACError> { + self.roles.extend(roles); Ok(()) } - pub async fn add_users(&mut self, users: HashSet) -> Result<(), RBACError> { - // ensure that the users add the user group to their map - let mut metadata = get_metadata().await?; - - users - .iter() - .map(|user| { - if let Some(user) = mut_users().get_mut(user) { - user.user_groups.insert(self.name.clone()); - metadata.users.retain(|u| u.username() != user.username()); - metadata.users.push(user.clone()); - } - }) - .for_each(drop); - put_metadata(&metadata).await?; + pub fn add_users(&mut self, users: HashSet) -> Result<(), RBACError> { self.users.extend(users); Ok(()) } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index fd0ec88e8..e4edb0692 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -113,14 +113,12 @@ pub fn user_auth_for_datasets( authorized = true; break; } - Permission::Resource(Action::Query, ref resource_type) => { - match resource_type { - crate::rbac::role::ParseableResourceType::Stream(stream) => { - if stream == table_name || stream == "*" { - authorized = true; - } - } - _ => {}, + Permission::Resource( + Action::Query, + crate::rbac::role::ParseableResourceType::Stream(stream), + ) => { + if stream == table_name || stream == "*" { + authorized = true; } } _ => (), From 7d8c2591ac47d4f8cb9f83a34231ac6b980bea91 Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 7 Jul 2025 18:36:05 +0530 Subject: [PATCH 5/9] update: migrate from v4 to v6 --- src/migration/metadata_migration.rs | 14 ++++++++++++++ src/migration/mod.rs | 9 +++++++++ 2 files changed, 23 insertions(+) diff --git a/src/migration/metadata_migration.rs b/src/migration/metadata_migration.rs index 7d7a266c0..c0c2c42af 100644 --- a/src/migration/metadata_migration.rs +++ b/src/migration/metadata_migration.rs @@ -181,6 +181,20 @@ pub fn v5_v6(mut storage_metadata: JsonValue) -> JsonValue { } } + if let Some(JsonValue::Object(roles)) = metadata.get_mut("roles") { + for (_, role_permissions) in roles.iter_mut() { + if let JsonValue::Array(permissions) = role_permissions { + for permission in permissions.iter_mut() { + if let JsonValue::Object(perm_obj) = permission { + if let Some(JsonValue::Object(resource)) = perm_obj.get_mut("resource") { + resource.remove("tag"); + } + } + } + } + } + } + storage_metadata } diff --git a/src/migration/mod.rs b/src/migration/mod.rs index d109206d0..745824124 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -132,6 +132,15 @@ pub async fn run_metadata_migration( let metadata = metadata_migration::v3_v4(staging_metadata); put_staging_metadata(config, &metadata)?; } + Some("v4") => { + let metadata = metadata_migration::v4_v5(staging_metadata); + let metadata = metadata_migration::v5_v6(metadata); + put_staging_metadata(config, &metadata)?; + } + Some("v5") => { + let metadata = metadata_migration::v5_v6(staging_metadata); + put_staging_metadata(config, &metadata)?; + } _ => (), } } From 601b6cf3901008c0663e6449279748aabc866fc8 Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 7 Jul 2025 21:28:43 +0530 Subject: [PATCH 6/9] Updates: coderabbit suggestions --- .../http/modal/ingest/ingestor_rbac.rs | 70 +++++++++-------- src/handlers/http/rbac.rs | 77 +++++++------------ src/handlers/http/role.rs | 28 +++---- src/migration/mod.rs | 71 +++++++++-------- src/rbac/map.rs | 59 +++++++------- src/rbac/mod.rs | 5 +- src/rbac/role.rs | 77 +++---------------- src/rbac/user.rs | 14 ++-- 8 files changed, 171 insertions(+), 230 deletions(-) diff --git a/src/handlers/http/modal/ingest/ingestor_rbac.rs b/src/handlers/http/modal/ingest/ingestor_rbac.rs index dc93af60f..ee50c0fce 100644 --- a/src/handlers/http/modal/ingest/ingestor_rbac.rs +++ b/src/handlers/http/modal/ingest/ingestor_rbac.rs @@ -24,6 +24,7 @@ use tokio::sync::Mutex; use crate::{ handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError}, rbac::{ + map::roles, user::{self, User as ParseableUser}, Users, }, @@ -73,38 +74,6 @@ pub async fn delete_user(username: web::Path) -> Result Put roles for user -// // Put roles for given user -// pub async fn put_role( -// username: web::Path, -// role: web::Json>, -// ) -> Result { -// let username = username.into_inner(); -// let role = role.into_inner(); - -// if !Users.contains(&username) { -// return Err(RBACError::UserDoesNotExist); -// }; -// // update parseable.json first -// let mut metadata = get_metadata().await?; -// if let Some(user) = metadata -// .users -// .iter_mut() -// .find(|user| user.username() == username) -// { -// user.roles.clone_from(&role); -// } else { -// // should be unreachable given state is always consistent -// return Err(RBACError::UserDoesNotExist); -// } - -// let _ = storage::put_staging_metadata(&metadata); -// // update in mem table -// Users.add_roles(&username.clone(), role.clone()); - -// Ok(format!("Roles updated successfully for {username}")) -// } - // Handler PATCH /user/{username}/role/sync/add => Add roles to a user pub async fn add_roles_to_user( username: web::Path, @@ -116,6 +85,19 @@ pub async fn add_roles_to_user( if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); }; + + // check if all roles exist + let mut non_existent_roles = Vec::new(); + roles_to_add.iter().for_each(|r| { + if roles().get(r).is_none() { + non_existent_roles.push(r.clone()); + } + }); + + if !non_existent_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existent_roles)); + } + // update parseable.json first let mut metadata = get_metadata().await?; if let Some(user) = metadata @@ -147,6 +129,30 @@ pub async fn remove_roles_from_user( if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); }; + + // check if all roles exist + let mut non_existent_roles = Vec::new(); + roles_to_remove.iter().for_each(|r| { + if roles().get(r).is_none() { + non_existent_roles.push(r.clone()); + } + }); + + if !non_existent_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existent_roles)); + } + + // check that user actually has these roles + let user_roles: HashSet = HashSet::from_iter(Users.get_role(&username)); + let roles_not_with_user: HashSet = + HashSet::from_iter(roles_to_remove.difference(&user_roles).cloned()); + + if !roles_not_with_user.is_empty() { + return Err(RBACError::RolesNotAssigned(Vec::from_iter( + roles_not_with_user, + ))); + } + // update parseable.json first let mut metadata = get_metadata().await?; if let Some(user) = metadata diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index 02d71611d..bc66c03b5 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -115,17 +115,17 @@ pub async fn post_user( if user_roles.is_empty() { return Err(RBACError::RoleValidationError); } else { - let mut non_existant_roles = Vec::new(); + let mut non_existent_roles = Vec::new(); user_roles .iter() .map(|r| { if !roles().contains_key(r) { - non_existant_roles.push(r.clone()); + non_existent_roles.push(r.clone()); } }) .for_each(drop); - if !non_existant_roles.is_empty() { - return Err(RBACError::RolesDoNotExist(non_existant_roles)); + if !non_existent_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existent_roles)); } } let _ = UPDATE_LOCK.lock().await; @@ -237,24 +237,23 @@ pub async fn delete_user(username: web::Path) -> Result, - pub non_existant_users: Vec, + pub non_existent_roles: Vec, + pub non_existent_users: Vec, pub roles_not_in_group: Vec, pub users_not_in_group: Vec, pub comments: String, } -// impl Display for InvalidUserGroupRequestStruct { -// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { -// if !self.invalid_name { -// write!( -// f, -// "Invalid user group request- {{invalidName: {}\nnonExistantRoles: {:?}\nnonExistantUsers: {:?}\nThe name should follow this regex- ^[A-Za-z0-9_-]+$}}", -// self.invalid_name, self.non_existant_roles, self.non_existant_users -// ) -// } else { -// write!( -// f, -// "Invalid user group request- {{nonExistantRoles: {:?}\nnonExistantUsers: {:?}}}", -// self.non_existant_roles, self.non_existant_users -// ) -// } -// } -// } - #[derive(Debug, thiserror::Error)] pub enum RBACError { #[error("User exists already")] @@ -461,7 +442,7 @@ impl actix_web::ResponseError for RBACError { RBACError::RolesDoNotExist(obj) => actix_web::HttpResponse::build(self.status_code()) .insert_header(ContentType::plaintext()) .json(json!({ - "non_existant_roles": obj + "non_existent_roles": obj })), RBACError::InvalidUserGroupRequest(obj) => { actix_web::HttpResponse::build(self.status_code()) diff --git a/src/handlers/http/role.rs b/src/handlers/http/role.rs index 7e7e303a5..cdd70719e 100644 --- a/src/handlers/http/role.rs +++ b/src/handlers/http/role.rs @@ -16,8 +16,6 @@ * */ -use std::collections::HashSet; - use actix_web::{ http::header::ContentType, web::{self, Json}, @@ -103,23 +101,21 @@ pub async fn delete(name: web::Path) -> Result Option<&str> { + metadata + .as_object() + .and_then(|meta| meta.get("version")) + .and_then(|version| version.as_str()) +} + /// Migrate the metdata from v1 or v2 to v3 /// This is a one time migration pub async fn run_metadata_migration( @@ -55,13 +62,6 @@ pub async fn run_metadata_migration( } let staging_metadata = get_staging_metadata(config)?; - fn get_version(metadata: &serde_json::Value) -> Option<&str> { - metadata - .as_object() - .and_then(|meta| meta.get("version")) - .and_then(|version| version.as_str()) - } - // if storage metadata is none do nothing if let Some(storage_metadata) = storage_metadata { match get_version(&storage_metadata) { @@ -117,37 +117,42 @@ pub async fn run_metadata_migration( // if staging metadata is none do nothing if let Some(staging_metadata) = staging_metadata { - match get_version(&staging_metadata) { - Some("v1") => { - let mut metadata = metadata_migration::v1_v3(staging_metadata); - metadata = metadata_migration::v3_v4(metadata); - put_staging_metadata(config, &metadata)?; - } - Some("v2") => { - let mut metadata = metadata_migration::v2_v3(staging_metadata); - metadata = metadata_migration::v3_v4(metadata); - put_staging_metadata(config, &metadata)?; - } - Some("v3") => { - let metadata = metadata_migration::v3_v4(staging_metadata); - put_staging_metadata(config, &metadata)?; - } - Some("v4") => { - let metadata = metadata_migration::v4_v5(staging_metadata); - let metadata = metadata_migration::v5_v6(metadata); - put_staging_metadata(config, &metadata)?; - } - Some("v5") => { - let metadata = metadata_migration::v5_v6(staging_metadata); - put_staging_metadata(config, &metadata)?; - } - _ => (), - } + migrate_staging(config, staging_metadata)?; } Ok(()) } +fn migrate_staging(config: &Parseable, staging_metadata: Value) -> anyhow::Result<()> { + match get_version(&staging_metadata) { + Some("v1") => { + let mut metadata = metadata_migration::v1_v3(staging_metadata); + metadata = metadata_migration::v3_v4(metadata); + put_staging_metadata(config, &metadata)?; + } + Some("v2") => { + let mut metadata = metadata_migration::v2_v3(staging_metadata); + metadata = metadata_migration::v3_v4(metadata); + put_staging_metadata(config, &metadata)?; + } + Some("v3") => { + let metadata = metadata_migration::v3_v4(staging_metadata); + put_staging_metadata(config, &metadata)?; + } + Some("v4") => { + let metadata = metadata_migration::v4_v5(staging_metadata); + let metadata = metadata_migration::v5_v6(metadata); + put_staging_metadata(config, &metadata)?; + } + Some("v5") => { + let metadata = metadata_migration::v5_v6(staging_metadata); + put_staging_metadata(config, &metadata)?; + } + _ => (), + } + Ok(()) +} + /// run the migration for all streams concurrently pub async fn run_migration(config: &Parseable) -> anyhow::Result<()> { let storage = config.storage.get_object_store(); diff --git a/src/rbac/map.rs b/src/rbac/map.rs index 2864716ae..8da51db7a 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -226,33 +226,9 @@ impl Sessions { context_user: Option<&str>, ) -> Option { self.active_sessions.get(key).map(|(username, perms)| { - // if user is a part of any user groups, then add permissions - let perms: HashSet = if let Some(user) = users().0.get(username) { - let all_groups_roles = user - .user_groups - .iter() - .filter(|id| (read_user_groups().0.contains_key(*id))) - .map(|id| read_user_groups().0.get(id).unwrap().roles.clone()) - .reduce(|mut acc, e| { - acc.extend(e); - acc - }) - .unwrap_or_default(); - - let mut privilege_list = Vec::new(); - all_groups_roles - .iter() - .filter_map(|role| roles().get(role).cloned()) - .for_each(|privileges| privilege_list.extend(privileges)); - - let mut perms = HashSet::from_iter(perms.clone()); - for privs in privilege_list { - perms.extend(RoleBuilder::from(&privs).build()) - } - perms - } else { - HashSet::from_iter(perms.clone()) - }; + let mut perms: HashSet = HashSet::from_iter(perms.clone()); + perms.extend(aggregate_group_permissions(username)); + if perms.iter().any(|user_perm| { match *user_perm { // if any action is ALL then we we authorize @@ -317,6 +293,35 @@ impl From> for Users { } } +fn aggregate_group_permissions(username: &str) -> HashSet { + let mut group_perms = HashSet::new(); + + let Some(user) = users().get(username).cloned() else { + return group_perms; + }; + + if user.user_groups.is_empty() { + return group_perms; + } + + for group_name in &user.user_groups { + let Some(group) = read_user_groups().get(group_name).cloned() else { + continue; + }; + + for role_name in &group.roles { + let Some(privileges) = roles().get(role_name).cloned() else { + continue; + }; + + for privilege in privileges { + group_perms.extend(RoleBuilder::from(&privilege).build()); + } + } + } + + group_perms +} // Map of [user group ID --> UserGroup] // This map is populated at startup with the list of user groups from parseable.json file #[derive(Debug, Default, Clone, derive_more::Deref, derive_more::DerefMut)] diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index cc7bb2326..307066726 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -119,7 +119,10 @@ impl Users { pub fn get_permissions(&self, session: &SessionKey) -> Vec { let mut permissions = sessions().get(session).cloned().unwrap_or_default(); - let username = self.get_username_from_session(session).unwrap(); + let Some(username) = self.get_username_from_session(session) else { + return permissions.into_iter().collect_vec(); + }; + let user_groups = self.get_user_groups(&username); for group in user_groups { if let Some(group) = read_user_groups().get(&group) { diff --git a/src/rbac/role.rs b/src/rbac/role.rs index b14b5dc2a..da013113c 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -90,8 +90,6 @@ pub enum ParseableResourceType { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Permission { Unit(Action), - // Stream(Action, String), - // StreamWithTag(Action, String, Option), Resource(Action, ParseableResourceType), SelfUser, } @@ -100,40 +98,16 @@ pub enum Permission { #[derive(Debug, Default)] pub struct RoleBuilder { actions: Vec, - // stream: Option, - // tag: Option, - // resource_id: Option, resource_type: Option, } // R x P impl RoleBuilder { - pub fn with_resource( - mut self, - resource_type: ParseableResourceType, - // resource_id: String, - ) -> Self { + pub fn with_resource(mut self, resource_type: ParseableResourceType) -> Self { self.resource_type = Some(resource_type); - // self.resource_id = Some(resource_id); self } - // pub fn with_stream(mut self, stream: String) -> Self { - // self.stream = Some(stream); - // self - // } - - // pub fn with_tag(mut self, tag: String) -> Self { - // self.tag = Some(tag); - // self - // } - - // pub fn with_resource(mut self, resource_id: String, resource_type: ParseableResourceType) -> Self { - // self.resource_id = Some(resource_id); - // self.resource_type = Some(resource_type); - // self - // } - pub fn build(self) -> Vec { let mut perms = Vec::new(); for action in self.actions { @@ -192,11 +166,7 @@ impl RoleBuilder { | Action::GetStats | Action::GetRetention | Action::PutRetention - | Action::All => Permission::Resource( - action, - self.resource_type.clone().unwrap(), - // self.resource_id.clone().unwrap(), - ), + | Action::All => Permission::Resource(action, self.resource_type.clone().unwrap()), }; perms.push(perm); } @@ -218,17 +188,9 @@ pub mod model { pub enum DefaultPrivilege { Admin, Editor, - Writer { - resource: ParseableResourceType, - // resource_id: String, - }, - Ingestor { - resource: ParseableResourceType, - }, - Reader { - resource: ParseableResourceType, - // resource_id: String, - }, + Writer { resource: ParseableResourceType }, + Ingestor { resource: ParseableResourceType }, + Reader { resource: ParseableResourceType }, } impl From<&DefaultPrivilege> for RoleBuilder { @@ -236,14 +198,12 @@ pub mod model { match value { DefaultPrivilege::Admin => admin_perm_builder(), DefaultPrivilege::Editor => editor_perm_builder(), - DefaultPrivilege::Writer { - resource, - // resource_id, - } => writer_perm_builder().with_resource(resource.to_owned()), - DefaultPrivilege::Reader { - resource, - // resource_id, - } => reader_perm_builder().with_resource(resource.to_owned()), + DefaultPrivilege::Writer { resource } => { + writer_perm_builder().with_resource(resource.to_owned()) + } + DefaultPrivilege::Reader { resource } => { + reader_perm_builder().with_resource(resource.to_owned()) + } DefaultPrivilege::Ingestor { resource } => { ingest_perm_builder().with_resource(resource.to_owned()) } @@ -254,10 +214,7 @@ pub mod model { fn admin_perm_builder() -> RoleBuilder { RoleBuilder { actions: vec![Action::All], - // stream: Some("*".to_string()), - // tag: None, resource_type: Some(ParseableResourceType::All), - // resource_id: Some("*".to_string()), } } @@ -303,9 +260,6 @@ pub mod model { Action::DeleteDashboard, Action::GetUserRoles, ], - // stream: Some("*".to_string()), - // tag: None, - // resource_id: Some("*".to_string()), resource_type: Some(ParseableResourceType::All), } } @@ -346,9 +300,6 @@ pub mod model { Action::DeleteFilter, Action::GetUserRoles, ], - // stream: None, - // tag: None, - // resource_id: None, resource_type: None, } } @@ -382,9 +333,6 @@ pub mod model { Action::GetUserRoles, Action::GetAlert, ], - // stream: None, - // tag: None, - // resource_id: None, resource_type: None, } } @@ -392,9 +340,6 @@ pub mod model { fn ingest_perm_builder() -> RoleBuilder { RoleBuilder { actions: vec![Action::Ingest], - // stream: None, - // tag: None, - // resource_id: None, resource_type: None, } } diff --git a/src/rbac/user.rs b/src/rbac/user.rs index 0a3aaaddb..0aa1b7ab3 100644 --- a/src/rbac/user.rs +++ b/src/rbac/user.rs @@ -221,26 +221,26 @@ impl UserGroup { if read_user_groups().contains_key(&self.name) { return Err(RBACError::UserGroupExists(self.name.clone())); } - let mut non_existant_roles = Vec::new(); + let mut non_existent_roles = Vec::new(); if !self.roles.is_empty() { // validate that the roles exist for role in &self.roles { if !roles().contains_key(role) { - non_existant_roles.push(role.clone()); + non_existent_roles.push(role.clone()); } } } - let mut non_existant_users = Vec::new(); + let mut non_existent_users = Vec::new(); if !self.users.is_empty() { // validate that the users exist for user in &self.users { if !users().contains_key(user) { - non_existant_users.push(user.clone()); + non_existent_users.push(user.clone()); } } } - if !non_existant_roles.is_empty() || !non_existant_users.is_empty() || !valid_name { + if !non_existent_roles.is_empty() || !non_existent_users.is_empty() || !valid_name { let comments = if !valid_name { "The name should follow this regex- `^[A-Za-z0-9_-]+$`".to_string() } else { @@ -249,8 +249,8 @@ impl UserGroup { Err(RBACError::InvalidUserGroupRequest(Box::new( InvalidUserGroupError { valid_name, - non_existant_roles, - non_existant_users, + non_existent_roles, + non_existent_users, roles_not_in_group: vec![], users_not_in_group: vec![], comments, From 6955a299b78ac64861c5fe094d9f1f25ae306555 Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 8 Jul 2025 11:08:18 +0530 Subject: [PATCH 7/9] bugfixes - user and role deletion while still being used - renamed resource types --- src/handlers/http/rbac.rs | 13 +++++++++++- src/handlers/http/role.rs | 43 ++++++++------------------------------- src/rbac/role.rs | 2 +- 3 files changed, 22 insertions(+), 36 deletions(-) diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index bc66c03b5..a1c77ae81 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -228,11 +228,19 @@ pub async fn get_role(username: web::Path) -> Result) -> Result { let username = username.into_inner(); - let _ = UPDATE_LOCK.lock().await; + + // if user is a part of any groups then don't allow deletion + if !Users.get_user_groups(&username).is_empty() { + return Err(RBACError::InvalidDeletionRequest(format!( + "User: {username} should not be a part of any groups" + ))); + } // fail this request if the user does not exists if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); }; + let _ = UPDATE_LOCK.lock().await; + // delete from parseable.json first let mut metadata = get_metadata().await?; metadata.users.retain(|user| user.username() != username); @@ -408,6 +416,8 @@ pub enum RBACError { UserGroupNotEmpty(String), #[error("Resource in use: {0}")] ResourceInUse(String), + #[error("{0}")] + InvalidDeletionRequest(String), } impl actix_web::ResponseError for RBACError { @@ -429,6 +439,7 @@ impl actix_web::ResponseError for RBACError { Self::InvalidSyncOperation(_) => StatusCode::BAD_REQUEST, Self::UserGroupNotEmpty(_) => StatusCode::BAD_REQUEST, Self::ResourceInUse(_) => StatusCode::BAD_REQUEST, + Self::InvalidDeletionRequest(_) => StatusCode::BAD_REQUEST, } } diff --git a/src/handlers/http/role.rs b/src/handlers/http/role.rs index cdd70719e..50ba678ee 100644 --- a/src/handlers/http/role.rs +++ b/src/handlers/http/role.rs @@ -22,12 +22,11 @@ use actix_web::{ HttpResponse, Responder, }; use http::StatusCode; -use itertools::Itertools; use crate::{ parseable::PARSEABLE, rbac::{ - map::{mut_roles, read_user_groups, write_user_groups, DEFAULT_ROLE}, + map::{mut_roles, DEFAULT_ROLE}, role::model::DefaultPrivilege, }, storage::{self, ObjectStorageError, StorageMetadata}, @@ -78,46 +77,22 @@ pub async fn list_roles() -> Result { // Delete existing role pub async fn delete(name: web::Path) -> Result { let name = name.into_inner(); + // check if the role is being used by any user or group let mut metadata = get_metadata().await?; if metadata.users.iter().any(|user| user.roles.contains(&name)) { return Err(RoleError::RoleInUse); } + if metadata + .user_groups + .iter() + .any(|user_group| user_group.roles.contains(&name)) + { + return Err(RoleError::RoleInUse); + } metadata.roles.remove(&name); put_metadata(&metadata).await?; mut_roles().remove(&name); - // also delete from user groups - let groups = read_user_groups().keys().cloned().collect_vec(); - let mut group_names = Vec::new(); - - for user_group in groups { - if let Some(ug) = read_user_groups().get(&user_group) { - if ug.roles.contains(&name) { - return Err(RoleError::RoleInUse); - } - group_names.push(ug.name.clone()); - } else { - continue; - }; - } - - // remove role from all user groups that have it - let mut groups_to_update = Vec::new(); - for group in write_user_groups().values_mut() { - if group.roles.remove(&name) { - groups_to_update.push(group.clone()); - } - } - - // update metadata only if there are changes - if !groups_to_update.is_empty() { - metadata - .user_groups - .retain(|x| !groups_to_update.contains(x)); - metadata.user_groups.extend(groups_to_update); - } - put_metadata(&metadata).await?; - Ok(HttpResponse::Ok().finish()) } diff --git a/src/rbac/role.rs b/src/rbac/role.rs index da013113c..9e54bb96a 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -81,7 +81,7 @@ pub enum Action { pub enum ParseableResourceType { #[serde(rename = "stream")] Stream(String), - #[serde(rename = "llm")] + #[serde(rename = "llmKey")] Llm(String), #[serde(rename = "all")] All, From 0b118abee432280394f98e0a247b615b87d77c94 Mon Sep 17 00:00:00 2001 From: anant Date: Wed, 9 Jul 2025 16:43:40 +0530 Subject: [PATCH 8/9] bugfix - user sessions get removed upon modifying group's roles --- src/handlers/http/rbac.rs | 49 +++++++++++---------------------------- src/rbac/user.rs | 37 ++++++++++++++--------------- 2 files changed, 32 insertions(+), 54 deletions(-) diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index a1c77ae81..a313207a8 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -21,7 +21,7 @@ use std::collections::{HashMap, HashSet}; use crate::{ rbac::{ self, - map::{read_user_groups, roles, write_user_groups}, + map::{read_user_groups, roles}, role::model::DefaultPrivilege, user, utils::to_prism_user, @@ -116,14 +116,11 @@ pub async fn post_user( return Err(RBACError::RoleValidationError); } else { let mut non_existent_roles = Vec::new(); - user_roles - .iter() - .map(|r| { - if !roles().contains_key(r) { - non_existent_roles.push(r.clone()); - } - }) - .for_each(drop); + for role in &user_roles { + if !roles().contains_key(role) { + non_existent_roles.push(role.clone()); + } + } if !non_existent_roles.is_empty() { return Err(RBACError::RolesDoNotExist(non_existent_roles)); } @@ -244,24 +241,6 @@ pub async fn delete_user(username: web::Path) -> Result) -> Result<(), RBACError> { self.roles.extend(roles); + // also refresh all user sessions + for username in &self.users { + mut_sessions().remove_user(username); + } Ok(()) } pub fn add_users(&mut self, users: HashSet) -> Result<(), RBACError> { - self.users.extend(users); + self.users.extend(users.clone()); + // also refresh all user sessions + for username in &users { + mut_sessions().remove_user(username); + } Ok(()) } @@ -283,6 +291,10 @@ impl UserGroup { } self.roles.clone_from(&new_roles); + // also refresh all user sessions + for username in &self.users { + mut_sessions().remove_user(username); + } Ok(()) } @@ -293,6 +305,10 @@ impl UserGroup { if old_users.eq(&new_users) { return Ok(()); } + // also refresh all user sessions + for username in &users { + mut_sessions().remove_user(username); + } self.users.clone_from(&new_users); Ok(()) @@ -305,21 +321,4 @@ impl UserGroup { put_metadata(&metadata).await?; Ok(()) } - - // // are these methods even needed?? - // pub fn group_name(&self) -> String { - // self.name.clone() - // } - - // pub fn group_id(&self) -> Ulid { - // self.id - // } - - // pub fn group_roles(&self) -> HashSet { - // self.roles.clone() - // } - - // pub fn group_users(&self) -> HashSet { - // self.users.clone() - // } } From 83c6c9b0a2ddd643e796d8d1acd9ea9cbcf96983 Mon Sep 17 00:00:00 2001 From: anant Date: Thu, 10 Jul 2025 16:17:48 +0530 Subject: [PATCH 9/9] Updates - session refresh in case of role modification - better error messages --- .../http/modal/ingest/ingestor_role.rs | 27 ++++++++++++++++++- src/handlers/http/modal/query/querier_role.rs | 27 ++++++++++++++++++- src/handlers/http/rbac.rs | 8 +++--- src/handlers/http/role.rs | 24 ++++++++++++++++- 4 files changed, 79 insertions(+), 7 deletions(-) diff --git a/src/handlers/http/modal/ingest/ingestor_role.rs b/src/handlers/http/modal/ingest/ingestor_role.rs index d48b9efdf..b2656c979 100644 --- a/src/handlers/http/modal/ingest/ingestor_role.rs +++ b/src/handlers/http/modal/ingest/ingestor_role.rs @@ -16,6 +16,8 @@ * */ +use std::collections::HashSet; + use actix_web::{ web::{self, Json}, HttpResponse, Responder, @@ -23,7 +25,10 @@ use actix_web::{ use crate::{ handlers::http::{modal::utils::rbac_utils::get_metadata, role::RoleError}, - rbac::{map::mut_roles, role::model::DefaultPrivilege}, + rbac::{ + map::{mut_roles, mut_sessions, read_user_groups, users}, + role::model::DefaultPrivilege, + }, storage, }; @@ -40,5 +45,25 @@ pub async fn put( let _ = storage::put_staging_metadata(&metadata); mut_roles().insert(name.clone(), privileges); + // refresh the sessions of all users using this role + // for this, iterate over all user_groups and users and create a hashset of users + let mut session_refresh_users: HashSet = HashSet::new(); + for user_group in read_user_groups().values().cloned() { + if user_group.roles.contains(&name) { + session_refresh_users.extend(user_group.users); + } + } + + // iterate over all users to see if they have this role + for user in users().values().cloned() { + if user.roles.contains(&name) { + session_refresh_users.insert(user.username().to_string()); + } + } + + for username in session_refresh_users { + mut_sessions().remove_user(&username); + } + Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/http/modal/query/querier_role.rs b/src/handlers/http/modal/query/querier_role.rs index b8b6f4639..40a7024b4 100644 --- a/src/handlers/http/modal/query/querier_role.rs +++ b/src/handlers/http/modal/query/querier_role.rs @@ -16,6 +16,8 @@ * */ +use std::collections::HashSet; + use actix_web::{ web::{self, Json}, HttpResponse, Responder, @@ -27,7 +29,10 @@ use crate::{ modal::utils::rbac_utils::{get_metadata, put_metadata}, role::RoleError, }, - rbac::{map::mut_roles, role::model::DefaultPrivilege}, + rbac::{ + map::{mut_roles, mut_sessions, read_user_groups, users}, + role::model::DefaultPrivilege, + }, }; // Handler for PUT /api/v1/role/{name} @@ -43,6 +48,26 @@ pub async fn put( put_metadata(&metadata).await?; mut_roles().insert(name.clone(), privileges.clone()); + // refresh the sessions of all users using this role + // for this, iterate over all user_groups and users and create a hashset of users + let mut session_refresh_users: HashSet = HashSet::new(); + for user_group in read_user_groups().values().cloned() { + if user_group.roles.contains(&name) { + session_refresh_users.extend(user_group.users); + } + } + + // iterate over all users to see if they have this role + for user in users().values().cloned() { + if user.roles.contains(&name) { + session_refresh_users.insert(user.username().to_string()); + } + } + + for username in session_refresh_users { + mut_sessions().remove_user(&username); + } + sync_role_update_with_ingestors(name.clone(), privileges.clone()).await?; Ok(HttpResponse::Ok().finish()) diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index a313207a8..acaabf3a6 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -379,9 +379,9 @@ pub enum RBACError { Anyhow(#[from] anyhow::Error), #[error("User cannot be created without a role")] RoleValidationError, - #[error("User group already exists: '{0}'")] + #[error("User group `{0}` already exists")] UserGroupExists(String), - #[error("UserGroup does not exist: {0}")] + #[error("UserGroup `{0}` does not exist")] UserGroupDoesNotExist(String), #[error("Invalid Roles: {0:?}")] RolesDoNotExist(Vec), @@ -391,9 +391,9 @@ pub enum RBACError { InvalidUserGroupRequest(Box), #[error("{0}")] InvalidSyncOperation(String), - #[error("User group still being used by users: {0}")] + #[error("User group `{0}` is still being used")] UserGroupNotEmpty(String), - #[error("Resource in use: {0}")] + #[error("Resource `{0}` is still in use")] ResourceInUse(String), #[error("{0}")] InvalidDeletionRequest(String), diff --git a/src/handlers/http/role.rs b/src/handlers/http/role.rs index 50ba678ee..e37ab61c4 100644 --- a/src/handlers/http/role.rs +++ b/src/handlers/http/role.rs @@ -16,6 +16,8 @@ * */ +use std::collections::HashSet; + use actix_web::{ http::header::ContentType, web::{self, Json}, @@ -26,7 +28,7 @@ use http::StatusCode; use crate::{ parseable::PARSEABLE, rbac::{ - map::{mut_roles, DEFAULT_ROLE}, + map::{mut_roles, mut_sessions, read_user_groups, users, DEFAULT_ROLE}, role::model::DefaultPrivilege, }, storage::{self, ObjectStorageError, StorageMetadata}, @@ -45,6 +47,26 @@ pub async fn put( put_metadata(&metadata).await?; mut_roles().insert(name.clone(), privileges.clone()); + // refresh the sessions of all users using this role + // for this, iterate over all user_groups and users and create a hashset of users + let mut session_refresh_users: HashSet = HashSet::new(); + for user_group in read_user_groups().values().cloned() { + if user_group.roles.contains(&name) { + session_refresh_users.extend(user_group.users); + } + } + + // iterate over all users to see if they have this role + for user in users().values().cloned() { + if user.roles.contains(&name) { + session_refresh_users.insert(user.username().to_string()); + } + } + + for username in session_refresh_users { + mut_sessions().remove_user(&username); + } + Ok(HttpResponse::Ok().finish()) }