1+ /*
2+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+ *
4+ * This program is free software: you can redistribute it and/or modify
5+ * it under the terms of the GNU Affero General Public License as
6+ * published by the Free Software Foundation, either version 3 of the
7+ * License, or (at your option) any later version.
8+ *
9+ * This program is distributed in the hope that it will be useful,
10+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
11+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+ * GNU Affero General Public License for more details.
13+ *
14+ * You should have received a copy of the GNU Affero General Public License
15+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
16+ *
17+ */
18+
119use arrow_array:: RecordBatch ;
2- use arrow_flight:: encode:: FlightDataEncoderBuilder ;
320use arrow_flight:: flight_service_server:: FlightServiceServer ;
421use arrow_flight:: PollInfo ;
522use arrow_schema:: ArrowError ;
623
724use datafusion:: common:: tree_node:: TreeNode ;
825use serde_json:: json;
926use std:: net:: SocketAddr ;
10- use std:: sync:: Arc ;
1127use std:: time:: Instant ;
1228use tonic:: codec:: CompressionEncoding ;
1329
@@ -16,34 +32,38 @@ use futures_util::{Future, TryFutureExt};
1632use tonic:: transport:: { Identity , Server , ServerTlsConfig } ;
1733use tonic_web:: GrpcWebLayer ;
1834
19- use crate :: event:: commit_schema;
2035use crate :: handlers:: http:: cluster:: get_ingestor_info;
21- use crate :: handlers:: http:: fetch_schema;
2236
37+ use crate :: handlers:: { CACHE_RESULTS_HEADER_KEY , CACHE_VIEW_HEADER_KEY , USER_ID_HEADER_KEY } ;
2338use crate :: metrics:: QUERY_EXECUTE_TIME ;
24- use crate :: option:: { Mode , CONFIG } ;
39+ use crate :: option:: CONFIG ;
2540
2641use crate :: handlers:: livetail:: cross_origin_config;
2742
28- use crate :: handlers:: http:: query:: { authorize_and_set_filter_tags, into_query} ;
43+ use crate :: handlers:: http:: query:: {
44+ authorize_and_set_filter_tags, into_query, put_results_in_cache, update_schema_when_distributed,
45+ } ;
2946use crate :: query:: { TableScanVisitor , QUERY_SESSION } ;
30- use crate :: storage :: object_storage :: commit_schema_to_storage ;
47+ use crate :: querycache :: QueryCacheManager ;
3148use crate :: utils:: arrow:: flight:: {
32- append_temporary_events, get_query_from_ticket, run_do_get_rpc, send_to_ingester,
49+ append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
50+ send_to_ingester,
3351} ;
3452use arrow_flight:: {
3553 flight_service_server:: FlightService , Action , ActionType , Criteria , Empty , FlightData ,
3654 FlightDescriptor , FlightInfo , HandshakeRequest , HandshakeResponse , PutResult , SchemaAsIpc ,
3755 SchemaResult , Ticket ,
3856} ;
3957use arrow_ipc:: writer:: IpcWriteOptions ;
40- use futures:: { stream, TryStreamExt } ;
58+ use futures:: stream;
4159use tonic:: { Request , Response , Status , Streaming } ;
4260
4361use crate :: handlers:: livetail:: extract_session_key;
4462use crate :: metadata:: STREAM_INFO ;
4563use crate :: rbac:: Users ;
4664
65+ use super :: http:: query:: get_results_from_cache;
66+
4767#[ derive( Clone , Debug ) ]
4868pub struct AirServiceImpl { }
4969
@@ -112,7 +132,7 @@ impl FlightService for AirServiceImpl {
112132 async fn do_get ( & self , req : Request < Ticket > ) -> Result < Response < Self :: DoGetStream > , Status > {
113133 let key = extract_session_key ( req. metadata ( ) ) ?;
114134
115- let ticket = get_query_from_ticket ( req) ?;
135+ let ticket = get_query_from_ticket ( & req) ?;
116136
117137 log:: info!( "query requested to airplane: {:?}" , ticket) ;
118138
@@ -132,32 +152,57 @@ impl FlightService for AirServiceImpl {
132152 let mut visitor = TableScanVisitor :: default ( ) ;
133153 let _ = raw_logical_plan. visit ( & mut visitor) ;
134154
135- let tables = visitor. into_inner ( ) ;
136-
137- if CONFIG . parseable . mode == Mode :: Query {
138- // using http to get the schema. may update to use flight later
139- for table in tables {
140- if let Ok ( new_schema) = fetch_schema ( & table) . await {
141- // commit schema merges the schema internally and updates the schema in storage.
142- commit_schema_to_storage ( & table, new_schema. clone ( ) )
143- . await
144- . map_err ( |err| Status :: internal ( err. to_string ( ) ) ) ?;
145- commit_schema ( & table, Arc :: new ( new_schema) )
146- . map_err ( |err| Status :: internal ( err. to_string ( ) ) ) ?;
147- }
148- }
155+ let streams = visitor. into_inner ( ) ;
156+
157+ let query_cache_manager = QueryCacheManager :: global ( CONFIG . parseable . query_cache_size )
158+ . await
159+ . unwrap_or ( None ) ;
160+
161+ let cache_results = req
162+ . metadata ( )
163+ . get ( CACHE_RESULTS_HEADER_KEY )
164+ . and_then ( |value| value. to_str ( ) . ok ( ) ) ; // I dont think we need to own this.
165+
166+ let show_cached = req
167+ . metadata ( )
168+ . get ( CACHE_VIEW_HEADER_KEY )
169+ . and_then ( |value| value. to_str ( ) . ok ( ) ) ;
170+
171+ let user_id = req
172+ . metadata ( )
173+ . get ( USER_ID_HEADER_KEY )
174+ . and_then ( |value| value. to_str ( ) . ok ( ) ) ;
175+ let stream_name = streams
176+ . first ( )
177+ . ok_or_else ( || Status :: aborted ( "Malformed SQL Provided, Table Name Not Found" ) ) ?
178+ . to_owned ( ) ;
179+
180+ // send the cached results
181+ if let Ok ( cache_results) = get_results_from_cache (
182+ show_cached,
183+ query_cache_manager,
184+ & stream_name,
185+ user_id,
186+ & ticket. start_time ,
187+ & ticket. end_time ,
188+ & ticket. query ,
189+ ticket. send_null ,
190+ ticket. fields ,
191+ )
192+ . await
193+ {
194+ return cache_results. into_flight ( ) ;
149195 }
150196
197+ update_schema_when_distributed ( streams)
198+ . await
199+ . map_err ( |err| Status :: internal ( err. to_string ( ) ) ) ?;
200+
151201 // map payload to query
152202 let mut query = into_query ( & ticket, & session_state)
153203 . await
154204 . map_err ( |_| Status :: internal ( "Failed to parse query" ) ) ?;
155205
156- // if table name is not present it is a Malformed Query
157- let stream_name = query
158- . first_table_name ( )
159- . ok_or_else ( || Status :: invalid_argument ( "Malformed Query" ) ) ?;
160-
161206 let event =
162207 if send_to_ingester ( query. start . timestamp_millis ( ) , query. end . timestamp_millis ( ) ) {
163208 let sql = format ! ( "select * from {}" , & stream_name) ;
@@ -192,11 +237,26 @@ impl FlightService for AirServiceImpl {
192237 Status :: permission_denied ( "User Does not have permission to access this" )
193238 } ) ?;
194239 let time = Instant :: now ( ) ;
195- let ( results , _) = query
240+ let ( records , _) = query
196241 . execute ( stream_name. clone ( ) )
197242 . await
198243 . map_err ( |err| Status :: internal ( err. to_string ( ) ) ) ?;
199244
245+ if let Err ( err) = put_results_in_cache (
246+ cache_results,
247+ user_id,
248+ query_cache_manager,
249+ & stream_name,
250+ & records,
251+ query. start . to_rfc3339 ( ) ,
252+ query. end . to_rfc3339 ( ) ,
253+ ticket. query ,
254+ )
255+ . await
256+ {
257+ log:: error!( "{}" , err) ;
258+ } ;
259+
200260 /*
201261 * INFO: No returning the schema with the data.
202262 * kept it in case it needs to be sent in the future.
@@ -208,18 +268,7 @@ impl FlightService for AirServiceImpl {
208268 .collect::<Vec<_>>();
209269 let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;
210270 */
211- let input_stream = futures:: stream:: iter ( results. into_iter ( ) . map ( Ok ) ) ;
212- let write_options = IpcWriteOptions :: default ( )
213- . try_with_compression ( Some ( arrow_ipc:: CompressionType ( 1 ) ) )
214- . map_err ( |err| Status :: failed_precondition ( err. to_string ( ) ) ) ?;
215-
216- let flight_data_stream = FlightDataEncoderBuilder :: new ( )
217- . with_max_flight_data_size ( usize:: MAX )
218- . with_options ( write_options)
219- // .with_schema(schema.into())
220- . build ( input_stream) ;
221-
222- let flight_data_stream = flight_data_stream. map_err ( |err| Status :: unknown ( err. to_string ( ) ) ) ;
271+ let out = into_flight_data ( records) ;
223272
224273 if let Some ( event) = event {
225274 event. clear ( & stream_name) ;
@@ -230,9 +279,7 @@ impl FlightService for AirServiceImpl {
230279 . with_label_values ( & [ & format ! ( "flight-query-{}" , stream_name) ] )
231280 . observe ( time) ;
232281
233- Ok ( Response :: new (
234- Box :: pin ( flight_data_stream) as Self :: DoGetStream
235- ) )
282+ out
236283 }
237284
238285 async fn do_put (
0 commit comments