From b24170da589d34aa4f3b270653c4fe06a5fad46c Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 13 Nov 2023 13:46:44 +0530 Subject: [PATCH 1/6] Introduce custom table catalog format This PR introduces a new table format that keeps track of data files in the data storage. The format is inspired by Apache Iceberg so has similar naming scheme for things. A snapshot is the main entry point to a table. A snapshot consists of list of list of url to manifest file and primary time stats for pruning. A manifest file contains list of all the actual files present along with the file level statistics. Currently a manifest file is generated per top level partition ( i.e date ). --- Cargo.lock | 12 + server/Cargo.toml | 1 + server/src/catalog.rs | 187 ++++++++ server/src/catalog/column.rs | 141 ++++++ server/src/catalog/manifest.rs | 188 ++++++++ server/src/catalog/snapshot.rs | 85 ++++ server/src/handlers.rs | 1 - server/src/handlers/http/query.rs | 214 ++++++--- server/src/main.rs | 1 + server/src/query.rs | 376 +++++++-------- server/src/query/stream_schema_provider.rs | 520 +++++++++++++++++++++ server/src/storage.rs | 5 +- server/src/storage/localfs.rs | 13 +- server/src/storage/object_storage.rs | 90 +++- server/src/storage/s3.rs | 5 +- server/src/utils.rs | 2 + server/src/utils/arrow/reverse_reader.rs | 4 +- server/src/validator.rs | 108 +---- 18 files changed, 1533 insertions(+), 420 deletions(-) create mode 100644 server/src/catalog.rs create mode 100644 server/src/catalog/column.rs create mode 100644 server/src/catalog/manifest.rs create mode 100644 server/src/catalog/snapshot.rs create mode 100644 server/src/query/stream_schema_provider.rs diff --git a/Cargo.lock b/Cargo.lock index 722287c71..b74617386 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2698,6 +2698,7 @@ dependencies = [ "semver", "serde", "serde_json", + "serde_repr", "sha1_smol", "static-files", "sysinfo", @@ -3333,6 +3334,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_repr" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3081f5ffbb02284dda55132aa26daecedd7372a42417bbbab6f14ab7d6bb9145" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "serde_spanned" version = "0.6.1" diff --git a/server/Cargo.toml b/server/Cargo.toml index 9a05bf1b7..50a651858 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -99,6 +99,7 @@ humantime = "2.1.0" openid = { version = "0.12.0", default-features = false, features = ["rustls"] } url = "2.4.0" http-auth-basic = "0.3.3" +serde_repr = "0.1.17" [build-dependencies] cargo_toml = "0.15" diff --git a/server/src/catalog.rs b/server/src/catalog.rs new file mode 100644 index 000000000..b6c6ec99a --- /dev/null +++ b/server/src/catalog.rs @@ -0,0 +1,187 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
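 *
 * Illustrative overview of the catalog layout this module maintains (a sketch
 * assembled from the types in snapshot.rs, manifest.rs and column.rs; the
 * values shown are examples, not required contents):
 *
 *   .stream.json
 *     snapshot.manifest_list[] -> { manifest_path, time_lower_bound, time_upper_bound }
 *   <stream>/date=YYYY-MM-DD/manifest.json
 *     files[]   -> { file_path, num_rows, file_size, ingestion_size, columns[], sort_order_id }
 *     columns[] -> { name, stats (min/max), uncompressed_size, compressed_size }
 *
 * Query planning first narrows manifest_list by the time bounds, then prunes
 * individual files using the per-column statistics.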
+ * + */ + +use std::{collections::HashMap, sync::Arc}; + +use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc}; +use relative_path::RelativePathBuf; +use ulid::Ulid; + +use crate::{ + catalog::manifest::Manifest, + query::PartialTimeFilter, + storage::{ObjectStorage, ObjectStorageError}, +}; + +use self::{column::Column, snapshot::ManifestItem}; + +pub mod column; +pub mod manifest; +pub mod snapshot; + +pub use manifest::create_from_parquet_file; + +pub trait Snapshot { + fn id(&self) -> Ulid; + fn manifests(&self, time_predicates: Vec) -> Vec; +} + +pub trait ManifestFile { + fn file_name(&self) -> &str; + fn ingestion_size(&self) -> u64; + fn file_size(&self) -> u64; + fn num_rows(&self) -> u64; + fn columns(&self) -> &[Column]; +} + +impl ManifestFile for manifest::File { + fn file_name(&self) -> &str { + &self.file_path + } + + fn ingestion_size(&self) -> u64 { + self.ingestion_size + } + + fn file_size(&self) -> u64 { + self.file_size + } + + fn num_rows(&self) -> u64 { + self.num_rows + } + + fn columns(&self) -> &[Column] { + self.columns.as_slice() + } +} + +pub async fn update_snapshot( + storage: Arc, + stream_name: &str, + changes: Vec, +) -> Result<(), ObjectStorageError> { + fn get_file_bounds(file: &manifest::File) -> (DateTime, DateTime) { + match file + .columns() + .iter() + .find(|col| col.name == "p_timestamp") + .unwrap() + .stats + .clone() + .unwrap() + { + column::TypedStatistics::Int(stats) => ( + NaiveDateTime::from_timestamp_millis(stats.min) + .unwrap() + .and_utc(), + NaiveDateTime::from_timestamp_millis(stats.min) + .unwrap() + .and_utc(), + ), + _ => unreachable!(), + } + } + + // get current snapshot + let mut meta = storage.get_snapshot(stream_name).await?; + let manifests = &mut meta.manifest_list; + + let mut change_map: HashMap, Vec> = HashMap::new(); + + for change in changes { + let (change_lower_bound, _) = get_file_bounds(&change); + + let pos = manifests.iter().position(|item| { + item.time_lower_bound <= change_lower_bound + && change_lower_bound < item.time_upper_bound + }); + change_map.entry(pos).or_default().push(change); + } + + let mut new_entries = Vec::new(); + for (pos, changes) in change_map { + let Some(pos) = pos else { + new_entries.extend(changes); + continue; + }; + let info = &mut manifests[pos]; + let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound); + let Some(mut manifest) = storage.get_manifest(&path).await? 
else { + new_entries.extend(changes); + continue; + }; + + manifest.apply_change(changes); + storage.put_manifest(&path, manifest).await?; + } + + let mut new_snapshot_entries = Vec::new(); + + for entry in new_entries { + let (lower_bound, _) = get_file_bounds(&entry); + let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); + let upper_bound = lower_bound + .date_naive() + .and_time( + NaiveTime::from_num_seconds_from_midnight_opt( + 23 * 3600 + 59 * 60 + 59, + 999_999_999, + ) + .unwrap(), + ) + .and_utc(); + + let manifest = Manifest { + files: vec![entry], + ..Manifest::default() + }; + + let path = partition_path(stream_name, lower_bound, upper_bound).join("manifest.json"); + storage + .put_object(&path, serde_json::to_vec(&manifest).unwrap().into()) + .await?; + + let path = storage.absolute_url(&path); + new_snapshot_entries.push(snapshot::ManifestItem { + manifest_path: path.to_string(), + time_lower_bound: lower_bound, + time_upper_bound: upper_bound, + }) + } + + manifests.extend(new_snapshot_entries); + + storage.put_snapshot(stream_name, meta).await +} + +// partition path to which this manifest belongs. +// is useful when uploading manifest file +fn partition_path( + stream: &str, + lower_bound: DateTime, + upper_bound: DateTime, +) -> RelativePathBuf { + let lower = lower_bound.date_naive().format("%Y-%m-%d").to_string(); + let upper = upper_bound.date_naive().format("%Y-%m-%d").to_string(); + if lower == upper { + RelativePathBuf::from_iter([stream, &format!("date={}", lower)]) + } else { + RelativePathBuf::from_iter([stream, &format!("date={}:{}", lower, upper)]) + } +} diff --git a/server/src/catalog/column.rs b/server/src/catalog/column.rs new file mode 100644 index 000000000..305d7492f --- /dev/null +++ b/server/src/catalog/column.rs @@ -0,0 +1,141 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
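 *
 * Note: the statistics types below mirror the min/max values recorded in
 * parquet row-group metadata; manifest::column_statistics folds them across
 * the row groups of a file via TypedStatistics::update.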
+ * + */ + +use std::cmp::{max, min}; + +use parquet::file::statistics::Statistics; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct BoolType { + pub min: bool, + pub max: bool, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct Float64Type { + pub min: f64, + pub max: f64, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct Int64Type { + pub min: i64, + pub max: i64, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct Utf8Type { + pub min: String, + pub max: String, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum TypedStatistics { + Bool(BoolType), + Int(Int64Type), + Float(Float64Type), + String(Utf8Type), +} + +impl TypedStatistics { + pub fn update(self, other: Self) -> Self { + match (self, other) { + (TypedStatistics::Bool(this), TypedStatistics::Bool(other)) => { + TypedStatistics::Bool(BoolType { + min: min(this.min, other.min), + max: max(this.max, other.max), + }) + } + (TypedStatistics::Float(this), TypedStatistics::Float(other)) => { + TypedStatistics::Float(Float64Type { + min: this.min.min(other.min), + max: this.max.max(other.max), + }) + } + (TypedStatistics::Int(this), TypedStatistics::Int(other)) => { + TypedStatistics::Int(Int64Type { + min: min(this.min, other.min), + max: max(this.max, other.max), + }) + } + (TypedStatistics::String(this), TypedStatistics::String(other)) => { + TypedStatistics::String(Utf8Type { + min: min(this.min, other.min), + max: max(this.max, other.max), + }) + } + _ => panic!("Cannot update wrong types"), + } + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct Column { + pub name: String, + pub stats: Option, + pub uncompressed_size: u64, + pub compressed_size: u64, +} + +impl TryFrom<&Statistics> for TypedStatistics { + type Error = parquet::errors::ParquetError; + fn try_from(value: &Statistics) -> Result { + if !value.has_min_max_set() { + return Err(parquet::errors::ParquetError::General( + "min max is not set".to_string(), + )); + } + + let res = match value { + Statistics::Boolean(stats) => TypedStatistics::Bool(BoolType { + min: *stats.min(), + max: *stats.max(), + }), + Statistics::Int32(stats) => TypedStatistics::Int(Int64Type { + min: *stats.min() as i64, + max: *stats.max() as i64, + }), + Statistics::Int64(stats) => TypedStatistics::Int(Int64Type { + min: *stats.min(), + max: *stats.max(), + }), + Statistics::Int96(stats) => TypedStatistics::Int(Int64Type { + min: stats.min().to_i64(), + max: stats.max().to_i64(), + }), + Statistics::Float(stats) => TypedStatistics::Float(Float64Type { + min: *stats.min() as f64, + max: *stats.max() as f64, + }), + Statistics::Double(stats) => TypedStatistics::Float(Float64Type { + min: *stats.min(), + max: *stats.max(), + }), + Statistics::ByteArray(stats) => TypedStatistics::String(Utf8Type { + min: stats.min().as_utf8()?.to_owned(), + max: stats.max().as_utf8()?.to_owned(), + }), + Statistics::FixedLenByteArray(stats) => TypedStatistics::String(Utf8Type { + min: stats.min().as_utf8()?.to_owned(), + max: stats.max().as_utf8()?.to_owned(), + }), + }; + + Ok(res) + } +} diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs new file mode 100644 index 000000000..6dda79281 --- /dev/null +++ b/server/src/catalog/manifest.rs @@ -0,0 +1,188 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::collections::HashMap; + +use itertools::Itertools; +use parquet::{file::reader::FileReader, format::SortingColumn}; + +use super::column::Column; + +#[derive( + Debug, + Default, + Clone, + Copy, + PartialEq, + Eq, + serde_repr::Serialize_repr, + serde_repr::Deserialize_repr, +)] +#[repr(u8)] +pub enum SortOrder { + AscNullsFirst = 0, + AscNullsLast, + DescNullsLast, + #[default] + DescNullsFirst, +} + +pub type SortInfo = (String, SortOrder); + +#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] +pub struct File { + pub file_path: String, + pub num_rows: u64, + pub file_size: u64, + pub ingestion_size: u64, + pub columns: Vec, + pub sort_order_id: Vec, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct Manifest { + pub version: String, + pub files: Vec, +} + +impl Default for Manifest { + fn default() -> Self { + Self { + version: "v1".to_string(), + files: Vec::default(), + } + } +} + +impl Manifest { + pub fn apply_change(&mut self, changes: Vec) { + for change in changes { + if let Some(pos) = self + .files + .iter() + .position(|file| file.file_path == change.file_path) + { + self.files[pos] = change + } else { + self.files.push(change) + } + } + } +} + +pub fn create_from_parquet_file( + object_store_path: String, + fs_file_path: &std::path::Path, +) -> anyhow::Result { + let mut manifest_file = File { + file_path: object_store_path, + ..File::default() + }; + + let file = std::fs::File::open(fs_file_path)?; + let file = parquet::file::serialized_reader::SerializedFileReader::new(file)?; + let file_meta = file.metadata().file_metadata(); + let row_groups = file.metadata().row_groups(); + + manifest_file.num_rows = file_meta.num_rows() as u64; + manifest_file.ingestion_size = row_groups + .iter() + .fold(0, |acc, x| acc + x.total_byte_size() as u64); + manifest_file.file_size = row_groups + .iter() + .fold(0, |acc, x| acc + x.compressed_size() as u64); + + let columns = column_statistics(row_groups); + manifest_file.columns = columns.into_values().collect(); + let mut sort_orders = sort_order(row_groups); + + if let Some(last_sort_order) = sort_orders.pop() { + if sort_orders + .into_iter() + .all(|sort_order| sort_order == last_sort_order) + { + manifest_file.sort_order_id = last_sort_order; + } + } + + Ok(manifest_file) +} + +fn sort_order( + row_groups: &[parquet::file::metadata::RowGroupMetaData], +) -> Vec> { + let mut sort_orders = Vec::new(); + for row_group in row_groups { + let sort_order = row_group.sorting_columns().unwrap(); + let sort_order = sort_order + .iter() + .map(|sort_order| { + let SortingColumn { + column_idx, + descending, + nulls_first, + } = sort_order; + let col = row_group + .column(*column_idx as usize) + .column_descr() + .path() + .string(); + let sort_info = match (descending, nulls_first) { + (true, true) => SortOrder::DescNullsFirst, + (true, 
false) => SortOrder::DescNullsLast, + (false, true) => SortOrder::AscNullsFirst, + (false, false) => SortOrder::AscNullsLast, + }; + + (col, sort_info) + }) + .collect_vec(); + + sort_orders.push(sort_order) + } + sort_orders +} + +fn column_statistics( + row_groups: &[parquet::file::metadata::RowGroupMetaData], +) -> HashMap { + let mut columns: HashMap = HashMap::new(); + for row_group in row_groups { + for col in row_group.columns() { + let col_name = col.column_descr().path().string(); + if let Some(entry) = columns.get_mut(&col_name) { + entry.compressed_size += col.compressed_size() as u64; + entry.uncompressed_size += col.uncompressed_size() as u64; + if let Some(other) = col.statistics().and_then(|stats| stats.try_into().ok()) { + entry.stats = entry.stats.clone().map(|this| this.update(other)); + } + } else { + columns.insert( + col_name.clone(), + Column { + name: col_name, + stats: col.statistics().and_then(|stats| stats.try_into().ok()), + uncompressed_size: col.uncompressed_size() as u64, + compressed_size: col.compressed_size() as u64, + }, + ); + } + } + } + columns +} diff --git a/server/src/catalog/snapshot.rs b/server/src/catalog/snapshot.rs new file mode 100644 index 000000000..cb2ad1d36 --- /dev/null +++ b/server/src/catalog/snapshot.rs @@ -0,0 +1,85 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
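 *
 * Note: manifests are currently created per date partition (see
 * catalog::update_snapshot), so each ManifestItem's time bounds cover one
 * calendar day and the retain calls in manifests() effectively select the
 * dates that overlap the queried window.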
+ * + */ + +use std::ops::Bound; + +use chrono::{DateTime, Utc}; +use ulid::Ulid; + +use crate::query::PartialTimeFilter; + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct Snapshot { + pub version: String, + pub id: Ulid, + pub manifest_list: Vec, +} + +impl Default for Snapshot { + fn default() -> Self { + Self { + version: "v1".to_string(), + id: Ulid::new(), + manifest_list: Vec::default(), + } + } +} + +impl super::Snapshot for Snapshot { + fn id(&self) -> Ulid { + self.id + } + + fn manifests(&self, time_predicates: Vec) -> Vec { + let mut manifests = self.manifest_list.clone(); + for predicate in time_predicates { + match predicate { + PartialTimeFilter::Low(Bound::Included(time)) => manifests.retain(|item| { + let time = time.and_utc(); + item.time_upper_bound >= time + }), + PartialTimeFilter::Low(Bound::Excluded(time)) => manifests.retain(|item| { + let time = time.and_utc(); + item.time_upper_bound > time + }), + PartialTimeFilter::High(Bound::Included(time)) => manifests.retain(|item| { + let time = time.and_utc(); + item.time_lower_bound <= time + }), + PartialTimeFilter::High(Bound::Excluded(time)) => manifests.retain(|item| { + let time = time.and_utc(); + item.time_lower_bound < time + }), + PartialTimeFilter::Eq(time) => manifests.retain(|item| { + let time = time.and_utc(); + item.time_lower_bound <= time && time <= item.time_upper_bound + }), + _ => (), + } + } + + manifests + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct ManifestItem { + pub manifest_path: String, + pub time_lower_bound: DateTime, + pub time_upper_bound: DateTime, +} diff --git a/server/src/handlers.rs b/server/src/handlers.rs index 294fc40a8..3d6409dc2 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -22,7 +22,6 @@ pub mod livetail; const PREFIX_TAGS: &str = "x-p-tag-"; const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; -const FILL_NULL_OPTION_KEY: &str = "send_null"; const SEPARATOR: char = '^'; const OIDC_SCOPE: &str = "openid profile email"; diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index b41120912..f6dc3a20b 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -16,56 +16,100 @@ * */ -use actix_web::error::ErrorUnauthorized; use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, Responder}; +use chrono::{DateTime, Utc}; +use datafusion::error::DataFusionError; +use datafusion::execution::context::SessionState; use futures_util::Future; use http::StatusCode; -use serde_json::Value; use std::collections::HashMap; use std::pin::Pin; use std::time::Instant; -use crate::handlers::FILL_NULL_OPTION_KEY; use crate::metrics::QUERY_EXECUTE_TIME; -use crate::option::CONFIG; -use crate::query::error::{ExecuteError, ParseError}; -use crate::query::Query; +use crate::query::error::ExecuteError; +use crate::query::QUERY_SESSION; use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; use crate::response::QueryResponse; use crate::utils::actix::extract_session_key_from_req; -pub async fn query( - query: Query, - web::Query(params): web::Query>, -) -> Result { +// Query Request thorugh http endpoint. 
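+// An illustrative request body (field names follow the serde camelCase rename
+// below; values such as "mystream" are hypothetical):
+//   {"query": "select * from mystream", "startTime": "2023-11-01T00:00:00+00:00", "endTime": "2023-11-01T01:00:00+00:00"}
+// With "endTime": "now", startTime is instead a humantime duration such as "10m";
+// "sendNull" may also be set in the body, and "fields"/"sendNull" as query params.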
+// Implememts FromRequest +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Query { + query: String, + start_time: String, + end_time: String, + #[serde(default)] + send_null: bool, + #[serde(skip)] + fields: bool, + #[serde(skip)] + filter_tags: Option>, +} + +pub async fn query(req: HttpRequest, query_request: Query) -> Result { + let creds = extract_session_key_from_req(&req).expect("expects basic auth"); + let permissions = Users.get_permissions(&creds); + let session_state = QUERY_SESSION.state(); + let mut query = into_query(&query_request, &session_state).await?; + + // check authorization of this query if it references physical table; + let table_name = query.table_name(); + if let Some(ref table) = table_name { + let mut authorized = false; + let mut tags = Vec::new(); + + // in permission check if user can run query on the stream. + // also while iterating add any filter tags for this stream + for permission in permissions { + match permission { + Permission::Stream(Action::All, _) => { + authorized = true; + break; + } + Permission::StreamWithTag(Action::Query, ref stream, tag) + if stream == table || stream == "*" => + { + authorized = true; + if let Some(tag) = tag { + tags.push(tag) + } + } + _ => (), + } + } + + if !authorized { + return Err(QueryError::Unauthorized); + } + + if !tags.is_empty() { + query.filter_tag = Some(tags) + } + } + let time = Instant::now(); - // format output json to include field names - let with_fields = params.get("fields").cloned().unwrap_or(false); - // Fill missing columns with null - let fill_null = params - .get("fillNull") - .cloned() - .or(Some(query.fill_null)) - .unwrap_or(false); - - let storage = CONFIG.storage().get_object_store(); - let (records, fields) = query.execute(storage).await?; + let (records, fields) = query.execute().await?; let response = QueryResponse { records, fields, - fill_null, - with_fields, + fill_null: query_request.send_null, + with_fields: query_request.fields, } .to_http(); - let time = time.elapsed().as_secs_f64(); - QUERY_EXECUTE_TIME - .with_label_values(&[query.stream_name.as_str()]) - .observe(time); + if let Some(table) = table_name { + let time = time.elapsed().as_secs_f64(); + QUERY_EXECUTE_TIME + .with_label_values(&[&table]) + .observe(time); + } Ok(response) } @@ -75,48 +119,19 @@ impl FromRequest for Query { type Future = Pin>>>; fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future { - let creds = extract_session_key_from_req(req).expect("expects basic auth"); - let permissions = Users.get_permissions(&creds); - let json = Json::::from_request(req, payload); + let query = Json::::from_request(req, payload); + let params = web::Query::>::from_request(req, payload) + .into_inner() + .map(|x| x.0) + .unwrap_or_default(); let fut = async move { - let json = json.await?.into_inner(); - // maybe move this option to query param instead so that it can simply be extracted from request - let fill_null = json - .as_object() - .and_then(|map| map.get(FILL_NULL_OPTION_KEY)) - .and_then(|value| value.as_bool()) - .unwrap_or_default(); - - let mut query = Query::parse(json).map_err(QueryError::Parse)?; - query.fill_null = fill_null; - - let mut authorized = false; - let mut tags = Vec::new(); - - // in permission check if user can run query on the stream. 
- // also while iterating add any filter tags for this stream - for permission in permissions { - match permission { - Permission::Stream(Action::All, _) => authorized = true, - Permission::StreamWithTag(Action::Query, stream, tag) - if stream == query.stream_name || stream == "*" => - { - authorized = true; - if let Some(tag) = tag { - tags.push(tag) - } - } - _ => (), - } - } - - if !authorized { - return Err(ErrorUnauthorized("Not authorized")); - } + let mut query = query.await?.into_inner(); + // format output json to include field names + query.fields = params.get("fields").cloned().unwrap_or(false); - if !tags.is_empty() { - query.filter_tag = Some(tags) + if !query.send_null { + query.send_null = params.get("sendNull").cloned().unwrap_or(false); } Ok(query) @@ -126,10 +141,71 @@ impl FromRequest for Query { } } +async fn into_query( + query: &Query, + session_state: &SessionState, +) -> Result { + if query.query.is_empty() { + return Err(QueryError::EmptyQuery); + } + + if query.start_time.is_empty() { + return Err(QueryError::EmptyStartTime); + } + + if query.end_time.is_empty() { + return Err(QueryError::EmptyEndTime); + } + + let start: DateTime; + let end: DateTime; + + if query.end_time == "now" { + end = Utc::now(); + start = end - chrono::Duration::from_std(humantime::parse_duration(&query.start_time)?)?; + } else { + start = DateTime::parse_from_rfc3339(&query.start_time) + .map_err(|_| QueryError::StartTimeParse)? + .into(); + end = DateTime::parse_from_rfc3339(&query.end_time) + .map_err(|_| QueryError::EndTimeParse)? + .into(); + }; + + if start.timestamp() > end.timestamp() { + return Err(QueryError::StartTimeAfterEndTime); + } + + Ok(crate::query::Query { + raw_logical_plan: session_state.create_logical_plan(&query.query).await?, + start, + end, + filter_tag: query.filter_tags.clone(), + }) +} + #[derive(Debug, thiserror::Error)] pub enum QueryError { - #[error("Bad request: {0}")] - Parse(#[from] ParseError), + #[error("Query cannot be empty")] + EmptyQuery, + #[error("Start time cannot be empty")] + EmptyStartTime, + #[error("End time cannot be empty")] + EmptyEndTime, + #[error("Could not parse start time correctly")] + StartTimeParse, + #[error("Could not parse end time correctly")] + EndTimeParse, + #[error("While generating times for 'now' failed to parse duration")] + NotValidDuration(#[from] humantime::DurationError), + #[error("Parsed duration out of range")] + OutOfRange(#[from] chrono::OutOfRangeError), + #[error("Start time cannot be greater than the end time")] + StartTimeAfterEndTime, + #[error("Unauthorized")] + Unauthorized, + #[error("Datafusion Error: {0}")] + Datafusion(#[from] DataFusionError), #[error("Query execution failed due to {0}")] Execute(#[from] ExecuteError), } @@ -137,8 +213,8 @@ pub enum QueryError { impl actix_web::ResponseError for QueryError { fn status_code(&self) -> http::StatusCode { match self { - QueryError::Parse(_) => StatusCode::BAD_REQUEST, QueryError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR, + _ => StatusCode::BAD_REQUEST, } } diff --git a/server/src/main.rs b/server/src/main.rs index 74e2e523b..0d2bee0b3 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -29,6 +29,7 @@ mod about; mod alerts; mod analytics; mod banner; +mod catalog; mod event; mod handlers; mod livetail; diff --git a/server/src/query.rs b/server/src/query.rs index f7b2102e5..115e34445 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -17,86 +17,53 @@ */ mod filter_optimizer; +mod stream_schema_provider; mod table_provider; +use 
chrono::TimeZone; use chrono::{DateTime, Utc}; -use chrono::{TimeZone, Timelike}; -use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::datasource::file_format::parquet::ParquetFormat; -use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; + +use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion}; +use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; use datafusion::execution::disk_manager::DiskManagerConfig; use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan}; use datafusion::prelude::*; -use futures_util::stream::FuturesUnordered; -use futures_util::{future, Future, TryStreamExt}; use itertools::Itertools; -use object_store::path::Path as StorePath; -use object_store::{ObjectMeta, ObjectStore}; -use serde_json::Value; +use once_cell::sync::Lazy; use std::collections::HashMap; use std::path::{Path, PathBuf}; -use std::pin::Pin; use std::sync::Arc; use sysinfo::{System, SystemExt}; -use crate::event::DEFAULT_TIMESTAMP_KEY; +use crate::event; use crate::option::CONFIG; -use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY}; -use crate::storage::{ObjectStorageError, StorageDir}; -use crate::utils::TimePeriod; -use crate::validator; - -use self::error::{ExecuteError, ParseError}; -use self::filter_optimizer::FilterOptimizerRule; -use self::table_provider::QueryTableProvider; - -type Key = &'static str; -fn get_value(value: &Value, key: Key) -> Result<&str, Key> { - value.get(key).and_then(|value| value.as_str()).ok_or(key) -} +use crate::storage::{ObjectStorageProvider, StorageDir}; + +use self::error::ExecuteError; + +use self::stream_schema_provider::GlobalSchemaProvider; +pub use self::stream_schema_provider::PartialTimeFilter; + +pub static QUERY_SESSION: Lazy = + Lazy::new(|| Query::create_session_context(CONFIG.storage())); -// Query holds all values relevant to a query for a single log stream +// A query request by client pub struct Query { - pub query: String, - pub stream_name: String, - pub schema: Arc, + pub raw_logical_plan: LogicalPlan, pub start: DateTime, pub end: DateTime, pub filter_tag: Option>, - pub fill_null: bool, } impl Query { - // parse_query parses the SQL query and returns the log stream name on which - // this query is supposed to be executed - pub fn parse(query_json: Value) -> Result { - // retrieve query, start and end time information from payload. - let query = get_value(&query_json, "query")?; - let start_time = get_value(&query_json, "startTime")?; - let end_time = get_value(&query_json, "endTime")?; - - Ok(validator::query(query, start_time, end_time)?) 
- } - - /// Return prefixes, each per day/hour/minutes as necessary - fn generate_prefixes(&self) -> Vec { - TimePeriod::new(self.start, self.end, OBJECT_STORE_DATA_GRANULARITY).generate_prefixes() - } - - fn get_prefixes(&self) -> Vec { - self.generate_prefixes() - .into_iter() - .map(|key| format!("{}/{}", self.stream_name, key)) - .collect() - } - // create session context for this query - fn create_session_context(&self) -> SessionContext { - let config = SessionConfig::default(); - let runtime_config = CONFIG - .storage() + pub fn create_session_context( + storage: Arc, + ) -> SessionContext { + let runtime_config = storage .get_datafusion_runtime() .with_disk_manager(DiskManagerConfig::NewOs); @@ -112,56 +79,36 @@ impl Query { let runtime_config = runtime_config.with_memory_limit(pool_size, fraction); let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); - let mut state = SessionState::new_with_config_rt(config, runtime); - - if let Some(tag) = &self.filter_tag { - let filter = FilterOptimizerRule { - column: crate::event::DEFAULT_TAGS_KEY.to_string(), - literals: tag.clone(), - }; - state = state.add_optimizer_rule(Arc::new(filter)) - } + + let config = SessionConfig::default() + .with_parquet_pruning(true) + .with_prefer_existing_sort(true) + .with_round_robin_repartition(true); + + let state = SessionState::new_with_config_rt(config, runtime); + let schema_provider = Arc::new(GlobalSchemaProvider { + storage: storage.get_object_store(), + }); + state + .catalog_list() + .catalog(&state.config_options().catalog.default_catalog) + .expect("default catalog is provided by datafusion") + .register_schema( + &state.config_options().catalog.default_schema, + schema_provider, + ) + .unwrap(); SessionContext::new_with_state(state) } /// Execute query on object storage(and if necessary on cache as well) with given stream information /// TODO: find a way to query all selected parquet files together in a single context. - pub async fn execute( - &self, - storage: Arc, - ) -> Result<(Vec, Vec), ExecuteError> { - let ctx = self.create_session_context(); - let unresolved_prefixes = self.get_prefixes(); - let client = ctx - .runtime_env() - .object_store(Box::new(storage.store_url())) - .unwrap(); - let prefixes = - resolve_paths(client, storage.normalize_prefixes(unresolved_prefixes)).await?; - - let remote_listing_table = self.remote_query(prefixes, storage)?; + pub async fn execute(&self) -> Result<(Vec, Vec), ExecuteError> { + let df = QUERY_SESSION + .execute_logical_plan(self.final_logical_plan()) + .await?; - let current_minute = Utc::now() - .with_second(0) - .and_then(|x| x.with_nanosecond(0)) - .expect("zeroed value is valid"); - - let memtable = if self.end > current_minute { - crate::event::STREAM_WRITERS.recordbatches_cloned(&self.stream_name, &self.schema) - } else { - None - }; - - let table = - QueryTableProvider::try_new(memtable, remote_listing_table, self.schema.clone())?; - - ctx.register_table(&*self.stream_name, Arc::new(table)) - .map_err(ObjectStorageError::DataFusionError)?; - // execute the query and collect results - let df = ctx.sql(self.query.as_str()).await?; - // dataframe qualifies name by adding table name before columns. 
\ - // For now this is just actual names let fields = df .schema() .fields() @@ -171,34 +118,121 @@ impl Query { .collect_vec(); let results = df.collect().await?; - Ok((results, fields)) } - fn remote_query( - &self, - prefixes: Vec, - storage: Arc, - ) -> Result>, ExecuteError> { - let prefixes = storage.query_prefixes(prefixes); - if prefixes.is_empty() { - return Ok(None); + /// return logical plan with all time filters applied through + fn final_logical_plan(&self) -> LogicalPlan { + fn tag_filter(filters: Vec) -> Option { + filters + .iter() + .map(|literal| { + Expr::Column(Column::from_name(event::DEFAULT_TAGS_KEY)) + .like(lit(format!("%{}%", literal))) + }) + .reduce(or) + } + + fn transform( + plan: LogicalPlan, + start_time_filter: Expr, + end_time_filter: Expr, + filters: Option, + ) -> LogicalPlan { + plan.transform(&|plan| match plan { + LogicalPlan::TableScan(table) => { + let mut new_filters = vec![]; + if !table.filters.iter().any(|expr| { + let Expr::BinaryExpr(binexpr) = expr else {return false}; + matches!(&*binexpr.left, Expr::Column(Column { name, .. }) if name == event::DEFAULT_TIMESTAMP_KEY) + }) { + new_filters.push(start_time_filter.clone()); + new_filters.push(end_time_filter.clone()); + } + + if let Some(tag_filters) = filters.clone() { + new_filters.push(tag_filters) + } + + let new_filter = new_filters.into_iter().reduce(and); + + if let Some(new_filter) = new_filter { + let filter = Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap(); + Ok(Transformed::Yes(LogicalPlan::Filter(filter))) + } else { + Ok(Transformed::No(LogicalPlan::TableScan(table))) + } + }, + x => Ok(Transformed::No(x)), + }) + .expect("transform only transforms the tablescan") + } + + let filters = self.filter_tag.clone().and_then(tag_filter); + let start_time_filter = + PartialTimeFilter::Low(std::ops::Bound::Included(self.start.naive_utc())).binary_expr( + Expr::Column(Column::from_name(event::DEFAULT_TIMESTAMP_KEY)), + ); + let end_time_filter = + PartialTimeFilter::High(std::ops::Bound::Excluded(self.end.naive_utc())).binary_expr( + Expr::Column(Column::from_name(event::DEFAULT_TIMESTAMP_KEY)), + ); + + // see https://github.com/apache/arrow-datafusion/pull/8400 + // this can be eliminated in later version of datafusion but with slight caveat + // transform cannot modify stringified plans by itself + // we by knowing this plan is not in the optimization procees chose to overwrite the stringified plan + match self.raw_logical_plan.clone() { + LogicalPlan::Explain(plan) => { + let transformed = transform( + plan.plan.as_ref().clone(), + start_time_filter, + end_time_filter, + filters, + ); + LogicalPlan::Explain(Explain { + verbose: plan.verbose, + stringified_plans: vec![ + transformed.to_stringified(PlanType::InitialLogicalPlan) + ], + plan: Arc::new(transformed), + schema: plan.schema, + logical_optimization_succeeded: plan.logical_optimization_succeeded, + }) + } + x => transform(x, start_time_filter, end_time_filter, filters), + } + } + + pub fn table_name(&self) -> Option { + let mut visitor = TableScanVisitor::default(); + let _ = self.raw_logical_plan.visit(&mut visitor); + visitor.into_inner().pop() + } +} + +#[derive(Debug, Default)] +struct TableScanVisitor { + tables: Vec, +} + +impl TableScanVisitor { + fn into_inner(self) -> Vec { + self.tables + } +} + +impl TreeNodeVisitor for TableScanVisitor { + type N = LogicalPlan; + + fn pre_visit(&mut self, node: &Self::N) -> Result { + match node { + LogicalPlan::TableScan(table) => { + 
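                // Record the stream (table) name referenced by this scan and stop
                // walking the plan; the HTTP handler only needs a single name for
                // its authorization check and the query execution-time metric.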
self.tables.push(table.table_name.table().to_string()); + Ok(VisitRecursion::Stop) + } + _ => Ok(VisitRecursion::Continue), } - let file_format = ParquetFormat::default().with_enable_pruning(Some(true)); - let file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(false, true)]]; - let listing_options = ListingOptions::new(Arc::new(file_format)) - .with_file_extension(".parquet") - .with_file_sort_order(file_sort_order) - .with_collect_stat(true) - // enforce distribution will take care of parallelization - .with_target_partitions(1); - - let config = ListingTableConfig::new_with_multi_paths(prefixes) - .with_listing_options(listing_options) - .with_schema(self.schema.clone()); - - let listing_table = Arc::new(ListingTable::try_new(config)?); - Ok(Some(listing_table)) } } @@ -243,104 +277,10 @@ fn time_from_path(path: &Path) -> DateTime { .unwrap() } -// accepts relative paths to resolve the narrative -// returns list of prefixes sorted in descending order -async fn resolve_paths( - client: Arc, - prefixes: Vec, -) -> Result, ObjectStorageError> { - let mut minute_resolve: HashMap> = HashMap::new(); - let mut all_resolve = Vec::new(); - - for prefix in prefixes { - let components = prefix.split_terminator('/'); - if components.last().is_some_and(|x| x.starts_with("minute")) { - let hour_prefix = &prefix[0..prefix.rfind("minute").expect("minute exists")]; - minute_resolve - .entry(hour_prefix.to_owned()) - .and_modify(|list| list.push(prefix)) - .or_default(); - } else { - all_resolve.push(prefix) - } - } - - type ResolveFuture = Pin, ObjectStorageError>>>>; - - let tasks: FuturesUnordered = FuturesUnordered::new(); - - for (listing_prefix, prefix) in minute_resolve { - let client = Arc::clone(&client); - tasks.push(Box::pin(async move { - let mut list = client - .list(Some(&StorePath::from(listing_prefix))) - .await? - .try_collect::>() - .await?; - - list.retain(|object| { - prefix.iter().any(|prefix| { - object - .location - .prefix_matches(&StorePath::from(prefix.as_ref())) - }) - }); - - Ok(list) - })); - } - - for prefix in all_resolve { - let client = Arc::clone(&client); - tasks.push(Box::pin(async move { - client - .list(Some(&StorePath::from(prefix))) - .await? - .try_collect::>() - .await - .map_err(Into::into) - })); - } - - let res: Vec> = tasks - .and_then(|res| { - future::ok( - res.into_iter() - .map(|res| res.location.to_string()) - .collect_vec(), - ) - }) - .try_collect() - .await?; - - let mut res = res.into_iter().flatten().collect_vec(); - res.sort(); - res.reverse(); - - Ok(res) -} - pub mod error { + use crate::storage::ObjectStorageError; use datafusion::error::DataFusionError; - use crate::{storage::ObjectStorageError, validator::error::QueryValidationError}; - - use super::Key; - - #[derive(Debug, thiserror::Error)] - pub enum ParseError { - #[error("Key not found: {0}")] - Key(String), - #[error("Error parsing query: {0}")] - Validation(#[from] QueryValidationError), - } - - impl From for ParseError { - fn from(key: Key) -> Self { - ParseError::Key(key.to_string()) - } - } - #[derive(Debug, thiserror::Error)] pub enum ExecuteError { #[error("Query Execution failed due to error in object storage: {0}")] diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs new file mode 100644 index 000000000..254dc2f18 --- /dev/null +++ b/server/src/query/stream_schema_provider.rs @@ -0,0 +1,520 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{any::Any, ops::Bound, sync::Arc}; + +use arrow_schema::SchemaRef; +use bytes::Bytes; +use chrono::{NaiveDateTime, Timelike, Utc}; +use datafusion::{ + catalog::schema::SchemaProvider, + common::tree_node::{TreeNode, VisitRecursion}, + datasource::{ + file_format::parquet::ParquetFormat, + listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, + TableProvider, + }, + error::DataFusionError, + execution::context::SessionState, + logical_expr::{BinaryExpr, Operator, TableProviderFilterPushDown, TableType}, + physical_plan::ExecutionPlan, + prelude::{col, Column, Expr}, + scalar::ScalarValue, +}; +use futures_util::{stream::FuturesOrdered, StreamExt, TryStreamExt}; +use itertools::Itertools; +use object_store::{path::Path, ObjectStore}; +use url::Url; + +use crate::{ + catalog::{self, column::TypedStatistics, manifest::Manifest, ManifestFile, Snapshot}, + event::{self, DEFAULT_TIMESTAMP_KEY}, + metadata::STREAM_INFO, + option::CONFIG, + storage::ObjectStorage, +}; + +use super::table_provider; + +// schema provider for stream based on global data +pub struct GlobalSchemaProvider { + pub storage: Arc, +} + +#[async_trait::async_trait] +impl SchemaProvider for GlobalSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + STREAM_INFO.list_streams() + } + + async fn table(&self, name: &str) -> Option> { + if self.table_exist(name) { + Some(Arc::new(StandardTableProvider { + schema: STREAM_INFO.schema(name).unwrap(), + stream: name.to_owned(), + url: self.storage.store_url(), + })) + } else { + None + } + } + + fn table_exist(&self, name: &str) -> bool { + STREAM_INFO.stream_exists(name) + } +} + +#[derive(Debug)] +struct StandardTableProvider { + schema: SchemaRef, + // prefix under which to find snapshot + stream: String, + // url to find right instance of object store + url: Url, +} + +impl StandardTableProvider { + async fn collect_scan_from_snapshot( + &self, + glob_storage: Arc, + object_store: Arc, + _projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result, DataFusionError> { + let time_filters = extract_primary_filter(filters); + let storage = glob_storage; + // todo can remove + if time_filters.is_empty() { + return Err(DataFusionError::Plan("potentially unbounded query on time range. 
Table scanning requires atleast one time bound".to_string())); + } + + let snapshot = storage + .get_snapshot(&self.stream) + .await + .map_err(|err| DataFusionError::Plan(err.to_string()))?; + + let items = snapshot.manifests(time_filters); + let manifest_files = collect_manifest_files( + object_store, + items + .into_iter() + .sorted_by_key(|file| file.time_lower_bound) + .map(|item| item.manifest_path) + .collect(), + ) + .await?; + + let mut manifest_files: Vec<_> = manifest_files + .into_iter() + .flat_map(|file| file.files) + .rev() + .collect(); + + for filter in filters { + manifest_files.retain(|file| !file.can_be_pruned(filter)) + } + + if let Some(limit) = limit { + let limit = limit as u64; + let mut curr_limit = 0; + let mut pos = None; + + for (index, file) in manifest_files.iter().enumerate() { + curr_limit += file.num_rows(); + if curr_limit >= limit { + pos = Some(index); + break; + } + } + + if let Some(pos) = pos { + manifest_files.truncate(pos + 1); + } + } + + Ok(manifest_files + .iter() + .map(|x| storage.store_url().join(&x.file_path).unwrap()) + .map(|x| ListingTableUrl::parse(x).unwrap()) + .collect()) + } + + fn remote_query( + &self, + prefixes: Vec, + ) -> Result>, DataFusionError> { + if prefixes.is_empty() { + return Ok(None); + } + + let file_format = ParquetFormat::default().with_enable_pruning(Some(true)); + let file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]]; + let listing_options = ListingOptions::new(Arc::new(file_format)) + .with_file_extension(".parquet") + .with_file_sort_order(file_sort_order) + .with_collect_stat(true) + .with_target_partitions(1); + + let config = ListingTableConfig::new_with_multi_paths(prefixes) + .with_listing_options(listing_options) + .with_schema(self.schema.clone()); + + let listing_table = Arc::new(ListingTable::try_new(config)?); + Ok(Some(listing_table)) + } +} + +#[async_trait::async_trait] +impl TableProvider for StandardTableProvider { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result, DataFusionError> { + let memtable = if include_now(filters) { + event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema) + } else { + None + }; + + let storage = state + .runtime_env() + .object_store_registry + .get_store(&self.url) + .unwrap(); + + let prefixes = self + .collect_scan_from_snapshot( + CONFIG.storage().get_object_store(), + storage, + projection, + filters, + limit, + ) + .await?; + + table_provider::QueryTableProvider::try_new( + memtable, + self.remote_query(prefixes)?, + self.schema(), + )? + .scan(state, projection, filters, limit) + .await + } + + fn supports_filter_pushdown( + &self, + filter: &Expr, + ) -> Result { + if expr_in_boundary(filter) { + // if filter can be handled by time partiton pruning, it is exact + Ok(TableProviderFilterPushDown::Exact) + } else { + // otherwise, we still might be able to handle the filter with file + // level mechanisms such as Parquet row group pruning. 
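            // (Inexact pushdown keeps results correct: DataFusion re-applies such
            // filters on top of the scan, so manifest and row-group pruning stay
            // purely an optimization.)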
+ Ok(TableProviderFilterPushDown::Inexact) + } + } +} + +fn include_now(filters: &[Expr]) -> bool { + let current_minute = Utc::now() + .with_second(0) + .and_then(|x| x.with_nanosecond(0)) + .expect("zeroed value is valid") + .naive_utc(); + + let time_filters = extract_primary_filter(filters); + + let upper_bound_matches = time_filters.iter().any(|filter| match filter { + PartialTimeFilter::High(Bound::Excluded(time)) + | PartialTimeFilter::High(Bound::Included(time)) + | PartialTimeFilter::Eq(time) => time > ¤t_minute, + _ => false, + }); + + if upper_bound_matches { + return true; + } + + // does it even have a higher bound + let has_upper_bound = time_filters + .iter() + .any(|filter| matches!(filter, PartialTimeFilter::High(_))); + + !has_upper_bound +} + +fn expr_in_boundary(filter: &Expr) -> bool { + let Expr::BinaryExpr(binexpr) = filter else { + return false; + }; + let Some((op, time)) = extract_timestamp_bound(binexpr) else { + return false; + }; + + // this is due to knowlege of prefixes being minute long always. + // Without a consistent partition spec this cannot be guarenteed. + time.second() == 0 + && time.nanosecond() == 0 + && matches!( + op, + Operator::Gt | Operator::GtEq | Operator::Lt | Operator::LtEq + ) +} + +fn extract_from_lit(expr: &Expr) -> Option { + if let Expr::Literal(value) = expr { + match value { + ScalarValue::TimestampMillisecond(Some(value), _) => { + Some(NaiveDateTime::from_timestamp_millis(*value).unwrap()) + } + _ => None, + } + } else { + None + } +} + +fn extract_timestamp_bound(binexpr: &BinaryExpr) -> Option<(Operator, NaiveDateTime)> { + if matches!(&*binexpr.left, Expr::Column(Column { name, .. }) if name == DEFAULT_TIMESTAMP_KEY) + { + let time = extract_from_lit(&binexpr.right)?; + Some((binexpr.op, time)) + } else { + None + } +} + +async fn collect_manifest_files( + storage: Arc, + manifest_urls: Vec, +) -> Result, object_store::Error> { + let tasks = manifest_urls.into_iter().map(|path| { + let path = Path::parse(path).unwrap(); + let storage = Arc::clone(&storage); + async move { storage.get(&path).await } + }); + + let resp = FuturesOrdered::from_iter(tasks) + .and_then(|res| res.bytes()) + .collect::>>() + .await; + + Ok(resp + .into_iter() + .flat_map(|res| res.ok()) + .map(|bytes| serde_json::from_slice(&bytes).unwrap()) + .collect()) +} + +// extract start time and end time from filter preficate +fn extract_primary_filter(filters: &[Expr]) -> Vec { + let mut time_filters = Vec::new(); + filters.iter().for_each(|expr| { + let _ = expr.apply(&mut |expr| { + let time = PartialTimeFilter::try_from_expr(expr); + if let Some(time) = time { + time_filters.push(time); + Ok(VisitRecursion::Stop) + } else { + Ok(VisitRecursion::Skip) + } + }); + }); + time_filters +} + +#[derive(Debug)] +pub enum PartialTimeFilter { + Low(Bound), + High(Bound), + Eq(NaiveDateTime), +} + +impl PartialTimeFilter { + fn try_from_expr(expr: &Expr) -> Option { + let Expr::BinaryExpr(binexpr) = expr else { + return None; + }; + let (op, time) = extract_timestamp_bound(binexpr)?; + let value = match op { + Operator::Gt => PartialTimeFilter::Low(Bound::Excluded(time)), + Operator::GtEq => PartialTimeFilter::Low(Bound::Included(time)), + Operator::Lt => PartialTimeFilter::High(Bound::Excluded(time)), + Operator::LtEq => PartialTimeFilter::High(Bound::Included(time)), + Operator::Eq => PartialTimeFilter::Eq(time), + Operator::IsNotDistinctFrom => PartialTimeFilter::Eq(time), + _ => return None, + }; + Some(value) + } + + pub fn binary_expr(&self, left: Expr) -> Expr 
{ + let (op, right) = match self { + PartialTimeFilter::Low(Bound::Excluded(time)) => { + (Operator::Gt, time.timestamp_millis()) + } + PartialTimeFilter::Low(Bound::Included(time)) => { + (Operator::GtEq, time.timestamp_millis()) + } + PartialTimeFilter::High(Bound::Excluded(time)) => { + (Operator::Lt, time.timestamp_millis()) + } + PartialTimeFilter::High(Bound::Included(time)) => { + (Operator::LtEq, time.timestamp_millis()) + } + PartialTimeFilter::Eq(time) => (Operator::Eq, time.timestamp_millis()), + _ => unimplemented!(), + }; + + Expr::BinaryExpr(BinaryExpr::new( + Box::new(left), + op, + Box::new(Expr::Literal(ScalarValue::TimestampMillisecond( + Some(right), + None, + ))), + )) + } +} + +trait ManifestExt: ManifestFile { + fn find_matching_column(&self, partial_filter: &Expr) -> Option<&catalog::column::Column> { + let name = match partial_filter { + Expr::BinaryExpr(binary_expr) => { + let Expr::Column(col) = binary_expr.left.as_ref() else { + return None; + }; + &col.name + } + _ => { + return None; + } + }; + + self.columns().iter().find(|col| &col.name == name) + } + + fn can_be_pruned(&self, partial_filter: &Expr) -> bool { + fn extract_op_scalar(expr: &Expr) -> Option<(Operator, &ScalarValue)> { + let Expr::BinaryExpr(expr) = expr else { + return None; + }; + let Expr::Literal(value) = &*expr.right else { + return None; + }; + Some((expr.op, value)) + } + + let Some(col) = self.find_matching_column(partial_filter) else { + return false; + }; + + let Some((op, value)) = extract_op_scalar(partial_filter) else { + return false; + }; + + let Some(value) = cast_or_none(value) else { + return false; + }; + + let Some(stats) = &col.stats else { + return false; + }; + + !satisfy_constraints(value, op, stats).unwrap_or(true) + } +} + +impl ManifestExt for T {} + +enum CastRes<'a> { + Bool(bool), + Int(i64), + Float(f64), + String(&'a str), +} + +fn cast_or_none(scalar: &ScalarValue) -> Option> { + match scalar { + ScalarValue::Null => None, + ScalarValue::Boolean(val) => val.map(CastRes::Bool), + ScalarValue::Float32(val) => val.map(|val| CastRes::Float(val as f64)), + ScalarValue::Float64(val) => val.map(CastRes::Float), + ScalarValue::Int8(val) => val.map(|val| CastRes::Int(val as i64)), + ScalarValue::Int16(val) => val.map(|val| CastRes::Int(val as i64)), + ScalarValue::Int32(val) => val.map(|val| CastRes::Int(val as i64)), + ScalarValue::Int64(val) => val.map(CastRes::Int), + ScalarValue::UInt8(val) => val.map(|val| CastRes::Int(val as i64)), + ScalarValue::UInt16(val) => val.map(|val| CastRes::Int(val as i64)), + ScalarValue::UInt32(val) => val.map(|val| CastRes::Int(val as i64)), + ScalarValue::UInt64(val) => val.map(|val| CastRes::Int(val as i64)), + ScalarValue::Utf8(val) => val.as_ref().map(|val| CastRes::String(val)), + ScalarValue::TimestampMillisecond(val, _) => val.map(CastRes::Int), + _ => None, + } +} + +fn satisfy_constraints(value: CastRes, op: Operator, stats: &TypedStatistics) -> Option { + fn matches(value: T, min: T, max: T, op: Operator) -> Option { + let val = match op { + Operator::Eq | Operator::IsNotDistinctFrom => value >= min && value <= max, + Operator::NotEq => value < min && value > max, + Operator::Lt => value > min, + Operator::LtEq => value >= min, + Operator::Gt => value < max, + Operator::GtEq => value <= max, + _ => return None, + }; + Some(val) + } + + match (value, stats) { + (CastRes::Bool(val), TypedStatistics::Bool(stats)) => { + matches(val, stats.min, stats.max, op) + } + (CastRes::Int(val), TypedStatistics::Int(stats)) => matches(val, 
stats.min, stats.max, op), + (CastRes::Float(val), TypedStatistics::Float(stats)) => { + matches(val, stats.min, stats.max, op) + } + (CastRes::String(val), TypedStatistics::String(stats)) => { + matches(val, &stats.min, &stats.max, op) + } + _ => None, + } +} diff --git a/server/src/storage.rs b/server/src/storage.rs index 3873b45ba..21f054887 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -16,7 +16,7 @@ * */ -use crate::stats::Stats; +use crate::{catalog::snapshot::Snapshot, stats::Stats}; use chrono::Local; @@ -72,6 +72,8 @@ pub struct ObjectStoreFormat { pub owner: Owner, pub permissions: Vec, pub stats: Stats, + #[serde(default)] + pub snapshot: Snapshot, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -112,6 +114,7 @@ impl Default for ObjectStoreFormat { owner: Owner::new("".to_string(), "".to_string()), permissions: vec![Permisssion::new("parseable".to_string())], stats: Stats::default(), + snapshot: Snapshot::default(), } } } diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 8f0572430..6c163e21f 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -194,14 +194,11 @@ impl ObjectStorage for LocalFS { Ok(()) } - fn normalize_prefixes(&self, prefixes: Vec) -> Vec { - prefixes - .into_iter() - .map(|prefix| { - let path = self.root.join(prefix); - format!("{}", path.display()) - }) - .collect() + fn absolute_url(&self, prefix: &RelativePath) -> object_store::path::Path { + object_store::path::Path::parse( + format!("{}", self.root.join(prefix.as_str()).display()).trim_start_matches('/'), + ) + .unwrap() } fn query_prefixes(&self, prefixes: Vec) -> Vec { diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index e0f771f4e..129de283f 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -23,6 +23,7 @@ use super::{ use crate::{ alerts::Alerts, + catalog::{self, manifest::Manifest, snapshot::Snapshot}, metadata::STREAM_INFO, metrics::{storage::StorageMetrics, STORAGE_SIZE}, option::CONFIG, @@ -51,6 +52,7 @@ pub(super) const STREAM_METADATA_FILE_NAME: &str = ".stream.json"; pub(super) const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json"; const SCHEMA_FILE_NAME: &str = ".schema"; const ALERT_FILE_NAME: &str = ".alert.json"; +const MANIFEST_FILE: &str = "manifest.json"; pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug { fn get_datafusion_runtime(&self) -> RuntimeConfig; @@ -86,8 +88,8 @@ pub trait ObjectStorage: Sync + 'static { start.elapsed() } - fn normalize_prefixes(&self, prefixes: Vec) -> Vec; fn query_prefixes(&self, prefixes: Vec) -> Vec; + fn absolute_url(&self, prefix: &RelativePath) -> object_store::path::Path; fn store_url(&self) -> url::Url; async fn put_schema( @@ -244,7 +246,6 @@ pub trait ObjectStorage: Sync + 'static { async fn stream_exists(&self, stream_name: &str) -> Result { let res = self.get_object(&stream_json_path(stream_name)).await; - match res { Ok(_) => Ok(true), Err(ObjectStorageError::NoSuchKey(_)) => Ok(false), @@ -252,6 +253,53 @@ pub trait ObjectStorage: Sync + 'static { } } + async fn get_manifest( + &self, + path: &RelativePath, + ) -> Result, ObjectStorageError> { + let path = manifest_path(path.as_str()); + match self.get_object(&path).await { + Ok(bytes) => Ok(Some( + serde_json::from_slice(&bytes).expect("manifest is valid json"), + )), + Err(err) => { + if matches!(err, ObjectStorageError::NoSuchKey(_)) { + Ok(None) + } else { + Err(err) + } 
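+                // Note: NoSuchKey above is not fatal; it just means no manifest
+                // exists yet for this partition, and update_snapshot creates one
+                // in that case.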
+ } + } + } + + async fn put_manifest( + &self, + path: &RelativePath, + manifest: Manifest, + ) -> Result<(), ObjectStorageError> { + let path = manifest_path(path.as_str()); + self.put_object(&path, to_bytes(&manifest)).await + } + + async fn get_snapshot(&self, stream: &str) -> Result { + let path = stream_json_path(stream); + let bytes = self.get_object(&path).await?; + Ok(serde_json::from_slice::(&bytes) + .expect("snapshot is valid json") + .snapshot) + } + + async fn put_snapshot( + &self, + stream: &str, + snapshot: Snapshot, + ) -> Result<(), ObjectStorageError> { + let mut stream_meta = self.get_stream_metadata(stream).await?; + stream_meta.snapshot = snapshot; + self.put_object(&stream_json_path(stream), to_bytes(&stream_meta)) + .await + } + async fn sync(&self) -> Result<(), ObjectStorageError> { if !Path::new(&CONFIG.staging_dir()).exists() { return Ok(()); @@ -270,7 +318,17 @@ pub trait ObjectStorage: Sync + 'static { commit_schema_to_storage(stream, schema).await?; } - for file in dir.parquet_files() { + let parquet_files = dir.parquet_files(); + parquet_files.iter().for_each(|file| { + let compressed_size = file.metadata().map_or(0, |meta| meta.len()); + stream_stats + .entry(stream) + .and_modify(|size| *size += compressed_size) + .or_insert_with(|| compressed_size); + }); + + let mut manifest_entries = Vec::new(); + for file in &parquet_files { let filename = file .file_name() .expect("only parquet files are returned by iterator") @@ -278,16 +336,21 @@ pub trait ObjectStorage: Sync + 'static { .expect("filename is valid string"); let file_suffix = str::replacen(filename, ".", "/", 3); let objectstore_path = format!("{stream}/{file_suffix}"); + let manifest = catalog::create_from_parquet_file( + self.absolute_url(RelativePath::from_path(&objectstore_path).unwrap()) + .to_string(), + file, + ) + .unwrap(); + manifest_entries.push(manifest); + + self.upload_file(&objectstore_path, file).await?; + } - let compressed_size = file.metadata().map_or(0, |meta| meta.len()); - - self.upload_file(&objectstore_path, &file).await?; - - stream_stats - .entry(stream) - .and_modify(|size| *size += compressed_size) - .or_insert_with(|| compressed_size); + let store = CONFIG.storage().get_object_store(); + catalog::update_snapshot(store, stream, manifest_entries).await?; + for file in parquet_files { let _ = fs::remove_file(file); } } @@ -344,3 +407,8 @@ fn parseable_json_path() -> RelativePathBuf { fn alert_json_path(stream_name: &str) -> RelativePathBuf { RelativePathBuf::from_iter([stream_name, ALERT_FILE_NAME]) } + +#[inline(always)] +fn manifest_path(prefix: &str) -> RelativePathBuf { + RelativePathBuf::from_iter([prefix, MANIFEST_FILE]) +} diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 1c0ed53a5..79b38426d 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -444,9 +444,8 @@ impl ObjectStorage for S3 { Ok(()) } - // no op on s3 - fn normalize_prefixes(&self, prefixes: Vec) -> Vec { - prefixes + fn absolute_url(&self, prefix: &RelativePath) -> object_store::path::Path { + object_store::path::Path::parse(prefix).unwrap() } fn query_prefixes(&self, prefixes: Vec) -> Vec { diff --git a/server/src/utils.rs b/server/src/utils.rs index 2f04ef419..f5858efd3 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -87,12 +87,14 @@ pub fn minute_to_prefix(minute: u32, data_granularity: u32) -> Option { )) } +#[allow(dead_code)] pub struct TimePeriod { start: DateTime, end: DateTime, data_granularity: u32, } +#[allow(dead_code)] impl TimePeriod { 
pub fn new(start: DateTime, end: DateTime, data_granularity: u32) -> Self { Self { diff --git a/server/src/utils/arrow/reverse_reader.rs b/server/src/utils/arrow/reverse_reader.rs index 82c01c1ba..1e866f79f 100644 --- a/server/src/utils/arrow/reverse_reader.rs +++ b/server/src/utils/arrow/reverse_reader.rs @@ -295,10 +295,10 @@ mod tests { write_message(&mut buf, schema, &options).unwrap(); let buf = Cursor::new(buf); - let mut reader = get_reverse_reader(buf).unwrap().flatten(); + let reader = get_reverse_reader(buf).unwrap().flatten(); let mut sum = 0; - while let Some(rb) = reader.next() { + for rb in reader { sum += 1; assert!(rb.num_rows() > 0); } diff --git a/server/src/validator.rs b/server/src/validator.rs index 260c7e475..d6a150ba0 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -19,13 +19,8 @@ use crate::alerts::rule::base::{NumericRule, StringRule}; use crate::alerts::rule::{ColumnRule, ConsecutiveNumericRule, ConsecutiveStringRule}; use crate::alerts::{Alerts, Rule}; -use crate::metadata::STREAM_INFO; -use crate::query::Query; -use chrono::{DateTime, Utc}; -use self::error::{ - AlertValidationError, QueryValidationError, StreamNameValidationError, UsernameValidationError, -}; +use self::error::{AlertValidationError, StreamNameValidationError, UsernameValidationError}; // Add more sql keywords here in lower case const DENIED_NAMES: &[&str] = &[ @@ -134,78 +129,7 @@ pub fn user_name(username: &str) -> Result<(), UsernameValidationError> { Ok(()) } -pub fn query(query: &str, start_time: &str, end_time: &str) -> Result { - if query.is_empty() { - return Err(QueryValidationError::EmptyQuery); - } - - // convert query to lower case for validation only - // if validation succeeds, we use the original query - // since table names/fields are case sensitive - let query_lower = query.to_lowercase(); - - let tokens = query_lower.split(' ').collect::>(); - if tokens.contains(&"join") { - return Err(QueryValidationError::ContainsJoin(query.to_string())); - } - if tokens.len() < 4 { - return Err(QueryValidationError::IncompleteQuery(query.to_string())); - } - if start_time.is_empty() { - return Err(QueryValidationError::EmptyStartTime); - } - if end_time.is_empty() { - return Err(QueryValidationError::EmptyEndTime); - } - - // log stream name is located after the `from` keyword - let stream_name_index = tokens.iter().position(|&x| x == "from").unwrap() + 1; - // we currently don't support queries like "select name, address from stream1 and stream2" - // so if there is an `and` after the first log stream name, we return an error. - if tokens.len() > stream_name_index + 1 && tokens[stream_name_index + 1] == "and" { - return Err(QueryValidationError::MultipleStreams(query.to_string())); - } - - let start: DateTime; - let end: DateTime; - - if end_time == "now" { - end = Utc::now(); - start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?; - } else { - start = DateTime::parse_from_rfc3339(start_time) - .map_err(|_| QueryValidationError::StartTimeParse)? - .into(); - end = DateTime::parse_from_rfc3339(end_time) - .map_err(|_| QueryValidationError::EndTimeParse)? - .into(); - }; - - if start.timestamp() > end.timestamp() { - return Err(QueryValidationError::StartTimeAfterEndTime); - } - - let stream_name = tokens[stream_name_index].to_string(); - - if !STREAM_INFO.stream_initialized(&stream_name)? 
{ - return Err(QueryValidationError::UninitializedStream); - } - - let schema = STREAM_INFO.schema(&stream_name)?; - - Ok(Query { - stream_name: tokens[stream_name_index].to_string(), - start, - end, - query: query.to_string(), - schema, - filter_tag: None, - fill_null: false, - }) -} - pub mod error { - use crate::metadata::error::stream_info::MetadataError; #[derive(Debug, thiserror::Error)] pub enum AlertValidationError { @@ -221,36 +145,6 @@ pub mod error { NoTarget, } - #[derive(Debug, thiserror::Error)] - pub enum QueryValidationError { - #[error("Query cannot be empty")] - EmptyQuery, - #[error("Start time cannot be empty")] - EmptyStartTime, - #[error("End time cannot be empty")] - EmptyEndTime, - #[error("Could not parse start time correctly")] - StartTimeParse, - #[error("Could not parse end time correctly")] - EndTimeParse, - #[error("While generating times for 'now' failed to parse duration")] - NotValidDuration(#[from] humantime::DurationError), - #[error("Parsed duration out of range")] - OutOfRange(#[from] chrono::OutOfRangeError), - #[error("Start time cannot be greater than the end time")] - StartTimeAfterEndTime, - #[error("Stream is not initialized yet. Post an event first.")] - UninitializedStream, - #[error("Query {0} is incomplete")] - IncompleteQuery(String), - #[error("Query contains join keyword which is not supported yet.")] - ContainsJoin(String), - #[error("Querying multiple streams is not supported yet")] - MultipleStreams(String), - #[error("Metadata Error: {0}")] - Metadata(#[from] MetadataError), - } - #[derive(Debug, thiserror::Error)] pub enum StreamNameValidationError { #[error("Stream name cannot be empty")] From f33a186a9693181207847807dbdf5cd37ab0d2f4 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 4 Dec 2023 16:48:18 +0530 Subject: [PATCH 2/6] Add docs --- server/src/catalog/column.rs | 5 +++++ server/src/catalog/manifest.rs | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/server/src/catalog/column.rs b/server/src/catalog/column.rs index 305d7492f..216f0d7a8 100644 --- a/server/src/catalog/column.rs +++ b/server/src/catalog/column.rs @@ -44,6 +44,9 @@ pub struct Utf8Type { pub max: String, } +// Typed statistics are typed variant of statistics +// Currently all parquet types are casted down to these 4 types +// Binary types are assumed to be of valid Utf8 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum TypedStatistics { Bool(BoolType), @@ -84,6 +87,8 @@ impl TypedStatistics { } } +// Column statistics are used to track statistics for a column in a given file +// This is similar to and derived from parquet statistics. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct Column { pub name: String, diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs index 6dda79281..b6435507b 100644 --- a/server/src/catalog/manifest.rs +++ b/server/src/catalog/manifest.rs @@ -44,6 +44,9 @@ pub enum SortOrder { pub type SortInfo = (String, SortOrder); +// File is one entry in a manifest which points to a single file. +// Additionally it is meant to store the statistics for the file it +// points to, this is used for pruning file at planning level. #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] pub struct File { pub file_path: String, @@ -54,6 +57,7 @@ pub struct File { pub sort_order_id: Vec, } +// A manifest file composes of multiple file entries. 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct Manifest { pub version: String, From ddf8b5a94144ba6a12cd6998f0842a54a6a3886d Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 4 Dec 2023 16:54:09 +0530 Subject: [PATCH 3/6] Adjust for loss of precision Update server/src/catalog.rs Co-authored-by: Nick Signed-off-by: Satyam Singh Update server/src/catalog/manifest.rs Co-authored-by: Nick Signed-off-by: Satyam Singh Update server/src/catalog/column.rs Co-authored-by: Nick Signed-off-by: Satyam Singh Update server/src/handlers/http/query.rs Co-authored-by: Nick Signed-off-by: Satyam Singh Update server/src/catalog/manifest.rs Co-authored-by: Nick Signed-off-by: Satyam Singh --- server/src/catalog.rs | 4 ++-- server/src/catalog/column.rs | 4 ++-- server/src/catalog/manifest.rs | 8 ++++---- server/src/handlers/http/query.rs | 3 +-- server/src/query.rs | 11 +++++++---- 5 files changed, 16 insertions(+), 14 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index b6c6ec99a..a4f8050b4 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -170,8 +170,8 @@ pub async fn update_snapshot( storage.put_snapshot(stream_name, meta).await } -// partition path to which this manifest belongs. -// is useful when uploading manifest file +/// Partition the path to which this manifest belongs. +/// Useful when uploading the manifest file. fn partition_path( stream: &str, lower_bound: DateTime, diff --git a/server/src/catalog/column.rs b/server/src/catalog/column.rs index 216f0d7a8..17361609a 100644 --- a/server/src/catalog/column.rs +++ b/server/src/catalog/column.rs @@ -87,8 +87,8 @@ impl TypedStatistics { } } -// Column statistics are used to track statistics for a column in a given file -// This is similar to and derived from parquet statistics. +/// Column statistics are used to track statistics for a column in a given file. +/// This is similar to and derived from parquet statistics. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct Column { pub name: String, diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs index b6435507b..68f249533 100644 --- a/server/src/catalog/manifest.rs +++ b/server/src/catalog/manifest.rs @@ -44,9 +44,9 @@ pub enum SortOrder { pub type SortInfo = (String, SortOrder); -// File is one entry in a manifest which points to a single file. -// Additionally it is meant to store the statistics for the file it -// points to, this is used for pruning file at planning level. +/// An entry in a manifest which points to a single file. +/// Additionally, it is meant to store the statistics for the file it +/// points to. Used for pruning file at planning level. #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] pub struct File { pub file_path: String, @@ -57,7 +57,7 @@ pub struct File { pub sort_order_id: Vec, } -// A manifest file composes of multiple file entries. +/// A manifest file composed of multiple file entries. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct Manifest { pub version: String, diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index f6dc3a20b..39de3ddee 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -36,8 +36,7 @@ use crate::rbac::Users; use crate::response::QueryResponse; use crate::utils::actix::extract_session_key_from_req; -// Query Request thorugh http endpoint. -// Implememts FromRequest +/// Query Request through http endpoint. 
#[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct Query { diff --git a/server/src/query.rs b/server/src/query.rs index 115e34445..3bdd8a718 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -34,6 +34,7 @@ use datafusion::prelude::*; use itertools::Itertools; use once_cell::sync::Lazy; use std::collections::HashMap; +use std::ops::Add; use std::path::{Path, PathBuf}; use std::sync::Arc; use sysinfo::{System, SystemExt}; @@ -173,10 +174,12 @@ impl Query { PartialTimeFilter::Low(std::ops::Bound::Included(self.start.naive_utc())).binary_expr( Expr::Column(Column::from_name(event::DEFAULT_TIMESTAMP_KEY)), ); - let end_time_filter = - PartialTimeFilter::High(std::ops::Bound::Excluded(self.end.naive_utc())).binary_expr( - Expr::Column(Column::from_name(event::DEFAULT_TIMESTAMP_KEY)), - ); + let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded( + self.end.add(chrono::Duration::milliseconds(1)).naive_utc(), + )) + .binary_expr(Expr::Column(Column::from_name( + event::DEFAULT_TIMESTAMP_KEY, + ))); // see https://github.com/apache/arrow-datafusion/pull/8400 // this can be eliminated in later version of datafusion but with slight caveat From 7b229b680ab07b10426b7474f122cdae952cba95 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 5 Dec 2023 15:54:10 +0530 Subject: [PATCH 4/6] Add back old query method --- server/src/query.rs | 2 - server/src/query/stream_schema_provider.rs | 299 ++++++++++++++++----- server/src/utils.rs | 1 - 3 files changed, 235 insertions(+), 67 deletions(-) diff --git a/server/src/query.rs b/server/src/query.rs index 3bdd8a718..cc1e6ef54 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -103,8 +103,6 @@ impl Query { SessionContext::new_with_state(state) } - /// Execute query on object storage(and if necessary on cache as well) with given stream information - /// TODO: find a way to query all selected parquet files together in a single context. 
pub async fn execute(&self) -> Result<(Vec, Vec), ExecuteError> { let df = QUERY_SESSION .execute_logical_plan(self.final_logical_plan()) diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 254dc2f18..60b26d058 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -16,7 +16,7 @@ * */ -use std::{any::Any, ops::Bound, sync::Arc}; +use std::{any::Any, collections::HashMap, ops::Bound, pin::Pin, sync::Arc}; use arrow_schema::SchemaRef; use bytes::Bytes; @@ -36,9 +36,13 @@ use datafusion::{ prelude::{col, Column, Expr}, scalar::ScalarValue, }; -use futures_util::{stream::FuturesOrdered, StreamExt, TryStreamExt}; +use futures_util::{ + future, + stream::{FuturesOrdered, FuturesUnordered}, + Future, StreamExt, TryStreamExt, +}; use itertools::Itertools; -use object_store::{path::Path, ObjectStore}; +use object_store::{path::Path, ObjectMeta, ObjectStore}; use url::Url; use crate::{ @@ -46,7 +50,8 @@ use crate::{ event::{self, DEFAULT_TIMESTAMP_KEY}, metadata::STREAM_INFO, option::CONFIG, - storage::ObjectStorage, + storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY}, + utils::TimePeriod, }; use super::table_provider; @@ -93,7 +98,7 @@ struct StandardTableProvider { } impl StandardTableProvider { - async fn collect_scan_from_snapshot( + async fn collect_remote_scan( &self, glob_storage: Arc, object_store: Arc, @@ -102,17 +107,28 @@ impl StandardTableProvider { limit: Option, ) -> Result, DataFusionError> { let time_filters = extract_primary_filter(filters); - let storage = glob_storage; - // todo can remove if time_filters.is_empty() { return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string())); } - let snapshot = storage + // Fetch snapshot + let snapshot = glob_storage .get_snapshot(&self.stream) .await .map_err(|err| DataFusionError::Plan(err.to_string()))?; + // Is query timerange is overlapping with older data. 
+        if is_overlapping_query(&snapshot, &time_filters) {
+            return listing_scan(
+                self.stream.clone(),
+                glob_storage.clone(),
+                object_store,
+                &time_filters,
+            )
+            .await
+            .map(|prefixes| glob_storage.query_prefixes(prefixes));
+        }
+
         let items = snapshot.manifests(time_filters);
         let manifest_files = collect_manifest_files(
             object_store,
@@ -154,7 +170,7 @@ impl StandardTableProvider {
 
         Ok(manifest_files
             .iter()
-            .map(|x| storage.store_url().join(&x.file_path).unwrap())
+            .map(|x| glob_storage.store_url().join(&x.file_path).unwrap())
             .map(|x| ListingTableUrl::parse(x).unwrap())
             .collect())
     }
@@ -218,7 +234,7 @@ impl TableProvider for StandardTableProvider {
             .unwrap();
 
         let prefixes = self
-            .collect_scan_from_snapshot(
+            .collect_remote_scan(
                 CONFIG.storage().get_object_store(),
                 storage,
                 projection,
@@ -251,6 +267,215 @@ impl TableProvider for StandardTableProvider {
     }
 }
 
+#[derive(Debug)]
+pub enum PartialTimeFilter {
+    Low(Bound<NaiveDateTime>),
+    High(Bound<NaiveDateTime>),
+    Eq(NaiveDateTime),
+}
+
+impl PartialTimeFilter {
+    fn try_from_expr(expr: &Expr) -> Option<Self> {
+        let Expr::BinaryExpr(binexpr) = expr else {
+            return None;
+        };
+        let (op, time) = extract_timestamp_bound(binexpr)?;
+        let value = match op {
+            Operator::Gt => PartialTimeFilter::Low(Bound::Excluded(time)),
+            Operator::GtEq => PartialTimeFilter::Low(Bound::Included(time)),
+            Operator::Lt => PartialTimeFilter::High(Bound::Excluded(time)),
+            Operator::LtEq => PartialTimeFilter::High(Bound::Included(time)),
+            Operator::Eq => PartialTimeFilter::Eq(time),
+            Operator::IsNotDistinctFrom => PartialTimeFilter::Eq(time),
+            _ => return None,
+        };
+        Some(value)
+    }
+
+    pub fn binary_expr(&self, left: Expr) -> Expr {
+        let (op, right) = match self {
+            PartialTimeFilter::Low(Bound::Excluded(time)) => {
+                (Operator::Gt, time.timestamp_millis())
+            }
+            PartialTimeFilter::Low(Bound::Included(time)) => {
+                (Operator::GtEq, time.timestamp_millis())
+            }
+            PartialTimeFilter::High(Bound::Excluded(time)) => {
+                (Operator::Lt, time.timestamp_millis())
+            }
+            PartialTimeFilter::High(Bound::Included(time)) => {
+                (Operator::LtEq, time.timestamp_millis())
+            }
+            PartialTimeFilter::Eq(time) => (Operator::Eq, time.timestamp_millis()),
+            _ => unimplemented!(),
+        };
+
+        Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(left),
+            op,
+            Box::new(Expr::Literal(ScalarValue::TimestampMillisecond(
+                Some(right),
+                None,
+            ))),
+        ))
+    }
+
+    fn is_greater_than(&self, other: &NaiveDateTime) -> bool {
+        match self {
+            PartialTimeFilter::Low(Bound::Excluded(time)) => time >= other,
+            PartialTimeFilter::Low(Bound::Included(time))
+            | PartialTimeFilter::High(Bound::Excluded(time))
+            | PartialTimeFilter::High(Bound::Included(time)) => time > other,
+            PartialTimeFilter::Eq(time) => time > other,
+            _ => unimplemented!(),
+        }
+    }
+}
+
+// Resolves the given time filters into relative path prefixes and lists the object store under them.
+// Returns the matching object paths sorted in descending order.
+async fn listing_scan(
+    stream: String,
+    storage: Arc<dyn ObjectStorage>,
+    client: Arc<dyn ObjectStore>,
+    time_filters: &[PartialTimeFilter],
+) -> Result<Vec<String>, DataFusionError> {
+    let start_time = time_filters
+        .iter()
+        .filter_map(|x| match x {
+            PartialTimeFilter::Low(Bound::Excluded(x)) => Some(x),
+            PartialTimeFilter::Low(Bound::Included(x)) => Some(x),
+            _ => None,
+        })
+        .min()
+        .cloned();
+
+    let end_time = time_filters
+        .iter()
+        .filter_map(|x| match x {
+            PartialTimeFilter::High(Bound::Excluded(x)) => Some(x),
+            PartialTimeFilter::High(Bound::Included(x)) => Some(x),
+            _ => None,
+        })
+        .max()
+        .cloned();
+
+    let Some((start_time, end_time)) = start_time.zip(end_time) else {
+        return Err(DataFusionError::NotImplemented(
+            "The time predicate is not supported because of possibly querying older data."
+                .to_string(),
+        ));
+    };
+
+    let prefixes = TimePeriod::new(
+        start_time.and_utc(),
+        end_time.and_utc(),
+        OBJECT_STORE_DATA_GRANULARITY,
+    )
+    .generate_prefixes();
+
+    let prefixes = prefixes
+        .into_iter()
+        .map(|entry| {
+            let path = relative_path::RelativePathBuf::from(format!("{}/{}", stream, entry));
+            storage.absolute_url(path.as_relative_path()).to_string()
+        })
+        .collect_vec();
+
+    let mut minute_resolve: HashMap<String, Vec<String>> = HashMap::new();
+    let mut all_resolve = Vec::new();
+
+    for prefix in prefixes {
+        let components = prefix.split_terminator('/');
+        if components.last().is_some_and(|x| x.starts_with("minute")) {
+            let hour_prefix = &prefix[0..prefix.rfind("minute").expect("minute exists")];
+            minute_resolve
+                .entry(hour_prefix.to_owned())
+                .and_modify(|list| list.push(prefix))
+                .or_default();
+        } else {
+            all_resolve.push(prefix)
+        }
+    }
+
+    type ResolveFuture = Pin<
+        Box<dyn Future<Output = Result<Vec<ObjectMeta>, object_store::Error>> + Send + 'static>,
+    >;
+
+    let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();
+
+    for (listing_prefix, prefix) in minute_resolve {
+        let client = Arc::clone(&client);
+        tasks.push(Box::pin(async move {
+            let mut list = client
+                .list(Some(&object_store::path::Path::from(listing_prefix)))
+                .await?
+                .try_collect::<Vec<_>>()
+                .await?;
+
+            list.retain(|object| {
+                prefix.iter().any(|prefix| {
+                    object
+                        .location
+                        .prefix_matches(&object_store::path::Path::from(prefix.as_ref()))
+                })
+            });
+
+            Ok(list)
+        }));
+    }
+
+    for prefix in all_resolve {
+        let client = Arc::clone(&client);
+        tasks.push(Box::pin(async move {
+            client
+                .list(Some(&object_store::path::Path::from(prefix)))
+                .await?
+                .try_collect::<Vec<_>>()
+                .await
+                .map_err(Into::into)
+        }));
+    }
+
+    let res: Vec<Vec<String>> = tasks
+        .and_then(|res| {
+            future::ok(
+                res.into_iter()
+                    .map(|res| res.location.to_string())
+                    .collect_vec(),
+            )
+        })
+        .try_collect()
+        .await?;
+
+    let mut res = res.into_iter().flatten().collect_vec();
+    res.sort();
+    res.reverse();
+    Ok(res)
+}
+
+fn is_overlapping_query(
+    snapshot: &catalog::snapshot::Snapshot,
+    time_filters: &[PartialTimeFilter],
+) -> bool {
+    // This is for backwards compatibility. Older table format relies on listing.
+    // If time is lower than the 2nd smallest time bound, we fall back to the old listing table code for now.
+ let Some(second_lowest) = snapshot + .manifest_list + .iter() + .map(|file| file.time_lower_bound) + .k_smallest(2) + .nth(1) + else { + return true; + }; + + // Query is overlapping when no lower bound exists such that it is greater than second lowest time in snapshot + !time_filters + .iter() + .all(|filter| filter.is_greater_than(&second_lowest.naive_utc())) +} + fn include_now(filters: &[Expr]) -> bool { let current_minute = Utc::now() .with_second(0) @@ -359,60 +584,6 @@ fn extract_primary_filter(filters: &[Expr]) -> Vec { time_filters } -#[derive(Debug)] -pub enum PartialTimeFilter { - Low(Bound), - High(Bound), - Eq(NaiveDateTime), -} - -impl PartialTimeFilter { - fn try_from_expr(expr: &Expr) -> Option { - let Expr::BinaryExpr(binexpr) = expr else { - return None; - }; - let (op, time) = extract_timestamp_bound(binexpr)?; - let value = match op { - Operator::Gt => PartialTimeFilter::Low(Bound::Excluded(time)), - Operator::GtEq => PartialTimeFilter::Low(Bound::Included(time)), - Operator::Lt => PartialTimeFilter::High(Bound::Excluded(time)), - Operator::LtEq => PartialTimeFilter::High(Bound::Included(time)), - Operator::Eq => PartialTimeFilter::Eq(time), - Operator::IsNotDistinctFrom => PartialTimeFilter::Eq(time), - _ => return None, - }; - Some(value) - } - - pub fn binary_expr(&self, left: Expr) -> Expr { - let (op, right) = match self { - PartialTimeFilter::Low(Bound::Excluded(time)) => { - (Operator::Gt, time.timestamp_millis()) - } - PartialTimeFilter::Low(Bound::Included(time)) => { - (Operator::GtEq, time.timestamp_millis()) - } - PartialTimeFilter::High(Bound::Excluded(time)) => { - (Operator::Lt, time.timestamp_millis()) - } - PartialTimeFilter::High(Bound::Included(time)) => { - (Operator::LtEq, time.timestamp_millis()) - } - PartialTimeFilter::Eq(time) => (Operator::Eq, time.timestamp_millis()), - _ => unimplemented!(), - }; - - Expr::BinaryExpr(BinaryExpr::new( - Box::new(left), - op, - Box::new(Expr::Literal(ScalarValue::TimestampMillisecond( - Some(right), - None, - ))), - )) - } -} - trait ManifestExt: ManifestFile { fn find_matching_column(&self, partial_filter: &Expr) -> Option<&catalog::column::Column> { let name = match partial_filter { diff --git a/server/src/utils.rs b/server/src/utils.rs index f5858efd3..e6164a705 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -87,7 +87,6 @@ pub fn minute_to_prefix(minute: u32, data_granularity: u32) -> Option { )) } -#[allow(dead_code)] pub struct TimePeriod { start: DateTime, end: DateTime, From 56f7da6232a3c2388c3c941aad74157084dab9d7 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 6 Dec 2023 15:43:33 +0530 Subject: [PATCH 5/6] Apply one change at a time --- server/src/catalog.rs | 58 +++++++++++----------------- server/src/catalog/manifest.rs | 20 +++++----- server/src/storage/object_storage.rs | 8 +--- 3 files changed, 33 insertions(+), 53 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index a4f8050b4..91357eb29 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -16,7 +16,7 @@ * */ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc}; use relative_path::RelativePathBuf; @@ -74,7 +74,7 @@ impl ManifestFile for manifest::File { pub async fn update_snapshot( storage: Arc, stream_name: &str, - changes: Vec, + change: manifest::File, ) -> Result<(), ObjectStorageError> { fn get_file_bounds(file: &manifest::File) -> (DateTime, DateTime) { match file @@ -102,39 +102,26 @@ pub async fn 
update_snapshot(
     let mut meta = storage.get_snapshot(stream_name).await?;
     let manifests = &mut meta.manifest_list;
 
-    let mut change_map: HashMap<Option<usize>, Vec<manifest::File>> = HashMap::new();
+    let (lower_bound, _) = get_file_bounds(&change);
+    let pos = manifests.iter().position(|item| {
+        item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
+    });
 
-    for change in changes {
-        let (change_lower_bound, _) = get_file_bounds(&change);
-
-        let pos = manifests.iter().position(|item| {
-            item.time_lower_bound <= change_lower_bound
-                && change_lower_bound < item.time_upper_bound
-        });
-        change_map.entry(pos).or_default().push(change);
-    }
-
-    let mut new_entries = Vec::new();
-    for (pos, changes) in change_map {
-        let Some(pos) = pos else {
-            new_entries.extend(changes);
-            continue;
-        };
+    // We update the manifest referenced by this position.
+    // This updates an existing manifest, so there is no need to create a new snapshot entry.
+    if let Some(pos) = pos {
         let info = &mut manifests[pos];
         let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
         let Some(mut manifest) = storage.get_manifest(&path).await? else {
-            new_entries.extend(changes);
-            continue;
+            return Err(ObjectStorageError::UnhandledError(
+                "Manifest found in snapshot but not in object-storage"
+                    .to_string()
+                    .into(),
+            ));
         };
-
-        manifest.apply_change(changes);
+        manifest.apply_change(change);
         storage.put_manifest(&path, manifest).await?;
-    }
-
-    let mut new_snapshot_entries = Vec::new();
-
-    for entry in new_entries {
-        let (lower_bound, _) = get_file_bounds(&entry);
+    } else {
         let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
         let upper_bound = lower_bound
             .date_naive()
@@ -148,7 +135,7 @@ pub async fn update_snapshot(
             .and_utc();
 
         let manifest = Manifest {
-            files: vec![entry],
+            files: vec![change],
             ..Manifest::default()
         };
 
@@ -156,18 +143,17 @@ pub async fn update_snapshot(
         storage
            .put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
            .await?;
-        let path = storage.absolute_url(&path);
 
-        new_snapshot_entries.push(snapshot::ManifestItem {
+        let path = storage.absolute_url(&path);
+        let new_snapshot_entry = snapshot::ManifestItem {
             manifest_path: path.to_string(),
             time_lower_bound: lower_bound,
             time_upper_bound: upper_bound,
-        })
+        };
+        manifests.push(new_snapshot_entry);
+        storage.put_snapshot(stream_name, meta).await?;
     }
 
-    manifests.extend(new_snapshot_entries);
-
-    storage.put_snapshot(stream_name, meta).await
+    Ok(())
 }
 
 /// Partition the path to which this manifest belongs.
diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs index 68f249533..f0696fc89 100644 --- a/server/src/catalog/manifest.rs +++ b/server/src/catalog/manifest.rs @@ -74,17 +74,15 @@ impl Default for Manifest { } impl Manifest { - pub fn apply_change(&mut self, changes: Vec) { - for change in changes { - if let Some(pos) = self - .files - .iter() - .position(|file| file.file_path == change.file_path) - { - self.files[pos] = change - } else { - self.files.push(change) - } + pub fn apply_change(&mut self, change: File) { + if let Some(pos) = self + .files + .iter() + .position(|file| file.file_path == change.file_path) + { + self.files[pos] = change + } else { + self.files.push(change) } } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 129de283f..57fbdd3f3 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -327,7 +327,6 @@ pub trait ObjectStorage: Sync + 'static { .or_insert_with(|| compressed_size); }); - let mut manifest_entries = Vec::new(); for file in &parquet_files { let filename = file .file_name() @@ -342,14 +341,11 @@ pub trait ObjectStorage: Sync + 'static { file, ) .unwrap(); - manifest_entries.push(manifest); - self.upload_file(&objectstore_path, file).await?; + let store = CONFIG.storage().get_object_store(); + catalog::update_snapshot(store, stream, manifest).await?; } - let store = CONFIG.storage().get_object_store(); - catalog::update_snapshot(store, stream, manifest_entries).await?; - for file in parquet_files { let _ = fs::remove_file(file); } From 6a261af7708d244f3b240e28954193a7a7d60a00 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 6 Dec 2023 16:03:38 +0530 Subject: [PATCH 6/6] Remove id --- server/src/catalog.rs | 2 -- server/src/catalog/snapshot.rs | 7 ------- 2 files changed, 9 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 91357eb29..f07179908 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc}; use relative_path::RelativePathBuf; -use ulid::Ulid; use crate::{ catalog::manifest::Manifest, @@ -37,7 +36,6 @@ pub mod snapshot; pub use manifest::create_from_parquet_file; pub trait Snapshot { - fn id(&self) -> Ulid; fn manifests(&self, time_predicates: Vec) -> Vec; } diff --git a/server/src/catalog/snapshot.rs b/server/src/catalog/snapshot.rs index cb2ad1d36..4c0877207 100644 --- a/server/src/catalog/snapshot.rs +++ b/server/src/catalog/snapshot.rs @@ -19,14 +19,12 @@ use std::ops::Bound; use chrono::{DateTime, Utc}; -use ulid::Ulid; use crate::query::PartialTimeFilter; #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Snapshot { pub version: String, - pub id: Ulid, pub manifest_list: Vec, } @@ -34,17 +32,12 @@ impl Default for Snapshot { fn default() -> Self { Self { version: "v1".to_string(), - id: Ulid::new(), manifest_list: Vec::default(), } } } impl super::Snapshot for Snapshot { - fn id(&self) -> Ulid { - self.id - } - fn manifests(&self, time_predicates: Vec) -> Vec { let mut manifests = self.manifest_list.clone(); for predicate in time_predicates {