diff --git a/Cargo.lock b/Cargo.lock index 4f536ee76..a69dc53ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -669,6 +669,22 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "biscuit" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28865439fc81744500265d96c920985ceb6b612ef8564d43f1cc78e7a6c89e26" +dependencies = [ + "chrono", + "data-encoding", + "num-bigint", + "num-traits", + "once_cell", + "ring", + "serde", + "serde_json", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -1226,6 +1242,12 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "data-encoding" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" + [[package]] name = "datafusion" version = "27.0.0" @@ -1558,9 +1580,9 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" [[package]] name = "form_urlencoded" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8" +checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" dependencies = [ "percent-encoding", ] @@ -1933,14 +1955,20 @@ dependencies = [ [[package]] name = "idna" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" +checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" dependencies = [ "unicode-bidi", "unicode-normalization", ] +[[package]] +name = "if_chain" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb56e1aa765b4b4f3aadfab769793b7087bb03a4ea4920644a6d238e2df5b9ed" + [[package]] name = "indexmap" version = "1.9.2" @@ -2508,6 +2536,25 @@ version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" +[[package]] +name = "openid" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e216a817bf746f9457703dc9fb6e3563eacd48114e0ad539732f24a379ea4a05" +dependencies = [ + "base64 0.21.0", + "biscuit", + "chrono", + "lazy_static", + "mime", + "reqwest", + "serde", + "serde_json", + "thiserror", + "url", + "validator", +] + [[package]] name = "openssl" version = "0.10.55" @@ -2683,6 +2730,7 @@ dependencies = [ "num_cpus", "object_store", "once_cell", + "openid", "parquet", "prometheus", "pyroscope", @@ -2707,6 +2755,7 @@ dependencies = [ "ulid", "uptime_lib", "ureq", + "url", "vergen", "xxhash-rust", "xz2", @@ -2747,9 +2796,9 @@ checksum = "498a099351efa4becc6a19c72aa9270598e8fd274ca47052e37455241c88b696" [[package]] name = "percent-encoding" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "petgraph" @@ -3391,6 +3440,7 @@ version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76" dependencies = [ + "indexmap", "itoa 1.0.5", "ryu", "serde", @@ -4100,13 +4150,14 @@ dependencies = [ [[package]] name = "url" -version = "2.3.1" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" +checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb" dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", ] [[package]] @@ -4118,6 +4169,48 @@ dependencies = [ "getrandom", ] +[[package]] +name = "validator" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b92f40481c04ff1f4f61f304d61793c7b56ff76ac1469f1beb199b1445b253bd" +dependencies = [ + "idna", + "lazy_static", + "regex", + "serde", + "serde_derive", + "serde_json", + "url", + "validator_derive", +] + +[[package]] +name = "validator_derive" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc44ca3088bb3ba384d9aecf40c6a23a676ce23e09bdaca2073d99c207f864af" +dependencies = [ + "if_chain", + "lazy_static", + "proc-macro-error", + "proc-macro2", + "quote", + "regex", + "syn 1.0.107", + "validator_types", +] + +[[package]] +name = "validator_types" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "111abfe30072511849c5910134e8baf8dc05de4c0e5903d681cbd5c9c4d611e3" +dependencies = [ + "proc-macro2", + "syn 1.0.107", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/server/Cargo.toml b/server/Cargo.toml index cb6d4e933..92faed0e6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -89,6 +89,8 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] } xz2 = { version = "*", features = ["static"] } nom = "7.1.3" humantime = "2.1.0" +openid = { version = "0.12.0", default-features = false, features = ["rustls"] } +url = "2.4.0" [build-dependencies] cargo_toml = "0.15" diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index d809f883c..ce6023188 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -18,11 +18,17 @@ use std::fs::File; use std::io::BufReader; +use std::sync::Arc; use actix_cors::Cors; -use actix_web::{web, App, HttpServer}; +use actix_web::{ + web::{self, resource}, + App, HttpServer, +}; use actix_web_prometheus::PrometheusMetrics; use actix_web_static_files::ResourceFiles; +use log::info; +use openid::Discovered; use rustls::{Certificate, PrivateKey, ServerConfig}; use rustls_pemfile::{certs, pkcs8_private_keys}; @@ -37,8 +43,10 @@ mod ingest; mod llm; mod logstream; mod middleware; +mod oidc; mod query; mod rbac; +mod role; include!(concat!(env!("OUT_DIR"), "/generated.rs")); @@ -46,24 +54,36 @@ const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; const API_BASE_PATH: &str = "/api"; const API_VERSION: &str = "v1"; -#[macro_export] -macro_rules! create_app { - ($prometheus: expr) => { +pub async fn run_http( + prometheus: PrometheusMetrics, + oidc_client: Option, +) -> anyhow::Result<()> { + let oidc_client = match oidc_client { + Some(config) => { + let client = config + .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) + .await?; + Some(Arc::new(client)) + } + None => None, + }; + + let create_app = move || { App::new() - .wrap($prometheus.clone()) - .configure(|cfg| configure_routes(cfg)) + .wrap(prometheus.clone()) + .configure(|cfg| configure_routes(cfg, oidc_client.clone())) .wrap(actix_web::middleware::Logger::default()) .wrap(actix_web::middleware::Compress::default()) .wrap( Cors::default() .allow_any_header() .allow_any_method() - .allow_any_origin(), + .allow_any_origin() + .expose_any_header() + .supports_credentials(), ) }; -} -pub async fn run_http(prometheus: PrometheusMetrics) -> anyhow::Result<()> { let ssl_acceptor = match ( &CONFIG.parseable.tls_cert_path, &CONFIG.parseable.tls_key_path, @@ -99,7 +119,7 @@ pub async fn run_http(prometheus: PrometheusMetrics) -> anyhow::Result<()> { }; // concurrent workers equal to number of cores on the cpu - let http_server = HttpServer::new(move || create_app!(prometheus)).workers(num_cpus::get()); + let http_server = HttpServer::new(create_app).workers(num_cpus::get()); if let Some(config) = ssl_acceptor { http_server .bind_rustls(&CONFIG.parseable.address, config)? @@ -112,7 +132,10 @@ pub async fn run_http(prometheus: PrometheusMetrics) -> anyhow::Result<()> { Ok(()) } -pub fn configure_routes(cfg: &mut web::ServiceConfig) { +pub fn configure_routes( + cfg: &mut web::ServiceConfig, + oidc_client: Option>>, +) { let generated = generate(); //log stream API @@ -211,13 +234,13 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { .route( web::put() .to(rbac::put_role) - .authorize(Action::PutRoles) + .authorize(Action::PutUserRoles) .wrap(DisAllowRootUser), ) .route( web::get() .to(rbac::get_role) - .authorize_for_user(Action::GetRole), + .authorize_for_user(Action::GetUserRoles), ), ) .service( @@ -238,6 +261,24 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { .authorize(Action::QueryLLM), ), ); + let role_api = web::scope("/role") + .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole))) + .service( + resource("/{name}") + .route(web::put().to(role::put).authorize(Action::PutRole)) + .route(web::delete().to(role::delete).authorize(Action::DeleteRole)) + .route(web::get().to(role::get).authorize(Action::GetRole)), + ); + + let mut oauth_api = web::scope("/o") + .service(resource("/login").route(web::get().to(oidc::login))) + .service(resource("/logout").route(web::get().to(oidc::logout))) + .service(resource("/code").route(web::get().to(oidc::reply_login))); + + if let Some(client) = oidc_client { + info!("Registered oidc client"); + oauth_api = oauth_api.app_data(web::Data::from(client)) + } // Deny request if username is same as the env variable P_USERNAME. cfg.service( @@ -280,7 +321,9 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { ), ) .service(user_api) - .service(llm_query_api), + .service(llm_query_api) + .service(oauth_api) + .service(role_api), ) // GET "/" ==> Serve the static frontend directory .service(ResourceFiles::new("/", generated).resolve_not_found_to_root()); diff --git a/server/src/handlers/http/middleware.rs b/server/src/handlers/http/middleware.rs index fa1d2b1e1..686f04ae2 100644 --- a/server/src/handlers/http/middleware.rs +++ b/server/src/handlers/http/middleware.rs @@ -21,13 +21,17 @@ use std::future::{ready, Ready}; use actix_web::{ dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}, - error::{ErrorBadRequest, ErrorUnauthorized}, + error::{ErrorBadRequest, ErrorForbidden, ErrorUnauthorized}, Error, Route, }; -use actix_web_httpauth::extractors::basic::BasicAuth; use futures_util::future::LocalBoxFuture; -use crate::{option::CONFIG, rbac::role::Action, rbac::Users}; +use crate::{ + option::CONFIG, + rbac::Users, + rbac::{self, role::Action}, + utils::actix::extract_session_key, +}; pub trait RouteExt { fn authorize(self, action: Action) -> Self; @@ -61,7 +65,7 @@ impl RouteExt for Route { // Authentication Layer with no context pub struct Auth { pub action: Action, - pub method: fn(&mut ServiceRequest, Action) -> Result, + pub method: fn(&mut ServiceRequest, Action) -> Result, } impl Transform for Auth @@ -87,7 +91,7 @@ where pub struct AuthMiddleware { action: Action, - auth_method: fn(&mut ServiceRequest, Action) -> Result, + auth_method: fn(&mut ServiceRequest, Action) -> Result, service: S, } @@ -104,44 +108,44 @@ where forward_ready!(service); fn call(&self, mut req: ServiceRequest) -> Self::Future { - let auth_result: Result = (self.auth_method)(&mut req, self.action); + let auth_result: Result<_, Error> = (self.auth_method)(&mut req, self.action); let fut = self.service.call(req); Box::pin(async move { - if !auth_result? { - return Err(ErrorUnauthorized("Not authorized")); + match auth_result? { + rbac::Response::UnAuthorized => return Err( + ErrorForbidden("You don't have permission to access this resource. Please contact your administrator for assistance.") + ), + rbac::Response::ReloadRequired => return Err( + ErrorUnauthorized("Your session has expired or is no longer valid. Please re-authenticate to access this resource.") + ), + _ => {} } fut.await }) } } -pub fn auth_no_context(req: &mut ServiceRequest, action: Action) -> Result { - let creds = extract_basic_auth(req); - creds.map(|(username, password)| Users.authenticate(username, password, action, None, None)) +pub fn auth_no_context(req: &mut ServiceRequest, action: Action) -> Result { + let creds = extract_session_key(req); + creds.map(|key| Users.authorize(key, action, None, None)) } -pub fn auth_stream_context(req: &mut ServiceRequest, action: Action) -> Result { - let creds = extract_basic_auth(req); +pub fn auth_stream_context( + req: &mut ServiceRequest, + action: Action, +) -> Result { + let creds = extract_session_key(req); let stream = req.match_info().get("logstream"); - creds.map(|(username, password)| Users.authenticate(username, password, action, stream, None)) + creds.map(|key| Users.authorize(key, action, stream, None)) } -pub fn auth_user_context(req: &mut ServiceRequest, action: Action) -> Result { - let creds = extract_basic_auth(req); +pub fn auth_user_context( + req: &mut ServiceRequest, + action: Action, +) -> Result { + let creds = extract_session_key(req); let user = req.match_info().get("username"); - creds.map(|(username, password)| Users.authenticate(username, password, action, None, user)) -} - -fn extract_basic_auth(req: &mut ServiceRequest) -> Result<(String, String), Error> { - // Extract username and password from the request using basic auth extractor. - let creds = req.extract::().into_inner(); - creds.map_err(Into::into).map(|creds| { - let username = creds.user_id().trim().to_owned(); - // password is not mandatory by basic auth standard. - // If not provided then treat as empty string - let password = creds.password().unwrap_or("").trim().to_owned(); - (username, password) - }) + creds.map(|key| Users.authorize(key, action, None, user)) } // The credentials set in the env vars (P_USERNAME & P_PASSWORD) are treated diff --git a/server/src/handlers/http/oidc.rs b/server/src/handlers/http/oidc.rs new file mode 100644 index 000000000..1cce3819d --- /dev/null +++ b/server/src/handlers/http/oidc.rs @@ -0,0 +1,341 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{collections::HashSet, sync::Arc}; + +use actix_web::{ + cookie::{time, Cookie, SameSite}, + http::header::{self, ContentType}, + web::{self, Data}, + HttpRequest, HttpResponse, +}; +use http::StatusCode; +use openid::{Options, Token, Userinfo}; +use serde::Deserialize; +use ulid::Ulid; +use url::Url; + +use crate::{ + oidc::{Claims, DiscoveredClient}, + option::CONFIG, + rbac::{ + map::SessionKey, + user::{User, UserType}, + Users, + }, + storage::{self, ObjectStorageError, StorageMetadata}, + utils::actix::extract_session_key_from_req, +}; + +// fetch common personalization scope to determine username. +const SCOPE: &str = "openid profile email"; +const COOKIE_AGE_DAYS: usize = 7; + +/// Struct representing query params returned from oidc provider +#[derive(Deserialize, Debug)] +pub struct Login { + pub code: String, + pub state: Option, +} + +/// Struct representing query param when visiting /login +/// Caller can set the state for code auth flow and this is +/// at the end used as target for redirect +#[derive(Deserialize, Debug)] +pub struct RedirectAfterLogin { + pub redirect: Url, +} + +pub async fn login( + req: HttpRequest, + query: web::Query, +) -> Result { + let oidc_client = req.app_data::>(); + let session_key = extract_session_key_from_req(&req).ok(); + + let (session_key, oidc_client) = match (session_key, oidc_client) { + (None, None) => return Ok(redirect_no_oauth_setup(query.redirect.clone())), + (None, Some(client)) => return Ok(redirect_to_oidc(query, client)), + (Some(session_key), client) => (session_key, client), + }; + + match session_key { + // We can exchange basic auth for session cookie + SessionKey::BasicAuth { username, password } => match Users.get_user(&username) { + Some( + ref user @ User { + ty: UserType::Native(ref basic), + .. + }, + ) if basic.verify_password(&password) => { + let user_cookie = cookie_username(&username); + let session_cookie = + exchange_basic_for_cookie(user, SessionKey::BasicAuth { username, password }); + Ok(redirect_to_client( + query.redirect.as_str(), + [user_cookie, session_cookie], + )) + } + _ => Err(OIDCError::BadRequest), + }, + // if it's a valid active session, just redirect back + key @ SessionKey::SessionId(_) => { + let resp = if Users.session_exists(&key) { + redirect_to_client(query.redirect.as_str(), None) + } else { + Users.remove_session(&key); + if let Some(oidc_client) = oidc_client { + redirect_to_oidc(query, oidc_client) + } else { + redirect_to_client(query.redirect.as_str(), None) + } + }; + Ok(resp) + } + } +} + +pub async fn logout(req: HttpRequest, query: web::Query) -> HttpResponse { + let oidc_client = req.app_data::>(); + let Some(session) = extract_session_key_from_req(&req).ok() else { + return redirect_to_client(query.redirect.as_str(), None); + }; + let user = Users.remove_session(&session); + let logout_endpoint = + oidc_client.and_then(|client| client.config().end_session_endpoint.clone()); + + match (user, logout_endpoint) { + (Some(username), Some(logout_endpoint)) + if Users.is_oauth(&username).unwrap_or_default() => + { + redirect_to_oidc_logout(logout_endpoint, &query.redirect) + } + _ => redirect_to_client(query.redirect.as_str(), None), + } +} + +/// Handler for code callback +/// User should be redirected to page they were trying to access with cookie +pub async fn reply_login( + oidc_client: Data, + login_query: web::Query, +) -> Result { + let oidc_client = Data::into_inner(oidc_client); + let Ok((mut claims, user_info)): Result<(Claims, Userinfo), anyhow::Error> = + request_token(oidc_client, &login_query).await + else { + return Ok(HttpResponse::Unauthorized().finish()); + }; + let username = user_info.sub.unwrap(); + let group: Option> = claims + .other + .remove("groups") + .map(serde_json::from_value) + .transpose()?; + + // User may not exist + // create a new one depending on state of metadata + let user = match (Users.get_user(&username), group) { + (Some(user), Some(group)) => update_user_if_changed(user, group).await?, + (Some(user), None) => user, + (None, group) => put_user(&username, group).await?, + }; + let id = Ulid::new(); + Users.new_session(&user, SessionKey::SessionId(id)); + + let redirect_url = login_query + .state + .clone() + .unwrap_or_else(|| CONFIG.parseable.address.to_string()); + + Ok(redirect_to_client( + &redirect_url, + [cookie_session(id), cookie_username(&username)], + )) +} + +fn exchange_basic_for_cookie(user: &User, key: SessionKey) -> Cookie<'static> { + let id = Ulid::new(); + Users.remove_session(&key); + Users.new_session(user, SessionKey::SessionId(id)); + cookie_session(id) +} + +fn redirect_to_oidc( + query: web::Query, + oidc_client: &DiscoveredClient, +) -> HttpResponse { + let redirect = query.into_inner().redirect.to_string(); + let auth_url = oidc_client.auth_url(&Options { + scope: Some(SCOPE.into()), + state: Some(redirect), + ..Default::default() + }); + let url: String = auth_url.into(); + HttpResponse::TemporaryRedirect() + .insert_header((header::LOCATION, url)) + .finish() +} + +fn redirect_to_oidc_logout(mut logout_endpoint: Url, redirect: &Url) -> HttpResponse { + logout_endpoint.set_query(Some(&format!("post_logout_redirect_uri={}", redirect))); + HttpResponse::TemporaryRedirect() + .insert_header((header::CACHE_CONTROL, "no-store")) + .insert_header((header::LOCATION, logout_endpoint.to_string())) + .finish() +} + +fn redirect_to_client( + url: &str, + cookies: impl IntoIterator>, +) -> HttpResponse { + let mut response = HttpResponse::MovedPermanently(); + response.insert_header((header::LOCATION, url)); + for cookie in cookies { + response.cookie(cookie); + } + response.insert_header((header::CACHE_CONTROL, "no-store")); + response.finish() +} + +fn redirect_no_oauth_setup(mut url: Url) -> HttpResponse { + url.set_path("oidc-not-configured"); + let mut response = HttpResponse::MovedPermanently(); + response.insert_header((header::LOCATION, url.as_str())); + response.insert_header((header::CACHE_CONTROL, "no-store")); + response.finish() +} + +fn cookie_session(id: Ulid) -> Cookie<'static> { + let authorization_cookie = Cookie::build("session", id.to_string()) + .max_age(time::Duration::days(COOKIE_AGE_DAYS as i64)) + .same_site(SameSite::Strict) + .path("/") + .finish(); + authorization_cookie +} + +fn cookie_username(username: &str) -> Cookie<'static> { + let authorization_cookie = Cookie::build("username", username.to_string()) + .max_age(time::Duration::days(COOKIE_AGE_DAYS as i64)) + .same_site(SameSite::Strict) + .path("/") + .finish(); + authorization_cookie +} + +async fn request_token( + oidc_client: Arc, + login_query: &Login, +) -> anyhow::Result<(Claims, Userinfo)> { + let mut token: Token = oidc_client.request_token(&login_query.code).await?.into(); + let Some(id_token) = token.id_token.as_mut() else { + return Err(anyhow::anyhow!("No id_token provided")); + }; + + oidc_client.decode_token(id_token)?; + oidc_client.validate_token(id_token, None, None)?; + let claims = id_token.payload().expect("payload is decoded").clone(); + + let userinfo = oidc_client.request_userinfo(&token).await?; + Ok((claims, userinfo)) +} + +// put new user in metadata if does not exits +// update local cache +async fn put_user( + username: &str, + group: Option>, +) -> Result { + let mut metadata = get_metadata().await?; + let user = match metadata + .users + .iter() + .find(|user| user.username() == username) + { + Some(user) => user.clone(), + None => { + let mut user = User::new_oauth(username.to_owned()); + if let Some(group) = group { + user.roles = group + } + metadata.users.push(user.clone()); + put_metadata(&metadata).await?; + user + } + }; + Users.put_user(user.clone()); + Ok(user) +} + +async fn update_user_if_changed( + mut user: User, + group: HashSet, +) -> Result { + // update user if roles have changed + if user.roles == group { + return Ok(user); + } + let metadata = get_metadata().await?; + user.roles = group; + put_metadata(&metadata).await?; + Users.put_user(user.clone()); + Ok(user) +} + +async fn get_metadata() -> Result { + let metadata = CONFIG + .storage() + .get_object_store() + .get_metadata() + .await? + .expect("metadata is initialized"); + Ok(metadata) +} + +async fn put_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageError> { + storage::put_remote_metadata(metadata).await?; + storage::put_staging_metadata(metadata)?; + Ok(()) +} + +#[derive(Debug, thiserror::Error)] +pub enum OIDCError { + #[error("Failed to connect to storage: {0}")] + ObjectStorageError(#[from] ObjectStorageError), + #[error("{0}")] + Serde(#[from] serde_json::Error), + #[error("Bad Request")] + BadRequest, +} + +impl actix_web::ResponseError for OIDCError { + fn status_code(&self) -> http::StatusCode { + match self { + Self::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Serde(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::BadRequest => StatusCode::BAD_REQUEST, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 3a1441399..b41120912 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -20,7 +20,6 @@ use actix_web::error::ErrorUnauthorized; use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, Responder}; -use actix_web_httpauth::extractors::basic::BasicAuth; use futures_util::Future; use http::StatusCode; use serde_json::Value; @@ -36,6 +35,7 @@ use crate::query::Query; use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; use crate::response::QueryResponse; +use crate::utils::actix::extract_session_key_from_req; pub async fn query( query: Query, @@ -75,13 +75,8 @@ impl FromRequest for Query { type Future = Pin>>>; fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future { - let creds = BasicAuth::extract(req) - .into_inner() - .expect("expects basic auth"); - // Extract username and password from the request using basic auth extractor. - let username = creds.user_id().trim().to_owned(); - let password = creds.password().unwrap_or("").trim().to_owned(); - let permissions = Users.get_permissions(username, password); + let creds = extract_session_key_from_req(req).expect("expects basic auth"); + let permissions = Users.get_permissions(&creds); let json = Json::::from_request(req, payload); let fut = async move { diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index 52075443e..09fc94fe7 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -16,15 +16,11 @@ * */ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use crate::{ option::CONFIG, - rbac::{ - role::model::DefaultPrivilege, - user::{PassCode, User}, - Users, - }, + rbac::{map::roles, role::model::DefaultPrivilege, user, Users}, storage::{self, ObjectStorageError, StorageMetadata}, validator::{self, error::UsernameValidationError}, }; @@ -35,10 +31,30 @@ use tokio::sync::Mutex; // async aware lock for updating storage metadata and user map atomicically static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); +#[derive(serde::Serialize)] +struct User { + id: String, + method: String, +} + +impl From<&user::User> for User { + fn from(user: &user::User) -> Self { + let method = match user.ty { + user::UserType::Native(_) => "native".to_string(), + user::UserType::OAuth(_) => "oauth".to_string(), + }; + + User { + id: user.username().to_owned(), + method, + } + } +} + // Handler for GET /api/v1/user // returns list of all registerd users pub async fn list_users() -> impl Responder { - web::Json(Users.list_users()) + web::Json(Users.collect_user::()) } // Handler for POST /api/v1/user/{username} @@ -48,24 +64,32 @@ pub async fn post_user( body: Option>, ) -> Result { let username = username.into_inner(); + let roles: Option> = body + .map(|body| serde_json::from_value(body.into_inner())) + .transpose()?; + validator::user_name(&username)?; let _ = UPDATE_LOCK.lock().await; if Users.contains(&username) { return Err(RBACError::UserExists); } let mut metadata = get_metadata().await?; - if metadata.users.iter().any(|user| user.username == username) { + if metadata + .users + .iter() + .any(|user| user.username() == username) + { // should be unreachable given state is always consistent return Err(RBACError::UserExists); } - let (user, password) = User::create_new(username.clone()); + let (user, password) = user::User::new_basic(username.clone()); metadata.users.push(user.clone()); put_metadata(&metadata).await?; // set this user to user map Users.put_user(user); - if let Some(body) = body { - put_role(web::Path::::from(username), body).await?; + if let Some(roles) = roles { + put_role(web::Path::::from(username), web::Json(roles)).await?; } Ok(password) @@ -76,19 +100,19 @@ pub async fn post_user( pub async fn post_gen_password(username: web::Path) -> Result { let username = username.into_inner(); let _ = UPDATE_LOCK.lock().await; - if !Users.contains(&username) { - return Err(RBACError::UserDoesNotExist); - } - let PassCode { password, hash } = User::gen_new_password(); + let user::PassCode { password, hash } = user::Basic::gen_new_password(); let mut metadata = get_metadata().await?; if let Some(user) = metadata .users .iter_mut() + .filter_map(|user| match user.ty { + user::UserType::Native(ref mut user) => Some(user), + _ => None, + }) .find(|user| user.username == username) { user.password_hash.clone_from(&hash); } else { - // should be unreachable given state is always consistent return Err(RBACError::UserDoesNotExist); } put_metadata(&metadata).await?; @@ -102,8 +126,17 @@ pub async fn get_role(username: web::Path) -> Result> = Users + .get_role(&username) + .iter() + .filter_map(|role_name| { + roles() + .get(role_name) + .map(|role| (role_name.to_owned(), role.clone())) + }) + .collect(); + + Ok(web::Json(res)) } // Handler for DELETE /api/v1/user/delete/{username} @@ -116,22 +149,21 @@ pub async fn delete_user(username: web::Path) -> Result Put roles for user // Put roles for given user pub async fn put_role( username: web::Path, - role: web::Json, + role: web::Json>, ) -> Result { let username = username.into_inner(); let role = role.into_inner(); - let role: HashSet = serde_json::from_value(role)?; - let role = role.into_iter().collect(); if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); @@ -141,9 +173,9 @@ pub async fn put_role( if let Some(user) = metadata .users .iter_mut() - .find(|user| user.username == username) + .find(|user| user.username() == username) { - user.role.clone_from(&role); + user.roles.clone_from(&role); } else { // should be unreachable given state is always consistent return Err(RBACError::UserDoesNotExist); diff --git a/server/src/handlers/http/role.rs b/server/src/handlers/http/role.rs new file mode 100644 index 000000000..9052e2ec4 --- /dev/null +++ b/server/src/handlers/http/role.rs @@ -0,0 +1,111 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::{http::header::ContentType, web, HttpResponse, Responder}; +use http::StatusCode; + +use crate::{ + option::CONFIG, + rbac::{map::mut_roles, role::model::DefaultPrivilege}, + storage::{self, ObjectStorageError, StorageMetadata}, +}; + +// Handler for PUT /api/v1/role/{name} +// Creates a new role or update existing one +pub async fn put( + name: web::Path, + body: web::Json>, +) -> Result { + let name = name.into_inner(); + let privileges = body.into_inner(); + let mut metadata = get_metadata().await?; + metadata.roles.insert(name.clone(), privileges.clone()); + put_metadata(&metadata).await?; + mut_roles().insert(name, privileges); + Ok(HttpResponse::Ok().finish()) +} + +// Handler for GET /api/v1/role/{name} +// Fetch role by name +pub async fn get(name: web::Path) -> Result { + let name = name.into_inner(); + let metadata = get_metadata().await?; + let privileges = metadata.roles.get(&name).cloned().unwrap_or_default(); + Ok(web::Json(privileges)) +} + +// Handler for GET /api/v1/role +// Fetch all roles in the system +pub async fn list() -> Result { + let metadata = get_metadata().await?; + let roles: Vec = metadata.roles.keys().cloned().collect(); + Ok(web::Json(roles)) +} + +// Handler for DELETE /api/v1/role/{username} +// Delete existing role +pub async fn delete(name: web::Path) -> Result { + let name = name.into_inner(); + let mut metadata = get_metadata().await?; + if metadata.users.iter().any(|user| user.roles.contains(&name)) { + return Err(RoleError::RoleInUse); + } + metadata.roles.remove(&name); + put_metadata(&metadata).await?; + mut_roles().remove(&name); + Ok(HttpResponse::Ok().finish()) +} + +async fn get_metadata() -> Result { + let metadata = CONFIG + .storage() + .get_object_store() + .get_metadata() + .await? + .expect("metadata is initialized"); + Ok(metadata) +} + +async fn put_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageError> { + storage::put_remote_metadata(metadata).await?; + storage::put_staging_metadata(metadata)?; + Ok(()) +} + +#[derive(Debug, thiserror::Error)] +pub enum RoleError { + #[error("Failed to connect to storage: {0}")] + ObjectStorageError(#[from] ObjectStorageError), + #[error("Cannot perform this operation as role is assigned to an existing user.")] + RoleInUse, +} + +impl actix_web::ResponseError for RoleError { + fn status_code(&self) -> http::StatusCode { + match self { + Self::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::RoleInUse => StatusCode::BAD_REQUEST, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 75ab8287a..2b35e7d6c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -39,6 +39,7 @@ mod handlers; mod metadata; mod metrics; mod migration; +mod oidc; mod option; mod query; mod rbac; @@ -64,7 +65,7 @@ async fn main() -> anyhow::Result<()> { migration::run_metadata_migration(&CONFIG).await?; let metadata = storage::resolve_parseable_metadata().await?; banner::print(&CONFIG, &metadata).await; - rbac::map::init_auth_maps(metadata.users.clone()); + rbac::map::init(metadata.users.clone(), metadata.roles.clone()); metadata.set_global(); let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); @@ -98,7 +99,7 @@ async fn main() -> anyhow::Result<()> { analytics::init_analytics_scheduler().await; } - let app = handlers::http::run_http(prometheus); + let app = handlers::http::run_http(prometheus, CONFIG.parseable.openid.clone()); tokio::pin!(app); loop { tokio::select! { diff --git a/server/src/migration.rs b/server/src/migration.rs index 649371836..ff3cbb61a 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -45,16 +45,30 @@ pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> { } if let Some(storage_metadata) = storage_metadata { - if get_version(&storage_metadata) == Some("v1") { - let metadata = metadata_migration::v1_v2(storage_metadata); - put_remote_metadata(&*object_store, &metadata).await?; + match get_version(&storage_metadata) { + Some("v1") => { + let metadata = metadata_migration::v1_v3(storage_metadata); + put_remote_metadata(&*object_store, &metadata).await?; + } + Some("v2") => { + let metadata = metadata_migration::v2_v3(storage_metadata); + put_remote_metadata(&*object_store, &metadata).await?; + } + _ => (), } } if let Some(staging_metadata) = staging_metadata { - if get_version(&staging_metadata) == Some("v1") { - let metadata = metadata_migration::v1_v2(staging_metadata); - put_staging_metadata(config, &metadata)?; + match get_version(&staging_metadata) { + Some("v1") => { + let metadata = metadata_migration::v1_v3(staging_metadata); + put_staging_metadata(config, &metadata)?; + } + Some("v2") => { + let metadata = metadata_migration::v2_v3(staging_metadata); + put_staging_metadata(config, &metadata)?; + } + _ => (), } } diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs index b949ab4d2..62368d926 100644 --- a/server/src/migration/metadata_migration.rs +++ b/server/src/migration/metadata_migration.rs @@ -16,16 +16,54 @@ * */ -use std::vec; +use rand::distributions::DistString; +use serde_json::{Map, Value}; -use serde_json::Value; - -pub fn v1_v2(mut storage_metadata: serde_json::Value) -> Value { +pub fn v1_v3(mut storage_metadata: serde_json::Value) -> Value { let metadata = storage_metadata.as_object_mut().unwrap(); - *metadata.get_mut("version").unwrap() = Value::String("v2".to_string()); + *metadata.get_mut("version").unwrap() = Value::String("v3".to_string()); metadata.remove("user"); metadata.remove("stream"); metadata.insert("users".to_string(), Value::Array(vec![])); metadata.insert("streams".to_string(), Value::Array(vec![])); + metadata.insert("roles".to_string(), Value::Array(vec![])); + storage_metadata +} + +pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value { + let metadata = storage_metadata.as_object_mut().unwrap(); + *metadata.get_mut("version").unwrap() = Value::String("v3".to_string()); + let users = metadata + .get_mut("users") + .expect("users field is present") + .as_array_mut() + .unwrap(); + + // role names - role value + let mut privileges_map = Vec::new(); + + for user in users { + // user is an object + let user = user.as_object_mut().unwrap(); + // take out privileges + let Value::Array(privileges) = user.remove("role").expect("role exists for v2") else { + panic!("privileges is an arrray") + }; + + let mut roles = Vec::new(); + + if !privileges.is_empty() { + let role_name = + rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 8); + privileges_map.push((role_name.clone(), Value::Array(privileges))); + roles.push(Value::from(role_name)); + } + user.insert("roles".to_string(), roles.into()); + } + + metadata.insert( + "roles".to_string(), + Value::Object(Map::from_iter(privileges_map)), + ); storage_metadata } diff --git a/server/src/oidc.rs b/server/src/oidc.rs new file mode 100644 index 000000000..6f58286fb --- /dev/null +++ b/server/src/oidc.rs @@ -0,0 +1,84 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::collections::HashMap; + +use openid::{Client, CompactJson, CustomClaims, Discovered, StandardClaims}; +use url::Url; + +pub type DiscoveredClient = Client; + +// If domain is not configured then +// we can assume running in a development mode or private environment +#[derive(Debug, Clone)] +pub enum Origin { + // socket address + Local { socket_addr: String, https: bool }, + // domain url + Production(Url), +} + +/// Configuration for OpenID Connect +#[derive(Debug, Clone)] +pub struct OpenidConfig { + /// Client id + pub id: String, + /// Client Secret + pub secret: String, + /// OP host address over which discovery can be done + pub issuer: Url, + /// Current client host address which will be used for redirects + pub origin: Origin, +} + +impl OpenidConfig { + /// Create a new oidc client from server configuration. + /// redirect_suffix + pub async fn connect( + self, + redirect_to: &str, + ) -> Result { + let redirect_uri = match self.origin { + Origin::Local { socket_addr, https } => { + let protocol = if https { "https" } else { "http" }; + url::Url::parse(&format!("{protocol}://{socket_addr}")).expect("valid url") + } + Origin::Production(url) => url, + }; + + let redirect_uri = redirect_uri.join(redirect_to).expect("valid suffix"); + DiscoveredClient::discover(self.id, self.secret, redirect_uri.to_string(), self.issuer) + .await + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +pub struct Claims { + #[serde(flatten)] + pub standard: StandardClaims, + #[serde(flatten)] + pub other: HashMap, +} + +impl CustomClaims for Claims { + fn standard_claims(&self) -> &StandardClaims { + &self.standard + } +} + +impl CompactJson for Claims {} diff --git a/server/src/option.rs b/server/src/option.rs index 610e0c209..5bc967f89 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -17,13 +17,15 @@ */ use clap::error::ErrorKind; -use clap::{command, value_parser, Arg, Args, Command, FromArgMatches}; +use clap::{command, value_parser, Arg, ArgGroup, Args, Command, FromArgMatches}; use once_cell::sync::Lazy; use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; use std::path::{Path, PathBuf}; use std::sync::Arc; +use url::Url; +use crate::oidc::{self, OpenidConfig}; use crate::storage::{FSConfig, ObjectStorageProvider, S3Config, LOCAL_SYNC_INTERVAL}; use crate::utils::validate_path_is_writeable; @@ -164,6 +166,10 @@ pub struct Server { /// The address on which the http server will listen. pub address: String, + /// Base domain under which server is hosted. + /// This information is used by OIDC to refer redirects + pub domain_address: Option, + /// The local staging path is used as a temporary landing point /// for incoming events and local cache pub local_staging_path: PathBuf, @@ -178,6 +184,9 @@ pub struct Server { /// Password for the basic authentication on the server pub password: String, + /// OpenId configuration + pub openid: Option, + /// Server should check for update or not pub check_update: bool, @@ -207,6 +216,11 @@ impl FromArgMatches for Server { fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> { self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); self.tls_key_path = m.get_one::(Self::TLS_KEY).cloned(); + self.domain_address = m.get_one::(Self::DOMAIN_URI).cloned(); + let openid_client_id = m.get_one::(Self::OPENID_CLIENT_ID).cloned(); + let openid_client_secret = m.get_one::(Self::OPENID_CLIENT_SECRET).cloned(); + let openid_issuer = m.get_one::(Self::OPENID_ISSUER).cloned(); + self.address = m .get_one::(Self::ADDRESS) .cloned() @@ -260,6 +274,26 @@ impl FromArgMatches for Server { _ => unreachable!(), }; + self.openid = match (openid_client_id, openid_client_secret, openid_issuer) { + (Some(id), Some(secret), Some(issuer)) => { + let origin = if let Some(url) = self.domain_address.clone() { + oidc::Origin::Production(url) + } else { + oidc::Origin::Local { + socket_addr: self.address.clone(), + https: self.tls_cert_path.is_some() && self.tls_key_path.is_some(), + } + }; + Some(OpenidConfig { + id, + secret, + issuer, + origin, + }) + } + _ => None, + }; + Ok(()) } } @@ -269,6 +303,7 @@ impl Server { pub const TLS_CERT: &str = "tls-cert-path"; pub const TLS_KEY: &str = "tls-key-path"; pub const ADDRESS: &str = "address"; + pub const DOMAIN_URI: &str = "origin"; pub const STAGING: &str = "local-staging-path"; pub const UPLOAD_INTERVAL: &str = "upload-interval"; pub const USERNAME: &str = "username"; @@ -276,6 +311,10 @@ impl Server { pub const CHECK_UPDATE: &str = "check-update"; pub const SEND_ANALYTICS: &str = "send-analytics"; pub const OPEN_AI_KEY: &str = "open-ai-key"; + pub const OPENID_CLIENT_ID: &str = "oidc-client"; + pub const OPENID_CLIENT_SECRET: &str = "oidc-client-secret"; + pub const OPENID_ISSUER: &str = "oidc-issuer"; + // todo : what should this flag be pub const QUERY_MEM_POOL_SIZE: &str = "query-mempool-size"; pub const ROW_GROUP_SIZE: &str = "row-group-size"; pub const PARQUET_COMPRESSION_ALGO: &str = "compression-algo"; @@ -356,6 +395,16 @@ impl Server { .required(true) .help("Password for the basic authentication on the server"), ) + .arg( + Arg::new(Self::CHECK_UPDATE) + .long(Self::CHECK_UPDATE) + .env("P_CHECK_UPDATE") + .value_name("BOOL") + .required(false) + .default_value("true") + .value_parser(value_parser!(bool)) + .help("Disable/Enable checking for updates"), + ) .arg( Arg::new(Self::SEND_ANALYTICS) .long(Self::SEND_ANALYTICS) @@ -375,14 +424,38 @@ impl Server { .help("Set OpenAI key to enable llm feature"), ) .arg( - Arg::new(Self::CHECK_UPDATE) - .long(Self::CHECK_UPDATE) - .env("P_CHECK_UPDATE") - .value_name("BOOL") + Arg::new(Self::OPENID_CLIENT_ID) + .long(Self::OPENID_CLIENT_ID) + .env("P_OIDC_CLIENT_ID") + .value_name("STRING") .required(false) - .default_value("true") - .value_parser(value_parser!(bool)) - .help("Disable/Enable checking for updates"), + .help("Set client id for oidc provider"), + ) + .arg( + Arg::new(Self::OPENID_CLIENT_SECRET) + .long(Self::OPENID_CLIENT_SECRET) + .env("P_OIDC_CLIENT_SECRET") + .value_name("STRING") + .required(false) + .help("Set client secret for oidc provider"), + ) + .arg( + Arg::new(Self::OPENID_ISSUER) + .long(Self::OPENID_ISSUER) + .env("P_OIDC_ISSUER") + .value_name("URl") + .required(false) + .value_parser(validation::url) + .help("Set OIDC provider's host address."), + ) + .arg( + Arg::new(Self::DOMAIN_URI) + .long(Self::DOMAIN_URI) + .env("P_ORIGIN_URI") + .value_name("URL") + .required(false) + .value_parser(validation::url) + .help("Set host global domain address"), ) .arg( Arg::new(Self::QUERY_MEM_POOL_SIZE) @@ -419,7 +492,12 @@ impl Server { "lz4", "zstd"]) .help("Parquet compression algorithm"), - ) + ).group( + ArgGroup::new("oidc") + .args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) + .requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) + .multiple(true) + ) } } @@ -488,4 +566,8 @@ pub mod validation { .then_some(s.to_string()) .ok_or_else(|| "Socket Address for server is invalid".to_string()) } + + pub fn url(s: &str) -> Result { + url::Url::parse(s).map_err(|_| "Invalid URL provided".to_string()) + } } diff --git a/server/src/rbac.rs b/server/src/rbac.rs index 05f179299..34c6fab62 100644 --- a/server/src/rbac.rs +++ b/server/src/rbac.rs @@ -20,11 +20,24 @@ pub mod map; pub mod role; pub mod user; -use crate::rbac::map::{auth_map, mut_auth_map, mut_user_map, user_map}; -use crate::rbac::role::{model::DefaultPrivilege, Action}; +use std::collections::HashSet; + +use chrono::{DateTime, Days, Utc}; +use itertools::Itertools; + +use crate::rbac::map::{mut_sessions, mut_users, sessions, users}; +use crate::rbac::role::Action; use crate::rbac::user::User; -use self::role::Permission; +use self::map::SessionKey; +use self::role::{Permission, RoleBuilder}; +use self::user::UserType; + +pub enum Response { + Authorized, + UnAuthorized, + ReloadRequired, +} // This type encapsulates both the user_map and auth_map // so other entities deal with only this type @@ -32,79 +45,141 @@ pub struct Users; impl Users { pub fn put_user(&self, user: User) { - mut_auth_map().remove(&user.username); - mut_user_map().insert(user); + mut_sessions().remove_user(user.username()); + mut_users().insert(user); + } + + pub fn get_user(&self, username: &str) -> Option { + users().get(username).cloned() + } + + pub fn is_oauth(&self, username: &str) -> Option { + users().get(username).map(|user| user.is_oauth()) } - pub fn list_users(&self) -> Vec { - user_map().keys().cloned().collect() + pub fn collect_user From<&'a User> + 'static>(&self) -> Vec { + users().values().map(|user| user.into()).collect_vec() } - pub fn get_role(&self, username: &str) -> Vec { - user_map() + pub fn get_role(&self, username: &str) -> Vec { + users() .get(username) - .map(|user| user.role.clone()) + .map(|user| user.roles.iter().cloned().collect()) .unwrap_or_default() } pub fn delete_user(&self, username: &str) { - mut_user_map().remove(username); - mut_auth_map().remove(username); + mut_users().remove(username); + mut_sessions().remove_user(username); } + // caller ensures that this operation is valid for the user pub fn change_password_hash(&self, username: &str, hash: &String) { - if let Some(user) = mut_user_map().get_mut(username) { - user.password_hash.clone_from(hash) + if let Some(User { + ty: UserType::Native(user), + .. + }) = mut_users().get_mut(username) + { + user.password_hash.clone_from(hash); + mut_sessions().remove_user(username); }; - mut_auth_map().remove(username); } - pub fn put_role(&self, username: &str, roles: Vec) { - if let Some(user) = mut_user_map().get_mut(username) { - user.role = roles; - mut_auth_map().remove(username) + pub fn put_role(&self, username: &str, roles: HashSet) { + if let Some(user) = mut_users().get_mut(username) { + user.roles = roles; + mut_sessions().remove_user(username) }; } pub fn contains(&self, username: &str) -> bool { - user_map().contains_key(username) + users().contains_key(username) + } + + pub fn get_permissions(&self, session: &SessionKey) -> Vec { + sessions().get(session).cloned().unwrap_or_default() } - pub fn get_permissions(&self, username: String, password: String) -> Vec { - let key = (username, password); - auth_map().get(&key).cloned().unwrap_or_default() + pub fn session_exists(&self, session: &SessionKey) -> bool { + sessions().get(session).is_some() } - pub fn authenticate( + pub fn remove_session(&self, session: &SessionKey) -> Option { + mut_sessions().remove_session(session) + } + + pub fn new_session(&self, user: &User, session: SessionKey) { + mut_sessions().track_new( + user.username().to_owned(), + session, + Utc::now() + Days::new(7), + roles_to_permission(user.roles()), + ) + } + + pub fn authorize( &self, - username: String, - password: String, + key: SessionKey, action: Action, context_stream: Option<&str>, context_user: Option<&str>, - ) -> bool { - let key = (username, password); + ) -> Response { // try fetch from auth map for faster auth flow - if let Some(res) = auth_map().check_auth(&key, action, context_stream, context_user) { - return res; + if let Some(res) = sessions().check_auth(&key, action, context_stream, context_user) { + return if res { + Response::Authorized + } else { + Response::UnAuthorized + }; } - // if not found in auth map, look into user map - let (username, password) = key; - if let Some(user) = user_map().get(&username) { + // attempt reloading permissions into new session for basic auth user + // id user will be reloaded only through login endpoint + let SessionKey::BasicAuth { username, password } = &key else { + return Response::ReloadRequired; + }; + if let Some( + user @ User { + ty: UserType::Native(basic_user), + .. + }, + ) = users().get(username) + { // if user exists and password matches // add this user to auth map - if user.verify_password(&password) { - let mut auth_map = mut_auth_map(); - auth_map.add_user(username.clone(), password.clone(), user.permissions()); - // verify from auth map and return - let key = (username, password); - return auth_map + if basic_user.verify_password(password) { + let mut sessions = mut_sessions(); + sessions.track_new( + username.clone(), + key.clone(), + DateTime::::MAX_UTC, + roles_to_permission(user.roles()), + ); + return if sessions .check_auth(&key, action, context_stream, context_user) - .expect("entry for this key just added"); + .expect("entry for this key just added") + { + Response::Authorized + } else { + Response::UnAuthorized + }; } } - false + Response::UnAuthorized + } +} + +fn roles_to_permission(roles: Vec) -> Vec { + let mut perms = HashSet::new(); + for role in &roles { + let role_map = &map::roles(); + let Some(privilege_list) = role_map.get(role) else { + continue; + }; + for privs in privilege_list { + perms.extend(RoleBuilder::from(privs).build()) + } } + perms.into_iter().collect() } diff --git a/server/src/rbac/map.rs b/server/src/rbac/map.rs index d309a63be..17fcc7531 100644 --- a/server/src/rbac/map.rs +++ b/server/src/rbac/map.rs @@ -21,41 +21,61 @@ use crate::rbac::user::User; use std::collections::HashMap; use super::{ - role::{Action, Permission}, + role::{model::DefaultPrivilege, Action, Permission, RoleBuilder}, user, }; +use chrono::{DateTime, Utc}; use once_cell::sync::OnceCell; use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -pub static USER_MAP: OnceCell> = OnceCell::new(); -pub static AUTH_MAP: OnceCell> = OnceCell::new(); +pub type Roles = HashMap>; -pub fn user_map() -> RwLockReadGuard<'static, UserMap> { - USER_MAP +pub static USERS: OnceCell> = OnceCell::new(); +pub static ROLES: OnceCell> = OnceCell::new(); +pub static SESSIONS: OnceCell> = OnceCell::new(); + +pub fn users() -> RwLockReadGuard<'static, Users> { + USERS + .get() + .expect("map is set") + .read() + .expect("not poisoned") +} + +pub fn mut_users() -> RwLockWriteGuard<'static, Users> { + USERS + .get() + .expect("map is set") + .write() + .expect("not poisoned") +} + +pub fn roles() -> RwLockReadGuard<'static, Roles> { + ROLES .get() .expect("map is set") .read() .expect("not poisoned") } -pub fn mut_user_map() -> RwLockWriteGuard<'static, UserMap> { - USER_MAP +pub fn mut_roles() -> RwLockWriteGuard<'static, Roles> { + ROLES .get() .expect("map is set") .write() .expect("not poisoned") } -pub fn auth_map() -> RwLockReadGuard<'static, AuthMap> { - AUTH_MAP +pub fn sessions() -> RwLockReadGuard<'static, Sessions> { + SESSIONS .get() .expect("map is set") .read() .expect("not poisoned") } -pub fn mut_auth_map() -> RwLockWriteGuard<'static, AuthMap> { - AUTH_MAP +pub fn mut_sessions() -> RwLockWriteGuard<'static, Sessions> { + SESSIONS .get() .expect("map is set") .write() @@ -66,61 +86,117 @@ pub fn mut_auth_map() -> RwLockWriteGuard<'static, AuthMap> { // the user_map is initialized from the config file and has a list of all users // the auth_map is initialized with admin user only and then gets lazily populated // as users authenticate -pub fn init_auth_maps(users: Vec) { - let mut user_map = UserMap::from(users); - let mut auth_map = AuthMap::default(); +pub fn init(users: Vec, mut roles: Roles) { + let admin_privilege = DefaultPrivilege::Admin; + let admin_permissions = RoleBuilder::from(&admin_privilege).build(); + roles.insert("admin".to_string(), vec![admin_privilege]); + + let mut users = Users::from(users); let admin = user::get_admin_user(); - let admin_permissions = admin.permissions(); - user_map.insert(admin); - auth_map.add_user( - CONFIG.parseable.username.clone(), - CONFIG.parseable.password.clone(), + let admin_username = admin.username().to_owned(); + users.insert(admin); + + let mut sessions = Sessions::default(); + sessions.track_new( + admin_username, + SessionKey::BasicAuth { + username: CONFIG.parseable.username.clone(), + password: CONFIG.parseable.password.clone(), + }, + chrono::DateTime::::MAX_UTC, admin_permissions, ); - USER_MAP - .set(RwLock::new(user_map)) - .expect("map is only set once"); - AUTH_MAP - .set(RwLock::new(auth_map)) + ROLES.set(RwLock::new(roles)).expect("map is only set once"); + USERS.set(RwLock::new(users)).expect("map is only set once"); + SESSIONS + .set(RwLock::new(sessions)) .expect("map is only set once"); } -// AuthMap is a map of [(username, password) --> permissions] -// This map is populated lazily as users send auth requests. -// First auth request for a user will populate the map with -// the user info (password and permissions) and subsequent -// requests will check the map for the user. -// If user is present in the map then we use this map for both -// authentication and authorization. +// A session is loosly active mapping to permissions +// this is lazily initialized and +// cleanup of unused session is done when a new session is added +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub enum SessionKey { + BasicAuth { username: String, password: String }, + SessionId(ulid::Ulid), +} + #[derive(Debug, Default)] -pub struct AuthMap { - inner: HashMap<(String, String), Vec>, +pub struct Sessions { + // map session key to user and their permission + active_sessions: HashMap)>, + // map user to one or more session + // this tracks session based on session id. Not basic auth + // Ulid time contains expiration datetime + user_sessions: HashMap)>>, } -impl AuthMap { - pub fn add_user(&mut self, username: String, password: String, permissions: Vec) { - self.inner.insert((username, password), permissions); +impl Sessions { + // track new session key + pub fn track_new( + &mut self, + user: String, + key: SessionKey, + expiry: DateTime, + permissions: Vec, + ) { + self.remove_expired_session(&user); + self.user_sessions + .entry(user.clone()) + .and_modify(|sessions| sessions.push((key.clone(), expiry))) + .or_default(); + self.active_sessions.insert(key, (user, permissions)); + } + + // remove a specific session + pub fn remove_session(&mut self, key: &SessionKey) -> Option { + let Some((user, _)) = self.active_sessions.remove(key) else { + return None; + }; + + if let Some(items) = self.user_sessions.get_mut(&user) { + items.retain(|(session, _)| session != key); + Some(user) + } else { + None + } + } + + // remove sessions related to a user + pub fn remove_user(&mut self, username: &str) { + let sessions = self.user_sessions.remove(username); + if let Some(sessions) = sessions { + sessions.into_iter().for_each(|(key, _)| { + self.active_sessions.remove(&key); + }) + } } - pub fn remove(&mut self, username: &str) { - self.inner.retain(|(x, _), _| x != username) + fn remove_expired_session(&mut self, user: &str) { + let now = Utc::now(); + let Some(sessions) = self.user_sessions.get_mut(user) else { + return; + }; + sessions.retain(|(_, expiry)| expiry < &now); } - pub fn get(&self, key: &(String, String)) -> Option<&Vec> { - self.inner.get(key) + // get permission related to this session + pub fn get(&self, key: &SessionKey) -> Option<&Vec> { + self.active_sessions.get(key).map(|(_, perms)| perms) } // returns None if user is not in the map // Otherwise returns Some(is_authenticated) pub fn check_auth( &self, - key: &(String, String), + key: &SessionKey, required_action: Action, context_stream: Option<&str>, context_user: Option<&str>, ) -> Option { - self.inner.get(key).map(|perms| { + self.active_sessions.get(key).map(|(username, perms)| { perms.iter().any(|user_perm| { match *user_perm { // if any action is ALL then we we authorize @@ -135,10 +211,10 @@ impl AuthMap { }; (action == required_action || action == Action::All) && ok_stream } - Permission::SelfRole => { - context_user.map(|x| x == key.0).unwrap_or_default() - && required_action == Action::GetRole + Permission::SelfRole if required_action == Action::GetUserRoles => { + context_user.map(|x| x == username).unwrap_or_default() } + _ => false, } }) }) @@ -148,18 +224,22 @@ impl AuthMap { // UserMap is a map of [username --> User] // This map is populated at startup with the list of users from parseable.json file #[derive(Debug, Default, Clone, derive_more::Deref, derive_more::DerefMut)] -pub struct UserMap(HashMap); +pub struct Users(HashMap); -impl UserMap { +impl Users { pub fn insert(&mut self, user: User) { - self.0.insert(user.username.clone(), user); + self.0.insert(user.username().to_owned(), user); } } -impl From> for UserMap { +impl From> for Users { fn from(users: Vec) -> Self { let mut map = Self::default(); - map.extend(users.into_iter().map(|user| (user.username.clone(), user))); + map.extend( + users + .into_iter() + .map(|user| (user.username().to_owned(), user)), + ); map } } diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index 5420b0f7f..62033238d 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -34,8 +34,12 @@ pub enum Action { PutUser, ListUser, DeleteUser, - PutRoles, + PutUserRoles, + GetUserRoles, + PutRole, GetRole, + DeleteRole, + ListRole, GetAbout, QueryLLM, All, @@ -90,11 +94,15 @@ impl RoleBuilder { Action::GetAlert => Permission::Stream(action, self.stream.clone().unwrap()), Action::PutUser => Permission::Unit(action), Action::ListUser => Permission::Unit(action), - Action::PutRoles => Permission::Unit(action), - Action::GetRole => Permission::Unit(action), + Action::PutUserRoles => Permission::Unit(action), + Action::GetUserRoles => Permission::Unit(action), Action::DeleteUser => Permission::Unit(action), Action::GetAbout => Permission::Unit(action), Action::QueryLLM => Permission::Unit(action), + Action::PutRole => Permission::Unit(action), + Action::GetRole => Permission::Unit(action), + Action::DeleteRole => Permission::Unit(action), + Action::ListRole => Permission::Unit(action), Action::All => Permission::Stream(action, self.stream.clone().unwrap()), }; perms.push(perm); diff --git a/server/src/rbac/user.rs b/server/src/rbac/user.rs index a0a83e4a0..c799bef75 100644 --- a/server/src/rbac/user.rs +++ b/server/src/rbac/user.rs @@ -27,32 +27,71 @@ use rand::distributions::{Alphanumeric, DistString}; use crate::option::CONFIG; -use crate::rbac::role::{model::DefaultPrivilege, Permission, RoleBuilder}; +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(untagged)] +pub enum UserType { + Native(Basic), + OAuth(OAuth), +} -// Represents a User in the system -// can be the root admin user (set with env vars at startup / restart) -// or user(s) created by the root user #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct User { - pub username: String, - pub password_hash: String, - pub role: Vec, + #[serde(flatten)] + pub ty: UserType, + pub roles: HashSet, } impl User { // create a new User and return self with password generated for said user. - pub fn create_new(username: String) -> (Self, String) { - let PassCode { password, hash } = Self::gen_new_password(); + pub fn new_basic(username: String) -> (Self, String) { + let PassCode { password, hash } = Basic::gen_new_password(); ( Self { - username, - password_hash: hash, - role: Vec::new(), + ty: UserType::Native(Basic { + username, + password_hash: hash, + }), + roles: HashSet::new(), }, password, ) } + pub fn new_oauth(username: String) -> Self { + Self { + ty: UserType::OAuth(OAuth { userid: username }), + roles: HashSet::new(), + } + } + + pub fn username(&self) -> &str { + match self.ty { + UserType::Native(Basic { ref username, .. }) => username, + UserType::OAuth(OAuth { + userid: ref username, + }) => username, + } + } + + pub fn is_oauth(&self) -> bool { + matches!(self.ty, UserType::OAuth(_)) + } + + pub fn roles(&self) -> Vec { + self.roles.iter().cloned().collect() + } +} + +// Represents a User in the system +// can be the root admin user (set with env vars at startup / restart) +// or user(s) created by the root user +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct Basic { + pub username: String, + pub password_hash: String, +} + +impl Basic { // generate a new password pub fn gen_new_password() -> PassCode { let password = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); @@ -60,15 +99,6 @@ impl User { PassCode { password, hash } } - pub fn permissions(&self) -> Vec { - let perms: HashSet = self - .role - .iter() - .flat_map(|role| RoleBuilder::from(role).build()) - .collect(); - perms.into_iter().collect() - } - pub fn verify_password(&self, password: &str) -> bool { verify(&self.password_hash, password) } @@ -108,8 +138,15 @@ pub fn get_admin_user() -> User { let hashcode = gen_hash(&password); User { - username, - password_hash: hashcode, - role: vec![DefaultPrivilege::Admin], + ty: UserType::Native(Basic { + username, + password_hash: hashcode, + }), + roles: ["admin".to_string()].into(), } } + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct OAuth { + userid: String, +} diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 562a4c209..7ad461fb9 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -17,6 +17,7 @@ */ use std::{ + collections::HashMap, fs::{self, create_dir_all, OpenOptions}, path::PathBuf, }; @@ -24,7 +25,12 @@ use std::{ use once_cell::sync::OnceCell; use std::io; -use crate::{option::CONFIG, rbac::user::User, storage::ObjectStorageError, utils::uid}; +use crate::{ + option::CONFIG, + rbac::{role::model::DefaultPrivilege, user::User}, + storage::ObjectStorageError, + utils::uid, +}; use super::object_storage::PARSEABLE_METADATA_FILE_NAME; @@ -49,18 +55,21 @@ pub struct StorageMetadata { pub deployment_id: uid::Uid, pub users: Vec, pub streams: Vec, + #[serde(default)] + pub roles: HashMap>, } impl StorageMetadata { pub fn new() -> Self { Self { - version: "v2".to_string(), + version: "v3".to_string(), mode: CONFIG.storage_name.to_owned(), staging: CONFIG.staging_dir().canonicalize().unwrap(), storage: CONFIG.storage().get_endpoint(), deployment_id: uid::gen(), users: Vec::new(), streams: Vec::new(), + roles: HashMap::default(), } } diff --git a/server/src/utils.rs b/server/src/utils.rs index bc8bbded6..2f04ef419 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -16,6 +16,7 @@ * */ +pub mod actix; pub mod arrow; pub mod header_parsing; pub mod json; diff --git a/server/src/utils/actix.rs b/server/src/utils/actix.rs new file mode 100644 index 000000000..71b070db7 --- /dev/null +++ b/server/src/utils/actix.rs @@ -0,0 +1,71 @@ +/* +* Parseable Server (C) 2022 - 2023 Parseable, Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +* +* +*/ + +use actix_web::{ + dev::ServiceRequest, + error::{ErrorUnauthorized, ErrorUnprocessableEntity}, + Error, FromRequest, HttpRequest, +}; +use actix_web_httpauth::extractors::basic::BasicAuth; + +use crate::rbac::map::SessionKey; + +pub fn extract_session_key(req: &mut ServiceRequest) -> Result { + // Extract username and password from the request using basic auth extractor. + let creds = req.extract::().into_inner(); + let basic = creds.map(|creds| { + let username = creds.user_id().trim().to_owned(); + // password is not mandatory by basic auth standard. + // If not provided then treat as empty string + let password = creds.password().unwrap_or("").trim().to_owned(); + SessionKey::BasicAuth { username, password } + }); + + if let Ok(basic) = basic { + Ok(basic) + } else if let Some(cookie) = req.cookie("session") { + let ulid = ulid::Ulid::from_string(cookie.value()) + .map_err(|_| ErrorUnprocessableEntity("Cookie is tampered with or invalid"))?; + Ok(SessionKey::SessionId(ulid)) + } else { + Err(ErrorUnauthorized("No authentication method supplied")) + } +} + +pub fn extract_session_key_from_req(req: &HttpRequest) -> Result { + // Extract username and password from the request using basic auth extractor. + let creds = BasicAuth::extract(req).into_inner(); + let basic = creds.map(|creds| { + let username = creds.user_id().trim().to_owned(); + // password is not mandatory by basic auth standard. + // If not provided then treat as empty string + let password = creds.password().unwrap_or("").trim().to_owned(); + SessionKey::BasicAuth { username, password } + }); + + if let Ok(basic) = basic { + Ok(basic) + } else if let Some(cookie) = req.cookie("session") { + let ulid = ulid::Ulid::from_string(cookie.value()) + .map_err(|_| ErrorUnprocessableEntity("Cookie is tampered with or invalid"))?; + Ok(SessionKey::SessionId(ulid)) + } else { + Err(ErrorUnauthorized("No authentication method supplied")) + } +}