diff --git a/src/cli.rs b/src/cli.rs
index 6770c17a7..a319bb161 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -23,7 +23,8 @@ use url::Url;
 
 use crate::{
     oidc::{self, OpenidConfig},
-    option::{validation, Compression, Mode}, storage::{AzureBlobConfig, FSConfig, S3Config},
+    option::{validation, Compression, Mode},
+    storage::{AzureBlobConfig, FSConfig, S3Config},
 };
 
 #[cfg(any(
@@ -43,7 +44,6 @@ use std::string::String as KafkaSslProtocol;
 
 pub const DEFAULT_USERNAME: &str = "admin";
 pub const DEFAULT_PASSWORD: &str = "admin";
-
 #[derive(Parser)]
 #[command(
     name = "parseable",
@@ -81,11 +81,11 @@ pub struct Cli {
 #[derive(Parser)]
 pub enum StorageOptions {
     #[command(name = "local-store")]
-    Local(LocalStoreArgs),
-    
+    Local(LocalStoreArgs),
+
     #[command(name = "s3-store")]
     S3(S3StoreArgs),
-    
+
     #[command(name = "blob-store")]
     Blob(BlobStoreArgs),
 }
@@ -125,7 +125,7 @@ pub struct Options {
 
     // Server configuration
     #[arg(
-        long, 
+        long,
         env = "P_ADDR",
         default_value = "0.0.0.0:8000",
         value_parser = validation::socket_addr,
@@ -381,13 +381,13 @@ pub struct Options {
     )]
     pub audit_logger: Option<Url>,
 
-    #[arg(long ,env = "P_AUDIT_USERNAME", help = "Audit logger username")]
+    #[arg(long, env = "P_AUDIT_USERNAME", help = "Audit logger username")]
     pub audit_username: Option<String>,
 
-    #[arg(long ,env = "P_AUDIT_PASSWORD", help = "Audit logger password")]
+    #[arg(long, env = "P_AUDIT_PASSWORD", help = "Audit logger password")]
     pub audit_password: Option<String>,
 
-    #[arg(long ,env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
+    #[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
     pub ms_clarity_tag: Option<String>,
 }
@@ -405,7 +405,11 @@ impl Options {
     }
 
     pub fn openid(&self) -> Option<OpenidConfig> {
-        match (&self.oidc_client_id, &self.oidc_client_secret, &self.oidc_issuer) {
+        match (
+            &self.oidc_client_id,
+            &self.oidc_client_secret,
+            &self.oidc_issuer,
+        ) {
             (Some(id), Some(secret), Some(issuer)) => {
                 let origin = if let Some(url) = self.domain_address.clone() {
                     oidc::Origin::Production(url)
diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index 9ebca9358..cc94e0f59 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -52,6 +52,7 @@ impl ParseableServer for QueryServer {
         config
             .service(
                 web::scope(&base_path())
+                    .service(Server::get_date_bin())
                     .service(Server::get_correlation_webscope())
                     .service(Server::get_query_factory())
                     .service(Server::get_liveness_factory())
diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index b0bde3250..b82a9ddf1 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -80,6 +80,7 @@ impl ParseableServer for Server {
                     .service(Self::get_llm_webscope())
                     .service(Self::get_oauth_webscope(oidc_client))
                     .service(Self::get_user_role_webscope())
+                    .service(Self::get_date_bin())
                     .service(Self::get_metrics_webscope()),
             )
             .service(Self::get_ingest_otel_factory())
@@ -266,6 +267,10 @@ impl Server {
             ),
         )
     }
+    pub fn get_date_bin() -> Resource {
+        web::resource("/datebin")
+            .route(web::post().to(query::get_date_bin).authorize(Action::Query))
+    }
 
     // get the query factory
     // POST "/query" ==> Get results of the SQL query passed in request body
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index c2a361dff..d9201a4d7 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -18,13 +18,15 @@
 
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
-use actix_web::{FromRequest, HttpRequest, Responder};
+use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
 use futures_util::Future;
 use http::StatusCode;
+use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
 use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -39,7 +41,7 @@ use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::{Mode, CONFIG};
 use crate::query::error::ExecuteError;
-use crate::query::Query as LogicalQuery;
+use crate::query::{DateBinRequest, DateBinResponse, Query as LogicalQuery};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::Users;
 use crate::response::QueryResponse;
@@ -52,7 +54,7 @@ use crate::utils::user_auth_for_query;
 use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
 
 /// Query Request through http endpoint.
-#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
+#[derive(Debug, Deserialize, Serialize, Clone)]
 #[serde(rename_all = "camelCase")]
 pub struct Query {
     pub query: String,
@@ -66,7 +68,7 @@ pub struct Query {
     pub filter_tags: Option<Vec<String>>,
 }
 
-pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
+pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
     let session_state = QUERY_SESSION.state();
     let raw_logical_plan = match session_state
         .create_logical_plan(&query_request.query)
@@ -81,11 +83,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespo
+pub async fn get_date_bin(
+    req: HttpRequest,
+    date_bin: Json<DateBinRequest>,
+) -> Result<impl Responder, QueryError> {
+    let creds = extract_session_key_from_req(&req)?;
+    let permissions = Users.get_permissions(&creds);
+
+    // does user have access to table?
+    user_auth_for_query(&permissions, &[date_bin.stream.clone()])?;
+
+    let date_bin_records = date_bin.get_bin_density().await?;
+
+    Ok(web::Json(DateBinResponse {
+        fields: vec!["date_bin_timestamp".into(), "log_count".into()],
+        records: date_bin_records,
+    }))
+}
+
 pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), QueryError> {
     if CONFIG.options.mode == Mode::Query {
         for table in tables {
diff --git a/src/query/mod.rs b/src/query/mod.rs
index 1542b1455..ea9361357 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -20,7 +20,7 @@
 mod filter_optimizer;
 mod listing_table_builder;
 pub mod stream_schema_provider;
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, Duration, Utc};
 use chrono::{NaiveDateTime, TimeZone};
 
 use datafusion::arrow::record_batch::RecordBatch;
@@ -28,25 +28,36 @@ use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, Tr
 use datafusion::error::DataFusionError;
 use datafusion::execution::disk_manager::DiskManagerConfig;
 use datafusion::execution::SessionStateBuilder;
-use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan};
+use datafusion::logical_expr::expr::Alias;
+use datafusion::logical_expr::{
+    Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
+};
 use datafusion::prelude::*;
 use itertools::Itertools;
 use once_cell::sync::Lazy;
+use relative_path::RelativePathBuf;
+use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use std::collections::HashMap;
+use std::ops::Bound;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
+use stream_schema_provider::collect_manifest_files;
 use sysinfo::System;
 
 use self::error::ExecuteError;
 use self::stream_schema_provider::GlobalSchemaProvider;
 pub use self::stream_schema_provider::PartialTimeFilter;
+use crate::catalog::column::{Int64Type, TypedStatistics};
+use crate::catalog::manifest::Manifest;
+use crate::catalog::snapshot::Snapshot;
+use crate::catalog::Snapshot as CatalogSnapshot;
 use crate::event;
+use crate::handlers::http::query::QueryError;
 use crate::metadata::STREAM_INFO;
-use crate::option::CONFIG;
-use crate::storage::{ObjectStorageProvider, StorageDir};
+use crate::option::{Mode, CONFIG};
+use crate::storage::{ObjectStorageProvider, ObjectStoreFormat, StorageDir, STREAM_ROOT_DIRECTORY};
 use crate::utils::time::TimeRange;
-
 pub static QUERY_SESSION: Lazy<SessionContext> =
     Lazy::new(|| Query::create_session_context(CONFIG.storage()));
@@ -193,6 +204,180 @@ impl Query {
         let _ = self.raw_logical_plan.visit(&mut visitor);
         visitor.into_inner().pop()
     }
+
+    /// Evaluates to Some("count(*)") | Some("column_name") if the logical plan is a Projection: SELECT COUNT(*) | SELECT COUNT(*) as column_name
+    pub fn is_logical_plan_count_without_filters(&self) -> Option<&String> {
+        // Check if the raw logical plan is a Projection: SELECT
+        let LogicalPlan::Projection(Projection { input, expr, .. }) = &self.raw_logical_plan else {
+            return None;
+        };
+        // Check if the input of the Projection is an Aggregate: COUNT(*)
+        let LogicalPlan::Aggregate(Aggregate { input, .. }) = &**input else {
+            return None;
+        };
+
+        // Ensure the input of the Aggregate is a TableScan and there is exactly one expression: SELECT COUNT(*)
+        if !matches!(&**input, LogicalPlan::TableScan { .. }) || expr.len() != 1 {
+            return None;
+        }
+
+        // Check if the expression is a column or an alias for COUNT(*)
+        match &expr[0] {
+            // Direct column check
+            Expr::Column(Column { name, .. }) if name.to_lowercase() == "count(*)" => Some(name),
+            // Alias for COUNT(*)
+            Expr::Alias(Alias {
+                expr: inner_expr,
+                name: alias_name,
+                ..
+            }) => {
+                if let Expr::Column(Column { name, .. }) = &**inner_expr {
+                    if name.to_lowercase() == "count(*)" {
+                        return Some(alias_name);
+                    }
+                }
+                None
+            }
+            // Unsupported expression type
+            _ => None,
+        }
+    }
 }
+
+/// DateBinRecord
+#[derive(Debug, Serialize, Clone)]
+pub struct DateBinRecord {
+    pub date_bin_timestamp: String,
+    pub log_count: u64,
+}
+
+struct DateBinBounds {
+    start: DateTime<Utc>,
+    end: DateTime<Utc>,
+}
+
+/// DateBin Request.
+#[derive(Debug, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct DateBinRequest {
+    pub stream: String,
+    pub start_time: String,
+    pub end_time: String,
+    pub num_bins: u64,
+}
+
+impl DateBinRequest {
+    /// This function is supposed to read manifest files for the given stream,
+    /// get the sum of `num_rows` between the `startTime` and `endTime`,
+    /// divide that by the number of bins and return in a manner acceptable for the console
+    pub async fn get_bin_density(&self) -> Result<Vec<DateBinRecord>, QueryError> {
+        let time_partition = STREAM_INFO
+            .get_time_partition(&self.stream.clone())
+            .map_err(|err| anyhow::Error::msg(err.to_string()))?
+            .unwrap_or_else(|| event::DEFAULT_TIMESTAMP_KEY.to_owned());
+
+        // get time range
+        let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;
+        let all_manifest_files = get_manifest_list(&self.stream, &time_range).await?;
+        // get final date bins
+        let final_date_bins = self.get_bins(&time_range);
+
+        // we have start and end times for each bin
+        // we also have all the manifest files for the given time range
+        // now we iterate over start and end times for each bin
+        // then we iterate over the manifest files which are within that time range
+        // we sum up the num_rows
+        let mut date_bin_records = Vec::new();
+
+        for bin in final_date_bins {
+            // extract start and end time to compare
+            let date_bin_timestamp = bin.start.timestamp_millis();
+
+            // Sum up the number of rows that fall within the bin
+            let total_num_rows: u64 = all_manifest_files
+                .iter()
+                .flat_map(|m| &m.files)
+                .filter_map(|f| {
+                    if f.columns.iter().any(|c| {
+                        c.name == time_partition
+                            && c.stats.as_ref().is_some_and(|stats| match stats {
+                                TypedStatistics::Int(Int64Type { min, .. }) => {
+                                    let min = DateTime::from_timestamp_millis(*min).unwrap();
+                                    bin.start <= min && bin.end >= min // Determines if a column matches the bin's time range.
+                                }
+                                _ => false,
+                            })
+                    }) {
+                        Some(f.num_rows)
+                    } else {
+                        None
+                    }
+                })
+                .sum();
+
+            date_bin_records.push(DateBinRecord {
+                date_bin_timestamp: DateTime::from_timestamp_millis(date_bin_timestamp)
+                    .unwrap()
+                    .to_rfc3339(),
+                log_count: total_num_rows,
+            });
+        }
+        Ok(date_bin_records)
+    }
+
+    /// Calculate the end time for each bin based on the number of bins
+    fn get_bins(&self, time_range: &TimeRange) -> Vec<DateBinBounds> {
+        let total_minutes = time_range
+            .end
+            .signed_duration_since(time_range.start)
+            .num_minutes() as u64;
+
+        // divide minutes by num bins to get minutes per bin
+        let quotient = total_minutes / self.num_bins;
+        let remainder = total_minutes % self.num_bins;
+        let have_remainder = remainder > 0;
+
+        // now create multiple bins [startTime, endTime)
+        // Should we exclude the last one???
+        let mut final_date_bins = vec![];
+
+        let mut start = time_range.start;
+
+        let loop_end = if have_remainder {
+            self.num_bins
+        } else {
+            self.num_bins - 1
+        };
+
+        // Create bins for all but the last date
+        for _ in 0..loop_end {
+            let end = start + Duration::minutes(quotient as i64);
+            final_date_bins.push(DateBinBounds { start, end });
+            start = end;
+        }
+
+        // Add the last bin, accounting for any remainder, should we include it?
+        if have_remainder {
+            final_date_bins.push(DateBinBounds {
+                start,
+                end: start + Duration::minutes(remainder as i64),
+            });
+        } else {
+            final_date_bins.push(DateBinBounds {
+                start,
+                end: start + Duration::minutes(quotient as i64),
+            });
+        }
+
+        final_date_bins
+    }
+}
+
+/// DateBin Response.
+#[derive(Debug, Serialize, Clone)]
+pub struct DateBinResponse {
+    pub fields: Vec<String>,
+    pub records: Vec<DateBinRecord>,
+}
 
 #[derive(Debug, Default)]
@@ -224,6 +409,72 @@ impl TreeNodeVisitor<'_> for TableScanVisitor {
     }
 }
 
+pub async fn get_manifest_list(
+    stream_name: &str,
+    time_range: &TimeRange,
+) -> Result<Vec<Manifest>, QueryError> {
+    let glob_storage = CONFIG.storage().get_object_store();
+
+    let object_store = QUERY_SESSION
+        .state()
+        .runtime_env()
+        .object_store_registry
+        .get_store(&glob_storage.store_url())
+        .unwrap();
+
+    // get object store
+    let object_store_format = glob_storage
+        .get_object_store_format(stream_name)
+        .await
+        .map_err(|err| DataFusionError::Plan(err.to_string()))?;
+
+    // all the manifests will go here
+    let mut merged_snapshot: Snapshot = Snapshot::default();
+
+    // get a list of manifests
+    if CONFIG.options.mode == Mode::Query {
+        let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
+        let obs = glob_storage
+            .get_objects(
+                Some(&path),
+                Box::new(|file_name| file_name.ends_with("stream.json")),
+            )
+            .await;
+        if let Ok(obs) = obs {
+            for ob in obs {
+                if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
+                    let snapshot = object_store_format.snapshot;
+                    for manifest in snapshot.manifest_list {
+                        merged_snapshot.manifest_list.push(manifest);
+                    }
+                }
+            }
+        }
+    } else {
+        merged_snapshot = object_store_format.snapshot;
+    }
+
+    // Download all the manifest files
+    let time_filter = [
+        PartialTimeFilter::Low(Bound::Included(time_range.start.naive_utc())),
+        PartialTimeFilter::High(Bound::Included(time_range.end.naive_utc())),
+    ];
+
+    let all_manifest_files = collect_manifest_files(
+        object_store,
+        merged_snapshot
+            .manifests(&time_filter)
+            .into_iter()
+            .sorted_by_key(|file| file.time_lower_bound)
+            .map(|item| item.manifest_path)
+            .collect(),
+    )
+    .await
+    .map_err(|err| anyhow::Error::msg(err.to_string()))?;
+
+    Ok(all_manifest_files)
+}
+
 fn transform(
     plan: LogicalPlan,
     start_time: NaiveDateTime,
diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index eacc87315..13510b56e 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -799,7 +799,7 @@ fn extract_timestamp_bound(
     }
 }
 
-async fn collect_manifest_files(
+pub async fn collect_manifest_files(
     storage: Arc<dyn ObjectStore>,
     manifest_urls: Vec<String>,
 ) -> Result<Vec<Manifest>, object_store::Error> {
diff --git a/src/response.rs b/src/response.rs
index ab89ffead..bbab052ae 100644
--- a/src/response.rs
+++ b/src/response.rs
@@ -23,7 +23,7 @@ use crate::{
         record_batches_to_json,
     },
 };
-use actix_web::{web, Responder};
+use actix_web::HttpResponse;
 use datafusion::arrow::record_batch::RecordBatch;
 use itertools::Itertools;
 use serde_json::{json, Value};
@@ -38,7 +38,7 @@ pub struct QueryResponse {
 }
 
 impl QueryResponse {
-    pub fn to_http(&self) -> Result<impl Responder, QueryError> {
+    pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
         info!("{}", "Returning query results");
         let records: Vec<&RecordBatch> = self.records.iter().collect();
         let mut json_records = record_batches_to_json(&records)?;
@@ -63,7 +63,7 @@ impl QueryResponse {
             Value::Array(values)
         };
 
-        Ok(web::Json(response))
+        Ok(HttpResponse::Ok().json(response))
     }
 
     #[allow(dead_code)]
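
A quick illustration of the wire format the new POST /datebin route accepts and returns, for reviewers trying the endpoint by hand. This is a sketch inferred from the patch: DateBinRequest is deserialized with #[serde(rename_all = "camelCase")], and get_date_bin replies with a DateBinResponse whose fields serialize in snake_case. The stream name, timestamps, and counts below are placeholder values, not output from a real run.

use serde_json::json;

fn main() {
    // Body for POST /datebin; camelCase keys per the serde rename on DateBinRequest.
    // "frontend-logs" and the time strings are placeholders; startTime/endTime take
    // whatever TimeRange::parse_human_time accepts in the deployment.
    let request = json!({
        "stream": "frontend-logs",
        "startTime": "2024-01-01T00:00:00Z",
        "endTime": "2024-01-01T01:00:00Z",
        "numBins": 6
    });

    // Shape of the handler's reply: a field list plus one record per bin, carrying
    // the RFC 3339 bin start and the summed num_rows. The counts here are made up.
    let response = json!({
        "fields": ["date_bin_timestamp", "log_count"],
        "records": [
            { "date_bin_timestamp": "2024-01-01T00:00:00+00:00", "log_count": 1200 },
            { "date_bin_timestamp": "2024-01-01T00:10:00+00:00", "log_count": 950 }
        ]
    });

    println!("{request}");
    println!("{response}");
}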
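
And a minimal standalone sketch of the binning arithmetic behind DateBinRequest::get_bins, assuming the same rule as the patch: the requested window is measured in whole minutes, split into num_bins bins of total_minutes / num_bins minutes each, and a trailing bin absorbs any remainder, so an uneven window yields num_bins + 1 bins. The Bin struct and split_into_bins function are illustrative names, not part of the patch.

use chrono::{DateTime, Duration, Utc};

// Illustrative stand-in for the patch's DateBinBounds; not the real type.
struct Bin {
    start: DateTime<Utc>,
    end: DateTime<Utc>,
}

fn split_into_bins(start: DateTime<Utc>, end: DateTime<Utc>, num_bins: u64) -> Vec<Bin> {
    let total_minutes = end.signed_duration_since(start).num_minutes() as u64;
    let quotient = total_minutes / num_bins; // minutes per full bin
    let remainder = total_minutes % num_bins; // leftover minutes, if any

    let mut bins = Vec::new();
    let mut cursor = start;
    for _ in 0..num_bins {
        let next = cursor + Duration::minutes(quotient as i64);
        bins.push(Bin { start: cursor, end: next });
        cursor = next;
    }
    // Mirror the patch: when the window does not divide evenly into num_bins,
    // one extra trailing bin holds the remaining minutes.
    if remainder > 0 {
        bins.push(Bin {
            start: cursor,
            end: cursor + Duration::minutes(remainder as i64),
        });
    }
    bins
}

fn main() {
    let start = Utc::now();
    let end = start + Duration::minutes(125);
    // 125 minutes over 10 bins: ten 12-minute bins plus a 5-minute remainder bin.
    for (i, bin) in split_into_bins(start, end, 10).iter().enumerate() {
        println!("bin {i}: {} -> {}", bin.start.to_rfc3339(), bin.end.to_rfc3339());
    }
}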