Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions pyiceberg/expressions/visitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,10 +896,15 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi
field = predicate.term.ref().field
file_column_name = self.file_schema.find_column_name(field.field_id)

# In the case of schema evolution or column projection, the field might not be present in the file schema
if file_column_name is None:
# In the case of schema evolution, the column might not be present
# we can use the default value as a constant and evaluate it against
# the predicate
# If the field has no initial_default, return AlwaysTrue to include all rows for further evaluation.
# This ensures that the predicate is evaluated during row-level filtering rather than being eliminated
# at the file level, preserving the ability to apply more granular filtering later in the process.
if field.initial_default is None:
return AlwaysTrue()

# If the field has initial_default, use the default value as a constant and evaluate it against the predicate
pred: BooleanExpression
if isinstance(predicate, BoundUnaryPredicate):
pred = predicate.as_unbound(field.name)
Expand Down
202 changes: 202 additions & 0 deletions tests/expressions/test_visitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,15 @@
expression_to_plain_format,
rewrite_not,
rewrite_to_dnf,
translate_column_names,
visit,
visit_bound_predicate,
)
from pyiceberg.manifest import ManifestFile, PartitionFieldSummary
from pyiceberg.schema import Accessor, Schema
from pyiceberg.typedef import Record
from pyiceberg.types import (
BooleanType,
DoubleType,
FloatType,
IcebergType,
Expand Down Expand Up @@ -1623,3 +1625,203 @@ def test_expression_evaluator_null() -> None:
assert expression_evaluator(schema, LessThan("a", 1), case_sensitive=True)(struct) is False
assert expression_evaluator(schema, StartsWith("a", 1), case_sensitive=True)(struct) is False
assert expression_evaluator(schema, NotStartsWith("a", 1), case_sensitive=True)(struct) is True


def test_translate_column_names_simple_case(table_schema_simple: Schema) -> None:
"""Test translate_column_names with matching column names."""
# Create a bound expression using the original schema
unbound_expr = EqualTo("foo", "test_value")
bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=table_schema_simple, case_sensitive=True))

# File schema has the same column names
file_schema = Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
schema_id=1,
)

# Translate column names
translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True)

# Should return the same unbound expression since column names match
assert isinstance(translated_expr, EqualTo)
assert translated_expr.term == Reference("foo")
assert translated_expr.literal == literal("test_value")


def test_translate_column_names_different_column_names() -> None:
"""Test translate_column_names with different column names in file schema."""
# Original schema
original_schema = Schema(
NestedField(field_id=1, name="original_name", field_type=StringType(), required=False),
schema_id=1,
)

# Create bound expression
unbound_expr = EqualTo("original_name", "test_value")
bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True))

# File schema has different column name but same field ID
file_schema = Schema(
NestedField(field_id=1, name="file_column_name", field_type=StringType(), required=False),
schema_id=1,
)

# Translate column names
translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True)

# Should use the file schema's column name
assert isinstance(translated_expr, EqualTo)
assert translated_expr.term == Reference("file_column_name")
assert translated_expr.literal == literal("test_value")


def test_translate_column_names_missing_column() -> None:
"""Test translate_column_names when column is missing from file schema (schema evolution)."""
# Original schema
original_schema = Schema(
NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False, initial_default="default"),
NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=42),
schema_id=1,
)

# Create bound expression for the missing column
unbound_expr = EqualTo("missing_col", 42)
bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True))

# File schema only has the existing column (field_id=1), missing field_id=2
file_schema = Schema(
NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False),
schema_id=1,
)

# Translate column names
translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True)

# Should evaluate to AlwaysTrue because the default value (42) matches the literal (42)
assert translated_expr == AlwaysTrue()


def test_translate_column_names_missing_column_false_evaluation() -> None:
"""Test translate_column_names when missing column evaluates to false."""
# Original schema
original_schema = Schema(
NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=10),
schema_id=1,
)

