Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async-trait = "0.1"
bimap = "0.6"
bitvec = "1.0.1"
bytes = "1.5"
chrono = "0.4"
chrono = "~0.4.34"
derive_builder = "0.20.0"
either = "1"
env_logger = "0.11.0"
Expand Down
56 changes: 56 additions & 0 deletions crates/iceberg/src/expr/accessor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use crate::spec::{Literal, Struct, Type};
use serde_derive::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
enum InnerOrType {
Inner(Arc<StructAccessor>),
Type(Type),
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct StructAccessor {
position: i32,
inner_or_type: InnerOrType,
}

pub(crate) type StructAccessorRef = Arc<StructAccessor>;

impl StructAccessor {
pub(crate) fn new(position: i32, r#type: Type) -> Self {
StructAccessor {
position,
inner_or_type: InnerOrType::Type(r#type),
}
}

pub(crate) fn wrap(position: i32, inner: StructAccessorRef) -> Self {
StructAccessor {
position,
inner_or_type: InnerOrType::Inner(inner),
}
}

pub fn position(&self) -> i32 {
self.position
}

fn r#type(&self) -> &Type {
match &self.inner_or_type {
InnerOrType::Inner(inner) => inner.r#type(),
InnerOrType::Type(r#type) => r#type,
}
}

fn get<'a>(&'a self, container: &'a Struct) -> &Literal {
match &self.inner_or_type {
InnerOrType::Inner(inner) => match container.get(self.position) {
Literal::Struct(wrapped) => inner.get(wrapped),
_ => {
panic!("Nested accessor should only be wrapping a Struct")
}
},
InnerOrType::Type(_) => container.get(self.position),
}
}
}
8 changes: 5 additions & 3 deletions crates/iceberg/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

//! This module contains expressions.

mod term;

use std::fmt::{Display, Formatter};

pub use term::*;
mod predicate;

pub(crate) mod accessor;
pub(crate) mod predicate;
pub(crate) mod term;
pub(crate) mod visitors;

use crate::spec::SchemaRef;
pub use predicate::*;
Expand Down
71 changes: 70 additions & 1 deletion crates/iceberg/src/expr/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<T: Debug, const N: usize> Debug for LogicalExpression<T, N> {
}

impl<T, const N: usize> LogicalExpression<T, N> {
fn new(inputs: [Box<T>; N]) -> Self {
pub(crate) fn new(inputs: [Box<T>; N]) -> Self {
Self { inputs }
}

Expand Down Expand Up @@ -116,6 +116,21 @@ impl<T> UnaryExpression<T> {
debug_assert!(op.is_unary());
Self { op, term }
}

pub(crate) fn term(&self) -> &T {
&self.term
}

pub(crate) fn op(&self) -> &PredicateOperator {
&self.op
}
}

impl UnaryExpression<BoundReference> {
/// get the field_id of this expression's term's field
pub(crate) fn field_id(&self) -> i32 {
self.term.field().id
}
}

/// Binary predicate, for example, `a > 10`.
Expand Down Expand Up @@ -144,6 +159,25 @@ impl<T> BinaryExpression<T> {
debug_assert!(op.is_binary());
Self { op, term, literal }
}

pub(crate) fn term(&self) -> &T {
&self.term
}

pub(crate) fn op(&self) -> &PredicateOperator {
&self.op
}

pub(crate) fn literal(&self) -> &Datum {
&self.literal
}
}

impl BinaryExpression<BoundReference> {
/// get the field_id of this expression's term's field
pub(crate) fn field_id(&self) -> i32 {
self.term.field().id
}
}

impl<T: Display> Display for BinaryExpression<T> {
Expand Down Expand Up @@ -191,6 +225,25 @@ impl<T> SetExpression<T> {
debug_assert!(op.is_set());
Self { op, term, literals }
}

pub(crate) fn term(&self) -> &T {
&self.term
}

pub(crate) fn op(&self) -> &PredicateOperator {
&self.op
}

pub(crate) fn literals(&self) -> &FnvHashSet<Datum> {
&self.literals
}
}

impl SetExpression<BoundReference> {
/// get the field_id of this expression's term's field
pub(crate) fn field_id(&self) -> i32 {
self.term.field().id
}
}

impl<T: Bind> Bind for SetExpression<T> {
Expand All @@ -217,6 +270,10 @@ impl<T: Display + Debug> Display for SetExpression<T> {
/// Unbound predicate expression before binding to a schema.
#[derive(Debug, PartialEq)]
pub enum Predicate {
/// AlwaysTrue predicate, for example, `TRUE`.
AlwaysTrue,
/// AlwaysFalse predicate, for example, `FALSE`.
AlwaysFalse,
/// And predicate, for example, `a > 10 AND b < 20`.
And(LogicalExpression<Predicate, 2>),
/// Or predicate, for example, `a > 10 OR b < 20`.
Expand Down Expand Up @@ -367,13 +424,21 @@ impl Bind for Predicate {
bound_literals,
)))
}
Predicate::AlwaysTrue => Ok(BoundPredicate::AlwaysTrue),
Predicate::AlwaysFalse => Ok(BoundPredicate::AlwaysFalse),
}
}
}

