diff --git a/Cargo.toml b/Cargo.toml
index 7da16e00d4..9f4c34bba6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -45,7 +45,7 @@ async-trait = "0.1"
 bimap = "0.6"
 bitvec = "1.0.1"
 bytes = "1.5"
-chrono = "0.4"
+chrono = "~0.4.34"
 derive_builder = "0.20.0"
 either = "1"
 env_logger = "0.11.0"
diff --git a/crates/iceberg/src/expr/accessor.rs b/crates/iceberg/src/expr/accessor.rs
new file mode 100644
index 0000000000..d96ba5c118
--- /dev/null
+++ b/crates/iceberg/src/expr/accessor.rs
@@ -0,0 +1,56 @@
+use crate::spec::{Literal, Struct, Type};
+use serde_derive::{Deserialize, Serialize};
+use std::sync::Arc;
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+enum InnerOrType {
+    Inner(Arc<StructAccessor>),
+    Type(Type),
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct StructAccessor {
+    position: i32,
+    inner_or_type: InnerOrType,
+}
+
+pub(crate) type StructAccessorRef = Arc<StructAccessor>;
+
+impl StructAccessor {
+    pub(crate) fn new(position: i32, r#type: Type) -> Self {
+        StructAccessor {
+            position,
+            inner_or_type: InnerOrType::Type(r#type),
+        }
+    }
+
+    pub(crate) fn wrap(position: i32, inner: StructAccessorRef) -> Self {
+        StructAccessor {
+            position,
+            inner_or_type: InnerOrType::Inner(inner),
+        }
+    }
+
+    pub fn position(&self) -> i32 {
+        self.position
+    }
+
+    fn r#type(&self) -> &Type {
+        match &self.inner_or_type {
+            InnerOrType::Inner(inner) => inner.r#type(),
+            InnerOrType::Type(r#type) => r#type,
+        }
+    }
+
+    fn get<'a>(&'a self, container: &'a Struct) -> &Literal {
+        match &self.inner_or_type {
+            InnerOrType::Inner(inner) => match container.get(self.position) {
+                Literal::Struct(wrapped) => inner.get(wrapped),
+                _ => {
+                    panic!("Nested accessor should only be wrapping a Struct")
+                }
+            },
+            InnerOrType::Type(_) => container.get(self.position),
+        }
+    }
+}
diff --git a/crates/iceberg/src/expr/mod.rs b/crates/iceberg/src/expr/mod.rs
index 0d329682e5..4136b39a8e 100644
--- a/crates/iceberg/src/expr/mod.rs
+++ b/crates/iceberg/src/expr/mod.rs
@@ -17,12 +17,14 @@
 
 //! This module contains expressions.
 
-mod term;
-
 use std::fmt::{Display, Formatter};
 pub use term::*;
-mod predicate;
+
+pub(crate) mod accessor;
+pub(crate) mod predicate;
+pub(crate) mod term;
+pub(crate) mod visitors;
 
 use crate::spec::SchemaRef;
 pub use predicate::*;
diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs
index f8bcffe703..81c4aeb8ae 100644
--- a/crates/iceberg/src/expr/predicate.rs
+++ b/crates/iceberg/src/expr/predicate.rs
@@ -46,7 +46,7 @@ impl<T: Debug, const N: usize> Debug for LogicalExpression<T, N> {
 }
 
 impl<T, const N: usize> LogicalExpression<T, N> {
-    fn new(inputs: [Box<T>; N]) -> Self {
+    pub(crate) fn new(inputs: [Box<T>; N]) -> Self {
         Self { inputs }
     }
 
@@ -116,6 +116,21 @@ impl<T> UnaryExpression<T> {
         debug_assert!(op.is_unary());
         Self { op, term }
     }
+
+    pub(crate) fn term(&self) -> &T {
+        &self.term
+    }
+
+    pub(crate) fn op(&self) -> &PredicateOperator {
+        &self.op
+    }
+}
+
+impl UnaryExpression<BoundReference> {
+    /// get the field_id of this expression's term's field
+    pub(crate) fn field_id(&self) -> i32 {
+        self.term.field().id
+    }
 }
 
 /// Binary predicate, for example, `a > 10`.
@@ -144,6 +159,25 @@ impl<T> BinaryExpression<T> {
         debug_assert!(op.is_binary());
         Self { op, term, literal }
     }
+
+    pub(crate) fn term(&self) -> &T {
+        &self.term
+    }
+
+    pub(crate) fn op(&self) -> &PredicateOperator {
+        &self.op
+    }
+
+    pub(crate) fn literal(&self) -> &Datum {
+        &self.literal
+    }
+}
+
+impl BinaryExpression<BoundReference> {
+    /// get the field_id of this expression's term's field
+    pub(crate) fn field_id(&self) -> i32 {
+        self.term.field().id
+    }
 }
 
 impl<T: Display> Display for BinaryExpression<T> {
@@ -191,6 +225,25 @@ impl<T> SetExpression<T> {
         debug_assert!(op.is_set());
         Self { op, term, literals }
     }
+
+    pub(crate) fn term(&self) -> &T {
+        &self.term
+    }
+
+    pub(crate) fn op(&self) -> &PredicateOperator {
+        &self.op
+    }
+
+    pub(crate) fn literals(&self) -> &FnvHashSet<Datum> {
+        &self.literals
+    }
+}
+
+impl SetExpression<BoundReference> {
+    /// get the field_id of this expression's term's field
+    pub(crate) fn field_id(&self) -> i32 {
+        self.term.field().id
+    }
 }
 
 impl Bind for SetExpression<Reference> {
@@ -217,6 +270,10 @@ impl<T: Display> Display for SetExpression<T> {
 
 /// Unbound predicate expression before binding to a schema.
 #[derive(Debug, PartialEq)]
 pub enum Predicate {
+    /// AlwaysTrue predicate, for example, `TRUE`.
+    AlwaysTrue,
+    /// AlwaysFalse predicate, for example, `FALSE`.
+    AlwaysFalse,
     /// And predicate, for example, `a > 10 AND b < 20`.
     And(LogicalExpression<Predicate, 2>),
     /// Or predicate, for example, `a > 10 OR b < 20`.
@@ -367,6 +424,8 @@ impl Bind for Predicate {
                     bound_literals,
                 )))
             }
+            Predicate::AlwaysTrue => Ok(BoundPredicate::AlwaysTrue),
+            Predicate::AlwaysFalse => Ok(BoundPredicate::AlwaysFalse),
         }
     }
 }
@@ -374,6 +433,12 @@
 impl Display for Predicate {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
+            Predicate::AlwaysTrue => {
+                write!(f, "TRUE")
+            }
+            Predicate::AlwaysFalse => {
+                write!(f, "FALSE")
+            }
             Predicate::And(expr) => {
                 write!(f, "({}) AND ({})", expr.inputs()[0], expr.inputs()[1])
             }
@@ -461,6 +526,8 @@ impl Predicate {
     /// ```
     pub fn negate(self) -> Predicate {
         match self {
+            Predicate::AlwaysTrue => Predicate::AlwaysFalse,
+            Predicate::AlwaysFalse => Predicate::AlwaysTrue,
             Predicate::And(expr) => Predicate::Or(LogicalExpression::new(
                 expr.inputs.map(|expr| Box::new(expr.negate())),
             )),
@@ -525,6 +592,8 @@ impl Predicate {
             Predicate::Unary(expr) => Predicate::Unary(expr),
             Predicate::Binary(expr) => Predicate::Binary(expr),
             Predicate::Set(expr) => Predicate::Set(expr),
+            Predicate::AlwaysTrue => Predicate::AlwaysTrue,
+            Predicate::AlwaysFalse => Predicate::AlwaysFalse,
         }
     }
 }
diff --git a/crates/iceberg/src/expr/term.rs b/crates/iceberg/src/expr/term.rs
index 15cb298172..4c29a72fb8 100644
--- a/crates/iceberg/src/expr/term.rs
+++ b/crates/iceberg/src/expr/term.rs
@@ -21,6 +21,7 @@ use std::fmt::{Display, Formatter};
 
 use fnv::FnvHashSet;
 
+use crate::expr::accessor::{StructAccessor, StructAccessorRef};
 use crate::expr::Bind;
 use crate::expr::{BinaryExpression, Predicate, PredicateOperator, SetExpression, UnaryExpression};
 use crate::spec::{Datum, NestedField, NestedFieldRef, SchemaRef};
@@ -188,7 +189,19 @@ impl Bind for Reference {
                 format!("Field {} not found in schema", self.name),
             )
         })?;
