From f681bb824532deebc9a0edacfa2402563b5f59fb Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 17 Feb 2023 14:24:45 +0530 Subject: [PATCH 1/3] Merge null to fields that are undefined --- server/src/handlers/event.rs | 7 +++++-- server/src/query.rs | 15 ++++++++++++-- server/src/response.rs | 38 ++++++++++++++++++++++-------------- 3 files changed, 41 insertions(+), 19 deletions(-) diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index 34b556f77..043f47716 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -16,7 +16,7 @@ * */ -use actix_web::{web, HttpRequest, HttpResponse}; +use actix_web::{web, HttpRequest, HttpResponse, Responder}; use serde_json::Value; use std::time::Instant; @@ -35,7 +35,10 @@ const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const SEPARATOR: char = '^'; -pub async fn query(_req: HttpRequest, json: web::Json) -> Result { +pub async fn query( + _req: HttpRequest, + json: web::Json, +) -> Result { let time = Instant::now(); let json = json.into_inner(); let query = Query::parse(json)?; diff --git a/server/src/query.rs b/server/src/query.rs index abb25d1fb..df0d7aa5b 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -88,7 +88,7 @@ impl Query { pub async fn execute( &self, storage: Arc, - ) -> Result, ExecuteError> { + ) -> Result<(Vec, Vec), ExecuteError> { let dir = StorageDir::new(&self.stream_name); // take a look at local dir and figure out what local cache we could use for this query let staging_arrows = dir @@ -128,8 +128,19 @@ impl Query { .map_err(ObjectStorageError::DataFusionError)?; // execute the query and collect results let df = ctx.sql(self.query.as_str()).await?; + // dataframe qualifies name by adding table name before columns. \ + // For now this is just actual names + let fields = df + .schema() + .fields() + .into_iter() + .map(|f| f.name()) + .cloned() + .collect_vec(); + let results = df.collect().await?; - Ok(results) + + Ok((results, fields)) } } diff --git a/server/src/response.rs b/server/src/response.rs index 16a14d12e..0aa75ef46 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -17,34 +17,42 @@ */ use actix_web::http::StatusCode; -use actix_web::{HttpResponse, HttpResponseBuilder}; -use datafusion::arrow::json; +use actix_web::{web, Responder}; +use datafusion::arrow::json::writer::record_batches_to_json_rows; use datafusion::arrow::record_batch::RecordBatch; +use itertools::Itertools; +use serde_json::Value; pub struct QueryResponse { pub code: StatusCode, - pub body: Vec, + pub records: Vec, + pub fields: Vec, } impl QueryResponse { - pub fn to_http(&self) -> HttpResponse { + pub fn to_http(&self) -> impl Responder { log::info!("{}", "Returning query results"); - let buf = Vec::new(); - let mut writer = json::ArrayWriter::new(buf); - writer.write_batches(&self.body).unwrap(); - writer.finish().unwrap(); - - HttpResponseBuilder::new(self.code) - .content_type("json") - .body(writer.into_inner()) + let mut json = record_batches_to_json_rows(&self.records).unwrap(); + for map in &mut json { + for field in &self.fields { + if !map.contains_key(field) { + map.insert(field.clone(), Value::Null); + } + } + } + let values = json.into_iter().map(|map| Value::Object(map)).collect_vec(); + web::Json(values) } } -impl From> for QueryResponse { - fn from(body: Vec) -> Self { +impl From<(Vec, Vec)> for QueryResponse { + fn from(body: (Vec, Vec)) -> Self { + let (records, fields) = body; + Self { code: StatusCode::OK, - body, + records, + fields, } } } From 6498533474cedd83320f782bf0cb3521a2e3c74a Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 17 Feb 2023 15:17:50 +0530 Subject: [PATCH 2/3] Fix --- server/src/query.rs | 2 +- server/src/response.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/query.rs b/server/src/query.rs index df0d7aa5b..b94d314d5 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -133,7 +133,7 @@ impl Query { let fields = df .schema() .fields() - .into_iter() + .iter() .map(|f| f.name()) .cloned() .collect_vec(); diff --git a/server/src/response.rs b/server/src/response.rs index 0aa75ef46..ab0e936be 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -40,7 +40,7 @@ impl QueryResponse { } } } - let values = json.into_iter().map(|map| Value::Object(map)).collect_vec(); + let values = json.into_iter().map(Value::Object).collect_vec(); web::Json(values) } } From f3368d1a072f94f0df49180393a56371de536186 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 17 Feb 2023 16:07:12 +0530 Subject: [PATCH 3/3] Add fill_null option --- server/src/handlers/event.rs | 10 +++++++++- server/src/response.rs | 38 +++++++++++++++++++----------------- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index 043f47716..67fb15d58 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -33,6 +33,7 @@ use self::error::{PostError, QueryError}; const PREFIX_TAGS: &str = "x-p-tag-"; const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; +const FILL_NULL_OPTION_KEY: &str = "fill_null"; const SEPARATOR: char = '^'; pub async fn query( @@ -41,12 +42,19 @@ pub async fn query( ) -> Result { let time = Instant::now(); let json = json.into_inner(); + + let fill_null = json + .as_object() + .and_then(|map| map.get(FILL_NULL_OPTION_KEY)) + .and_then(|value| value.as_bool()) + .unwrap_or_default(); + let query = Query::parse(json)?; let storage = CONFIG.storage().get_object_store(); let query_result = query.execute(storage).await; let query_result = query_result - .map(Into::::into) + .map(|(records, fields)| QueryResponse::new(records, fields, fill_null)) .map(|response| response.to_http()) .map_err(|e| e.into()); diff --git a/server/src/response.rs b/server/src/response.rs index ab0e936be..02966e5d0 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -27,32 +27,34 @@ pub struct QueryResponse { pub code: StatusCode, pub records: Vec, pub fields: Vec, + pub fill_null: bool, } impl QueryResponse { + pub fn new(records: Vec, fields: Vec, fill_null: bool) -> Self { + Self { + code: StatusCode::OK, + records, + fields, + fill_null, + } + } + pub fn to_http(&self) -> impl Responder { log::info!("{}", "Returning query results"); - let mut json = record_batches_to_json_rows(&self.records).unwrap(); - for map in &mut json { - for field in &self.fields { - if !map.contains_key(field) { - map.insert(field.clone(), Value::Null); + let mut json_records = record_batches_to_json_rows(&self.records).unwrap(); + + if self.fill_null { + for map in &mut json_records { + for field in &self.fields { + if !map.contains_key(field) { + map.insert(field.clone(), Value::Null); + } } } } - let values = json.into_iter().map(Value::Object).collect_vec(); - web::Json(values) - } -} - -impl From<(Vec, Vec)> for QueryResponse { - fn from(body: (Vec, Vec)) -> Self { - let (records, fields) = body; - Self { - code: StatusCode::OK, - records, - fields, - } + let values = json_records.into_iter().map(Value::Object).collect_vec(); + web::Json(values) } }