
Commit 1653c7c

Add an additional test

1 parent 4860bb8 commit 1653c7c

File tree

3 files changed: +48 −6 lines changed

pyiceberg/expressions/visitors.py

Lines changed: 18 additions & 5 deletions
@@ -893,15 +893,28 @@ def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpr
         raise TypeError(f"Expected Bound Predicate, got: {predicate.term}")

     def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression:
-        file_column_name = self.file_schema.find_column_name(predicate.term.ref().field.field_id)
+        field = predicate.term.ref().field
+        file_column_name = self.file_schema.find_column_name(field.field_id)

         if file_column_name is None:
             # In the case of schema evolution, the column might not be present
-            # in the file schema when reading older data
-            if isinstance(predicate, BoundIsNull):
-                return AlwaysTrue()
+            # we can use the default value as a constant and evaluate it against
+            # the predicate
+            pred: BooleanExpression
+            if isinstance(predicate, BoundUnaryPredicate):
+                pred = predicate.as_unbound(field.name)
+            elif isinstance(predicate, BoundLiteralPredicate):
+                pred = predicate.as_unbound(field.name, predicate.literal)
+            elif isinstance(predicate, BoundSetPredicate):
+                pred = predicate.as_unbound(field.name, predicate.literals)
             else:
-                return AlwaysFalse()
+                raise ValueError(f"Unsupported predicate: {predicate}")
+
+            return (
+                AlwaysTrue()
+                if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)(Record(field.initial_default))
+                else AlwaysFalse()
+            )

         if isinstance(predicate, BoundUnaryPredicate):
             return predicate.as_unbound(file_column_name)
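For intuition, here is a minimal sketch of the constant-folding this branch performs, reusing the names from the diff (expression_evaluator, Schema, Record); the so_true field and its default are hypothetical, and this snippet is not part of the commit:

```python
# Sketch: a predicate on a column missing from the data file is evaluated once
# against the field's initial default, folding it to AlwaysTrue or AlwaysFalse.
from pyiceberg.expressions import EqualTo
from pyiceberg.expressions.visitors import expression_evaluator
from pyiceberg.schema import Schema
from pyiceberg.typedef import Record
from pyiceberg.types import BooleanType, NestedField

# Hypothetical field added after the file was written, with a default of True.
field = NestedField(field_id=2, name="so_true", field_type=BooleanType(), required=False, initial_default=True)

# Evaluate `so_true == True` against a one-row record holding the default.
evaluator = expression_evaluator(Schema(field), EqualTo("so_true", True), case_sensitive=True)
print(evaluator(Record(True)))  # True -> the predicate folds to AlwaysTrue()
```

Since every row in an old file carries the same default for the missing column, a single evaluation decides the predicate for the entire file.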

pyiceberg/io/pyarrow.py

Lines changed: 2 additions & 1 deletion
@@ -1820,6 +1820,7 @@ def struct(
                     field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
                 else:
                     field_arrays.append(pa.repeat(field.initial_default, len(struct_array)))
+                fields.append(self._construct_field(field, arrow_type))
             else:
                 raise ResolveError(f"Field is required, and could not be found in the file: {field}")

@@ -2251,7 +2252,7 @@ def parquet_path_to_id_mapping(
     Compute the mapping of parquet column path to Iceberg ID.

     For each column, the parquet file metadata has a path_in_schema attribute that follows
-    a specific naming scheme for nested columnds. This function computes a mapping of
+    a specific naming scheme for nested columns. This function computes a mapping of
     the full paths to the corresponding Iceberg IDs.

     Args:
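The added fields.append line keeps the fields list in lockstep with field_arrays. A small standalone sketch of why that pairing matters when assembling the struct (column names and values here are made up):

```python
# Sketch: pa.StructArray.from_arrays pairs each child array with a field, so a
# repeated-default array needs a matching pa.field appended alongside it.
import pyarrow as pa

arrays = [pa.array([1, 2, 3], type=pa.int32())]
fields = [pa.field("some_field", pa.int32(), nullable=True)]

# Fill a newly added column with its initial default, as the visitor does with
# pa.repeat(field.initial_default, len(struct_array)).
arrays.append(pa.repeat(True, 3))
fields.append(pa.field("so_true", pa.bool_(), nullable=True))

struct = pa.StructArray.from_arrays(arrays, fields=fields)
print(struct.type)  # struct<some_field: int32, so_true: bool>
```

Without the matching pa.field, the repeated-default child array would have no schema entry to pair with in from_arrays.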

tests/integration/test_reads.py

Lines changed: 28 additions & 0 deletions
@@ -29,6 +29,7 @@
 from hive_metastore.ttypes import LockRequest, LockResponse, LockState, UnlockRequest
 from pyarrow.fs import S3FileSystem
 from pydantic_core import ValidationError
+from pyspark.sql import SparkSession

 from pyiceberg.catalog import Catalog
 from pyiceberg.catalog.hive import HiveCatalog, _HiveClient
@@ -1024,3 +1025,30 @@ def test_scan_with_datetime(catalog: Catalog) -> None:

     df = table.scan(row_filter=LessThan("datetime", yesterday)).to_pandas()
     assert len(df) == 0
+
+
+@pytest.mark.integration
+# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
+def test_initial_default(catalog: Catalog, spark: SparkSession) -> None:
+    identifier = "default.test_initial_default"
+    try:
+        catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    one_column = pa.table([pa.nulls(10, pa.int32())], ["some_field"])
+
+    tbl = catalog.create_table(identifier, schema=one_column.schema, properties={"format-version": "2"})
+
+    tbl.append(one_column)
+
+    # Do the bump version through Spark, since PyIceberg does not support this (yet)
+    spark.sql(f"ALTER TABLE {identifier} SET TBLPROPERTIES('format-version'='3')")
+
+    with tbl.update_schema() as upd:
+        upd.add_column("so_true", BooleanType(), required=False, default_value=True)
+
+    result_table = tbl.scan().filter("so_true == True").to_arrow()
+
+    assert len(result_table) == 10
