From f81364b9cd719c39233e94aeafbb9ecfd77711f5 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 17 Sep 2024 11:01:10 -0500 Subject: [PATCH 1/8] Default to RemoteStore for fsspec URIs --- src/zarr/api/asynchronous.py | 61 ++++++++++++++++++++++++------ src/zarr/api/synchronous.py | 2 + src/zarr/store/common.py | 41 ++++++++++++++++---- tests/v3/test_store/test_core.py | 9 +++++ tests/v3/test_store/test_remote.py | 41 ++++++++++++++++++++ 5 files changed, 134 insertions(+), 20 deletions(-) diff --git a/src/zarr/api/asynchronous.py b/src/zarr/api/asynchronous.py index 8a1b0c5f36..422143110f 100644 --- a/src/zarr/api/asynchronous.py +++ b/src/zarr/api/asynchronous.py @@ -194,6 +194,7 @@ async def open( zarr_version: ZarrFormat | None = None, # deprecated zarr_format: ZarrFormat | None = None, path: str | None = None, + storage_options: dict[str, Any] | None = None, **kwargs: Any, # TODO: type kwargs as valid args to open_array ) -> AsyncArray | AsyncGroup: """Convenience function to open a group or array using file-mode-like semantics. @@ -211,6 +212,9 @@ async def open( The zarr format to use when saving. path : str or None, optional The path within the store to open. + storage_options : dict + If using an fsspec URL to create the store, these will be passed to + the backend implementation. Ignored otherwise. **kwargs Additional parameters are passed through to :func:`zarr.creation.open_array` or :func:`zarr.hierarchy.open_group`. @@ -221,7 +225,7 @@ async def open( Return type depends on what exists in the given store. """ zarr_format = _handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format) - store_path = await make_store_path(store, mode=mode) + store_path = await make_store_path(store, mode=mode, storage_options=storage_options) if path is not None: store_path = store_path / path @@ -276,6 +280,7 @@ async def save_array( zarr_version: ZarrFormat | None = None, # deprecated zarr_format: ZarrFormat | None = None, path: str | None = None, + storage_options: dict[str, Any] | None = None, **kwargs: Any, # TODO: type kwargs as valid args to create ) -> None: """Convenience function to save a NumPy array to the local file system, following a @@ -291,6 +296,9 @@ async def save_array( The zarr format to use when saving. path : str or None, optional The path within the store where the array will be saved. + storage_options : dict + If using an fsspec URL to create the store, these will be passed to + the backend implementation. Ignored otherwise. kwargs Passed through to :func:`create`, e.g., compressor. """ @@ -299,7 +307,7 @@ async def save_array( or _default_zarr_version() ) - store_path = await make_store_path(store, mode="w") + store_path = await make_store_path(store, mode="w", storage_options=storage_options) if path is not None: store_path = store_path / path new = await AsyncArray.create( @@ -319,6 +327,7 @@ async def save_group( zarr_version: ZarrFormat | None = None, # deprecated zarr_format: ZarrFormat | None = None, path: str | None = None, + storage_options: dict[str, Any] | None = None, **kwargs: NDArrayLike, ) -> None: """Convenience function to save several NumPy arrays to the local file system, following a @@ -334,11 +343,17 @@ async def save_group( The zarr format to use when saving. path : str or None, optional Path within the store where the group will be saved. + storage_options : dict + If using an fsspec URL to create the store, these will be passed to + the backend implementation. Ignored otherwise. kwargs NumPy arrays with data to save. """ zarr_format = ( - _handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format) + _handle_zarr_version_or_format( + zarr_version=zarr_version, + zarr_format=zarr_format, + ) or _default_zarr_version() ) @@ -346,10 +361,22 @@ async def save_group( raise ValueError("at least one array must be provided") aws = [] for i, arr in enumerate(args): - aws.append(save_array(store, arr, zarr_format=zarr_format, path=f"{path}/arr_{i}")) + aws.append( + save_array( + store, + arr, + zarr_format=zarr_format, + path=f"{path}/arr_{i}", + storage_options=storage_options, + ) + ) for k, arr in kwargs.items(): _path = f"{path}/{k}" if path is not None else k - aws.append(save_array(store, arr, zarr_format=zarr_format, path=_path)) + aws.append( + save_array( + store, arr, zarr_format=zarr_format, path=_path, storage_options=storage_options + ) + ) await asyncio.gather(*aws) @@ -418,6 +445,7 @@ async def group( zarr_format: ZarrFormat | None = None, meta_array: Any | None = None, # not used attributes: dict[str, JSON] | None = None, + storage_options: dict[str, Any] | None = None, ) -> AsyncGroup: """Create a group. @@ -444,6 +472,9 @@ async def group( to users. Use `numpy.empty(())` by default. zarr_format : {2, 3, None}, optional The zarr format to use when saving. + storage_options : dict + If using an fsspec URL to create the store, these will be passed to + the backend implementation. Ignored otherwise. Returns ------- @@ -453,7 +484,7 @@ async def group( zarr_format = _handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format) - store_path = await make_store_path(store) + store_path = await make_store_path(store, storage_options=storage_options) if path is not None: store_path = store_path / path @@ -488,7 +519,7 @@ async def open_group( synchronizer: Any = None, # not used path: str | None = None, chunk_store: StoreLike | None = None, # not used - storage_options: dict[str, Any] | None = None, # not used + storage_options: dict[str, Any] | None = None, zarr_version: ZarrFormat | None = None, # deprecated zarr_format: ZarrFormat | None = None, meta_array: Any | None = None, # not used @@ -548,10 +579,8 @@ async def open_group( warnings.warn("meta_array is not yet implemented", RuntimeWarning, stacklevel=2) if chunk_store is not None: warnings.warn("chunk_store is not yet implemented", RuntimeWarning, stacklevel=2) - if storage_options is not None: - warnings.warn("storage_options is not yet implemented", RuntimeWarning, stacklevel=2) - store_path = await make_store_path(store, mode=mode) + store_path = await make_store_path(store, mode=mode, storage_options=storage_options) if path is not None: store_path = store_path / path @@ -603,6 +632,7 @@ async def create( ) = None, codecs: Iterable[Codec | dict[str, JSON]] | None = None, dimension_names: Iterable[str] | None = None, + storage_options: dict[str, Any] | None = None, **kwargs: Any, ) -> AsyncArray: """Create an array. @@ -674,6 +704,9 @@ async def create( to users. Use `numpy.empty(())` by default. .. versionadded:: 2.13 + storage_options : dict + If using an fsspec URL to create the store, these will be passed to + the backend implementation. Ignored otherwise. Returns ------- @@ -725,7 +758,7 @@ async def create( warnings.warn("meta_array is not yet implemented", RuntimeWarning, stacklevel=2) mode = kwargs.pop("mode", cast(AccessModeLiteral, "r" if read_only else "w")) - store_path = await make_store_path(store, mode=mode) + store_path = await make_store_path(store, mode=mode, storage_options=storage_options) if path is not None: store_path = store_path / path @@ -875,6 +908,7 @@ async def open_array( zarr_version: ZarrFormat | None = None, # deprecated zarr_format: ZarrFormat | None = None, path: PathLike | None = None, + storage_options: dict[str, Any] | None = None, **kwargs: Any, # TODO: type kwargs as valid args to save ) -> AsyncArray: """Open an array using file-mode-like semantics. @@ -887,6 +921,9 @@ async def open_array( The zarr format to use when saving. path : string, optional Path in store to array. + storage_options : dict + If using an fsspec URL to create the store, these will be passed to + the backend implementation. Ignored otherwise. **kwargs Any keyword arguments to pass to the array constructor. @@ -896,7 +933,7 @@ async def open_array( The opened array. """ - store_path = await make_store_path(store) + store_path = await make_store_path(store, storage_options=storage_options) if path is not None: store_path = store_path / path diff --git a/src/zarr/api/synchronous.py b/src/zarr/api/synchronous.py index 93a33b8d3f..129f901934 100644 --- a/src/zarr/api/synchronous.py +++ b/src/zarr/api/synchronous.py @@ -134,6 +134,7 @@ def save_group( zarr_version: ZarrFormat | None = None, # deprecated zarr_format: ZarrFormat | None = None, path: str | None = None, + storage_options: dict[str, Any] | None = None, **kwargs: NDArrayLike, ) -> None: return sync( @@ -143,6 +144,7 @@ def save_group( zarr_version=zarr_version, zarr_format=zarr_format, path=path, + storage_options=storage_options, **kwargs, ) ) diff --git a/src/zarr/store/common.py b/src/zarr/store/common.py index 8028c9af3d..992cb7ad0f 100644 --- a/src/zarr/store/common.py +++ b/src/zarr/store/common.py @@ -4,6 +4,9 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Literal +import fsspec +import fsspec.implementations + from zarr.abc.store import AccessMode, Store from zarr.core.buffer import Buffer, default_buffer_prototype from zarr.core.common import ZARR_JSON, ZARRAY_JSON, ZGROUP_JSON, ZarrFormat @@ -11,6 +14,8 @@ from zarr.store.local import LocalStore from zarr.store.memory import MemoryStore +# from zarr.store.remote import RemoteStore + if TYPE_CHECKING: from zarr.core.buffer import BufferPrototype from zarr.core.common import AccessModeLiteral @@ -75,30 +80,50 @@ def __eq__(self, other: Any) -> bool: async def make_store_path( - store_like: StoreLike | None, *, mode: AccessModeLiteral | None = None + store_like: StoreLike | None, + *, + path: str | None = None, + mode: AccessModeLiteral | None = None, + storage_options: dict[str, Any] | None = None, ) -> StorePath: + from zarr.store.remote import RemoteStore # circular import + if isinstance(store_like, StorePath): if mode is not None: assert AccessMode.from_literal(mode) == store_like.store.mode - return store_like + result = store_like elif isinstance(store_like, Store): if mode is not None: assert AccessMode.from_literal(mode) == store_like.mode await store_like._ensure_open() - return StorePath(store_like) + result = StorePath(store_like) elif store_like is None: if mode is None: mode = "w" # exception to the default mode = 'r' - return StorePath(await MemoryStore.open(mode=mode)) + result = StorePath(await MemoryStore.open(mode=mode)) elif isinstance(store_like, Path): - return StorePath(await LocalStore.open(root=store_like, mode=mode or "r")) + result = StorePath(await LocalStore.open(root=store_like, mode=mode or "r")) elif isinstance(store_like, str): - return StorePath(await LocalStore.open(root=Path(store_like), mode=mode or "r")) + try: + fs, path = fsspec.url_to_fs(store_like, **storage_options) + except Exception: + # not sure what to do here, but I don't want this to fail... + pass + else: + if "file" not in fs.protocol: + storage_options = storage_options or {} + return StorePath(RemoteStore(url=store_like, mode=mode or "r", **storage_options)) + result = StorePath(await LocalStore.open(root=Path(store_like), mode=mode or "r")) elif isinstance(store_like, dict): # We deliberate only consider dict[str, Buffer] here, and not arbitrary mutable mappings. # By only allowing dictionaries, which are in-memory, we know that MemoryStore appropriate. - return StorePath(await MemoryStore.open(store_dict=store_like, mode=mode)) - raise TypeError + result = StorePath(await MemoryStore.open(store_dict=store_like, mode=mode)) + else: + raise TypeError + + if path is not None: + result = result / path + return result async def ensure_no_existing_node(store_path: StorePath, zarr_format: ZarrFormat) -> None: diff --git a/tests/v3/test_store/test_core.py b/tests/v3/test_store/test_core.py index c65d91f9d0..44eaf95a8c 100644 --- a/tests/v3/test_store/test_core.py +++ b/tests/v3/test_store/test_core.py @@ -5,6 +5,7 @@ from zarr.store.common import make_store_path from zarr.store.local import LocalStore from zarr.store.memory import MemoryStore +from zarr.store.remote import RemoteStore async def test_make_store_path(tmpdir: str) -> None: @@ -34,3 +35,11 @@ async def test_make_store_path(tmpdir: str) -> None: with pytest.raises(TypeError): await make_store_path(1) # type: ignore[arg-type] + + +async def test_make_store_path_fsspec(monkeypatch) -> None: + import fsspec.implementations.memory + + monkeypatch.setattr(fsspec.implementations.memory.MemoryFileSystem, "async_impl", True) + store_path = await make_store_path("memory://") + assert isinstance(store_path.store, RemoteStore) diff --git a/tests/v3/test_store/test_remote.py b/tests/v3/test_store/test_remote.py index afa991209f..01495e3a16 100644 --- a/tests/v3/test_store/test_remote.py +++ b/tests/v3/test_store/test_remote.py @@ -1,3 +1,4 @@ +import json import os from collections.abc import Generator @@ -6,6 +7,7 @@ import pytest from upath import UPath +import zarr.api.asynchronous from zarr.core.buffer import Buffer, cpu, default_buffer_prototype from zarr.core.sync import sync from zarr.store import RemoteStore @@ -158,3 +160,42 @@ def test_store_supports_partial_writes(self, store: RemoteStore) -> None: def test_store_supports_listing(self, store: RemoteStore) -> None: assert True + + async def test_remote_store_from_uri( + self, store: RemoteStore, store_kwargs: dict[str, str | bool] + ): + storage_options = { + "endpoint_url": endpoint_url, + "anon": False, + } + + meta = {"attributes": {"key": "value"}, "zarr_format": 3, "node_type": "group"} + + await store.set( + "zarr.json", + self.buffer_cls.from_bytes(json.dumps(meta).encode()), + ) + group = await zarr.api.asynchronous.open_group( + store=store._url, storage_options=storage_options + ) + assert dict(group.attrs) == {"key": "value"} + + meta["attributes"]["key"] = "value-2" + await store.set( + "directory-2/zarr.json", + self.buffer_cls.from_bytes(json.dumps(meta).encode()), + ) + group = await zarr.api.asynchronous.open_group( + store="/".join([store._url.rstrip("/"), "directory-2"]), storage_options=storage_options + ) + assert dict(group.attrs) == {"key": "value-2"} + + meta["attributes"]["key"] = "value-3" + await store.set( + "directory-3/zarr.json", + self.buffer_cls.from_bytes(json.dumps(meta).encode()), + ) + group = await zarr.api.asynchronous.open_group( + store=store._url, path="directory-3", storage_options=storage_options + ) + assert dict(group.attrs) == {"key": "value-3"} From 64b9371cac603cf0fd1ce82e9c94fb22f458d29e Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 17 Sep 2024 11:29:21 -0500 Subject: [PATCH 2/8] fixup --- src/zarr/store/common.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/zarr/store/common.py b/src/zarr/store/common.py index 992cb7ad0f..7bfe35bc2b 100644 --- a/src/zarr/store/common.py +++ b/src/zarr/store/common.py @@ -104,16 +104,13 @@ async def make_store_path( elif isinstance(store_like, Path): result = StorePath(await LocalStore.open(root=store_like, mode=mode or "r")) elif isinstance(store_like, str): - try: - fs, path = fsspec.url_to_fs(store_like, **storage_options) - except Exception: - # not sure what to do here, but I don't want this to fail... - pass + storage_options = storage_options or {} + fs, path = fsspec.url_to_fs(store_like, **storage_options) + if "file" not in fs.protocol: + storage_options = storage_options or {} + result = StorePath(RemoteStore(url=store_like, mode=mode or "r", **storage_options)) else: - if "file" not in fs.protocol: - storage_options = storage_options or {} - return StorePath(RemoteStore(url=store_like, mode=mode or "r", **storage_options)) - result = StorePath(await LocalStore.open(root=Path(store_like), mode=mode or "r")) + result = StorePath(await LocalStore.open(root=Path(store_like), mode=mode or "r")) elif isinstance(store_like, dict): # We deliberate only consider dict[str, Buffer] here, and not arbitrary mutable mappings. # By only allowing dictionaries, which are in-memory, we know that MemoryStore appropriate. From 72fb559dae2ec62d92634777fbbf44c1b090af95 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 17 Sep 2024 11:50:17 -0500 Subject: [PATCH 3/8] fixup --- src/zarr/store/common.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/zarr/store/common.py b/src/zarr/store/common.py index 7bfe35bc2b..afec50ea62 100644 --- a/src/zarr/store/common.py +++ b/src/zarr/store/common.py @@ -82,7 +82,6 @@ def __eq__(self, other: Any) -> bool: async def make_store_path( store_like: StoreLike | None, *, - path: str | None = None, mode: AccessModeLiteral | None = None, storage_options: dict[str, Any] | None = None, ) -> StorePath: @@ -105,7 +104,7 @@ async def make_store_path( result = StorePath(await LocalStore.open(root=store_like, mode=mode or "r")) elif isinstance(store_like, str): storage_options = storage_options or {} - fs, path = fsspec.url_to_fs(store_like, **storage_options) + fs, _ = fsspec.url_to_fs(store_like, **storage_options) if "file" not in fs.protocol: storage_options = storage_options or {} result = StorePath(RemoteStore(url=store_like, mode=mode or "r", **storage_options)) @@ -118,8 +117,6 @@ async def make_store_path( else: raise TypeError - if path is not None: - result = result / path return result From 205807e5623ed66a8780113e4baff66cae2ce6d1 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Wed, 18 Sep 2024 11:27:57 -0500 Subject: [PATCH 4/8] wip --- src/zarr/store/remote.py | 110 ++++++++++++++++------------- tests/v3/test_store/test_remote.py | 61 ++++++++-------- 2 files changed, 89 insertions(+), 82 deletions(-) diff --git a/src/zarr/store/remote.py b/src/zarr/store/remote.py index e3e2ba3447..f350db724b 100644 --- a/src/zarr/store/remote.py +++ b/src/zarr/store/remote.py @@ -11,12 +11,18 @@ from collections.abc import AsyncGenerator from fsspec.asyn import AsyncFileSystem - from upath import UPath from zarr.core.buffer import Buffer, BufferPrototype from zarr.core.common import AccessModeLiteral, BytesLike +ALLOWED_EXCEPTIONS: tuple[type[Exception], ...] = ( + FileNotFoundError, + IsADirectoryError, + NotADirectoryError, +) + + class RemoteStore(Store): # based on FSSpec supports_writes: bool = True @@ -24,21 +30,18 @@ class RemoteStore(Store): supports_partial_writes: bool = False supports_listing: bool = True - _fs: AsyncFileSystem - _url: str - path: str + fs: AsyncFileSystem + # _url: str + # path: str allowed_exceptions: tuple[type[Exception], ...] def __init__( self, - url: UPath | str, + fs: AsyncFileSystem, mode: AccessModeLiteral = "r", - allowed_exceptions: tuple[type[Exception], ...] = ( - FileNotFoundError, - IsADirectoryError, - NotADirectoryError, - ), - **storage_options: Any, + path: str = "/", + # url: UPath | str, + allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, ): """ Parameters @@ -51,53 +54,62 @@ def __init__( this must not be used. """ super().__init__(mode=mode) - self._storage_options = storage_options - if isinstance(url, str): - self._url = url.rstrip("/") - self._fs, _path = fsspec.url_to_fs(url, **storage_options) - self.path = _path.rstrip("/") - elif hasattr(url, "protocol") and hasattr(url, "fs"): - # is UPath-like - but without importing - if storage_options: - raise ValueError( - "If constructed with a UPath object, no additional " - "storage_options are allowed" - ) - # n.b. UPath returns the url and path attributes with a trailing /, at least for s3 - # that trailing / must be removed to compose with the store interface - self._url = str(url).rstrip("/") - self.path = url.path.rstrip("/") - self._fs = url.fs - else: - raise ValueError(f"URL not understood, {url}") + self.fs = fs + self.path = path self.allowed_exceptions = allowed_exceptions - # test instantiate file system - if not self._fs.async_impl: - raise TypeError("FileSystem needs to support async operations") + + if not self.fs.async_impl: + raise TypeError("Filesystem needs to support async operations.") + + @classmethod + def from_upath( + cls, + upath: Any, + mode: AccessModeLiteral = "r", + allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, + ) -> RemoteStore: + return cls( + fs=upath.fs, + path=upath.path.rstrip("/"), + mode=mode, + allowed_exceptions=allowed_exceptions, + ) + + @classmethod + def from_url( + cls, + url: str, + storage_options: dict[str, Any] | None = None, + mode: AccessModeLiteral = "r", + allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, + ): + fs, path = fsspec.url_to_fs(url, **storage_options) + return cls(fs=fs, path=path, mode=mode, allowed_exceptions=allowed_exceptions) async def clear(self) -> None: try: - for subpath in await self._fs._find(self.path, withdirs=True): + for subpath in await self.fs._find(self.path, withdirs=True): if subpath != self.path: - await self._fs._rm(subpath, recursive=True) + await self.fs._rm(subpath, recursive=True) except FileNotFoundError: pass async def empty(self) -> bool: - return not await self._fs._find(self.path, withdirs=True) + return not await self.fs._find(self.path, withdirs=True) - def __str__(self) -> str: - return f"{self._url}" + # def __str__(self) -> str: + # return f"RemoteStore" def __repr__(self) -> str: - return f"" + return f"" def __eq__(self, other: object) -> bool: return ( isinstance(other, type(self)) and self.path == other.path and self.mode == other.mode - and self._url == other._url + and self.fs == other.fs + # and self._url == other._url # and self._storage_options == other._storage_options # FIXME: this isn't working for some reason ) @@ -123,9 +135,9 @@ async def get( end = None value = prototype.buffer.from_bytes( await ( - self._fs._cat_file(path, start=byte_range[0], end=end) + self.fs._cat_file(path, start=byte_range[0], end=end) if byte_range - else self._fs._cat_file(path) + else self.fs._cat_file(path) ) ) @@ -152,13 +164,13 @@ async def set( # write data if byte_range: raise NotImplementedError - await self._fs._pipe_file(path, value.to_bytes()) + await self.fs._pipe_file(path, value.to_bytes()) async def delete(self, key: str) -> None: self._check_writable() path = _dereference_path(self.path, key) try: - await self._fs._rm(path) + await self.fs._rm(path) except FileNotFoundError: pass except self.allowed_exceptions: @@ -166,7 +178,7 @@ async def delete(self, key: str) -> None: async def exists(self, key: str) -> bool: path = _dereference_path(self.path, key) - exists: bool = await self._fs._exists(path) + exists: bool = await self.fs._exists(path) return exists async def get_partial_values( @@ -189,7 +201,7 @@ async def get_partial_values( else: return [] # TODO: expectations for exceptions or missing keys? - res = await self._fs._cat_ranges(list(paths), starts, stops, on_error="return") + res = await self.fs._cat_ranges(list(paths), starts, stops, on_error="return") # the following is an s3-specific condition we probably don't want to leak res = [b"" if (isinstance(r, OSError) and "not satisfiable" in str(r)) else r for r in res] for r in res: @@ -202,19 +214,19 @@ async def set_partial_values(self, key_start_values: list[tuple[str, int, BytesL raise NotImplementedError async def list(self) -> AsyncGenerator[str, None]: - allfiles = await self._fs._find(self.path, detail=False, withdirs=False) + allfiles = await self.fs._find(self.path, detail=False, withdirs=False) for onefile in (a.replace(self.path + "/", "") for a in allfiles): yield onefile async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: prefix = f"{self.path}/{prefix.rstrip('/')}" try: - allfiles = await self._fs._ls(prefix, detail=False) + allfiles = await self.fs._ls(prefix, detail=False) except FileNotFoundError: return for onefile in (a.replace(prefix + "/", "") for a in allfiles): yield onefile.removeprefix(self.path).removeprefix("/") async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: - for onefile in await self._fs._ls(prefix, detail=False): + for onefile in await self.fs._ls(prefix, detail=False): yield onefile diff --git a/tests/v3/test_store/test_remote.py b/tests/v3/test_store/test_remote.py index 01495e3a16..4b5d2a493b 100644 --- a/tests/v3/test_store/test_remote.py +++ b/tests/v3/test_store/test_remote.py @@ -5,7 +5,6 @@ import botocore.client import fsspec import pytest -from upath import UPath import zarr.api.asynchronous from zarr.core.buffer import Buffer, cpu, default_buffer_prototype @@ -86,8 +85,10 @@ async def alist(it): async def test_basic() -> None: - store = await RemoteStore.open( - f"s3://{test_bucket_name}", mode="w", endpoint_url=endpoint_url, anon=False + store = RemoteStore.from_url( + f"s3://{test_bucket_name}", + mode="w", + storage_options=dict(endpoint_url=endpoint_url, anon=False), ) assert not await alist(store.list()) assert not await store.exists("foo") @@ -105,48 +106,42 @@ class TestRemoteStoreS3(StoreTests[RemoteStore, cpu.Buffer]): store_cls = RemoteStore buffer_cls = cpu.Buffer - @pytest.fixture(scope="function", params=("use_upath", "use_str")) + @pytest.fixture(scope="function") def store_kwargs(self, request) -> dict[str, str | bool]: - url = f"s3://{test_bucket_name}" - anon = False - mode = "r+" - if request.param == "use_upath": - return {"url": UPath(url, endpoint_url=endpoint_url, anon=anon), "mode": mode} - elif request.param == "use_str": - return {"url": url, "mode": mode, "anon": anon, "endpoint_url": endpoint_url} - - raise AssertionError + fs, path = fsspec.url_to_fs( + f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False + ) + return {"fs": fs, "path": path, "mode": "r+"} @pytest.fixture(scope="function") def store(self, store_kwargs: dict[str, str | bool]) -> RemoteStore: - url = store_kwargs["url"] - mode = store_kwargs["mode"] - if isinstance(url, UPath): - out = self.store_cls(url=url, mode=mode) - else: - endpoint_url = store_kwargs["endpoint_url"] - out = self.store_cls(url=url, asynchronous=True, mode=mode, endpoint_url=endpoint_url) - return out + return self.store_cls(**store_kwargs) + + # url = store_kwargs["url"] + # mode = store_kwargs["mode"] + # if isinstance(url, UPath): + # out = self.store_cls.from_upath(url, mode=mode) + # else: + # storage_options = { + # "anon": store_kwargs["anon"], + # "endpoint_url": store_kwargs["endpoint_url"] + # } + # out = self.store_cls.from_url(url=url, storage_options=storage_options, mode=mode) + # return out def get(self, store: RemoteStore, key: str) -> Buffer: # make a new, synchronous instance of the filesystem because this test is run in sync code - fs, _ = fsspec.url_to_fs( - url=store._url, - asynchronous=False, - anon=store._fs.anon, - endpoint_url=store._fs.endpoint_url, + new_fs = fsspec.filesystem( + "s3", endpoint_url=store.fs.endpoint_url, anon=store.fs.anon, asynchronous=False ) - return self.buffer_cls.from_bytes(fs.cat(f"{store.path}/{key}")) + return self.buffer_cls.from_bytes(new_fs.cat(f"{store.path}/{key}")) def set(self, store: RemoteStore, key: str, value: Buffer) -> None: # make a new, synchronous instance of the filesystem because this test is run in sync code - fs, _ = fsspec.url_to_fs( - url=store._url, - asynchronous=False, - anon=store._fs.anon, - endpoint_url=store._fs.endpoint_url, + new_fs = fsspec.filesystem( + "s3", endpoint_url=store.fs.endpoint_url, anon=store.fs.anon, asynchronous=False ) - fs.write_bytes(f"{store.path}/{key}", value.to_bytes()) + new_fs.write_bytes(f"{store.path}/{key}", value.to_bytes()) def test_store_repr(self, store: RemoteStore) -> None: assert str(store) == f"s3://{test_bucket_name}" From 0ea04afa6a92c13450776819b2ad9ca2336e7465 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Wed, 18 Sep 2024 11:42:11 -0500 Subject: [PATCH 5/8] fixup --- src/zarr/store/common.py | 28 +++++++++++++++++++++------- src/zarr/store/remote.py | 8 +------- tests/v3/test_store/test_remote.py | 26 ++++++++++---------------- 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/zarr/store/common.py b/src/zarr/store/common.py index afec50ea62..6ea6687464 100644 --- a/src/zarr/store/common.py +++ b/src/zarr/store/common.py @@ -4,9 +4,6 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Literal -import fsspec -import fsspec.implementations - from zarr.abc.store import AccessMode, Store from zarr.core.buffer import Buffer, default_buffer_prototype from zarr.core.common import ZARR_JSON, ZARRAY_JSON, ZGROUP_JSON, ZarrFormat @@ -104,10 +101,11 @@ async def make_store_path( result = StorePath(await LocalStore.open(root=store_like, mode=mode or "r")) elif isinstance(store_like, str): storage_options = storage_options or {} - fs, _ = fsspec.url_to_fs(store_like, **storage_options) - if "file" not in fs.protocol: - storage_options = storage_options or {} - result = StorePath(RemoteStore(url=store_like, mode=mode or "r", **storage_options)) + + if _is_fsspec_uri(store_like): + result = StorePath( + RemoteStore.from_url(store_like, storage_options=storage_options, mode=mode or "r") + ) else: result = StorePath(await LocalStore.open(root=Path(store_like), mode=mode or "r")) elif isinstance(store_like, dict): @@ -120,6 +118,22 @@ async def make_store_path( return result +def _is_fsspec_uri(uri: str) -> bool: + """ + Check if a URI looks like a non-local fsspec URI. + + Examples + -------- + >>> _is_fsspec_uri("s3://bucket") + True + >>> _is_fsspec_uri("my-directory") + False + >>> _is_fsspec_uri("local://my-directory") + False + """ + return "://" in uri or "::" in uri and "local://" not in uri + + async def ensure_no_existing_node(store_path: StorePath, zarr_format: ZarrFormat) -> None: """ Check if a store_path is safe for array / group creation. diff --git a/src/zarr/store/remote.py b/src/zarr/store/remote.py index f350db724b..a9f6dd077b 100644 --- a/src/zarr/store/remote.py +++ b/src/zarr/store/remote.py @@ -40,7 +40,6 @@ def __init__( fs: AsyncFileSystem, mode: AccessModeLiteral = "r", path: str = "/", - # url: UPath | str, allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, ): """ @@ -82,7 +81,7 @@ def from_url( storage_options: dict[str, Any] | None = None, mode: AccessModeLiteral = "r", allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, - ): + ) -> RemoteStore: fs, path = fsspec.url_to_fs(url, **storage_options) return cls(fs=fs, path=path, mode=mode, allowed_exceptions=allowed_exceptions) @@ -97,9 +96,6 @@ async def clear(self) -> None: async def empty(self) -> bool: return not await self.fs._find(self.path, withdirs=True) - # def __str__(self) -> str: - # return f"RemoteStore" - def __repr__(self) -> str: return f"" @@ -109,8 +105,6 @@ def __eq__(self, other: object) -> bool: and self.path == other.path and self.mode == other.mode and self.fs == other.fs - # and self._url == other._url - # and self._storage_options == other._storage_options # FIXME: this isn't working for some reason ) async def get( diff --git a/tests/v3/test_store/test_remote.py b/tests/v3/test_store/test_remote.py index 4b5d2a493b..63ae9debc6 100644 --- a/tests/v3/test_store/test_remote.py +++ b/tests/v3/test_store/test_remote.py @@ -5,6 +5,7 @@ import botocore.client import fsspec import pytest +from upath import UPath import zarr.api.asynchronous from zarr.core.buffer import Buffer, cpu, default_buffer_prototype @@ -117,18 +118,6 @@ def store_kwargs(self, request) -> dict[str, str | bool]: def store(self, store_kwargs: dict[str, str | bool]) -> RemoteStore: return self.store_cls(**store_kwargs) - # url = store_kwargs["url"] - # mode = store_kwargs["mode"] - # if isinstance(url, UPath): - # out = self.store_cls.from_upath(url, mode=mode) - # else: - # storage_options = { - # "anon": store_kwargs["anon"], - # "endpoint_url": store_kwargs["endpoint_url"] - # } - # out = self.store_cls.from_url(url=url, storage_options=storage_options, mode=mode) - # return out - def get(self, store: RemoteStore, key: str) -> Buffer: # make a new, synchronous instance of the filesystem because this test is run in sync code new_fs = fsspec.filesystem( @@ -144,7 +133,7 @@ def set(self, store: RemoteStore, key: str, value: Buffer) -> None: new_fs.write_bytes(f"{store.path}/{key}", value.to_bytes()) def test_store_repr(self, store: RemoteStore) -> None: - assert str(store) == f"s3://{test_bucket_name}" + assert str(store) == "" def test_store_supports_writes(self, store: RemoteStore) -> None: assert True @@ -171,7 +160,7 @@ async def test_remote_store_from_uri( self.buffer_cls.from_bytes(json.dumps(meta).encode()), ) group = await zarr.api.asynchronous.open_group( - store=store._url, storage_options=storage_options + store=f"s3://{test_bucket_name}", storage_options=storage_options ) assert dict(group.attrs) == {"key": "value"} @@ -181,7 +170,7 @@ async def test_remote_store_from_uri( self.buffer_cls.from_bytes(json.dumps(meta).encode()), ) group = await zarr.api.asynchronous.open_group( - store="/".join([store._url.rstrip("/"), "directory-2"]), storage_options=storage_options + store=f"s3://{test_bucket_name}/directory-2", storage_options=storage_options ) assert dict(group.attrs) == {"key": "value-2"} @@ -191,6 +180,11 @@ async def test_remote_store_from_uri( self.buffer_cls.from_bytes(json.dumps(meta).encode()), ) group = await zarr.api.asynchronous.open_group( - store=store._url, path="directory-3", storage_options=storage_options + store=f"s3://{test_bucket_name}", path="directory-3", storage_options=storage_options ) assert dict(group.attrs) == {"key": "value-3"} + + def test_from_upath(self) -> None: + path = UPath(f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False) + result = RemoteStore.from_upath(path) + assert result.fs.endpoint_url == endpoint_url From c8535af7c989693df705300d7c3008eb609223fa Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Thu, 19 Sep 2024 09:41:35 -0500 Subject: [PATCH 6/8] Added check for invalid. --- src/zarr/store/common.py | 7 +++++++ tests/v3/test_store/test_core.py | 19 ++++++++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/zarr/store/common.py b/src/zarr/store/common.py index 6ea6687464..79ac0a44dd 100644 --- a/src/zarr/store/common.py +++ b/src/zarr/store/common.py @@ -84,6 +84,8 @@ async def make_store_path( ) -> StorePath: from zarr.store.remote import RemoteStore # circular import + used_storage_options = False + if isinstance(store_like, StorePath): if mode is not None: assert AccessMode.from_literal(mode) == store_like.store.mode @@ -103,6 +105,7 @@ async def make_store_path( storage_options = storage_options or {} if _is_fsspec_uri(store_like): + used_storage_options = True result = StorePath( RemoteStore.from_url(store_like, storage_options=storage_options, mode=mode or "r") ) @@ -115,6 +118,10 @@ async def make_store_path( else: raise TypeError + if storage_options and not used_storage_options: + msg = "'storage_options' was provided but unused. 'storage_options' is only used for fsspec filesystem stores." + raise TypeError(msg) + return result diff --git a/tests/v3/test_store/test_core.py b/tests/v3/test_store/test_core.py index 44eaf95a8c..d9c4d33461 100644 --- a/tests/v3/test_store/test_core.py +++ b/tests/v3/test_store/test_core.py @@ -1,8 +1,9 @@ +import tempfile from pathlib import Path import pytest -from zarr.store.common import make_store_path +from zarr.store.common import StoreLike, StorePath, make_store_path from zarr.store.local import LocalStore from zarr.store.memory import MemoryStore from zarr.store.remote import RemoteStore @@ -43,3 +44,19 @@ async def test_make_store_path_fsspec(monkeypatch) -> None: monkeypatch.setattr(fsspec.implementations.memory.MemoryFileSystem, "async_impl", True) store_path = await make_store_path("memory://") assert isinstance(store_path.store, RemoteStore) + + +@pytest.mark.parametrize( + "store_like", + [ + None, + str(tempfile.TemporaryDirectory()), + Path(tempfile.TemporaryDirectory().name), + StorePath(store=MemoryStore(store_dict={}, mode="w"), path="/"), + MemoryStore(store_dict={}, mode="w"), + {}, + ], +) +async def test_make_store_path_storage_options_raises(store_like: StoreLike) -> None: + with pytest.raises(TypeError, match="storage_options"): + await make_store_path(store_like, storage_options={"foo": "bar"}, mode="w") From fe089fce8c139b870c8fbf8ff60512ae1b7f03bf Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Thu, 19 Sep 2024 09:44:11 -0500 Subject: [PATCH 7/8] fixup --- src/zarr/store/common.py | 3 ++- tests/v3/test_store/test_core.py | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/zarr/store/common.py b/src/zarr/store/common.py index 79ac0a44dd..196479dd67 100644 --- a/src/zarr/store/common.py +++ b/src/zarr/store/common.py @@ -116,7 +116,8 @@ async def make_store_path( # By only allowing dictionaries, which are in-memory, we know that MemoryStore appropriate. result = StorePath(await MemoryStore.open(store_dict=store_like, mode=mode)) else: - raise TypeError + msg = f"Unsupported type for store_like: '{type(store_like).__name__}'" # type: ignore[unreachable] + raise TypeError(msg) if storage_options and not used_storage_options: msg = "'storage_options' was provided but unused. 'storage_options' is only used for fsspec filesystem stores." diff --git a/tests/v3/test_store/test_core.py b/tests/v3/test_store/test_core.py index d9c4d33461..f401491127 100644 --- a/tests/v3/test_store/test_core.py +++ b/tests/v3/test_store/test_core.py @@ -60,3 +60,8 @@ async def test_make_store_path_fsspec(monkeypatch) -> None: async def test_make_store_path_storage_options_raises(store_like: StoreLike) -> None: with pytest.raises(TypeError, match="storage_options"): await make_store_path(store_like, storage_options={"foo": "bar"}, mode="w") + + +async def test_unsupported() -> None: + with pytest.raises(TypeError, match="Unsupported type for store_like: 'int'"): + await make_store_path(1) # type: ignore[arg-type] From 8bbc508f4a45eef9129e75c24aac4f3a1f739e7f Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Thu, 19 Sep 2024 11:37:49 -0500 Subject: [PATCH 8/8] fixup --- src/zarr/store/remote.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/zarr/store/remote.py b/src/zarr/store/remote.py index 9b4c09212a..ecb46a31d3 100644 --- a/src/zarr/store/remote.py +++ b/src/zarr/store/remote.py @@ -31,8 +31,6 @@ class RemoteStore(Store): supports_listing: bool = True fs: AsyncFileSystem - # _url: str - # path: str allowed_exceptions: tuple[type[Exception], ...] def __init__(