Skip to content

Commit 754d813

Browse files
committed
update the root path for the metadata files
only the .parseable.json and the ingest metadata files
1 parent d481323 commit 754d813

File tree

7 files changed

+32
-24
lines changed

7 files changed

+32
-24
lines changed

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ use crate::metrics;
2727
use crate::rbac;
2828
use crate::rbac::role::Action;
2929
use crate::storage;
30+
use crate::storage::object_storage::ingester_metadata_path;
31+
use crate::storage::object_storage::parseable_json_path;
3032
use crate::storage::ObjectStorageError;
31-
use crate::storage::PARSEABLE_METADATA_FILE_NAME;
3233
use crate::sync;
3334

3435
use super::server::Server;
@@ -180,13 +181,8 @@ impl IngestServer {
180181
async fn set_ingester_metadata(&self) -> anyhow::Result<()> {
181182
let store = CONFIG.storage().get_object_store();
182183

183-
// remove ip adn go with the domain name
184184
let sock = Server::get_server_address();
185-
let path = RelativePathBuf::from(format!(
186-
"ingester.{}.{}.json",
187-
sock.ip(), // this might be wrong
188-
sock.port()
189-
));
185+
let path = ingester_metadata_path(sock.ip().to_string(), sock.port().to_string());
190186

191187
if store.get_object(&path).await.is_ok() {
192188
println!("Ingester metadata already exists");
@@ -228,7 +224,7 @@ impl IngestServer {
228224
// i.e the querier will create the `.parseable.json` file
229225

230226
let store = CONFIG.storage().get_object_store();
231-
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
227+
let path = parseable_json_path();
232228

233229
match store.get_object(&path).await {
234230
Ok(_) => Ok(()),

server/src/migration.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ use serde::Serialize;
3030
use crate::{
3131
option::Config,
3232
storage::{
33-
object_storage::stream_json_path, ObjectStorage, ObjectStorageError,
34-
PARSEABLE_METADATA_FILE_NAME, SCHEMA_FILE_NAME,
33+
object_storage::{parseable_json_path, stream_json_path}, ObjectStorage, ObjectStorageError,SCHEMA_FILE_NAME,
3534
},
3635
};
3736

@@ -153,7 +152,8 @@ fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes {
153152
}
154153

155154
pub fn get_staging_metadata(config: &Config) -> anyhow::Result<Option<serde_json::Value>> {
156-
let path = config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
155+
let path = parseable_json_path().to_path(config.staging_dir());
156+
157157
let bytes = match std::fs::read(path) {
158158
Ok(bytes) => bytes,
159159
Err(err) => match err.kind() {
@@ -162,13 +162,14 @@ pub fn get_staging_metadata(config: &Config) -> anyhow::Result<Option<serde_json
162162
},
163163
};
164164
let meta: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
165+
165166
Ok(Some(meta))
166167
}
167168

168169
async fn get_storage_metadata(
169170
storage: &dyn ObjectStorage,
170171
) -> anyhow::Result<Option<serde_json::Value>> {
171-
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
172+
let path = parseable_json_path();
172173
match storage.get_object(&path).await {
173174
Ok(bytes) => Ok(Some(
174175
serde_json::from_slice(&bytes).expect("parseable config is valid json"),
@@ -187,13 +188,14 @@ pub async fn put_remote_metadata(
187188
storage: &dyn ObjectStorage,
188189
metadata: &serde_json::Value,
189190
) -> anyhow::Result<()> {
190-
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
191+
let path = parseable_json_path();
191192
let metadata = serde_json::to_vec(metadata)?.into();
192193
Ok(storage.put_object(&path, metadata).await?)
193194
}
194195

195196
pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> anyhow::Result<()> {
196-
let path = config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
197+
let path = parseable_json_path().to_path(config.staging_dir());
198+
//config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
197199
let mut file = OpenOptions::new()
198200
.create(true)
199201
.truncate(true)

server/src/storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub use self::staging::StorageDir;
4343
// metadata file names in a Stream prefix
4444
pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
4545
pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
46+
pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable";
4647
pub const SCHEMA_FILE_NAME: &str = ".schema";
4748
pub const ALERT_FILE_NAME: &str = ".alert.json";
4849
pub const MANIFEST_FILE: &str = "manifest.json";

server/src/storage/localfs.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
3535
use crate::option::validation;
3636

3737
use super::{
38-
LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, STREAM_METADATA_FILE_NAME,
38+
LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY,
39+
STREAM_METADATA_FILE_NAME,
3940
};
4041

4142
#[derive(Debug, Clone, clap::Args)]
@@ -194,7 +195,7 @@ impl ObjectStorage for LocalFS {
194195
}
195196

196197
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
197-
let ignore_dir = &["lost+found"];
198+
let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY];
198199
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
199200
let entries: Vec<DirEntry> = directories.try_collect().await?;
200201
let entries = entries

server/src/storage/object_storage.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use super::{
2121
ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
2222
};
2323
use super::{
24-
ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, SCHEMA_FILE_NAME,
25-
STREAM_METADATA_FILE_NAME,
24+
ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
25+
SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
2626
};
2727

2828
use crate::option::Mode;
@@ -505,9 +505,10 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
505505
}
506506
}
507507

508+
/// path will be ".parseable/.parsable.json"
508509
#[inline(always)]
509-
fn parseable_json_path() -> RelativePathBuf {
510-
RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME)
510+
pub fn parseable_json_path() -> RelativePathBuf {
511+
RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, PARSEABLE_METADATA_FILE_NAME])
511512
}
512513

513514
#[inline(always)]
@@ -521,3 +522,8 @@ fn manifest_path(prefix: &str) -> RelativePathBuf {
521522
let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
522523
RelativePathBuf::from_iter([prefix, &mainfest_file_name])
523524
}
525+
526+
#[inline(always)]
527+
pub fn ingester_metadata_path(ip: String, port: String) -> RelativePathBuf {
528+
RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, &format!("ingester.{}.{}.json", ip, port)])
529+
}

server/src/storage/s3.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use std::sync::Arc;
3939
use std::time::{Duration, Instant};
4040

4141
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
42-
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
42+
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
4343

4444
use super::metrics_layer::MetricLayer;
4545
use super::{ObjectStorageProvider, PARSEABLE_METADATA_FILE_NAME, STREAM_METADATA_FILE_NAME};
@@ -297,11 +297,13 @@ impl S3 {
297297
let common_prefixes = resp.common_prefixes;
298298

299299
// return prefixes at the root level
300-
let dirs: Vec<_> = common_prefixes
300+
let mut dirs: Vec<_> = common_prefixes
301301
.iter()
302302
.filter_map(|path| path.parts().next())
303303
.map(|name| name.as_ref().to_string())
304304
.collect();
305+
// filter out the root directory
306+
dirs.retain(|x| x != PARSEABLE_ROOT_DIRECTORY);
305307

306308
let stream_json_check = FuturesUnordered::new();
307309

server/src/storage/store_metadata.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{
3333
utils::uid,
3434
};
3535

36-
use super::PARSEABLE_METADATA_FILE_NAME;
36+
use super::{object_storage::parseable_json_path, PARSEABLE_METADATA_FILE_NAME};
3737

3838
// Expose some static variables for internal usage
3939
pub static STORAGE_METADATA: OnceCell<StaticStorageMetadata> = OnceCell::new();
@@ -237,7 +237,7 @@ fn standalone_when_distributed(remote_server_mode: Mode) -> Result<(), MetadataE
237237
}
238238

239239
pub fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> {
240-
let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
240+
let path = parseable_json_path().to_path(CONFIG.staging_dir());
241241
let bytes = match fs::read(path) {
242242
Ok(bytes) => bytes,
243243
Err(err) => match err.kind() {

0 commit comments

Comments
 (0)