Skip to content

Commit 3ee9f4a

Browse files
Partition Evolution Support
1 parent 339ba53 commit 3ee9f4a

File tree

7 files changed

+968
-14
lines changed

7 files changed

+968
-14
lines changed

mkdocs/docs/api.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,63 @@ with table.update_schema(allow_incompatible_changes=True) as update:
418418
update.delete_column("some_field")
419419
```
420420

421+
## Partition evolution
422+
423+
PyIceberg supports partition evolution. See the [partition evolution](https://iceberg.apache.org/spec/#partition-evolution)
424+
for more details.
425+
426+
The API to use when evolving partitions is the `update_spec` API on the table.
427+
428+
```python
429+
with table.update_spec() as update:
430+
update.add_field("id", BucketTransform(16), "bucketed_id")
431+
update.add_field("event_ts", DayTransform(), "day_ts")
432+
```
433+
434+
Updating the partition spec can also be done as part of a transaction with other operations.
435+
436+
```python
437+
with table.transaction() as transaction:
438+
with transaction.update_spec() as update_spec:
439+
update_spec.add_field("id", BucketTransform(16), "bucketed_id")
440+
update_spec.add_field("event_ts", DayTransform(), "day_ts")
441+
# ... Update properties etc
442+
```
443+
444+
### Add fields
445+
446+
New partition fields can be added via the `add_field` API which takes in the field name to partition on,
447+
the partition transform, and an optional partition name. If the partition name is not specified,
448+
one will be created.
449+
450+
```python
451+
with table.update_spec() as update:
452+
update.add_field("id", BucketTransform(16), "bucketed_id")
453+
update.add_field("event_ts", DayTransform(), "day_ts")
454+
# identity is a shortcut API for adding an IdentityTransform
455+
update.identity("some_field")
456+
```
457+
458+
### Remove fields
459+
460+
Partition fields can also be removed via the `remove_field` API if it no longer makes sense to partition on those fields.
461+
462+
```python
463+
with table.update_spec() as update:some_partition_name
464+
# Remove the partition field with the name
465+
update.remove_field("some_partition_name")
466+
```
467+
468+
### Rename fields
469+
470+
Partition fields can also be renamed via the `rename_field` API.
471+
472+
```python
473+
with table.update_spec() as update:
474+
# Rename the partition field with the name bucketed_id to sharded_id
475+
update.rename_field("bucketed_id", "sharded_id")
476+
```
477+
421478
## Table properties
422479

423480
Set and remove properties through the `Transaction` API:

pyiceberg/catalog/hive.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ def create_table(
290290
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
291291

292292
properties = {**DEFAULT_PROPERTIES, **properties}
293+
print(f"Hive catalog properties {properties}")
293294
database_name, table_name = self.identifier_to_database_and_table(identifier)
294295
current_time_millis = int(time.time() * 1000)
295296

@@ -303,6 +304,7 @@ def create_table(
303304
sort_order=sort_order,
304305
properties=properties,
305306
)
307+
print(f"Hive metadata is {metadata}")
306308
io = load_file_io({**self.properties, **properties}, location=location)
307309
self._write_metadata(metadata, io, metadata_location)
308310

pyiceberg/partitioning.py

Lines changed: 121 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,9 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
from functools import cached_property
20-
from typing import (
21-
Any,
22-
Dict,
23-
List,
24-
Optional,
25-
Tuple,
26-
)
19+
from abc import ABC, abstractmethod
20+
from functools import cached_property, singledispatch
21+
from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar
2722

2823
from pydantic import (
2924
BeforeValidator,
@@ -34,7 +29,18 @@
3429
from typing_extensions import Annotated
3530

3631
from pyiceberg.schema import Schema
37-
from pyiceberg.transforms import Transform, parse_transform
32+
from pyiceberg.transforms import (
33+
BucketTransform,
34+
DayTransform,
35+
HourTransform,
36+
IdentityTransform,
37+
Transform,
38+
TruncateTransform,
39+
UnknownTransform,
40+
VoidTransform,
41+
YearTransform,
42+
parse_transform,
43+
)
3844
from pyiceberg.typedef import IcebergBaseModel
3945
from pyiceberg.types import NestedField, StructType
4046

@@ -143,7 +149,7 @@ def is_unpartitioned(self) -> bool:
143149
def last_assigned_field_id(self) -> int:
144150
if self.fields:
145151
return max(pf.field_id for pf in self.fields)
146-
return PARTITION_FIELD_ID_START
152+
return PARTITION_FIELD_ID_START - 1
147153

148154
@cached_property
149155
def source_id_to_fields_map(self) -> Dict[int, List[PartitionField]]:
@@ -215,3 +221,108 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
215221
)
216222
)
217223
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)
224+
225+
226+
T = TypeVar("T")
227+
228+
229+
class PartitionSpecVisitor(Generic[T], ABC):
230+
@abstractmethod
231+
def identity(self, field_id: int, source_name: str, source_id: int) -> T:
232+
"""Visit identity partition field."""
233+
234+
@abstractmethod
235+
def bucket(self, field_id: int, source_name: str, source_id: int, num_buckets: int) -> T:
236+
"""Visit bucket partition field."""
237+
238+
@abstractmethod
239+
def truncate(self, field_id: int, source_name: str, source_id: int, width: int) -> T:
240+
"""Visit truncate partition field."""
241+
242+
@abstractmethod
243+
def year(self, field_id: int, source_name: str, source_id: int) -> T:
244+
"""Visit year partition field."""
245+
246+
@abstractmethod
247+
def month(self, field_id: int, source_name: str, source_id: int) -> T:
248+
"""Visit month partition field."""
249+
250+
@abstractmethod
251+
def day(self, field_id: int, source_name: str, source_id: int) -> T:
252+
"""Visit day partition field."""
253+
254+
@abstractmethod
255+
def hour(self, field_id: int, source_name: str, source_id: int) -> T:
256+
"""Visit hour partition field."""
257+
258+
@abstractmethod
259+
def always_null(self, field_id: int, source_name: str, source_id: int) -> T:
260+
"""Visit void partition field."""
261+
262+
@abstractmethod
263+
def unknown(self, field_id: int, source_name: str, source_id: int, transform: str) -> T:
264+
"""Visit unknown partition field."""
265+
raise ValueError(f"Unknown transform is not supported: {transform}")
266+
267+
268+
class _PartitionNameGenerator(PartitionSpecVisitor[str]):
269+
def identity(self, field_id: int, source_name: str, source_id: int) -> str:
270+
return source_name
271+
272+
def bucket(self, field_id: int, source_name: str, source_id: int, num_buckets: int) -> str:
273+
return f"{source_name}_bucket_{num_buckets}"
274+
275+
def truncate(self, field_id: int, source_name: str, source_id: int, width: int) -> str:
276+
return source_name + "_trunc_" + str(width)
277+
278+
def year(self, field_id: int, source_name: str, source_id: int) -> str:
279+
return source_name + "_year"
280+
281+
def month(self, field_id: int, source_name: str, source_id: int) -> str:
282+
return source_name + "_month"
283+
284+
def day(self, field_id: int, source_name: str, source_id: int) -> str:
285+
return source_name + "_day"
286+
287+
def hour(self, field_id: int, source_name: str, source_id: int) -> str:
288+
return source_name + "_hour"
289+
290+
def always_null(self, field_id: int, source_name: str, source_id: int) -> str:
291+
return source_name + "_null"
292+
293+
def unknown(self, field_id: int, source_name: str, source_id: int, transform: str) -> str:
294+
return super().unknown(field_id, source_name, source_id, transform)
295+
296+
297+
R = TypeVar("R")
298+
299+
300+
@singledispatch
301+
def _visit(spec: PartitionSpec, schema: Schema, visitor: PartitionSpecVisitor[R]) -> List[R]:
302+
return [_visit_partition_field(schema, field, visitor) for field in spec.fields]
303+
304+
305+
def _visit_partition_field(schema: Schema, field: PartitionField, visitor: PartitionSpecVisitor[R]) -> R:
306+
source_name = schema.find_column_name(field.source_id)
307+
if not source_name:
308+
raise ValueError(f"Could not find field with id {field.source_id}")
309+
310+
transform = field.transform
311+
if isinstance(transform, IdentityTransform):
312+
return visitor.identity(field.field_id, source_name, field.source_id)
313+
elif isinstance(transform, BucketTransform):
314+
return visitor.bucket(field.field_id, source_name, field.source_id, transform.num_buckets)
315+
elif isinstance(transform, TruncateTransform):
316+
return visitor.truncate(field.field_id, source_name, field.source_id, transform.width)
317+
elif isinstance(transform, DayTransform):
318+
return visitor.day(field.field_id, source_name, field.source_id)
319+
elif isinstance(transform, HourTransform):
320+
return visitor.hour(field.field_id, source_name, field.source_id)
321+
elif isinstance(transform, YearTransform):
322+
return visitor.year(field.field_id, source_name, field.source_id)
323+
elif isinstance(transform, VoidTransform):
324+
return visitor.always_null(field.field_id, source_name, field.source_id)
325+
elif isinstance(transform, UnknownTransform):
326+
return visitor.unknown(field.field_id, source_name, field.source_id, repr(transform))
327+
else:
328+
raise ValueError(f"Unknown transform {transform}")

0 commit comments

Comments
 (0)