From 50bc2e01c88f98260ccc610bc16375862e0045b8 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 27 Mar 2024 17:06:56 +0530 Subject: [PATCH 1/2] update the root path for the metadata files only the .parseable.json and the ingest metadata files --- server/src/handlers/http/modal/ingest_server.rs | 12 ++++-------- server/src/migration.rs | 14 ++++++++------ server/src/storage.rs | 1 + server/src/storage/localfs.rs | 5 +++-- server/src/storage/object_storage.rs | 14 ++++++++++---- server/src/storage/s3.rs | 6 ++++-- server/src/storage/store_metadata.rs | 4 ++-- 7 files changed, 32 insertions(+), 24 deletions(-) diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index baf49bf65..254f47085 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -27,8 +27,9 @@ use crate::metrics; use crate::rbac; use crate::rbac::role::Action; use crate::storage; +use crate::storage::object_storage::ingester_metadata_path; +use crate::storage::object_storage::parseable_json_path; use crate::storage::ObjectStorageError; -use crate::storage::PARSEABLE_METADATA_FILE_NAME; use crate::sync; use super::server::Server; @@ -180,13 +181,8 @@ impl IngestServer { async fn set_ingester_metadata(&self) -> anyhow::Result<()> { let store = CONFIG.storage().get_object_store(); - // remove ip adn go with the domain name let sock = Server::get_server_address(); - let path = RelativePathBuf::from(format!( - "ingester.{}.{}.json", - sock.ip(), // this might be wrong - sock.port() - )); + let path = ingester_metadata_path(sock.ip().to_string(), sock.port().to_string()); if store.get_object(&path).await.is_ok() { println!("Ingester metadata already exists"); @@ -228,7 +224,7 @@ impl IngestServer { // i.e the querier will create the `.parseable.json` file let store = CONFIG.storage().get_object_store(); - let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME); + let path = 
parseable_json_path(); match store.get_object(&path).await { Ok(_) => Ok(()), diff --git a/server/src/migration.rs b/server/src/migration.rs index 40e7c1340..4322d5994 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -30,8 +30,7 @@ use serde::Serialize; use crate::{ option::Config, storage::{ - object_storage::stream_json_path, ObjectStorage, ObjectStorageError, - PARSEABLE_METADATA_FILE_NAME, SCHEMA_FILE_NAME, + object_storage::{parseable_json_path, stream_json_path}, ObjectStorage, ObjectStorageError,SCHEMA_FILE_NAME, }, }; @@ -153,7 +152,8 @@ fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes { } pub fn get_staging_metadata(config: &Config) -> anyhow::Result> { - let path = config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); + let path = parseable_json_path().to_path(config.staging_dir()); + let bytes = match std::fs::read(path) { Ok(bytes) => bytes, Err(err) => match err.kind() { @@ -162,13 +162,14 @@ pub fn get_staging_metadata(config: &Config) -> anyhow::Result anyhow::Result> { - let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME); + let path = parseable_json_path(); match storage.get_object(&path).await { Ok(bytes) => Ok(Some( serde_json::from_slice(&bytes).expect("parseable config is valid json"), @@ -187,13 +188,14 @@ pub async fn put_remote_metadata( storage: &dyn ObjectStorage, metadata: &serde_json::Value, ) -> anyhow::Result<()> { - let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME); + let path = parseable_json_path(); let metadata = serde_json::to_vec(metadata)?.into(); Ok(storage.put_object(&path, metadata).await?) 
} pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> anyhow::Result<()> { - let path = config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); + let path = parseable_json_path().to_path(config.staging_dir()); + //config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); let mut file = OpenOptions::new() .create(true) .truncate(true) diff --git a/server/src/storage.rs b/server/src/storage.rs index 192b12733..2eda8f592 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -43,6 +43,7 @@ pub use self::staging::StorageDir; // metadata file names in a Stream prefix pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json"; pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json"; +pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable"; pub const SCHEMA_FILE_NAME: &str = ".schema"; pub const ALERT_FILE_NAME: &str = ".alert.json"; pub const MANIFEST_FILE: &str = "manifest.json"; diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 0bfd26b7d..0fb662ca6 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -35,7 +35,8 @@ use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics}; use crate::option::validation; use super::{ - LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, STREAM_METADATA_FILE_NAME, + LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, + STREAM_METADATA_FILE_NAME, }; #[derive(Debug, Clone, clap::Args)] @@ -202,7 +203,7 @@ impl ObjectStorage for LocalFS { } async fn list_streams(&self) -> Result, ObjectStorageError> { - let ignore_dir = &["lost+found"]; + let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?; let entries = entries diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 9f54c7a30..671a2bd9d 
100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -21,8 +21,8 @@ use super::{ ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use super::{ - ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, SCHEMA_FILE_NAME, - STREAM_METADATA_FILE_NAME, + ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, + SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, }; use crate::option::Mode; @@ -508,9 +508,10 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf { } } +/// path will be ".parseable/.parsable.json" #[inline(always)] -fn parseable_json_path() -> RelativePathBuf { - RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME) +pub fn parseable_json_path() -> RelativePathBuf { + RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, PARSEABLE_METADATA_FILE_NAME]) } #[inline(always)] @@ -524,3 +525,8 @@ fn manifest_path(prefix: &str) -> RelativePathBuf { let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE); RelativePathBuf::from_iter([prefix, &mainfest_file_name]) } + +#[inline(always)] +pub fn ingester_metadata_path(ip: String, port: String) -> RelativePathBuf { + RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, &format!("ingester.{}.{}.json", ip, port)]) +} diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 3db6aaf23..81d8e2ac8 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -39,7 +39,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics}; -use crate::storage::{LogStream, ObjectStorage, ObjectStorageError}; +use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY}; use super::metrics_layer::MetricLayer; use super::{ObjectStorageProvider, PARSEABLE_METADATA_FILE_NAME, STREAM_METADATA_FILE_NAME}; @@ -297,11 +297,13 @@ impl S3 { let common_prefixes = resp.common_prefixes; // return prefixes 
at the root level - let dirs: Vec<_> = common_prefixes + let mut dirs: Vec<_> = common_prefixes .iter() .filter_map(|path| path.parts().next()) .map(|name| name.as_ref().to_string()) .collect(); + // filter out the root directory + dirs.retain(|x| x != PARSEABLE_ROOT_DIRECTORY); let stream_json_check = FuturesUnordered::new(); diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 4ad410e34..c5d0c441d 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -33,7 +33,7 @@ use crate::{ utils::uid, }; -use super::PARSEABLE_METADATA_FILE_NAME; +use super::{object_storage::parseable_json_path, PARSEABLE_METADATA_FILE_NAME}; // Expose some static variables for internal usage pub static STORAGE_METADATA: OnceCell = OnceCell::new(); @@ -237,7 +237,7 @@ fn standalone_when_distributed(remote_server_mode: Mode) -> Result<(), MetadataE } pub fn get_staging_metadata() -> io::Result> { - let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); + let path = parseable_json_path().to_path(CONFIG.staging_dir()); let bytes = match fs::read(path) { Ok(bytes) => bytes, Err(err) => match err.kind() { From 3f4b94f0d9ceea5d0e3eaa0573fc49b586a6ac8b Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Fri, 29 Mar 2024 18:33:42 +0530 Subject: [PATCH 2/2] migration of metadata files to seperate directorie --- server/src/handlers/http/cluster/mod.rs | 3 +- .../src/handlers/http/modal/ingest_server.rs | 24 ++- .../src/handlers/http/modal/query_server.rs | 22 ++- server/src/handlers/http/modal/server.rs | 13 +- server/src/main.rs | 1 - server/src/migration.rs | 103 ++++++++++++- server/src/option.rs | 4 +- server/src/storage.rs | 1 + server/src/storage/localfs.rs | 143 +++++++++++++++++- server/src/storage/object_storage.rs | 36 +++-- server/src/storage/s3.rs | 104 +++++++++++-- server/src/storage/store_metadata.rs | 56 ++++--- 12 files changed, 423 insertions(+), 87 deletions(-) diff --git 
a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index d9e30c0b9..8a8a48d09 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -27,6 +27,7 @@ use crate::option::CONFIG; use crate::metrics::prom_utils::Metrics; use crate::storage::ObjectStorageError; +use crate::storage::PARSEABLE_ROOT_DIRECTORY; use actix_web::http::header; use actix_web::{HttpRequest, Responder}; use http::StatusCode; @@ -338,7 +339,7 @@ pub async fn get_cluster_metrics() -> Result { pub async fn get_ingester_info() -> anyhow::Result { let store = CONFIG.storage().get_object_store(); - let root_path = RelativePathBuf::from(""); + let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY); let arr = store .get_objects(Some(&root_path)) .await? diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 254f47085..63665e11b 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -102,6 +102,17 @@ impl ParseableServer for IngestServer { /// implement the init method will just invoke the initialize method async fn init(&self) -> anyhow::Result<()> { self.validate()?; + // check for querier state. Is it there, or was it there in the past + self.check_querier_state().await?; + // to get the .parseable.json file in staging + self.validate_credentials().await?; + + let metadata = storage::resolve_parseable_metadata().await?; + banner::print(&CONFIG, &metadata).await; + rbac::map::init(&metadata); + // set the info in the global metadata + metadata.set_global(); + self.initialize().await } @@ -267,19 +278,6 @@ impl IngestServer { } async fn initialize(&self) -> anyhow::Result<()> { - // check for querier state. 
Is it there, or was it there in the past - self.check_querier_state().await?; - // to get the .parseable.json file in staging - self.validate_credentials().await?; - - let metadata = storage::resolve_parseable_metadata().await?; - banner::print(&CONFIG, &metadata).await; - - rbac::map::init(&metadata); - - // set the info in the global metadata - metadata.set_global(); - if let Some(cache_manager) = LocalCacheManager::global() { cache_manager .validate(CONFIG.parseable.local_cache_size) diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 825808566..ca554dbf5 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -98,6 +98,15 @@ impl ParseableServer for QueryServer { /// implementation of init should just invoke a call to initialize async fn init(&self) -> anyhow::Result<()> { self.validate()?; + migration::run_file_migration(&CONFIG).await?; + CONFIG.validate_storage().await?; + migration::run_metadata_migration(&CONFIG).await?; + let metadata = storage::resolve_parseable_metadata().await?; + banner::print(&CONFIG, &metadata).await; + // initialize the rbac map + rbac::map::init(&metadata); + // keep metadata info in mem + metadata.set_global(); self.initialize().await } @@ -165,18 +174,6 @@ impl QueryServer { /// initialize the server, run migrations as needed and start the server async fn initialize(&self) -> anyhow::Result<()> { - migration::run_metadata_migration(&CONFIG).await?; - - let metadata = storage::resolve_parseable_metadata().await?; - - banner::print(&CONFIG, &metadata).await; - - // initialize the rbac map - rbac::map::init(&metadata); - - // keep metadata info in mem - metadata.set_global(); - let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); @@ -189,7 +186,6 @@ impl QueryServer { // track all parquet files already in the data directory 
storage::retention::load_retention_from_global(); - // load data from stats back to prometheus metrics metrics::fetch_stats_from_storage().await; diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 7d6a8067b..2f51d07e4 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -141,6 +141,13 @@ impl ParseableServer for Server { /// implementation of init should just invoke a call to initialize async fn init(&self) -> anyhow::Result<()> { self.validate()?; + migration::run_file_migration(&CONFIG).await?; + CONFIG.validate_storage().await?; + migration::run_metadata_migration(&CONFIG).await?; + let metadata = storage::resolve_parseable_metadata().await?; + banner::print(&CONFIG, &metadata).await; + rbac::map::init(&metadata); + metadata.set_global(); self.initialize().await } @@ -405,12 +412,6 @@ impl Server { } async fn initialize(&self) -> anyhow::Result<()> { - migration::run_metadata_migration(&CONFIG).await?; - let metadata = storage::resolve_parseable_metadata().await?; - banner::print(&CONFIG, &metadata).await; - rbac::map::init(&metadata); - metadata.set_global(); - if let Some(cache_manager) = LocalCacheManager::global() { cache_manager .validate(CONFIG.parseable.local_cache_size) diff --git a/server/src/main.rs b/server/src/main.rs index 68c713960..cf4a0fc5b 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -56,7 +56,6 @@ pub const STORAGE_UPLOAD_INTERVAL: u32 = 60; #[actix_web::main] async fn main() -> anyhow::Result<()> { env_logger::init(); - CONFIG.validate_storage().await?; // these are empty ptrs so mem footprint should be minimal let server: Arc = match CONFIG.parseable.mode { diff --git a/server/src/migration.rs b/server/src/migration.rs index 4322d5994..1207bc30a 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -21,16 +21,19 @@ mod metadata_migration; mod schema_migration; mod stream_metadata_migration; -use 
std::fs::OpenOptions; +use std::{fs::OpenOptions, sync::Arc}; use bytes::Bytes; +use itertools::Itertools; use relative_path::RelativePathBuf; use serde::Serialize; use crate::{ option::Config, storage::{ - object_storage::{parseable_json_path, stream_json_path}, ObjectStorage, ObjectStorageError,SCHEMA_FILE_NAME, + object_storage::{parseable_json_path, stream_json_path}, + ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, + SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY, }, }; @@ -120,7 +123,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: .put_object(&path, to_bytes(&new_stream_metadata)) .await?; - let schema_path = RelativePathBuf::from_iter([stream, SCHEMA_FILE_NAME]); + let schema_path = + RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); let schema = storage.get_object(&schema_path).await?; let schema = serde_json::from_slice(&schema).ok(); let map = schema_migration::v1_v3(schema)?; @@ -132,7 +136,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: .put_object(&path, to_bytes(&new_stream_metadata)) .await?; - let schema_path = RelativePathBuf::from_iter([stream, SCHEMA_FILE_NAME]); + let schema_path = + RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); let schema = storage.get_object(&schema_path).await?; let schema = serde_json::from_slice(&schema)?; let map = schema_migration::v2_v3(schema)?; @@ -195,7 +200,6 @@ pub async fn put_remote_metadata( pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> anyhow::Result<()> { let path = parseable_json_path().to_path(config.staging_dir()); - //config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); let mut file = OpenOptions::new() .create(true) .truncate(true) @@ -204,3 +208,92 @@ pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> an serde_json::to_writer(&mut file, metadata)?; Ok(()) } + +pub async 
fn run_file_migration(config: &Config) -> anyhow::Result<()> { + let object_store = config.storage().get_object_store(); + + let old_meta_file_path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME); + + // if this errors that means migrations is already done + if let Err(err) = object_store.get_object(&old_meta_file_path).await { + if matches!(err, ObjectStorageError::NoSuchKey(_)) { + return Ok(()); + } + return Err(err.into()); + } + + run_meta_file_migration(&object_store, old_meta_file_path).await?; + run_stream_files_migration(object_store).await?; + + Ok(()) +} + +async fn run_meta_file_migration( + object_store: &Arc, + old_meta_file_path: RelativePathBuf, +) -> anyhow::Result<()> { + log::info!("Migrating metadata files to new location"); + + // get the list of all meta files + let mut meta_files = object_store.get_ingester_meta_file_paths().await?; + meta_files.push(old_meta_file_path); + + for file in meta_files { + match object_store.get_object(&file).await { + Ok(bytes) => { + // we can unwrap here because we know the file exists + let new_path = RelativePathBuf::from_iter([ + PARSEABLE_ROOT_DIRECTORY, + file.file_name().unwrap(), + ]); + object_store.put_object(&new_path, bytes).await?; + object_store.delete_object(&file).await?; + } + Err(err) => { + // if error is not a no such key error, something weird happened + // so return the error + if !matches!(err, ObjectStorageError::NoSuchKey(_)) { + return Err(err.into()); + } + } + } + } + + Ok(()) +} + +async fn run_stream_files_migration( + object_store: Arc, +) -> anyhow::Result<()> { + let streams = object_store + .list_old_streams() + .await? 
+ .into_iter() + .map(|stream| stream.name) + .collect_vec(); + + for stream in streams { + let paths = object_store.get_stream_file_paths(&stream).await?; + + for path in paths { + match object_store.get_object(&path).await { + Ok(bytes) => { + let new_path = RelativePathBuf::from_iter([ + stream.as_str(), + STREAM_ROOT_DIRECTORY, + path.file_name().unwrap(), + ]); + object_store.put_object(&new_path, bytes).await?; + object_store.delete_object(&path).await?; + } + Err(err) => { + if !matches!(err, ObjectStorageError::NoSuchKey(_)) { + return Err(err.into()); + } + } + } + } + } + + Ok(()) +} diff --git a/server/src/option.rs b/server/src/option.rs index 0607c826c..43bed851d 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -26,7 +26,7 @@ use std::path::PathBuf; use std::sync::Arc; use crate::cli::Cli; -use crate::storage::PARSEABLE_METADATA_FILE_NAME; +use crate::storage::object_storage::parseable_json_path; use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config}; pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB pub const JOIN_COMMUNITY: &str = @@ -102,7 +102,7 @@ impl Config { // if the proper data directory is provided, or s3 bucket is provided etc pub async fn validate_storage(&self) -> Result<(), ObjectStorageError> { let obj_store = self.storage.get_object_store(); - let rel_path = relative_path::RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME); + let rel_path = parseable_json_path(); let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok(); diff --git a/server/src/storage.rs b/server/src/storage.rs index 2eda8f592..2134cf50b 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -43,6 +43,7 @@ pub use self::staging::StorageDir; // metadata file names in a Stream prefix pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json"; pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json"; +pub const STREAM_ROOT_DIRECTORY: &str = ".stream"; pub const 
PARSEABLE_ROOT_DIRECTORY: &str = ".parseable"; pub const SCHEMA_FILE_NAME: &str = ".schema"; pub const ALERT_FILE_NAME: &str = ".alert.json"; diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 0fb662ca6..73f75d5b6 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -27,7 +27,7 @@ use bytes::Bytes; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig}; use fs_extra::file::CopyOptions; use futures::{stream::FuturesUnordered, TryStreamExt}; -use relative_path::RelativePath; +use relative_path::{RelativePath, RelativePathBuf}; use tokio::fs::{self, DirEntry}; use tokio_stream::wrappers::ReadDirStream; @@ -36,7 +36,7 @@ use crate::option::validation; use super::{ LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, - STREAM_METADATA_FILE_NAME, + SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; #[derive(Debug, Clone, clap::Args)] @@ -114,6 +114,81 @@ impl ObjectStorage for LocalFS { res } + async fn get_ingester_meta_file_paths( + &self, + ) -> Result, ObjectStorageError> { + let time = Instant::now(); + + let mut path_arr = vec![]; + let mut entries = fs::read_dir(&self.root).await?; + + while let Some(entry) = entries.next_entry().await? 
{ + let flag = entry + .path() + .file_name() + .unwrap_or_default() + .to_str() + .unwrap_or_default() + .contains("ingester"); + + if flag { + path_arr.push( + RelativePathBuf::from_path(entry.path().file_name().unwrap()) + .map_err(ObjectStorageError::PathError)?, + ); + } + } + + let time = time.elapsed().as_secs_f64(); + REQUEST_RESPONSE_TIME + .with_label_values(&["GET", "200"]) // this might not be the right status code + .observe(time); + + Ok(path_arr) + } + + async fn get_stream_file_paths( + &self, + stream_name: &str, + ) -> Result, ObjectStorageError> { + let time = Instant::now(); + let mut path_arr = vec![]; + + // = data/stream_name + let stream_dir_path = self.path_in_root(&RelativePathBuf::from(stream_name)); + let mut entries = fs::read_dir(&stream_dir_path).await?; + + while let Some(entry) = entries.next_entry().await? { + let flag = entry + .path() + .file_name() + .unwrap_or_default() + .to_str() + .unwrap_or_default() + .contains("ingester"); + + if flag { + path_arr.push(RelativePathBuf::from_iter([ + stream_name, + entry.path().file_name().unwrap().to_str().unwrap(), + ])); + } + } + + path_arr.push(RelativePathBuf::from_iter([ + stream_name, + STREAM_METADATA_FILE_NAME, + ])); + path_arr.push(RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME])); + + let time = time.elapsed().as_secs_f64(); + REQUEST_RESPONSE_TIME + .with_label_values(&["GET", "200"]) // this might not be the right status code + .observe(time); + + Ok(path_arr) + } + async fn get_objects( &self, base_path: Option<&RelativePath>, @@ -183,6 +258,12 @@ impl ObjectStorage for LocalFS { Ok(()) } + async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { + let path = self.path_in_root(path); + tokio::fs::remove_file(path).await?; + Ok(()) + } + async fn check(&self) -> Result<(), ObjectStorageError> { fs::create_dir_all(&self.root) .await @@ -222,6 +303,26 @@ impl ObjectStorage for LocalFS { Ok(logstreams) } + async fn 
list_old_streams(&self) -> Result, ObjectStorageError> { + let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY]; + let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); + let entries: Vec = directories.try_collect().await?; + let entries = entries + .into_iter() + .map(|entry| dir_with_old_stream(entry, ignore_dir)); + + let logstream_dirs: Vec> = + FuturesUnordered::from_iter(entries).try_collect().await?; + + let logstreams = logstream_dirs + .into_iter() + .flatten() + .map(|name| LogStream { name }) + .collect(); + + Ok(logstreams) + } + async fn list_dirs(&self) -> Result, ObjectStorageError> { let dirs = ReadDirStream::new(fs::read_dir(&self.root).await?) .try_collect::>() @@ -293,7 +394,7 @@ impl ObjectStorage for LocalFS { } } -async fn dir_with_stream( +async fn dir_with_old_stream( entry: DirEntry, ignore_dirs: &[&str], ) -> Result, ObjectStorageError> { @@ -327,6 +428,42 @@ async fn dir_with_stream( } } +async fn dir_with_stream( + entry: DirEntry, + ignore_dirs: &[&str], +) -> Result, ObjectStorageError> { + let dir_name = entry + .path() + .file_name() + .expect("valid path") + .to_str() + .expect("valid unicode") + .to_owned(); + + if ignore_dirs.contains(&dir_name.as_str()) { + return Ok(None); + } + + if entry.file_type().await?.is_dir() { + let path = entry.path(); + + // even in ingest mode, we should only look for the global stream metadata file + let stream_json_path = path + .join(STREAM_ROOT_DIRECTORY) + .join(STREAM_METADATA_FILE_NAME); + + if stream_json_path.exists() { + Ok(Some(dir_name)) + } else { + let err: Box = + format!("found {}", entry.path().display()).into(); + Err(ObjectStorageError::UnhandledError(err)) + } + } else { + Ok(None) + } +} + async fn dir_name(entry: DirEntry) -> Result, ObjectStorageError> { if entry.file_type().await?.is_dir() { let dir_name = entry diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 671a2bd9d..fef2c8199 100644 --- 
a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -22,7 +22,7 @@ use super::{ }; use super::{ ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, - SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, + SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; use crate::option::Mode; @@ -79,9 +79,18 @@ pub trait ObjectStorage: Sync + 'static { async fn check(&self) -> Result<(), ObjectStorageError>; async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>; async fn list_streams(&self) -> Result, ObjectStorageError>; + async fn list_old_streams(&self) -> Result, ObjectStorageError>; async fn list_dirs(&self) -> Result, ObjectStorageError>; async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; + async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError>; + async fn get_ingester_meta_file_paths( + &self, + ) -> Result, ObjectStorageError>; + async fn get_stream_file_paths( + &self, + stream_name: &str, + ) -> Result, ObjectStorageError>; async fn try_delete_ingester_meta( &self, ingester_filename: String, @@ -180,7 +189,8 @@ pub trait ObjectStorage: Sync + 'static { &self, stream_name: &str, ) -> Result { - let schema_path = RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]); + let schema_path = + RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); let schema_map = self.get_object(&schema_path).await?; Ok(serde_json::from_slice(&schema_map)?) 
} @@ -218,6 +228,7 @@ pub trait ObjectStorage: Sync + 'static { let bytes = self .get_object(&RelativePathBuf::from_iter([ stream_name, + STREAM_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME, ])) .await?; @@ -488,9 +499,11 @@ fn schema_path(stream_name: &str) -> RelativePathBuf { let (ip, port) = get_address(); let file_name = format!(".ingester.{}.{}{}", ip, port, SCHEMA_FILE_NAME); - RelativePathBuf::from_iter([stream_name, &file_name]) + RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name]) + } + Mode::All | Mode::Query => { + RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]) } - Mode::All | Mode::Query => RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]), } } @@ -500,11 +513,13 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf { Mode::Ingest => { let (ip, port) = get_address(); let file_name = format!(".ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME); - RelativePathBuf::from_iter([stream_name, &file_name]) - } - Mode::Query | Mode::All => { - RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME]) + RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name]) } + Mode::Query | Mode::All => RelativePathBuf::from_iter([ + stream_name, + STREAM_ROOT_DIRECTORY, + STREAM_METADATA_FILE_NAME, + ]), } } @@ -528,5 +543,8 @@ fn manifest_path(prefix: &str) -> RelativePathBuf { #[inline(always)] pub fn ingester_metadata_path(ip: String, port: String) -> RelativePathBuf { - RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, &format!("ingester.{}.{}.json", ip, port)]) + RelativePathBuf::from_iter([ + PARSEABLE_ROOT_DIRECTORY, + &format!("ingester.{}.{}.json", ip, port), + ]) } diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 81d8e2ac8..27ec949fc 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -42,7 +42,10 @@ use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics}; use crate::storage::{LogStream, 
ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY}; use super::metrics_layer::MetricLayer; -use super::{ObjectStorageProvider, PARSEABLE_METADATA_FILE_NAME, STREAM_METADATA_FILE_NAME}; +use super::{ + ObjectStorageProvider, PARSEABLE_METADATA_FILE_NAME, SCHEMA_FILE_NAME, + STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, +}; // in bytes const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100; @@ -294,24 +297,23 @@ impl S3 { async fn _list_streams(&self) -> Result, ObjectStorageError> { let resp = self.client.list_with_delimiter(None).await?; - let common_prefixes = resp.common_prefixes; + let common_prefixes = resp.common_prefixes; // get all dirs // return prefixes at the root level - let mut dirs: Vec<_> = common_prefixes + let dirs: Vec<_> = common_prefixes .iter() .filter_map(|path| path.parts().next()) .map(|name| name.as_ref().to_string()) + .filter(|x| x != PARSEABLE_ROOT_DIRECTORY) .collect(); - // filter out the root directory - dirs.retain(|x| x != PARSEABLE_ROOT_DIRECTORY); let stream_json_check = FuturesUnordered::new(); - // even in ingest mode, we should only look for the global stream metadata file - let file_name = STREAM_METADATA_FILE_NAME.to_string(); - for dir in &dirs { - let key = format!("{}/{}", dir, file_name); + let key = format!( + "{}/{}/{}", + dir, STREAM_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME + ); let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) }; stream_json_check.push(task); } @@ -452,6 +454,60 @@ impl ObjectStorage for S3 { Ok(res) } + async fn get_ingester_meta_file_paths( + &self, + ) -> Result, ObjectStorageError> { + let time = Instant::now(); + let mut path_arr = vec![]; + let mut object_stream = self.client.list(Some(&self.root)).await?; + + while let Some(meta) = object_stream.next().await.transpose()? 
{ + let flag = meta.location.filename().unwrap().starts_with("ingester"); + + if flag { + path_arr.push(RelativePathBuf::from(meta.location.as_ref())); + } + } + + let time = time.elapsed().as_secs_f64(); + REQUEST_RESPONSE_TIME + .with_label_values(&["GET", "200"]) + .observe(time); + + Ok(path_arr) + } + + async fn get_stream_file_paths( + &self, + stream_name: &str, + ) -> Result, ObjectStorageError> { + let time = Instant::now(); + let mut path_arr = vec![]; + let path = to_object_store_path(&RelativePathBuf::from(stream_name)); + let mut object_stream = self.client.list(Some(&path)).await?; + + while let Some(meta) = object_stream.next().await.transpose()? { + let flag = meta.location.filename().unwrap().starts_with(".ingester"); + + if flag { + path_arr.push(RelativePathBuf::from(meta.location.as_ref())); + } + } + + path_arr.push(RelativePathBuf::from_iter([ + stream_name, + STREAM_METADATA_FILE_NAME, + ])); + path_arr.push(RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME])); + + let time = time.elapsed().as_secs_f64(); + REQUEST_RESPONSE_TIME + .with_label_values(&["GET", "200"]) + .observe(time); + + Ok(path_arr) + } + async fn put_object( &self, path: &RelativePath, @@ -470,6 +526,10 @@ impl ObjectStorage for S3 { Ok(()) } + async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { + Ok(self.client.delete(&to_object_store_path(path)).await?) 
+ } + async fn check(&self) -> Result<(), ObjectStorageError> { Ok(self .client @@ -511,6 +571,32 @@ impl ObjectStorage for S3 { Ok(streams) } + async fn list_old_streams(&self) -> Result, ObjectStorageError> { + let resp = self.client.list_with_delimiter(None).await?; + + let common_prefixes = resp.common_prefixes; // get all dirs + + // return prefixes at the root level + let dirs: Vec<_> = common_prefixes + .iter() + .filter_map(|path| path.parts().next()) + .map(|name| name.as_ref().to_string()) + .filter(|x| x != PARSEABLE_ROOT_DIRECTORY) + .collect(); + + let stream_json_check = FuturesUnordered::new(); + + for dir in &dirs { + let key = format!("{}/{}", dir, STREAM_METADATA_FILE_NAME); + let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) }; + stream_json_check.push(task); + } + + stream_json_check.try_collect().await?; + + Ok(dirs.into_iter().map(|name| LogStream { name }).collect()) + } + async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError> { let streams = self._list_dates(stream_name).await?; diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index c5d0c441d..18a3efaeb 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -126,33 +126,39 @@ pub async fn resolve_parseable_metadata() -> Result { - create_dir_all(CONFIG.staging_dir())?; - metadata.staging = CONFIG.staging_dir().canonicalize()?; - // this flag is set to true so that metadata is copied to staging - overwrite_staging = true; - // overwrite remote in all and query mode - // because staging dir has changed. 
- match CONFIG.parseable.mode {
-                    Mode::All => {
-                        standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))
-                            .map_err(|err| {
-                                ObjectStorageError::Custom(err.to_string())
-                            })?;
+                    // if server is started in ingest mode, we need to make sure that query mode has been started
+                    // i.e. the metadata is updated to reflect the server mode = Query
+                    if Mode::from_string(&metadata.server_mode).unwrap() == Mode::All && CONFIG.parseable.mode == Mode::Ingest {
+                        Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet")
+                    } else {
+                        create_dir_all(CONFIG.staging_dir())?;
+                        metadata.staging = CONFIG.staging_dir().canonicalize()?;
+                        // this flag is set to true so that metadata is copied to staging
+                        overwrite_staging = true;
+                        // overwrite remote in all and query mode
+                        // because staging dir has changed.
+                        match CONFIG.parseable.mode {
+                            Mode::All => {
+                                standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))
+                                    .map_err(|err| {
+                                        ObjectStorageError::Custom(err.to_string())
+                                    })?;
+                                overwrite_remote = true;
+                            },
+                            Mode::Query => {
                                 overwrite_remote = true;
-                            },
-                            Mode::Query => {
-                                overwrite_remote = true;
-                                metadata.server_mode = CONFIG.parseable.mode.to_string();
-                                metadata.staging = CONFIG.staging_dir().to_path_buf();
-                            },
-                            Mode::Ingest => {
-                                // if ingest server is started fetch the metadata from remote
-                                // update the server mode for local metadata
-                                metadata.server_mode = CONFIG.parseable.mode.to_string();
-                                metadata.staging = CONFIG.staging_dir().to_path_buf();
-                            },
+                                metadata.server_mode = CONFIG.parseable.mode.to_string();
+                                metadata.staging = CONFIG.staging_dir().to_path_buf();
+                            },
+                            Mode::Ingest => {
+                                // if ingest server is started fetch the metadata from remote
+                                // update the server mode for local metadata
+                                metadata.server_mode = CONFIG.parseable.mode.to_string();
+                                metadata.staging = CONFIG.staging_dir().to_path_buf();
+                            },
+                        }
+ 
Ok(metadata) } - Ok(metadata) } EnvChange::CreateBoth => { create_dir_all(CONFIG.staging_dir())?;