diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index f5492d2ad..ceaf35810 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -156,36 +156,43 @@ impl Server { // get the dashboards web scope pub fn get_dashboards_webscope() -> Scope { - web::scope("/dashboards").service( - web::scope("/{user_id}") - .service( + web::scope("/dashboards") + .service( + web::resource("").route( + web::post() + .to(dashboards::post) + .authorize(Action::CreateDashboard), + ), + ) + .service( + web::scope("/dashboard").service( + web::resource("/{dashboard_id}") + .route( + web::get() + .to(dashboards::get) + .authorize(Action::GetDashboard), + ) + .route( + web::delete() + .to(dashboards::delete) + .authorize(Action::DeleteDashboard), + ) + .route( + web::put() + .to(dashboards::update) + .authorize(Action::CreateDashboard), + ), + ), + ) + .service( + web::scope("/{user_id}").service( web::resource("").route( web::get() .to(dashboards::list) .authorize(Action::ListDashboard), ), - ) - .service( - web::scope("/{dashboard_id}").service( - web::resource("") - .route( - web::get() - .to(dashboards::get) - .authorize(Action::GetDashboard), - ) - .route( - web::post() - .to(dashboards::post) - .authorize(Action::CreateDashboard), - ) - .route( - web::delete() - .to(dashboards::delete) - .authorize(Action::DeleteDashboard), - ), - ), ), - ) + ) } // get the filters web scope diff --git a/server/src/handlers/http/users/dashboards.rs b/server/src/handlers/http/users/dashboards.rs index 4f2bbc6d3..6a558601e 100644 --- a/server/src/handlers/http/users/dashboards.rs +++ b/server/src/handlers/http/users/dashboards.rs @@ -20,103 +20,103 @@ use crate::{ handlers::http::ingest::PostError, option::CONFIG, storage::{object_storage::dashboard_path, ObjectStorageError}, - users::dashboards::{Dashboard, DASHBOARDS}, + users::dashboards::{Dashboard, CURRENT_DASHBOARD_VERSION, DASHBOARDS}, }; use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; use bytes::Bytes; +use chrono::Utc; use http::StatusCode; -use serde_json::{Error as SerdeError, Value as JsonValue}; +use serde_json::Error as SerdeError; pub async fn list(req: HttpRequest) -> Result { let user_id = req .match_info() .get("user_id") .ok_or(DashboardError::Metadata("No User Id Provided"))?; + let dashboards = DASHBOARDS.list_dashboards_by_user(user_id); - // .users/user_id/dashboards/ - let path = dashboard_path(user_id, ""); - - let store = CONFIG.storage().get_object_store(); - let dashboards = store - .get_objects( - Some(&path), - Box::new(|file_name: String| file_name.ends_with("json")), - ) - .await?; - - let mut dash = vec![]; - for dashboard in dashboards { - dash.push(serde_json::from_slice::(&dashboard)?) - } - - Ok((web::Json(dash), StatusCode::OK)) + Ok((web::Json(dashboards), StatusCode::OK)) } pub async fn get(req: HttpRequest) -> Result { - let user_id = req - .match_info() - .get("user_id") - .ok_or(DashboardError::Metadata("No User Id Provided"))?; - - let dash_id = req + let dashboard_id = req .match_info() .get("dashboard_id") .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; - if let Some(dashboard) = DASHBOARDS.find(dash_id) { + if let Some(dashboard) = DASHBOARDS.get_dashboard(dashboard_id) { return Ok((web::Json(dashboard), StatusCode::OK)); } - //if dashboard is not in memory fetch from s3 - let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id)); - let resource = CONFIG - .storage() - .get_object_store() - .get_object(&dash_file_path) + Err(DashboardError::Metadata("Dashboard does not exist")) +} + +pub async fn post(body: Bytes) -> Result { + let mut dashboard: Dashboard = serde_json::from_slice(&body)?; + let dashboard_id = format!("{}.{}", &dashboard.user_id, Utc::now().timestamp_millis()); + dashboard.dashboard_id = Some(dashboard_id.clone()); + dashboard.version = Some(CURRENT_DASHBOARD_VERSION.to_string()); + DASHBOARDS.update(&dashboard); + for tile in dashboard.tiles.iter_mut() { + tile.tile_id = Some(format!( + "{}.{}", + &dashboard.user_id, + Utc::now().timestamp_micros() + )); + } + + let path = dashboard_path(&dashboard.user_id, &format!("{}.json", dashboard_id)); + + let store = CONFIG.storage().get_object_store(); + let dashboard_bytes = serde_json::to_vec(&dashboard)?; + store + .put_object(&path, Bytes::from(dashboard_bytes)) .await?; - let resource = serde_json::from_slice::(&resource)?; - Ok((web::Json(resource), StatusCode::OK)) + Ok((web::Json(dashboard), StatusCode::OK)) } -pub async fn post(req: HttpRequest, body: Bytes) -> Result { - let user_id = req - .match_info() - .get("user_id") - .ok_or(DashboardError::Metadata("No User Id Provided"))?; - - let dash_id = req +pub async fn update(req: HttpRequest, body: Bytes) -> Result { + let dashboard_id = req .match_info() .get("dashboard_id") .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + if DASHBOARDS.get_dashboard(dashboard_id).is_none() { + return Err(PostError::DashboardError(DashboardError::Metadata( + "Dashboard does not exist", + ))); + } + let mut dashboard: Dashboard = serde_json::from_slice(&body)?; + dashboard.dashboard_id = Some(dashboard_id.to_string()); + dashboard.version = Some(CURRENT_DASHBOARD_VERSION.to_string()); + DASHBOARDS.update(&dashboard); - let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id)); - - let dashboard = serde_json::from_slice::(&body)?; - DASHBOARDS.update(dashboard); + let path = dashboard_path(&dashboard.user_id, &format!("{}.json", dashboard_id)); let store = CONFIG.storage().get_object_store(); - store.put_object(&dash_file_path, body).await?; + let dashboard_bytes = serde_json::to_vec(&dashboard)?; + store + .put_object(&path, Bytes::from(dashboard_bytes)) + .await?; Ok(HttpResponse::Ok().finish()) } pub async fn delete(req: HttpRequest) -> Result { - let user_id = req - .match_info() - .get("user_id") - .ok_or(DashboardError::Metadata("No User Id Provided"))?; - - let dash_id = req + let dashboard_id = req .match_info() .get("dashboard_id") .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + let dashboard = DASHBOARDS + .get_dashboard(dashboard_id) + .ok_or(DashboardError::Metadata("Dashboard does not exist"))?; - let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id)); - + let path = dashboard_path(&dashboard.user_id, &format!("{}.json", dashboard_id)); let store = CONFIG.storage().get_object_store(); - store.delete_object(&dash_file_path).await?; + store.delete_object(&path).await?; + + DASHBOARDS.delete_dashboard(dashboard_id); Ok(HttpResponse::Ok().finish()) } diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 5f396bf3a..78b02e8d5 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -350,6 +350,26 @@ impl ObjectStorage for LocalFS { Ok(dirs) } + async fn get_all_dashboards(&self) -> Result, ObjectStorageError> { + let mut dashboards = vec![]; + let users_root_path = self.root.join(USERS_ROOT_DIR); + let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?); + let users: Vec = directories.try_collect().await?; + for user in users { + if !user.path().is_dir() { + continue; + } + let dashboards_path = users_root_path.join(user.path()).join("dashboards"); + let directories = ReadDirStream::new(fs::read_dir(&dashboards_path).await?); + let dashboards_files: Vec = directories.try_collect().await?; + for dashboard in dashboards_files { + let file = fs::read(dashboard.path()).await?; + dashboards.push(file.into()); + } + } + Ok(dashboards) + } + async fn get_all_saved_filters(&self) -> Result, ObjectStorageError> { let mut filters = vec![]; let users_root_path = self.root.join(USERS_ROOT_DIR); diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 1f2a96156..433327778 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -85,6 +85,7 @@ pub trait ObjectStorage: Sync + 'static { async fn list_old_streams(&self) -> Result, ObjectStorageError>; async fn list_dirs(&self) -> Result, ObjectStorageError>; async fn get_all_saved_filters(&self) -> Result, ObjectStorageError>; + async fn get_all_dashboards(&self) -> Result, ObjectStorageError>; async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError>; diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index d7729eb92..d94b937d3 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -640,6 +640,38 @@ impl ObjectStorage for S3 { .collect::>()) } + async fn get_all_dashboards(&self) -> Result, ObjectStorageError> { + let mut dashboards = vec![]; + let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR); + let resp = self + .client + .list_with_delimiter(Some(&users_root_path)) + .await?; + + let users = resp + .common_prefixes + .iter() + .flat_map(|path| path.parts()) + .filter(|name| name.as_ref() != USERS_ROOT_DIR) + .map(|name| name.as_ref().to_string()) + .collect::>(); + for user in users { + let user_dashboard_path = object_store::path::Path::from(format!( + "{}/{}/{}", + USERS_ROOT_DIR, user, "dashboards" + )); + let dashboards_path = RelativePathBuf::from(&user_dashboard_path); + let dashboard_bytes = self + .get_objects( + Some(&dashboards_path), + Box::new(|file_name| file_name.ends_with(".json")), + ) + .await?; + dashboards.extend(dashboard_bytes); + } + Ok(dashboards) + } + async fn get_all_saved_filters(&self) -> Result, ObjectStorageError> { let mut filters = vec![]; let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR); diff --git a/server/src/users/dashboards.rs b/server/src/users/dashboards.rs index 561ec2507..c8c85d95e 100644 --- a/server/src/users/dashboards.rs +++ b/server/src/users/dashboards.rs @@ -19,39 +19,63 @@ use std::sync::RwLock; use once_cell::sync::Lazy; -use relative_path::RelativePathBuf; use serde::{Deserialize, Serialize}; -use crate::{handlers::http::users::USERS_ROOT_DIR, metadata::LOCK_EXPECT, option::CONFIG}; +use crate::{metadata::LOCK_EXPECT, option::CONFIG}; use super::TimeFilter; pub static DASHBOARDS: Lazy = Lazy::new(Dashboards::default); +pub const CURRENT_DASHBOARD_VERSION: &str = "v1"; #[derive(Debug, Serialize, Deserialize, Default, Clone)] -pub struct Pannel { - stream_name: String, +pub struct Tiles { + name: String, + pub tile_id: Option, + description: String, + stream: String, query: String, - chart_type: String, - columns: Vec, - headers: Vec, - dimensions: Vec, + order: Option, + visualization: Visualization, } #[derive(Debug, Serialize, Deserialize, Default, Clone)] -pub struct Dashboard { - version: String, - dashboard_name: String, - dashboard_id: String, - time_filter: TimeFilter, - refresh_interval: u64, - pannels: Vec, +pub struct Visualization { + visualization_type: String, + circular_chart_config: Option, + graph_config: Option, + size: String, + color_config: Vec, } -impl Dashboard { - pub fn dashboard_id(&self) -> &str { - &self.dashboard_id - } +#[derive(Debug, Serialize, Deserialize, Default, Clone)] +pub struct CircularChartConfig { + name_key: String, + value_key: String, +} + +#[derive(Debug, Serialize, Deserialize, Default, Clone)] +pub struct GraphConfig { + x_key: String, + y_key: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Default, Clone)] +pub struct ColorConfig { + field_name: String, + color_palette: String, +} + +#[derive(Debug, Serialize, Deserialize, Default, Clone)] +pub struct Dashboard { + pub version: Option, + name: String, + description: String, + pub dashboard_id: Option, + pub user_id: String, + pub time_filter: Option, + refresh_interval: u64, + pub tiles: Vec, } #[derive(Default, Debug)] @@ -60,16 +84,12 @@ pub struct Dashboards(RwLock>); impl Dashboards { pub async fn load(&self) -> anyhow::Result<()> { let mut this = vec![]; - let path = RelativePathBuf::from(USERS_ROOT_DIR); let store = CONFIG.storage().get_object_store(); - let objs = store - .get_objects(Some(&path), Box::new(|path| path.ends_with(".json"))) - .await - .unwrap_or_default(); - - for obj in objs { - if let Ok(filter) = serde_json::from_slice::(&obj) { - this.push(filter); + let dashboards = store.get_all_dashboards().await.unwrap_or_default(); + + for dashboard in dashboards { + if let Ok(dashboard) = serde_json::from_slice::(&dashboard) { + this.push(dashboard); } } @@ -79,18 +99,33 @@ impl Dashboards { Ok(()) } - pub fn update(&self, dashboard: Dashboard) { + pub fn update(&self, dashboard: &Dashboard) { + let mut s = self.0.write().expect(LOCK_EXPECT); + s.retain(|f| f.dashboard_id != dashboard.dashboard_id); + s.push(dashboard.clone()); + } + + pub fn delete_dashboard(&self, dashboard_id: &str) { let mut s = self.0.write().expect(LOCK_EXPECT); + s.retain(|f| f.dashboard_id != Some(dashboard_id.to_string())); + } - s.push(dashboard); + pub fn get_dashboard(&self, dashboard_id: &str) -> Option { + self.0 + .read() + .expect(LOCK_EXPECT) + .iter() + .find(|f| f.dashboard_id == Some(dashboard_id.to_string())) + .cloned() } - pub fn find(&self, dashboard_id: &str) -> Option { + pub fn list_dashboards_by_user(&self, user_id: &str) -> Vec { self.0 .read() .expect(LOCK_EXPECT) .iter() - .find(|dashboard| dashboard.dashboard_id() == dashboard_id) + .filter(|f| f.user_id == user_id) .cloned() + .collect() } }