impl Display for Predicate {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Predicate::AlwaysTrue => {
write!(f, "TRUE")
}
Predicate::AlwaysFalse => {
write!(f, "FALSE")
}
Predicate::And(expr) => {
write!(f, "({}) AND ({})", expr.inputs()[0], expr.inputs()[1])
}
Expand Down Expand Up @@ -461,6 +526,8 @@ impl Predicate {
/// ```
pub fn negate(self) -> Predicate {
match self {
Predicate::AlwaysTrue => Predicate::AlwaysFalse,
Predicate::AlwaysFalse => Predicate::AlwaysTrue,
Predicate::And(expr) => Predicate::Or(LogicalExpression::new(
expr.inputs.map(|expr| Box::new(expr.negate())),
)),
Expand Down Expand Up @@ -525,6 +592,8 @@ impl Predicate {
Predicate::Unary(expr) => Predicate::Unary(expr),
Predicate::Binary(expr) => Predicate::Binary(expr),
Predicate::Set(expr) => Predicate::Set(expr),
Predicate::AlwaysTrue => Predicate::AlwaysTrue,
Predicate::AlwaysFalse => Predicate::AlwaysFalse,
}
}
}
Expand Down
33 changes: 31 additions & 2 deletions crates/iceberg/src/expr/term.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::fmt::{Display, Formatter};

use fnv::FnvHashSet;

use crate::expr::accessor::{StructAccessor, StructAccessorRef};
use crate::expr::Bind;
use crate::expr::{BinaryExpression, Predicate, PredicateOperator, SetExpression, UnaryExpression};
use crate::spec::{Datum, NestedField, NestedFieldRef, SchemaRef};
Expand Down Expand Up @@ -188,7 +189,19 @@ impl Bind for Reference {
format!("Field {} not found in schema", self.name),
)
})?;
Ok(BoundReference::new(self.name.clone(), field.clone()))

let accessor = schema.accessor_for_field_id(field.id).ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Accessor for Field {} not found", self.name),
)
})?;

Ok(BoundReference::new(
self.name.clone(),
field.clone(),
accessor.clone(),
))
}
}

Expand All @@ -199,21 +212,32 @@ pub struct BoundReference {
// For example, if the field is `a.b.c`, then `field.name` is `c`, but `original_name` is `a.b.c`.
column_name: String,
field: NestedFieldRef,
accessor: StructAccessorRef,
}

impl BoundReference {
/// Creates a new bound reference.
pub fn new(name: impl Into<String>, field: NestedFieldRef) -> Self {
pub fn new(
name: impl Into<String>,
field: NestedFieldRef,
accessor: StructAccessorRef,
) -> Self {
Self {
column_name: name.into(),
field,
accessor,
}
}

/// Return the field of this reference.
pub fn field(&self) -> &NestedField {
&self.field
}

/// Get this BoundReference's Accessor
pub fn accessor(&self) -> &StructAccessor {
&self.accessor
}
}

impl Display for BoundReference {
Expand All @@ -229,6 +253,7 @@ pub type BoundTerm = BoundReference;
mod tests {
use std::sync::Arc;

use crate::expr::accessor::StructAccessor;
use crate::expr::{Bind, BoundReference, Reference};
use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type};

Expand All @@ -252,9 +277,11 @@ mod tests {
let schema = table_schema_simple();
let reference = Reference::new("bar").bind(schema, true).unwrap();

let accessor_ref = Arc::new(StructAccessor::new(1, Type::Primitive(PrimitiveType::Int)));
let expected_ref = BoundReference::new(
"bar",
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
accessor_ref.clone(),
);

assert_eq!(expected_ref, reference);
Expand All @@ -265,9 +292,11 @@ mod tests {
let schema = table_schema_simple();
let reference = Reference::new("BAR").bind(schema, false).unwrap();

let accessor_ref = Arc::new(StructAccessor::new(1, Type::Primitive(PrimitiveType::Int)));
let expected_ref = BoundReference::new(
"BAR",
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
accessor_ref.clone(),
);

assert_eq!(expected_ref, reference);
Expand Down
Loading