From 8a79f475ee7a3b67b8e56a6ae9ece94e78a6c2a7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 8 Nov 2022 05:40:40 +0800 Subject: [PATCH 1/8] Support parquet page filtering for string columns Signed-off-by: yangjiang --- .../file_format/parquet/page_filter.rs | 15 +++++++++-- .../core/tests/parquet/filter_pushdown.rs | 25 ++++++++----------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs index 5f31a6a49d9f..ef9a6f88c917 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs @@ -17,7 +17,9 @@ //! Contains code to filter entire pages -use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array}; +use arrow::array::{ + BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, StringArray, +}; use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError}; use datafusion_common::{Column, DataFusionError, Result}; use datafusion_optimizer::utils::split_conjunction; @@ -419,7 +421,16 @@ macro_rules! get_min_max_values_for_page_index { vec.iter().map(|x| x.$func().cloned()), ))) } - Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => { + Index::BYTE_ARRAY(index) => { + let vec = &index.indexes; + let array: StringArray = vec + .iter() + .map(|x| x.$func()) + .map(|x| x.and_then(|x| std::str::from_utf8(x).ok())) + .collect(); + Some(Arc::new(array)) + } + Index::INT96(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => { //Todo support these type None } diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index 657f00d0cea5..aba0e69eba56 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -266,20 +266,17 @@ async fn single_file_small_data_pages() { // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: djzdyiecnumrsrcbizwlqzdhnpoiqdh, max: fktdcgtmzvoedpwhfevcvvrtaurzgex, num_nulls not defined] CRC:[none] SZ:7 VC:9216 // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216 // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739 - // - // This test currently fails due to https://github.com/apache/arrow-datafusion/issues/3833 - // (page index pruning not implemented for byte array) - - // TestCase::new(&test_parquet_file) - // .with_name("selective") - // // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4 - // // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin' - // .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin"))) - // .with_pushdown_expected(PushdownExpected::Some) - // .with_page_index_filtering_expected(PageIndexFilteringExpected::Some) - // .with_expected_rows(2574) - // .run() - // .await; + + TestCase::new(&test_parquet_file) + .with_name("selective") + // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4 + // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin' + .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin"))) + .with_pushdown_expected(PushdownExpected::Some) + .with_page_index_filtering_expected(PageIndexFilteringExpected::Some) + .with_expected_rows(2574) + .run() + .await; // 
time TV=53819 RL=0 DL=0 DS: 7092 DE:PLAIN // -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- From fc8475439bbb5613dc09648e491e0d7ec7d09283 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 15 Nov 2022 17:19:48 +0800 Subject: [PATCH 2/8] Support parquet page filtering on min_max for decimal128 columns Signed-off-by: yangjiang --- .../src/physical_plan/file_format/parquet.rs | 41 ++++++++- .../file_format/parquet/page_filter.rs | 85 ++++++++++++++++--- .../file_format/parquet/row_groups.rs | 44 ++-------- .../core/tests/parquet/filter_pushdown.rs | 28 ++++++ test-utils/src/data_gen.rs | 13 ++- 5 files changed, 159 insertions(+), 52 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index fa65d7656489..58b91d65bd08 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -17,7 +17,7 @@ //! Execution plan for reading Parquet files -use arrow::datatypes::SchemaRef; +use arrow::datatypes::{DataType, SchemaRef}; use fmt::Debug; use std::any::Any; use std::fmt; @@ -55,8 +55,10 @@ use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::basic::{ConvertedType, LogicalType}; use parquet::errors::ParquetError; use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties}; +use parquet::schema::types::ColumnDescriptor; mod metrics; mod page_filter; @@ -654,6 +656,43 @@ pub async fn plan_to_parquet( } } +// TODO: consolidate code with arrow-rs +// Convert the bytes array to i128. +// The endian of the input bytes array must be big-endian. +// Copy from the arrow-rs +pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 { + assert!(b.len() <= 16, "Decimal128Array supports only up to size 16"); + let first_bit = b[0] & 128u8 == 128u8; + let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] }; + for (i, v) in b.iter().enumerate() { + result[i + (16 - b.len())] = *v; + } + // The bytes array are from parquet file and must be the big-endian. + // The endian is defined by parquet format, and the reference document + // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66 + i128::from_be_bytes(result) +} + +// Convert parquet column schema to arrow data type, and just consider the +// decimal data type. 
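// Editor's illustration (not part of this diff): a hedged sketch of the mapping the
// function below implements, built with the parquet schema APIs this patch already
// imports (LogicalType, ColumnDescriptor). The column name "price" and the helper
// name are made up for the example.

    use std::sync::Arc;
    use arrow::datatypes::DataType;
    use parquet::basic::{LogicalType, Repetition, Type as PhysicalType};
    use parquet::schema::types::{ColumnDescriptor, ColumnPath, Type};

    fn decimal_mapping_sketch() {
        // An INT32 parquet column annotated as DECIMAL(9, 2)...
        let t = Type::primitive_type_builder("price", PhysicalType::INT32)
            .with_repetition(Repetition::REQUIRED)
            .with_logical_type(Some(LogicalType::Decimal { precision: 9, scale: 2 }))
            .with_precision(9)
            .with_scale(2)
            .build()
            .unwrap();
        let descr = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::from("price"));
        // ...is reported as the arrow Decimal128(9, 2) type; columns without a decimal
        // logical or converted type are reported as None.
        assert_eq!(
            parquet_to_arrow_decimal_type(&descr),
            Some(DataType::Decimal128(9, 2))
        );
    }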
+pub(crate) fn parquet_to_arrow_decimal_type( + parquet_column: &ColumnDescriptor, +) -> Option { + let type_ptr = parquet_column.self_type_ptr(); + match type_ptr.get_basic_info().logical_type() { + Some(LogicalType::Decimal { scale, precision }) => { + Some(DataType::Decimal128(precision as u8, scale as u8)) + } + _ => match type_ptr.get_basic_info().converted_type() { + ConvertedType::DECIMAL => Some(DataType::Decimal128( + type_ptr.get_precision() as u8, + type_ptr.get_scale() as u8, + )), + _ => None, + }, + } +} + #[cfg(test)] mod tests { // See also `parquet_exec` integration test diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs index ef9a6f88c917..8efe6f4475db 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs @@ -18,12 +18,15 @@ //! Contains code to filter entire pages use arrow::array::{ - BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, StringArray, + BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array, + StringArray, }; +use arrow::datatypes::DataType; use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError}; use datafusion_common::{Column, DataFusionError, Result}; use datafusion_optimizer::utils::split_conjunction; use log::{debug, error, trace}; +use parquet::schema::types::ColumnDescriptor; use parquet::{ arrow::arrow_reader::{RowSelection, RowSelector}, errors::ParquetError, @@ -37,6 +40,9 @@ use std::collections::VecDeque; use std::sync::Arc; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use crate::physical_plan::file_format::parquet::{ + from_bytes_to_i128, parquet_to_arrow_decimal_type, +}; use super::metrics::ParquetFileMetrics; @@ -134,6 +140,7 @@ pub(crate) fn build_page_filter( &predicate, rg_offset_indexes.get(col_id), rg_page_indexes.get(col_id), + groups[*r].column(col_id).column_descr(), file_metrics, ) .map_err(|e| { @@ -307,15 +314,18 @@ fn prune_pages_in_one_row_group( predicate: &PruningPredicate, col_offset_indexes: Option<&Vec>, col_page_indexes: Option<&Index>, + col_desc: &ColumnDescriptor, metrics: &ParquetFileMetrics, ) -> Result> { let num_rows = group.num_rows() as usize; if let (Some(col_offset_indexes), Some(col_page_indexes)) = (col_offset_indexes, col_page_indexes) { + let target_type = parquet_to_arrow_decimal_type(col_desc); let pruning_stats = PagesPruningStatistics { col_page_indexes, col_offset_indexes, + target_type: &target_type, }; match predicate.prune(&pruning_stats) { @@ -384,6 +394,9 @@ fn create_row_count_in_each_page( struct PagesPruningStatistics<'a> { col_page_indexes: &'a Index, col_offset_indexes: &'a Vec, + // target_type means the logical type in schema: like 'DECIMAL' is the logical type, but the + // real physical type in parquet file may be `INT32, INT64, FIXED_LEN_BYTE_ARRAY` + target_type: &'a Option, } // Extract the min or max value calling `func` from page idex @@ -392,16 +405,50 @@ macro_rules! 
get_min_max_values_for_page_index { match $self.col_page_indexes { Index::NONE => None, Index::INT32(index) => { - let vec = &index.indexes; - Some(Arc::new(Int32Array::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) + match $self.target_type { + // int32 to decimal with the precision and scale + Some(DataType::Decimal128(precision, scale)) => { + let vec = &index.indexes; + if let Ok(arr) = Decimal128Array::from_iter_values( + vec.iter().map(|x| *x.$func().unwrap() as i128), + ) + .with_precision_and_scale(*precision, *scale) + { + return Some(Arc::new(arr)); + } else { + return None; + } + } + _ => { + let vec = &index.indexes; + Some(Arc::new(Int32Array::from_iter( + vec.iter().map(|x| x.$func().cloned()), + ))) + } + } } Index::INT64(index) => { - let vec = &index.indexes; - Some(Arc::new(Int64Array::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) + match $self.target_type { + // int64 to decimal with the precision and scale + Some(DataType::Decimal128(precision, scale)) => { + let vec = &index.indexes; + if let Ok(arr) = Decimal128Array::from_iter_values( + vec.iter().map(|x| *x.$func().unwrap() as i128), + ) + .with_precision_and_scale(*precision, *scale) + { + return Some(Arc::new(arr)); + } else { + return None; + } + } + _ => { + let vec = &index.indexes; + Some(Arc::new(Int64Array::from_iter( + vec.iter().map(|x| x.$func().cloned()), + ))) + } + } } Index::FLOAT(index) => { let vec = &index.indexes; @@ -430,10 +477,28 @@ macro_rules! get_min_max_values_for_page_index { .collect(); Some(Arc::new(array)) } - Index::INT96(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => { + Index::INT96(_) => { //Todo support these type None } + Index::FIXED_LEN_BYTE_ARRAY(index) => { + match $self.target_type { + // int32 to decimal with the precision and scale + Some(DataType::Decimal128(precision, scale)) => { + let vec = &index.indexes; + if let Ok(array) = Decimal128Array::from_iter_values( + vec.iter().map(|x| from_bytes_to_i128(x.$func().unwrap())), + ) + .with_precision_and_scale(*precision, *scale) + { + return Some(Arc::new(array)); + } else { + return None; + } + } + _ => None, + } + } } }}; } diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs index c7a281198d95..24b04ee67103 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs @@ -23,16 +23,17 @@ use datafusion_common::Column; use datafusion_common::ScalarValue; use log::debug; -use parquet::{ - file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics}, - schema::types::ColumnDescriptor, +use parquet::file::{ + metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics, }; +use crate::physical_plan::file_format::parquet::{ + from_bytes_to_i128, parquet_to_arrow_decimal_type, +}; use crate::{ datasource::listing::FileRange, physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, }; -use parquet::basic::{ConvertedType, LogicalType}; use super::ParquetFileMetrics; @@ -85,23 +86,6 @@ struct RowGroupPruningStatistics<'a> { parquet_schema: &'a Schema, } -// TODO: consolidate code with arrow-rs -// Convert the bytes array to i128. -// The endian of the input bytes array must be big-endian. 
-// Copy from the arrow-rs -fn from_bytes_to_i128(b: &[u8]) -> i128 { - assert!(b.len() <= 16, "Decimal128Array supports only up to size 16"); - let first_bit = b[0] & 128u8 == 128u8; - let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] }; - for (i, v) in b.iter().enumerate() { - result[i + (16 - b.len())] = *v; - } - // The bytes array are from parquet file and must be the big-endian. - // The endian is defined by parquet format, and the reference document - // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66 - i128::from_be_bytes(result) -} - /// Extract the min/max statistics from a `ParquetStatistics` object macro_rules! get_statistic { ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{ @@ -217,24 +201,6 @@ macro_rules! get_null_count_values { }}; } -// Convert parquet column schema to arrow data type, and just consider the -// decimal data type. -fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option { - let type_ptr = parquet_column.self_type_ptr(); - match type_ptr.get_basic_info().logical_type() { - Some(LogicalType::Decimal { scale, precision }) => { - Some(DataType::Decimal128(precision as u8, scale as u8)) - } - _ => match type_ptr.get_basic_info().converted_type() { - ConvertedType::DECIMAL => Some(DataType::Decimal128( - type_ptr.get_precision() as u8, - type_ptr.get_scale() as u8, - )), - _ => None, - }, - } -} - impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { fn min_values(&self, column: &Column) -> Option { get_min_max_values!(self, column, min, min_bytes) diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index aba0e69eba56..999becafd0a5 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -296,6 +296,34 @@ async fn single_file_small_data_pages() { .with_expected_rows(9745) .run() .await; + + // decimal_price TV=53819 RL=0 DL=0 + // ---------------------------------------------------------------------------- + // row group 0: + // column index for column decimal_price: + // Boudary order: UNORDERED + // null count min max + // page-0 0 1 9216 + // page-1 0 9217 18432 + // page-2 0 18433 27648 + // page-3 0 27649 36864 + // page-4 0 36865 46080 + // page-5 0 46081 53819 + // + // offset index for column decimal_price: + // offset compressed size first row index + // page-0 5581636 147517 0 + // page-1 5729153 147517 9216 + TestCase::new(&test_parquet_file) + .with_name("selective_on_decimal") + // predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5 + // decimal_price < 9200 + .with_filter(col("decimal_price").lt_eq(lit(9200))) + .with_pushdown_expected(PushdownExpected::Some) + .with_page_index_filtering_expected(PageIndexFilteringExpected::Some) + .with_expected_rows(9200) + .run() + .await; } /// Expected pushdown behavior diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs index adff4a514615..c82d56ef21f9 100644 --- a/test-utils/src/data_gen.rs +++ b/test-utils/src/data_gen.rs @@ -19,8 +19,8 @@ use std::ops::Range; use std::sync::Arc; use arrow::array::{ - Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder, - UInt16Builder, + Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder, + TimestampNanosecondBuilder, UInt16Builder, }; use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit}; use 
arrow::record_batch::RecordBatch; @@ -43,6 +43,7 @@ struct BatchBuilder { request_bytes: Int32Builder, response_bytes: Int32Builder, response_status: UInt16Builder, + prices_status: Decimal128Builder, /// optional number of rows produced row_limit: Option, @@ -73,6 +74,7 @@ impl BatchBuilder { Field::new("request_bytes", DataType::Int32, true), Field::new("response_bytes", DataType::Int32, true), Field::new("response_status", DataType::UInt16, false), + Field::new("decimal_price", DataType::Decimal128(38, 0), false), ])) } @@ -146,6 +148,7 @@ impl BatchBuilder { .append_option(rng.gen_bool(0.9).then(|| rng.gen())); self.response_status .append_value(status[rng.gen_range(0..status.len())]); + self.prices_status.append_value(self.row_count as i128); } fn finish(mut self, schema: SchemaRef) -> RecordBatch { @@ -166,6 +169,12 @@ impl BatchBuilder { Arc::new(self.request_bytes.finish()), Arc::new(self.response_bytes.finish()), Arc::new(self.response_status.finish()), + Arc::new( + self.prices_status + .finish() + .with_precision_and_scale(38, 0) + .unwrap(), + ), ], ) .unwrap() From a719dffe96ca4692e8dae16dba4b21306e003a7f Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Fri, 18 Nov 2022 11:07:30 +0800 Subject: [PATCH 3/8] Update datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs Co-authored-by: Andrew Lamb --- .../core/src/physical_plan/file_format/parquet/page_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs index 8efe6f4475db..820ca68cca33 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs @@ -433,7 +433,7 @@ macro_rules! get_min_max_values_for_page_index { Some(DataType::Decimal128(precision, scale)) => { let vec = &index.indexes; if let Ok(arr) = Decimal128Array::from_iter_values( - vec.iter().map(|x| *x.$func().unwrap() as i128), + vec.iter().map(|x| *x.$func().ok()).map(|v| v as i128), ) .with_precision_and_scale(*precision, *scale) { From 4e29644abbea5a65574ed0c6c15c54a5a11640a4 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 18 Nov 2022 12:16:21 +0800 Subject: [PATCH 4/8] Avoid unwarp Signed-off-by: yangjiang --- .../file_format/parquet/page_filter.rs | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs index 820ca68cca33..bf4e2861caf6 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs @@ -409,10 +409,12 @@ macro_rules! get_min_max_values_for_page_index { // int32 to decimal with the precision and scale Some(DataType::Decimal128(precision, scale)) => { let vec = &index.indexes; - if let Ok(arr) = Decimal128Array::from_iter_values( - vec.iter().map(|x| *x.$func().unwrap() as i128), - ) - .with_precision_and_scale(*precision, *scale) + let vec: Vec> = vec + .iter() + .map(|x| x.min().and_then(|x| Some(*x as i128))) + .collect(); + if let Ok(arr) = Decimal128Array::from(vec) + .with_precision_and_scale(*precision, *scale) { return Some(Arc::new(arr)); } else { @@ -432,10 +434,12 @@ macro_rules! 
get_min_max_values_for_page_index { // int64 to decimal with the precision and scale Some(DataType::Decimal128(precision, scale)) => { let vec = &index.indexes; - if let Ok(arr) = Decimal128Array::from_iter_values( - vec.iter().map(|x| *x.$func().ok()).map(|v| v as i128), - ) - .with_precision_and_scale(*precision, *scale) + let vec: Vec> = vec + .iter() + .map(|x| x.min().and_then(|x| Some(*x as i128))) + .collect(); + if let Ok(arr) = Decimal128Array::from(vec) + .with_precision_and_scale(*precision, *scale) { return Some(Arc::new(arr)); } else { From 5b6c4783d98fbe3f34f4cb4b305aece9ad1f3651 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 18 Nov 2022 22:54:19 +0800 Subject: [PATCH 5/8] reorg test code Signed-off-by: yangjiang --- datafusion/core/tests/parquet/mod.rs | 538 ++++++++++++++++++ datafusion/core/tests/parquet/page_pruning.rs | 45 +- .../core/tests/parquet/row_group_pruning.rs | 500 +--------------- 3 files changed, 590 insertions(+), 493 deletions(-) diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index ab410bd76e48..020338021431 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -16,7 +16,545 @@ // under the License. //! Parquet integration tests +use arrow::array::Decimal128Array; +use arrow::{ + array::{ + Array, ArrayRef, Date32Array, Date64Array, Float64Array, Int32Array, StringArray, + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, + }, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, + util::pretty::pretty_format_batches, +}; +use chrono::{Datelike, Duration}; +use datafusion::{ + datasource::{provider_as_source, TableProvider}, + physical_plan::{ + accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan, + ExecutionPlanVisitor, + }, + prelude::{ParquetReadOptions, SessionConfig, SessionContext}, +}; +use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder}; +use parquet::arrow::ArrowWriter; +use parquet::file::properties::WriterProperties; +use std::sync::Arc; +use tempfile::NamedTempFile; +use datafusion::config::OPT_PARQUET_ENABLE_PAGE_INDEX; + mod custom_reader; mod filter_pushdown; mod page_pruning; mod row_group_pruning; + +// ---------------------- +// Begin test fixture +// ---------------------- + +/// What data to use +enum Scenario { + Timestamps, + Dates, + Int32, + Float64, + Decimal, + DecimalLargePrecision, +} + +enum Unit { + RowGroup, + Page, +} + +/// Test fixture that has an execution context that has an external +/// table "t" registered, pointing at a parquet file made with +/// `make_test_file` +struct ContextWithParquet { + #[allow(dead_code)] + /// temp file parquet data is written to. 
The file is cleaned up + /// when dropped + file: NamedTempFile, + provider: Arc, + ctx: SessionContext, +} + +/// The output of running one of the test cases +struct TestOutput { + /// The input string + sql: String, + /// Execution metrics for the Parquet Scan + parquet_metrics: MetricsSet, + /// number of rows in results + result_rows: usize, + /// the contents of the input, as a string + pretty_input: String, + /// the raw results, as a string + pretty_results: String, +} + +impl TestOutput { + /// retrieve the value of the named metric, if any + fn metric_value(&self, metric_name: &str) -> Option { + self.parquet_metrics + .sum(|metric| metric.value().name() == metric_name) + .map(|v| v.as_usize()) + } + + /// The number of times the pruning predicate evaluation errors + fn predicate_evaluation_errors(&self) -> Option { + self.metric_value("predicate_evaluation_errors") + } + + /// The number of times the pruning predicate evaluation errors + fn row_groups_pruned(&self) -> Option { + self.metric_value("row_groups_pruned") + } + + /// The number of times the pruning predicate evaluation errors + fn row_pages_pruned(&self) -> Option { + self.metric_value("page_index_rows_filtered") + } + + fn description(&self) -> String { + format!( + "Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}", + self.pretty_input, self.sql, self.pretty_results, self.parquet_metrics, + ) + } +} + +/// Creates an execution context that has an external table "t" +/// registered pointing at a parquet file made with `make_test_file` +/// and the appropriate scenario +impl ContextWithParquet { + async fn new(scenario: Scenario, unit: Unit) -> Self { + Self::with_config(scenario, unit, SessionConfig::new()).await + } + + async fn with_config(scenario: Scenario, unit: Unit, config: SessionConfig) -> Self { + let file = match unit { + Unit::RowGroup => make_test_file_rg(scenario).await, + Unit::Page => { + config.config_options.write() + .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, true); + make_test_file_page(scenario).await + } + }; + let parquet_path = file.path().to_string_lossy(); + + // now, setup a the file as a data source and run a query against it + let ctx = SessionContext::with_config(config); + + ctx.register_parquet("t", &parquet_path, ParquetReadOptions::default()) + .await + .unwrap(); + let provider = ctx.deregister_table("t").unwrap().unwrap(); + ctx.register_table("t", provider.clone()).unwrap(); + + Self { + file, + provider, + ctx, + } + } + + /// runs a query like "SELECT * from t WHERE and returns + /// the number of output rows and normalized execution metrics + async fn query_with_expr(&mut self, expr: Expr) -> TestOutput { + let sql = format!("EXPR only: {:?}", expr); + let logical_plan = LogicalPlanBuilder::scan( + "t", + provider_as_source(self.provider.clone()), + None, + ) + .unwrap() + .filter(expr) + .unwrap() + .build() + .unwrap(); + self.run_test(logical_plan, sql).await + } + + /// Runs the specified SQL query and returns the number of output + /// rows and normalized execution metrics + async fn query(&mut self, sql: &str) -> TestOutput { + println!("Planning sql {}", sql); + let logical_plan = self + .ctx + .sql(sql) + .await + .expect("planning") + .to_unoptimized_plan(); + self.run_test(logical_plan, sql).await + } + + /// runs the logical plan + async fn run_test( + &mut self, + logical_plan: LogicalPlan, + sql: impl Into, + ) -> TestOutput { + let input = self + .ctx + .sql("SELECT * from t") + .await + .expect("planning") + .collect() + .await + .expect("getting input"); + let 
pretty_input = pretty_format_batches(&input).unwrap().to_string(); + + let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan"); + + let physical_plan = self + .ctx + .create_physical_plan(&logical_plan) + .await + .expect("creating physical plan"); + + let task_ctx = self.ctx.task_ctx(); + let results = datafusion::physical_plan::collect(physical_plan.clone(), task_ctx) + .await + .expect("Running"); + + // find the parquet metrics + struct MetricsFinder { + metrics: Option, + } + impl ExecutionPlanVisitor for MetricsFinder { + type Error = std::convert::Infallible; + fn pre_visit( + &mut self, + plan: &dyn ExecutionPlan, + ) -> Result { + if plan.as_any().downcast_ref::().is_some() { + self.metrics = plan.metrics(); + } + // stop searching once we have found the metrics + Ok(self.metrics.is_none()) + } + } + let mut finder = MetricsFinder { metrics: None }; + accept(physical_plan.as_ref(), &mut finder).unwrap(); + let parquet_metrics = finder.metrics.unwrap(); + + let result_rows = results.iter().map(|b| b.num_rows()).sum(); + + let pretty_results = pretty_format_batches(&results).unwrap().to_string(); + + let sql = sql.into(); + TestOutput { + sql, + parquet_metrics, + result_rows, + pretty_input, + pretty_results, + } + } +} + +/// Return record batch with a few rows of data for all of the supported timestamp types +/// values with the specified offset +/// +/// Columns are named: +/// "nanos" --> TimestampNanosecondArray +/// "micros" --> TimestampMicrosecondArray +/// "millis" --> TimestampMillisecondArray +/// "seconds" --> TimestampSecondArray +/// "names" --> StringArray +fn make_timestamp_batch(offset: Duration) -> RecordBatch { + let ts_strings = vec![ + Some("2020-01-01T01:01:01.0000000000001"), + Some("2020-01-01T01:02:01.0000000000001"), + Some("2020-01-01T02:01:01.0000000000001"), + None, + Some("2020-01-02T01:01:01.0000000000001"), + ]; + + let offset_nanos = offset.num_nanoseconds().expect("non overflow nanos"); + + let ts_nanos = ts_strings + .into_iter() + .map(|t| { + t.map(|t| { + offset_nanos + + t.parse::() + .unwrap() + .timestamp_nanos() + }) + }) + .collect::>(); + + let ts_micros = ts_nanos + .iter() + .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000)) + .collect::>(); + + let ts_millis = ts_nanos + .iter() + .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000)) + .collect::>(); + + let ts_seconds = ts_nanos + .iter() + .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000)) + .collect::>(); + + let names = ts_nanos + .iter() + .enumerate() + .map(|(i, _)| format!("Row {} + {}", i, offset)) + .collect::>(); + + let arr_nanos = TimestampNanosecondArray::from(ts_nanos); + let arr_micros = TimestampMicrosecondArray::from(ts_micros); + let arr_millis = TimestampMillisecondArray::from(ts_millis); + let arr_seconds = TimestampSecondArray::from(ts_seconds); + + let names = names.iter().map(|s| s.as_str()).collect::>(); + let arr_names = StringArray::from(names); + + let schema = Schema::new(vec![ + Field::new("nanos", arr_nanos.data_type().clone(), true), + Field::new("micros", arr_micros.data_type().clone(), true), + Field::new("millis", arr_millis.data_type().clone(), true), + Field::new("seconds", arr_seconds.data_type().clone(), true), + Field::new("name", arr_names.data_type().clone(), true), + ]); + let schema = Arc::new(schema); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(arr_nanos), + Arc::new(arr_micros), + Arc::new(arr_millis), + Arc::new(arr_seconds), + Arc::new(arr_names), + ], + ) + .unwrap() +} + +/// Return record 
batch with i32 sequence +/// +/// Columns are named +/// "i" -> Int32Array +fn make_int32_batch(start: i32, end: i32) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)])); + let v: Vec = (start..end).collect(); + let array = Arc::new(Int32Array::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array.clone()]).unwrap() +} + +/// Return record batch with f64 vector +/// +/// Columns are named +/// "f" -> Float64Array +fn make_f64_batch(v: Vec) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float64, true)])); + let array = Arc::new(Float64Array::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array.clone()]).unwrap() +} + +/// Return record batch with decimal vector +/// +/// Columns are named +/// "decimal_col" -> DecimalArray +fn make_decimal_batch(v: Vec, precision: u8, scale: u8) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new( + "decimal_col", + DataType::Decimal128(precision, scale), + true, + )])); + let array = Arc::new( + Decimal128Array::from_iter_values(v) + .with_precision_and_scale(precision, scale) + .unwrap(), + ) as ArrayRef; + RecordBatch::try_new(schema, vec![array.clone()]).unwrap() +} + +/// Return record batch with a few rows of data for all of the supported date +/// types with the specified offset (in days) +/// +/// Columns are named: +/// "date32" --> Date32Array +/// "date64" --> Date64Array +/// "names" --> StringArray +fn make_date_batch(offset: Duration) -> RecordBatch { + let date_strings = vec![ + Some("2020-01-01"), + Some("2020-01-02"), + Some("2020-01-03"), + None, + Some("2020-01-04"), + ]; + + let names = date_strings + .iter() + .enumerate() + .map(|(i, val)| format!("Row {} + {}: {:?}", i, offset, val)) + .collect::>(); + + // Copied from `cast.rs` cast kernel due to lack of temporal kernels + // https://github.com/apache/arrow-rs/issues/527 + const EPOCH_DAYS_FROM_CE: i32 = 719_163; + + let date_seconds = date_strings + .iter() + .map(|t| { + t.map(|t| { + let t = t.parse::().unwrap(); + let t = t + offset; + t.num_days_from_ce() - EPOCH_DAYS_FROM_CE + }) + }) + .collect::>(); + + let date_millis = date_strings + .into_iter() + .map(|t| { + t.map(|t| { + let t = t + .parse::() + .unwrap() + .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap()); + let t = t + offset; + t.timestamp_millis() + }) + }) + .collect::>(); + + let arr_date32 = Date32Array::from(date_seconds); + let arr_date64 = Date64Array::from(date_millis); + + let names = names.iter().map(|s| s.as_str()).collect::>(); + let arr_names = StringArray::from(names); + + let schema = Schema::new(vec![ + Field::new("date32", arr_date32.data_type().clone(), true), + Field::new("date64", arr_date64.data_type().clone(), true), + Field::new("name", arr_names.data_type().clone(), true), + ]); + let schema = Arc::new(schema); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(arr_date32), + Arc::new(arr_date64), + Arc::new(arr_names), + ], + ) + .unwrap() +} + +fn create_data_batch(scenario: Scenario) -> Vec { + match scenario { + Scenario::Timestamps => { + vec![ + make_timestamp_batch(Duration::seconds(0)), + make_timestamp_batch(Duration::seconds(10)), + make_timestamp_batch(Duration::minutes(10)), + make_timestamp_batch(Duration::days(10)), + ] + } + Scenario::Dates => { + vec![ + make_date_batch(Duration::days(0)), + make_date_batch(Duration::days(10)), + make_date_batch(Duration::days(300)), + make_date_batch(Duration::days(3600)), + ] + } + Scenario::Int32 => { + 
vec![ + make_int32_batch(-5, 0), + make_int32_batch(-4, 1), + make_int32_batch(0, 5), + make_int32_batch(5, 10), + ] + } + Scenario::Float64 => { + vec![ + make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]), + make_f64_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]), + make_f64_batch(vec![0.0, 1.0, 2.0, 3.0, 4.0]), + make_f64_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]), + ] + } + Scenario::Decimal => { + // decimal record batch + vec![ + make_decimal_batch(vec![100, 200, 300, 400, 600], 9, 2), + make_decimal_batch(vec![-500, 100, 300, 400, 600], 9, 2), + make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 9, 2), + ] + } + Scenario::DecimalLargePrecision => { + // decimal record batch with large precision, + // and the data will stored as FIXED_LENGTH_BYTE_ARRAY + vec![ + make_decimal_batch(vec![100, 200, 300, 400, 600], 38, 2), + make_decimal_batch(vec![-500, 100, 300, 400, 600], 38, 2), + make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 38, 2), + ] + } + } +} + +/// Create a test parquet file with varioud data types +async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile { + let mut output_file = tempfile::Builder::new() + .prefix("parquet_pruning") + .suffix(".parquet") + .tempfile() + .expect("tempfile creation"); + + let props = WriterProperties::builder() + .set_max_row_group_size(5) + .build(); + + let batches = create_data_batch(scenario); + + let schema = batches[0].schema(); + + let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap(); + + for batch in batches { + writer.write(&batch).expect("writing batch"); + } + writer.close().unwrap(); + + output_file +} + +async fn make_test_file_page(scenario: Scenario) -> NamedTempFile { + let mut output_file = tempfile::Builder::new() + .prefix("parquet_page_pruning") + .suffix(".parquet") + .tempfile() + .expect("tempfile creation"); + + // set row count to 5, should get same result as rowGroup + let props = WriterProperties::builder() + .set_data_page_row_count_limit(5) + .set_write_batch_size(5) + .build(); + + let batches = create_data_batch(scenario); + + let schema = batches[0].schema(); + + let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap(); + + for batch in batches { + writer.write(&batch).expect("writing batch"); + } + writer.close().unwrap(); + + let x = output_file.path().as_os_str().to_os_string(); + output_file +} diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 8d8c3bcae67a..a75d040c3ae1 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
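// Editor's illustration (not part of this diff): the imports added just below bring in the
// shared ContextWithParquet fixture that this patch moves into tests/parquet/mod.rs. A
// minimal, hypothetical use of it for page-level pruning looks like the following; the SQL
// string and the helper name are placeholders, not assertions made by this patch.

    use crate::parquet::{ContextWithParquet, Scenario, Unit};

    async fn page_pruning_sketch() {
        let output = ContextWithParquet::new(Scenario::Int32, Unit::Page)
            .await
            .query("SELECT * FROM t where i < 1")
            .await;
        // Expect no predicate-evaluation errors and at least some rows skipped via the
        // page index; exact counts depend on the generated file's page layout.
        assert_eq!(output.predicate_evaluation_errors(), Some(0));
        assert!(output.row_pages_pruned().unwrap_or(0) > 0);
    }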
+use crate::parquet::Unit::Page; +use crate::parquet::{ContextWithParquet, Scenario}; use datafusion::config::ConfigOptions; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; @@ -22,8 +24,8 @@ use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; use datafusion::physical_plan::ExecutionPlan; -use datafusion::prelude::SessionContext; -use datafusion_common::Statistics; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_common::{ScalarValue, Statistics}; use datafusion_expr::{col, lit, Expr}; use object_store::path::Path; use object_store::ObjectMeta; @@ -142,6 +144,22 @@ async fn page_index_filter_one_col() { // should same with `month = 1` assert_eq!(batch.num_rows(), 645); + + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + + // 5.create filter date_string_col == 1; + let filter = col("date_string_col").eq(lit("01/01/09")); + let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); + let batch = results.next().await.unwrap().unwrap(); + + // there should only two pages match the filter + // min max + // page-20 0 01/01/09 01/02/09 + // page-21 0 01/01/09 01/01/09 + // each 7 rows + assert_eq!(batch.num_rows(), 14); } #[tokio::test] @@ -204,3 +222,26 @@ async fn page_index_filter_multi_col() { let batch = results.next().await.unwrap().unwrap(); assert_eq!(batch.num_rows(), 7300); } + +async fn test_prune( + case_data_type: Scenario, + sql: &str, + expected_errors: Option, + expected_row_pages_pruned: Option, + expected_results: usize, +) { + let output = ContextWithParquet::new(case_data_type, Page) + .await + .query(sql) + .await; + + println!("{}", output.description()); + assert_eq!(output.predicate_evaluation_errors(), expected_errors); + assert_eq!(output.row_pages_pruned(), expected_row_pages_pruned); + assert_eq!( + output.result_rows, + expected_results, + "{}", + output.description() + ); +} diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index c3de01b38380..c7e3c533f7d6 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -18,32 +18,12 @@ //! This file contains an end to end test of parquet pruning. It writes //! data into a parquet file and then verifies row groups are pruned as //! expected. 
-use std::sync::Arc; - -use arrow::array::Decimal128Array; -use arrow::{ - array::{ - Array, ArrayRef, Date32Array, Date64Array, Float64Array, Int32Array, StringArray, - TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, - TimestampSecondArray, - }, - datatypes::{DataType, Field, Schema}, - record_batch::RecordBatch, - util::pretty::pretty_format_batches, -}; -use chrono::{Datelike, Duration}; -use datafusion::{ - datasource::{provider_as_source, TableProvider}, - physical_plan::{ - accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan, - ExecutionPlanVisitor, - }, - prelude::{ParquetReadOptions, SessionConfig, SessionContext}, - scalar::ScalarValue, -}; -use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder}; -use parquet::{arrow::ArrowWriter, file::properties::WriterProperties}; -use tempfile::NamedTempFile; +use datafusion::prelude::SessionConfig; +use datafusion_common::ScalarValue; + +use crate::parquet::Unit::RowGroup; +use crate::parquet::{ContextWithParquet, Scenario}; +use datafusion_expr::{col, lit}; async fn test_prune( case_data_type: Scenario, @@ -52,7 +32,7 @@ async fn test_prune( expected_row_group_pruned: Option, expected_results: usize, ) { - let output = ContextWithParquet::new(case_data_type) + let output = ContextWithParquet::new(case_data_type, RowGroup) .await .query(sql) .await; @@ -137,7 +117,7 @@ async fn prune_date64() { .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap()); let date = ScalarValue::Date64(Some(date.timestamp_millis())); - let output = ContextWithParquet::new(Scenario::Dates) + let output = ContextWithParquet::new(Scenario::Dates, RowGroup) .await .query_with_expr(col("date64").lt(lit(date))) // .query( @@ -169,7 +149,7 @@ async fn prune_disabled() { let expected_rows = 10; let config = SessionConfig::new().with_parquet_pruning(false); - let output = ContextWithParquet::with_config(Scenario::Timestamps, config) + let output = ContextWithParquet::with_config(Scenario::Timestamps, RowGroup, config) .await .query(query) .await; @@ -503,465 +483,3 @@ async fn prune_decimal_in_list() { ) .await; } - -// ---------------------- -// Begin test fixture -// ---------------------- - -/// What data to use -enum Scenario { - Timestamps, - Dates, - Int32, - Float64, - Decimal, - DecimalLargePrecision, -} - -/// Test fixture that has an execution context that has an external -/// table "t" registered, pointing at a parquet file made with -/// `make_test_file` -struct ContextWithParquet { - #[allow(dead_code)] - /// temp file parquet data is written to. 
The file is cleaned up - /// when dropped - file: NamedTempFile, - provider: Arc, - ctx: SessionContext, -} - -/// The output of running one of the test cases -struct TestOutput { - /// The input string - sql: String, - /// Execution metrics for the Parquet Scan - parquet_metrics: MetricsSet, - /// number of rows in results - result_rows: usize, - /// the contents of the input, as a string - pretty_input: String, - /// the raw results, as a string - pretty_results: String, -} - -impl TestOutput { - /// retrieve the value of the named metric, if any - fn metric_value(&self, metric_name: &str) -> Option { - self.parquet_metrics - .sum(|metric| metric.value().name() == metric_name) - .map(|v| v.as_usize()) - } - - /// The number of times the pruning predicate evaluation errors - fn predicate_evaluation_errors(&self) -> Option { - self.metric_value("predicate_evaluation_errors") - } - - /// The number of times the pruning predicate evaluation errors - fn row_groups_pruned(&self) -> Option { - self.metric_value("row_groups_pruned") - } - - fn description(&self) -> String { - format!( - "Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}", - self.pretty_input, self.sql, self.pretty_results, self.parquet_metrics, - ) - } -} - -/// Creates an execution context that has an external table "t" -/// registered pointing at a parquet file made with `make_test_file` -/// and the appropriate scenario -impl ContextWithParquet { - async fn new(scenario: Scenario) -> Self { - Self::with_config(scenario, SessionConfig::new()).await - } - - async fn with_config(scenario: Scenario, config: SessionConfig) -> Self { - let file = make_test_file(scenario).await; - let parquet_path = file.path().to_string_lossy(); - - // now, setup a the file as a data source and run a query against it - let ctx = SessionContext::with_config(config); - - ctx.register_parquet("t", &parquet_path, ParquetReadOptions::default()) - .await - .unwrap(); - let provider = ctx.deregister_table("t").unwrap().unwrap(); - ctx.register_table("t", provider.clone()).unwrap(); - - Self { - file, - provider, - ctx, - } - } - - /// runs a query like "SELECT * from t WHERE and returns - /// the number of output rows and normalized execution metrics - async fn query_with_expr(&mut self, expr: Expr) -> TestOutput { - let sql = format!("EXPR only: {:?}", expr); - let logical_plan = LogicalPlanBuilder::scan( - "t", - provider_as_source(self.provider.clone()), - None, - ) - .unwrap() - .filter(expr) - .unwrap() - .build() - .unwrap(); - self.run_test(logical_plan, sql).await - } - - /// Runs the specified SQL query and returns the number of output - /// rows and normalized execution metrics - async fn query(&mut self, sql: &str) -> TestOutput { - println!("Planning sql {}", sql); - let logical_plan = self - .ctx - .sql(sql) - .await - .expect("planning") - .to_unoptimized_plan(); - self.run_test(logical_plan, sql).await - } - - /// runs the logical plan - async fn run_test( - &mut self, - logical_plan: LogicalPlan, - sql: impl Into, - ) -> TestOutput { - let input = self - .ctx - .sql("SELECT * from t") - .await - .expect("planning") - .collect() - .await - .expect("getting input"); - let pretty_input = pretty_format_batches(&input).unwrap().to_string(); - - let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan"); - - let physical_plan = self - .ctx - .create_physical_plan(&logical_plan) - .await - .expect("creating physical plan"); - - let task_ctx = self.ctx.task_ctx(); - let results = 
datafusion::physical_plan::collect(physical_plan.clone(), task_ctx) - .await - .expect("Running"); - - // find the parquet metrics - struct MetricsFinder { - metrics: Option, - } - impl ExecutionPlanVisitor for MetricsFinder { - type Error = std::convert::Infallible; - fn pre_visit( - &mut self, - plan: &dyn ExecutionPlan, - ) -> Result { - if plan.as_any().downcast_ref::().is_some() { - self.metrics = plan.metrics(); - } - // stop searching once we have found the metrics - Ok(self.metrics.is_none()) - } - } - let mut finder = MetricsFinder { metrics: None }; - accept(physical_plan.as_ref(), &mut finder).unwrap(); - let parquet_metrics = finder.metrics.unwrap(); - - let result_rows = results.iter().map(|b| b.num_rows()).sum(); - - let pretty_results = pretty_format_batches(&results).unwrap().to_string(); - - let sql = sql.into(); - TestOutput { - sql, - parquet_metrics, - result_rows, - pretty_input, - pretty_results, - } - } -} - -/// Create a test parquet file with varioud data types -async fn make_test_file(scenario: Scenario) -> NamedTempFile { - let mut output_file = tempfile::Builder::new() - .prefix("parquet_pruning") - .suffix(".parquet") - .tempfile() - .expect("tempfile creation"); - - let props = WriterProperties::builder() - .set_max_row_group_size(5) - .build(); - - let batches = match scenario { - Scenario::Timestamps => { - vec![ - make_timestamp_batch(Duration::seconds(0)), - make_timestamp_batch(Duration::seconds(10)), - make_timestamp_batch(Duration::minutes(10)), - make_timestamp_batch(Duration::days(10)), - ] - } - Scenario::Dates => { - vec![ - make_date_batch(Duration::days(0)), - make_date_batch(Duration::days(10)), - make_date_batch(Duration::days(300)), - make_date_batch(Duration::days(3600)), - ] - } - Scenario::Int32 => { - vec![ - make_int32_batch(-5, 0), - make_int32_batch(-4, 1), - make_int32_batch(0, 5), - make_int32_batch(5, 10), - ] - } - Scenario::Float64 => { - vec![ - make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]), - make_f64_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]), - make_f64_batch(vec![0.0, 1.0, 2.0, 3.0, 4.0]), - make_f64_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]), - ] - } - Scenario::Decimal => { - // decimal record batch - vec![ - make_decimal_batch(vec![100, 200, 300, 400, 600], 9, 2), - make_decimal_batch(vec![-500, 100, 300, 400, 600], 9, 2), - make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 9, 2), - ] - } - Scenario::DecimalLargePrecision => { - // decimal record batch with large precision, - // and the data will stored as FIXED_LENGTH_BYTE_ARRAY - vec![ - make_decimal_batch(vec![100, 200, 300, 400, 600], 38, 2), - make_decimal_batch(vec![-500, 100, 300, 400, 600], 38, 2), - make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 38, 2), - ] - } - }; - - let schema = batches[0].schema(); - - let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap(); - - for batch in batches { - writer.write(&batch).expect("writing batch"); - } - writer.close().unwrap(); - - output_file -} - -/// Return record batch with a few rows of data for all of the supported timestamp types -/// values with the specified offset -/// -/// Columns are named: -/// "nanos" --> TimestampNanosecondArray -/// "micros" --> TimestampMicrosecondArray -/// "millis" --> TimestampMillisecondArray -/// "seconds" --> TimestampSecondArray -/// "names" --> StringArray -fn make_timestamp_batch(offset: Duration) -> RecordBatch { - let ts_strings = vec![ - Some("2020-01-01T01:01:01.0000000000001"), - Some("2020-01-01T01:02:01.0000000000001"), - 
Some("2020-01-01T02:01:01.0000000000001"), - None, - Some("2020-01-02T01:01:01.0000000000001"), - ]; - - let offset_nanos = offset.num_nanoseconds().expect("non overflow nanos"); - - let ts_nanos = ts_strings - .into_iter() - .map(|t| { - t.map(|t| { - offset_nanos - + t.parse::() - .unwrap() - .timestamp_nanos() - }) - }) - .collect::>(); - - let ts_micros = ts_nanos - .iter() - .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000)) - .collect::>(); - - let ts_millis = ts_nanos - .iter() - .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000)) - .collect::>(); - - let ts_seconds = ts_nanos - .iter() - .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000)) - .collect::>(); - - let names = ts_nanos - .iter() - .enumerate() - .map(|(i, _)| format!("Row {} + {}", i, offset)) - .collect::>(); - - let arr_nanos = TimestampNanosecondArray::from(ts_nanos); - let arr_micros = TimestampMicrosecondArray::from(ts_micros); - let arr_millis = TimestampMillisecondArray::from(ts_millis); - let arr_seconds = TimestampSecondArray::from(ts_seconds); - - let names = names.iter().map(|s| s.as_str()).collect::>(); - let arr_names = StringArray::from(names); - - let schema = Schema::new(vec![ - Field::new("nanos", arr_nanos.data_type().clone(), true), - Field::new("micros", arr_micros.data_type().clone(), true), - Field::new("millis", arr_millis.data_type().clone(), true), - Field::new("seconds", arr_seconds.data_type().clone(), true), - Field::new("name", arr_names.data_type().clone(), true), - ]); - let schema = Arc::new(schema); - - RecordBatch::try_new( - schema, - vec![ - Arc::new(arr_nanos), - Arc::new(arr_micros), - Arc::new(arr_millis), - Arc::new(arr_seconds), - Arc::new(arr_names), - ], - ) - .unwrap() -} - -/// Return record batch with i32 sequence -/// -/// Columns are named -/// "i" -> Int32Array -fn make_int32_batch(start: i32, end: i32) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)])); - let v: Vec = (start..end).collect(); - let array = Arc::new(Int32Array::from(v)) as ArrayRef; - RecordBatch::try_new(schema, vec![array.clone()]).unwrap() -} - -/// Return record batch with f64 vector -/// -/// Columns are named -/// "f" -> Float64Array -fn make_f64_batch(v: Vec) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float64, true)])); - let array = Arc::new(Float64Array::from(v)) as ArrayRef; - RecordBatch::try_new(schema, vec![array.clone()]).unwrap() -} - -/// Return record batch with decimal vector -/// -/// Columns are named -/// "decimal_col" -> DecimalArray -fn make_decimal_batch(v: Vec, precision: u8, scale: u8) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![Field::new( - "decimal_col", - DataType::Decimal128(precision, scale), - true, - )])); - let array = Arc::new( - Decimal128Array::from_iter_values(v) - .with_precision_and_scale(precision, scale) - .unwrap(), - ) as ArrayRef; - RecordBatch::try_new(schema, vec![array.clone()]).unwrap() -} - -/// Return record batch with a few rows of data for all of the supported date -/// types with the specified offset (in days) -/// -/// Columns are named: -/// "date32" --> Date32Array -/// "date64" --> Date64Array -/// "names" --> StringArray -fn make_date_batch(offset: Duration) -> RecordBatch { - let date_strings = vec![ - Some("2020-01-01"), - Some("2020-01-02"), - Some("2020-01-03"), - None, - Some("2020-01-04"), - ]; - - let names = date_strings - .iter() - .enumerate() - .map(|(i, val)| format!("Row {} + {}: {:?}", i, offset, val)) - .collect::>(); 
- - // Copied from `cast.rs` cast kernel due to lack of temporal kernels - // https://github.com/apache/arrow-rs/issues/527 - const EPOCH_DAYS_FROM_CE: i32 = 719_163; - - let date_seconds = date_strings - .iter() - .map(|t| { - t.map(|t| { - let t = t.parse::().unwrap(); - let t = t + offset; - t.num_days_from_ce() - EPOCH_DAYS_FROM_CE - }) - }) - .collect::>(); - - let date_millis = date_strings - .into_iter() - .map(|t| { - t.map(|t| { - let t = t - .parse::() - .unwrap() - .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap()); - let t = t + offset; - t.timestamp_millis() - }) - }) - .collect::>(); - - let arr_date32 = Date32Array::from(date_seconds); - let arr_date64 = Date64Array::from(date_millis); - - let names = names.iter().map(|s| s.as_str()).collect::>(); - let arr_names = StringArray::from(names); - - let schema = Schema::new(vec![ - Field::new("date32", arr_date32.data_type().clone(), true), - Field::new("date64", arr_date64.data_type().clone(), true), - Field::new("name", arr_names.data_type().clone(), true), - ]); - let schema = Arc::new(schema); - - RecordBatch::try_new( - schema, - vec![ - Arc::new(arr_date32), - Arc::new(arr_date64), - Arc::new(arr_names), - ], - ) - .unwrap() -} From 4c81dca861935f66238e359d2e157a34910e0768 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 18 Nov 2022 23:29:55 +0800 Subject: [PATCH 6/8] add test for page index Signed-off-by: yangjiang --- datafusion/core/tests/parquet/mod.rs | 8 +- datafusion/core/tests/parquet/page_pruning.rs | 442 +++++++++++++++++- 2 files changed, 445 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 020338021431..255df515c791 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -28,6 +28,7 @@ use arrow::{ util::pretty::pretty_format_batches, }; use chrono::{Datelike, Duration}; +use datafusion::config::OPT_PARQUET_ENABLE_PAGE_INDEX; use datafusion::{ datasource::{provider_as_source, TableProvider}, physical_plan::{ @@ -41,7 +42,6 @@ use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; use std::sync::Arc; use tempfile::NamedTempFile; -use datafusion::config::OPT_PARQUET_ENABLE_PAGE_INDEX; mod custom_reader; mod filter_pushdown; @@ -136,7 +136,9 @@ impl ContextWithParquet { let file = match unit { Unit::RowGroup => make_test_file_rg(scenario).await, Unit::Page => { - config.config_options.write() + config + .config_options + .write() .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, true); make_test_file_page(scenario).await } @@ -554,7 +556,5 @@ async fn make_test_file_page(scenario: Scenario) -> NamedTempFile { writer.write(&batch).expect("writing batch"); } writer.close().unwrap(); - - let x = output_file.path().as_os_str().to_os_string(); output_file } diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index a75d040c3ae1..8c0f6d68bed6 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -24,7 +24,7 @@ use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; use datafusion::physical_plan::ExecutionPlan; -use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion::prelude::SessionContext; use datafusion_common::{ScalarValue, Statistics}; use datafusion_expr::{col, lit, Expr}; use object_store::path::Path; @@ -245,3 
+245,443 @@ async fn test_prune( output.description() ); } + +#[tokio::test] +// null count min max +// page-0 1 2020-01-01T01:01:01.000000000 2020-01-02T01:01:01.000000000 +// page-1 1 2020-01-01T01:01:11.000000000 2020-01-02T01:01:11.000000000 +// page-2 1 2020-01-01T01:11:01.000000000 2020-01-02T01:11:01.000000000 +// page-3 1 2020-01-11T01:01:01.000000000 2020-01-12T01:01:01.000000000 +async fn prune_timestamps_nanos() { + test_prune( + Scenario::Timestamps, + "SELECT * FROM t where nanos < to_timestamp('2020-01-02 01:01:11Z')", + Some(0), + Some(5), + 10, + ) + .await; +} + +#[tokio::test] +// null count min max +// page-0 1 2020-01-01T01:01:01.000000 2020-01-02T01:01:01.000000 +// page-1 1 2020-01-01T01:01:11.000000 2020-01-02T01:01:11.000000 +// page-2 1 2020-01-01T01:11:01.000000 2020-01-02T01:11:01.000000 +// page-3 1 2020-01-11T01:01:01.000000 2020-01-12T01:01:01.000000 +async fn prune_timestamps_micros() { + test_prune( + Scenario::Timestamps, + "SELECT * FROM t where micros < to_timestamp_micros('2020-01-02 01:01:11Z')", + Some(0), + Some(5), + 10, + ) + .await; +} + +#[tokio::test] +// null count min max +// page-0 1 2020-01-01T01:01:01.000 2020-01-02T01:01:01.000 +// page-1 1 2020-01-01T01:01:11.000 2020-01-02T01:01:11.000 +// page-2 1 2020-01-01T01:11:01.000 2020-01-02T01:11:01.000 +// page-3 1 2020-01-11T01:01:01.000 2020-01-12T01:01:01.000 +async fn prune_timestamps_millis() { + test_prune( + Scenario::Timestamps, + "SELECT * FROM t where millis < to_timestamp_millis('2020-01-02 01:01:11Z')", + Some(0), + Some(5), + 10, + ) + .await; +} + +#[tokio::test] +// null count min max +// page-0 1 1577840461 1577926861 +// page-1 1 1577840471 1577926871 +// page-2 1 1577841061 1577927461 +// page-3 1 1578704461 1578790861 + +async fn prune_timestamps_seconds() { + test_prune( + Scenario::Timestamps, + "SELECT * FROM t where seconds < to_timestamp_seconds('2020-01-02 01:01:11Z')", + Some(0), + Some(5), + 10, + ) + .await; +} + +#[tokio::test] +// null count min max +// page-0 1 2020-01-01 2020-01-04 +// page-1 1 2020-01-11 2020-01-14 +// page-2 1 2020-10-27 2020-10-30 +// page-3 1 2029-11-09 2029-11-12 +async fn prune_date32() { + test_prune( + Scenario::Dates, + "SELECT * FROM t where date32 < cast('2020-01-02' as date)", + Some(0), + Some(15), + 1, + ) + .await; +} + +#[tokio::test] +// null count min max +// page-0 1 2020-01-01 2020-01-04 +// page-1 1 2020-01-11 2020-01-14 +// page-2 1 2020-10-27 2020-10-30 +// page-3 1 2029-11-09 2029-11-12 +async fn prune_date64() { + // work around for not being able to cast Date32 to Date64 automatically + let date = "2020-01-02" + .parse::() + .unwrap() + .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap()); + let date = ScalarValue::Date64(Some(date.timestamp_millis())); + + let output = ContextWithParquet::new(Scenario::Dates, Page) + .await + .query_with_expr(col("date64").lt(lit(date))) + .await; + + println!("{}", output.description()); + // This should prune out groups without error + assert_eq!(output.predicate_evaluation_errors(), Some(0)); + assert_eq!(output.row_pages_pruned(), Some(15)); + assert_eq!(output.result_rows, 1, "{}", output.description()); +} + +#[tokio::test] +// null count min max +// page-0 0 -5 -1 +// page-1 0 -4 0 +// page-2 0 0 4 +// page-3 0 5 9 +async fn prune_int32_lt() { + test_prune( + Scenario::Int32, + "SELECT * FROM t where i < 1", + Some(0), + Some(5), + 11, + ) + .await; + // result of sql "SELECT * FROM t where i < 1" is same as + // "SELECT * FROM t where -i > -1" + test_prune( + 
+        Scenario::Int32,
+        "SELECT * FROM t where -i > -1",
+        Some(0),
+        Some(5),
+        11,
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn prune_int32_eq() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where i = 1",
+        Some(0),
+        Some(15),
+        1,
+    )
+    .await;
+}
+#[tokio::test]
+#[ignore]
+async fn prune_int32_scalar_fun_and_eq() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where abs(i) = 1 and i = 1",
+        Some(0),
+        Some(15),
+        1,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_int32_scalar_fun() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where abs(i) = 1",
+        Some(0),
+        Some(0),
+        3,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_int32_complex_expr() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where i+1 = 1",
+        Some(0),
+        Some(0),
+        2,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_int32_complex_expr_subtract() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where 1-i > 1",
+        Some(0),
+        Some(0),
+        9,
+    )
+    .await;
+}
+
+#[tokio::test]
+// null count min max
+// page-0 0 -5.0 -1.0
+// page-1 0 -4.0 0.0
+// page-2 0 0.0 4.0
+// page-3 0 5.0 9.0
+async fn prune_f64_lt() {
+    test_prune(
+        Scenario::Float64,
+        "SELECT * FROM t where f < 1",
+        Some(0),
+        Some(5),
+        11,
+    )
+    .await;
+    test_prune(
+        Scenario::Float64,
+        "SELECT * FROM t where -f > -1",
+        Some(0),
+        Some(5),
+        11,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_f64_scalar_fun_and_gt() {
+    // for "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1"
+    // only "f >= 0.1" can be used for pruning
+    test_prune(
+        Scenario::Float64,
+        "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1",
+        Some(0),
+        Some(2),
+        1,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_f64_scalar_fun() {
+    // pruning on "SELECT * FROM t where abs(f-1) <= 0.000001" is not supported
+    test_prune(
+        Scenario::Float64,
+        "SELECT * FROM t where abs(f-1) <= 0.000001",
+        Some(0),
+        Some(0),
+        1,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_f64_complex_expr() {
+    // pruning on "SELECT * FROM t where f+1 > 1.1" is not supported
+    test_prune(
+        Scenario::Float64,
+        "SELECT * FROM t where f+1 > 1.1",
+        Some(0),
+        Some(0),
+        9,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_f64_complex_expr_subtract() {
+    // pruning on "SELECT * FROM t where 1-f > 1" is not supported
+    test_prune(
+        Scenario::Float64,
+        "SELECT * FROM t where 1-f > 1",
+        Some(0),
+        Some(0),
+        9,
+    )
+    .await;
+}
+
+#[tokio::test]
+// null count min max
+// page-0 0 -5 -1
+// page-1 0 -4 0
+// page-2 0 0 4
+// page-3 0 5 9
+async fn prune_int32_eq_in_list() {
+    // result of sql "SELECT * FROM t where i in (1)"
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where i in (1)",
+        Some(0),
+        Some(15),
+        1,
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn prune_int32_eq_in_list_negated() {
+    // "SELECT * FROM t where i not in (1)" prunes nothing
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where i not in (1)",
+        Some(0),
+        Some(0),
+        19,
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn prune_decimal_lt() {
+    // The data type of decimal_col is decimal(9,2)
+    // There are three pages, each with 5 rows:
+    // [1.00, 6.00], [-5.00, 6.00], [20.00, 60.00]
+    test_prune(
+        Scenario::Decimal,
+        "SELECT * FROM t where decimal_col < 4",
+        Some(0),
+        Some(5),
+        6,
+    )
+    .await;
+    // compare with the casted decimal value
+    test_prune(
+        Scenario::Decimal,
+        "SELECT * FROM t where decimal_col < cast(4.55 as decimal(20,2))",
+        Some(0),
+        Some(5),
+        8,
+    )
+    .await;
+
+    // The data type of decimal_col is decimal(38,2)
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col < 4",
+        Some(0),
+        Some(5),
+        6,
+    )
+    .await;
+    // compare with the casted decimal value
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col < cast(4.55 as decimal(20,2))",
+        Some(0),
+        Some(5),
+        8,
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn prune_decimal_eq() {
+    // The data type of decimal_col is decimal(9,2)
+    // There are three pages:
+    // [1.00, 6.00], [-5.00, 6.00], [20.00, 60.00]
+    test_prune(
+        Scenario::Decimal,
+        "SELECT * FROM t where decimal_col = 4",
+        Some(0),
+        Some(5),
+        2,
+    )
+    .await;
+    test_prune(
+        Scenario::Decimal,
+        "SELECT * FROM t where decimal_col = 4.00",
+        Some(0),
+        Some(5),
+        2,
+    )
+    .await;
+
+    // The data type of decimal_col is decimal(38,2)
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col = 4",
+        Some(0),
+        Some(5),
+        2,
+    )
+    .await;
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col = 4.00",
+        Some(0),
+        Some(5),
+        2,
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn prune_decimal_in_list() {
+    // The data type of decimal_col is decimal(9,2)
+    // There are three pages:
+    // [1.00, 6.00], [-5.00, 6.00], [20.00, 60.00]
+    test_prune(
+        Scenario::Decimal,
+        "SELECT * FROM t where decimal_col in (4,3,2,123456789123)",
+        Some(0),
+        Some(5),
+        5,
+    )
+    .await;
+    test_prune(
+        Scenario::Decimal,
+        "SELECT * FROM t where decimal_col in (4.00,3.00,11.2345,1)",
+        Some(0),
+        Some(5),
+        6,
+    )
+    .await;
+
+    // The data type of decimal_col is decimal(38,2)
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col in (4,3,2,123456789123)",
+        Some(0),
+        Some(5),
+        5,
+    )
+    .await;
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col in (4.00,3.00,11.2345,1)",
+        Some(0),
+        Some(5),
+        6,
+    )
+    .await;
+}

From d0bea7b9496ab01823f699bea536d4054a0aa485 Mon Sep 17 00:00:00 2001
From: yangjiang
Date: Tue, 22 Nov 2022 11:53:31 +0800
Subject: [PATCH 7/8] fix comment

Signed-off-by: yangjiang
---
 .../file_format/parquet/page_filter.rs        | 78 +++++++++++--------
 datafusion/core/tests/parquet/page_pruning.rs | 29 +++++++
 2 files changed, 76 insertions(+), 31 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index bf4e2861caf6..0b963c369697 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -411,15 +411,12 @@ macro_rules! get_min_max_values_for_page_index {
                     let vec = &index.indexes;
                     let vec: Vec<Option<i128>> = vec
                         .iter()
-                        .map(|x| x.min().and_then(|x| Some(*x as i128)))
+                        .map(|x| x.$func().and_then(|x| Some(*x as i128)))
                         .collect();
-                    if let Ok(arr) = Decimal128Array::from(vec)
+                    Decimal128Array::from(vec)
                         .with_precision_and_scale(*precision, *scale)
-                    {
-                        return Some(Arc::new(arr));
-                    } else {
-                        return None;
-                    }
+                        .ok()
+                        .map(|arr| Arc::new(arr) as ArrayRef)
                 }
                 _ => {
                     let vec = &index.indexes;
@@ -436,15 +433,12 @@ macro_rules! get_min_max_values_for_page_index {
                     let vec = &index.indexes;
                     let vec: Vec<Option<i128>> = vec
                         .iter()
-                        .map(|x| x.min().and_then(|x| Some(*x as i128)))
+                        .map(|x| x.$func().and_then(|x| Some(*x as i128)))
                         .collect();
-                    if let Ok(arr) = Decimal128Array::from(vec)
+                    Decimal128Array::from(vec)
                         .with_precision_and_scale(*precision, *scale)
-                    {
-                        return Some(Arc::new(arr));
-                    } else {
-                        return None;
-                    }
+                        .ok()
+                        .map(|arr| Arc::new(arr) as ArrayRef)
                 }
                 _ => {
                     let vec = &index.indexes;
@@ -485,24 +479,20 @@ macro_rules! get_min_max_values_for_page_index {
                 //Todo support these type
                 None
             }
-            Index::FIXED_LEN_BYTE_ARRAY(index) => {
-                match $self.target_type {
-                    // int32 to decimal with the precision and scale
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        let vec = &index.indexes;
-                        if let Ok(array) = Decimal128Array::from_iter_values(
-                            vec.iter().map(|x| from_bytes_to_i128(x.$func().unwrap())),
-                        )
-                        .with_precision_and_scale(*precision, *scale)
-                        {
-                            return Some(Arc::new(array));
-                        } else {
-                            return None;
-                        }
-                    }
-                    _ => None,
+            Index::FIXED_LEN_BYTE_ARRAY(index) => match $self.target_type {
+                Some(DataType::Decimal128(precision, scale)) => {
+                    let vec = &index.indexes;
+                    Decimal128Array::from(
+                        vec.iter()
+                            .map(|x| x.$func().and_then(|x| Some(from_bytes_to_i128(x))))
+                            .collect::<Vec<Option<i128>>>(),
+                    )
+                    .with_precision_and_scale(*precision, *scale)
+                    .ok()
+                    .map(|arr| Arc::new(arr) as ArrayRef)
                 }
-            }
+                _ => None,
+            },
         }
     }};
 }
@@ -510,6 +500,32 @@ macro_rules! get_min_max_values_for_page_index {
 impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
     fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
         get_min_max_values_for_page_index!(self, min)
+        // match self.col_page_indexes {
+        //     Index::NONE => None,
+        //     Index::FIXED_LEN_BYTE_ARRAY(index) => {
+        //         match self.target_type {
+        //             // int32 to decimal with the precision and scale
+        //             Some(DataType::Decimal128(precision, scale)) => {
+        //                 let vec = &index.indexes;
+        //                 // if let Ok(array) = Decimal128Array::from_iter_values(
+        //                 //     vec.iter().map(|x| from_bytes_to_i128(x.min().unwrap())),
+        //                 // )
+        //                 // .with_precision_and_scale(*precision, *scale)
+        //                 // {
+        //                 //     return Some(Arc::new(array));
+        //                 // } else {
+        //                 //     return None;
+        //                 // }
+        //                 Decimal128Array::from(vec)
+        //                     .with_precision_and_scale(*precision, *scale)
+        //                     .ok()
+        //                     .map(|arr| Arc::new(arr ) as ArrayRef)
+        //             }
+        //             _ => None,
+        //         }
+        //     }
+        //     _ => {None}
+        // }
     }
 
     fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 8c0f6d68bed6..8c67bc346d12 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -385,6 +385,27 @@ async fn prune_int32_lt() {
     .await;
 }
 
+#[tokio::test]
+async fn prune_int32_gt() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where i > 8",
+        Some(0),
+        Some(15),
+        1,
+    )
+    .await;
+
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where -i < -8",
+        Some(0),
+        Some(15),
+        1,
+    )
+    .await;
+}
+
 #[tokio::test]
 async fn prune_int32_eq() {
     test_prune(
@@ -643,6 +664,14 @@ async fn prune_decimal_eq() {
         2,
     )
     .await;
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col = 30.00",
+        Some(0),
+        Some(10),
+        2,
+    )
+    .await;
 }
 
 #[tokio::test]

From 9ebe5637e216deabd05ba2779ef844a90836ba1a Mon Sep 17 00:00:00 2001
From: yangjiang
Date: Tue, 22 Nov 2022 11:58:41 +0800
Subject: [PATCH 8/8] remove code

Signed-off-by: yangjiang
---
 .../file_format/parquet/page_filter.rs        | 26 -------------------
 1 file changed, 26 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 0b963c369697..a1cf03666fad 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -500,32 +500,6 @@ macro_rules! get_min_max_values_for_page_index {
 impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
     fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
         get_min_max_values_for_page_index!(self, min)
-        // match self.col_page_indexes {
-        //     Index::NONE => None,
-        //     Index::FIXED_LEN_BYTE_ARRAY(index) => {
-        //         match self.target_type {
-        //             // int32 to decimal with the precision and scale
-        //             Some(DataType::Decimal128(precision, scale)) => {
-        //                 let vec = &index.indexes;
-        //                 // if let Ok(array) = Decimal128Array::from_iter_values(
-        //                 //     vec.iter().map(|x| from_bytes_to_i128(x.min().unwrap())),
-        //                 // )
-        //                 // .with_precision_and_scale(*precision, *scale)
-        //                 // {
-        //                 //     return Some(Arc::new(array));
-        //                 // } else {
-        //                 //     return None;
-        //                 // }
-        //                 Decimal128Array::from(vec)
-        //                     .with_precision_and_scale(*precision, *scale)
-        //                     .ok()
-        //                     .map(|arr| Arc::new(arr ) as ArrayRef)
-        //             }
-        //             _ => None,
-        //         }
-        //     }
-        //     _ => {None}
-        // }
     }
 
     fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
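
For reference, the new page-index paths added above can be exercised outside the test harness with a minimal sketch like the one below. It is not part of this patch series: it assumes the `OPT_PARQUET_ENABLE_PAGE_INDEX` config key used by the tests, the `SessionConfig::set_bool` / `register_parquet` APIs as they exist in this DataFusion version, and a placeholder file name.

// Minimal sketch (assumptions noted above): opt in to Parquet page-index
// pruning, then run a selective predicate that page min/max statistics can prune.
use datafusion::config::OPT_PARQUET_ENABLE_PAGE_INDEX;
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Page-index filtering is opt-in, mirroring what the test fixture does.
    let config = SessionConfig::new().set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, true);
    let ctx = SessionContext::with_config(config);

    // "data.parquet" is a placeholder; any file with a `decimal_col` column works.
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;

    // A tight predicate like this lets whole data pages be skipped using the
    // column index min/max values decoded by the page filter.
    let df = ctx.sql("SELECT * FROM t where decimal_col = 4.00").await?;
    df.show().await?;
    Ok(())
}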