2 changes: 2 additions & 0 deletions datafusion/common/src/hash_utils.rs
@@ -245,6 +245,8 @@ fn hash_struct_array(
Ok(())
}

// only adding this `cfg` b/c this function is only used with this `cfg`
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_map_array(
array: &MapArray,
random_state: &RandomState,
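A note on the gate above: compiling a private helper only when its single caller is compiled keeps the `dead_code` lint quiet in builds that enable `force_hash_collisions`. A minimal sketch of the pattern, assuming a made-up `hash_one` helper rather than DataFusion's real function:

// Compiled only when `force_hash_collisions` is off; its single caller is
// gated the same way, so neither build configuration sees dead code.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_one(v: u64) -> u64 {
    v.wrapping_mul(0x9E37_79B9_7F4A_7C15)
}

fn main() {
    #[cfg(not(feature = "force_hash_collisions"))]
    println!("{}", hash_one(7));

    #[cfg(feature = "force_hash_collisions")]
    println!("collisions forced: hashing is bypassed");
}
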
16 changes: 8 additions & 8 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -51,12 +51,12 @@ use object_store::{ObjectMeta, ObjectStore};
/// - the table provider can filter the table partition values with this expression
/// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
/// was performed
pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
Review comment (Contributor): This is a nice removal of some clones 👍

let mut is_applicable = true;
expr.apply(|expr| {
match expr {
Expr::Column(Column { ref name, .. }) => {
is_applicable &= col_names.contains(name);
is_applicable &= col_names.contains(&name.as_str());
if is_applicable {
Ok(TreeNodeRecursion::Jump)
} else {
@@ -745,27 +745,27 @@ mod tests {
#[test]
fn test_expr_applicable_for_cols() {
assert!(expr_applicable_for_cols(
&[String::from("c1")],
&["c1"],
&Expr::eq(col("c1"), lit("value"))
));
assert!(!expr_applicable_for_cols(
&[String::from("c1")],
&["c1"],
&Expr::eq(col("c2"), lit("value"))
));
assert!(!expr_applicable_for_cols(
&[String::from("c1")],
&["c1"],
&Expr::eq(col("c1"), col("c2"))
));
assert!(expr_applicable_for_cols(
&[String::from("c1"), String::from("c2")],
&["c1", "c2"],
&Expr::eq(col("c1"), col("c2"))
));
assert!(expr_applicable_for_cols(
&[String::from("c1"), String::from("c2")],
&["c1", "c2"],
&(Expr::eq(col("c1"), col("c2").alias("c2_alias"))).not()
));
assert!(expr_applicable_for_cols(
&[String::from("c1"), String::from("c2")],
&["c1", "c2"],
&(case(col("c1"))
.when(lit("v1"), lit(true))
.otherwise(lit(false))
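The signature change from `&[String]` to `&[&str]` is what removes the clones the reviewer notes: callers can pass borrowed views of their owned column names, and the body compares with `contains(&name.as_str())`. A rough standalone illustration under simplified assumptions (the two `applicable_*` helpers are invented for the example):

// Old shape: forces callers to hold owned Strings (or clone to get them).
fn applicable_owned(col_names: &[String], name: &str) -> bool {
    col_names.iter().any(|c| c.as_str() == name)
}

// New shape: borrowed &str slices accept string literals and `.as_str()`
// views of owned data alike, so no clones are required.
fn applicable_borrowed(col_names: &[&str], name: &str) -> bool {
    col_names.contains(&name)
}

fn main() {
    let owned = vec![String::from("c1"), String::from("c2")];
    // Borrow the owned names instead of cloning them, as the new call site does.
    let borrowed: Vec<&str> = owned.iter().map(|s| s.as_str()).collect();

    assert!(applicable_owned(&owned, "c1"));
    assert!(applicable_borrowed(&borrowed, "c2"));
    assert!(!applicable_borrowed(&["c1"], "c3"));
}
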
7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/listing/table.rs
@@ -826,15 +826,15 @@ impl TableProvider for ListingTable {
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
let support: Vec<_> = filters
Ok(filters
.iter()
.map(|filter| {
if expr_applicable_for_cols(
&self
.options
.table_partition_cols
.iter()
.map(|x| x.0.clone())
.map(|x| x.0.as_str())
.collect::<Vec<_>>(),
filter,
) {
@@ -846,8 +846,7 @@
TableProviderFilterPushDown::Inexact
}
})
.collect();
Ok(support)
.collect())
}

fn get_table_definition(&self) -> Option<&str> {
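Collapsing `let support: Vec<_> = ...; Ok(support)` into `Ok(filters.iter().map(..).collect())` is purely a tightening: the closure cannot fail, so the iterator can be collected directly inside the `Ok`. A simplified sketch of the same shape, with a hypothetical `PushDown` enum standing in for `TableProviderFilterPushDown`:

#[derive(Debug, PartialEq)]
enum PushDown {
    Exact,
    Inexact,
}

// The per-filter classification is infallible, so the iterator collects
// straight into the Ok value with no intermediate Vec binding.
fn supports_pushdown(
    filters: &[&str],
    partition_cols: &[&str],
) -> Result<Vec<PushDown>, String> {
    Ok(filters
        .iter()
        .map(|f| {
            if partition_cols.contains(f) {
                PushDown::Exact
            } else {
                PushDown::Inexact
            }
        })
        .collect())
}

fn main() {
    let support = supports_pushdown(&["year", "price"], &["year"]).unwrap();
    assert_eq!(support, vec![PushDown::Exact, PushDown::Inexact]);
}
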
13 changes: 7 additions & 6 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -685,10 +685,12 @@ impl ExecutionPlan for ParquetExec {
partition_index: usize,
ctx: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let projection = match self.base_config.file_column_projection_indices() {
Some(proj) => proj,
None => (0..self.base_config.file_schema.fields().len()).collect(),
};
let projection = self
.base_config
.file_column_projection_indices()
.unwrap_or_else(|| {
(0..self.base_config.file_schema.fields().len()).collect()
});

let parquet_file_reader_factory = self
.parquet_file_reader_factory
@@ -698,8 +700,7 @@
ctx.runtime_env()
.object_store(&self.base_config.object_store_url)
.map(|store| {
Arc::new(DefaultParquetFileReaderFactory::new(store))
as Arc<dyn ParquetFileReaderFactory>
Arc::new(DefaultParquetFileReaderFactory::new(store)) as _
})
})?;

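Two small idioms appear in this hunk: `Option::unwrap_or_else` replaces the `match` on the optional projection, and `as _` lets the compiler infer the `Arc<dyn ParquetFileReaderFactory>` coercion target from context instead of spelling it out. A minimal sketch of both, using invented `ReaderFactory` types:

use std::sync::Arc;

trait ReaderFactory {
    fn name(&self) -> &'static str;
}

struct DefaultFactory;

impl ReaderFactory for DefaultFactory {
    fn name(&self) -> &'static str {
        "default"
    }
}

// `unwrap_or_else` keeps the lazy "all columns" fallback without a match.
fn projection_or_all(explicit: Option<Vec<usize>>, num_fields: usize) -> Vec<usize> {
    explicit.unwrap_or_else(|| (0..num_fields).collect())
}

fn main() {
    assert_eq!(projection_or_all(Some(vec![0, 2]), 4), vec![0, 2]);
    assert_eq!(projection_or_all(None, 3), vec![0, 1, 2]);

    // With the target type known from the binding, `as _` lets the compiler
    // infer the `Arc<dyn ReaderFactory>` coercion instead of spelling it out.
    let factory: Arc<dyn ReaderFactory> = Arc::new(DefaultFactory) as _;
    assert_eq!(factory.name(), "default");
}
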
95 changes: 33 additions & 62 deletions datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -59,6 +59,7 @@
//! the unsorted predicates. Within each partition, predicates are
//! still sorted by size.

use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::sync::Arc;

@@ -129,7 +130,7 @@ impl DatafusionArrowPredicate {
// on the order they appear in the file
let projection = match candidate.projection.len() {
0 | 1 => vec![],
_ => remap_projection(&candidate.projection),
2.. => remap_projection(&candidate.projection),
};

Ok(Self {
@@ -151,32 +152,31 @@ impl ArrowPredicate for DatafusionArrowPredicate {
&self.projection_mask
}

fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
let batch = match self.projection.is_empty() {
true => batch,
false => batch.project(&self.projection)?,
fn evaluate(&mut self, mut batch: RecordBatch) -> ArrowResult<BooleanArray> {
if !self.projection.is_empty() {
batch = batch.project(&self.projection)?;
};

let batch = self.schema_mapping.map_partial_batch(batch)?;

// scoped timer updates on drop
let mut timer = self.time.timer();
match self
.physical_expr

self.physical_expr
.evaluate(&batch)
.and_then(|v| v.into_array(batch.num_rows()))
{
Ok(array) => {
.and_then(|array| {
let bool_arr = as_boolean_array(&array)?.clone();
let num_filtered = bool_arr.len() - bool_arr.true_count();
self.rows_filtered.add(num_filtered);
timer.stop();
Ok(bool_arr)
}
Err(e) => Err(ArrowError::ComputeError(format!(
"Error evaluating filter predicate: {e:?}"
))),
}
})
.map_err(|e| {
ArrowError::ComputeError(format!(
"Error evaluating filter predicate: {e:?}"
))
})
}
}

@@ -453,62 +453,33 @@ pub fn build_row_filter(

// no candidates
if candidates.is_empty() {
Ok(None)
} else if reorder_predicates {
// attempt to reorder the predicates by size and whether they are sorted
candidates.sort_by_key(|c| c.required_bytes);

let (indexed_candidates, other_candidates): (Vec<_>, Vec<_>) =
candidates.into_iter().partition(|c| c.can_use_index);

let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];

for candidate in indexed_candidates {
let filter = DatafusionArrowPredicate::try_new(
candidate,
file_schema,
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
}

for candidate in other_candidates {
let filter = DatafusionArrowPredicate::try_new(
candidate,
file_schema,
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)?;
return Ok(None);
}

filters.push(Box::new(filter));
}
if reorder_predicates {
candidates.sort_unstable_by(|c1, c2| {
match c1.can_use_index.cmp(&c2.can_use_index) {
Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
ord => ord,
}
});
}

Ok(Some(RowFilter::new(filters)))
} else {
// otherwise evaluate the predicates in the order the appeared in the
// original expressions
let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
for candidate in candidates {
let filter = DatafusionArrowPredicate::try_new(
candidates
.into_iter()
.map(|candidate| {
DatafusionArrowPredicate::try_new(
candidate,
file_schema,
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
}

Ok(Some(RowFilter::new(filters)))
}
)
.map(|pred| Box::new(pred) as _)
})
.collect::<Result<Vec<_>, _>>()
.map(|filters| Some(RowFilter::new(filters)))
}

#[cfg(test)]
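The rewritten `build_row_filter` swaps the sort-then-partition approach for a single `sort_unstable_by` keyed on `(can_use_index, required_bytes)`, and builds every predicate with one fallible `map`/`collect` (each predicate additionally boxed via `Box::new(pred) as _` in the PR). A self-contained sketch of both ideas, with plain `String`s standing in for the boxed `ArrowPredicate`s and invented stand-in types:

use std::cmp::Ordering;

// Stand-ins for DataFusion's filter candidates and predicate construction.
struct Candidate {
    can_use_index: bool,
    required_bytes: usize,
}

// Fallible construction, mirroring DatafusionArrowPredicate::try_new.
fn try_build(c: Candidate) -> Result<String, String> {
    Ok(format!("index={} bytes={}", c.can_use_index, c.required_bytes))
}

fn build_all(
    mut candidates: Vec<Candidate>,
    reorder: bool,
) -> Result<Option<Vec<String>>, String> {
    if candidates.is_empty() {
        return Ok(None);
    }

    if reorder {
        // One sort keyed on (can_use_index, required_bytes) replaces the old
        // sort-then-partition-into-two-vectors approach.
        candidates.sort_unstable_by(|c1, c2| {
            match c1.can_use_index.cmp(&c2.can_use_index) {
                Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
                ord => ord,
            }
        });
    }

    // One fallible map/collect builds every predicate; the first error
    // short-circuits, and success wraps the whole Vec in Some.
    candidates
        .into_iter()
        .map(try_build)
        .collect::<Result<Vec<_>, _>>()
        .map(Some)
}

fn main() {
    let candidates = vec![
        Candidate { can_use_index: true, required_bytes: 64 },
        Candidate { can_use_index: true, required_bytes: 8 },
        Candidate { can_use_index: false, required_bytes: 16 },
    ];
    println!("{:?}", build_all(candidates, true));
}
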
26 changes: 20 additions & 6 deletions datafusion/core/src/datasource/statistics.rs
@@ -18,16 +18,21 @@
use std::mem;
use std::sync::Arc;

use arrow_schema::DataType;
use futures::{Stream, StreamExt};

use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;

use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
use crate::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
use crate::physical_plan::{ColumnStatistics, Statistics};

#[cfg(feature = "parquet")]
use crate::{
arrow::datatypes::Schema,
functions_aggregate::min_max::{MaxAccumulator, MinAccumulator},
physical_plan::Accumulator,
};

use super::listing::PartitionedFile;

@@ -144,6 +149,8 @@ pub async fn get_statistics_with_limit(
Ok((result_files, statistics))
}

// only adding this cfg b/c this is the only feature it's used with currently
#[cfg(feature = "parquet")]
pub(crate) fn create_max_min_accs(
schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
@@ -175,6 +182,8 @@ fn add_row_stats(
}
}

// only adding this cfg b/c this is the only feature it's used with currently
#[cfg(feature = "parquet")]
pub(crate) fn get_col_stats(
schema: &Schema,
null_counts: Vec<Precision<usize>>,
@@ -205,8 +214,13 @@ pub(crate) fn get_col_stats(
// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
// The reason min/max aggregate produces unpacked output is that there is only one
// min/max value per group; there is no need to keep them Dictionary encoded
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
if let DataType::Dictionary(_, value_type) = input_type {
//
// only adding this cfg b/c this is the only feature it's used with currently
#[cfg(feature = "parquet")]
fn min_max_aggregate_data_type(
input_type: &arrow_schema::DataType,
) -> &arrow_schema::DataType {
if let arrow_schema::DataType::Dictionary(_, value_type) = input_type {
value_type.as_ref()
} else {
input_type
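The reworked `min_max_aggregate_data_type` (now gated on the `parquet` feature and referring to `arrow_schema::DataType` by path) unpacks a `Dictionary(key, value)` type to its value type, since min/max accumulators emit plain values. A toy model of that unwrapping, with a simplified stand-in for `arrow_schema::DataType`:

// Simplified stand-in for arrow_schema::DataType, for illustration only.
#[derive(Debug, PartialEq)]
enum DataType {
    Int32,
    Utf8,
    Dictionary(Box<DataType>, Box<DataType>),
}

// Min/max produce one unpacked value per group, so a dictionary input
// is reported with its value type.
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
    if let DataType::Dictionary(_, value_type) = input_type {
        value_type.as_ref()
    } else {
        input_type
    }
}

fn main() {
    let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    assert_eq!(min_max_aggregate_data_type(&dict), &DataType::Utf8);
    assert_eq!(min_max_aggregate_data_type(&DataType::Int32), &DataType::Int32);
}
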
3 changes: 3 additions & 0 deletions datafusion/core/src/execution/session_state_defaults.rs
@@ -100,7 +100,9 @@ impl SessionStateDefaults {

/// returns the list of default [`ScalarUDF`]s
pub fn default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
#[cfg_attr(not(feature = "nested_expressions"), allow(unused_mut))]
let mut functions: Vec<Arc<ScalarUDF>> = functions::all_default_functions();

#[cfg(feature = "nested_expressions")]
functions.append(&mut functions_nested::all_default_nested_functions());

@@ -144,6 +146,7 @@
}

/// registers all the builtin array functions
#[cfg_attr(not(feature = "nested_expressions"), allow(unused_variables))]
pub fn register_array_functions(state: &mut SessionState) {
// register crate of array expressions (if enabled)
#[cfg(feature = "nested_expressions")]
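`cfg_attr` applies its inner attribute only when the predicate holds, so `allow(unused_mut)` (and the `allow(unused_variables)` on `register_array_functions`) takes effect only in builds without `nested_expressions`, where the variable is never mutated or used. A minimal sketch of the mechanism, using a hypothetical `extras` feature:

fn default_names() -> Vec<&'static str> {
    // Without the (hypothetical) `extras` feature the Vec is never mutated,
    // so `mut` would trip the `unused_mut` lint; `cfg_attr` silences it only
    // in that configuration.
    #[cfg_attr(not(feature = "extras"), allow(unused_mut))]
    let mut names = vec!["abs", "length"];

    #[cfg(feature = "extras")]
    names.push("array_sort");

    names
}

fn main() {
    println!("{:?}", default_names());
}
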
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
@@ -615,6 +615,8 @@ impl PruningPredicate {
is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
}

// this is only used by `parquet` feature right now
#[allow(dead_code)]
pub(crate) fn required_columns(&self) -> &RequiredColumns {
&self.required_columns
}
@@ -746,6 +748,8 @@ impl RequiredColumns {
/// * `a > 5 OR a < 10` returns `Some(a)`
/// * `a > 5 OR b < 10` returns `None`
/// * `true` returns None
#[allow(dead_code)]
// this fn is only used by `parquet` feature right now, thus the `allow(dead_code)`
pub(crate) fn single_column(&self) -> Option<&phys_expr::Column> {
if self.columns.windows(2).all(|w| {
// check if all columns are the same (ignoring statistics and field)
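`single_column` leans on `slice::windows(2)`: if every adjacent pair of referenced columns matches, the whole slice refers to one column. The same shape in miniature, with plain strings standing in for `phys_expr::Column`:

// Returns Some(column) when every entry names the same column, None otherwise.
fn single_column<'a>(columns: &'a [&'a str]) -> Option<&'a str> {
    if columns.windows(2).all(|w| w[0] == w[1]) {
        columns.first().copied()
    } else {
        None
    }
}

fn main() {
    assert_eq!(single_column(&["a", "a", "a"]), Some("a"));
    assert_eq!(single_column(&["a", "b"]), None);
    assert_eq!(single_column(&[]), None);
}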