Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions server/src/storage/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME;
use crate::metrics::storage::StorageMetrics;
use object_store::limit::LimitStore;
use object_store::path::Path as StorePath;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -650,8 +650,10 @@ impl ObjectStorage for BlobStore {
.collect::<Vec<_>>())
}

async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut dashboards = vec![];
async fn get_all_dashboards(
&self,
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
let resp = self
.client
Expand All @@ -677,13 +679,19 @@ impl ObjectStorage for BlobStore {
Box::new(|file_name| file_name.ends_with(".json")),
)
.await?;
dashboards.extend(dashboard_bytes);

dashboards
.entry(dashboards_path)
.or_default()
.extend(dashboard_bytes);
}
Ok(dashboards)
}

async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut filters = vec![];
async fn get_all_saved_filters(
&self,
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
let resp = self
.client
Expand Down Expand Up @@ -720,7 +728,10 @@ impl ObjectStorage for BlobStore {
Box::new(|file_name| file_name.ends_with(".json")),
)
.await?;
filters.extend(filter_bytes);
filters
.entry(filters_path)
.or_default()
.extend(filter_bytes);
}
}
Ok(filters)
Expand Down
38 changes: 29 additions & 9 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use std::{
collections::BTreeMap,
collections::{BTreeMap, HashMap},
path::{Path, PathBuf},
sync::Arc,
time::Instant,
Expand Down Expand Up @@ -351,8 +351,10 @@ impl ObjectStorage for LocalFS {
Ok(dirs)
}

async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut dashboards = vec![];
async fn get_all_dashboards(
&self,
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
let users_root_path = self.root.join(USERS_ROOT_DIR);
let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
let users: Vec<DirEntry> = directories.try_collect().await?;
Expand All @@ -364,15 +366,25 @@ impl ObjectStorage for LocalFS {
let directories = ReadDirStream::new(fs::read_dir(&dashboards_path).await?);
let dashboards_files: Vec<DirEntry> = directories.try_collect().await?;
for dashboard in dashboards_files {
let file = fs::read(dashboard.path()).await?;
dashboards.push(file.into());
let dashboard_absolute_path = dashboard.path();
let file = fs::read(dashboard_absolute_path.clone()).await?;
let dashboard_relative_path = dashboard_absolute_path
.strip_prefix(self.root.as_path())
.unwrap();

dashboards
.entry(RelativePathBuf::from_path(dashboard_relative_path).unwrap())
.or_default()
.push(file.into());
}
}
Ok(dashboards)
}

async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut filters = vec![];
async fn get_all_saved_filters(
&self,
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
let users_root_path = self.root.join(USERS_ROOT_DIR);
let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
let users: Vec<DirEntry> = directories.try_collect().await?;
Expand All @@ -394,8 +406,16 @@ impl ObjectStorage for LocalFS {
let directories = ReadDirStream::new(fs::read_dir(&filters_path).await?);
let filters_files: Vec<DirEntry> = directories.try_collect().await?;
for filter in filters_files {
let file = fs::read(filter.path()).await?;
filters.push(file.into());
let filter_absolute_path = filter.path();
let file = fs::read(filter_absolute_path.clone()).await?;
let filter_relative_path = filter_absolute_path
.strip_prefix(self.root.as_path())
.unwrap();

filters
.entry(RelativePathBuf::from_path(filter_relative_path).unwrap())
.or_default()
.push(file.into());
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,12 @@ pub trait ObjectStorage: Sync + 'static {
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError>;
async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError>;
async fn get_all_saved_filters(
&self,
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
async fn get_all_dashboards(
&self,
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
async fn list_manifest_files(
&self,
Expand Down
31 changes: 21 additions & 10 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ use std::path::Path as StdPath;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::handlers::http::users::USERS_ROOT_DIR;
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};

use super::metrics_layer::MetricLayer;
use super::object_storage::parseable_json_path;
use super::{
ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
};
use crate::handlers::http::users::USERS_ROOT_DIR;
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
use std::collections::HashMap;

#[allow(dead_code)]
// in bytes
Expand Down Expand Up @@ -689,8 +689,10 @@ impl ObjectStorage for S3 {
.collect::<Vec<_>>())
}

async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut dashboards = vec![];
async fn get_all_dashboards(
&self,
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
let resp = self
.client
Expand All @@ -716,13 +718,19 @@ impl ObjectStorage for S3 {
Box::new(|file_name| file_name.ends_with(".json")),
)
.await?;
dashboards.extend(dashboard_bytes);

dashboards
.entry(dashboards_path)
.or_default()
.extend(dashboard_bytes);
}
Ok(dashboards)
}

async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut filters = vec![];
async fn get_all_saved_filters(
&self,
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
let resp = self
.client
Expand Down Expand Up @@ -759,7 +767,10 @@ impl ObjectStorage for S3 {
Box::new(|file_name| file_name.ends_with(".json")),
)
.await?;
filters.extend(filter_bytes);
filters
.entry(filters_path)
.or_default()
.extend(filter_bytes);
}
}
Ok(filters)
Expand Down
105 changes: 51 additions & 54 deletions server/src/users/dashboards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,64 +114,61 @@ impl Dashboards {
pub async fn load(&self) -> anyhow::Result<()> {
let mut this = vec![];
let store = CONFIG.storage().get_object_store();
let dashboards = store.get_all_dashboards().await.unwrap_or_default();
for dashboard in dashboards {
if dashboard.is_empty() {
continue;
}
let mut dashboard_value = serde_json::from_slice::<serde_json::Value>(&dashboard)?;
if let Some(meta) = dashboard_value.clone().as_object() {
let version = meta.get("version").and_then(|version| version.as_str());
let dashboard_id = meta
.get("dashboard_id")
.and_then(|dashboard_id| dashboard_id.as_str());
match version {
Some("v1") => {
dashboard_value = migrate_v1_v2(dashboard_value);
dashboard_value = migrate_v2_v3(dashboard_value);
let user_id = dashboard_value
.as_object()
.unwrap()
.get("user_id")
.and_then(|user_id| user_id.as_str());
let path = dashboard_path(
user_id.unwrap(),
&format!("{}.json", dashboard_id.unwrap()),
);
let dashboard_bytes = to_bytes(&dashboard_value);
store.put_object(&path, dashboard_bytes.clone()).await?;
if let Ok(dashboard) = serde_json::from_slice::<Dashboard>(&dashboard_bytes)
{
this.retain(|d: &Dashboard| d.dashboard_id != dashboard.dashboard_id);
this.push(dashboard);
}
}
Some("v2") => {
dashboard_value = migrate_v2_v3(dashboard_value);
let user_id = dashboard_value
.as_object()
.unwrap()
.get("user_id")
.and_then(|user_id| user_id.as_str());
let path = dashboard_path(
user_id.unwrap(),
&format!("{}.json", dashboard_id.unwrap()),
);
let dashboard_bytes = to_bytes(&dashboard_value);
store.put_object(&path, dashboard_bytes.clone()).await?;
if let Ok(dashboard) = serde_json::from_slice::<Dashboard>(&dashboard_bytes)
{
this.retain(|d| d.dashboard_id != dashboard.dashboard_id);
this.push(dashboard);
let all_dashboards = store.get_all_dashboards().await.unwrap_or_default();
for (dashboard_relative_path, dashboards) in all_dashboards {
for dashboard in dashboards {
if dashboard.is_empty() {
continue;
}
let mut dashboard_value = serde_json::from_slice::<serde_json::Value>(&dashboard)?;
if let Some(meta) = dashboard_value.clone().as_object() {
let version = meta.get("version").and_then(|version| version.as_str());
let dashboard_id = meta
.get("dashboard_id")
.and_then(|dashboard_id| dashboard_id.as_str());
match version {
Some("v1") => {
//delete older version of the dashboard
store.delete_object(&dashboard_relative_path).await?;

dashboard_value = migrate_v1_v2(dashboard_value);
dashboard_value = migrate_v2_v3(dashboard_value);
let user_id = dashboard_value
.as_object()
.unwrap()
.get("user_id")
.and_then(|user_id| user_id.as_str());
let path = dashboard_path(
user_id.unwrap(),
&format!("{}.json", dashboard_id.unwrap()),
);
let dashboard_bytes = to_bytes(&dashboard_value);
store.put_object(&path, dashboard_bytes.clone()).await?;
}
}
_ => {
if let Ok(dashboard) = serde_json::from_slice::<Dashboard>(&dashboard) {
this.retain(|d| d.dashboard_id != dashboard.dashboard_id);
this.push(dashboard);
Some("v2") => {
//delete older version of the dashboard
store.delete_object(&dashboard_relative_path).await?;

dashboard_value = migrate_v2_v3(dashboard_value);
let user_id = dashboard_value
.as_object()
.unwrap()
.get("user_id")
.and_then(|user_id| user_id.as_str());
let path = dashboard_path(
user_id.unwrap(),
&format!("{}.json", dashboard_id.unwrap()),
);
let dashboard_bytes = to_bytes(&dashboard_value);
store.put_object(&path, dashboard_bytes.clone()).await?;
}
_ => {}
}
}
if let Ok(dashboard) = serde_json::from_value::<Dashboard>(dashboard_value) {
this.retain(|d: &Dashboard| d.dashboard_id != dashboard.dashboard_id);
this.push(dashboard);
}
}
}

Expand Down
Loading
Loading