From 868ada37c56e611d1f42e26180039777b903aaf4 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 6 Mar 2025 22:50:38 +0530 Subject: [PATCH 1/5] perf: don't construct a tokio runtime for each query --- src/handlers/airplane.rs | 14 +++++++------- src/handlers/http/query.rs | 20 ++++++-------------- src/parseable/mod.rs | 2 +- src/query/mod.rs | 7 +++---- 4 files changed, 17 insertions(+), 26 deletions(-) diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index f3e0bfcb8..6bd7e54c7 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -216,13 +216,13 @@ impl FlightService for AirServiceImpl { })?; let time = Instant::now(); - let stream_name_clone = stream_name.clone(); - let (records, _) = - match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await { - Ok(Ok((records, fields))) => (records, fields), - Ok(Err(e)) => return Err(Status::internal(e.to_string())), - Err(err) => return Err(Status::internal(err.to_string())), - }; + let stream = PARSEABLE + .get_stream(&stream_name) + .map_err(|err| Status::internal(err.to_string()))?; + let (records, _) = query + .execute(&stream) + .await + .map_err(|err| Status::internal(err.to_string()))?; /* * INFO: No returning the schema with the data. 
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 0d7d0b340..8316cf2fd 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -19,7 +19,6 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder}; -use arrow_array::RecordBatch; use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; use datafusion::error::DataFusionError; @@ -40,7 +39,7 @@ use crate::handlers::http::fetch_schema; use crate::event::commit_schema; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::Mode; -use crate::parseable::PARSEABLE; +use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::query::error::ExecuteError; use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery}; use crate::query::{TableScanVisitor, QUERY_SESSION}; @@ -131,7 +130,9 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Result<(Vec, Vec), QueryError> { - match tokio::task::spawn_blocking(move || query.execute(stream_name)).await { - Ok(Ok(result)) => Ok(result), - Ok(Err(e)) => Err(QueryError::Execute(e)), - Err(e) => Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))), - } -} - pub async fn get_counts( req: HttpRequest, counts_request: Json, @@ -330,6 +320,8 @@ Description: {0}"# ActixError(#[from] actix_web::Error), #[error("Error: {0}")] Anyhow(#[from] anyhow::Error), + #[error("Error: {0}")] + StreamNotFound(#[from] StreamNotFound) } impl actix_web::ResponseError for QueryError { diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 6e4f55e94..14e5a85af 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -28,7 +28,7 @@ use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode}; use once_cell::sync::Lazy; pub use staging::StagingError; use streams::StreamRef; -pub use streams::{StreamNotFound, Streams}; +pub use streams::{StreamNotFound, Streams, Stream}; use tracing::error; 
#[cfg(feature = "kafka")] diff --git a/src/query/mod.rs b/src/query/mod.rs index fa422e9a1..8ca9de5e5 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -53,7 +53,7 @@ use crate::catalog::Snapshot as CatalogSnapshot; use crate::event; use crate::handlers::http::query::QueryError; use crate::option::Mode; -use crate::parseable::PARSEABLE; +use crate::parseable::{Stream, PARSEABLE}; use crate::storage::{ObjectStorageProvider, ObjectStoreFormat, STREAM_ROOT_DIRECTORY}; use crate::utils::time::TimeRange; @@ -129,12 +129,11 @@ impl Query { SessionContext::new_with_state(state) } - #[tokio::main(flavor = "multi_thread")] pub async fn execute( &self, - stream_name: String, + stream: &Stream, ) -> Result<(Vec, Vec), ExecuteError> { - let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition(); + let time_partition = stream.get_time_partition(); let df = QUERY_SESSION .execute_logical_plan(self.final_logical_plan(&time_partition)) From 090f9a6907493ba8cfd9753420e55b836a253dac Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 7 Mar 2025 14:22:06 +0530 Subject: [PATCH 2/5] refactor: query only expects `time_partition` --- src/alerts/alerts_utils.rs | 4 +++- src/alerts/mod.rs | 12 +++++++----- src/handlers/airplane.rs | 7 ++++--- src/handlers/http/query.rs | 6 +++--- src/query/mod.rs | 27 +++++++++++++-------------- 5 files changed, 30 insertions(+), 26 deletions(-) diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index c29d61573..878f78644 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -31,6 +31,7 @@ use tracing::trace; use crate::{ alerts::AggregateCondition, + parseable::PARSEABLE, query::{TableScanVisitor, QUERY_SESSION}, rbac::{ map::SessionKey, @@ -137,8 +138,9 @@ async fn execute_base_query( AlertError::CustomError(format!("Table name not found in query- {}", original_query)) })?; + let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition(); query -
.get_dataframe(stream_name) + .get_dataframe(time_partition.as_ref()) .await .map_err(|err| AlertError::CustomError(err.to_string())) } diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 66857d686..db22754f5 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -37,7 +37,7 @@ use ulid::Ulid; pub mod alerts_utils; pub mod target; -use crate::parseable::PARSEABLE; +use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::rbac::map::SessionKey; use crate::storage; @@ -514,17 +514,16 @@ impl AlertConfig { // for now proceed in a similar fashion as we do in query // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data) - let stream_name = if let Some(stream_name) = query.first_table_name() { - stream_name - } else { + let Some(stream_name) = query.first_table_name() else { return Err(AlertError::CustomError(format!( "Table name not found in query- {}", self.query ))); }; + let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition(); let base_df = query - .get_dataframe(stream_name) + .get_dataframe(time_partition.as_ref()) .await .map_err(|err| AlertError::CustomError(err.to_string()))?; @@ -704,6 +703,8 @@ pub enum AlertError { CustomError(String), #[error("Invalid State Change: {0}")] InvalidStateChange(String), + #[error("{0}")] + StreamNotFound(#[from] StreamNotFound), } impl actix_web::ResponseError for AlertError { @@ -717,6 +718,7 @@ impl actix_web::ResponseError for AlertError { Self::DatafusionError(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::CustomError(_) => StatusCode::BAD_REQUEST, Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST, + Self::StreamNotFound(_) => StatusCode::NOT_FOUND, } } diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 6bd7e54c7..89cd0f97c 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -216,11 +216,12 @@ impl 
FlightService for AirServiceImpl { })?; let time = Instant::now(); - let stream = PARSEABLE + let time_partition = PARSEABLE .get_stream(&stream_name) - .map_err(|err| Status::internal(err.to_string()))?; + .map_err(|err| Status::internal(err.to_string()))? + .get_time_partition(); let (records, _) = query - .execute(&stream) + .execute(time_partition.as_ref()) .await .map_err(|err| Status::internal(err.to_string()))?; diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 8316cf2fd..51d713e41 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -131,8 +131,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result, ) -> Result<(Vec, Vec), ExecuteError> { - let time_partition = stream.get_time_partition(); - let df = QUERY_SESSION - .execute_logical_plan(self.final_logical_plan(&time_partition)) + .execute_logical_plan(self.final_logical_plan(time_partition)) .await?; let fields = df @@ -155,18 +153,19 @@ impl Query { Ok((results, fields)) } - pub async fn get_dataframe(&self, stream_name: String) -> Result { - let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition(); - + pub async fn get_dataframe( + &self, + time_partition: Option<&String>, + ) -> Result { let df = QUERY_SESSION - .execute_logical_plan(self.final_logical_plan(&time_partition)) + .execute_logical_plan(self.final_logical_plan(time_partition)) .await?; Ok(df) } /// return logical plan with all time filters applied through - fn final_logical_plan(&self, time_partition: &Option) -> LogicalPlan { + fn final_logical_plan(&self, time_partition: Option<&String>) -> LogicalPlan { // see https://github.com/apache/arrow-datafusion/pull/8400 // this can be eliminated in later version of datafusion but with slight caveat // transform cannot modify stringified plans by itself @@ -486,7 +485,7 @@ fn transform( plan: LogicalPlan, start_time: NaiveDateTime, end_time: NaiveDateTime, - time_partition: &Option, + time_partition: 
Option<&String>, ) -> Transformed { plan.transform(&|plan| match plan { LogicalPlan::TableScan(table) => { @@ -544,7 +543,7 @@ fn transform( fn table_contains_any_time_filters( table: &datafusion::logical_expr::TableScan, - time_partition: &Option, + time_partition: Option<&String>, ) -> bool { table .filters @@ -558,8 +557,8 @@ fn table_contains_any_time_filters( }) .any(|expr| { matches!(&*expr.left, Expr::Column(Column { name, .. }) - if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) || - (!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY))) + if (time_partition.is_some_and(|field| field == name) || + (time_partition.is_none() && name == event::DEFAULT_TIMESTAMP_KEY))) }) } From f55dee4d376276301c0acd8294cb6e05994c2701 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 8 Mar 2025 01:00:40 +0530 Subject: [PATCH 3/5] feat: dedicated runtime for all queries --- src/handlers/airplane.rs | 9 ++------- src/handlers/http/query.rs | 5 ++--- src/parseable/mod.rs | 2 +- src/query/mod.rs | 17 +++++++++++++++++ 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 89cd0f97c..1a72c7470 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -38,7 +38,7 @@ use crate::handlers::http::query::{into_query, update_schema_when_distributed}; use crate::handlers::livetail::cross_origin_config; use crate::metrics::QUERY_EXECUTE_TIME; use crate::parseable::PARSEABLE; -use crate::query::{TableScanVisitor, QUERY_SESSION}; +use crate::query::{execute, TableScanVisitor, QUERY_SESSION}; use crate::utils::arrow::flight::{ append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc, send_to_ingester, @@ -216,12 +216,7 @@ impl FlightService for AirServiceImpl { })?; let time = Instant::now(); - let time_partition = PARSEABLE - .get_stream(&stream_name) - .map_err(|err| Status::internal(err.to_string()))? 
- .get_time_partition(); - let (records, _) = query - .execute(time_partition.as_ref()) + let (records, _) = execute(query, &stream_name) .await .map_err(|err| Status::internal(err.to_string()))?; diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 51d713e41..7d9c33a45 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -41,7 +41,7 @@ use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::Mode; use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::query::error::ExecuteError; -use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery}; +use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery}; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::rbac::Users; use crate::response::QueryResponse; @@ -131,8 +131,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result = Lazy::new(|| Query::create_session_context(PARSEABLE.storage())); +/// Dedicated multi-threaded runtime to run +pub static QUERY_RUNTIME: Lazy = + Lazy::new(|| Runtime::new().expect("Runtime should be constructible")); + +pub async fn execute( + query: Query, + stream_name: &str, +) -> Result<(Vec, Vec), ExecuteError> { + let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition(); + QUERY_RUNTIME + .spawn(async move { query.execute(time_partition.as_ref()).await }) + .await + .expect("The Join should have been successful") +} + // A query request by client #[derive(Debug)] pub struct Query { @@ -150,6 +166,7 @@ impl Query { } let results = df.collect().await?; + Ok((results, fields)) } From 3ad7b487269ff55b3cfbc1ddcbe7e2688a84620b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 8 Mar 2025 01:14:23 +0530 Subject: [PATCH 4/5] doc: explain the change --- src/query/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/query/mod.rs b/src/query/mod.rs index b2d90131f..7ab8bb4bf 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ 
-61,10 +61,13 @@ use crate::utils::time::TimeRange; pub static QUERY_SESSION: Lazy = Lazy::new(|| Query::create_session_context(PARSEABLE.storage())); -/// Dedicated multi-threaded runtime to run +/// Dedicated multi-threaded runtime to run all queries on pub static QUERY_RUNTIME: Lazy = Lazy::new(|| Runtime::new().expect("Runtime should be constructible")); + +/// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single CPU +/// at a time and has access to the entire thread, enabling better concurrent processing, and thus quicker results. pub async fn execute( query: Query, stream_name: &str, From ec21e29bae805789126e1db482208b91c89699f5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 8 Mar 2025 01:17:32 +0530 Subject: [PATCH 5/5] fix explain Signed-off-by: Devdutt Shenoi --- src/query/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/mod.rs b/src/query/mod.rs index 7ab8bb4bf..9fb50b94e 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -66,8 +66,8 @@ pub static QUERY_RUNTIME: Lazy = Lazy::new(|| Runtime::new().expect("Runtime should be constructible")); -/// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single CPU -/// at a time and has access to the entire thread, enabling better concurrent processing, and thus quicker results. +/// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU +/// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results. pub async fn execute( query: Query, stream_name: &str,