-        Ok(BoundReference::new(self.name.clone(), field.clone()))
+
+        let accessor = schema.accessor_for_field_id(field.id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Accessor for Field {} not found", self.name),
+            )
+        })?;
+
+        Ok(BoundReference::new(
+            self.name.clone(),
+            field.clone(),
+            accessor.clone(),
+        ))
     }
 }
 
@@ -199,14 +212,20 @@ pub struct BoundReference {
     // For example, if the field is `a.b.c`, then `field.name` is `c`, but `original_name` is `a.b.c`.
     column_name: String,
     field: NestedFieldRef,
+    accessor: StructAccessorRef,
 }
 
 impl BoundReference {
     /// Creates a new bound reference.
-    pub fn new(name: impl Into<String>, field: NestedFieldRef) -> Self {
+    pub fn new(
+        name: impl Into<String>,
+        field: NestedFieldRef,
+        accessor: StructAccessorRef,
+    ) -> Self {
         Self {
             column_name: name.into(),
             field,
+            accessor,
         }
     }
 
@@ -214,6 +233,11 @@ impl BoundReference {
     pub fn field(&self) -> &NestedField {
         &self.field
     }
+
+    /// Get this BoundReference's Accessor
+    pub fn accessor(&self) -> &StructAccessor {
+        &self.accessor
+    }
 }
 
 impl Display for BoundReference {
@@ -229,6 +253,7 @@ pub type BoundTerm = BoundReference;
 mod tests {
     use std::sync::Arc;
 
+    use crate::expr::accessor::StructAccessor;
     use crate::expr::{Bind, BoundReference, Reference};
     use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type};
 
@@ -252,9 +277,11 @@ mod tests {
         let schema = table_schema_simple();
         let reference = Reference::new("bar").bind(schema, true).unwrap();
 
+        let accessor_ref = Arc::new(StructAccessor::new(1, Type::Primitive(PrimitiveType::Int)));
         let expected_ref = BoundReference::new(
             "bar",
             NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            accessor_ref.clone(),
         );
 
         assert_eq!(expected_ref, reference);
@@ -265,9 +292,11 @@ mod tests {
         let schema = table_schema_simple();
         let reference = Reference::new("BAR").bind(schema, false).unwrap();
 
+        let accessor_ref = Arc::new(StructAccessor::new(1, Type::Primitive(PrimitiveType::Int)));
         let expected_ref = BoundReference::new(
             "BAR",
             NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            accessor_ref.clone(),
         );
 
         assert_eq!(expected_ref, reference);
diff --git a/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs b/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs
new file mode 100644
index 0000000000..d37d839ca1
--- /dev/null
+++ b/crates/iceberg/src/expr/visitors/bound_predicate_evaluator.rs
@@ -0,0 +1,339 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expr::{BoundPredicate, BoundReference, PredicateOperator};
+use crate::spec::Datum;
+use crate::Result;
+use fnv::FnvHashSet;
+
+pub trait BoundPredicateEvaluator {
+    fn visit(&mut self, node: &BoundPredicate) -> Result<bool> {
+        match node {
+            BoundPredicate::AlwaysTrue => self.always_true(),
+            BoundPredicate::AlwaysFalse => self.always_false(),
+            BoundPredicate::And(expr) => {
+                let [left_pred, right_pred] = expr.inputs();
+
+                let left_result = self.visit(left_pred)?;
+                let right_result = self.visit(right_pred)?;
+
+                Ok(left_result && right_result)
+            }
+            BoundPredicate::Or(expr) => {
+                let [left_pred, right_pred] = expr.inputs();
+
+                let left_result = self.visit(left_pred)?;
+                let right_result = self.visit(right_pred)?;
+
+                Ok(left_result || right_result)
+            }
+            BoundPredicate::Not(expr) => {
+                let [inner_pred] = expr.inputs();
+
+                let inner_result = self.visit(inner_pred)?;
+
+                self.not(inner_result)
+            }
+            BoundPredicate::Unary(expr) => match expr.op() {
+                PredicateOperator::IsNull => self.is_null(expr.term()),
+                PredicateOperator::NotNull => self.not_null(expr.term()),
+                PredicateOperator::IsNan => self.is_nan(expr.term()),
+                PredicateOperator::NotNan => self.not_nan(expr.term()),
+                op => {
+                    panic!("Unexpected op for unary predicate: {}", &op)
+                }
+            },
+            BoundPredicate::Binary(expr) => {
+                let reference = expr.term();
+                let literal = expr.literal();
+                match expr.op() {
+                    PredicateOperator::LessThan => self.less_than(reference, literal),
+                    PredicateOperator::LessThanOrEq => self.less_than_or_eq(reference, literal),
+                    PredicateOperator::GreaterThan => self.greater_than(reference, literal),
+                    PredicateOperator::GreaterThanOrEq => {
+                        self.greater_than_or_eq(reference, literal)
+                    }
+                    PredicateOperator::Eq => self.eq(reference, literal),
+                    PredicateOperator::NotEq => self.not_eq(reference, literal),
+                    PredicateOperator::StartsWith => self.starts_with(reference, literal),
+                    PredicateOperator::NotStartsWith => self.not_starts_with(reference, literal),
+                    op => {
+                        panic!("Unexpected op for binary predicate: {}", &op)
+                    }
+                }
+            }
+            BoundPredicate::Set(expr) => {
+                let reference = expr.term();
+                let literals = expr.literals();
+                match expr.op() {
+                    PredicateOperator::In => self.r#in(reference, literals),
+                    PredicateOperator::NotIn => self.not_in(reference, literals),
+                    op => {
+                        panic!("Unexpected op for set predicate: {}", &op)
+                    }
+                }
+            }
+        }
+    }
+
+    // default implementations for logical operators
+    fn always_true(&mut self) -> Result<bool> {
+        Ok(true)
+    }
+    fn always_false(&mut self) -> Result<bool> {
+        Ok(false)
+    }
+    fn and(&mut self, lhs: bool, rhs: bool) -> Result<bool> {
+        Ok(lhs && rhs)
+    }
+    fn or(&mut self, lhs: bool, rhs: bool) -> Result<bool> {
+        Ok(lhs || rhs)
+    }
+    fn not(&mut self, inner: bool) -> Result<bool> {
+        Ok(!inner)
+    }
+
+    // visitor methods for unary / binary / set operators must be implemented
+    fn is_null(&mut self, reference: &BoundReference) -> Result<bool>;
+    fn not_null(&mut self, reference: &BoundReference) -> Result<bool>;
+    fn is_nan(&mut self, reference: &BoundReference) -> Result<bool>;
+    fn not_nan(&mut self, reference: &BoundReference) -> Result<bool>;
+    fn less_than(&mut self, reference: &BoundReference, literal: &Datum) -> Result<bool>;
+    fn less_than_or_eq(&mut self, reference: &BoundReference, literal: &Datum) -> Result<bool>;
+    fn greater_than(&mut self, reference: &BoundReference, literal: &Datum) -> Result<bool>;
+    fn greater_than_or_eq(&mut self, reference: &BoundReference, literal: &Datum) -> Result<bool>;
+    fn eq(&mut self, reference: &BoundReference, literal: &Datum) -> Result<bool>;
+    fn not_eq(&mut self, reference: &BoundReference, literal: &Datum) -> Result<bool>;
+    fn starts_with(&mut self, reference: &BoundReference, literal: &Datum) -> Result<bool>;
+    fn not_starts_with(&mut self, reference: &BoundReference, literal: &Datum) -> Result<bool>;
+    fn r#in(&mut self, reference: &BoundReference, literals: &FnvHashSet<Datum>) -> Result<bool>;
+    fn not_in(&mut self, reference: &BoundReference, literals: &FnvHashSet<Datum>) -> Result<bool>;
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::expr::visitors::bound_predicate_evaluator::BoundPredicateEvaluator;
+    use crate::expr::Predicate::{AlwaysFalse, AlwaysTrue};
+    use crate::expr::{Bind, BoundReference, Predicate};
+    use crate::spec::{Datum, Schema, SchemaRef};
+    use fnv::FnvHashSet;
+    use std::ops::Not;
+    use std::sync::Arc;
+
+    struct TestEvaluator {}
+    impl BoundPredicateEvaluator for TestEvaluator {
+        fn is_null(&mut self, _reference: &BoundReference) -> crate::Result<bool> {
+            Ok(true)
+        }
+
+        fn not_null(&mut self, _reference: &BoundReference) -> crate::Result<bool> {
+            Ok(false)
+        }
+
+        fn is_nan(&mut self, _reference: &BoundReference) -> crate::Result<bool> {
+            Ok(true)
+        }
+
+        fn not_nan(&mut self, _reference: &BoundReference) -> crate::Result<bool> {
+            Ok(false)
+        }
+
+        fn less_than(
+            &mut self,
+            _reference: &BoundReference,
+            _literal: &Datum,
+        ) -> crate::Result<bool> {
+            Ok(true)
+        }
+
+        fn less_than_or_eq(
+            &mut self,
+            _reference: &BoundReference,
+            _literal: &Datum,
+        ) -> crate::Result<bool> {
+            Ok(false)
+        }
+
+        fn greater_than(
+            &mut self,
+            _reference: &BoundReference,
+            _literal: &Datum,
+        ) -> crate::Result<bool> {
+            Ok(true)
+        }
+
+        fn greater_than_or_eq(
+            &mut self,
+            _reference: &BoundReference,
+            _literal: &Datum,
+        ) -> crate::Result<bool> {
+            Ok(false)
+        }
+
+        fn eq(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result<bool> {
+            Ok(true)
+        }
+
+        fn not_eq(&mut self, _reference: &BoundReference, _literal: &Datum) -> crate::Result<bool> {
+            Ok(false)
+        }
+
+        fn starts_with(
+            &mut self,
+            _reference: &BoundReference,
+            _literal: &Datum,
+        ) -> crate::Result<bool> {
+            Ok(true)
+        }
+
+        fn not_starts_with(
+            &mut self,
+            _reference: &BoundReference,
+            _literal: &Datum,
+        ) -> crate::Result<bool> {
+            Ok(false)
+        }
+
+        fn r#in(
+            &mut self,
+            _reference: &BoundReference,
+            _literals: &FnvHashSet<Datum>,
+        ) -> crate::Result<bool> {
+            Ok(true)
+        }
+
+        fn not_in(
+            &mut self,
+            _reference: &BoundReference,
+            _literals: &FnvHashSet<Datum>,
+        ) -> crate::Result<bool> {
+            Ok(false)
+        }
+    }
+
+    fn create_test_schema() -> SchemaRef {
+        let schema = Schema::builder().build().unwrap();
+
+        let schema_arc = Arc::new(schema);
+        schema_arc.clone()
+    }
+
+    #[test]
+    fn test_default_default_always_true() {
+        let predicate = Predicate::AlwaysTrue;
+        let bound_predicate = predicate.bind(create_test_schema(), false).unwrap();
+
+        let mut test_evaluator = TestEvaluator {};
+
+        let result = test_evaluator.visit(&bound_predicate);
+
+        assert_eq!(result.unwrap(), true);
+    }
+
+    #[test]
+    fn test_default_default_always_false() {
+        let predicate = Predicate::AlwaysFalse;
+        let bound_predicate = predicate.bind(create_test_schema(), false).unwrap();
+
+        let mut test_evaluator = TestEvaluator {};
+
+        let result = test_evaluator.visit(&bound_predicate);
+
+        assert_eq!(result.unwrap(), false);
+    }
+
+    #[test]
+    fn test_default_default_logical_and() {
+        let predicate = AlwaysTrue.and(AlwaysFalse);
+        let bound_predicate = predicate.bind(create_test_schema(), false).unwrap();
+
+        let mut test_evaluator = TestEvaluator {};
+
+        let result = test_evaluator.visit(&bound_predicate);
+
+        assert_eq!(result.unwrap(), false);
+
+        let predicate = AlwaysFalse.and(AlwaysFalse);
+        let bound_predicate = predicate.bind(create_test_schema(), false).unwrap();
+
+        let mut test_evaluator = TestEvaluator {};
+
+        let result = test_evaluator.visit(&bound_predicate);
+
+        assert_eq!(result.unwrap(), false);
+
+        let predicate = AlwaysTrue.and(AlwaysTrue);
+        let bound_predicate = predicate.bind(create_test_schema(), false).unwrap();
+
+        let mut test_evaluator = TestEvaluator {};
+
+        let result = test_evaluator.visit(&bound_predicate);
+
+        assert_eq!(result.unwrap(), true);
+    }
+
+    #[test]
+    fn test_default_default_logical_or() {
+        let predicate = AlwaysTrue.or(AlwaysFalse);
+        let bound_predicate = predicate.bind(create_test_schema(), false).unwrap();
+
+        let mut test_evaluator = TestEvaluator {};
+
+        let result = test_evaluator.visit(&bound_predicate);
+
+        assert_eq!(result.unwrap(), true);
+
+        let predicate = AlwaysFalse.or(AlwaysFalse);
+        let bound_predicate = predicate.bind(create_test_schema(), false).unwrap();
+
+        let mut test_evaluator = TestEvaluator {};
+
+        let result = test_evaluator.visit(&bound_predicate);
+
+        assert_eq!(result.unwrap(), false);
+
+        let predicate = AlwaysTrue.or(AlwaysTrue);
+        let bound_predicate = predicate.bind(create_test_schema(), false).unwrap();
+
+        let mut test_evaluator = TestEvaluator {};
+
+        let result = test_evaluator.visit(&bound_predicate);
+
+        assert_eq!(result.unwrap(), true);
+    }
+
+    #[test]
+    fn test_default_default_not() {
+        let predicate = AlwaysFalse.not();
+        let bound_predicate = predicate.bind(create_test_schema(), false).unwrap();
+
+        let mut test_evaluator = TestEvaluator {};
+
+        let result = test_evaluator.visit(&bound_predicate);
+
+        assert_eq!(result.unwrap(), true);
+
+        let predicate = AlwaysTrue.not();
+        let bound_predicate = predicate.bind(create_test_schema(), false).unwrap();
+
+        let mut test_evaluator = TestEvaluator {};
+
+        let result = test_evaluator.visit(&bound_predicate);
+
+        assert_eq!(result.unwrap(), false);
+    }
+}
diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs
new file mode 100644
index 0000000000..7ba0c0a942
--- /dev/null
+++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expr::{BoundPredicate, Predicate};
+use crate::spec::{PartitionField, PartitionSpecRef, SchemaRef};
+
+pub(crate) struct InclusiveProjection {
+    #[allow(dead_code)]
+    table_schema: SchemaRef,
+    partition_spec: PartitionSpecRef,
+}
+
+impl InclusiveProjection {
+    pub(crate) fn new(table_schema: SchemaRef, partition_spec: PartitionSpecRef) -> Self {
+        Self {
+            table_schema,
+            partition_spec,
+        }
+    }
+
+    pub(crate) fn project(&self, predicate: &BoundPredicate) -> crate::Result<Predicate> {
+        self.visit(predicate)
+    }
+
+    fn visit(&self, bound_predicate: &BoundPredicate) -> crate::Result<Predicate> {
+        Ok(match bound_predicate {
+            BoundPredicate::AlwaysTrue => Predicate::AlwaysTrue,
+            BoundPredicate::AlwaysFalse => Predicate::AlwaysFalse,
+            BoundPredicate::And(expr) => {
+                let [left_pred, right_pred] = expr.inputs();
+                self.visit(left_pred)?.and(self.visit(right_pred)?)
+            }
+            BoundPredicate::Or(expr) => {
+                let [left_pred, right_pred] = expr.inputs();
+                self.visit(left_pred)?.or(self.visit(right_pred)?)
+            }
+            BoundPredicate::Not(_) => {
+                panic!("should not get here as NOT-rewriting should have removed NOT nodes")
+            }
+            bp => self.visit_bound_predicate(bp)?,
+        })
+    }
+
+    fn visit_bound_predicate(&self, predicate: &BoundPredicate) -> crate::Result<Predicate> {
+        let field_id = match predicate {
+            BoundPredicate::Unary(expr) => expr.field_id(),
+            BoundPredicate::Binary(expr) => expr.field_id(),
+            BoundPredicate::Set(expr) => expr.field_id(),
+            _ => {
+                panic!("Should not get here as these branches handled in self.visit")
+            }
+        };
+
+        // TODO: cache this?
+        let mut parts: Vec<&PartitionField> = vec![];
+        for partition_spec_field in &self.partition_spec.fields {
+            if partition_spec_field.source_id == field_id {
+                parts.push(partition_spec_field)
+            }
+        }
+
+        parts.iter().fold(Ok(Predicate::AlwaysTrue), |res, &part| {
+            Ok(
+                if let Some(pred_for_part) = part.transform.project(&part.name, predicate)? {
+                    res?.and(pred_for_part)
+                } else {
+                    res?
+                },
+            )
+        })
+    }
+}
diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
new file mode 100644
index 0000000000..cd002e5269
--- /dev/null
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expr::visitors::bound_predicate_evaluator::BoundPredicateEvaluator;
+use crate::expr::visitors::inclusive_projection::InclusiveProjection;
+use crate::expr::{Bind, BoundPredicate, BoundReference};
+use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema, SchemaRef};
+use fnv::FnvHashSet;
+use std::sync::Arc;
+
+pub(crate) struct ManifestEvaluatorFactory {
+    partition_schema: SchemaRef,
+    partition_filter: BoundPredicate,
+    case_sensitive: bool,
+}
+
+impl ManifestEvaluatorFactory {
+    pub(crate) fn new(
+        partition_spec: PartitionSpecRef,
+        table_schema: SchemaRef,
+        partition_filter: BoundPredicate,
+        case_sensitive: bool,
+    ) -> crate::Result<Self> {
+        let partition_type = partition_spec.partition_type(&table_schema)?;
+
+        // this is needed as SchemaBuilder.with_fields expects an iterator over
+        // Arc<NestedField> rather than &Arc<NestedField>
+        let cloned_partition_fields: Vec<_> =
+            partition_type.fields().iter().map(Arc::clone).collect();
+
+        let partition_schema = Schema::builder()
+            .with_fields(cloned_partition_fields)
+            .build()?;
+
+        let partition_schema_ref = Arc::new(partition_schema);
+
+        let inclusive_projection =
+            InclusiveProjection::new(table_schema.clone(), partition_spec.clone());
+        let unbound_partition_filter = inclusive_projection.project(&partition_filter)?;
+
+        let partition_filter =
+            unbound_partition_filter.bind(partition_schema_ref.clone(), case_sensitive)?;
+
+        Ok(Self {
+            partition_schema: partition_schema_ref,
+            partition_filter,
+            case_sensitive,
+        })
+    }
+
+    pub(crate) fn evaluate(&self, manifest_file: &ManifestFile) -> crate::Result<bool> {
+        if manifest_file.partitions.is_empty() {
+            return Ok(true);
+        }
+
+        let mut evaluator = ManifestEvaluator::new(self, &manifest_file.partitions);
+
+        evaluator.visit(&self.partition_filter)
+    }
+}
+
+struct ManifestEvaluator<'a> {
+    manifest_evaluator_builder: &'a ManifestEvaluatorFactory,
+    partitions: &'a Vec<FieldSummary>,
+}
+
+impl<'a> ManifestEvaluator<'a> {
+    fn new(
+        manifest_evaluator_builder: &'a ManifestEvaluatorFactory,
+        partitions: &'a Vec<FieldSummary>,
+    ) -> Self {
+        ManifestEvaluator {
+            manifest_evaluator_builder,
+            partitions,
+        }
+    }
+}
+
+// Remove this annotation once all todos have been removed
+#[allow(unused_variables)]
+impl BoundPredicateEvaluator for ManifestEvaluator<'_> {
+    fn is_null(&mut self, reference: &BoundReference) -> crate::Result<bool> {
+        Ok(self.field_summary_for_reference(reference).contains_null)
+    }
+
+    fn not_null(&mut self, reference: &BoundReference) -> crate::Result<bool> {
+        todo!()
+    }
+
+    fn is_nan(&mut self, reference: &BoundReference) -> crate::Result<bool> {
+        Ok(self
+            .field_summary_for_reference(reference)
+            .contains_nan
+            .is_some())
+    }
+
+    fn not_nan(&mut self, reference: &BoundReference) -> crate::Result<bool> {
+        todo!()
+    }
+
+    fn less_than(&mut self, reference: &BoundReference, literal: &Datum) -> crate::Result<bool> {
+        todo!()
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+    ) -> crate::Result<bool> {
+        todo!()
+    }
+
+    fn greater_than(&mut self, reference: &BoundReference, literal: &Datum) -> crate::Result<bool> {
+        todo!()
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+    ) -> crate::Result<bool> {
+        todo!()
+    }
+
+    fn eq(&mut self, reference: &BoundReference, literal: &Datum) -> crate::Result<bool> {
+        todo!()
+    }
+
+    fn not_eq(&mut self, reference: &BoundReference, literal: &Datum) -> crate::Result<bool> {
+        todo!()
+    }
+
+    fn starts_with(&mut self, reference: &BoundReference, literal: &Datum) -> crate::Result<bool> {
+        todo!()
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+    ) -> crate::Result<bool> {
+        todo!()
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        literals: &FnvHashSet<Datum>,
+    ) -> crate::Result<bool> {
+        todo!()
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        literals: &FnvHashSet<Datum>,
+    ) -> crate::Result<bool> {
+        todo!()
+    }
+}
+
+impl ManifestEvaluator<'_> {
+    fn field_summary_for_reference(&self, reference: &BoundReference) -> &FieldSummary {
+        let pos = reference.accessor().position();
+        &self.partitions[pos as usize]
+    }
+}
diff --git a/crates/iceberg/src/expr/visitors/mod.rs b/crates/iceberg/src/expr/visitors/mod.rs
new file mode 100644
index 0000000000..3ecc14ac58
--- /dev/null
+++ b/crates/iceberg/src/expr/visitors/mod.rs
@@ -0,0 +1,3 @@
+pub(crate) mod bound_predicate_evaluator;
+pub(crate) mod inclusive_projection;
+pub(crate) mod manifest_evaluator;
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 852bcafbb9..ca0b5d2024 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -18,6 +18,8 @@
 //! Table scan api.
 
 use crate::arrow::ArrowReaderBuilder;
+use crate::expr::visitors::manifest_evaluator::ManifestEvaluatorFactory;
+use crate::expr::{Bind, Predicate};
 use crate::io::FileIO;
 use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef};
 use crate::table::Table;
