From eba7ed17637aeff58a272d517b1a796581033be7 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Mon, 5 Feb 2024 16:51:26 +0100 Subject: [PATCH 1/9] Store compression program settings in dedicated objects, rather than through subclassing --- src/xopen/__init__.py | 298 ++++++++++-------------------------------- tests/test_piped.py | 212 +++++++++++++++--------------- 2 files changed, 178 insertions(+), 332 deletions(-) diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index 96223c4..e256ccc 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -4,14 +4,7 @@ __all__ = [ "xopen", - "PipedGzipProgram", - "PipedIGzipReader", - "PipedIGzipProgram", - "PipedPigzProgram", - "PipedPBzip2Program", - "PipedXzProgram", - "PipedZstdProgram", - "PipedPythonIsalProgram", + "_PipedCompressionProgram", "__version__", ] @@ -27,17 +20,21 @@ import subprocess import tempfile import time +import typing from subprocess import Popen, PIPE from typing import ( + Any, + Dict, Optional, Union, TextIO, IO, - List, - Set, + Sequence, + Container, overload, BinaryIO, Literal, + Tuple, ) from types import ModuleType @@ -99,6 +96,38 @@ FilePath = Union[str, bytes, os.PathLike] +# Rather than using a dict, use a NamedTuple with _asdict to enforce presence +# of certain members and type checking. +class _ProgramSettings(typing.NamedTuple): + program_args: Tuple[str, ...] + acceptable_compression_levels: Tuple[int, ...] = tuple(range(1, 10)) + threads_flag: Optional[str] = None + # This exit code is not interpreted as an error when terminating the process + allowed_exit_code: Optional[int] = -signal.SIGTERM + # If this message is printed on stderr on terminating the process, + # it is not interpreted as an error + allowed_exit_message: Optional[bytes] = None + + +PROGRAM_SETTINGS: Dict[str, _ProgramSettings] = { + "pbzip2": _ProgramSettings( + ("pbzip2",), + tuple(range(1, 10)), + "-p", + allowed_exit_code=None, + allowed_exit_message=b"\n *Control-C or similar caught [sig=15], quitting...", + ), + "xz": _ProgramSettings(("xz",), tuple(range(0, 10)), "-T"), + "zstd": _ProgramSettings(("zstd",), tuple(range(1, 20)), "-T"), + "pigz": _ProgramSettings(("pigz", "--no-name"), tuple(range(0, 10)) + (11,), "-p"), + "gzip": _ProgramSettings(("gzip", "--no-name"), tuple(range(1, 10))), +} + + +def _program_settings(program: str) -> Dict[str, Any]: + return PROGRAM_SETTINGS[program]._asdict() + + def _available_cpu_count() -> int: """ Number of available virtual or physical CPUs on this system @@ -161,7 +190,7 @@ def _can_read_concatenated_gz(program: str) -> bool: os.remove(temp_path) -class PipedCompressionProgram(io.IOBase): +class _PipedCompressionProgram(io.IOBase): """ Read and write compressed files by running an external process and piping into it. """ @@ -169,16 +198,17 @@ class PipedCompressionProgram(io.IOBase): def __init__( # noqa: C901 self, path: FilePath, - program_args: List[str], mode="rb", compresslevel: Optional[int] = None, - threads_flag: Optional[str] = None, threads: Optional[int] = None, + program_args: Optional[Sequence[str]] = None, + threads_flag: Optional[str] = None, # This exit code is not interpreted as an error when terminating the process allowed_exit_code: Optional[int] = -signal.SIGTERM, # If this message is printed on stderr on terminating the process, # it is not interpreted as an error allowed_exit_message: Optional[bytes] = None, + acceptable_compression_levels: Container[int] = tuple(range(0, 10)), ): """ mode -- one of 'w', 'wb', 'a', 'ab' @@ -189,14 +219,23 @@ def __init__( # noqa: C901 used. At the moment, this means that the number of available CPU cores is used, capped at four to avoid creating too many threads. Use 0 to use all available cores. """ + if program_args is None: + program_args = ("gzip", "--no-name") self._error_raised = False - self._program_args = program_args[:] + self._program_args = list(program_args) self._allowed_exit_code = allowed_exit_code self._allowed_exit_message = allowed_exit_message if mode not in ("r", "rb", "w", "wb", "a", "ab"): raise ValueError( f"Mode is '{mode}', but it must be 'r', 'rb', 'w', 'wb', 'a', or 'ab'" ) + if ( + compresslevel is not None + and compresslevel not in acceptable_compression_levels + ): + raise ValueError( + f"compresslevel must be in {acceptable_compression_levels}." + ) path = os.fspath(path) if isinstance(path, bytes) and sys.platform == "win32": path = path.decode() @@ -401,217 +440,6 @@ def flush(self) -> None: return None -class PipedGzipProgram(PipedCompressionProgram): - """ - Write gzip-compressed files by running an external gzip process and - piping into it. On Python 3, gzip.GzipFile is on par with gzip itself, - but running an external gzip can still reduce wall-clock time because - the compression happens in a separate process. - """ - - def __init__( - self, - path, - mode: str = "rb", - compresslevel: Optional[int] = None, - ): - """ - mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' - compresslevel -- compression level - """ - if compresslevel is not None and compresslevel not in range(1, 10): - raise ValueError("compresslevel must be between 1 and 9") - super().__init__( - path, - ["gzip", "--no-name"], - mode, - compresslevel, - ) - - -class PipedPigzProgram(PipedCompressionProgram): - """ - Write gzip-compressed files by running an external pigz process and - piping into it. pigz can compress using multiple cores. It is also more - efficient than gzip on only one core. (But then igzip is even faster and - should be preferred if the compression level allows it.) - """ - - _accepted_compression_levels: Set[int] = set(list(range(10)) + [11]) - - def __init__( - self, - path, - mode: str = "rb", - compresslevel: Optional[int] = None, - threads: Optional[int] = None, - ): - """ - mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' - compresslevel -- compression level - threads (int) -- number of pigz threads. If this is set to None, a reasonable default is - used. At the moment, this means that the number of available CPU cores is used, capped - at four to avoid creating too many threads. Use 0 to let pigz use all available cores. - """ - if ( - compresslevel is not None - and compresslevel not in self._accepted_compression_levels - ): - raise ValueError("compresslevel must be between 0 and 9 or 11") - super().__init__( - path, - ["pigz", "--no-name"], - mode, - compresslevel, - "-p", - threads, - ) - - -class PipedPBzip2Program(PipedCompressionProgram): - """ - Write bzip2-compressed files by running an external pbzip2 process and - piping into it. pbzip2 can compress using multiple cores. - """ - - def __init__( - self, - path, - mode: str = "rb", - threads: Optional[int] = None, - ): - # Use default compression level for pbzip2: 9 - super().__init__( - path, - ["pbzip2"], - mode, - 9, - "-p", - threads, - allowed_exit_code=None, - allowed_exit_message=b"\n *Control-C or similar caught [sig=15], quitting...", - ) - - -class PipedXzProgram(PipedCompressionProgram): - """ - Write xz-compressed files by running an external xz process and - piping into it. xz can compress using multiple cores. - """ - - _accepted_compression_levels: Set[int] = set(range(10)) - - def __init__( - self, - path, - mode: str = "rb", - compresslevel: Optional[int] = None, - threads: Optional[int] = None, - ): - """ - mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' - compresslevel -- compression level - threads (int) -- number of xz threads. If this is set to None, a reasonable default is - used. At the moment, this means that the number of available CPU cores is used, capped - at four to avoid creating too many threads. Use 0 to let xz use all available cores. - """ - if ( - compresslevel is not None - and compresslevel not in self._accepted_compression_levels - ): - raise ValueError("compresslevel must be between 0 and 9") - super().__init__(path, ["xz"], mode, compresslevel, "-T", threads) - - -class PipedZstdProgram(PipedCompressionProgram): - """ - Write Zstandard-compressed files by running an external xz process and - piping into it. xz can compress using multiple cores. - """ - - _accepted_compression_levels: Set[int] = set(range(1, 20)) - - def __init__( - self, - path, - mode: str = "rb", - compresslevel: Optional[int] = None, - threads: Optional[int] = None, - ): - """ - mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' - compresslevel -- compression level - threads (int) -- number of zstd threads. If this is set to None, a reasonable default is - used. At the moment, this means that the number of available CPU cores is used, capped - at four to avoid creating too many threads. Use 0 to let zstd use all available cores. - """ - if ( - compresslevel is not None - and compresslevel not in self._accepted_compression_levels - ): - raise ValueError("compresslevel must be between 1 and 19") - super().__init__( - path, - ["zstd"], - mode, - compresslevel, - "-T", - threads, - ) - - -class PipedIGzipProgram(PipedCompressionProgram): - """ - Uses igzip for writing a gzipped file. This is much faster than either - gzip or pigz which were written to run on a wide array of systems. igzip - can only run on x86 and ARM architectures, but is able to use more - architecture-specific optimizations as a result. - - Threads are supported by a flag, but do not add any speed. Also on some - distro version (isal package in debian buster) the thread flag is not - present. For these reason threads are omitted from the interface. - Only compresslevel 0-3 are supported and these output slightly different - filesizes from their pigz/gzip counterparts. - See: https://gist.github.com/rhpvorderman/4f1201c3f39518ff28dde45409eb696b - """ - - def __init__( - self, - path, - mode: str = "rb", - compresslevel: Optional[int] = None, - ): - if "r" in mode and not _can_read_concatenated_gz("igzip"): - # Instead of elaborate version string checking once the problem is - # fixed, it is much easier to use this, "proof in the pudding" type - # of evaluation. - raise ValueError( - "This version of igzip does not support reading " - "concatenated gzip files and is therefore not " - "safe to use. See: https://github.com/intel/isa-l/issues/143" - ) - if compresslevel is not None and compresslevel not in range(0, 4): - raise ValueError("compresslevel must be between 0 and 3") - super().__init__( - path, - ["igzip", "--no-name"], - mode, - compresslevel, - ) - - -class PipedPythonIsalProgram(PipedCompressionProgram): - def __init__(self, path, mode: str = "rb", compresslevel: Optional[int] = None): - if compresslevel is not None and compresslevel not in range(0, 4): - raise ValueError("compresslevel must be between 0 and 3") - super().__init__( - path, - [sys.executable, "-m", "isal.igzip", "--no-name"], - mode, - compresslevel, - ) - - def _open_stdin_or_out(mode: str): assert "b" in mode std = sys.stdout if "w" in mode else sys.stdin @@ -622,7 +450,9 @@ def _open_bz2(filename, mode: str, threads: Optional[int]): assert "b" in mode if threads != 0: try: - return PipedPBzip2Program(filename, mode, threads) + return _PipedCompressionProgram( + filename, mode, threads=threads, **_program_settings("pbzip2") + ) except OSError: pass # We try without threads. @@ -641,7 +471,9 @@ def _open_xz( if threads != 0: try: - return PipedXzProgram(filename, mode, compresslevel, threads) + return _PipedCompressionProgram( + filename, mode, compresslevel, threads, **_program_settings("xz") + ) except OSError: pass # We try without threads. @@ -664,7 +496,9 @@ def _open_zst( # noqa: C901 compresslevel = 3 if threads != 0: try: - return PipedZstdProgram(filename, mode, compresslevel, threads) + return _PipedCompressionProgram( + filename, mode, compresslevel, threads, **_program_settings("zstd") + ) except OSError: if zstandard is None: # No fallback available @@ -717,9 +551,13 @@ def _open_gz( # noqa: C901 pass try: try: - return PipedPigzProgram(filename, mode, compresslevel, threads=threads) + return _PipedCompressionProgram( + filename, mode, compresslevel, threads, **_program_settings("pigz") + ) except OSError: - return PipedGzipProgram(filename, mode, compresslevel) + return _PipedCompressionProgram( + filename, mode, compresslevel, threads, **_program_settings("gzip") + ) except OSError: pass # We try without threads. diff --git a/tests/test_piped.py b/tests/test_piped.py index fafd047..c62e3aa 100644 --- a/tests/test_piped.py +++ b/tests/test_piped.py @@ -13,17 +13,9 @@ from xopen import ( xopen, - PipedCompressionProgram, - PipedGzipProgram, - PipedPBzip2Program, - PipedPigzProgram, - PipedIGzipProgram, - PipedPythonIsalProgram, - PipedXzProgram, - PipedZstdProgram, + _PipedCompressionProgram, _MAX_PIPE_SIZE, - _can_read_concatenated_gz, - igzip, + _program_settings, ) extensions = ["", ".gz", ".bz2", ".xz", ".zst"] @@ -44,37 +36,24 @@ def available_gzip_programs(): - programs = [ - klass - for prog, klass in [ - ("gzip", PipedGzipProgram), - ("pigz", PipedPigzProgram), - ("igzip", PipedIGzipProgram), - ] - if shutil.which(prog) - ] - if PipedIGzipProgram in programs and not _can_read_concatenated_gz("igzip"): - programs.remove(PipedIGzipProgram) - if igzip is not None: - programs.append(PipedPythonIsalProgram) - return programs + return [_program_settings(prog) for prog in ("gzip", "pigz") if shutil.which(prog)] def available_bzip2_programs(): if shutil.which("pbzip2"): - return [PipedPBzip2Program] + return [_program_settings("pbzip2")] return [] def available_xz_programs(): if shutil.which("xz"): - return [PipedXzProgram] + return [_program_settings("xz")] return [] def available_zstd_programs(): if shutil.which("zstd"): - return [PipedZstdProgram] + return [_program_settings("zstd")] return [] @@ -91,9 +70,11 @@ def available_zstd_programs(): ) -THREADED_PROGRAMS = {(PipedPigzProgram, ".gz"), (PipedPBzip2Program, ".bz2")} & set( - ALL_PROGRAMS_WITH_EXTENSION -) +THREADED_PROGRAMS = [ + settings + for settings in ALL_PROGRAMS_WITH_EXTENSION + if "pbzip2" in settings[0]["program_args"] or "pigz" in settings[0]["program_args"] +] @pytest.fixture(params=PIPED_GZIP_PROGRAMS) @@ -117,74 +98,88 @@ def writer(request): def test_reader_readinto(reader): - opener, extension = reader - content = CONTENT - with opener(TEST_DIR / f"file.txt{extension}", "rb") as f: - b = bytearray(len(content) + 100) + program_settings, extension = reader + with _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", "rb", **program_settings + ) as f: + b = bytearray(len(CONTENT) + 100) length = f.readinto(b) - assert length == len(content) - assert b[:length] == content + assert length == len(CONTENT) + assert b[:length] == CONTENT def test_reader_textiowrapper(reader): - opener, extension = reader - with opener(TEST_DIR / f"file.txt{extension}", "rb") as f: + program_settings, extension = reader + with _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", "rb", **program_settings + ) as f: wrapped = io.TextIOWrapper(f, encoding="utf-8") assert wrapped.read() == CONTENT.decode("utf-8") def test_reader_readline(reader): - opener, extension = reader - first_line = CONTENT_LINES[0] - with opener(TEST_DIR / f"file.txt{extension}", "rb") as f: - assert f.readline() == first_line + program_settings, extension = reader + with _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", + "rb", + **program_settings, + ) as f: + assert f.readline() == CONTENT_LINES[0] def test_reader_readlines(reader): - opener, extension = reader - with opener(TEST_DIR / f"file.txt{extension}", "r") as f: + program_settings, extension = reader + with _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", "rb", **program_settings + ) as f: assert f.readlines() == CONTENT_LINES @pytest.mark.parametrize("threads", [None, 1, 2]) def test_piped_reader_iter(threads, threaded_reader): - opener, extension = threaded_reader - with opener(TEST_DIR / f"file.txt{extension}", mode="r", threads=threads) as f: + program_settings, extension = threaded_reader + with _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", + "rb", + **program_settings, + ) as f: lines = list(f) assert lines[0] == CONTENT_LINES[0] def test_writer(tmp_path, writer): - opener, extension = writer - print(opener, writer) - print(repr(opener)) + program_settings, extension = writer path = tmp_path / f"out{extension}" - with opener(path, mode="wb") as f: - print(f) + with _PipedCompressionProgram(path, mode="wb", **program_settings) as f: f.write(b"hello") with xopen(path, mode="rb") as f: assert f.read() == b"hello" def test_writer_has_iter_method(tmp_path, writer): - opener, extension = writer - with opener(tmp_path / f"out{extension}", "wb") as f: + program_settings, extension = writer + path = tmp_path / f"out{extension}" + with _PipedCompressionProgram( + path, + mode="wb", + **program_settings, + ) as f: f.write(b"hello") assert hasattr(f, "__iter__") def test_reader_iter_without_with(reader): - opener, extension = reader - f = opener(TEST_DIR / f"file.txt{extension}") + program_settings, extension = reader + f = _PipedCompressionProgram(TEST_DIR / f"file.txt{extension}", **program_settings) it = iter(f) assert CONTENT_LINES[0] == next(it) f.close() def test_reader_close(reader, create_large_file): - reader, extension = reader + program_settings, extension = reader large_file = create_large_file(extension) - with reader(large_file, mode="rb") as f: + with _PipedCompressionProgram(large_file, "rb", **program_settings) as f: f.readline() time.sleep(0.2) # The subprocess should be properly terminated now @@ -192,81 +187,97 @@ def test_reader_close(reader, create_large_file): def test_invalid_gzip_compression_level(gzip_writer, tmp_path): with pytest.raises(ValueError) as e: - with gzip_writer(tmp_path / "out.gz", mode="w", compresslevel=17) as f: - f.write("hello") # pragma: no cover + with _PipedCompressionProgram( + tmp_path / "out.gz", + mode="w", + compresslevel=17, + **gzip_writer, + ) as f: + f.write(b"hello") # pragma: no cover assert "compresslevel must be" in e.value.args[0] def test_invalid_xz_compression_level(tmp_path): with pytest.raises(ValueError) as e: - with PipedXzProgram(tmp_path / "out.xz", mode="w", compresslevel=10) as f: - f.write("hello") # pragma: no cover + with _PipedCompressionProgram( + tmp_path / "out.xz", + mode="w", + compresslevel=17, + **_program_settings("xz"), + ) as f: + f.write(b"hello") # pragma: no cover assert "compresslevel must be" in e.value.args[0] def test_invalid_zstd_compression_level(tmp_path): with pytest.raises(ValueError) as e: - with PipedZstdProgram(tmp_path / "out.zst", mode="w", compresslevel=25) as f: - f.write("hello") # pragma: no cover + with _PipedCompressionProgram( + tmp_path / "out.zst", + mode="w", + compresslevel=25, + **_program_settings("zstd"), + ) as f: + f.write(b"hello") # pragma: no cover assert "compresslevel must be" in e.value.args[0] def test_readers_read(reader): - opener, extension = reader - with opener(TEST_DIR / f"file.txt{extension}", "rb") as f: + program_settings, extension = reader + with _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", "rb", **program_settings + ) as f: assert f.read() == CONTENT -@pytest.mark.skipif( - sys.platform.startswith("win"), - reason="Windows does not have a gzip application by default.", -) -def test_concatenated_gzip_function(): - assert _can_read_concatenated_gz("gzip") is True - assert _can_read_concatenated_gz("pigz") is True - assert _can_read_concatenated_gz("cat") is False - - @pytest.mark.skipif( not hasattr(fcntl, "F_GETPIPE_SZ") or _MAX_PIPE_SIZE is None, reason="Pipe size modifications not available on this platform.", ) -def test_pipesize_changed(tmp_path, monkeypatch): +def test_pipesize_changed(tmp_path): # Higher compression level to avoid opening with threaded opener - with PipedGzipProgram(tmp_path / "hello.gz", "wb", compresslevel=5) as f: - assert isinstance(f, PipedCompressionProgram) + with _PipedCompressionProgram(tmp_path / "hello.gz", "wb", compresslevel=5) as f: assert fcntl.fcntl(f._file.fileno(), fcntl.F_GETPIPE_SZ) == _MAX_PIPE_SIZE def test_pipedcompressionwriter_wrong_mode(tmp_path): with pytest.raises(ValueError) as error: - PipedCompressionProgram(tmp_path / "test", ["gzip"], "xb") + _PipedCompressionProgram(tmp_path / "test", "xb") error.match("Mode is 'xb', but it must be") def test_pipedcompressionwriter_wrong_program(tmp_path): with pytest.raises(OSError): - PipedCompressionProgram(tmp_path / "test", ["XVXCLSKDLA"], "wb") + _PipedCompressionProgram( + tmp_path / "test", + "wb", + program_args=[ + "XVXCLSKDLA", + ], + ) def test_compression_level(tmp_path, gzip_writer): # Currently only the gzip writers handle compression levels. path = tmp_path / "test.gz" - with gzip_writer(path, "wb", 2) as test_h: + with _PipedCompressionProgram(path, "wb", 2, **gzip_writer) as test_h: test_h.write(b"test") assert gzip.decompress(path.read_bytes()) == b"test" def test_iter_method_writers(writer, tmp_path): - opener, extension = writer - writer = opener(tmp_path / f"test{extension}", "wb") + program_settings, extension = writer + writer = _PipedCompressionProgram( + tmp_path / f"test{extension}", "wb", **program_settings + ) assert iter(writer) == writer writer.close() def test_next_method_writers(writer, tmp_path): - opener, extension = writer - writer = opener(tmp_path / f"test{extension}", "wb") + program_settings, extension = writer + writer = _PipedCompressionProgram( + tmp_path / f"test{extension}", "wb", **program_settings + ) with pytest.raises(io.UnsupportedOperation) as error: next(writer) error.match("read") @@ -275,14 +286,14 @@ def test_next_method_writers(writer, tmp_path): def test_pipedcompressionprogram_wrong_mode(): with pytest.raises(ValueError) as error: - PipedCompressionProgram("test", ["gzip"], "xb") + _PipedCompressionProgram("test", "xb") error.match("Mode is 'xb', but it must be") def test_piped_compression_reader_peek_binary(reader): - opener, extension = reader + program_settings, extension = reader filegz = TEST_DIR / f"file.txt{extension}" - with opener(filegz, "rb") as read_h: + with _PipedCompressionProgram(filegz, "rb", **program_settings) as read_h: # Peek returns at least the amount of characters but maybe more # depending on underlying stream. Hence startswith not ==. assert read_h.peek(1).startswith(b"T") @@ -292,9 +303,9 @@ def test_piped_compression_reader_peek_binary(reader): sys.platform != "win32", reason="seeking only works on Windows for now" ) def test_piped_compression_reader_seek_and_tell(reader): - opener, extension = reader + program_settings, extension = reader filegz = TEST_DIR / f"file.txt{extension}" - with opener(filegz, "rb") as f: + with _PipedCompressionProgram(filegz, "rb", **program_settings) as f: original_position = f.tell() assert f.read(4) == b"Test" f.seek(original_position) @@ -303,23 +314,20 @@ def test_piped_compression_reader_seek_and_tell(reader): @pytest.mark.parametrize("mode", ["r", "rb"]) def test_piped_compression_reader_peek_text(reader, mode): - opener, extension = reader + program_settings, extension = reader compressed_file = TEST_DIR / f"file.txt{extension}" - with opener(compressed_file, mode) as read_h: + with _PipedCompressionProgram(compressed_file, mode, **program_settings) as read_h: assert read_h.peek(1)[0] == CONTENT[0] def writers_and_levels(): for writer in PIPED_GZIP_PROGRAMS: - if writer == PipedGzipProgram: + if "gzip" in writer["program_args"]: # Levels 1-9 are supported yield from ((writer, i) for i in range(1, 10)) - elif writer == PipedPigzProgram: + elif "pigz" in writer["program_args"]: # Levels 0-9 + 11 are supported yield from ((writer, i) for i in list(range(10)) + [11]) - elif writer == PipedIGzipProgram or writer == PipedPythonIsalProgram: - # Levels 0-3 are supported - yield from ((writer, i) for i in range(4)) else: raise NotImplementedError( f"Test should be implemented for " f"{writer}" @@ -329,14 +337,14 @@ def writers_and_levels(): @pytest.mark.parametrize(["writer", "level"], writers_and_levels()) def test_valid_compression_levels(writer, level, tmp_path): path = tmp_path / "test.gz" - with writer(path, "wb", level) as handle: + with _PipedCompressionProgram(path, "wb", level, **writer) as handle: handle.write(b"test") assert gzip.decompress(path.read_bytes()) == b"test" def test_reproducible_gzip_compression(gzip_writer, tmp_path): path = tmp_path / "file.gz" - with gzip_writer(path, mode="wb") as f: + with _PipedCompressionProgram(path, mode="wb", **gzip_writer) as f: f.write(b"hello") data = path.read_bytes() @@ -347,14 +355,14 @@ def test_reproducible_gzip_compression(gzip_writer, tmp_path): def test_piped_tool_fails_on_close(tmp_path): # This test exercises the retcode != 0 case in PipedCompressionWriter.close() with pytest.raises(OSError) as e: - with PipedCompressionProgram( + with _PipedCompressionProgram( tmp_path / "out.txt", - [ + "wb", + program_args=[ sys.executable, "-c", "import sys\nfor line in sys.stdin: pass\nprint()\nsys.exit(5)", ], - mode="wb", ) as f: f.write(b"Hello") assert "exit code 5" in e.value.args[0] From 17a95f2dc7a628c1b1376701255412a75a7659ad Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Mon, 5 Feb 2024 16:53:09 +0100 Subject: [PATCH 2/9] Remove redundant _can_read_concatenated_gz function The igzip CLI program from ISA-L is no longer used --- src/xopen/__init__.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index e256ccc..b302a1c 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -166,30 +166,6 @@ def _set_pipe_size_to_max(fd: int) -> None: pass -def _can_read_concatenated_gz(program: str) -> bool: - """ - Check if a concatenated gzip file can be read properly. Not all deflate - programs handle this properly. - """ - fd, temp_path = tempfile.mkstemp(suffix=".gz", prefix="xopen.") - try: - # Create a concatenated gzip file. gzip.compress recreates the contents - # of a gzip file including header and trailer. - with open(temp_path, "wb") as temp_file: - temp_file.write(gzip.compress(b"AB") + gzip.compress(b"CD")) - try: - result = subprocess.run( - [program, "-c", "-d", temp_path], check=True, stderr=PIPE, stdout=PIPE - ) - return result.stdout == b"ABCD" - except subprocess.CalledProcessError: - # Program can't read zip - return False - finally: - os.close(fd) - os.remove(temp_path) - - class _PipedCompressionProgram(io.IOBase): """ Read and write compressed files by running an external process and piping into it. From f7420c048497bc5fc99a3252f1ebbac3d977c18c Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Mon, 5 Feb 2024 17:04:02 +0100 Subject: [PATCH 3/9] Make the PROGRAM_SETTINGS dict private --- src/xopen/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index b302a1c..a9e3243 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -109,7 +109,7 @@ class _ProgramSettings(typing.NamedTuple): allowed_exit_message: Optional[bytes] = None -PROGRAM_SETTINGS: Dict[str, _ProgramSettings] = { +_PROGRAM_SETTINGS: Dict[str, _ProgramSettings] = { "pbzip2": _ProgramSettings( ("pbzip2",), tuple(range(1, 10)), @@ -125,7 +125,7 @@ class _ProgramSettings(typing.NamedTuple): def _program_settings(program: str) -> Dict[str, Any]: - return PROGRAM_SETTINGS[program]._asdict() + return _PROGRAM_SETTINGS[program]._asdict() def _available_cpu_count() -> int: From a883bfc3a7f2b415770bd011142a6c24ac4b7749 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Mon, 5 Feb 2024 17:11:58 +0100 Subject: [PATCH 4/9] Write a changelog entry for the subclass removal --- README.rst | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 274c2e0..ed32868 100644 --- a/README.rst +++ b/README.rst @@ -186,8 +186,16 @@ Changelog in-development ~~~~~~~~~~~~~~~~~~~ -* #146: PipedCompressionReader/Writer are now binary-only. For text reading - they are wrapped in an ``io.TextIOWrapper`` in the ``xopen()`` function. +* #146, #147, #148: Various refactors for better code size and readability: + + * PipedCompressionReader/Writer are now combined _PipedCompressionProgram + class. + * _PipedCompressionProgram is binary-only. For text reading and writing + it is wrapped in an ``io.TextIOWrapper`` in the ``xopen()`` function. + * Classes that derive from PipedCompressionReader/Writer have been removed. +* #148: xopen's classes, variables and functions pertaining to piped reading + and writing are all made private by prefixing them with an underscore. + These are not part of the API and may change between releases. v1.9.0 (2024-01-31) ~~~~~~~~~~~~~~~~~~~ From 3c3175797ce67b2aa17ca1ebf12bf2710b3775dd Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Mon, 5 Feb 2024 18:30:11 +0100 Subject: [PATCH 5/9] Use immutable default rather than None for _program_args --- src/xopen/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index a9e3243..8b5e49e 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -177,7 +177,7 @@ def __init__( # noqa: C901 mode="rb", compresslevel: Optional[int] = None, threads: Optional[int] = None, - program_args: Optional[Sequence[str]] = None, + program_args: Sequence[str] = ("gzip", "--no-name"), threads_flag: Optional[str] = None, # This exit code is not interpreted as an error when terminating the process allowed_exit_code: Optional[int] = -signal.SIGTERM, @@ -195,8 +195,6 @@ def __init__( # noqa: C901 used. At the moment, this means that the number of available CPU cores is used, capped at four to avoid creating too many threads. Use 0 to use all available cores. """ - if program_args is None: - program_args = ("gzip", "--no-name") self._error_raised = False self._program_args = list(program_args) self._allowed_exit_code = allowed_exit_code From 403ade3012281295398547d696a0a268d371777a Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 6 Feb 2024 10:25:19 +0100 Subject: [PATCH 6/9] Make _open_gz piped program opening simpler --- src/xopen/__init__.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index 8b5e49e..9602aa5 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -492,9 +492,7 @@ def _open_zst( # noqa: C901 return f -def _open_gz( # noqa: C901 - filename, mode: str, compresslevel, threads, **text_mode_kwargs -): +def _open_gz(filename, mode: str, compresslevel, threads, **text_mode_kwargs): assert "b" in mode if compresslevel is None: # Force the same compression level on every tool regardless of @@ -523,18 +521,14 @@ def _open_gz( # noqa: C901 ) except zlib_ng.error: # Bad compression level pass - try: + + for program in ("pigz", "gzip"): try: return _PipedCompressionProgram( - filename, mode, compresslevel, threads, **_program_settings("pigz") + filename, mode, compresslevel, threads, **_program_settings(program) ) except OSError: - return _PipedCompressionProgram( - filename, mode, compresslevel, threads, **_program_settings("gzip") - ) - except OSError: - pass # We try without threads. - + pass # We try without threads. return _open_reproducible_gzip( filename, mode=mode, From 6d0117cf8d85613ab04968017cb46288cc498146 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Fri, 9 Feb 2024 11:45:15 +0100 Subject: [PATCH 7/9] Pass _ProgramSettings class directly rather than using **kwargs --- src/xopen/__init__.py | 46 +++++++++------------- tests/test_piped.py | 89 ++++++++++++++++++++++++------------------- 2 files changed, 67 insertions(+), 68 deletions(-) diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index 9602aa5..f2deca9 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -8,6 +8,7 @@ "__version__", ] +import dataclasses import gzip import sys import io @@ -20,17 +21,13 @@ import subprocess import tempfile import time -import typing from subprocess import Popen, PIPE from typing import ( - Any, Dict, Optional, Union, TextIO, IO, - Sequence, - Container, overload, BinaryIO, Literal, @@ -96,9 +93,8 @@ FilePath = Union[str, bytes, os.PathLike] -# Rather than using a dict, use a NamedTuple with _asdict to enforce presence -# of certain members and type checking. -class _ProgramSettings(typing.NamedTuple): +@dataclasses.dataclass +class _ProgramSettings: program_args: Tuple[str, ...] acceptable_compression_levels: Tuple[int, ...] = tuple(range(1, 10)) threads_flag: Optional[str] = None @@ -124,10 +120,6 @@ class _ProgramSettings(typing.NamedTuple): } -def _program_settings(program: str) -> Dict[str, Any]: - return _PROGRAM_SETTINGS[program]._asdict() - - def _available_cpu_count() -> int: """ Number of available virtual or physical CPUs on this system @@ -177,14 +169,7 @@ def __init__( # noqa: C901 mode="rb", compresslevel: Optional[int] = None, threads: Optional[int] = None, - program_args: Sequence[str] = ("gzip", "--no-name"), - threads_flag: Optional[str] = None, - # This exit code is not interpreted as an error when terminating the process - allowed_exit_code: Optional[int] = -signal.SIGTERM, - # If this message is printed on stderr on terminating the process, - # it is not interpreted as an error - allowed_exit_message: Optional[bytes] = None, - acceptable_compression_levels: Container[int] = tuple(range(0, 10)), + program_settings: _ProgramSettings = _ProgramSettings(("gzip", "--no-name")), ): """ mode -- one of 'w', 'wb', 'a', 'ab' @@ -196,19 +181,19 @@ def __init__( # noqa: C901 at four to avoid creating too many threads. Use 0 to use all available cores. """ self._error_raised = False - self._program_args = list(program_args) - self._allowed_exit_code = allowed_exit_code - self._allowed_exit_message = allowed_exit_message + self._program_args = list(program_settings.program_args) + self._allowed_exit_code = program_settings.allowed_exit_code + self._allowed_exit_message = program_settings.allowed_exit_message if mode not in ("r", "rb", "w", "wb", "a", "ab"): raise ValueError( f"Mode is '{mode}', but it must be 'r', 'rb', 'w', 'wb', 'a', or 'ab'" ) if ( compresslevel is not None - and compresslevel not in acceptable_compression_levels + and compresslevel not in program_settings.acceptable_compression_levels ): raise ValueError( - f"compresslevel must be in {acceptable_compression_levels}." + f"compresslevel must be in {program_settings.acceptable_compression_levels}." ) path = os.fspath(path) if isinstance(path, bytes) and sys.platform == "win32": @@ -216,7 +201,7 @@ def __init__( # noqa: C901 self.name: str = str(path) self._mode: str = mode self._stderr = tempfile.TemporaryFile("w+b") - self._threads_flag: Optional[str] = threads_flag + self._threads_flag: Optional[str] = program_settings.threads_flag if threads is None: if "r" in mode: @@ -425,7 +410,10 @@ def _open_bz2(filename, mode: str, threads: Optional[int]): if threads != 0: try: return _PipedCompressionProgram( - filename, mode, threads=threads, **_program_settings("pbzip2") + filename, + mode, + threads=threads, + program_settings=_PROGRAM_SETTINGS["pbzip2"], ) except OSError: pass # We try without threads. @@ -446,7 +434,7 @@ def _open_xz( if threads != 0: try: return _PipedCompressionProgram( - filename, mode, compresslevel, threads, **_program_settings("xz") + filename, mode, compresslevel, threads, _PROGRAM_SETTINGS["xz"] ) except OSError: pass # We try without threads. @@ -471,7 +459,7 @@ def _open_zst( # noqa: C901 if threads != 0: try: return _PipedCompressionProgram( - filename, mode, compresslevel, threads, **_program_settings("zstd") + filename, mode, compresslevel, threads, _PROGRAM_SETTINGS["zstd"] ) except OSError: if zstandard is None: @@ -525,7 +513,7 @@ def _open_gz(filename, mode: str, compresslevel, threads, **text_mode_kwargs): for program in ("pigz", "gzip"): try: return _PipedCompressionProgram( - filename, mode, compresslevel, threads, **_program_settings(program) + filename, mode, compresslevel, threads, _PROGRAM_SETTINGS[program] ) except OSError: pass # We try without threads. diff --git a/tests/test_piped.py b/tests/test_piped.py index c62e3aa..9992d9a 100644 --- a/tests/test_piped.py +++ b/tests/test_piped.py @@ -15,7 +15,8 @@ xopen, _PipedCompressionProgram, _MAX_PIPE_SIZE, - _program_settings, + _PROGRAM_SETTINGS, + _ProgramSettings, ) extensions = ["", ".gz", ".bz2", ".xz", ".zst"] @@ -36,24 +37,24 @@ def available_gzip_programs(): - return [_program_settings(prog) for prog in ("gzip", "pigz") if shutil.which(prog)] + return [_PROGRAM_SETTINGS[prog] for prog in ("gzip", "pigz") if shutil.which(prog)] def available_bzip2_programs(): if shutil.which("pbzip2"): - return [_program_settings("pbzip2")] + return [_PROGRAM_SETTINGS["pbzip2"]] return [] def available_xz_programs(): if shutil.which("xz"): - return [_program_settings("xz")] + return [_PROGRAM_SETTINGS["xz"]] return [] def available_zstd_programs(): if shutil.which("zstd"): - return [_program_settings("zstd")] + return [_PROGRAM_SETTINGS["zstd"]] return [] @@ -73,7 +74,7 @@ def available_zstd_programs(): THREADED_PROGRAMS = [ settings for settings in ALL_PROGRAMS_WITH_EXTENSION - if "pbzip2" in settings[0]["program_args"] or "pigz" in settings[0]["program_args"] + if "pbzip2" in settings[0].program_args or "pigz" in settings[0].program_args ] @@ -100,7 +101,7 @@ def writer(request): def test_reader_readinto(reader): program_settings, extension = reader with _PipedCompressionProgram( - TEST_DIR / f"file.txt{extension}", "rb", **program_settings + TEST_DIR / f"file.txt{extension}", "rb", program_settings=program_settings ) as f: b = bytearray(len(CONTENT) + 100) length = f.readinto(b) @@ -111,7 +112,7 @@ def test_reader_readinto(reader): def test_reader_textiowrapper(reader): program_settings, extension = reader with _PipedCompressionProgram( - TEST_DIR / f"file.txt{extension}", "rb", **program_settings + TEST_DIR / f"file.txt{extension}", "rb", program_settings=program_settings ) as f: wrapped = io.TextIOWrapper(f, encoding="utf-8") assert wrapped.read() == CONTENT.decode("utf-8") @@ -122,7 +123,7 @@ def test_reader_readline(reader): with _PipedCompressionProgram( TEST_DIR / f"file.txt{extension}", "rb", - **program_settings, + program_settings=program_settings, ) as f: assert f.readline() == CONTENT_LINES[0] @@ -130,7 +131,7 @@ def test_reader_readline(reader): def test_reader_readlines(reader): program_settings, extension = reader with _PipedCompressionProgram( - TEST_DIR / f"file.txt{extension}", "rb", **program_settings + TEST_DIR / f"file.txt{extension}", "rb", program_settings=program_settings ) as f: assert f.readlines() == CONTENT_LINES @@ -141,7 +142,7 @@ def test_piped_reader_iter(threads, threaded_reader): with _PipedCompressionProgram( TEST_DIR / f"file.txt{extension}", "rb", - **program_settings, + program_settings=program_settings, ) as f: lines = list(f) assert lines[0] == CONTENT_LINES[0] @@ -150,7 +151,9 @@ def test_piped_reader_iter(threads, threaded_reader): def test_writer(tmp_path, writer): program_settings, extension = writer path = tmp_path / f"out{extension}" - with _PipedCompressionProgram(path, mode="wb", **program_settings) as f: + with _PipedCompressionProgram( + path, mode="wb", program_settings=program_settings + ) as f: f.write(b"hello") with xopen(path, mode="rb") as f: assert f.read() == b"hello" @@ -162,7 +165,7 @@ def test_writer_has_iter_method(tmp_path, writer): with _PipedCompressionProgram( path, mode="wb", - **program_settings, + program_settings=program_settings, ) as f: f.write(b"hello") assert hasattr(f, "__iter__") @@ -170,7 +173,9 @@ def test_writer_has_iter_method(tmp_path, writer): def test_reader_iter_without_with(reader): program_settings, extension = reader - f = _PipedCompressionProgram(TEST_DIR / f"file.txt{extension}", **program_settings) + f = _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", program_settings=program_settings + ) it = iter(f) assert CONTENT_LINES[0] == next(it) f.close() @@ -179,7 +184,9 @@ def test_reader_iter_without_with(reader): def test_reader_close(reader, create_large_file): program_settings, extension = reader large_file = create_large_file(extension) - with _PipedCompressionProgram(large_file, "rb", **program_settings) as f: + with _PipedCompressionProgram( + large_file, "rb", program_settings=program_settings + ) as f: f.readline() time.sleep(0.2) # The subprocess should be properly terminated now @@ -191,7 +198,7 @@ def test_invalid_gzip_compression_level(gzip_writer, tmp_path): tmp_path / "out.gz", mode="w", compresslevel=17, - **gzip_writer, + program_settings=gzip_writer, ) as f: f.write(b"hello") # pragma: no cover assert "compresslevel must be" in e.value.args[0] @@ -203,7 +210,7 @@ def test_invalid_xz_compression_level(tmp_path): tmp_path / "out.xz", mode="w", compresslevel=17, - **_program_settings("xz"), + program_settings=_PROGRAM_SETTINGS["xz"], ) as f: f.write(b"hello") # pragma: no cover assert "compresslevel must be" in e.value.args[0] @@ -215,7 +222,7 @@ def test_invalid_zstd_compression_level(tmp_path): tmp_path / "out.zst", mode="w", compresslevel=25, - **_program_settings("zstd"), + program_settings=_PROGRAM_SETTINGS["zstd"], ) as f: f.write(b"hello") # pragma: no cover assert "compresslevel must be" in e.value.args[0] @@ -224,7 +231,7 @@ def test_invalid_zstd_compression_level(tmp_path): def test_readers_read(reader): program_settings, extension = reader with _PipedCompressionProgram( - TEST_DIR / f"file.txt{extension}", "rb", **program_settings + TEST_DIR / f"file.txt{extension}", "rb", program_settings=program_settings ) as f: assert f.read() == CONTENT @@ -248,18 +255,16 @@ def test_pipedcompressionwriter_wrong_mode(tmp_path): def test_pipedcompressionwriter_wrong_program(tmp_path): with pytest.raises(OSError): _PipedCompressionProgram( - tmp_path / "test", - "wb", - program_args=[ - "XVXCLSKDLA", - ], + tmp_path / "test", "wb", program_settings=_ProgramSettings(("XVXCLSKDLA",)) ) def test_compression_level(tmp_path, gzip_writer): # Currently only the gzip writers handle compression levels. path = tmp_path / "test.gz" - with _PipedCompressionProgram(path, "wb", 2, **gzip_writer) as test_h: + with _PipedCompressionProgram( + path, "wb", 2, program_settings=gzip_writer + ) as test_h: test_h.write(b"test") assert gzip.decompress(path.read_bytes()) == b"test" @@ -267,7 +272,7 @@ def test_compression_level(tmp_path, gzip_writer): def test_iter_method_writers(writer, tmp_path): program_settings, extension = writer writer = _PipedCompressionProgram( - tmp_path / f"test{extension}", "wb", **program_settings + tmp_path / f"test{extension}", "wb", program_settings=program_settings ) assert iter(writer) == writer writer.close() @@ -276,7 +281,7 @@ def test_iter_method_writers(writer, tmp_path): def test_next_method_writers(writer, tmp_path): program_settings, extension = writer writer = _PipedCompressionProgram( - tmp_path / f"test{extension}", "wb", **program_settings + tmp_path / f"test{extension}", "wb", program_settings=program_settings ) with pytest.raises(io.UnsupportedOperation) as error: next(writer) @@ -293,7 +298,9 @@ def test_pipedcompressionprogram_wrong_mode(): def test_piped_compression_reader_peek_binary(reader): program_settings, extension = reader filegz = TEST_DIR / f"file.txt{extension}" - with _PipedCompressionProgram(filegz, "rb", **program_settings) as read_h: + with _PipedCompressionProgram( + filegz, "rb", program_settings=program_settings + ) as read_h: # Peek returns at least the amount of characters but maybe more # depending on underlying stream. Hence startswith not ==. assert read_h.peek(1).startswith(b"T") @@ -305,7 +312,7 @@ def test_piped_compression_reader_peek_binary(reader): def test_piped_compression_reader_seek_and_tell(reader): program_settings, extension = reader filegz = TEST_DIR / f"file.txt{extension}" - with _PipedCompressionProgram(filegz, "rb", **program_settings) as f: + with _PipedCompressionProgram(filegz, "rb", program_settings=program_settings) as f: original_position = f.tell() assert f.read(4) == b"Test" f.seek(original_position) @@ -316,16 +323,18 @@ def test_piped_compression_reader_seek_and_tell(reader): def test_piped_compression_reader_peek_text(reader, mode): program_settings, extension = reader compressed_file = TEST_DIR / f"file.txt{extension}" - with _PipedCompressionProgram(compressed_file, mode, **program_settings) as read_h: + with _PipedCompressionProgram( + compressed_file, mode, program_settings=program_settings + ) as read_h: assert read_h.peek(1)[0] == CONTENT[0] def writers_and_levels(): for writer in PIPED_GZIP_PROGRAMS: - if "gzip" in writer["program_args"]: + if "gzip" in writer.program_args: # Levels 1-9 are supported yield from ((writer, i) for i in range(1, 10)) - elif "pigz" in writer["program_args"]: + elif "pigz" in writer.program_args: # Levels 0-9 + 11 are supported yield from ((writer, i) for i in list(range(10)) + [11]) else: @@ -337,14 +346,14 @@ def writers_and_levels(): @pytest.mark.parametrize(["writer", "level"], writers_and_levels()) def test_valid_compression_levels(writer, level, tmp_path): path = tmp_path / "test.gz" - with _PipedCompressionProgram(path, "wb", level, **writer) as handle: + with _PipedCompressionProgram(path, "wb", level, program_settings=writer) as handle: handle.write(b"test") assert gzip.decompress(path.read_bytes()) == b"test" def test_reproducible_gzip_compression(gzip_writer, tmp_path): path = tmp_path / "file.gz" - with _PipedCompressionProgram(path, mode="wb", **gzip_writer) as f: + with _PipedCompressionProgram(path, mode="wb", program_settings=gzip_writer) as f: f.write(b"hello") data = path.read_bytes() @@ -358,11 +367,13 @@ def test_piped_tool_fails_on_close(tmp_path): with _PipedCompressionProgram( tmp_path / "out.txt", "wb", - program_args=[ - sys.executable, - "-c", - "import sys\nfor line in sys.stdin: pass\nprint()\nsys.exit(5)", - ], + program_settings=_ProgramSettings( + ( + sys.executable, + "-c", + "import sys\nfor line in sys.stdin: pass\nprint()\nsys.exit(5)", + ), + ), ) as f: f.write(b"Hello") assert "exit code 5" in e.value.args[0] From 765a833d25853bcb4e42b160eacd43d243433122 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Fri, 9 Feb 2024 11:48:22 +0100 Subject: [PATCH 8/9] Revert variable name change --- tests/test_piped.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_piped.py b/tests/test_piped.py index 9992d9a..ebd5aa3 100644 --- a/tests/test_piped.py +++ b/tests/test_piped.py @@ -100,13 +100,14 @@ def writer(request): def test_reader_readinto(reader): program_settings, extension = reader + content = CONTENT with _PipedCompressionProgram( TEST_DIR / f"file.txt{extension}", "rb", program_settings=program_settings ) as f: - b = bytearray(len(CONTENT) + 100) + b = bytearray(len(content) + 100) length = f.readinto(b) - assert length == len(CONTENT) - assert b[:length] == CONTENT + assert length == len(content) + assert b[:length] == content def test_reader_textiowrapper(reader): From 1be7ad65af1909807470871217bb83c61aae3d7b Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Fri, 9 Feb 2024 11:59:35 +0100 Subject: [PATCH 9/9] Add commentary from lost docstrings in _open functions --- src/xopen/__init__.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index f2deca9..1246833 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -409,6 +409,7 @@ def _open_bz2(filename, mode: str, threads: Optional[int]): assert "b" in mode if threads != 0: try: + # pbzip2 can compress using multiple cores. return _PipedCompressionProgram( filename, mode, @@ -433,6 +434,7 @@ def _open_xz( if threads != 0: try: + # xz can compress using multiple cores. return _PipedCompressionProgram( filename, mode, compresslevel, threads, _PROGRAM_SETTINGS["xz"] ) @@ -458,6 +460,7 @@ def _open_zst( # noqa: C901 compresslevel = 3 if threads != 0: try: + # zstd can compress using multiple cores return _PipedCompressionProgram( filename, mode, compresslevel, threads, _PROGRAM_SETTINGS["zstd"] ) @@ -481,6 +484,15 @@ def _open_zst( # noqa: C901 def _open_gz(filename, mode: str, compresslevel, threads, **text_mode_kwargs): + """ + Open a gzip file. The ISA-L library is preferred when applicable because + it is the fastest. Then zlib-ng which is not as fast, but supports all + compression levels. After that comes pigz, which can utilize multiple + threads and is more efficient than gzip, even on one core. gzip is chosen + when none of the alternatives are available. Despite it being able to use + only one core, it still finishes faster than using the builtin gzip library + as the (de)compression is moved to another thread. + """ assert "b" in mode if compresslevel is None: # Force the same compression level on every tool regardless of