diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index ea36b5e03..6cc0efbec 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -150,6 +150,19 @@ impl WriterTable {
             }
         }
     }
+
+    pub fn clone_read_buf(&self, stream_name: &str) -> Option<ReadBuf> {
+        let hashmap_guard = self.read().unwrap();
+        let (writer, context) = hashmap_guard.get(stream_name)?;
+        let writer = writer.lock().unwrap();
+        match &*writer {
+            StreamWriter::Mem(mem) => Some(ReadBuf {
+                time: context.time,
+                buf: mem.recordbatch_cloned(),
+            }),
+            StreamWriter::Disk(_) => None,
+        }
+    }
 }
 
 pub mod errors {
diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs
index c661fc427..7dc7d4203 100644
--- a/server/src/event/writer/mem_writer.rs
+++ b/server/src/event/writer/mem_writer.rs
@@ -46,7 +46,6 @@ impl MemWriter {
         self.mutable_buffer.push(rb)
     }
 
-    #[allow(unused)]
     pub fn recordbatch_cloned(&self) -> Vec<RecordBatch> {
         let mut read_buffer = self.read_buffer.clone();
         let rb = self.mutable_buffer.recordbatch_cloned();
diff --git a/server/src/query.rs b/server/src/query.rs
index 270fa388b..a419925ea 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -16,6 +16,8 @@
  *
  */
 
+pub mod table_provider;
+
 use chrono::TimeZone;
 use chrono::{DateTime, Utc};
 use datafusion::arrow::datatypes::Schema;
@@ -27,13 +29,16 @@ use serde_json::Value;
 use std::path::Path;
 use std::sync::Arc;
 
+use crate::event::STREAM_WRITERS;
 use crate::option::CONFIG;
+use crate::storage::staging::{ReadBuf, MEMORY_READ_BUFFERS};
 use crate::storage::ObjectStorageError;
 use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
 use crate::utils::TimePeriod;
 use crate::validator;
 
 use self::error::{ExecuteError, ParseError};
+use self::table_provider::QueryTableProvider;
 
 type Key = &'static str;
 fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
@@ -73,10 +78,6 @@ impl Query {
         .collect()
     }
 
-    pub fn get_schema(&self) -> &Schema {
-        &self.schema
-    }
-
     /// Execute query on object storage(and if necessary on cache as well) with given stream information
     /// TODO: find a way to query all selected parquet files together in a single context.
     pub async fn execute(
@@ -88,9 +89,13 @@ impl Query {
             CONFIG.storage().get_datafusion_runtime(),
        );
 
-        let Some(table) = storage.query_table(self.get_prefixes(), Arc::new(self.get_schema().clone()))? else {
-            return Ok((Vec::new(), Vec::new()));
-        };
+        let prefixes = self.get_prefixes();
+        let table = QueryTableProvider::new(
+            prefixes,
+            storage,
+            get_all_read_buf(&self.stream_name, self.start, self.end),
+            Arc::clone(&self.schema),
+        );
 
         ctx.register_table(
             &*self.stream_name,
@@ -176,6 +181,30 @@ pub mod error {
     }
 }
 
+fn get_all_read_buf(stream_name: &str, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<ReadBuf> {
+    let now = Utc::now();
+    let include_mutable = start <= now && now <= end;
+    // copy from mutable buffer
+    let mut queryable_read_buffer = Vec::new();
+
+    if let Some(mem) = MEMORY_READ_BUFFERS.read().unwrap().get(stream_name) {
+        for read_buffer in mem {
+            let time = read_buffer.time;
+            if start.naive_utc() <= time && time <= end.naive_utc() {
+                queryable_read_buffer.push(read_buffer.clone())
+            }
+        }
+    }
+
+    if include_mutable {
+        if let Some(x) = STREAM_WRITERS.clone_read_buf(stream_name) {
+            queryable_read_buffer.push(x);
+        }
+    }
+
+    queryable_read_buffer
+}
+
 #[cfg(test)]
 mod tests {
     use super::time_from_path;
diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs
new file mode 100644
index 000000000..eb79385d1
--- /dev/null
+++ b/server/src/query/table_provider.rs
@@ -0,0 +1,143 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use async_trait::async_trait;
+use datafusion::arrow::datatypes::{Schema, SchemaRef};
+use datafusion::datasource::{MemTable, TableProvider};
+use datafusion::error::DataFusionError;
+use datafusion::execution::context::SessionState;
+use datafusion::logical_expr::TableType;
+use datafusion::physical_plan::empty::EmptyExec;
+use datafusion::physical_plan::union::UnionExec;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::Expr;
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::storage::staging::ReadBuf;
+use crate::storage::ObjectStorage;
+use crate::utils::arrow::adapt_batch;
+
+pub struct QueryTableProvider {
+    storage_prefixes: Vec<String>,
+    storage: Arc<dyn ObjectStorage>,
+    readable_buffer: Vec<ReadBuf>,
+    schema: Arc<Schema>,
+}
+
+impl QueryTableProvider {
+    pub fn new(
+        storage_prefixes: Vec<String>,
+        storage: Arc<dyn ObjectStorage>,
+        readable_buffer: Vec<ReadBuf>,
+        schema: Arc<Schema>,
+    ) -> Self {
+        Self {
+            storage_prefixes,
+            storage,
+            readable_buffer,
+            schema,
+        }
+    }
+
+    async fn create_physical_plan(
+        &self,
+        ctx: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        let memexec = self.get_mem_exec(ctx, projection, filters, limit).await?;
+        let table = self
+            .storage
+            .query_table(self.storage_prefixes.clone(), Arc::clone(&self.schema))?;
+
+        let mut exec = Vec::new();
+        if let Some(memexec) = memexec {
+            exec.push(memexec);
+        }
+
+        if let Some(ref storage_listing) = table {
+            exec.push(
+                storage_listing
+                    .scan(ctx, projection, filters, limit)
+                    .await?,
+            );
+        }
+
+        if exec.is_empty() {
+            Ok(Arc::new(EmptyExec::new(false, Arc::clone(&self.schema))))
+        } else {
+            Ok(Arc::new(UnionExec::new(exec)))
+        }
+    }
+
+    async fn get_mem_exec(
+        &self,
+        ctx: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
+        if self.readable_buffer.is_empty() {
+            return Ok(None);
+        }
+
+        let mem_records: Vec<Vec<_>> = self
+            .readable_buffer
+            .iter()
+            .map(|r| {
+                r.buf
+                    .iter()
+                    .cloned()
+                    .map(|rb| adapt_batch(&self.schema, rb))
+                    .collect()
+            })
+            .collect();
+
+        let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
+        let memexec = memtable.scan(ctx, projection, filters, limit).await?;
+        Ok(Some(memexec))
+    }
+}
+
+#[async_trait]
+impl TableProvider for QueryTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        ctx: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+        self.create_physical_plan(ctx, projection, filters, limit)
+            .await
+    }
+}
diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index a140488f3..47eaab7a0 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -62,6 +62,7 @@ pub fn take_all_read_bufs() -> Vec<(String, Vec<ReadBuf>)> {
     res
 }
 
+#[derive(Debug, Clone)]
 pub struct ReadBuf {
     pub time: NaiveDateTime,
     pub buf: Vec<RecordBatch>,