@@ -34,6 +34,7 @@ use crate::event::error::EventError;
3434use crate :: handlers:: http:: fetch_schema;
3535
3636use crate :: event:: commit_schema;
37+ use crate :: handlers:: { CACHE_RESULTS_HEADER_KEY , CACHE_VIEW_HEADER_KEY } ;
3738use crate :: localcache:: CacheError ;
3839use crate :: metrics:: QUERY_EXECUTE_TIME ;
3940use crate :: option:: { Mode , CONFIG } ;
@@ -74,36 +75,59 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
7475 // create a visitor to extract the table name
7576 let mut visitor = TableScanVisitor :: default ( ) ;
7677 let _ = raw_logical_plan. visit ( & mut visitor) ;
77- let stream = visitor. top ( ) ;
78+ let stream = visitor
79+ . top ( )
80+ . ok_or_else ( || QueryError :: MalformedQuery ( "Table Name not found in SQL" ) ) ?;
81+
7882 let query_cache_manager = QueryCacheManager :: global ( CONFIG . parseable . query_cache_size )
7983 . await
8084 . unwrap_or ( None ) ;
8185
82- if let Some ( query_cache_manager) = query_cache_manager {
83- let mut query_cache = query_cache_manager. get_cache ( stream) . await ?;
84-
85- let ( start, end) = parse_human_time ( & query_request. start_time , & query_request. end_time ) ?;
86- let key = format ! (
87- "{}-{}-{}" ,
88- start. to_rfc3339( ) ,
89- end. to_rfc3339( ) ,
90- query_request. query. clone( )
91- ) ;
92-
93- let file_path = query_cache. get_file ( key) ;
94- if let Some ( file_path) = file_path {
95- let ( records, fields) = query_cache. get_cached_records ( & file_path) . await ?;
96- let response = QueryResponse {
97- records,
98- fields,
99- fill_null : query_request. send_null ,
100- with_fields : query_request. fields ,
101- }
102- . to_http ( ) ?;
86+ let cache_results = req
87+ . headers ( )
88+ . iter ( )
89+ . find ( |& ( key, _) | key == CACHE_RESULTS_HEADER_KEY ) ;
90+
91+ let show_cached = req
92+ . headers ( )
93+ . iter ( )
94+ . find ( |& ( key, _) | key == CACHE_VIEW_HEADER_KEY ) ;
95+
96+ match ( show_cached, query_cache_manager) {
97+ ( None , None ) => { }
98+ ( None , Some ( _) ) => { }
99+ ( Some ( _) , None ) => {
100+ log:: warn!(
101+ "Instructed to show cached results but Query Caching is not Enabledon Server"
102+ ) ;
103+ }
104+ ( Some ( _) , Some ( query_cache_manager) ) => {
105+ let mut query_cache = query_cache_manager. get_cache ( stream) . await ?;
106+
107+ let ( start, end) =
108+ parse_human_time ( & query_request. start_time , & query_request. end_time ) ?;
109+ let key = format ! (
110+ "{}-{}-{}" ,
111+ start. to_rfc3339( ) ,
112+ end. to_rfc3339( ) ,
113+ query_request. query. clone( )
114+ ) ;
115+
116+ let file_path = query_cache. get_file ( key) ;
117+ if let Some ( file_path) = file_path {
118+ let ( records, fields) = query_cache. get_cached_records ( & file_path) . await ?;
119+ let response = QueryResponse {
120+ records,
121+ fields,
122+ fill_null : query_request. send_null ,
123+ with_fields : query_request. fields ,
124+ }
125+ . to_http ( ) ?;
103126
104- return Ok ( response) ;
127+ return Ok ( response) ;
128+ }
105129 }
106- } ;
130+ }
107131
108132 let tables = visitor. into_inner ( ) ;
109133
@@ -125,23 +149,33 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
125149
126150 let table_name = query
127151 . first_table_name ( )
128- . ok_or_else ( || QueryError :: MalformedQuery ( "No table name found in query" . to_string ( ) ) ) ?;
152+ . ok_or_else ( || QueryError :: MalformedQuery ( "No table name found in query" ) ) ?;
129153 authorize_and_set_filter_tags ( & mut query, permissions, & table_name) ?;
130154
131155 let time = Instant :: now ( ) ;
132156
133157 let ( records, fields) = query. execute ( table_name. clone ( ) ) . await ?;
134- // put the rbs to parquet
135- if let Some ( query_cache_manager) = query_cache_manager {
136- query_cache_manager
137- . create_parquet_cache (
138- & table_name,
139- & records,
140- query. start . to_rfc3339 ( ) ,
141- query. end . to_rfc3339 ( ) ,
142- query_request. query ,
143- )
144- . await ?;
158+
159+ match ( cache_results, query_cache_manager) {
160+ ( None , None ) => { }
161+ ( None , Some ( _) ) => { }
162+ ( Some ( _) , None ) => {
163+ log:: warn!(
164+ "Instructed to cache query results but Query Caching is not Enabled in Server"
165+ ) ;
166+ }
167+ // do cache
168+ ( Some ( _) , Some ( query_cache_manager) ) => {
169+ query_cache_manager
170+ . create_parquet_cache (
171+ & table_name,
172+ & records,
173+ query. start . to_rfc3339 ( ) ,
174+ query. end . to_rfc3339 ( ) ,
175+ query_request. query ,
176+ )
177+ . await ?
178+ }
145179 }
146180
147181 let response = QueryResponse {
@@ -348,7 +382,7 @@ pub enum QueryError {
348382 #[ error( "Evern Error: {0}" ) ]
349383 EventError ( #[ from] EventError ) ,
350384 #[ error( "Error: {0}" ) ]
351- MalformedQuery ( String ) ,
385+ MalformedQuery ( & ' static str ) ,
352386 #[ error(
353387 r#"Error: Failed to Parse Record Batch into Json
354388Description: {0}"#
0 commit comments