feat: query parquet files still in staging #1199
Changes from 4 commits
```diff
@@ -16,25 +16,16 @@
  *
  */
 
-use crate::catalog::manifest::File;
-use crate::hottier::HotTierManager;
-use crate::option::Mode;
-use crate::parseable::STREAM_EXISTS;
-use crate::{
-    catalog::snapshot::{self, Snapshot},
-    storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
-};
-use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
-
 use arrow_array::RecordBatch;
 use arrow_schema::{Schema, SchemaRef, SortOptions};
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, TimeDelta, Timelike, Utc};
-use datafusion::catalog::Session;
-use datafusion::common::stats::Precision;
-use datafusion::logical_expr::utils::conjunction;
-use datafusion::physical_expr::LexOrdering;
 use datafusion::{
-    catalog::SchemaProvider,
+    catalog::{SchemaProvider, Session},
     common::{
+        stats::Precision,
         tree_node::{TreeNode, TreeNodeRecursion},
         ToDFSchema,
     },
```
```diff
@@ -46,32 +37,36 @@ use datafusion::{
     },
     error::{DataFusionError, Result as DataFusionResult},
     execution::{context::SessionState, object_store::ObjectStoreUrl},
-    logical_expr::{BinaryExpr, Operator, TableProviderFilterPushDown, TableType},
-    physical_expr::{create_physical_expr, PhysicalSortExpr},
-    physical_plan::{self, empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics},
+    logical_expr::{
+        utils::conjunction, BinaryExpr, Operator, TableProviderFilterPushDown, TableType,
+    },
+    physical_expr::{create_physical_expr, expressions::col, LexOrdering, PhysicalSortExpr},
+    physical_plan::{empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics},
     prelude::Expr,
     scalar::ScalarValue,
 };
 
 use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::{path::Path, ObjectStore};
 use relative_path::RelativePathBuf;
+use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
 use url::Url;
 
 use crate::{
     catalog::{
-        self, column::TypedStatistics, manifest::Manifest, snapshot::ManifestItem, ManifestFile,
+        column::{Column, TypedStatistics},
+        manifest::{File, Manifest},
+        snapshot::{ManifestItem, Snapshot},
+        ManifestFile, Snapshot as CatalogSnapshot,
     },
     event::DEFAULT_TIMESTAMP_KEY,
+    hottier::HotTierManager,
     metrics::QUERY_CACHE_HIT,
-    parseable::PARSEABLE,
-    storage::ObjectStorage,
+    option::Mode,
+    parseable::{PARSEABLE, STREAM_EXISTS},
+    storage::{ObjectStorage, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
 };
 
 use super::listing_table_builder::ListingTableBuilder;
-use crate::catalog::Snapshot as CatalogSnapshot;
 
 // schema provider for stream based on global data
 #[derive(Debug)]
```
```diff
@@ -142,9 +137,9 @@ impl StandardTableProvider {
 
         let sort_expr = PhysicalSortExpr {
             expr: if let Some(time_partition) = time_partition {
-                physical_plan::expressions::col(&time_partition, &self.schema)?
+                col(&time_partition, &self.schema)?
             } else {
-                physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &self.schema)?
+                col(DEFAULT_TIMESTAMP_KEY, &self.schema)?
             },
             options: SortOptions {
                 descending: true,
```
```diff
@@ -223,6 +218,55 @@ impl StandardTableProvider {
         Ok(())
     }
 
+    /// Create an execution plan over the records in arrows and parquet that are still in staging, awaiting push to object storage
+    async fn get_staging_execution_plan(
+        &self,
+        execution_plans: &mut Vec<Arc<dyn ExecutionPlan>>,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+        state: &dyn Session,
+        time_partition: Option<&String>,
+    ) -> Result<(), DataFusionError> {
+        let Ok(staging) = PARSEABLE.get_stream(&self.stream) else {
+            return Ok(());
+        };
+
+        // Staging arrow execution plan
+        let records = staging.recordbatches_cloned(&self.schema);
+        let arrow_exec = reversed_mem_table(records, self.schema.clone())?
+            .scan(state, projection, filters, limit)
+            .await?;
+        execution_plans.push(arrow_exec);
+
+        // Partition parquet files on disk among the available CPUs
+        let target_partition = num_cpus::get();
+        let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
+        for (index, file_path) in staging.parquet_files().into_iter().enumerate() {
+            let Ok(file_meta) = file_path.metadata() else {
+                continue;
+            };
+            let file = PartitionedFile::new(file_path.display().to_string(), file_meta.len());
+            partitioned_files[index % target_partition].push(file)
+        }
+
+        // NOTE: There is the possibility of a parquet file being pushed to object store
+        // and deleted from staging in the time it takes for datafusion to get to it.
+        // Staging parquet execution plan
+        self.create_parquet_physical_plan(
+            execution_plans,
+            ObjectStoreUrl::parse("file:///").unwrap(),
+            partitioned_files,
+            Statistics::new_unknown(&self.schema),
+            projection,
+            filters,
+            limit,
+            state,
+            time_partition.cloned(),
+        )
+        .await
+    }
+
     #[allow(clippy::too_many_arguments)]
     async fn legacy_listing_table(
         &self,
```
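The loop above deals staging parquet files across `num_cpus::get()` buckets so that each DataFusion partition scans a similar share of files. A minimal, dependency-free sketch of that round-robin distribution (the helper and file names are illustrative, not the crate's API):

```rust
// Round-robin distribution: item `i` lands in bucket `i % buckets`, so no
// bucket ever holds more than one file beyond any other.
fn round_robin<T>(items: Vec<T>, buckets: usize) -> Vec<Vec<T>> {
    let mut out: Vec<Vec<T>> = (0..buckets).map(|_| Vec::new()).collect();
    for (index, item) in items.into_iter().enumerate() {
        out[index % buckets].push(item);
    }
    out
}

fn main() {
    let files = vec!["0.parquet", "1.parquet", "2.parquet", "3.parquet", "4.parquet"];
    let partitioned = round_robin(files, 2);
    assert_eq!(partitioned[0], ["0.parquet", "2.parquet", "4.parquet"]);
    assert_eq!(partitioned[1], ["1.parquet", "3.parquet"]);
}
```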
```diff
@@ -277,20 +321,19 @@ impl StandardTableProvider {
 
     fn partitioned_files(
         &self,
-        manifest_files: Vec<catalog::manifest::File>,
+        manifest_files: Vec<File>,
     ) -> (Vec<Vec<PartitionedFile>>, datafusion::common::Statistics) {
         let target_partition = num_cpus::get();
         let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
-        let mut column_statistics =
-            HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
+        let mut column_statistics = HashMap::<String, Option<TypedStatistics>>::new();
         let mut count = 0;
         for (index, file) in manifest_files
             .into_iter()
             .enumerate()
             .map(|(x, y)| (x % target_partition, y))
         {
             #[allow(unused_mut)]
-            let File {
+            let File {
                 mut file_path,
                 num_rows,
                 columns,
```
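For context, `column_statistics` folds per-file column statistics into table-level values while the manifest files are being partitioned. A toy model of that fold, using a made-up `Int64Stats` in place of the crate's `TypedStatistics`:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for per-column min/max statistics.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Int64Stats { min: i64, max: i64 }

// Merge one file's stats into the running table-level stats.
fn merge(acc: Option<Int64Stats>, file: Int64Stats) -> Int64Stats {
    match acc {
        None => file,
        Some(a) => Int64Stats { min: a.min.min(file.min), max: a.max.max(file.max) },
    }
}

fn main() {
    let mut column_statistics: HashMap<String, Option<Int64Stats>> = HashMap::new();
    for stats in [Int64Stats { min: 3, max: 9 }, Int64Stats { min: 1, max: 7 }] {
        let entry = column_statistics.entry("latency_ms".to_string()).or_insert(None);
        *entry = Some(merge(*entry, stats));
    }
    assert_eq!(column_statistics["latency_ms"], Some(Int64Stats { min: 1, max: 9 }));
}
```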
```diff
@@ -357,12 +400,12 @@ impl StandardTableProvider {
 }
 
 async fn collect_from_snapshot(
-    snapshot: &catalog::snapshot::Snapshot,
+    snapshot: &Snapshot,
     time_filters: &[PartialTimeFilter],
     object_store: Arc<dyn ObjectStore>,
     filters: &[Expr],
     limit: Option<usize>,
-) -> Result<Vec<catalog::manifest::File>, DataFusionError> {
+) -> Result<Vec<File>, DataFusionError> {
     let items = snapshot.manifests(time_filters);
     let manifest_files = collect_manifest_files(
         object_store,
```
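`collect_from_snapshot` first asks the snapshot for the manifests whose time range matches the query's time filters, and only then fetches those manifests from object storage. A self-contained model of that range-overlap selection (types and fields here are illustrative, not the crate's):

```rust
// Hypothetical manifest entry covering a closed time interval.
#[derive(Debug, PartialEq)]
struct ManifestItem { path: &'static str, lower: i64, upper: i64 }

// Keep only manifests whose [lower, upper] interval overlaps [start, end].
fn manifests_in_range(items: &[ManifestItem], start: i64, end: i64) -> Vec<&ManifestItem> {
    items.iter().filter(|m| m.lower <= end && m.upper >= start).collect()
}

fn main() {
    let items = [
        ManifestItem { path: "day1/manifest.json", lower: 0, upper: 99 },
        ManifestItem { path: "day2/manifest.json", lower: 100, upper: 199 },
    ];
    let hits = manifests_in_range(&items, 150, 300);
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].path, "day2/manifest.json");
}
```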
```diff
@@ -443,17 +486,17 @@ impl TableProvider for StandardTableProvider {
         }
 
         if is_within_staging_window(&time_filters) {
-            if let Ok(staging) = PARSEABLE.get_stream(&self.stream) {
-                let records = staging.recordbatches_cloned(&self.schema);
-                let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?;
-
-                let memory_exec = reversed_mem_table
-                    .scan(state, projection, filters, limit)
-                    .await?;
-                execution_plans.push(memory_exec);
-            }
+            self.get_staging_execution_plan(
+                &mut execution_plans,
+                projection,
+                filters,
+                limit,
+                state,
+                time_partition.as_ref(),
+            )
+            .await?;
         };
-        let mut merged_snapshot: snapshot::Snapshot = Snapshot::default();
+        let mut merged_snapshot = Snapshot::default();
         if PARSEABLE.options.mode == Mode::Query {
             let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]);
             let obs = glob_storage
```
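Taken together, `scan` now collects the staging plans (in-memory arrows plus local parquet) and the object-storage plans into one list before unioning them. A hedged, dependency-free model of that control flow, with strings standing in for real `ExecutionPlan`s:

```rust
fn main() {
    let mut execution_plans: Vec<String> = Vec::new();

    // When the query window overlaps staging, get_staging_execution_plan
    // contributes an in-memory arrow scan and a local parquet scan.
    let within_staging_window = true;
    if within_staging_window {
        execution_plans.push("MemoryExec(staging arrows, reversed)".into());
        execution_plans.push("ParquetExec(file:/// staging parquet)".into());
    }

    // Manifest-backed parquet in object storage is always considered.
    execution_plans.push("ParquetExec(object store manifests)".into());

    // A single plan is returned as-is; several are wrapped in a union,
    // mirroring UnionExec over the collected plans.
    let plan = match execution_plans.len() {
        1 => execution_plans.pop().unwrap(),
        _ => format!("UnionExec[{}]", execution_plans.join(", ")),
    };
    println!("{plan}");
}
```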
```diff
@@ -848,7 +891,7 @@ pub fn extract_primary_filter(
 }
 
 trait ManifestExt: ManifestFile {
-    fn find_matching_column(&self, partial_filter: &Expr) -> Option<&catalog::column::Column> {
+    fn find_matching_column(&self, partial_filter: &Expr) -> Option<&Column> {
         let name = match partial_filter {
             Expr::BinaryExpr(binary_expr) => {
                 let Expr::Column(col) = binary_expr.left.as_ref() else {
```
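`find_matching_column` only considers filters shaped like `column <op> value`, peeling the column name out of the left side of a `BinaryExpr`. A self-contained model of that let-else extraction, using a toy `Expr` rather than DataFusion's:

```rust
// Toy expression tree, standing in for DataFusion's Expr.
enum Expr {
    Column(String),
    BinaryExpr { left: Box<Expr>, right: Box<Expr> },
}

// Return the column name on the left of a binary filter, if any.
fn column_name(partial_filter: &Expr) -> Option<&str> {
    let Expr::BinaryExpr { left, .. } = partial_filter else {
        return None;
    };
    let Expr::Column(name) = left.as_ref() else {
        return None;
    };
    Some(name)
}

fn main() {
    let filter = Expr::BinaryExpr {
        left: Box::new(Expr::Column("timestamp".to_string())),
        right: Box::new(Expr::Column("value".to_string())),
    };
    assert_eq!(column_name(&filter), Some("timestamp"));
}
```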