From 9839ef135bd6c09b184017e697b3f40461e6acd3 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Sat, 9 Mar 2024 13:52:36 +0000 Subject: [PATCH 01/17] feat: add PartitionEvaluator Issue: 152 --- crates/iceberg/src/scan.rs | 299 ++++++++++++++++++++++++++-- crates/iceberg/src/spec/manifest.rs | 9 + 2 files changed, 295 insertions(+), 13 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 852bcafbb9..2ced1620a6 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -18,14 +18,23 @@ //! Table scan api. use crate::arrow::ArrowReaderBuilder; +use crate::expr::BoundPredicate::AlwaysTrue; +use crate::expr::{Bind, BoundPredicate, BoundReference, LogicalExpression, Predicate, PredicateOperator}; use crate::io::FileIO; -use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef}; +use crate::spec::{ + DataContentType, FieldSummary, Manifest, ManifestEntry, ManifestEntryRef, ManifestFile, + PartitionField, PartitionSpec, PartitionSpecRef, Schema, SchemaRef, SnapshotRef, + TableMetadataRef, Transform, +}; use crate::table::Table; use crate::{Error, ErrorKind}; use arrow_array::RecordBatch; -use async_stream::try_stream; use futures::stream::{iter, BoxStream}; use futures::StreamExt; +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::Arc; +use async_stream::try_stream; /// Builder to create table scan. pub struct TableScanBuilder<'a> { @@ -34,6 +43,8 @@ pub struct TableScanBuilder<'a> { column_names: Vec, snapshot_id: Option, batch_size: Option, + case_sensitive: bool, + filter: Option, } impl<'a> TableScanBuilder<'a> { @@ -43,6 +54,8 @@ impl<'a> TableScanBuilder<'a> { column_names: vec![], snapshot_id: None, batch_size: None, + case_sensitive: true, + filter: None, } } @@ -53,6 +66,18 @@ impl<'a> TableScanBuilder<'a> { self } + /// Sets the scan as being case-insensitive + pub fn with_case_insensitivity(mut self) -> Self { + self.case_sensitive = false; + self + } + + /// Specifies a predicate to use as a filter + pub fn with_filter(mut self, predicate: Predicate) -> Self { + self.filter = Some(predicate); + self + } + /// Select all columns. pub fn select_all(mut self) -> Self { self.column_names.clear(); @@ -122,6 +147,8 @@ impl<'a> TableScanBuilder<'a> { column_names: self.column_names, schema, batch_size: self.batch_size, + case_sensitive: self.case_sensitive, + filter: self.filter, }) } } @@ -136,6 +163,8 @@ pub struct TableScan { column_names: Vec, schema: SchemaRef, batch_size: Option, + case_sensitive: bool, + filter: Option, } /// A stream of [`FileScanTask`]. @@ -143,7 +172,12 @@ pub type FileScanTaskStream = BoxStream<'static, crate::Result>; impl TableScan { /// Returns a stream of file scan tasks. + + pub async fn plan_files(&self) -> crate::Result { + // Cache `PartitionEvaluator`s created as part of this scan + let mut partition_evaluator_cache: HashMap = HashMap::new(); + let snapshot = self.snapshot.clone(); let table_metadata = self.table_metadata.clone(); let file_io = self.file_io.clone(); @@ -159,6 +193,29 @@ impl TableScan { while let Some(entry) = entries.next().await { let manifest = entry.load_manifest(&file_io).await?; + // If this scan has a filter, check the partition evaluator cache for an existing + // PartitionEvaluator that matches this manifest's partition spec ID. + // Use one from the cache if there is one. If not, create one, put it in + // the cache, and take a reference to it. 
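+            // Caching the evaluators means the filter only has to be bound and projected
+            // once per distinct partition spec, rather than once per manifest file.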
+ let partition_evaluator = if let Some(filter) = self.filter.as_ref() { + Some( + partition_evaluator_cache + .entry(manifest.partition_spec_id()) + .or_insert_with_key(self.create_partition_evaluator(filter)) + .deref(), + ) + } else { + None + }; + + // If this scan has a filter, reject any manifest files whose partition values + // don't match the filter. + if let Some(partition_evaluator) = partition_evaluator { + if !partition_evaluator.filter_manifest_file(&entry) { + continue; + } + } + let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive())); while let Some(manifest_entry) = manifest_entries.next().await { match manifest_entry.content_type() { @@ -179,8 +236,17 @@ impl TableScan { } } } + }.boxed()) + } + + fn create_partition_evaluator(&self, filter: &Predicate) -> fn(&i32) -> crate::Result { + |&id| { + // TODO: predicate binding not yet merged to main + let bound_predicate = filter.bind(self.schema.clone(), self.case_sensitive)?; + + let partition_spec = self.table_metadata.partition_spec_by_id(id).unwrap(); + PartitionEvaluator::new(partition_spec.clone(), bound_predicate, self.schema.clone()) } - .boxed()) } pub async fn to_arrow(&self) -> crate::Result { @@ -214,11 +280,221 @@ impl FileScanTask { } } +/// Evaluates manifest files to see if their partition values comply with a filter predicate +pub struct PartitionEvaluator { + manifest_eval_visitor: ManifestEvalVisitor, +} + +impl PartitionEvaluator { + pub(crate) fn new( + partition_spec: PartitionSpecRef, + partition_filter: BoundPredicate, + table_schema: SchemaRef, + ) -> crate::Result { + let manifest_eval_visitor = ManifestEvalVisitor::manifest_evaluator( + partition_spec, + table_schema, + partition_filter, + true, + )?; + + Ok(PartitionEvaluator { + manifest_eval_visitor, + }) + } + + pub(crate) fn filter_manifest_file(&self, _manifest_file: &ManifestFile) -> bool { + self.manifest_eval_visitor.eval(_manifest_file) + } +} + +struct ManifestEvalVisitor { + partition_schema: SchemaRef, + partition_filter: BoundPredicate, + case_sensitive: bool, +} + +impl ManifestEvalVisitor { + fn new(partition_schema: SchemaRef, partition_filter: Predicate, case_sensitive: bool) -> crate::Result { + let partition_filter = partition_filter.bind(partition_schema.clone(), case_sensitive)?; + + Ok(Self { + partition_schema, + partition_filter, + case_sensitive, + }) + } + + pub(crate) fn manifest_evaluator( + partition_spec: PartitionSpecRef, + table_schema: SchemaRef, + partition_filter: BoundPredicate, + case_sensitive: bool, + ) -> crate::Result { + let partition_type = partition_spec.partition_type(&table_schema)?; + let partition_schema = Schema::builder() + .with_fields(partition_type.fields()) + .build()?; + + let partition_schema_ref = Arc::new(partition_schema); + + let inclusive_projection = + InclusiveProjection::new(table_schema.clone(), partition_spec.clone()); + let unbound_partition_filter = inclusive_projection.project(&partition_filter); + + Ok(Self::new( + partition_schema_ref.clone(), + unbound_partition_filter, + case_sensitive, + )?) 
+ } + + pub(crate) fn eval(&self, manifest_file: &ManifestFile) -> bool { + if manifest_file.partitions.is_empty() { + return true; + } + + self.visit(&self.partition_filter, &manifest_file.partitions) + } + + // see https://github.com/apache/iceberg-python/blob/ea9da8856a686eaeda0d5c2be78d5e3102b67c44/pyiceberg/expressions/visitors.py#L548 + fn visit(&self, predicate: &BoundPredicate, partitions: &Vec) -> bool { + match predicate { + AlwaysTrue => true, + BoundPredicate::AlwaysFalse => false, + BoundPredicate::And(expr) => { + self.visit(expr.inputs()[0], partitions) && self.visit(expr.inputs()[1], partitions) + } + BoundPredicate::Or(expr) => { + self.visit(expr.inputs()[0], partitions) || self.visit(expr.inputs()[1], partitions) + } + BoundPredicate::Not(_) => { + panic!("NOT predicates should be eliminated before calling this function") + } + BoundPredicate::Unary(expr) => { + // TODO: pos = term.ref().accessor.position in python impl + let pos = 0; + let field = &partitions[pos]; + match expr.op { + PredicateOperator::IsNull => field.contains_null, + PredicateOperator::NotNull => { + todo!() + } + PredicateOperator::IsNan => field.contains_nan.is_some(), + PredicateOperator::NotNan => { + todo!() + } + _ => { + panic!("unexpected op") + } + } + } + BoundPredicate::Binary(expr) => match expr.op { + PredicateOperator::LessThan => { + todo!() + } + PredicateOperator::LessThanOrEq => { + todo!() + } + PredicateOperator::GreaterThan => { + todo!() + } + PredicateOperator::GreaterThanOrEq => { + todo!() + } + PredicateOperator::Eq => { + todo!() + } + PredicateOperator::NotEq => { + todo!() + } + PredicateOperator::StartsWith => { + todo!() + } + PredicateOperator::NotStartsWith => { + todo!() + } + _ => { + panic!("unexpected op") + } + }, + BoundPredicate::Set(expr) => match expr.op { + PredicateOperator::In => { + todo!() + } + PredicateOperator::NotIn => true, + _ => { + panic!("unexpected op") + } + }, + } + } +} + +struct InclusiveProjection { + table_schema: SchemaRef, + partition_spec: PartitionSpecRef, +} + +impl InclusiveProjection { + pub(crate) fn new(table_schema: SchemaRef, partition_spec: PartitionSpecRef) -> Self { + Self { + table_schema, + partition_spec, + } + } + + pub(crate) fn project(&self, predicate: &BoundPredicate) -> Predicate { + // TODO: apply rewrite_not() as projection assumes that there are no NOT nodes + self.visit(predicate) + } + + fn visit(&self, bound_predicate: &BoundPredicate) -> Predicate { + match bound_predicate { + BoundPredicate::AlwaysTrue => Predicate::AlwaysTrue, + BoundPredicate::AlwaysFalse => Predicate::AlwaysFalse, + BoundPredicate::And(expr) => Predicate::And(LogicalExpression::new( + expr.inputs().map(|expr| Box::new(self.visit(expr))), + )), + BoundPredicate::Or(expr) => Predicate::Or(LogicalExpression::new( + expr.inputs().map(|expr| Box::new(self.visit(expr))), + )), + BoundPredicate::Not(_) => { + panic!("should not get here as NOT-rewriting should have removed NOT nodes") + } + bp => self.visit_bound_predicate(bp), + } + } + + fn visit_bound_predicate(&self, predicate: &BoundPredicate) -> Predicate { + let field_id = match predicate { + BoundPredicate::Unary(expr) => expr.field_id(), + BoundPredicate::Binary(expr) => expr.field_id(), + BoundPredicate::Set(expr) => expr.field_id(), + _ => { + panic!("Should not get here as these brances handled in self.visit") + } + }; + + // TODO: cache this? 
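+        // Collect every partition field derived from this predicate's source column;
+        // a single source column may back several partition fields with different transforms.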
+ let mut parts: Vec<&PartitionField> = vec![]; + for partition_spec_field in self.partition_spec.fields { + if partition_spec_field.source_id == field_id { + parts.push(&partition_spec_field) + } + } + + parts.iter().fold(Predicate::AlwaysTrue, |res, &part| { + res.and(part.transform.project(&part.name, &predicate)) + }) + } +} + #[cfg(test)] mod tests { use crate::io::{FileIO, OutputFile}; use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest, + DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest, ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID, }; @@ -321,15 +597,14 @@ mod tests { ManifestEntry::builder() .status(ManifestStatus::Added) .data_file( - DataFileBuilder::default() + DataFile::builder() .content(DataContentType::Data) .file_path(format!("{}/1.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) .file_size_in_bytes(100) .record_count(1) .partition(Struct::from_iter([Some(Literal::long(100))])) - .build() - .unwrap(), + .build(), ) .build(), ManifestEntry::builder() @@ -338,15 +613,14 @@ mod tests { .sequence_number(parent_snapshot.sequence_number()) .file_sequence_number(parent_snapshot.sequence_number()) .data_file( - DataFileBuilder::default() + DataFile::builder() .content(DataContentType::Data) .file_path(format!("{}/2.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) .file_size_in_bytes(100) .record_count(1) .partition(Struct::from_iter([Some(Literal::long(200))])) - .build() - .unwrap(), + .build(), ) .build(), ManifestEntry::builder() @@ -355,15 +629,14 @@ mod tests { .sequence_number(parent_snapshot.sequence_number()) .file_sequence_number(parent_snapshot.sequence_number()) .data_file( - DataFileBuilder::default() + DataFile::builder() .content(DataContentType::Data) .file_path(format!("{}/3.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) .file_size_in_bytes(100) .record_count(1) .partition(Struct::from_iter([Some(Literal::long(300))])) - .build() - .unwrap(), + .build(), ) .build(), ], diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 44ac7cac50..cc0187a64e 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -101,6 +101,11 @@ impl Manifest { entries: entries.into_iter().map(Arc::new).collect(), } } + + /// Get the ID for this Manifest's partition_spec [`PartitionSpec`] + pub fn partition_spec_id(&self) -> i32 { + self.metadata.partition_spec.spec_id + } } /// A manifest writer. @@ -868,6 +873,10 @@ impl ManifestEntry { &self.data_file.file_path } + pub fn get_partition_struct(&self) -> &Struct { + &self.data_file.partition + } + /// Inherit data from manifest list, such as snapshot id, sequence number. 
pub(crate) fn inherit_data(&mut self, snapshot_entry: &ManifestFile) { if self.snapshot_id.is_none() { From 9a0f80fe91d2041770f0f40ac267f81df9b5b4cf Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Sun, 17 Mar 2024 00:03:30 +0000 Subject: [PATCH 02/17] feat(wip): add ManifestEvalVisitor and InclusiveProjection --- Cargo.toml | 2 +- crates/iceberg/src/expr/predicate.rs | 54 ++++++++++++++++++-- crates/iceberg/src/scan.rs | 74 +++++++++++++++------------- crates/iceberg/src/spec/transform.rs | 7 +++ 4 files changed, 98 insertions(+), 39 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7da16e00d4..9f4c34bba6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,7 +45,7 @@ async-trait = "0.1" bimap = "0.6" bitvec = "1.0.1" bytes = "1.5" -chrono = "0.4" +chrono = "~0.4.34" derive_builder = "0.20.0" either = "1" env_logger = "0.11.0" diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index f8bcffe703..1f964eba66 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -46,7 +46,7 @@ impl Debug for LogicalExpression { } impl LogicalExpression { - fn new(inputs: [Box; N]) -> Self { + pub(crate) fn new(inputs: [Box; N]) -> Self { Self { inputs } } @@ -82,9 +82,9 @@ where #[derive(PartialEq)] pub struct UnaryExpression { /// Operator of this predicate, must be single operand operator. - op: PredicateOperator, + pub(crate) op: PredicateOperator, /// Term of this predicate, for example, `a` in `a IS NULL`. - term: T, + pub(crate) term: T, } impl Debug for UnaryExpression { @@ -116,13 +116,20 @@ impl UnaryExpression { debug_assert!(op.is_unary()); Self { op, term } } + + pub(crate) fn field_id(&self) -> i32 { + todo!(); + + // The below is not yet working since T may not implement `.field()` + // self.term.field().id + } } /// Binary predicate, for example, `a > 10`. #[derive(PartialEq)] pub struct BinaryExpression { /// Operator of this predicate, must be binary operator, such as `=`, `>`, `<`, etc. - op: PredicateOperator, + pub(crate) op: PredicateOperator, /// Term of this predicate, for example, `a` in `a > 10`. term: T, /// Literal of this predicate, for example, `10` in `a > 10`. @@ -144,6 +151,13 @@ impl BinaryExpression { debug_assert!(op.is_binary()); Self { op, term, literal } } + + pub(crate) fn field_id(&self) -> i32 { + todo!(); + + // The below is not yet working since T may not implement `.field()` + // self.term.field().id + } } impl Display for BinaryExpression { @@ -169,7 +183,7 @@ impl Bind for BinaryExpression { #[derive(PartialEq)] pub struct SetExpression { /// Operator of this predicate, must be set operator, such as `IN`, `NOT IN`, etc. - op: PredicateOperator, + pub(crate) op: PredicateOperator, /// Term of this predicate, for example, `a` in `a in (1, 2, 3)`. term: T, /// Literals of this predicate, for example, `(1, 2, 3)` in `a in (1, 2, 3)`. @@ -191,6 +205,13 @@ impl SetExpression { debug_assert!(op.is_set()); Self { op, term, literals } } + + pub(crate) fn field_id(&self) -> i32 { + todo!(); + + // The below is not yet working since T may not implement `.field()` + // self.term.field().id + } } impl Bind for SetExpression { @@ -217,6 +238,9 @@ impl Display for SetExpression { /// Unbound predicate expression before binding to a schema. #[derive(Debug, PartialEq)] pub enum Predicate { + AlwaysTrue, + AlwaysFalse, + /// And predicate, for example, `a > 10 AND b < 20`. And(LogicalExpression), /// Or predicate, for example, `a > 10 OR b < 20`. 
@@ -367,6 +391,8 @@ impl Bind for Predicate { bound_literals, ))) } + Predicate::AlwaysTrue => Ok(BoundPredicate::AlwaysTrue), + Predicate::AlwaysFalse => Ok(BoundPredicate::AlwaysFalse), } } } @@ -374,6 +400,12 @@ impl Bind for Predicate { impl Display for Predicate { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { + Predicate::AlwaysTrue => { + write!(f, "TRUE") + } + Predicate::AlwaysFalse => { + write!(f, "FALSE") + } Predicate::And(expr) => { write!(f, "({}) AND ({})", expr.inputs()[0], expr.inputs()[1]) } @@ -461,6 +493,8 @@ impl Predicate { /// ``` pub fn negate(self) -> Predicate { match self { + Predicate::AlwaysTrue => Predicate::AlwaysFalse, + Predicate::AlwaysFalse => Predicate::AlwaysTrue, Predicate::And(expr) => Predicate::Or(LogicalExpression::new( expr.inputs.map(|expr| Box::new(expr.negate())), )), @@ -525,6 +559,8 @@ impl Predicate { Predicate::Unary(expr) => Predicate::Unary(expr), Predicate::Binary(expr) => Predicate::Binary(expr), Predicate::Set(expr) => Predicate::Set(expr), + Predicate::AlwaysTrue => Predicate::AlwaysTrue, + Predicate::AlwaysFalse => Predicate::AlwaysFalse, } } } @@ -607,6 +643,14 @@ impl Display for BoundPredicate { } } +pub(crate) trait PredicateVisitor { + fn visit(predicate: Predicate) -> T; +} + +pub(crate) trait BoundPredicateVisitor { + fn visit(predicate: BoundPredicate) -> T; +} + #[cfg(test)] mod tests { use std::ops::Not; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 2ced1620a6..fe9e137e30 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -19,22 +19,20 @@ use crate::arrow::ArrowReaderBuilder; use crate::expr::BoundPredicate::AlwaysTrue; -use crate::expr::{Bind, BoundPredicate, BoundReference, LogicalExpression, Predicate, PredicateOperator}; +use crate::expr::{Bind, BoundPredicate, LogicalExpression, Predicate, PredicateOperator}; use crate::io::FileIO; use crate::spec::{ - DataContentType, FieldSummary, Manifest, ManifestEntry, ManifestEntryRef, ManifestFile, - PartitionField, PartitionSpec, PartitionSpecRef, Schema, SchemaRef, SnapshotRef, - TableMetadataRef, Transform, + DataContentType, FieldSummary, ManifestEntryRef, ManifestFile, PartitionField, + PartitionSpecRef, Schema, SchemaRef, SnapshotRef, TableMetadataRef, }; use crate::table::Table; use crate::{Error, ErrorKind}; use arrow_array::RecordBatch; +use async_stream::try_stream; use futures::stream::{iter, BoxStream}; use futures::StreamExt; use std::collections::HashMap; -use std::ops::Deref; use std::sync::Arc; -use async_stream::try_stream; /// Builder to create table scan. pub struct TableScanBuilder<'a> { @@ -173,8 +171,7 @@ pub type FileScanTaskStream = BoxStream<'static, crate::Result>; impl TableScan { /// Returns a stream of file scan tasks. - - pub async fn plan_files(&self) -> crate::Result { + pub async fn plan_files(&'static self) -> crate::Result { // Cache `PartitionEvaluator`s created as part of this scan let mut partition_evaluator_cache: HashMap = HashMap::new(); @@ -197,20 +194,12 @@ impl TableScan { // PartitionEvaluator that matches this manifest's partition spec ID. // Use one from the cache if there is one. If not, create one, put it in // the cache, and take a reference to it. 
- let partition_evaluator = if let Some(filter) = self.filter.as_ref() { - Some( - partition_evaluator_cache + if let Some(filter) = self.filter.as_ref() { + let partition_evaluator = partition_evaluator_cache .entry(manifest.partition_spec_id()) - .or_insert_with_key(self.create_partition_evaluator(filter)) - .deref(), - ) - } else { - None - }; + .or_insert_with_key(|key| self.create_partition_evaluator(key, filter)); - // If this scan has a filter, reject any manifest files whose partition values - // don't match the filter. - if let Some(partition_evaluator) = partition_evaluator { + // reject any manifest files whose partition values don't match the filter. if !partition_evaluator.filter_manifest_file(&entry) { continue; } @@ -236,20 +225,24 @@ impl TableScan { } } } - }.boxed()) + } + .boxed()) } - fn create_partition_evaluator(&self, filter: &Predicate) -> fn(&i32) -> crate::Result { - |&id| { - // TODO: predicate binding not yet merged to main - let bound_predicate = filter.bind(self.schema.clone(), self.case_sensitive)?; + fn create_partition_evaluator(&self, id: &i32, filter: &Predicate) -> PartitionEvaluator { - let partition_spec = self.table_metadata.partition_spec_by_id(id).unwrap(); - PartitionEvaluator::new(partition_spec.clone(), bound_predicate, self.schema.clone()) - } + // TODO: this does not work yet. `bind` consumes self, but `Predicate` + // does not implement `Clone` or `Copy`. + let bound_predicate = filter.clone() + .bind(self.schema.clone(), self.case_sensitive) + .unwrap(); + + let partition_spec = self.table_metadata.partition_spec_by_id(*id).unwrap(); + PartitionEvaluator::new(partition_spec.clone(), bound_predicate, self.schema.clone()) + .unwrap() } - pub async fn to_arrow(&self) -> crate::Result { + pub async fn to_arrow(&'static self) -> crate::Result { let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone()); @@ -315,7 +308,11 @@ struct ManifestEvalVisitor { } impl ManifestEvalVisitor { - fn new(partition_schema: SchemaRef, partition_filter: Predicate, case_sensitive: bool) -> crate::Result { + fn new( + partition_schema: SchemaRef, + partition_filter: Predicate, + case_sensitive: bool, + ) -> crate::Result { let partition_filter = partition_filter.bind(partition_schema.clone(), case_sensitive)?; Ok(Self { @@ -332,8 +329,13 @@ impl ManifestEvalVisitor { case_sensitive: bool, ) -> crate::Result { let partition_type = partition_spec.partition_type(&table_schema)?; + + // this is needed as SchemaBuilder.with_fields expects an iterator over + // Arc rather than &Arc + let cloned_partition_fields: Vec<_> = partition_type.fields().iter().map(Arc::clone).collect(); + let partition_schema = Schema::builder() - .with_fields(partition_type.fields()) + .with_fields(cloned_partition_fields) .build()?; let partition_schema_ref = Arc::new(partition_schema); @@ -478,14 +480,20 @@ impl InclusiveProjection { // TODO: cache this? let mut parts: Vec<&PartitionField> = vec![]; - for partition_spec_field in self.partition_spec.fields { + for partition_spec_field in &self.partition_spec.fields { if partition_spec_field.source_id == field_id { parts.push(&partition_spec_field) } } parts.iter().fold(Predicate::AlwaysTrue, |res, &part| { - res.and(part.transform.project(&part.name, &predicate)) + // should this use ? instead of destructuring Ok() so that the whole call fails + // if the transform project() call errors? This would require changing the signature of `visit`. 
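+            // For now a projection error is swallowed and `res` is left unchanged, which errs
+            // on the side of keeping the manifest rather than wrongly pruning it.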
+ if let Ok(Some(pred_for_part)) = part.transform.project(&part.name, &predicate) { + res.and(pred_for_part) + } else { + res + } }) } } diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs index 839d582dc0..d2295cdb88 100644 --- a/crates/iceberg/src/spec/transform.rs +++ b/crates/iceberg/src/spec/transform.rs @@ -18,6 +18,7 @@ //! Transforms in iceberg. use crate::error::{Error, Result}; +use crate::expr::{BoundPredicate, Predicate}; use crate::spec::datatypes::{PrimitiveType, Type}; use crate::ErrorKind; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -261,6 +262,12 @@ impl Transform { _ => self == other, } } + + pub fn project(&self, _name: &str, _predicate: &BoundPredicate) -> Result> { + // Waiting on https://github.com/apache/iceberg-rust/pull/269 + // to deliver https://github.com/apache/iceberg-rust/issues/264 + todo!() + } } impl Display for Transform { From 91c2852eb891469fa245bfd409c1bccf3f987fa6 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 21 Mar 2024 06:55:03 +0000 Subject: [PATCH 03/17] feat: evaluate ManifestFile partition values before loading its Manifest --- crates/iceberg/src/scan.rs | 6 +++--- crates/iceberg/src/spec/manifest_list.rs | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index fe9e137e30..4aeea7df23 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -188,15 +188,13 @@ impl TableScan { // Generate data file stream let mut entries = iter(manifest_list.entries()); while let Some(entry) = entries.next().await { - let manifest = entry.load_manifest(&file_io).await?; - // If this scan has a filter, check the partition evaluator cache for an existing // PartitionEvaluator that matches this manifest's partition spec ID. // Use one from the cache if there is one. If not, create one, put it in // the cache, and take a reference to it. if let Some(filter) = self.filter.as_ref() { let partition_evaluator = partition_evaluator_cache - .entry(manifest.partition_spec_id()) + .entry(entry.partition_spec_id()) .or_insert_with_key(|key| self.create_partition_evaluator(key, filter)); // reject any manifest files whose partition values don't match the filter. @@ -205,6 +203,8 @@ impl TableScan { } } + let manifest = entry.load_manifest(&file_io).await?; + let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive())); while let Some(manifest_entry) = manifest_entries.next().await { match manifest_entry.content_type() { diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index da63815d60..34d0571a0c 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -649,6 +649,10 @@ impl ManifestFile { Ok(Manifest::new(metadata, entries)) } + + pub(crate) fn partition_spec_id(&self) -> i32 { + self.partition_spec_id + } } /// Field summary for partition field in the spec. 
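With the first three patches applied, a filtered scan can prune manifests using only the partition summaries in the manifest list, before loading any manifest file. A rough sketch of the intended call pattern — assuming the `with_filter` builder method added above and an unbound-predicate helper along the lines of `Reference::new("x").less_than(Datum::long(100))`, and glossing over the temporary `&'static self` bound on `plan_files`:

    use futures::TryStreamExt;

    // Build an unbound predicate on column "x" and attach it to the scan.
    let filter = Reference::new("x").less_than(Datum::long(100));
    let scan = table.scan().with_filter(filter).build()?;

    // Manifests whose partition summaries cannot match the filter are skipped
    // during planning, so their manifest files are never fetched.
    let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;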
From 53c6073ce9f9447b6d7d3aeea05e52021b051bbd Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 21 Mar 2024 07:01:30 +0000 Subject: [PATCH 04/17] feat: avoid unneeded extra iter and await --- crates/iceberg/src/scan.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 4aeea7df23..b06f91c0c7 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -186,8 +186,7 @@ impl TableScan { .await?; // Generate data file stream - let mut entries = iter(manifest_list.entries()); - while let Some(entry) = entries.next().await { + for entry in manifest_list.entries() { // If this scan has a filter, check the partition evaluator cache for an existing // PartitionEvaluator that matches this manifest's partition spec ID. // Use one from the cache if there is one. If not, create one, put it in From f2704432d3ba3cba3be6165280baca1bc64e4932 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 21 Mar 2024 07:24:08 +0000 Subject: [PATCH 05/17] chore: fix some clippy lints --- crates/iceberg/src/expr/predicate.rs | 3 ++- crates/iceberg/src/scan.rs | 19 +++++++++++-------- crates/iceberg/src/spec/manifest.rs | 1 + crates/iceberg/src/spec/transform.rs | 1 + 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index 1f964eba66..85558c7832 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -238,9 +238,10 @@ impl Display for SetExpression { /// Unbound predicate expression before binding to a schema. #[derive(Debug, PartialEq)] pub enum Predicate { + /// AlwaysTrue predicate, for example, `TRUE`. AlwaysTrue, + /// AlwaysFalse predicate, for example, `FALSE`. AlwaysFalse, - /// And predicate, for example, `a > 10 AND b < 20`. And(LogicalExpression), /// Or predicate, for example, `a > 10 OR b < 20`. diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index b06f91c0c7..9ae3b78686 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -197,7 +197,7 @@ impl TableScan { .or_insert_with_key(|key| self.create_partition_evaluator(key, filter)); // reject any manifest files whose partition values don't match the filter. - if !partition_evaluator.filter_manifest_file(&entry) { + if !partition_evaluator.filter_manifest_file(entry) { continue; } } @@ -229,10 +229,9 @@ impl TableScan { } fn create_partition_evaluator(&self, id: &i32, filter: &Predicate) -> PartitionEvaluator { - // TODO: this does not work yet. `bind` consumes self, but `Predicate` // does not implement `Clone` or `Copy`. 
- let bound_predicate = filter.clone() + let bound_predicate = filter .bind(self.schema.clone(), self.case_sensitive) .unwrap(); @@ -301,8 +300,10 @@ impl PartitionEvaluator { } struct ManifestEvalVisitor { + #[allow(dead_code)] partition_schema: SchemaRef, partition_filter: BoundPredicate, + #[allow(dead_code)] case_sensitive: bool, } @@ -331,7 +332,8 @@ impl ManifestEvalVisitor { // this is needed as SchemaBuilder.with_fields expects an iterator over // Arc rather than &Arc - let cloned_partition_fields: Vec<_> = partition_type.fields().iter().map(Arc::clone).collect(); + let cloned_partition_fields: Vec<_> = + partition_type.fields().iter().map(Arc::clone).collect(); let partition_schema = Schema::builder() .with_fields(cloned_partition_fields) @@ -343,11 +345,11 @@ impl ManifestEvalVisitor { InclusiveProjection::new(table_schema.clone(), partition_spec.clone()); let unbound_partition_filter = inclusive_projection.project(&partition_filter); - Ok(Self::new( + Self::new( partition_schema_ref.clone(), unbound_partition_filter, case_sensitive, - )?) + ) } pub(crate) fn eval(&self, manifest_file: &ManifestFile) -> bool { @@ -433,6 +435,7 @@ impl ManifestEvalVisitor { } struct InclusiveProjection { + #[allow(dead_code)] table_schema: SchemaRef, partition_spec: PartitionSpecRef, } @@ -481,14 +484,14 @@ impl InclusiveProjection { let mut parts: Vec<&PartitionField> = vec![]; for partition_spec_field in &self.partition_spec.fields { if partition_spec_field.source_id == field_id { - parts.push(&partition_spec_field) + parts.push(partition_spec_field) } } parts.iter().fold(Predicate::AlwaysTrue, |res, &part| { // should this use ? instead of destructuring Ok() so that the whole call fails // if the transform project() call errors? This would require changing the signature of `visit`. 
- if let Ok(Some(pred_for_part)) = part.transform.project(&part.name, &predicate) { + if let Ok(Some(pred_for_part)) = part.transform.project(&part.name, predicate) { res.and(pred_for_part) } else { res diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index cc0187a64e..0ed2d79235 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -873,6 +873,7 @@ impl ManifestEntry { &self.data_file.file_path } + /// Get a reference to the Partition Struct of the data file of this manifest entry pub fn get_partition_struct(&self) -> &Struct { &self.data_file.partition } diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs index d2295cdb88..65f4e6e552 100644 --- a/crates/iceberg/src/spec/transform.rs +++ b/crates/iceberg/src/spec/transform.rs @@ -263,6 +263,7 @@ impl Transform { } } + /// Projects predicate to `Transform` pub fn project(&self, _name: &str, _predicate: &BoundPredicate) -> Result> { // Waiting on https://github.com/apache/iceberg-rust/pull/269 // to deliver https://github.com/apache/iceberg-rust/issues/264 From b8b6bf24f22fc8c261bd69db6c74a378f66eb867 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 21 Mar 2024 07:46:48 +0000 Subject: [PATCH 06/17] feat: implement field_id accessor for BoundReference Expressions --- crates/iceberg/src/expr/predicate.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index 85558c7832..3c086d1fe3 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -116,12 +116,12 @@ impl UnaryExpression { debug_assert!(op.is_unary()); Self { op, term } } +} +impl UnaryExpression { + /// get the field_id of this expression's term's field pub(crate) fn field_id(&self) -> i32 { - todo!(); - - // The below is not yet working since T may not implement `.field()` - // self.term.field().id + self.term.field().id } } @@ -151,12 +151,12 @@ impl BinaryExpression { debug_assert!(op.is_binary()); Self { op, term, literal } } +} +impl BinaryExpression { + /// get the field_id of this expression's term's field pub(crate) fn field_id(&self) -> i32 { - todo!(); - - // The below is not yet working since T may not implement `.field()` - // self.term.field().id + self.term.field().id } } @@ -205,12 +205,12 @@ impl SetExpression { debug_assert!(op.is_set()); Self { op, term, literals } } +} +impl SetExpression { + /// get the field_id of this expression's term's field pub(crate) fn field_id(&self) -> i32 { - todo!(); - - // The below is not yet working since T may not implement `.field()` - // self.term.field().id + self.term.field().id } } From 380f4a0f7962135f1fa1fb4d97294082c2c9cf3e Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Fri, 22 Mar 2024 19:21:24 +0000 Subject: [PATCH 07/17] feat(wip): add accessors --- crates/iceberg/src/expr/accessor.rs | 30 ++ crates/iceberg/src/expr/mod.rs | 1 + crates/iceberg/src/expr/predicate.rs | 14 +- crates/iceberg/src/expr/term.rs | 22 +- crates/iceberg/src/scan.rs | 726 ++++++++++++++------------- crates/iceberg/src/spec/schema.rs | 17 + 6 files changed, 449 insertions(+), 361 deletions(-) create mode 100644 crates/iceberg/src/expr/accessor.rs diff --git a/crates/iceberg/src/expr/accessor.rs b/crates/iceberg/src/expr/accessor.rs new file mode 100644 index 0000000000..13287c19fe --- /dev/null +++ b/crates/iceberg/src/expr/accessor.rs @@ -0,0 +1,30 @@ +#[derive(Clone, 
Debug, Eq, PartialEq)] +pub struct Accessor { + position: usize, + inner: Option>, +} + +impl Accessor { + pub(crate) fn new(position: usize, inner: Option) -> Self { + Accessor { + position, + inner: inner.map(Box::new), + } + } + + pub fn position(&self) -> usize { + self.position + } + + // fn get(&self, container: T) -> R { + // let mut val = container[self.position]; + // let mut inner = &self.inner; + // + // while let Some(inner_inner) = inner { + // val = val[inner_inner.position]; + // inner = &inner_inner.inner; + // } + // + // val + // } +} diff --git a/crates/iceberg/src/expr/mod.rs b/crates/iceberg/src/expr/mod.rs index 0d329682e5..dccafb79af 100644 --- a/crates/iceberg/src/expr/mod.rs +++ b/crates/iceberg/src/expr/mod.rs @@ -22,6 +22,7 @@ mod term; use std::fmt::{Display, Formatter}; pub use term::*; +pub(crate) mod accessor; mod predicate; use crate::spec::SchemaRef; diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index 3c086d1fe3..398f93c5f2 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -84,7 +84,7 @@ pub struct UnaryExpression { /// Operator of this predicate, must be single operand operator. pub(crate) op: PredicateOperator, /// Term of this predicate, for example, `a` in `a IS NULL`. - pub(crate) term: T, + term: T, } impl Debug for UnaryExpression { @@ -116,6 +116,10 @@ impl UnaryExpression { debug_assert!(op.is_unary()); Self { op, term } } + + pub(crate) fn term(&self) -> &T { + &self.term + } } impl UnaryExpression { @@ -151,6 +155,10 @@ impl BinaryExpression { debug_assert!(op.is_binary()); Self { op, term, literal } } + + pub(crate) fn term(&self) -> &T { + &self.term + } } impl BinaryExpression { @@ -205,6 +213,10 @@ impl SetExpression { debug_assert!(op.is_set()); Self { op, term, literals } } + + pub(crate) fn term(&self) -> &T { + &self.term + } } impl SetExpression { diff --git a/crates/iceberg/src/expr/term.rs b/crates/iceberg/src/expr/term.rs index 15cb298172..a4a678b7e1 100644 --- a/crates/iceberg/src/expr/term.rs +++ b/crates/iceberg/src/expr/term.rs @@ -21,6 +21,7 @@ use std::fmt::{Display, Formatter}; use fnv::FnvHashSet; +use crate::expr::accessor::Accessor; use crate::expr::Bind; use crate::expr::{BinaryExpression, Predicate, PredicateOperator, SetExpression, UnaryExpression}; use crate::spec::{Datum, NestedField, NestedFieldRef, SchemaRef}; @@ -188,7 +189,14 @@ impl Bind for Reference { format!("Field {} not found in schema", self.name), ) })?; - Ok(BoundReference::new(self.name.clone(), field.clone())) + + let accessor = schema.accessor_for_field_id(field.id); + + Ok(BoundReference::new( + self.name.clone(), + field.clone(), + accessor.clone(), + )) } } @@ -199,14 +207,16 @@ pub struct BoundReference { // For example, if the field is `a.b.c`, then `field.name` is `c`, but `original_name` is `a.b.c`. column_name: String, field: NestedFieldRef, + accessor: Accessor, } impl BoundReference { /// Creates a new bound reference. 
- pub fn new(name: impl Into, field: NestedFieldRef) -> Self { + pub fn new(name: impl Into, field: NestedFieldRef, accessor: Accessor) -> Self { Self { column_name: name.into(), field, + accessor, } } @@ -214,6 +224,11 @@ impl BoundReference { pub fn field(&self) -> &NestedField { &self.field } + + /// Get this BoundReference's Accessor + pub fn accessor(&self) -> &Accessor { + &self.accessor + } } impl Display for BoundReference { @@ -229,6 +244,7 @@ pub type BoundTerm = BoundReference; mod tests { use std::sync::Arc; + use crate::expr::accessor::Accessor; use crate::expr::{Bind, BoundReference, Reference}; use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type}; @@ -255,6 +271,7 @@ mod tests { let expected_ref = BoundReference::new( "bar", NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + Accessor::new(1, None), ); assert_eq!(expected_ref, reference); @@ -268,6 +285,7 @@ mod tests { let expected_ref = BoundReference::new( "BAR", NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + Accessor::new(1, None), ); assert_eq!(expected_ref, reference); diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 9ae3b78686..48904df82d 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -375,9 +375,9 @@ impl ManifestEvalVisitor { panic!("NOT predicates should be eliminated before calling this function") } BoundPredicate::Unary(expr) => { - // TODO: pos = term.ref().accessor.position in python impl - let pos = 0; + let pos = expr.term().accessor().position(); let field = &partitions[pos]; + match expr.op { PredicateOperator::IsNull => field.contains_null, PredicateOperator::NotNull => { @@ -392,44 +392,54 @@ impl ManifestEvalVisitor { } } } - BoundPredicate::Binary(expr) => match expr.op { - PredicateOperator::LessThan => { - todo!() - } - PredicateOperator::LessThanOrEq => { - todo!() - } - PredicateOperator::GreaterThan => { - todo!() - } - PredicateOperator::GreaterThanOrEq => { - todo!() - } - PredicateOperator::Eq => { - todo!() - } - PredicateOperator::NotEq => { - todo!() - } - PredicateOperator::StartsWith => { - todo!() - } - PredicateOperator::NotStartsWith => { - todo!() - } - _ => { - panic!("unexpected op") - } - }, - BoundPredicate::Set(expr) => match expr.op { - PredicateOperator::In => { - todo!() + BoundPredicate::Binary(expr) => { + let pos = expr.term().accessor().position(); + let _field = &partitions[pos]; + + match expr.op { + PredicateOperator::LessThan => { + todo!() + } + PredicateOperator::LessThanOrEq => { + todo!() + } + PredicateOperator::GreaterThan => { + todo!() + } + PredicateOperator::GreaterThanOrEq => { + todo!() + } + PredicateOperator::Eq => { + todo!() + } + PredicateOperator::NotEq => { + todo!() + } + PredicateOperator::StartsWith => { + todo!() + } + PredicateOperator::NotStartsWith => { + todo!() + } + _ => { + panic!("unexpected op") + } } - PredicateOperator::NotIn => true, - _ => { - panic!("unexpected op") + } + BoundPredicate::Set(expr) => { + let pos = expr.term().accessor().position(); + let _field = &partitions[pos]; + + match expr.op { + PredicateOperator::In => { + todo!() + } + PredicateOperator::NotIn => true, + _ => { + panic!("unexpected op") + } } - }, + } } } } @@ -500,323 +510,323 @@ impl InclusiveProjection { } } -#[cfg(test)] -mod tests { - use crate::io::{FileIO, OutputFile}; - use crate::spec::{ - DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest, - ManifestContentType, ManifestEntry, 
ManifestListWriter, ManifestMetadata, ManifestStatus, - ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID, - }; - use crate::table::Table; - use crate::TableIdent; - use arrow_array::{ArrayRef, Int64Array, RecordBatch}; - use futures::TryStreamExt; - use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; - use parquet::basic::Compression; - use parquet::file::properties::WriterProperties; - use std::collections::HashMap; - use std::fs; - use std::fs::File; - use std::sync::Arc; - use tempfile::TempDir; - use tera::{Context, Tera}; - use uuid::Uuid; - - struct TableTestFixture { - table_location: String, - table: Table, - } - - impl TableTestFixture { - fn new() -> Self { - let tmp_dir = TempDir::new().unwrap(); - let table_location = tmp_dir.path().join("table1"); - let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro"); - let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro"); - let table_metadata1_location = table_location.join("metadata/v1.json"); - - let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) - .unwrap() - .build() - .unwrap(); - - let table_metadata = { - let template_json_str = fs::read_to_string(format!( - "{}/testdata/example_table_metadata_v2.json", - env!("CARGO_MANIFEST_DIR") - )) - .unwrap(); - let mut context = Context::new(); - context.insert("table_location", &table_location); - context.insert("manifest_list_1_location", &manifest_list1_location); - context.insert("manifest_list_2_location", &manifest_list2_location); - context.insert("table_metadata_1_location", &table_metadata1_location); - - let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap(); - serde_json::from_str::(&metadata_json).unwrap() - }; - - let table = Table::builder() - .metadata(table_metadata) - .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) - .file_io(file_io) - .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) - .build(); - - Self { - table_location: table_location.to_str().unwrap().to_string(), - table, - } - } - - fn next_manifest_file(&self) -> OutputFile { - self.table - .file_io() - .new_output(format!( - "{}/metadata/manifest_{}.avro", - self.table_location, - Uuid::new_v4() - )) - .unwrap() - } - - async fn setup_manifest_files(&mut self) { - let current_snapshot = self.table.metadata().current_snapshot().unwrap(); - let parent_snapshot = current_snapshot - .parent_snapshot(self.table.metadata()) - .unwrap(); - let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); - let current_partition_spec = self.table.metadata().default_partition_spec().unwrap(); - - // Write data files - let data_file_manifest = ManifestWriter::new( - self.next_manifest_file(), - current_snapshot.snapshot_id(), - vec![], - ) - .write(Manifest::new( - ManifestMetadata::builder() - .schema((*current_schema).clone()) - .content(ManifestContentType::Data) - .format_version(FormatVersion::V2) - .partition_spec((**current_partition_spec).clone()) - .schema_id(current_schema.schema_id()) - .build(), - vec![ - ManifestEntry::builder() - .status(ManifestStatus::Added) - .data_file( - DataFile::builder() - .content(DataContentType::Data) - .file_path(format!("{}/1.parquet", &self.table_location)) - .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) - .record_count(1) - .partition(Struct::from_iter([Some(Literal::long(100))])) - .build(), - ) - .build(), - ManifestEntry::builder() - .status(ManifestStatus::Deleted) - 
.snapshot_id(parent_snapshot.snapshot_id()) - .sequence_number(parent_snapshot.sequence_number()) - .file_sequence_number(parent_snapshot.sequence_number()) - .data_file( - DataFile::builder() - .content(DataContentType::Data) - .file_path(format!("{}/2.parquet", &self.table_location)) - .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) - .record_count(1) - .partition(Struct::from_iter([Some(Literal::long(200))])) - .build(), - ) - .build(), - ManifestEntry::builder() - .status(ManifestStatus::Existing) - .snapshot_id(parent_snapshot.snapshot_id()) - .sequence_number(parent_snapshot.sequence_number()) - .file_sequence_number(parent_snapshot.sequence_number()) - .data_file( - DataFile::builder() - .content(DataContentType::Data) - .file_path(format!("{}/3.parquet", &self.table_location)) - .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) - .record_count(1) - .partition(Struct::from_iter([Some(Literal::long(300))])) - .build(), - ) - .build(), - ], - )) - .await - .unwrap(); - - // Write to manifest list - let mut manifest_list_write = ManifestListWriter::v2( - self.table - .file_io() - .new_output(current_snapshot.manifest_list()) - .unwrap(), - current_snapshot.snapshot_id(), - current_snapshot - .parent_snapshot_id() - .unwrap_or(EMPTY_SNAPSHOT_ID), - current_snapshot.sequence_number(), - ); - manifest_list_write - .add_manifests(vec![data_file_manifest].into_iter()) - .unwrap(); - manifest_list_write.close().await.unwrap(); - - // prepare data - let schema = { - let fields = - vec![ - arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "0".to_string(), - )])), - ]; - Arc::new(arrow_schema::Schema::new(fields)) - }; - let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; - let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap(); - - // Write the Parquet files - let props = WriterProperties::builder() - .set_compression(Compression::SNAPPY) - .build(); - - for n in 1..=3 { - let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap(); - let mut writer = - ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap(); - - writer.write(&to_write).expect("Writing batch"); - - // writer must be closed to write footer - writer.close().unwrap(); - } - } - } - - #[test] - fn test_table_scan_columns() { - let table = TableTestFixture::new().table; - - let table_scan = table.scan().select(["x", "y"]).build().unwrap(); - assert_eq!(vec!["x", "y"], table_scan.column_names); - - let table_scan = table - .scan() - .select(["x", "y"]) - .select(["z"]) - .build() - .unwrap(); - assert_eq!(vec!["z"], table_scan.column_names); - } - - #[test] - fn test_select_all() { - let table = TableTestFixture::new().table; - - let table_scan = table.scan().select_all().build().unwrap(); - assert!(table_scan.column_names.is_empty()); - } - - #[test] - fn test_select_no_exist_column() { - let table = TableTestFixture::new().table; - - let table_scan = table.scan().select(["x", "y", "z", "a"]).build(); - assert!(table_scan.is_err()); - } - - #[test] - fn test_table_scan_default_snapshot_id() { - let table = TableTestFixture::new().table; - - let table_scan = table.scan().build().unwrap(); - assert_eq!( - table.metadata().current_snapshot().unwrap().snapshot_id(), - table_scan.snapshot.snapshot_id() - ); - } - - #[test] - fn test_table_scan_non_exist_snapshot_id() { - let table = TableTestFixture::new().table; - - let 
table_scan = table.scan().snapshot_id(1024).build(); - assert!(table_scan.is_err()); - } - - #[test] - fn test_table_scan_with_snapshot_id() { - let table = TableTestFixture::new().table; - - let table_scan = table - .scan() - .snapshot_id(3051729675574597004) - .build() - .unwrap(); - assert_eq!(table_scan.snapshot.snapshot_id(), 3051729675574597004); - } - - #[tokio::test] - async fn test_plan_files_no_deletions() { - let mut fixture = TableTestFixture::new(); - fixture.setup_manifest_files().await; - - // Create table scan for current snapshot and plan files - let table_scan = fixture.table.scan().build().unwrap(); - let mut tasks = table_scan - .plan_files() - .await - .unwrap() - .try_fold(vec![], |mut acc, task| async move { - acc.push(task); - Ok(acc) - }) - .await - .unwrap(); - - assert_eq!(tasks.len(), 2); - - tasks.sort_by_key(|t| t.data_file.file_path().to_string()); - - // Check first task is added data file - assert_eq!( - tasks[0].data_file.file_path(), - format!("{}/1.parquet", &fixture.table_location) - ); - - // Check second task is existing data file - assert_eq!( - tasks[1].data_file.file_path(), - format!("{}/3.parquet", &fixture.table_location) - ); - } - - #[tokio::test] - async fn test_open_parquet_no_deletions() { - let mut fixture = TableTestFixture::new(); - fixture.setup_manifest_files().await; - - // Create table scan for current snapshot and plan files - let table_scan = fixture.table.scan().build().unwrap(); - - let batch_stream = table_scan.to_arrow().await.unwrap(); - - let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); - - let col = batches[0].column_by_name("col").unwrap(); - - let int64_arr = col.as_any().downcast_ref::().unwrap(); - assert_eq!(int64_arr.value(0), 1); - } -} +// #[cfg(test)] +// mod tests { +// use crate::io::{FileIO, OutputFile}; +// use crate::spec::{ +// DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest, +// ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, +// ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID, +// }; +// use crate::table::Table; +// use crate::TableIdent; +// use arrow_array::{ArrayRef, Int64Array, RecordBatch}; +// use futures::TryStreamExt; +// use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; +// use parquet::basic::Compression; +// use parquet::file::properties::WriterProperties; +// use std::collections::HashMap; +// use std::fs; +// use std::fs::File; +// use std::sync::Arc; +// use tempfile::TempDir; +// use tera::{Context, Tera}; +// use uuid::Uuid; +// +// struct TableTestFixture { +// table_location: String, +// table: Table, +// } +// +// impl TableTestFixture { +// fn new() -> Self { +// let tmp_dir = TempDir::new().unwrap(); +// let table_location = tmp_dir.path().join("table1"); +// let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro"); +// let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro"); +// let table_metadata1_location = table_location.join("metadata/v1.json"); +// +// let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) +// .unwrap() +// .build() +// .unwrap(); +// +// let table_metadata = { +// let template_json_str = fs::read_to_string(format!( +// "{}/testdata/example_table_metadata_v2.json", +// env!("CARGO_MANIFEST_DIR") +// )) +// .unwrap(); +// let mut context = Context::new(); +// context.insert("table_location", &table_location); +// context.insert("manifest_list_1_location", &manifest_list1_location); 
+// context.insert("manifest_list_2_location", &manifest_list2_location); +// context.insert("table_metadata_1_location", &table_metadata1_location); +// +// let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap(); +// serde_json::from_str::(&metadata_json).unwrap() +// }; +// +// let table = Table::builder() +// .metadata(table_metadata) +// .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) +// .file_io(file_io) +// .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) +// .build(); +// +// Self { +// table_location: table_location.to_str().unwrap().to_string(), +// table, +// } +// } +// +// fn next_manifest_file(&self) -> OutputFile { +// self.table +// .file_io() +// .new_output(format!( +// "{}/metadata/manifest_{}.avro", +// self.table_location, +// Uuid::new_v4() +// )) +// .unwrap() +// } +// +// async fn setup_manifest_files(&mut self) { +// let current_snapshot = self.table.metadata().current_snapshot().unwrap(); +// let parent_snapshot = current_snapshot +// .parent_snapshot(self.table.metadata()) +// .unwrap(); +// let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); +// let current_partition_spec = self.table.metadata().default_partition_spec().unwrap(); +// +// // Write data files +// let data_file_manifest = ManifestWriter::new( +// self.next_manifest_file(), +// current_snapshot.snapshot_id(), +// vec![], +// ) +// .write(Manifest::new( +// ManifestMetadata::builder() +// .schema((*current_schema).clone()) +// .content(ManifestContentType::Data) +// .format_version(FormatVersion::V2) +// .partition_spec((**current_partition_spec).clone()) +// .schema_id(current_schema.schema_id()) +// .build(), +// vec![ +// ManifestEntry::builder() +// .status(ManifestStatus::Added) +// .data_file( +// DataFile::builder() +// .content(DataContentType::Data) +// .file_path(format!("{}/1.parquet", &self.table_location)) +// .file_format(DataFileFormat::Parquet) +// .file_size_in_bytes(100) +// .record_count(1) +// .partition(Struct::from_iter([Some(Literal::long(100))])) +// .build(), +// ) +// .build(), +// ManifestEntry::builder() +// .status(ManifestStatus::Deleted) +// .snapshot_id(parent_snapshot.snapshot_id()) +// .sequence_number(parent_snapshot.sequence_number()) +// .file_sequence_number(parent_snapshot.sequence_number()) +// .data_file( +// DataFile::builder() +// .content(DataContentType::Data) +// .file_path(format!("{}/2.parquet", &self.table_location)) +// .file_format(DataFileFormat::Parquet) +// .file_size_in_bytes(100) +// .record_count(1) +// .partition(Struct::from_iter([Some(Literal::long(200))])) +// .build(), +// ) +// .build(), +// ManifestEntry::builder() +// .status(ManifestStatus::Existing) +// .snapshot_id(parent_snapshot.snapshot_id()) +// .sequence_number(parent_snapshot.sequence_number()) +// .file_sequence_number(parent_snapshot.sequence_number()) +// .data_file( +// DataFile::builder() +// .content(DataContentType::Data) +// .file_path(format!("{}/3.parquet", &self.table_location)) +// .file_format(DataFileFormat::Parquet) +// .file_size_in_bytes(100) +// .record_count(1) +// .partition(Struct::from_iter([Some(Literal::long(300))])) +// .build(), +// ) +// .build(), +// ], +// )) +// .await +// .unwrap(); +// +// // Write to manifest list +// let mut manifest_list_write = ManifestListWriter::v2( +// self.table +// .file_io() +// .new_output(current_snapshot.manifest_list()) +// .unwrap(), +// current_snapshot.snapshot_id(), +// current_snapshot +// .parent_snapshot_id() +// 
.unwrap_or(EMPTY_SNAPSHOT_ID), +// current_snapshot.sequence_number(), +// ); +// manifest_list_write +// .add_manifests(vec![data_file_manifest].into_iter()) +// .unwrap(); +// manifest_list_write.close().await.unwrap(); +// +// // prepare data +// let schema = { +// let fields = +// vec![ +// arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true) +// .with_metadata(HashMap::from([( +// PARQUET_FIELD_ID_META_KEY.to_string(), +// "0".to_string(), +// )])), +// ]; +// Arc::new(arrow_schema::Schema::new(fields)) +// }; +// let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; +// let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap(); +// +// // Write the Parquet files +// let props = WriterProperties::builder() +// .set_compression(Compression::SNAPPY) +// .build(); +// +// for n in 1..=3 { +// let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap(); +// let mut writer = +// ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap(); +// +// writer.write(&to_write).expect("Writing batch"); +// +// // writer must be closed to write footer +// writer.close().unwrap(); +// } +// } +// } +// +// #[test] +// fn test_table_scan_columns() { +// let table = TableTestFixture::new().table; +// +// let table_scan = table.scan().select(["x", "y"]).build().unwrap(); +// assert_eq!(vec!["x", "y"], table_scan.column_names); +// +// let table_scan = table +// .scan() +// .select(["x", "y"]) +// .select(["z"]) +// .build() +// .unwrap(); +// assert_eq!(vec!["z"], table_scan.column_names); +// } +// +// #[test] +// fn test_select_all() { +// let table = TableTestFixture::new().table; +// +// let table_scan = table.scan().select_all().build().unwrap(); +// assert!(table_scan.column_names.is_empty()); +// } +// +// #[test] +// fn test_select_no_exist_column() { +// let table = TableTestFixture::new().table; +// +// let table_scan = table.scan().select(["x", "y", "z", "a"]).build(); +// assert!(table_scan.is_err()); +// } +// +// #[test] +// fn test_table_scan_default_snapshot_id() { +// let table = TableTestFixture::new().table; +// +// let table_scan = table.scan().build().unwrap(); +// assert_eq!( +// table.metadata().current_snapshot().unwrap().snapshot_id(), +// table_scan.snapshot.snapshot_id() +// ); +// } +// +// #[test] +// fn test_table_scan_non_exist_snapshot_id() { +// let table = TableTestFixture::new().table; +// +// let table_scan = table.scan().snapshot_id(1024).build(); +// assert!(table_scan.is_err()); +// } +// +// #[test] +// fn test_table_scan_with_snapshot_id() { +// let table = TableTestFixture::new().table; +// +// let table_scan = table +// .scan() +// .snapshot_id(3051729675574597004) +// .build() +// .unwrap(); +// assert_eq!(table_scan.snapshot.snapshot_id(), 3051729675574597004); +// } +// +// #[tokio::test] +// async fn test_plan_files_no_deletions() { +// let mut fixture = TableTestFixture::new(); +// fixture.setup_manifest_files().await; +// +// // Create table scan for current snapshot and plan files +// let table_scan = fixture.table.scan().build().unwrap(); +// let mut tasks = table_scan +// .plan_files() +// .await +// .unwrap() +// .try_fold(vec![], |mut acc, task| async move { +// acc.push(task); +// Ok(acc) +// }) +// .await +// .unwrap(); +// +// assert_eq!(tasks.len(), 2); +// +// tasks.sort_by_key(|t| t.data_file.file_path().to_string()); +// +// // Check first task is added data file +// assert_eq!( +// tasks[0].data_file.file_path(), +// format!("{}/1.parquet", 
&fixture.table_location) +// ); +// +// // Check second task is existing data file +// assert_eq!( +// tasks[1].data_file.file_path(), +// format!("{}/3.parquet", &fixture.table_location) +// ); +// } +// +// #[tokio::test] +// async fn test_open_parquet_no_deletions() { +// let mut fixture = TableTestFixture::new(); +// fixture.setup_manifest_files().await; +// +// // Create table scan for current snapshot and plan files +// let table_scan = fixture.table.scan().build().unwrap(); +// +// let batch_stream = table_scan.to_arrow().await.unwrap(); +// +// let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); +// +// let col = batches[0].column_by_name("col").unwrap(); +// +// let int64_arr = col.as_any().downcast_ref::().unwrap(); +// assert_eq!(int64_arr.value(0), 1); +// } +// } diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 975a2a9ef7..266792bdb3 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -18,6 +18,7 @@ //! This module defines schema in iceberg. use crate::error::Result; +use crate::expr::accessor::Accessor; use crate::spec::datatypes::{ ListType, MapType, NestedFieldRef, PrimitiveType, StructType, Type, LIST_FILED_NAME, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, @@ -53,6 +54,8 @@ pub struct Schema { name_to_id: HashMap, lowercase_name_to_id: HashMap, id_to_name: HashMap, + + field_id_to_accessor: HashMap, } impl PartialEq for Schema { @@ -103,6 +106,13 @@ impl SchemaBuilder { pub fn build(self) -> Result { let highest_field_id = self.fields.iter().map(|f| f.id).max().unwrap_or(0); + // TODO: handle nested field accessors and types + // base this on https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/Accessors.java#L214 + let mut field_id_to_accessor = HashMap::new(); + for (idx, field) in self.fields.iter().enumerate() { + field_id_to_accessor.insert(field.id, Accessor::new(idx, None)); + } + let r#struct = StructType::new(self.fields); let id_to_field = index_by_id(&r#struct)?; @@ -135,6 +145,8 @@ impl SchemaBuilder { name_to_id, lowercase_name_to_id, id_to_name, + + field_id_to_accessor, }) } @@ -262,6 +274,11 @@ impl Schema { pub fn name_by_field_id(&self, field_id: i32) -> Option<&str> { self.id_to_name.get(&field_id).map(String::as_str) } + + /// Get an accessor for retrieving data in a struct + pub fn accessor_for_field_id(&self, field_id: i32) -> &Accessor { + &self.field_id_to_accessor[&field_id] + } } impl Display for Schema { From 47270d2bf958c97f9c525b79c7adbcbaa632f2fd Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Fri, 22 Mar 2024 19:27:05 +0000 Subject: [PATCH 08/17] fix: dont make predicate op pub(crate)" --- crates/iceberg/src/expr/predicate.rs | 18 +++++++++++++++--- crates/iceberg/src/scan.rs | 6 +++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index 398f93c5f2..877c1c630a 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -82,7 +82,7 @@ where #[derive(PartialEq)] pub struct UnaryExpression { /// Operator of this predicate, must be single operand operator. - pub(crate) op: PredicateOperator, + op: PredicateOperator, /// Term of this predicate, for example, `a` in `a IS NULL`. 
term: T, } @@ -120,6 +120,10 @@ impl UnaryExpression { pub(crate) fn term(&self) -> &T { &self.term } + + pub(crate) fn op(&self) -> &PredicateOperator { + &self.op + } } impl UnaryExpression { @@ -133,7 +137,7 @@ impl UnaryExpression { #[derive(PartialEq)] pub struct BinaryExpression { /// Operator of this predicate, must be binary operator, such as `=`, `>`, `<`, etc. - pub(crate) op: PredicateOperator, + op: PredicateOperator, /// Term of this predicate, for example, `a` in `a > 10`. term: T, /// Literal of this predicate, for example, `10` in `a > 10`. @@ -159,6 +163,10 @@ impl BinaryExpression { pub(crate) fn term(&self) -> &T { &self.term } + + pub(crate) fn op(&self) -> &PredicateOperator { + &self.op + } } impl BinaryExpression { @@ -191,7 +199,7 @@ impl Bind for BinaryExpression { #[derive(PartialEq)] pub struct SetExpression { /// Operator of this predicate, must be set operator, such as `IN`, `NOT IN`, etc. - pub(crate) op: PredicateOperator, + op: PredicateOperator, /// Term of this predicate, for example, `a` in `a in (1, 2, 3)`. term: T, /// Literals of this predicate, for example, `(1, 2, 3)` in `a in (1, 2, 3)`. @@ -217,6 +225,10 @@ impl SetExpression { pub(crate) fn term(&self) -> &T { &self.term } + + pub(crate) fn op(&self) -> &PredicateOperator { + &self.op + } } impl SetExpression { diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 48904df82d..51582db45d 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -378,7 +378,7 @@ impl ManifestEvalVisitor { let pos = expr.term().accessor().position(); let field = &partitions[pos]; - match expr.op { + match expr.op() { PredicateOperator::IsNull => field.contains_null, PredicateOperator::NotNull => { todo!() @@ -396,7 +396,7 @@ impl ManifestEvalVisitor { let pos = expr.term().accessor().position(); let _field = &partitions[pos]; - match expr.op { + match expr.op() { PredicateOperator::LessThan => { todo!() } @@ -430,7 +430,7 @@ impl ManifestEvalVisitor { let pos = expr.term().accessor().position(); let _field = &partitions[pos]; - match expr.op { + match expr.op() { PredicateOperator::In => { todo!() } From 9156a56980bd46851103ae1833e8ba28beaefa6a Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Sat, 23 Mar 2024 11:05:45 +0000 Subject: [PATCH 09/17] refactor: change TableScanBuilder case sensitivity setter --- crates/iceberg/src/scan.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 51582db45d..a84d1e8a86 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -64,9 +64,9 @@ impl<'a> TableScanBuilder<'a> { self } - /// Sets the scan as being case-insensitive - pub fn with_case_insensitivity(mut self) -> Self { - self.case_sensitive = false; + /// Sets the scan's case sensitivity + pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self { + self.case_sensitive = case_sensitive; self } From 22ac049335731acc9b4c8b70c3619dedb731b258 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Sat, 23 Mar 2024 11:06:04 +0000 Subject: [PATCH 10/17] chore: remove outdated comment and fix typo --- crates/iceberg/src/scan.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index a84d1e8a86..3dac63e080 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -229,8 +229,6 @@ impl TableScan { } fn create_partition_evaluator(&self, id: &i32, filter: &Predicate) -> PartitionEvaluator { - // 
TODO: this does not work yet. `bind` consumes self, but `Predicate` - // does not implement `Clone` or `Copy`. let bound_predicate = filter .bind(self.schema.clone(), self.case_sensitive) .unwrap(); @@ -486,7 +484,7 @@ impl InclusiveProjection { BoundPredicate::Binary(expr) => expr.field_id(), BoundPredicate::Set(expr) => expr.field_id(), _ => { - panic!("Should not get here as these brances handled in self.visit") + panic!("Should not get here as these branches handled in self.visit") } }; From 7b1eff847c30ccea1b8a0bdbb5ab7b8cd1ac5e30 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Sat, 23 Mar 2024 22:18:24 +0000 Subject: [PATCH 11/17] feat: redo accessors. Add field_id to accessor map to Schema, popualted in build() --- crates/iceberg/src/expr/accessor.rs | 63 ++++++++++++++++++++--------- crates/iceberg/src/expr/term.rs | 14 +++---- crates/iceberg/src/scan.rs | 6 +-- crates/iceberg/src/spec/schema.rs | 58 +++++++++++++++++++++----- crates/iceberg/src/spec/values.rs | 5 +++ 5 files changed, 106 insertions(+), 40 deletions(-) diff --git a/crates/iceberg/src/expr/accessor.rs b/crates/iceberg/src/expr/accessor.rs index 13287c19fe..8ee0e6a392 100644 --- a/crates/iceberg/src/expr/accessor.rs +++ b/crates/iceberg/src/expr/accessor.rs @@ -1,30 +1,53 @@ -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct Accessor { - position: usize, - inner: Option>, +use crate::spec::{Literal, Struct, Type}; +use serde_derive::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +enum InnerOrType { + Inner(Box), + Type(Type), +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct StructAccessor { + position: i32, + inner_or_type: InnerOrType, } -impl Accessor { - pub(crate) fn new(position: usize, inner: Option) -> Self { - Accessor { +impl StructAccessor { + pub(crate) fn new(position: i32, r#type: Type) -> Self { + StructAccessor { + position, + inner_or_type: InnerOrType::Type(r#type), + } + } + + pub(crate) fn wrap(position: i32, inner: StructAccessor) -> Self { + StructAccessor { position, - inner: inner.map(Box::new), + inner_or_type: InnerOrType::Inner(Box::from(inner)), } } - pub fn position(&self) -> usize { + pub fn position(&self) -> i32 { self.position } - // fn get(&self, container: T) -> R { - // let mut val = container[self.position]; - // let mut inner = &self.inner; - // - // while let Some(inner_inner) = inner { - // val = val[inner_inner.position]; - // inner = &inner_inner.inner; - // } - // - // val - // } + fn r#type(&self) -> &Type { + match &self.inner_or_type { + InnerOrType::Inner(inner) => inner.r#type(), + InnerOrType::Type(r#type) => r#type, + } + } + + fn get<'a>(&'a self, container: &'a Struct) -> &Literal { + match &self.inner_or_type { + InnerOrType::Inner(inner) => match container.get(self.position) { + Literal::Struct(wrapped) => inner.get(wrapped), + _ => { + unreachable!() + } + }, + InnerOrType::Type(_) => container.get(self.position), + } + } } diff --git a/crates/iceberg/src/expr/term.rs b/crates/iceberg/src/expr/term.rs index a4a678b7e1..616a02ca4b 100644 --- a/crates/iceberg/src/expr/term.rs +++ b/crates/iceberg/src/expr/term.rs @@ -21,7 +21,7 @@ use std::fmt::{Display, Formatter}; use fnv::FnvHashSet; -use crate::expr::accessor::Accessor; +use crate::expr::accessor::StructAccessor; use crate::expr::Bind; use crate::expr::{BinaryExpression, Predicate, PredicateOperator, SetExpression, UnaryExpression}; use crate::spec::{Datum, NestedField, NestedFieldRef, SchemaRef}; @@ -207,12 +207,12 @@ pub struct 
BoundReference { // For example, if the field is `a.b.c`, then `field.name` is `c`, but `original_name` is `a.b.c`. column_name: String, field: NestedFieldRef, - accessor: Accessor, + accessor: StructAccessor, } impl BoundReference { /// Creates a new bound reference. - pub fn new(name: impl Into, field: NestedFieldRef, accessor: Accessor) -> Self { + pub fn new(name: impl Into, field: NestedFieldRef, accessor: StructAccessor) -> Self { Self { column_name: name.into(), field, @@ -226,7 +226,7 @@ impl BoundReference { } /// Get this BoundReference's Accessor - pub fn accessor(&self) -> &Accessor { + pub fn accessor(&self) -> &StructAccessor { &self.accessor } } @@ -244,7 +244,7 @@ pub type BoundTerm = BoundReference; mod tests { use std::sync::Arc; - use crate::expr::accessor::Accessor; + use crate::expr::accessor::{StructAccessor, StructAccessor}; use crate::expr::{Bind, BoundReference, Reference}; use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type}; @@ -271,7 +271,7 @@ mod tests { let expected_ref = BoundReference::new( "bar", NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - Accessor::new(1, None), + StructAccessor::new(1, Type::Primitive(PrimitiveType::Int)), ); assert_eq!(expected_ref, reference); @@ -285,7 +285,7 @@ mod tests { let expected_ref = BoundReference::new( "BAR", NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - Accessor::new(1, None), + StructAccessor::new(1, Type::Primitive(PrimitiveType::Int)), ); assert_eq!(expected_ref, reference); diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 3dac63e080..81d70a196b 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -374,7 +374,7 @@ impl ManifestEvalVisitor { } BoundPredicate::Unary(expr) => { let pos = expr.term().accessor().position(); - let field = &partitions[pos]; + let field = &partitions[pos as usize]; match expr.op() { PredicateOperator::IsNull => field.contains_null, @@ -392,7 +392,7 @@ impl ManifestEvalVisitor { } BoundPredicate::Binary(expr) => { let pos = expr.term().accessor().position(); - let _field = &partitions[pos]; + let _field = &partitions[pos as usize]; match expr.op() { PredicateOperator::LessThan => { @@ -426,7 +426,7 @@ impl ManifestEvalVisitor { } BoundPredicate::Set(expr) => { let pos = expr.term().accessor().position(); - let _field = &partitions[pos]; + let _field = &partitions[pos as usize]; match expr.op() { PredicateOperator::In => { diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 266792bdb3..0c1bd94ea0 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -18,7 +18,7 @@ //! This module defines schema in iceberg. 
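A sketch of how these positional accessors resolve a doubly nested field, written as it might appear in a unit test inside accessor.rs; the struct shape, field names and values are illustrative and not part of this series (note that at this point in the series `wrap` still takes the inner accessor by value):

    use crate::expr::accessor::StructAccessor;
    use crate::spec::{Literal, PrimitiveType, Struct, Type};

    // A row shaped like `outer: struct<inner: struct<leaf: long>>`, with leaf = 42.
    let row = Struct::from_iter([Some(Literal::Struct(Struct::from_iter([Some(
        Literal::Struct(Struct::from_iter([Some(Literal::long(42))])),
    )])))]);

    // The accessor for `leaf` is built innermost-first, then wrapped once per
    // enclosing struct, mirroring what the schema builder changes below produce.
    let leaf = StructAccessor::new(0, Type::Primitive(PrimitiveType::Long));
    let inner = StructAccessor::wrap(0, leaf);
    let outer = StructAccessor::wrap(0, inner);

    // `get` unwraps one `Literal::Struct` per wrapper, then reads position 0
    // of the innermost struct.
    assert_eq!(outer.get(&row), &Literal::long(42));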
use crate::error::Result; -use crate::expr::accessor::Accessor; +use crate::expr::accessor::StructAccessor; use crate::spec::datatypes::{ ListType, MapType, NestedFieldRef, PrimitiveType, StructType, Type, LIST_FILED_NAME, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, @@ -55,7 +55,7 @@ pub struct Schema { lowercase_name_to_id: HashMap, id_to_name: HashMap, - field_id_to_accessor: HashMap, + field_id_to_accessor: HashMap, } impl PartialEq for Schema { @@ -106,12 +106,7 @@ impl SchemaBuilder { pub fn build(self) -> Result { let highest_field_id = self.fields.iter().map(|f| f.id).max().unwrap_or(0); - // TODO: handle nested field accessors and types - // base this on https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/Accessors.java#L214 - let mut field_id_to_accessor = HashMap::new(); - for (idx, field) in self.fields.iter().enumerate() { - field_id_to_accessor.insert(field.id, Accessor::new(idx, None)); - } + let field_id_to_accessor = self.build_accessors(); let r#struct = StructType::new(self.fields); let id_to_field = index_by_id(&r#struct)?; @@ -150,6 +145,49 @@ impl SchemaBuilder { }) } + fn build_accessors(&self) -> HashMap { + let mut map = HashMap::new(); + + for (pos, field) in self.fields.iter().enumerate() { + // add an accessor for this field + map.insert( + field.id, + StructAccessor::new(pos as i32, *field.field_type.clone()), + ); + + if let Type::Struct(nested) = field.field_type.as_ref() { + // add accessors for nested fields + for (field_id, accessor) in Self::build_accessors_nested(nested.fields()) { + map.insert(field_id, StructAccessor::wrap(pos as i32, accessor)); + } + } + } + + map + } + + fn build_accessors_nested(fields: &[NestedFieldRef]) -> Vec<(i32, StructAccessor)> { + let mut results = vec![]; + for (pos, field) in fields.iter().enumerate() { + if let Type::Struct(nested) = field.field_type.as_ref() { + let nested_accessors = Self::build_accessors_nested(nested.fields()); + + let wrapped_nested_accessors = nested_accessors + .into_iter() + .map(|(id, accessor)| (id, StructAccessor::wrap(pos as i32, accessor))); + + results.extend(wrapped_nested_accessors); + } + + results.push(( + field.id, + StructAccessor::new(pos as i32, *field.field_type.clone()), + )); + } + + results + } + fn validate_identifier_ids( r#struct: &StructType, id_to_field: &HashMap, @@ -276,7 +314,7 @@ impl Schema { } /// Get an accessor for retrieving data in a struct - pub fn accessor_for_field_id(&self, field_id: i32) -> &Accessor { + pub fn accessor_for_field_id(&self, field_id: i32) -> &StructAccessor { &self.field_id_to_accessor[&field_id] } } @@ -396,7 +434,7 @@ pub fn visit_schema(schema: &Schema, visitor: &mut V) -> Resul visitor.schema(schema, result) } -/// Creates an field id to field map. +/// Creates a field id to field map. pub fn index_by_id(r#struct: &StructType) -> Result> { struct IndexById(HashMap); diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index f31d64779d..df929ed22c 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -1131,6 +1131,11 @@ impl Struct { }, ) } + + /// Gets a ref to the field at `position` within the `Struct` + pub fn get(&self, position: i32) -> &Literal { + &self.fields[position as usize] + } } /// An iterator that moves out of a struct. From 56d935c32f1481173872232ea69695598a06bea0 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Tue, 26 Mar 2024 07:09:38 +0000 Subject: [PATCH 12/17] feat: ensure visit/project are fallible, accessor returns option. 
Add rewrite_not call --- crates/iceberg/src/expr/accessor.rs | 4 +--- crates/iceberg/src/expr/predicate.rs | 8 ------- crates/iceberg/src/scan.rs | 32 ++++++++++++++-------------- crates/iceberg/src/spec/schema.rs | 4 ++-- 4 files changed, 19 insertions(+), 29 deletions(-) diff --git a/crates/iceberg/src/expr/accessor.rs b/crates/iceberg/src/expr/accessor.rs index 8ee0e6a392..eb01be57e6 100644 --- a/crates/iceberg/src/expr/accessor.rs +++ b/crates/iceberg/src/expr/accessor.rs @@ -43,9 +43,7 @@ impl StructAccessor { match &self.inner_or_type { InnerOrType::Inner(inner) => match container.get(self.position) { Literal::Struct(wrapped) => inner.get(wrapped), - _ => { - unreachable!() - } + _ => { panic!("Should only be wrapping a Struct") } }, InnerOrType::Type(_) => container.get(self.position), } diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index 877c1c630a..4ea1365b80 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -668,14 +668,6 @@ impl Display for BoundPredicate { } } -pub(crate) trait PredicateVisitor { - fn visit(predicate: Predicate) -> T; -} - -pub(crate) trait BoundPredicateVisitor { - fn visit(predicate: BoundPredicate) -> T; -} - #[cfg(test)] mod tests { use std::ops::Not; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 81d70a196b..2fb70eee73 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -72,7 +72,10 @@ impl<'a> TableScanBuilder<'a> { /// Specifies a predicate to use as a filter pub fn with_filter(mut self, predicate: Predicate) -> Self { - self.filter = Some(predicate); + + // calls rewrite_not to remove Not nodes, which must be absent + // when applying the manifest evaluator + self.filter = Some(predicate.rewrite_not()); self } @@ -341,7 +344,7 @@ impl ManifestEvalVisitor { let inclusive_projection = InclusiveProjection::new(table_schema.clone(), partition_spec.clone()); - let unbound_partition_filter = inclusive_projection.project(&partition_filter); + let unbound_partition_filter = inclusive_projection.project(&partition_filter)?; Self::new( partition_schema_ref.clone(), @@ -456,29 +459,28 @@ impl InclusiveProjection { } } - pub(crate) fn project(&self, predicate: &BoundPredicate) -> Predicate { - // TODO: apply rewrite_not() as projection assumes that there are no NOT nodes + pub(crate) fn project(&self, predicate: &BoundPredicate) -> crate::Result { self.visit(predicate) } - fn visit(&self, bound_predicate: &BoundPredicate) -> Predicate { - match bound_predicate { + fn visit(&self, bound_predicate: &BoundPredicate) -> crate::Result { + Ok(match bound_predicate { BoundPredicate::AlwaysTrue => Predicate::AlwaysTrue, BoundPredicate::AlwaysFalse => Predicate::AlwaysFalse, BoundPredicate::And(expr) => Predicate::And(LogicalExpression::new( - expr.inputs().map(|expr| Box::new(self.visit(expr))), + expr.inputs().map(|expr| Box::new(self.visit(expr)?)), )), BoundPredicate::Or(expr) => Predicate::Or(LogicalExpression::new( - expr.inputs().map(|expr| Box::new(self.visit(expr))), + expr.inputs().map(|expr| Box::new(self.visit(expr)?)), )), BoundPredicate::Not(_) => { panic!("should not get here as NOT-rewriting should have removed NOT nodes") } - bp => self.visit_bound_predicate(bp), - } + bp => self.visit_bound_predicate(bp)?, + }) } - fn visit_bound_predicate(&self, predicate: &BoundPredicate) -> Predicate { + fn visit_bound_predicate(&self, predicate: &BoundPredicate) -> crate::Result { let field_id = match predicate { 
BoundPredicate::Unary(expr) => expr.field_id(), BoundPredicate::Binary(expr) => expr.field_id(), @@ -496,15 +498,13 @@ impl InclusiveProjection { } } - parts.iter().fold(Predicate::AlwaysTrue, |res, &part| { - // should this use ? instead of destructuring Ok() so that the whole call fails - // if the transform project() call errors? This would require changing the signature of `visit`. - if let Ok(Some(pred_for_part)) = part.transform.project(&part.name, predicate) { + Ok(parts.iter().fold(Predicate::AlwaysTrue, |res, &part| { + if let Some(pred_for_part) = part.transform.project(&part.name, predicate)? { res.and(pred_for_part) } else { res } - }) + })) } } diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 0c1bd94ea0..065d749fd1 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -314,8 +314,8 @@ impl Schema { } /// Get an accessor for retrieving data in a struct - pub fn accessor_for_field_id(&self, field_id: i32) -> &StructAccessor { - &self.field_id_to_accessor[&field_id] + pub fn accessor_for_field_id(&self, field_id: i32) -> Option<&StructAccessor> { + self.field_id_to_accessor.get(&field_id) } } From 7bf331d90275b9f3e108b253f1391284a260df54 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 3 Apr 2024 08:26:30 +0100 Subject: [PATCH 13/17] feat: add BoundPredicateEvaluator --- crates/iceberg/src/expr/mod.rs | 6 +- crates/iceberg/src/expr/predicate.rs | 8 + .../visitors/bound_predicate_evaluator.rs | 310 ++++++++++++++++++ crates/iceberg/src/expr/visitors/mod.rs | 1 + 4 files changed, 323 insertions(+), 2 deletions(-) create mode 100644 crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs create mode 100644 crates/iceberg/src/expr/visitors/mod.rs diff --git a/crates/iceberg/src/expr/mod.rs b/crates/iceberg/src/expr/mod.rs index dccafb79af..8c6c345d82 100644 --- a/crates/iceberg/src/expr/mod.rs +++ b/crates/iceberg/src/expr/mod.rs @@ -17,13 +17,15 @@ //! This module contains expressions. 
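Taken together, the scan-side pieces of this series are meant to be driven roughly as follows; this is an illustrative sketch rather than part of the patch, the column name is made up, and `Predicate::AlwaysTrue` stands in for a real filter expression:

    let table_scan = table
        .scan()
        .with_filter(Predicate::AlwaysTrue)
        .with_case_sensitive(false)
        .select(["col"])
        .build()?;

During `plan_files()`, the filter (already NOT-rewritten by `with_filter`) is bound to the table schema, projected onto each manifest's partition spec by `InclusiveProjection`, re-bound to the derived partition schema, and the resulting evaluator is cached per partition spec id, so manifest files whose partition field summaries cannot match the filter are skipped.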
-mod term; use std::fmt::{Display, Formatter}; pub use term::*; + pub(crate) mod accessor; -mod predicate; +pub(crate) mod predicate; +pub(crate) mod term; +pub(crate) mod visitors; use crate::spec::SchemaRef; pub use predicate::*; diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index 4ea1365b80..81c4aeb8ae 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -167,6 +167,10 @@ impl BinaryExpression { pub(crate) fn op(&self) -> &PredicateOperator { &self.op } + + pub(crate) fn literal(&self) -> &Datum { + &self.literal + } } impl BinaryExpression { @@ -229,6 +233,10 @@ impl SetExpression { pub(crate) fn op(&self) -> &PredicateOperator { &self.op } + + pub(crate) fn literals(&self) -> &FnvHashSet { + &self.literals + } } impl SetExpression { diff --git a/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs b/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs new file mode 100644 index 0000000000..71d1bd1b4f --- /dev/null +++ b/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs @@ -0,0 +1,310 @@ +use fnv::FnvHashSet; +use crate::expr::{BoundPredicate, BoundReference, PredicateOperator}; +use crate::Result; +use crate::spec::Datum; + +pub trait BoundPredicateEvaluator { + fn visit(&mut self, node: &BoundPredicate) -> Result { + match node { + BoundPredicate::AlwaysTrue => self.always_true(), + BoundPredicate::AlwaysFalse => self.always_false(), + BoundPredicate::And(expr) => { + let [left_pred, right_pred] = expr.inputs(); + + let left_result = self.visit(left_pred)?; + let right_result = self.visit(right_pred)?; + + Ok(left_result && right_result) + } + BoundPredicate::Or(expr) => { + let [left_pred, right_pred] = expr.inputs(); + + let left_result = self.visit(left_pred)?; + let right_result = self.visit(right_pred)?; + + Ok(left_result || right_result) + } + BoundPredicate::Not(expr) => { + let [inner_pred] = expr.inputs(); + + let inner_result = self.visit(inner_pred)?; + + self.not(inner_result) + } + BoundPredicate::Unary(expr) => { + match expr.op() { + PredicateOperator::IsNull => { + self.is_null(expr.term()) + } + PredicateOperator::NotNull => { + self.not_null(expr.term()) + } + PredicateOperator::IsNan => { + self.is_nan(expr.term()) + } + PredicateOperator::NotNan => { + self.not_nan(expr.term()) + } + op => { panic!("Unexpected op for unary predicate: {}", &op) } + } + } + BoundPredicate::Binary(expr) => { + let reference = expr.term(); + let literal = expr.literal(); + match expr.op() { + PredicateOperator::LessThan => { + self.less_than(reference, literal) + } + PredicateOperator::LessThanOrEq => { + self.less_than_or_eq(reference, literal) + } + PredicateOperator::GreaterThan => { + self.greater_than(reference, literal) + } + PredicateOperator::GreaterThanOrEq => { + self.greater_than_or_eq(reference, literal) + } + PredicateOperator::Eq => { + self.eq(reference, literal) + } + PredicateOperator::NotEq => { + self.not_eq(reference, literal) + } + PredicateOperator::StartsWith => { + self.starts_with(reference, literal) + } + PredicateOperator::NotStartsWith => { + self.not_starts_with(reference, literal) + } + op => { panic!("Unexpected op for binary predicate: {}", &op) } + } + } + BoundPredicate::Set(expr) => { + let reference = expr.term(); + let literals = expr.literals(); + match expr.op() { + PredicateOperator::In => { + self.r#in(reference, literals) + } + PredicateOperator::NotIn => { + self.not_in(reference, literals) + } + op => { panic!("Unexpected op 
for set predicate: {}", &op) } + } + } + } + } + + // default implementations for logical operators + fn always_true(&mut self) -> Result { + Ok(true) + } + fn always_false(&mut self) -> Result { Ok(false) } + fn and(&mut self, lhs: bool, rhs: bool) -> Result { + Ok(lhs && rhs) + } + fn or(&mut self, lhs: bool, rhs: bool) -> Result { + Ok(lhs || rhs) + } + fn not(&mut self, inner: bool) -> Result { + Ok(!inner) + } + + // visitor methods for unary / binary / set operators must be implemented + fn is_null(&mut self, reference: &BoundReference) -> Result; + fn not_null(&mut self, reference: &BoundReference) -> Result; + fn is_nan(&mut self, reference: &BoundReference) -> Result; + fn not_nan(&mut self, reference: &BoundReference) -> Result; + fn less_than(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn less_than_or_eq(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn greater_than(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn greater_than_or_eq(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn eq(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn not_eq(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn starts_with(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn not_starts_with(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn r#in(&mut self, reference: &BoundReference, literals: &FnvHashSet) -> Result; + fn not_in(&mut self, reference: &BoundReference, literals: &FnvHashSet) -> Result; +} + +#[cfg(test)] +mod tests { + use std::ops::Not; + use std::sync::Arc; + use fnv::FnvHashSet; + use crate::expr::{Bind, BoundReference, Predicate}; + use crate::expr::Predicate::{AlwaysFalse, AlwaysTrue}; + use crate::expr::visitors::bound_predicate_evaluator::BoundPredicateEvaluator; + use crate::spec::{Datum, Schema, SchemaRef}; + + struct TestEvaluator {} + impl BoundPredicateEvaluator for TestEvaluator { + fn is_null(&mut self, _reference: &BoundReference) -> crate::Result { + Ok(true) + } + + fn not_null(&mut self, _reference: &BoundReference) -> crate::Result { + Ok(false) + } + + fn is_nan(&mut self, _reference: &BoundReference) -> crate::Result { + Ok(true) + } + + fn not_nan(&mut self, _reference: &BoundReference) -> crate::Result { + Ok(false) + } + + fn less_than(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + Ok(true) + } + + fn less_than_or_eq(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + Ok(false) + } + + fn greater_than(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + Ok(true) + } + + fn greater_than_or_eq(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + Ok(false) + } + + fn eq(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + Ok(true) + } + + fn not_eq(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + Ok(false) + } + + fn starts_with(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + Ok(true) + } + + fn not_starts_with(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + Ok(false) + } + + fn r#in(&mut self, _reference: &BoundReference, _literals: &FnvHashSet) -> crate::Result { + Ok(true) + } + + fn not_in(&mut self, _reference: &BoundReference, _literals: &FnvHashSet) -> crate::Result { + Ok(false) + } + } + + fn create_test_schema() -> SchemaRef { + let schema = 
Schema::builder().build().unwrap(); + + let schema_arc = Arc::new(schema); + schema_arc.clone() + } + + #[test] + fn test_default_default_always_true() { + let predicate = Predicate::AlwaysTrue; + let bound_predicate = predicate.bind(create_test_schema(), false).unwrap(); + + let mut test_evaluator = TestEvaluator {}; + + let result = test_evaluator.visit(&bound_predicate); + + assert_eq!(result.unwrap(), true); + } + + #[test] + fn test_default_default_always_false() { + let predicate = Predicate::AlwaysFalse; + let bound_predicate = predicate.bind(create_test_schema(), false).unwrap(); + + let mut test_evaluator = TestEvaluator {}; + + let result = test_evaluator.visit(&bound_predicate); + + assert_eq!(result.unwrap(), false); + } + + #[test] + fn test_default_default_logical_and() { + let predicate = AlwaysTrue.and(AlwaysFalse); + let bound_predicate = predicate.bind(create_test_schema(), false).unwrap(); + + let mut test_evaluator = TestEvaluator {}; + + let result = test_evaluator.visit(&bound_predicate); + + assert_eq!(result.unwrap(), false); + + let predicate = AlwaysFalse.and(AlwaysFalse); + let bound_predicate = predicate.bind(create_test_schema(), false).unwrap(); + + let mut test_evaluator = TestEvaluator {}; + + let result = test_evaluator.visit(&bound_predicate); + + assert_eq!(result.unwrap(), false); + + let predicate = AlwaysTrue.and(AlwaysTrue); + let bound_predicate = predicate.bind(create_test_schema(), false).unwrap(); + + let mut test_evaluator = TestEvaluator {}; + + let result = test_evaluator.visit(&bound_predicate); + + assert_eq!(result.unwrap(), true); + } + + #[test] + fn test_default_default_logical_or() { + let predicate = AlwaysTrue.or(AlwaysFalse); + let bound_predicate = predicate.bind(create_test_schema(), false).unwrap(); + + let mut test_evaluator = TestEvaluator {}; + + let result = test_evaluator.visit(&bound_predicate); + + assert_eq!(result.unwrap(), true); + + let predicate = AlwaysFalse.or(AlwaysFalse); + let bound_predicate = predicate.bind(create_test_schema(), false).unwrap(); + + let mut test_evaluator = TestEvaluator {}; + + let result = test_evaluator.visit(&bound_predicate); + + assert_eq!(result.unwrap(), false); + + let predicate = AlwaysTrue.or(AlwaysTrue); + let bound_predicate = predicate.bind(create_test_schema(), false).unwrap(); + + let mut test_evaluator = TestEvaluator {}; + + let result = test_evaluator.visit(&bound_predicate); + + assert_eq!(result.unwrap(), true); + } + + #[test] + fn test_default_default_not() { + let predicate = AlwaysFalse.not(); + let bound_predicate = predicate.bind(create_test_schema(), false).unwrap(); + + let mut test_evaluator = TestEvaluator {}; + + let result = test_evaluator.visit(&bound_predicate); + + assert_eq!(result.unwrap(), true); + + let predicate = AlwaysTrue.not(); + let bound_predicate = predicate.bind(create_test_schema(), false).unwrap(); + + let mut test_evaluator = TestEvaluator {}; + + let result = test_evaluator.visit(&bound_predicate); + + assert_eq!(result.unwrap(), false); + } +} \ No newline at end of file diff --git a/crates/iceberg/src/expr/visitors/mod.rs b/crates/iceberg/src/expr/visitors/mod.rs new file mode 100644 index 0000000000..d5a445bcca --- /dev/null +++ b/crates/iceberg/src/expr/visitors/mod.rs @@ -0,0 +1 @@ +mod bound_predicate_evaluator; \ No newline at end of file From fbe09e9feffb4941730a64c2ddc984688d097a21 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 3 Apr 2024 08:29:18 +0100 Subject: [PATCH 14/17] feat: update accessors to use Arc and store their 
map as Arcs. --- crates/iceberg/src/expr/accessor.rs | 13 +++++++++---- crates/iceberg/src/expr/term.rs | 21 ++++++++++++++------- crates/iceberg/src/spec/schema.rs | 26 +++++++++++++++++--------- 3 files changed, 40 insertions(+), 20 deletions(-) diff --git a/crates/iceberg/src/expr/accessor.rs b/crates/iceberg/src/expr/accessor.rs index eb01be57e6..662adf7ed7 100644 --- a/crates/iceberg/src/expr/accessor.rs +++ b/crates/iceberg/src/expr/accessor.rs @@ -1,9 +1,10 @@ +use std::sync::Arc; use crate::spec::{Literal, Struct, Type}; use serde_derive::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] enum InnerOrType { - Inner(Box), + Inner(Arc), Type(Type), } @@ -13,6 +14,8 @@ pub struct StructAccessor { inner_or_type: InnerOrType, } +pub(crate) type StructAccessorRef = Arc; + impl StructAccessor { pub(crate) fn new(position: i32, r#type: Type) -> Self { StructAccessor { @@ -21,10 +24,10 @@ impl StructAccessor { } } - pub(crate) fn wrap(position: i32, inner: StructAccessor) -> Self { + pub(crate) fn wrap(position: i32, inner: StructAccessorRef) -> Self { StructAccessor { position, - inner_or_type: InnerOrType::Inner(Box::from(inner)), + inner_or_type: InnerOrType::Inner(inner), } } @@ -43,7 +46,9 @@ impl StructAccessor { match &self.inner_or_type { InnerOrType::Inner(inner) => match container.get(self.position) { Literal::Struct(wrapped) => inner.get(wrapped), - _ => { panic!("Should only be wrapping a Struct") } + _ => { + panic!("Nested accessor should only be wrapping a Struct") + } }, InnerOrType::Type(_) => container.get(self.position), } diff --git a/crates/iceberg/src/expr/term.rs b/crates/iceberg/src/expr/term.rs index 616a02ca4b..11b3299842 100644 --- a/crates/iceberg/src/expr/term.rs +++ b/crates/iceberg/src/expr/term.rs @@ -21,7 +21,7 @@ use std::fmt::{Display, Formatter}; use fnv::FnvHashSet; -use crate::expr::accessor::StructAccessor; +use crate::expr::accessor::{StructAccessor, StructAccessorRef}; use crate::expr::Bind; use crate::expr::{BinaryExpression, Predicate, PredicateOperator, SetExpression, UnaryExpression}; use crate::spec::{Datum, NestedField, NestedFieldRef, SchemaRef}; @@ -190,7 +190,12 @@ impl Bind for Reference { ) })?; - let accessor = schema.accessor_for_field_id(field.id); + let accessor = schema.accessor_for_field_id(field.id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Accessor for Field {} not found", self.name), + ) + })?; Ok(BoundReference::new( self.name.clone(), @@ -207,12 +212,12 @@ pub struct BoundReference { // For example, if the field is `a.b.c`, then `field.name` is `c`, but `original_name` is `a.b.c`. column_name: String, field: NestedFieldRef, - accessor: StructAccessor, + accessor: StructAccessorRef, } impl BoundReference { /// Creates a new bound reference. 
- pub fn new(name: impl Into, field: NestedFieldRef, accessor: StructAccessor) -> Self { + pub fn new(name: impl Into, field: NestedFieldRef, accessor: StructAccessorRef) -> Self { Self { column_name: name.into(), field, @@ -244,7 +249,7 @@ pub type BoundTerm = BoundReference; mod tests { use std::sync::Arc; - use crate::expr::accessor::{StructAccessor, StructAccessor}; + use crate::expr::accessor::StructAccessor; use crate::expr::{Bind, BoundReference, Reference}; use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type}; @@ -268,10 +273,11 @@ mod tests { let schema = table_schema_simple(); let reference = Reference::new("bar").bind(schema, true).unwrap(); + let accessor_ref = Arc::new(StructAccessor::new(1, Type::Primitive(PrimitiveType::Int))); let expected_ref = BoundReference::new( "bar", NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - StructAccessor::new(1, Type::Primitive(PrimitiveType::Int)), + accessor_ref.clone(), ); assert_eq!(expected_ref, reference); @@ -282,10 +288,11 @@ mod tests { let schema = table_schema_simple(); let reference = Reference::new("BAR").bind(schema, false).unwrap(); + let accessor_ref = Arc::new(StructAccessor::new(1, Type::Primitive(PrimitiveType::Int))); let expected_ref = BoundReference::new( "BAR", NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - StructAccessor::new(1, Type::Primitive(PrimitiveType::Int)), + accessor_ref.clone(), ); assert_eq!(expected_ref, reference); diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 065d749fd1..6ba0b831c1 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -55,7 +55,7 @@ pub struct Schema { lowercase_name_to_id: HashMap, id_to_name: HashMap, - field_id_to_accessor: HashMap, + field_id_to_accessor: HashMap>, } impl PartialEq for Schema { @@ -145,20 +145,23 @@ impl SchemaBuilder { }) } - fn build_accessors(&self) -> HashMap { + fn build_accessors(&self) -> HashMap> { let mut map = HashMap::new(); for (pos, field) in self.fields.iter().enumerate() { // add an accessor for this field + + let accessor = Arc::new(StructAccessor::new(pos as i32, *field.field_type.clone())); map.insert( field.id, - StructAccessor::new(pos as i32, *field.field_type.clone()), + accessor.clone(), ); if let Type::Struct(nested) = field.field_type.as_ref() { // add accessors for nested fields for (field_id, accessor) in Self::build_accessors_nested(nested.fields()) { - map.insert(field_id, StructAccessor::wrap(pos as i32, accessor)); + let new_accessor = Arc::new(StructAccessor::wrap(pos as i32, accessor)); + map.insert(field_id, new_accessor.clone()); } } } @@ -166,7 +169,7 @@ impl SchemaBuilder { map } - fn build_accessors_nested(fields: &[NestedFieldRef]) -> Vec<(i32, StructAccessor)> { + fn build_accessors_nested(fields: &[NestedFieldRef]) -> Vec<(i32, Arc)> { let mut results = vec![]; for (pos, field) in fields.iter().enumerate() { if let Type::Struct(nested) = field.field_type.as_ref() { @@ -174,14 +177,19 @@ impl SchemaBuilder { let wrapped_nested_accessors = nested_accessors .into_iter() - .map(|(id, accessor)| (id, StructAccessor::wrap(pos as i32, accessor))); + .map(|(id, accessor)| { + let new_accessor = Arc::new(StructAccessor::wrap(pos as i32, accessor)); + (id, new_accessor.clone()) + }); results.extend(wrapped_nested_accessors); } + let accessor = Arc::new(StructAccessor::new(pos as i32, *field.field_type.clone())); + results.push(( field.id, - StructAccessor::new(pos as i32, 
*field.field_type.clone()), + accessor.clone(), )); } @@ -314,8 +322,8 @@ impl Schema { } /// Get an accessor for retrieving data in a struct - pub fn accessor_for_field_id(&self, field_id: i32) -> Option<&StructAccessor> { - self.field_id_to_accessor.get(&field_id) + pub fn accessor_for_field_id(&self, field_id: i32) -> Option> { + self.field_id_to_accessor.get(&field_id).map(|acc|acc.clone()) } } From 35428c9d5defb9dd015d5519f0edd7fdaf067499 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 3 Apr 2024 08:33:26 +0100 Subject: [PATCH 15/17] fix: InclusiveProjection --- crates/iceberg/src/scan.rs | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 2fb70eee73..5452d3e12d 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -19,7 +19,7 @@ use crate::arrow::ArrowReaderBuilder; use crate::expr::BoundPredicate::AlwaysTrue; -use crate::expr::{Bind, BoundPredicate, LogicalExpression, Predicate, PredicateOperator}; +use crate::expr::{Bind, BoundPredicate, Predicate, PredicateOperator}; use crate::io::FileIO; use crate::spec::{ DataContentType, FieldSummary, ManifestEntryRef, ManifestFile, PartitionField, @@ -72,7 +72,6 @@ impl<'a> TableScanBuilder<'a> { /// Specifies a predicate to use as a filter pub fn with_filter(mut self, predicate: Predicate) -> Self { - // calls rewrite_not to remove Not nodes, which must be absent // when applying the manifest evaluator self.filter = Some(predicate.rewrite_not()); @@ -467,12 +466,14 @@ impl InclusiveProjection { Ok(match bound_predicate { BoundPredicate::AlwaysTrue => Predicate::AlwaysTrue, BoundPredicate::AlwaysFalse => Predicate::AlwaysFalse, - BoundPredicate::And(expr) => Predicate::And(LogicalExpression::new( - expr.inputs().map(|expr| Box::new(self.visit(expr)?)), - )), - BoundPredicate::Or(expr) => Predicate::Or(LogicalExpression::new( - expr.inputs().map(|expr| Box::new(self.visit(expr)?)), - )), + BoundPredicate::And(expr) => { + let [left_pred, right_pred] = expr.inputs(); + self.visit(left_pred)?.and(self.visit(right_pred)?) + }, + BoundPredicate::Or(expr) => { + let [left_pred, right_pred] = expr.inputs(); + self.visit(left_pred)?.or(self.visit(right_pred)?) + }, BoundPredicate::Not(_) => { panic!("should not get here as NOT-rewriting should have removed NOT nodes") } @@ -498,13 +499,13 @@ impl InclusiveProjection { } } - Ok(parts.iter().fold(Predicate::AlwaysTrue, |res, &part| { - if let Some(pred_for_part) = part.transform.project(&part.name, predicate)? { - res.and(pred_for_part) + parts.iter().fold(Ok(Predicate::AlwaysTrue), |res, &part| { + Ok(if let Some(pred_for_part) = part.transform.project(&part.name, predicate)? { + res?.and(pred_for_part) } else { - res - } - })) + res? 
+ }) + }) } } From c663a7d8cdaafc1ad3246d07b0066789def9b2eb Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 3 Apr 2024 08:59:21 +0100 Subject: [PATCH 16/17] refactor: factor out PartitionEvaluator, use only ManifestEvaluator --- crates/iceberg/src/expr/accessor.rs | 2 +- crates/iceberg/src/expr/mod.rs | 1 - crates/iceberg/src/expr/term.rs | 6 +- .../visitors/bound_predicate_evaluator.rs | 132 ++++++++++-------- crates/iceberg/src/expr/visitors/mod.rs | 2 +- crates/iceberg/src/scan.rs | 96 +++++-------- crates/iceberg/src/spec/schema.rs | 17 +-- 7 files changed, 117 insertions(+), 139 deletions(-) diff --git a/crates/iceberg/src/expr/accessor.rs b/crates/iceberg/src/expr/accessor.rs index 662adf7ed7..d96ba5c118 100644 --- a/crates/iceberg/src/expr/accessor.rs +++ b/crates/iceberg/src/expr/accessor.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; use crate::spec::{Literal, Struct, Type}; use serde_derive::{Deserialize, Serialize}; +use std::sync::Arc; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] enum InnerOrType { diff --git a/crates/iceberg/src/expr/mod.rs b/crates/iceberg/src/expr/mod.rs index 8c6c345d82..4136b39a8e 100644 --- a/crates/iceberg/src/expr/mod.rs +++ b/crates/iceberg/src/expr/mod.rs @@ -17,7 +17,6 @@ //! This module contains expressions. - use std::fmt::{Display, Formatter}; pub use term::*; diff --git a/crates/iceberg/src/expr/term.rs b/crates/iceberg/src/expr/term.rs index 11b3299842..4c29a72fb8 100644 --- a/crates/iceberg/src/expr/term.rs +++ b/crates/iceberg/src/expr/term.rs @@ -217,7 +217,11 @@ pub struct BoundReference { impl BoundReference { /// Creates a new bound reference. - pub fn new(name: impl Into, field: NestedFieldRef, accessor: StructAccessorRef) -> Self { + pub fn new( + name: impl Into, + field: NestedFieldRef, + accessor: StructAccessorRef, + ) -> Self { Self { column_name: name.into(), field, diff --git a/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs b/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs index 71d1bd1b4f..0583b4828d 100644 --- a/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs @@ -1,7 +1,7 @@ -use fnv::FnvHashSet; use crate::expr::{BoundPredicate, BoundReference, PredicateOperator}; -use crate::Result; use crate::spec::Datum; +use crate::Result; +use fnv::FnvHashSet; pub trait BoundPredicateEvaluator { fn visit(&mut self, node: &BoundPredicate) -> Result { @@ -31,65 +31,43 @@ pub trait BoundPredicateEvaluator { self.not(inner_result) } - BoundPredicate::Unary(expr) => { - match expr.op() { - PredicateOperator::IsNull => { - self.is_null(expr.term()) - } - PredicateOperator::NotNull => { - self.not_null(expr.term()) - } - PredicateOperator::IsNan => { - self.is_nan(expr.term()) - } - PredicateOperator::NotNan => { - self.not_nan(expr.term()) - } - op => { panic!("Unexpected op for unary predicate: {}", &op) } + BoundPredicate::Unary(expr) => match expr.op() { + PredicateOperator::IsNull => self.is_null(expr.term()), + PredicateOperator::NotNull => self.not_null(expr.term()), + PredicateOperator::IsNan => self.is_nan(expr.term()), + PredicateOperator::NotNan => self.not_nan(expr.term()), + op => { + panic!("Unexpected op for unary predicate: {}", &op) } - } + }, BoundPredicate::Binary(expr) => { let reference = expr.term(); let literal = expr.literal(); match expr.op() { - PredicateOperator::LessThan => { - self.less_than(reference, literal) - } - PredicateOperator::LessThanOrEq => { - self.less_than_or_eq(reference, 
literal) - } - PredicateOperator::GreaterThan => { - self.greater_than(reference, literal) - } + PredicateOperator::LessThan => self.less_than(reference, literal), + PredicateOperator::LessThanOrEq => self.less_than_or_eq(reference, literal), + PredicateOperator::GreaterThan => self.greater_than(reference, literal), PredicateOperator::GreaterThanOrEq => { self.greater_than_or_eq(reference, literal) } - PredicateOperator::Eq => { - self.eq(reference, literal) - } - PredicateOperator::NotEq => { - self.not_eq(reference, literal) - } - PredicateOperator::StartsWith => { - self.starts_with(reference, literal) + PredicateOperator::Eq => self.eq(reference, literal), + PredicateOperator::NotEq => self.not_eq(reference, literal), + PredicateOperator::StartsWith => self.starts_with(reference, literal), + PredicateOperator::NotStartsWith => self.not_starts_with(reference, literal), + op => { + panic!("Unexpected op for binary predicate: {}", &op) } - PredicateOperator::NotStartsWith => { - self.not_starts_with(reference, literal) - } - op => { panic!("Unexpected op for binary predicate: {}", &op) } } } BoundPredicate::Set(expr) => { let reference = expr.term(); let literals = expr.literals(); match expr.op() { - PredicateOperator::In => { - self.r#in(reference, literals) - } - PredicateOperator::NotIn => { - self.not_in(reference, literals) + PredicateOperator::In => self.r#in(reference, literals), + PredicateOperator::NotIn => self.not_in(reference, literals), + op => { + panic!("Unexpected op for set predicate: {}", &op) } - op => { panic!("Unexpected op for set predicate: {}", &op) } } } } @@ -99,7 +77,9 @@ pub trait BoundPredicateEvaluator { fn always_true(&mut self) -> Result { Ok(true) } - fn always_false(&mut self) -> Result { Ok(false) } + fn always_false(&mut self) -> Result { + Ok(false) + } fn and(&mut self, lhs: bool, rhs: bool) -> Result { Ok(lhs && rhs) } @@ -129,13 +109,13 @@ pub trait BoundPredicateEvaluator { #[cfg(test)] mod tests { - use std::ops::Not; - use std::sync::Arc; - use fnv::FnvHashSet; - use crate::expr::{Bind, BoundReference, Predicate}; - use crate::expr::Predicate::{AlwaysFalse, AlwaysTrue}; use crate::expr::visitors::bound_predicate_evaluator::BoundPredicateEvaluator; + use crate::expr::Predicate::{AlwaysFalse, AlwaysTrue}; + use crate::expr::{Bind, BoundReference, Predicate}; use crate::spec::{Datum, Schema, SchemaRef}; + use fnv::FnvHashSet; + use std::ops::Not; + use std::sync::Arc; struct TestEvaluator {} impl BoundPredicateEvaluator for TestEvaluator { @@ -155,19 +135,35 @@ mod tests { Ok(false) } - fn less_than(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + fn less_than( + &mut self, + _reference: &BoundReference, + _literal: &Datum, + ) -> crate::Result { Ok(true) } - fn less_than_or_eq(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + fn less_than_or_eq( + &mut self, + _reference: &BoundReference, + _literal: &Datum, + ) -> crate::Result { Ok(false) } - fn greater_than(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + fn greater_than( + &mut self, + _reference: &BoundReference, + _literal: &Datum, + ) -> crate::Result { Ok(true) } - fn greater_than_or_eq(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + fn greater_than_or_eq( + &mut self, + _reference: &BoundReference, + _literal: &Datum, + ) -> crate::Result { Ok(false) } @@ -179,19 +175,35 @@ mod tests { Ok(false) } - fn starts_with(&mut self, _reference: &BoundReference, _literal: &Datum) 
-> crate::Result { + fn starts_with( + &mut self, + _reference: &BoundReference, + _literal: &Datum, + ) -> crate::Result { Ok(true) } - fn not_starts_with(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + fn not_starts_with( + &mut self, + _reference: &BoundReference, + _literal: &Datum, + ) -> crate::Result { Ok(false) } - fn r#in(&mut self, _reference: &BoundReference, _literals: &FnvHashSet) -> crate::Result { + fn r#in( + &mut self, + _reference: &BoundReference, + _literals: &FnvHashSet, + ) -> crate::Result { Ok(true) } - fn not_in(&mut self, _reference: &BoundReference, _literals: &FnvHashSet) -> crate::Result { + fn not_in( + &mut self, + _reference: &BoundReference, + _literals: &FnvHashSet, + ) -> crate::Result { Ok(false) } } @@ -307,4 +319,4 @@ mod tests { assert_eq!(result.unwrap(), false); } -} \ No newline at end of file +} diff --git a/crates/iceberg/src/expr/visitors/mod.rs b/crates/iceberg/src/expr/visitors/mod.rs index d5a445bcca..c9fff601b8 100644 --- a/crates/iceberg/src/expr/visitors/mod.rs +++ b/crates/iceberg/src/expr/visitors/mod.rs @@ -1 +1 @@ -mod bound_predicate_evaluator; \ No newline at end of file +mod bound_predicate_evaluator; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 5452d3e12d..88ec5b8c83 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -175,7 +175,7 @@ impl TableScan { pub async fn plan_files(&'static self) -> crate::Result { // Cache `PartitionEvaluator`s created as part of this scan - let mut partition_evaluator_cache: HashMap = HashMap::new(); + let mut manifest_evaluator_cache: HashMap = HashMap::new(); let snapshot = self.snapshot.clone(); let table_metadata = self.table_metadata.clone(); @@ -194,12 +194,13 @@ impl TableScan { // Use one from the cache if there is one. If not, create one, put it in // the cache, and take a reference to it. if let Some(filter) = self.filter.as_ref() { - let partition_evaluator = partition_evaluator_cache + let manifest_evaluator = manifest_evaluator_cache .entry(entry.partition_spec_id()) - .or_insert_with_key(|key| self.create_partition_evaluator(key, filter)); + .or_insert_with_key(|key| self.create_manifest_evaluator(key, filter)); + // reject any manifest files whose partition values don't match the filter. 
- if !partition_evaluator.filter_manifest_file(entry) { + if !manifest_evaluator.eval(entry) { continue; } } @@ -230,14 +231,20 @@ impl TableScan { .boxed()) } - fn create_partition_evaluator(&self, id: &i32, filter: &Predicate) -> PartitionEvaluator { + fn create_manifest_evaluator(&self, id: &i32, filter: &Predicate) -> ManifestEvaluator { let bound_predicate = filter .bind(self.schema.clone(), self.case_sensitive) .unwrap(); let partition_spec = self.table_metadata.partition_spec_by_id(*id).unwrap(); - PartitionEvaluator::new(partition_spec.clone(), bound_predicate, self.schema.clone()) - .unwrap() + + ManifestEvaluator::new( + partition_spec.clone(), + self.schema.clone(), + bound_predicate, + self.case_sensitive, + ) + .unwrap() } pub async fn to_arrow(&'static self) -> crate::Result { @@ -271,35 +278,7 @@ impl FileScanTask { } } -/// Evaluates manifest files to see if their partition values comply with a filter predicate -pub struct PartitionEvaluator { - manifest_eval_visitor: ManifestEvalVisitor, -} - -impl PartitionEvaluator { - pub(crate) fn new( - partition_spec: PartitionSpecRef, - partition_filter: BoundPredicate, - table_schema: SchemaRef, - ) -> crate::Result { - let manifest_eval_visitor = ManifestEvalVisitor::manifest_evaluator( - partition_spec, - table_schema, - partition_filter, - true, - )?; - - Ok(PartitionEvaluator { - manifest_eval_visitor, - }) - } - - pub(crate) fn filter_manifest_file(&self, _manifest_file: &ManifestFile) -> bool { - self.manifest_eval_visitor.eval(_manifest_file) - } -} - -struct ManifestEvalVisitor { +struct ManifestEvaluator { #[allow(dead_code)] partition_schema: SchemaRef, partition_filter: BoundPredicate, @@ -307,22 +286,8 @@ struct ManifestEvalVisitor { case_sensitive: bool, } -impl ManifestEvalVisitor { - fn new( - partition_schema: SchemaRef, - partition_filter: Predicate, - case_sensitive: bool, - ) -> crate::Result { - let partition_filter = partition_filter.bind(partition_schema.clone(), case_sensitive)?; - - Ok(Self { - partition_schema, - partition_filter, - case_sensitive, - }) - } - - pub(crate) fn manifest_evaluator( +impl ManifestEvaluator { + pub(crate) fn new( partition_spec: PartitionSpecRef, table_schema: SchemaRef, partition_filter: BoundPredicate, @@ -345,11 +310,14 @@ impl ManifestEvalVisitor { InclusiveProjection::new(table_schema.clone(), partition_spec.clone()); let unbound_partition_filter = inclusive_projection.project(&partition_filter)?; - Self::new( - partition_schema_ref.clone(), - unbound_partition_filter, + let partition_filter = + unbound_partition_filter.bind(partition_schema_ref.clone(), case_sensitive)?; + + Ok(Self { + partition_schema: partition_schema_ref, + partition_filter, case_sensitive, - ) + }) } pub(crate) fn eval(&self, manifest_file: &ManifestFile) -> bool { @@ -469,11 +437,11 @@ impl InclusiveProjection { BoundPredicate::And(expr) => { let [left_pred, right_pred] = expr.inputs(); self.visit(left_pred)?.and(self.visit(right_pred)?) - }, + } BoundPredicate::Or(expr) => { let [left_pred, right_pred] = expr.inputs(); self.visit(left_pred)?.or(self.visit(right_pred)?) - }, + } BoundPredicate::Not(_) => { panic!("should not get here as NOT-rewriting should have removed NOT nodes") } @@ -500,11 +468,13 @@ impl InclusiveProjection { } parts.iter().fold(Ok(Predicate::AlwaysTrue), |res, &part| { - Ok(if let Some(pred_for_part) = part.transform.project(&part.name, predicate)? { - res?.and(pred_for_part) - } else { - res? 
- }) + Ok( + if let Some(pred_for_part) = part.transform.project(&part.name, predicate)? { + res?.and(pred_for_part) + } else { + res? + }, + ) }) } } diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 6ba0b831c1..e2fee0d0ef 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -152,10 +152,7 @@ impl SchemaBuilder { // add an accessor for this field let accessor = Arc::new(StructAccessor::new(pos as i32, *field.field_type.clone())); - map.insert( - field.id, - accessor.clone(), - ); + map.insert(field.id, accessor.clone()); if let Type::Struct(nested) = field.field_type.as_ref() { // add accessors for nested fields @@ -175,9 +172,8 @@ impl SchemaBuilder { if let Type::Struct(nested) = field.field_type.as_ref() { let nested_accessors = Self::build_accessors_nested(nested.fields()); - let wrapped_nested_accessors = nested_accessors - .into_iter() - .map(|(id, accessor)| { + let wrapped_nested_accessors = + nested_accessors.into_iter().map(|(id, accessor)| { let new_accessor = Arc::new(StructAccessor::wrap(pos as i32, accessor)); (id, new_accessor.clone()) }); @@ -187,10 +183,7 @@ impl SchemaBuilder { let accessor = Arc::new(StructAccessor::new(pos as i32, *field.field_type.clone())); - results.push(( - field.id, - accessor.clone(), - )); + results.push((field.id, accessor.clone())); } results @@ -323,7 +316,7 @@ impl Schema { /// Get an accessor for retrieving data in a struct pub fn accessor_for_field_id(&self, field_id: i32) -> Option> { - self.field_id_to_accessor.get(&field_id).map(|acc|acc.clone()) + self.field_id_to_accessor.get(&field_id).cloned() } } From ae8de9b87ece8f7ae471cc964d5d26f6475c4fa0 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 3 Apr 2024 19:54:59 +0100 Subject: [PATCH 17/17] refactor: add BoundPredicateEvaluator, use for ManifestEvaluator, move inclusive projection and manifest evaluator into own files --- .../visitors/bound_predicate_evaluator.rs | 17 ++ .../src/expr/visitors/inclusive_projection.rs | 86 +++++++ .../src/expr/visitors/manifest_evaluator.rs | 181 ++++++++++++++ crates/iceberg/src/expr/visitors/mod.rs | 4 +- crates/iceberg/src/scan.rs | 230 ++---------------- 5 files changed, 302 insertions(+), 216 deletions(-) create mode 100644 crates/iceberg/src/expr/visitors/inclusive_projection.rs create mode 100644 crates/iceberg/src/expr/visitors/manifest_evaluator.rs diff --git a/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs b/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs index 0583b4828d..d37d839ca1 100644 --- a/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::expr::{BoundPredicate, BoundReference, PredicateOperator}; use crate::spec::Datum; use crate::Result; diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs new file mode 100644 index 0000000000..7ba0c0a942 --- /dev/null +++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::expr::{BoundPredicate, Predicate}; +use crate::spec::{PartitionField, PartitionSpecRef, SchemaRef}; + +pub(crate) struct InclusiveProjection { + #[allow(dead_code)] + table_schema: SchemaRef, + partition_spec: PartitionSpecRef, +} + +impl InclusiveProjection { + pub(crate) fn new(table_schema: SchemaRef, partition_spec: PartitionSpecRef) -> Self { + Self { + table_schema, + partition_spec, + } + } + + pub(crate) fn project(&self, predicate: &BoundPredicate) -> crate::Result { + self.visit(predicate) + } + + fn visit(&self, bound_predicate: &BoundPredicate) -> crate::Result { + Ok(match bound_predicate { + BoundPredicate::AlwaysTrue => Predicate::AlwaysTrue, + BoundPredicate::AlwaysFalse => Predicate::AlwaysFalse, + BoundPredicate::And(expr) => { + let [left_pred, right_pred] = expr.inputs(); + self.visit(left_pred)?.and(self.visit(right_pred)?) + } + BoundPredicate::Or(expr) => { + let [left_pred, right_pred] = expr.inputs(); + self.visit(left_pred)?.or(self.visit(right_pred)?) + } + BoundPredicate::Not(_) => { + panic!("should not get here as NOT-rewriting should have removed NOT nodes") + } + bp => self.visit_bound_predicate(bp)?, + }) + } + + fn visit_bound_predicate(&self, predicate: &BoundPredicate) -> crate::Result { + let field_id = match predicate { + BoundPredicate::Unary(expr) => expr.field_id(), + BoundPredicate::Binary(expr) => expr.field_id(), + BoundPredicate::Set(expr) => expr.field_id(), + _ => { + panic!("Should not get here as these branches handled in self.visit") + } + }; + + // TODO: cache this? + let mut parts: Vec<&PartitionField> = vec![]; + for partition_spec_field in &self.partition_spec.fields { + if partition_spec_field.source_id == field_id { + parts.push(partition_spec_field) + } + } + + parts.iter().fold(Ok(Predicate::AlwaysTrue), |res, &part| { + Ok( + if let Some(pred_for_part) = part.transform.project(&part.name, predicate)? { + res?.and(pred_for_part) + } else { + res? 
+ }, + ) + }) + } +} diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs new file mode 100644 index 0000000000..cd002e5269 --- /dev/null +++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::expr::visitors::bound_predicate_evaluator::BoundPredicateEvaluator; +use crate::expr::visitors::inclusive_projection::InclusiveProjection; +use crate::expr::{Bind, BoundPredicate, BoundReference}; +use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema, SchemaRef}; +use fnv::FnvHashSet; +use std::sync::Arc; + +pub(crate) struct ManifestEvaluatorFactory { + partition_schema: SchemaRef, + partition_filter: BoundPredicate, + case_sensitive: bool, +} + +impl ManifestEvaluatorFactory { + pub(crate) fn new( + partition_spec: PartitionSpecRef, + table_schema: SchemaRef, + partition_filter: BoundPredicate, + case_sensitive: bool, + ) -> crate::Result { + let partition_type = partition_spec.partition_type(&table_schema)?; + + // this is needed as SchemaBuilder.with_fields expects an iterator over + // Arc rather than &Arc + let cloned_partition_fields: Vec<_> = + partition_type.fields().iter().map(Arc::clone).collect(); + + let partition_schema = Schema::builder() + .with_fields(cloned_partition_fields) + .build()?; + + let partition_schema_ref = Arc::new(partition_schema); + + let inclusive_projection = + InclusiveProjection::new(table_schema.clone(), partition_spec.clone()); + let unbound_partition_filter = inclusive_projection.project(&partition_filter)?; + + let partition_filter = + unbound_partition_filter.bind(partition_schema_ref.clone(), case_sensitive)?; + + Ok(Self { + partition_schema: partition_schema_ref, + partition_filter, + case_sensitive, + }) + } + + pub(crate) fn evaluate(&self, manifest_file: &ManifestFile) -> crate::Result { + if manifest_file.partitions.is_empty() { + return Ok(true); + } + + let mut evaluator = ManifestEvaluator::new(self, &manifest_file.partitions); + + evaluator.visit(&self.partition_filter) + } +} + +struct ManifestEvaluator<'a> { + manifest_evaluator_builder: &'a ManifestEvaluatorFactory, + partitions: &'a Vec, +} + +impl<'a> ManifestEvaluator<'a> { + fn new( + manifest_evaluator_builder: &'a ManifestEvaluatorFactory, + partitions: &'a Vec, + ) -> Self { + ManifestEvaluator { + manifest_evaluator_builder, + partitions, + } + } +} + +// Remove this annotation once all todos have been removed +#[allow(unused_variables)] +impl BoundPredicateEvaluator for ManifestEvaluator<'_> { + fn is_null(&mut self, reference: &BoundReference) -> crate::Result { + Ok(self.field_summary_for_reference(reference).contains_null) + } + + fn not_null(&mut self, 
reference: &BoundReference) -> crate::Result { + todo!() + } + + fn is_nan(&mut self, reference: &BoundReference) -> crate::Result { + Ok(self + .field_summary_for_reference(reference) + .contains_nan + .is_some()) + } + + fn not_nan(&mut self, reference: &BoundReference) -> crate::Result { + todo!() + } + + fn less_than(&mut self, reference: &BoundReference, literal: &Datum) -> crate::Result { + todo!() + } + + fn less_than_or_eq( + &mut self, + reference: &BoundReference, + literal: &Datum, + ) -> crate::Result { + todo!() + } + + fn greater_than(&mut self, reference: &BoundReference, literal: &Datum) -> crate::Result { + todo!() + } + + fn greater_than_or_eq( + &mut self, + reference: &BoundReference, + literal: &Datum, + ) -> crate::Result { + todo!() + } + + fn eq(&mut self, reference: &BoundReference, literal: &Datum) -> crate::Result { + todo!() + } + + fn not_eq(&mut self, reference: &BoundReference, literal: &Datum) -> crate::Result { + todo!() + } + + fn starts_with(&mut self, reference: &BoundReference, literal: &Datum) -> crate::Result { + todo!() + } + + fn not_starts_with( + &mut self, + reference: &BoundReference, + literal: &Datum, + ) -> crate::Result { + todo!() + } + + fn r#in( + &mut self, + reference: &BoundReference, + literals: &FnvHashSet, + ) -> crate::Result { + todo!() + } + + fn not_in( + &mut self, + reference: &BoundReference, + literals: &FnvHashSet, + ) -> crate::Result { + todo!() + } +} + +impl ManifestEvaluator<'_> { + fn field_summary_for_reference(&self, reference: &BoundReference) -> &FieldSummary { + let pos = reference.accessor().position(); + &self.partitions[pos as usize] + } +} diff --git a/crates/iceberg/src/expr/visitors/mod.rs b/crates/iceberg/src/expr/visitors/mod.rs index c9fff601b8..3ecc14ac58 100644 --- a/crates/iceberg/src/expr/visitors/mod.rs +++ b/crates/iceberg/src/expr/visitors/mod.rs @@ -1 +1,3 @@ -mod bound_predicate_evaluator; +pub(crate) mod bound_predicate_evaluator; +pub(crate) mod inclusive_projection; +pub(crate) mod manifest_evaluator; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 88ec5b8c83..ca0b5d2024 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -18,13 +18,10 @@ //! Table scan api. use crate::arrow::ArrowReaderBuilder; -use crate::expr::BoundPredicate::AlwaysTrue; -use crate::expr::{Bind, BoundPredicate, Predicate, PredicateOperator}; +use crate::expr::visitors::manifest_evaluator::ManifestEvaluatorFactory; +use crate::expr::{Bind, Predicate}; use crate::io::FileIO; -use crate::spec::{ - DataContentType, FieldSummary, ManifestEntryRef, ManifestFile, PartitionField, - PartitionSpecRef, Schema, SchemaRef, SnapshotRef, TableMetadataRef, -}; +use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef}; use crate::table::Table; use crate::{Error, ErrorKind}; use arrow_array::RecordBatch; @@ -32,7 +29,6 @@ use async_stream::try_stream; use futures::stream::{iter, BoxStream}; use futures::StreamExt; use std::collections::HashMap; -use std::sync::Arc; /// Builder to create table scan. pub struct TableScanBuilder<'a> { @@ -174,8 +170,9 @@ impl TableScan { /// Returns a stream of file scan tasks. 
pub async fn plan_files(&'static self) -> crate::Result { - // Cache `PartitionEvaluator`s created as part of this scan - let mut manifest_evaluator_cache: HashMap = HashMap::new(); + // Cache `ManifestEvaluatorFactory`s created as part of this scan + let mut manifest_evaluator_factory_cache: HashMap = + HashMap::new(); let snapshot = self.snapshot.clone(); let table_metadata = self.table_metadata.clone(); @@ -194,13 +191,13 @@ impl TableScan { // Use one from the cache if there is one. If not, create one, put it in // the cache, and take a reference to it. if let Some(filter) = self.filter.as_ref() { - let manifest_evaluator = manifest_evaluator_cache + let manifest_eval_factory = manifest_evaluator_factory_cache .entry(entry.partition_spec_id()) - .or_insert_with_key(|key| self.create_manifest_evaluator(key, filter)); + .or_insert_with_key(|key| self.create_manifest_eval_factory(key, filter)); // reject any manifest files whose partition values don't match the filter. - if !manifest_evaluator.eval(entry) { + if !manifest_eval_factory.evaluate(entry)? { continue; } } @@ -231,14 +228,18 @@ impl TableScan { .boxed()) } - fn create_manifest_evaluator(&self, id: &i32, filter: &Predicate) -> ManifestEvaluator { + fn create_manifest_eval_factory( + &self, + id: &i32, + filter: &Predicate, + ) -> ManifestEvaluatorFactory { let bound_predicate = filter .bind(self.schema.clone(), self.case_sensitive) .unwrap(); let partition_spec = self.table_metadata.partition_spec_by_id(*id).unwrap(); - ManifestEvaluator::new( + ManifestEvaluatorFactory::new( partition_spec.clone(), self.schema.clone(), bound_predicate, @@ -278,207 +279,6 @@ impl FileScanTask { } } -struct ManifestEvaluator { - #[allow(dead_code)] - partition_schema: SchemaRef, - partition_filter: BoundPredicate, - #[allow(dead_code)] - case_sensitive: bool, -} - -impl ManifestEvaluator { - pub(crate) fn new( - partition_spec: PartitionSpecRef, - table_schema: SchemaRef, - partition_filter: BoundPredicate, - case_sensitive: bool, - ) -> crate::Result { - let partition_type = partition_spec.partition_type(&table_schema)?; - - // this is needed as SchemaBuilder.with_fields expects an iterator over - // Arc rather than &Arc - let cloned_partition_fields: Vec<_> = - partition_type.fields().iter().map(Arc::clone).collect(); - - let partition_schema = Schema::builder() - .with_fields(cloned_partition_fields) - .build()?; - - let partition_schema_ref = Arc::new(partition_schema); - - let inclusive_projection = - InclusiveProjection::new(table_schema.clone(), partition_spec.clone()); - let unbound_partition_filter = inclusive_projection.project(&partition_filter)?; - - let partition_filter = - unbound_partition_filter.bind(partition_schema_ref.clone(), case_sensitive)?; - - Ok(Self { - partition_schema: partition_schema_ref, - partition_filter, - case_sensitive, - }) - } - - pub(crate) fn eval(&self, manifest_file: &ManifestFile) -> bool { - if manifest_file.partitions.is_empty() { - return true; - } - - self.visit(&self.partition_filter, &manifest_file.partitions) - } - - // see https://github.com/apache/iceberg-python/blob/ea9da8856a686eaeda0d5c2be78d5e3102b67c44/pyiceberg/expressions/visitors.py#L548 - fn visit(&self, predicate: &BoundPredicate, partitions: &Vec) -> bool { - match predicate { - AlwaysTrue => true, - BoundPredicate::AlwaysFalse => false, - BoundPredicate::And(expr) => { - self.visit(expr.inputs()[0], partitions) && self.visit(expr.inputs()[1], partitions) - } - BoundPredicate::Or(expr) => { - self.visit(expr.inputs()[0], partitions) 
|| self.visit(expr.inputs()[1], partitions) - } - BoundPredicate::Not(_) => { - panic!("NOT predicates should be eliminated before calling this function") - } - BoundPredicate::Unary(expr) => { - let pos = expr.term().accessor().position(); - let field = &partitions[pos as usize]; - - match expr.op() { - PredicateOperator::IsNull => field.contains_null, - PredicateOperator::NotNull => { - todo!() - } - PredicateOperator::IsNan => field.contains_nan.is_some(), - PredicateOperator::NotNan => { - todo!() - } - _ => { - panic!("unexpected op") - } - } - } - BoundPredicate::Binary(expr) => { - let pos = expr.term().accessor().position(); - let _field = &partitions[pos as usize]; - - match expr.op() { - PredicateOperator::LessThan => { - todo!() - } - PredicateOperator::LessThanOrEq => { - todo!() - } - PredicateOperator::GreaterThan => { - todo!() - } - PredicateOperator::GreaterThanOrEq => { - todo!() - } - PredicateOperator::Eq => { - todo!() - } - PredicateOperator::NotEq => { - todo!() - } - PredicateOperator::StartsWith => { - todo!() - } - PredicateOperator::NotStartsWith => { - todo!() - } - _ => { - panic!("unexpected op") - } - } - } - BoundPredicate::Set(expr) => { - let pos = expr.term().accessor().position(); - let _field = &partitions[pos as usize]; - - match expr.op() { - PredicateOperator::In => { - todo!() - } - PredicateOperator::NotIn => true, - _ => { - panic!("unexpected op") - } - } - } - } - } -} - -struct InclusiveProjection { - #[allow(dead_code)] - table_schema: SchemaRef, - partition_spec: PartitionSpecRef, -} - -impl InclusiveProjection { - pub(crate) fn new(table_schema: SchemaRef, partition_spec: PartitionSpecRef) -> Self { - Self { - table_schema, - partition_spec, - } - } - - pub(crate) fn project(&self, predicate: &BoundPredicate) -> crate::Result { - self.visit(predicate) - } - - fn visit(&self, bound_predicate: &BoundPredicate) -> crate::Result { - Ok(match bound_predicate { - BoundPredicate::AlwaysTrue => Predicate::AlwaysTrue, - BoundPredicate::AlwaysFalse => Predicate::AlwaysFalse, - BoundPredicate::And(expr) => { - let [left_pred, right_pred] = expr.inputs(); - self.visit(left_pred)?.and(self.visit(right_pred)?) - } - BoundPredicate::Or(expr) => { - let [left_pred, right_pred] = expr.inputs(); - self.visit(left_pred)?.or(self.visit(right_pred)?) - } - BoundPredicate::Not(_) => { - panic!("should not get here as NOT-rewriting should have removed NOT nodes") - } - bp => self.visit_bound_predicate(bp)?, - }) - } - - fn visit_bound_predicate(&self, predicate: &BoundPredicate) -> crate::Result { - let field_id = match predicate { - BoundPredicate::Unary(expr) => expr.field_id(), - BoundPredicate::Binary(expr) => expr.field_id(), - BoundPredicate::Set(expr) => expr.field_id(), - _ => { - panic!("Should not get here as these branches handled in self.visit") - } - }; - - // TODO: cache this? - let mut parts: Vec<&PartitionField> = vec![]; - for partition_spec_field in &self.partition_spec.fields { - if partition_spec_field.source_id == field_id { - parts.push(partition_spec_field) - } - } - - parts.iter().fold(Ok(Predicate::AlwaysTrue), |res, &part| { - Ok( - if let Some(pred_for_part) = part.transform.project(&part.name, predicate)? { - res?.and(pred_for_part) - } else { - res? - }, - ) - }) - } -} - // #[cfg(test)] // mod tests { // use crate::io::{FileIO, OutputFile};
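
For reviewers unfamiliar with the entry-API pattern used in `plan_files` above, here is a minimal, self-contained sketch of the caching idea in isolation: one evaluator is built lazily per partition spec ID and reused for every manifest that shares that spec. The `SpecEvaluator` type and `build_evaluator` function are simplified stand-ins invented purely for illustration; they are not types from this crate, and the sketch is not the factory's actual construction logic.

use std::collections::HashMap;

// Hypothetical stand-in for the per-spec evaluator (e.g. ManifestEvaluatorFactory).
// It only records which partition spec ID it was built for.
struct SpecEvaluator {
    spec_id: i32,
}

// Hypothetical constructor standing in for create_manifest_eval_factory.
fn build_evaluator(spec_id: i32) -> SpecEvaluator {
    println!("building evaluator for spec {spec_id}");
    SpecEvaluator { spec_id }
}

fn main() {
    // Manifest files in a snapshot, represented here only by their partition spec IDs.
    let manifest_spec_ids = [0, 0, 1, 0, 1];

    let mut cache: HashMap<i32, SpecEvaluator> = HashMap::new();

    for spec_id in manifest_spec_ids {
        // or_insert_with_key builds the evaluator only on the first miss for a given
        // spec ID; later manifests with the same spec reuse the cached value.
        let evaluator = cache
            .entry(spec_id)
            .or_insert_with_key(|&key| build_evaluator(key));

        assert_eq!(evaluator.spec_id, spec_id);
    }

    // Only two evaluators were built: one per distinct partition spec ID.
    assert_eq!(cache.len(), 2);
}

The design choice this mirrors is that binding the filter and projecting it through a partition spec is comparatively expensive, while many manifests typically share the same spec, so the scan pays that cost at most once per spec ID rather than once per manifest.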