
Commit bdf19ab

fix: allow reading pyarrow timestamp as iceberg timestamptz (#2333)
# Rationale for this change

This PR fixes reading a pyarrow timestamp as the Iceberg timestamptz type. It mirrors the pyarrow logic for dealing with pyarrow timestamp types [here](https://github.com/apache/iceberg-python/blob/8b43eb88fcc80b4980ce16b71852d211d7e93b13/pyiceberg/io/pyarrow.py#L1330-L1353).

Two changes were made to `ArrowProjectionVisitor._cast_if_needed`:

1. Reorder the logic so that timestamps are handled first. Otherwise it tries to `promote()` timestamp to timestamptz and fails.
2. Allow casting when the pyarrow value has a `None` timezone. This is safe because we gate on the target type having a "UTC" timezone. It mirrors the Java logic for reading with a default UTC timezone ([1](https://github.com/apache/iceberg/blob/856cbf6eb8a85dee01c65ae6291274b700f76746/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java#L107-L116), [2](https://github.com/apache/iceberg/blob/856cbf6eb8a85dee01c65ae6291274b700f76746/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java#L35)).

### Context

I ran into an interesting edge case while testing metadata virtualization between Delta and Iceberg. Delta has both [TIMESTAMP and TIMESTAMP_NTZ data types](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-datatypes): TIMESTAMP has a timezone, while TIMESTAMP_NTZ does not. Iceberg has [timestamp and timestamptz](https://iceberg.apache.org/spec/#primitive-types): timestamp has no timezone, while timestamptz does. So Delta's TIMESTAMP maps to Iceberg timestamptz, and Delta's TIMESTAMP_NTZ maps to Iceberg timestamp. Regardless of Delta or Iceberg, the [Parquet file stores the timestamp without the timezone information](https://github.com/apache/parquet-format/blob/1dbc814b97c9307687a2e4bee55545ab6a2ef106/LogicalTypes.md#timestamp).

So I ended up with a Parquet file with a timestamp column and an Iceberg table with a timestamptz column, and pyiceberg could not read the table. It's hard to recreate the scenario, but I traced it to the `_to_requested_schema` function and added a unit test for this case. The issue is that `ArrowProjectionVisitor._cast_if_needed` tries to promote `timestamp` to `timestamptz`, which is not a valid promotion:

```
E pyiceberg.exceptions.ResolveError: Cannot promote timestamp to timestamptz
```

https://github.com/apache/iceberg-python/blob/640c592b705db01199020db8e5f2b6e2c67f06bf/pyiceberg/io/pyarrow.py#L1779-L1782

The `elif` case below it can handle this case:

https://github.com/apache/iceberg-python/blob/640c592b705db01199020db8e5f2b6e2c67f06bf/pyiceberg/io/pyarrow.py#L1800-L1806

So we just need to switch the order of execution. This was also an interesting read: https://arrow.apache.org/docs/python/timestamps.html

# Are these changes tested?

Yes. A unit test, `test__to_requested_schema_timestamp_to_timestamptz_projection`, covers the new projection.

# Are there any user-facing changes?

No API changes; tables whose timestamptz columns are backed by tz-naive Parquet timestamps are now readable.
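As background for change 2, here is a minimal pyarrow sketch (my illustration, not code from this PR) of the behavior the new branch relies on: casting a tz-naive timestamp array to a UTC-zoned type attaches the zone and interprets the stored wall-clock values as UTC, without shifting them.

```python
import pyarrow as pa
from datetime import datetime

# Tz-naive microsecond timestamps, as they typically come out of a Parquet file.
naive = pa.array([datetime(2025, 8, 14, 12, 0, 0)], type=pa.timestamp("us"))

# The cast keeps the underlying epoch values and interprets them as UTC.
utc = naive.cast(pa.timestamp("us", tz="UTC"))
print(utc.type)  # timestamp[us, tz=UTC]
```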
1 parent 5acca48 commit bdf19ab

File tree

2 files changed (+50, -9)

pyiceberg/io/pyarrow.py

Lines changed: 11 additions & 9 deletions
```diff
@@ -1788,14 +1788,7 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self._file_schema.find_field(field.field_id)
 
         if field.field_type.is_primitive:
-            if field.field_type != file_field.field_type:
-                target_schema = schema_to_pyarrow(
-                    promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
-                )
-                if self._use_large_types is False:
-                    target_schema = _pyarrow_schema_ensure_small_types(target_schema)
-                return values.cast(target_schema)
-            elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
+            if (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
                 if field.field_type == TimestampType():
                     # Downcasting of nanoseconds to microseconds
                     if (
@@ -1814,13 +1807,22 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
                         pa.types.is_timestamp(target_type)
                         and target_type.tz == "UTC"
                         and pa.types.is_timestamp(values.type)
-                        and values.type.tz in UTC_ALIASES
+                        and (values.type.tz in UTC_ALIASES or values.type.tz is None)
                     ):
                         if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
                             return values.cast(target_type, safe=False)
                         elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
                             return values.cast(target_type)
                     raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
+
+            if field.field_type != file_field.field_type:
+                target_schema = schema_to_pyarrow(
+                    promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
+                )
+                if self._use_large_types is False:
+                    target_schema = _pyarrow_schema_ensure_small_types(target_schema)
+                return values.cast(target_schema)
+
         return values
 
     def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field:
```
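For reference, a minimal sketch of the promotion failure that made the old ordering a problem, assuming `promote` is importable from `pyiceberg.schema` (as `pyiceberg/io/pyarrow.py` does):

```python
from pyiceberg.schema import promote
from pyiceberg.types import TimestampType, TimestamptzType

# With the old ordering, _cast_if_needed hit this promotion before the
# timestamptz-specific branch could run, raising:
#   pyiceberg.exceptions.ResolveError: Cannot promote timestamp to timestamptz
promote(TimestampType(), TimestamptzType())
```

Moving the `promote()` block after the timestamp handling means the promotion is only attempted once the timestamp-specific casts have had a chance to return.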

tests/io/test_pyarrow.py

Lines changed: 39 additions & 0 deletions
```diff
@@ -2550,6 +2550,45 @@ def test_initial_value() -> None:
     assert val.as_py() == 22
 
 
+def test__to_requested_schema_timestamp_to_timestamptz_projection() -> None:
+    from datetime import datetime, timezone
+
+    # file is written with timestamp without timezone
+    file_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
+    batch = pa.record_batch(
+        [
+            pa.array(
+                [
+                    datetime(2025, 8, 14, 12, 0, 0),
+                    datetime(2025, 8, 14, 13, 0, 0),
+                ],
+                type=pa.timestamp("us"),
+            )
+        ],
+        names=["ts_field"],
+    )
+
+    # table is written with timestamp with timezone
+    table_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
+
+    actual_result = _to_requested_schema(table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True)
+    expected = pa.record_batch(
+        [
+            pa.array(
+                [
+                    datetime(2025, 8, 14, 12, 0, 0),
+                    datetime(2025, 8, 14, 13, 0, 0),
+                ],
+                type=pa.timestamp("us", tz=timezone.utc),
+            )
+        ],
+        names=["ts_field"],
+    )
+
+    # expect actual_result to have timezone
+    assert expected.equals(actual_result)
+
+
 def test__to_requested_schema_timestamps(
     arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
     arrow_table_with_all_timestamp_precisions: pa.Table,
```
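A note on why the test's expected batch can be built from the same naive `datetime` values: reinterpreting naive microsecond timestamps as UTC yields the same underlying epoch values as constructing the array from tz-aware UTC datetimes. A small standalone check (my illustration, not part of the PR):

```python
import pyarrow as pa
from datetime import datetime, timezone

naive = pa.array([datetime(2025, 8, 14, 12, 0, 0)], type=pa.timestamp("us"))
aware = pa.array(
    [datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc)],
    type=pa.timestamp("us", tz="UTC"),
)

# Interpreting the naive values as UTC gives an identical array.
assert naive.cast(pa.timestamp("us", tz="UTC")).equals(aware)
```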
