diff --git a/server/Cargo.toml b/server/Cargo.toml
index 7d008e748..423d1316e 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -23,8 +23,8 @@ aws-types = "0.47"
 bytes = "1"
 chrono = "0.4.19"
 crossterm = "0.23.2"
-datafusion = "8.0"
-datafusion-objectstore-s3 = { git = "https://github.com/parseablehq/datafusion-objectstore-s3" }
+datafusion = "11.0"
+object_store = { version = "0.4", features=["aws"] }
 derive_more = "0.99.17"
 env_logger = "0.9.0"
 futures = "0.3"
diff --git a/server/src/s3.rs b/server/src/s3.rs
index 65b73fb8d..42094cf7b 100644
--- a/server/src/s3.rs
+++ b/server/src/s3.rs
@@ -4,41 +4,80 @@ use aws_sdk_s3::model::{Delete, ObjectIdentifier};
 use aws_sdk_s3::types::{ByteStream, SdkError};
 use aws_sdk_s3::Error as AwsSdkError;
 use aws_sdk_s3::{Client, Credentials, Endpoint, Region};
-use aws_types::credentials::SharedCredentialsProvider;
 use bytes::Bytes;
 use crossterm::style::Stylize;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::datasource::listing::{ListingTable, ListingTableConfig};
-use datafusion::prelude::SessionContext;
-use datafusion_objectstore_s3::object_store::s3::S3FileSystem;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::object_store::ObjectStoreRegistry;
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use futures::StreamExt;
 use http::Uri;
+use object_store::aws::AmazonS3Builder;
+use object_store::limit::LimitStore;
 use std::collections::HashSet;
 use std::fs;
 use std::iter::Iterator;
 use std::sync::Arc;
 use structopt::StructOpt;
-use tokio_stream::StreamExt;
 
 use crate::alerts::Alerts;
 use crate::metadata::Stats;
 use crate::option::{StorageOpt, CONFIG};
 use crate::query::Query;
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
+use crate::utils::hostname_unchecked;
 
 // Default object storage currently is DO Spaces bucket
 // Any user who starts the Parseable server with default configuration
 // will point to this bucket and will see any data present on this bucket
-const DEFAULT_S3_URL: &str = "https://sgp1.digitaloceanspaces.com";
-const DEFAULT_S3_REGION: &str = "sgp1";
+const DEFAULT_S3_URL: &str = "https://minio.parseable.io:9000";
+const DEFAULT_S3_REGION: &str = "us-east-1";
 const DEFAULT_S3_BUCKET: &str = "parseable";
-const DEFAULT_S3_ACCESS_KEY: &str = "DO00YF68WC2P3QUAM82K";
-const DEFAULT_S3_SECRET_KEY: &str = "Ov6D7DvM6NHlyU4W2ajrHhRnT4IVRqKxExLPhekNIKw";
+const DEFAULT_S3_ACCESS_KEY: &str = "minioadmin";
+const DEFAULT_S3_SECRET_KEY: &str = "minioadmin";
 
 const S3_URL_ENV_VAR: &str = "P_S3_URL";
 
+// max concurrent requests allowed for the datafusion object store
+const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
+// max concurrent requests allowed for prefix checks during listing
+const MAX_CONCURRENT_PREFIX_CHECK: usize = 1000;
+
 lazy_static::lazy_static! {
     #[derive(Debug)]
     pub static ref S3_CONFIG: Arc<S3Config> = Arc::new(S3Config::from_args());
+
+    // runtime to be used in query session
+    pub static ref STORAGE_RUNTIME: Arc<RuntimeEnv> = {
+
+        let s3 = AmazonS3Builder::new()
+            .with_region(&S3_CONFIG.s3_default_region)
+            .with_endpoint(&S3_CONFIG.s3_endpoint_url)
+            .with_bucket_name(&S3_CONFIG.s3_bucket_name)
+            .with_access_key_id(&S3_CONFIG.s3_access_key_id)
+            .with_secret_access_key(&S3_CONFIG.s3_secret_key)
+            // allow http for local instances
+            .with_allow_http(true)
+            .build()
+            .unwrap();
+
+        // limit the object store to a fixed number of concurrent requests
+        let s3 = LimitStore::new(s3, MAX_OBJECT_STORE_REQUESTS);
+
+        let object_store_registry = ObjectStoreRegistry::new();
+        object_store_registry.register_store("s3", &S3_CONFIG.s3_bucket_name, Arc::new(s3));
+
+        let config = RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
+
+        let runtime = RuntimeEnv::new(config).unwrap();
+
+        Arc::new(runtime)
+
+    };
 }
 
 #[derive(Debug, Clone, StructOpt)]
@@ -125,7 +164,6 @@ impl S3Options {
 }
 
 pub struct S3 {
-    options: S3Options,
     client: aws_sdk_s3::Client,
 }
 
@@ -133,14 +171,14 @@ impl S3 {
     pub fn new() -> Self {
         let options = S3Options::new();
         let config = aws_sdk_s3::Config::builder()
-            .region(options.region.clone())
-            .endpoint_resolver(options.endpoint.clone())
-            .credentials_provider(options.creds.clone())
+            .region(options.region)
+            .endpoint_resolver(options.endpoint)
+            .credentials_provider(options.creds)
             .build();
 
         let client = Client::from_conf(config);
 
-        Self { options, client }
+        Self { client }
     }
 
     async fn _put_schema(&self, stream_name: String, body: String) -> Result<(), AwsSdkError> {
@@ -376,37 +414,66 @@ impl ObjectStorage for S3 {
         query: &Query,
         results: &mut Vec<RecordBatch>,
     ) -> Result<(), ObjectStorageError> {
-        let s3_file_system = Arc::new(
-            S3FileSystem::new(
-                Some(SharedCredentialsProvider::new(self.options.creds.clone())),
-                Some(self.options.region.clone()),
-                Some(self.options.endpoint.clone()),
-                None,
-                None,
-                None,
-            )
-            .await,
-        );
-
-        for prefix in query.get_prefixes() {
-            let ctx = SessionContext::new();
-            let path = format!("s3://{}/{}", &S3_CONFIG.s3_bucket_name, prefix);
-
-            if !self.prefix_exists(&prefix).await? {
-                continue;
-            }
+        let ctx =
+            SessionContext::with_config_rt(SessionConfig::default(), Arc::clone(&STORAGE_RUNTIME));
+
+        // Get all prefix paths and convert them into futures which yield ListingTableUrl
+        let handles = query.get_prefixes().into_iter().map(|prefix| async move {
+            let t: Result<Option<ListingTableUrl>, ObjectStorageError> = {
+                if self.prefix_exists(&prefix).await? {
+                    let path = format!("s3://{}/{}", &S3_CONFIG.s3_bucket_name, prefix);
+                    Ok(Some(ListingTableUrl::parse(path)?))
+                } else {
+                    Ok(None)
+                }
+            };
+            t
+        });
+
+        // Poll the futures, but limit concurrency to MAX_CONCURRENT_PREFIX_CHECK
+        let list: Vec<_> = futures::stream::iter(handles)
+            .buffer_unordered(MAX_CONCURRENT_PREFIX_CHECK)
+            .collect()
+            .await;
+
+        // Collect all available prefixes
+        let prefixes: Vec<_> = list
+            .into_iter()
+            .filter_map(|fetch| {
+                fetch
+                    .map_err(|err| {
+                        log::error!("prefix lookup failed due to {}", err);
+                        err
+                    })
+                    .ok()
+            })
+            .flatten()
+            .collect();
+
+        if prefixes.is_empty() {
+            return Ok(());
+        }
 
-            let config = ListingTableConfig::new(s3_file_system.clone(), &path)
-                .infer()
-                .await?;
+        let file_format = ParquetFormat::default().with_enable_pruning(true);
+        let listing_options = ListingOptions {
+            file_extension: format!("{}.data.parquet", hostname_unchecked()),
+            format: Arc::new(file_format),
+            table_partition_cols: vec![],
+            collect_stat: true,
+            target_partitions: 1,
+        };
+
+        let config = ListingTableConfig::new_with_multi_paths(prefixes)
+            .with_listing_options(listing_options)
+            .infer(&ctx.state())
+            .await?;
 
-            let table = ListingTable::try_new(config)?;
-            ctx.register_table(query.stream_name.as_str(), Arc::new(table))?;
+        let table = ListingTable::try_new(config)?;
+        ctx.register_table(query.stream_name.as_str(), Arc::new(table))?;
 
-            // execute the query and collect results
-            let df = ctx.sql(query.query.as_str()).await?;
-            results.extend(df.collect().await?);
-        }
+        // execute the query and collect results
+        let df = ctx.sql(&query.query).await?;
+        results.extend(df.collect().await?);
 
         Ok(())
     }
@@ -414,7 +481,7 @@ impl ObjectStorage for S3 {
 
 impl From<AwsSdkError> for ObjectStorageError {
     fn from(error: AwsSdkError) -> Self {
-        ObjectStorageError::UnhandledError(error.into())
+        ObjectStorageError::UnhandledError(Box::new(error))
     }
 }
 
@@ -429,15 +496,15 @@ impl From<SdkError<HeadBucketError>> for ObjectStorageError {
                     },
                 ..
             } => ObjectStorageError::NoSuchBucket(S3_CONFIG.bucket_name().to_string()),
-            SdkError::DispatchFailure(err) => ObjectStorageError::ConnectionError(err.into()),
+            SdkError::DispatchFailure(err) => ObjectStorageError::ConnectionError(Box::new(err)),
             SdkError::TimeoutError(err) => ObjectStorageError::ConnectionError(err),
-            err => ObjectStorageError::UnhandledError(err.into()),
+            err => ObjectStorageError::UnhandledError(Box::new(err)),
         }
     }
 }
 
 impl From<serde_json::Error> for ObjectStorageError {
     fn from(error: serde_json::Error) -> Self {
-        ObjectStorageError::UnhandledError(error.into())
+        ObjectStorageError::UnhandledError(Box::new(error))
     }
 }
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 07971526c..e1998412c 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -209,13 +209,13 @@ pub enum ObjectStorageError {
     #[error("Bucket {0} not found")]
     NoSuchBucket(String),
     #[error("Connection Error: {0}")]
-    ConnectionError(Box<dyn std::error::Error>),
+    ConnectionError(Box<dyn std::error::Error + Send + Sync + 'static>),
     #[error("IO Error: {0}")]
     IoError(#[from] std::io::Error),
     #[error("DataFusion Error: {0}")]
    DataFusionError(#[from] datafusion::error::DataFusionError),
     #[error("Unhandled Error: {0}")]
-    UnhandledError(Box<dyn std::error::Error>),
+    UnhandledError(Box<dyn std::error::Error + Send + Sync + 'static>),
 }
 
 impl From<ObjectStorageError> for crate::error::Error {
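
For reviewers who want to exercise the new wiring outside the server, the sketch below is a minimal, self-contained version of the object_store registration this patch adopts (datafusion 11, object_store 0.4). It is an illustration, not code from this patch: the function name is hypothetical, and the endpoint, bucket, and credentials are placeholders taken from the MinIO defaults introduced above.

// Minimal sketch of the object_store + DataFusion 11 wiring used in this patch.
// build_query_context is a hypothetical helper; endpoint, bucket, and
// credentials are placeholder values (local MinIO defaults).
use std::sync::Arc;

use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};
use object_store::aws::AmazonS3Builder;
use object_store::limit::LimitStore;

fn build_query_context() -> Result<SessionContext, Box<dyn std::error::Error>> {
    // Build an S3-compatible store; with_allow_http permits plain-HTTP
    // endpoints such as a local MinIO instance.
    let s3 = AmazonS3Builder::new()
        .with_region("us-east-1")
        .with_endpoint("http://localhost:9000")
        .with_bucket_name("parseable")
        .with_access_key_id("minioadmin")
        .with_secret_access_key("minioadmin")
        .with_allow_http(true)
        .build()?;

    // Cap in-flight requests against the store, as MAX_OBJECT_STORE_REQUESTS
    // does in the patch.
    let s3 = LimitStore::new(s3, 1000);

    // Register the store under the "s3" scheme with the bucket as host, so
    // "s3://parseable/<prefix>" ListingTableUrls resolve through it.
    let registry = ObjectStoreRegistry::new();
    registry.register_store("s3", "parseable", Arc::new(s3));

    let config = RuntimeConfig::new().with_object_store_registry(Arc::new(registry));
    let runtime = Arc::new(RuntimeEnv::new(config)?);

    // Any session built on this runtime can query parquet under the bucket.
    Ok(SessionContext::with_config_rt(SessionConfig::default(), runtime))
}

The LimitStore wrapper matters because a single query fans out to one listing per prefix; without a cap, a wide time range could open thousands of simultaneous connections to the object store.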