From afa01613bb23e3c5de1f37ab571ea078db710d16 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Wed, 19 Feb 2025 16:09:49 +0530
Subject: [PATCH 1/6] feat: query parquet files still in staging

---
 src/query/stream_schema_provider.rs | 101 ++++++++++++++++++++--------
 1 file changed, 72 insertions(+), 29 deletions(-)

diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index 81a1bd85c..4deb2d6c1 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -16,25 +16,20 @@
  *
  */
-use crate::catalog::manifest::File;
-use crate::hottier::HotTierManager;
-use crate::option::Mode;
-use crate::parseable::STREAM_EXISTS;
-use crate::{
-    catalog::snapshot::{self, Snapshot},
-    storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
-};
+use std::any::Any;
+use std::collections::HashMap;
+use std::ops::Bound;
+use std::os::unix::fs::MetadataExt;
+use std::sync::Arc;
+
 use arrow_array::RecordBatch;
 use arrow_schema::{Schema, SchemaRef, SortOptions};
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, TimeDelta, Timelike, Utc};
-use datafusion::catalog::Session;
-use datafusion::common::stats::Precision;
-use datafusion::logical_expr::utils::conjunction;
-use datafusion::physical_expr::LexOrdering;
 use datafusion::{
-    catalog::SchemaProvider,
+    catalog::{SchemaProvider, Session},
     common::{
+        stats::Precision,
         tree_node::{TreeNode, TreeNodeRecursion},
         ToDFSchema,
     },
@@ -46,28 +41,32 @@ use datafusion::{
     },
     error::{DataFusionError, Result as DataFusionResult},
     execution::{context::SessionState, object_store::ObjectStoreUrl},
-    logical_expr::{BinaryExpr, Operator, TableProviderFilterPushDown, TableType},
-    physical_expr::{create_physical_expr, PhysicalSortExpr},
+    logical_expr::{
+        utils::conjunction, BinaryExpr, Operator, TableProviderFilterPushDown, TableType,
+    },
+    physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr},
     physical_plan::{self, empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics},
     prelude::Expr,
     scalar::ScalarValue,
 };
-
 use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::{path::Path, ObjectStore};
 use relative_path::RelativePathBuf;
-use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
 use url::Url;

 use crate::{
     catalog::{
-        self, column::TypedStatistics, manifest::Manifest, snapshot::ManifestItem, ManifestFile,
+        self, column::TypedStatistics, manifest::File, manifest::Manifest, snapshot::ManifestItem,
+        snapshot::Snapshot, ManifestFile,
     },
     event::DEFAULT_TIMESTAMP_KEY,
+    hottier::HotTierManager,
     metrics::QUERY_CACHE_HIT,
+    option::Mode,
     parseable::PARSEABLE,
-    storage::ObjectStorage,
+    parseable::STREAM_EXISTS,
+    storage::{ObjectStorage, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
 };

 use super::listing_table_builder::ListingTableBuilder;
@@ -223,6 +222,50 @@ impl StandardTableProvider {
         Ok(())
     }

+    async fn get_staging_execution_plan(
+        &self,
+        execution_plans: &mut Vec<Arc<dyn ExecutionPlan>>,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+        state: &dyn Session,
+        time_partition: Option<&String>,
+    ) -> Result<(), DataFusionError> {
+        let Ok(staging) = PARSEABLE.get_stream(&self.stream) else {
+            return Ok(());
+        };
+        let records = staging.recordbatches_cloned(&self.schema);
+        let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?;
+
+        let memory_exec = reversed_mem_table
+            .scan(state, projection, filters, limit)
+            .await?;
+        execution_plans.push(memory_exec);
+
+        let target_partition = num_cpus::get();
+        let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
+        for (index, file_path) in staging.parquet_files().into_iter().enumerate() {
+            let Ok(file_meta) = file_path.metadata() else {
+                continue;
+            };
+            let file = PartitionedFile::new(file_path.display().to_string(), file_meta.size());
+            partitioned_files[index % target_partition].push(file)
+        }
+
+        self.create_parquet_physical_plan(
+            execution_plans,
+            ObjectStoreUrl::parse("file:///").unwrap(),
+            partitioned_files,
+            Statistics::new_unknown(&self.schema),
+            projection,
+            filters,
+            limit,
+            state,
+            time_partition.cloned(),
+        )
+        .await
+    }
+
     #[allow(clippy::too_many_arguments)]
     async fn legacy_listing_table(
         &self,
@@ -443,17 +486,17 @@ impl TableProvider for StandardTableProvider {
         }

         if is_within_staging_window(&time_filters) {
-            if let Ok(staging) = PARSEABLE.get_stream(&self.stream) {
-                let records = staging.recordbatches_cloned(&self.schema);
-                let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?;
-
-                let memory_exec = reversed_mem_table
-                    .scan(state, projection, filters, limit)
-                    .await?;
-                execution_plans.push(memory_exec);
-            }
+            self.get_staging_execution_plan(
+                &mut execution_plans,
+                projection,
+                filters,
+                limit,
+                state,
+                time_partition.as_ref(),
+            )
+            .await?;
         };
-        let mut merged_snapshot: snapshot::Snapshot = Snapshot::default();
+        let mut merged_snapshot = Snapshot::default();
         if PARSEABLE.options.mode == Mode::Query {
             let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]);
             let obs = glob_storage

From a0d2c8eceabeae526eae5ad85099f6bb4e4a73a9 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Wed, 19 Feb 2025 16:48:07 +0530
Subject: [PATCH 2/6] doc: explain the code

---
 src/query/stream_schema_provider.rs | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index 4deb2d6c1..d70a43fec 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -234,14 +234,15 @@ impl StandardTableProvider {
         let Ok(staging) = PARSEABLE.get_stream(&self.stream) else {
             return Ok(());
         };
-        let records = staging.recordbatches_cloned(&self.schema);
-        let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?;

-        let memory_exec = reversed_mem_table
+        // Staging arrow exection plan
+        let records = staging.recordbatches_cloned(&self.schema);
+        let arrow_exec = reversed_mem_table(records, self.schema.clone())?
             .scan(state, projection, filters, limit)
             .await?;
-        execution_plans.push(memory_exec);
+        execution_plans.push(arrow_exec);

+        // Partititon parquet files on disk among the available CPUs
         let target_partition = num_cpus::get();
         let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
         for (index, file_path) in staging.parquet_files().into_iter().enumerate() {
@@ -252,6 +253,9 @@ impl StandardTableProvider {
             partitioned_files[index % target_partition].push(file)
         }

+        // NOTE: There is the possibility of a parquet file being pushed to object store
+        // and deleted from staging in the time it takes for datafusion to get to it.
+        // Staging parquet execution plan
         self.create_parquet_physical_plan(
             execution_plans,
             ObjectStoreUrl::parse("file:///").unwrap(),

From 5e1cef9b7523573673121d0ae123d0b73fee8840 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Wed, 19 Feb 2025 17:06:28 +0530
Subject: [PATCH 3/6] fix: non-unix compilation

---
 src/query/stream_schema_provider.rs | 39 +++++++++++++----------------
 1 file changed, 17 insertions(+), 22 deletions(-)

diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index d70a43fec..dfaaec345 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -16,11 +16,7 @@
  *
  */
-use std::any::Any;
-use std::collections::HashMap;
-use std::ops::Bound;
-use std::os::unix::fs::MetadataExt;
-use std::sync::Arc;
+use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};

 use arrow_array::RecordBatch;
 use arrow_schema::{Schema, SchemaRef, SortOptions};
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, TimeDelta, Timelike, Utc};
@@ -44,8 +40,8 @@ use datafusion::{
     logical_expr::{
         utils::conjunction, BinaryExpr, Operator, TableProviderFilterPushDown, TableType,
     },
-    physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr},
-    physical_plan::{self, empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics},
+    physical_expr::{create_physical_expr, expressions::col, LexOrdering, PhysicalSortExpr},
+    physical_plan::{empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics},
     prelude::Expr,
     scalar::ScalarValue,
 };
@@ -57,20 +53,20 @@ use crate::{
     catalog::{
-        self, column::TypedStatistics, manifest::File, manifest::Manifest, snapshot::ManifestItem,
-        snapshot::Snapshot, ManifestFile,
+        column::{Column, TypedStatistics},
+        manifest::{File, Manifest},
+        snapshot::{ManifestItem, Snapshot},
+        ManifestFile, Snapshot as CatalogSnapshot,
     },
     event::DEFAULT_TIMESTAMP_KEY,
     hottier::HotTierManager,
     metrics::QUERY_CACHE_HIT,
     option::Mode,
-    parseable::PARSEABLE,
-    parseable::STREAM_EXISTS,
+    parseable::{PARSEABLE, STREAM_EXISTS},
     storage::{ObjectStorage, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
 };

 use super::listing_table_builder::ListingTableBuilder;
-use crate::catalog::Snapshot as CatalogSnapshot;

 // schema provider for stream based on global data
 #[derive(Debug)]
@@ -141,9 +137,9 @@ impl StandardTableProvider {
         let sort_expr = PhysicalSortExpr {
             expr: if let Some(time_partition) = time_partition {
-                physical_plan::expressions::col(&time_partition, &self.schema)?
+                col(&time_partition, &self.schema)?
             } else {
-                physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &self.schema)?
+                col(DEFAULT_TIMESTAMP_KEY, &self.schema)?
             },
             options: SortOptions {
                 descending: true,
@@ -249,7 +245,7 @@ impl StandardTableProvider {
             let Ok(file_meta) = file_path.metadata() else {
                 continue;
             };
-            let file = PartitionedFile::new(file_path.display().to_string(), file_meta.size());
+            let file = PartitionedFile::new(file_path.display().to_string(), file_meta.len());
             partitioned_files[index % target_partition].push(file)
         }
@@ -324,12 +320,11 @@ impl StandardTableProvider {
     fn partitioned_files(
         &self,
-        manifest_files: Vec<catalog::manifest::File>,
+        manifest_files: Vec<File>,
     ) -> (Vec<Vec<PartitionedFile>>, datafusion::common::Statistics) {
         let target_partition = num_cpus::get();
         let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
-        let mut column_statistics =
-            HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
+        let mut column_statistics = HashMap::<String, Option<TypedStatistics>>::new();
         let mut count = 0;
         for (index, file) in manifest_files
             .into_iter()
             .map(|(x, y)| (x % target_partition, y))
         {
             #[allow(unused_mut)]
-            let catalog::manifest::File {
+            let File {
                 mut file_path,
                 num_rows,
                 columns,
@@ -404,12 +399,12 @@ impl StandardTableProvider {
 }

 async fn collect_from_snapshot(
-    snapshot: &catalog::snapshot::Snapshot,
+    snapshot: &Snapshot,
     time_filters: &[PartialTimeFilter],
     object_store: Arc<dyn ObjectStore>,
     filters: &[Expr],
     limit: Option<usize>,
-) -> Result<Vec<catalog::manifest::File>, DataFusionError> {
+) -> Result<Vec<File>, DataFusionError> {
     let items = snapshot.manifests(time_filters);
     let manifest_files = collect_manifest_files(
         object_store,
@@ -895,7 +890,7 @@ pub fn extract_primary_filter(
 }

 trait ManifestExt: ManifestFile {
-    fn find_matching_column(&self, partial_filter: &Expr) -> Option<&catalog::column::Column> {
+    fn find_matching_column(&self, partial_filter: &Expr) -> Option<&Column> {
         let name = match partial_filter {
             Expr::BinaryExpr(binary_expr) => {
                 let Expr::Column(col) = binary_expr.left.as_ref() else {

From 1b066f005e02b5d4aff67adb6a368c3a37c523df Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Thu, 20 Feb 2025 11:00:38 +0530
Subject: [PATCH 4/6] doc: describe method working

---
 src/query/stream_schema_provider.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index dfaaec345..439104e8c 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -218,6 +218,7 @@ impl StandardTableProvider {
         Ok(())
     }

+    /// Create an exection plan over the records in arrows and parquet that are still in staging, awaiting push to object storage
     async fn get_staging_execution_plan(
         &self,
         execution_plans: &mut Vec<Arc<dyn ExecutionPlan>>,

From 978c24ff1991645eb45b54ea82b6f40ec570b19f Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Thu, 20 Feb 2025 11:57:30 +0530
Subject: [PATCH 5/6] fix: inconsistent ordering when querying parquets in staging

---
 src/query/stream_schema_provider.rs | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index 439104e8c..97f1eb05e 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -239,15 +239,19 @@ impl StandardTableProvider {
             .await?;
         execution_plans.push(arrow_exec);

-        // Partititon parquet files on disk among the available CPUs
-        let target_partition = num_cpus::get();
-        let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
-        for (index, file_path) in staging.parquet_files().into_iter().enumerate() {
+        // Get a list of parquet files still in staging, order by filename
+        let mut parquet_files = staging.parquet_files();
+        parquet_files.sort_by(|a, b| a.cmp(b).reverse());
+
+        // NOTE: We don't partition among CPUs to ensure consistent results.
+        // i.e. We were seeing in-consistent ordering when querying over parquets in staging.
+        let mut partitioned_files = Vec::with_capacity(parquet_files.len());
+        for file_path in parquet_files {
             let Ok(file_meta) = file_path.metadata() else {
                 continue;
             };
             let file = PartitionedFile::new(file_path.display().to_string(), file_meta.len());
-            partitioned_files[index % target_partition].push(file)
+            partitioned_files.push(file)
         }

         // NOTE: There is the possibility of a parquet file being pushed to object store
@@ -256,7 +260,7 @@ impl StandardTableProvider {
         self.create_parquet_physical_plan(
             execution_plans,
             ObjectStoreUrl::parse("file:///").unwrap(),
-            partitioned_files,
+            vec![partitioned_files],
             Statistics::new_unknown(&self.schema),
             projection,
             filters,
             limit,
             state,

From 608e7178f021c52b4e6923bedeb08e561ed93b48 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Thu, 20 Feb 2025 12:16:22 +0530
Subject: [PATCH 6/6] typo

Signed-off-by: Devdutt Shenoi
---
 src/query/stream_schema_provider.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index 97f1eb05e..8e3e62cfa 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -218,7 +218,7 @@ impl StandardTableProvider {
         Ok(())
     }

-    /// Create an exection plan over the records in arrows and parquet that are still in staging, awaiting push to object storage
+    /// Create an execution plan over the records in arrows and parquet that are still in staging, awaiting push to object storage
     async fn get_staging_execution_plan(
         &self,
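The subtle change in this series is PATCH 5: instead of spreading the staging parquet files across one file group per CPU, the files are sorted by name in reverse and handed to DataFusion as a single file group, so the scan order stays deterministic. The standalone sketch below mirrors only that grouping logic; it is not Parseable code, and the staging directory path and the staging_parquet_group helper name are made up for illustration.

use std::{fs, io, path::PathBuf};

// Collect parquet files from a staging directory and return them as a single
// group, newest filename first, so the scan order stays deterministic.
fn staging_parquet_group(staging_dir: &str) -> io::Result<Vec<Vec<(PathBuf, u64)>>> {
    let mut parquet_files: Vec<PathBuf> = fs::read_dir(staging_dir)?
        .filter_map(|entry| entry.ok().map(|e| e.path()))
        .filter(|path| path.extension().is_some_and(|ext| ext == "parquet"))
        .collect();

    // Reverse lexical order by file name, mirroring the sort in PATCH 5.
    parquet_files.sort_by(|a, b| a.cmp(b).reverse());

    let mut group = Vec::with_capacity(parquet_files.len());
    for path in parquet_files {
        // A file may be uploaded and removed from staging concurrently; skip it if so.
        let Ok(meta) = path.metadata() else { continue };
        group.push((path, meta.len()));
    }

    // A single partition: splitting files across CPUs is what produced the
    // inconsistent ordering that PATCH 5 fixes.
    Ok(vec![group])
}

fn main() -> io::Result<()> {
    // "/tmp/parseable-staging" is a placeholder path, not the real staging location.
    for (path, size) in staging_parquet_group("/tmp/parseable-staging")?.remove(0) {
        println!("{} ({size} bytes)", path.display());
    }
    Ok(())
}

Trading parallel file groups for one ordered group gives up some scan parallelism over staging data, which is acceptable here because staging holds only the most recent, not-yet-uploaded files.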