Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Execution plan for reading Parquet files

use arrow::datatypes::SchemaRef;
use arrow::datatypes::{DataType, SchemaRef};
use fmt::Debug;
use std::any::Any;
use std::fmt;
Expand Down Expand Up @@ -55,8 +55,10 @@ use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::basic::{ConvertedType, LogicalType};
use parquet::errors::ParquetError;
use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
use parquet::schema::types::ColumnDescriptor;

mod metrics;
mod page_filter;
Expand Down Expand Up @@ -654,6 +656,43 @@ pub async fn plan_to_parquet(
}
}

// TODO: consolidate code with arrow-rs
// Convert the bytes array to i128.
// The endian of the input bytes array must be big-endian.
// Copy from the arrow-rs
pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the common func here.

assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
for (i, v) in b.iter().enumerate() {
result[i + (16 - b.len())] = *v;
}
// The bytes array are from parquet file and must be the big-endian.
// The endian is defined by parquet format, and the reference document
// https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
i128::from_be_bytes(result)
}

// Convert parquet column schema to arrow data type, and just consider the
// decimal data type.
pub(crate) fn parquet_to_arrow_decimal_type(
parquet_column: &ColumnDescriptor,
) -> Option<DataType> {
let type_ptr = parquet_column.self_type_ptr();
match type_ptr.get_basic_info().logical_type() {
Some(LogicalType::Decimal { scale, precision }) => {
Some(DataType::Decimal128(precision as u8, scale as u8))
}
_ => match type_ptr.get_basic_info().converted_type() {
ConvertedType::DECIMAL => Some(DataType::Decimal128(
type_ptr.get_precision() as u8,
type_ptr.get_scale() as u8,
)),
_ => None,
},
}
}

#[cfg(test)]
mod tests {
// See also `parquet_exec` integration test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

//! Contains code to filter entire pages

use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array};
use arrow::array::{
BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array,
StringArray,
};
use arrow::datatypes::DataType;
use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
use datafusion_common::{Column, DataFusionError, Result};
use datafusion_optimizer::utils::split_conjunction;
use log::{debug, error, trace};
use parquet::schema::types::ColumnDescriptor;
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
errors::ParquetError,
Expand All @@ -35,6 +40,9 @@ use std::collections::VecDeque;
use std::sync::Arc;

use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use crate::physical_plan::file_format::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};

use super::metrics::ParquetFileMetrics;

