Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions server/src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,19 @@ impl WriterTable {
}
}
}

/// Returns a snapshot (`ReadBuf`) of the in-memory writer for `stream_name`.
///
/// Yields `None` when the stream is unknown to this table, or when the
/// stream is currently backed by a disk writer (disk-backed batches have
/// no in-memory representation to clone).
pub fn clone_read_buf(&self, stream_name: &str) -> Option<ReadBuf> {
    let map = self.read().unwrap();
    let (writer, context) = map.get(stream_name)?;
    let guard = writer.lock().unwrap();
    if let StreamWriter::Mem(mem) = &*guard {
        // Cloning the record batches gives the query path its own copy,
        // so the writer lock is released immediately after this call.
        Some(ReadBuf {
            time: context.time,
            buf: mem.recordbatch_cloned(),
        })
    } else {
        None
    }
}
}

pub mod errors {
Expand Down
1 change: 0 additions & 1 deletion server/src/event/writer/mem_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ impl<const N: usize> MemWriter<N> {
self.mutable_buffer.push(rb)
}

#[allow(unused)]
pub fn recordbatch_cloned(&self) -> Vec<RecordBatch> {
let mut read_buffer = self.read_buffer.clone();
let rb = self.mutable_buffer.recordbatch_cloned();
Expand Down
43 changes: 36 additions & 7 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*
*/

pub mod table_provider;

use chrono::TimeZone;
use chrono::{DateTime, Utc};
use datafusion::arrow::datatypes::Schema;
Expand All @@ -27,13 +29,16 @@ use serde_json::Value;
use std::path::Path;
use std::sync::Arc;

use crate::event::STREAM_WRITERS;
use crate::option::CONFIG;
use crate::storage::staging::{ReadBuf, MEMORY_READ_BUFFERS};
use crate::storage::ObjectStorageError;
use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
use crate::utils::TimePeriod;
use crate::validator;

use self::error::{ExecuteError, ParseError};
use self::table_provider::QueryTableProvider;

type Key = &'static str;
fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
Expand Down Expand Up @@ -73,10 +78,6 @@ impl Query {
.collect()
}

pub fn get_schema(&self) -> &Schema {
&self.schema
}

/// Execute query on object storage(and if necessary on cache as well) with given stream information
/// TODO: find a way to query all selected parquet files together in a single context.
pub async fn execute(
Expand All @@ -88,9 +89,13 @@ impl Query {
CONFIG.storage().get_datafusion_runtime(),
);

let Some(table) = storage.query_table(self.get_prefixes(), Arc::new(self.get_schema().clone()))? else {
return Ok((Vec::new(), Vec::new()));
};
let prefixes = self.get_prefixes();
let table = QueryTableProvider::new(
prefixes,
storage,
get_all_read_buf(&self.stream_name, self.start, self.end),
Arc::clone(&self.schema),
);

ctx.register_table(
&*self.stream_name,
Expand Down Expand Up @@ -176,6 +181,30 @@ pub mod error {
}
}

/// Collects every in-memory buffer for `stream_name` whose capture time
/// falls inside the query window `[start, end]`.
///
/// Two sources are consulted: buffers already rotated into the staging
/// area (`MEMORY_READ_BUFFERS`), and — only when the window includes the
/// present moment — the stream's current mutable write buffer.
fn get_all_read_buf(stream_name: &str, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<ReadBuf> {
    let mut buffers = Vec::new();

    // Staged buffer times are naive UTC; convert the window bounds once.
    let start_naive = start.naive_utc();
    let end_naive = end.naive_utc();

    if let Some(staged) = MEMORY_READ_BUFFERS.read().unwrap().get(stream_name) {
        buffers.extend(
            staged
                .iter()
                .filter(|rb| start_naive <= rb.time && rb.time <= end_naive)
                .cloned(),
        );
    }

    // The mutable buffer holds data being written right now, so it is only
    // relevant when "now" lies within the requested time range.
    let now = Utc::now();
    if start <= now && now <= end {
        if let Some(mutable) = STREAM_WRITERS.clone_read_buf(stream_name) {
            buffers.push(mutable);
        }
    }

    buffers
}

#[cfg(test)]
mod tests {
use super::time_from_path;
Expand Down
143 changes: 143 additions & 0 deletions server/src/query/table_provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use async_trait::async_trait;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::TableType;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use std::any::Any;
use std::sync::Arc;

use crate::storage::staging::ReadBuf;
use crate::storage::ObjectStorage;
use crate::utils::arrow::adapt_batch;

/// A `TableProvider` that unions data still buffered in memory with data
/// already persisted to object storage, so a query sees both recent and
/// historical records for a stream.
pub struct QueryTableProvider {
    // Object-store prefixes (time-partitioned paths) to list for persisted data.
    storage_prefixes: Vec<String>,
    // Handle to the backing object storage implementation.
    storage: Arc<dyn ObjectStorage + Send>,
    // Cloned in-memory record batches not yet flushed to storage.
    readable_buffer: Vec<ReadBuf>,
    // Schema shared by both the in-memory and on-storage data.
    schema: Arc<Schema>,
}

impl QueryTableProvider {
    /// Bundles storage prefixes, the object-storage handle, the cloned
    /// in-memory buffers and the stream schema into a queryable table.
    pub fn new(
        storage_prefixes: Vec<String>,
        storage: Arc<dyn ObjectStorage + Send>,
        readable_buffer: Vec<ReadBuf>,
        schema: Arc<Schema>,
    ) -> Self {
        Self {
            storage_prefixes,
            storage,
            readable_buffer,
            schema,
        }
    }

    /// Builds the physical plan as a union of the in-memory scan and the
    /// object-storage listing scan; falls back to an empty plan when
    /// neither source has data.
    async fn create_physical_plan(
        &self,
        ctx: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        let mut children: Vec<Arc<dyn ExecutionPlan>> = Vec::new();

        // In-memory data first, then persisted data.
        if let Some(mem_plan) = self.get_mem_exec(ctx, projection, filters, limit).await? {
            children.push(mem_plan);
        }

        let listing = self
            .storage
            .query_table(self.storage_prefixes.clone(), Arc::clone(&self.schema))?;
        if let Some(ref listing_table) = listing {
            children.push(listing_table.scan(ctx, projection, filters, limit).await?);
        }

        if children.is_empty() {
            // No source had any data for this stream/window.
            Ok(Arc::new(EmptyExec::new(false, Arc::clone(&self.schema))))
        } else {
            Ok(Arc::new(UnionExec::new(children)))
        }
    }

    /// Wraps the cloned in-memory buffers in a `MemTable` and returns its
    /// scan plan; `None` when there is no buffered data.
    async fn get_mem_exec(
        &self,
        ctx: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
        if self.readable_buffer.is_empty() {
            return Ok(None);
        }

        // One MemTable partition per ReadBuf; every batch is adapted to the
        // current stream schema so older batches with fewer columns still scan.
        let mut partitions = Vec::with_capacity(self.readable_buffer.len());
        for read_buf in &self.readable_buffer {
            let adapted: Vec<_> = read_buf
                .buf
                .iter()
                .cloned()
                .map(|batch| adapt_batch(&self.schema, batch))
                .collect();
            partitions.push(adapted);
        }

        let table = MemTable::try_new(Arc::clone(&self.schema), partitions)?;
        let plan = table.scan(ctx, projection, filters, limit).await?;
        Ok(Some(plan))
    }
}

#[async_trait]
impl TableProvider for QueryTableProvider {
    // Required for downcasting by DataFusion internals.
    fn as_any(&self) -> &dyn Any {
        self
    }

    // The schema is shared between the in-memory and on-storage sources.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    // Delegates plan construction to `create_physical_plan`, which unions
    // the in-memory scan with the object-storage listing scan.
    async fn scan(
        &self,
        ctx: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        self.create_physical_plan(ctx, projection, filters, limit)
            .await
    }
}
1 change: 1 addition & 0 deletions server/src/storage/staging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub fn take_all_read_bufs() -> Vec<(String, Vec<ReadBuf>)> {
res
}

#[derive(Debug, Clone)]
pub struct ReadBuf {
pub time: NaiveDateTime,
pub buf: Vec<RecordBatch>,
Expand Down