Skip to content

Commit 8266092

Browse files
committed
updated stream check for local file storage
1 parent 23f1b84 commit 8266092

File tree

4 files changed

+22
-10
lines changed

4 files changed

+22
-10
lines changed

src/handlers/http/correlation.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use bytes::Bytes;
2121
use relative_path::RelativePathBuf;
2222

2323
use crate::{
24-
option::CONFIG, storage::CORRELATION_DIRECTORY, utils::actix::extract_session_key_from_req,
24+
option::CONFIG, storage::CORRELATIONS_ROOT_DIRECTORY,
25+
utils::actix::extract_session_key_from_req,
2526
};
2627

2728
use crate::correlation::{
@@ -125,7 +126,8 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError
125126

126127
// Delete from disk
127128
let store = CONFIG.storage().get_object_store();
128-
let path = RelativePathBuf::from_iter([CORRELATION_DIRECTORY, &correlation.id.to_string()]);
129+
let path =
130+
RelativePathBuf::from_iter([CORRELATIONS_ROOT_DIRECTORY, &correlation.id.to_string()]);
129131
store.delete_object(&path).await?;
130132

131133
// Delete from memory

src/storage/localfs.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ use crate::{
3939
};
4040

4141
use super::{
42-
LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY,
43-
SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
42+
LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider,
43+
CORRELATIONS_ROOT_DIRECTORY, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME,
44+
STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
4445
};
4546

4647
#[derive(Debug, Clone, clap::Args)]
@@ -295,7 +296,12 @@ impl ObjectStorage for LocalFS {
295296
}
296297

297298
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
298-
let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY, USERS_ROOT_DIR];
299+
let ignore_dir = &[
300+
"lost+found",
301+
PARSEABLE_ROOT_DIRECTORY,
302+
USERS_ROOT_DIR,
303+
CORRELATIONS_ROOT_DIRECTORY,
304+
];
299305
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
300306
let entries: Vec<DirEntry> = directories.try_collect().await?;
301307
let entries = entries
@@ -315,7 +321,11 @@ impl ObjectStorage for LocalFS {
315321
}
316322

317323
async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
318-
let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY];
324+
let ignore_dir = &[
325+
"lost+found",
326+
PARSEABLE_ROOT_DIRECTORY,
327+
CORRELATIONS_ROOT_DIRECTORY,
328+
];
319329
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
320330
let entries: Vec<DirEntry> = directories.try_collect().await?;
321331
let entries = entries

src/storage/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable";
5151
pub const SCHEMA_FILE_NAME: &str = ".schema";
5252
pub const ALERT_FILE_NAME: &str = ".alert.json";
5353
pub const MANIFEST_FILE: &str = "manifest.json";
54-
pub const CORRELATION_DIRECTORY: &str = ".correlations";
54+
pub const CORRELATIONS_ROOT_DIRECTORY: &str = ".correlations";
5555

5656
/// local sync interval to move data.records to /tmp dir of that stream.
5757
/// 60 sec is a reasonable value.

src/storage/object_storage.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use super::{
2121
ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
2222
};
2323
use super::{
24-
ALERT_FILE_NAME, CORRELATION_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME,
24+
ALERT_FILE_NAME, CORRELATIONS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME,
2525
PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
2626
};
2727

@@ -631,15 +631,15 @@ pub trait ObjectStorage: Send + Sync + 'static {
631631
correlation: &CorrelationConfig,
632632
) -> Result<(), ObjectStorageError> {
633633
let path = RelativePathBuf::from_iter([
634-
CORRELATION_DIRECTORY,
634+
CORRELATIONS_ROOT_DIRECTORY,
635635
&format!("{}.json", correlation.id),
636636
]);
637637
self.put_object(&path, to_bytes(correlation)).await?;
638638
Ok(())
639639
}
640640

641641
async fn get_correlations(&self) -> Result<Vec<Bytes>, CorrelationError> {
642-
let correlation_path = RelativePathBuf::from_iter([CORRELATION_DIRECTORY]);
642+
let correlation_path = RelativePathBuf::from_iter([CORRELATIONS_ROOT_DIRECTORY]);
643643
let correlation_bytes = self
644644
.get_objects(
645645
Some(&correlation_path),

0 commit comments

Comments
 (0)