From 18089cc102883fd7f113c92703880c8e890861fa Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 7 Aug 2023 11:29:59 +0530 Subject: [PATCH] Check memtable against current minute when loading --- server/src/query.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/server/src/query.rs b/server/src/query.rs index 8997ccdf7..b1ac3cfbe 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -19,8 +19,8 @@ mod filter_optimizer; mod table_provider; -use chrono::TimeZone; use chrono::{DateTime, Utc}; +use chrono::{TimeZone, Timelike}; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::file_format::parquet::ParquetFormat; @@ -128,8 +128,18 @@ impl Query { ) -> Result<(Vec, Vec), ExecuteError> { let ctx = self.create_session_context(); let remote_listing_table = self._remote_query(storage)?; - let memtable = - crate::event::STREAM_WRITERS.recordbatches_cloned(&self.stream_name, &self.schema); + + let current_minute = Utc::now() + .with_second(0) + .and_then(|x| x.with_nanosecond(0)) + .expect("zeroed value is valid"); + + let memtable = if self.end > current_minute { + crate::event::STREAM_WRITERS.recordbatches_cloned(&self.stream_name, &self.schema) + } else { + None + }; + let table = QueryTableProvider::try_new(memtable, remote_listing_table, self.schema.clone())?;