From 950dc60a6dd02618747f749b20e10d9524c07af4 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Tue, 28 Feb 2023 13:49:34 +0530
Subject: [PATCH 1/3] Fix LocalFS stream listing

---
 server/src/storage/localfs.rs        | 57 +++++++++++++++++-----------
 server/src/storage/object_storage.rs |  2 +-
 2 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs
index dccce14a7..01063640a 100644
--- a/server/src/storage/localfs.rs
+++ b/server/src/storage/localfs.rs
@@ -34,15 +34,15 @@ use datafusion::{
     execution::runtime_env::{RuntimeConfig, RuntimeEnv},
 };
 use fs_extra::file::{move_file, CopyOptions};
-use futures::StreamExt;
+use futures::{stream::FuturesUnordered, TryStreamExt};
 use relative_path::RelativePath;
-use tokio::fs;
+use tokio::fs::{self, DirEntry};
 use tokio_stream::wrappers::ReadDirStream;
 
 use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
 use crate::{option::validation, utils::validate_path_is_writeable};
 
-use super::{LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
+use super::{object_storage, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
 
 #[derive(Debug, Clone, clap::Args)]
 #[command(
@@ -152,29 +152,42 @@ impl ObjectStorage for LocalFS {
         let path = self.root.join(stream_name);
         Ok(fs::remove_dir_all(path).await?)
     }
+
     async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+        let dir_with_stream = |entry: DirEntry| async {
+            if entry.file_type().await?.is_dir() {
+                let path = entry.path();
+                let stream_json_path = path.join(object_storage::STREAM_METADATA_FILE_NAME);
+                Ok(stream_json_path.exists().then_some(entry))
+            } else {
+                Ok(None)
+            }
+        };
+
         let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
-        let directories = directories
-            .filter_map(|res| async {
-                let entry = res.ok()?;
-                if entry.file_type().await.ok()?.is_dir() {
-                    Some(LogStream {
-                        name: entry
-                            .path()
-                            .file_name()
-                            .expect("valid path")
-                            .to_str()
-                            .expect("valid unicode")
-                            .to_string(),
-                    })
-                } else {
-                    None
-                }
+        let entries: Vec<DirEntry> = directories.try_collect().await?;
+        let entries = entries.into_iter().map(|entry| dir_with_stream(entry));
+        let entries = FuturesUnordered::from_iter(entries);
+
+        let logstream_dirs: Result<Vec<Option<DirEntry>>, std::io::Error> =
+            entries.try_collect().await;
+
+        let logstreams = logstream_dirs?
+            .into_iter()
+            .flatten()
+            .map(|entry| {
+                entry
+                    .path()
+                    .file_name()
+                    .expect("valid path")
+                    .to_str()
+                    .expect("valid unicode")
+                    .to_owned()
             })
-            .collect::<Vec<LogStream>>()
-            .await;
+            .map(|name| LogStream { name })
+            .collect();
 
-        Ok(directories)
+        Ok(logstreams)
     }
 
     async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index be1c3a728..884172f4d 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -57,7 +57,7 @@ use std::{
 };
 
 // metadata file names in a Stream prefix
-const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
+pub(super) const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
 pub(super) const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
 const SCHEMA_FILE_NAME: &str = ".schema";
 const ALERT_FILE_NAME: &str = ".alert.json";

From 1d2936df5a53fd72d6f085f9263e38322e0863d5 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Tue, 28 Feb 2023 14:25:35 +0530
Subject: [PATCH 2/3] add ignore directories

---
 server/src/storage/localfs.rs | 62 +++++++++++++++++++++--------------
 1 file changed, 38 insertions(+), 24 deletions(-)

diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs
index 01063640a..555a215b6 100644
--- a/server/src/storage/localfs.rs
+++ b/server/src/storage/localfs.rs
@@ -154,36 +154,19 @@ impl ObjectStorage for LocalFS {
     }
 
     async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
-        let dir_with_stream = |entry: DirEntry| async {
-            if entry.file_type().await?.is_dir() {
-                let path = entry.path();
-                let stream_json_path = path.join(object_storage::STREAM_METADATA_FILE_NAME);
-                Ok(stream_json_path.exists().then_some(entry))
-            } else {
-                Ok(None)
-            }
-        };
-
+        let ignore_dir = &["lost+found"];
         let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
         let entries: Vec<DirEntry> = directories.try_collect().await?;
-        let entries = entries.into_iter().map(|entry| dir_with_stream(entry));
-        let entries = FuturesUnordered::from_iter(entries);
+        let entries = entries
+            .into_iter()
+            .map(|entry| dir_with_stream(entry, ignore_dir));
 
-        let logstream_dirs: Result<Vec<Option<DirEntry>>, std::io::Error> =
-            entries.try_collect().await;
+        let logstream_dirs: Vec<Option<String>> =
+            FuturesUnordered::from_iter(entries).try_collect().await?;
 
-        let logstreams = logstream_dirs?
+        let logstreams = logstream_dirs
             .into_iter()
             .flatten()
-            .map(|entry| {
-                entry
-                    .path()
-                    .file_name()
-                    .expect("valid path")
-                    .to_str()
-                    .expect("valid unicode")
-                    .to_owned()
-            })
             .map(|name| LogStream { name })
             .collect();
 
@@ -241,6 +224,37 @@ impl ObjectStorage for LocalFS {
     }
 }
 
+async fn dir_with_stream(
+    entry: DirEntry,
+    ignore_dirs: &[&str],
+) -> Result<Option<String>, ObjectStorageError> {
+    let dir_name = entry
+        .path()
+        .file_name()
+        .expect("valid path")
+        .to_str()
+        .expect("valid unicode")
+        .to_owned();
+
+    if ignore_dirs.contains(&dir_name.as_str()) {
+        return Ok(None);
+    }
+
+    if entry.file_type().await?.is_dir() {
+        let path = entry.path();
+        let stream_json_path = path.join(object_storage::STREAM_METADATA_FILE_NAME);
+        if stream_json_path.exists() {
+            Ok(Some(dir_name))
+        } else {
+            let err: Box<dyn std::error::Error + Send + Sync + 'static> =
+                format!("found {}", entry.path().display()).into();
+            Err(ObjectStorageError::UnhandledError(err))
+        }
+    } else {
+        Ok(None)
+    }
+}
+
 impl From<fs_extra::error::Error> for ObjectStorageError {
     fn from(e: fs_extra::error::Error) -> Self {
         ObjectStorageError::UnhandledError(Box::new(e))

From 8d18ea5d9a4c0b1b3a751340cce487a29c101caa Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Tue, 28 Feb 2023 16:19:41 +0530
Subject: [PATCH 3/3] Check for stream.json in s3

---
 server/src/storage/s3.rs | 33 ++++++++++++++++++++++++++++-----
 1 file changed, 28 insertions(+), 5 deletions(-)

diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index c903c1bab..9a454bb4f 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -36,7 +36,9 @@ use datafusion::datasource::listing::{
 use datafusion::datasource::object_store::ObjectStoreRegistry;
 use datafusion::error::DataFusionError;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use futures::StreamExt;
+use futures::stream::FuturesUnordered;
+use futures::{StreamExt, TryStreamExt};
+use itertools::Itertools;
 use md5::{Digest, Md5};
 use object_store::aws::AmazonS3Builder;
 use object_store::limit::LimitStore;
@@ -50,7 +52,7 @@ use std::time::Instant;
 use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
 
-use super::ObjectStorageProvider;
+use super::{object_storage, ObjectStorageProvider};
 
 #[derive(Debug, Clone, clap::Args)]
 #[command(
@@ -265,15 +267,36 @@ impl S3 {
         let common_prefixes = resp.common_prefixes().unwrap_or_default();
 
         // return prefixes at the root level
-        let logstreams: Vec<_> = common_prefixes
+        let dirs: Vec<_> = common_prefixes
             .iter()
             .filter_map(CommonPrefix::prefix)
             .filter_map(|name| name.strip_suffix('/'))
             .map(String::from)
-            .map(|name| LogStream { name })
             .collect();
 
-        Ok(logstreams)
+        let stream_json_check = FuturesUnordered::new();
+
+        for dir in &dirs {
+            let key = format!("{}/{}", dir, object_storage::STREAM_METADATA_FILE_NAME);
+            let task = async move {
+                self.client
+                    .head_object()
+                    .bucket(&self.bucket)
+                    .key(key)
+                    .send()
+                    .await
+                    .map(|_| ())
+            };
+
+            stream_json_check.push(task);
+        }
+
+        stream_json_check.try_collect().await?;
+
+        Ok(dirs
+            .into_iter()
+            .map(|name| LogStream { name })
+            .collect_vec())
     }
 
     async fn _upload_file(