@@ -26,6 +28,7 @@ use arrow_array::RecordBatch;
 use async_stream::try_stream;
 use futures::stream::{iter, BoxStream};
 use futures::StreamExt;
+use std::collections::HashMap;
 
 /// Builder to create table scan.
 pub struct TableScanBuilder<'a> {
@@ -34,6 +37,8 @@ pub struct TableScanBuilder<'a> {
     column_names: Vec<String>,
     snapshot_id: Option<i64>,
     batch_size: Option<usize>,
+    case_sensitive: bool,
+    filter: Option<Predicate>,
 }
 
 impl<'a> TableScanBuilder<'a> {
@@ -43,6 +48,8 @@ impl<'a> TableScanBuilder<'a> {
             column_names: vec![],
             snapshot_id: None,
             batch_size: None,
+            case_sensitive: true,
+            filter: None,
         }
     }
 
@@ -53,6 +60,20 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Sets the scan's case sensitivity
+    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
+        self.case_sensitive = case_sensitive;
+        self
+    }
+
+    /// Specifies a predicate to use as a filter
+    pub fn with_filter(mut self, predicate: Predicate) -> Self {
+        // calls rewrite_not to remove Not nodes, which must be absent
+        // when applying the manifest evaluator
+        self.filter = Some(predicate.rewrite_not());
+        self
+    }
+
     /// Select all columns.
     pub fn select_all(mut self) -> Self {
         self.column_names.clear();
@@ -122,6 +143,8 @@ impl<'a> TableScanBuilder<'a> {
             column_names: self.column_names,
             schema,
             batch_size: self.batch_size,
+            case_sensitive: self.case_sensitive,
+            filter: self.filter,
         })
     }
 }
