Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 217 additions & 3 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::handlers::http::logstream::error::StreamError;
use crate::option::CONFIG;

use crate::metrics::prom_utils::Metrics;
use crate::rbac::user::User;
use crate::stats::Stats;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
Expand All @@ -39,13 +40,15 @@ use itertools::Itertools;
use relative_path::RelativePathBuf;
use serde::de::Error;
use serde_json::error::Error as SerdeError;
use serde_json::Value as JsonValue;
use serde_json::{to_vec, Value as JsonValue};
use url::Url;
type IngestorMetadataArr = Vec<IngestorMetadata>;

use self::utils::StorageStats;

use super::base_path_without_preceding_slash;
use super::rbac::RBACError;
use std::collections::HashSet;
use std::time::Duration;

use super::modal::IngestorMetadata;
Expand Down Expand Up @@ -94,7 +97,7 @@ pub async fn sync_cache_with_ingestors(
Ok(())
}

// forward the request to all ingestors to keep them in sync
// forward the create/update stream request to all ingestors to keep them in sync
pub async fn sync_streams_with_ingestors(
headers: HeaderMap,
body: Bytes,
Expand Down Expand Up @@ -142,7 +145,218 @@ pub async fn sync_streams_with_ingestors(
log::error!(
"failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res
res.text().await
);
}
}

Ok(())
}

// forward the role update request to all ingestors to keep them in sync
pub async fn sync_users_with_roles_with_ingestors(
username: &String,
role: &HashSet<String>,
) -> Result<(), RBACError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
RBACError::Anyhow(err)
})?;

let client = reqwest::Client::new();
let role = to_vec(&role.clone()).map_err(|err| {
log::error!("Fatal: failed to serialize role: {:?}", err);
RBACError::SerdeError(err)
})?;
for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}
let url = format!(
"{}{}/user/{}/role",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
);

let res = client
.put(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
.body(role.clone())
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
ingestor.domain_name,
err
);
RBACError::Network(err)
})?;

if !res.status().is_success() {
log::error!(
"failed to forward request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res.text().await
);
}
}

Ok(())
}

// forward the delete user request to all ingestors to keep them in sync
pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(), RBACError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
RBACError::Anyhow(err)
})?;

let client = reqwest::Client::new();
for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}
let url = format!(
"{}{}/user/{}",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
);

let res = client
.delete(url)
.header(header::AUTHORIZATION, &ingestor.token)
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
ingestor.domain_name,
err
);
RBACError::Network(err)
})?;

if !res.status().is_success() {
log::error!(
"failed to forward request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res.text().await
);
}
}

Ok(())
}

// forward the create user request to all ingestors to keep them in sync
pub async fn sync_user_creation_with_ingestors(
user: User,
role: &Option<HashSet<String>>,
) -> Result<(), RBACError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
RBACError::Anyhow(err)
})?;

let mut user = user.clone();

if let Some(role) = role {
user.roles.clone_from(role);
}
let username = user.username();
let client = reqwest::Client::new();

let user = to_vec(&user).map_err(|err| {
log::error!("Fatal: failed to serialize user: {:?}", err);
RBACError::SerdeError(err)
})?;

for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}
let url = format!(
"{}{}/user/{}",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
);

let res = client
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
.body(user.clone())
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
ingestor.domain_name,
err
);
RBACError::Network(err)
})?;

if !res.status().is_success() {
log::error!(
"failed to forward request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res.text().await
);
}
}

Ok(())
}

// forward the password reset request to all ingestors to keep them in sync
pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), RBACError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
RBACError::Anyhow(err)
})?;
let client = reqwest::Client::new();

for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}
let url = format!(
"{}{}/user/{}/generate-new-password",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
);

let res = client
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
ingestor.domain_name,
err
);
RBACError::Network(err)
})?;

if !res.status().is_success() {
log::error!(
"failed to forward request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res.text().await
);
}
}
Expand Down
44 changes: 43 additions & 1 deletion server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
use crate::analytics;
use crate::banner;
use crate::handlers::airplane;
use crate::handlers::http;
use crate::handlers::http::health_check;
use crate::handlers::http::ingest;
use crate::handlers::http::logstream;
use crate::handlers::http::middleware::DisAllowRootUser;
use crate::handlers::http::middleware::RouteExt;
use crate::localcache::LocalCacheManager;
use crate::metrics;
Expand Down Expand Up @@ -181,6 +183,7 @@ impl IngestServer {
.service(Server::get_about_factory())
.service(Self::analytics_factory())
.service(Server::get_liveness_factory())
.service(Self::get_user_webscope())
.service(Server::get_metrics_webscope())
.service(Server::get_readiness_factory()),
)
Expand All @@ -197,7 +200,46 @@ impl IngestServer {
),
)
}

// get the user webscope
fn get_user_webscope() -> Scope {
web::scope("/user")
.service(
web::resource("/{username}")
// PUT /user/{username} => Create a new user
.route(
web::post()
.to(http::rbac::post_user)
.authorize(Action::PutUser),
)
// DELETE /user/{username} => Delete a user
.route(
web::delete()
.to(http::rbac::delete_user)
.authorize(Action::DeleteUser),
)
.wrap(DisAllowRootUser),
)
.service(
web::resource("/{username}/role")
// PUT /user/{username}/roles => Put roles for user
.route(
web::put()
.to(http::rbac::put_role)
.authorize(Action::PutUserRoles)
.wrap(DisAllowRootUser),
),
)
.service(
web::resource("/{username}/generate-new-password")
// POST /user/{username}/generate-new-password => reset password for this user
.route(
web::post()
.to(http::rbac::post_gen_password)
.authorize(Action::PutUser)
.wrap(DisAllowRootUser),
),
)
}
fn logstream_api() -> Scope {
web::scope("/logstream").service(
web::scope("/{logstream}")
Expand Down
Loading