diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 81a1bd85c..8e3e62cfa 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -16,25 +16,16 @@ * */ -use crate::catalog::manifest::File; -use crate::hottier::HotTierManager; -use crate::option::Mode; -use crate::parseable::STREAM_EXISTS; -use crate::{ - catalog::snapshot::{self, Snapshot}, - storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY}, -}; +use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc}; + use arrow_array::RecordBatch; use arrow_schema::{Schema, SchemaRef, SortOptions}; use bytes::Bytes; use chrono::{DateTime, NaiveDateTime, TimeDelta, Timelike, Utc}; -use datafusion::catalog::Session; -use datafusion::common::stats::Precision; -use datafusion::logical_expr::utils::conjunction; -use datafusion::physical_expr::LexOrdering; use datafusion::{ - catalog::SchemaProvider, + catalog::{SchemaProvider, Session}, common::{ + stats::Precision, tree_node::{TreeNode, TreeNodeRecursion}, ToDFSchema, }, @@ -46,32 +37,36 @@ use datafusion::{ }, error::{DataFusionError, Result as DataFusionResult}, execution::{context::SessionState, object_store::ObjectStoreUrl}, - logical_expr::{BinaryExpr, Operator, TableProviderFilterPushDown, TableType}, - physical_expr::{create_physical_expr, PhysicalSortExpr}, - physical_plan::{self, empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics}, + logical_expr::{ + utils::conjunction, BinaryExpr, Operator, TableProviderFilterPushDown, TableType, + }, + physical_expr::{create_physical_expr, expressions::col, LexOrdering, PhysicalSortExpr}, + physical_plan::{empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics}, prelude::Expr, scalar::ScalarValue, }; - use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use object_store::{path::Path, ObjectStore}; use relative_path::RelativePathBuf; -use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc}; use url::Url; use crate::{ catalog::{ - self, column::TypedStatistics, manifest::Manifest, snapshot::ManifestItem, ManifestFile, + column::{Column, TypedStatistics}, + manifest::{File, Manifest}, + snapshot::{ManifestItem, Snapshot}, + ManifestFile, Snapshot as CatalogSnapshot, }, event::DEFAULT_TIMESTAMP_KEY, + hottier::HotTierManager, metrics::QUERY_CACHE_HIT, - parseable::PARSEABLE, - storage::ObjectStorage, + option::Mode, + parseable::{PARSEABLE, STREAM_EXISTS}, + storage::{ObjectStorage, ObjectStoreFormat, STREAM_ROOT_DIRECTORY}, }; use super::listing_table_builder::ListingTableBuilder; -use crate::catalog::Snapshot as CatalogSnapshot; // schema provider for stream based on global data #[derive(Debug)] @@ -142,9 +137,9 @@ impl StandardTableProvider { let sort_expr = PhysicalSortExpr { expr: if let Some(time_partition) = time_partition { - physical_plan::expressions::col(&time_partition, &self.schema)? + col(&time_partition, &self.schema)? } else { - physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &self.schema)? + col(DEFAULT_TIMESTAMP_KEY, &self.schema)? }, options: SortOptions { descending: true, @@ -223,6 +218,59 @@ impl StandardTableProvider { Ok(()) } + /// Create an execution plan over the records in arrows and parquet that are still in staging, awaiting push to object storage + async fn get_staging_execution_plan( + &self, + execution_plans: &mut Vec>, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + state: &dyn Session, + time_partition: Option<&String>, + ) -> Result<(), DataFusionError> { + let Ok(staging) = PARSEABLE.get_stream(&self.stream) else { + return Ok(()); + }; + + // Staging arrow exection plan + let records = staging.recordbatches_cloned(&self.schema); + let arrow_exec = reversed_mem_table(records, self.schema.clone())? + .scan(state, projection, filters, limit) + .await?; + execution_plans.push(arrow_exec); + + // Get a list of parquet files still in staging, order by filename + let mut parquet_files = staging.parquet_files(); + parquet_files.sort_by(|a, b| a.cmp(b).reverse()); + + // NOTE: We don't partition among CPUs to ensure consistent results. + // i.e. We were seeing in-consistent ordering when querying over parquets in staging. + let mut partitioned_files = Vec::with_capacity(parquet_files.len()); + for file_path in parquet_files { + let Ok(file_meta) = file_path.metadata() else { + continue; + }; + let file = PartitionedFile::new(file_path.display().to_string(), file_meta.len()); + partitioned_files.push(file) + } + + // NOTE: There is the possibility of a parquet file being pushed to object store + // and deleted from staging in the time it takes for datafusion to get to it. + // Staging parquet execution plan + self.create_parquet_physical_plan( + execution_plans, + ObjectStoreUrl::parse("file:///").unwrap(), + vec![partitioned_files], + Statistics::new_unknown(&self.schema), + projection, + filters, + limit, + state, + time_partition.cloned(), + ) + .await + } + #[allow(clippy::too_many_arguments)] async fn legacy_listing_table( &self, @@ -277,12 +325,11 @@ impl StandardTableProvider { fn partitioned_files( &self, - manifest_files: Vec, + manifest_files: Vec, ) -> (Vec>, datafusion::common::Statistics) { let target_partition = num_cpus::get(); let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new())); - let mut column_statistics = - HashMap::>::new(); + let mut column_statistics = HashMap::>::new(); let mut count = 0; for (index, file) in manifest_files .into_iter() @@ -290,7 +337,7 @@ impl StandardTableProvider { .map(|(x, y)| (x % target_partition, y)) { #[allow(unused_mut)] - let catalog::manifest::File { + let File { mut file_path, num_rows, columns, @@ -357,12 +404,12 @@ impl StandardTableProvider { } async fn collect_from_snapshot( - snapshot: &catalog::snapshot::Snapshot, + snapshot: &Snapshot, time_filters: &[PartialTimeFilter], object_store: Arc, filters: &[Expr], limit: Option, -) -> Result, DataFusionError> { +) -> Result, DataFusionError> { let items = snapshot.manifests(time_filters); let manifest_files = collect_manifest_files( object_store, @@ -443,17 +490,17 @@ impl TableProvider for StandardTableProvider { } if is_within_staging_window(&time_filters) { - if let Ok(staging) = PARSEABLE.get_stream(&self.stream) { - let records = staging.recordbatches_cloned(&self.schema); - let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?; - - let memory_exec = reversed_mem_table - .scan(state, projection, filters, limit) - .await?; - execution_plans.push(memory_exec); - } + self.get_staging_execution_plan( + &mut execution_plans, + projection, + filters, + limit, + state, + time_partition.as_ref(), + ) + .await?; }; - let mut merged_snapshot: snapshot::Snapshot = Snapshot::default(); + let mut merged_snapshot = Snapshot::default(); if PARSEABLE.options.mode == Mode::Query { let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]); let obs = glob_storage @@ -848,7 +895,7 @@ pub fn extract_primary_filter( } trait ManifestExt: ManifestFile { - fn find_matching_column(&self, partial_filter: &Expr) -> Option<&catalog::column::Column> { + fn find_matching_column(&self, partial_filter: &Expr) -> Option<&Column> { let name = match partial_filter { Expr::BinaryExpr(binary_expr) => { let Expr::Column(col) = binary_expr.left.as_ref() else {