From 4b0076171e1aea64b38e77cbef2a977a0838cfe2 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 30 Jun 2023 14:19:00 +0530 Subject: [PATCH 01/11] Add local query --- server/src/query.rs | 80 +++++--- server/src/query/table_provider.rs | 234 ++++++++++++++++++++++++ server/src/storage/staging.rs | 3 +- server/src/utils/arrow/merged_reader.rs | 6 +- 4 files changed, 290 insertions(+), 33 deletions(-) create mode 100644 server/src/query/table_provider.rs diff --git a/server/src/query.rs b/server/src/query.rs index 551a7f3d6..8bfd3318b 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -17,6 +17,7 @@ */ mod filter_optimizer; +mod table_provider; use chrono::TimeZone; use chrono::{DateTime, Utc}; @@ -30,19 +31,21 @@ use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::prelude::*; use itertools::Itertools; use serde_json::Value; -use std::path::Path; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; use std::sync::Arc; use sysinfo::{System, SystemExt}; use crate::event::DEFAULT_TIMESTAMP_KEY; use crate::option::CONFIG; -use crate::storage::ObjectStorageError; use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY}; +use crate::storage::{ObjectStorageError, StorageDir}; use crate::utils::TimePeriod; use crate::validator; use self::error::{ExecuteError, ParseError}; use self::filter_optimizer::FilterOptimizerRule; +use self::table_provider::QueryTableProvider; type Key = &'static str; fn get_value(value: &Value, key: Key) -> Result<&str, Key> { @@ -124,30 +127,15 @@ impl Query { storage: Arc, ) -> Result<(Vec, Vec), ExecuteError> { let ctx = self.create_session_context(); - let prefixes = storage.query_prefixes(self.get_prefixes()); - - if prefixes.is_empty() { - return Ok((Vec::new(), Vec::new())); - } - - let file_format = ParquetFormat::default().with_enable_pruning(Some(true)); - let listing_options = ListingOptions { - file_extension: ".parquet".to_string(), - file_sort_order: vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]], - infinite_source: false, - format: Arc::new(file_format), - table_partition_cols: vec![], - collect_stat: true, - target_partitions: 32, - }; - - let config = ListingTableConfig::new_with_multi_paths(prefixes) - .with_listing_options(listing_options) - .with_schema(self.schema.clone()); - - let table = Arc::new(ListingTable::try_new(config)?); - - ctx.register_table(&*self.stream_name, table) + let remote_listing_table = self._remote_query(storage)?; + let staging_prefixes = get_staging_prefixes(&self.stream_name, self.start, self.end); + let table = QueryTableProvider::new( + staging_prefixes.into_values().collect(), + remote_listing_table, + self.schema.clone(), + ); + + ctx.register_table(&*self.stream_name, Arc::new(table)) .map_err(ObjectStorageError::DataFusionError)?; // execute the query and collect results let df = ctx.sql(self.query.as_str()).await?; @@ -165,15 +153,51 @@ impl Query { Ok((results, fields)) } + + fn _remote_query( + &self, + storage: Arc, + ) -> Result>, ExecuteError> { + let prefixes = storage.query_prefixes(self.get_prefixes()); + if prefixes.is_empty() { + return Ok(None); + } + let file_format = ParquetFormat::default().with_enable_pruning(Some(true)); + let file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]]; + let listing_options = ListingOptions { + file_extension: ".parquet".to_string(), + file_sort_order, + infinite_source: false, + format: Arc::new(file_format), + table_partition_cols: vec![], + collect_stat: true, + target_partitions: 32, + }; + 
let config = ListingTableConfig::new_with_multi_paths(prefixes) + .with_listing_options(listing_options) + .with_schema(self.schema.clone()); + + let listing_table = Arc::new(ListingTable::try_new(config)?); + Ok(Some(listing_table)) + } +} + +fn get_staging_prefixes( + stream_name: &str, + start: DateTime, + end: DateTime, +) -> HashMap> { + let dir = StorageDir::new(stream_name); + let mut files = dir.arrow_files_grouped_by_time(); + files.retain(|k, _| path_intersects_query(k, start, end)); + files } -#[allow(unused)] fn path_intersects_query(path: &Path, starttime: DateTime, endtime: DateTime) -> bool { let time = time_from_path(path); starttime <= time && time <= endtime } -#[allow(unused)] fn time_from_path(path: &Path) -> DateTime { let prefix = path .file_name() diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs new file mode 100644 index 000000000..9a248bd1d --- /dev/null +++ b/server/src/query/table_provider.rs @@ -0,0 +1,234 @@ +use arrow_select::concat::concat; +use async_trait::async_trait; +use datafusion::arrow::datatypes::{Schema, SchemaRef}; + +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::listing::ListingTable; +use datafusion::datasource::streaming::{PartitionStream, StreamingTable}; +use datafusion::datasource::TableProvider; +use datafusion::error::DataFusionError; +use datafusion::execution::context::SessionState; +use datafusion::logical_expr::TableType; +use datafusion::physical_plan::empty::EmptyExec; +use datafusion::physical_plan::union::UnionExec; +use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream}; +use datafusion::prelude::Expr; +use futures_util::{Stream, StreamExt}; +use std::any::Any; +use std::path::PathBuf; +use std::sync::Arc; +use std::vec; + +use crate::utils::arrow::MergedRecordReader; + +pub struct QueryTableProvider { + // ( arrow files ) sorted by time asc + staging_arrows: Vec>, + // remote table + storage: Option>, + schema: Arc, +} + +impl QueryTableProvider { + pub fn new( + staging_arrows: Vec>, + storage: Option>, + schema: Arc, + ) -> Self { + Self { + staging_arrows, + storage, + schema, + } + } + + async fn create_physical_plan( + &self, + ctx: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result, DataFusionError> { + let mut exec = vec![]; + let local_table = get_streaming_table(self.staging_arrows.clone(), self.schema.clone()); + if let Some(table) = local_table { + exec.push(table.scan(ctx, projection, filters, limit).await?) + } + if let Some(storage_listing) = self.storage.clone() { + exec.push( + storage_listing + .scan(ctx, projection, filters, limit) + .await?, + ); + } + + let exec: Arc = if exec.len() == 0 { + Arc::new(EmptyExec::new(false, self.schema.clone())) + } else if exec.len() == 1 { + exec.pop().unwrap() + } else { + Arc::new(UnionExec::new(exec)) + }; + + Ok(exec) + } +} + +#[async_trait] +impl TableProvider for QueryTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + ctx: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> datafusion::error::Result> { + self.create_physical_plan(ctx, projection, filters, limit) + .await + } +} + +// create streaming table from arrow files on staging. +// partitions are created in sorted order. 
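// ---------------------------------------------------------------------------
// Editor's sketch (not part of this patch): the plan-selection branch used by
// `create_physical_plan` above, restated as a standalone helper. It relies only
// on the DataFusion imports this file already has; `combine_plans` itself is a
// hypothetical name introduced for illustration.
fn combine_plans(
    mut plans: Vec<Arc<dyn ExecutionPlan>>,
    schema: SchemaRef,
) -> Arc<dyn ExecutionPlan> {
    match plans.len() {
        // neither staging files nor remote prefixes produced a plan: empty relation
        0 => Arc::new(EmptyExec::new(false, schema)),
        // only one source contributed a plan: use it directly
        1 => plans.pop().unwrap(),
        // staging and remote data are scanned together as one union
        _ => Arc::new(UnionExec::new(plans)),
    }
}
// ---------------------------------------------------------------------------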
+// if partition cannot be created mid way then only latest available partitions are returned +fn get_streaming_table( + staging_arrow: Vec>, + schema: Arc, +) -> Option { + let mut partitions: Vec> = Vec::new(); + + for files in staging_arrow { + let partition = ArrowFilesPartition { + schema: schema.clone(), + files, + }; + partitions.push(Arc::new(partition)) + } + + if partitions.is_empty() { + None + } else { + //todo remove unwrap + Some(StreamingTable::try_new(schema, partitions).unwrap()) + } +} + +struct ConcatIterator> { + buffer: Vec, + schema: Arc, + max_size: usize, + iter: T, +} + +impl> Iterator for ConcatIterator { + type Item = RecordBatch; + + fn next(&mut self) -> Option { + let mut size = 0; + + while size < self.max_size { + if let Some(rb) = self.iter.next() { + size += rb.num_rows(); + self.buffer.push(rb); + } else { + break; + } + } + + if self.buffer.len() == 0 { + return None; + } + + // create new batch + let field_num = self.schema.fields().len(); + let mut arrays = Vec::with_capacity(field_num); + for i in 0..field_num { + let array = concat( + &self + .buffer + .iter() + .map(|batch| batch.column(i).as_ref()) + .collect::>(), + ) + .expect("all records are of same schema and valid"); + arrays.push(array); + } + let res = RecordBatch::try_new(self.schema.clone(), arrays).unwrap(); + + // clear buffer + self.buffer.clear(); + + //return resulting batch + return Some(res); + } +} + +struct ArrowRecordBatchStream + Unpin> { + schema: Arc, + stream: T, +} + +impl Stream for ArrowRecordBatchStream +where + T: Stream + Unpin, +{ + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_mut().stream.poll_next_unpin(cx).map(|x| x.map(Ok)) + } +} + +impl RecordBatchStream for ArrowRecordBatchStream +where + T: Stream + Unpin, +{ + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +struct ArrowFilesPartition { + schema: Arc, + files: Vec, +} + +impl PartitionStream for ArrowFilesPartition { + fn schema(&self) -> &SchemaRef { + &self.schema + } + + fn execute( + &self, + _ctx: Arc, + ) -> datafusion::physical_plan::SendableRecordBatchStream { + let reader = MergedRecordReader::try_new(&self.files).unwrap(); + let reader = reader.merged_iter(self.schema.clone()); + let reader = ConcatIterator { + buffer: Vec::new(), + schema: self.schema.clone(), + max_size: 10000, + iter: reader, + }; + + Box::pin(ArrowRecordBatchStream { + schema: self.schema.clone(), + stream: futures::stream::iter(reader), + }) + } +} diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 48af6d11d..a2a27e995 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -99,7 +99,6 @@ impl StorageDir { paths } - #[allow(unused)] pub fn arrow_files_grouped_by_time(&self) -> HashMap> { // hashmap let mut grouped_arrow_file: HashMap> = HashMap::new(); @@ -206,7 +205,7 @@ pub fn convert_disk_files_to_parquet( let schema = Arc::new(merged_schema); let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?; - for ref record in record_reader.merged_iter(&schema) { + for ref record in record_reader.merged_iter(schema) { writer.write(record)?; } diff --git a/server/src/utils/arrow/merged_reader.rs b/server/src/utils/arrow/merged_reader.rs index 102e4b7cf..86248b728 100644 --- a/server/src/utils/arrow/merged_reader.rs +++ b/server/src/utils/arrow/merged_reader.rs @@ -17,7 +17,7 @@ * */ -use std::{fs::File, io::BufReader, path::PathBuf}; +use 
std::{fs::File, io::BufReader, path::PathBuf, sync::Arc}; use arrow_array::{RecordBatch, TimestampMillisecondArray}; use arrow_ipc::reader::StreamReader; @@ -44,7 +44,7 @@ impl MergedRecordReader { Ok(Self { readers }) } - pub fn merged_iter(self, schema: &Schema) -> impl Iterator + '_ { + pub fn merged_iter(self, schema: Arc) -> impl Iterator { let adapted_readers = self.readers.into_iter().map(move |reader| reader.flatten()); kmerge_by(adapted_readers, |a: &RecordBatch, b: &RecordBatch| { @@ -52,7 +52,7 @@ impl MergedRecordReader { let b_time = get_timestamp_millis(b); a_time < b_time }) - .map(|batch| adapt_batch(schema, batch)) + .map(move |batch| adapt_batch(&schema, batch)) } pub fn merged_schema(&self) -> Schema { From bb9222754f8fdf41e65bfd363e4ee5a4541dd794 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 30 Jun 2023 14:24:41 +0530 Subject: [PATCH 02/11] Clippy --- server/src/query/table_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index 9a248bd1d..c57509dd6 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -147,7 +147,7 @@ impl> Iterator for ConcatIterator { } } - if self.buffer.len() == 0 { + if self.buffer.is_empty() { return None; } From 509bcd53c91163915a2f5fe2232b0aab145273bf Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 30 Jun 2023 14:27:13 +0530 Subject: [PATCH 03/11] Banner --- server/src/query/table_provider.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index c57509dd6..1ec92f207 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -1,7 +1,24 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + use arrow_select::concat::concat; use async_trait::async_trait; use datafusion::arrow::datatypes::{Schema, SchemaRef}; - use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::listing::ListingTable; use datafusion::datasource::streaming::{PartitionStream, StreamingTable}; @@ -14,6 +31,7 @@ use datafusion::physical_plan::union::UnionExec; use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream}; use datafusion::prelude::Expr; use futures_util::{Stream, StreamExt}; + use std::any::Any; use std::path::PathBuf; use std::sync::Arc; From e52c47beedb1ab60c4fdbda2198712603c034cb1 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 30 Jun 2023 14:33:58 +0530 Subject: [PATCH 04/11] Clippy --- server/src/query/table_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index 1ec92f207..b8737ce25 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -80,7 +80,7 @@ impl QueryTableProvider { ); } - let exec: Arc = if exec.len() == 0 { + let exec: Arc = if exec.is_empty() { Arc::new(EmptyExec::new(false, self.schema.clone())) } else if exec.len() == 1 { exec.pop().unwrap() From 76154bad07eb8f492ee4a8a8c2ffb85ff8c78b85 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 30 Jun 2023 18:06:01 +0530 Subject: [PATCH 05/11] Clippy --- server/src/query/table_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index b8737ce25..b1a6535ae 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -189,7 +189,7 @@ impl> Iterator for ConcatIterator { self.buffer.clear(); //return resulting batch - return Some(res); + Some(res) } } From 21851def3dbd23ddf235f66fbd717e3965b3619f Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 30 Jun 2023 21:02:46 +0530 Subject: [PATCH 06/11] Fix filter optimizer --- server/src/query/filter_optimizer.rs | 15 +++++++++++++-- server/src/query/table_provider.rs | 12 +++++++++++- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/server/src/query/filter_optimizer.rs b/server/src/query/filter_optimizer.rs index be055de00..4427445ce 100644 --- a/server/src/query/filter_optimizer.rs +++ b/server/src/query/filter_optimizer.rs @@ -32,6 +32,10 @@ pub struct FilterOptimizerRule { pub literals: Vec, } +// Try to add filter node on table scan +// As every table supports projection push down +// we try to directly add projection for column directly to table +// To preserve the orignal projection we must add a projection node with orignal projection impl OptimizerRule for FilterOptimizerRule { fn try_optimize( &self, @@ -41,6 +45,12 @@ impl OptimizerRule for FilterOptimizerRule { // if there are no patterns then the rule cannot be performed let Some(filter_expr) = self.expr() else { return Ok(None); }; + if let LogicalPlan::Filter(filter) = plan { + if filter.predicate == filter_expr { + return Ok(None); + } + } + if let LogicalPlan::TableScan(table) = plan { if table.projection.is_none() || table @@ -53,7 +63,9 @@ impl OptimizerRule for FilterOptimizerRule { let mut table = table.clone(); let schema = &table.source.schema(); + let orignal_projection = table.projected_schema.clone(); + // add filtered column projection to table if !table .projected_schema .has_column_with_unqualified_name(&self.column) @@ -76,14 +88,13 @@ impl OptimizerRule for FilterOptimizerRule { } } - 
let projected_schema = table.projected_schema.clone(); let filter = LogicalPlan::Filter(Filter::try_new( filter_expr, Arc::new(LogicalPlan::TableScan(table)), )?); let plan = LogicalPlan::Projection(Projection::new_from_schema( Arc::new(filter), - projected_schema, + orignal_projection, )); return Ok(Some(plan)); diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index b1a6535ae..86c5dea9f 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -25,7 +25,7 @@ use datafusion::datasource::streaming::{PartitionStream, StreamingTable}; use datafusion::datasource::TableProvider; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; -use datafusion::logical_expr::TableType; +use datafusion::logical_expr::{TableProviderFilterPushDown, TableType}; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::union::UnionExec; use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream}; @@ -116,6 +116,16 @@ impl TableProvider for QueryTableProvider { self.create_physical_plan(ctx, projection, filters, limit) .await } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion::error::Result> { + Ok(filters + .iter() + .map(|_| TableProviderFilterPushDown::Inexact) + .collect()) + } } // create streaming table from arrow files on staging. From 8fa7739022ef6720a5a3d58c57387b370568a832 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sat, 1 Jul 2023 10:54:20 +0530 Subject: [PATCH 07/11] Fix empty exec projection --- server/src/query/table_provider.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index 86c5dea9f..e1e46c1e6 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -81,7 +81,11 @@ impl QueryTableProvider { } let exec: Arc = if exec.is_empty() { - Arc::new(EmptyExec::new(false, self.schema.clone())) + let schema = match projection { + Some(projection) => Arc::new(self.schema.project(&projection)?), + None => self.schema.clone(), + }; + Arc::new(EmptyExec::new(false, schema)) } else if exec.len() == 1 { exec.pop().unwrap() } else { From f1dd5395f4da3aacfffd56dffba51ab01d9308a1 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sat, 1 Jul 2023 11:00:58 +0530 Subject: [PATCH 08/11] Fix --- server/src/query/table_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index e1e46c1e6..9af2e27fc 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -82,7 +82,7 @@ impl QueryTableProvider { let exec: Arc = if exec.is_empty() { let schema = match projection { - Some(projection) => Arc::new(self.schema.project(&projection)?), + Some(projection) => Arc::new(self.schema.project(projection)?), None => self.schema.clone(), }; Arc::new(EmptyExec::new(false, schema)) From db793bd8c888f83fa3bff17ab6fb7feeb1647cb4 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 10 Jul 2023 11:06:18 +0530 Subject: [PATCH 09/11] Use memtable instead of streaming table --- server/src/event/writer.rs | 55 ++++++-- server/src/event/writer/mem_writer.rs | 120 +++++++++++++++++ server/src/query.rs | 11 +- server/src/query/table_provider.rs | 168 +++--------------------- server/src/storage/staging.rs | 1 + server/src/utils/arrow/batch_adapter.rs | 2 +- server/src/utils/arrow/merged_reader.rs | 2 +- 7 files changed, 
192 insertions(+), 167 deletions(-) create mode 100644 server/src/event/writer/mem_writer.rs diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 2b44fc542..cdd82a029 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -18,21 +18,42 @@ */ mod file_writer; +mod mem_writer; use std::{ collections::HashMap, - sync::{Mutex, RwLock}, + sync::{Arc, Mutex, RwLock}, }; -use self::{errors::StreamWriterError, file_writer::FileWriter}; +use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter}; use arrow_array::RecordBatch; +use arrow_schema::Schema; use derive_more::{Deref, DerefMut}; use once_cell::sync::Lazy; pub static STREAM_WRITERS: Lazy = Lazy::new(WriterTable::default); +#[derive(Default)] +pub struct Writer { + pub mem: MemWriter<1024>, + pub disk: FileWriter, +} + +impl Writer { + fn push( + &mut self, + stream_name: &str, + schema_key: &str, + rb: RecordBatch, + ) -> Result<(), StreamWriterError> { + self.disk.push(stream_name, schema_key, &rb)?; + self.mem.push(schema_key, rb); + Ok(()) + } +} + #[derive(Deref, DerefMut, Default)] -pub struct WriterTable(RwLock>>); +pub struct WriterTable(RwLock>>); impl WriterTable { // append to a existing stream @@ -49,7 +70,7 @@ impl WriterTable { stream_writer .lock() .unwrap() - .push(stream_name, schema_key, &record)?; + .push(stream_name, schema_key, record)?; } None => { drop(hashmap_guard); @@ -60,10 +81,10 @@ impl WriterTable { writer .lock() .unwrap() - .push(stream_name, schema_key, &record)?; + .push(stream_name, schema_key, record)?; } else { - let mut writer = FileWriter::default(); - writer.push(stream_name, schema_key, &record)?; + let mut writer = Writer::default(); + writer.push(stream_name, schema_key, record)?; map.insert(stream_name.to_owned(), Mutex::new(writer)); } } @@ -81,9 +102,27 @@ impl WriterTable { drop(table); for writer in map.into_values() { let writer = writer.into_inner().unwrap(); - writer.close_all(); + writer.disk.close_all(); } } + + pub fn recordbatches_cloned( + &self, + stream_name: &str, + schema: &Arc, + ) -> Option> { + let records = self + .0 + .read() + .unwrap() + .get(stream_name)? + .lock() + .unwrap() + .mem + .recordbatch_cloned(schema); + + Some(records) + } } pub mod errors { diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs new file mode 100644 index 000000000..2716d48cd --- /dev/null +++ b/server/src/event/writer/mem_writer.rs @@ -0,0 +1,120 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{collections::HashSet, sync::Arc}; + +use arrow_array::RecordBatch; +use arrow_schema::Schema; +use arrow_select::concat::concat_batches; +use itertools::Itertools; + +use crate::utils::arrow::adapt_batch; + +/// Structure to keep recordbatches in memory. +/// +/// Any new schema is updated in the schema map. 
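// ---------------------------------------------------------------------------
// Editor's sketch (not part of this patch): the intended call pattern for the
// MemWriter defined just below. Ingest pushes record batches in; the query path
// takes a cloned snapshot without draining the buffer. The helper name and the
// `batch` / `stream_schema` parameters are assumptions for illustration only.
fn snapshot_after_two_pushes(
    batch: RecordBatch,
    stream_schema: Arc<Schema>,
) -> Vec<RecordBatch> {
    let mut mem: MemWriter<1024> = MemWriter::default();
    mem.push("schema_key_v1", batch.clone()); // buffered in the mutable buffer
    mem.push("schema_key_v1", batch);         // same schema key: the schema merge is a no-op
    mem.recordbatch_cloned(&stream_schema)    // read buffer plus current mutable rows, adapted to the schema
}
// ---------------------------------------------------------------------------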
+/// Recordbatches are pushed to mutable buffer first and then concated together and pushed to read buffer +#[derive(Debug)] +pub struct MemWriter { + schema: Schema, + // for checking uniqueness of schema + schema_map: HashSet, + read_buffer: Vec, + mutable_buffer: MutableBuffer, +} + +impl Default for MemWriter { + fn default() -> Self { + Self { + schema: Schema::empty(), + schema_map: HashSet::default(), + read_buffer: Vec::default(), + mutable_buffer: MutableBuffer::default(), + } + } +} + +impl MemWriter { + pub fn push(&mut self, schema_key: &str, rb: RecordBatch) { + if !self.schema_map.contains(schema_key) { + self.schema_map.insert(schema_key.to_owned()); + self.schema = Schema::try_merge([self.schema.clone(), (*rb.schema()).clone()]).unwrap(); + } + + if let Some(record) = self.mutable_buffer.push(rb) { + let record = concat_records(&Arc::new(self.schema.clone()), &record); + self.read_buffer.push(record); + } + } + + pub fn recordbatch_cloned(&self, schema: &Arc) -> Vec { + let mut read_buffer = self.read_buffer.clone(); + if self.mutable_buffer.rows > 0 { + let rb = concat_records(schema, &self.mutable_buffer.inner); + read_buffer.push(rb) + } + + read_buffer + .into_iter() + .map(|rb| adapt_batch(schema, &rb)) + .collect() + } +} + +fn concat_records(schema: &Arc, record: &[RecordBatch]) -> RecordBatch { + let records = record.iter().map(|x| adapt_batch(schema, x)).collect_vec(); + let record = concat_batches(schema, records.iter()).unwrap(); + record +} + +#[derive(Debug, Default)] +struct MutableBuffer { + pub inner: Vec, + pub rows: usize, +} + +impl MutableBuffer { + fn push(&mut self, rb: RecordBatch) -> Option> { + if self.rows + rb.num_rows() >= N { + let left = N - self.rows; + let right = rb.num_rows() - left; + let left_slice = rb.slice(0, left); + let right_slice = if left < rb.num_rows() { + Some(rb.slice(left, right)) + } else { + None + }; + self.inner.push(left_slice); + // take all records + let src = Vec::with_capacity(self.inner.len()); + let inner = std::mem::replace(&mut self.inner, src); + self.rows = 0; + + if let Some(right_slice) = right_slice { + self.rows = right_slice.num_rows(); + self.inner.push(right_slice); + } + + Some(inner) + } else { + self.rows += rb.num_rows(); + self.inner.push(rb); + None + } + } +} diff --git a/server/src/query.rs b/server/src/query.rs index 8bfd3318b..8997ccdf7 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -128,12 +128,10 @@ impl Query { ) -> Result<(Vec, Vec), ExecuteError> { let ctx = self.create_session_context(); let remote_listing_table = self._remote_query(storage)?; - let staging_prefixes = get_staging_prefixes(&self.stream_name, self.start, self.end); - let table = QueryTableProvider::new( - staging_prefixes.into_values().collect(), - remote_listing_table, - self.schema.clone(), - ); + let memtable = + crate::event::STREAM_WRITERS.recordbatches_cloned(&self.stream_name, &self.schema); + let table = + QueryTableProvider::try_new(memtable, remote_listing_table, self.schema.clone())?; ctx.register_table(&*self.stream_name, Arc::new(table)) .map_err(ObjectStorageError::DataFusionError)?; @@ -182,6 +180,7 @@ impl Query { } } +#[allow(dead_code)] fn get_staging_prefixes( stream_name: &str, start: DateTime, diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index 9af2e27fc..275cc9914 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -16,48 +16,46 @@ * */ -use arrow_select::concat::concat; use async_trait::async_trait; use 
datafusion::arrow::datatypes::{Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::listing::ListingTable; -use datafusion::datasource::streaming::{PartitionStream, StreamingTable}; -use datafusion::datasource::TableProvider; + +use datafusion::datasource::{MemTable, TableProvider}; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; use datafusion::logical_expr::{TableProviderFilterPushDown, TableType}; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::union::UnionExec; -use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream}; +use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::Expr; -use futures_util::{Stream, StreamExt}; use std::any::Any; -use std::path::PathBuf; use std::sync::Arc; use std::vec; -use crate::utils::arrow::MergedRecordReader; - pub struct QueryTableProvider { - // ( arrow files ) sorted by time asc - staging_arrows: Vec>, + staging: Option, // remote table storage: Option>, schema: Arc, } impl QueryTableProvider { - pub fn new( - staging_arrows: Vec>, + pub fn try_new( + staging: Option>, storage: Option>, schema: Arc, - ) -> Self { - Self { - staging_arrows, + ) -> Result { + let memtable = staging + .map(|records| MemTable::try_new(schema.clone(), vec![records])) + .transpose()?; + + Ok(Self { + staging: memtable, storage, schema, - } + }) } async fn create_physical_plan( @@ -68,10 +66,11 @@ impl QueryTableProvider { limit: Option, ) -> Result, DataFusionError> { let mut exec = vec![]; - let local_table = get_streaming_table(self.staging_arrows.clone(), self.schema.clone()); - if let Some(table) = local_table { + + if let Some(table) = &self.staging { exec.push(table.scan(ctx, projection, filters, limit).await?) } + if let Some(storage_listing) = self.storage.clone() { exec.push( storage_listing @@ -131,136 +130,3 @@ impl TableProvider for QueryTableProvider { .collect()) } } - -// create streaming table from arrow files on staging. -// partitions are created in sorted order. 
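// ---------------------------------------------------------------------------
// Editor's sketch (not part of this patch): how the staging data reaches this
// provider after the switch to MemTable. The query path clones the in-memory
// batches for the stream and wraps them in a single MemTable partition. The
// helper name `staging_mem_table` is hypothetical; `STREAM_WRITERS` and
// `recordbatches_cloned` are the APIs added earlier in this patch.
fn staging_mem_table(
    stream_name: &str,
    schema: Arc<Schema>,
) -> datafusion::error::Result<Option<MemTable>> {
    // snapshot of the stream's in-memory write buffers, adapted to the query schema
    let Some(batches) = crate::event::STREAM_WRITERS.recordbatches_cloned(stream_name, &schema)
    else {
        return Ok(None);
    };
    // one partition holding every staging batch; QueryTableProvider::try_new does the same
    Ok(Some(MemTable::try_new(schema, vec![batches])?))
}
// ---------------------------------------------------------------------------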
-// if partition cannot be created mid way then only latest available partitions are returned -fn get_streaming_table( - staging_arrow: Vec>, - schema: Arc, -) -> Option { - let mut partitions: Vec> = Vec::new(); - - for files in staging_arrow { - let partition = ArrowFilesPartition { - schema: schema.clone(), - files, - }; - partitions.push(Arc::new(partition)) - } - - if partitions.is_empty() { - None - } else { - //todo remove unwrap - Some(StreamingTable::try_new(schema, partitions).unwrap()) - } -} - -struct ConcatIterator> { - buffer: Vec, - schema: Arc, - max_size: usize, - iter: T, -} - -impl> Iterator for ConcatIterator { - type Item = RecordBatch; - - fn next(&mut self) -> Option { - let mut size = 0; - - while size < self.max_size { - if let Some(rb) = self.iter.next() { - size += rb.num_rows(); - self.buffer.push(rb); - } else { - break; - } - } - - if self.buffer.is_empty() { - return None; - } - - // create new batch - let field_num = self.schema.fields().len(); - let mut arrays = Vec::with_capacity(field_num); - for i in 0..field_num { - let array = concat( - &self - .buffer - .iter() - .map(|batch| batch.column(i).as_ref()) - .collect::>(), - ) - .expect("all records are of same schema and valid"); - arrays.push(array); - } - let res = RecordBatch::try_new(self.schema.clone(), arrays).unwrap(); - - // clear buffer - self.buffer.clear(); - - //return resulting batch - Some(res) - } -} - -struct ArrowRecordBatchStream + Unpin> { - schema: Arc, - stream: T, -} - -impl Stream for ArrowRecordBatchStream -where - T: Stream + Unpin, -{ - type Item = Result; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.get_mut().stream.poll_next_unpin(cx).map(|x| x.map(Ok)) - } -} - -impl RecordBatchStream for ArrowRecordBatchStream -where - T: Stream + Unpin, -{ - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - -struct ArrowFilesPartition { - schema: Arc, - files: Vec, -} - -impl PartitionStream for ArrowFilesPartition { - fn schema(&self) -> &SchemaRef { - &self.schema - } - - fn execute( - &self, - _ctx: Arc, - ) -> datafusion::physical_plan::SendableRecordBatchStream { - let reader = MergedRecordReader::try_new(&self.files).unwrap(); - let reader = reader.merged_iter(self.schema.clone()); - let reader = ConcatIterator { - buffer: Vec::new(), - schema: self.schema.clone(), - max_size: 10000, - iter: reader, - }; - - Box::pin(ArrowRecordBatchStream { - schema: self.schema.clone(), - stream: futures::stream::iter(reader), - }) - } -} diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index a2a27e995..90bc0fcf1 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -99,6 +99,7 @@ impl StorageDir { paths } + #[allow(dead_code)] pub fn arrow_files_grouped_by_time(&self) -> HashMap> { // hashmap let mut grouped_arrow_file: HashMap> = HashMap::new(); diff --git a/server/src/utils/arrow/batch_adapter.rs b/server/src/utils/arrow/batch_adapter.rs index 5f3a91935..c9a58c74f 100644 --- a/server/src/utils/arrow/batch_adapter.rs +++ b/server/src/utils/arrow/batch_adapter.rs @@ -30,7 +30,7 @@ use std::sync::Arc; // log stream schema. // This is necessary because all the record batches in a log // stream need to have all the fields. 
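// ---------------------------------------------------------------------------
// Editor's sketch (not part of this patch): what the borrowed-batch signature
// below is doing conceptually — any field missing from a batch is padded with
// nulls so every batch carries the full stream schema. This is a simplified
// reconstruction; the real code in batch_adapter.rs may differ in detail.
use arrow_array::{new_null_array, ArrayRef, RecordBatch};
use arrow_schema::Schema;
use std::sync::Arc;

fn adapt_batch_sketch(table_schema: &Schema, batch: &RecordBatch) -> RecordBatch {
    let columns: Vec<ArrayRef> = table_schema
        .fields()
        .iter()
        .map(|field| match batch.column_by_name(field.name()) {
            // the column exists in the batch: reuse it as-is
            Some(col) => Arc::clone(col),
            // the column is missing: fill it with nulls of the declared type
            None => new_null_array(field.data_type(), batch.num_rows()),
        })
        .collect();
    RecordBatch::try_new(Arc::new(table_schema.clone()), columns)
        .expect("columns were built from the table schema")
}
// ---------------------------------------------------------------------------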
-pub fn adapt_batch(table_schema: &Schema, batch: RecordBatch) -> RecordBatch { +pub fn adapt_batch(table_schema: &Schema, batch: &RecordBatch) -> RecordBatch { let batch_schema = &*batch.schema(); let batch_cols = batch.columns().to_vec(); diff --git a/server/src/utils/arrow/merged_reader.rs b/server/src/utils/arrow/merged_reader.rs index 86248b728..19a29ac6d 100644 --- a/server/src/utils/arrow/merged_reader.rs +++ b/server/src/utils/arrow/merged_reader.rs @@ -52,7 +52,7 @@ impl MergedRecordReader { let b_time = get_timestamp_millis(b); a_time < b_time }) - .map(move |batch| adapt_batch(&schema, batch)) + .map(move |batch| adapt_batch(&schema, &batch)) } pub fn merged_schema(&self) -> Schema { From 97c33fc73baed332ed113b0fb1a9401bb52e2027 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 11 Jul 2023 11:53:23 +0530 Subject: [PATCH 10/11] Fix --- server/src/event/writer.rs | 16 +++++++++++++++- server/src/event/writer/file_writer.rs | 25 ++++++------------------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index cdd82a029..1b343593f 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -25,9 +25,12 @@ use std::{ sync::{Arc, Mutex, RwLock}, }; +use crate::utils; + use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter}; -use arrow_array::RecordBatch; +use arrow_array::{RecordBatch, TimestampMillisecondArray}; use arrow_schema::Schema; +use chrono::Utc; use derive_more::{Deref, DerefMut}; use once_cell::sync::Lazy; @@ -46,6 +49,13 @@ impl Writer { schema_key: &str, rb: RecordBatch, ) -> Result<(), StreamWriterError> { + let rb = utils::arrow::replace_columns( + rb.schema(), + &rb, + &[0], + &[Arc::new(get_timestamp_array(rb.num_rows()))], + ); + self.disk.push(stream_name, schema_key, &rb)?; self.mem.push(schema_key, rb); Ok(()) @@ -125,6 +135,10 @@ impl WriterTable { } } +fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { + TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size) +} + pub mod errors { #[derive(Debug, thiserror::Error)] diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs index 8c40e435b..a8c1d8918 100644 --- a/server/src/event/writer/file_writer.rs +++ b/server/src/event/writer/file_writer.rs @@ -17,17 +17,15 @@ * */ -use arrow_array::{RecordBatch, TimestampMillisecondArray}; -use arrow_ipc::writer::StreamWriter; -use chrono::Utc; -use derive_more::{Deref, DerefMut}; use std::collections::HashMap; use std::fs::{File, OpenOptions}; use std::path::PathBuf; -use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_ipc::writer::StreamWriter; +use derive_more::{Deref, DerefMut}; use crate::storage::staging::StorageDir; -use crate::utils; use super::errors::StreamWriterError; @@ -47,24 +45,17 @@ impl FileWriter { schema_key: &str, record: &RecordBatch, ) -> Result<(), StreamWriterError> { - let record = utils::arrow::replace_columns( - record.schema(), - record, - &[0], - &[Arc::new(get_timestamp_array(record.num_rows()))], - ); - match self.get_mut(schema_key) { Some(writer) => { writer .writer - .write(&record) + .write(record) .map_err(StreamWriterError::Writer)?; } // entry is not present thus we create it None => { // this requires mutable borrow of the map so we drop this read lock and wait for write lock - let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, &record)?; + let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, 
record)?; self.insert( schema_key.to_owned(), ArrowWriter { @@ -85,10 +76,6 @@ impl FileWriter { } } -fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { - TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size) -} - fn init_new_stream_writer_file( stream_name: &str, schema_key: &str, From 5bab6c9f199cafeb3af63c9cd6c99d9124da9a4b Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 13 Jul 2023 12:11:33 +0530 Subject: [PATCH 11/11] Change limit --- server/src/event/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 1b343593f..c7aa8977d 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -38,7 +38,7 @@ pub static STREAM_WRITERS: Lazy = Lazy::new(WriterTable::default); #[derive(Default)] pub struct Writer { - pub mem: MemWriter<1024>, + pub mem: MemWriter<16384>, pub disk: FileWriter, }
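// ---------------------------------------------------------------------------
// Editor's note (sketch, not part of the series): the buffering behaviour that
// this final MemWriter<16384> limit controls. MutableBuffer::push accumulates
// rows until the const limit N is crossed, hands the filled batches back, and
// carries the overflow rows into the next cycle. A tiny limit is used here for
// clarity; `int64_batch` is an assumed test helper.
use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use std::sync::Arc;

fn int64_batch(rows: usize) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, true)]));
    let values = Int64Array::from_iter_values(0..rows as i64);
    RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap()
}

#[test]
fn mutable_buffer_flushes_at_the_row_limit() {
    let mut buf = MutableBuffer::<5>::default();  // N = 5 rows per flush
    assert!(buf.push(int64_batch(3)).is_none());  // 3 rows stay buffered, below the limit
    let flushed = buf.push(int64_batch(4)).expect("limit crossed, buffered batches returned");
    // the flush returns exactly N rows; the 2 overflow rows start the next cycle
    assert_eq!(flushed.iter().map(|rb| rb.num_rows()).sum::<usize>(), 5);
    assert_eq!(buf.rows, 2);
}
// ---------------------------------------------------------------------------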