diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index cd5a957b22..6fa0286282 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -16,9 +16,21 @@ # under the License. from __future__ import annotations +import uuid from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import date, datetime from functools import cached_property, singledispatch -from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar +from typing import ( + Any, + Dict, + Generic, + List, + Optional, + Tuple, + TypeVar, +) +from urllib.parse import quote from pydantic import ( BeforeValidator, @@ -41,8 +53,18 @@ YearTransform, parse_transform, ) -from pyiceberg.typedef import IcebergBaseModel -from pyiceberg.types import NestedField, StructType +from pyiceberg.typedef import IcebergBaseModel, Record +from pyiceberg.types import ( + DateType, + IcebergType, + NestedField, + PrimitiveType, + StructType, + TimestampType, + TimestamptzType, + UUIDType, +) +from pyiceberg.utils.datetime import date_to_days, datetime_to_micros INITIAL_PARTITION_SPEC_ID = 0 PARTITION_FIELD_ID_START: int = 1000 @@ -199,6 +221,23 @@ def partition_type(self, schema: Schema) -> StructType: nested_fields.append(NestedField(field.field_id, field.name, result_type, required=False)) return StructType(*nested_fields) + def partition_to_path(self, data: Record, schema: Schema) -> str: + partition_type = self.partition_type(schema) + field_types = partition_type.fields + + field_strs = [] + value_strs = [] + for pos, value in enumerate(data.record_fields()): + partition_field = self.fields[pos] + value_str = partition_field.transform.to_human_string(field_types[pos].field_type, value=value) + + value_str = quote(value_str, safe='') + value_strs.append(value_str) + field_strs.append(partition_field.name) + + path = "/".join([field_str + "=" + value_str for field_str, value_str in zip(field_strs, value_strs)]) + return path + 
UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0) @@ -326,3 +365,59 @@ def _visit_partition_field(schema: Schema, field: PartitionField, visitor: Parti return visitor.unknown(field.field_id, source_name, field.source_id, repr(transform)) else: raise ValueError(f"Unknown transform {transform}") + + +@dataclass(frozen=True) +class PartitionFieldValue: + field: PartitionField + value: Any + + +@dataclass(frozen=True) +class PartitionKey: + raw_partition_field_values: List[PartitionFieldValue] + partition_spec: PartitionSpec + schema: Schema + + @cached_property + def partition(self) -> Record: # partition key transformed with iceberg internal representation as input + iceberg_typed_key_values = {} + for raw_partition_field_value in self.raw_partition_field_values: + partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id] + if len(partition_fields) != 1: + raise ValueError("partition_fields must contain exactly one field.") + partition_field = partition_fields[0] + iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type + iceberg_typed_value = _to_partition_representation(iceberg_type, raw_partition_field_value.value) + transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value) + iceberg_typed_key_values[partition_field.name] = transformed_value + return Record(**iceberg_typed_key_values) + + def to_path(self) -> str: + return self.partition_spec.partition_to_path(self.partition, self.schema) + + +@singledispatch +def _to_partition_representation(type: IcebergType, value: Any) -> Any: + raise TypeError(f"Unsupported partition field type: {type}") + + +@_to_partition_representation.register(TimestampType) +@_to_partition_representation.register(TimestamptzType) +def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]: + return datetime_to_micros(value) if value is not None else None + + 
+@_to_partition_representation.register(DateType) +def _(type: IcebergType, value: Optional[date]) -> Optional[int]: + return date_to_days(value) if value is not None else None + + +@_to_partition_representation.register(UUIDType) +def _(type: IcebergType, value: Optional[uuid.UUID]) -> Optional[str]: + return str(value) if value is not None else None + + +@_to_partition_representation.register(PrimitiveType) +def _(type: IcebergType, value: Optional[Any]) -> Optional[Any]: + return value diff --git a/pyiceberg/transforms.py b/pyiceberg/transforms.py index 9f499a3dd4..e678a77e69 100644 --- a/pyiceberg/transforms.py +++ b/pyiceberg/transforms.py @@ -655,6 +655,11 @@ def _(value: int, _type: IcebergType) -> str: return _int_to_human_string(_type, value) +@_human_string.register(bool) +def _(value: bool, _type: IcebergType) -> str: + return str(value).lower() + + @singledispatch def _int_to_human_string(_type: IcebergType, value: int) -> str: return str(value) diff --git a/tests/conftest.py b/tests/conftest.py index aa66f36046..5b488c70f0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -46,9 +46,10 @@ import boto3 import pytest from moto import mock_aws +from pyspark.sql import SparkSession from pyiceberg import schema -from pyiceberg.catalog import Catalog +from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.catalog.noop import NoopCatalog from pyiceberg.expressions import BoundReference from pyiceberg.io import ( @@ -1925,3 +1926,51 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table: @pytest.fixture def bound_reference_str() -> BoundReference[str]: return BoundReference(field=NestedField(1, "field", StringType(), required=False), accessor=Accessor(position=0, inner=None)) + + +@pytest.fixture(scope="session") +def session_catalog() -> Catalog: + return load_catalog( + "local", + **{ + "type": "rest", + "uri": "http://localhost:8181", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + 
"s3.secret-access-key": "password", + }, + ) + + +@pytest.fixture(scope="session") +def spark() -> SparkSession: + import importlib.metadata + import os + + spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) + scala_version = "2.12" + iceberg_version = "1.4.3" + + os.environ["PYSPARK_SUBMIT_ARGS"] = ( + f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," + f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell" + ) + os.environ["AWS_REGION"] = "us-east-1" + os.environ["AWS_ACCESS_KEY_ID"] = "admin" + os.environ["AWS_SECRET_ACCESS_KEY"] = "password" + + spark = ( + SparkSession.builder.appName("PyIceberg integration test") + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") + .config("spark.sql.catalog.integration.uri", "http://localhost:8181") + .config("spark.sql.catalog.integration.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") + .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/") + .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000") + .config("spark.sql.catalog.integration.s3.path-style-access", "true") + .config("spark.sql.defaultCatalog", "integration") + .getOrCreate() + ) + + return spark diff --git a/tests/integration/test_partitioning_key.py b/tests/integration/test_partitioning_key.py new file mode 100644 index 0000000000..12056bac1e --- /dev/null +++ b/tests/integration/test_partitioning_key.py @@ -0,0 +1,772 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name +import uuid +from datetime import date, datetime, timedelta, timezone +from decimal import Decimal +from typing import Any, List + +import pytest +from pyspark.sql import SparkSession +from pyspark.sql.utils import AnalysisException + +from pyiceberg.catalog import Catalog +from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.transforms import ( + BucketTransform, + DayTransform, + HourTransform, + IdentityTransform, + MonthTransform, + TruncateTransform, + YearTransform, +) +from pyiceberg.typedef import Record +from pyiceberg.types import ( + BinaryType, + BooleanType, + DateType, + DecimalType, + DoubleType, + FixedType, + FloatType, + IntegerType, + LongType, + NestedField, + StringType, + TimestampType, + TimestamptzType, + UUIDType, +) + +TABLE_SCHEMA = Schema( + NestedField(field_id=1, name="boolean_field", field_type=BooleanType(), required=False), + NestedField(field_id=2, name="string_field", field_type=StringType(), required=False), + NestedField(field_id=3, name="string_long_field", field_type=StringType(), required=False), + NestedField(field_id=4, name="int_field", field_type=IntegerType(), required=False), + NestedField(field_id=5, name="long_field", field_type=LongType(), required=False), + NestedField(field_id=6, name="float_field", 
field_type=FloatType(), required=False), + NestedField(field_id=7, name="double_field", field_type=DoubleType(), required=False), + NestedField(field_id=8, name="timestamp_field", field_type=TimestampType(), required=False), + NestedField(field_id=9, name="timestamptz_field", field_type=TimestamptzType(), required=False), + NestedField(field_id=10, name="date_field", field_type=DateType(), required=False), + # NestedField(field_id=11, name="time", field_type=TimeType(), required=False), + NestedField(field_id=11, name="binary_field", field_type=BinaryType(), required=False), + NestedField(field_id=12, name="fixed_field", field_type=FixedType(16), required=False), + NestedField(field_id=13, name="decimal_field", field_type=DecimalType(5, 2), required=False), + NestedField(field_id=14, name="uuid_field", field_type=UUIDType(), required=False), +) + + +identifier = "default.test_table" + + +@pytest.mark.parametrize( + "partition_fields, partition_values, expected_partition_record, expected_hive_partition_path_slice, spark_create_table_sql_for_justification, spark_data_insert_sql_for_justification", + [ + # # Identity Transform + ( + [PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="boolean_field")], + [False], + Record(boolean_field=False), + "boolean_field=false", + f"""CREATE TABLE {identifier} ( + boolean_field boolean, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(boolean_field) -- Partitioning by 'boolean_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (false, 'Boolean field set to false'); + """, + ), + ( + [PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], + ["sample_string"], + Record(string_field="sample_string"), + "string_field=sample_string", + f"""CREATE TABLE {identifier} ( + string_field string, + another_string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(string_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + 
('sample_string', 'Another string value') + """, + ), + ( + [PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int_field")], + [42], + Record(int_field=42), + "int_field=42", + f"""CREATE TABLE {identifier} ( + int_field int, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(int_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (42, 'Associated string value for int 42') + """, + ), + ( + [PartitionField(source_id=5, field_id=1001, transform=IdentityTransform(), name="long_field")], + [1234567890123456789], + Record(long_field=1234567890123456789), + "long_field=1234567890123456789", + f"""CREATE TABLE {identifier} ( + long_field bigint, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(long_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (1234567890123456789, 'Associated string value for long 1234567890123456789') + """, + ), + ( + [PartitionField(source_id=6, field_id=1001, transform=IdentityTransform(), name="float_field")], + [3.14], + Record(float_field=3.14), + "float_field=3.14", + # spark writes differently as pyiceberg, Record[float_field=3.140000104904175], path:float_field=3.14 (Record has difference) + # so justification (compare expected value with spark behavior) would fail. + None, + None, + # f"""CREATE TABLE {identifier} ( + # float_field float, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(float_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (3.14, 'Associated string value for float 3.14') + # """ + ), + ( + [PartitionField(source_id=7, field_id=1001, transform=IdentityTransform(), name="double_field")], + [6.282], + Record(double_field=6.282), + "double_field=6.282", + # spark writes differently as pyiceberg, Record[double_field=6.2820000648498535] path:double_field=6.282 (Record has difference) + # so justification (compare expected value with spark behavior) would fail. 
+ None, + None, + # f"""CREATE TABLE {identifier} ( + # double_field double, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(double_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (6.282, 'Associated string value for double 6.282') + # """ + ), + ( + [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + [datetime(2023, 1, 1, 12, 0, 1, 999)], + Record(timestamp_field=1672574401000999), + "timestamp_field=2023-01-01T12%3A00%3A01.000999", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp_ntz, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(timestamp_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 12:00:01.000999' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') + """, + ), + ( + [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + [datetime(2023, 1, 1, 12, 0, 1)], + Record(timestamp_field=1672574401000000), + "timestamp_field=2023-01-01T12%3A00%3A01", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp_ntz, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(timestamp_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 12:00:01' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') + """, + ), + ( + [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + [datetime(2023, 1, 1, 12, 0, 0)], + Record(timestamp_field=1672574400000000), + "timestamp_field=2023-01-01T12%3A00%3A00", + # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail + # AssertionError: assert 'timestamp_field=2023-01-01T12%3A00%3A00' in 's3://warehouse/default/test_table/data/timestamp_field=2023-01-01T12%3A00/00000-5-f9dca69a-9fb7-4830-9ef6-62d3d7afc09e-00001.parquet' + 
# TLDR: CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ) becomes 2023-01-01T12:00 in the hive partition path when spark writes it (without the seconds). + None, + None, + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp_ntz, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(timestamp_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') + # """ + ), + ( + [PartitionField(source_id=9, field_id=1001, transform=IdentityTransform(), name="timestamptz_field")], + [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + Record(timestamptz_field=1672563601000999), + "timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00", + # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail + # AssertionError: assert 'timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00' in 's3://warehouse/default/test_table/data/timestamptz_field=2023-01-01T09%3A00%3A01.000999Z/00000-5-b710fc4d-66b6-47f1-b8ae-6208f8aaa2d4-00001.parquet' + # TLDR: CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP) becomes 2023-01-01T09:00:01.000999Z in the hive partition path when spark writes it (while iceberg: timestamptz_field=2023-01-01T09:00:01.000999+00:00). 
+ None, + None, + # f"""CREATE TABLE {identifier} ( + # timestamptz_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(timestamptz_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Associated string value for timestamp 2023-01-01 12:00:01.000999+03:00') + # """ + ), + ( + [PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="date_field")], + [date(2023, 1, 1)], + Record(date_field=19358), + "date_field=2023-01-01", + f"""CREATE TABLE {identifier} ( + date_field date, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(date_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01' AS DATE), 'Associated string value for date 2023-01-01') + """, + ), + ( + [PartitionField(source_id=14, field_id=1001, transform=IdentityTransform(), name="uuid_field")], + [uuid.UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")], + Record(uuid_field="f47ac10b-58cc-4372-a567-0e02b2c3d479"), + "uuid_field=f47ac10b-58cc-4372-a567-0e02b2c3d479", + f"""CREATE TABLE {identifier} ( + uuid_field string, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(uuid_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + ('f47ac10b-58cc-4372-a567-0e02b2c3d479', 'Associated string value for UUID f47ac10b-58cc-4372-a567-0e02b2c3d479') + """, + ), + ( + [PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], + [b'example'], + Record(binary_field=b'example'), + "binary_field=ZXhhbXBsZQ%3D%3D", + f"""CREATE TABLE {identifier} ( + binary_field binary, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(binary_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('example' AS BINARY), 'Associated string value for binary `example`') + """, + ), + ( + [PartitionField(source_id=13, field_id=1001, transform=IdentityTransform(), 
name="decimal_field")], + [Decimal('123.45')], + Record(decimal_field=Decimal('123.45')), + "decimal_field=123.45", + f"""CREATE TABLE {identifier} ( + decimal_field decimal(5,2), + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(decimal_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (123.45, 'Associated string value for decimal 123.45') + """, + ), + # # Year Month Day Hour Transform + # Month Transform + ( + [PartitionField(source_id=8, field_id=1001, transform=MonthTransform(), name="timestamp_field_month")], + [datetime(2023, 1, 1, 11, 55, 59, 999999)], + Record(timestamp_field_month=((2023 - 1970) * 12)), + "timestamp_field_month=2023-01", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp_ntz, + string_field string + ) + USING iceberg + PARTITIONED BY ( + month(timestamp_field) -- Partitioning by month from 'timestamp_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP_NTZ), 'Event at 2023-01-01 11:55:59.999999'); + """, + ), + ( + [PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_field_month")], + [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + Record(timestamptz_field_month=((2023 - 1970) * 12 + 1 - 1)), + "timestamptz_field_month=2023-01", + f"""CREATE TABLE {identifier} ( + timestamptz_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + month(timestamptz_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + """, + ), + ( + [PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], + [date(2023, 1, 1)], + Record(date_field_month=((2023 - 1970) * 12)), + "date_field_month=2023-01", + f"""CREATE TABLE {identifier} ( + date_field date, + string_field string + ) + USING iceberg + PARTITIONED BY ( + month(date_field) 
-- Partitioning by month from 'date_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + """, + ), + # Year Transform + ( + [PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year")], + [datetime(2023, 1, 1, 11, 55, 59, 999999)], + Record(timestamp_field_year=(2023 - 1970)), + "timestamp_field_year=2023", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + year(timestamp_field) -- Partitioning by year from 'timestamp_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); + """, + ), + ( + [PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], + [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + Record(timestamptz_field_year=53), + "timestamptz_field_year=2023", + f"""CREATE TABLE {identifier} ( + timestamptz_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + year(timestamptz_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + """, + ), + ( + [PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], + [date(2023, 1, 1)], + Record(date_field_year=(2023 - 1970)), + "date_field_year=2023", + f"""CREATE TABLE {identifier} ( + date_field date, + string_field string + ) + USING iceberg + PARTITIONED BY ( + year(date_field) -- Partitioning by year from 'date_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + """, + ), + # # Day Transform + ( + [PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_field_day")], + [datetime(2023, 1, 1, 11, 55, 59, 999999)], + 
Record(timestamp_field_day=19358), + "timestamp_field_day=2023-01-01", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + day(timestamp_field) -- Partitioning by day from 'timestamp_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + """, + ), + ( + [PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], + [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + Record(timestamptz_field_day=19358), + "timestamptz_field_day=2023-01-01", + f"""CREATE TABLE {identifier} ( + timestamptz_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + day(timestamptz_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + """, + ), + ( + [PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], + [date(2023, 1, 1)], + Record(date_field_day=19358), + "date_field_day=2023-01-01", + f"""CREATE TABLE {identifier} ( + date_field date, + string_field string + ) + USING iceberg + PARTITIONED BY ( + day(date_field) -- Partitioning by day from 'date_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + """, + ), + # Hour Transform + ( + [PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_field_hour")], + [datetime(2023, 1, 1, 11, 55, 59, 999999)], + Record(timestamp_field_hour=464603), + "timestamp_field_hour=2023-01-01-11", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + hour(timestamp_field) -- Partitioning by hour from 'timestamp_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event 
within the 11th hour of 2023-01-01'); + """, + ), + ( + [PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], + [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + Record(timestamptz_field_hour=464601), + "timestamptz_field_hour=2023-01-01-09", + f"""CREATE TABLE {identifier} ( + timestamptz_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + hour(timestamptz_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + """, + ), + # Truncate Transform + ( + [PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(10), name="int_field_trunc")], + [12345], + Record(int_field_trunc=12340), + "int_field_trunc=12340", + f"""CREATE TABLE {identifier} ( + int_field int, + string_field string + ) + USING iceberg + PARTITIONED BY ( + truncate(int_field, 10) -- Truncating 'int_field' integer column to a width of 10 + ) + """, + f"""INSERT INTO {identifier} + VALUES + (12345, 'Sample data for int'); + """, + ), + ( + [PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="bigint_field_trunc")], + [2**32 + 1], + Record(bigint_field_trunc=2**32), # 4294967296 + "bigint_field_trunc=4294967296", + f"""CREATE TABLE {identifier} ( + bigint_field bigint, + string_field string + ) + USING iceberg + PARTITIONED BY ( + truncate(bigint_field, 2) -- Truncating 'bigint_field' long column to a width of 2 + ) + """, + f"""INSERT INTO {identifier} + VALUES + (4294967297, 'Sample data for long'); + """, + ), + ( + [PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(3), name="string_field_trunc")], + ["abcdefg"], + Record(string_field_trunc="abc"), + "string_field_trunc=abc", + f"""CREATE TABLE {identifier} ( + string_field string, + another_string_field string + ) + USING iceberg + PARTITIONED BY ( + truncate(string_field, 3) -- 
Truncating 'string_field' string column to a length of 3 characters + ) + """, + f"""INSERT INTO {identifier} + VALUES + ('abcdefg', 'Another sample for string'); + """, + ), + ( + [PartitionField(source_id=13, field_id=1001, transform=TruncateTransform(width=5), name="decimal_field_trunc")], + [Decimal('678.93')], + Record(decimal_field_trunc=Decimal('678.90')), + "decimal_field_trunc=678.90", # Assuming truncation width of 1 leads to truncating to 670 + f"""CREATE TABLE {identifier} ( + decimal_field decimal(5,2), + string_field string + ) + USING iceberg + PARTITIONED BY ( + truncate(decimal_field, 2) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (678.90, 'Associated string value for decimal 678.90') + """, + ), + ( + [PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], + [b'HELLOICEBERG'], + Record(binary_field_trunc=b'HELLOICEBE'), + "binary_field_trunc=SEVMTE9JQ0VCRQ%3D%3D", + f"""CREATE TABLE {identifier} ( + binary_field binary, + string_field string + ) + USING iceberg + PARTITIONED BY ( + truncate(binary_field, 10) -- Truncating 'binary_field' binary column to a length of 10 bytes + ) + """, + f"""INSERT INTO {identifier} + VALUES + (binary('HELLOICEBERG'), 'Sample data for binary'); + """, + ), + # Bucket Transform + ( + [PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_field_bucket")], + [10], + Record(int_field_bucket=0), + "int_field_bucket=0", + f"""CREATE TABLE {identifier} ( + int_field int, + string_field string + ) + USING iceberg + PARTITIONED BY ( + bucket(2, int_field) -- Distributing 'int_field' across 2 buckets + ) + """, + f"""INSERT INTO {identifier} + VALUES + (10, 'Integer with value 10'); + """, + ), + # Test multiple field combinations could generate the Partition record and hive partition path correctly + ( + [ + PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year"), + PartitionField(source_id=10, 
field_id=1002, transform=DayTransform(), name="date_field_day"), + ], + [ + datetime(2023, 1, 1, 11, 55, 59, 999999), + date(2023, 1, 1), + ], + Record(timestamp_field_year=53, date_field_day=19358), + "timestamp_field_year=2023/date_field_day=2023-01-01", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp, + date_field date, + string_field string + ) + USING iceberg + PARTITIONED BY ( + year(timestamp_field), + day(date_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), CAST('2023-01-01' AS DATE), 'some data'); + """, + ), + ], +) +@pytest.mark.integration +def test_partition_key( + session_catalog: Catalog, + spark: SparkSession, + partition_fields: List[PartitionField], + partition_values: List[Any], + expected_partition_record: Record, + expected_hive_partition_path_slice: str, + spark_create_table_sql_for_justification: str, + spark_data_insert_sql_for_justification: str, +) -> None: + partition_field_values = [PartitionFieldValue(field, value) for field, value in zip(partition_fields, partition_values)] + spec = PartitionSpec(*partition_fields) + + key = PartitionKey( + raw_partition_field_values=partition_field_values, + partition_spec=spec, + schema=TABLE_SCHEMA, + ) + + # key.partition is used to write the metadata in DataFile, ManifestFile and all above layers + assert key.partition == expected_partition_record + # key.to_path() generates the hive partitioning part of the to-write parquet file path + assert key.to_path() == expected_hive_partition_path_slice + + # Justify expected values are not made up but conform to spark behaviors + if spark_create_table_sql_for_justification is not None and spark_data_insert_sql_for_justification is not None: + try: + spark.sql(f"drop table {identifier}") + except AnalysisException: + pass + + spark.sql(spark_create_table_sql_for_justification) + spark.sql(spark_data_insert_sql_for_justification) + + iceberg_table = 
session_catalog.load_table(identifier=identifier) + snapshot = iceberg_table.current_snapshot() + assert snapshot + spark_partition_for_justification = ( + snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.partition + ) + spark_path_for_justification = ( + snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path + ) + assert spark_partition_for_justification == expected_partition_record + assert expected_hive_partition_path_slice in spark_path_for_justification diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index fa5e93d925..62075dfa60 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -115,20 +115,6 @@ def catalog() -> Catalog: ) -@pytest.fixture(scope="session") -def session_catalog() -> Catalog: - return load_catalog( - "local", - **{ - "type": "rest", - "uri": "http://localhost:8181", - "s3.endpoint": "http://localhost:9000", - "s3.access-key-id": "admin", - "s3.secret-access-key": "password", - }, - ) - - @pytest.fixture(scope="session") def pa_schema() -> pa.Schema: return pa.schema([ @@ -252,40 +238,6 @@ def table_v1_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_nu assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" -@pytest.fixture(scope="session") -def spark() -> SparkSession: - import importlib.metadata - import os - - spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) - scala_version = "2.12" - iceberg_version = "1.4.3" - - os.environ["PYSPARK_SUBMIT_ARGS"] = ( - f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," - f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell" - ) - os.environ["AWS_REGION"] = "us-east-1" - os.environ["AWS_ACCESS_KEY_ID"] = "admin" - os.environ["AWS_SECRET_ACCESS_KEY"] = "password" - - spark = ( - 
SparkSession.builder.appName("PyIceberg integration test") - .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") - .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog") - .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") - .config("spark.sql.catalog.integration.uri", "http://localhost:8181") - .config("spark.sql.catalog.integration.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") - .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/") - .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000") - .config("spark.sql.catalog.integration.s3.path-style-access", "true") - .config("spark.sql.defaultCatalog", "integration") - .getOrCreate() - ) - - return spark - - @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_query_count(spark: SparkSession, format_version: int) -> None: