4 changes: 2 additions & 2 deletions server/Cargo.toml
@@ -23,8 +23,8 @@ aws-types = "0.47"
 bytes = "1"
 chrono = "0.4.19"
 crossterm = "0.23.2"
-datafusion = "8.0"
-datafusion-objectstore-s3 = { git = "https://github.com/parseablehq/datafusion-objectstore-s3" }
+datafusion = "11.0"
+object_store = { version = "0.4", features=["aws"] }
 derive_more = "0.99.17"
 env_logger = "0.9.0"
 futures = "0.3"
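Note: this dependency swap replaces the git-pinned datafusion-objectstore-s3 fork with the upstream object_store crate (with its "aws" feature). A minimal sketch of the new crate's S3 path, not code from this PR; the endpoint, bucket, object key, and credentials are placeholders mirroring the new defaults further down:

```rust
use object_store::aws::AmazonS3Builder;
use object_store::path::Path;
use object_store::ObjectStore;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoint/bucket/credentials for a local MinIO instance.
    let store = AmazonS3Builder::new()
        .with_region("us-east-1")
        .with_endpoint("http://localhost:9000")
        .with_bucket_name("parseable")
        .with_access_key_id("minioadmin")
        .with_secret_access_key("minioadmin")
        .with_allow_http(true) // local MinIO serves plain HTTP
        .build()?;

    // Fetch a single (hypothetical) object to verify connectivity.
    let bytes = store
        .get(&Path::from("demo/meta.json"))
        .await?
        .bytes()
        .await?;
    println!("read {} bytes", bytes.len());
    Ok(())
}
```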
159 changes: 113 additions & 46 deletions server/src/s3.rs
@@ -4,41 +4,80 @@ use aws_sdk_s3::model::{Delete, ObjectIdentifier};
 use aws_sdk_s3::types::{ByteStream, SdkError};
 use aws_sdk_s3::Error as AwsSdkError;
 use aws_sdk_s3::{Client, Credentials, Endpoint, Region};
-use aws_types::credentials::SharedCredentialsProvider;
 use bytes::Bytes;
 use crossterm::style::Stylize;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::datasource::listing::{ListingTable, ListingTableConfig};
-use datafusion::prelude::SessionContext;
-use datafusion_objectstore_s3::object_store::s3::S3FileSystem;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::object_store::ObjectStoreRegistry;
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use futures::StreamExt;
 use http::Uri;
+use object_store::aws::AmazonS3Builder;
+use object_store::limit::LimitStore;
 use std::collections::HashSet;
 use std::fs;
 use std::iter::Iterator;
 use std::sync::Arc;
 use structopt::StructOpt;
-use tokio_stream::StreamExt;

 use crate::alerts::Alerts;
 use crate::metadata::Stats;
 use crate::option::{StorageOpt, CONFIG};
 use crate::query::Query;
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
 use crate::utils::hostname_unchecked;

-// Default object storage currently is DO Spaces bucket
-// Any user who starts the Parseable server with default configuration
-// will point to this bucket and will see any data present on this bucket
-const DEFAULT_S3_URL: &str = "https://sgp1.digitaloceanspaces.com";
-const DEFAULT_S3_REGION: &str = "sgp1";
+const DEFAULT_S3_URL: &str = "https://minio.parseable.io:9000";
+const DEFAULT_S3_REGION: &str = "us-east-1";
 const DEFAULT_S3_BUCKET: &str = "parseable";
-const DEFAULT_S3_ACCESS_KEY: &str = "DO00YF68WC2P3QUAM82K";
-const DEFAULT_S3_SECRET_KEY: &str = "Ov6D7DvM6NHlyU4W2ajrHhRnT4IVRqKxExLPhekNIKw";
+const DEFAULT_S3_ACCESS_KEY: &str = "minioadmin";
+const DEFAULT_S3_SECRET_KEY: &str = "minioadmin";

 const S3_URL_ENV_VAR: &str = "P_S3_URL";

+// max concurrent requests allowed for the datafusion object store
+const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
+// max concurrent requests allowed for prefix checks during listing
+const MAX_CONCURRENT_PREFIX_CHECK: usize = 1000;
+
 lazy_static::lazy_static! {
     #[derive(Debug)]
     pub static ref S3_CONFIG: Arc<S3Config> = Arc::new(S3Config::from_args());
+
+    // runtime to be used in query session
+    pub static ref STORAGE_RUNTIME: Arc<RuntimeEnv> = {
+        let s3 = AmazonS3Builder::new()
+            .with_region(&S3_CONFIG.s3_default_region)
+            .with_endpoint(&S3_CONFIG.s3_endpoint_url)
+            .with_bucket_name(&S3_CONFIG.s3_bucket_name)
+            .with_access_key_id(&S3_CONFIG.s3_access_key_id)
+            .with_secret_access_key(&S3_CONFIG.s3_secret_key)
+            // allow http for local instances
+            .with_allow_http(true)
+            .build()
+            .unwrap();
+
+        // limit the object store to MAX_OBJECT_STORE_REQUESTS concurrent requests
+        let s3 = LimitStore::new(s3, MAX_OBJECT_STORE_REQUESTS);
+
+        let object_store_registry = ObjectStoreRegistry::new();
+        object_store_registry.register_store("s3", &S3_CONFIG.s3_bucket_name, Arc::new(s3));
+
+        let config = RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
+
+        let runtime = RuntimeEnv::new(config).unwrap();
+
+        Arc::new(runtime)
+    };
 }

 #[derive(Debug, Clone, StructOpt)]
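Note: the new STORAGE_RUNTIME wires a LimitStore-wrapped S3 store into one shared DataFusion RuntimeEnv, so every `s3://<bucket>/...` URL resolves through a single registered store. A sketch of how a consumer might use it, assuming DataFusion 11's `read_parquet`/`ParquetReadOptions` API; the bucket and prefix are hypothetical:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

// Hypothetical consumer: print a few rows from a stream's prefix.
async fn peek_stream() -> Result<()> {
    // Sessions share the object store registered in STORAGE_RUNTIME,
    // so no per-query S3 filesystem is constructed.
    let ctx =
        SessionContext::with_config_rt(SessionConfig::default(), Arc::clone(&STORAGE_RUNTIME));

    let df = ctx
        .read_parquet("s3://parseable/mystream/", ParquetReadOptions::default())
        .await?;
    df.show_limit(10).await?;
    Ok(())
}
```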
@@ -125,22 +164,21 @@ impl S3Options {
 }

 pub struct S3 {
-    options: S3Options,
     client: aws_sdk_s3::Client,
 }

 impl S3 {
     pub fn new() -> Self {
         let options = S3Options::new();
         let config = aws_sdk_s3::Config::builder()
-            .region(options.region.clone())
-            .endpoint_resolver(options.endpoint.clone())
-            .credentials_provider(options.creds.clone())
+            .region(options.region)
+            .endpoint_resolver(options.endpoint)
+            .credentials_provider(options.creds)
             .build();

         let client = Client::from_conf(config);

-        Self { options, client }
+        Self { client }
     }

     async fn _put_schema(&self, stream_name: String, body: String) -> Result<(), AwsSdkError> {
@@ -376,45 +414,74 @@ impl ObjectStorage for S3 {
         query: &Query,
         results: &mut Vec<RecordBatch>,
     ) -> Result<(), ObjectStorageError> {
-        let s3_file_system = Arc::new(
-            S3FileSystem::new(
-                Some(SharedCredentialsProvider::new(self.options.creds.clone())),
-                Some(self.options.region.clone()),
-                Some(self.options.endpoint.clone()),
-                None,
-                None,
-                None,
-            )
-            .await,
-        );
-
-        for prefix in query.get_prefixes() {
-            let ctx = SessionContext::new();
-            let path = format!("s3://{}/{}", &S3_CONFIG.s3_bucket_name, prefix);
-
-            if !self.prefix_exists(&prefix).await? {
-                continue;
-            }
+        let ctx =
+            SessionContext::with_config_rt(SessionConfig::default(), Arc::clone(&STORAGE_RUNTIME));
+
+        // Get all prefix paths and convert them into futures that yield a ListingTableUrl
+        let handles = query.get_prefixes().into_iter().map(|prefix| async move {
+            let t: Result<Option<ListingTableUrl>, ObjectStorageError> = {
+                if self.prefix_exists(&prefix).await? {
+                    let path = format!("s3://{}/{}", &S3_CONFIG.s3_bucket_name, prefix);
+                    Ok(Some(ListingTableUrl::parse(path)?))
+                } else {
+                    Ok(None)
+                }
+            };
+            t
+        });
+
+        // Poll the futures, limited to a concurrency of MAX_CONCURRENT_PREFIX_CHECK
+        let list: Vec<_> = futures::stream::iter(handles)
+            .buffer_unordered(MAX_CONCURRENT_PREFIX_CHECK)
+            .collect()
+            .await;
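Note: a stripped-down sketch of the fan-out pattern above (illustrative, not from this PR): map the inputs to futures, then let `buffer_unordered` keep at most N of them in flight, which is what caps the number of concurrent prefix checks against S3.

```rust
use futures::StreamExt;

// Illustrative stand-in for the prefix checks: keep names that pass a
// cheap test, polling at most LIMIT futures at any one time.
async fn check_all(prefixes: Vec<String>) -> Vec<Option<String>> {
    const LIMIT: usize = 1000; // mirrors MAX_CONCURRENT_PREFIX_CHECK

    let handles = prefixes.into_iter().map(|prefix| async move {
        // stand-in for `prefix_exists`; a real check would hit S3 here
        if !prefix.is_empty() {
            Some(prefix)
        } else {
            None
        }
    });

    futures::stream::iter(handles)
        .buffer_unordered(LIMIT) // at most LIMIT checks in flight
        .collect()
        .await
}
```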

+        // Collect all available prefixes
+        let prefixes: Vec<_> = list
+            .into_iter()
+            .filter_map(|fetch| {
+                fetch
+                    .map_err(|err| {
+                        log::error!("prefix lookup failed due to {}", err);
+                        err
+                    })
+                    .ok()
+            })
+            .flatten()
+            .collect();
+
+        if prefixes.is_empty() {
+            return Ok(());
+        }

-            let config = ListingTableConfig::new(s3_file_system.clone(), &path)
-                .infer()
-                .await?;
+        let file_format = ParquetFormat::default().with_enable_pruning(true);
+        let listing_options = ListingOptions {
+            file_extension: format!("{}.data.parquet", hostname_unchecked()),
+            format: Arc::new(file_format),
+            table_partition_cols: vec![],
+            collect_stat: true,
+            target_partitions: 1,
+        };
+
+        let config = ListingTableConfig::new_with_multi_paths(prefixes)
+            .with_listing_options(listing_options)
+            .infer(&ctx.state())
+            .await?;

-            let table = ListingTable::try_new(config)?;
-            ctx.register_table(query.stream_name.as_str(), Arc::new(table))?;
+        let table = ListingTable::try_new(config)?;
+        ctx.register_table(query.stream_name.as_str(), Arc::new(table))?;

-            // execute the query and collect results
-            let df = ctx.sql(query.query.as_str()).await?;
-            results.extend(df.collect().await?);
-        }
+        // execute the query and collect results
+        let df = ctx.sql(&query.query).await?;
+        results.extend(df.collect().await?);

         Ok(())
     }
 }
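Note: the structural change in this hunk is that instead of one SessionContext, one table, and one SQL pass per prefix, every live prefix becomes a ListingTableUrl in a single multi-path ListingTable, so the planner sees one table and scans all prefixes in one plan. A condensed sketch of that shape with hypothetical time-partitioned prefixes:

```rust
use std::sync::Arc;

use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Hypothetical prefixes collapsed into one logical table.
async fn register_stream(ctx: &SessionContext, opts: ListingOptions) -> Result<()> {
    let prefixes = vec![
        ListingTableUrl::parse("s3://parseable/mystream/date=2022-07-26/hour=10/")?,
        ListingTableUrl::parse("s3://parseable/mystream/date=2022-07-26/hour=11/")?,
    ];

    let config = ListingTableConfig::new_with_multi_paths(prefixes)
        .with_listing_options(opts)
        .infer(&ctx.state())
        .await?;

    // One registered table; a single `SELECT ... FROM mystream` covers both prefixes.
    ctx.register_table("mystream", Arc::new(ListingTable::try_new(config)?))?;
    Ok(())
}
```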

 impl From<AwsSdkError> for ObjectStorageError {
     fn from(error: AwsSdkError) -> Self {
-        ObjectStorageError::UnhandledError(error.into())
+        ObjectStorageError::UnhandledError(Box::new(error))
     }
 }

@@ -429,15 +496,15 @@ impl From<SdkError<HeadBucketError>> for ObjectStorageError {
                 },
                 ..
             } => ObjectStorageError::NoSuchBucket(S3_CONFIG.bucket_name().to_string()),
-            SdkError::DispatchFailure(err) => ObjectStorageError::ConnectionError(err.into()),
+            SdkError::DispatchFailure(err) => ObjectStorageError::ConnectionError(Box::new(err)),
             SdkError::TimeoutError(err) => ObjectStorageError::ConnectionError(err),
-            err => ObjectStorageError::UnhandledError(err.into()),
+            err => ObjectStorageError::UnhandledError(Box::new(err)),
         }
     }
 }

 impl From<serde_json::Error> for ObjectStorageError {
     fn from(error: serde_json::Error) -> Self {
-        ObjectStorageError::UnhandledError(error.into())
+        ObjectStorageError::UnhandledError(Box::new(error))
     }
 }
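Note: these conversions switch from `error.into()` to an explicit `Box::new(error)` because the target type gains a `Send` bound (see storage.rs below). The standard library only provides blanket `From` conversions into `Box<dyn Error>` and `Box<dyn Error + Send + Sync>`, not into the Send-only trait object, so the explicit box plus unsized coercion is required. A compilable illustration with a toy error type:

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug)]
struct S3Failure;

impl fmt::Display for S3Failure {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "s3 failure")
    }
}

impl Error for S3Failure {}

fn main() {
    // Box::new + unsized coercion works for any auto-trait combination...
    let _ok: Box<dyn Error + Send + 'static> = Box::new(S3Failure);

    // ...but `.into()` does not: std has no `From` impl targeting
    // `Box<dyn Error + Send>` without `Sync`, so this would not compile:
    // let _err: Box<dyn Error + Send + 'static> = S3Failure.into();
}
```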
4 changes: 2 additions & 2 deletions server/src/storage.rs
@@ -209,13 +209,13 @@ pub enum ObjectStorageError {
     #[error("Bucket {0} not found")]
     NoSuchBucket(String),
     #[error("Connection Error: {0}")]
-    ConnectionError(Box<dyn std::error::Error>),
+    ConnectionError(Box<dyn std::error::Error + Send + 'static>),
     #[error("IO Error: {0}")]
     IoError(#[from] std::io::Error),
     #[error("DataFusion Error: {0}")]
     DataFusionError(#[from] datafusion::error::DataFusionError),
     #[error("Unhandled Error: {0}")]
-    UnhandledError(Box<dyn std::error::Error>),
+    UnhandledError(Box<dyn std::error::Error + Send + 'static>),
 }

 impl From<ObjectStorageError> for crate::error::Error {
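Note: the `+ Send + 'static` bound on these boxed errors is what allows futures holding an ObjectStorageError to move across the async runtime's worker threads. A quick illustrative check, not from this PR:

```rust
// A future is only Send if everything it holds across an `.await` is Send,
// including any boxed error in flight.
fn assert_send<T: Send>(_t: &T) {}

fn main() {
    let err: Box<dyn std::error::Error + Send + 'static> =
        Box::new(std::io::Error::new(std::io::ErrorKind::Other, "s3 timeout"));
    assert_send(&err); // compiles only because of the `+ Send` bound
}
```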