From de14229d1f94f2c38aaa3c8735a3af09cce908c1 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 7 Aug 2023 12:58:09 +0530 Subject: [PATCH 1/2] Query param for attaching fields to response json --- server/src/handlers/http/query.rs | 32 +++++++++++++++++++++++-------- server/src/response.rs | 26 ++++++++++++------------- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index eac411028..1ccf265fb 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -18,12 +18,13 @@ use actix_web::error::ErrorUnauthorized; use actix_web::http::header::ContentType; -use actix_web::web::Json; +use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, Responder}; use actix_web_httpauth::extractors::basic::BasicAuth; use futures_util::Future; use http::StatusCode; use serde_json::Value; +use std::collections::HashMap; use std::pin::Pin; use std::time::Instant; @@ -36,22 +37,37 @@ use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; use crate::response::QueryResponse; -pub async fn query(query: Query) -> Result { +pub async fn query( + query: Query, + web::Query(params): web::Query>, +) -> Result { let time = Instant::now(); + // format output json to include field names + let with_fields = params.get("withFields").cloned().unwrap_or(false); + // Fill missing columns with null + let fill_null = params + .get("fillNull") + .cloned() + .or(Some(query.fill_null)) + .unwrap_or(false); + let storage = CONFIG.storage().get_object_store(); - let query_result = query.execute(storage).await; - let query_result = query_result - .map(|(records, fields)| QueryResponse::new(records, fields, query.fill_null)) - .map(|response| response.to_http()) - .map_err(|e| e.into()); + let (records, fields) = query.execute(storage).await?; + let response = QueryResponse { + records, + fields, + fill_null, + with_fields, + } + .to_http(); let time = time.elapsed().as_secs_f64(); QUERY_EXECUTE_TIME .with_label_values(&[query.stream_name.as_str()]) .observe(time); - query_result + Ok(response) } impl FromRequest for Query { diff --git a/server/src/response.rs b/server/src/response.rs index da44e95f8..1d0981bb0 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -16,30 +16,20 @@ * */ -use actix_web::http::StatusCode; use actix_web::{web, Responder}; use datafusion::arrow::json::writer::record_batches_to_json_rows; use datafusion::arrow::record_batch::RecordBatch; use itertools::Itertools; -use serde_json::Value; +use serde_json::{json, Value}; pub struct QueryResponse { - pub code: StatusCode, pub records: Vec, pub fields: Vec, pub fill_null: bool, + pub with_fields: bool, } impl QueryResponse { - pub fn new(records: Vec, fields: Vec, fill_null: bool) -> Self { - Self { - code: StatusCode::OK, - records, - fields, - fill_null, - } - } - pub fn to_http(&self) -> impl Responder { log::info!("{}", "Returning query results"); let records: Vec<&RecordBatch> = self.records.iter().collect(); @@ -53,8 +43,16 @@ impl QueryResponse { } } } - let values = json_records.into_iter().map(Value::Object).collect_vec(); - web::Json(values) + let response = if self.with_fields { + json!({ + "fields": self.fields, + "records": values + }) + } else { + Value::Array(values) + }; + + web::Json(response) } } From e2bd4cce4410d3a71f0dccbe13dbdb1788c878b7 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 7 Aug 2023 15:09:46 +0530 Subject: [PATCH 2/2] Change query param --- server/src/handlers/http/query.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 1ccf265fb..3a1441399 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -44,7 +44,7 @@ pub async fn query( let time = Instant::now(); // format output json to include field names - let with_fields = params.get("withFields").cloned().unwrap_or(false); + let with_fields = params.get("fields").cloned().unwrap_or(false); // Fill missing columns with null let fill_null = params .get("fillNull")