@@ -42,6 +42,7 @@ use std::ops::Bound;
4242use std:: sync:: Arc ;
4343use stream_schema_provider:: collect_manifest_files;
4444use sysinfo:: System ;
45+ use tokio:: runtime:: Runtime ;
4546
4647use self :: error:: ExecuteError ;
4748use self :: stream_schema_provider:: GlobalSchemaProvider ;
@@ -60,6 +61,24 @@ use crate::utils::time::TimeRange;
6061pub static QUERY_SESSION : Lazy < SessionContext > =
6162 Lazy :: new ( || Query :: create_session_context ( PARSEABLE . storage ( ) ) ) ;
6263
64+ /// Dedicated multi-threaded runtime to run all queries on
65+ pub static QUERY_RUNTIME : Lazy < Runtime > =
66+ Lazy :: new ( || Runtime :: new ( ) . expect ( "Runtime should be constructible" ) ) ;
67+
68+
69+ /// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU
70+ /// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results.
71+ pub async fn execute (
72+ query : Query ,
73+ stream_name : & str ,
74+ ) -> Result < ( Vec < RecordBatch > , Vec < String > ) , ExecuteError > {
75+ let time_partition = PARSEABLE . get_stream ( stream_name) ?. get_time_partition ( ) ;
76+ QUERY_RUNTIME
77+ . spawn ( async move { query. execute ( time_partition. as_ref ( ) ) . await } )
78+ . await
79+ . expect ( "The Join should have been successful" )
80+ }
81+
6382// A query request by client
6483#[ derive( Debug ) ]
6584pub struct Query {
@@ -129,15 +148,12 @@ impl Query {
129148 SessionContext :: new_with_state ( state)
130149 }
131150
132- #[ tokio:: main( flavor = "multi_thread" ) ]
133151 pub async fn execute (
134152 & self ,
135- stream_name : String ,
153+ time_partition : Option < & String > ,
136154 ) -> Result < ( Vec < RecordBatch > , Vec < String > ) , ExecuteError > {
137- let time_partition = PARSEABLE . get_stream ( & stream_name) ?. get_time_partition ( ) ;
138-
139155 let df = QUERY_SESSION
140- . execute_logical_plan ( self . final_logical_plan ( & time_partition) )
156+ . execute_logical_plan ( self . final_logical_plan ( time_partition) )
141157 . await ?;
142158
143159 let fields = df
@@ -153,21 +169,23 @@ impl Query {
153169 }
154170
155171 let results = df. collect ( ) . await ?;
172+
156173 Ok ( ( results, fields) )
157174 }
158175
159- pub async fn get_dataframe ( & self , stream_name : String ) -> Result < DataFrame , ExecuteError > {
160- let time_partition = PARSEABLE . get_stream ( & stream_name) ?. get_time_partition ( ) ;
161-
176+ pub async fn get_dataframe (
177+ & self ,
178+ time_partition : Option < & String > ,
179+ ) -> Result < DataFrame , ExecuteError > {
162180 let df = QUERY_SESSION
163- . execute_logical_plan ( self . final_logical_plan ( & time_partition) )
181+ . execute_logical_plan ( self . final_logical_plan ( time_partition) )
164182 . await ?;
165183
166184 Ok ( df)
167185 }
168186
169187 /// return logical plan with all time filters applied through
170- fn final_logical_plan ( & self , time_partition : & Option < String > ) -> LogicalPlan {
188+ fn final_logical_plan ( & self , time_partition : Option < & String > ) -> LogicalPlan {
171189 // see https://github.com/apache/arrow-datafusion/pull/8400
172190 // this can be eliminated in later version of datafusion but with slight caveat
173191 // transform cannot modify stringified plans by itself
@@ -487,7 +505,7 @@ fn transform(
487505 plan : LogicalPlan ,
488506 start_time : NaiveDateTime ,
489507 end_time : NaiveDateTime ,
490- time_partition : & Option < String > ,
508+ time_partition : Option < & String > ,
491509) -> Transformed < LogicalPlan > {
492510 plan. transform ( & |plan| match plan {
493511 LogicalPlan :: TableScan ( table) => {
@@ -545,7 +563,7 @@ fn transform(
545563
546564fn table_contains_any_time_filters (
547565 table : & datafusion:: logical_expr:: TableScan ,
548- time_partition : & Option < String > ,
566+ time_partition : Option < & String > ,
549567) -> bool {
550568 table
551569 . filters
@@ -559,8 +577,8 @@ fn table_contains_any_time_filters(
559577 } )
560578 . any ( |expr| {
561579 matches ! ( & * expr. left, Expr :: Column ( Column { name, .. } )
562- if ( ( time_partition. is_some ( ) && name == time_partition . as_ref ( ) . unwrap ( ) ) ||
563- ( ! time_partition. is_some ( ) && name == event:: DEFAULT_TIMESTAMP_KEY ) ) )
580+ if ( time_partition. is_some_and ( |field| field == name ) ||
581+ ( time_partition. is_none ( ) && name == event:: DEFAULT_TIMESTAMP_KEY ) ) )
564582 } )
565583}
566584
0 commit comments