From e380cfa9082fcdd2d4eaf66f65248b7cce1ff800 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Fri, 16 Feb 2024 19:57:54 +0000 Subject: [PATCH 1/6] PartitionKey Class And Tests --- pyiceberg/partitioning.py | 90 ++- tests/integration/test_partitioning_key.py | 683 +++++++++++++++++++++ 2 files changed, 770 insertions(+), 3 deletions(-) create mode 100644 tests/integration/test_partitioning_key.py diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index cd5a957b22..a14c9f8c99 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -17,8 +17,18 @@ from __future__ import annotations from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import date, datetime from functools import cached_property, singledispatch -from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar +from typing import ( + Any, + Dict, + Generic, + List, + Optional, + Tuple, + TypeVar, +) from pydantic import ( BeforeValidator, @@ -41,8 +51,9 @@ YearTransform, parse_transform, ) -from pyiceberg.typedef import IcebergBaseModel -from pyiceberg.types import NestedField, StructType +from pyiceberg.typedef import IcebergBaseModel, Record +from pyiceberg.types import DateType, IcebergType, NestedField, PrimitiveType, StructType, TimestampType, TimestamptzType +from pyiceberg.utils.datetime import date_to_days, datetime_to_micros INITIAL_PARTITION_SPEC_ID = 0 PARTITION_FIELD_ID_START: int = 1000 @@ -199,6 +210,28 @@ def partition_type(self, schema: Schema) -> StructType: nested_fields.append(NestedField(field.field_id, field.name, result_type, required=False)) return StructType(*nested_fields) + def partition_to_path(self, data: Record, schema: Schema) -> str: + partition_type = self.partition_type(schema) + field_types = partition_type.fields + + pos = 0 + field_strs = [] + value_strs = [] + for field_name in data._position_to_field_name: + value = getattr(data, field_name) + + partition_field = self.fields[pos] # partition field + value_str = partition_field.transform.to_human_string(field_types[pos].field_type, value=value) + from urllib.parse import quote + + value_str = quote(value_str, safe='') + value_strs.append(value_str) + field_strs.append(partition_field.name) + pos += 1 + + path = "/".join([field_str + "=" + value_str for field_str, value_str in zip(field_strs, value_strs)]) + return path + UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0) @@ -326,3 +359,54 @@ def _visit_partition_field(schema: Schema, field: PartitionField, visitor: Parti return visitor.unknown(field.field_id, source_name, field.source_id, repr(transform)) else: raise ValueError(f"Unknown transform {transform}") + + +@dataclass(frozen=True) +class PartitionFieldValue: + field: PartitionField + value: Any + + +@dataclass(frozen=True) +class PartitionKey: + raw_partition_field_values: list[PartitionFieldValue] + partition_spec: PartitionSpec + schema: Schema + from functools import cached_property + + @cached_property + def partition(self) -> Record: # partition key in iceberg type + iceberg_typed_key_values = {} + for raw_partition_field_value in self.raw_partition_field_values: + partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id] + assert len(partition_fields) == 1 + partition_field = partition_fields[0] + iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type + _iceberg_typed_value = iceberg_typed_value(iceberg_type, raw_partition_field_value.value) + transformed_value = partition_field.transform.transform(iceberg_type)(_iceberg_typed_value) + iceberg_typed_key_values[partition_field.name] = transformed_value + return Record(**iceberg_typed_key_values) + + def to_path(self) -> str: + return self.partition_spec.partition_to_path(self.partition, self.schema) + + +@singledispatch +def iceberg_typed_value(type: IcebergType, value: Any) -> Any: + return TypeError(f"Unsupported partition field type: {type}") + + +@iceberg_typed_value.register(TimestampType) +@iceberg_typed_value.register(TimestamptzType) +def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]: + return datetime_to_micros(value) if value is not None else None + + +@iceberg_typed_value.register(DateType) +def _(type: IcebergType, value: Optional[date]) -> Optional[int]: + return date_to_days(value) if value is not None else None + + +@iceberg_typed_value.register(PrimitiveType) +def _(type: IcebergType, value: Optional[Any]) -> Optional[Any]: + return value diff --git a/tests/integration/test_partitioning_key.py b/tests/integration/test_partitioning_key.py new file mode 100644 index 0000000000..f25044868e --- /dev/null +++ b/tests/integration/test_partitioning_key.py @@ -0,0 +1,683 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name +from datetime import date, datetime +from typing import Any + +import pytest +import pytz +from pyspark.sql import SparkSession + +from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.exceptions import NamespaceAlreadyExistsError +from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.transforms import ( + BucketTransform, + DayTransform, + HourTransform, + IdentityTransform, + MonthTransform, + TruncateTransform, + YearTransform, +) +from pyiceberg.typedef import Record +from pyiceberg.types import ( + BinaryType, + BooleanType, + DateType, + DoubleType, + FixedType, + FloatType, + IntegerType, + LongType, + NestedField, + StringType, + TimestampType, + TimestamptzType, +) + + +@pytest.fixture() +def catalog() -> Catalog: + catalog = load_catalog( + "local", + **{ + "type": "rest", + "uri": "http://localhost:8181", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ) + + try: + catalog.create_namespace("default") + except NamespaceAlreadyExistsError: + pass + + return catalog + + +@pytest.fixture(scope="session") +def session_catalog() -> Catalog: + return load_catalog( + "local", + **{ + "type": "rest", + "uri": "http://localhost:8181", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ) + + +@pytest.fixture(scope="session") +def spark() -> SparkSession: + import importlib.metadata + import os + + spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) + scala_version = "2.12" + iceberg_version = "1.4.3" + + os.environ["PYSPARK_SUBMIT_ARGS"] = ( + f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," + f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell" + ) + os.environ["AWS_REGION"] = "us-east-1" + os.environ["AWS_ACCESS_KEY_ID"] = "admin" + os.environ["AWS_SECRET_ACCESS_KEY"] = "password" + + spark = ( + SparkSession.builder.appName("PyIceberg integration test") + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") + .config("spark.sql.catalog.integration.uri", "http://localhost:8181") + .config("spark.sql.catalog.integration.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") + .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/") + .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000") + .config("spark.sql.catalog.integration.s3.path-style-access", "true") + .config("spark.sql.defaultCatalog", "integration") + .getOrCreate() + ) + + return spark + + +TABLE_SCHEMA = Schema( + NestedField(field_id=1, name="boolean_field", field_type=BooleanType(), required=False), + NestedField(field_id=2, name="string_field", field_type=StringType(), required=False), + NestedField(field_id=3, name="string_long_field", field_type=StringType(), required=False), + NestedField(field_id=4, name="int_field", field_type=IntegerType(), required=False), + NestedField(field_id=5, name="long_field", field_type=LongType(), required=False), + NestedField(field_id=6, name="float_field", field_type=FloatType(), required=False), + NestedField(field_id=7, name="double_field", field_type=DoubleType(), required=False), + NestedField(field_id=8, name="timestamp_field", field_type=TimestampType(), required=False), + NestedField(field_id=9, name="timestamptz_field", field_type=TimestamptzType(), required=False), + NestedField(field_id=10, name="date_field", field_type=DateType(), required=False), + # NestedField(field_id=11, name="time", field_type=TimeType(), required=False), + # NestedField(field_id=12, name="uuid", field_type=UuidType(), required=False), + NestedField(field_id=11, name="binary_field", field_type=BinaryType(), required=False), + NestedField(field_id=12, name="fixed_field", field_type=FixedType(16), required=False), +) + + +identifier = "default.test_table" + + +@pytest.mark.parametrize( + "partition_fields, partition_values, expected_partition_record, expected_hive_partition_path_slice, spark_create_table_sql_for_justification, spark_data_insert_sql_for_justification", + [ + # Identity Transform + ( + [PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="boolean_field")], + [False], + Record(boolean_field=False), + "boolean_field=False", + # pyiceberg writes False while spark writes false, so justification (compare expected value with spark behavior) would fail. + None, + None, + # f"""CREATE TABLE {identifier} ( + # boolean_field boolean, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(boolean_field) -- Partitioning by 'boolean_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (false, 'Boolean field set to false'); + # """ + ), + ( + [PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], + ["sample_string"], + Record(string_field="sample_string"), + "string_field=sample_string", + f"""CREATE TABLE {identifier} ( + string_field string, + another_string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(string_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + ('sample_string', 'Another string value') + """, + ), + ( + [PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int_field")], + [42], + Record(int_field=42), + "int_field=42", + f"""CREATE TABLE {identifier} ( + int_field int, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(int_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (42, 'Associated string value for int 42') + """, + ), + ( + [PartitionField(source_id=5, field_id=1001, transform=IdentityTransform(), name="long_field")], + [1234567890123456789], + Record(long_field=1234567890123456789), + "long_field=1234567890123456789", + f"""CREATE TABLE {identifier} ( + long_field bigint, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(long_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (1234567890123456789, 'Associated string value for long 1234567890123456789') + """, + ), + ( + [PartitionField(source_id=6, field_id=1001, transform=IdentityTransform(), name="float_field")], + [3.14], + Record(float_field=3.14), + "float_field=3.14", + # spark writes differently as pyiceberg, Record[float_field=3.140000104904175], path:float_field=3.14 (Record has difference) + # so justification (compare expected value with spark behavior) would fail. + None, + None, + # f"""CREATE TABLE {identifier} ( + # float_field float, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(float_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (3.14, 'Associated string value for float 3.14') + # """ + ), + ( + [PartitionField(source_id=7, field_id=1001, transform=IdentityTransform(), name="double_field")], + [6.282], + Record(double_field=6.282), + "double_field=6.282", + # spark writes differently as pyiceberg, Record[double_field=6.2820000648498535] path:double_field=6.282 (Record has difference) + # so justification (compare expected value with spark behavior) would fail. + None, + None, + # f"""CREATE TABLE {identifier} ( + # double_field double, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(double_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (6.282, 'Associated string value for double 6.282') + # """ + ), + ( + [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + [datetime(2023, 1, 1, 12, 0, 0)], + Record(timestamp_field=1672574400000000), + "timestamp_field=2023-01-01T12%3A00%3A00", + # spark writes differently as pyiceberg, Record[timestamp_field=1672574400000000] path:timestamp_field=2023-01-01T12%3A00Z (the Z is the difference) + # so justification (compare expected value with spark behavior) would fail. + None, + None, + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(timestamp_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:00' AS TIMESTAMP), 'Associated string value for timestamp 2023-01-01T12:00:00') + # """ + ), + ( + [PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="date_field")], + [date(2023, 1, 1)], + Record(date_field=19358), + "date_field=2023-01-01", + f"""CREATE TABLE {identifier} ( + date_field date, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(date_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01' AS DATE), 'Associated string value for date 2023-01-01') + """, + ), + ( + [PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], + [b'example'], + Record(binary_field=b'example'), + "binary_field=ZXhhbXBsZQ%3D%3D", + f"""CREATE TABLE {identifier} ( + binary_field binary, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(binary_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('example' AS BINARY), 'Associated string value for binary `example`') + """, + ), + # Year Month Day Hour Transform + # Month Transform + ( + [PartitionField(source_id=8, field_id=1001, transform=MonthTransform(), name="timestamp_field_month")], + [datetime(2023, 1, 1, 11, 55, 59, 999999)], + Record(timestamp_field_month=((2023 - 1970) * 12)), + "timestamp_field_month=2023-01", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + month(timestamp_field) -- Partitioning by month from 'timestamp_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); + """, + ), + ( + [PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_field_month")], + [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], + Record(timestamptz_field_month=((2023 - 1970) * 12)), + "timestamptz_field_month=2023-01", + # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). + None, + None, + ), + ( + [PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], + [date(2023, 1, 1)], + Record(date_field_month=((2023 - 1970) * 12)), + "date_field_month=2023-01", + f"""CREATE TABLE {identifier} ( + date_field date, + string_field string + ) + USING iceberg + PARTITIONED BY ( + month(date_field) -- Partitioning by month from 'date_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + """, + ), + # Year Transform + ( + [PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year")], + [datetime(2023, 1, 1, 11, 55, 59, 999999)], + Record(timestamp_field_year=(2023 - 1970)), + "timestamp_field_year=2023", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + year(timestamp_field) -- Partitioning by year from 'timestamp_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); + """, + ), + ( + [PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], + [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], + Record(timestamptz_field_year=53), + "timestamptz_field_year=2023", + # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). + None, + None, + ), + ( + [PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], + [date(2023, 1, 1)], + Record(date_field_year=(2023 - 1970)), + "date_field_year=2023", + f"""CREATE TABLE {identifier} ( + date_field date, + string_field string + ) + USING iceberg + PARTITIONED BY ( + year(date_field) -- Partitioning by year from 'date_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + """, + ), + # # Day Transform + ( + [PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_field_day")], + [datetime(2023, 1, 1, 11, 55, 59, 999999)], + Record(timestamp_field_day=19358), + "timestamp_field_day=2023-01-01", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + day(timestamp_field) -- Partitioning by day from 'timestamp_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + """, + ), + ( + [PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], + [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], + Record(timestamptz_field_day=19358), + "timestamptz_field_day=2023-01-01", + # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). + None, + None, + ), + ( + [PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], + [date(2023, 1, 1)], + Record(date_field_day=19358), + "date_field_day=2023-01-01", + f"""CREATE TABLE {identifier} ( + date_field date, + string_field string + ) + USING iceberg + PARTITIONED BY ( + day(date_field) -- Partitioning by day from 'date_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + """, + ), + # Hour Transform + ( + [PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_field_hour")], + [datetime(2023, 1, 1, 11, 55, 59, 999999)], + Record(timestamp_field_hour=464603), + "timestamp_field_hour=2023-01-01-11", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + hour(timestamp_field) -- Partitioning by hour from 'timestamp_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event within the 11th hour of 2023-01-01'); + """, + ), + ( + [PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], + [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], + Record(timestamptz_field_hour=464608), # 464608 = 464603 + 5, new york winter day light saving time + "timestamptz_field_hour=2023-01-01-16", + # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). + None, + None, + ), + # Truncate Transform + ( + [PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(10), name="int_field_trunc")], + [12345], + Record(int_field_trunc=12340), + "int_field_trunc=12340", + f"""CREATE TABLE {identifier} ( + int_field int, + string_field string + ) + USING iceberg + PARTITIONED BY ( + truncate(int_field, 10) -- Truncating 'int_field' integer column to a width of 10 + ) + """, + f"""INSERT INTO {identifier} + VALUES + (12345, 'Sample data for int'); + """, + ), + ( + [PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="bigint_field_trunc")], + [2**32 + 1], + Record(bigint_field_trunc=2**32), # 4294967296 + "bigint_field_trunc=4294967296", + f"""CREATE TABLE {identifier} ( + bigint_field bigint, + other_data string + ) + USING iceberg + PARTITIONED BY ( + truncate(bigint_field, 2) -- Truncating 'bigint_field' long column to a width of 2 + ) + """, + f"""INSERT INTO {identifier} + VALUES + (4294967297, 'Sample data for long'); + """, + ), + ( + [PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(3), name="string_field_trunc")], + ["abcdefg"], + Record(string_field_trunc="abc"), + "string_field_trunc=abc", + f"""CREATE TABLE {identifier} ( + string_field string, + other_data string + ) + USING iceberg + PARTITIONED BY ( + truncate(string_field, 3) -- Truncating 'string_field' string column to a length of 3 characters + ) + """, + f"""INSERT INTO {identifier} + VALUES + ('abcdefg', 'Another sample for string'); + """, + ), + # it seems the transform.tohumanstring does take a bytes type which means i do not need to do extra conversion in iceberg_typed_value() for BinaryType + ( + [PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], + [b'HELLOICEBERG'], + Record(binary_field_trunc=b'HELLOICEBE'), + "binary_field_trunc=SEVMTE9JQ0VCRQ%3D%3D", + f"""CREATE TABLE {identifier} ( + binary_field binary, + other_data string + ) + USING iceberg + PARTITIONED BY ( + truncate(binary_field, 10) -- Truncating 'binary_field' binary column to a length of 10 bytes + ) + """, + f"""INSERT INTO {identifier} + VALUES + (binary('HELLOICEBERG'), 'Sample data for binary'); + """, + ), + # Bucket Transform + ( + [PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_field_bucket")], + [10], + Record(int_field_bucket=0), + "int_field_bucket=0", + f"""CREATE TABLE {identifier} ( + int_field int, + other_data string + ) + USING iceberg + PARTITIONED BY ( + bucket(2, int_field) -- Distributing 'int_field' across 2 buckets + ) + """, + f"""INSERT INTO {identifier} + VALUES + (10, 'Integer with value 10'); + """, + ), + # Test multiple field combinations could generate the Partition record and hive partition path correctly + ( + [ + PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year"), + PartitionField(source_id=10, field_id=1002, transform=DayTransform(), name="date_field_day"), + ], + [ + datetime(2023, 1, 1, 11, 55, 59, 999999), + date(2023, 1, 1), + ], + Record(timestamp_field_year=53, date_field_day=19358), + "timestamp_field_year=2023/date_field_day=2023-01-01", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp, + date_field date, + string_field string + ) + USING iceberg + PARTITIONED BY ( + year(timestamp_field), + day(date_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), CAST('2023-01-01' AS DATE), 'some data'); + """, + ), + ], +) +@pytest.mark.integration +def test_partition_key( + session_catalog: Catalog, + spark: SparkSession, + partition_fields: list[PartitionField], + partition_values: list[Any], + expected_partition_record: Record, + expected_hive_partition_path_slice: str, + spark_create_table_sql_for_justification: str, + spark_data_insert_sql_for_justification: str, +) -> None: + partition_field_values = [PartitionFieldValue(field, value) for field, value in zip(partition_fields, partition_values)] + spec = PartitionSpec(*partition_fields) + + key = PartitionKey( + raw_partition_field_values=partition_field_values, + partition_spec=spec, + schema=TABLE_SCHEMA, + ) + # print(f"{key.partition=}") + # print(f"{key.to_path()=}") + # this affects the metadata in DataFile and all above layers + assert key.partition == expected_partition_record + # this affects the hive partitioning part in the parquet file path + assert key.to_path() == expected_hive_partition_path_slice + + from pyspark.sql.utils import AnalysisException + + # verify expected values are not made up but conform to spark behaviors + if spark_create_table_sql_for_justification is not None and spark_data_insert_sql_for_justification is not None: + try: + spark.sql(f"drop table {identifier}") + except AnalysisException: + pass + + spark.sql(spark_create_table_sql_for_justification) + spark.sql(spark_data_insert_sql_for_justification) + + iceberg_table = session_catalog.load_table(identifier=identifier) + snapshot = iceberg_table.current_snapshot() + assert snapshot + verify_partition = snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.partition + verify_path = snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path + # print(f"{verify_partition=}") + # print(f"{verify_path=}") + assert verify_partition == expected_partition_record + assert expected_hive_partition_path_slice in verify_path From da14e6990f3e9aa2008ee27ca946498849a4c824 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Wed, 21 Feb 2024 04:53:33 +0000 Subject: [PATCH 2/6] fix linting; add decimal input transform test --- pyiceberg/partitioning.py | 19 +++-- tests/integration/test_partitioning_key.py | 81 ++++++++++++++++------ 2 files changed, 69 insertions(+), 31 deletions(-) diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index a14c9f8c99..ef68f86b6f 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -29,6 +29,7 @@ Tuple, TypeVar, ) +from urllib.parse import quote from pydantic import ( BeforeValidator, @@ -222,7 +223,6 @@ def partition_to_path(self, data: Record, schema: Schema) -> str: partition_field = self.fields[pos] # partition field value_str = partition_field.transform.to_human_string(field_types[pos].field_type, value=value) - from urllib.parse import quote value_str = quote(value_str, safe='') value_strs.append(value_str) @@ -369,10 +369,9 @@ class PartitionFieldValue: @dataclass(frozen=True) class PartitionKey: - raw_partition_field_values: list[PartitionFieldValue] + raw_partition_field_values: List[PartitionFieldValue] partition_spec: PartitionSpec schema: Schema - from functools import cached_property @cached_property def partition(self) -> Record: # partition key in iceberg type @@ -382,8 +381,8 @@ def partition(self) -> Record: # partition key in iceberg type assert len(partition_fields) == 1 partition_field = partition_fields[0] iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type - _iceberg_typed_value = iceberg_typed_value(iceberg_type, raw_partition_field_value.value) - transformed_value = partition_field.transform.transform(iceberg_type)(_iceberg_typed_value) + iceberg_typed_value = _to_iceberg_type(iceberg_type, raw_partition_field_value.value) + transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value) iceberg_typed_key_values[partition_field.name] = transformed_value return Record(**iceberg_typed_key_values) @@ -392,21 +391,21 @@ def to_path(self) -> str: @singledispatch -def iceberg_typed_value(type: IcebergType, value: Any) -> Any: +def _to_iceberg_type(type: IcebergType, value: Any) -> Any: return TypeError(f"Unsupported partition field type: {type}") -@iceberg_typed_value.register(TimestampType) -@iceberg_typed_value.register(TimestamptzType) +@_to_iceberg_type.register(TimestampType) +@_to_iceberg_type.register(TimestamptzType) def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]: return datetime_to_micros(value) if value is not None else None -@iceberg_typed_value.register(DateType) +@_to_iceberg_type.register(DateType) def _(type: IcebergType, value: Optional[date]) -> Optional[int]: return date_to_days(value) if value is not None else None -@iceberg_typed_value.register(PrimitiveType) +@_to_iceberg_type.register(PrimitiveType) def _(type: IcebergType, value: Optional[Any]) -> Optional[Any]: return value diff --git a/tests/integration/test_partitioning_key.py b/tests/integration/test_partitioning_key.py index f25044868e..76a3182f33 100644 --- a/tests/integration/test_partitioning_key.py +++ b/tests/integration/test_partitioning_key.py @@ -16,11 +16,13 @@ # under the License. # pylint:disable=redefined-outer-name from datetime import date, datetime -from typing import Any +from decimal import Decimal +from typing import Any, List import pytest import pytz from pyspark.sql import SparkSession +from pyspark.sql.utils import AnalysisException from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.exceptions import NamespaceAlreadyExistsError @@ -40,6 +42,7 @@ BinaryType, BooleanType, DateType, + DecimalType, DoubleType, FixedType, FloatType, @@ -136,6 +139,7 @@ def spark() -> SparkSession: # NestedField(field_id=12, name="uuid", field_type=UuidType(), required=False), NestedField(field_id=11, name="binary_field", field_type=BinaryType(), required=False), NestedField(field_id=12, name="fixed_field", field_type=FixedType(16), required=False), + NestedField(field_id=13, name="decimal", field_type=DecimalType(5, 2), required=False), ) @@ -332,6 +336,25 @@ def spark() -> SparkSession: (CAST('example' AS BINARY), 'Associated string value for binary `example`') """, ), + ( + [PartitionField(source_id=13, field_id=1001, transform=IdentityTransform(), name="decimal_field")], + [Decimal('123.45')], + Record(decimal_field=Decimal('123.45')), + "decimal_field=123.45", + f"""CREATE TABLE {identifier} ( + decimal_field decimal(5,2), + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(decimal_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (123.45, 'Associated string value for decimal 123.45') + """, + ), # Year Month Day Hour Transform # Month Transform ( @@ -533,7 +556,7 @@ def spark() -> SparkSession: "bigint_field_trunc=4294967296", f"""CREATE TABLE {identifier} ( bigint_field bigint, - other_data string + string_field string ) USING iceberg PARTITIONED BY ( @@ -552,7 +575,7 @@ def spark() -> SparkSession: "string_field_trunc=abc", f"""CREATE TABLE {identifier} ( string_field string, - other_data string + another_string_field string ) USING iceberg PARTITIONED BY ( @@ -564,7 +587,25 @@ def spark() -> SparkSession: ('abcdefg', 'Another sample for string'); """, ), - # it seems the transform.tohumanstring does take a bytes type which means i do not need to do extra conversion in iceberg_typed_value() for BinaryType + ( + [PartitionField(source_id=13, field_id=1001, transform=TruncateTransform(width=5), name="decimal_field_trunc")], + [Decimal('678.93')], + Record(decimal_field_trunc=Decimal('678.90')), + "decimal_field_trunc=678.90", # Assuming truncation width of 1 leads to truncating to 670 + f"""CREATE TABLE {identifier} ( + decimal_field decimal(5,2), + string_field string + ) + USING iceberg + PARTITIONED BY ( + truncate(decimal_field, 2) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (678.90, 'Associated string value for decimal 678.90') + """, + ), ( [PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], [b'HELLOICEBERG'], @@ -572,7 +613,7 @@ def spark() -> SparkSession: "binary_field_trunc=SEVMTE9JQ0VCRQ%3D%3D", f"""CREATE TABLE {identifier} ( binary_field binary, - other_data string + string_field string ) USING iceberg PARTITIONED BY ( @@ -592,7 +633,7 @@ def spark() -> SparkSession: "int_field_bucket=0", f"""CREATE TABLE {identifier} ( int_field int, - other_data string + string_field string ) USING iceberg PARTITIONED BY ( @@ -638,8 +679,8 @@ def spark() -> SparkSession: def test_partition_key( session_catalog: Catalog, spark: SparkSession, - partition_fields: list[PartitionField], - partition_values: list[Any], + partition_fields: List[PartitionField], + partition_values: List[Any], expected_partition_record: Record, expected_hive_partition_path_slice: str, spark_create_table_sql_for_justification: str, @@ -653,16 +694,12 @@ def test_partition_key( partition_spec=spec, schema=TABLE_SCHEMA, ) - # print(f"{key.partition=}") - # print(f"{key.to_path()=}") - # this affects the metadata in DataFile and all above layers + # key.partition is used to write the metadata in DataFile, ManifestFile and all above layers assert key.partition == expected_partition_record - # this affects the hive partitioning part in the parquet file path + # key.to_path() generates the hive partitioning part of the to-write parquet file path assert key.to_path() == expected_hive_partition_path_slice - from pyspark.sql.utils import AnalysisException - - # verify expected values are not made up but conform to spark behaviors + # Justify expected values are not made up but conform to spark behaviors if spark_create_table_sql_for_justification is not None and spark_data_insert_sql_for_justification is not None: try: spark.sql(f"drop table {identifier}") @@ -675,9 +712,11 @@ def test_partition_key( iceberg_table = session_catalog.load_table(identifier=identifier) snapshot = iceberg_table.current_snapshot() assert snapshot - verify_partition = snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.partition - verify_path = snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path - # print(f"{verify_partition=}") - # print(f"{verify_path=}") - assert verify_partition == expected_partition_record - assert expected_hive_partition_path_slice in verify_path + spark_partition_for_justification = ( + snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.partition + ) + spark_path_for_justification = ( + snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path + ) + assert spark_partition_for_justification == expected_partition_record + assert expected_hive_partition_path_slice in spark_path_for_justification From 0a062cb7478b56e2adbd38e13d80222692309781 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Thu, 22 Feb 2024 16:34:24 +0000 Subject: [PATCH 3/6] fix bool to path lower case; fix timestamptz tests; other pr comments --- pyiceberg/partitioning.py | 9 +- pyiceberg/transforms.py | 5 + pyiceberg/typedef.py | 3 +- tests/integration/conftest.py | 67 +++++ tests/integration/test_partitioning_key.py | 296 +++++++++++++-------- tests/integration/test_writes.py | 48 ---- 6 files changed, 263 insertions(+), 165 deletions(-) create mode 100644 tests/integration/conftest.py diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index ef68f86b6f..c34272b432 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -215,19 +215,15 @@ def partition_to_path(self, data: Record, schema: Schema) -> str: partition_type = self.partition_type(schema) field_types = partition_type.fields - pos = 0 field_strs = [] value_strs = [] - for field_name in data._position_to_field_name: - value = getattr(data, field_name) - + for pos, value in enumerate(data.record_fields()): partition_field = self.fields[pos] # partition field value_str = partition_field.transform.to_human_string(field_types[pos].field_type, value=value) value_str = quote(value_str, safe='') value_strs.append(value_str) field_strs.append(partition_field.name) - pos += 1 path = "/".join([field_str + "=" + value_str for field_str, value_str in zip(field_strs, value_strs)]) return path @@ -378,7 +374,8 @@ def partition(self) -> Record: # partition key in iceberg type iceberg_typed_key_values = {} for raw_partition_field_value in self.raw_partition_field_values: partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id] - assert len(partition_fields) == 1 + if len(partition_fields) != 1: + raise ValueError("partition_fields must contain exactly one field.") partition_field = partition_fields[0] iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type iceberg_typed_value = _to_iceberg_type(iceberg_type, raw_partition_field_value.value) diff --git a/pyiceberg/transforms.py b/pyiceberg/transforms.py index 9f499a3dd4..e678a77e69 100644 --- a/pyiceberg/transforms.py +++ b/pyiceberg/transforms.py @@ -655,6 +655,11 @@ def _(value: int, _type: IcebergType) -> str: return _int_to_human_string(_type, value) +@_human_string.register(bool) +def _(value: bool, _type: IcebergType) -> str: + return str(value).lower() + + @singledispatch def _int_to_human_string(_type: IcebergType, value: int) -> str: return str(value) diff --git a/pyiceberg/typedef.py b/pyiceberg/typedef.py index 56a3d3c72d..7aa148b887 100644 --- a/pyiceberg/typedef.py +++ b/pyiceberg/typedef.py @@ -17,6 +17,7 @@ from __future__ import annotations from abc import abstractmethod +from datetime import date, datetime from decimal import Decimal from functools import lru_cache from typing import ( @@ -77,7 +78,7 @@ def __missing__(self, key: K) -> V: RecursiveDict = Dict[str, Union[str, "RecursiveDict"]] # Represents the literal value -L = TypeVar("L", str, bool, int, float, bytes, UUID, Decimal, covariant=True) +L = TypeVar("L", str, bool, int, float, bytes, UUID, Decimal, date, datetime, covariant=True) @runtime_checkable diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000000..cc0f6c847a --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name +from pyiceberg.catalog import Catalog, load_catalog +import pytest +from pyspark.sql import SparkSession + +@pytest.fixture(scope="session") +def session_catalog() -> Catalog: + return load_catalog( + "local", + **{ + "type": "rest", + "uri": "http://localhost:8181", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ) + + +@pytest.fixture(scope="session") +def spark() -> SparkSession: + import importlib.metadata + import os + + spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) + scala_version = "2.12" + iceberg_version = "1.4.3" + + os.environ["PYSPARK_SUBMIT_ARGS"] = ( + f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," + f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell" + ) + os.environ["AWS_REGION"] = "us-east-1" + os.environ["AWS_ACCESS_KEY_ID"] = "admin" + os.environ["AWS_SECRET_ACCESS_KEY"] = "password" + + spark = ( + SparkSession.builder.appName("PyIceberg integration test") + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") + .config("spark.sql.catalog.integration.uri", "http://localhost:8181") + .config("spark.sql.catalog.integration.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") + .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/") + .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000") + .config("spark.sql.catalog.integration.s3.path-style-access", "true") + .config("spark.sql.defaultCatalog", "integration") + .getOrCreate() + ) + + return spark \ No newline at end of file diff --git a/tests/integration/test_partitioning_key.py b/tests/integration/test_partitioning_key.py index 76a3182f33..b8815c7d96 100644 --- a/tests/integration/test_partitioning_key.py +++ b/tests/integration/test_partitioning_key.py @@ -15,17 +15,15 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=redefined-outer-name -from datetime import date, datetime +from datetime import date, datetime, timedelta, timezone from decimal import Decimal from typing import Any, List import pytest -import pytz from pyspark.sql import SparkSession from pyspark.sql.utils import AnalysisException -from pyiceberg.catalog import Catalog, load_catalog -from pyiceberg.exceptions import NamespaceAlreadyExistsError +from pyiceberg.catalog import Catalog from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.transforms import ( @@ -54,74 +52,52 @@ TimestamptzType, ) - -@pytest.fixture() -def catalog() -> Catalog: - catalog = load_catalog( - "local", - **{ - "type": "rest", - "uri": "http://localhost:8181", - "s3.endpoint": "http://localhost:9000", - "s3.access-key-id": "admin", - "s3.secret-access-key": "password", - }, - ) - - try: - catalog.create_namespace("default") - except NamespaceAlreadyExistsError: - pass - - return catalog +# @pytest.fixture(scope="session") +# def session_catalog() -> Catalog: +# return load_catalog( +# "local", +# **{ +# "type": "rest", +# "uri": "http://localhost:8181", +# "s3.endpoint": "http://localhost:9000", +# "s3.access-key-id": "admin", +# "s3.secret-access-key": "password", +# }, +# ) -@pytest.fixture(scope="session") -def session_catalog() -> Catalog: - return load_catalog( - "local", - **{ - "type": "rest", - "uri": "http://localhost:8181", - "s3.endpoint": "http://localhost:9000", - "s3.access-key-id": "admin", - "s3.secret-access-key": "password", - }, - ) - +# @pytest.fixture(scope="session") +# def spark() -> SparkSession: +# import importlib.metadata +# import os -@pytest.fixture(scope="session") -def spark() -> SparkSession: - import importlib.metadata - import os +# spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) +# scala_version = "2.12" +# iceberg_version = "1.4.3" - spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) - scala_version = "2.12" - iceberg_version = "1.4.3" - - os.environ["PYSPARK_SUBMIT_ARGS"] = ( - f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," - f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell" - ) - os.environ["AWS_REGION"] = "us-east-1" - os.environ["AWS_ACCESS_KEY_ID"] = "admin" - os.environ["AWS_SECRET_ACCESS_KEY"] = "password" +# os.environ["PYSPARK_SUBMIT_ARGS"] = ( +# f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," +# f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell" +# ) +# os.environ["AWS_REGION"] = "us-east-1" +# os.environ["AWS_ACCESS_KEY_ID"] = "admin" +# os.environ["AWS_SECRET_ACCESS_KEY"] = "password" - spark = ( - SparkSession.builder.appName("PyIceberg integration test") - .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") - .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog") - .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") - .config("spark.sql.catalog.integration.uri", "http://localhost:8181") - .config("spark.sql.catalog.integration.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") - .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/") - .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000") - .config("spark.sql.catalog.integration.s3.path-style-access", "true") - .config("spark.sql.defaultCatalog", "integration") - .getOrCreate() - ) +# spark = ( +# SparkSession.builder.appName("PyIceberg integration test") +# .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") +# .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog") +# .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") +# .config("spark.sql.catalog.integration.uri", "http://localhost:8181") +# .config("spark.sql.catalog.integration.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") +# .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/") +# .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000") +# .config("spark.sql.catalog.integration.s3.path-style-access", "true") +# .config("spark.sql.defaultCatalog", "integration") +# .getOrCreate() +# ) - return spark +# return spark TABLE_SCHEMA = Schema( @@ -149,28 +125,25 @@ def spark() -> SparkSession: @pytest.mark.parametrize( "partition_fields, partition_values, expected_partition_record, expected_hive_partition_path_slice, spark_create_table_sql_for_justification, spark_data_insert_sql_for_justification", [ - # Identity Transform + # # Identity Transform ( [PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="boolean_field")], [False], Record(boolean_field=False), - "boolean_field=False", - # pyiceberg writes False while spark writes false, so justification (compare expected value with spark behavior) would fail. - None, - None, - # f"""CREATE TABLE {identifier} ( - # boolean_field boolean, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(boolean_field) -- Partitioning by 'boolean_field' - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (false, 'Boolean field set to false'); - # """ + "boolean_field=false", + f"""CREATE TABLE {identifier} ( + boolean_field boolean, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(boolean_field) -- Partitioning by 'boolean_field' + ) + """, + f"""INSERT INTO {identifier} + VALUES + (false, 'Boolean field set to false'); + """, ), ( [PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], @@ -275,17 +248,56 @@ def spark() -> SparkSession: # (6.282, 'Associated string value for double 6.282') # """ ), + ( + [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + [datetime(2023, 1, 1, 12, 0, 1, 999)], + Record(timestamp_field=1672574401000999), + "timestamp_field=2023-01-01T12%3A00%3A01.000999", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp_ntz, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(timestamp_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 12:00:01.000999' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') + """, + ), + ( + [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + [datetime(2023, 1, 1, 12, 0, 1)], + Record(timestamp_field=1672574401000000), + "timestamp_field=2023-01-01T12%3A00%3A01", + f"""CREATE TABLE {identifier} ( + timestamp_field timestamp_ntz, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(timestamp_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 12:00:01' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') + """, + ), ( [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], [datetime(2023, 1, 1, 12, 0, 0)], Record(timestamp_field=1672574400000000), "timestamp_field=2023-01-01T12%3A00%3A00", - # spark writes differently as pyiceberg, Record[timestamp_field=1672574400000000] path:timestamp_field=2023-01-01T12%3A00Z (the Z is the difference) - # so justification (compare expected value with spark behavior) would fail. + # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail + # AssertionError: assert 'timestamp_field=2023-01-01T12%3A00%3A00' in 's3://warehouse/default/test_table/data/timestamp_field=2023-01-01T12%3A00/00000-5-f9dca69a-9fb7-4830-9ef6-62d3d7afc09e-00001.parquet' + # TLDR: CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ) becomes 2023-01-01T12:00 in the hive partition path when spark writes it (without the seconds). None, None, # f"""CREATE TABLE {identifier} ( - # timestamp_field timestamp, + # timestamp_field timestamp_ntz, # string_field string # ) # USING iceberg @@ -295,7 +307,31 @@ def spark() -> SparkSession: # """, # f"""INSERT INTO {identifier} # VALUES - # (CAST('2023-01-01 12:00:00' AS TIMESTAMP), 'Associated string value for timestamp 2023-01-01T12:00:00') + # (CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') + # """ + ), + ( + [PartitionField(source_id=9, field_id=1001, transform=IdentityTransform(), name="timestamptz_field")], + [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + Record(timestamptz_field=1672563601000999), + "timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00", + # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail + # AssertionError: assert 'timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00' in 's3://warehouse/default/test_table/data/timestamptz_field=2023-01-01T09%3A00%3A01.000999Z/00000-5-b710fc4d-66b6-47f1-b8ae-6208f8aaa2d4-00001.parquet' + # TLDR: CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP) becomes 2023-01-01T09:00:01.000999Z in the hive partition path when spark writes it (while iceberg: timestamptz_field=2023-01-01T09:00:01.000999+00:00). + None, + None, + # f"""CREATE TABLE {identifier} ( + # timestamptz_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(timestamptz_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Associated string value for timestamp 2023-01-01 12:00:01.000999+03:00') # """ ), ( @@ -355,7 +391,7 @@ def spark() -> SparkSession: (123.45, 'Associated string value for decimal 123.45') """, ), - # Year Month Day Hour Transform + # # Year Month Day Hour Transform # Month Transform ( [PartitionField(source_id=8, field_id=1001, transform=MonthTransform(), name="timestamp_field_month")], @@ -363,7 +399,7 @@ def spark() -> SparkSession: Record(timestamp_field_month=((2023 - 1970) * 12)), "timestamp_field_month=2023-01", f"""CREATE TABLE {identifier} ( - timestamp_field timestamp, + timestamp_field timestamp_ntz, string_field string ) USING iceberg @@ -373,17 +409,27 @@ def spark() -> SparkSession: """, f"""INSERT INTO {identifier} VALUES - (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); + (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP_NTZ), 'Event at 2023-01-01 11:55:59.999999'); """, ), ( [PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_field_month")], - [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], - Record(timestamptz_field_month=((2023 - 1970) * 12)), + [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + Record(timestamptz_field_month=((2023 - 1970) * 12 + 1 - 1)), "timestamptz_field_month=2023-01", - # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). - None, - None, + f"""CREATE TABLE {identifier} ( + timestamptz_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + month(timestamptz_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + """, ), ( [PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], @@ -426,12 +472,22 @@ def spark() -> SparkSession: ), ( [PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], - [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], + [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], Record(timestamptz_field_year=53), "timestamptz_field_year=2023", - # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). - None, - None, + f"""CREATE TABLE {identifier} ( + timestamptz_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + year(timestamptz_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + """, ), ( [PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], @@ -474,12 +530,22 @@ def spark() -> SparkSession: ), ( [PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], - [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], + [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], Record(timestamptz_field_day=19358), "timestamptz_field_day=2023-01-01", - # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). - None, - None, + f"""CREATE TABLE {identifier} ( + timestamptz_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + day(timestamptz_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + """, ), ( [PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], @@ -522,12 +588,22 @@ def spark() -> SparkSession: ), ( [PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], - [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], - Record(timestamptz_field_hour=464608), # 464608 = 464603 + 5, new york winter day light saving time - "timestamptz_field_hour=2023-01-01-16", - # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). - None, - None, + [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + Record(timestamptz_field_hour=464601), + "timestamptz_field_hour=2023-01-01-09", + f"""CREATE TABLE {identifier} ( + timestamptz_field timestamp, + string_field string + ) + USING iceberg + PARTITIONED BY ( + hour(timestamptz_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + """, ), # Truncate Transform ( diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index fa5e93d925..62075dfa60 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -115,20 +115,6 @@ def catalog() -> Catalog: ) -@pytest.fixture(scope="session") -def session_catalog() -> Catalog: - return load_catalog( - "local", - **{ - "type": "rest", - "uri": "http://localhost:8181", - "s3.endpoint": "http://localhost:9000", - "s3.access-key-id": "admin", - "s3.secret-access-key": "password", - }, - ) - - @pytest.fixture(scope="session") def pa_schema() -> pa.Schema: return pa.schema([ @@ -252,40 +238,6 @@ def table_v1_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_nu assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" -@pytest.fixture(scope="session") -def spark() -> SparkSession: - import importlib.metadata - import os - - spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) - scala_version = "2.12" - iceberg_version = "1.4.3" - - os.environ["PYSPARK_SUBMIT_ARGS"] = ( - f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," - f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell" - ) - os.environ["AWS_REGION"] = "us-east-1" - os.environ["AWS_ACCESS_KEY_ID"] = "admin" - os.environ["AWS_SECRET_ACCESS_KEY"] = "password" - - spark = ( - SparkSession.builder.appName("PyIceberg integration test") - .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") - .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog") - .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") - .config("spark.sql.catalog.integration.uri", "http://localhost:8181") - .config("spark.sql.catalog.integration.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") - .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/") - .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000") - .config("spark.sql.catalog.integration.s3.path-style-access", "true") - .config("spark.sql.defaultCatalog", "integration") - .getOrCreate() - ) - - return spark - - @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_query_count(spark: SparkSession, format_version: int) -> None: From f9be89f1dae172c15219f40d7a2bcfd514effe49 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Thu, 22 Feb 2024 16:56:35 +0000 Subject: [PATCH 4/6] clean up --- pyiceberg/typedef.py | 3 +- tests/conftest.py | 51 +++++++++++++++++++++++++- tests/integration/conftest.py | 67 ----------------------------------- 3 files changed, 51 insertions(+), 70 deletions(-) delete mode 100644 tests/integration/conftest.py diff --git a/pyiceberg/typedef.py b/pyiceberg/typedef.py index 7aa148b887..56a3d3c72d 100644 --- a/pyiceberg/typedef.py +++ b/pyiceberg/typedef.py @@ -17,7 +17,6 @@ from __future__ import annotations from abc import abstractmethod -from datetime import date, datetime from decimal import Decimal from functools import lru_cache from typing import ( @@ -78,7 +77,7 @@ def __missing__(self, key: K) -> V: RecursiveDict = Dict[str, Union[str, "RecursiveDict"]] # Represents the literal value -L = TypeVar("L", str, bool, int, float, bytes, UUID, Decimal, date, datetime, covariant=True) +L = TypeVar("L", str, bool, int, float, bytes, UUID, Decimal, covariant=True) @runtime_checkable diff --git a/tests/conftest.py b/tests/conftest.py index aa66f36046..5b488c70f0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -46,9 +46,10 @@ import boto3 import pytest from moto import mock_aws +from pyspark.sql import SparkSession from pyiceberg import schema -from pyiceberg.catalog import Catalog +from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.catalog.noop import NoopCatalog from pyiceberg.expressions import BoundReference from pyiceberg.io import ( @@ -1925,3 +1926,51 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table: @pytest.fixture def bound_reference_str() -> BoundReference[str]: return BoundReference(field=NestedField(1, "field", StringType(), required=False), accessor=Accessor(position=0, inner=None)) + + +@pytest.fixture(scope="session") +def session_catalog() -> Catalog: + return load_catalog( + "local", + **{ + "type": "rest", + "uri": "http://localhost:8181", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ) + + +@pytest.fixture(scope="session") +def spark() -> SparkSession: + import importlib.metadata + import os + + spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) + scala_version = "2.12" + iceberg_version = "1.4.3" + + os.environ["PYSPARK_SUBMIT_ARGS"] = ( + f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," + f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell" + ) + os.environ["AWS_REGION"] = "us-east-1" + os.environ["AWS_ACCESS_KEY_ID"] = "admin" + os.environ["AWS_SECRET_ACCESS_KEY"] = "password" + + spark = ( + SparkSession.builder.appName("PyIceberg integration test") + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") + .config("spark.sql.catalog.integration.uri", "http://localhost:8181") + .config("spark.sql.catalog.integration.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") + .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/") + .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000") + .config("spark.sql.catalog.integration.s3.path-style-access", "true") + .config("spark.sql.defaultCatalog", "integration") + .getOrCreate() + ) + + return spark diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py deleted file mode 100644 index cc0f6c847a..0000000000 --- a/tests/integration/conftest.py +++ /dev/null @@ -1,67 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# pylint:disable=redefined-outer-name -from pyiceberg.catalog import Catalog, load_catalog -import pytest -from pyspark.sql import SparkSession - -@pytest.fixture(scope="session") -def session_catalog() -> Catalog: - return load_catalog( - "local", - **{ - "type": "rest", - "uri": "http://localhost:8181", - "s3.endpoint": "http://localhost:9000", - "s3.access-key-id": "admin", - "s3.secret-access-key": "password", - }, - ) - - -@pytest.fixture(scope="session") -def spark() -> SparkSession: - import importlib.metadata - import os - - spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) - scala_version = "2.12" - iceberg_version = "1.4.3" - - os.environ["PYSPARK_SUBMIT_ARGS"] = ( - f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," - f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell" - ) - os.environ["AWS_REGION"] = "us-east-1" - os.environ["AWS_ACCESS_KEY_ID"] = "admin" - os.environ["AWS_SECRET_ACCESS_KEY"] = "password" - - spark = ( - SparkSession.builder.appName("PyIceberg integration test") - .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") - .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog") - .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") - .config("spark.sql.catalog.integration.uri", "http://localhost:8181") - .config("spark.sql.catalog.integration.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") - .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/") - .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000") - .config("spark.sql.catalog.integration.s3.path-style-access", "true") - .config("spark.sql.defaultCatalog", "integration") - .getOrCreate() - ) - - return spark \ No newline at end of file From d6e9e730d716d3324dba8d1ef752167c208a4f21 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Tue, 27 Feb 2024 18:17:20 +0000 Subject: [PATCH 5/6] add uuid partition type --- pyiceberg/partitioning.py | 31 ++++++--- tests/integration/test_partitioning_key.py | 74 +++++++--------------- 2 files changed, 47 insertions(+), 58 deletions(-) diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index c34272b432..1fd244c2b4 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import uuid from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import date, datetime @@ -53,7 +54,16 @@ parse_transform, ) from pyiceberg.typedef import IcebergBaseModel, Record -from pyiceberg.types import DateType, IcebergType, NestedField, PrimitiveType, StructType, TimestampType, TimestamptzType +from pyiceberg.types import ( + DateType, + IcebergType, + NestedField, + PrimitiveType, + StructType, + TimestampType, + TimestamptzType, + UUIDType, +) from pyiceberg.utils.datetime import date_to_days, datetime_to_micros INITIAL_PARTITION_SPEC_ID = 0 @@ -370,7 +380,7 @@ class PartitionKey: schema: Schema @cached_property - def partition(self) -> Record: # partition key in iceberg type + def partition(self) -> Record: # partition key transformed with iceberg internal representation as input iceberg_typed_key_values = {} for raw_partition_field_value in self.raw_partition_field_values: partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id] @@ -378,7 +388,7 @@ def partition(self) -> Record: # partition key in iceberg type raise ValueError("partition_fields must contain exactly one field.") partition_field = partition_fields[0] iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type - iceberg_typed_value = _to_iceberg_type(iceberg_type, raw_partition_field_value.value) + iceberg_typed_value = _to_iceberg_internal_representation(iceberg_type, raw_partition_field_value.value) transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value) iceberg_typed_key_values[partition_field.name] = transformed_value return Record(**iceberg_typed_key_values) @@ -388,21 +398,26 @@ def to_path(self) -> str: @singledispatch -def _to_iceberg_type(type: IcebergType, value: Any) -> Any: +def _to_iceberg_internal_representation(type: IcebergType, value: Any) -> Any: return TypeError(f"Unsupported partition field type: {type}") -@_to_iceberg_type.register(TimestampType) -@_to_iceberg_type.register(TimestamptzType) +@_to_iceberg_internal_representation.register(TimestampType) +@_to_iceberg_internal_representation.register(TimestamptzType) def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]: return datetime_to_micros(value) if value is not None else None -@_to_iceberg_type.register(DateType) +@_to_iceberg_internal_representation.register(DateType) def _(type: IcebergType, value: Optional[date]) -> Optional[int]: return date_to_days(value) if value is not None else None -@_to_iceberg_type.register(PrimitiveType) +@_to_iceberg_internal_representation.register(UUIDType) +def _(type: IcebergType, value: Optional[uuid.UUID]) -> Optional[str]: + return str(value) if value is not None else None + + +@_to_iceberg_internal_representation.register(PrimitiveType) def _(type: IcebergType, value: Optional[Any]) -> Optional[Any]: return value diff --git a/tests/integration/test_partitioning_key.py b/tests/integration/test_partitioning_key.py index b8815c7d96..12056bac1e 100644 --- a/tests/integration/test_partitioning_key.py +++ b/tests/integration/test_partitioning_key.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=redefined-outer-name +import uuid from datetime import date, datetime, timedelta, timezone from decimal import Decimal from typing import Any, List @@ -50,56 +51,9 @@ StringType, TimestampType, TimestamptzType, + UUIDType, ) -# @pytest.fixture(scope="session") -# def session_catalog() -> Catalog: -# return load_catalog( -# "local", -# **{ -# "type": "rest", -# "uri": "http://localhost:8181", -# "s3.endpoint": "http://localhost:9000", -# "s3.access-key-id": "admin", -# "s3.secret-access-key": "password", -# }, -# ) - - -# @pytest.fixture(scope="session") -# def spark() -> SparkSession: -# import importlib.metadata -# import os - -# spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) -# scala_version = "2.12" -# iceberg_version = "1.4.3" - -# os.environ["PYSPARK_SUBMIT_ARGS"] = ( -# f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," -# f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell" -# ) -# os.environ["AWS_REGION"] = "us-east-1" -# os.environ["AWS_ACCESS_KEY_ID"] = "admin" -# os.environ["AWS_SECRET_ACCESS_KEY"] = "password" - -# spark = ( -# SparkSession.builder.appName("PyIceberg integration test") -# .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") -# .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog") -# .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") -# .config("spark.sql.catalog.integration.uri", "http://localhost:8181") -# .config("spark.sql.catalog.integration.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") -# .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/") -# .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000") -# .config("spark.sql.catalog.integration.s3.path-style-access", "true") -# .config("spark.sql.defaultCatalog", "integration") -# .getOrCreate() -# ) - -# return spark - - TABLE_SCHEMA = Schema( NestedField(field_id=1, name="boolean_field", field_type=BooleanType(), required=False), NestedField(field_id=2, name="string_field", field_type=StringType(), required=False), @@ -112,10 +66,10 @@ NestedField(field_id=9, name="timestamptz_field", field_type=TimestamptzType(), required=False), NestedField(field_id=10, name="date_field", field_type=DateType(), required=False), # NestedField(field_id=11, name="time", field_type=TimeType(), required=False), - # NestedField(field_id=12, name="uuid", field_type=UuidType(), required=False), NestedField(field_id=11, name="binary_field", field_type=BinaryType(), required=False), NestedField(field_id=12, name="fixed_field", field_type=FixedType(16), required=False), - NestedField(field_id=13, name="decimal", field_type=DecimalType(5, 2), required=False), + NestedField(field_id=13, name="decimal_field", field_type=DecimalType(5, 2), required=False), + NestedField(field_id=14, name="uuid_field", field_type=UUIDType(), required=False), ) @@ -353,6 +307,25 @@ (CAST('2023-01-01' AS DATE), 'Associated string value for date 2023-01-01') """, ), + ( + [PartitionField(source_id=14, field_id=1001, transform=IdentityTransform(), name="uuid_field")], + [uuid.UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")], + Record(uuid_field="f47ac10b-58cc-4372-a567-0e02b2c3d479"), + "uuid_field=f47ac10b-58cc-4372-a567-0e02b2c3d479", + f"""CREATE TABLE {identifier} ( + uuid_field string, + string_field string + ) + USING iceberg + PARTITIONED BY ( + identity(uuid_field) + ) + """, + f"""INSERT INTO {identifier} + VALUES + ('f47ac10b-58cc-4372-a567-0e02b2c3d479', 'Associated string value for UUID f47ac10b-58cc-4372-a567-0e02b2c3d479') + """, + ), ( [PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], [b'example'], @@ -770,6 +743,7 @@ def test_partition_key( partition_spec=spec, schema=TABLE_SCHEMA, ) + # key.partition is used to write the metadata in DataFile, ManifestFile and all above layers assert key.partition == expected_partition_record # key.to_path() generates the hive partitioning part of the to-write parquet file path From 10348238ad3d92f6dd40344f85e283dc5cad4ad5 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Wed, 28 Feb 2024 17:53:21 +0000 Subject: [PATCH 6/6] clean up; rename ambiguous function name --- pyiceberg/partitioning.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index 1fd244c2b4..6fa0286282 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -228,7 +228,7 @@ def partition_to_path(self, data: Record, schema: Schema) -> str: field_strs = [] value_strs = [] for pos, value in enumerate(data.record_fields()): - partition_field = self.fields[pos] # partition field + partition_field = self.fields[pos] value_str = partition_field.transform.to_human_string(field_types[pos].field_type, value=value) value_str = quote(value_str, safe='') @@ -388,7 +388,7 @@ def partition(self) -> Record: # partition key transformed with iceberg interna raise ValueError("partition_fields must contain exactly one field.") partition_field = partition_fields[0] iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type - iceberg_typed_value = _to_iceberg_internal_representation(iceberg_type, raw_partition_field_value.value) + iceberg_typed_value = _to_partition_representation(iceberg_type, raw_partition_field_value.value) transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value) iceberg_typed_key_values[partition_field.name] = transformed_value return Record(**iceberg_typed_key_values) @@ -398,26 +398,26 @@ def to_path(self) -> str: @singledispatch -def _to_iceberg_internal_representation(type: IcebergType, value: Any) -> Any: +def _to_partition_representation(type: IcebergType, value: Any) -> Any: return TypeError(f"Unsupported partition field type: {type}") -@_to_iceberg_internal_representation.register(TimestampType) -@_to_iceberg_internal_representation.register(TimestamptzType) +@_to_partition_representation.register(TimestampType) +@_to_partition_representation.register(TimestamptzType) def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]: return datetime_to_micros(value) if value is not None else None -@_to_iceberg_internal_representation.register(DateType) +@_to_partition_representation.register(DateType) def _(type: IcebergType, value: Optional[date]) -> Optional[int]: return date_to_days(value) if value is not None else None -@_to_iceberg_internal_representation.register(UUIDType) +@_to_partition_representation.register(UUIDType) def _(type: IcebergType, value: Optional[uuid.UUID]) -> Optional[str]: return str(value) if value is not None else None -@_to_iceberg_internal_representation.register(PrimitiveType) +@_to_partition_representation.register(PrimitiveType) def _(type: IcebergType, value: Optional[Any]) -> Optional[Any]: return value