Skip to content

Commit a8e91fb

Browse files
authored
Merge pull request #66 from pycompression/fixflushing
Fix flushing behavior in threading implementation.
2 parents 4f7a683 + 175df5b commit a8e91fb

File tree

3 files changed

+28
-14
lines changed

3 files changed

+28
-14
lines changed

CHANGELOG.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ version develop
1111
-----------------
1212
+ Python 3.14 is supported.
1313
+ Python 3.8 and 3.9 are no longer supported.
14+
+ Fix an issue where flushing using igzip_threaded caused a gzip end of stream
15+
and started a new gzip stream. In essence creating a concatenated gzip
16+
stream. Now it is in concordance with how single threaded gzip streams
17+
are flushed using Z_SYNC_FLUSH.
1418
+ Switched to setuptools-scm for building the package rather than versioningit.
1519
+ Test files are added to the source distribution.
1620

src/zlib_ng/gzip_ng_threaded.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -321,30 +321,25 @@ def write(self, b) -> int:
321321
self.input_queues[worker_index].put((data, zdict))
322322
return len(data)
323323

324-
def _end_gzip_stream(self):
324+
def flush(self):
325325
self._check_closed()
326326
# Wait for all data to be compressed
327327
for in_q in self.input_queues:
328328
in_q.join()
329329
# Wait for all data to be written
330330
for out_q in self.output_queues:
331331
out_q.join()
332-
# Write an empty deflate block with a lost block marker.
332+
self.raw.flush()
333+
334+
def close(self):
335+
if self._closed:
336+
return
333337
self.raw.write(zlib_ng.compress(b"", wbits=-15))
334338
trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
335339
self.raw.write(trailer)
336340
self._crc = 0
337341
self._size = 0
338342
self.raw.flush()
339-
340-
def flush(self):
341-
self._end_gzip_stream()
342-
self._write_gzip_header()
343-
344-
def close(self) -> None:
345-
if self._closed:
346-
return
347-
self._end_gzip_stream()
348343
self.stop()
349344
if self.exception:
350345
self.raw.close()

tests/test_gzip_ng_threaded.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import subprocess
1313
import sys
1414
import tempfile
15+
import zlib
1516
from pathlib import Path
1617

1718
import pytest
@@ -234,15 +235,29 @@ def test_threaded_program_can_exit_on_error(tmp_path, mode, threads):
234235

235236
@pytest.mark.parametrize("threads", [1, 2])
236237
def test_flush(tmp_path, threads):
238+
empty_block_end = b"\x00\x00\xff\xff"
239+
compressobj = zlib.compressobj(wbits=-15)
240+
deflate_last_block = compressobj.compress(b"") + compressobj.flush()
237241
test_file = tmp_path / "output.gz"
238242
with gzip_ng_threaded.open(test_file, "wb", threads=threads) as f:
239243
f.write(b"1")
240244
f.flush()
241-
assert gzip.decompress(test_file.read_bytes()) == b"1"
245+
data = test_file.read_bytes()
246+
assert data[-4:] == empty_block_end
247+
# Cut off gzip header and end data with an explicit last block to
248+
# test if the data was compressed correctly.
249+
deflate_block = data[10:] + deflate_last_block
250+
assert zlib.decompress(deflate_block, wbits=-15) == b"1"
242251
f.write(b"2")
243252
f.flush()
244-
assert gzip.decompress(test_file.read_bytes()) == b"12"
253+
data = test_file.read_bytes()
254+
assert data[-4:] == empty_block_end
255+
deflate_block = data[10:] + deflate_last_block
256+
assert zlib.decompress(deflate_block, wbits=-15) == b"12"
245257
f.write(b"3")
246258
f.flush()
247-
assert gzip.decompress(test_file.read_bytes()) == b"123"
259+
data = test_file.read_bytes()
260+
assert data[-4:] == empty_block_end
261+
deflate_block = data[10:] + deflate_last_block
262+
assert zlib.decompress(deflate_block, wbits=-15) == b"123"
248263
assert gzip.decompress(test_file.read_bytes()) == b"123"

0 commit comments

Comments
 (0)