1616 *
1717 */
1818
19+ use crate :: event:: error:: EventError ;
20+ use crate :: handlers:: http:: fetch_schema;
1921use actix_web:: http:: header:: ContentType ;
2022use actix_web:: web:: { self , Json } ;
2123use actix_web:: { FromRequest , HttpRequest , HttpResponse , Responder } ;
@@ -24,7 +26,8 @@ use chrono::{DateTime, Utc};
2426use datafusion:: common:: tree_node:: TreeNode ;
2527use datafusion:: error:: DataFusionError ;
2628use datafusion:: execution:: context:: SessionState ;
27- use futures:: StreamExt ;
29+ use futures:: stream:: once;
30+ use futures:: { future, Stream , StreamExt } ;
2831use futures_util:: Future ;
2932use http:: StatusCode ;
3033use serde:: { Deserialize , Serialize } ;
@@ -35,9 +38,6 @@ use std::sync::Arc;
3538use std:: time:: Instant ;
3639use tracing:: error;
3740
38- use crate :: event:: error:: EventError ;
39- use crate :: handlers:: http:: fetch_schema;
40-
4141use crate :: event:: commit_schema;
4242use crate :: metrics:: QUERY_EXECUTE_TIME ;
4343use crate :: option:: Mode ;
@@ -46,11 +46,13 @@ use crate::query::error::ExecuteError;
4646use crate :: query:: { execute, execute_stream, CountsRequest , CountsResponse , Query as LogicalQuery } ;
4747use crate :: query:: { TableScanVisitor , QUERY_SESSION } ;
4848use crate :: rbac:: Users ;
49- use crate :: response:: { QueryResponse , TIME_ELAPSED_HEADER } ;
49+ use crate :: response:: QueryResponse ;
5050use crate :: storage:: ObjectStorageError ;
5151use crate :: utils:: actix:: extract_session_key_from_req;
5252use crate :: utils:: time:: { TimeParseError , TimeRange } ;
5353use crate :: utils:: user_auth_for_datasets;
54+
55+ const TIME_ELAPSED_HEADER : & str = "p-time-elapsed" ;
5456/// Query Request through http endpoint.
5557#[ derive( Debug , Deserialize , Serialize , Clone ) ]
5658#[ serde( rename_all = "camelCase" ) ]
@@ -167,7 +169,7 @@ async fn handle_non_streaming_query(
167169 fill_null : query_request. send_null ,
168170 with_fields : query_request. fields ,
169171 }
170- . to_http ( ) ?;
172+ . to_json ( ) ?;
171173 Ok ( HttpResponse :: Ok ( )
172174 . insert_header ( ( TIME_ELAPSED_HEADER , total_time. as_str ( ) ) )
173175 . json ( response) )
@@ -190,30 +192,65 @@ async fn handle_streaming_query(
190192 let send_null = query_request. send_null ;
191193 let with_fields = query_request. fields ;
192194
193- let stream = records_stream. map ( move |batch_result| match batch_result {
194- Ok ( batch) => {
195- let response = QueryResponse {
196- records : vec ! [ batch] ,
197- fields : fields. clone ( ) ,
198- fill_null : send_null,
199- with_fields,
195+ let stream = if with_fields {
196+ // send the fields as an initial chunk
197+ let fields_json = serde_json:: json!( {
198+ "fields" : fields
199+ } )
200+ . to_string ( ) ;
201+
202+ // stream the records without fields
203+ let records_stream = records_stream. map ( move |batch_result| match batch_result {
204+ Ok ( batch) => {
205+ let response = QueryResponse {
206+ records : vec ! [ batch] ,
207+ fields : Vec :: new ( ) ,
208+ fill_null : send_null,
209+ with_fields : false ,
210+ }
211+ . to_json ( )
212+ . unwrap_or_else ( |e| {
213+ error ! ( "Failed to parse record batch into JSON: {}" , e) ;
214+ json ! ( { } )
215+ } ) ;
216+ Ok ( Bytes :: from ( format ! ( "{}\n " , response) ) )
200217 }
201- . to_http ( )
202- . unwrap_or_else ( |e| {
203- error ! ( "Failed to parse record batch into JSON: {}" , e) ;
204- json ! ( { } )
205- } ) ;
206- Ok ( Bytes :: from ( format ! ( "{}\n " , response) ) )
207- }
208- Err ( e) => Err ( actix_web:: error:: ErrorInternalServerError ( e) ) ,
209- } ) ;
218+ Err ( e) => Err ( actix_web:: error:: ErrorInternalServerError ( e) ) ,
219+ } ) ;
220+
221+ // Combine the initial fields chunk with the records stream
222+ let fields_chunk = once ( future:: ok :: < _ , actix_web:: Error > ( Bytes :: from ( format ! (
223+ "{}\n " ,
224+ fields_json
225+ ) ) ) ) ;
226+ Box :: pin ( fields_chunk. chain ( records_stream) )
227+ as Pin < Box < dyn Stream < Item = Result < Bytes , actix_web:: Error > > > >
228+ } else {
229+ let stream = records_stream. map ( move |batch_result| match batch_result {
230+ Ok ( batch) => {
231+ let response = QueryResponse {
232+ records : vec ! [ batch] ,
233+ fields : fields. clone ( ) ,
234+ fill_null : send_null,
235+ with_fields,
236+ }
237+ . to_json ( )
238+ . unwrap_or_else ( |e| {
239+ error ! ( "Failed to parse record batch into JSON: {}" , e) ;
240+ json ! ( { } )
241+ } ) ;
242+ Ok ( Bytes :: from ( format ! ( "{}\n " , response) ) )
243+ }
244+ Err ( e) => Err ( actix_web:: error:: ErrorInternalServerError ( e) ) ,
245+ } ) ;
210246
211- let boxed_stream = Box :: pin ( stream) ;
247+ Box :: pin ( stream) as Pin < Box < dyn Stream < Item = Result < Bytes , actix_web:: Error > > > >
248+ } ;
212249
213250 Ok ( HttpResponse :: Ok ( )
214- . content_type ( "application/json " )
251+ . content_type ( "application/x-ndjson " )
215252 . insert_header ( ( TIME_ELAPSED_HEADER , total_time. as_str ( ) ) )
216- . streaming ( boxed_stream ) )
253+ . streaming ( stream ) )
217254}
218255
219256pub async fn get_counts (
0 commit comments