73 changes: 50 additions & 23 deletions server/src/storage/localfs.rs
@@ -34,15 +34,15 @@ use datafusion::{
execution::runtime_env::{RuntimeConfig, RuntimeEnv},
};
use fs_extra::file::{move_file, CopyOptions};
use futures::StreamExt;
use futures::{stream::FuturesUnordered, TryStreamExt};
use relative_path::RelativePath;
use tokio::fs;
use tokio::fs::{self, DirEntry};
use tokio_stream::wrappers::ReadDirStream;

use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
use crate::{option::validation, utils::validate_path_is_writeable};

use super::{LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
use super::{object_storage, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};

#[derive(Debug, Clone, clap::Args)]
#[command(
@@ -152,29 +152,25 @@ impl ObjectStorage for LocalFS {
let path = self.root.join(stream_name);
Ok(fs::remove_dir_all(path).await?)
}

async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
let ignore_dir = &["lost+found"];
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
let directories = directories
.filter_map(|res| async {
let entry = res.ok()?;
if entry.file_type().await.ok()?.is_dir() {
Some(LogStream {
name: entry
.path()
.file_name()
.expect("valid path")
.to_str()
.expect("valid unicode")
.to_string(),
})
} else {
None
}
})
.collect::<Vec<LogStream>>()
.await;
let entries: Vec<DirEntry> = directories.try_collect().await?;
let entries = entries
.into_iter()
.map(|entry| dir_with_stream(entry, ignore_dir));

let logstream_dirs: Vec<Option<String>> =
FuturesUnordered::from_iter(entries).try_collect().await?;

let logstreams = logstream_dirs
.into_iter()
.flatten()
.map(|name| LogStream { name })
.collect();

Ok(directories)
Ok(logstreams)
}

async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
@@ -228,6 +224,37 @@ impl ObjectStorage for LocalFS {
}
}

async fn dir_with_stream(
entry: DirEntry,
ignore_dirs: &[&str],
) -> Result<Option<String>, ObjectStorageError> {
let dir_name = entry
.path()
.file_name()
.expect("valid path")
.to_str()
.expect("valid unicode")
.to_owned();

if ignore_dirs.contains(&dir_name.as_str()) {
return Ok(None);
}

if entry.file_type().await?.is_dir() {
let path = entry.path();
let stream_json_path = path.join(object_storage::STREAM_METADATA_FILE_NAME);
if stream_json_path.exists() {
Ok(Some(dir_name))
} else {
let err: Box<dyn std::error::Error + Send + Sync + 'static> =
format!("found {}", entry.path().display()).into();
Err(ObjectStorageError::UnhandledError(err))
}
} else {
Ok(None)
}
}

impl From<fs_extra::error::Error> for ObjectStorageError {
fn from(e: fs_extra::error::Error) -> Self {
ObjectStorageError::UnhandledError(Box::new(e))
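The reworked `list_streams` above first collects the directory entries, then checks every candidate concurrently through `dir_with_stream`, keeping only directories that contain the `.stream.json` marker. A minimal standalone sketch of that pattern, using hypothetical names rather than Parseable's actual types, and assuming a synchronous marker check is acceptable as in the change above:

use std::{io, path::Path};

use futures::{stream::FuturesUnordered, TryStreamExt};
use tokio::fs;
use tokio_stream::wrappers::ReadDirStream;

// Hypothetical helper: list the sub-directories of `root` that contain `marker`,
// validating all candidates concurrently (same shape as dir_with_stream above).
async fn dirs_with_marker(root: &Path, marker: &str) -> io::Result<Vec<String>> {
    // ReadDirStream yields io::Result<DirEntry>; try_collect stops at the first error.
    let entries: Vec<fs::DirEntry> = ReadDirStream::new(fs::read_dir(root).await?)
        .try_collect()
        .await?;

    // One small future per entry, polled concurrently by FuturesUnordered.
    let checks = FuturesUnordered::from_iter(entries.into_iter().map(|entry| async move {
        let is_dir = entry.file_type().await?.is_dir();
        // Path::exists() is a blocking call, mirroring the check in the change above.
        let has_marker = entry.path().join(marker).exists();
        let name = entry.file_name().to_string_lossy().into_owned();
        Ok::<Option<String>, io::Error>((is_dir && has_marker).then_some(name))
    }));

    let names: Vec<Option<String>> = checks.try_collect().await?;
    Ok(names.into_iter().flatten().collect())
}

As in the PR, the first I/O error aborts the whole listing because `try_collect` is fail-fast; a more lenient variant would filter out failed entries instead.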
2 changes: 1 addition & 1 deletion server/src/storage/object_storage.rs
@@ -57,7 +57,7 @@ use std::{
};

// metadata file names in a Stream prefix
const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
pub(super) const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
pub(super) const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
const SCHEMA_FILE_NAME: &str = ".schema";
const ALERT_FILE_NAME: &str = ".alert.json";
33 changes: 28 additions & 5 deletions server/src/storage/s3.rs
@@ -36,7 +36,9 @@ use datafusion::datasource::listing::{
use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::error::DataFusionError;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use md5::{Digest, Md5};
use object_store::aws::AmazonS3Builder;
use object_store::limit::LimitStore;
@@ -50,7 +52,7 @@ use std::time::Instant;
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};

use super::ObjectStorageProvider;
use super::{object_storage, ObjectStorageProvider};

#[derive(Debug, Clone, clap::Args)]
#[command(
@@ -265,15 +267,36 @@ impl S3 {
let common_prefixes = resp.common_prefixes().unwrap_or_default();

// return prefixes at the root level
let logstreams: Vec<_> = common_prefixes
let dirs: Vec<_> = common_prefixes
.iter()
.filter_map(CommonPrefix::prefix)
.filter_map(|name| name.strip_suffix('/'))
.map(String::from)
.map(|name| LogStream { name })
.collect();

Ok(logstreams)
let stream_json_check = FuturesUnordered::new();

for dir in &dirs {
let key = format!("{}/{}", dir, object_storage::STREAM_METADATA_FILE_NAME);
let task = async move {
self.client
.head_object()
.bucket(&self.bucket)
.key(key)
.send()
.await
.map(|_| ())
};

stream_json_check.push(task);
}

stream_json_check.try_collect().await?;

Ok(dirs
.into_iter()
.map(|name| LogStream { name })
.collect_vec())
}

async fn _upload_file(
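For S3, the same validation is expressed as one `head_object` call per top-level prefix, all pushed into a `FuturesUnordered` and driven with `try_collect`, so listing fails fast if any expected `.stream.json` object is missing. A rough standalone sketch of that pattern against the AWS Rust SDK, with hypothetical helper and parameter names and errors simply boxed:

use aws_sdk_s3::Client;
use futures::{stream::FuturesUnordered, TryStreamExt};

type BoxError = Box<dyn std::error::Error + Send + Sync>;

// Hypothetical helper: confirm that `<prefix>/<marker>` exists for every prefix,
// issuing the HEAD requests concurrently and returning the first failure.
async fn check_marker_objects(
    client: &Client,
    bucket: &str,
    prefixes: &[String],
    marker: &str,
) -> Result<(), BoxError> {
    let checks: FuturesUnordered<_> = prefixes
        .iter()
        .map(|prefix| {
            let key = format!("{prefix}/{marker}");
            async move {
                client
                    .head_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                    .map(|_| ()) // only existence matters here
                    .map_err(|e| Box::new(e) as BoxError)
            }
        })
        .collect();

    // Collecting into () drives every HEAD request and propagates the first error.
    checks.try_collect::<()>().await?;
    Ok(())
}

As written (and as in the change above), a missing marker object surfaces as an error; distinguishing a genuine 404 from other failures would mean inspecting the `SdkError` variant before deciding to skip the prefix.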