Skip to content
4 changes: 4 additions & 0 deletions pyiceberg/avro/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any
from uuid import UUID

from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
Expand Down Expand Up @@ -74,3 +75,6 @@ def write_uuid(self, uuid: UUID) -> None:
if len(uuid.bytes) != 16:
raise ValueError(f"Expected UUID to have 16 bytes, got: len({uuid.bytes!r})")
return self.write(uuid.bytes)

def write_unknown(self, _: Any) -> None:
    """Write a value of unknown type, which is a no-op.

    Nulls take up zero bytes in Avro, so the argument is ignored and
    nothing is emitted.
    """
    return None
8 changes: 8 additions & 0 deletions pyiceberg/avro/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ def skip(self, decoder: BinaryDecoder) -> None:
decoder.skip(16)


class UnknownReader(Reader):
    """Reader for values of unknown type.

    Neither method calls anything on the decoder it is given.
    """

    def read(self, decoder: BinaryDecoder) -> None:
        """Return ``None`` without using ``decoder``."""
        return

    def skip(self, decoder: BinaryDecoder) -> None:
        """Do nothing; ``decoder`` is not used."""
        return None


@dataclass(frozen=True)
class FixedReader(Reader):
_len: int = dataclassfield()
Expand Down
12 changes: 12 additions & 0 deletions pyiceberg/avro/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
TimeReader,
TimestampReader,
TimestamptzReader,
UnknownReader,
UUIDReader,
)
from pyiceberg.avro.writer import (
Expand All @@ -66,6 +67,7 @@
TimestamptzWriter,
TimestampWriter,
TimeWriter,
UnknownWriter,
UUIDWriter,
Writer,
)
Expand Down Expand Up @@ -100,6 +102,7 @@
TimestampType,
TimestamptzType,
TimeType,
UnknownType,
UUIDType,
)

Expand Down Expand Up @@ -193,6 +196,9 @@ def visit_uuid(self, uuid_type: UUIDType) -> Writer:
def visit_binary(self, binary_type: BinaryType) -> Writer:
    """Return a new ``BinaryWriter`` for a binary type."""
    writer = BinaryWriter()
    return writer

def visit_unknown(self, unknown_type: UnknownType) -> Writer:
    """Return a new ``UnknownWriter`` for an unknown type."""
    writer = UnknownWriter()
    return writer


# Reusable ConstructWriter instance bound to a constant name.
CONSTRUCT_WRITER_VISITOR = ConstructWriter()

Expand Down Expand Up @@ -341,6 +347,9 @@ def visit_fixed(self, fixed_type: FixedType, partner: Optional[IcebergType]) ->
def visit_binary(self, binary_type: BinaryType, partner: Optional[IcebergType]) -> Writer:
    """Return a new ``BinaryWriter``; ``partner`` is not consulted."""
    writer = BinaryWriter()
    return writer

def visit_unknown(self, unknown_type: UnknownType, partner: Optional[IcebergType]) -> Writer:
    """Return a new ``UnknownWriter``; ``partner`` is not consulted."""
    writer = UnknownWriter()
    return writer


class ReadSchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
__slots__ = ("read_types", "read_enums", "context")
Expand Down Expand Up @@ -471,6 +480,9 @@ def visit_fixed(self, fixed_type: FixedType, partner: Optional[IcebergType]) ->
def visit_binary(self, binary_type: BinaryType, partner: Optional[IcebergType]) -> Reader:
    """Return a new ``BinaryReader``; ``partner`` is not consulted."""
    reader = BinaryReader()
    return reader

def visit_unknown(self, unknown_type: UnknownType, partner: Optional[IcebergType]) -> Reader:
    """Return a new ``UnknownReader``; ``partner`` is not consulted."""
    reader = UnknownReader()
    return reader


class SchemaPartnerAccessor(PartnerAccessor[IcebergType]):
def schema_partner(self, partner: Optional[IcebergType]) -> Optional[IcebergType]:
Expand Down
6 changes: 6 additions & 0 deletions pyiceberg/avro/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ def write(self, encoder: BinaryEncoder, val: UUID) -> None:
encoder.write(val.bytes)


@dataclass(frozen=True)
class UnknownWriter(Writer):
    """Writer for values of unknown type; forwards to the encoder's ``write_unknown``."""

    def write(self, encoder: BinaryEncoder, val: Any) -> None:
        """Pass ``val`` to ``encoder.write_unknown``."""
        emit = encoder.write_unknown
        emit(val)


@dataclass(frozen=True)
class FixedWriter(Writer):
_len: int = dataclassfield()
Expand Down
2 changes: 2 additions & 0 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
TimestampType,
TimestamptzType,
TimeType,
UnknownType,
UUIDType,
)
from pyiceberg.utils.properties import property_as_bool, property_as_float
Expand Down Expand Up @@ -221,6 +222,7 @@ def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveD
UUIDType: "string",
BinaryType: "binary",
FixedType: "binary",
UnknownType: "void",
}