@@ -136,6 +159,8 @@ pub struct TableScan {
     column_names: Vec<String>,
     schema: SchemaRef,
     batch_size: Option<usize>,
+    case_sensitive: bool,
+    filter: Option<Predicate>,
 }
 
 /// A stream of [`FileScanTask`].
@@ -143,7 +168,12 @@ pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
 
 impl TableScan {
     /// Returns a stream of file scan tasks.
-    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
+
+    pub async fn plan_files(&'static self) -> crate::Result<FileScanTaskStream> {
+        // Cache `ManifestEvaluatorFactory`s created as part of this scan
+        let mut manifest_evaluator_factory_cache: HashMap<i32, ManifestEvaluatorFactory> =
+            HashMap::new();
+
         let snapshot = self.snapshot.clone();
         let table_metadata = self.table_metadata.clone();
         let file_io = self.file_io.clone();
@@ -155,8 +185,23 @@ impl TableScan {
             .await?;
 
             // Generate data file stream
-            let mut entries = iter(manifest_list.entries());
-            while let Some(entry) = entries.next().await {
+            for entry in manifest_list.entries() {
+                // If this scan has a filter, check the manifest evaluator factory cache for an
+                // existing `ManifestEvaluatorFactory` that matches this manifest's partition spec ID.
+                // Use one from the cache if there is one. If not, create one, put it in
+                // the cache, and take a reference to it.
+                if let Some(filter) = self.filter.as_ref() {
+                    let manifest_eval_factory = manifest_evaluator_factory_cache
+                        .entry(entry.partition_spec_id())
+                        .or_insert_with_key(|key| self.create_manifest_eval_factory(key, filter));
+
+
+                    // reject any manifest files whose partition values don't match the filter.
+                    if !manifest_eval_factory.evaluate(entry)? {
+                        continue;
+                    }
+                }
+
                 let manifest = entry.load_manifest(&file_io).await?;
 
                 let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive()));
@@ -183,7 +228,27 @@ impl TableScan {
             .boxed())
     }
 
-    pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
+    fn create_manifest_eval_factory(
+        &self,
+        id: &i32,
+        filter: &Predicate,
+    ) -> ManifestEvaluatorFactory {
+        let bound_predicate = filter
+            .bind(self.schema.clone(), self.case_sensitive)
+            .unwrap();
+
+        let partition_spec = self.table_metadata.partition_spec_by_id(*id).unwrap();
+
+        ManifestEvaluatorFactory::new(
+            partition_spec.clone(),
+            self.schema.clone(),
+            bound_predicate,
+            self.case_sensitive,
+        )
+        .unwrap()
+    }
+
+    pub async fn to_arrow(&'static self) -> crate::Result<ArrowRecordBatchStream> {
         let mut arrow_reader_builder =
             ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());
 
@@ -214,326 +279,323 @@ impl FileScanTask {
         .boxed())
     }
 }
 
