From d4bef091e139420a90b24e9a39df75d51ab97bb4 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 14 Mar 2024 20:47:11 +0100 Subject: [PATCH 1/6] wip: prototype project_transform --- crates/iceberg/src/expr/predicate.rs | 14 +++- crates/iceberg/src/spec/transform.rs | 104 +++++++++++++++++++++++++++ crates/iceberg/src/spec/values.rs | 4 ++ 3 files changed, 121 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index 67a46e2b11..12bb1beb36 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -28,7 +28,7 @@ use itertools::Itertools; use crate::error::Result; use crate::expr::{Bind, BoundReference, PredicateOperator, Reference}; -use crate::spec::{Datum, SchemaRef}; +use crate::spec::{Datum, PrimitiveLiteral, SchemaRef}; use crate::{Error, ErrorKind}; /// Logical expression, such as `AND`, `OR`, `NOT`. @@ -116,6 +116,10 @@ impl UnaryExpression { debug_assert!(op.is_unary()); Self { op, term } } + + pub(crate) fn op(&self) -> PredicateOperator { + self.op + } } /// Binary predicate, for example, `a > 10`. @@ -144,6 +148,14 @@ impl BinaryExpression { debug_assert!(op.is_binary()); Self { op, term, literal } } + + pub(crate) fn op(&self) -> PredicateOperator { + self.op + } + + pub(crate) fn as_primitive_literal(&self) -> PrimitiveLiteral { + self.literal.literal() + } } impl Display for BinaryExpression { diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs index 839d582dc0..0c461b0751 100644 --- a/crates/iceberg/src/spec/transform.rs +++ b/crates/iceberg/src/spec/transform.rs @@ -18,11 +18,19 @@ //! Transforms in iceberg. use crate::error::{Error, Result}; +use crate::expr::{ + BinaryExpression, BoundPredicate, Predicate, PredicateOperator, Reference, UnaryExpression, +}; use crate::spec::datatypes::{PrimitiveType, Type}; +use crate::transform::create_transform_function; use crate::ErrorKind; +use arrow_array::{Int32Array, Int64Array}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt::{Display, Formatter}; use std::str::FromStr; +use std::sync::Arc; + +use super::{Datum, PrimitiveLiteral}; /// Transform is used to transform predicates to partition predicates, /// in addition to transforming data values. @@ -261,6 +269,50 @@ impl Transform { _ => self == other, } } + /// Projects predicate to `Transform` + pub fn project(&self, name: String, pred: &BoundPredicate) -> Result> { + let func = create_transform_function(self)?; + + let projection = match self { + Transform::Bucket(_) => match pred { + BoundPredicate::Unary(expr) => Some(Predicate::Unary(UnaryExpression::new( + expr.op(), + Reference::new(name), + ))), + BoundPredicate::Binary(expr) => { + if expr.op() != PredicateOperator::Eq { + return Ok(None); + } + + let result = match expr.as_primitive_literal() { + PrimitiveLiteral::Int(v) => func + .transform(Arc::new(Int32Array::from_value(v, 1)))? + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Failed to downcast"))? + .value(0), + PrimitiveLiteral::Long(v) => func + .transform(Arc::new(Int64Array::from_value(v, 1)))? + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Failed to downcast"))? + .value(0), + _ => return Ok(None), + }; + + Some(Predicate::Binary(BinaryExpression::new( + expr.op(), + Reference::new(name), + Datum::int(result), + ))) + } + _ => unimplemented!(), + }, + _ => unimplemented!(), + }; + + Ok(projection) + } } impl Display for Transform { @@ -358,6 +410,12 @@ impl<'de> Deserialize<'de> for Transform { #[cfg(test)] mod tests { + use std::sync::Arc; + + use crate::expr::{ + BinaryExpression, BoundPredicate, BoundReference, Predicate, PredicateOperator, + UnaryExpression, + }; use crate::spec::datatypes::PrimitiveType::{ Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp, Timestamptz, Uuid, @@ -365,6 +423,8 @@ mod tests { use crate::spec::datatypes::Type::{Primitive, Struct}; use crate::spec::datatypes::{NestedField, StructType, Type}; use crate::spec::transform::Transform; + use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType}; + use crate::Result; struct TestParameter { display: String, @@ -398,6 +458,50 @@ mod tests { } } + #[test] + fn test_bucket_project_binary() -> Result<()> { + let trans = Transform::Bucket(8); + let name = "projected_name".to_string(); + + let field = NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)); + + let pred = BoundPredicate::Binary(BinaryExpression::new( + PredicateOperator::Eq, + BoundReference::new("original_name", Arc::new(field)), + Datum::int(5), + )); + + let literal = match trans.project(name, &pred)? { + Some(pred) => match pred { + Predicate::Binary(expr) => expr.as_primitive_literal(), + _ => unimplemented!(), + }, + _ => unimplemented!(), + }; + + assert_eq!(literal, PrimitiveLiteral::Int(7)); + + Ok(()) + } + + #[test] + fn test_bucket_project_unary() { + let trans = Transform::Bucket(8); + + let name = "projected_name".to_string(); + + let field = NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)); + + let pred = BoundPredicate::Unary(UnaryExpression::new( + PredicateOperator::IsNull, + BoundReference::new("original_name", Arc::new(field)), + )); + + let result = trans.project(name, &pred).unwrap().unwrap(); + + assert_eq!(format!("{result}"), "projected_name IS NULL"); + } + #[test] fn test_bucket_transform() { let trans = Transform::Bucket(8); diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 00f2e57d2b..962f3b575f 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -141,6 +141,10 @@ impl From for Literal { } impl Datum { + /// Prototype + pub fn literal(&self) -> PrimitiveLiteral { + self.literal.clone() + } /// Creates a boolean value. /// /// Example: From 16d01099b6fba00330118b42dbe166049b07a9cb Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 18 Mar 2024 14:12:49 +0100 Subject: [PATCH 2/6] add conversion fns --- crates/iceberg/src/spec/values.rs | 53 +++++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 962f3b575f..052d6748a8 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -21,8 +21,11 @@ use std::fmt::{Display, Formatter}; use std::str::FromStr; +use std::sync::Arc; use std::{any::Any, collections::BTreeMap}; +use arrow_array::{ArrayRef, Int32Array, Int64Array}; +use arrow_schema::DataType; use bitvec::vec::BitVec; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc}; use ordered_float::OrderedFloat; @@ -141,10 +144,54 @@ impl From for Literal { } impl Datum { - /// Prototype - pub fn literal(&self) -> PrimitiveLiteral { - self.literal.clone() + /// Convert `Datum` into `arrow_array::ArrayRef` + pub fn to_arrow_array(&self) -> ArrayRef { + // TODO: Support more PrimitiveLiterals + match self.literal { + PrimitiveLiteral::Int(v) => Arc::new(Int32Array::from_value(v.clone(), 1)), + PrimitiveLiteral::Long(v) => Arc::new(Int64Array::from_value(v.clone(), 1)), + _ => todo!(), + } } + /// Creates `Datum` from `arrow_array::ArrayRef` + pub fn from_arrow_array(input: &ArrayRef) -> Result { + if input.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Input array must not be empty", + )); + } + + let downcast_err = || Error::new(ErrorKind::Unexpected, "Failed to downcast"); + + // TODO: Support more data_types + match input.data_type() { + DataType::Int32 => { + let arr = input + .as_any() + .downcast_ref::() + .ok_or_else(downcast_err)?; + + Ok(Self { + r#type: PrimitiveType::Int, + literal: PrimitiveLiteral::Int(arr.value(0)), + }) + } + DataType::Int64 => { + let arr = input + .as_any() + .downcast_ref::() + .ok_or_else(downcast_err)?; + + Ok(Self { + r#type: PrimitiveType::Long, + literal: PrimitiveLiteral::Long(arr.value(0)), + }) + } + _ => todo!(), + } + } + /// Creates a boolean value. /// /// Example: From febe05efc5f0ec569aaea8ae595716fa45d4e35c Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 18 Mar 2024 14:13:36 +0100 Subject: [PATCH 3/6] expose op and literal on Expressions --- crates/iceberg/src/expr/predicate.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index 12bb1beb36..bd325a54c4 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -28,7 +28,7 @@ use itertools::Itertools; use crate::error::Result; use crate::expr::{Bind, BoundReference, PredicateOperator, Reference}; -use crate::spec::{Datum, PrimitiveLiteral, SchemaRef}; +use crate::spec::{Datum, SchemaRef}; use crate::{Error, ErrorKind}; /// Logical expression, such as `AND`, `OR`, `NOT`. @@ -153,8 +153,8 @@ impl BinaryExpression { self.op } - pub(crate) fn as_primitive_literal(&self) -> PrimitiveLiteral { - self.literal.literal() + pub(crate) fn literal(&self) -> &Datum { + &self.literal } } @@ -199,6 +199,14 @@ impl SetExpression { debug_assert!(op.is_set()); Self { op, term, literals } } + + pub(crate) fn op(&self) -> PredicateOperator { + self.op + } + + pub(crate) fn literals(&self) -> &FnvHashSet { + &self.literals + } } impl Bind for SetExpression { From f3b8d80c86cfa8c5de3d32c1af0b2e838bbc6923 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 18 Mar 2024 14:15:43 +0100 Subject: [PATCH 4/6] impl project Transform::Bucket --- crates/iceberg/src/spec/transform.rs | 101 ++++++++++++++++++--------- 1 file changed, 68 insertions(+), 33 deletions(-) diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs index 0c461b0751..8438e1aee8 100644 --- a/crates/iceberg/src/spec/transform.rs +++ b/crates/iceberg/src/spec/transform.rs @@ -19,18 +19,18 @@ use crate::error::{Error, Result}; use crate::expr::{ - BinaryExpression, BoundPredicate, Predicate, PredicateOperator, Reference, UnaryExpression, + BinaryExpression, BoundPredicate, Predicate, PredicateOperator, Reference, SetExpression, + UnaryExpression, }; use crate::spec::datatypes::{PrimitiveType, Type}; use crate::transform::create_transform_function; use crate::ErrorKind; -use arrow_array::{Int32Array, Int64Array}; +use fnv::FnvHashSet; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt::{Display, Formatter}; use std::str::FromStr; -use std::sync::Arc; -use super::{Datum, PrimitiveLiteral}; +use super::Datum; /// Transform is used to transform predicates to partition predicates, /// in addition to transforming data values. @@ -273,6 +273,7 @@ impl Transform { pub fn project(&self, name: String, pred: &BoundPredicate) -> Result> { let func = create_transform_function(self)?; + // TODO: Support other transforms let projection = match self { Transform::Bucket(_) => match pred { BoundPredicate::Unary(expr) => Some(Predicate::Unary(UnaryExpression::new( @@ -284,31 +285,37 @@ impl Transform { return Ok(None); } - let result = match expr.as_primitive_literal() { - PrimitiveLiteral::Int(v) => func - .transform(Arc::new(Int32Array::from_value(v, 1)))? - .as_any() - .downcast_ref::() - .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Failed to downcast"))? - .value(0), - PrimitiveLiteral::Long(v) => func - .transform(Arc::new(Int64Array::from_value(v, 1)))? - .as_any() - .downcast_ref::() - .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Failed to downcast"))? - .value(0), - _ => return Ok(None), - }; + let result = func.transform(expr.literal().to_arrow_array())?; Some(Predicate::Binary(BinaryExpression::new( expr.op(), Reference::new(name), - Datum::int(result), + Datum::from_arrow_array(&result)?, ))) } - _ => unimplemented!(), + BoundPredicate::Set(expr) => { + if expr.op() != PredicateOperator::In { + return Ok(None); + } + + let result: FnvHashSet = expr + .literals() + .iter() + .map(|lit| { + let res = func.transform(lit.to_arrow_array()).unwrap(); + Datum::from_arrow_array(&res).unwrap() + }) + .collect(); + + Some(Predicate::Set(SetExpression::new( + expr.op(), + Reference::new(name), + result, + ))) + } + _ => None, }, - _ => unimplemented!(), + _ => todo!(), }; Ok(projection) @@ -412,9 +419,11 @@ impl<'de> Deserialize<'de> for Transform { mod tests { use std::sync::Arc; + use fnv::FnvHashSet; + use crate::expr::{ - BinaryExpression, BoundPredicate, BoundReference, Predicate, PredicateOperator, - UnaryExpression, + BinaryExpression, BoundPredicate, BoundReference, Predicate, PredicateOperator, Reference, + SetExpression, UnaryExpression, }; use crate::spec::datatypes::PrimitiveType::{ Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp, @@ -423,7 +432,7 @@ mod tests { use crate::spec::datatypes::Type::{Primitive, Struct}; use crate::spec::datatypes::{NestedField, StructType, Type}; use crate::spec::transform::Transform; - use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType}; + use crate::spec::{Datum, PrimitiveType}; use crate::Result; struct TestParameter { @@ -458,6 +467,32 @@ mod tests { } } + #[test] + fn test_bucket_project_set() -> Result<()> { + let trans = Transform::Bucket(8); + let name = "projected_name".to_string(); + + let field = NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)); + + let pred = BoundPredicate::Set(SetExpression::new( + PredicateOperator::In, + BoundReference::new("original_name", Arc::new(field)), + FnvHashSet::from_iter([Datum::int(5), Datum::int(6)]), + )); + + let expected = Some(Predicate::Set(SetExpression::new( + PredicateOperator::In, + Reference::new(&name), + FnvHashSet::from_iter([Datum::int(7), Datum::int(1)]), + ))); + + let result = trans.project(name, &pred)?; + + assert_eq!(result, expected); + + Ok(()) + } + #[test] fn test_bucket_project_binary() -> Result<()> { let trans = Transform::Bucket(8); @@ -471,15 +506,15 @@ mod tests { Datum::int(5), )); - let literal = match trans.project(name, &pred)? { - Some(pred) => match pred { - Predicate::Binary(expr) => expr.as_primitive_literal(), - _ => unimplemented!(), - }, - _ => unimplemented!(), - }; + let expected = Some(Predicate::Binary(BinaryExpression::new( + PredicateOperator::Eq, + Reference::new(&name), + Datum::int(7), + ))); + + let result = trans.project(name, &pred)?; - assert_eq!(literal, PrimitiveLiteral::Int(7)); + assert_eq!(result, expected); Ok(()) } From 3131e7a4d9b663069e820056c065a6bf81da5b10 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 18 Mar 2024 14:17:17 +0100 Subject: [PATCH 5/6] fix: clippy --- crates/iceberg/src/spec/values.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 052d6748a8..3147a77a78 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -148,8 +148,8 @@ impl Datum { pub fn to_arrow_array(&self) -> ArrayRef { // TODO: Support more PrimitiveLiterals match self.literal { - PrimitiveLiteral::Int(v) => Arc::new(Int32Array::from_value(v.clone(), 1)), - PrimitiveLiteral::Long(v) => Arc::new(Int64Array::from_value(v.clone(), 1)), + PrimitiveLiteral::Int(v) => Arc::new(Int32Array::from_value(v, 1)), + PrimitiveLiteral::Long(v) => Arc::new(Int64Array::from_value(v, 1)), _ => todo!(), } } From efa40704a434b3ae454717ced18d3ab506dbfeb1 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 18 Mar 2024 14:49:58 +0100 Subject: [PATCH 6/6] remove unwrap --- crates/iceberg/src/spec/transform.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs index 8438e1aee8..ec5d281b9a 100644 --- a/crates/iceberg/src/spec/transform.rs +++ b/crates/iceberg/src/spec/transform.rs @@ -298,20 +298,23 @@ impl Transform { return Ok(None); } - let result: FnvHashSet = expr + let projected_set: Result> = expr .literals() .iter() .map(|lit| { - let res = func.transform(lit.to_arrow_array()).unwrap(); - Datum::from_arrow_array(&res).unwrap() + func.transform(lit.to_arrow_array()) + .and_then(|arr| Datum::from_arrow_array(&arr)) }) .collect(); - Some(Predicate::Set(SetExpression::new( - expr.op(), - Reference::new(name), - result, - ))) + match projected_set { + Err(e) => return Err(e), + Ok(set) => Some(Predicate::Set(SetExpression::new( + expr.op(), + Reference::new(name), + set, + ))), + } } _ => None, },