Skip to content

Commit d08cd82

Browse files
author
Devdutt Shenoi
committed
refactor: DRY get_all_saved_filters
1 parent 3e5e8e4 commit d08cd82

File tree

4 files changed

+41
-142
lines changed

4 files changed

+41
-142
lines changed

src/storage/azure_blob.rs

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,10 @@ impl ObjectStorage for BlobStore {
653653
.collect::<Vec<_>>())
654654
}
655655

656-
async fn list_dirs_relative(&self, relative_path: &RelativePath) -> Result<Vec<String>, ObjectStorageError> {
656+
async fn list_dirs_relative(
657+
&self,
658+
relative_path: &RelativePath,
659+
) -> Result<Vec<String>, ObjectStorageError> {
657660
let prefix = object_store::path::Path::from(relative_path.as_str());
658661
let resp = self.client.list_with_delimiter(Some(&prefix)).await?;
659662

@@ -701,53 +704,6 @@ impl ObjectStorage for BlobStore {
701704
Ok(dashboards)
702705
}
703706

704-
async fn get_all_saved_filters(
705-
&self,
706-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
707-
let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
708-
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
709-
let resp = self
710-
.client
711-
.list_with_delimiter(Some(&users_root_path))
712-
.await?;
713-
714-
let users = resp
715-
.common_prefixes
716-
.iter()
717-
.flat_map(|path| path.parts())
718-
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
719-
.map(|name| name.as_ref().to_string())
720-
.collect::<Vec<_>>();
721-
for user in users {
722-
let user_filters_path =
723-
object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",));
724-
let resp = self
725-
.client
726-
.list_with_delimiter(Some(&user_filters_path))
727-
.await?;
728-
let streams = resp
729-
.common_prefixes
730-
.iter()
731-
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
732-
.map(|name| name.as_ref().to_string())
733-
.collect::<Vec<_>>();
734-
for stream in streams {
735-
let filters_path = RelativePathBuf::from(&stream);
736-
let filter_bytes = self
737-
.get_objects(
738-
Some(&filters_path),
739-
Box::new(|file_name| file_name.ends_with(".json")),
740-
)
741-
.await?;
742-
filters
743-
.entry(filters_path)
744-
.or_default()
745-
.extend(filter_bytes);
746-
}
747-
}
748-
Ok(filters)
749-
}
750-
751707
///fetch all correlations uploaded in object store
752708
/// return the correlation file path and all correlation json bytes for each file path
753709
async fn get_all_correlations(

src/storage/localfs.rs

Lines changed: 5 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -353,8 +353,10 @@ impl ObjectStorage for LocalFS {
353353
Ok(dirs)
354354
}
355355

356-
357-
async fn list_dirs_relative(&self, relative_path: &RelativePath) -> Result<Vec<String>, ObjectStorageError> {
356+
async fn list_dirs_relative(
357+
&self,
358+
relative_path: &RelativePath,
359+
) -> Result<Vec<String>, ObjectStorageError> {
358360
let root = self.root.join(relative_path.as_str());
359361
let dirs = ReadDirStream::new(fs::read_dir(root).await?)
360362
.try_collect::<Vec<DirEntry>>()
@@ -368,7 +370,7 @@ impl ObjectStorage for LocalFS {
368370
.into_iter()
369371
.flatten()
370372
.collect::<Vec<_>>();
371-
373+
372374
Ok(dirs)
373375
}
374376

@@ -402,47 +404,6 @@ impl ObjectStorage for LocalFS {
402404
Ok(dashboards)
403405
}
404406

405-
async fn get_all_saved_filters(
406-
&self,
407-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
408-
let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
409-
let users_root_path = self.root.join(USERS_ROOT_DIR);
410-
let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
411-
let users: Vec<DirEntry> = directories.try_collect().await?;
412-
for user in users {
413-
if !user.path().is_dir() {
414-
continue;
415-
}
416-
let stream_root_path = users_root_path.join(user.path()).join("filters");
417-
let directories = ReadDirStream::new(fs::read_dir(&stream_root_path).await?);
418-
let streams: Vec<DirEntry> = directories.try_collect().await?;
419-
for stream in streams {
420-
if !stream.path().is_dir() {
421-
continue;
422-
}
423-
let filters_path = users_root_path
424-
.join(user.path())
425-
.join("filters")
426-
.join(stream.path());
427-
let directories = ReadDirStream::new(fs::read_dir(&filters_path).await?);
428-
let filters_files: Vec<DirEntry> = directories.try_collect().await?;
429-
for filter in filters_files {
430-
let filter_absolute_path = filter.path();
431-
let file = fs::read(filter_absolute_path.clone()).await?;
432-
let filter_relative_path = filter_absolute_path
433-
.strip_prefix(self.root.as_path())
434-
.unwrap();
435-
436-
filters
437-
.entry(RelativePathBuf::from_path(filter_relative_path).unwrap())
438-
.or_default()
439-
.push(file.into());
440-
}
441-
}
442-
}
443-
Ok(filters)
444-
}
445-
446407
///fetch all correlations stored in disk
447408
/// return the correlation file path and all correlation json bytes for each file path
448409
async fn get_all_correlations(

src/storage/object_storage.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,36 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
9494
async fn list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError>;
9595
async fn list_old_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError>;
9696
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
97-
async fn list_dirs_relative(&self, relative_path: &RelativePath) -> Result<Vec<String>, ObjectStorageError>;
97+
async fn list_dirs_relative(
98+
&self,
99+
relative_path: &RelativePath,
100+
) -> Result<Vec<String>, ObjectStorageError>;
101+
98102
async fn get_all_saved_filters(
99103
&self,
100-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
104+
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
105+
let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
106+
107+
let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
108+
for user in self.list_dirs_relative(&users_dir).await? {
109+
let stream_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR, &user]);
110+
for stream in self.list_dirs_relative(&stream_dir).await? {
111+
let filters_path = RelativePathBuf::from(&stream);
112+
let filter_bytes = self
113+
.get_objects(
114+
Some(&filters_path),
115+
Box::new(|file_name| file_name.ends_with(".json")),
116+
)
117+
.await?;
118+
filters
119+
.entry(filters_path)
120+
.or_default()
121+
.extend(filter_bytes);
122+
}
123+
}
124+
Ok(filters)
125+
}
126+
101127
async fn get_all_dashboards(
102128
&self,
103129
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;

src/storage/s3.rs

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -786,7 +786,10 @@ impl ObjectStorage for S3 {
786786
.collect::<Vec<_>>())
787787
}
788788

789-
async fn list_dirs_relative(&self, relative_path: &RelativePath) -> Result<Vec<String>, ObjectStorageError> {
789+
async fn list_dirs_relative(
790+
&self,
791+
relative_path: &RelativePath,
792+
) -> Result<Vec<String>, ObjectStorageError> {
790793
let prefix = object_store::path::Path::from(relative_path.as_str());
791794
let resp = self.client.list_with_delimiter(Some(&prefix)).await?;
792795

@@ -834,53 +837,6 @@ impl ObjectStorage for S3 {
834837
Ok(dashboards)
835838
}
836839

837-
async fn get_all_saved_filters(
838-
&self,
839-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
840-
let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
841-
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
842-
let resp = self
843-
.client
844-
.list_with_delimiter(Some(&users_root_path))
845-
.await?;
846-
847-
let users = resp
848-
.common_prefixes
849-
.iter()
850-
.flat_map(|path| path.parts())
851-
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
852-
.map(|name| name.as_ref().to_string())
853-
.collect::<Vec<_>>();
854-
for user in users {
855-
let user_filters_path =
856-
object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",));
857-
let resp = self
858-
.client
859-
.list_with_delimiter(Some(&user_filters_path))
860-
.await?;
861-
let streams = resp
862-
.common_prefixes
863-
.iter()
864-
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
865-
.map(|name| name.as_ref().to_string())
866-
.collect::<Vec<_>>();
867-
for stream in streams {
868-
let filters_path = RelativePathBuf::from(&stream);
869-
let filter_bytes = self
870-
.get_objects(
871-
Some(&filters_path),
872-
Box::new(|file_name| file_name.ends_with(".json")),
873-
)
874-
.await?;
875-
filters
876-
.entry(filters_path)
877-
.or_default()
878-
.extend(filter_bytes);
879-
}
880-
}
881-
Ok(filters)
882-
}
883-
884840
///fetch all correlations stored in object store
885841
/// return the correlation file path and all correlation json bytes for each file path
886842
async fn get_all_correlations(

0 commit comments

Comments
 (0)