@@ -87,12 +87,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
8787
8888 let cache_results = req. headers ( ) . get ( CACHE_RESULTS_HEADER_KEY ) ;
8989 let show_cached = req. headers ( ) . get ( CACHE_VIEW_HEADER_KEY ) ;
90- let user_id = req
91- . headers ( )
92- . get ( USER_ID_HEADER_KEY )
93- . ok_or_else ( || QueryError :: Anyhow ( anyhow ! ( "User Id not provided" ) ) ) ?
94- . to_str ( )
95- . map_err ( |err| anyhow ! ( err) ) ?;
90+ let user_id = req. headers ( ) . get ( USER_ID_HEADER_KEY ) ;
9691
9792 // deal with cached data
9893 if let Ok ( results) = get_results_from_cache (
@@ -156,7 +151,114 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
156151 Ok ( response)
157152}
158153
159- fn authorize_and_set_filter_tags (
154+ async fn update_schema_when_distributed ( tables : Vec < String > ) -> Result < ( ) , QueryError > {
155+ if CONFIG . parseable . mode == Mode :: Query {
156+ for table in tables {
157+ if let Ok ( new_schema) = fetch_schema ( & table) . await {
158+ // commit schema merges the schema internally and updates the schema in storage.
159+ commit_schema_to_storage ( & table, new_schema. clone ( ) ) . await ?;
160+
161+ commit_schema ( & table, Arc :: new ( new_schema) ) ?;
162+ }
163+ }
164+ }
165+
166+ Ok ( ( ) )
167+ }
168+
169+ #[ allow( clippy:: too_many_arguments) ]
170+ async fn put_results_in_cache (
171+ cache_results : Option < & HeaderValue > ,
172+ user_id : Option < & HeaderValue > ,
173+ query_cache_manager : Option < & QueryCacheManager > ,
174+ stream : & str ,
175+ records : & [ RecordBatch ] ,
176+ start : String ,
177+ end : String ,
178+ query : String ,
179+ ) {
180+ match ( cache_results, query_cache_manager) {
181+ ( Some ( _) , None ) => {
182+ log:: warn!(
183+ "Instructed to cache query results but Query Caching is not Enabled in Server"
184+ ) ;
185+ }
186+ // do cache
187+ ( Some ( _) , Some ( query_cache_manager) ) => {
188+ let user_id = user_id
189+ . expect ( "User Id was provided" )
190+ . to_str ( )
191+ . expect ( "is proper ASCII" ) ;
192+
193+ if let Err ( err) = query_cache_manager
194+ . create_parquet_cache ( stream, records, user_id, start, end, query)
195+ . await
196+ {
197+ log:: error!( "Error occured while caching query results: {:?}" , err) ;
198+ if query_cache_manager
199+ . clear_cache ( stream, user_id)
200+ . await
201+ . is_err ( )
202+ {
203+ log:: error!( "Error Clearing Unwanted files from cache dir" ) ;
204+ }
205+ }
206+ }
207+ ( None , _) => { }
208+ }
209+ }
210+
211+ #[ allow( clippy:: too_many_arguments) ]
212+ async fn get_results_from_cache (
213+ show_cached : Option < & HeaderValue > ,
214+ query_cache_manager : Option < & QueryCacheManager > ,
215+ stream : & str ,
216+ user_id : Option < & HeaderValue > ,
217+ start_time : & str ,
218+ end_time : & str ,
219+ query : & str ,
220+ send_null : bool ,
221+ send_fields : bool ,
222+ ) -> Result < QueryResponse , QueryError > {
223+ match ( show_cached, query_cache_manager) {
224+ ( Some ( _) , None ) => {
225+ log:: warn!(
226+ "Instructed to show cached results but Query Caching is not Enabled on Server"
227+ ) ;
228+ None
229+ }
230+ ( Some ( _) , Some ( query_cache_manager) ) => {
231+ let user_id = user_id
232+ . ok_or_else ( || QueryError :: Anyhow ( anyhow ! ( "User Id not provided" ) ) ) ?
233+ . to_str ( )
234+ . map_err ( |err| anyhow ! ( err) ) ?;
235+
236+ let mut query_cache = query_cache_manager. get_cache ( stream, user_id) . await ?;
237+
238+ let ( start, end) = parse_human_time ( start_time, end_time) ?;
239+ let key = format ! ( "{}-{}-{}" , start. to_rfc3339( ) , end. to_rfc3339( ) , query) ;
240+
241+ let file_path = query_cache. get_file ( key) ;
242+ if let Some ( file_path) = file_path {
243+ let ( records, fields) = query_cache. get_cached_records ( & file_path) . await ?;
244+ let response = QueryResponse {
245+ records,
246+ fields,
247+ fill_null : send_null,
248+ with_fields : send_fields,
249+ } ;
250+
251+ Some ( Ok ( response) )
252+ } else {
253+ None
254+ }
255+ }
256+ ( _, _) => None ,
257+ }
258+ . map_or_else ( || Err ( QueryError :: CacheMiss ) , |ret_val| ret_val)
259+ }
260+
261+ pub fn authorize_and_set_filter_tags (
160262 query : & mut LogicalQuery ,
161263 permissions : Vec < Permission > ,
162264 table_name : & str ,
0 commit comments