Expand Down
12 changes: 12 additions & 0 deletions pyiceberg/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
TimestampType,
TimestamptzType,
TimeType,
UnknownType,
UUIDType,
strtobool,
)
Expand Down Expand Up @@ -154,6 +155,12 @@ def _(_: DecimalType, value_str: str) -> Decimal:
return Decimal(value_str)


@partition_to_py.register(UnknownType)
@handle_none
def _(type_: UnknownType, _: str) -> None:
    """Convert a partition string for an ``UnknownType``; the result is always ``None``."""
    return


@singledispatch
def to_bytes(
primitive_type: PrimitiveType, _: Union[bool, bytes, Decimal, date, datetime, float, int, str, time, uuid.UUID]
Expand Down Expand Up @@ -324,3 +331,8 @@ def _(_: PrimitiveType, b: bytes) -> bytes:
def _(primitive_type: DecimalType, buf: bytes) -> Decimal:
unscaled = int.from_bytes(buf, "big", signed=True)
return unscaled_to_decimal(unscaled, primitive_type.scale)


@from_bytes.register(UnknownType)
def _(type_: UnknownType, buf: bytes) -> None:
    """Decode bytes for an ``UnknownType``; the buffer is ignored and ``None`` is returned."""
    return
9 changes: 9 additions & 0 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
TimestampType,
TimestamptzType,
TimeType,
UnknownType,
UUIDType,
)
from pyiceberg.utils.concurrent import ExecutorFactory
Expand Down Expand Up @@ -659,6 +660,9 @@ def visit_string(self, _: StringType) -> pa.DataType:
def visit_uuid(self, _: UUIDType) -> pa.DataType:
    """Return ``pa.binary(16)`` as the Arrow type for a UUID."""
    uuid_width = 16
    return pa.binary(uuid_width)

def visit_unknown(self, _: UnknownType) -> pa.DataType:
    """Return ``pa.null()`` as the Arrow type for an unknown type."""
    arrow_type = pa.null()
    return arrow_type

def visit_binary(self, _: BinaryType) -> pa.DataType:
    """Return ``pa.large_binary()`` as the Arrow type for a binary type."""
    arrow_type = pa.large_binary()
    return arrow_type

Expand Down Expand Up @@ -1209,6 +1213,8 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
elif pa.types.is_fixed_size_binary(primitive):
primitive = cast(pa.FixedSizeBinaryType, primitive)
return FixedType(primitive.byte_width)
elif pa.types.is_null(primitive):
return UnknownType()

raise TypeError(f"Unsupported type: {primitive}")

Expand Down Expand Up @@ -1919,6 +1925,9 @@ def visit_uuid(self, uuid_type: UUIDType) -> str:
def visit_binary(self, binary_type: BinaryType) -> str:
    """Return the physical type name used for binary values."""
    physical_type = "BYTE_ARRAY"
    return physical_type

def visit_unknown(self, unknown_type: UnknownType) -> str:
    """Return the physical type name used for values of unknown type."""
    physical_type = "UNKNOWN"
    return physical_type


# Reusable PrimitiveToPhysicalType instance bound to a private constant name.
_PRIMITIVE_TO_PHYSICAL_TYPE_VISITOR = PrimitiveToPhysicalType()

Expand Down
4 changes: 2 additions & 2 deletions pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ def _(type: IcebergType, value: Optional[Union[int, datetime]]) -> Optional[int]
elif isinstance(value, datetime):
return datetime_to_micros(value)
else:
raise ValueError(f"Unknown type: {value}")
raise ValueError(f"Type not recognized: {value}")


@_to_partition_representation.register(DateType)
Expand All @@ -456,7 +456,7 @@ def _(type: IcebergType, value: Optional[Union[int, date]]) -> Optional[int]:
elif isinstance(value, date):
return date_to_days(value)
else:
raise ValueError(f"Unknown type: {value}")
raise ValueError(f"Type not recognized: {value}")


@_to_partition_representation.register(TimeType)
Expand Down
17 changes: 15 additions & 2 deletions pyiceberg/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
TimestampType,
TimestamptzType,
TimeType,
UnknownType,
UUIDType,
)

Expand Down Expand Up @@ -531,8 +532,10 @@ def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) ->
return self.visit_fixed(primitive, primitive_partner)
elif isinstance(primitive, BinaryType):
return self.visit_binary(primitive, primitive_partner)
elif isinstance(primitive, UnknownType):
return self.visit_unknown(primitive, primitive_partner)
else:
raise ValueError(f"Unknown type: {primitive}")
raise ValueError(f"Type not recognized: {primitive}")

@abstractmethod
def visit_boolean(self, boolean_type: BooleanType, partner: Optional[P]) -> T:
Expand Down Expand Up @@ -590,6 +593,10 @@ def visit_fixed(self, fixed_type: FixedType, partner: Optional[P]) -> T:
def visit_binary(self, binary_type: BinaryType, partner: Optional[P]) -> T:
"""Visit a BinaryType."""

@abstractmethod
def visit_unknown(self, unknown_type: UnknownType, partner: Optional[P]) -> T:
    """Visit an UnknownType along with its partner."""
    ...


