Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 45 additions & 22 deletions server/src/handlers/http/oidc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
option::CONFIG,
rbac::{
map::{SessionKey, DEFAULT_ROLE},
user::{User, UserType},
user::{self, User, UserType},
Users,
},
storage::{self, ObjectStorageError, StorageMetadata},
Expand Down Expand Up @@ -138,19 +138,31 @@ pub async fn reply_login(
else {
return Ok(HttpResponse::Unauthorized().finish());
};
let username = user_info.sub.unwrap();
let group: Option<HashSet<String>> = claims
let username = user_info
.sub
.clone()
.expect("OIDC provider did not return a sub which is currently required.");
let user_info: user::UserInfo = user_info.into();

let group: HashSet<String> = claims
.other
.remove("groups")
.map(serde_json::from_value)
.transpose()?;
.transpose()?
.unwrap_or_else(|| {
DEFAULT_ROLE
.lock()
.unwrap()
.clone()
.map(|role| HashSet::from([role]))
.unwrap_or_default()
});

// User may not exist
// create a new one depending on state of metadata
let user = match (Users.get_user(&username), group) {
(Some(user), Some(group)) => update_user_if_changed(user, group).await?,
(Some(user), None) => user,
(None, group) => put_user(&username, group).await?,
(Some(user), group) => update_user_if_changed(user, group, user_info).await?,
(None, group) => put_user(&username, group, user_info).await?,
};
let id = Ulid::new();
Users.new_session(&user, SessionKey::SessionId(id));
Expand Down Expand Up @@ -257,25 +269,18 @@ async fn request_token(
// update local cache
async fn put_user(
username: &str,
group: Option<HashSet<String>>,
group: HashSet<String>,
user_info: user::UserInfo,
) -> Result<User, ObjectStorageError> {
let mut metadata = get_metadata().await?;
let group = group.unwrap_or_else(|| {
DEFAULT_ROLE
.lock()
.unwrap()
.clone()
.map(|role| HashSet::from([role]))
.unwrap_or_default()
});

let user = metadata
.users
.iter()
.find(|user| user.username() == username)
.cloned()
.unwrap_or_else(|| {
let user = User::new_oauth(username.to_owned(), group);
let user = User::new_oauth(username.to_owned(), group, user_info);
metadata.users.push(user.clone());
user
});
Expand All @@ -288,14 +293,32 @@ async fn put_user(
async fn update_user_if_changed(
mut user: User,
group: HashSet<String>,
user_info: user::UserInfo,
) -> Result<User, ObjectStorageError> {
// update user if roles have changed
if user.roles == group {
let User { ty, roles } = &mut user;
let UserType::OAuth(oauth_user) = ty else {
unreachable!()
};

// update user only if roles or userinfo has changed
if roles == &group && oauth_user.user_info == user_info {
return Ok(user);
}
let metadata = get_metadata().await?;
user.roles = group;
put_metadata(&metadata).await?;

oauth_user.user_info = user_info;
*roles = group;

let mut metadata = get_metadata().await?;

if let Some(entry) = metadata
.users
.iter_mut()
.find(|x| x.username() == user.username())
{
entry.clone_from(&user);
put_metadata(&metadata).await?;
}

Users.put_user(user.clone());
Ok(user)
}
Expand Down
8 changes: 3 additions & 5 deletions server/src/rbac/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,8 @@ impl Sessions {
permissions: Vec<Permission>,
) {
self.remove_expired_session(&user);
self.user_sessions
.entry(user.clone())
.and_modify(|sessions| sessions.push((key.clone(), expiry)))
.or_default();
let sessions = self.user_sessions.entry(user.clone()).or_default();
sessions.push((key.clone(), expiry));
self.active_sessions.insert(key, (user, permissions));
}

Expand Down Expand Up @@ -220,7 +218,7 @@ impl Sessions {
};
(action == required_action || action == Action::All) && ok_stream
}
Permission::SelfRole if required_action == Action::GetUserRoles => {
Permission::SelfUser if required_action == Action::GetUserRoles => {
context_user.map(|x| x == username).unwrap_or_default()
}
_ => false,
Expand Down
48 changes: 24 additions & 24 deletions server/src/rbac/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub enum Permission {
Unit(Action),
Stream(Action, String),
StreamWithTag(Action, String, Option<String>),
SelfRole,
SelfUser,
}

// Currently Roles are tied to one stream
Expand All @@ -77,37 +77,37 @@ impl RoleBuilder {
let mut perms = Vec::new();
for action in self.actions {
let perm = match action {
Action::Ingest => Permission::Stream(action, self.stream.clone().unwrap()),
Action::Query => Permission::StreamWithTag(
action,
self.stream.clone().unwrap(),
self.tag.clone(),
),
Action::CreateStream => Permission::Unit(action),
Action::DeleteStream => Permission::Unit(action),
Action::ListStream => Permission::Unit(action),
Action::GetSchema => Permission::Stream(action, self.stream.clone().unwrap()),
Action::GetStats => Permission::Stream(action, self.stream.clone().unwrap()),
Action::GetRetention => Permission::Stream(action, self.stream.clone().unwrap()),
Action::PutRetention => Permission::Stream(action, self.stream.clone().unwrap()),
Action::PutAlert => Permission::Stream(action, self.stream.clone().unwrap()),
Action::GetAlert => Permission::Stream(action, self.stream.clone().unwrap()),
Action::PutUser => Permission::Unit(action),
Action::ListUser => Permission::Unit(action),
Action::PutUserRoles => Permission::Unit(action),
Action::GetUserRoles => Permission::Unit(action),
Action::DeleteUser => Permission::Unit(action),
Action::GetAbout => Permission::Unit(action),
Action::QueryLLM => Permission::Unit(action),
Action::PutRole => Permission::Unit(action),
Action::GetRole => Permission::Unit(action),
Action::DeleteRole => Permission::Unit(action),
Action::ListRole => Permission::Unit(action),
Action::All => Permission::Stream(action, self.stream.clone().unwrap()),
Action::PutUser
| Action::ListUser
| Action::PutUserRoles
| Action::GetUserRoles
| Action::DeleteUser
| Action::GetAbout
| Action::QueryLLM
| Action::PutRole
| Action::GetRole
| Action::DeleteRole
| Action::ListRole
| Action::CreateStream
| Action::DeleteStream
| Action::ListStream => Permission::Unit(action),
Action::Ingest
| Action::GetSchema
| Action::GetStats
| Action::GetRetention
| Action::PutRetention
| Action::PutAlert
| Action::GetAlert
| Action::All => Permission::Stream(action, self.stream.clone().unwrap()),
};
perms.push(perm);
}
perms.push(Permission::SelfRole);
perms.push(Permission::SelfUser);
perms
}
}
Expand Down
41 changes: 38 additions & 3 deletions server/src/rbac/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ impl User {
)
}

pub fn new_oauth(username: String, roles: HashSet<String>) -> Self {
pub fn new_oauth(username: String, roles: HashSet<String>, user_info: UserInfo) -> Self {
Self {
ty: UserType::OAuth(OAuth { userid: username }),
ty: UserType::OAuth(OAuth {
userid: username,
user_info,
}),
roles,
}
}
Expand All @@ -69,6 +72,7 @@ impl User {
UserType::Native(Basic { ref username, .. }) => username,
UserType::OAuth(OAuth {
userid: ref username,
..
}) => username,
}
}
Expand Down Expand Up @@ -148,5 +152,36 @@ pub fn get_admin_user() -> User {

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct OAuth {
userid: String,
pub userid: String,
pub user_info: UserInfo,
}

#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct UserInfo {
#[serde(default)]
/// User's full name for display purposes.
pub name: Option<String>,
#[serde(default)]
pub preferred_username: Option<String>,
#[serde(default)]
pub picture: Option<url::Url>,
#[serde(default)]
pub email: Option<String>,
#[serde(default)]
pub gender: Option<String>,
#[serde(default)]
pub updated_at: Option<i64>,
}

impl From<openid::Userinfo> for UserInfo {
fn from(user: openid::Userinfo) -> Self {
UserInfo {
name: user.name,
preferred_username: user.preferred_username,
picture: user.picture,
email: user.email,
gender: user.gender,
updated_at: user.updated_at,
}
}
}