Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,36 +156,43 @@ impl Server {

// get the dashboards web scope
pub fn get_dashboards_webscope() -> Scope {
web::scope("/dashboards").service(
web::scope("/{user_id}")
.service(
web::scope("/dashboards")
.service(
web::resource("").route(
web::post()
.to(dashboards::post)
.authorize(Action::CreateDashboard),
),
)
.service(
web::scope("/dashboard").service(
web::resource("/{dashboard_id}")
.route(
web::get()
.to(dashboards::get)
.authorize(Action::GetDashboard),
)
.route(
web::delete()
.to(dashboards::delete)
.authorize(Action::DeleteDashboard),
)
.route(
web::put()
.to(dashboards::update)
.authorize(Action::CreateDashboard),
),
),
)
.service(
web::scope("/{user_id}").service(
web::resource("").route(
web::get()
.to(dashboards::list)
.authorize(Action::ListDashboard),
),
)
.service(
web::scope("/{dashboard_id}").service(
web::resource("")
.route(
web::get()
.to(dashboards::get)
.authorize(Action::GetDashboard),
)
.route(
web::post()
.to(dashboards::post)
.authorize(Action::CreateDashboard),
)
.route(
web::delete()
.to(dashboards::delete)
.authorize(Action::DeleteDashboard),
),
),
),
)
)
}

// get the filters web scope
Expand Down
110 changes: 55 additions & 55 deletions server/src/handlers/http/users/dashboards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,103 +20,103 @@ use crate::{
handlers::http::ingest::PostError,
option::CONFIG,
storage::{object_storage::dashboard_path, ObjectStorageError},
users::dashboards::{Dashboard, DASHBOARDS},
users::dashboards::{Dashboard, CURRENT_DASHBOARD_VERSION, DASHBOARDS},
};
use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder};
use bytes::Bytes;

use chrono::Utc;
use http::StatusCode;
use serde_json::{Error as SerdeError, Value as JsonValue};
use serde_json::Error as SerdeError;

pub async fn list(req: HttpRequest) -> Result<impl Responder, DashboardError> {
let user_id = req
.match_info()
.get("user_id")
.ok_or(DashboardError::Metadata("No User Id Provided"))?;
let dashboards = DASHBOARDS.list_dashboards_by_user(user_id);

// .users/user_id/dashboards/
let path = dashboard_path(user_id, "");

let store = CONFIG.storage().get_object_store();
let dashboards = store
.get_objects(
Some(&path),
Box::new(|file_name: String| file_name.ends_with("json")),
)
.await?;

let mut dash = vec![];
for dashboard in dashboards {
dash.push(serde_json::from_slice::<JsonValue>(&dashboard)?)
}

Ok((web::Json(dash), StatusCode::OK))
Ok((web::Json(dashboards), StatusCode::OK))
}

pub async fn get(req: HttpRequest) -> Result<impl Responder, DashboardError> {
let user_id = req
.match_info()
.get("user_id")
.ok_or(DashboardError::Metadata("No User Id Provided"))?;

let dash_id = req
let dashboard_id = req
.match_info()
.get("dashboard_id")
.ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?;

if let Some(dashboard) = DASHBOARDS.find(dash_id) {
if let Some(dashboard) = DASHBOARDS.get_dashboard(dashboard_id) {
return Ok((web::Json(dashboard), StatusCode::OK));
}

//if dashboard is not in memory fetch from s3
let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id));
let resource = CONFIG
.storage()
.get_object_store()
.get_object(&dash_file_path)
Err(DashboardError::Metadata("Dashboard does not exist"))
}

pub async fn post(body: Bytes) -> Result<impl Responder, PostError> {
let mut dashboard: Dashboard = serde_json::from_slice(&body)?;
let dashboard_id = format!("{}.{}", &dashboard.user_id, Utc::now().timestamp_millis());
dashboard.dashboard_id = Some(dashboard_id.clone());
dashboard.version = Some(CURRENT_DASHBOARD_VERSION.to_string());
DASHBOARDS.update(&dashboard);
for tile in dashboard.tiles.iter_mut() {
tile.tile_id = Some(format!(
"{}.{}",
&dashboard.user_id,
Utc::now().timestamp_micros()
));
}

let path = dashboard_path(&dashboard.user_id, &format!("{}.json", dashboard_id));

let store = CONFIG.storage().get_object_store();
let dashboard_bytes = serde_json::to_vec(&dashboard)?;
store
.put_object(&path, Bytes::from(dashboard_bytes))
.await?;
let resource = serde_json::from_slice::<Dashboard>(&resource)?;

Ok((web::Json(resource), StatusCode::OK))
Ok((web::Json(dashboard), StatusCode::OK))
}