class PartnerAccessor(Generic[P], ABC):
@abstractmethod
Expand Down Expand Up @@ -707,8 +714,10 @@ def primitive(self, primitive: PrimitiveType) -> T:
return self.visit_uuid(primitive)
elif isinstance(primitive, BinaryType):
return self.visit_binary(primitive)
elif isinstance(primitive, UnknownType):
return self.visit_unknown(primitive)
else:
raise ValueError(f"Unknown type: {primitive}")
raise ValueError(f"Type not recognized: {primitive}")

@abstractmethod
def visit_fixed(self, fixed_type: FixedType) -> T:
Expand Down Expand Up @@ -766,6 +775,10 @@ def visit_uuid(self, uuid_type: UUIDType) -> T:
def visit_binary(self, binary_type: BinaryType) -> T:
"""Visit a BinaryType."""

@abstractmethod
def visit_unknown(self, unknown_type: UnknownType) -> T:
    """Visit an UnknownType."""
    ...


@dataclass(init=True, eq=True, frozen=True)
class Accessor:
Expand Down
20 changes: 19 additions & 1 deletion pyiceberg/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@ def handle_primitive_type(cls, v: Any, handler: ValidatorFunctionWrapHandler) ->
return UUIDType()
if v == "binary":
return BinaryType()
if v == "unknown":
return UnknownType()
if v.startswith("fixed"):
return FixedType(_parse_fixed_type(v))
if v.startswith("decimal"):
precision, scale = _parse_decimal_type(v)
return DecimalType(precision, scale)
else:
raise ValueError(f"Unknown type: {v}")
raise ValueError(f"Type not recognized: {v}")
if isinstance(v, dict) and cls == IcebergType:
complex_type = v.get("type")
if complex_type == "list":
Expand Down Expand Up @@ -747,3 +749,19 @@ class BinaryType(PrimitiveType):
"""

root: Literal["binary"] = Field(default="binary")


class UnknownType(PrimitiveType):
    """Primitive Iceberg type standing in for a data type that is not known.

    It is used for columns whose data type is not known at the time of writing.

    Example:
        >>> column_foo = UnknownType()
        >>> isinstance(column_foo, UnknownType)
        True
        >>> column_foo
        UnknownType()
    """

    # The only accepted (and default) value is the literal "unknown".
    root: Literal["unknown"] = Field(default="unknown")
9 changes: 7 additions & 2 deletions pyiceberg/utils/schema_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
TimestampType,
TimestamptzType,
TimeType,
UnknownType,
UUIDType,
)
from pyiceberg.utils.decimal import decimal_required_bytes
Expand All @@ -62,6 +63,7 @@
"long": LongType(),
"string": StringType(),
"enum": StringType(),
"null": UnknownType(),
}

LOGICAL_FIELD_TYPE_MAPPING: Dict[Tuple[str, str], PrimitiveType] = {
Expand Down Expand Up @@ -209,9 +211,9 @@ def _convert_schema(self, avro_type: Union[str, Dict[str, Any]]) -> IcebergType:
elif isinstance(type_identifier, str) and type_identifier in PRIMITIVE_FIELD_TYPE_MAPPING:
return PRIMITIVE_FIELD_TYPE_MAPPING[type_identifier]
else:
raise TypeError(f"Unknown type: {avro_type}")
raise TypeError(f"Type not recognized: {avro_type}")
else:
raise TypeError(f"Unknown type: {avro_type}")
raise TypeError(f"Type not recognized: {avro_type}")

def _convert_field(self, field: Dict[str, Any]) -> NestedField:
"""Convert an Avro field into an Iceberg equivalent field.
Expand Down Expand Up @@ -618,3 +620,6 @@ def visit_uuid(self, uuid_type: UUIDType) -> AvroType:

def visit_binary(self, binary_type: BinaryType) -> AvroType:
    """Return the Avro type name used for binary values."""
    avro_name = "bytes"
    return avro_name

def visit_unknown(self, unknown_type: UnknownType) -> AvroType:
    """Return the Avro type name used for values of unknown type."""
    avro_name = "null"
    return avro_name
11 changes: 3 additions & 8 deletions tests/avro/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
TimeReader,
TimestampReader,
TimestamptzReader,
UnknownReader,
UUIDReader,
)
from pyiceberg.avro.resolver import construct_reader
Expand All @@ -55,12 +56,12 @@
IntegerType,
LongType,
NestedField,
PrimitiveType,
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
UnknownType,
UUIDType,
)

Expand Down Expand Up @@ -325,13 +326,7 @@ def test_binary_reader() -> None:


def test_unknown_type() -> None:
    """Check that ``construct_reader`` maps ``UnknownType`` to an ``UnknownReader``.

    The earlier form of this test declared a local ``UnknownType`` subclass and
    expected a ``ValueError`` containing "Unknown type:". That shadowed the
    imported ``UnknownType`` and contradicted the assertion below, so only the
    current expectation is kept.
    """
    assert construct_reader(UnknownType()) == UnknownReader()


def test_uuid_reader() -> None:
Expand Down
Loading