1919use actix_web:: http:: header:: ContentType ;
2020use actix_web:: web:: { self , Json } ;
2121use actix_web:: { FromRequest , HttpRequest , Responder } ;
22+ use anyhow:: anyhow;
2223use chrono:: { DateTime , Utc } ;
2324use datafusion:: common:: tree_node:: TreeNode ;
2425use datafusion:: error:: DataFusionError ;
2526use datafusion:: execution:: context:: SessionState ;
2627use futures_util:: Future ;
27- use http:: StatusCode ;
28+ use http:: { HeaderValue , StatusCode } ;
2829use std:: collections:: HashMap ;
2930use std:: pin:: Pin ;
3031use std:: sync:: Arc ;
3132use std:: time:: Instant ;
3233
3334use crate :: event:: error:: EventError ;
3435use crate :: handlers:: http:: fetch_schema;
36+ use arrow_array:: RecordBatch ;
3537
3638use crate :: event:: commit_schema;
37- use crate :: handlers:: { CACHE_RESULTS_HEADER_KEY , CACHE_VIEW_HEADER_KEY } ;
39+ use crate :: handlers:: { CACHE_RESULTS_HEADER_KEY , CACHE_VIEW_HEADER_KEY , USER_ID_HEADER_KEY } ;
3840use crate :: localcache:: CacheError ;
3941use crate :: metrics:: QUERY_EXECUTE_TIME ;
4042use crate :: option:: { Mode , CONFIG } ;
@@ -83,100 +85,59 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
8385 . await
8486 . unwrap_or ( None ) ;
8587
86- let cache_results = req
88+ let cache_results = req. headers ( ) . get ( CACHE_RESULTS_HEADER_KEY ) ;
89+ let show_cached = req. headers ( ) . get ( CACHE_VIEW_HEADER_KEY ) ;
90+ let user_id = req
8791 . headers ( )
88- . iter ( )
89- . find ( |& ( key, _) | key == CACHE_RESULTS_HEADER_KEY ) ;
90-
91- let show_cached = req
92- . headers ( )
93- . iter ( )
94- . find ( |& ( key, _) | key == CACHE_VIEW_HEADER_KEY ) ;
95-
96- match ( show_cached, query_cache_manager) {
97- ( None , None ) => { }
98- ( None , Some ( _) ) => { }
99- ( Some ( _) , None ) => {
100- log:: warn!(
101- "Instructed to show cached results but Query Caching is not Enabledon Server"
102- ) ;
103- }
104- ( Some ( _) , Some ( query_cache_manager) ) => {
105- let mut query_cache = query_cache_manager. get_cache ( stream) . await ?;
106-
107- let ( start, end) =
108- parse_human_time ( & query_request. start_time , & query_request. end_time ) ?;
109- let key = format ! (
110- "{}-{}-{}" ,
111- start. to_rfc3339( ) ,
112- end. to_rfc3339( ) ,
113- query_request. query. clone( )
114- ) ;
115-
116- let file_path = query_cache. get_file ( key) ;
117- if let Some ( file_path) = file_path {
118- let ( records, fields) = query_cache. get_cached_records ( & file_path) . await ?;
119- let response = QueryResponse {
120- records,
121- fields,
122- fill_null : query_request. send_null ,
123- with_fields : query_request. fields ,
124- }
125- . to_http ( ) ?;
126-
127- return Ok ( response) ;
128- }
129- }
130- }
92+ . get ( USER_ID_HEADER_KEY )
93+ . ok_or_else ( || QueryError :: Anyhow ( anyhow ! ( "User Id not provided" ) ) ) ?
94+ . to_str ( )
95+ . map_err ( |err| anyhow ! ( err) ) ?;
96+
97+ // deal with cached data
98+ if let Ok ( results) = get_results_from_cache (
99+ show_cached,
100+ query_cache_manager,
101+ stream,
102+ user_id,
103+ & query_request. start_time ,
104+ & query_request. end_time ,
105+ & query_request. query ,
106+ query_request. send_null ,
107+ query_request. fields ,
108+ )
109+ . await
110+ {
111+ return results. to_http ( ) ;
112+ } ;
131113
132114 let tables = visitor. into_inner ( ) ;
133-
134- if CONFIG . parseable . mode == Mode :: Query {
135- for table in tables {
136- if let Ok ( new_schema) = fetch_schema ( & table) . await {
137- // commit schema merges the schema internally and updates the schema in storage.
138- commit_schema_to_storage ( & table, new_schema. clone ( ) )
139- . await
140- . map_err ( QueryError :: ObjectStorage ) ?;
141- commit_schema ( & table, Arc :: new ( new_schema) ) . map_err ( QueryError :: EventError ) ?;
142- }
143- }
144- }
115+ update_schema_when_distributed ( tables) . await ?;
145116 let mut query: LogicalQuery = into_query ( & query_request, & session_state) . await ?;
146117
147- let creds = extract_session_key_from_req ( & req) . expect ( "expects basic auth" ) ;
148- let permissions: Vec < Permission > = Users . get_permissions ( & creds) ;
118+ let creds = extract_session_key_from_req ( & req) ? ;
119+ let permissions = Users . get_permissions ( & creds) ;
149120
150121 let table_name = query
151122 . first_table_name ( )
152123 . ok_or_else ( || QueryError :: MalformedQuery ( "No table name found in query" ) ) ?;
124+
153125 authorize_and_set_filter_tags ( & mut query, permissions, & table_name) ?;
154126
155127 let time = Instant :: now ( ) ;
156-
157128 let ( records, fields) = query. execute ( table_name. clone ( ) ) . await ?;
158-
159- match ( cache_results, query_cache_manager) {
160- ( None , None ) => { }
161- ( None , Some ( _) ) => { }
162- ( Some ( _) , None ) => {
163- log:: warn!(
164- "Instructed to cache query results but Query Caching is not Enabled in Server"
165- ) ;
166- }
167- // do cache
168- ( Some ( _) , Some ( query_cache_manager) ) => {
169- query_cache_manager
170- . create_parquet_cache (
171- & table_name,
172- & records,
173- query. start . to_rfc3339 ( ) ,
174- query. end . to_rfc3339 ( ) ,
175- query_request. query ,
176- )
177- . await ?
178- }
179- }
129+ // deal with cache saving
130+ put_results_in_cache (
131+ cache_results,
132+ user_id,
133+ query_cache_manager,
134+ & table_name,
135+ & records,
136+ query. start . to_rfc3339 ( ) ,
137+ query. end . to_rfc3339 ( ) ,
138+ query_request. query ,
139+ )
140+ . await ;
180141
181142 let response = QueryResponse {
182143 records,
@@ -195,7 +156,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
195156 Ok ( response)
196157}
197158
198- pub fn authorize_and_set_filter_tags (
159+ fn authorize_and_set_filter_tags (
199160 query : & mut LogicalQuery ,
200161 permissions : Vec < Permission > ,
201162 table_name : & str ,
@@ -379,15 +340,22 @@ pub enum QueryError {
379340 ObjectStorage ( #[ from] ObjectStorageError ) ,
380341 #[ error( "Cache Error: {0}" ) ]
381342 CacheError ( #[ from] CacheError ) ,
343+ #[ error( "" ) ]
344+ CacheMiss ,
382345 #[ error( "Evern Error: {0}" ) ]
383346 EventError ( #[ from] EventError ) ,
384347 #[ error( "Error: {0}" ) ]
385348 MalformedQuery ( & ' static str ) ,
349+ #[ allow( unused) ]
386350 #[ error(
387351 r#"Error: Failed to Parse Record Batch into Json
388352Description: {0}"#
389353 ) ]
390354 JsonParse ( String ) ,
355+ #[ error( "Error: {0}" ) ]
356+ ActixError ( #[ from] actix_web:: Error ) ,
357+ #[ error( "Error: {0}" ) ]
358+ Anyhow ( #[ from] anyhow:: Error ) ,
391359}
392360
393361impl actix_web:: ResponseError for QueryError {
0 commit comments