From aa27ed234de48b1928747cf3ad5c796ea49c9e07 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sat, 22 Apr 2023 12:15:57 +0530 Subject: [PATCH 1/3] Add local query for in memory mode --- server/src/event/writer.rs | 13 +++ server/src/event/writer/mem_writer.rs | 1 - server/src/query.rs | 43 ++++++-- server/src/query/table_provider.rs | 137 ++++++++++++++++++++++++++ server/src/storage/staging.rs | 1 + 5 files changed, 187 insertions(+), 8 deletions(-) create mode 100644 server/src/query/table_provider.rs diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index ea36b5e03..6cc0efbec 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -150,6 +150,19 @@ impl WriterTable { } } } + + pub fn clone_read_buf(&self, stream_name: &str) -> Option { + let hashmap_guard = self.read().unwrap(); + let (writer, context) = hashmap_guard.get(stream_name)?; + let writer = writer.lock().unwrap(); + match &*writer { + StreamWriter::Mem(mem) => Some(ReadBuf { + time: context.time, + buf: mem.recordbatch_cloned(), + }), + StreamWriter::Disk(_) => None, + } + } } pub mod errors { diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs index c661fc427..7dc7d4203 100644 --- a/server/src/event/writer/mem_writer.rs +++ b/server/src/event/writer/mem_writer.rs @@ -46,7 +46,6 @@ impl MemWriter { self.mutable_buffer.push(rb) } - #[allow(unused)] pub fn recordbatch_cloned(&self) -> Vec { let mut read_buffer = self.read_buffer.clone(); let rb = self.mutable_buffer.recordbatch_cloned(); diff --git a/server/src/query.rs b/server/src/query.rs index 270fa388b..1e6ee258c 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -16,6 +16,8 @@ * */ +pub mod table_provider; + use chrono::TimeZone; use chrono::{DateTime, Utc}; use datafusion::arrow::datatypes::Schema; @@ -27,13 +29,16 @@ use serde_json::Value; use std::path::Path; use std::sync::Arc; +use crate::event::STREAM_WRITERS; use crate::option::CONFIG; +use crate::storage::staging::{ReadBuf, MEMORY_READ_BUFFERS}; use crate::storage::ObjectStorageError; use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY}; use crate::utils::TimePeriod; use crate::validator; use self::error::{ExecuteError, ParseError}; +use self::table_provider::QueryTableProvider; type Key = &'static str; fn get_value(value: &Value, key: Key) -> Result<&str, Key> { @@ -73,10 +78,6 @@ impl Query { .collect() } - pub fn get_schema(&self) -> &Schema { - &self.schema - } - /// Execute query on object storage(and if necessary on cache as well) with given stream information /// TODO: find a way to query all selected parquet files together in a single context. pub async fn execute( @@ -88,9 +89,13 @@ impl Query { CONFIG.storage().get_datafusion_runtime(), ); - let Some(table) = storage.query_table(self.get_prefixes(), Arc::new(self.get_schema().clone()))? else { - return Ok((Vec::new(), Vec::new())); - }; + let prefixes = self.get_prefixes(); + let table = QueryTableProvider::new( + prefixes, + storage, + get_all_read_buf(&self.stream_name, self.start, self.end), + Arc::clone(&self.schema), + ); ctx.register_table( &*self.stream_name, @@ -188,3 +193,27 @@ mod tests { assert_eq!(time.timestamp(), 1640995200); } } + +fn get_all_read_buf(stream_name: &str, start: DateTime, end: DateTime) -> Vec { + let now = Utc::now(); + let include_mutable = start <= now && now <= end; + // copy from mutable buffer + let mut queryable_read_buffer = Vec::new(); + + if let Some(mem) = MEMORY_READ_BUFFERS.read().unwrap().get(stream_name) { + for read_buffer in mem { + let time = read_buffer.time; + if start.naive_utc() <= time && time <= end.naive_utc() { + queryable_read_buffer.push(read_buffer.clone()) + } + } + } + + if include_mutable { + if let Some(x) = STREAM_WRITERS.clone_read_buf(stream_name) { + queryable_read_buffer.push(x); + } + } + + queryable_read_buffer +} diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs new file mode 100644 index 000000000..9e037b6ef --- /dev/null +++ b/server/src/query/table_provider.rs @@ -0,0 +1,137 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use async_trait::async_trait; +use datafusion::arrow::datatypes::{Schema, SchemaRef}; +use datafusion::datasource::{MemTable, TableProvider}; +use datafusion::error::DataFusionError; +use datafusion::execution::context::SessionState; +use datafusion::logical_expr::TableType; +use datafusion::physical_plan::union::UnionExec; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::Expr; +use std::any::Any; +use std::sync::Arc; + +use crate::storage::staging::ReadBuf; +use crate::storage::ObjectStorage; +use crate::utils::arrow::adapt_batch; + +pub struct QueryTableProvider { + storage_prefixes: Vec, + storage: Arc, + readable_buffer: Vec, + schema: Arc, +} + +impl QueryTableProvider { + pub fn new( + storage_prefixes: Vec, + storage: Arc, + readable_buffer: Vec, + schema: Arc, + ) -> Self { + Self { + storage_prefixes, + storage, + readable_buffer, + schema, + } + } + + async fn create_physical_plan( + &self, + ctx: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result, DataFusionError> { + let memexec = self.get_mem_exec(ctx, projection, filters, limit).await?; + let table = self + .storage + .query_table(self.storage_prefixes.clone(), Arc::clone(&self.schema))?; + + let mut exec = Vec::new(); + if let Some(memexec) = memexec { + exec.push(memexec); + } + + if let Some(ref storage_listing) = table { + exec.push( + storage_listing + .scan(ctx, projection, filters, limit) + .await?, + ); + } + Ok(Arc::new(UnionExec::new(exec))) + } + + async fn get_mem_exec( + &self, + ctx: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result>, DataFusionError> { + if self.readable_buffer.is_empty() { + return Ok(None); + } + + let mem_records: Vec> = self + .readable_buffer + .iter() + .map(|r| { + r.buf + .iter() + .cloned() + .map(|rb| adapt_batch(&self.schema, rb)) + .collect() + }) + .collect(); + + let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?; + let memexec = memtable.scan(ctx, projection, filters, limit).await?; + Ok(Some(memexec)) + } +} + +#[async_trait] +impl TableProvider for QueryTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + ctx: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> datafusion::error::Result> { + self.create_physical_plan(ctx, projection, filters, limit) + .await + } +} diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index a140488f3..47eaab7a0 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -62,6 +62,7 @@ pub fn take_all_read_bufs() -> Vec<(String, Vec)> { res } +#[derive(Debug, Clone)] pub struct ReadBuf { pub time: NaiveDateTime, pub buf: Vec, From a38abef7ce1d35321b14fdd3a38ca9588d069edf Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sun, 23 Apr 2023 23:21:48 +0530 Subject: [PATCH 2/3] Fix --- server/src/query/table_provider.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index 9e037b6ef..eb79385d1 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -22,6 +22,7 @@ use datafusion::datasource::{MemTable, TableProvider}; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; use datafusion::logical_expr::TableType; +use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::union::UnionExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::Expr; @@ -78,7 +79,12 @@ impl QueryTableProvider { .await?, ); } - Ok(Arc::new(UnionExec::new(exec))) + + if exec.is_empty() { + Ok(Arc::new(EmptyExec::new(false, Arc::clone(&self.schema)))) + } else { + Ok(Arc::new(UnionExec::new(exec))) + } } async fn get_mem_exec( From 9528666d5ecb9b068d41c4142b582bd02f7ca1b9 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 24 Apr 2023 12:43:24 +0530 Subject: [PATCH 3/3] Fix --- server/src/query.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/server/src/query.rs b/server/src/query.rs index 1e6ee258c..a419925ea 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -181,19 +181,6 @@ pub mod error { } } -#[cfg(test)] -mod tests { - use super::time_from_path; - use std::path::PathBuf; - - #[test] - fn test_time_from_parquet_path() { - let path = PathBuf::from("date=2022-01-01.hour=00.minute=00.hostname.data.parquet"); - let time = time_from_path(path.as_path()); - assert_eq!(time.timestamp(), 1640995200); - } -} - fn get_all_read_buf(stream_name: &str, start: DateTime, end: DateTime) -> Vec { let now = Utc::now(); let include_mutable = start <= now && now <= end; @@ -217,3 +204,16 @@ fn get_all_read_buf(stream_name: &str, start: DateTime, end: DateTime) queryable_read_buffer } + +#[cfg(test)] +mod tests { + use super::time_from_path; + use std::path::PathBuf; + + #[test] + fn test_time_from_parquet_path() { + let path = PathBuf::from("date=2022-01-01.hour=00.minute=00.hostname.data.parquet"); + let time = time_from_path(path.as_path()); + assert_eq!(time.timestamp(), 1640995200); + } +}