From 56391bd0afe4d51cdc5ce83a1245a5f3fb344d97 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 17 Feb 2025 16:00:30 +0530 Subject: [PATCH 1/2] fix: consider current time --- src/query/stream_schema_provider.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 1bf34e701..d943d444a 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -739,14 +739,12 @@ pub fn include_now(filters: &[Expr], time_partition: &Option) -> bool { let time_filters = extract_primary_filter(filters, time_partition); - let upper_bound_matches = time_filters.iter().any(|filter| match filter { + if time_filters.iter().any(|filter| match filter { PartialTimeFilter::High(Bound::Excluded(time)) | PartialTimeFilter::High(Bound::Included(time)) - | PartialTimeFilter::Eq(time) => time > ¤t_minute, + | PartialTimeFilter::Eq(time) => time >= ¤t_minute, _ => false, - }); - - if upper_bound_matches { + }) { return true; } From e005ddf76c5408b3dc8f47c314662434456b7cda Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 19 Feb 2025 14:25:04 +0530 Subject: [PATCH 2/2] fix: consider staging for upto 5 minutes --- src/query/stream_schema_provider.rs | 16 ++++++++-------- src/utils/arrow/flight.rs | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index d943d444a..81a1bd85c 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -27,7 +27,7 @@ use crate::{ use arrow_array::RecordBatch; use arrow_schema::{Schema, SchemaRef, SortOptions}; use bytes::Bytes; -use chrono::{DateTime, NaiveDateTime, Timelike, Utc}; +use chrono::{DateTime, NaiveDateTime, TimeDelta, Timelike, Utc}; use datafusion::catalog::Session; use datafusion::common::stats::Precision; use datafusion::logical_expr::utils::conjunction; @@ -442,7 +442,7 @@ impl TableProvider for StandardTableProvider { return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string())); } - if include_now(filters, &time_partition) { + if is_within_staging_window(&time_filters) { if let Ok(staging) = PARSEABLE.get_stream(&self.stream) { let records = staging.recordbatches_cloned(&self.schema); let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?; @@ -730,19 +730,19 @@ fn return_listing_time_filters( } } -pub fn include_now(filters: &[Expr], time_partition: &Option) -> bool { - let current_minute = Utc::now() +/// We should consider data in staging for queries concerning a time period, +/// ending within 5 minutes from now. e.g. If current time is 5 +pub fn is_within_staging_window(time_filters: &[PartialTimeFilter]) -> bool { + let five_minutes_back = (Utc::now() - TimeDelta::minutes(5)) .with_second(0) .and_then(|x| x.with_nanosecond(0)) .expect("zeroed value is valid") .naive_utc(); - let time_filters = extract_primary_filter(filters, time_partition); - if time_filters.iter().any(|filter| match filter { PartialTimeFilter::High(Bound::Excluded(time)) | PartialTimeFilter::High(Bound::Included(time)) - | PartialTimeFilter::Eq(time) => time >= ¤t_minute, + | PartialTimeFilter::Eq(time) => time >= &five_minutes_back, _ => false, }) { return true; @@ -826,7 +826,7 @@ pub async fn collect_manifest_files( } // Extract start time and end time from filter predicate -fn extract_primary_filter( +pub fn extract_primary_filter( filters: &[Expr], time_partition: &Option, ) -> Vec { diff --git a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs index 46336beb2..c8d2dacf2 100644 --- a/src/utils/arrow/flight.rs +++ b/src/utils/arrow/flight.rs @@ -20,7 +20,7 @@ use crate::event::Event; use crate::handlers::http::ingest::push_logs_unchecked; use crate::handlers::http::query::Query as QueryJson; use crate::parseable::PARSEABLE; -use crate::query::stream_schema_provider::include_now; +use crate::query::stream_schema_provider::{extract_primary_filter, is_within_staging_window}; use crate::{handlers::http::modal::IngestorMetadata, option::Mode}; use arrow_array::RecordBatch; @@ -131,9 +131,9 @@ pub fn send_to_ingester(start: i64, end: i64) -> bool { datafusion::logical_expr::Operator::Lt, Box::new(filter_end), ); - let ex = [Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)]; - - PARSEABLE.options.mode == Mode::Query && include_now(&ex, &None) + let time_filters = + extract_primary_filter(&[Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)], &None); + PARSEABLE.options.mode == Mode::Query && is_within_staging_window(&time_filters) } fn lit_timestamp_milli(time: i64) -> Expr {