From 82f7799bc87aec7fd913b359fe994927fef2b097 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 14 Dec 2023 13:18:27 +0530 Subject: [PATCH] Transform with table reference --- server/src/query.rs | 131 ++++++++++++++++++++++++-------------------- 1 file changed, 73 insertions(+), 58 deletions(-) diff --git a/server/src/query.rs b/server/src/query.rs index cf89a463f..663873dae 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -21,8 +21,8 @@ mod listing_table_builder; mod stream_schema_provider; mod table_provider; -use chrono::TimeZone; use chrono::{DateTime, Utc}; +use chrono::{NaiveDateTime, TimeZone}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion}; @@ -122,61 +122,7 @@ impl Query { /// return logical plan with all time filters applied through fn final_logical_plan(&self) -> LogicalPlan { - fn tag_filter(filters: Vec) -> Option { - filters - .iter() - .map(|literal| { - Expr::Column(Column::from_name(event::DEFAULT_TAGS_KEY)) - .like(lit(format!("%{}%", literal))) - }) - .reduce(or) - } - - fn transform( - plan: LogicalPlan, - start_time_filter: Expr, - end_time_filter: Expr, - filters: Option, - ) -> LogicalPlan { - plan.transform(&|plan| match plan { - LogicalPlan::TableScan(table) => { - let mut new_filters = vec![]; - if !table.filters.iter().any(|expr| { - let Expr::BinaryExpr(binexpr) = expr else {return false}; - matches!(&*binexpr.left, Expr::Column(Column { name, .. }) if name == event::DEFAULT_TIMESTAMP_KEY) - }) { - new_filters.push(start_time_filter.clone()); - new_filters.push(end_time_filter.clone()); - } - - if let Some(tag_filters) = filters.clone() { - new_filters.push(tag_filters) - } - - let new_filter = new_filters.into_iter().reduce(and); - - if let Some(new_filter) = new_filter { - let filter = Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap(); - Ok(Transformed::Yes(LogicalPlan::Filter(filter))) - } else { - Ok(Transformed::No(LogicalPlan::TableScan(table))) - } - }, - x => Ok(Transformed::No(x)), - }) - .expect("transform only transforms the tablescan") - } - let filters = self.filter_tag.clone().and_then(tag_filter); - let start_time_filter = - PartialTimeFilter::Low(std::ops::Bound::Included(self.start.naive_utc())).binary_expr( - Expr::Column(Column::from_name(event::DEFAULT_TIMESTAMP_KEY)), - ); - let end_time_filter = - PartialTimeFilter::High(std::ops::Bound::Excluded(self.end.naive_utc())).binary_expr( - Expr::Column(Column::from_name(event::DEFAULT_TIMESTAMP_KEY)), - ); - // see https://github.com/apache/arrow-datafusion/pull/8400 // this can be eliminated in later version of datafusion but with slight caveat // transform cannot modify stringified plans by itself @@ -185,8 +131,8 @@ impl Query { LogicalPlan::Explain(plan) => { let transformed = transform( plan.plan.as_ref().clone(), - start_time_filter, - end_time_filter, + self.start.naive_utc(), + self.end.naive_utc(), filters, ); LogicalPlan::Explain(Explain { @@ -199,7 +145,7 @@ impl Query { logical_optimization_succeeded: plan.logical_optimization_succeeded, }) } - x => transform(x, start_time_filter, end_time_filter, filters), + x => transform(x, self.start.naive_utc(), self.end.naive_utc(), filters), } } @@ -235,6 +181,75 @@ impl TreeNodeVisitor for TableScanVisitor { } } +fn tag_filter(filters: Vec) -> Option { + filters + .iter() + .map(|literal| { + Expr::Column(Column::from_name(event::DEFAULT_TAGS_KEY)) + .like(lit(format!("%{}%", literal))) + }) + .reduce(or) +} + +fn transform( + plan: LogicalPlan, + start_time: NaiveDateTime, + end_time: NaiveDateTime, + filters: Option, +) -> LogicalPlan { + plan.transform(&|plan| match plan { + LogicalPlan::TableScan(table) => { + let mut new_filters = vec![]; + if !table_contains_any_time_filters(&table) { + let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included( + start_time, + )) + .binary_expr(Expr::Column(Column::new( + Some(table.table_name.to_owned_reference()), + event::DEFAULT_TIMESTAMP_KEY, + ))); + let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) + .binary_expr(Expr::Column(Column::new( + Some(table.table_name.to_owned_reference()), + event::DEFAULT_TIMESTAMP_KEY, + ))); + new_filters.push(start_time_filter); + new_filters.push(end_time_filter); + } + + if let Some(tag_filters) = filters.clone() { + new_filters.push(tag_filters) + } + + let new_filter = new_filters.into_iter().reduce(and); + + if let Some(new_filter) = new_filter { + let filter = + Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap(); + Ok(Transformed::Yes(LogicalPlan::Filter(filter))) + } else { + Ok(Transformed::No(LogicalPlan::TableScan(table))) + } + } + x => Ok(Transformed::No(x)), + }) + .expect("transform only transforms the tablescan") +} + +fn table_contains_any_time_filters(table: &datafusion::logical_expr::TableScan) -> bool { + table + .filters + .iter() + .filter_map(|x| { + if let Expr::BinaryExpr(binexpr) = x { + Some(binexpr) + } else { + None + } + }) + .any(|expr| matches!(&*expr.left, Expr::Column(Column { name, .. }) if (name == event::DEFAULT_TIMESTAMP_KEY))) +} + #[allow(dead_code)] fn get_staging_prefixes( stream_name: &str,