pub async fn post(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
let user_id = req
.match_info()
.get("user_id")
.ok_or(DashboardError::Metadata("No User Id Provided"))?;

let dash_id = req
pub async fn update(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
let dashboard_id = req
.match_info()
.get("dashboard_id")
.ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?;
if DASHBOARDS.get_dashboard(dashboard_id).is_none() {
return Err(PostError::DashboardError(DashboardError::Metadata(
"Dashboard does not exist",
)));
}
let mut dashboard: Dashboard = serde_json::from_slice(&body)?;
dashboard.dashboard_id = Some(dashboard_id.to_string());
dashboard.version = Some(CURRENT_DASHBOARD_VERSION.to_string());
DASHBOARDS.update(&dashboard);

let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id));

let dashboard = serde_json::from_slice::<Dashboard>(&body)?;
DASHBOARDS.update(dashboard);
let path = dashboard_path(&dashboard.user_id, &format!("{}.json", dashboard_id));

let store = CONFIG.storage().get_object_store();
store.put_object(&dash_file_path, body).await?;
let dashboard_bytes = serde_json::to_vec(&dashboard)?;
store
.put_object(&path, Bytes::from(dashboard_bytes))
.await?;

Ok(HttpResponse::Ok().finish())
}

pub async fn delete(req: HttpRequest) -> Result<HttpResponse, PostError> {
let user_id = req
.match_info()
.get("user_id")
.ok_or(DashboardError::Metadata("No User Id Provided"))?;

let dash_id = req
let dashboard_id = req
.match_info()
.get("dashboard_id")
.ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?;
let dashboard = DASHBOARDS
.get_dashboard(dashboard_id)
.ok_or(DashboardError::Metadata("Dashboard does not exist"))?;

let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id));

let path = dashboard_path(&dashboard.user_id, &format!("{}.json", dashboard_id));
let store = CONFIG.storage().get_object_store();
store.delete_object(&dash_file_path).await?;
store.delete_object(&path).await?;

DASHBOARDS.delete_dashboard(dashboard_id);

Ok(HttpResponse::Ok().finish())
}
Expand Down
20 changes: 20 additions & 0 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,26 @@ impl ObjectStorage for LocalFS {
Ok(dirs)
}

async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut dashboards = vec![];
let users_root_path = self.root.join(USERS_ROOT_DIR);
let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
let users: Vec<DirEntry> = directories.try_collect().await?;
for user in users {
if !user.path().is_dir() {
continue;
}
let dashboards_path = users_root_path.join(user.path()).join("dashboards");
let directories = ReadDirStream::new(fs::read_dir(&dashboards_path).await?);
let dashboards_files: Vec<DirEntry> = directories.try_collect().await?;
for dashboard in dashboards_files {
let file = fs::read(dashboard.path()).await?;
dashboards.push(file.into());
}
}
Ok(dashboards)
}

async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut filters = vec![];
let users_root_path = self.root.join(USERS_ROOT_DIR);
Expand Down
1 change: 1 addition & 0 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub trait ObjectStorage: Sync + 'static {
async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError>;
async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError>;
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError>;
Expand Down
32 changes: 32 additions & 0 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,38 @@ impl ObjectStorage for S3 {
.collect::<Vec<_>>())
}

async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut dashboards = vec![];
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
let resp = self
.client
.list_with_delimiter(Some(&users_root_path))
.await?;

let users = resp
.common_prefixes
.iter()
.flat_map(|path| path.parts())
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
.map(|name| name.as_ref().to_string())
.collect::<Vec<_>>();
for user in users {
let user_dashboard_path = object_store::path::Path::from(format!(
"{}/{}/{}",
USERS_ROOT_DIR, user, "dashboards"
));
let dashboards_path = RelativePathBuf::from(&user_dashboard_path);
let dashboard_bytes = self
.get_objects(
Some(&dashboards_path),
Box::new(|file_name| file_name.ends_with(".json")),
)
.await?;
dashboards.extend(dashboard_bytes);
}
Ok(dashboards)
}

async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut filters = vec![];
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
Expand Down
Loading