From c01fafb38b92273bef396900797251aeeb0862b1 Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Thu, 14 Mar 2024 15:24:51 +0530
Subject: [PATCH 1/4] feat: Impl Query Server

Add: Schema API for Ingestion Server
RM: Redundant API Endpoints
feat: Last min data is Queryable
Add: Unit Tests
---
 server/src/handlers/http.rs                   |  74 ++++++++++-
 .../src/handlers/http/modal/ingest_server.rs  |  32 ++---
 .../src/handlers/http/modal/query_server.rs   |  21 +--
 server/src/handlers/http/query.rs             |  81 +++++++++++-
 server/src/query.rs                           | 120 ++++++++++++++++++
 server/src/response.rs                        |  13 +-
 server/src/storage/object_storage.rs          |   2 +-
 7 files changed, 292 insertions(+), 51 deletions(-)

diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs
index 1d6f0b3ac..60534eb80 100644
--- a/server/src/handlers/http.rs
+++ b/server/src/handlers/http.rs
@@ -17,6 +17,10 @@
  */
 
 use actix_cors::Cors;
+use arrow_schema::Schema;
+use serde_json::Value;
+
+use self::{modal::query_server::QueryServer, query::Query};
 
 pub(crate) mod about;
 pub(crate) mod health_check;
@@ -33,11 +37,11 @@ pub(crate) mod rbac;
 pub(crate) mod role;
 
 pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
-pub const API_BASE_PATH: &str = "/api";
+pub const API_BASE_PATH: &str = "api";
 pub const API_VERSION: &str = "v1";
 
 pub(crate) fn base_path() -> String {
-    format!("{API_BASE_PATH}/{API_VERSION}")
+    format!("/{API_BASE_PATH}/{API_VERSION}")
 }
 
 pub fn metrics_path() -> String {
@@ -53,5 +57,69 @@ pub(crate) fn cross_origin_config() -> Cors {
 }
 
 pub fn base_path_without_preceding_slash() -> String {
-    base_path().trim_start_matches('/').to_string()
+    format!("/{API_BASE_PATH}/{API_VERSION}")
 }
+
+pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<Schema> {
+    let mut res = vec![];
+    let ima = QueryServer::get_ingester_info().await.unwrap();
+
+    for im in ima {
+        let uri = format!(
+            "{}{}/logstream/{}/schema",
+            im.domain_name,
+            base_path_without_preceding_slash(),
+            stream_name
+        );
+        let reqw = reqwest::Client::new()
+            .get(uri)
+            .header(http::header::AUTHORIZATION, im.token.clone())
+            .header(http::header::CONTENT_TYPE, "application/json")
+            .send()
+            .await?;
+
+        if reqw.status().is_success() {
+            let v = serde_json::from_slice(&reqw.bytes().await?)?;
+            res.push(v);
+        }
+    }
+
+    let new_schema = Schema::try_merge(res)?;
+
+    Ok(new_schema)
+}
+
+pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> {
+    // send the query request to the ingester
+    let mut res = vec![];
+    let ima = QueryServer::get_ingester_info().await.unwrap();
+
+    for im in ima.iter() {
+        let uri = format!(
+            "{}{}/{}",
+            im.domain_name,
+            base_path_without_preceding_slash(),
+            "query"
+        );
+        let reqw = reqwest::Client::new()
+            .post(uri)
+            .json(query)
+            .header(http::header::AUTHORIZATION, im.token.clone())
+            .header(http::header::CONTENT_TYPE, "application/json")
+            .send()
+            .await?;
+
+        if reqw.status().is_success() {
+            let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
+            // the value returned is an array of json objects
+            // so it needs to be flattened
+            if let Some(arr) = v.as_array() {
+                for val in arr {
+                    res.push(val.to_owned())
+                }
+            }
+        }
+    }
+
+    Ok(res)
+}
diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index 7a3dcd78d..47a86c30b 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -126,29 +126,7 @@ impl IngestServer {
                     .service(Server::get_about_factory()),
             )
             .service(Server::get_liveness_factory())
-            
.service(Server::get_readiness_factory()) - .service(Self::get_metrics_webscope()); - } - - fn get_metrics_webscope() -> Scope { - web::scope("/logstream").service( - web::scope("/{logstream}") - .service( - // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream - web::resource("/schema").route( - web::get() - .to(logstream::schema) - .authorize_for_stream(Action::GetSchema), - ), - ) - .service( - web::resource("/stats").route( - web::get() - .to(logstream::get_stats) - .authorize_for_stream(Action::GetStats), - ), - ), - ) + .service(Server::get_readiness_factory()); } fn logstream_api() -> Scope { @@ -176,6 +154,14 @@ impl IngestServer { ) .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) + .service( + // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream + web::resource("/schema").route( + web::get() + .to(logstream::schema) + .authorize_for_stream(Action::GetSchema), + ), + ) .service( // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream web::resource("/stats").route( diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 800539234..ea17975c8 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -35,11 +35,8 @@ use relative_path::RelativePathBuf; use reqwest::Response; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tokio::io::AsyncWriteExt; use url::Url; -use tokio::fs::File as TokioFile; - use crate::option::CONFIG; use super::server::Server; @@ -170,10 +167,6 @@ impl QueryServer { // TODO: add validation logic here // validate the ingester metadata - - let mut f = Self::get_meta_file().await; - // writer the arr in f - let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?; Ok(arr) } @@ -224,8 +217,11 @@ impl QueryServer { /// initialize the server, run migrations as needed and start the server async fn initialize(&self) -> anyhow::Result<()> { migration::run_metadata_migration(&CONFIG).await?; + let metadata = storage::resolve_parseable_metadata().await?; + // do not commit the below line tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?; + banner::print(&CONFIG, &metadata).await; // initialize the rbac map @@ -276,17 +272,6 @@ impl QueryServer { } } - async fn get_meta_file() -> TokioFile { - let meta_path = CONFIG.staging_dir().join(".query.json"); - - tokio::fs::OpenOptions::new() - .read(true) - .write(true) - .open(meta_path) - .await - .unwrap() - } - // forward the request to all ingesters to keep them in sync pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> { let ingester_infos = Self::get_ingester_info().await.map_err(|err| { diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index af5120c49..17f3435ff 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -26,18 +26,26 @@ use futures_util::Future; use http::StatusCode; use std::collections::HashMap; use std::pin::Pin; +use std::sync::Arc; use std::time::Instant; +use crate::handlers::http::fetch_schema; + +use crate::event::commit_schema; use crate::metrics::QUERY_EXECUTE_TIME; +use crate::option::{Mode, CONFIG}; use crate::query::error::ExecuteError; use crate::query::QUERY_SESSION; use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; use crate::response::QueryResponse; +use crate::storage::object_storage::commit_schema_to_storage; use 
crate::utils::actix::extract_session_key_from_req;
 
+use super::send_query_request_to_ingester;
+
 /// Query Request through http endpoint.
-#[derive(Debug, serde::Deserialize)]
+#[derive(Debug, serde::Deserialize, serde::Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Query {
     query: String,
@@ -52,11 +60,39 @@ pub struct Query {
 }
 
 pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
-    let creds = extract_session_key_from_req(&req).expect("expects basic auth");
-    let permissions = Users.get_permissions(&creds);
     let session_state = QUERY_SESSION.state();
     let mut query = into_query(&query_request, &session_state).await?;
 
+    if CONFIG.parseable.mode == Mode::Query {
+        if let Ok(new_schema) = fetch_schema(&query.table_name().unwrap()).await {
+            commit_schema_to_storage(&query.table_name().unwrap(), new_schema.clone())
+                .await
+                .map_err(|err| {
+                    QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
+                })?;
+            commit_schema(&query.table_name().unwrap(), Arc::new(new_schema))
+                .map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
+        }
+    }
+
+    // ? run this code only if the query start time and now is less than 1 minute + margin
+    let mmem = if CONFIG.parseable.mode == Mode::Query {
+        // create a new query to send to the ingesters
+        if let Some(que) = transform_query_for_ingester(&query_request) {
+            let vals = send_query_request_to_ingester(&que)
+                .await
+                .map_err(|err| QueryError::Custom(err.to_string()))?;
+            Some(vals)
+        } else {
+            None
+        }
+    } else {
+        None
+    };
+
+    let creds = extract_session_key_from_req(&req).expect("expects basic auth");
+    let permissions = Users.get_permissions(&creds);
+
     // check authorization of this query if it references physical table;
     let table_name = query.table_name();
     if let Some(ref table) = table_name {
@@ -101,7 +137,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
         .observe(time);
 
-    Ok(response.to_http())
+    Ok(response.to_http(mmem))
 }
 
+fn transform_query_for_ingester(query: &Query) -> Option<Query> {
+    if query.query.is_empty() {
+        return None;
+    }
+
+    if query.start_time.is_empty() {
+        return None;
+    }
+
+    if query.end_time.is_empty() {
+        return None;
+    }
+
+    let end_time: DateTime<Utc> = if query.end_time == "now" {
+        Utc::now()
+    } else {
+        DateTime::parse_from_rfc3339(&query.end_time)
+            .ok()?
+            .with_timezone(&Utc)
+    };
+
+    let start_time = end_time - chrono::Duration::minutes(1);
+    // when transforming the query, the ingesters are forced to return an array of values
+    let q = Query {
+        query: query.query.clone(),
+        fields: false,
+        filter_tags: query.filter_tags.clone(),
+        send_null: query.send_null,
+        start_time: start_time.to_rfc3339(),
+        end_time: end_time.to_rfc3339(),
+    };
+
+    Some(q)
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum QueryError {
     #[error("Query cannot be empty")]
@@ -207,6 +278,8 @@ pub enum QueryError {
     Datafusion(#[from] DataFusionError),
     #[error("Execution Error: {0}")]
     Execute(#[from] ExecuteError),
+    #[error("Error: {0}")]
+    Custom(String),
 }
 
 impl actix_web::ResponseError for QueryError {
diff --git a/server/src/query.rs b/server/src/query.rs
index e3f9d8dbc..83c346287 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -33,6 +33,7 @@ use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringi
 use datafusion::prelude::*;
 use itertools::Itertools;
 use once_cell::sync::Lazy;
+use serde_json::{json, Value};
 use std::collections::HashMap;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
@@ -51,6 +52,7 @@ pub static QUERY_SESSION: Lazy<SessionContext> =
     Lazy::new(|| Query::create_session_context(CONFIG.storage()));
 
 // A query request by client
+#[derive(Debug)]
 pub struct Query {
     pub raw_logical_plan: LogicalPlan,
     pub start: DateTime<Utc>,
@@ -115,6 +117,10 @@ impl Query {
             .cloned()
             .collect_vec();
 
+        if fields.is_empty() {
+            return Ok((vec![], fields));
+        }
+
         let results = df.collect().await?;
         Ok((results, fields))
     }
@@ -290,6 +296,45 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
         .unwrap()
 }
 
+pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {
+    if objects.is_empty() {
+        return objects;
+    }
+
+    // check if all the keys start with "COUNT"
+    let flag = objects.iter().all(|obj| {
+        obj.as_object()
+            .unwrap()
+            .keys()
+            .all(|key| key.starts_with("COUNT"))
+    });
+
+    if flag {
+        let mut accum = 0u64;
+        let key = objects[0]
+            .as_object()
+            .unwrap()
+            .keys()
+            .next()
+            .unwrap()
+            .clone();
+
+        for obj in objects {
+            let count = obj.as_object().unwrap().keys().fold(0, |acc, key| {
+                let value = obj.as_object().unwrap().get(key).unwrap().as_u64().unwrap();
+                acc + value
+            });
+            accum += count;
+        }
+
+        vec![json!({
+            key: accum
+        })]
+    } else {
+        objects
+    }
+}
+
 pub mod error {
     use crate::storage::ObjectStorageError;
     use datafusion::error::DataFusionError;
@@ -305,6 +350,10 @@ pub mod error {
 
 #[cfg(test)]
 mod tests {
+    use serde_json::json;
+
+    use crate::query::flatten_objects_for_count;
+
     use super::time_from_path;
     use std::path::PathBuf;
 
@@ -314,4 +363,75 @@ mod tests {
         let time = time_from_path(path.as_path());
         assert_eq!(time.timestamp(), 1640995200);
     }
+
+    #[test]
+    fn test_flat() {
+        let val = vec![
+            json!({
+                "COUNT(*)": 1
+            }),
+            json!({
+                "COUNT(*)": 2
+            }),
+            json!({
+                "COUNT(*)": 3
+            }),
+        ];
+
+        let out = flatten_objects_for_count(val);
+        assert_eq!(out, vec![json!({"COUNT(*)": 6})]);
+    }
+
+    #[test]
+    fn test_flat_empty() {
+        let val = vec![];
+        let out = flatten_objects_for_count(val.clone());
+        assert_eq!(val, out);
+    }
+
+    #[test]
+    fn test_flat_single() {
+        let val = vec![json!({"COUNT(ALPHA)": 1}), json!({"COUNT(ALPHA)": 2})];
+        let out = flatten_objects_for_count(val.clone());
+        assert_eq!(vec![json!({"COUNT(ALPHA)": 3})], out);
+    }
+
+    #[test]
+    fn test_flat_fail() {
+        let val = vec![
+            json!({
+                "Num": 1
+            }),
+            json!({
+                "Num": 2
+            }),
+            json!({
+                "Num": 3
+            }),
+        ];
+
+        let out = flatten_objects_for_count(val.clone());
+        assert_eq!(val, out);
+    }
+
+    #[test]
+    fn test_flat_multi_key() {
+        let val = vec![
+            json!({
+                "Num": 1,
+                "COUNT(*)": 1
+            }),
+            json!({
+                "Num": 2,
+                "COUNT(*)": 2
+            }),
+            json!({
+                "Num": 3,
+                "COUNT(*)": 3
+            }),
+        ];
+
+        let out = flatten_objects_for_count(val.clone());
+        assert_eq!(val, out);
+    }
 }
diff --git a/server/src/response.rs b/server/src/response.rs
index 18b86d78f..6275864b5 100644
--- a/server/src/response.rs
+++ b/server/src/response.rs
@@ -22,6 +22,8 @@ use datafusion::arrow::record_batch::RecordBatch;
 use itertools::Itertools;
 use serde_json::{json, Value};
 
+use crate::query::flatten_objects_for_count;
+
 pub struct QueryResponse {
     pub records: Vec<RecordBatch>,
     pub fields: Vec<String>,
@@ -30,7 +32,7 @@ pub struct QueryResponse {
 }
 
 impl QueryResponse {
-    pub fn to_http(&self) -> impl Responder {
+    pub fn to_http(&self, imem: Option<Vec<Value>>) -> impl Responder {
         log::info!("{}", "Returning query results");
         let records: Vec<&RecordBatch> = self.records.iter().collect();
         let mut json_records = record_batches_to_json_rows(&records).unwrap();
@@ -43,7 +45,14 @@ impl QueryResponse {
             }
         }
 
-        let values = json_records.into_iter().map(Value::Object).collect_vec();
+        let mut values = json_records.into_iter().map(Value::Object).collect_vec();
+
+        if let Some(mut imem) = imem {
+            values.append(&mut imem);
+        }
+
+        let values = flatten_objects_for_count(values);
+
         let response = if self.with_fields {
             json!({
                 "fields": self.fields,
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index d0f07fab0..486fd8403 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -432,7 +432,7 @@ pub trait ObjectStorage: Sync + 'static {
     fn get_bucket_name(&self) -> String;
 }
 
-async fn commit_schema_to_storage(
+pub async fn commit_schema_to_storage(
     stream_name: &str,
     schema: Schema,
 ) -> Result<(), ObjectStorageError> {

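The heart of patch 1's schema sync is the merge in fetch_schema(): every ingester reports the schema it has seen for the stream, and the query server folds the responses together before committing the result. A minimal sketch of the merge behavior this relies on — Schema::try_merge is the real arrow-schema API, while the field names and schemas here are made up:

```rust
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), arrow_schema::ArrowError> {
    // Two ingesters may have observed different columns for the same stream.
    let from_ingester_a = Schema::new(vec![
        Field::new("p_timestamp", DataType::Utf8, true),
        Field::new("level", DataType::Utf8, true),
    ]);
    let from_ingester_b = Schema::new(vec![
        Field::new("p_timestamp", DataType::Utf8, true),
        Field::new("status_code", DataType::Int64, true),
    ]);

    // fetch_schema() does this with the bodies returned by each ingester's
    // GET /logstream/{stream}/schema endpoint (exposed in ingest_server.rs).
    let merged = Schema::try_merge([from_ingester_a, from_ingester_b])?;
    assert_eq!(merged.fields().len(), 3);
    Ok(())
}
```

The merge fails if the same field shows up with incompatible types, which is presumably why the query handler wraps the call in `if let Ok(new_schema) = fetch_schema(...)` and treats a failed fetch as non-fatal.
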
From a2d3cd4cb0db7d0fd3c821844e34e7e8b5eabedc Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Thu, 14 Mar 2024 17:52:36 +0530
Subject: [PATCH 2/4] fix: func flatten_objects_for_count

Fix function flatten_objects_for_count
Update Tests
---
 server/src/query.rs | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/server/src/query.rs b/server/src/query.rs
index 83c346287..81ca44fd6 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -307,6 +307,11 @@ pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {
             .unwrap()
             .keys()
             .all(|key| key.starts_with("COUNT"))
+    }) && objects.iter().all(|obj| {
+        obj.as_object()
+            .unwrap()
+            .keys()
+            .all(|key| key == objects[0].as_object().unwrap().keys().next().unwrap())
     });
 
     if flag {
@@ -370,7 +375,7 @@ mod tests {
     }
 
     #[test]
-    fn test_flat() {
+    fn test_flat_simple() {
         let val = vec![
             json!({
                 "COUNT(*)": 1
@@ -390,12 +395,19 @@ mod tests {
     }
 
     #[test]
-    fn test_flat_single() {
+    fn test_flat_same_multi() {
         let val = vec![json!({"COUNT(ALPHA)": 1}), json!({"COUNT(ALPHA)": 2})];
         let out = flatten_objects_for_count(val.clone());
         assert_eq!(vec![json!({"COUNT(ALPHA)": 3})], out);
     }
 
+    #[test]
+    fn test_flat_diff_multi() {
+        let val = vec![json!({"COUNT(ALPHA)": 1}), json!({"COUNT(BETA)": 2})];
+        let out = flatten_objects_for_count(val.clone());
+        assert_eq!(out, val);
+    }
+
     #[test]
     fn test_flat_fail() {
         let val = vec![

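The contract after this fix, mirrored from the updated unit tests: rows are only collapsed when every object carries the same single COUNT key, and anything else passes through unchanged. A sketch in the repo's own test style (the import path is the one the diff itself uses; it won't compile outside the crate):

```rust
use serde_json::json;

use crate::query::flatten_objects_for_count;

#[test]
fn flatten_behaviour_after_fix() {
    // Rows sharing one identical COUNT key are summed into a single row.
    let summed = flatten_objects_for_count(vec![
        json!({"COUNT(*)": 1}),
        json!({"COUNT(*)": 2}),
    ]);
    assert_eq!(summed, vec![json!({"COUNT(*)": 3})]);

    // Distinct COUNT keys — the case the first version wrongly collapsed
    // under a single key — now pass through unchanged.
    let mixed = vec![json!({"COUNT(ALPHA)": 1}), json!({"COUNT(BETA)": 2})];
    assert_eq!(flatten_objects_for_count(mixed.clone()), mixed);
}
```
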
From f6f76cf597bd0b7592fc75beeff0fc4755252a05 Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Fri, 15 Mar 2024 11:59:59 +0530
Subject: [PATCH 3/4] fix: base_path_without_preceding_slash function
---
 server/src/handlers/http.rs                    | 2 +-
 server/src/handlers/http/modal/query_server.rs | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs
index 60534eb80..7068a7b73 100644
--- a/server/src/handlers/http.rs
+++ b/server/src/handlers/http.rs
@@ -57,7 +57,7 @@ pub(crate) fn cross_origin_config() -> Cors {
 }
 
 pub fn base_path_without_preceding_slash() -> String {
-    format!("/{API_BASE_PATH}/{API_VERSION}")
+    format!("{API_BASE_PATH}/{API_VERSION}")
 }
 
 pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<Schema> {
diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index ea17975c8..615a2a9d9 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -309,6 +309,7 @@ impl QueryServer {
                     stream_name
                 );
 
+                // roll back the stream creation
                 Self::send_stream_rollback_request(&url, ingester.clone()).await?;
             }
 
@@ -322,6 +323,7 @@ impl QueryServer {
         Ok(())
     }
 
+    /// get the cumulative stats from all ingesters
     pub async fn fetch_stats_from_ingesters(
         stream_name: &str,
     ) -> Result<QueriedStats, StreamError> {
@@ -376,6 +378,7 @@ impl QueryServer {
         Ok(stats)
     }
 
+    /// send a request to the ingester to fetch its stats
    async fn send_stats_request(
        url: &str,
        ingester: IngesterMetadata,
@@ -473,6 +476,7 @@ impl QueryServer {
         Ok(())
     }
 
+    /// send a rollback request to all ingesters
     async fn send_stream_rollback_request(
         url: &str,
         ingester: IngesterMetadata,
@@ -489,6 +493,7 @@ impl QueryServer {
             .send()
             .await
             .map_err(|err| {
+                // log the error and return a custom error
                 log::error!(
                     "Fatal: failed to rollback stream creation: {}\n Error: {:?}",
                     ingester.domain_name,
@@ -503,6 +508,8 @@ impl QueryServer {
             }
         })?;
 
+        // if the response is not successful, log the error and return a custom error
+        // this could be a bit too much, but we need to be sure it covers all cases
         if !resp.status().is_success() {
             log::error!(
                 "failed to rollback stream creation: {}\nResponse Returned: {:?}",

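For reference, the invariant patch 3 restores: base_path() carries the leading slash and base_path_without_preceding_slash() does not, so URLs assembled against an ingester's domain_name come out with exactly one separator. A self-contained sketch — the constants and helpers are copied from http.rs, while the domain name is made up and assumed to end in '/':

```rust
const API_BASE_PATH: &str = "api";
const API_VERSION: &str = "v1";

fn base_path() -> String {
    format!("/{API_BASE_PATH}/{API_VERSION}")
}

fn base_path_without_preceding_slash() -> String {
    format!("{API_BASE_PATH}/{API_VERSION}")
}

fn main() {
    assert_eq!(base_path(), "/api/v1");
    assert_eq!(base_path_without_preceding_slash(), "api/v1");

    // fetch_schema() in patch 1 builds its URL as "{domain_name}{base}/...",
    // so a trailing-slash domain plus the slash-less base yields a clean path.
    let domain_name = "http://ingester-0:8000/";
    let uri = format!(
        "{}{}/logstream/{}/schema",
        domain_name,
        base_path_without_preceding_slash(),
        "mystream"
    );
    assert_eq!(uri, "http://ingester-0:8000/api/v1/logstream/mystream/schema");
}
```
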
From 9e1ec9cc033e25f71656d87e82446cd964b36176 Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Fri, 15 Mar 2024 18:21:08 +0530
Subject: [PATCH 4/4] fix: Session Context not in sync

1. Session Context was not being synced at the time of schema update
2. Make struct TableScanVisitor pub at crate level
---
 server/src/handlers/http/query.rs | 21 ++++++++++++++-----
 server/src/query.rs               |  4 ++--
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 17f3435ff..5f539ce49 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -20,6 +20,7 @@ use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, Responder};
 use chrono::{DateTime, Utc};
+use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
 use futures_util::Future;
@@ -35,7 +36,7 @@ use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::{Mode, CONFIG};
 use crate::query::error::ExecuteError;
-use crate::query::QUERY_SESSION;
+use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::role::{Action, Permission};
 use crate::rbac::Users;
 use crate::response::QueryResponse;
@@ -61,20 +62,30 @@ pub struct Query {
 pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
     let session_state = QUERY_SESSION.state();
-    let mut query = into_query(&query_request, &session_state).await?;
+
+    // get the logical plan and extract the table name
+    let raw_logical_plan = session_state
+        .create_logical_plan(&query_request.query)
+        .await?;
+    // create a visitor to extract the table name
+    let mut visitor = TableScanVisitor::default();
+    let _ = raw_logical_plan.visit(&mut visitor);
+    let table_name = visitor.into_inner().pop().unwrap();
 
     if CONFIG.parseable.mode == Mode::Query {
-        if let Ok(new_schema) = fetch_schema(&query.table_name().unwrap()).await {
-            commit_schema_to_storage(&query.table_name().unwrap(), new_schema.clone())
+        if let Ok(new_schema) = fetch_schema(&table_name).await {
+            commit_schema_to_storage(&table_name, new_schema.clone())
                 .await
                 .map_err(|err| {
                     QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
                 })?;
-            commit_schema(&query.table_name().unwrap(), Arc::new(new_schema))
+            commit_schema(&table_name, Arc::new(new_schema))
                 .map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
         }
     }
 
+    let mut query = into_query(&query_request, &session_state).await?;
+
     // ? run this code only if the query start time and now is less than 1 minute + margin
     let mmem = if CONFIG.parseable.mode == Mode::Query {
         // create a new query to send to the ingesters
diff --git a/server/src/query.rs b/server/src/query.rs
index 81ca44fd6..d94d72125 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -162,12 +162,12 @@ impl Query {
 }
 
 #[derive(Debug, Default)]
-struct TableScanVisitor {
+pub(crate) struct TableScanVisitor {
     tables: Vec<String>,
 }
 
 impl TableScanVisitor {
-    fn into_inner(self) -> Vec<String> {
+    pub fn into_inner(self) -> Vec<String> {
         self.tables
     }
 }
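Taken together, the four patches give the query path this shape: extract the table name from the SQL, pull and commit the freshest schema from the ingesters, run the query locally, fan the trailing-minute window out to the ingesters, and merge both result sets in QueryResponse::to_http. A hedged client-side sketch of exercising that path — the endpoint and camelCase field names come from base_path() and the Query struct in the patches, while the host, credentials and stream name are made up:

```rust
use serde_json::json;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Field names follow #[serde(rename_all = "camelCase")] on Query.
    let body = json!({
        "query": "SELECT COUNT(*) FROM mystream",
        "startTime": "2024-03-15T00:00:00Z",
        // "now" is resolved server-side; transform_query_for_ingester then
        // derives the trailing one-minute window it forwards to ingesters.
        "endTime": "now",
        "sendNull": false,
        "fields": false
    });

    let resp = reqwest::Client::new()
        .post("http://localhost:8000/api/v1/query")
        .basic_auth("admin", Some("admin"))
        .json(&body)
        .send()
        .await?;

    println!("{}", resp.text().await?);
    Ok(())
}
```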