 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
+use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
+use futures::StreamExt;
 use futures_util::Future;
 use http::StatusCode;
 use serde::{Deserialize, Serialize};
@@ -41,15 +43,16 @@ use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::Mode;
 use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::query::error::ExecuteError;
-use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
+use crate::query::{execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::Users;
-use crate::response::{QueryResponse, TIME_ELAPSED_HEADER};
+use crate::response::TIME_ELAPSED_HEADER;
 use crate::storage::ObjectStorageError;
 use crate::utils::actix::extract_session_key_from_req;
+use crate::utils::arrow::record_batches_to_json;
 use crate::utils::time::{TimeParseError, TimeRange};
 use crate::utils::user_auth_for_datasets;
-
+use futures_core::Stream as CoreStream;
 /// Query Request through http endpoint.
 #[derive(Debug, Deserialize, Serialize, Clone)]
 #[serde(rename_all = "camelCase")]
@@ -132,25 +135,33 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
             .insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
             .json(response));
     }
+    let (records_stream, fields) = execute_stream(query, &table_name).await?;
+    let fields = fields.clone();
+    let stream = records_stream.map(move |batch_result| {
+        match batch_result {
+            Ok(batch) => {
+                // convert record batch to JSON
+                let json = record_batches_to_json(&[batch])
+                    .map_err(actix_web::error::ErrorInternalServerError)?;
+                // // Serialize to JSON string
+                // let json = serde_json::to_value(&json)
+                //     .map_err(actix_web::error::ErrorInternalServerError)?;
+                let response = json!({
+                    "fields": fields,
+                    "records": json,
+                });
+                Ok(Bytes::from(response.to_string()))
+            }
+            Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
+        }
+    });
 
-    let (records, fields) = execute(query, &table_name).await?;
-    let total_time = format!("{:?}", time.elapsed());
-    let response = QueryResponse {
-        records,
-        fields,
-        fill_null: query_request.send_null,
-        with_fields: query_request.fields,
-        total_time,
-    }
-    .to_http()?;
-
-    let time = time.elapsed().as_secs_f64();
-
-    QUERY_EXECUTE_TIME
-        .with_label_values(&[&table_name])
-        .observe(time);
+    let boxed_stream =
+        Box::pin(stream) as Pin<Box<dyn CoreStream<Item = Result<Bytes, actix_web::Error>> + Send>>;
 
-    Ok(response)
+    Ok(HttpResponse::Ok()
+        .content_type("application/json")
+        .streaming(boxed_stream))
 }
 
 pub async fn get_counts(
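
For reference, here is a minimal, self-contained sketch of the streaming pattern this change introduces: map each item of a `futures` stream into a `Bytes` chunk and hand the stream to `HttpResponse::streaming`. The `stream_json` handler and its `numbers` source are illustrative stand-ins (not part of this change) for the record-batch stream returned by `execute_stream`; it assumes `actix-web`, `bytes`, `futures-util`, and `serde_json` as dependencies.

```rust
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use bytes::Bytes;
use futures_util::stream::{self, StreamExt};

// Illustrative handler: map each stream item into a `Bytes` chunk and stream
// the chunks out as the HTTP response body, mirroring the handler in the
// diff above. `numbers` stands in for the record-batch stream.
async fn stream_json() -> impl Responder {
    let numbers = stream::iter(0..3u32);
    let body = numbers.map(|n| {
        // Each chunk is one self-contained JSON object, like the per-batch
        // objects built with json!({ "fields": ..., "records": ... }).
        let chunk = serde_json::json!({ "records": [n] }).to_string();
        Ok::<Bytes, actix_web::Error>(Bytes::from(chunk))
    });
    HttpResponse::Ok()
        .content_type("application/json")
        .streaming(body)
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/stream", web::get().to(stream_json)))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}
```

Note that each emitted chunk is a standalone JSON object, so clients should consume the body incrementally rather than parsing it as a single JSON document.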