From 8c43ffe616fa56df8aca747237411887fcd89435 Mon Sep 17 00:00:00 2001 From: Chris Markiewicz Date: Mon, 13 Feb 2023 21:31:49 -0500 Subject: [PATCH 1/3] TYP: Annotate openers Opener proxy methods now match io.BufferedIOBase prototypes. Remove some version checks for indexed-gzip < 0.8, which supported Python 3.6 while our minimum is now 3.8. A runtime-checkable protocol for .read()/.write() was the easiest way to accommodate weird file-likes that aren't IOBases. When indexed-gzip is typed, we may need to adjust the output of _gzip_open. --- nibabel/openers.py | 181 +++++++++++++++++++++------------- nibabel/tests/test_openers.py | 2 +- 2 files changed, 116 insertions(+), 67 deletions(-) diff --git a/nibabel/openers.py b/nibabel/openers.py index 5f2bb0cde7..3e3b2fb29f 100644 --- a/nibabel/openers.py +++ b/nibabel/openers.py @@ -7,34 +7,48 @@ # ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## """Context manager openers for various fileobject types""" +from __future__ import annotations + import gzip -import warnings +import io +import typing as ty from bz2 import BZ2File from os.path import splitext -from packaging.version import Version - from nibabel.optpkg import optional_package -# is indexed_gzip present and modern? -try: - import indexed_gzip as igzip # type: ignore +if ty.TYPE_CHECKING: # pragma: no cover + from types import TracebackType - version = igzip.__version__ + import pyzstd + from _typeshed import WriteableBuffer - HAVE_INDEXED_GZIP = True + ModeRT = ty.Literal['r', 'rt'] + ModeRB = ty.Literal['rb'] + ModeWT = ty.Literal['w', 'wt'] + ModeWB = ty.Literal['wb'] + ModeR = ty.Union[ModeRT, ModeRB] + ModeW = ty.Union[ModeWT, ModeWB] + Mode = ty.Union[ModeR, ModeW] - # < 0.7 - no good - if Version(version) < Version('0.7.0'): - warnings.warn(f'indexed_gzip is present, but too old (>= 0.7.0 required): {version})') - HAVE_INDEXED_GZIP = False - # >= 0.8 SafeIndexedGzipFile renamed to IndexedGzipFile - elif Version(version) < Version('0.8.0'): - IndexedGzipFile = igzip.SafeIndexedGzipFile - else: - IndexedGzipFile = igzip.IndexedGzipFile - del igzip, version + OpenerDef = tuple[ty.Callable[..., io.IOBase], tuple[str, ...]] +else: + pyzstd = optional_package('pyzstd')[0] + + +@ty.runtime_checkable +class Fileish(ty.Protocol): + def read(self, size: int = -1, /) -> bytes: + ... # pragma: no cover + + def write(self, b: bytes, /) -> int | None: + ... # pragma: no cover + + +try: + from indexed_gzip import IndexedGzipFile # type: ignore + HAVE_INDEXED_GZIP = True except ImportError: # nibabel.openers.IndexedGzipFile is imported by nibabel.volumeutils # to detect compressed file types, so we give a fallback value here. @@ -49,35 +63,63 @@ class DeterministicGzipFile(gzip.GzipFile): to a modification time (``mtime``) of 0 seconds. """ - def __init__(self, filename=None, mode=None, compresslevel=9, fileobj=None, mtime=0): - # These two guards are copied from + def __init__( + self, + filename: str | None = None, + mode: Mode | None = None, + compresslevel: int = 9, + fileobj: io.FileIO | None = None, + mtime: int = 0, + ): + if mode is None: + mode = 'rb' + modestr: str = mode + + # These two guards are adapted from # https://github.com/python/cpython/blob/6ab65c6/Lib/gzip.py#L171-L174 - if mode and 'b' not in mode: - mode += 'b' + if 'b' not in modestr: + modestr = f'{mode}b' if fileobj is None: - fileobj = self.myfileobj = open(filename, mode or 'rb') + if filename is None: + raise TypeError('Must define either fileobj or filename') + # Cast because GzipFile.myfileobj has type io.FileIO while open returns ty.IO + fileobj = self.myfileobj = ty.cast(io.FileIO, open(filename, modestr)) return super().__init__( - filename='', mode=mode, compresslevel=compresslevel, fileobj=fileobj, mtime=mtime + filename='', + mode=modestr, + compresslevel=compresslevel, + fileobj=fileobj, + mtime=mtime, ) -def _gzip_open(filename, mode='rb', compresslevel=9, mtime=0, keep_open=False): +def _gzip_open( + filename: str, + mode: Mode = 'rb', + compresslevel: int = 9, + mtime: int = 0, + keep_open: bool = False, +) -> gzip.GzipFile: + + if not HAVE_INDEXED_GZIP or mode != 'rb': + gzip_file = DeterministicGzipFile(filename, mode, compresslevel, mtime=mtime) # use indexed_gzip if possible for faster read access. If keep_open == # True, we tell IndexedGzipFile to keep the file handle open. Otherwise # the IndexedGzipFile will close/open the file on each read. - if HAVE_INDEXED_GZIP and mode == 'rb': - gzip_file = IndexedGzipFile(filename, drop_handles=not keep_open) - - # Fall-back to built-in GzipFile else: - gzip_file = DeterministicGzipFile(filename, mode, compresslevel, mtime=mtime) + gzip_file = IndexedGzipFile(filename, drop_handles=not keep_open) return gzip_file -def _zstd_open(filename, mode='r', *, level_or_option=None, zstd_dict=None): - pyzstd = optional_package('pyzstd')[0] +def _zstd_open( + filename: str, + mode: Mode = 'r', + *, + level_or_option: int | dict | None = None, + zstd_dict: pyzstd.ZstdDict | None = None, +) -> pyzstd.ZstdFile: return pyzstd.ZstdFile(filename, mode, level_or_option=level_or_option, zstd_dict=zstd_dict) @@ -104,7 +146,7 @@ class Opener: gz_def = (_gzip_open, ('mode', 'compresslevel', 'mtime', 'keep_open')) bz2_def = (BZ2File, ('mode', 'buffering', 'compresslevel')) zstd_def = (_zstd_open, ('mode', 'level_or_option', 'zstd_dict')) - compress_ext_map = { + compress_ext_map: dict[str | None, OpenerDef] = { '.gz': gz_def, '.bz2': bz2_def, '.zst': zstd_def, @@ -121,19 +163,19 @@ class Opener: 'w': default_zst_compresslevel, } #: whether to ignore case looking for compression extensions - compress_ext_icase = True + compress_ext_icase: bool = True + + fobj: io.IOBase - def __init__(self, fileish, *args, **kwargs): - if self._is_fileobj(fileish): + def __init__(self, fileish: str | io.IOBase, *args, **kwargs): + if isinstance(fileish, (io.IOBase, Fileish)): self.fobj = fileish self.me_opened = False - self._name = None + self._name = getattr(fileish, 'name', None) return opener, arg_names = self._get_opener_argnames(fileish) # Get full arguments to check for mode and compresslevel - full_kwargs = kwargs.copy() - n_args = len(args) - full_kwargs.update(dict(zip(arg_names[:n_args], args))) + full_kwargs = {**kwargs, **dict(zip(arg_names, args))} # Set default mode if 'mode' not in full_kwargs: mode = 'rb' @@ -155,7 +197,7 @@ def __init__(self, fileish, *args, **kwargs): self._name = fileish self.me_opened = True - def _get_opener_argnames(self, fileish): + def _get_opener_argnames(self, fileish: str) -> OpenerDef: _, ext = splitext(fileish) if self.compress_ext_icase: ext = ext.lower() @@ -168,16 +210,12 @@ def _get_opener_argnames(self, fileish): return self.compress_ext_map[ext] return self.compress_ext_map[None] - def _is_fileobj(self, obj): - """Is `obj` a file-like object?""" - return hasattr(obj, 'read') and hasattr(obj, 'write') - @property - def closed(self): + def closed(self) -> bool: return self.fobj.closed @property - def name(self): + def name(self) -> str | None: """Return ``self.fobj.name`` or self._name if not present self._name will be None if object was created with a fileobj, otherwise @@ -186,42 +224,53 @@ def name(self): return self._name @property - def mode(self): - return self.fobj.mode + def mode(self) -> str: + # Check and raise our own error for type narrowing purposes + if hasattr(self.fobj, 'mode'): + return self.fobj.mode + raise AttributeError(f'{self.fobj.__class__.__name__} has no attribute "mode"') - def fileno(self): + def fileno(self) -> int: return self.fobj.fileno() - def read(self, *args, **kwargs): - return self.fobj.read(*args, **kwargs) + def read(self, size: int = -1, /) -> bytes: + return self.fobj.read(size) - def readinto(self, *args, **kwargs): - return self.fobj.readinto(*args, **kwargs) + def readinto(self, buffer: WriteableBuffer, /) -> int | None: + # Check and raise our own error for type narrowing purposes + if hasattr(self.fobj, 'readinto'): + return self.fobj.readinto(buffer) + raise AttributeError(f'{self.fobj.__class__.__name__} has no attribute "readinto"') - def write(self, *args, **kwargs): - return self.fobj.write(*args, **kwargs) + def write(self, b: bytes, /) -> int | None: + return self.fobj.write(b) - def seek(self, *args, **kwargs): - return self.fobj.seek(*args, **kwargs) + def seek(self, pos: int, whence: int = 0, /) -> int: + return self.fobj.seek(pos, whence) - def tell(self, *args, **kwargs): - return self.fobj.tell(*args, **kwargs) + def tell(self, /) -> int: + return self.fobj.tell() - def close(self, *args, **kwargs): - return self.fobj.close(*args, **kwargs) + def close(self, /) -> None: + return self.fobj.close() - def __iter__(self): + def __iter__(self) -> ty.Iterator[bytes]: return iter(self.fobj) - def close_if_mine(self): + def close_if_mine(self) -> None: """Close ``self.fobj`` iff we opened it in the constructor""" if self.me_opened: self.close() - def __enter__(self): + def __enter__(self) -> Opener: return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: self.close_if_mine() diff --git a/nibabel/tests/test_openers.py b/nibabel/tests/test_openers.py index b4f71f2501..893c5f4f88 100644 --- a/nibabel/tests/test_openers.py +++ b/nibabel/tests/test_openers.py @@ -38,7 +38,7 @@ def __init__(self, message): def write(self): pass - def read(self): + def read(self, size=-1, /): return self.message From ece10ac88ebaa7346e4fdf87fc875cc6aa02ba59 Mon Sep 17 00:00:00 2001 From: Chris Markiewicz Date: Mon, 13 Feb 2023 23:39:35 -0500 Subject: [PATCH 2/3] TYP: Annotate fileholders --- nibabel/filebasedimages.py | 3 +-- nibabel/fileholders.py | 27 ++++++++++++++++++--------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/nibabel/filebasedimages.py b/nibabel/filebasedimages.py index 6e4ea86135..7e289bfa48 100644 --- a/nibabel/filebasedimages.py +++ b/nibabel/filebasedimages.py @@ -16,12 +16,11 @@ from typing import Type from urllib import request -from .fileholders import FileHolder +from .fileholders import FileHolder, FileMap from .filename_parser import TypesFilenamesError, splitext_addext, types_filenames from .openers import ImageOpener FileSpec = ty.Union[str, os.PathLike] -FileMap = ty.Mapping[str, FileHolder] FileSniff = ty.Tuple[bytes, str] ImgT = ty.TypeVar('ImgT', bound='FileBasedImage') diff --git a/nibabel/fileholders.py b/nibabel/fileholders.py index 691d31ecff..a27715350d 100644 --- a/nibabel/fileholders.py +++ b/nibabel/fileholders.py @@ -7,6 +7,10 @@ # ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## """Fileholder class""" +from __future__ import annotations + +import io +import typing as ty from copy import copy from .openers import ImageOpener @@ -19,7 +23,12 @@ class FileHolderError(Exception): class FileHolder: """class to contain filename, fileobj and file position""" - def __init__(self, filename=None, fileobj=None, pos=0): + def __init__( + self, + filename: str | None = None, + fileobj: io.IOBase | None = None, + pos: int = 0, + ): """Initialize FileHolder instance Parameters @@ -37,7 +46,7 @@ def __init__(self, filename=None, fileobj=None, pos=0): self.fileobj = fileobj self.pos = pos - def get_prepare_fileobj(self, *args, **kwargs): + def get_prepare_fileobj(self, *args, **kwargs) -> ImageOpener: """Return fileobj if present, or return fileobj from filename Set position to that given in self.pos @@ -69,7 +78,7 @@ def get_prepare_fileobj(self, *args, **kwargs): raise FileHolderError('No filename or fileobj present') return obj - def same_file_as(self, other): + def same_file_as(self, other: FileHolder) -> bool: """Test if `self` refers to same files / fileobj as `other` Parameters @@ -86,12 +95,15 @@ def same_file_as(self, other): return (self.filename == other.filename) and (self.fileobj == other.fileobj) @property - def file_like(self): + def file_like(self) -> str | io.IOBase | None: """Return ``self.fileobj`` if not None, otherwise ``self.filename``""" return self.fileobj if self.fileobj is not None else self.filename -def copy_file_map(file_map): +FileMap = ty.Mapping[str, FileHolder] + + +def copy_file_map(file_map: FileMap) -> FileMap: r"""Copy mapping of fileholders given by `file_map` Parameters @@ -105,7 +117,4 @@ def copy_file_map(file_map): Copy of `file_map`, using shallow copy of ``FileHolder``\s """ - fm_copy = {} - for key, fh in file_map.items(): - fm_copy[key] = copy(fh) - return fm_copy + return {key: copy(fh) for key, fh in file_map.items()} From d13768f803ed9975c9ea8a3f0d5e82ddf187be03 Mon Sep 17 00:00:00 2001 From: Chris Markiewicz Date: Tue, 14 Feb 2023 08:50:47 -0500 Subject: [PATCH 3/3] TYP: Annotated filename_parser, move typedefs from filebasedimages --- nibabel/dataobj_images.py | 5 ++- nibabel/filebasedimages.py | 11 ++++--- nibabel/filename_parser.py | 66 +++++++++++++++++++++----------------- nibabel/spatialimages.py | 3 +- 4 files changed, 48 insertions(+), 37 deletions(-) diff --git a/nibabel/dataobj_images.py b/nibabel/dataobj_images.py index f23daf5d8d..eaf341271e 100644 --- a/nibabel/dataobj_images.py +++ b/nibabel/dataobj_images.py @@ -15,11 +15,14 @@ from .arrayproxy import ArrayLike from .deprecated import deprecate_with_version -from .filebasedimages import FileBasedHeader, FileBasedImage, FileMap, FileSpec +from .filebasedimages import FileBasedHeader, FileBasedImage +from .fileholders import FileMap if ty.TYPE_CHECKING: # pragma: no cover import numpy.typing as npt + from .filename_parser import FileSpec + ArrayImgT = ty.TypeVar('ArrayImgT', bound='DataobjImage') diff --git a/nibabel/filebasedimages.py b/nibabel/filebasedimages.py index 7e289bfa48..685b11b79b 100644 --- a/nibabel/filebasedimages.py +++ b/nibabel/filebasedimages.py @@ -10,17 +10,18 @@ from __future__ import annotations import io -import os import typing as ty from copy import deepcopy from typing import Type from urllib import request from .fileholders import FileHolder, FileMap -from .filename_parser import TypesFilenamesError, splitext_addext, types_filenames +from .filename_parser import TypesFilenamesError, _stringify_path, splitext_addext, types_filenames from .openers import ImageOpener -FileSpec = ty.Union[str, os.PathLike] +if ty.TYPE_CHECKING: # pragma: no cover + from .filename_parser import ExtensionSpec, FileSpec + FileSniff = ty.Tuple[bytes, str] ImgT = ty.TypeVar('ImgT', bound='FileBasedImage') @@ -159,7 +160,7 @@ class FileBasedImage: header_class: Type[FileBasedHeader] = FileBasedHeader _header: FileBasedHeader _meta_sniff_len: int = 0 - files_types: tuple[tuple[str, str | None], ...] = (('image', None),) + files_types: tuple[ExtensionSpec, ...] = (('image', None),) valid_exts: tuple[str, ...] = () _compressed_suffixes: tuple[str, ...] = () @@ -410,7 +411,7 @@ def _sniff_meta_for( t_fnames = types_filenames( filename, klass.files_types, trailing_suffixes=klass._compressed_suffixes ) - meta_fname = t_fnames.get('header', filename) + meta_fname = t_fnames.get('header', _stringify_path(filename)) # Do not re-sniff if it would be from the same file if sniff is not None and sniff[1] == meta_fname: diff --git a/nibabel/filename_parser.py b/nibabel/filename_parser.py index c4e47ee72c..45c50d6830 100644 --- a/nibabel/filename_parser.py +++ b/nibabel/filename_parser.py @@ -7,15 +7,21 @@ # ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## """Create filename pairs, triplets etc, with expected extensions""" +from __future__ import annotations + import os -import pathlib +import typing as ty + +if ty.TYPE_CHECKING: # pragma: no cover + FileSpec = str | os.PathLike[str] + ExtensionSpec = tuple[str, str | None] class TypesFilenamesError(Exception): pass -def _stringify_path(filepath_or_buffer): +def _stringify_path(filepath_or_buffer: FileSpec) -> str: """Attempt to convert a path-like object to a string. Parameters @@ -28,30 +34,21 @@ def _stringify_path(filepath_or_buffer): Notes ----- - Objects supporting the fspath protocol (python 3.6+) are coerced - according to its __fspath__ method. - For backwards compatibility with older pythons, pathlib.Path objects - are specially coerced. - Any other object is passed through unchanged, which includes bytes, - strings, buffers, or anything else that's not even path-like. - - Copied from: - https://github.com/pandas-dev/pandas/blob/325dd686de1589c17731cf93b649ed5ccb5a99b4/pandas/io/common.py#L131-L160 + Adapted from: + https://github.com/pandas-dev/pandas/blob/325dd68/pandas/io/common.py#L131-L160 """ - if hasattr(filepath_or_buffer, '__fspath__'): + if isinstance(filepath_or_buffer, os.PathLike): return filepath_or_buffer.__fspath__() - elif isinstance(filepath_or_buffer, pathlib.Path): - return str(filepath_or_buffer) return filepath_or_buffer def types_filenames( - template_fname, - types_exts, - trailing_suffixes=('.gz', '.bz2'), - enforce_extensions=True, - match_case=False, -): + template_fname: FileSpec, + types_exts: ty.Sequence[ExtensionSpec], + trailing_suffixes: ty.Sequence[str] = ('.gz', '.bz2'), + enforce_extensions: bool = True, + match_case: bool = False, +) -> dict[str, str]: """Return filenames with standard extensions from template name The typical case is returning image and header filenames for an @@ -152,12 +149,12 @@ def types_filenames( # we've found .IMG as the extension, we want .HDR as the matching # one. Let's only do this when the extension is all upper or all # lower case. - proc_ext = lambda s: s + proc_ext: ty.Callable[[str], str] = lambda s: s if found_ext: if found_ext == found_ext.upper(): - proc_ext = lambda s: s.upper() + proc_ext = str.upper elif found_ext == found_ext.lower(): - proc_ext = lambda s: s.lower() + proc_ext = str.lower for name, ext in types_exts: if name == direct_set_name: tfns[name] = template_fname @@ -171,7 +168,12 @@ def types_filenames( return tfns -def parse_filename(filename, types_exts, trailing_suffixes, match_case=False): +def parse_filename( + filename: FileSpec, + types_exts: ty.Sequence[ExtensionSpec], + trailing_suffixes: ty.Sequence[str], + match_case: bool = False, +) -> tuple[str, str, str | None, str | None]: """Split filename into fileroot, extension, trailing suffix; guess type. Parameters @@ -230,9 +232,9 @@ def parse_filename(filename, types_exts, trailing_suffixes, match_case=False): break guessed_name = None found_ext = None - for name, ext in types_exts: - if ext and endswith(filename, ext): - extpos = -len(ext) + for name, type_ext in types_exts: + if type_ext and endswith(filename, type_ext): + extpos = -len(type_ext) found_ext = filename[extpos:] filename = filename[:extpos] guessed_name = name @@ -242,15 +244,19 @@ def parse_filename(filename, types_exts, trailing_suffixes, match_case=False): return (filename, found_ext, ignored, guessed_name) -def _endswith(whole, end): +def _endswith(whole: str, end: str) -> bool: return whole.endswith(end) -def _iendswith(whole, end): +def _iendswith(whole: str, end: str) -> bool: return whole.lower().endswith(end.lower()) -def splitext_addext(filename, addexts=('.gz', '.bz2', '.zst'), match_case=False): +def splitext_addext( + filename: FileSpec, + addexts: ty.Sequence[str] = ('.gz', '.bz2', '.zst'), + match_case: bool = False, +) -> tuple[str, str, str]: """Split ``/pth/fname.ext.gz`` into ``/pth/fname, .ext, .gz`` where ``.gz`` may be any of passed `addext` trailing suffixes. diff --git a/nibabel/spatialimages.py b/nibabel/spatialimages.py index 4f3648c4d6..be347bd86f 100644 --- a/nibabel/spatialimages.py +++ b/nibabel/spatialimages.py @@ -140,7 +140,8 @@ from .arrayproxy import ArrayLike from .dataobj_images import DataobjImage -from .filebasedimages import FileBasedHeader, FileBasedImage, FileMap +from .filebasedimages import FileBasedHeader, FileBasedImage +from .fileholders import FileMap from .fileslice import canonical_slicers from .orientations import apply_orientation, inv_ornt_aff from .viewers import OrthoSlicer3D