diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs
index 1d6f0b3ac..7068a7b73 100644
--- a/server/src/handlers/http.rs
+++ b/server/src/handlers/http.rs
@@ -17,6 +17,10 @@
  */
 
 use actix_cors::Cors;
+use arrow_schema::Schema;
+use serde_json::Value;
+
+use self::{modal::query_server::QueryServer, query::Query};
 
 pub(crate) mod about;
 pub(crate) mod health_check;
@@ -33,11 +37,11 @@ pub(crate) mod rbac;
 pub(crate) mod role;
 
 pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
-pub const API_BASE_PATH: &str = "/api";
+pub const API_BASE_PATH: &str = "api";
 pub const API_VERSION: &str = "v1";
 
 pub(crate) fn base_path() -> String {
-    format!("{API_BASE_PATH}/{API_VERSION}")
+    format!("/{API_BASE_PATH}/{API_VERSION}")
 }
 
 pub fn metrics_path() -> String {
@@ -53,5 +57,69 @@ pub(crate) fn cross_origin_config() -> Cors {
 }
 
 pub fn base_path_without_preceding_slash() -> String {
-    base_path().trim_start_matches('/').to_string()
+    format!("{API_BASE_PATH}/{API_VERSION}")
+}
+
+pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<Schema> {
+    let mut res = vec![];
+    let ima = QueryServer::get_ingester_info().await.unwrap();
+
+    for im in ima {
+        let uri = format!(
+            "{}{}/logstream/{}/schema",
+            im.domain_name,
+            base_path_without_preceding_slash(),
+            stream_name
+        );
+        let reqw = reqwest::Client::new()
+            .get(uri)
+            .header(http::header::AUTHORIZATION, im.token.clone())
+            .header(http::header::CONTENT_TYPE, "application/json")
+            .send()
+            .await?;
+
+        if reqw.status().is_success() {
+            let v = serde_json::from_slice(&reqw.bytes().await?)?;
+            res.push(v);
+        }
+    }
+
+    let new_schema = Schema::try_merge(res)?;
+
+    Ok(new_schema)
+}
+
+pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> {
+    // send the query request to all the ingesters
+    let mut res = vec![];
+    let ima = QueryServer::get_ingester_info().await.unwrap();
+
+    for im in ima.iter() {
+        let uri = format!(
+            "{}{}/{}",
+            im.domain_name,
+            base_path_without_preceding_slash(),
+            "query"
+        );
+        let reqw = reqwest::Client::new()
+            .post(uri)
+            .json(query)
+            .header(http::header::AUTHORIZATION, im.token.clone())
+            .header(http::header::CONTENT_TYPE, "application/json")
+            .send()
+            .await?;
+
+        if reqw.status().is_success() {
+            let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
+            // the value returned is an array of JSON objects,
+            // so it needs to be flattened
+            if let Some(arr) = v.as_array() {
+                for val in arr {
+                    res.push(val.to_owned())
+                }
+            }
+        }
+    }
+
+    Ok(res)
+}
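`fetch_schema` leans on arrow's `Schema::try_merge` to union the per-ingester schemas into a single logical schema, failing if two ingesters disagree on a column's type. A minimal standalone sketch of those merge semantics (the field names here are made up for illustration):

```rust
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), arrow_schema::ArrowError> {
    // two ingesters may have seen different columns for the same stream
    let a = Schema::new(vec![
        Field::new("ts", DataType::Utf8, false),
        Field::new("level", DataType::Utf8, true),
    ]);
    let b = Schema::new(vec![
        Field::new("ts", DataType::Utf8, false),
        Field::new("status_code", DataType::Int64, true),
    ]);

    // try_merge unions the field lists and errors on conflicting types
    let merged = Schema::try_merge(vec![a, b])?;
    assert_eq!(merged.fields().len(), 3);
    Ok(())
}
```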
diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index 7a3dcd78d..47a86c30b 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -126,29 +126,7 @@ impl IngestServer {
                     .service(Server::get_about_factory()),
             )
             .service(Server::get_liveness_factory())
-            .service(Server::get_readiness_factory())
-            .service(Self::get_metrics_webscope());
-    }
-
-    fn get_metrics_webscope() -> Scope {
-        web::scope("/logstream").service(
-            web::scope("/{logstream}")
-                .service(
-                    // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
-                    web::resource("/schema").route(
-                        web::get()
-                            .to(logstream::schema)
-                            .authorize_for_stream(Action::GetSchema),
-                    ),
-                )
-                .service(
-                    web::resource("/stats").route(
-                        web::get()
-                            .to(logstream::get_stats)
-                            .authorize_for_stream(Action::GetStats),
-                    ),
-                ),
-        )
+            .service(Server::get_readiness_factory());
     }
 
     fn logstream_api() -> Scope {
@@ -176,6 +154,14 @@ impl IngestServer {
                             )
                             .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
                     )
+                    .service(
+                        // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
+                        web::resource("/schema").route(
+                            web::get()
+                                .to(logstream::schema)
+                                .authorize_for_stream(Action::GetSchema),
+                        ),
+                    )
                     .service(
                         // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
                         web::resource("/stats").route(
diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index 800539234..615a2a9d9 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -35,11 +35,8 @@ use relative_path::RelativePathBuf;
 use reqwest::Response;
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
-use tokio::io::AsyncWriteExt;
 use url::Url;
 
-use tokio::fs::File as TokioFile;
-
 use crate::option::CONFIG;
 
 use super::server::Server;
@@ -170,10 +167,6 @@ impl QueryServer {
 
         // TODO: add validation logic here
         // validate the ingester metadata
-
-        let mut f = Self::get_meta_file().await;
-        // writer the arr in f
-        let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
 
         Ok(arr)
     }
@@ -224,8 +217,11 @@ impl QueryServer {
     /// initialize the server, run migrations as needed and start the server
     async fn initialize(&self) -> anyhow::Result<()> {
         migration::run_metadata_migration(&CONFIG).await?;
+
         let metadata = storage::resolve_parseable_metadata().await?;
+        // do not commit the line below
         tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?;
+
         banner::print(&CONFIG, &metadata).await;
 
         // initialize the rbac map
@@ -276,17 +272,6 @@ impl QueryServer {
         }
     }
 
-    async fn get_meta_file() -> TokioFile {
-        let meta_path = CONFIG.staging_dir().join(".query.json");
-
-        tokio::fs::OpenOptions::new()
-            .read(true)
-            .write(true)
-            .open(meta_path)
-            .await
-            .unwrap()
-    }
-
     // forward the request to all ingesters to keep them in sync
     pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
         let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
@@ -324,6 +309,7 @@ impl QueryServer {
                     stream_name
                 );
 
+                // roll back the stream creation
                 Self::send_stream_rollback_request(&url, ingester.clone()).await?;
             }
 
@@ -337,6 +323,7 @@ impl QueryServer {
         Ok(())
     }
 
+    /// get the cumulative stats from all ingesters
     pub async fn fetch_stats_from_ingesters(
         stream_name: &str,
     ) -> Result<QueriedStats, StreamError> {
@@ -391,6 +378,7 @@ impl QueryServer {
         Ok(stats)
     }
 
+    /// send a request to the ingester to fetch its stats
    async fn send_stats_request(
        url: &str,
        ingester: IngesterMetadata,
@@ -488,6 +476,7 @@ impl QueryServer {
         Ok(())
     }
 
+    /// send a rollback request to all ingesters
     async fn send_stream_rollback_request(
         url: &str,
         ingester: IngesterMetadata,
@@ -504,6 +493,7 @@ impl QueryServer {
             .send()
             .await
             .map_err(|err| {
+                // log the error and return a custom error
                 log::error!(
                     "Fatal: failed to rollback stream creation: {}\n Error: {:?}",
                     ingester.domain_name,
                     err
                 );
@@ -518,6 +508,8 @@ impl QueryServer {
                 }
             })?;
 
+        // if the response is not successful, log the error and return a custom error
+        // this could be a bit too much, but we need to be sure it covers all cases
         if !resp.status().is_success() {
             log::error!(
                 "failed to rollback stream creation: {}\nResponse Returned: {:?}",
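Every call from the query server to an ingester follows the same shape: build the URL from the ingester's `domain_name`, attach its token, send, and treat a non-success status as a failure. A sketch of that fan-out under stated assumptions (`Ingester` is a hypothetical stand-in for `IngesterMetadata`; `reqwest` with its `json` feature and `anyhow` are assumed to be available):

```rust
use serde_json::Value;

// Hypothetical ingester handle standing in for IngesterMetadata.
struct Ingester {
    domain_name: String, // assumed to end with '/', e.g. "http://ingester-0:8000/"
    token: String,
}

// Fan a POST out to every ingester and collect rows from the
// responses that succeeded, skipping ingesters that returned errors.
async fn fan_out(ingesters: &[Ingester], body: &Value) -> anyhow::Result<Vec<Value>> {
    let client = reqwest::Client::new();
    let mut rows = Vec::new();

    for ing in ingesters {
        let resp = client
            .post(format!("{}api/v1/query", ing.domain_name))
            .header(reqwest::header::AUTHORIZATION, ing.token.clone())
            .json(body)
            .send()
            .await?;

        if resp.status().is_success() {
            // each ingester replies with a JSON array of rows
            if let Value::Array(arr) = resp.json::<Value>().await? {
                rows.extend(arr);
            }
        }
    }

    Ok(rows)
}
```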
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index af5120c49..5f539ce49 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -20,24 +20,33 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, Responder};
 use chrono::{DateTime, Utc};
+use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
 use futures_util::Future;
 use http::StatusCode;
 use std::collections::HashMap;
 use std::pin::Pin;
+use std::sync::Arc;
 use std::time::Instant;
 
+use crate::handlers::http::fetch_schema;
+
+use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
+use crate::option::{Mode, CONFIG};
 use crate::query::error::ExecuteError;
-use crate::query::QUERY_SESSION;
+use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::role::{Action, Permission};
 use crate::rbac::Users;
 use crate::response::QueryResponse;
+use crate::storage::object_storage::commit_schema_to_storage;
 use crate::utils::actix::extract_session_key_from_req;
 
+use super::send_query_request_to_ingester;
+
 /// Query Request through http endpoint.
-#[derive(Debug, serde::Deserialize)]
+#[derive(Debug, serde::Deserialize, serde::Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Query {
     query: String,
@@ -52,11 +61,49 @@ pub struct Query {
 }
 
 pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
-    let creds = extract_session_key_from_req(&req).expect("expects basic auth");
-    let permissions = Users.get_permissions(&creds);
     let session_state = QUERY_SESSION.state();
+
+    // get the logical plan and extract the table name
+    let raw_logical_plan = session_state
+        .create_logical_plan(&query_request.query)
+        .await?;
+    // create a visitor to extract the table name
+    let mut visitor = TableScanVisitor::default();
+    let _ = raw_logical_plan.visit(&mut visitor);
+    let table_name = visitor.into_inner().pop().unwrap();
+
+    if CONFIG.parseable.mode == Mode::Query {
+        if let Ok(new_schema) = fetch_schema(&table_name).await {
+            commit_schema_to_storage(&table_name, new_schema.clone())
+                .await
+                .map_err(|err| {
+                    QueryError::Custom(format!("Error committing schema to storage\nError: {err}"))
+                })?;
+            commit_schema(&table_name, Arc::new(new_schema))
+                .map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
+        }
+    }
+
     let mut query = into_query(&query_request, &session_state).await?;
 
+    // ? run this code only if the gap between the query start time and now is less than 1 minute + a margin
+    let mmem = if CONFIG.parseable.mode == Mode::Query {
+        // create a new query to send to the ingesters
+        if let Some(que) = transform_query_for_ingester(&query_request) {
+            let vals = send_query_request_to_ingester(&que)
+                .await
+                .map_err(|err| QueryError::Custom(err.to_string()))?;
+            Some(vals)
+        } else {
+            None
+        }
+    } else {
+        None
+    };
+
+    let creds = extract_session_key_from_req(&req).expect("expects basic auth");
+    let permissions = Users.get_permissions(&creds);
+
     // check authorization of this query if it references physical table;
     let table_name = query.table_name();
     if let Some(ref table) = table_name {
@@ -101,7 +148,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
-    Ok(response.to_http())
+    Ok(response.to_http(mmem))
 }
 
+fn transform_query_for_ingester(query: &Query) -> Option<Query> {
+    if query.query.is_empty() {
+        return None;
+    }
+
+    if query.start_time.is_empty() {
+        return None;
+    }
+
+    if query.end_time.is_empty() {
+        return None;
+    }
+
+    let end_time: DateTime<Utc> = if query.end_time == "now" {
+        Utc::now()
+    } else {
+        DateTime::parse_from_rfc3339(&query.end_time)
+            .ok()?
+            .with_timezone(&Utc)
+    };
+
+    let start_time = end_time - chrono::Duration::minutes(1);
+    // when transforming the query, the ingesters are forced to return an array of values
+    let q = Query {
+        query: query.query.clone(),
+        fields: false,
+        filter_tags: query.filter_tags.clone(),
+        send_null: query.send_null,
+        start_time: start_time.to_rfc3339(),
+        end_time: end_time.to_rfc3339(),
+    };
+
+    Some(q)
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum QueryError {
     #[error("Query cannot be empty")]
@@ -207,6 +289,8 @@ pub enum QueryError {
     Datafusion(#[from] DataFusionError),
     #[error("Execution Error: {0}")]
     Execute(#[from] ExecuteError),
+    #[error("Error: {0}")]
+    Custom(String),
 }
 
 impl actix_web::ResponseError for QueryError {
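The ingester query window computed by `transform_query_for_ingester` is always the minute leading up to the effective end time. A standalone sketch of that window arithmetic with chrono (`last_minute_window` is a hypothetical helper, not part of the patch):

```rust
use chrono::{DateTime, Duration, Utc};

// Resolve the window transform_query_for_ingester uses: one minute
// before the effective end time, up to the end time itself.
fn last_minute_window(end_time: &str) -> Option<(String, String)> {
    let end: DateTime<Utc> = if end_time == "now" {
        Utc::now()
    } else {
        // RFC 3339 parsing yields a fixed-offset timestamp; normalize to UTC
        DateTime::parse_from_rfc3339(end_time).ok()?.with_timezone(&Utc)
    };

    let start = end - Duration::minutes(1);
    Some((start.to_rfc3339(), end.to_rfc3339()))
}

fn main() {
    let (start, end) = last_minute_window("now").unwrap();
    println!("querying ingesters for {start} .. {end}");
}
```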
diff --git a/server/src/query.rs b/server/src/query.rs
index e3f9d8dbc..d94d72125 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -33,6 +33,7 @@ use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan};
 use datafusion::prelude::*;
 use itertools::Itertools;
 use once_cell::sync::Lazy;
+use serde_json::{json, Value};
 use std::collections::HashMap;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
@@ -51,6 +52,7 @@ pub static QUERY_SESSION: Lazy<SessionContext> =
     Lazy::new(|| Query::create_session_context(CONFIG.storage()));
 
 // A query request by client
+#[derive(Debug)]
 pub struct Query {
     pub raw_logical_plan: LogicalPlan,
     pub start: DateTime<Utc>,
@@ -115,6 +117,10 @@
             .cloned()
             .collect_vec();
 
+        if fields.is_empty() {
+            return Ok((vec![], fields));
+        }
+
         let results = df.collect().await?;
         Ok((results, fields))
     }
@@ -156,12 +162,12 @@ impl Query {
 }
 
 #[derive(Debug, Default)]
-struct TableScanVisitor {
+pub(crate) struct TableScanVisitor {
     tables: Vec<String>,
 }
 
 impl TableScanVisitor {
-    fn into_inner(self) -> Vec<String> {
+    pub fn into_inner(self) -> Vec<String> {
         self.tables
     }
 }
@@ -290,6 +296,50 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
         .unwrap()
 }
 
+pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {
+    if objects.is_empty() {
+        return objects;
+    }
+
+    // check if all the keys start with "COUNT"
+    let flag = objects.iter().all(|obj| {
+        obj.as_object()
+            .unwrap()
+            .keys()
+            .all(|key| key.starts_with("COUNT"))
+    }) && objects.iter().all(|obj| {
+        obj.as_object()
+            .unwrap()
+            .keys()
+            .all(|key| key == objects[0].as_object().unwrap().keys().next().unwrap())
+    });
+
+    if flag {
+        let mut accum = 0u64;
+        let key = objects[0]
+            .as_object()
+            .unwrap()
+            .keys()
+            .next()
+            .unwrap()
+            .clone();
+
+        for obj in objects {
+            let count = obj.as_object().unwrap().keys().fold(0, |acc, key| {
+                let value = obj.as_object().unwrap().get(key).unwrap().as_u64().unwrap();
+                acc + value
+            });
+            accum += count;
+        }
+
+        vec![json!({
+            key: accum
+        })]
+    } else {
+        objects
+    }
+}
+
 pub mod error {
     use crate::storage::ObjectStorageError;
     use datafusion::error::DataFusionError;
@@ -305,6 +355,10 @@
 
 #[cfg(test)]
 mod tests {
+    use serde_json::json;
+
+    use crate::query::flatten_objects_for_count;
+
     use super::time_from_path;
     use std::path::PathBuf;
 
@@ -314,4 +368,82 @@ mod tests {
         let time = time_from_path(path.as_path());
         assert_eq!(time.timestamp(), 1640995200);
     }
+
+    #[test]
+    fn test_flat_simple() {
+        let val = vec![
+            json!({
+                "COUNT(*)": 1
+            }),
+            json!({
+                "COUNT(*)": 2
+            }),
+            json!({
+                "COUNT(*)": 3
+            }),
+        ];
+
+        let out = flatten_objects_for_count(val);
+        assert_eq!(out, vec![json!({"COUNT(*)": 6})]);
+    }
+
+    #[test]
+    fn test_flat_empty() {
+        let val = vec![];
+        let out = flatten_objects_for_count(val.clone());
+        assert_eq!(val, out);
+    }
+
+    #[test]
+    fn test_flat_same_multi() {
+        let val = vec![json!({"COUNT(ALPHA)": 1}), json!({"COUNT(ALPHA)": 2})];
+        let out = flatten_objects_for_count(val.clone());
+        assert_eq!(vec![json!({"COUNT(ALPHA)": 3})], out);
+    }
+
+    #[test]
+    fn test_flat_diff_multi() {
+        let val = vec![json!({"COUNT(ALPHA)": 1}), json!({"COUNT(BETA)": 2})];
+        let out = flatten_objects_for_count(val.clone());
+        assert_eq!(out, val);
+    }
+
+    #[test]
+    fn test_flat_fail() {
+        let val = vec![
+            json!({
+                "Num": 1
+            }),
+            json!({
+                "Num": 2
+            }),
+            json!({
+                "Num": 3
+            }),
+        ];
+
+        let out = flatten_objects_for_count(val.clone());
+        assert_eq!(val, out);
+    }
+
+    #[test]
+    fn test_flat_multi_key() {
+        let val = vec![
+            json!({
+                "Num": 1,
+                "COUNT(*)": 1
+            }),
+            json!({
+                "Num": 2,
+                "COUNT(*)": 2
+            }),
+            json!({
+                "Num": 3,
+                "COUNT(*)": 3
+            }),
+        ];
+
+        let out = flatten_objects_for_count(val.clone());
+        assert_eq!(val, out);
+    }
 }
diff --git a/server/src/response.rs b/server/src/response.rs
index 18b86d78f..6275864b5 100644
--- a/server/src/response.rs
+++ b/server/src/response.rs
@@ -22,6 +22,8 @@ use datafusion::arrow::record_batch::RecordBatch;
 use itertools::Itertools;
 use serde_json::{json, Value};
 
+use crate::query::flatten_objects_for_count;
+
 pub struct QueryResponse {
     pub records: Vec<RecordBatch>,
     pub fields: Vec<String>,
@@ -30,7 +32,7 @@
 }
 
 impl QueryResponse {
-    pub fn to_http(&self) -> impl Responder {
+    pub fn to_http(&self, imem: Option<Vec<Value>>) -> impl Responder {
         log::info!("{}", "Returning query results");
         let records: Vec<&RecordBatch> = self.records.iter().collect();
         let mut json_records = record_batches_to_json_rows(&records).unwrap();
@@ -43,7 +45,14 @@
             }
         }
 
-        let values = json_records.into_iter().map(Value::Object).collect_vec();
+        let mut values = json_records.into_iter().map(Value::Object).collect_vec();
+
+        if let Some(mut imem) = imem {
+            values.append(&mut imem);
+        }
+
+        let values = flatten_objects_for_count(values);
+
         let response = if self.with_fields {
             json!({
                 "fields": self.fields,
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index d0f07fab0..486fd8403 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -432,7 +432,7 @@ pub trait ObjectStorage: Sync + 'static {
     fn get_bucket_name(&self) -> String;
 }
 
-async fn commit_schema_to_storage(
+pub async fn commit_schema_to_storage(
     stream_name: &str,
     schema: Schema,
 ) -> Result<(), ObjectStorageError> {
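`QueryResponse::to_http` now appends the rows returned by the ingesters to the locally computed rows before `flatten_objects_for_count` collapses single-key COUNT objects into one total. A sketch of that merge step (`merge_rows` is a hypothetical helper mirroring the `values.append` logic above, not part of the patch):

```rust
use serde_json::{json, Value};

// Append the ingester rows (if any) to the locally computed rows;
// the COUNT flattening pass then runs over the combined set.
fn merge_rows(mut local: Vec<Value>, ingester: Option<Vec<Value>>) -> Vec<Value> {
    if let Some(mut extra) = ingester {
        local.append(&mut extra);
    }
    local
}

fn main() {
    let local = vec![json!({"COUNT(*)": 2})];
    let remote = Some(vec![json!({"COUNT(*)": 5})]);
    let merged = merge_rows(local, remote);
    // flatten_objects_for_count would reduce these two rows to COUNT(*): 7
    assert_eq!(merged.len(), 2);
}
```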