
Commit 1ccb31d

Add a partitioned overwrite test
1 parent 05fcf2d commit 1ccb31d

4 files changed: +87 -12 lines changed

pyiceberg/io/pyarrow.py

Lines changed: 2 additions & 2 deletions
@@ -2084,9 +2084,9 @@ def _dataframe_to_data_files(
            ]),
        )
    else:
-        from pyiceberg.table import determine_partitions
+        from pyiceberg.table import _determine_partitions
 
-        partitions = determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
+        partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
         yield from write_file(
             io=io,
             table_metadata=table_metadata,
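
A minimal sketch of what the renamed private helper does for the partitioned branch above (not part of this commit; the schema, spec, and data below are illustrative): _determine_partitions slices an Arrow table into one TablePartition per partition key, which the write path then turns into data files.

import pyarrow as pa

from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import _determine_partitions
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import IntegerType, NestedField

# Illustrative schema and identity-partition spec (made up for this sketch).
schema = Schema(
    NestedField(field_id=1, name="number_partitioned", field_type=IntegerType(), required=False),
    NestedField(field_id=2, name="number", field_type=IntegerType(), required=False),
)
spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="number_partitioned")
)
arrow_table = pa.Table.from_pydict(
    {"number_partitioned": [10, 10, 20], "number": [1, 2, 3]},
    schema=pa.schema([pa.field("number_partitioned", pa.int32()), pa.field("number", pa.int32())]),
)

# One TablePartition per distinct partition key, each carrying its slice of rows.
for part in _determine_partitions(spec=spec, schema=schema, arrow_table=arrow_table):
    print(part.partition_key.partition, part.arrow_table_partition.num_rows)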

pyiceberg/table/__init__.py

Lines changed: 28 additions & 4 deletions
@@ -249,6 +249,7 @@ class TableProperties:
     DELETE_MODE = "write.delete.mode"
     DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
     DELETE_MODE_MERGE_ON_READ = "merge-on-read"
+    DELETE_MODE_DEFAULT = DELETE_MODE_COPY_ON_WRITE
 
     DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
     FORMAT_VERSION = "format-version"
@@ -533,6 +534,12 @@ def overwrite(
         """
         Shorthand for adding a table overwrite with a PyArrow table to the transaction.
 
+        An overwrite may produce zero or more snapshots based on the operation:
+
+            - DELETE: In case existing Parquet files can be dropped completely.
+            - REPLACE: In case existing Parquet files need to be rewritten.
+            - APPEND: In case new data is being inserted into the table.
+
         Args:
             df: The Arrow dataframe that will be used to overwrite the table
             overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
@@ -547,8 +554,12 @@ def overwrite(
         if not isinstance(df, pa.Table):
             raise ValueError(f"Expected PyArrow table, got: {df}")
 
-        if len(self._table.spec().fields) > 0:
-            raise ValueError("Cannot write to partitioned tables")
+        if unsupported_partitions := [
+            field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform
+        ]:
+            raise ValueError(
+                f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
+            )
 
         _check_schema_compatible(self._table.schema(), other_schema=df.schema)
         # cast if the two schemas are compatible but not equal
@@ -568,8 +579,15 @@ def overwrite(
             update_snapshot.append_data_file(data_file)
 
     def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand for deleting record from a table.
+
+        Args:
+            delete_filter: A boolean expression to delete rows from a table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        """
         if (
-            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_COPY_ON_WRITE)
+            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)
             == TableProperties.DELETE_MODE_MERGE_ON_READ
         ):
             warnings.warn("Merge on read is not yet supported, falling back to copy-on-write")
@@ -1562,6 +1580,12 @@ def overwrite(
         """
         Shorthand for overwriting the table with a PyArrow table.
 
+        An overwrite may produce zero or more snapshots based on the operation:
+
+            - DELETE: In case existing Parquet files can be dropped completely.
+            - REPLACE: In case existing Parquet files need to be rewritten.
+            - APPEND: In case new data is being inserted into the table.
+
         Args:
             df: The Arrow dataframe that will be used to overwrite the table
             overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
@@ -4326,7 +4350,7 @@ def _get_table_partitions(
     return table_partitions
 
 
-def determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[TablePartition]:
+def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[TablePartition]:
     """Based on the iceberg table partition spec, slice the arrow table into partitions with their keys.
 
     Example:
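
To make the behaviour the new overwrite docstring describes concrete, a hedged usage sketch (not part of this commit; the catalog name, table identifier, and rows are made up): overwriting a partitioned table with a filter drops the matching files in a DELETE snapshot and writes the incoming rows in an APPEND snapshot.

import pyarrow as pa

from pyiceberg.catalog import load_catalog

# Hypothetical catalog/table; any table with an identity partition on
# number_partitioned behaves the same way.
tbl = load_catalog("default").load_table("default.table_partitioned_delete")

arrow_tbl = pa.Table.from_pylist(
    [
        {"number_partitioned": 10, "number": 4},
        {"number_partitioned": 10, "number": 5},
    ],
    schema=pa.schema([pa.field("number_partitioned", pa.int32()), pa.field("number", pa.int32())]),
)

# Only the rows matching the filter are replaced; expect a "delete" snapshot
# for the dropped files followed by an "append" snapshot for the new data.
tbl.overwrite(arrow_tbl, "number_partitioned == 10")
print([snapshot.summary.operation.value for snapshot in tbl.snapshots()])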

tests/integration/test_deletes.py

Lines changed: 54 additions & 3 deletions
@@ -137,7 +137,7 @@ def test_partitioned_table_no_match(spark: SparkSession, session_catalog: RestCa
 
 
 @pytest.mark.integration
-def test_partitioned_table_positional_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
+def test_delete_partitioned_table_positional_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
     identifier = "default.table_partitioned_delete"
 
     run_spark_commands(
@@ -175,7 +175,7 @@ def test_partitioned_table_positional_deletes(spark: SparkSession, session_catal
     assert len(files) == 1
     assert len(files[0].delete_files) == 1
 
-    # Will rewrite a data file with a positional delete
+    # Will rewrite a data file without the positional delete
     tbl.delete(EqualTo("number", 40))
 
     # One positional delete has been added, but an OVERWRITE status is set
@@ -184,6 +184,57 @@ def test_partitioned_table_positional_deletes(spark: SparkSession, session_catal
     assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]}
 
 
+@pytest.mark.integration
+def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestCatalog) -> None:
+    identifier = "default.table_partitioned_delete"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned int,
+                number int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read'
+            )
+            """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 1), (10, 2), (20, 3)
+            """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+
+    files = list(tbl.scan().plan_files())
+    assert len(files) == 2
+
+    arrow_schema = pa.schema([pa.field("number_partitioned", pa.int32()), pa.field("number", pa.int32())])
+    arrow_tbl = pa.Table.from_pylist(
+        [
+            {"number_partitioned": 10, "number": 4},
+            {"number_partitioned": 10, "number": 5},
+        ],
+        schema=arrow_schema,
+    )
+
+    # Will rewrite a data file without the positional delete
+    tbl.overwrite(arrow_tbl, "number_partitioned == 10")
+
+    # One positional delete has been added, but an OVERWRITE status is set
+    # https://github.com/apache/iceberg/issues/10122
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "delete", "append"]
+    assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 10, 20], "number": [4, 5, 3]}
+
+
 @pytest.mark.integration
 def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSession, session_catalog: RestCatalog) -> None:
     identifier = "default.table_partitioned_delete_sequence_number"
@@ -225,7 +276,7 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio
     files = list(tbl.scan().plan_files())
     assert len(files) == 2
 
-    # Will rewrite a data file with a positional delete
+    # Will rewrite a data file without a positional delete
     tbl.delete(EqualTo("number", 201))
 
     # One positional delete has been added, but an OVERWRITE status is set
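
For completeness, a hedged sketch of the copy-on-write delete path these tests exercise (reusing the identifier from the tests above; not part of this commit). Per the change in pyiceberg/table/__init__.py, merge-on-read tables currently fall back to copy-on-write with a warning, and with write.delete.mode unset the new DELETE_MODE_DEFAULT (copy-on-write) applies.

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

tbl = load_catalog("default").load_table("default.table_partitioned_delete")

# Copy-on-write: the data file containing number == 40 is rewritten without
# that row; the remaining rows stay readable through the new snapshot.
tbl.delete(EqualTo("number", 40))
print(tbl.scan().to_arrow().to_pydict())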

tests/table/test_init.py

Lines changed: 3 additions & 3 deletions
@@ -64,9 +64,9 @@
     UpdateSchema,
     _apply_table_update,
     _check_schema_compatible,
+    _determine_partitions,
     _match_deletes_to_data_file,
     _TableMetadataUpdateContext,
-    determine_partitions,
     update_table_metadata,
 )
 from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id
@@ -1270,7 +1270,7 @@ def test_partition_for_demo() -> None:
         PartitionField(source_id=2, field_id=1002, transform=IdentityTransform(), name="n_legs_identity"),
         PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="year_identity"),
     )
-    result = determine_partitions(partition_spec, test_schema, arrow_table)
+    result = _determine_partitions(partition_spec, test_schema, arrow_table)
     assert {table_partition.partition_key.partition for table_partition in result} == {
         Record(n_legs_identity=2, year_identity=2020),
         Record(n_legs_identity=100, year_identity=2021),
@@ -1320,7 +1320,7 @@ def test_identity_partition_on_multi_columns() -> None:
     }
     arrow_table = pa.Table.from_pydict(test_data, schema=test_pa_schema)
 
-    result = determine_partitions(partition_spec, test_schema, arrow_table)
+    result = _determine_partitions(partition_spec, test_schema, arrow_table)
 
     assert {table_partition.partition_key.partition for table_partition in result} == expected
     concatenated_arrow_table = pa.concat_tables([table_partition.arrow_table_partition for table_partition in result])
