diff --git a/crates/iceberg/src/expr/visitors/bound_predicate_visitor.rs b/crates/iceberg/src/expr/visitors/bound_predicate_visitor.rs index f29afbf619..31baf91a14 100644 --- a/crates/iceberg/src/expr/visitors/bound_predicate_visitor.rs +++ b/crates/iceberg/src/expr/visitors/bound_predicate_visitor.rs @@ -41,54 +41,108 @@ pub trait BoundPredicateVisitor { fn not(&mut self, inner: Self::T) -> Result; /// Called after a predicate with an `IsNull` operator is visited - fn is_null(&mut self, reference: &BoundReference) -> Result; + fn is_null( + &mut self, + reference: &BoundReference, + predicate: &BoundPredicate, + ) -> Result; /// Called after a predicate with a `NotNull` operator is visited - fn not_null(&mut self, reference: &BoundReference) -> Result; + fn not_null( + &mut self, + reference: &BoundReference, + predicate: &BoundPredicate, + ) -> Result; /// Called after a predicate with an `IsNan` operator is visited - fn is_nan(&mut self, reference: &BoundReference) -> Result; + fn is_nan(&mut self, reference: &BoundReference, predicate: &BoundPredicate) + -> Result; /// Called after a predicate with a `NotNan` operator is visited - fn not_nan(&mut self, reference: &BoundReference) -> Result; + fn not_nan( + &mut self, + reference: &BoundReference, + predicate: &BoundPredicate, + ) -> Result; /// Called after a predicate with a `LessThan` operator is visited - fn less_than(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn less_than( + &mut self, + reference: &BoundReference, + literal: &Datum, + predicate: &BoundPredicate, + ) -> Result; /// Called after a predicate with a `LessThanOrEq` operator is visited - fn less_than_or_eq(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn less_than_or_eq( + &mut self, + reference: &BoundReference, + literal: &Datum, + predicate: &BoundPredicate, + ) -> Result; /// Called after a predicate with a `GreaterThan` operator is visited - fn greater_than(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn greater_than( + &mut self, + reference: &BoundReference, + literal: &Datum, + predicate: &BoundPredicate, + ) -> Result; /// Called after a predicate with a `GreaterThanOrEq` operator is visited fn greater_than_or_eq( &mut self, reference: &BoundReference, literal: &Datum, + predicate: &BoundPredicate, ) -> Result; /// Called after a predicate with an `Eq` operator is visited - fn eq(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn eq( + &mut self, + reference: &BoundReference, + literal: &Datum, + predicate: &BoundPredicate, + ) -> Result; /// Called after a predicate with a `NotEq` operator is visited - fn not_eq(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn not_eq( + &mut self, + reference: &BoundReference, + literal: &Datum, + predicate: &BoundPredicate, + ) -> Result; /// Called after a predicate with a `StartsWith` operator is visited - fn starts_with(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn starts_with( + &mut self, + reference: &BoundReference, + literal: &Datum, + predicate: &BoundPredicate, + ) -> Result; /// Called after a predicate with a `NotStartsWith` operator is visited - fn not_starts_with(&mut self, reference: &BoundReference, literal: &Datum) -> Result; + fn not_starts_with( + &mut self, + reference: &BoundReference, + literal: &Datum, + predicate: &BoundPredicate, + ) -> Result; /// Called after a predicate with an `In` operator is visited - fn r#in(&mut self, reference: &BoundReference, literals: &FnvHashSet) - -> Result; + fn r#in( + &mut self, + reference: &BoundReference, + literals: &FnvHashSet, + predicate: &BoundPredicate, + ) -> Result; /// Called after a predicate with a `NotIn` operator is visited fn not_in( &mut self, reference: &BoundReference, literals: &FnvHashSet, + predicate: &BoundPredicate, ) -> Result; } @@ -125,10 +179,10 @@ pub(crate) fn visit( visitor.not(inner_result) } BoundPredicate::Unary(expr) => match expr.op() { - PredicateOperator::IsNull => visitor.is_null(expr.term()), - PredicateOperator::NotNull => visitor.not_null(expr.term()), - PredicateOperator::IsNan => visitor.is_nan(expr.term()), - PredicateOperator::NotNan => visitor.not_nan(expr.term()), + PredicateOperator::IsNull => visitor.is_null(expr.term(), predicate), + PredicateOperator::NotNull => visitor.not_null(expr.term(), predicate), + PredicateOperator::IsNan => visitor.is_nan(expr.term(), predicate), + PredicateOperator::NotNan => visitor.not_nan(expr.term(), predicate), op => { panic!("Unexpected op for unary predicate: {}", &op) } @@ -137,16 +191,22 @@ pub(crate) fn visit( let reference = expr.term(); let literal = expr.literal(); match expr.op() { - PredicateOperator::LessThan => visitor.less_than(reference, literal), - PredicateOperator::LessThanOrEq => visitor.less_than_or_eq(reference, literal), - PredicateOperator::GreaterThan => visitor.greater_than(reference, literal), + PredicateOperator::LessThan => visitor.less_than(reference, literal, predicate), + PredicateOperator::LessThanOrEq => { + visitor.less_than_or_eq(reference, literal, predicate) + } + PredicateOperator::GreaterThan => { + visitor.greater_than(reference, literal, predicate) + } PredicateOperator::GreaterThanOrEq => { - visitor.greater_than_or_eq(reference, literal) + visitor.greater_than_or_eq(reference, literal, predicate) + } + PredicateOperator::Eq => visitor.eq(reference, literal, predicate), + PredicateOperator::NotEq => visitor.not_eq(reference, literal, predicate), + PredicateOperator::StartsWith => visitor.starts_with(reference, literal, predicate), + PredicateOperator::NotStartsWith => { + visitor.not_starts_with(reference, literal, predicate) } - PredicateOperator::Eq => visitor.eq(reference, literal), - PredicateOperator::NotEq => visitor.not_eq(reference, literal), - PredicateOperator::StartsWith => visitor.starts_with(reference, literal), - PredicateOperator::NotStartsWith => visitor.not_starts_with(reference, literal), op => { panic!("Unexpected op for binary predicate: {}", &op) } @@ -156,8 +216,8 @@ pub(crate) fn visit( let reference = expr.term(); let literals = expr.literals(); match expr.op() { - PredicateOperator::In => visitor.r#in(reference, literals), - PredicateOperator::NotIn => visitor.not_in(reference, literals), + PredicateOperator::In => visitor.r#in(reference, literals, predicate), + PredicateOperator::NotIn => visitor.not_in(reference, literals, predicate), op => { panic!("Unexpected op for set predicate: {}", &op) } @@ -170,8 +230,8 @@ pub(crate) fn visit( mod tests { use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{ - BinaryExpression, Bind, BoundReference, Predicate, PredicateOperator, Reference, - SetExpression, UnaryExpression, + BinaryExpression, Bind, BoundPredicate, BoundReference, Predicate, PredicateOperator, + Reference, SetExpression, UnaryExpression, }; use crate::spec::{Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type}; use fnv::FnvHashSet; @@ -202,19 +262,35 @@ mod tests { Ok(!inner) } - fn is_null(&mut self, _reference: &BoundReference) -> crate::Result { + fn is_null( + &mut self, + _reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> crate::Result { Ok(true) } - fn not_null(&mut self, _reference: &BoundReference) -> crate::Result { + fn not_null( + &mut self, + _reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> crate::Result { Ok(false) } - fn is_nan(&mut self, _reference: &BoundReference) -> crate::Result { + fn is_nan( + &mut self, + _reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> crate::Result { Ok(true) } - fn not_nan(&mut self, _reference: &BoundReference) -> crate::Result { + fn not_nan( + &mut self, + _reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> crate::Result { Ok(false) } @@ -222,6 +298,7 @@ mod tests { &mut self, _reference: &BoundReference, _literal: &Datum, + _predicate: &BoundPredicate, ) -> crate::Result { Ok(true) } @@ -230,6 +307,7 @@ mod tests { &mut self, _reference: &BoundReference, _literal: &Datum, + _predicate: &BoundPredicate, ) -> crate::Result { Ok(false) } @@ -238,6 +316,7 @@ mod tests { &mut self, _reference: &BoundReference, _literal: &Datum, + _predicate: &BoundPredicate, ) -> crate::Result { Ok(true) } @@ -246,15 +325,26 @@ mod tests { &mut self, _reference: &BoundReference, _literal: &Datum, + _predicate: &BoundPredicate, ) -> crate::Result { Ok(false) } - fn eq(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + fn eq( + &mut self, + _reference: &BoundReference, + _literal: &Datum, + _predicate: &BoundPredicate, + ) -> crate::Result { Ok(true) } - fn not_eq(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result { + fn not_eq( + &mut self, + _reference: &BoundReference, + _literal: &Datum, + _predicate: &BoundPredicate, + ) -> crate::Result { Ok(false) } @@ -262,6 +352,7 @@ mod tests { &mut self, _reference: &BoundReference, _literal: &Datum, + _predicate: &BoundPredicate, ) -> crate::Result { Ok(true) } @@ -270,6 +361,7 @@ mod tests { &mut self, _reference: &BoundReference, _literal: &Datum, + _predicate: &BoundPredicate, ) -> crate::Result { Ok(false) } @@ -278,6 +370,7 @@ mod tests { &mut self, _reference: &BoundReference, _literals: &FnvHashSet, + _predicate: &BoundPredicate, ) -> crate::Result { Ok(true) } @@ -286,6 +379,7 @@ mod tests { &mut self, _reference: &BoundReference, _literals: &FnvHashSet, + _predicate: &BoundPredicate, ) -> crate::Result { Ok(false) } diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs new file mode 100644 index 0000000000..0014b49762 --- /dev/null +++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs @@ -0,0 +1,444 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; +use crate::expr::{BoundPredicate, BoundReference, Predicate}; +use crate::spec::{Datum, PartitionField, PartitionSpecRef}; +use crate::Error; +use fnv::FnvHashSet; +use std::collections::HashMap; + +pub(crate) struct InclusiveProjection { + partition_spec: PartitionSpecRef, + cached_parts: HashMap>, +} + +impl InclusiveProjection { + pub(crate) fn new(partition_spec: PartitionSpecRef) -> Self { + Self { + partition_spec, + cached_parts: HashMap::new(), + } + } + + fn get_parts_for_field_id(&mut self, field_id: i32) -> &Vec { + if let std::collections::hash_map::Entry::Vacant(e) = self.cached_parts.entry(field_id) { + let mut parts: Vec = vec![]; + for partition_spec_field in &self.partition_spec.fields { + if partition_spec_field.source_id == field_id { + parts.push(partition_spec_field.clone()) + } + } + + e.insert(parts); + } + + &self.cached_parts[&field_id] + } + + pub(crate) fn project(&mut self, predicate: &BoundPredicate) -> crate::Result { + visit(self, predicate) + } + + fn get_parts( + &mut self, + reference: &BoundReference, + predicate: &BoundPredicate, + ) -> Result { + let field_id = reference.field().id; + + // This could be made a bit neater if `try_reduce` ever becomes stable + self.get_parts_for_field_id(field_id) + .iter() + .try_fold(Predicate::AlwaysTrue, |res, part| { + Ok( + if let Some(pred_for_part) = part.transform.project(&part.name, predicate)? { + if res == Predicate::AlwaysTrue { + pred_for_part + } else { + res.and(pred_for_part) + } + } else { + res + }, + ) + }) + } +} + +impl BoundPredicateVisitor for InclusiveProjection { + type T = Predicate; + + fn always_true(&mut self) -> crate::Result { + Ok(Predicate::AlwaysTrue) + } + + fn always_false(&mut self) -> crate::Result { + Ok(Predicate::AlwaysFalse) + } + + fn and(&mut self, lhs: Self::T, rhs: Self::T) -> crate::Result { + Ok(lhs.and(rhs)) + } + + fn or(&mut self, lhs: Self::T, rhs: Self::T) -> crate::Result { + Ok(lhs.or(rhs)) + } + + fn not(&mut self, _inner: Self::T) -> crate::Result { + panic!("InclusiveProjection should not be performed against Predicates that contain a Not operator. Ensure that \"Rewrite Not\" gets applied to the originating Predicate before binding it.") + } + + fn is_null( + &mut self, + reference: &BoundReference, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn not_null( + &mut self, + reference: &BoundReference, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn is_nan( + &mut self, + reference: &BoundReference, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn not_nan( + &mut self, + reference: &BoundReference, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn less_than( + &mut self, + reference: &BoundReference, + _literal: &Datum, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn less_than_or_eq( + &mut self, + reference: &BoundReference, + _literal: &Datum, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn greater_than( + &mut self, + reference: &BoundReference, + _literal: &Datum, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn greater_than_or_eq( + &mut self, + reference: &BoundReference, + _literal: &Datum, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn eq( + &mut self, + reference: &BoundReference, + _literal: &Datum, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn not_eq( + &mut self, + reference: &BoundReference, + _literal: &Datum, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn starts_with( + &mut self, + reference: &BoundReference, + _literal: &Datum, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn not_starts_with( + &mut self, + reference: &BoundReference, + _literal: &Datum, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn r#in( + &mut self, + reference: &BoundReference, + _literals: &FnvHashSet, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } + + fn not_in( + &mut self, + reference: &BoundReference, + _literals: &FnvHashSet, + predicate: &BoundPredicate, + ) -> crate::Result { + self.get_parts(reference, predicate) + } +} + +#[cfg(test)] +mod tests { + use crate::expr::visitors::inclusive_projection::InclusiveProjection; + use crate::expr::{Bind, Predicate, Reference}; + use crate::spec::{ + Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, Schema, Transform, Type, + }; + use std::sync::Arc; + + fn build_test_schema() -> Schema { + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 1, + "a", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::required( + 2, + "date", + Type::Primitive(PrimitiveType::Date), + )), + Arc::new(NestedField::required( + 3, + "name", + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap() + } + + #[test] + fn test_inclusive_projection_logic_ops() { + let schema = build_test_schema(); + + let partition_spec = PartitionSpec::builder() + .with_spec_id(1) + .with_fields(vec![]) + .build() + .unwrap(); + + let arc_schema = Arc::new(schema); + let arc_partition_spec = Arc::new(partition_spec); + + // this predicate contains only logic operators, + // AlwaysTrue, and AlwaysFalse. + let unbound_predicate = Predicate::AlwaysTrue + .and(Predicate::AlwaysFalse) + .or(Predicate::AlwaysTrue); + + let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap(); + + // applying InclusiveProjection to bound_predicate + // should result in the same Predicate as the original + // `unbound_predicate`, since `InclusiveProjection` + // simply unbinds logic ops, AlwaysTrue, and AlwaysFalse. + let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); + let result = inclusive_projection.project(&bound_predicate).unwrap(); + + assert_eq!(result.to_string(), "TRUE".to_string()) + } + + #[test] + fn test_inclusive_projection_identity_transform() { + let schema = build_test_schema(); + + let partition_spec = PartitionSpec::builder() + .with_spec_id(1) + .with_fields(vec![PartitionField::builder() + .source_id(1) + .name("a".to_string()) + .field_id(1) + .transform(Transform::Identity) + .build()]) + .build() + .unwrap(); + + let arc_schema = Arc::new(schema); + let arc_partition_spec = Arc::new(partition_spec); + + let unbound_predicate = Reference::new("a").less_than(Datum::int(10)); + + let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap(); + + // applying InclusiveProjection to bound_predicate + // should result in the same Predicate as the original + // `unbound_predicate`, since we have just a single partition field, + // and it has an Identity transform + let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); + let result = inclusive_projection.project(&bound_predicate).unwrap(); + + let expected = "a < 10".to_string(); + + assert_eq!(result.to_string(), expected) + } + + #[test] + fn test_inclusive_projection_date_transforms() { + let schema = build_test_schema(); + + let partition_spec = PartitionSpec::builder() + .with_spec_id(1) + .with_fields(vec![ + PartitionField::builder() + .source_id(2) + .name("year".to_string()) + .field_id(2) + .transform(Transform::Year) + .build(), + PartitionField::builder() + .source_id(2) + .name("month".to_string()) + .field_id(2) + .transform(Transform::Month) + .build(), + PartitionField::builder() + .source_id(2) + .name("day".to_string()) + .field_id(2) + .transform(Transform::Day) + .build(), + ]) + .build() + .unwrap(); + + let arc_schema = Arc::new(schema); + let arc_partition_spec = Arc::new(partition_spec); + + let unbound_predicate = + Reference::new("date").less_than(Datum::date_from_str("2024-01-01").unwrap()); + + let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap(); + + // applying InclusiveProjection to bound_predicate + // should result in a predicate that correctly handles + // year, month and date + let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); + let result = inclusive_projection.project(&bound_predicate).unwrap(); + + let expected = "((year <= 53) AND (month <= 647)) AND (day <= 19722)".to_string(); + + assert_eq!(result.to_string(), expected); + } + + #[test] + fn test_inclusive_projection_truncate_transform() { + let schema = build_test_schema(); + + let partition_spec = PartitionSpec::builder() + .with_spec_id(1) + .with_fields(vec![PartitionField::builder() + .source_id(3) + .name("name".to_string()) + .field_id(3) + .transform(Transform::Truncate(4)) + .build()]) + .build() + .unwrap(); + + let arc_schema = Arc::new(schema); + let arc_partition_spec = Arc::new(partition_spec); + + let unbound_predicate = Reference::new("name").starts_with(Datum::string("Testy McTest")); + + let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap(); + + // applying InclusiveProjection to bound_predicate + // should result in the 'name STARTS WITH "Testy McTest"' + // predicate being transformed to 'name STARTS WITH "Test"', + // since a `Truncate(4)` partition will map values of + // name that start with "Testy McTest" into a partition + // for values of name that start with the first four letters + // of that, ie "Test". + let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); + let result = inclusive_projection.project(&bound_predicate).unwrap(); + + let expected = "name STARTS WITH \"Test\"".to_string(); + + assert_eq!(result.to_string(), expected) + } + + #[test] + fn test_inclusive_projection_bucket_transform() { + let schema = build_test_schema(); + + let partition_spec = PartitionSpec::builder() + .with_spec_id(1) + .with_fields(vec![PartitionField::builder() + .source_id(1) + .name("a".to_string()) + .field_id(1) + .transform(Transform::Bucket(7)) + .build()]) + .build() + .unwrap(); + + let arc_schema = Arc::new(schema); + let arc_partition_spec = Arc::new(partition_spec); + + let unbound_predicate = Reference::new("a").equal_to(Datum::int(10)); + + let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap(); + + // applying InclusiveProjection to bound_predicate + // should result in the "a = 10" predicate being + // transformed into "a = 2", since 10 gets bucketed + // to 2 with a Bucket(7) partition + let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); + let result = inclusive_projection.project(&bound_predicate).unwrap(); + + let expected = "a = 2".to_string(); + + assert_eq!(result.to_string(), expected) + } +} diff --git a/crates/iceberg/src/expr/visitors/mod.rs b/crates/iceberg/src/expr/visitors/mod.rs index 242a55c4b0..47651d289d 100644 --- a/crates/iceberg/src/expr/visitors/mod.rs +++ b/crates/iceberg/src/expr/visitors/mod.rs @@ -16,3 +16,4 @@ // under the License. pub(crate) mod bound_predicate_visitor; +pub(crate) mod inclusive_projection;