# Create bound expression that won't match the default value
unbound_expr = EqualTo("missing_col", 42) # default is 10, literal is 42
bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True))

# File schema doesn't have this column
file_schema = Schema(
NestedField(field_id=1, name="other_col", field_type=StringType(), required=False),
schema_id=1,
)

# Translate column names
translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True)

# Should evaluate to AlwaysFalse because default value (10) doesn't match literal (42)
assert translated_expr == AlwaysFalse()


def test_translate_column_names_complex_expression() -> None:
"""Test translate_column_names with complex boolean expressions."""
# Original schema
original_schema = Schema(
NestedField(field_id=1, name="col1", field_type=StringType(), required=False),
NestedField(field_id=2, name="col2", field_type=IntegerType(), required=True),
schema_id=1,
)

# Create complex bound expression
unbound_expr = And(EqualTo("col1", "test"), GreaterThan("col2", 10))
bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True))

# File schema has different column names
file_schema = Schema(
NestedField(field_id=1, name="file_col1", field_type=StringType(), required=False),
NestedField(field_id=2, name="file_col2", field_type=IntegerType(), required=True),
schema_id=1,
)

# Translate column names
translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True)

# Should be an And expression with translated column names
assert isinstance(translated_expr, And)
assert isinstance(translated_expr.left, EqualTo)
assert translated_expr.left.term == Reference("file_col1")
assert isinstance(translated_expr.right, GreaterThan)
assert translated_expr.right.term == Reference("file_col2")


def test_translate_column_names_case_sensitive() -> None:
"""Test translate_column_names with case sensitivity."""
# Original schema
original_schema = Schema(
NestedField(field_id=1, name="TestColumn", field_type=StringType(), required=False),
schema_id=1,
)

# Create bound expression
unbound_expr = EqualTo("TestColumn", "value")
bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True))

# File schema has same field ID but different case
file_schema = Schema(
NestedField(field_id=1, name="testcolumn", field_type=StringType(), required=False),
schema_id=1,
)

# Translate with case sensitivity
translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True)

# Should use the file schema's column name (different case)
assert isinstance(translated_expr, EqualTo)
assert translated_expr.term == Reference("testcolumn")


def test_translate_column_names_always_true_false() -> None:
"""Test translate_column_names with AlwaysTrue and AlwaysFalse expressions."""
file_schema = Schema(
NestedField(field_id=1, name="col", field_type=StringType(), required=False),
schema_id=1,
)

# Test AlwaysTrue
translated_true = translate_column_names(AlwaysTrue(), file_schema, case_sensitive=True)
assert translated_true == AlwaysTrue()

# Test AlwaysFalse
translated_false = translate_column_names(AlwaysFalse(), file_schema, case_sensitive=True)
assert translated_false == AlwaysFalse()


def test_translate_column_names_column_projection_missing_field_no_initial_default() -> None:
"""Test translate_column_names for column projection when field is missing from file schema and has no initial_default."""
# Original schema with a field that has no initial_default (defaults to None)
original_schema = Schema(
NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False),
NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False), # No initial_default specified
schema_id=1,
)

# Create bound expression for the missing column
unbound_expr = EqualTo("missing_col", 42)
bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True))

# File schema only has the existing column (field_id=1), missing field_id=2
file_schema = Schema(
NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False),
schema_id=1,
)

# Translate column names
translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True)

# Should evaluate to AlwaysTrue when field has no initial_default, allowing for further evaluation
assert translated_expr == AlwaysTrue()
10 changes: 10 additions & 0 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,16 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa
},
schema=schema,
)
# Test that the partition value is projected correctly
assert table.scan(row_filter="partition_id = 1").to_arrow() == pa.table(
{
"other_field": ["foo", "bar", "baz"],
"partition_id": [1, 1, 1],
},
schema=schema,
)
# Test that the partition value is not projected for a non-existing partition
assert len(table.scan(row_filter="partition_id = -1").to_arrow()) == 0


def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryCatalog) -> None:
Expand Down
Loading