From 3c16dca7d801ba8a50c7534af31b11d0ef50950a Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 6 Mar 2024 14:09:22 +0530 Subject: [PATCH 1/9] Query Server Impl WIP --- server/src/handlers/http.rs | 26 +++++++++++++ .../src/handlers/http/modal/query_server.rs | 5 ++- server/src/handlers/http/query.rs | 37 +++++++++++++++++-- server/src/main.rs | 3 +- server/src/query.rs | 10 ++++- 5 files changed, 74 insertions(+), 7 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 1d6f0b3ac..4c59f8c0f 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -17,6 +17,9 @@ */ use actix_cors::Cors; +use serde_json::Value; + +use self::{modal::query_server::QueryServer, query::Query}; pub(crate) mod about; pub(crate) mod health_check; @@ -55,3 +58,26 @@ pub(crate) fn cross_origin_config() -> Cors { pub fn base_path_without_preceding_slash() -> String { base_path().trim_start_matches('/').to_string() } + +pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result> { + // send the query request to the ingestor + let mut res = vec![]; + let ima = QueryServer::get_ingestor_info().await.unwrap(); + + for im in ima { + let uri = format!("{}{}/{}",im.domain_name, base_path(), "query"); + let reqw = reqwest::Client::new() + .post(uri) + .json(query) + .basic_auth("admin", Some("admin")) + .send() + .await?; + + if reqw.status().is_success() { + let v: Value = serde_json::from_slice(&reqw.bytes().await?)?; + res.push(v); + } + } + + Ok(res) +} diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 800539234..f8bd55091 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -173,7 +173,7 @@ impl QueryServer { let mut f = Self::get_meta_file().await; // writer the arr in f - let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?; + let write_size = f.write(serde_json::to_string(&arr)?.as_bytes()).await?; Ok(arr) } @@ -224,8 +224,9 @@ impl QueryServer { /// initialize the server, run migrations as needed and start the server async fn initialize(&self) -> anyhow::Result<()> { migration::run_metadata_migration(&CONFIG).await?; - let metadata = storage::resolve_parseable_metadata().await?; tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?; + + let metadata = storage::resolve_parseable_metadata().await?; banner::print(&CONFIG, &metadata).await; // initialize the rbac map diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index af5120c49..1a70a8af2 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -19,7 +19,7 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, Responder}; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Timelike, Utc}; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; use futures_util::Future; @@ -36,8 +36,10 @@ use crate::rbac::Users; use crate::response::QueryResponse; use crate::utils::actix::extract_session_key_from_req; +use super::send_query_request_to_ingestor; + /// Query Request through http endpoint. -#[derive(Debug, serde::Deserialize)] +#[derive(Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct Query { query: String, @@ -52,6 +54,15 @@ pub struct Query { } pub async fn query(req: HttpRequest, query_request: Query) -> Result { + // create a new query to send to the ingestors + let mut mmem= vec![]; + if let Some(query) = transform_query_for_ingestor(&query_request) { + mmem = send_query_request_to_ingestor(&query) + .await + .map_err(|err| QueryError::Custom(err.to_string()))?; + } + + // let rbj = arrow_json::ReaderBuilder::new(); let creds = extract_session_key_from_req(&req).expect("expects basic auth"); let permissions = Users.get_permissions(&creds); let session_state = QUERY_SESSION.state(); @@ -94,7 +105,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Option { + let end_time = DateTime::parse_from_rfc3339(&query.end_time).ok()?; + let start_time = end_time - chrono::Duration::minutes(1); + + dbg!(start_time.minute()); + + let q = Query { + query: query.query.clone(), + fields: query.fields, + filter_tags: query.filter_tags.clone(), + send_null: query.send_null, + start_time: start_time.to_rfc3339(), + end_time: end_time.to_rfc3339(), + }; + + Some(q) +} + #[derive(Debug, thiserror::Error)] pub enum QueryError { #[error("Query cannot be empty")] @@ -207,6 +236,8 @@ pub enum QueryError { Datafusion(#[from] DataFusionError), #[error("Execution Error: {0}")] Execute(#[from] ExecuteError), + #[error("Query Error: {0}")] + Custom(String), } impl actix_web::ResponseError for QueryError { diff --git a/server/src/main.rs b/server/src/main.rs index d2991294a..c4e08a360 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -68,7 +68,8 @@ async fn main() -> anyhow::Result<()> { }; // MODE == Query / Ingest and storage = local-store - server.validate()?; + // server.validate()?; + server.init().await?; Ok(()) diff --git a/server/src/query.rs b/server/src/query.rs index e3f9d8dbc..391d00896 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -33,6 +33,7 @@ use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringi use datafusion::prelude::*; use itertools::Itertools; use once_cell::sync::Lazy; +use serde_json::Value; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -102,11 +103,18 @@ impl Query { SessionContext::new_with_state(state) } - pub async fn execute(&self) -> Result<(Vec, Vec), ExecuteError> { + pub async fn execute(&self, mem: Option>) -> Result<(Vec, Vec), ExecuteError> { let df = QUERY_SESSION .execute_logical_plan(self.final_logical_plan()) .await?; + let schema = df.schema(); + if let Some(mem) = mem { + let mem = arrow_json::ReaderBuilder::new(schema.clone()) + .build(mem.iter().map(|x| serde_json::to_string(x).unwrap()).collect()); + + } + let fields = df .schema() .fields() From 9ee58c3e426d40257f37de4520cc809939875fd7 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 11 Mar 2024 11:35:18 +0530 Subject: [PATCH 2/9] fix: typo --- server/src/handlers/http.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 4c59f8c0f..73ed0fba9 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -62,7 +62,7 @@ pub fn base_path_without_preceding_slash() -> String { pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result> { // send the query request to the ingestor let mut res = vec![]; - let ima = QueryServer::get_ingestor_info().await.unwrap(); + let ima = QueryServer::get_ingester_info().await.unwrap(); for im in ima { let uri = format!("{}{}/{}",im.domain_name, base_path(), "query"); From 15605b380db89e604fd74997bf6110672847e6d7 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Fri, 8 Mar 2024 17:40:13 +0530 Subject: [PATCH 3/9] feat: Impl Query Server 1. send_request_ingest_server to return proper result 2. fix: bug if there is no parquet files in store 3. Query Server Now gets all the resultant data --- server/src/handlers/http.rs | 36 ++++++++- .../src/handlers/http/modal/query_server.rs | 6 +- server/src/handlers/http/query.rs | 81 ++++++++++++++----- server/src/query.rs | 17 ++-- server/src/response.rs | 7 +- server/src/storage/object_storage.rs | 2 +- 6 files changed, 114 insertions(+), 35 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 73ed0fba9..7bcaa7cdc 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -59,23 +59,51 @@ pub fn base_path_without_preceding_slash() -> String { base_path().trim_start_matches('/').to_string() } +pub async fn send_schema_request(stream_name: &str) -> anyhow::Result> { + let mut res = vec![]; + let ima = QueryServer::get_ingester_info().await.unwrap(); + + for im in ima { + // todo: + let uri = format!("{}api/v1/logstream/{}/schema", im.domain_name, stream_name); + let reqw = reqwest::Client::new() + .get(uri) + .header(http::header::AUTHORIZATION, im.token.clone()) + .header(http::header::CONTENT_TYPE, "application/json") + .send() + .await?; + + if reqw.status().is_success() { + let v = serde_json::from_slice(&reqw.bytes().await?)?; + res.push(v); + } + } + + Ok(res) +} + pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result> { // send the query request to the ingestor let mut res = vec![]; let ima = QueryServer::get_ingester_info().await.unwrap(); - for im in ima { - let uri = format!("{}{}/{}",im.domain_name, base_path(), "query"); + for im in ima.iter() { + let uri = format!("{}api/v1/{}", im.domain_name, "query"); let reqw = reqwest::Client::new() .post(uri) .json(query) - .basic_auth("admin", Some("admin")) + .header(http::header::AUTHORIZATION, im.token.clone()) + .header(http::header::CONTENT_TYPE, "application/json") .send() .await?; if reqw.status().is_success() { let v: Value = serde_json::from_slice(&reqw.bytes().await?)?; - res.push(v); + if let Some(arr) = v.as_array() { + for val in arr { + res.push(val.clone()) + } + } } } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index f8bd55091..6d5721a21 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -173,7 +173,7 @@ impl QueryServer { let mut f = Self::get_meta_file().await; // writer the arr in f - let write_size = f.write(serde_json::to_string(&arr)?.as_bytes()).await?; + let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?; Ok(arr) } @@ -224,9 +224,11 @@ impl QueryServer { /// initialize the server, run migrations as needed and start the server async fn initialize(&self) -> anyhow::Result<()> { migration::run_metadata_migration(&CONFIG).await?; - tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?; let metadata = storage::resolve_parseable_metadata().await?; + // do not commit the below line + tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?; + banner::print(&CONFIG, &metadata).await; // initialize the rbac map diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 1a70a8af2..d7d2998c0 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -19,24 +19,30 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, Responder}; -use chrono::{DateTime, Timelike, Utc}; +use arrow_schema::Schema; +use chrono::{DateTime, Utc}; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; use futures_util::Future; use http::StatusCode; use std::collections::HashMap; use std::pin::Pin; +use std::sync::Arc; use std::time::Instant; +use crate::event::commit_schema; +use crate::handlers::http::send_schema_request; use crate::metrics::QUERY_EXECUTE_TIME; +use crate::option::{Mode, CONFIG}; use crate::query::error::ExecuteError; use crate::query::QUERY_SESSION; use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; use crate::response::QueryResponse; +use crate::storage::object_storage::commit_schema_to_storage; use crate::utils::actix::extract_session_key_from_req; -use super::send_query_request_to_ingestor; +use super::send_request_to_ingestor; /// Query Request through http endpoint. #[derive(Debug, serde::Deserialize, serde::Serialize)] @@ -54,19 +60,41 @@ pub struct Query { } pub async fn query(req: HttpRequest, query_request: Query) -> Result { - // create a new query to send to the ingestors - let mut mmem= vec![]; - if let Some(query) = transform_query_for_ingestor(&query_request) { - mmem = send_query_request_to_ingestor(&query) - .await - .map_err(|err| QueryError::Custom(err.to_string()))?; + let session_state = QUERY_SESSION.state(); + let mut query = into_query(&query_request, &session_state).await?; + + if CONFIG.parseable.mode == Mode::Query { + if let Ok(schs) = send_schema_request(&query.table_name().unwrap()).await { + let new_schema = + Schema::try_merge(schs).map_err(|err| QueryError::Custom(err.to_string()))?; + + commit_schema(&query.table_name().unwrap(), Arc::new(new_schema.clone())) + .map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?; + + commit_schema_to_storage(&query.table_name().unwrap(), new_schema) + .await + .map_err(|err| { + QueryError::Custom(format!("Error committing schema to storage\nError:{err}")) + })?; + } } - // let rbj = arrow_json::ReaderBuilder::new(); + let mmem = if CONFIG.parseable.mode == Mode::Query { + // create a new query to send to the ingestors + if let Some(que) = transform_query_for_ingestor(&query_request) { + let vals = send_request_to_ingestor(&que) + .await + .map_err(|err| QueryError::Custom(err.to_string()))?; + Some(vals) + } else { + None + } + } else { + None + }; + let creds = extract_session_key_from_req(&req).expect("expects basic auth"); let permissions = Users.get_permissions(&creds); - let session_state = QUERY_SESSION.state(); - let mut query = into_query(&query_request, &session_state).await?; // check authorization of this query if it references physical table; let table_name = query.table_name(); @@ -105,14 +133,14 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Option { - let end_time = DateTime::parse_from_rfc3339(&query.end_time).ok()?; - let start_time = end_time - chrono::Duration::minutes(1); + if query.query.is_empty() { + return None; + } + + if query.start_time.is_empty() { + return None; + } - dbg!(start_time.minute()); + if query.end_time.is_empty() { + return None; + } + + let end_time: DateTime = if query.end_time == "now" { + Utc::now() + } else { + DateTime::parse_from_rfc3339(&query.end_time) + .ok()? + .with_timezone(&Utc) + }; + let start_time = end_time - chrono::Duration::minutes(1); + // when transforming the query, the ingestors are forced to return an array of values let q = Query { query: query.query.clone(), - fields: query.fields, + fields: false, filter_tags: query.filter_tags.clone(), send_null: query.send_null, start_time: start_time.to_rfc3339(), @@ -236,7 +281,7 @@ pub enum QueryError { Datafusion(#[from] DataFusionError), #[error("Execution Error: {0}")] Execute(#[from] ExecuteError), - #[error("Query Error: {0}")] + #[error("Error: {0}")] Custom(String), } diff --git a/server/src/query.rs b/server/src/query.rs index 391d00896..4bf5dba03 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -33,7 +33,6 @@ use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringi use datafusion::prelude::*; use itertools::Itertools; use once_cell::sync::Lazy; -use serde_json::Value; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -103,18 +102,14 @@ impl Query { SessionContext::new_with_state(state) } - pub async fn execute(&self, mem: Option>) -> Result<(Vec, Vec), ExecuteError> { + pub async fn execute( + &self, + mem: Option>, + ) -> Result<(Vec, Vec), ExecuteError> { let df = QUERY_SESSION .execute_logical_plan(self.final_logical_plan()) .await?; - let schema = df.schema(); - if let Some(mem) = mem { - let mem = arrow_json::ReaderBuilder::new(schema.clone()) - .build(mem.iter().map(|x| serde_json::to_string(x).unwrap()).collect()); - - } - let fields = df .schema() .fields() @@ -123,6 +118,10 @@ impl Query { .cloned() .collect_vec(); + if fields.is_empty() { + return Ok((vec![], fields)); + } + let results = df.collect().await?; Ok((results, fields)) } diff --git a/server/src/response.rs b/server/src/response.rs index 18b86d78f..a3351c473 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -43,7 +43,12 @@ impl QueryResponse { } } } - let values = json_records.into_iter().map(Value::Object).collect_vec(); + let mut values = json_records.into_iter().map(Value::Object).collect_vec(); + + if let Some(mut imem) = imem { + values.append(&mut imem); + } + let response = if self.with_fields { json!({ "fields": self.fields, diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index d0f07fab0..486fd8403 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -432,7 +432,7 @@ pub trait ObjectStorage: Sync + 'static { fn get_bucket_name(&self) -> String; } -async fn commit_schema_to_storage( +pub async fn commit_schema_to_storage( stream_name: &str, schema: Schema, ) -> Result<(), ObjectStorageError> { From aa6927f33d7203b40196d6e9f622fc6e0fcd3843 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 11 Mar 2024 11:39:19 +0530 Subject: [PATCH 4/9] Merge Changes from stratchpad --- server/src/handlers/http.rs | 1 + .../src/handlers/http/modal/ingest_server.rs | 8 ++++ server/src/handlers/http/query.rs | 48 +++++++++++++------ server/src/query.rs | 6 +-- server/src/response.rs | 2 +- 5 files changed, 45 insertions(+), 20 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 7bcaa7cdc..185c083f2 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -59,6 +59,7 @@ pub fn base_path_without_preceding_slash() -> String { base_path().trim_start_matches('/').to_string() } + pub async fn send_schema_request(stream_name: &str) -> anyhow::Result> { let mut res = vec![]; let ima = QueryServer::get_ingester_info().await.unwrap(); diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index c028b0c5c..244bae9e9 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -175,6 +175,14 @@ impl IngestServer { ) .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) + .service( + // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream + web::resource("/schema").route( + web::get() + .to(logstream::schema) + .authorize_for_stream(Action::GetSchema), + ), + ) .service( // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream web::resource("/stats").route( diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index d7d2998c0..b214efacf 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -19,7 +19,7 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, Responder}; -use arrow_schema::Schema; +use arrow_json::reader::infer_json_schema_from_iterator; use chrono::{DateTime, Utc}; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; @@ -30,8 +30,13 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Instant; -use crate::event::commit_schema; +// Eshan's Code Under test +#[allow(unused_imports)] +use arrow_schema::Schema; +#[allow(unused_imports)] use crate::handlers::http::send_schema_request; + +use crate::event::commit_schema; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::{Mode, CONFIG}; use crate::query::error::ExecuteError; @@ -63,21 +68,21 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result = Lazy::new(|| Query::create_session_context(CONFIG.storage())); // A query request by client +#[derive(Debug)] pub struct Query { pub raw_logical_plan: LogicalPlan, pub start: DateTime, @@ -102,10 +103,7 @@ impl Query { SessionContext::new_with_state(state) } - pub async fn execute( - &self, - mem: Option>, - ) -> Result<(Vec, Vec), ExecuteError> { + pub async fn execute(&self) -> Result<(Vec, Vec), ExecuteError> { let df = QUERY_SESSION .execute_logical_plan(self.final_logical_plan()) .await?; diff --git a/server/src/response.rs b/server/src/response.rs index a3351c473..2cd32a424 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -30,7 +30,7 @@ pub struct QueryResponse { } impl QueryResponse { - pub fn to_http(&self) -> impl Responder { + pub fn to_http(&self, imem: Option>) -> impl Responder { log::info!("{}", "Returning query results"); let records: Vec<&RecordBatch> = self.records.iter().collect(); let mut json_records = record_batches_to_json_rows(&records).unwrap(); From 337dae81fe3f63879ba30c17d42ccd431cffea67 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 11 Mar 2024 13:00:19 +0530 Subject: [PATCH 5/9] rm dead code --- server/src/handlers/http.rs | 6 ++- .../src/handlers/http/modal/ingest_server.rs | 24 +---------- server/src/handlers/http/query.rs | 42 ++++++------------- 3 files changed, 18 insertions(+), 54 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 185c083f2..619345f70 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -59,13 +59,12 @@ pub fn base_path_without_preceding_slash() -> String { base_path().trim_start_matches('/').to_string() } - pub async fn send_schema_request(stream_name: &str) -> anyhow::Result> { let mut res = vec![]; let ima = QueryServer::get_ingester_info().await.unwrap(); for im in ima { - // todo: + // TODO: when you rebase the code from the Cluster Info PR update this uri generation let uri = format!("{}api/v1/logstream/{}/schema", im.domain_name, stream_name); let reqw = reqwest::Client::new() .get(uri) @@ -89,6 +88,7 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result anyhow::Result Scope { - web::scope("/logstream").service( - web::scope("/{logstream}") - .service( - // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream - web::resource("/schema").route( - web::get() - .to(logstream::schema) - .authorize_for_stream(Action::GetSchema), - ), - ) - .service( - web::resource("/stats").route( - web::get() - .to(logstream::get_stats) - .authorize_for_stream(Action::GetStats), - ), - ), - ) + .service(Server::get_readiness_factory()); } fn logstream_api() -> Scope { diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index b214efacf..476ba83b9 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -19,7 +19,6 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, Responder}; -use arrow_json::reader::infer_json_schema_from_iterator; use chrono::{DateTime, Utc}; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; @@ -30,10 +29,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Instant; -// Eshan's Code Under test -#[allow(unused_imports)] use arrow_schema::Schema; -#[allow(unused_imports)] use crate::handlers::http::send_schema_request; use crate::event::commit_schema; @@ -68,21 +64,21 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Date: Wed, 13 Mar 2024 17:13:04 +0530 Subject: [PATCH 6/9] chore: fix typos --- server/src/handlers/http.rs | 4 ++-- server/src/handlers/http/query.rs | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 619345f70..3b460aabd 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -82,8 +82,8 @@ pub async fn send_schema_request(stream_name: &str) -> anyhow::Result anyhow::Result> { - // send the query request to the ingestor +pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result> { + // send the query request to the ingester let mut res = vec![]; let ima = QueryServer::get_ingester_info().await.unwrap(); diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 476ba83b9..2f380ddfb 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -43,7 +43,7 @@ use crate::response::QueryResponse; use crate::storage::object_storage::commit_schema_to_storage; use crate::utils::actix::extract_session_key_from_req; -use super::send_request_to_ingestor; +use super::send_query_request_to_ingester; /// Query Request through http endpoint. #[derive(Debug, serde::Deserialize, serde::Serialize)] @@ -81,9 +81,9 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Option { +fn transform_query_for_ingester(query: &Query) -> Option { if query.query.is_empty() { return None; } @@ -246,7 +246,7 @@ fn transform_query_for_ingestor(query: &Query) -> Option { }; let start_time = end_time - chrono::Duration::minutes(1); - // when transforming the query, the ingestors are forced to return an array of values + // when transforming the query, the ingesters are forced to return an array of values let q = Query { query: query.query.clone(), fields: false, From 649b4367d91710e2db8e0ade3f3af93be92c2ba8 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 13 Mar 2024 17:25:25 +0530 Subject: [PATCH 7/9] WIP --- server/src/handlers/http.rs | 2 +- .../src/handlers/http/modal/query_server.rs | 5 +- server/src/handlers/http/query.rs | 59 +++++++++++++++++++ server/src/query.rs | 2 + server/src/response.rs | 11 ++++ 5 files changed, 76 insertions(+), 3 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 3b460aabd..832cd4ff4 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -104,7 +104,7 @@ pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result Result) -> Vec { + let mut sum = 0; + let mut key = String::new(); + let mut result = vec![]; + for value in &values { + if let Some(obj) = value.as_object() { + if obj.keys().all(|key| key.starts_with("COUNT")) { + key = obj.keys().next().unwrap().to_string(); + for v in obj.values() { + if let Some(num_str) = v.as_str() { + if let Ok(num) = num_str.parse::() { + sum += num; + } + } + } + + result = vec![serde_json::json!({ key: sum })]; + } + } + } + + result = values; + + // let result = serde_json::json!({ key: sum }); + dbg!(&result); + result + + // let mut key = String::new(); + // let mut out = 0; + + // let it = res.iter_mut().map(|x| { + // x.as_object_mut().expect("Should always be an object") + // }).collect_vec(); + + // for obj in it { + // if obj.keys().all(|x| x.starts_with("COUNT")) { + // key = obj.keys().next().unwrap().to_string(); + // // get all the values, parse them and sum them + // let o = obj.values().map(|x|{ + // x.as_str() + // .expect("Should always be a string") + // .parse::() + // .expect("Should always be a number, Hence parseable") + // }).sum::(); + + // serde_json::json!({ + // key: o + // }) + // } else { + // res + // } + + // } +} diff --git a/server/src/query.rs b/server/src/query.rs index 1084389e5..26f373e50 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -116,6 +116,8 @@ impl Query { .cloned() .collect_vec(); + dbg!(&fields); + if fields.is_empty() { return Ok((vec![], fields)); } diff --git a/server/src/response.rs b/server/src/response.rs index 2cd32a424..36c41cd56 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -22,6 +22,8 @@ use datafusion::arrow::record_batch::RecordBatch; use itertools::Itertools; use serde_json::{json, Value}; +use crate::handlers::http::query::query_response_flatten; + pub struct QueryResponse { pub records: Vec, pub fields: Vec, @@ -49,6 +51,15 @@ impl QueryResponse { values.append(&mut imem); } + // if values.iter().all(|v| { + // v.as_object() + // .expect("Should always be an object") + // .keys() + // .all(|key| key.starts_with("COUNT")) + // }) { + // values = query_response_flatten(values) + // } + let response = if self.with_fields { json!({ "fields": self.fields, From d2c9ea0fef5fac35fdc3afc4cb9677677a760e71 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 14 Mar 2024 14:56:23 +0530 Subject: [PATCH 8/9] chore: rm dead code --- .../src/handlers/http/modal/query_server.rs | 19 ----- server/src/handlers/http/query.rs | 77 ++----------------- server/src/response.rs | 8 -- 3 files changed, 8 insertions(+), 96 deletions(-) diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 9423b8549..ea17975c8 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -35,11 +35,8 @@ use relative_path::RelativePathBuf; use reqwest::Response; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tokio::io::AsyncWriteExt; use url::Url; -use tokio::fs::File as TokioFile; - use crate::option::CONFIG; use super::server::Server; @@ -170,11 +167,6 @@ impl QueryServer { // TODO: add validation logic here // validate the ingester metadata - - // ! Not Needed - // let mut f = Self::get_meta_file().await; - // writer the arr in f - // let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?; Ok(arr) } @@ -280,17 +272,6 @@ impl QueryServer { } } - async fn get_meta_file() -> TokioFile { - let meta_path = CONFIG.staging_dir().join(".query.json"); - - tokio::fs::OpenOptions::new() - .read(true) - .write(true) - .open(meta_path) - .await - .unwrap() - } - // forward the request to all ingesters to keep them in sync pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> { let ingester_infos = Self::get_ingester_info().await.map_err(|err| { diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 19f15686e..c89cbebc6 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -24,15 +24,12 @@ use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; use futures_util::Future; use http::StatusCode; -use itertools::Itertools; -use serde_json::Value; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::time::Instant; -use arrow_schema::Schema; -use crate::handlers::http::send_schema_request; +use crate::handlers::http::fetch_schema; use crate::event::commit_schema; use crate::metrics::QUERY_EXECUTE_TIME; @@ -66,30 +63,27 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result) -> Vec { - let mut sum = 0; - let mut key = String::new(); - let mut result = vec![]; - for value in &values { - if let Some(obj) = value.as_object() { - if obj.keys().all(|key| key.starts_with("COUNT")) { - key = obj.keys().next().unwrap().to_string(); - for v in obj.values() { - if let Some(num_str) = v.as_str() { - if let Ok(num) = num_str.parse::() { - sum += num; - } - } - } - - result = vec![serde_json::json!({ key: sum })]; - } - } - } - - result = values; - - // let result = serde_json::json!({ key: sum }); - dbg!(&result); - result - - // let mut key = String::new(); - // let mut out = 0; - - // let it = res.iter_mut().map(|x| { - // x.as_object_mut().expect("Should always be an object") - // }).collect_vec(); - - // for obj in it { - // if obj.keys().all(|x| x.starts_with("COUNT")) { - // key = obj.keys().next().unwrap().to_string(); - // // get all the values, parse them and sum them - // let o = obj.values().map(|x|{ - // x.as_str() - // .expect("Should always be a string") - // .parse::() - // .expect("Should always be a number, Hence parseable") - // }).sum::(); - - // serde_json::json!({ - // key: o - // }) - // } else { - // res - // } - - // } -} diff --git a/server/src/response.rs b/server/src/response.rs index 36c41cd56..c7cb0a6bf 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -51,14 +51,6 @@ impl QueryResponse { values.append(&mut imem); } - // if values.iter().all(|v| { - // v.as_object() - // .expect("Should always be an object") - // .keys() - // .all(|key| key.starts_with("COUNT")) - // }) { - // values = query_response_flatten(values) - // } let response = if self.with_fields { json!({ From 829a72caad4c44bf52120d10061832a7a7a843c3 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 14 Mar 2024 14:56:59 +0530 Subject: [PATCH 9/9] fix: return val for count query --- server/src/handlers/http.rs | 7 ++- server/src/query.rs | 115 ++++++++++++++++++++++++++++++++++++ server/src/response.rs | 3 +- 3 files changed, 122 insertions(+), 3 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 832cd4ff4..fe787bc05 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -17,6 +17,7 @@ */ use actix_cors::Cors; +use arrow_schema::Schema; use serde_json::Value; use self::{modal::query_server::QueryServer, query::Query}; @@ -59,7 +60,7 @@ pub fn base_path_without_preceding_slash() -> String { base_path().trim_start_matches('/').to_string() } -pub async fn send_schema_request(stream_name: &str) -> anyhow::Result> { +pub async fn fetch_schema(stream_name: &str) -> anyhow::Result { let mut res = vec![]; let ima = QueryServer::get_ingester_info().await.unwrap(); @@ -79,7 +80,9 @@ pub async fn send_schema_request(stream_name: &str) -> anyhow::Result anyhow::Result> { diff --git a/server/src/query.rs b/server/src/query.rs index 26f373e50..3c3151a55 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -33,6 +33,7 @@ use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringi use datafusion::prelude::*; use itertools::Itertools; use once_cell::sync::Lazy; +use serde_json::{json, Value}; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -297,6 +298,45 @@ fn time_from_path(path: &Path) -> DateTime { .unwrap() } +pub fn flatten_objects_for_count(objects: Vec) -> Vec { + if objects.is_empty() { + return objects; + } + + // check if all the keys start with "COUNT" + let flag = objects.iter().all(|obj| { + obj.as_object() + .unwrap() + .keys() + .all(|key| key.starts_with("COUNT")) + }); + + if flag { + let mut accum = 0u64; + let key = objects[0] + .as_object() + .unwrap() + .keys() + .next() + .unwrap() + .clone(); + + for obj in objects { + let count = obj.as_object().unwrap().keys().fold(0, |acc, key| { + let value = obj.as_object().unwrap().get(key).unwrap().as_u64().unwrap(); + acc + value + }); + accum += count; + } + + vec![json!({ + key: accum + })] + } else { + objects + } +} + pub mod error { use crate::storage::ObjectStorageError; use datafusion::error::DataFusionError; @@ -312,6 +352,10 @@ pub mod error { #[cfg(test)] mod tests { + use serde_json::json; + + use crate::query::flatten_objects_for_count; + use super::time_from_path; use std::path::PathBuf; @@ -321,4 +365,75 @@ mod tests { let time = time_from_path(path.as_path()); assert_eq!(time.timestamp(), 1640995200); } + + #[test] + fn test_flat() { + let val = vec![ + json!({ + "COUNT(*)": 1 + }), + json!({ + "COUNT(*)": 2 + }), + json!({ + "COUNT(*)": 3 + }), + ]; + + let out = flatten_objects_for_count(val); + assert_eq!(out, vec![json!({"COUNT(*)": 6})]); + } + + #[test] + fn test_flat_empty() { + let val = vec![]; + let out = flatten_objects_for_count(val.clone()); + assert_eq!(val, out); + } + + #[test] + fn test_flat_single() { + let val = vec![json!({"COUNT(ALPHA)": 1}), json!({"COUNT(ALPHA)": 2})]; + let out = flatten_objects_for_count(val.clone()); + assert_eq!(vec![json!({"COUNT(ALPHA)": 3})], out); + } + + #[test] + fn test_flat_fail() { + let val = vec![ + json!({ + "Num": 1 + }), + json!({ + "Num": 2 + }), + json!({ + "Num": 3 + }), + ]; + + let out = flatten_objects_for_count(val.clone()); + assert_eq!(val, out); + } + + #[test] + fn test_flat_multi_key() { + let val = vec![ + json!({ + "Num": 1, + "COUNT(*)": 1 + }), + json!({ + "Num": 2, + "COUNT(*)": 2 + }), + json!({ + "Num": 3, + "COUNT(*)": 3 + }), + ]; + + let out = flatten_objects_for_count(val.clone()); + assert_eq!(val, out); + } } diff --git a/server/src/response.rs b/server/src/response.rs index c7cb0a6bf..6275864b5 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -22,7 +22,7 @@ use datafusion::arrow::record_batch::RecordBatch; use itertools::Itertools; use serde_json::{json, Value}; -use crate::handlers::http::query::query_response_flatten; +use crate::query::flatten_objects_for_count; pub struct QueryResponse { pub records: Vec, @@ -51,6 +51,7 @@ impl QueryResponse { values.append(&mut imem); } + let values = flatten_objects_for_count(values); let response = if self.with_fields { json!({