Skip to content

Commit e7c1ec5

Browse files
Devdutt Shenoinikhilsinhaparseable
authored andcommitted
refactor: move code closer to data
1 parent 93debf5 commit e7c1ec5

File tree

2 files changed

+45
-37
lines changed

2 files changed

+45
-37
lines changed

src/handlers/http/query.rs

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,8 @@ use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
2222
use chrono::{DateTime, Utc};
2323
use datafusion::common::tree_node::TreeNode;
24-
use datafusion::common::Column;
2524
use datafusion::error::DataFusionError;
2625
use datafusion::execution::context::SessionState;
27-
use datafusion::logical_expr::expr::Alias;
28-
use datafusion::logical_expr::{Aggregate, LogicalPlan, Projection};
29-
use datafusion::prelude::Expr;
3026
use futures_util::Future;
3127
use http::StatusCode;
3228
use serde::{Deserialize, Serialize};
@@ -112,7 +108,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
112108
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
113109
let time = Instant::now();
114110

115-
if let (true, column_name) = is_logical_plan_aggregate_without_filters(&raw_logical_plan) {
111+
if let Some(column_name) = query.is_logical_plan_count_without_filters() {
116112
let date_bin_request = DateBinRequest {
117113
stream: table_name.clone(),
118114
start_time: query_request.start_time.clone(),
@@ -123,10 +119,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
123119
let response = if query_request.fields {
124120
json!({
125121
"fields": vec![&column_name],
126-
"records": vec![json!({&column_name: date_bin_records[0].log_count})]
122+
"records": vec![json!({column_name: date_bin_records[0].log_count})]
127123
})
128124
} else {
129-
Value::Array(vec![json!({&column_name: date_bin_records[0].log_count})])
125+
Value::Array(vec![json!({column_name: date_bin_records[0].log_count})])
130126
};
131127

132128
let time = time.elapsed().as_secs_f64();
@@ -208,35 +204,6 @@ pub async fn create_streams_for_querier() {
208204
}
209205
}
210206

211-
fn is_logical_plan_aggregate_without_filters(plan: &LogicalPlan) -> (bool, String) {
212-
match plan {
213-
LogicalPlan::Projection(Projection { input, expr, .. }) => {
214-
if let LogicalPlan::Aggregate(Aggregate { input, .. }) = &**input {
215-
if matches!(&**input, LogicalPlan::TableScan { .. }) && expr.len() == 1 {
216-
return match &expr[0] {
217-
Expr::Column(Column { name, .. }) => (name == "count(*)", name.clone()),
218-
Expr::Alias(Alias {
219-
expr: inner_expr,
220-
name,
221-
..
222-
}) => {
223-
let alias_name = name;
224-
if let Expr::Column(Column { name, .. }) = &**inner_expr {
225-
(name == "count(*)", alias_name.to_string())
226-
} else {
227-
(false, "".to_string())
228-
}
229-
}
230-
_ => (false, "".to_string()),
231-
};
232-
}
233-
}
234-
}
235-
_ => return (false, "".to_string()),
236-
}
237-
(false, "".to_string())
238-
}
239-
240207
impl FromRequest for Query {
241208
type Error = actix_web::Error;
242209
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;

src/query/mod.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, Tr
2828
use datafusion::error::DataFusionError;
2929
use datafusion::execution::disk_manager::DiskManagerConfig;
3030
use datafusion::execution::SessionStateBuilder;
31-
use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan};
31+
use datafusion::logical_expr::expr::Alias;
32+
use datafusion::logical_expr::{
33+
Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
34+
};
3235
use datafusion::prelude::*;
3336
use itertools::Itertools;
3437
use once_cell::sync::Lazy;
@@ -200,6 +203,44 @@ impl Query {
200203
let _ = self.raw_logical_plan.visit(&mut visitor);
201204
visitor.into_inner().pop()
202205
}
206+
207+
/// Evaluates to Some("count(*)") | Some("column_name") if the logical plan is a Projection: SELECT COUNT(*) | SELECT COUNT(*) as column_name
208+
pub fn is_logical_plan_count_without_filters(&self) -> Option<&String> {
209+
// Check if the raw logical plan is a Projection: SELECT
210+
let LogicalPlan::Projection(Projection { input, expr, .. }) = &self.raw_logical_plan else {
211+
return None;
212+
};
213+
// Check if the input of the Projection is an Aggregate: COUNT(*)
214+
let LogicalPlan::Aggregate(Aggregate { input, .. }) = &**input else {
215+
return None;
216+
};
217+
218+
// Ensure the input of the Aggregate is a TableScan and there is exactly one expression: SELECT COUNT(*)
219+
if !matches!(&**input, LogicalPlan::TableScan { .. }) || expr.len() == 1 {
220+
return None;
221+
}
222+
223+
// Check if the expression is a column or an alias for COUNT(*)
224+
match &expr[0] {
225+
// Direct column check
226+
Expr::Column(Column { name, .. }) if name == "count(*)" => Some(name),
227+
// Alias for COUNT(*)
228+
Expr::Alias(Alias {
229+
expr: inner_expr,
230+
name: alias_name,
231+
..
232+
}) => {
233+
if let Expr::Column(Column { name, .. }) = &**inner_expr {
234+
if name == "count(*)" {
235+
return Some(alias_name);
236+
}
237+
}
238+
None
239+
}
240+
// Unsupported expression type
241+
_ => None,
242+
}
243+
}
203244
}
204245

205246
/// DateBinRecord

0 commit comments

Comments
 (0)