Skip to content

Commit e89cfb7

Browse files
committed
Relax the constaints a bit
1 parent 7ae8f5f commit e89cfb7

File tree

4 files changed

+24
-30
lines changed

4 files changed

+24
-30
lines changed

pyiceberg/avro/file.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,6 @@ class AvroOutputFile(Generic[D]):
228228
encoder: BinaryEncoder
229229
sync_bytes: bytes
230230
writer: Writer
231-
records_written: int
232231

233232
def __init__(
234233
self,
@@ -248,7 +247,6 @@ def __init__(
248247
else resolve_writer(record_schema=record_schema, file_schema=self.file_schema)
249248
)
250249
self.metadata = metadata
251-
self.records_written = 0
252250

253251
def __enter__(self) -> AvroOutputFile[D]:
254252
"""
@@ -268,12 +266,6 @@ def __exit__(
268266
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
269267
) -> None:
270268
"""Perform cleanup when exiting the scope of a 'with' statement."""
271-
if self.records_written == 0:
272-
# This is very opinionated, as for Iceberg we should not write empty metadata.
273-
# The `write_block` method should be called at least once to make sure that we
274-
# write the number of blocks and more.
275-
raise ValueError("No records have been written for this Avro file.")
276-
277269
self.output_stream.close()
278270

279271
def _write_header(self) -> None:
@@ -286,15 +278,9 @@ def write_block(self, objects: List[D]) -> None:
286278
in_memory = io.BytesIO()
287279
block_content_encoder = BinaryEncoder(output_stream=in_memory)
288280

289-
records_written_in_block = 0
290281
for obj in objects:
291282
self.writer.write(block_content_encoder, obj)
292-
records_written_in_block += 1
293-
294-
if records_written_in_block == 0:
295-
raise ValueError("No records have been written in this block.")
296283

297-
self.records_written += records_written_in_block
298284
block_content = in_memory.getvalue()
299285

300286
self.encoder.write_int(len(objects))

pyiceberg/manifest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,10 @@ def __exit__(
685685
traceback: Optional[TracebackType],
686686
) -> None:
687687
"""Close the writer."""
688+
if (self._added_files + self._existing_files + self._deleted_files) == 0:
689+
# This is just a guard to ensure that we don't write empty manifest files
690+
raise ValueError("An empty manifest file has been written")
691+
688692
self.closed = True
689693
self._writer.__exit__(exc_type, exc_value, traceback)
690694

@@ -757,6 +761,8 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
757761
elif entry.status == ManifestEntryStatus.DELETED:
758762
self._deleted_files += 1
759763
self._deleted_rows += entry.data_file.record_count
764+
else:
765+
raise ValueError(f"Unknown entry: {entry.status}")
760766

761767
self._partitions.append(entry.data_file.partition)
762768

tests/avro/test_file.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -394,18 +394,3 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
394394
for idx, field in enumerate(all_primitives_schema.as_struct()):
395395
assert record[idx] == avro_entry[idx], f"Invalid {field}"
396396
assert record[idx] == avro_entry_read_with_fastavro[idx], f"Invalid {field} read with fastavro"
397-
398-
399-
def test_forbid_writing_empty_file() -> None:
400-
with TemporaryDirectory() as tmpdir:
401-
tmp_avro_file = tmpdir + "/manifest_entry.avro"
402-
403-
with pytest.raises(ValueError, match="No records have been written for this Avro file."):
404-
with avro.AvroOutputFile[ManifestEntry](
405-
output_file=PyArrowFileIO().new_output(tmp_avro_file),
406-
file_schema=MANIFEST_ENTRY_SCHEMAS[1],
407-
schema_name="manifest_entry",
408-
record_schema=MANIFEST_ENTRY_SCHEMAS[2],
409-
) as out:
410-
with pytest.raises(ValueError, match="No records have been written in this block."):
411-
out.write_block([])

tests/utils/test_manifest.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
write_manifest,
3636
write_manifest_list,
3737
)
38-
from pyiceberg.partitioning import PartitionField, PartitionSpec
38+
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
3939
from pyiceberg.schema import Schema
4040
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
4141
from pyiceberg.transforms import IdentityTransform
@@ -306,6 +306,23 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
306306
assert entry.status == ManifestEntryStatus.ADDED
307307

308308

309+
def test_write_empty_manifest() -> None:
310+
io = load_file_io()
311+
test_schema = Schema(NestedField(1, "foo", IntegerType(), False))
312+
with TemporaryDirectory() as tmpdir:
313+
tmp_avro_file = tmpdir + "/test_write_manifest.avro"
314+
315+
with pytest.raises(ValueError, match="An empty manifest file has been written"):
316+
with write_manifest(
317+
format_version=1,
318+
spec=UNPARTITIONED_PARTITION_SPEC,
319+
schema=test_schema,
320+
output_file=io.new_output(tmp_avro_file),
321+
snapshot_id=8744736658442914487,
322+
) as _:
323+
pass
324+
325+
309326
@pytest.mark.parametrize("format_version", [1, 2])
310327
def test_write_manifest(
311328
generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion

0 commit comments

Comments
 (0)