Skip to content

Commit 3f38a77

Browse files
get count(*) without filters from datebin implementation
avoid datafusion
1 parent 93b5f24 commit 3f38a77

File tree

4 files changed

+197
-202
lines changed

4 files changed

+197
-202
lines changed

src/handlers/http/modal/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,8 @@ impl Server {
268268
)
269269
}
270270
pub fn get_date_bin() -> Resource {
271-
web::resource("/datebin").route(web::post().to(query::get_date_bin).authorize(Action::Query)
272-
)
271+
web::resource("/datebin")
272+
.route(web::post().to(query::get_date_bin).authorize(Action::Query))
273273
}
274274

275275
// get the query factory

src/handlers/http/query.rs

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,19 @@
1818

1919
use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
21-
use actix_web::{FromRequest, HttpRequest, Responder};
21+
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
2222
use bytes::Bytes;
2323
use chrono::{DateTime, Utc};
2424
use datafusion::common::tree_node::TreeNode;
25+
use datafusion::common::Column;
2526
use datafusion::error::DataFusionError;
2627
use datafusion::execution::context::SessionState;
28+
use datafusion::logical_expr::expr::Alias;
29+
use datafusion::logical_expr::{Aggregate, LogicalPlan, Projection};
30+
use datafusion::prelude::Expr;
2731
use futures_util::Future;
2832
use http::StatusCode;
33+
use serde_json::{json, Value};
2934
use std::collections::HashMap;
3035
use std::pin::Pin;
3136
use std::sync::Arc;
@@ -67,32 +72,31 @@ pub struct Query {
6772
pub filter_tags: Option<Vec<String>>,
6873
}
6974

70-
7175
/// DateBin Request.
7276
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
7377
#[serde(rename_all = "camelCase")]
7478
pub struct DateBin {
7579
pub stream: String,
7680
pub start_time: String,
7781
pub end_time: String,
78-
pub num_bins: u64
82+
pub num_bins: u64,
7983
}
8084

8185
/// DateBinRecord
8286
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
8387
pub struct DateBinRecord {
8488
pub date_bin_timestamp: String,
85-
pub log_count: u64
89+
pub log_count: u64,
8690
}
8791

8892
/// DateBin Response.
8993
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
9094
pub struct DateBinResponse {
9195
pub fields: Vec<String>,
92-
pub records: Vec<DateBinRecord>
96+
pub records: Vec<DateBinRecord>,
9397
}
9498

95-
pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
99+
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
96100
let session_state = QUERY_SESSION.state();
97101
let raw_logical_plan = match session_state
98102
.create_logical_plan(&query_request.query)
@@ -107,11 +111,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
107111
.await?
108112
}
109113
};
110-
111114
let time_range =
112115
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
113116

114-
// create a visitor to extract the table names present in query
117+
// Create a visitor to extract the table names present in query
115118
let mut visitor = TableScanVisitor::default();
116119
let _ = raw_logical_plan.visit(&mut visitor);
117120

@@ -125,10 +128,36 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
125128
let table_name = query
126129
.first_table_name()
127130
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
131+
let time = Instant::now();
132+
133+
if let (true, column_name) = is_logical_plan_aggregate_without_filters(&raw_logical_plan) {
134+
let date_bin_request = DateBin {
135+
stream: table_name.clone(),
136+
start_time: query_request.start_time.clone(),
137+
end_time: query_request.end_time.clone(),
138+
num_bins: 1,
139+
};
140+
let date_bin_records = date_bin_request.get_bin_density().await?;
141+
let response = if query_request.fields {
142+
json!({
143+
"fields": vec![&column_name],
144+
"records": vec![json!({&column_name: date_bin_records[0].log_count})]
145+
})
146+
} else {
147+
Value::Array(vec![json!({&column_name: date_bin_records[0].log_count})])
148+
};
149+
150+
let time = time.elapsed().as_secs_f64();
151+
152+
QUERY_EXECUTE_TIME
153+
.with_label_values(&[&table_name])
154+
.observe(time);
155+
156+
return Ok(HttpResponse::Ok().json(response));
157+
}
128158

129159
user_auth_for_query(&permissions, &tables)?;
130160

131-
let time = Instant::now();
132161
let (records, fields) = query.execute(table_name.clone()).await?;
133162

134163
let response = QueryResponse {
@@ -149,16 +178,15 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
149178
}
150179

151180
pub async fn get_date_bin(req: HttpRequest, body: Bytes) -> Result<impl Responder, QueryError> {
152-
153-
let date_bin_request: DateBin = serde_json::from_slice(&body)
154-
.map_err(|err| anyhow::Error::msg(err.to_string()))?;
181+
let date_bin_request: DateBin =
182+
serde_json::from_slice(&body).map_err(|err| anyhow::Error::msg(err.to_string()))?;
155183

156184
let creds = extract_session_key_from_req(&req)?;
157185
let permissions = Users.get_permissions(&creds);
158186

159187
// does user have access to table?
160188
user_auth_for_query(&permissions, &[date_bin_request.stream.clone()])?;
161-
189+
162190
let date_bin_records = date_bin_request.get_bin_density().await?;
163191

164192
Ok(web::Json(DateBinResponse {
@@ -198,6 +226,37 @@ pub async fn create_streams_for_querier() {
198226
}
199227
}
200228

229+
fn is_logical_plan_aggregate_without_filters(plan: &LogicalPlan) -> (bool, String) {
230+
match plan {
231+
LogicalPlan::Projection(Projection { input, expr, .. }) => {
232+
if let LogicalPlan::Aggregate(Aggregate { input, .. }) = &**input {
233+
if let LogicalPlan::TableScan { .. } = &**input {
234+
if expr.len() == 1 {
235+
return match &expr[0] {
236+
Expr::Column(Column { name, .. }) => (name == "count(*)", name.clone()),
237+
Expr::Alias(Alias {
238+
expr: inner_expr,
239+
name,
240+
..
241+
}) => {
242+
let alias_name = name;
243+
if let Expr::Column(Column { name, .. }) = &**inner_expr {
244+
(name == "count(*)", alias_name.to_string())
245+
} else {
246+
(false, "".to_string())
247+
}
248+
}
249+
_ => (false, "".to_string()),
250+
};
251+
}
252+
}
253+
}
254+
}
255+
_ => return (false, "".to_string()),
256+
}
257+
(false, "".to_string())
258+
}
259+
201260
impl FromRequest for Query {
202261
type Error = actix_web::Error;
203262
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;

0 commit comments

Comments
 (0)