-#[cfg(test)]
-mod tests {
-    use crate::io::{FileIO, OutputFile};
-    use crate::spec::{
-        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
-        ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
-        ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
-    };
-    use crate::table::Table;
-    use crate::TableIdent;
-    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
-    use futures::TryStreamExt;
-    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
-    use parquet::basic::Compression;
-    use parquet::file::properties::WriterProperties;
-    use std::collections::HashMap;
-    use std::fs;
-    use std::fs::File;
-    use std::sync::Arc;
-    use tempfile::TempDir;
-    use tera::{Context, Tera};
-    use uuid::Uuid;
-
-    struct TableTestFixture {
-        table_location: String,
-        table: Table,
-    }
-
-    impl TableTestFixture {
-        fn new() -> Self {
-            let tmp_dir = TempDir::new().unwrap();
-            let table_location = tmp_dir.path().join("table1");
-            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
-            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
-            let table_metadata1_location = table_location.join("metadata/v1.json");
-
-            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
-                .unwrap()
-                .build()
-                .unwrap();
-
-            let table_metadata = {
-                let template_json_str = fs::read_to_string(format!(
-                    "{}/testdata/example_table_metadata_v2.json",
-                    env!("CARGO_MANIFEST_DIR")
-                ))
-                .unwrap();
-                let mut context = Context::new();
-                context.insert("table_location", &table_location);
-                context.insert("manifest_list_1_location", &manifest_list1_location);
-                context.insert("manifest_list_2_location", &manifest_list2_location);
-                context.insert("table_metadata_1_location", &table_metadata1_location);
-
-                let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
-                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
-            };
-
-            let table = Table::builder()
-                .metadata(table_metadata)
-                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
-                .file_io(file_io)
-                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
-                .build();
-
-            Self {
-                table_location: table_location.to_str().unwrap().to_string(),
-                table,
-            }
-        }
-
-        fn next_manifest_file(&self) -> OutputFile {
-            self.table
-                .file_io()
-                .new_output(format!(
-                    "{}/metadata/manifest_{}.avro",
-                    self.table_location,
-                    Uuid::new_v4()
-                ))
-                .unwrap()
-        }
-
-        async fn setup_manifest_files(&mut self) {
-            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
-            let parent_snapshot = current_snapshot
-                .parent_snapshot(self.table.metadata())
-                .unwrap();
-            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
-            let current_partition_spec = self.table.metadata().default_partition_spec().unwrap();
-
-            // Write data files
-            let data_file_manifest = ManifestWriter::new(
-                self.next_manifest_file(),
-                current_snapshot.snapshot_id(),
-                vec![],
-            )
-            .write(Manifest::new(
-                ManifestMetadata::builder()
-                    .schema((*current_schema).clone())
-                    .content(ManifestContentType::Data)
-                    .format_version(FormatVersion::V2)
-                    .partition_spec((**current_partition_spec).clone())
-                    .schema_id(current_schema.schema_id())
-                    .build(),
-                vec![
-                    ManifestEntry::builder()
-                        .status(ManifestStatus::Added)
-                        .data_file(
-                            DataFileBuilder::default()
-                                .content(DataContentType::Data)
-                                .file_path(format!("{}/1.parquet", &self.table_location))
-                                .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
-                                .record_count(1)
-                                .partition(Struct::from_iter([Some(Literal::long(100))]))
-                                .build()
-                                .unwrap(),
-                        )
-                        .build(),
-                    ManifestEntry::builder()
-                        .status(ManifestStatus::Deleted)
-                        .snapshot_id(parent_snapshot.snapshot_id())
-                        .sequence_number(parent_snapshot.sequence_number())
-                        .file_sequence_number(parent_snapshot.sequence_number())
-                        .data_file(
-                            DataFileBuilder::default()
-                                .content(DataContentType::Data)
-                                .file_path(format!("{}/2.parquet", &self.table_location))
-                                .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
-                                .record_count(1)
-                                .partition(Struct::from_iter([Some(Literal::long(200))]))
-                                .build()
-                                .unwrap(),
-                        )
-                        .build(),
-                    ManifestEntry::builder()
-                        .status(ManifestStatus::Existing)
-                        .snapshot_id(parent_snapshot.snapshot_id())
-                        .sequence_number(parent_snapshot.sequence_number())
-                        .file_sequence_number(parent_snapshot.sequence_number())
-                        .data_file(
-                            DataFileBuilder::default()
-                                .content(DataContentType::Data)
-                                .file_path(format!("{}/3.parquet", &self.table_location))
-                                .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
-                                .record_count(1)
-                                .partition(Struct::from_iter([Some(Literal::long(300))]))
-                                .build()
-                                .unwrap(),
-                        )
-                        .build(),
-                ],
-            ))
-            .await
-            .unwrap();
-
-            // Write to manifest list
-            let mut manifest_list_write = ManifestListWriter::v2(
-                self.table
-                    .file_io()
-                    .new_output(current_snapshot.manifest_list())
-                    .unwrap(),
-                current_snapshot.snapshot_id(),
-                current_snapshot
-                    .parent_snapshot_id()
-                    .unwrap_or(EMPTY_SNAPSHOT_ID),
-                current_snapshot.sequence_number(),
-            );
-            manifest_list_write
-                .add_manifests(vec![data_file_manifest].into_iter())
-                .unwrap();
-            manifest_list_write.close().await.unwrap();
-
-            // prepare data
-            let schema = {
-                let fields =
-                    vec![
-                        arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true)
-                            .with_metadata(HashMap::from([(
-                                PARQUET_FIELD_ID_META_KEY.to_string(),
-                                "0".to_string(),
-                            )])),
-                    ];
-                Arc::new(arrow_schema::Schema::new(fields))
-            };
-            let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
-            let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
-
-            // Write the Parquet files
-            let props = WriterProperties::builder()
-                .set_compression(Compression::SNAPPY)
-                .build();
-
-            for n in 1..=3 {
-                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
-                let mut writer =
-                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
-
-                writer.write(&to_write).expect("Writing batch");
-
-                // writer must be closed to write footer
-                writer.close().unwrap();
-            }
-        }
-    }
-
-    #[test]
-    fn test_table_scan_columns() {
-        let table = TableTestFixture::new().table;
-
-        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
-        assert_eq!(vec!["x", "y"], table_scan.column_names);
-
-        let table_scan = table
-            .scan()
-            .select(["x", "y"])
-            .select(["z"])
-            .build()
-            .unwrap();
-        assert_eq!(vec!["z"], table_scan.column_names);
-    }
-
-    #[test]
-    fn test_select_all() {
-        let table = TableTestFixture::new().table;
-
-        let table_scan = table.scan().select_all().build().unwrap();
-        assert!(table_scan.column_names.is_empty());
-    }
-
-    #[test]
-    fn test_select_no_exist_column() {
-        let table = TableTestFixture::new().table;
-
-        let table_scan = table.scan().select(["x", "y", "z", "a"]).build();
-        assert!(table_scan.is_err());
-    }
-
-    #[test]
-    fn test_table_scan_default_snapshot_id() {
-        let table = TableTestFixture::new().table;
-
-        let table_scan = table.scan().build().unwrap();
-        assert_eq!(
-            table.metadata().current_snapshot().unwrap().snapshot_id(),
-            table_scan.snapshot.snapshot_id()
-        );
-    }
-
-    #[test]
-    fn test_table_scan_non_exist_snapshot_id() {
-        let table = TableTestFixture::new().table;
-
-        let table_scan = table.scan().snapshot_id(1024).build();
-        assert!(table_scan.is_err());
-    }
-
-    #[test]
-    fn test_table_scan_with_snapshot_id() {
-        let table = TableTestFixture::new().table;
-
-        let table_scan = table
-            .scan()
-            .snapshot_id(3051729675574597004)
-            .build()
-            .unwrap();
-        assert_eq!(table_scan.snapshot.snapshot_id(), 3051729675574597004);
-    }
-
-    #[tokio::test]
-    async fn test_plan_files_no_deletions() {
-        let mut fixture = TableTestFixture::new();
-        fixture.setup_manifest_files().await;
-
-        // Create table scan for current snapshot and plan files
-        let table_scan = fixture.table.scan().build().unwrap();
-        let mut tasks = table_scan
-            .plan_files()
-            .await
-            .unwrap()
-            .try_fold(vec![], |mut acc, task| async move {
-                acc.push(task);
-                Ok(acc)
-            })
-            .await
-            .unwrap();
-
-        assert_eq!(tasks.len(), 2);
-
-        tasks.sort_by_key(|t| t.data_file.file_path().to_string());
-
-        // Check first task is added data file
-        assert_eq!(
-            tasks[0].data_file.file_path(),
-            format!("{}/1.parquet", &fixture.table_location)
-        );
-
-        // Check second task is existing data file
-        assert_eq!(
-            tasks[1].data_file.file_path(),
format!("{}/3.parquet", &fixture.table_location) - ); - } - - #[tokio::test] - async fn test_open_parquet_no_deletions() { - let mut fixture = TableTestFixture::new(); - fixture.setup_manifest_files().await; - - // Create table scan for current snapshot and plan files - let table_scan = fixture.table.scan().build().unwrap(); - - let batch_stream = table_scan.to_arrow().await.unwrap(); - - let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); - - let col = batches[0].column_by_name("col").unwrap(); - - let int64_arr = col.as_any().downcast_ref::().unwrap(); - assert_eq!(int64_arr.value(0), 1); - } -} +// #[cfg(test)] +// mod tests { +// use crate::io::{FileIO, OutputFile}; +// use crate::spec::{ +// DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest, +// ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, +// ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID, +// }; +// use crate::table::Table; +// use crate::TableIdent; +// use arrow_array::{ArrayRef, Int64Array, RecordBatch}; +// use futures::TryStreamExt; +// use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; +// use parquet::basic::Compression; +// use parquet::file::properties::WriterProperties; +// use std::collections::HashMap; +// use std::fs; +// use std::fs::File; +// use std::sync::Arc; +// use tempfile::TempDir; +// use tera::{Context, Tera}; +// use uuid::Uuid; +// +// struct TableTestFixture { +// table_location: String, +// table: Table, +// } +// +// impl TableTestFixture { +// fn new() -> Self { +// let tmp_dir = TempDir::new().unwrap(); +// let table_location = tmp_dir.path().join("table1"); +// let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro"); +// let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro"); +// let table_metadata1_location = table_location.join("metadata/v1.json"); +// +// let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) +// .unwrap() +// .build() +// .unwrap(); +// +// let table_metadata = { +// let template_json_str = fs::read_to_string(format!( +// "{}/testdata/example_table_metadata_v2.json", +// env!("CARGO_MANIFEST_DIR") +// )) +// .unwrap(); +// let mut context = Context::new(); +// context.insert("table_location", &table_location); +// context.insert("manifest_list_1_location", &manifest_list1_location); +// context.insert("manifest_list_2_location", &manifest_list2_location); +// context.insert("table_metadata_1_location", &table_metadata1_location); +// +// let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap(); +// serde_json::from_str::(&metadata_json).unwrap() +// }; +// +// let table = Table::builder() +// .metadata(table_metadata) +// .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) +// .file_io(file_io) +// .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) +// .build(); +// +// Self { +// table_location: table_location.to_str().unwrap().to_string(), +// table, +// } +// } +// +// fn next_manifest_file(&self) -> OutputFile { +// self.table +// .file_io() +// .new_output(format!( +// "{}/metadata/manifest_{}.avro", +// self.table_location, +// Uuid::new_v4() +// )) +// .unwrap() +// } +// +// async fn setup_manifest_files(&mut self) { +// let current_snapshot = self.table.metadata().current_snapshot().unwrap(); +// let parent_snapshot = current_snapshot +// .parent_snapshot(self.table.metadata()) +// .unwrap(); +// let current_schema = 
+//             let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
+//             let current_partition_spec = self.table.metadata().default_partition_spec().unwrap();
+//
+//             // Write data files
+//             let data_file_manifest = ManifestWriter::new(
+//                 self.next_manifest_file(),
+//                 current_snapshot.snapshot_id(),
+//                 vec![],
+//             )
+//             .write(Manifest::new(
+//                 ManifestMetadata::builder()
+//                     .schema((*current_schema).clone())
+//                     .content(ManifestContentType::Data)
+//                     .format_version(FormatVersion::V2)
+//                     .partition_spec((**current_partition_spec).clone())
+//                     .schema_id(current_schema.schema_id())
+//                     .build(),
+//                 vec![
+//                     ManifestEntry::builder()
+//                         .status(ManifestStatus::Added)
+//                         .data_file(
+//                             DataFile::builder()
+//                                 .content(DataContentType::Data)
+//                                 .file_path(format!("{}/1.parquet", &self.table_location))
+//                                 .file_format(DataFileFormat::Parquet)
+//                                 .file_size_in_bytes(100)
+//                                 .record_count(1)
+//                                 .partition(Struct::from_iter([Some(Literal::long(100))]))
+//                                 .build(),
+//                         )
+//                         .build(),
+//                     ManifestEntry::builder()
+//                         .status(ManifestStatus::Deleted)
+//                         .snapshot_id(parent_snapshot.snapshot_id())
+//                         .sequence_number(parent_snapshot.sequence_number())
+//                         .file_sequence_number(parent_snapshot.sequence_number())
+//                         .data_file(
+//                             DataFile::builder()
+//                                 .content(DataContentType::Data)
+//                                 .file_path(format!("{}/2.parquet", &self.table_location))
+//                                 .file_format(DataFileFormat::Parquet)
+//                                 .file_size_in_bytes(100)
+//                                 .record_count(1)
+//                                 .partition(Struct::from_iter([Some(Literal::long(200))]))
+//                                 .build(),
+//                         )
+//                         .build(),
+//                     ManifestEntry::builder()
+//                         .status(ManifestStatus::Existing)
+//                         .snapshot_id(parent_snapshot.snapshot_id())
+//                         .sequence_number(parent_snapshot.sequence_number())
+//                         .file_sequence_number(parent_snapshot.sequence_number())
+//                         .data_file(
+//                             DataFile::builder()
+//                                 .content(DataContentType::Data)
+//                                 .file_path(format!("{}/3.parquet", &self.table_location))
+//                                 .file_format(DataFileFormat::Parquet)
+//                                 .file_size_in_bytes(100)
+//                                 .record_count(1)
+//                                 .partition(Struct::from_iter([Some(Literal::long(300))]))
+//                                 .build(),
+//                         )
+//                         .build(),
+//                 ],
+//             ))
+//             .await
+//             .unwrap();
+//
+//             // Write to manifest list
+//             let mut manifest_list_write = ManifestListWriter::v2(
+//                 self.table
+//                     .file_io()
+//                     .new_output(current_snapshot.manifest_list())
+//                     .unwrap(),
+//                 current_snapshot.snapshot_id(),
+//                 current_snapshot
+//                     .parent_snapshot_id()
+//                     .unwrap_or(EMPTY_SNAPSHOT_ID),
+//                 current_snapshot.sequence_number(),
+//             );
+//             manifest_list_write
+//                 .add_manifests(vec![data_file_manifest].into_iter())
+//                 .unwrap();
+//             manifest_list_write.close().await.unwrap();
+//
+//             // prepare data
+//             let schema = {
+//                 let fields =
+//                     vec![
+//                         arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true)
+//                             .with_metadata(HashMap::from([(
+//                                 PARQUET_FIELD_ID_META_KEY.to_string(),
+//                                 "0".to_string(),
+//                             )])),
+//                     ];
+//                 Arc::new(arrow_schema::Schema::new(fields))
+//             };
+//             let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
+//             let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
+//
+//             // Write the Parquet files
+//             let props = WriterProperties::builder()
+//                 .set_compression(Compression::SNAPPY)
+//                 .build();
+//
+//             for n in 1..=3 {
+//                 let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
+//                 let mut writer =
+//                     ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
+//
+//                 writer.write(&to_write).expect("Writing batch");
+//
+//                 // writer must be closed to write footer
+//                 writer.close().unwrap();
+//             }
+//         }
+//     }
+//
+//     #[test]
+//     fn test_table_scan_columns() {
+//         let table = TableTestFixture::new().table;
+//
+//         let table_scan = table.scan().select(["x", "y"]).build().unwrap();
+//         assert_eq!(vec!["x", "y"], table_scan.column_names);
+//
+//         let table_scan = table
+//             .scan()
+//             .select(["x", "y"])
+//             .select(["z"])
+//             .build()
+//             .unwrap();
+//         assert_eq!(vec!["z"], table_scan.column_names);
+//     }
+//
+//     #[test]
+//     fn test_select_all() {
+//         let table = TableTestFixture::new().table;
+//
+//         let table_scan = table.scan().select_all().build().unwrap();
+//         assert!(table_scan.column_names.is_empty());
+//     }
+//
+//     #[test]
+//     fn test_select_no_exist_column() {
+//         let table = TableTestFixture::new().table;
+//
+//         let table_scan = table.scan().select(["x", "y", "z", "a"]).build();
+//         assert!(table_scan.is_err());
+//     }
+//
+//     #[test]
+//     fn test_table_scan_default_snapshot_id() {
+//         let table = TableTestFixture::new().table;
+//
+//         let table_scan = table.scan().build().unwrap();
+//         assert_eq!(
+//             table.metadata().current_snapshot().unwrap().snapshot_id(),
+//             table_scan.snapshot.snapshot_id()
+//         );
+//     }
+//
+//     #[test]
+//     fn test_table_scan_non_exist_snapshot_id() {
+//         let table = TableTestFixture::new().table;
+//
+//         let table_scan = table.scan().snapshot_id(1024).build();
+//         assert!(table_scan.is_err());
+//     }
+//
+//     #[test]
+//     fn test_table_scan_with_snapshot_id() {
+//         let table = TableTestFixture::new().table;
+//
+//         let table_scan = table
+//             .scan()
+//             .snapshot_id(3051729675574597004)
+//             .build()
+//             .unwrap();
+//         assert_eq!(table_scan.snapshot.snapshot_id(), 3051729675574597004);
+//     }
+//
+//     #[tokio::test]
+//     async fn test_plan_files_no_deletions() {
+//         let mut fixture = TableTestFixture::new();
+//         fixture.setup_manifest_files().await;
+//
+//         // Create table scan for current snapshot and plan files
+//         let table_scan = fixture.table.scan().build().unwrap();
+//         let mut tasks = table_scan
+//             .plan_files()
+//             .await
+//             .unwrap()
+//             .try_fold(vec![], |mut acc, task| async move {
+//                 acc.push(task);
+//                 Ok(acc)
+//             })
+//             .await
+//             .unwrap();
+//
+//         assert_eq!(tasks.len(), 2);
+//
+//         tasks.sort_by_key(|t| t.data_file.file_path().to_string());
+//
+//         // Check first task is added data file
+//         assert_eq!(
+//             tasks[0].data_file.file_path(),
+//             format!("{}/1.parquet", &fixture.table_location)
+//         );
+//
+//         // Check second task is existing data file
+//         assert_eq!(
+//             tasks[1].data_file.file_path(),
+//             format!("{}/3.parquet", &fixture.table_location)
+//         );
+//     }
+//
+//     #[tokio::test]
+//     async fn test_open_parquet_no_deletions() {
+//         let mut fixture = TableTestFixture::new();
+//         fixture.setup_manifest_files().await;
+//
+//         // Create table scan for current snapshot and plan files
+//         let table_scan = fixture.table.scan().build().unwrap();
+//
+//         let batch_stream = table_scan.to_arrow().await.unwrap();
+//
+//         let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+//
+//         let col = batches[0].column_by_name("col").unwrap();
+//
+//         let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+//         assert_eq!(int64_arr.value(0), 1);
+//     }
+// }
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index 44ac7cac50..0ed2d79235 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -101,6 +101,11 @@ impl Manifest {
             entries: entries.into_iter().map(Arc::new).collect(),
         }
     }
