diff --git a/Cargo.lock b/Cargo.lock
index 722287c71..b74617386 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2698,6 +2698,7 @@ dependencies = [
"semver",
"serde",
"serde_json",
+ "serde_repr",
"sha1_smol",
"static-files",
"sysinfo",
@@ -3333,6 +3334,17 @@ dependencies = [
"serde",
]
+[[package]]
+name = "serde_repr"
+version = "0.1.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3081f5ffbb02284dda55132aa26daecedd7372a42417bbbab6f14ab7d6bb9145"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.37",
+]
+
[[package]]
name = "serde_spanned"
version = "0.6.1"
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 9a05bf1b7..50a651858 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -99,6 +99,7 @@ humantime = "2.1.0"
openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
url = "2.4.0"
http-auth-basic = "0.3.3"
+serde_repr = "0.1.17"
[build-dependencies]
cargo_toml = "0.15"
diff --git a/server/src/catalog.rs b/server/src/catalog.rs
new file mode 100644
index 000000000..f07179908
--- /dev/null
+++ b/server/src/catalog.rs
@@ -0,0 +1,171 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::sync::Arc;
+
+use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
+use relative_path::RelativePathBuf;
+
+use crate::{
+ catalog::manifest::Manifest,
+ query::PartialTimeFilter,
+ storage::{ObjectStorage, ObjectStorageError},
+};
+
+use self::{column::Column, snapshot::ManifestItem};
+
+pub mod column;
+pub mod manifest;
+pub mod snapshot;
+
+pub use manifest::create_from_parquet_file;
+
+pub trait Snapshot {
+ fn manifests(&self, time_predicates: Vec<PartialTimeFilter>) -> Vec<ManifestItem>;
+}
+
+pub trait ManifestFile {
+ fn file_name(&self) -> &str;
+ fn ingestion_size(&self) -> u64;
+ fn file_size(&self) -> u64;
+ fn num_rows(&self) -> u64;
+ fn columns(&self) -> &[Column];
+}
+
+impl ManifestFile for manifest::File {
+ fn file_name(&self) -> &str {
+ &self.file_path
+ }
+
+ fn ingestion_size(&self) -> u64 {
+ self.ingestion_size
+ }
+
+ fn file_size(&self) -> u64 {
+ self.file_size
+ }
+
+ fn num_rows(&self) -> u64 {
+ self.num_rows
+ }
+
+ fn columns(&self) -> &[Column] {
+ self.columns.as_slice()
+ }
+}
+
+pub async fn update_snapshot(
+ storage: Arc<dyn ObjectStorage>,
+ stream_name: &str,
+ change: manifest::File,
+) -> Result<(), ObjectStorageError> {
+ fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
+ match file
+ .columns()
+ .iter()
+ .find(|col| col.name == "p_timestamp")
+ .unwrap()
+ .stats
+ .clone()
+ .unwrap()
+ {
+ column::TypedStatistics::Int(stats) => (
+ NaiveDateTime::from_timestamp_millis(stats.min)
+ .unwrap()
+ .and_utc(),
+ NaiveDateTime::from_timestamp_millis(stats.max)
+ .unwrap()
+ .and_utc(),
+ ),
+ _ => unreachable!(),
+ }
+ }
+
+ // get current snapshot
+ let mut meta = storage.get_snapshot(stream_name).await?;
+ let manifests = &mut meta.manifest_list;
+
+ let (lower_bound, _) = get_file_bounds(&change);
+ let pos = manifests.iter().position(|item| {
+ item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
+ });
+
+ // Update the manifest referenced by this position.
+ // Since this rewrites an existing manifest file, there is no need to create a new snapshot entry.
+ if let Some(pos) = pos {
+ let info = &mut manifests[pos];
+ let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
+ let Some(mut manifest) = storage.get_manifest(&path).await? else {
+ return Err(ObjectStorageError::UnhandledError(
+ "Manifest found in snapshot but not in object-storage"
+ .to_string()
+ .into(),
+ ));
+ };
+ manifest.apply_change(change);
+ storage.put_manifest(&path, manifest).await?;
+ } else {
+ let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
+ let upper_bound = lower_bound
+ .date_naive()
+ .and_time(
+ NaiveTime::from_num_seconds_from_midnight_opt(
+ 23 * 3600 + 59 * 60 + 59,
+ 999_999_999,
+ )
+ .unwrap(),
+ )
+ .and_utc();
+
+ let manifest = Manifest {
+ files: vec![change],
+ ..Manifest::default()
+ };
+
+ let path = partition_path(stream_name, lower_bound, upper_bound).join("manifest.json");
+ storage
+ .put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
+ .await?;
+ let path = storage.absolute_url(&path);
+ let new_snapshot_entry = snapshot::ManifestItem {
+ manifest_path: path.to_string(),
+ time_lower_bound: lower_bound,
+ time_upper_bound: upper_bound,
+ };
+ manifests.push(new_snapshot_entry);
+ storage.put_snapshot(stream_name, meta).await?;
+ }
+
+ Ok(())
+}
+
+/// Relative path of the partition this manifest belongs to.
+/// Useful when uploading the manifest file.
+fn partition_path(
+ stream: &str,
+ lower_bound: DateTime<Utc>,
+ upper_bound: DateTime<Utc>,
+) -> RelativePathBuf {
+ let lower = lower_bound.date_naive().format("%Y-%m-%d").to_string();
+ let upper = upper_bound.date_naive().format("%Y-%m-%d").to_string();
+ if lower == upper {
+ RelativePathBuf::from_iter([stream, &format!("date={}", lower)])
+ } else {
+ RelativePathBuf::from_iter([stream, &format!("date={}:{}", lower, upper)])
+ }
+}
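
Note on the partitioning scheme above — a standalone sketch, for illustration only (the module's `partition_path` is private, so the function and stream names here are made up): bounds falling on the same calendar day collapse to a single `date=` prefix, otherwise the range is spelled out.

```rust
use chrono::{DateTime, TimeZone, Utc};
use relative_path::RelativePathBuf;

// Re-statement of the partitioning logic for illustration.
fn partition_path_sketch(
    stream: &str,
    lower_bound: DateTime<Utc>,
    upper_bound: DateTime<Utc>,
) -> RelativePathBuf {
    let lower = lower_bound.date_naive().format("%Y-%m-%d").to_string();
    let upper = upper_bound.date_naive().format("%Y-%m-%d").to_string();
    if lower == upper {
        RelativePathBuf::from_iter([stream, &format!("date={lower}")])
    } else {
        RelativePathBuf::from_iter([stream, &format!("date={lower}:{upper}")])
    }
}

fn main() {
    let start = Utc.with_ymd_and_hms(2023, 10, 1, 0, 0, 0).unwrap();
    let end = Utc.with_ymd_and_hms(2023, 10, 1, 23, 59, 59).unwrap();
    // Bounds on the same calendar day share one partition directory.
    assert_eq!(
        partition_path_sketch("frontend", start, end).as_str(),
        "frontend/date=2023-10-01"
    );
}
```
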
diff --git a/server/src/catalog/column.rs b/server/src/catalog/column.rs
new file mode 100644
index 000000000..17361609a
--- /dev/null
+++ b/server/src/catalog/column.rs
@@ -0,0 +1,146 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::cmp::{max, min};
+
+use parquet::file::statistics::Statistics;
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct BoolType {
+ pub min: bool,
+ pub max: bool,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct Float64Type {
+ pub min: f64,
+ pub max: f64,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct Int64Type {
+ pub min: i64,
+ pub max: i64,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct Utf8Type {
+ pub min: String,
+ pub max: String,
+}
+
+// Typed statistics are a typed variant of parquet statistics.
+// Currently all parquet types are cast down to these four types.
+// Binary types are assumed to contain valid UTF-8.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub enum TypedStatistics {
+ Bool(BoolType),
+ Int(Int64Type),
+ Float(Float64Type),
+ String(Utf8Type),
+}
+
+impl TypedStatistics {
+ pub fn update(self, other: Self) -> Self {
+ match (self, other) {
+ (TypedStatistics::Bool(this), TypedStatistics::Bool(other)) => {
+ TypedStatistics::Bool(BoolType {
+ min: min(this.min, other.min),
+ max: max(this.max, other.max),
+ })
+ }
+ (TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
+ TypedStatistics::Float(Float64Type {
+ min: this.min.min(other.min),
+ max: this.max.max(other.max),
+ })
+ }
+ (TypedStatistics::Int(this), TypedStatistics::Int(other)) => {
+ TypedStatistics::Int(Int64Type {
+ min: min(this.min, other.min),
+ max: max(this.max, other.max),
+ })
+ }
+ (TypedStatistics::String(this), TypedStatistics::String(other)) => {
+ TypedStatistics::String(Utf8Type {
+ min: min(this.min, other.min),
+ max: max(this.max, other.max),
+ })
+ }
+ _ => panic!("Cannot update wrong types"),
+ }
+ }
+}
+
+/// Column statistics are used to track statistics for a column in a given file.
+/// This is similar to and derived from parquet statistics.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct Column {
+ pub name: String,
+ pub stats: Option<TypedStatistics>,
+ pub uncompressed_size: u64,
+ pub compressed_size: u64,
+}
+
+impl TryFrom<&Statistics> for TypedStatistics {
+ type Error = parquet::errors::ParquetError;
+ fn try_from(value: &Statistics) -> Result<Self, Self::Error> {
+ if !value.has_min_max_set() {
+ return Err(parquet::errors::ParquetError::General(
+ "min max is not set".to_string(),
+ ));
+ }
+
+ let res = match value {
+ Statistics::Boolean(stats) => TypedStatistics::Bool(BoolType {
+ min: *stats.min(),
+ max: *stats.max(),
+ }),
+ Statistics::Int32(stats) => TypedStatistics::Int(Int64Type {
+ min: *stats.min() as i64,
+ max: *stats.max() as i64,
+ }),
+ Statistics::Int64(stats) => TypedStatistics::Int(Int64Type {
+ min: *stats.min(),
+ max: *stats.max(),
+ }),
+ Statistics::Int96(stats) => TypedStatistics::Int(Int64Type {
+ min: stats.min().to_i64(),
+ max: stats.max().to_i64(),
+ }),
+ Statistics::Float(stats) => TypedStatistics::Float(Float64Type {
+ min: *stats.min() as f64,
+ max: *stats.max() as f64,
+ }),
+ Statistics::Double(stats) => TypedStatistics::Float(Float64Type {
+ min: *stats.min(),
+ max: *stats.max(),
+ }),
+ Statistics::ByteArray(stats) => TypedStatistics::String(Utf8Type {
+ min: stats.min().as_utf8()?.to_owned(),
+ max: stats.max().as_utf8()?.to_owned(),
+ }),
+ Statistics::FixedLenByteArray(stats) => TypedStatistics::String(Utf8Type {
+ min: stats.min().as_utf8()?.to_owned(),
+ max: stats.max().as_utf8()?.to_owned(),
+ }),
+ };
+
+ Ok(res)
+ }
+}
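
The merge semantics of `TypedStatistics::update` in one hedged example, written as an in-module test so it can reach this module's types; merging two row groups' integer stats widens the min/max envelope (mixing variants would hit the `panic!` arm).

```rust
use crate::catalog::column::{Int64Type, TypedStatistics};

#[test]
fn update_widens_the_min_max_envelope() {
    let a = TypedStatistics::Int(Int64Type { min: 10, max: 50 });
    let b = TypedStatistics::Int(Int64Type { min: 5, max: 40 });
    // min takes the smaller of the two, max the larger.
    let TypedStatistics::Int(merged) = a.update(b) else {
        unreachable!("both inputs are Int")
    };
    assert_eq!((merged.min, merged.max), (5, 50));
}
```
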
diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs
new file mode 100644
index 000000000..f0696fc89
--- /dev/null
+++ b/server/src/catalog/manifest.rs
@@ -0,0 +1,190 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::collections::HashMap;
+
+use itertools::Itertools;
+use parquet::{file::reader::FileReader, format::SortingColumn};
+
+use super::column::Column;
+
+#[derive(
+ Debug,
+ Default,
+ Clone,
+ Copy,
+ PartialEq,
+ Eq,
+ serde_repr::Serialize_repr,
+ serde_repr::Deserialize_repr,
+)]
+#[repr(u8)]
+pub enum SortOrder {
+ AscNullsFirst = 0,
+ AscNullsLast,
+ DescNullsLast,
+ #[default]
+ DescNullsFirst,
+}
+
+pub type SortInfo = (String, SortOrder);
+
+/// An entry in a manifest which points to a single file.
+/// Additionally, it is meant to store the statistics for the file it
+/// points to. Used for pruning files at the planning stage.
+#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
+pub struct File {
+ pub file_path: String,
+ pub num_rows: u64,
+ pub file_size: u64,
+ pub ingestion_size: u64,
+ pub columns: Vec<Column>,
+ pub sort_order_id: Vec<SortInfo>,
+}
+
+/// A manifest file composed of multiple file entries.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct Manifest {
+ pub version: String,
+ pub files: Vec<File>,
+}
+
+impl Default for Manifest {
+ fn default() -> Self {
+ Self {
+ version: "v1".to_string(),
+ files: Vec::default(),
+ }
+ }
+}
+
+impl Manifest {
+ pub fn apply_change(&mut self, change: File) {
+ if let Some(pos) = self
+ .files
+ .iter()
+ .position(|file| file.file_path == change.file_path)
+ {
+ self.files[pos] = change
+ } else {
+ self.files.push(change)
+ }
+ }
+}
+
+pub fn create_from_parquet_file(
+ object_store_path: String,
+ fs_file_path: &std::path::Path,
+) -> anyhow::Result<File> {
+ let mut manifest_file = File {
+ file_path: object_store_path,
+ ..File::default()
+ };
+
+ let file = std::fs::File::open(fs_file_path)?;
+ let file = parquet::file::serialized_reader::SerializedFileReader::new(file)?;
+ let file_meta = file.metadata().file_metadata();
+ let row_groups = file.metadata().row_groups();
+
+ manifest_file.num_rows = file_meta.num_rows() as u64;
+ manifest_file.ingestion_size = row_groups
+ .iter()
+ .fold(0, |acc, x| acc + x.total_byte_size() as u64);
+ manifest_file.file_size = row_groups
+ .iter()
+ .fold(0, |acc, x| acc + x.compressed_size() as u64);
+
+ let columns = column_statistics(row_groups);
+ manifest_file.columns = columns.into_values().collect();
+ let mut sort_orders = sort_order(row_groups);
+
+ if let Some(last_sort_order) = sort_orders.pop() {
+ if sort_orders
+ .into_iter()
+ .all(|sort_order| sort_order == last_sort_order)
+ {
+ manifest_file.sort_order_id = last_sort_order;
+ }
+ }
+
+ Ok(manifest_file)
+}
+
+fn sort_order(
+ row_groups: &[parquet::file::metadata::RowGroupMetaData],
+) -> Vec<Vec<SortInfo>> {
+ let mut sort_orders = Vec::new();
+ for row_group in row_groups {
+ let sort_order = row_group.sorting_columns().unwrap();
+ let sort_order = sort_order
+ .iter()
+ .map(|sort_order| {
+ let SortingColumn {
+ column_idx,
+ descending,
+ nulls_first,
+ } = sort_order;
+ let col = row_group
+ .column(*column_idx as usize)
+ .column_descr()
+ .path()
+ .string();
+ let sort_info = match (descending, nulls_first) {
+ (true, true) => SortOrder::DescNullsFirst,
+ (true, false) => SortOrder::DescNullsLast,
+ (false, true) => SortOrder::AscNullsFirst,
+ (false, false) => SortOrder::AscNullsLast,
+ };
+
+ (col, sort_info)
+ })
+ .collect_vec();
+
+ sort_orders.push(sort_order)
+ }
+ sort_orders
+}
+
+fn column_statistics(
+ row_groups: &[parquet::file::metadata::RowGroupMetaData],
+) -> HashMap<String, Column> {
+ let mut columns: HashMap<String, Column> = HashMap::new();
+ for row_group in row_groups {
+ for col in row_group.columns() {
+ let col_name = col.column_descr().path().string();
+ if let Some(entry) = columns.get_mut(&col_name) {
+ entry.compressed_size += col.compressed_size() as u64;
+ entry.uncompressed_size += col.uncompressed_size() as u64;
+ if let Some(other) = col.statistics().and_then(|stats| stats.try_into().ok()) {
+ entry.stats = entry.stats.clone().map(|this| this.update(other));
+ }
+ } else {
+ columns.insert(
+ col_name.clone(),
+ Column {
+ name: col_name,
+ stats: col.statistics().and_then(|stats| stats.try_into().ok()),
+ uncompressed_size: col.uncompressed_size() as u64,
+ compressed_size: col.compressed_size() as u64,
+ },
+ );
+ }
+ }
+ }
+ columns
+}
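
A quick sketch of `Manifest::apply_change`'s upsert behaviour (in-module test; the path literal is invented): a change carrying a known `file_path` replaces the existing entry in place, anything else is appended.

```rust
use crate::catalog::manifest::{File, Manifest};

#[test]
fn apply_change_upserts_by_path() {
    let entry = |rows| File {
        file_path: "date=2023-10-01/a.parquet".to_string(), // made-up path
        num_rows: rows,
        ..File::default()
    };
    let mut manifest = Manifest::default();
    manifest.apply_change(entry(10));
    // Same file_path again: the entry is replaced, not duplicated.
    manifest.apply_change(entry(25));
    assert_eq!(manifest.files.len(), 1);
    assert_eq!(manifest.files[0].num_rows, 25);
}
```
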
diff --git a/server/src/catalog/snapshot.rs b/server/src/catalog/snapshot.rs
new file mode 100644
index 000000000..4c0877207
--- /dev/null
+++ b/server/src/catalog/snapshot.rs
@@ -0,0 +1,78 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::ops::Bound;
+
+use chrono::{DateTime, Utc};
+
+use crate::query::PartialTimeFilter;
+
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+pub struct Snapshot {
+ pub version: String,
+ pub manifest_list: Vec<ManifestItem>,
+}
+
+impl Default for Snapshot {
+ fn default() -> Self {
+ Self {
+ version: "v1".to_string(),
+ manifest_list: Vec::default(),
+ }
+ }
+}
+
+impl super::Snapshot for Snapshot {
+ fn manifests(&self, time_predicates: Vec<PartialTimeFilter>) -> Vec<ManifestItem> {
+ let mut manifests = self.manifest_list.clone();
+ for predicate in time_predicates {
+ match predicate {
+ PartialTimeFilter::Low(Bound::Included(time)) => manifests.retain(|item| {
+ let time = time.and_utc();
+ item.time_upper_bound >= time
+ }),
+ PartialTimeFilter::Low(Bound::Excluded(time)) => manifests.retain(|item| {
+ let time = time.and_utc();
+ item.time_upper_bound > time
+ }),
+ PartialTimeFilter::High(Bound::Included(time)) => manifests.retain(|item| {
+ let time = time.and_utc();
+ item.time_lower_bound <= time
+ }),
+ PartialTimeFilter::High(Bound::Excluded(time)) => manifests.retain(|item| {
+ let time = time.and_utc();
+ item.time_lower_bound < time
+ }),
+ PartialTimeFilter::Eq(time) => manifests.retain(|item| {
+ let time = time.and_utc();
+ item.time_lower_bound <= time && time <= item.time_upper_bound
+ }),
+ _ => (),
+ }
+ }
+
+ manifests
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+pub struct ManifestItem {
+ pub manifest_path: String,
+ pub time_lower_bound: DateTime<Utc>,
+ pub time_upper_bound: DateTime<Utc>,
+}
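
To make the pruning rules concrete, a hedged in-module test (dates and manifest paths invented): a `Low(Included)` predicate drops every manifest whose upper bound ends before the query window opens.

```rust
use std::ops::Bound;

use chrono::{TimeZone, Utc};

use crate::catalog::snapshot::{ManifestItem, Snapshot};
use crate::catalog::Snapshot as _; // the trait that provides `manifests`
use crate::query::PartialTimeFilter;

#[test]
fn low_bound_prunes_older_manifests() {
    let day = |d| Utc.with_ymd_and_hms(2023, 10, d, 0, 0, 0).unwrap();
    let item = |lo, hi| ManifestItem {
        manifest_path: format!("day-{lo}/manifest.json"),
        time_lower_bound: day(lo),
        time_upper_bound: day(hi),
    };
    let snapshot = Snapshot {
        version: "v1".to_string(),
        manifest_list: vec![item(1, 2), item(3, 4)],
    };
    // A window starting Oct 3 cannot overlap a manifest that ends Oct 2.
    let start = day(3).naive_utc();
    let kept = snapshot.manifests(vec![PartialTimeFilter::Low(Bound::Included(start))]);
    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0].manifest_path, "day-3/manifest.json");
}
```
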
diff --git a/server/src/handlers.rs b/server/src/handlers.rs
index 294fc40a8..3d6409dc2 100644
--- a/server/src/handlers.rs
+++ b/server/src/handlers.rs
@@ -22,7 +22,6 @@ pub mod livetail;
const PREFIX_TAGS: &str = "x-p-tag-";
const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
-const FILL_NULL_OPTION_KEY: &str = "send_null";
const SEPARATOR: char = '^';
const OIDC_SCOPE: &str = "openid profile email";
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index b41120912..39de3ddee 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -16,56 +16,99 @@
*
*/
-use actix_web::error::ErrorUnauthorized;
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, Responder};
+use chrono::{DateTime, Utc};
+use datafusion::error::DataFusionError;
+use datafusion::execution::context::SessionState;
use futures_util::Future;
use http::StatusCode;
-use serde_json::Value;
use std::collections::HashMap;
use std::pin::Pin;
use std::time::Instant;
-use crate::handlers::FILL_NULL_OPTION_KEY;
use crate::metrics::QUERY_EXECUTE_TIME;
-use crate::option::CONFIG;
-use crate::query::error::{ExecuteError, ParseError};
-use crate::query::Query;
+use crate::query::error::ExecuteError;
+use crate::query::QUERY_SESSION;
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use crate::response::QueryResponse;
use crate::utils::actix::extract_session_key_from_req;
-pub async fn query(
- query: Query,
- web::Query(params): web::Query<HashMap<String, bool>>,
-) -> Result<impl Responder, QueryError> {
+/// Query Request through http endpoint.
+#[derive(Debug, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Query {
+ query: String,
+ start_time: String,
+ end_time: String,
+ #[serde(default)]
+ send_null: bool,
+ #[serde(skip)]
+ fields: bool,
+ #[serde(skip)]
+ filter_tags: Option<Vec<String>>,
+}
+
+pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
+ let creds = extract_session_key_from_req(&req).expect("expects basic auth");
+ let permissions = Users.get_permissions(&creds);
+ let session_state = QUERY_SESSION.state();
+ let mut query = into_query(&query_request, &session_state).await?;
+
+ // check authorization of this query if it references a physical table
+ let table_name = query.table_name();
+ if let Some(ref table) = table_name {
+ let mut authorized = false;
+ let mut tags = Vec::new();
+
+ // permission check: can this user run a query on the stream?
+ // while iterating, also collect any filter tags set for this stream
+ for permission in permissions {
+ match permission {
+ Permission::Stream(Action::All, _) => {
+ authorized = true;
+ break;
+ }
+ Permission::StreamWithTag(Action::Query, ref stream, tag)
+ if stream == table || stream == "*" =>
+ {
+ authorized = true;
+ if let Some(tag) = tag {
+ tags.push(tag)
+ }
+ }
+ _ => (),
+ }
+ }
+
+ if !authorized {
+ return Err(QueryError::Unauthorized);
+ }
+
+ if !tags.is_empty() {
+ query.filter_tag = Some(tags)
+ }
+ }
+
let time = Instant::now();
- // format output json to include field names
- let with_fields = params.get("fields").cloned().unwrap_or(false);
- // Fill missing columns with null
- let fill_null = params
- .get("fillNull")
- .cloned()
- .or(Some(query.fill_null))
- .unwrap_or(false);
-
- let storage = CONFIG.storage().get_object_store();
- let (records, fields) = query.execute(storage).await?;
+ let (records, fields) = query.execute().await?;
let response = QueryResponse {
records,
fields,
- fill_null,
- with_fields,
+ fill_null: query_request.send_null,
+ with_fields: query_request.fields,
}
.to_http();
- let time = time.elapsed().as_secs_f64();
- QUERY_EXECUTE_TIME
- .with_label_values(&[query.stream_name.as_str()])
- .observe(time);
+ if let Some(table) = table_name {
+ let time = time.elapsed().as_secs_f64();
+ QUERY_EXECUTE_TIME
+ .with_label_values(&[&table])
+ .observe(time);
+ }
Ok(response)
}
@@ -75,48 +118,19 @@ impl FromRequest for Query {
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future {
- let creds = extract_session_key_from_req(req).expect("expects basic auth");
- let permissions = Users.get_permissions(&creds);
- let json = Json::<Value>::from_request(req, payload);
+ let query = Json::::from_request(req, payload);
+ let params = web::Query::<HashMap<String, bool>>::from_request(req, payload)
+ .into_inner()
+ .map(|x| x.0)
+ .unwrap_or_default();
let fut = async move {
- let json = json.await?.into_inner();
- // maybe move this option to query param instead so that it can simply be extracted from request
- let fill_null = json
- .as_object()
- .and_then(|map| map.get(FILL_NULL_OPTION_KEY))
- .and_then(|value| value.as_bool())
- .unwrap_or_default();
-
- let mut query = Query::parse(json).map_err(QueryError::Parse)?;
- query.fill_null = fill_null;
-
- let mut authorized = false;
- let mut tags = Vec::new();
-
- // in permission check if user can run query on the stream.
- // also while iterating add any filter tags for this stream
- for permission in permissions {
- match permission {
- Permission::Stream(Action::All, _) => authorized = true,
- Permission::StreamWithTag(Action::Query, stream, tag)
- if stream == query.stream_name || stream == "*" =>
- {
- authorized = true;
- if let Some(tag) = tag {
- tags.push(tag)
- }
- }
- _ => (),
- }
- }
-
- if !authorized {
- return Err(ErrorUnauthorized("Not authorized"));
- }
+ let mut query = query.await?.into_inner();
+ // format output json to include field names
+ query.fields = params.get("fields").cloned().unwrap_or(false);
- if !tags.is_empty() {
- query.filter_tag = Some(tags)
+ if !query.send_null {
+ query.send_null = params.get("sendNull").cloned().unwrap_or(false);
}
Ok(query)
@@ -126,10 +140,71 @@ impl FromRequest for Query {
}
}
+async fn into_query(
+ query: &Query,
+ session_state: &SessionState,
+) -> Result<crate::query::Query, QueryError> {
+ if query.query.is_empty() {
+ return Err(QueryError::EmptyQuery);
+ }
+
+ if query.start_time.is_empty() {
+ return Err(QueryError::EmptyStartTime);
+ }
+
+ if query.end_time.is_empty() {
+ return Err(QueryError::EmptyEndTime);
+ }
+
+ let start: DateTime<Utc>;
+ let end: DateTime<Utc>;
+
+ if query.end_time == "now" {
+ end = Utc::now();
+ start = end - chrono::Duration::from_std(humantime::parse_duration(&query.start_time)?)?;
+ } else {
+ start = DateTime::parse_from_rfc3339(&query.start_time)
+ .map_err(|_| QueryError::StartTimeParse)?
+ .into();
+ end = DateTime::parse_from_rfc3339(&query.end_time)
+ .map_err(|_| QueryError::EndTimeParse)?
+ .into();
+ };
+
+ if start.timestamp() > end.timestamp() {
+ return Err(QueryError::StartTimeAfterEndTime);
+ }
+
+ Ok(crate::query::Query {
+ raw_logical_plan: session_state.create_logical_plan(&query.query).await?,
+ start,
+ end,
+ filter_tag: query.filter_tags.clone(),
+ })
+}
+
#[derive(Debug, thiserror::Error)]
pub enum QueryError {
- #[error("Bad request: {0}")]
- Parse(#[from] ParseError),
+ #[error("Query cannot be empty")]
+ EmptyQuery,
+ #[error("Start time cannot be empty")]
+ EmptyStartTime,
+ #[error("End time cannot be empty")]
+ EmptyEndTime,
+ #[error("Could not parse start time correctly")]
+ StartTimeParse,
+ #[error("Could not parse end time correctly")]
+ EndTimeParse,
+ #[error("While generating times for 'now' failed to parse duration")]
+ NotValidDuration(#[from] humantime::DurationError),
+ #[error("Parsed duration out of range")]
+ OutOfRange(#[from] chrono::OutOfRangeError),
+ #[error("Start time cannot be greater than the end time")]
+ StartTimeAfterEndTime,
+ #[error("Unauthorized")]
+ Unauthorized,
+ #[error("Datafusion Error: {0}")]
+ Datafusion(#[from] DataFusionError),
#[error("Query execution failed due to {0}")]
Execute(#[from] ExecuteError),
}
@@ -137,8 +212,8 @@ pub enum QueryError {
impl actix_web::ResponseError for QueryError {
fn status_code(&self) -> http::StatusCode {
match self {
- QueryError::Parse(_) => StatusCode::BAD_REQUEST,
QueryError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
+ _ => StatusCode::BAD_REQUEST,
}
}
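
For reference, a hedged sketch of the request body the new extractor accepts, written as an in-module test because the struct's fields are private; `fields` and `sendNull` can also arrive as query-string parameters, as `from_request` above shows.

```rust
#[test]
fn deserializes_new_query_shape() {
    // Field names follow the camelCase rename on the struct above.
    let body = r#"{
        "query": "select * from frontend",
        "startTime": "10m",
        "endTime": "now",
        "sendNull": true
    }"#;
    let request: Query = serde_json::from_str(body).expect("valid request body");
    assert!(request.send_null);
    // `fields` and `filter_tags` are #[serde(skip)]: they are filled from the
    // query string and the user's permissions, so they stay at their defaults.
    assert!(!request.fields);
    assert!(request.filter_tags.is_none());
}
```
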
diff --git a/server/src/main.rs b/server/src/main.rs
index 74e2e523b..0d2bee0b3 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -29,6 +29,7 @@ mod about;
mod alerts;
mod analytics;
mod banner;
+mod catalog;
mod event;
mod handlers;
mod livetail;
diff --git a/server/src/query.rs b/server/src/query.rs
index f7b2102e5..cc1e6ef54 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -17,86 +17,54 @@
*/
mod filter_optimizer;
+mod stream_schema_provider;
mod table_provider;
+use chrono::TimeZone;
use chrono::{DateTime, Utc};
-use chrono::{TimeZone, Timelike};
-use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
+
+use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion};
+use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan};
use datafusion::prelude::*;
-use futures_util::stream::FuturesUnordered;
-use futures_util::{future, Future, TryStreamExt};
use itertools::Itertools;
-use object_store::path::Path as StorePath;
-use object_store::{ObjectMeta, ObjectStore};
-use serde_json::Value;
+use once_cell::sync::Lazy;
use std::collections::HashMap;
+use std::ops::Add;
use std::path::{Path, PathBuf};
-use std::pin::Pin;
use std::sync::Arc;
use sysinfo::{System, SystemExt};
-use crate::event::DEFAULT_TIMESTAMP_KEY;
+use crate::event;
use crate::option::CONFIG;
-use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
-use crate::storage::{ObjectStorageError, StorageDir};
-use crate::utils::TimePeriod;
-use crate::validator;
-
-use self::error::{ExecuteError, ParseError};
-use self::filter_optimizer::FilterOptimizerRule;
-use self::table_provider::QueryTableProvider;
-
-type Key = &'static str;
-fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
- value.get(key).and_then(|value| value.as_str()).ok_or(key)
-}
+use crate::storage::{ObjectStorageProvider, StorageDir};
+
+use self::error::ExecuteError;
+
+use self::stream_schema_provider::GlobalSchemaProvider;
+pub use self::stream_schema_provider::PartialTimeFilter;
+
+pub static QUERY_SESSION: Lazy<SessionContext> =
+ Lazy::new(|| Query::create_session_context(CONFIG.storage()));
-// Query holds all values relevant to a query for a single log stream
+// A query request from the client
pub struct Query {
- pub query: String,
- pub stream_name: String,
- pub schema: Arc<Schema>,
+ pub raw_logical_plan: LogicalPlan,
pub start: DateTime,
pub end: DateTime,
pub filter_tag: Option>,
- pub fill_null: bool,
}
impl Query {
- // parse_query parses the SQL query and returns the log stream name on which
- // this query is supposed to be executed
- pub fn parse(query_json: Value) -> Result<Query, ParseError> {
- // retrieve query, start and end time information from payload.
- let query = get_value(&query_json, "query")?;
- let start_time = get_value(&query_json, "startTime")?;
- let end_time = get_value(&query_json, "endTime")?;
-
- Ok(validator::query(query, start_time, end_time)?)
- }
-
- /// Return prefixes, each per day/hour/minutes as necessary
- fn generate_prefixes(&self) -> Vec<String> {
- TimePeriod::new(self.start, self.end, OBJECT_STORE_DATA_GRANULARITY).generate_prefixes()
- }
-
- fn get_prefixes(&self) -> Vec<String> {
- self.generate_prefixes()
- .into_iter()
- .map(|key| format!("{}/{}", self.stream_name, key))
- .collect()
- }
-
// create session context for this query
- fn create_session_context(&self) -> SessionContext {
- let config = SessionConfig::default();
- let runtime_config = CONFIG
- .storage()
+ pub fn create_session_context(
+ storage: Arc<dyn ObjectStorageProvider + Send>,
+ ) -> SessionContext {
+ let runtime_config = storage
.get_datafusion_runtime()
.with_disk_manager(DiskManagerConfig::NewOs);
@@ -112,56 +80,34 @@ impl Query {
let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
- let mut state = SessionState::new_with_config_rt(config, runtime);
-
- if let Some(tag) = &self.filter_tag {
- let filter = FilterOptimizerRule {
- column: crate::event::DEFAULT_TAGS_KEY.to_string(),
- literals: tag.clone(),
- };
- state = state.add_optimizer_rule(Arc::new(filter))
- }
-
- SessionContext::new_with_state(state)
- }
- /// Execute query on object storage(and if necessary on cache as well) with given stream information
- /// TODO: find a way to query all selected parquet files together in a single context.
- pub async fn execute(
- &self,
- storage: Arc<dyn ObjectStorage>,
- ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
- let ctx = self.create_session_context();
- let unresolved_prefixes = self.get_prefixes();
- let client = ctx
- .runtime_env()
- .object_store(Box::new(storage.store_url()))
+ let config = SessionConfig::default()
+ .with_parquet_pruning(true)
+ .with_prefer_existing_sort(true)
+ .with_round_robin_repartition(true);
+
+ let state = SessionState::new_with_config_rt(config, runtime);
+ let schema_provider = Arc::new(GlobalSchemaProvider {
+ storage: storage.get_object_store(),
+ });
+ state
+ .catalog_list()
+ .catalog(&state.config_options().catalog.default_catalog)
+ .expect("default catalog is provided by datafusion")
+ .register_schema(
+ &state.config_options().catalog.default_schema,
+ schema_provider,
+ )
.unwrap();
- let prefixes =
- resolve_paths(client, storage.normalize_prefixes(unresolved_prefixes)).await?;
-
- let remote_listing_table = self.remote_query(prefixes, storage)?;
- let current_minute = Utc::now()
- .with_second(0)
- .and_then(|x| x.with_nanosecond(0))
- .expect("zeroed value is valid");
-
- let memtable = if self.end > current_minute {
- crate::event::STREAM_WRITERS.recordbatches_cloned(&self.stream_name, &self.schema)
- } else {
- None
- };
+ SessionContext::new_with_state(state)
+ }
- let table =
- QueryTableProvider::try_new(memtable, remote_listing_table, self.schema.clone())?;
+ pub async fn execute(&self) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
+ let df = QUERY_SESSION
+ .execute_logical_plan(self.final_logical_plan())
+ .await?;
- ctx.register_table(&*self.stream_name, Arc::new(table))
- .map_err(ObjectStorageError::DataFusionError)?;
- // execute the query and collect results
- let df = ctx.sql(self.query.as_str()).await?;
- // dataframe qualifies name by adding table name before columns. \
- // For now this is just actual names
let fields = df
.schema()
.fields()
@@ -171,34 +117,123 @@ impl Query {
.collect_vec();
let results = df.collect().await?;
-
Ok((results, fields))
}
- fn remote_query(
- &self,
- prefixes: Vec<String>,
- storage: Arc<dyn ObjectStorage>,
- ) -> Result