diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index 2b44fc542..c7aa8977d 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -18,21 +18,52 @@
  */
 
 mod file_writer;
+mod mem_writer;
 
 use std::{
     collections::HashMap,
-    sync::{Mutex, RwLock},
+    sync::{Arc, Mutex, RwLock},
 };
 
-use self::{errors::StreamWriterError, file_writer::FileWriter};
-use arrow_array::RecordBatch;
+use crate::utils;
+
+use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
+use arrow_array::{RecordBatch, TimestampMillisecondArray};
+use arrow_schema::Schema;
+use chrono::Utc;
 use derive_more::{Deref, DerefMut};
 use once_cell::sync::Lazy;
 
 pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);
 
+#[derive(Default)]
+pub struct Writer {
+    pub mem: MemWriter<16384>,
+    pub disk: FileWriter,
+}
+
+impl Writer {
+    fn push(
+        &mut self,
+        stream_name: &str,
+        schema_key: &str,
+        rb: RecordBatch,
+    ) -> Result<(), StreamWriterError> {
+        let rb = utils::arrow::replace_columns(
+            rb.schema(),
+            &rb,
+            &[0],
+            &[Arc::new(get_timestamp_array(rb.num_rows()))],
+        );
+
+        self.disk.push(stream_name, schema_key, &rb)?;
+        self.mem.push(schema_key, rb);
+        Ok(())
+    }
+}
+
 #[derive(Deref, DerefMut, Default)]
-pub struct WriterTable(RwLock<HashMap<String, Mutex<FileWriter>>>);
+pub struct WriterTable(RwLock<HashMap<String, Mutex<Writer>>>);
 
 impl WriterTable {
     // append to a existing stream
@@ -49,7 +80,7 @@ impl WriterTable {
                 stream_writer
                     .lock()
                     .unwrap()
-                    .push(stream_name, schema_key, &record)?;
+                    .push(stream_name, schema_key, record)?;
             }
             None => {
                 drop(hashmap_guard);
@@ -60,10 +91,10 @@ impl WriterTable {
                     writer
                         .lock()
                         .unwrap()
-                        .push(stream_name, schema_key, &record)?;
+                        .push(stream_name, schema_key, record)?;
                 } else {
-                    let mut writer = FileWriter::default();
-                    writer.push(stream_name, schema_key, &record)?;
+                    let mut writer = Writer::default();
+                    writer.push(stream_name, schema_key, record)?;
                     map.insert(stream_name.to_owned(), Mutex::new(writer));
                 }
             }
@@ -81,9 +112,31 @@ impl WriterTable {
         drop(table);
         for writer in map.into_values() {
             let writer = writer.into_inner().unwrap();
-            writer.close_all();
+            writer.disk.close_all();
         }
     }
+
+    pub fn recordbatches_cloned(
+        &self,
+        stream_name: &str,
+        schema: &Arc<Schema>,
+    ) -> Option<Vec<RecordBatch>> {
+        let records = self
+            .0
+            .read()
+            .unwrap()
+            .get(stream_name)?
+            .lock()
+            .unwrap()
+            .mem
+            .recordbatch_cloned(schema);
+
+        Some(records)
+    }
+}
+
+fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
+    TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size)
 }
 
 pub mod errors {
diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs
index 8c40e435b..a8c1d8918 100644
--- a/server/src/event/writer/file_writer.rs
+++ b/server/src/event/writer/file_writer.rs
@@ -17,17 +17,15 @@
  *
  */
 
-use arrow_array::{RecordBatch, TimestampMillisecondArray};
-use arrow_ipc::writer::StreamWriter;
-use chrono::Utc;
-use derive_more::{Deref, DerefMut};
 use std::collections::HashMap;
 use std::fs::{File, OpenOptions};
 use std::path::PathBuf;
-use std::sync::Arc;
+
+use arrow_array::RecordBatch;
+use arrow_ipc::writer::StreamWriter;
+use derive_more::{Deref, DerefMut};
 
 use crate::storage::staging::StorageDir;
-use crate::utils;
 
 use super::errors::StreamWriterError;
 
@@ -47,24 +45,17 @@ impl FileWriter {
         schema_key: &str,
         record: &RecordBatch,
     ) -> Result<(), StreamWriterError> {
-        let record = utils::arrow::replace_columns(
-            record.schema(),
-            record,
-            &[0],
-            &[Arc::new(get_timestamp_array(record.num_rows()))],
-        );
-
         match self.get_mut(schema_key) {
             Some(writer) => {
                 writer
                     .writer
-                    .write(&record)
+                    .write(record)
                     .map_err(StreamWriterError::Writer)?;
             }
             // entry is not present thus we create it
             None => {
                 // this requires mutable borrow of the map so we drop this read lock and wait for write lock
-                let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, &record)?;
+                let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
                 self.insert(
                     schema_key.to_owned(),
                     ArrowWriter {
@@ -85,10 +76,6 @@ impl FileWriter {
     }
 }
 
-fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
-    TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size)
-}
-
 fn init_new_stream_writer_file(
     stream_name: &str,
     schema_key: &str,
diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs
new file mode 100644
index 000000000..2716d48cd
--- /dev/null
+++ b/server/src/event/writer/mem_writer.rs
@@ -0,0 +1,120 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::{collections::HashSet, sync::Arc};
+
+use arrow_array::RecordBatch;
+use arrow_schema::Schema;
+use arrow_select::concat::concat_batches;
+use itertools::Itertools;
+
+use crate::utils::arrow::adapt_batch;
+
+/// Structure to keep recordbatches in memory.
+///
+/// Any new schema is updated in the schema map.
+/// Recordbatches are pushed to the mutable buffer first and then concatenated together and pushed to the read buffer
+#[derive(Debug)]
+pub struct MemWriter<const N: usize> {
+    schema: Schema,
+    // for checking uniqueness of schema
+    schema_map: HashSet<String>,
+    read_buffer: Vec<RecordBatch>,
+    mutable_buffer: MutableBuffer<N>,
+}
+
+impl<const N: usize> Default for MemWriter<N> {
+    fn default() -> Self {
+        Self {
+            schema: Schema::empty(),
+            schema_map: HashSet::default(),
+            read_buffer: Vec::default(),
+            mutable_buffer: MutableBuffer::default(),
+        }
+    }
+}
+
+impl<const N: usize> MemWriter<N> {
+    pub fn push(&mut self, schema_key: &str, rb: RecordBatch) {
+        if !self.schema_map.contains(schema_key) {
+            self.schema_map.insert(schema_key.to_owned());
+            self.schema = Schema::try_merge([self.schema.clone(), (*rb.schema()).clone()]).unwrap();
+        }
+
+        if let Some(record) = self.mutable_buffer.push(rb) {
+            let record = concat_records(&Arc::new(self.schema.clone()), &record);
+            self.read_buffer.push(record);
+        }
+    }
+
+    pub fn recordbatch_cloned(&self, schema: &Arc<Schema>) -> Vec<RecordBatch> {
+        let mut read_buffer = self.read_buffer.clone();
+        if self.mutable_buffer.rows > 0 {
+            let rb = concat_records(schema, &self.mutable_buffer.inner);
+            read_buffer.push(rb)
+        }
+
+        read_buffer
+            .into_iter()
+            .map(|rb| adapt_batch(schema, &rb))
+            .collect()
+    }
+}
+
+fn concat_records(schema: &Arc<Schema>, record: &[RecordBatch]) -> RecordBatch {
+    let records = record.iter().map(|x| adapt_batch(schema, x)).collect_vec();
+    let record = concat_batches(schema, records.iter()).unwrap();
+    record
+}
+
+#[derive(Debug, Default)]
+struct MutableBuffer<const N: usize> {
+    pub inner: Vec<RecordBatch>,
+    pub rows: usize,
+}
+
+impl<const N: usize> MutableBuffer<N> {
+    fn push(&mut self, rb: RecordBatch) -> Option<Vec<RecordBatch>> {
+        if self.rows + rb.num_rows() >= N {
+            let left = N - self.rows;
+            let right = rb.num_rows() - left;
+            let left_slice = rb.slice(0, left);
+            let right_slice = if left < rb.num_rows() {
+                Some(rb.slice(left, right))
+            } else {
+                None
+            };
+            self.inner.push(left_slice);
+            // take all records
+            let src = Vec::with_capacity(self.inner.len());
+            let inner = std::mem::replace(&mut self.inner, src);
+            self.rows = 0;
+
+            if let Some(right_slice) = right_slice {
+                self.rows = right_slice.num_rows();
+                self.inner.push(right_slice);
+            }
+
+            Some(inner)
+        } else {
+            self.rows += rb.num_rows();
+            self.inner.push(rb);
+            None
+        }
+    }
+}
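Review note (not part of the diff): the doc comment on MemWriter above describes the buffering contract — batches accumulate in the mutable buffer and are concatenated into the read buffer once N rows are reached. A unit test along the following lines could pin that behaviour down. It is only a sketch: the test module, the `batch` helper, and the test name are illustrative, assuming they sit at the bottom of mem_writer.rs.

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};

    use super::MemWriter;

    // Illustrative helper: a single-column batch with `rows` rows.
    fn batch(schema: &Arc<Schema>, rows: i32) -> RecordBatch {
        let col: ArrayRef = Arc::new(Int32Array::from_iter_values(0..rows));
        RecordBatch::try_new(schema.clone(), vec![col]).unwrap()
    }

    #[test]
    fn flushes_mutable_buffer_into_read_buffer_at_capacity() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        // small capacity so the N-row threshold is easy to cross
        let mut mem = MemWriter::<4>::default();

        // 3 rows stay in the mutable buffer (3 < 4)
        mem.push("schema_a", batch(&schema, 3));
        // crossing the threshold: 1 row completes the 4-row chunk that is
        // concatenated into the read buffer, and the remaining 2 rows start
        // a fresh mutable buffer
        mem.push("schema_a", batch(&schema, 3));

        // one concatenated read-buffer batch (4 rows) plus the pending
        // mutable-buffer rows (2 rows), both visible to readers
        let cloned = mem.recordbatch_cloned(&schema);
        assert_eq!(cloned.len(), 2);
        assert_eq!(cloned.iter().map(|rb| rb.num_rows()).sum::<usize>(), 6);
    }
}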
diff --git a/server/src/query.rs b/server/src/query.rs
index 551a7f3d6..8997ccdf7 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -17,6 +17,7 @@
  */
 
 mod filter_optimizer;
+mod table_provider;
 
 use chrono::TimeZone;
 use chrono::{DateTime, Utc};
@@ -30,19 +31,21 @@ use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::prelude::*;
 use itertools::Itertools;
 use serde_json::Value;
-use std::path::Path;
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use sysinfo::{System, SystemExt};
 
 use crate::event::DEFAULT_TIMESTAMP_KEY;
 use crate::option::CONFIG;
-use crate::storage::ObjectStorageError;
 use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
+use crate::storage::{ObjectStorageError, StorageDir};
 use crate::utils::TimePeriod;
 use crate::validator;
 
 use self::error::{ExecuteError, ParseError};
 use self::filter_optimizer::FilterOptimizerRule;
+use self::table_provider::QueryTableProvider;
 
 type Key = &'static str;
 fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
@@ -124,30 +127,13 @@ impl Query {
         storage: Arc<dyn ObjectStorage + Send>,
     ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
         let ctx = self.create_session_context();
-        let prefixes = storage.query_prefixes(self.get_prefixes());
-
-        if prefixes.is_empty() {
-            return Ok((Vec::new(), Vec::new()));
-        }
+        let remote_listing_table = self._remote_query(storage)?;
+        let memtable =
+            crate::event::STREAM_WRITERS.recordbatches_cloned(&self.stream_name, &self.schema);
+        let table =
+            QueryTableProvider::try_new(memtable, remote_listing_table, self.schema.clone())?;
 
-        let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
-        let listing_options = ListingOptions {
-            file_extension: ".parquet".to_string(),
-            file_sort_order: vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]],
-            infinite_source: false,
-            format: Arc::new(file_format),
-            table_partition_cols: vec![],
-            collect_stat: true,
-            target_partitions: 32,
-        };
-
-        let config = ListingTableConfig::new_with_multi_paths(prefixes)
-            .with_listing_options(listing_options)
-            .with_schema(self.schema.clone());
-
-        let table = Arc::new(ListingTable::try_new(config)?);
-
-        ctx.register_table(&*self.stream_name, table)
+        ctx.register_table(&*self.stream_name, Arc::new(table))
             .map_err(ObjectStorageError::DataFusionError)?;
         // execute the query and collect results
         let df = ctx.sql(self.query.as_str()).await?;
@@ -165,15 +151,52 @@ impl Query {
 
         Ok((results, fields))
     }
+
+    fn _remote_query(
+        &self,
+        storage: Arc<dyn ObjectStorage + Send>,
+    ) -> Result<Option<Arc<ListingTable>>, ExecuteError> {
+        let prefixes = storage.query_prefixes(self.get_prefixes());
+        if prefixes.is_empty() {
+            return Ok(None);
+        }
+        let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
+        let file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
+        let listing_options = ListingOptions {
+            file_extension: ".parquet".to_string(),
+            file_sort_order,
+            infinite_source: false,
+            format: Arc::new(file_format),
+            table_partition_cols: vec![],
+            collect_stat: true,
+            target_partitions: 32,
+        };
+        let config = ListingTableConfig::new_with_multi_paths(prefixes)
+            .with_listing_options(listing_options)
+            .with_schema(self.schema.clone());
+
+        let listing_table = Arc::new(ListingTable::try_new(config)?);
+        Ok(Some(listing_table))
+    }
+}
+
+#[allow(dead_code)]
+fn get_staging_prefixes(
+    stream_name: &str,
+    start: DateTime<Utc>,
+    end: DateTime<Utc>,
+) -> HashMap<PathBuf, Vec<PathBuf>> {
+    let dir = StorageDir::new(stream_name);
+    let mut files = dir.arrow_files_grouped_by_time();
+    files.retain(|k, _| path_intersects_query(k, start, end));
+    files
 }
 
-#[allow(unused)]
 fn path_intersects_query(path: &Path, starttime: DateTime<Utc>, endtime: DateTime<Utc>) -> bool {
     let time = time_from_path(path);
     starttime <= time && time <= endtime
 }
 
-#[allow(unused)]
 fn time_from_path(path: &Path) -> DateTime<Utc> {
     let prefix = path
         .file_name()
diff --git a/server/src/query/filter_optimizer.rs b/server/src/query/filter_optimizer.rs
index be055de00..4427445ce 100644
--- a/server/src/query/filter_optimizer.rs
+++ b/server/src/query/filter_optimizer.rs
@@ -32,6 +32,10 @@ pub struct FilterOptimizerRule {
     pub literals: Vec<String>,
 }
 
+// Try to add a filter node on top of the table scan.
+// Since every table supports projection push down,
+// we add the filtered column directly to the table's projection.
+// To preserve the original projection we must add a projection node with the original projection on top.
 impl OptimizerRule for FilterOptimizerRule {
     fn try_optimize(
         &self,
@@ -41,6 +45,12 @@ impl OptimizerRule for FilterOptimizerRule {
         // if there are no patterns then the rule cannot be performed
         let Some(filter_expr) = self.expr() else { return Ok(None); };
 
+        if let LogicalPlan::Filter(filter) = plan {
+            if filter.predicate == filter_expr {
+                return Ok(None);
+            }
+        }
+
         if let LogicalPlan::TableScan(table) = plan {
             if table.projection.is_none()
                 || table
@@ -53,7 +63,9 @@ impl OptimizerRule for FilterOptimizerRule {
 
             let mut table = table.clone();
             let schema = &table.source.schema();
+            let original_projection = table.projected_schema.clone();
 
+            // add filtered column projection to table
             if !table
                 .projected_schema
                 .has_column_with_unqualified_name(&self.column)
@@ -76,14 +88,13 @@ impl OptimizerRule for FilterOptimizerRule {
                 }
             }
 
-            let projected_schema = table.projected_schema.clone();
             let filter = LogicalPlan::Filter(Filter::try_new(
                 filter_expr,
                 Arc::new(LogicalPlan::TableScan(table)),
             )?);
             let plan = LogicalPlan::Projection(Projection::new_from_schema(
                 Arc::new(filter),
-                projected_schema,
+                original_projection,
             ));
 
             return Ok(Some(plan));
diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs
new file mode 100644
index 000000000..275cc9914
--- /dev/null
+++ b/server/src/query/table_provider.rs
@@ -0,0 +1,132 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+use async_trait::async_trait;
+use datafusion::arrow::datatypes::{Schema, SchemaRef};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::datasource::listing::ListingTable;
+
+use datafusion::datasource::{MemTable, TableProvider};
+use datafusion::error::DataFusionError;
+use datafusion::execution::context::SessionState;
+use datafusion::logical_expr::{TableProviderFilterPushDown, TableType};
+use datafusion::physical_plan::empty::EmptyExec;
+use datafusion::physical_plan::union::UnionExec;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::Expr;
+
+use std::any::Any;
+use std::sync::Arc;
+use std::vec;
+
+pub struct QueryTableProvider {
+    staging: Option<MemTable>,
+    // remote table
+    storage: Option<Arc<ListingTable>>,
+    schema: Arc<Schema>,
+}
+
+impl QueryTableProvider {
+    pub fn try_new(
+        staging: Option<Vec<RecordBatch>>,
+        storage: Option<Arc<ListingTable>>,
+        schema: Arc<Schema>,
+    ) -> Result<Self, DataFusionError> {
+        let memtable = staging
+            .map(|records| MemTable::try_new(schema.clone(), vec![records]))
+            .transpose()?;
+
+        Ok(Self {
+            staging: memtable,
+            storage,
+            schema,
+        })
+    }
+
+    async fn create_physical_plan(
+        &self,
+        ctx: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        let mut exec = vec![];
+
+        if let Some(table) = &self.staging {
+            exec.push(table.scan(ctx, projection, filters, limit).await?)
+        }
+
+        if let Some(storage_listing) = self.storage.clone() {
+            exec.push(
+                storage_listing
+                    .scan(ctx, projection, filters, limit)
+                    .await?,
+            );
+        }
+
+        let exec: Arc<dyn ExecutionPlan> = if exec.is_empty() {
+            let schema = match projection {
+                Some(projection) => Arc::new(self.schema.project(projection)?),
+                None => self.schema.clone(),
+            };
+            Arc::new(EmptyExec::new(false, schema))
+        } else if exec.len() == 1 {
+            exec.pop().unwrap()
+        } else {
+            Arc::new(UnionExec::new(exec))
+        };
+
+        Ok(exec)
+    }
+}
+
+#[async_trait]
+impl TableProvider for QueryTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        ctx: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+        self.create_physical_plan(ctx, projection, filters, limit)
+            .await
+    }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
+        Ok(filters
+            .iter()
+            .map(|_| TableProviderFilterPushDown::Inexact)
+            .collect())
+    }
+}
diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index 48af6d11d..90bc0fcf1 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -99,7 +99,7 @@ impl StorageDir {
         paths
     }
 
-    #[allow(unused)]
+    #[allow(dead_code)]
    pub fn arrow_files_grouped_by_time(&self) -> HashMap<PathBuf, Vec<PathBuf>> {
        // hashmap <time, vec[paths]>
        let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
@@ -206,7 +206,7 @@ pub fn convert_disk_files_to_parquet(
         let schema = Arc::new(merged_schema);
         let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?;
 
-        for ref record in record_reader.merged_iter(&schema) {
+        for ref record in record_reader.merged_iter(schema) {
             writer.write(record)?;
         }
 
diff --git a/server/src/utils/arrow/batch_adapter.rs b/server/src/utils/arrow/batch_adapter.rs
index 5f3a91935..c9a58c74f 100644
--- a/server/src/utils/arrow/batch_adapter.rs
+++ b/server/src/utils/arrow/batch_adapter.rs
@@ -30,7 +30,7 @@ use std::sync::Arc;
 // log stream schema.
 // This is necessary because all the record batches in a log
 // stream need to have all the fields.
-pub fn adapt_batch(table_schema: &Schema, batch: RecordBatch) -> RecordBatch {
+pub fn adapt_batch(table_schema: &Schema, batch: &RecordBatch) -> RecordBatch {
     let batch_schema = &*batch.schema();
 
     let batch_cols = batch.columns().to_vec();
diff --git a/server/src/utils/arrow/merged_reader.rs b/server/src/utils/arrow/merged_reader.rs
index 102e4b7cf..19a29ac6d 100644
--- a/server/src/utils/arrow/merged_reader.rs
+++ b/server/src/utils/arrow/merged_reader.rs
@@ -17,7 +17,7 @@
  *
  */
 
-use std::{fs::File, io::BufReader, path::PathBuf};
+use std::{fs::File, io::BufReader, path::PathBuf, sync::Arc};
 
 use arrow_array::{RecordBatch, TimestampMillisecondArray};
 use arrow_ipc::reader::StreamReader;
@@ -44,7 +44,7 @@ impl MergedRecordReader {
         Ok(Self { readers })
     }
 
-    pub fn merged_iter(self, schema: &Schema) -> impl Iterator<Item = RecordBatch> + '_ {
+    pub fn merged_iter(self, schema: Arc<Schema>) -> impl Iterator<Item = RecordBatch> {
         let adapted_readers = self.readers.into_iter().map(move |reader| reader.flatten());
 
         kmerge_by(adapted_readers, |a: &RecordBatch, b: &RecordBatch| {
@@ -52,7 +52,7 @@ impl MergedRecordReader {
             let b_time = get_timestamp_millis(b);
             a_time < b_time
         })
-        .map(|batch| adapt_batch(schema, batch))
+        .map(move |batch| adapt_batch(&schema, &batch))
     }
 
     pub fn merged_schema(&self) -> Schema {
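Review note (not part of the diff): QueryTableProvider above unions the staged in-memory batches with the remote ListingTable, falling back to an EmptyExec when neither side has data and skipping the UnionExec when only one side does. A minimal sketch of how it composes with DataFusion in the staging-only case follows; it assumes the code lives inside server/src/query.rs (the table_provider module is private), and the function name and the "stream" table name are made up.

// Hypothetical helper: run `sql` over only the staged, in-memory batches of a
// stream. With no remote ListingTable the provider scans a single MemTable and
// no UnionExec is produced.
async fn query_staging_only(
    schema: Arc<Schema>,
    staged: Vec<RecordBatch>,
    sql: &str,
) -> datafusion::error::Result<Vec<RecordBatch>> {
    use datafusion::prelude::SessionContext;

    // No remote listing table yet, so the second argument is None.
    let table = QueryTableProvider::try_new(Some(staged), None, schema)?;

    let ctx = SessionContext::new();
    let _ = ctx.register_table("stream", Arc::new(table))?;
    ctx.sql(sql).await?.collect().await
}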