178 changes: 14 additions & 164 deletions server/src/query.rs
@@ -16,28 +16,15 @@
*
*/

use arrow_schema::SchemaRef;
use async_trait::async_trait;
mod table_provider;

use chrono::TimeZone;
use chrono::{DateTime, Utc};
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::ipc::reader::StreamReader;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::listing::ListingTable;
use datafusion::datasource::listing::ListingTableConfig;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::TableType;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::datasource::TableProvider;
use datafusion::prelude::*;
use serde_json::Value;
use std::any::Any;
use std::fs::File;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
@@ -50,6 +37,7 @@ use crate::utils::TimePeriod;
use crate::validator;

use self::error::{ExecuteError, ParseError};
use table_provider::QueryTableProvider;

type Key = &'static str;
fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
@@ -110,41 +98,24 @@ impl Query {

let parquet_files: Vec<PathBuf> = possible_parquet_files.chain(parquet_files).collect();

let mut results = vec![];

if !(arrow_files.is_empty() && parquet_files.is_empty()) {
self.execute_on_cache(
arrow_files,
parquet_files,
self.schema.clone(),
&mut results,
)
.await?;
}

storage.query(self, &mut results).await?;
Ok(results)
}

async fn execute_on_cache(
&self,
arrow_files: Vec<PathBuf>,
parquet_files: Vec<PathBuf>,
schema: Arc<Schema>,
results: &mut Vec<RecordBatch>,
) -> Result<(), ExecuteError> {
let ctx = SessionContext::new();
let table = Arc::new(QueryTableProvider::new(arrow_files, parquet_files, schema));
let ctx =
SessionContext::with_config_rt(SessionConfig::default(), storage.query_runtime_env());
let table = Arc::new(QueryTableProvider::new(
arrow_files,
parquet_files,
storage.query_table(self)?,
Arc::clone(&self.schema),
));
ctx.register_table(
&*self.stream_name,
Arc::clone(&table) as Arc<dyn TableProvider>,
)
.map_err(ObjectStorageError::DataFusionError)?;
// execute the query and collect results
let df = ctx.sql(self.query.as_str()).await?;
results.extend(df.collect().await?);
let results = df.collect().await?;
table.remove_preserve();
Ok(())
Ok(results)
}
}

@@ -166,127 +137,6 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
.expect("valid prefix is parsed")
}

#[derive(Debug)]
struct QueryTableProvider {
arrow_files: Vec<PathBuf>,
parquet_files: Vec<PathBuf>,
schema: Arc<Schema>,
}

impl QueryTableProvider {
fn new(arrow_files: Vec<PathBuf>, parquet_files: Vec<PathBuf>, schema: Arc<Schema>) -> Self {
// By the time this query executes, the arrow files could have been
// converted to parquet. Preserve those parquet files in the cache so
// they are not cleaned up while the query is still running.

let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
for file in &parquet_files {
parquet_cached.upsert(file)
}

Self {
arrow_files,
parquet_files,
schema,
}
}

pub fn remove_preserve(&self) {
let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
for file in &self.parquet_files {
parquet_cached.remove(file)
}
}

pub async fn create_physical_plan(
&self,
ctx: &SessionState,
projection: &Option<Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let mut mem_records: Vec<Vec<RecordBatch>> = Vec::new();
let mut parquet_files = self.parquet_files.clone();
for file in &self.arrow_files {
let Ok(arrow_file) = File::open(file) else { continue; };
let reader = StreamReader::try_new(arrow_file, None)?;
let records = reader
.filter_map(|record| match record {
Ok(record) => Some(record),
Err(e) => {
log::warn!("warning from arrow stream {:?}", e);
None
}
})
.collect();
mem_records.push(records);

let mut file = file.clone();
file.set_extension("parquet");

parquet_files.retain(|p| p != &file)
}

let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
let memexec = memtable.scan(ctx, projection, filters, limit).await?;

if parquet_files.is_empty() {
Ok(memexec)
} else {
let listing_options = ListingOptions {
file_extension: ".parquet".to_owned(),
format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
table_partition_cols: vec![],
collect_stat: true,
target_partitions: 1,
};

let paths = parquet_files
.clone()
.into_iter()
.map(|path| {
ListingTableUrl::parse(path.to_str().expect("path is valid unicode"))
.expect("path is valid for filesystem listing")
})
.collect();

let config = ListingTableConfig::new_with_multi_paths(paths)
.with_listing_options(listing_options)
.with_schema(Arc::clone(&self.schema));

let listtable = ListingTable::try_new(config).unwrap();
let listexec = listtable.scan(ctx, projection, filters, limit).await?;

Ok(Arc::new(UnionExec::new(vec![memexec, listexec])))
}
}
}

#[async_trait]
impl TableProvider for QueryTableProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
ctx: &SessionState,
projection: &Option<Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
self.create_physical_plan(ctx, projection, filters, limit)
.await
}
}

pub mod error {
use datafusion::error::DataFusionError;

179 changes: 179 additions & 0 deletions server/src/query/table_provider.rs
@@ -0,0 +1,179 @@
/*
* Parseable Server (C) 2022 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use arrow_schema::Schema;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::arrow::ipc::reader::StreamReader;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::TableType;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use std::any::Any;
use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;

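/// A custom TableProvider that serves a single stream from three
/// sources: in-memory record batches decoded from local arrow files,
/// local parquet files that are not yet synced, and the listing table
/// backed by object storage.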
pub struct QueryTableProvider {
arrow_files: Vec<PathBuf>,
parquet_files: Vec<PathBuf>,
storage: ListingTable,
schema: Arc<Schema>,
}

impl QueryTableProvider {
pub fn new(
arrow_files: Vec<PathBuf>,
parquet_files: Vec<PathBuf>,
storage: ListingTable,
schema: Arc<Schema>,
) -> Self {
// By the time this query executes, the arrow files could have been
// converted to parquet. Preserve those parquet files in the cache so
// they are not cleaned up while the query is still running.

let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
for file in &parquet_files {
parquet_cached.upsert(file)
}

Self {
arrow_files,
parquet_files,
storage,
schema,
}
}

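/// Release the cache entries preserved in `new`, so the cached
/// parquet files can be cleaned up once the query has finished.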
pub fn remove_preserve(&self) {
let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
for file in &self.parquet_files {
parquet_cached.remove(file)
}
}

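/// Build the physical plan: scan the in-memory arrow records and any
/// remaining local parquet files, then union that with a scan of the
/// storage listing table.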
async fn create_physical_plan(
&self,
ctx: &SessionState,
projection: &Option<Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let mut mem_records: Vec<Vec<RecordBatch>> = Vec::new();
let mut parquet_files = self.parquet_files.clone();
for file in &self.arrow_files {
load_arrows(file, &mut mem_records, &mut parquet_files);
}

let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
let memexec = memtable.scan(ctx, projection, filters, limit).await?;

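// Scan local parquet files only if some remain after deduplicating
// against the arrow files loaded above.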
let cache_exec = if parquet_files.is_empty() {
memexec
} else {
let listtable = local_parquet_table(&parquet_files, &self.schema)?;
let listexec = listtable.scan(ctx, projection, filters, limit).await?;
Arc::new(UnionExec::new(vec![memexec, listexec]))
};
let storage_exec = self.storage.scan(ctx, projection, filters, limit).await?;

Ok(Arc::new(UnionExec::new(vec![cache_exec, storage_exec])))
}
}

#[async_trait]
impl TableProvider for QueryTableProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
ctx: &SessionState,
projection: &Option<Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
self.create_physical_plan(ctx, projection, filters, limit)
.await
}
}

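/// Build a ListingTable over the given local parquet files, with
/// statistics collection and parquet pruning enabled.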
fn local_parquet_table(
parquet_files: &[PathBuf],
schema: &SchemaRef,
) -> Result<ListingTable, DataFusionError> {
let listing_options = ListingOptions {
file_extension: ".parquet".to_owned(),
format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
table_partition_cols: vec![],
collect_stat: true,
target_partitions: 1,
};

let paths = parquet_files
.iter()
.map(|path| {
ListingTableUrl::parse(path.to_str().expect("path is valid unicode"))
.expect("path is valid for filesystem listing")
})
.collect();

let config = ListingTableConfig::new_with_multi_paths(paths)
.with_listing_options(listing_options)
.with_schema(Arc::clone(schema));

ListingTable::try_new(config)
}

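/// Decode record batches from a local arrow file into `mem_records`.
/// The matching parquet file (same path, `.parquet` extension) is
/// dropped from `parquet_files` so the same data is not scanned twice.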
fn load_arrows(
file: &PathBuf,
mem_records: &mut Vec<Vec<RecordBatch>>,
parquet_files: &mut Vec<PathBuf>,
) {
let Ok(arrow_file) = File::open(file) else { return; };
let Ok(reader) = StreamReader::try_new(arrow_file, None) else { return; };
let records = reader
.filter_map(|record| match record {
Ok(record) => Some(record),
Err(e) => {
log::warn!("warning from arrow stream {:?}", e);
None
}
})
.collect();
mem_records.push(records);
let mut file = file.clone();
file.set_extension("parquet");
parquet_files.retain(|p| p != &file);
}