+
+    /// Get the ID of this Manifest's [`PartitionSpec`]
+    pub fn partition_spec_id(&self) -> i32 {
+        self.metadata.partition_spec.spec_id
+    }
 }
 
 /// A manifest writer.
@@ -868,6 +873,11 @@ impl ManifestEntry {
         &self.data_file.file_path
     }
 
+    /// Get a reference to the Partition Struct of the data file of this manifest entry
+    pub fn get_partition_struct(&self) -> &Struct {
+        &self.data_file.partition
+    }
+
     /// Inherit data from manifest list, such as snapshot id, sequence number.
     pub(crate) fn inherit_data(&mut self, snapshot_entry: &ManifestFile) {
         if self.snapshot_id.is_none() {
diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs
index da63815d60..34d0571a0c 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -649,6 +649,10 @@ impl ManifestFile {
 
         Ok(Manifest::new(metadata, entries))
     }
+
+    pub(crate) fn partition_spec_id(&self) -> i32 {
+        self.partition_spec_id
+    }
 }
 
 /// Field summary for partition field in the spec.
diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs
index 975a2a9ef7..e2fee0d0ef 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -18,6 +18,7 @@
 //! This module defines schema in iceberg.
 
 use crate::error::Result;
+use crate::expr::accessor::StructAccessor;
 use crate::spec::datatypes::{
     ListType, MapType, NestedFieldRef, PrimitiveType, StructType, Type, LIST_FILED_NAME,
     MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME,
@@ -53,6 +54,8 @@ pub struct Schema {
     name_to_id: HashMap<String, i32>,
     lowercase_name_to_id: HashMap<String, i32>,
     id_to_name: HashMap<i32, String>,
+
+    field_id_to_accessor: HashMap<i32, Arc<StructAccessor>>,
 }
 
 impl PartialEq for Schema {
@@ -103,6 +106,8 @@ impl SchemaBuilder {
     pub fn build(self) -> Result<Schema> {
         let highest_field_id = self.fields.iter().map(|f| f.id).max().unwrap_or(0);
 
+        let field_id_to_accessor = self.build_accessors();
+
         let r#struct = StructType::new(self.fields);
         let id_to_field = index_by_id(&r#struct)?;
 
@@ -135,9 +140,55 @@ impl SchemaBuilder {
             name_to_id,
             lowercase_name_to_id,
             id_to_name,
+
+            field_id_to_accessor,
         })
     }
 
+    fn build_accessors(&self) -> HashMap<i32, Arc<StructAccessor>> {
+        let mut map = HashMap::new();
+
+        for (pos, field) in self.fields.iter().enumerate() {
+            // add an accessor for this field
+
+            let accessor = Arc::new(StructAccessor::new(pos as i32, *field.field_type.clone()));
+            map.insert(field.id, accessor.clone());
+
+            if let Type::Struct(nested) = field.field_type.as_ref() {
+                // add accessors for nested fields
+                for (field_id, accessor) in Self::build_accessors_nested(nested.fields()) {
+                    let new_accessor = Arc::new(StructAccessor::wrap(pos as i32, accessor));
+                    map.insert(field_id, new_accessor.clone());
+                }
+            }
+        }
+
+        map
+    }
+
+    fn build_accessors_nested(fields: &[NestedFieldRef]) -> Vec<(i32, Arc<StructAccessor>)> {
+        let mut results = vec![];
+        for (pos, field) in fields.iter().enumerate() {
+            if let Type::Struct(nested) = field.field_type.as_ref() {
+                let nested_accessors = Self::build_accessors_nested(nested.fields());
+
+                let wrapped_nested_accessors =
+                    nested_accessors.into_iter().map(|(id, accessor)| {
+                        let new_accessor = Arc::new(StructAccessor::wrap(pos as i32, accessor));
+                        (id, new_accessor.clone())
+                    });
+
+                results.extend(wrapped_nested_accessors);
+            }
+
+            let accessor = Arc::new(StructAccessor::new(pos as i32, *field.field_type.clone()));
+
+            results.push((field.id, accessor.clone()));
+        }
+
+        results
+    }
+
     fn validate_identifier_ids(
         r#struct: &StructType,
         id_to_field: &HashMap<i32, NestedFieldRef>,
@@ -262,6 +313,11 @@ impl Schema {
     pub fn name_by_field_id(&self, field_id: i32) -> Option<&str> {
         self.id_to_name.get(&field_id).map(String::as_str)
     }
+
+    /// Get an accessor for retrieving data in a struct
+    pub fn accessor_for_field_id(&self, field_id: i32) -> Option<Arc<StructAccessor>> {
+        self.field_id_to_accessor.get(&field_id).cloned()
+    }
 }
 
 impl Display for Schema {
@@ -379,7 +435,7 @@ pub fn visit_schema<V: SchemaVisitor>(schema: &Schema, visitor: &mut V) -> Result<V::T> {
     visitor.schema(schema, result)
 }
 
-/// Creates an field id to field map.
+/// Creates a field id to field map.
 pub fn index_by_id(r#struct: &StructType) -> Result<HashMap<i32, NestedFieldRef>> {
     struct IndexById(HashMap<i32, NestedFieldRef>);
diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs
index 839d582dc0..65f4e6e552 100644
--- a/crates/iceberg/src/spec/transform.rs
+++ b/crates/iceberg/src/spec/transform.rs
@@ -18,6 +18,7 @@
 //! Transforms in iceberg.
 
 use crate::error::{Error, Result};
+use crate::expr::{BoundPredicate, Predicate};
 use crate::spec::datatypes::{PrimitiveType, Type};
 use crate::ErrorKind;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -261,6 +262,13 @@ impl Transform {
             _ => self == other,
         }
     }
+
+    /// Projects predicate to `Transform`
+    pub fn project(&self, _name: &str, _predicate: &BoundPredicate) -> Result<Option<Predicate>> {
+        // Waiting on https://github.com/apache/iceberg-rust/pull/269
+        // to deliver https://github.com/apache/iceberg-rust/issues/264
+        todo!()
+    }
 }
 
 impl Display for Transform {
diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs
index f31d64779d..df929ed22c 100644
--- a/crates/iceberg/src/spec/values.rs
+++ b/crates/iceberg/src/spec/values.rs
@@ -1131,6 +1131,11 @@ impl Struct {
             },
         )
     }
+
+    /// Gets a ref to the field at `position` within the `Struct`
+    pub fn get(&self, position: i32) -> &Literal {
+        &self.fields[position as usize]
+    }
 }
 
 /// An iterator that moves out of a struct.