Expand Down Expand Up @@ -132,6 +140,7 @@ pub(crate) fn build_page_filter(
&predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
Expand Down Expand Up @@ -305,15 +314,18 @@ fn prune_pages_in_one_row_group(
predicate: &PruningPredicate,
col_offset_indexes: Option<&Vec<PageLocation>>,
col_page_indexes: Option<&Index>,
col_desc: &ColumnDescriptor,
metrics: &ParquetFileMetrics,
) -> Result<Vec<RowSelector>> {
let num_rows = group.num_rows() as usize;
if let (Some(col_offset_indexes), Some(col_page_indexes)) =
(col_offset_indexes, col_page_indexes)
{
let target_type = parquet_to_arrow_decimal_type(col_desc);
let pruning_stats = PagesPruningStatistics {
col_page_indexes,
col_offset_indexes,
target_type: &target_type,
};

match predicate.prune(&pruning_stats) {
Expand Down Expand Up @@ -382,6 +394,9 @@ fn create_row_count_in_each_page(
struct PagesPruningStatistics<'a> {
col_page_indexes: &'a Index,
col_offset_indexes: &'a Vec<PageLocation>,
// target_type means the logical type in schema: like 'DECIMAL' is the logical type, but the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// real physical type in parquet file may be `INT32, INT64, FIXED_LEN_BYTE_ARRAY`
target_type: &'a Option<DataType>,
}

// Extract the min or max value calling `func` from page idex
Expand All @@ -390,16 +405,48 @@ macro_rules! get_min_max_values_for_page_index {
match $self.col_page_indexes {
Index::NONE => None,
Index::INT32(index) => {
let vec = &index.indexes;
Some(Arc::new(Int32Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
match $self.target_type {
// int32 to decimal with the precision and scale
Some(DataType::Decimal128(precision, scale)) => {
let vec = &index.indexes;
let vec: Vec<Option<i128>> = vec
.iter()
.map(|x| x.$func().and_then(|x| Some(*x as i128)))
.collect();
Decimal128Array::from(vec)
.with_precision_and_scale(*precision, *scale)
.ok()
.map(|arr| Arc::new(arr) as ArrayRef)
}
_ => {
let vec = &index.indexes;
Some(Arc::new(Int32Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
}
}
}
Index::INT64(index) => {
let vec = &index.indexes;
Some(Arc::new(Int64Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
match $self.target_type {
// int64 to decimal with the precision and scale
Some(DataType::Decimal128(precision, scale)) => {
let vec = &index.indexes;
let vec: Vec<Option<i128>> = vec
.iter()
.map(|x| x.$func().and_then(|x| Some(*x as i128)))
.collect();
Decimal128Array::from(vec)
.with_precision_and_scale(*precision, *scale)
.ok()
.map(|arr| Arc::new(arr) as ArrayRef)
}
_ => {
let vec = &index.indexes;
Some(Arc::new(Int64Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
}
}
}
Index::FLOAT(index) => {
let vec = &index.indexes;
Expand All @@ -419,10 +466,33 @@ macro_rules! get_min_max_values_for_page_index {
vec.iter().map(|x| x.$func().cloned()),
)))
}
Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
Index::BYTE_ARRAY(index) => {
let vec = &index.indexes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decimal should be supported for this logical type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arrow-rs contains the method of decoding decimal from byte array in ByteArrayReader

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, i prefer align with row group, do them together in other pr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addition additional support in a follow on PR sounds like a good idea to me -- maybe we can file a ticket to track the work

let array: StringArray = vec
.iter()
.map(|x| x.$func())
.map(|x| x.and_then(|x| std::str::from_utf8(x).ok()))
.collect();
Some(Arc::new(array))
}
Index::INT96(_) => {
//Todo support these type
None
}
Index::FIXED_LEN_BYTE_ARRAY(index) => match $self.target_type {
Some(DataType::Decimal128(precision, scale)) => {
let vec = &index.indexes;
Decimal128Array::from(
vec.iter()
.map(|x| x.$func().and_then(|x| Some(from_bytes_to_i128(x))))
.collect::<Vec<Option<i128>>>(),
)
.with_precision_and_scale(*precision, *scale)
.ok()
.map(|arr| Arc::new(arr) as ArrayRef)
}
_ => None,
},
}
}};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ use datafusion_common::Column;
use datafusion_common::ScalarValue;
use log::debug;

use parquet::{
file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
schema::types::ColumnDescriptor,
use parquet::file::{
metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
};

use crate::physical_plan::file_format::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};
use crate::{
datasource::listing::FileRange,
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
};
use parquet::basic::{ConvertedType, LogicalType};

use super::ParquetFileMetrics;

Expand Down Expand Up @@ -85,23 +86,6 @@ struct RowGroupPruningStatistics<'a> {
parquet_schema: &'a Schema,
}

// TODO: consolidate code with arrow-rs
// Convert the bytes array to i128.
// The endian of the input bytes array must be big-endian.
// Copy from the arrow-rs
fn from_bytes_to_i128(b: &[u8]) -> i128 {
assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
for (i, v) in b.iter().enumerate() {
result[i + (16 - b.len())] = *v;
}
// The bytes array are from parquet file and must be the big-endian.
// The endian is defined by parquet format, and the reference document
// https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
i128::from_be_bytes(result)
}

/// Extract the min/max statistics from a `ParquetStatistics` object
macro_rules! get_statistic {
($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
Expand Down Expand Up @@ -217,24 +201,6 @@ macro_rules! get_null_count_values {
}};
}

// Convert parquet column schema to arrow data type, and just consider the
// decimal data type.
fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
let type_ptr = parquet_column.self_type_ptr();
match type_ptr.get_basic_info().logical_type() {
Some(LogicalType::Decimal { scale, precision }) => {
Some(DataType::Decimal128(precision as u8, scale as u8))
}
_ => match type_ptr.get_basic_info().converted_type() {
ConvertedType::DECIMAL => Some(DataType::Decimal128(
type_ptr.get_precision() as u8,
type_ptr.get_scale() as u8,
)),
_ => None,
},
}
}

impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
get_min_max_values!(self, column, min, min_bytes)
Expand Down
53 changes: 39 additions & 14 deletions datafusion/core/tests/parquet/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,20 +266,17 @@ async fn single_file_small_data_pages() {
// page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: djzdyiecnumrsrcbizwlqzdhnpoiqdh, max: fktdcgtmzvoedpwhfevcvvrtaurzgex, num_nulls not defined] CRC:[none] SZ:7 VC:9216
// page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216
// page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739
//
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

// This test currently fails due to https://github.com/apache/arrow-datafusion/issues/3833
// (page index pruning not implemented for byte array)

// TestCase::new(&test_parquet_file)
// .with_name("selective")
// // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
// // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
// .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin")))
// .with_pushdown_expected(PushdownExpected::Some)
// .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
// .with_expected_rows(2574)
// .run()
// .await;

TestCase::new(&test_parquet_file)
.with_name("selective")
// predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
// pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
.with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin")))
.with_pushdown_expected(PushdownExpected::Some)
.with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
.with_expected_rows(2574)
.run()
.await;

// time TV=53819 RL=0 DL=0 DS: 7092 DE:PLAIN
// --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Expand All @@ -299,6 +296,34 @@ async fn single_file_small_data_pages() {
.with_expected_rows(9745)
.run()
.await;

// decimal_price TV=53819 RL=0 DL=0
// ----------------------------------------------------------------------------
// row group 0:
// column index for column decimal_price:
// Boudary order: UNORDERED
// null count min max
// page-0 0 1 9216
// page-1 0 9217 18432
// page-2 0 18433 27648
// page-3 0 27649 36864
// page-4 0 36865 46080
// page-5 0 46081 53819
//
// offset index for column decimal_price:
// offset compressed size first row index
// page-0 5581636 147517 0
// page-1 5729153 147517 9216
TestCase::new(&test_parquet_file)
.with_name("selective_on_decimal")
// predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5
// decimal_price < 9200
.with_filter(col("decimal_price").lt_eq(lit(9200)))
.with_pushdown_expected(PushdownExpected::Some)
.with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
.with_expected_rows(9200)
.run()
.await;
}

/// Expected pushdown behavior
Expand Down
Loading