diff --git a/README.rst b/README.rst index 274c2e0..ed32868 100644 --- a/README.rst +++ b/README.rst @@ -186,8 +186,16 @@ Changelog in-development ~~~~~~~~~~~~~~~~~~~ -* #146: PipedCompressionReader/Writer are now binary-only. For text reading - they are wrapped in an ``io.TextIOWrapper`` in the ``xopen()`` function. +* #146, #147, #148: Various refactors for better code size and readability: + + * PipedCompressionReader/Writer are now combined _PipedCompressionProgram + class. + * _PipedCompressionProgram is binary-only. For text reading and writing + it is wrapped in an ``io.TextIOWrapper`` in the ``xopen()`` function. + * Classes that derive from PipedCompressionReader/Writer have been removed. +* #148: xopen's classes, variables and functions pertaining to piped reading + and writing are all made private by prefixing them with an underscore. + These are not part of the API and may change between releases. v1.9.0 (2024-01-31) ~~~~~~~~~~~~~~~~~~~ diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index 96223c4..1246833 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -4,17 +4,11 @@ __all__ = [ "xopen", - "PipedGzipProgram", - "PipedIGzipReader", - "PipedIGzipProgram", - "PipedPigzProgram", - "PipedPBzip2Program", - "PipedXzProgram", - "PipedZstdProgram", - "PipedPythonIsalProgram", + "_PipedCompressionProgram", "__version__", ] +import dataclasses import gzip import sys import io @@ -29,15 +23,15 @@ import time from subprocess import Popen, PIPE from typing import ( + Dict, Optional, Union, TextIO, IO, - List, - Set, overload, BinaryIO, Literal, + Tuple, ) from types import ModuleType @@ -99,6 +93,33 @@ FilePath = Union[str, bytes, os.PathLike] +@dataclasses.dataclass +class _ProgramSettings: + program_args: Tuple[str, ...] + acceptable_compression_levels: Tuple[int, ...] = tuple(range(1, 10)) + threads_flag: Optional[str] = None + # This exit code is not interpreted as an error when terminating the process + allowed_exit_code: Optional[int] = -signal.SIGTERM + # If this message is printed on stderr on terminating the process, + # it is not interpreted as an error + allowed_exit_message: Optional[bytes] = None + + +_PROGRAM_SETTINGS: Dict[str, _ProgramSettings] = { + "pbzip2": _ProgramSettings( + ("pbzip2",), + tuple(range(1, 10)), + "-p", + allowed_exit_code=None, + allowed_exit_message=b"\n *Control-C or similar caught [sig=15], quitting...", + ), + "xz": _ProgramSettings(("xz",), tuple(range(0, 10)), "-T"), + "zstd": _ProgramSettings(("zstd",), tuple(range(1, 20)), "-T"), + "pigz": _ProgramSettings(("pigz", "--no-name"), tuple(range(0, 10)) + (11,), "-p"), + "gzip": _ProgramSettings(("gzip", "--no-name"), tuple(range(1, 10))), +} + + def _available_cpu_count() -> int: """ Number of available virtual or physical CPUs on this system @@ -137,31 +158,7 @@ def _set_pipe_size_to_max(fd: int) -> None: pass -def _can_read_concatenated_gz(program: str) -> bool: - """ - Check if a concatenated gzip file can be read properly. Not all deflate - programs handle this properly. - """ - fd, temp_path = tempfile.mkstemp(suffix=".gz", prefix="xopen.") - try: - # Create a concatenated gzip file. gzip.compress recreates the contents - # of a gzip file including header and trailer. - with open(temp_path, "wb") as temp_file: - temp_file.write(gzip.compress(b"AB") + gzip.compress(b"CD")) - try: - result = subprocess.run( - [program, "-c", "-d", temp_path], check=True, stderr=PIPE, stdout=PIPE - ) - return result.stdout == b"ABCD" - except subprocess.CalledProcessError: - # Program can't read zip - return False - finally: - os.close(fd) - os.remove(temp_path) - - -class PipedCompressionProgram(io.IOBase): +class _PipedCompressionProgram(io.IOBase): """ Read and write compressed files by running an external process and piping into it. """ @@ -169,16 +166,10 @@ class PipedCompressionProgram(io.IOBase): def __init__( # noqa: C901 self, path: FilePath, - program_args: List[str], mode="rb", compresslevel: Optional[int] = None, - threads_flag: Optional[str] = None, threads: Optional[int] = None, - # This exit code is not interpreted as an error when terminating the process - allowed_exit_code: Optional[int] = -signal.SIGTERM, - # If this message is printed on stderr on terminating the process, - # it is not interpreted as an error - allowed_exit_message: Optional[bytes] = None, + program_settings: _ProgramSettings = _ProgramSettings(("gzip", "--no-name")), ): """ mode -- one of 'w', 'wb', 'a', 'ab' @@ -190,20 +181,27 @@ def __init__( # noqa: C901 at four to avoid creating too many threads. Use 0 to use all available cores. """ self._error_raised = False - self._program_args = program_args[:] - self._allowed_exit_code = allowed_exit_code - self._allowed_exit_message = allowed_exit_message + self._program_args = list(program_settings.program_args) + self._allowed_exit_code = program_settings.allowed_exit_code + self._allowed_exit_message = program_settings.allowed_exit_message if mode not in ("r", "rb", "w", "wb", "a", "ab"): raise ValueError( f"Mode is '{mode}', but it must be 'r', 'rb', 'w', 'wb', 'a', or 'ab'" ) + if ( + compresslevel is not None + and compresslevel not in program_settings.acceptable_compression_levels + ): + raise ValueError( + f"compresslevel must be in {program_settings.acceptable_compression_levels}." + ) path = os.fspath(path) if isinstance(path, bytes) and sys.platform == "win32": path = path.decode() self.name: str = str(path) self._mode: str = mode self._stderr = tempfile.TemporaryFile("w+b") - self._threads_flag: Optional[str] = threads_flag + self._threads_flag: Optional[str] = program_settings.threads_flag if threads is None: if "r" in mode: @@ -401,217 +399,6 @@ def flush(self) -> None: return None -class PipedGzipProgram(PipedCompressionProgram): - """ - Write gzip-compressed files by running an external gzip process and - piping into it. On Python 3, gzip.GzipFile is on par with gzip itself, - but running an external gzip can still reduce wall-clock time because - the compression happens in a separate process. - """ - - def __init__( - self, - path, - mode: str = "rb", - compresslevel: Optional[int] = None, - ): - """ - mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' - compresslevel -- compression level - """ - if compresslevel is not None and compresslevel not in range(1, 10): - raise ValueError("compresslevel must be between 1 and 9") - super().__init__( - path, - ["gzip", "--no-name"], - mode, - compresslevel, - ) - - -class PipedPigzProgram(PipedCompressionProgram): - """ - Write gzip-compressed files by running an external pigz process and - piping into it. pigz can compress using multiple cores. It is also more - efficient than gzip on only one core. (But then igzip is even faster and - should be preferred if the compression level allows it.) - """ - - _accepted_compression_levels: Set[int] = set(list(range(10)) + [11]) - - def __init__( - self, - path, - mode: str = "rb", - compresslevel: Optional[int] = None, - threads: Optional[int] = None, - ): - """ - mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' - compresslevel -- compression level - threads (int) -- number of pigz threads. If this is set to None, a reasonable default is - used. At the moment, this means that the number of available CPU cores is used, capped - at four to avoid creating too many threads. Use 0 to let pigz use all available cores. - """ - if ( - compresslevel is not None - and compresslevel not in self._accepted_compression_levels - ): - raise ValueError("compresslevel must be between 0 and 9 or 11") - super().__init__( - path, - ["pigz", "--no-name"], - mode, - compresslevel, - "-p", - threads, - ) - - -class PipedPBzip2Program(PipedCompressionProgram): - """ - Write bzip2-compressed files by running an external pbzip2 process and - piping into it. pbzip2 can compress using multiple cores. - """ - - def __init__( - self, - path, - mode: str = "rb", - threads: Optional[int] = None, - ): - # Use default compression level for pbzip2: 9 - super().__init__( - path, - ["pbzip2"], - mode, - 9, - "-p", - threads, - allowed_exit_code=None, - allowed_exit_message=b"\n *Control-C or similar caught [sig=15], quitting...", - ) - - -class PipedXzProgram(PipedCompressionProgram): - """ - Write xz-compressed files by running an external xz process and - piping into it. xz can compress using multiple cores. - """ - - _accepted_compression_levels: Set[int] = set(range(10)) - - def __init__( - self, - path, - mode: str = "rb", - compresslevel: Optional[int] = None, - threads: Optional[int] = None, - ): - """ - mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' - compresslevel -- compression level - threads (int) -- number of xz threads. If this is set to None, a reasonable default is - used. At the moment, this means that the number of available CPU cores is used, capped - at four to avoid creating too many threads. Use 0 to let xz use all available cores. - """ - if ( - compresslevel is not None - and compresslevel not in self._accepted_compression_levels - ): - raise ValueError("compresslevel must be between 0 and 9") - super().__init__(path, ["xz"], mode, compresslevel, "-T", threads) - - -class PipedZstdProgram(PipedCompressionProgram): - """ - Write Zstandard-compressed files by running an external xz process and - piping into it. xz can compress using multiple cores. - """ - - _accepted_compression_levels: Set[int] = set(range(1, 20)) - - def __init__( - self, - path, - mode: str = "rb", - compresslevel: Optional[int] = None, - threads: Optional[int] = None, - ): - """ - mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' - compresslevel -- compression level - threads (int) -- number of zstd threads. If this is set to None, a reasonable default is - used. At the moment, this means that the number of available CPU cores is used, capped - at four to avoid creating too many threads. Use 0 to let zstd use all available cores. - """ - if ( - compresslevel is not None - and compresslevel not in self._accepted_compression_levels - ): - raise ValueError("compresslevel must be between 1 and 19") - super().__init__( - path, - ["zstd"], - mode, - compresslevel, - "-T", - threads, - ) - - -class PipedIGzipProgram(PipedCompressionProgram): - """ - Uses igzip for writing a gzipped file. This is much faster than either - gzip or pigz which were written to run on a wide array of systems. igzip - can only run on x86 and ARM architectures, but is able to use more - architecture-specific optimizations as a result. - - Threads are supported by a flag, but do not add any speed. Also on some - distro version (isal package in debian buster) the thread flag is not - present. For these reason threads are omitted from the interface. - Only compresslevel 0-3 are supported and these output slightly different - filesizes from their pigz/gzip counterparts. - See: https://gist.github.com/rhpvorderman/4f1201c3f39518ff28dde45409eb696b - """ - - def __init__( - self, - path, - mode: str = "rb", - compresslevel: Optional[int] = None, - ): - if "r" in mode and not _can_read_concatenated_gz("igzip"): - # Instead of elaborate version string checking once the problem is - # fixed, it is much easier to use this, "proof in the pudding" type - # of evaluation. - raise ValueError( - "This version of igzip does not support reading " - "concatenated gzip files and is therefore not " - "safe to use. See: https://github.com/intel/isa-l/issues/143" - ) - if compresslevel is not None and compresslevel not in range(0, 4): - raise ValueError("compresslevel must be between 0 and 3") - super().__init__( - path, - ["igzip", "--no-name"], - mode, - compresslevel, - ) - - -class PipedPythonIsalProgram(PipedCompressionProgram): - def __init__(self, path, mode: str = "rb", compresslevel: Optional[int] = None): - if compresslevel is not None and compresslevel not in range(0, 4): - raise ValueError("compresslevel must be between 0 and 3") - super().__init__( - path, - [sys.executable, "-m", "isal.igzip", "--no-name"], - mode, - compresslevel, - ) - - def _open_stdin_or_out(mode: str): assert "b" in mode std = sys.stdout if "w" in mode else sys.stdin @@ -622,7 +409,13 @@ def _open_bz2(filename, mode: str, threads: Optional[int]): assert "b" in mode if threads != 0: try: - return PipedPBzip2Program(filename, mode, threads) + # pbzip2 can compress using multiple cores. + return _PipedCompressionProgram( + filename, + mode, + threads=threads, + program_settings=_PROGRAM_SETTINGS["pbzip2"], + ) except OSError: pass # We try without threads. @@ -641,7 +434,10 @@ def _open_xz( if threads != 0: try: - return PipedXzProgram(filename, mode, compresslevel, threads) + # xz can compress using multiple cores. + return _PipedCompressionProgram( + filename, mode, compresslevel, threads, _PROGRAM_SETTINGS["xz"] + ) except OSError: pass # We try without threads. @@ -664,7 +460,10 @@ def _open_zst( # noqa: C901 compresslevel = 3 if threads != 0: try: - return PipedZstdProgram(filename, mode, compresslevel, threads) + # zstd can compress using multiple cores + return _PipedCompressionProgram( + filename, mode, compresslevel, threads, _PROGRAM_SETTINGS["zstd"] + ) except OSError: if zstandard is None: # No fallback available @@ -684,9 +483,16 @@ def _open_zst( # noqa: C901 return f -def _open_gz( # noqa: C901 - filename, mode: str, compresslevel, threads, **text_mode_kwargs -): +def _open_gz(filename, mode: str, compresslevel, threads, **text_mode_kwargs): + """ + Open a gzip file. The ISA-L library is preferred when applicable because + it is the fastest. Then zlib-ng which is not as fast, but supports all + compression levels. After that comes pigz, which can utilize multiple + threads and is more efficient than gzip, even on one core. gzip is chosen + when none of the alternatives are available. Despite it being able to use + only one core, it still finishes faster than using the builtin gzip library + as the (de)compression is moved to another thread. + """ assert "b" in mode if compresslevel is None: # Force the same compression level on every tool regardless of @@ -715,14 +521,14 @@ def _open_gz( # noqa: C901 ) except zlib_ng.error: # Bad compression level pass - try: + + for program in ("pigz", "gzip"): try: - return PipedPigzProgram(filename, mode, compresslevel, threads=threads) + return _PipedCompressionProgram( + filename, mode, compresslevel, threads, _PROGRAM_SETTINGS[program] + ) except OSError: - return PipedGzipProgram(filename, mode, compresslevel) - except OSError: - pass # We try without threads. - + pass # We try without threads. return _open_reproducible_gzip( filename, mode=mode, diff --git a/tests/test_piped.py b/tests/test_piped.py index fafd047..ebd5aa3 100644 --- a/tests/test_piped.py +++ b/tests/test_piped.py @@ -13,17 +13,10 @@ from xopen import ( xopen, - PipedCompressionProgram, - PipedGzipProgram, - PipedPBzip2Program, - PipedPigzProgram, - PipedIGzipProgram, - PipedPythonIsalProgram, - PipedXzProgram, - PipedZstdProgram, + _PipedCompressionProgram, _MAX_PIPE_SIZE, - _can_read_concatenated_gz, - igzip, + _PROGRAM_SETTINGS, + _ProgramSettings, ) extensions = ["", ".gz", ".bz2", ".xz", ".zst"] @@ -44,37 +37,24 @@ def available_gzip_programs(): - programs = [ - klass - for prog, klass in [ - ("gzip", PipedGzipProgram), - ("pigz", PipedPigzProgram), - ("igzip", PipedIGzipProgram), - ] - if shutil.which(prog) - ] - if PipedIGzipProgram in programs and not _can_read_concatenated_gz("igzip"): - programs.remove(PipedIGzipProgram) - if igzip is not None: - programs.append(PipedPythonIsalProgram) - return programs + return [_PROGRAM_SETTINGS[prog] for prog in ("gzip", "pigz") if shutil.which(prog)] def available_bzip2_programs(): if shutil.which("pbzip2"): - return [PipedPBzip2Program] + return [_PROGRAM_SETTINGS["pbzip2"]] return [] def available_xz_programs(): if shutil.which("xz"): - return [PipedXzProgram] + return [_PROGRAM_SETTINGS["xz"]] return [] def available_zstd_programs(): if shutil.which("zstd"): - return [PipedZstdProgram] + return [_PROGRAM_SETTINGS["zstd"]] return [] @@ -91,9 +71,11 @@ def available_zstd_programs(): ) -THREADED_PROGRAMS = {(PipedPigzProgram, ".gz"), (PipedPBzip2Program, ".bz2")} & set( - ALL_PROGRAMS_WITH_EXTENSION -) +THREADED_PROGRAMS = [ + settings + for settings in ALL_PROGRAMS_WITH_EXTENSION + if "pbzip2" in settings[0].program_args or "pigz" in settings[0].program_args +] @pytest.fixture(params=PIPED_GZIP_PROGRAMS) @@ -117,9 +99,11 @@ def writer(request): def test_reader_readinto(reader): - opener, extension = reader + program_settings, extension = reader content = CONTENT - with opener(TEST_DIR / f"file.txt{extension}", "rb") as f: + with _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", "rb", program_settings=program_settings + ) as f: b = bytearray(len(content) + 100) length = f.readinto(b) assert length == len(content) @@ -127,64 +111,83 @@ def test_reader_readinto(reader): def test_reader_textiowrapper(reader): - opener, extension = reader - with opener(TEST_DIR / f"file.txt{extension}", "rb") as f: + program_settings, extension = reader + with _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", "rb", program_settings=program_settings + ) as f: wrapped = io.TextIOWrapper(f, encoding="utf-8") assert wrapped.read() == CONTENT.decode("utf-8") def test_reader_readline(reader): - opener, extension = reader - first_line = CONTENT_LINES[0] - with opener(TEST_DIR / f"file.txt{extension}", "rb") as f: - assert f.readline() == first_line + program_settings, extension = reader + with _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", + "rb", + program_settings=program_settings, + ) as f: + assert f.readline() == CONTENT_LINES[0] def test_reader_readlines(reader): - opener, extension = reader - with opener(TEST_DIR / f"file.txt{extension}", "r") as f: + program_settings, extension = reader + with _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", "rb", program_settings=program_settings + ) as f: assert f.readlines() == CONTENT_LINES @pytest.mark.parametrize("threads", [None, 1, 2]) def test_piped_reader_iter(threads, threaded_reader): - opener, extension = threaded_reader - with opener(TEST_DIR / f"file.txt{extension}", mode="r", threads=threads) as f: + program_settings, extension = threaded_reader + with _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", + "rb", + program_settings=program_settings, + ) as f: lines = list(f) assert lines[0] == CONTENT_LINES[0] def test_writer(tmp_path, writer): - opener, extension = writer - print(opener, writer) - print(repr(opener)) + program_settings, extension = writer path = tmp_path / f"out{extension}" - with opener(path, mode="wb") as f: - print(f) + with _PipedCompressionProgram( + path, mode="wb", program_settings=program_settings + ) as f: f.write(b"hello") with xopen(path, mode="rb") as f: assert f.read() == b"hello" def test_writer_has_iter_method(tmp_path, writer): - opener, extension = writer - with opener(tmp_path / f"out{extension}", "wb") as f: + program_settings, extension = writer + path = tmp_path / f"out{extension}" + with _PipedCompressionProgram( + path, + mode="wb", + program_settings=program_settings, + ) as f: f.write(b"hello") assert hasattr(f, "__iter__") def test_reader_iter_without_with(reader): - opener, extension = reader - f = opener(TEST_DIR / f"file.txt{extension}") + program_settings, extension = reader + f = _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", program_settings=program_settings + ) it = iter(f) assert CONTENT_LINES[0] == next(it) f.close() def test_reader_close(reader, create_large_file): - reader, extension = reader + program_settings, extension = reader large_file = create_large_file(extension) - with reader(large_file, mode="rb") as f: + with _PipedCompressionProgram( + large_file, "rb", program_settings=program_settings + ) as f: f.readline() time.sleep(0.2) # The subprocess should be properly terminated now @@ -192,81 +195,95 @@ def test_reader_close(reader, create_large_file): def test_invalid_gzip_compression_level(gzip_writer, tmp_path): with pytest.raises(ValueError) as e: - with gzip_writer(tmp_path / "out.gz", mode="w", compresslevel=17) as f: - f.write("hello") # pragma: no cover + with _PipedCompressionProgram( + tmp_path / "out.gz", + mode="w", + compresslevel=17, + program_settings=gzip_writer, + ) as f: + f.write(b"hello") # pragma: no cover assert "compresslevel must be" in e.value.args[0] def test_invalid_xz_compression_level(tmp_path): with pytest.raises(ValueError) as e: - with PipedXzProgram(tmp_path / "out.xz", mode="w", compresslevel=10) as f: - f.write("hello") # pragma: no cover + with _PipedCompressionProgram( + tmp_path / "out.xz", + mode="w", + compresslevel=17, + program_settings=_PROGRAM_SETTINGS["xz"], + ) as f: + f.write(b"hello") # pragma: no cover assert "compresslevel must be" in e.value.args[0] def test_invalid_zstd_compression_level(tmp_path): with pytest.raises(ValueError) as e: - with PipedZstdProgram(tmp_path / "out.zst", mode="w", compresslevel=25) as f: - f.write("hello") # pragma: no cover + with _PipedCompressionProgram( + tmp_path / "out.zst", + mode="w", + compresslevel=25, + program_settings=_PROGRAM_SETTINGS["zstd"], + ) as f: + f.write(b"hello") # pragma: no cover assert "compresslevel must be" in e.value.args[0] def test_readers_read(reader): - opener, extension = reader - with opener(TEST_DIR / f"file.txt{extension}", "rb") as f: + program_settings, extension = reader + with _PipedCompressionProgram( + TEST_DIR / f"file.txt{extension}", "rb", program_settings=program_settings + ) as f: assert f.read() == CONTENT -@pytest.mark.skipif( - sys.platform.startswith("win"), - reason="Windows does not have a gzip application by default.", -) -def test_concatenated_gzip_function(): - assert _can_read_concatenated_gz("gzip") is True - assert _can_read_concatenated_gz("pigz") is True - assert _can_read_concatenated_gz("cat") is False - - @pytest.mark.skipif( not hasattr(fcntl, "F_GETPIPE_SZ") or _MAX_PIPE_SIZE is None, reason="Pipe size modifications not available on this platform.", ) -def test_pipesize_changed(tmp_path, monkeypatch): +def test_pipesize_changed(tmp_path): # Higher compression level to avoid opening with threaded opener - with PipedGzipProgram(tmp_path / "hello.gz", "wb", compresslevel=5) as f: - assert isinstance(f, PipedCompressionProgram) + with _PipedCompressionProgram(tmp_path / "hello.gz", "wb", compresslevel=5) as f: assert fcntl.fcntl(f._file.fileno(), fcntl.F_GETPIPE_SZ) == _MAX_PIPE_SIZE def test_pipedcompressionwriter_wrong_mode(tmp_path): with pytest.raises(ValueError) as error: - PipedCompressionProgram(tmp_path / "test", ["gzip"], "xb") + _PipedCompressionProgram(tmp_path / "test", "xb") error.match("Mode is 'xb', but it must be") def test_pipedcompressionwriter_wrong_program(tmp_path): with pytest.raises(OSError): - PipedCompressionProgram(tmp_path / "test", ["XVXCLSKDLA"], "wb") + _PipedCompressionProgram( + tmp_path / "test", "wb", program_settings=_ProgramSettings(("XVXCLSKDLA",)) + ) def test_compression_level(tmp_path, gzip_writer): # Currently only the gzip writers handle compression levels. path = tmp_path / "test.gz" - with gzip_writer(path, "wb", 2) as test_h: + with _PipedCompressionProgram( + path, "wb", 2, program_settings=gzip_writer + ) as test_h: test_h.write(b"test") assert gzip.decompress(path.read_bytes()) == b"test" def test_iter_method_writers(writer, tmp_path): - opener, extension = writer - writer = opener(tmp_path / f"test{extension}", "wb") + program_settings, extension = writer + writer = _PipedCompressionProgram( + tmp_path / f"test{extension}", "wb", program_settings=program_settings + ) assert iter(writer) == writer writer.close() def test_next_method_writers(writer, tmp_path): - opener, extension = writer - writer = opener(tmp_path / f"test{extension}", "wb") + program_settings, extension = writer + writer = _PipedCompressionProgram( + tmp_path / f"test{extension}", "wb", program_settings=program_settings + ) with pytest.raises(io.UnsupportedOperation) as error: next(writer) error.match("read") @@ -275,14 +292,16 @@ def test_next_method_writers(writer, tmp_path): def test_pipedcompressionprogram_wrong_mode(): with pytest.raises(ValueError) as error: - PipedCompressionProgram("test", ["gzip"], "xb") + _PipedCompressionProgram("test", "xb") error.match("Mode is 'xb', but it must be") def test_piped_compression_reader_peek_binary(reader): - opener, extension = reader + program_settings, extension = reader filegz = TEST_DIR / f"file.txt{extension}" - with opener(filegz, "rb") as read_h: + with _PipedCompressionProgram( + filegz, "rb", program_settings=program_settings + ) as read_h: # Peek returns at least the amount of characters but maybe more # depending on underlying stream. Hence startswith not ==. assert read_h.peek(1).startswith(b"T") @@ -292,9 +311,9 @@ def test_piped_compression_reader_peek_binary(reader): sys.platform != "win32", reason="seeking only works on Windows for now" ) def test_piped_compression_reader_seek_and_tell(reader): - opener, extension = reader + program_settings, extension = reader filegz = TEST_DIR / f"file.txt{extension}" - with opener(filegz, "rb") as f: + with _PipedCompressionProgram(filegz, "rb", program_settings=program_settings) as f: original_position = f.tell() assert f.read(4) == b"Test" f.seek(original_position) @@ -303,23 +322,22 @@ def test_piped_compression_reader_seek_and_tell(reader): @pytest.mark.parametrize("mode", ["r", "rb"]) def test_piped_compression_reader_peek_text(reader, mode): - opener, extension = reader + program_settings, extension = reader compressed_file = TEST_DIR / f"file.txt{extension}" - with opener(compressed_file, mode) as read_h: + with _PipedCompressionProgram( + compressed_file, mode, program_settings=program_settings + ) as read_h: assert read_h.peek(1)[0] == CONTENT[0] def writers_and_levels(): for writer in PIPED_GZIP_PROGRAMS: - if writer == PipedGzipProgram: + if "gzip" in writer.program_args: # Levels 1-9 are supported yield from ((writer, i) for i in range(1, 10)) - elif writer == PipedPigzProgram: + elif "pigz" in writer.program_args: # Levels 0-9 + 11 are supported yield from ((writer, i) for i in list(range(10)) + [11]) - elif writer == PipedIGzipProgram or writer == PipedPythonIsalProgram: - # Levels 0-3 are supported - yield from ((writer, i) for i in range(4)) else: raise NotImplementedError( f"Test should be implemented for " f"{writer}" @@ -329,14 +347,14 @@ def writers_and_levels(): @pytest.mark.parametrize(["writer", "level"], writers_and_levels()) def test_valid_compression_levels(writer, level, tmp_path): path = tmp_path / "test.gz" - with writer(path, "wb", level) as handle: + with _PipedCompressionProgram(path, "wb", level, program_settings=writer) as handle: handle.write(b"test") assert gzip.decompress(path.read_bytes()) == b"test" def test_reproducible_gzip_compression(gzip_writer, tmp_path): path = tmp_path / "file.gz" - with gzip_writer(path, mode="wb") as f: + with _PipedCompressionProgram(path, mode="wb", program_settings=gzip_writer) as f: f.write(b"hello") data = path.read_bytes() @@ -347,14 +365,16 @@ def test_reproducible_gzip_compression(gzip_writer, tmp_path): def test_piped_tool_fails_on_close(tmp_path): # This test exercises the retcode != 0 case in PipedCompressionWriter.close() with pytest.raises(OSError) as e: - with PipedCompressionProgram( + with _PipedCompressionProgram( tmp_path / "out.txt", - [ - sys.executable, - "-c", - "import sys\nfor line in sys.stdin: pass\nprint()\nsys.exit(5)", - ], - mode="wb", + "wb", + program_settings=_ProgramSettings( + ( + sys.executable, + "-c", + "import sys\nfor line in sys.stdin: pass\nprint()\nsys.exit(5)", + ), + ), ) as f: f.write(b"Hello") assert "exit code 5" in e.value.args[0]