diff --git a/server/src/query.rs b/server/src/query.rs
index a79941174..2a2a235d8 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -16,28 +16,15 @@
  *
  */
 
-use arrow_schema::SchemaRef;
-use async_trait::async_trait;
+mod table_provider;
+
 use chrono::TimeZone;
 use chrono::{DateTime, Utc};
 use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::ipc::reader::StreamReader;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::listing::ListingOptions;
-use datafusion::datasource::listing::ListingTable;
-use datafusion::datasource::listing::ListingTableConfig;
-use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::datasource::{MemTable, TableProvider};
-use datafusion::error::DataFusionError;
-use datafusion::execution::context::SessionState;
-use datafusion::logical_expr::TableType;
-use datafusion::physical_plan::union::UnionExec;
-use datafusion::physical_plan::ExecutionPlan;
+use datafusion::datasource::TableProvider;
 use datafusion::prelude::*;
 use serde_json::Value;
-use std::any::Any;
-use std::fs::File;
 use std::path::Path;
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -50,6 +37,7 @@ use crate::utils::TimePeriod;
 use crate::validator;
 
 use self::error::{ExecuteError, ParseError};
+use table_provider::QueryTableProvider;
 
 type Key = &'static str;
 fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
@@ -110,31 +98,14 @@ impl Query {
 
         let parquet_files: Vec<PathBuf> = possible_parquet_files.chain(parquet_files).collect();
 
-        let mut results = vec![];
-
-        if !(arrow_files.is_empty() && parquet_files.is_empty()) {
-            self.execute_on_cache(
-                arrow_files,
-                parquet_files,
-                self.schema.clone(),
-                &mut results,
-            )
-            .await?;
-        }
-
-        storage.query(self, &mut results).await?;
-        Ok(results)
-    }
-
-    async fn execute_on_cache(
-        &self,
-        arrow_files: Vec<PathBuf>,
-        parquet_files: Vec<PathBuf>,
-        schema: Arc<Schema>,
-        results: &mut Vec<RecordBatch>,
-    ) -> Result<(), ExecuteError> {
-        let ctx = SessionContext::new();
-        let table = Arc::new(QueryTableProvider::new(arrow_files, parquet_files, schema));
+        let ctx =
+            SessionContext::with_config_rt(SessionConfig::default(), storage.query_runtime_env());
+        let table = Arc::new(QueryTableProvider::new(
+            arrow_files,
+            parquet_files,
+            storage.query_table(self)?,
+            Arc::clone(&self.schema),
+        ));
         ctx.register_table(
             &*self.stream_name,
             Arc::clone(&table) as Arc<dyn TableProvider>,
@@ -142,9 +113,9 @@ impl Query {
         )
         .map_err(ObjectStorageError::DataFusionError)?;
         // execute the query and collect results
         let df = ctx.sql(self.query.as_str()).await?;
-        results.extend(df.collect().await?);
+        let results = df.collect().await?;
         table.remove_preserve();
-        Ok(())
+        Ok(results)
     }
 }
 
@@ -166,127 +137,6 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
         .expect("valid prefix is parsed")
 }
 
-#[derive(Debug)]
-struct QueryTableProvider {
-    arrow_files: Vec<PathBuf>,
-    parquet_files: Vec<PathBuf>,
-    schema: Arc<Schema>,
-}
-
-impl QueryTableProvider {
-    fn new(arrow_files: Vec<PathBuf>, parquet_files: Vec<PathBuf>, schema: Arc<Schema>) -> Self {
-        // By the time this query executes the arrow files could be converted to parquet files
-        // we want to preserve these files as well in case
-
-        let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
-        for file in &parquet_files {
-            parquet_cached.upsert(file)
-        }
-
-        Self {
-            arrow_files,
-            parquet_files,
-            schema,
-        }
-    }
-
-    pub fn remove_preserve(&self) {
-        let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
-        for file in &self.parquet_files {
-            parquet_cached.remove(file)
-        }
-    }
-
-    pub async fn create_physical_plan(
-        &self,
-        ctx: &SessionState,
-        projection: &Option<Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
-    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
-        let mut mem_records: Vec<Vec<RecordBatch>> = Vec::new();
-        let mut parquet_files = self.parquet_files.clone();
-        for file in &self.arrow_files {
-            let Ok(arrow_file) = File::open(file) else { continue; };
-            let reader = StreamReader::try_new(arrow_file, None)?;
-            let records = reader
-                .filter_map(|record| match record {
-                    Ok(record) => Some(record),
-                    Err(e) => {
-                        log::warn!("warning from arrow stream {:?}", e);
-                        None
-                    }
-                })
-                .collect();
-            mem_records.push(records);
-
-            let mut file = file.clone();
-            file.set_extension("parquet");
-
-            parquet_files.retain(|p| p != &file)
-        }
-
-        let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
-        let memexec = memtable.scan(ctx, projection, filters, limit).await?;
-
-        if parquet_files.is_empty() {
-            Ok(memexec)
-        } else {
-            let listing_options = ListingOptions {
-                file_extension: ".parquet".to_owned(),
-                format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
-                table_partition_cols: vec![],
-                collect_stat: true,
-                target_partitions: 1,
-            };
-
-            let paths = parquet_files
-                .clone()
-                .into_iter()
-                .map(|path| {
-                    ListingTableUrl::parse(path.to_str().expect("path should is valid unicode"))
-                        .expect("path is valid for filesystem listing")
-                })
-                .collect();
-
-            let config = ListingTableConfig::new_with_multi_paths(paths)
-                .with_listing_options(listing_options)
-                .with_schema(Arc::clone(&self.schema));
-
-            let listtable = ListingTable::try_new(config).unwrap();
-            let listexec = listtable.scan(ctx, projection, filters, limit).await?;
-
-            Ok(Arc::new(UnionExec::new(vec![memexec, listexec])))
-        }
-    }
-}
-
-#[async_trait]
-impl TableProvider for QueryTableProvider {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-
-    fn table_type(&self) -> TableType {
-        TableType::Base
-    }
-
-    async fn scan(
-        &self,
-        ctx: &SessionState,
-        projection: &Option<Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
-    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
-        self.create_physical_plan(ctx, projection, filters, limit)
-            .await
-    }
-}
-
 pub mod error {
     use datafusion::error::DataFusionError;
 
diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs
new file mode 100644
index 000000000..44e94acde
--- /dev/null
+++ b/server/src/query/table_provider.rs
@@ -0,0 +1,179 @@
+/*
+ * Parseable Server (C) 2022 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use arrow_schema::Schema;
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion::arrow::ipc::reader::StreamReader;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::{MemTable, TableProvider};
+use datafusion::error::DataFusionError;
+use datafusion::execution::context::SessionState;
+use datafusion::logical_expr::TableType;
+use datafusion::physical_plan::union::UnionExec;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::Expr;
+use std::any::Any;
+use std::fs::File;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+pub struct QueryTableProvider {
+    arrow_files: Vec<PathBuf>,
+    parquet_files: Vec<PathBuf>,
+    storage: ListingTable,
+    schema: Arc<Schema>,
+}
+
+impl QueryTableProvider {
+    pub fn new(
+        arrow_files: Vec<PathBuf>,
+        parquet_files: Vec<PathBuf>,
+        storage: ListingTable,
+        schema: Arc<Schema>,
+    ) -> Self {
+        // By the time this query executes the arrow files could be converted to parquet files
+        // we want to preserve these files as well in case
+
+        let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
+        for file in &parquet_files {
+            parquet_cached.upsert(file)
+        }
+
+        Self {
+            arrow_files,
+            parquet_files,
+            storage,
+            schema,
+        }
+    }
+
+    pub fn remove_preserve(&self) {
+        let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
+        for file in &self.parquet_files {
+            parquet_cached.remove(file)
+        }
+    }
+
+    async fn create_physical_plan(
+        &self,
+        ctx: &SessionState,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        let mut mem_records: Vec<Vec<RecordBatch>> = Vec::new();
+        let mut parquet_files = self.parquet_files.clone();
+        for file in &self.arrow_files {
+            load_arrows(file, &mut mem_records, &mut parquet_files);
+        }
+
+        let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
+        let memexec = memtable.scan(ctx, projection, filters, limit).await?;
+
+        let cache_exec = if parquet_files.is_empty() {
+            memexec
+        } else {
+            let listtable = local_parquet_table(&parquet_files, &self.schema)?;
+            let listexec = listtable.scan(ctx, projection, filters, limit).await?;
+            Arc::new(UnionExec::new(vec![memexec, listexec]))
+        };
+
+        let storage_exec = self.storage.scan(ctx, projection, filters, limit).await?;
+
+        Ok(Arc::new(UnionExec::new(vec![cache_exec, storage_exec])))
+    }
+}
+
+#[async_trait]
+impl TableProvider for QueryTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        ctx: &SessionState,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+        self.create_physical_plan(ctx, projection, filters, limit)
+            .await
+    }
+}
+
+fn local_parquet_table(
+    parquet_files: &[PathBuf],
+    schema: &SchemaRef,
+) -> Result<ListingTable, DataFusionError> {
+    let listing_options = ListingOptions {
+        file_extension: ".parquet".to_owned(),
+        format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
+        table_partition_cols: vec![],
+        collect_stat: true,
+        target_partitions: 1,
+    };
+
+    let paths = parquet_files
+        .iter()
+        .map(|path| {
+            ListingTableUrl::parse(path.to_str().expect("path should is valid unicode"))
+                .expect("path is valid for filesystem listing")
+        })
+        .collect();
+
+    let config = ListingTableConfig::new_with_multi_paths(paths)
+        .with_listing_options(listing_options)
+        .with_schema(Arc::clone(schema));
+
+    ListingTable::try_new(config)
+}
+
+fn load_arrows(
+    file: &PathBuf,
+    mem_records: &mut Vec<Vec<RecordBatch>>,
+    parquet_files: &mut Vec<PathBuf>,
+) {
+    let Ok(arrow_file) = File::open(file) else { return; };
+    let Ok(reader) = StreamReader::try_new(arrow_file, None) else { return; };
+    let records = reader
+        .filter_map(|record| match record {
+            Ok(record) => Some(record),
+            Err(e) => {
+                log::warn!("warning from arrow stream {:?}", e);
+                None
+            }
+        })
+        .collect();
+    mem_records.push(records);
+    let mut file = file.clone();
+    file.set_extension("parquet");
+    parquet_files.retain(|p| p != &file);
+}
diff --git a/server/src/s3.rs b/server/src/s3.rs
index 1c2fbc5d7..ca732d318 100644
--- a/server/src/s3.rs
+++ b/server/src/s3.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use async_trait::async_trait;
 use aws_sdk_s3::error::{HeadBucketError, HeadBucketErrorKind};
 use aws_sdk_s3::model::{CommonPrefix, Delete, ObjectIdentifier};
@@ -9,14 +27,12 @@ use aws_smithy_async::rt::sleep::default_async_sleep;
 use bytes::Bytes;
 use clap::builder::ArgPredicate;
 use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
 use datafusion::datasource::object_store::ObjectStoreRegistry;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::prelude::{SessionConfig, SessionContext};
 use futures::StreamExt;
 use http::Uri;
 use object_store::aws::AmazonS3Builder;
@@ -45,14 +61,14 @@ const DEFAULT_S3_SECRET_KEY: &str = "minioadmin";
 // max concurrent request allowed for datafusion object store
 const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
 
-#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct ObjectStoreFormat {
     #[serde(rename = "objectstore-format")]
     pub version: String,
 }
 
-impl ObjectStoreFormat {
-    pub fn new() -> Self {
+impl Default for ObjectStoreFormat {
+    fn default() -> Self {
         Self {
             version: "v1".to_string(),
         }
@@ -398,7 +414,7 @@ impl ObjectStorage for S3 {
     }
 
     async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
-        let format = ObjectStoreFormat::new();
+        let format = ObjectStoreFormat::default();
         let body = serde_json::to_vec(&format)?;
 
         self._create_stream(stream_name, body).await?;
@@ -422,6 +438,19 @@ impl ObjectStorage for S3 {
         Ok(())
     }
 
+    async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> {
+        let stats = serde_json::to_value(stats).expect("stats are perfectly serializable");
+        let parseable_metadata = self._get_parseable_config(stream_name).await?;
+        let mut parseable_metadata: Value =
+            serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
+
+        parseable_metadata["stats"] = stats;
+
+        self._put_parseable_config(stream_name, parseable_metadata.to_string().into_bytes())
+            .await?;
+        Ok(())
+    }
+
     async fn get_schema(&self, stream_name: &str) -> Result<Option<Schema>, ObjectStorageError> {
         let body_bytes = self._get_schema(stream_name).await?;
         let schema = serde_json::from_slice(&body_bytes).ok();
@@ -452,19 +481,6 @@ impl ObjectStorage for S3 {
         Ok(stats)
     }
 
-    async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> {
-        let stats = serde_json::to_value(stats).expect("stats are perfectly serializable");
-        let parseable_metadata = self._get_parseable_config(stream_name).await?;
-        let mut parseable_metadata: Value =
-            serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
-
-        parseable_metadata["stats"] = stats;
-
-        self._put_parseable_config(stream_name, parseable_metadata.to_string().into_bytes())
-            .await?;
-        Ok(())
-    }
-
     async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
         let streams = self._list_streams().await?;
 
@@ -477,14 +493,7 @@ impl ObjectStorage for S3 {
         Ok(())
     }
 
-    async fn query(
-        &self,
-        query: &Query,
-        results: &mut Vec<RecordBatch>,
-    ) -> Result<(), ObjectStorageError> {
-        let ctx =
-            SessionContext::with_config_rt(SessionConfig::default(), Arc::clone(&STORAGE_RUNTIME));
-
+    fn query_table(&self, query: &Query) -> Result<ListingTable, ObjectStorageError> {
         // Get all prefix paths and convert them into futures which yeilds ListingTableUrl
         let prefixes = query
             .get_prefixes()
@@ -508,14 +517,11 @@ impl ObjectStorage for S3 {
             .with_listing_options(listing_options)
             .with_schema(Arc::clone(&query.schema));
 
-        let table = ListingTable::try_new(config)?;
-        ctx.register_table(query.stream_name.as_str(), Arc::new(table))?;
-
-        // execute the query and collect results
-        let df = ctx.sql(&query.query).await?;
-        results.extend(df.collect().await?);
+        Ok(ListingTable::try_new(config)?)
+    }
 
-        Ok(())
+    fn query_runtime_env(&self) -> Arc<RuntimeEnv> {
+        Arc::clone(&STORAGE_RUNTIME)
     }
 }
 
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 22f54244d..4b8b2e280 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -29,7 +29,8 @@ use chrono::{NaiveDateTime, Timelike, Utc};
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::ipc::reader::StreamReader;
-use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::datasource::listing::ListingTable;
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::parquet::errors::ParquetError;
 use datafusion::parquet::file::properties::WriterProperties;
@@ -41,7 +42,7 @@ use std::fmt::Debug;
 use std::fs::{self, File};
 use std::iter::Iterator;
 use std::path::{Path, PathBuf};
-use std::sync::Mutex;
+use std::sync::{Arc, Mutex};
 
 use self::file_link::CacheState;
 
@@ -91,11 +92,8 @@ pub trait ObjectStorage: Sync + 'static {
     async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError>;
     async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
     async fn upload_file(&self, key: &str, path: &str) -> Result<(), ObjectStorageError>;
-    async fn query(
-        &self,
-        query: &Query,
-        results: &mut Vec<RecordBatch>,
-    ) -> Result<(), ObjectStorageError>;
+    fn query_table(&self, query: &Query) -> Result<ListingTable, ObjectStorageError>;
+    fn query_runtime_env(&self) -> Arc<RuntimeEnv>;
 
     async fn s3_sync(&self) -> Result<(), MoveDataError> {
         if !Path::new(&CONFIG.parseable.local_disk_path).exists() {
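
Note on the new trait surface: `ObjectStorage::query` is replaced by the synchronous `query_table`/`query_runtime_env` pair, and `Query::execute` now builds the `SessionContext` and unions cached data with the storage-backed `ListingTable` itself. Below is a minimal sketch (not part of this diff) of how another backend could satisfy the two new methods; the `LocalFs` type, `DATA_DIR` constant, and `LOCAL_RUNTIME` static are hypothetical stand-ins, and the listing options simply mirror the S3 implementation above.

```rust
// A minimal sketch, not part of this diff: how another ObjectStorage backend
// might satisfy the new `query_table` / `query_runtime_env` trait methods.
// `LocalFs`, `DATA_DIR`, and `LOCAL_RUNTIME` are hypothetical names; `Query`
// and `ObjectStorageError` come from the server crate as in s3.rs above.
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::execution::runtime_env::RuntimeEnv;

impl ObjectStorage for LocalFs {
    // ...remaining trait methods elided...

    fn query_table(&self, query: &Query) -> Result<ListingTable, ObjectStorageError> {
        // One listing URL per time-based prefix, rooted at a local directory
        // instead of an s3:// bucket.
        let prefixes = query
            .get_prefixes()
            .into_iter()
            .map(|prefix| {
                let path = format!("file://{}/{}", DATA_DIR, prefix);
                ListingTableUrl::parse(path).expect("prefix is a valid listing url")
            })
            .collect();

        // Same parquet listing options the S3 implementation uses.
        let listing_options = ListingOptions {
            file_extension: ".parquet".to_owned(),
            format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
            table_partition_cols: vec![],
            collect_stat: true,
            target_partitions: 1,
        };

        let config = ListingTableConfig::new_with_multi_paths(prefixes)
            .with_listing_options(listing_options)
            .with_schema(Arc::clone(&query.schema));

        Ok(ListingTable::try_new(config)?)
    }

    fn query_runtime_env(&self) -> Arc<RuntimeEnv> {
        // Plain filesystem access needs no custom object store registry,
        // so a shared default runtime environment is enough here.
        Arc::clone(&LOCAL_RUNTIME)
    }
}
```

Because `query_table` no longer executes SQL itself, a backend only has to describe where its parquet data lives; plan construction and execution stay in `QueryTableProvider` and `Query::execute`.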