diff --git a/server/src/storage/azure_blob.rs b/server/src/storage/azure_blob.rs index 6f8ebbb23..ce26404fa 100644 --- a/server/src/storage/azure_blob.rs +++ b/server/src/storage/azure_blob.rs @@ -41,7 +41,7 @@ use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME; use crate::metrics::storage::StorageMetrics; use object_store::limit::LimitStore; use object_store::path::Path as StorePath; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -650,8 +650,10 @@ impl ObjectStorage for BlobStore { .collect::>()) } - async fn get_all_dashboards(&self) -> Result, ObjectStorageError> { - let mut dashboards = vec![]; + async fn get_all_dashboards( + &self, + ) -> Result>, ObjectStorageError> { + let mut dashboards: HashMap> = HashMap::new(); let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR); let resp = self .client @@ -677,13 +679,19 @@ impl ObjectStorage for BlobStore { Box::new(|file_name| file_name.ends_with(".json")), ) .await?; - dashboards.extend(dashboard_bytes); + + dashboards + .entry(dashboards_path) + .or_default() + .extend(dashboard_bytes); } Ok(dashboards) } - async fn get_all_saved_filters(&self) -> Result, ObjectStorageError> { - let mut filters = vec![]; + async fn get_all_saved_filters( + &self, + ) -> Result>, ObjectStorageError> { + let mut filters: HashMap> = HashMap::new(); let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR); let resp = self .client @@ -720,7 +728,10 @@ impl ObjectStorage for BlobStore { Box::new(|file_name| file_name.ends_with(".json")), ) .await?; - filters.extend(filter_bytes); + filters + .entry(filters_path) + .or_default() + .extend(filter_bytes); } } Ok(filters) diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 4aa2d65d8..a84247f0b 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -17,7 +17,7 @@ */ use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashMap}, path::{Path, PathBuf}, sync::Arc, time::Instant, @@ -351,8 +351,10 @@ impl ObjectStorage for LocalFS { Ok(dirs) } - async fn get_all_dashboards(&self) -> Result, ObjectStorageError> { - let mut dashboards = vec![]; + async fn get_all_dashboards( + &self, + ) -> Result>, ObjectStorageError> { + let mut dashboards: HashMap> = HashMap::new(); let users_root_path = self.root.join(USERS_ROOT_DIR); let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?); let users: Vec = directories.try_collect().await?; @@ -364,15 +366,25 @@ impl ObjectStorage for LocalFS { let directories = ReadDirStream::new(fs::read_dir(&dashboards_path).await?); let dashboards_files: Vec = directories.try_collect().await?; for dashboard in dashboards_files { - let file = fs::read(dashboard.path()).await?; - dashboards.push(file.into()); + let dashboard_absolute_path = dashboard.path(); + let file = fs::read(dashboard_absolute_path.clone()).await?; + let dashboard_relative_path = dashboard_absolute_path + .strip_prefix(self.root.as_path()) + .unwrap(); + + dashboards + .entry(RelativePathBuf::from_path(dashboard_relative_path).unwrap()) + .or_default() + .push(file.into()); } } Ok(dashboards) } - async fn get_all_saved_filters(&self) -> Result, ObjectStorageError> { - let mut filters = vec![]; + async fn get_all_saved_filters( + &self, + ) -> Result>, ObjectStorageError> { + let mut filters: HashMap> = HashMap::new(); let users_root_path = self.root.join(USERS_ROOT_DIR); let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?); let users: Vec = directories.try_collect().await?; @@ -394,8 +406,16 @@ impl ObjectStorage for LocalFS { let directories = ReadDirStream::new(fs::read_dir(&filters_path).await?); let filters_files: Vec = directories.try_collect().await?; for filter in filters_files { - let file = fs::read(filter.path()).await?; - filters.push(file.into()); + let filter_absolute_path = filter.path(); + let file = fs::read(filter_absolute_path.clone()).await?; + let filter_relative_path = filter_absolute_path + .strip_prefix(self.root.as_path()) + .unwrap(); + + filters + .entry(RelativePathBuf::from_path(filter_relative_path).unwrap()) + .or_default() + .push(file.into()); } } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 7f1e75936..93b8a5bcd 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -85,8 +85,12 @@ pub trait ObjectStorage: Sync + 'static { async fn list_streams(&self) -> Result, ObjectStorageError>; async fn list_old_streams(&self) -> Result, ObjectStorageError>; async fn list_dirs(&self) -> Result, ObjectStorageError>; - async fn get_all_saved_filters(&self) -> Result, ObjectStorageError>; - async fn get_all_dashboards(&self) -> Result, ObjectStorageError>; + async fn get_all_saved_filters( + &self, + ) -> Result>, ObjectStorageError>; + async fn get_all_dashboards( + &self, + ) -> Result>, ObjectStorageError>; async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; async fn list_manifest_files( &self, diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index d359a11a3..0627100a2 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -37,15 +37,15 @@ use std::path::Path as StdPath; use std::sync::Arc; use std::time::{Duration, Instant}; -use crate::handlers::http::users::USERS_ROOT_DIR; -use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics}; -use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY}; - use super::metrics_layer::MetricLayer; use super::object_storage::parseable_json_path; use super::{ ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; +use crate::handlers::http::users::USERS_ROOT_DIR; +use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics}; +use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY}; +use std::collections::HashMap; #[allow(dead_code)] // in bytes @@ -689,8 +689,10 @@ impl ObjectStorage for S3 { .collect::>()) } - async fn get_all_dashboards(&self) -> Result, ObjectStorageError> { - let mut dashboards = vec![]; + async fn get_all_dashboards( + &self, + ) -> Result>, ObjectStorageError> { + let mut dashboards: HashMap> = HashMap::new(); let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR); let resp = self .client @@ -716,13 +718,19 @@ impl ObjectStorage for S3 { Box::new(|file_name| file_name.ends_with(".json")), ) .await?; - dashboards.extend(dashboard_bytes); + + dashboards + .entry(dashboards_path) + .or_default() + .extend(dashboard_bytes); } Ok(dashboards) } - async fn get_all_saved_filters(&self) -> Result, ObjectStorageError> { - let mut filters = vec![]; + async fn get_all_saved_filters( + &self, + ) -> Result>, ObjectStorageError> { + let mut filters: HashMap> = HashMap::new(); let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR); let resp = self .client @@ -759,7 +767,10 @@ impl ObjectStorage for S3 { Box::new(|file_name| file_name.ends_with(".json")), ) .await?; - filters.extend(filter_bytes); + filters + .entry(filters_path) + .or_default() + .extend(filter_bytes); } } Ok(filters) diff --git a/server/src/users/dashboards.rs b/server/src/users/dashboards.rs index 72673031e..46b3bbcf2 100644 --- a/server/src/users/dashboards.rs +++ b/server/src/users/dashboards.rs @@ -114,64 +114,61 @@ impl Dashboards { pub async fn load(&self) -> anyhow::Result<()> { let mut this = vec![]; let store = CONFIG.storage().get_object_store(); - let dashboards = store.get_all_dashboards().await.unwrap_or_default(); - for dashboard in dashboards { - if dashboard.is_empty() { - continue; - } - let mut dashboard_value = serde_json::from_slice::(&dashboard)?; - if let Some(meta) = dashboard_value.clone().as_object() { - let version = meta.get("version").and_then(|version| version.as_str()); - let dashboard_id = meta - .get("dashboard_id") - .and_then(|dashboard_id| dashboard_id.as_str()); - match version { - Some("v1") => { - dashboard_value = migrate_v1_v2(dashboard_value); - dashboard_value = migrate_v2_v3(dashboard_value); - let user_id = dashboard_value - .as_object() - .unwrap() - .get("user_id") - .and_then(|user_id| user_id.as_str()); - let path = dashboard_path( - user_id.unwrap(), - &format!("{}.json", dashboard_id.unwrap()), - ); - let dashboard_bytes = to_bytes(&dashboard_value); - store.put_object(&path, dashboard_bytes.clone()).await?; - if let Ok(dashboard) = serde_json::from_slice::(&dashboard_bytes) - { - this.retain(|d: &Dashboard| d.dashboard_id != dashboard.dashboard_id); - this.push(dashboard); - } - } - Some("v2") => { - dashboard_value = migrate_v2_v3(dashboard_value); - let user_id = dashboard_value - .as_object() - .unwrap() - .get("user_id") - .and_then(|user_id| user_id.as_str()); - let path = dashboard_path( - user_id.unwrap(), - &format!("{}.json", dashboard_id.unwrap()), - ); - let dashboard_bytes = to_bytes(&dashboard_value); - store.put_object(&path, dashboard_bytes.clone()).await?; - if let Ok(dashboard) = serde_json::from_slice::(&dashboard_bytes) - { - this.retain(|d| d.dashboard_id != dashboard.dashboard_id); - this.push(dashboard); + let all_dashboards = store.get_all_dashboards().await.unwrap_or_default(); + for (dashboard_relative_path, dashboards) in all_dashboards { + for dashboard in dashboards { + if dashboard.is_empty() { + continue; + } + let mut dashboard_value = serde_json::from_slice::(&dashboard)?; + if let Some(meta) = dashboard_value.clone().as_object() { + let version = meta.get("version").and_then(|version| version.as_str()); + let dashboard_id = meta + .get("dashboard_id") + .and_then(|dashboard_id| dashboard_id.as_str()); + match version { + Some("v1") => { + //delete older version of the dashboard + store.delete_object(&dashboard_relative_path).await?; + + dashboard_value = migrate_v1_v2(dashboard_value); + dashboard_value = migrate_v2_v3(dashboard_value); + let user_id = dashboard_value + .as_object() + .unwrap() + .get("user_id") + .and_then(|user_id| user_id.as_str()); + let path = dashboard_path( + user_id.unwrap(), + &format!("{}.json", dashboard_id.unwrap()), + ); + let dashboard_bytes = to_bytes(&dashboard_value); + store.put_object(&path, dashboard_bytes.clone()).await?; } - } - _ => { - if let Ok(dashboard) = serde_json::from_slice::(&dashboard) { - this.retain(|d| d.dashboard_id != dashboard.dashboard_id); - this.push(dashboard); + Some("v2") => { + //delete older version of the dashboard + store.delete_object(&dashboard_relative_path).await?; + + dashboard_value = migrate_v2_v3(dashboard_value); + let user_id = dashboard_value + .as_object() + .unwrap() + .get("user_id") + .and_then(|user_id| user_id.as_str()); + let path = dashboard_path( + user_id.unwrap(), + &format!("{}.json", dashboard_id.unwrap()), + ); + let dashboard_bytes = to_bytes(&dashboard_value); + store.put_object(&path, dashboard_bytes.clone()).await?; } + _ => {} } } + if let Ok(dashboard) = serde_json::from_value::(dashboard_value) { + this.retain(|d: &Dashboard| d.dashboard_id != dashboard.dashboard_id); + this.push(dashboard); + } } } diff --git a/server/src/users/filters.rs b/server/src/users/filters.rs index 016a76c8f..9b970b897 100644 --- a/server/src/users/filters.rs +++ b/server/src/users/filters.rs @@ -76,52 +76,47 @@ impl Filters { pub async fn load(&self) -> anyhow::Result<()> { let mut this = vec![]; let store = CONFIG.storage().get_object_store(); - let filters = store.get_all_saved_filters().await.unwrap_or_default(); - - for filter in filters { - if filter.is_empty() { - continue; - } - - let mut filter_value = serde_json::from_slice::(&filter)?; - if let Some(meta) = filter_value.clone().as_object() { - let version = meta.get("version").and_then(|version| version.as_str()); - let user_id = meta.get("user_id").and_then(|user_id| user_id.as_str()); - let filter_id = meta - .get("filter_id") - .and_then(|filter_id| filter_id.as_str()); - let stream_name = meta - .get("stream_name") - .and_then(|stream_name| stream_name.as_str()); - - if version == Some("v1") { - filter_value = migrate_v1_v2(filter_value); - if let (Some(user_id), Some(stream_name), Some(filter_id)) = - (user_id, stream_name, filter_id) - { - let path = filter_path( - &get_hash(user_id), - stream_name, - &format!("{}.json", filter_id), - ); - let filter_bytes = to_bytes(&filter_value); - store.put_object(&path, filter_bytes.clone()).await?; - if let Ok(filter) = serde_json::from_slice::(&filter_bytes) { - this.retain(|f: &Filter| f.filter_id != filter.filter_id); - this.push(filter); + let all_filters = store.get_all_saved_filters().await.unwrap_or_default(); + for (filter_relative_path, filters) in all_filters { + for filter in filters { + if filter.is_empty() { + continue; + } + let mut filter_value = serde_json::from_slice::(&filter)?; + if let Some(meta) = filter_value.clone().as_object() { + let version = meta.get("version").and_then(|version| version.as_str()); + + if version == Some("v1") { + //delete older version of the filter + store.delete_object(&filter_relative_path).await?; + + filter_value = migrate_v1_v2(filter_value); + let user_id = filter_value + .as_object() + .unwrap() + .get("user_id") + .and_then(|user_id| user_id.as_str()); + let filter_id = filter_value + .as_object() + .unwrap() + .get("filter_id") + .and_then(|filter_id| filter_id.as_str()); + let stream_name = filter_value + .as_object() + .unwrap() + .get("stream_name") + .and_then(|stream_name| stream_name.as_str()); + if let (Some(user_id), Some(stream_name), Some(filter_id)) = + (user_id, stream_name, filter_id) + { + let path = + filter_path(user_id, stream_name, &format!("{}.json", filter_id)); + let filter_bytes = to_bytes(&filter_value); + store.put_object(&path, filter_bytes.clone()).await?; } } - } else if let (Some(user_id), Some(stream_name), Some(filter_id)) = - (user_id, stream_name, filter_id) - { - let path = filter_path( - &get_hash(user_id), - stream_name, - &format!("{}.json", filter_id), - ); - let filter_bytes = to_bytes(&filter_value); - store.put_object(&path, filter_bytes.clone()).await?; - if let Ok(filter) = serde_json::from_slice::(&filter) { + + if let Ok(filter) = serde_json::from_value::(filter_value) { this.retain(|f: &Filter| f.filter_id != filter.filter_id); this.push(filter); }