Commit e36bf79

fix and add docs on schemas

1 parent 2c2ea99 commit e36bf79

File tree

4 files changed: +105 -24 lines

datafusion-examples/examples/struct_field_rewrite.rs
Lines changed: 0 additions & 2 deletions

@@ -353,8 +353,6 @@ impl TreeNodeRewriter for StructFieldRewriterImpl {
             .file_schema
             .field_with_name(&expected_flattened_column_name)
         {
-            println!("source_field: {:?}", source_field);
-            println!("shredded_field: {:?}", shredded_field);
             if source_field.data_type() == shredded_field.data_type()
             {
                 // Rewrite the expression to use the flattened column

datafusion/datasource-parquet/src/opener.rs
Lines changed: 103 additions & 16 deletions
@@ -17,6 +17,7 @@
 
 //! [`ParquetOpener`] for opening Parquet files
 
+use std::collections::BTreeSet;
 use std::sync::Arc;
 
 use crate::file_format::{
@@ -28,14 +29,16 @@ use crate::{
     row_filter, should_enable_page_index, ParquetAccessPlan, ParquetFileMetrics,
     ParquetFileReaderFactory,
 };
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion_datasource::file_expr_rewriter::FileExpressionRewriter;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::error::ArrowError;
-use datafusion_common::{exec_err, Result};
+use datafusion_common::{exec_err, Result, SchemaError};
+use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_optimizer::pruning::PruningPredicate;
 use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
@@ -90,6 +93,17 @@ pub(super) struct ParquetOpener {
 
 impl FileOpener for ParquetOpener {
     fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
+        // Note about schemas: we are actually dealing with _4_ different schemas here:
+        // - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
+        // - The "virtual" file schema: this is the table schema minus any hive partition columns. This is what the file schema is coerced to.
+        // - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
+        // - The filter schema: a hybrid of the virtual file schema and the physical file schema.
+        //   If a filter is rewritten to reference columns that are in the physical file schema but not the virtual file schema, we need to add those columns to the filter schema so that the filter can be evaluated.
+        //   This schema is generated by taking any columns from the virtual file schema that are referenced by the filter and adding any columns from the physical file schema that are referenced by the filter but not in the virtual file schema.
+        //   Columns from the virtual file schema are added in the order they appear in the virtual file schema.
+        //   The columns from the physical file schema are always added to the end of the schema, in the order they appear in the physical file schema.
+        //
+        // I think it might be wise to do some renaming of parameters where possible, e.g. rename `file_schema` to `table_schema_without_partition_columns` and `physical_file_schema` or something like that.
        let file_range = file_meta.range.clone();
        let extensions = file_meta.extensions.clone();
        let file_name = file_meta.location().to_string();
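An aside on the four schemas described in the comment above: here is a minimal sketch of how they can relate, using hypothetical column names that do not appear in the commit. `date` stands in for a hive partition column, and `payload.a` for a column present only in the parquet file (e.g. a shredded field):

use arrow::datatypes::{DataType, Field, Schema};

// Hypothetical illustration of the four schemas; none of these names
// come from the commit itself.
fn four_schemas_example() {
    // 1. Table schema: what the user sees via the TableProvider.
    let table_schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("payload", DataType::Utf8, true),
        Field::new("date", DataType::Utf8, false), // hive partition column
    ]);

    // 2. "Virtual" file schema: the table schema minus partition columns.
    let virtual_file_schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("payload", DataType::Utf8, true),
    ]);

    // 3. Physical file schema: what the parquet file actually contains.
    let physical_file_schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("payload", DataType::Utf8, true),
        Field::new("payload.a", DataType::Int32, true), // file-only column
    ]);

    // 4. Filter schema for a rewritten predicate on `id` and `payload.a`:
    //    virtual-file-schema columns first, in their original order, then
    //    any physical-file-only columns appended at the end.
    let filter_schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("payload.a", DataType::Int32, true),
    ]);

    let _ = (table_schema, virtual_file_schema, physical_file_schema, filter_schema);
}

Note the filter schema's ordering: columns from the virtual file schema keep their original order, and file-only columns always come last.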
@@ -110,13 +124,12 @@ impl FileOpener for ParquetOpener {
 
         let projected_schema =
             SchemaRef::from(self.table_schema.project(&self.projection)?);
-        let schema_adapter = self
-            .schema_adapter_factory
-            .create(projected_schema, Arc::clone(&self.table_schema));
         let mut predicate = self.predicate.clone();
         let mut pruning_predicate = self.pruning_predicate.clone();
         let mut page_pruning_predicate = self.page_pruning_predicate.clone();
+        let schema_adapter_factory = self.schema_adapter_factory.clone();
         let table_schema = Arc::clone(&self.table_schema);
+        let mut filter_schema = table_schema.clone();
         let filter_expression_rewriter = self.filter_expression_rewriter.clone();
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
@@ -165,14 +178,6 @@ impl FileOpener for ParquetOpener {
 
             let file_schema = Arc::clone(builder.schema());
 
-            let (schema_mapping, adapted_projections) =
-                schema_adapter.map_schema(&file_schema)?;
-
-            let mask = ProjectionMask::roots(
-                builder.parquet_schema(),
-                adapted_projections.iter().cloned(),
-            );
-
             // Try to rewrite the predicate using our filter expression rewriter
             // This must happen before build_row_filter and other subsequent steps
             // so that they all use the rewritten predicate if it was rewritten.
@@ -181,12 +186,18 @@ impl FileOpener for ParquetOpener {
                 if let Ok(rewritten) = filter_rewriter
                     .rewrite(Arc::clone(&file_schema), original_predicate)
                 {
+                    let mut filter_schema_builder = FilterSchemaBuilder::new(
+                        &file_schema,
+                        &table_schema,
+                    );
+                    rewritten.visit(&mut filter_schema_builder)?;
+                    filter_schema = filter_schema_builder.build();
                     // If we rewrote the filter we need to recompute the pruning predicates to match the new filter.
                     page_pruning_predicate =
-                        Some(build_page_pruning_predicate(&rewritten, &file_schema));
+                        Some(build_page_pruning_predicate(&rewritten, &filter_schema));
                     pruning_predicate = build_pruning_predicate(
                         rewritten.clone(),
-                        &file_schema,
+                        &filter_schema,
                         &predicate_creation_errors,
                     );
                     // Update the predicate to the rewritten version
@@ -195,12 +206,23 @@ impl FileOpener for ParquetOpener {
                 }
             }
 
+            let schema_adapter = schema_adapter_factory
+                .create(projected_schema, Arc::clone(&filter_schema));
+
+            let (schema_mapping, adapted_projections) =
+                schema_adapter.map_schema(&file_schema)?;
+
+            let mask = ProjectionMask::roots(
+                builder.parquet_schema(),
+                adapted_projections.iter().cloned(),
+            );
+
             // Filter pushdown: evaluate predicates during scan
             if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
                 let row_filter = row_filter::build_row_filter(
                     &predicate,
                     &file_schema,
-                    &table_schema,
+                    &filter_schema,
                     builder.metadata(),
                     reorder_predicates,
                     &file_metrics,
@@ -370,3 +392,68 @@ pub(crate) fn build_page_pruning_predicate(
         Arc::clone(file_schema),
     ))
 }
+
+/// A visitor for a PhysicalExpr that collects all column references to determine which columns the expression needs in order to be evaluated.
+struct FilterSchemaBuilder<'schema> {
+    filter_schema_fields: BTreeSet<Arc<Field>>,
+    file_schema: &'schema Schema,
+    table_schema: &'schema Schema,
+}
+
+impl<'schema> FilterSchemaBuilder<'schema> {
+    fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) -> Self {
+        Self {
+            filter_schema_fields: BTreeSet::new(),
+            file_schema,
+            table_schema,
+        }
+    }
+
+    fn sort_fields(fields: &mut Vec<Arc<Field>>, table_schema: &Schema, file_schema: &Schema) {
+        fields.sort_by_key(|f| f.name().to_string());
+        fields.dedup_by_key(|f| f.name().to_string());
+        fields.sort_by_key(|f| {
+            let table_schema_index = table_schema.index_of(f.name()).unwrap_or(usize::MAX);
+            let file_schema_index = file_schema.index_of(f.name()).unwrap_or(usize::MAX);
+            (table_schema_index, file_schema_index)
+        });
+    }
+
+    fn build(self) -> SchemaRef {
+        let mut fields = self.filter_schema_fields.into_iter().collect::<Vec<_>>();
+        FilterSchemaBuilder::sort_fields(&mut fields, self.table_schema, self.file_schema);
+        Arc::new(Schema::new(fields))
+    }
+}
+
+impl<'node> TreeNodeVisitor<'node> for FilterSchemaBuilder<'_> {
+    type Node = Arc<dyn PhysicalExpr>;
+
+    fn f_down(
+        &mut self,
+        node: &'node Arc<dyn PhysicalExpr>,
+    ) -> Result<TreeNodeRecursion> {
+        if let Some(column) = node.as_any().downcast_ref::<Column>() {
+            if let Ok(field) = self.table_schema.field_with_name(column.name()) {
+                self.filter_schema_fields.insert(Arc::new(field.clone()));
+            } else if let Ok(field) = self.file_schema.field_with_name(column.name()) {
+                self.filter_schema_fields.insert(Arc::new(field.clone()));
+            } else {
+                // Valid fields are the table schema's fields plus the file schema's fields,
+                // preferring the table schema's fields when there is a conflict.
+                let mut valid_fields = self
+                    .table_schema
+                    .fields()
+                    .iter()
+                    .chain(self.file_schema.fields().iter())
+                    .cloned()
+                    .collect::<Vec<_>>();
+                FilterSchemaBuilder::sort_fields(&mut valid_fields, self.table_schema, self.file_schema);
+                let valid_fields = valid_fields
+                    .into_iter()
+                    .map(|f| datafusion_common::Column::new_unqualified(f.name()))
+                    .collect();
+                let field = datafusion_common::Column::new_unqualified(column.name());
+                return Err(datafusion_common::DataFusionError::SchemaError(
+                    SchemaError::FieldNotFound { field: Box::new(field), valid_fields },
+                    Box::new(None),
+                ));
+            }
+        }
+
+        Ok(TreeNodeRecursion::Continue)
+    }
+}
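The ordering and deduplication rule in `sort_fields` above can be exercised on its own. A minimal standalone sketch, assuming only the arrow crate and made-up schemas (the `FilterSchemaBuilder` itself is private to this module):

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};

// Mirrors the sort_fields logic: dedup referenced fields by name, then order
// them by position in the table schema; fields absent from the table schema
// sort to the end, ordered by their position in the file schema.
fn sort_fields(fields: &mut Vec<Arc<Field>>, table_schema: &Schema, file_schema: &Schema) {
    fields.sort_by_key(|f| f.name().to_string());
    fields.dedup_by_key(|f| f.name().to_string());
    fields.sort_by_key(|f| {
        (
            table_schema.index_of(f.name()).unwrap_or(usize::MAX),
            file_schema.index_of(f.name()).unwrap_or(usize::MAX),
        )
    });
}

fn main() {
    let table_schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]);
    let file_schema = Schema::new(vec![
        Field::new("b", DataType::Int32, true),
        Field::new("b.x", DataType::Int32, true), // file-only column
    ]);
    // A filter referencing `b.x`, `b`, and `b` again:
    let mut fields = vec![
        Arc::new(Field::new("b.x", DataType::Int32, true)),
        Arc::new(Field::new("b", DataType::Int32, true)),
        Arc::new(Field::new("b", DataType::Int32, true)),
    ];
    sort_fields(&mut fields, &table_schema, &file_schema);
    // Table-schema fields first, file-only fields last, duplicates removed.
    assert_eq!(
        fields.iter().map(|f| f.name().as_str()).collect::<Vec<_>>(),
        vec!["b", "b.x"]
    );
}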

datafusion/datasource-parquet/src/row_filter.rs
Lines changed: 1 addition & 1 deletion

@@ -274,7 +274,7 @@ impl<'a> FilterCandidateBuilder<'a> {
 
         let required_bytes = size_of_columns(&required_indices, metadata)?;
         let can_use_index = columns_sorted(&required_indices, metadata)?;
-
+
         Ok(Some(FilterCandidate {
             expr: rewritten_expr,
             required_bytes,

(Whitespace-only change: a blank line with trailing whitespace is replaced by an empty line.)

datafusion/datasource/src/schema_adapter.rs
Lines changed: 1 addition & 5 deletions

@@ -395,7 +395,7 @@ impl SchemaMapper for SchemaMapping {
             .fields()
             .iter()
             .zip(batch_cols.iter())
-            .map(|(field, batch_col)| {
+            .flat_map(|(field, batch_col)| {
                 self.table_schema
                     // try to get the same field from the table schema that we have stored in self
                     .field_with_name(field.name())
@@ -413,10 +413,6 @@ impl SchemaMapper for SchemaMapping {
                         // and if that works, return the field and column.
                        .map(|new_col| (new_col, table_field.clone()))
                })
-               .unwrap_or_else(|| {
-                   // If we don't have the field in the table schema return it as-is from the file schema
-                   Ok((batch_col.clone(), Arc::unwrap_or_clone(field.clone())))
-               })
            })
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
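One behavioral note on the `map` to `flat_map` change above: the closure's chain ends in an `Option` (the removed fallback took a zero-argument closure, i.e. `Option::unwrap_or_else`), so `flat_map` now drops batch columns whose field is missing from the table schema, where the old fallback passed them through unchanged. A minimal sketch of the idiom, with made-up column names rather than this module's types:

// Sketch of the map -> flat_map change. With `flat_map` over an Option,
// inputs where the lookup fails are dropped instead of falling back.
fn main() {
    let table_columns = ["a", "c"];
    let batch_columns = ["a", "b", "c"];

    let kept: Vec<&str> = batch_columns
        .iter()
        .flat_map(|name| {
            // `Some(name)` only if the column exists in the table schema;
            // `None` values are flattened away.
            table_columns.contains(name).then_some(*name)
        })
        .collect();

    assert_eq!(kept, vec!["a", "c"]); // "b" is skipped, not passed through
}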
