From 5b8dd934001efcb330ad50b38ec1cc61f565767f Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 20 Dec 2021 15:06:47 -0700 Subject: [PATCH 01/21] WIP --- poetry.lock | 79 +++++++++++++++++++++- pyproject.toml | 1 + stackstac/tests/__init__.py | 0 stackstac/tests/test_to_dask.py | 115 ++++++++++++++++++++++++++++++++ stackstac/to_dask.py | 83 ++++++++++++----------- 5 files changed, 239 insertions(+), 39 deletions(-) create mode 100644 stackstac/tests/__init__.py create mode 100644 stackstac/tests/test_to_dask.py diff --git a/poetry.lock b/poetry.lock index b190502..e1a95e5 100644 --- a/poetry.lock +++ b/poetry.lock @@ -126,6 +126,14 @@ python-versions = ">=3.6" [package.dependencies] typing-extensions = ">=3.6.5" +[[package]] +name = "atomicwrites" +version = "1.4.0" +description = "Atomic file writes." +category = "dev" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" + [[package]] name = "attrs" version = "21.2.0" @@ -760,6 +768,14 @@ zipp = {version = ">=3.1.0", markers = "python_version < \"3.10\""} docs = ["sphinx", "jaraco.packaging (>=8.2)", "rst.linker (>=1.9)"] testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.0.1)", "pytest-black (>=0.3.7)", "pytest-mypy"] +[[package]] +name = "iniconfig" +version = "1.1.1" +description = "iniconfig: brain-dead simple config-ini parsing" +category = "dev" +optional = false +python-versions = "*" + [[package]] name = "ipykernel" version = "6.6.0" @@ -1588,6 +1604,18 @@ python-versions = ">=3.6" docs = ["Sphinx (>=4)", "furo (>=2021.7.5b38)", "proselint (>=0.10.2)", "sphinx-autodoc-typehints (>=1.12)"] test = ["appdirs (==1.4.4)", "pytest (>=6)", "pytest-cov (>=2.7)", "pytest-mock (>=3.6)"] +[[package]] +name = "pluggy" +version = "1.0.0" +description = "plugin and hook calling mechanisms for python" +category = "dev" +optional = false +python-versions = ">=3.6" + +[package.extras] +dev = ["pre-commit", "tox"] +testing = ["pytest", "pytest-benchmark"] + [[package]] name = "plumbum" version = "1.7.1" @@ -1796,6 +1824,27 @@ requests = ">=2.25" [package.extras] validation = ["jsonschema (==3.2.0)"] +[[package]] +name = "pytest" +version = "6.2.5" +description = "pytest: simple powerful testing with Python" +category = "dev" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +atomicwrites = {version = ">=1.0", markers = "sys_platform == \"win32\""} +attrs = ">=19.2.0" +colorama = {version = "*", markers = "sys_platform == \"win32\""} +iniconfig = "*" +packaging = "*" +pluggy = ">=0.12,<2.0" +py = ">=1.8.2" +toml = "*" + +[package.extras] +testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xmlschema"] + [[package]] name = "python-dateutil" version = "2.7.5" @@ -2344,6 +2393,14 @@ category = "dev" optional = false python-versions = ">=3.6" +[[package]] +name = "toml" +version = "0.10.2" +description = "Python Library for Tom's Obvious, Minimal Language" +category = "dev" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" + [[package]] name = "tomli" version = "1.2.2" @@ -2559,7 +2616,7 @@ viz = ["aiohttp", "cachetools", "distributed", "ipyleaflet", "jupyter-server-pro [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "052718622fd814274948455946790e0f6942f50bec30101b86f1e29d3259ade2" +content-hash = "9c41586f3a1e06e9f15e47d9b5772fe06f54d501296976e0311c35f1d54695d7" [metadata.files] affine = [ @@ -2680,6 +2737,10 @@ async-timeout = [ {file = "async-timeout-4.0.1.tar.gz", hash 
= "sha256:b930cb161a39042f9222f6efb7301399c87eeab394727ec5437924a36d6eef51"}, {file = "async_timeout-4.0.1-py3-none-any.whl", hash = "sha256:a22c0b311af23337eb05fcf05a8b51c3ea53729d46fb5460af62bee033cec690"}, ] +atomicwrites = [ + {file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"}, + {file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"}, +] attrs = [ {file = "attrs-21.2.0-py2.py3-none-any.whl", hash = "sha256:149e90d6d8ac20db7a955ad60cf0e6881a3f20d37096140088356da6c716b0b1"}, {file = "attrs-21.2.0.tar.gz", hash = "sha256:ef6aaac3ca6cd92904cdd0d83f629a15f18053ec84e6432106f7a4d04ae4f5fb"}, @@ -3062,6 +3123,10 @@ importlib-resources = [ {file = "importlib_resources-5.4.0-py3-none-any.whl", hash = "sha256:33a95faed5fc19b4bc16b29a6eeae248a3fe69dd55d4d229d2b480e23eeaad45"}, {file = "importlib_resources-5.4.0.tar.gz", hash = "sha256:d756e2f85dd4de2ba89be0b21dba2a3bbec2e871a42a3a16719258a11f87506b"}, ] +iniconfig = [ + {file = "iniconfig-1.1.1-py2.py3-none-any.whl", hash = "sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3"}, + {file = "iniconfig-1.1.1.tar.gz", hash = "sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32"}, +] ipykernel = [ {file = "ipykernel-6.6.0-py3-none-any.whl", hash = "sha256:82ded8919fa7f5483be2b6219c3b13380d93faab1fc49cc2cfcd10e9e24cc158"}, {file = "ipykernel-6.6.0.tar.gz", hash = "sha256:3a227788216b43982d9ac28195949467627b0d16e6b8af9741d95dcaa8c41a89"}, @@ -3590,6 +3655,10 @@ platformdirs = [ {file = "platformdirs-2.4.0-py3-none-any.whl", hash = "sha256:8868bbe3c3c80d42f20156f22e7131d2fb321f5bc86a2a345375c6481a67021d"}, {file = "platformdirs-2.4.0.tar.gz", hash = "sha256:367a5e80b3d04d2428ffa76d33f124cf11e8fff2acdaa9b43d545f5c7d661ef2"}, ] +pluggy = [ + {file = "pluggy-1.0.0-py2.py3-none-any.whl", hash = "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"}, + {file = "pluggy-1.0.0.tar.gz", hash = "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159"}, +] plumbum = [ {file = "plumbum-1.7.1-py2.py3-none-any.whl", hash = "sha256:e97229fdf218698b4e9e939355f4d20d20bf57f8a02710f81f92442e2b2db038"}, {file = "plumbum-1.7.1.tar.gz", hash = "sha256:3c0ac8c4ee57b2adddc82909d3c738a62ef5f77faf24ec7cb6f0a117e1679740"}, @@ -3758,6 +3827,10 @@ pystac-client = [ {file = "pystac-client-0.3.1.tar.gz", hash = "sha256:9c993d1ac2101a4fa4b1e92f8c2ea9c7a78b28bb1f777af821e9e32b20688057"}, {file = "pystac_client-0.3.1-py3-none-any.whl", hash = "sha256:8f2179d4ecb938199b81d9e9ec11715ab388a3d1fc5b38d9a2b6edf2fd52ebfb"}, ] +pytest = [ + {file = "pytest-6.2.5-py3-none-any.whl", hash = "sha256:7310f8d27bc79ced999e760ca304d69f6ba6c6649c0b60fb0e04a4a77cacc134"}, + {file = "pytest-6.2.5.tar.gz", hash = "sha256:131b36680866a76e6781d13f101efb86cf674ebb9762eb70d3082b6f29889e89"}, +] python-dateutil = [ {file = "python-dateutil-2.7.5.tar.gz", hash = "sha256:88f9287c0174266bb0d8cedd395cfba9c58e87e5ad86b2ce58859bc11be3cf02"}, {file = "python_dateutil-2.7.5-py2.py3-none-any.whl", hash = "sha256:063df5763652e21de43de7d9e00ccf239f953a832941e37be541614732cdfc93"}, @@ -4114,6 +4187,10 @@ threadpoolctl = [ {file = "threadpoolctl-3.0.0-py3-none-any.whl", hash = "sha256:4fade5b3b48ae4b1c30f200b28f39180371104fccc642e039e0f2435ec8cc211"}, {file = "threadpoolctl-3.0.0.tar.gz", hash = "sha256:d03115321233d0be715f0d3a5ad1d6c065fe425ddc2d671ca8e45e9fd5d7a52a"}, ] +toml = [ + {file = 
"toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, + {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, +] tomli = [ {file = "tomli-1.2.2-py3-none-any.whl", hash = "sha256:f04066f68f5554911363063a30b108d2b5a5b1a010aa8b6132af78489fe3aade"}, {file = "tomli-1.2.2.tar.gz", hash = "sha256:c6ce0015eb38820eaf32b5db832dbc26deb3dd427bd5f6556cf0acac2c214fee"}, diff --git a/pyproject.toml b/pyproject.toml index 33b868f..39c00f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ pystac = "^1" snakeviz = "^2.1.0" sphinx-autobuild = "^2021.3.14" twine = "^3.4.1" +pytest = "^6.2.5" [tool.poetry.extras] binder = [ diff --git a/stackstac/tests/__init__.py b/stackstac/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py new file mode 100644 index 0000000..033ec60 --- /dev/null +++ b/stackstac/tests/test_to_dask.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +import numpy as np +from rasterio import windows +from dask.array.utils import assert_eq + +from stackstac.raster_spec import RasterSpec +from stackstac.prepare import ASSET_TABLE_DT +from stackstac.to_dask import items_to_dask + + +def test_items_to_dask_basic(): + asset_table = np.array( + [ + # Encode the (i, j) index in the table in the URL + [("fake://0/0", [0, 0, 2, 2]), ("fake://0/1", [0, 0, 2, 2])], + [("fake://1/0", [0, 3, 2, 5]), ("fake://1/1", [0, 3, 2, 5])], + [("fake://2/0", [1, 3, 2, 6]), ("fake://2/1", [1, 3, 2, 7])], + [(None, None), (None, None)], + ], + dtype=ASSET_TABLE_DT, + ) + + def parse_url(url: str) -> tuple[int, int]: + assert url.startswith("fake://") + i, j = url[7:].split("/") + return int(i), int(j) + + def value(i: int, j: int) -> int: + return i << 1 & j + + spec_ = RasterSpec(4326, (0, 0, 7, 8), (0.2, 0.2)) + chunksize = 2 + dtype_ = np.dtype("int32") + fill_value_ = -1 + + class TestReader: + def __init__( + self, + *, + url: str, + spec: RasterSpec, + dtype: np.dtype, + fill_value: int | float, + **kwargs + ) -> None: + i, j = parse_url(url) + entry = asset_table[i, j] + self._window = windows.from_bounds( + *entry["bounds"], transform=spec.transform + ) + self._value = value(i, j) + + self.url = url + assert spec == spec_ + self.spec = spec + assert dtype == dtype_ + self.dtype = dtype + assert fill_value == fill_value_ + self.fill_value = fill_value + + def read(self, window: windows.Window) -> np.ndarray: + assert (window.height, window.width) == (chunksize, chunksize) + assert windows.intersect(window, self._window) + return np.full((window.height, window.width), self._value, self.dtype) + + def close(self) -> None: + pass + + def __getstate__(self) -> dict: + return self.__dict__ + + def __setstate__(self, state): + self.__init__(**state) + + arr = items_to_dask( + asset_table, + spec_, + chunksize, + dtype=dtype_, + fill_value=fill_value_, + reader=TestReader, + ) + assert arr.shape == asset_table.shape + spec_.shape + assert arr.chunksize == (1, 1, chunksize, chunksize) + assert arr.dtype == dtype_ + + expected = np.full(arr.shape, fill_value_, dtype_) + + for i in range(asset_table.shape[0]): + for b in range(asset_table.shape[1]): + entry = asset_table[i, b] + v = value(*parse_url(entry["url"])) + window = windows.from_bounds(*entry["bounds"], transform=spec_.transform) + for y in range(spec_.shape[0]): + for x in range(spec_.shape[1]): + expected[ + i, + b, + y * 
chunksize : y * chunksize + chunksize, + x * chunksize : x * chunksize + chunksize, + ] = v + + # expected = [ + # [ + # np.full( + # (chunksize, chunksize), + # value(*parse_url(asset["url"])) if asset["url"] else fill_value_, + # dtype_, + # ) + # for asset in row + # ] + # for row in asset_table + ] + assert_eq(arr, expected) diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index 58a788c..55ed47c 100644 --- a/stackstac/to_dask.py +++ b/stackstac/to_dask.py @@ -1,11 +1,13 @@ from __future__ import annotations import itertools -from typing import Optional, Tuple, Type, TypeVar, Union +from typing import ClassVar, Optional, Tuple, Type, TypeVar, Union import warnings import dask import dask.array as da +from dask.blockwise import BlockwiseDep, blockwise +from dask.highlevelgraph import HighLevelGraph import numpy as np from rasterio import windows from rasterio.enums import Resampling @@ -14,9 +16,6 @@ from .rio_reader import AutoParallelRioReader, LayeredEnv from .reader_protocol import Reader -# from xarray.backends import CachingFileManager -# from xarray.backends.locks import DummyLock - def items_to_dask( asset_table: np.ndarray, @@ -70,33 +69,28 @@ def items_to_dask( meta=asset_table_dask._meta, ) - # MEGAHACK: generate a fake array for our spatial dimensions following `shape` and `chunksize`, - # but where each chunk is not actually NumPy a array, but just a 2-tuple of the (y-slice, x-slice) - # to `read` from the rasterio dataset to fetch that window of data. - shape = spec.shape - name = "slices-" + dask.base.tokenize(chunksize, shape) - chunks = da.core.normalize_chunks(chunksize, shape) - keys = itertools.product([name], *(range(len(bds)) for bds in chunks)) - slices = da.core.slices_from_chunks(chunks) - # HACK: `slices_fake_arr` is in no way a real dask array: the chunks aren't ndarrays; they're tuples of slices! - # We just stick it in an Array container to make dask's blockwise logic handle broadcasting between the graphs of - # `datasets` and `slices_fake_arr`. - slices_fake_arr = da.Array( - dict(zip(keys, slices)), name, chunks, meta=datasets._meta - ) + shape_yx = spec.shape + chunks_yx = da.core.normalize_chunks(chunksize, shape_yx) + chunks = datasets.chunks + chunks_yx with warnings.catch_warnings(): warnings.simplefilter("ignore", category=da.core.PerformanceWarning) - rasters = da.blockwise( + name = f"fetch_raster_window-{dask.base.tokenize(datasets, chunks)}" + # TODO use `da.blockwise` once it supports `BlockwiseDep`s as arguments + lyr = blockwise( fetch_raster_window, + name, "tbyx", - datasets, + datasets.name, "tb", - slices_fake_arr, + Slices(chunks_yx), "yx", - meta=np.ndarray((), dtype=dtype), # TODO dtype + numblocks={datasets.name: datasets.numblocks}, # ugh ) + dsk = HighLevelGraph.from_collections(name, lyr, [datasets]) + rasters = da.Array(dsk, name, chunks, meta=np.ndarray((), dtype=dtype)) + return rasters @@ -125,20 +119,6 @@ def asset_entry_to_reader_and_window( asset_window = windows.from_bounds(*asset_bounds, transform=spec.transform) return ( - # CachingFileManager( - # AutoParallelRioBackend, # TODO other backends - # url, - # spec, - # resampling, - # gdal_env, - # lock=DummyLock(), - # # ^ NOTE: this lock only protects the file cache, not the file itself. - # # xarray's file cache is already thread-safe, so using a lock is pointless. - # ), - # NOTE: skip the `CachingFileManager` for now to be sure datasets aren't leaked. 
- # There are a few issues with `CachingFileManager`: faulty ref-counting logic, - # and misleading `close` and `lock` interfaces. Either refactor that upstream, - # or implement our own caching logic. reader( url=url, spec=spec, @@ -155,15 +135,15 @@ def asset_entry_to_reader_and_window( def fetch_raster_window( asset_entry: Tuple[ReaderT, windows.Window] | np.ndarray, - slices: Tuple[slice, ...], + slices: Tuple[slice, slice], ) -> np.ndarray: + assert len(slices) == 2, slices current_window = windows.Window.from_slices(*slices) if isinstance(asset_entry, tuple): reader, asset_window = asset_entry # check that the window we're fetching overlaps with the asset if windows.intersect(current_window, asset_window): - # backend: Backend = manager.acquire(needs_lock=False) data = reader.read(current_window) return data[None, None] @@ -175,3 +155,30 @@ def fetch_raster_window( # use the broadcast trick for even fewer memz return np.broadcast_to(fill_arr, (1, 1) + windows.shape(current_window)) + +class Slices(BlockwiseDep): + starts: list[tuple[int, ...]] + produces_tasks: ClassVar[bool] = False + + def __init__(self, chunks: Tuple[Tuple[int, ...], ...]): + self.starts = [tuple(itertools.accumulate(c, initial=0)) for c in chunks] + + def __getitem__(self, idx: Tuple[int, ...]) -> Tuple[slice, ...]: + return tuple( + slice(start[i], start[i + 1]) for i, start in zip(idx, self.starts) + ) + + @property + def numblocks(self) -> list[int]: + return [len(s) - 1 for s in self.starts] + + def __dask_distributed_pack__( + self, required_indices: Optional[list[Tuple[int, ...]]] = None + ) -> list[Tuple[int, ...]]: + return self.starts + + @classmethod + def __dask_distributed_unpack__(cls, state: list[Tuple[int, ...]]) -> Slices: + self = cls.__new__(cls) + self.starts = state + return self From b70b453f3240f4f4a76d6f5adb2fb519e3e9fc49 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 31 Dec 2021 23:51:17 -0700 Subject: [PATCH 02/21] working, somewhat-reasonable first test Still pretty complicated. Probably need some helpers in order to test the more interesting stuff. Also would like to hypothesis? --- stackstac/tests/test_to_dask.py | 95 +++++++++++++++------------------ stackstac/to_dask.py | 5 +- 2 files changed, 48 insertions(+), 52 deletions(-) diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py index 033ec60..6962cd3 100644 --- a/stackstac/tests/test_to_dask.py +++ b/stackstac/tests/test_to_dask.py @@ -14,26 +14,50 @@ def test_items_to_dask_basic(): [ # Encode the (i, j) index in the table in the URL [("fake://0/0", [0, 0, 2, 2]), ("fake://0/1", [0, 0, 2, 2])], - [("fake://1/0", [0, 3, 2, 5]), ("fake://1/1", [0, 3, 2, 5])], + [("fake://1/0", [0, 3, 2, 5]), ("fake://1/1", [10, 13, 12, 15])], [("fake://2/0", [1, 3, 2, 6]), ("fake://2/1", [1, 3, 2, 7])], [(None, None), (None, None)], ], dtype=ASSET_TABLE_DT, ) - - def parse_url(url: str) -> tuple[int, int]: - assert url.startswith("fake://") - i, j = url[7:].split("/") - return int(i), int(j) - - def value(i: int, j: int) -> int: - return i << 1 & j - spec_ = RasterSpec(4326, (0, 0, 7, 8), (0.2, 0.2)) chunksize = 2 dtype_ = np.dtype("int32") fill_value_ = -1 + # Build expected array of the final stack. + # Start with all nodata, then write data in for each asset. + # The `TestReader` will then just read from this final array, sliced to the appropriate window. + # (This is much easier than calculating where to put nodata values ourselves.) 
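+    # (Each fake URL encodes its (item, asset) index, so `TestReader` below can look
+    # its data back up in `results`; expected and actual then agree by construction.)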
+ + asset_windows: dict[str, windows.Window] = {} + results = np.full(asset_table.shape + spec_.shape, fill_value_, dtype_) + for i, item in enumerate(asset_table): + for j, asset in enumerate(item): + url = asset["url"] + if url is None: + continue + assert url == f"fake://{i}/{j}" + + window = windows.from_bounds( + *asset["bounds"], + transform=spec_.transform, + precision=0.0 + # ^ `precision=0.0`: https://github.com/rasterio/rasterio/issues/2374 + ) + # convert to int so `toslices` works for indexing + window_int = window.round_lengths().round_offsets() + # sanity check; rounding should not have changed anything + assert window_int == window + asset_windows[url] = window_int + + chunk = results[(i, j) + window_int.toslices()] + if chunk.size: + # Asset falls within final bounds + chunk[:] = np.random.default_rng().integers( + 0, 10000, (int(window_int.height), int(window_int.width)), dtype_ + ) + class TestReader: def __init__( self, @@ -42,27 +66,24 @@ def __init__( spec: RasterSpec, dtype: np.dtype, fill_value: int | float, - **kwargs + **kwargs, ) -> None: - i, j = parse_url(url) - entry = asset_table[i, j] - self._window = windows.from_bounds( - *entry["bounds"], transform=spec.transform - ) - self._value = value(i, j) + i, j = map(int, url[7:].split("/")) + self.full_data = results[i, j] + self.window = asset_windows[url] - self.url = url assert spec == spec_ - self.spec = spec assert dtype == dtype_ - self.dtype = dtype assert fill_value == fill_value_ + # NOTE: needed for `Reader` interface: + self.dtype = dtype self.fill_value = fill_value def read(self, window: windows.Window) -> np.ndarray: assert (window.height, window.width) == (chunksize, chunksize) - assert windows.intersect(window, self._window) - return np.full((window.height, window.width), self._value, self.dtype) + # Read should be bypassed entirely if windows don't intersect + assert windows.intersect(window, self.window) + return self.full_data[window.toslices()] def close(self) -> None: pass @@ -81,35 +102,7 @@ def __setstate__(self, state): fill_value=fill_value_, reader=TestReader, ) - assert arr.shape == asset_table.shape + spec_.shape assert arr.chunksize == (1, 1, chunksize, chunksize) assert arr.dtype == dtype_ - expected = np.full(arr.shape, fill_value_, dtype_) - - for i in range(asset_table.shape[0]): - for b in range(asset_table.shape[1]): - entry = asset_table[i, b] - v = value(*parse_url(entry["url"])) - window = windows.from_bounds(*entry["bounds"], transform=spec_.transform) - for y in range(spec_.shape[0]): - for x in range(spec_.shape[1]): - expected[ - i, - b, - y * chunksize : y * chunksize + chunksize, - x * chunksize : x * chunksize + chunksize, - ] = v - - # expected = [ - # [ - # np.full( - # (chunksize, chunksize), - # value(*parse_url(asset["url"])) if asset["url"] else fill_value_, - # dtype_, - # ) - # for asset in row - # ] - # for row in asset_table - ] - assert_eq(arr, expected) + assert_eq(arr, results) diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index 55ed47c..3508f19 100644 --- a/stackstac/to_dask.py +++ b/stackstac/to_dask.py @@ -116,7 +116,10 @@ def asset_entry_to_reader_and_window( return np.array(fill_value, dtype) asset_bounds: Bbox = asset_entry["bounds"] - asset_window = windows.from_bounds(*asset_bounds, transform=spec.transform) + asset_window = windows.from_bounds( + *asset_bounds, transform=spec.transform, precision=0.0 + # ^ `precision=0.0`: https://github.com/rasterio/rasterio/issues/2374 + ) return ( reader( From 961f76f6549c8203abde6148b9566f0f3289f831 
Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 31 Dec 2021 23:53:10 -0700 Subject: [PATCH 03/21] missed one `fill_value=None` case --- stackstac/to_dask.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index 3508f19..2a06de7 100644 --- a/stackstac/to_dask.py +++ b/stackstac/to_dask.py @@ -31,7 +31,7 @@ def items_to_dask( ) -> da.Array: errors_as_nodata = errors_as_nodata or () # be sure it's not None - if fill_value is not None and not np.can_cast(fill_value, dtype): + if not np.can_cast(fill_value, dtype): raise ValueError( f"The fill_value {fill_value} is incompatible with the output dtype {dtype}. " f"Either use `dtype={np.array(fill_value).dtype.name!r}`, or pick a different `fill_value`." @@ -117,7 +117,9 @@ def asset_entry_to_reader_and_window( asset_bounds: Bbox = asset_entry["bounds"] asset_window = windows.from_bounds( - *asset_bounds, transform=spec.transform, precision=0.0 + *asset_bounds, + transform=spec.transform, + precision=0.0 # ^ `precision=0.0`: https://github.com/rasterio/rasterio/issues/2374 ) From f0f58e9f9be9e10c9241eb12607436b6cd8483d3 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 4 Jan 2022 22:43:09 -0700 Subject: [PATCH 04/21] start groundwork for multi-chunk asset table opening asset table probably works; reads still todo. a few missteps along the way: * `np.vectorize` on `asset_table_to_reader_and_window` is segfaulting. May have to do with all the extra arguments? Don't care to gdb it; just moved the vectorization into the function * Blockwise wants to fuse the reader table to the reads. I'm not sure it's good that blockwise optimization tries to fuse such a large expansion, but probably kinda makes sense. Unfortunately the way to disable it (annotations) feels like a little bit of a hack --- stackstac/tests/test_to_dask.py | 13 ++- stackstac/to_dask.py | 137 +++++++++++++++++--------------- 2 files changed, 87 insertions(+), 63 deletions(-) diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py index 6962cd3..9f5155d 100644 --- a/stackstac/tests/test_to_dask.py +++ b/stackstac/tests/test_to_dask.py @@ -1,4 +1,6 @@ from __future__ import annotations +from threading import Lock +from typing import ClassVar import numpy as np from rasterio import windows @@ -20,7 +22,7 @@ def test_items_to_dask_basic(): ], dtype=ASSET_TABLE_DT, ) - spec_ = RasterSpec(4326, (0, 0, 7, 8), (0.2, 0.2)) + spec_ = RasterSpec(4326, (0, 0, 7, 8), (0.5, 0.5)) chunksize = 2 dtype_ = np.dtype("int32") fill_value_ = -1 @@ -59,6 +61,9 @@ def test_items_to_dask_basic(): ) class TestReader: + opened: ClassVar[set[str]] = set() + lock: ClassVar[Lock] = Lock() + def __init__( self, *, @@ -68,6 +73,12 @@ def __init__( fill_value: int | float, **kwargs, ) -> None: + with self.lock: + # Each URL should only be opened once. + # The `dask.annotate` on the `asset_table_to_reader_and_window` step is necessary for this, + # otherwise blockwise fusion would merge the Reader creation into every `fetch_raster_window`! 
+ assert url not in self.opened + self.opened.add(url) i, j = map(int, url[7:].split("/")) self.full_data = results[i, j] self.window = asset_windows[url] diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index 2a06de7..1281405 100644 --- a/stackstac/to_dask.py +++ b/stackstac/to_dask.py @@ -1,7 +1,7 @@ from __future__ import annotations import itertools -from typing import ClassVar, Optional, Tuple, Type, TypeVar, Union +from typing import ClassVar, Optional, Tuple, Type, Union import warnings import dask @@ -51,54 +51,55 @@ def items_to_dask( name="asset-table-" + dask.base.tokenize(asset_table), ) - # then map a function over each chunk that opens that URL as a rasterio dataset - # HACK: `url_to_ds` doesn't even return a NumPy array, so `datasets.compute()` would fail - # (because the chunks can't be `np.concatenate`d together). - # but we're just using this for the graph. - # So now we have an array of shape (items, assets), chunksize 1---the outer two dimensions of our final array. - datasets = asset_table_dask.map_blocks( - asset_entry_to_reader_and_window, - spec, - resampling, - dtype, - fill_value, - rescale, - gdal_env, - errors_as_nodata, - reader, - meta=asset_table_dask._meta, - ) + # map a function over each chunk that opens that URL as a rasterio dataset + with dask.annotate(fuse=False): + # ^ HACK: prevent this layer from fusing to the next `fetch_raster_window` one. + # This relies on the fact that blockwise fusion doesn't happen when the layers' annotations + # don't match, which may not be behavior we can rely on. + # (The actual content of the annotation is irrelevant here.) + reader_table = asset_table_dask.map_blocks( + asset_table_to_reader_and_window, + spec, + resampling, + dtype, + fill_value, + rescale, + gdal_env, + errors_as_nodata, + reader, + dtype=object, + ) shape_yx = spec.shape chunks_yx = da.core.normalize_chunks(chunksize, shape_yx) - chunks = datasets.chunks + chunks_yx + chunks = reader_table.chunks + chunks_yx with warnings.catch_warnings(): warnings.simplefilter("ignore", category=da.core.PerformanceWarning) - name = f"fetch_raster_window-{dask.base.tokenize(datasets, chunks)}" + name = f"fetch_raster_window-{dask.base.tokenize(reader_table, chunks)}" # TODO use `da.blockwise` once it supports `BlockwiseDep`s as arguments lyr = blockwise( fetch_raster_window, name, "tbyx", - datasets.name, + reader_table.name, "tb", Slices(chunks_yx), "yx", - numblocks={datasets.name: datasets.numblocks}, # ugh + numblocks={reader_table.name: reader_table.numblocks}, # ugh ) - dsk = HighLevelGraph.from_collections(name, lyr, [datasets]) + dsk = HighLevelGraph.from_collections(name, lyr, [reader_table]) rasters = da.Array(dsk, name, chunks, meta=np.ndarray((), dtype=dtype)) return rasters -ReaderT = TypeVar("ReaderT", bound=Reader) +ReaderTableEntry = Union[tuple[Reader, windows.Window], np.ndarray] -def asset_entry_to_reader_and_window( - asset_entry: np.ndarray, +def asset_table_to_reader_and_window( + asset_table: np.ndarray, spec: RasterSpec, resampling: Resampling, dtype: np.dtype, @@ -106,55 +107,67 @@ def asset_entry_to_reader_and_window( rescale: bool, gdal_env: Optional[LayeredEnv], errors_as_nodata: Tuple[Exception, ...], - reader: Type[ReaderT], -) -> Tuple[ReaderT, windows.Window] | np.ndarray: - asset_entry = asset_entry[0, 0] - # ^ because dask adds extra outer dims in `from_array` - url = asset_entry["url"] - if url is None: - # Signifies empty value - return np.array(fill_value, dtype) - - asset_bounds: Bbox = asset_entry["bounds"] - 
asset_window = windows.from_bounds( - *asset_bounds, - transform=spec.transform, - precision=0.0 - # ^ `precision=0.0`: https://github.com/rasterio/rasterio/issues/2374 - ) - - return ( - reader( - url=url, - spec=spec, - resampling=resampling, - dtype=dtype, - fill_value=fill_value, - rescale=rescale, - gdal_env=gdal_env, - errors_as_nodata=errors_as_nodata, - ), - asset_window, - ) + reader: Type[Reader], +) -> np.ndarray: + """ + "Open" an asset table by creating a `Reader` for each asset. + + This function converts the asset table (or chunks thereof) into an object array, + where each element contains a tuple of the `Reader` and `Window` for that asset, + or a one-element array of ``fill_value``, if the element has no URL. + """ + reader_table = np.empty_like(asset_table, dtype=object) + entry: ReaderTableEntry + for index, asset_entry in np.ndenumerate(asset_table): + url = asset_entry["url"] + if url is None: + entry = np.array(fill_value, dtype) + # ^ signifies empty value; will be broadcast to output chunk size upon read + else: + asset_bounds: Bbox = asset_entry["bounds"] + asset_window = windows.from_bounds( + *asset_bounds, + transform=spec.transform, + precision=0.0 + # ^ `precision=0.0`: https://github.com/rasterio/rasterio/issues/2374 + ) + + entry = ( + reader( + url=url, + spec=spec, + resampling=resampling, + dtype=dtype, + fill_value=fill_value, + rescale=rescale, + gdal_env=gdal_env, + errors_as_nodata=errors_as_nodata, + ), + asset_window, + ) + reader_table[index] = entry + return reader_table def fetch_raster_window( - asset_entry: Tuple[ReaderT, windows.Window] | np.ndarray, + reader_entry: np.ndarray, slices: Tuple[slice, slice], ) -> np.ndarray: assert len(slices) == 2, slices + assert reader_entry.size == 1, reader_entry.size + entry: ReaderTableEntry = reader_entry.item() + # ^ TODO handle >1 asset, i.e. chunking of time/band dims current_window = windows.Window.from_slices(*slices) - if isinstance(asset_entry, tuple): - reader, asset_window = asset_entry - + if isinstance(entry, tuple): + reader, asset_window = entry # check that the window we're fetching overlaps with the asset if windows.intersect(current_window, asset_window): data = reader.read(current_window) - return data[None, None] + return data[None, None] # add empty outer time, band dims fill_arr = np.array(reader.fill_value, reader.dtype) else: - fill_arr: np.ndarray = asset_entry + fill_arr = entry # no dataset, or we didn't overlap it: return empty data. # use the broadcast trick for even fewer memz From be289692a836168e1c0082306320f23a8c991539 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 5 Jan 2022 12:44:02 -0700 Subject: [PATCH 05/21] multi-element reader table Passing dtype and fill_value into `fetch_raster_window`, instead of storing them in the reader table, simplifies things a lot. Also lets us get rid of those fields on the reader protocol. 
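To illustrate (rough sketch, not the exact code in this diff): with `dtype` and
`fill_value` travelling as plain blockwise arguments, a chunk whose reader-table
entries are all None can produce its nodata without ever touching a Reader:

    fill = np.broadcast_to(
        np.array(fill_value, dtype),  # from the blockwise args, not reader attributes
        reader_table.shape + (window.height, window.width),
    )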
--- stackstac/nodata_reader.py | 8 +--- stackstac/reader_protocol.py | 9 +--- stackstac/tests/test_to_dask.py | 3 -- stackstac/to_dask.py | 75 ++++++++++++++++++++------------- 4 files changed, 48 insertions(+), 47 deletions(-) diff --git a/stackstac/nodata_reader.py b/stackstac/nodata_reader.py index e453797..8aab7f1 100644 --- a/stackstac/nodata_reader.py +++ b/stackstac/nodata_reader.py @@ -1,4 +1,4 @@ -from typing import Tuple, Type, Union, cast +from typing import Tuple, Type, Union import re import numpy as np @@ -37,11 +37,7 @@ def __setstate__(self, state: State) -> None: def nodata_for_window(window: Window, fill_value: Union[int, float], dtype: np.dtype): - height = cast(int, window.height) - width = cast(int, window.width) - # Argument of type "tuple[_T@attrib, _T@attrib]" cannot be assigned to parameter "shape" of type "_ShapeLike" - # in function "full" - return np.full((height, width), fill_value, dtype) + return np.full((window.height, window.width), fill_value, dtype) def exception_matches(e: Exception, patterns: Tuple[Exception, ...]) -> bool: diff --git a/stackstac/reader_protocol.py b/stackstac/reader_protocol.py index ad4f457..81f7da4 100644 --- a/stackstac/reader_protocol.py +++ b/stackstac/reader_protocol.py @@ -26,9 +26,6 @@ class Reader(Pickleable, Protocol): Protocol for a thread-safe, lazily-loaded object for reading data from a single-band STAC asset. """ - fill_value: Union[int, float] - dtype: np.dtype - def __init__( self, *, @@ -116,12 +113,8 @@ class FakeReader: or inherent to the dask graph. """ - def __init__( - self, *, dtype: np.dtype, fill_value: Union[int, float], **kwargs - ) -> None: - pass + def __init__(self, *, dtype: np.dtype, **kwargs) -> None: self.dtype = dtype - self.fill_value = fill_value def read(self, window: Window, **kwargs) -> np.ndarray: return np.random.random((window.height, window.width)).astype(self.dtype) diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py index 9f5155d..cc7cfd5 100644 --- a/stackstac/tests/test_to_dask.py +++ b/stackstac/tests/test_to_dask.py @@ -86,9 +86,6 @@ def __init__( assert spec == spec_ assert dtype == dtype_ assert fill_value == fill_value_ - # NOTE: needed for `Reader` interface: - self.dtype = dtype - self.fill_value = fill_value def read(self, window: windows.Window) -> np.ndarray: assert (window.height, window.width) == (chunksize, chunksize) diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index 1281405..2c71ad1 100644 --- a/stackstac/to_dask.py +++ b/stackstac/to_dask.py @@ -54,9 +54,9 @@ def items_to_dask( # map a function over each chunk that opens that URL as a rasterio dataset with dask.annotate(fuse=False): # ^ HACK: prevent this layer from fusing to the next `fetch_raster_window` one. - # This relies on the fact that blockwise fusion doesn't happen when the layers' annotations + # This uses the fact that blockwise fusion doesn't happen when the layers' annotations # don't match, which may not be behavior we can rely on. - # (The actual content of the annotation is irrelevant here.) + # (The actual content of the annotation is irrelevant here, just that there is one.) 
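+        # (Concretely, fusion here would inline reader creation into every fused
+        # `fetch_raster_window` task, so each spatial chunk would re-open its
+        # asset's URL instead of sharing one Reader per (item, asset) entry.)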
reader_table = asset_table_dask.map_blocks( asset_table_to_reader_and_window, spec, @@ -87,6 +87,10 @@ def items_to_dask( "tb", Slices(chunks_yx), "yx", + dtype, + None, + fill_value, + None, numblocks={reader_table.name: reader_table.numblocks}, # ugh ) dsk = HighLevelGraph.from_collections(name, lyr, [reader_table]) @@ -95,7 +99,7 @@ def items_to_dask( return rasters -ReaderTableEntry = Union[tuple[Reader, windows.Window], np.ndarray] +ReaderTableEntry = Optional[tuple[Reader, windows.Window]] def asset_table_to_reader_and_window( @@ -114,16 +118,12 @@ def asset_table_to_reader_and_window( This function converts the asset table (or chunks thereof) into an object array, where each element contains a tuple of the `Reader` and `Window` for that asset, - or a one-element array of ``fill_value``, if the element has no URL. + or None if the element has no URL. """ reader_table = np.empty_like(asset_table, dtype=object) - entry: ReaderTableEntry for index, asset_entry in np.ndenumerate(asset_table): - url = asset_entry["url"] - if url is None: - entry = np.array(fill_value, dtype) - # ^ signifies empty value; will be broadcast to output chunk size upon read - else: + url: str | None = asset_entry["url"] + if url: asset_bounds: Bbox = asset_entry["bounds"] asset_window = windows.from_bounds( *asset_bounds, @@ -132,7 +132,7 @@ def asset_table_to_reader_and_window( # ^ `precision=0.0`: https://github.com/rasterio/rasterio/issues/2374 ) - entry = ( + entry: ReaderTableEntry = ( reader( url=url, spec=spec, @@ -145,33 +145,48 @@ def asset_table_to_reader_and_window( ), asset_window, ) - reader_table[index] = entry + reader_table[index] = entry return reader_table def fetch_raster_window( - reader_entry: np.ndarray, + reader_table: np.ndarray, slices: Tuple[slice, slice], + dtype: np.dtype, + fill_value: Union[int, float], ) -> np.ndarray: + "Do a spatially-windowed read of raster data from all the Readers in the table." assert len(slices) == 2, slices - assert reader_entry.size == 1, reader_entry.size - entry: ReaderTableEntry = reader_entry.item() - # ^ TODO handle >1 asset, i.e. chunking of time/band dims current_window = windows.Window.from_slices(*slices) - if isinstance(entry, tuple): - reader, asset_window = entry - # check that the window we're fetching overlaps with the asset - if windows.intersect(current_window, asset_window): - data = reader.read(current_window) - - return data[None, None] # add empty outer time, band dims - fill_arr = np.array(reader.fill_value, reader.dtype) - else: - fill_arr = entry - - # no dataset, or we didn't overlap it: return empty data. - # use the broadcast trick for even fewer memz - return np.broadcast_to(fill_arr, (1, 1) + windows.shape(current_window)) + + assert reader_table.size, "Empty reader_table" + # Start with an empty output array, using the broadcast trick for even fewer memz. + # If none of the assets end up actually existing, or overlapping the current window, + # we'll just return this 1-element array that's been broadcast to look like a full-size array. 
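+    # (`np.broadcast_to` returns a read-only view with 0-length strides along the
+    # broadcast axes, so this "full-size" array holds only one element in memory.)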
+    output = np.broadcast_to(
+        np.array(fill_value, dtype),
+        reader_table.shape + (current_window.height, current_window.width),
+    )
+
+    all_empty: bool = True
+    entry: ReaderTableEntry
+    for index, entry in np.ndenumerate(reader_table):
+        if entry:
+            reader, asset_window = entry
+            # Only read if the window we're fetching actually overlaps with the asset
+            if windows.intersect(current_window, asset_window):
+                if all_empty:
+                    # Copy to a full-size array so it's writeable
+                    output = np.array(output)
+                    all_empty = False
+
+                # NOTE: when there are multiple assets, we _could_ parallelize these reads with our own threadpool.
+                # However, that would probably increase memory usage, since the internal, thread-local GDAL datasets
+                # would end up copied to even more threads.
+
+                # TODO when the Reader won't be rescaling, support passing `output` to avoid the copy?
+                output[index] = reader.read(current_window)
+    return output
 
 
 class Slices(BlockwiseDep):

From ae17864d6ce2158c547ff2c1f7fd547c9ea279c6 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Wed, 5 Jan 2022 12:55:18 -0700
Subject: [PATCH 06/21] update comments

---
 stackstac/tests/test_to_dask.py |  2 +-
 stackstac/to_dask.py            | 13 ++++++++++---
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py
index cc7cfd5..c8da500 100644
--- a/stackstac/tests/test_to_dask.py
+++ b/stackstac/tests/test_to_dask.py
@@ -45,7 +45,7 @@ def test_items_to_dask_basic():
                 *asset["bounds"],
                 transform=spec_.transform,
                 precision=0.0
-                # ^ `precision=0.0`: https://github.com/rasterio/rasterio/issues/2374
+                # ^ https://github.com/rasterio/rasterio/issues/2374
             )
diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py
index 2c71ad1..389bdec 100644
--- a/stackstac/to_dask.py
+++ b/stackstac/to_dask.py
@@ -29,6 +29,7 @@ def items_to_dask(
     gdal_env: Optional[LayeredEnv] = None,
     errors_as_nodata: Tuple[Exception, ...] = (),
 ) -> da.Array:
+    "Create a dask Array from an asset table"
     errors_as_nodata = errors_as_nodata or ()  # be sure it's not None
 
     if not np.can_cast(fill_value, dtype):
@@ -38,8 +39,10 @@ def items_to_dask(
         )
 
     # The overall strategy in this function is to materialize the outer two dimensions (items, assets)
-    # as one dask array, then the chunks of the inner two dimensions (y, x) as another dask array, then use
-    # Blockwise to represent the cartesian product between them, to avoid materializing that entire graph.
+    # as one dask array (the "asset table"), then map a function over it which opens each URL as a `Reader`
+    # instance (the "reader table").
+    # Then, we use the `Slices` `BlockwiseDep` to represent the inner two dimensions (y, x), and
+    # `Blockwise` to create the cartesian product between them, avoiding materializing that entire graph.
     # Materializing the (items, assets) dimensions is unavoidable: every asset has a distinct URL, so that information
     # has to be included somehow.
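+    # (For example, 10 items x 2 assets over a 4x4 grid of spatial chunks means
+    # 10*2*4*4 = 320 `fetch_raster_window` tasks, but the Blockwise layer keeps
+    # them symbolic; nothing is materialized until the graph is scheduled.)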
@@ -129,7 +132,7 @@ def asset_table_to_reader_and_window( *asset_bounds, transform=spec.transform, precision=0.0 - # ^ `precision=0.0`: https://github.com/rasterio/rasterio/issues/2374 + # ^ https://github.com/rasterio/rasterio/issues/2374 ) entry: ReaderTableEntry = ( @@ -190,6 +193,10 @@ def fetch_raster_window( class Slices(BlockwiseDep): + "Produces the slice into the full-size array corresponding to the current chunk" + # TODO this needs to be in dask/dask, otherwise the scheduler will refuse to import it + # without passlisting stackstac in `distributed.scheduler.allowed-imports`. + starts: list[tuple[int, ...]] produces_tasks: ClassVar[bool] = False From 6e0016effea16452b8645704ff0c18acd18d9c57 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 11 Jan 2022 22:48:22 -0700 Subject: [PATCH 07/21] don't expand output array when data is all empty --- stackstac/to_dask.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index 389bdec..820857a 100644 --- a/stackstac/to_dask.py +++ b/stackstac/to_dask.py @@ -20,7 +20,7 @@ def items_to_dask( asset_table: np.ndarray, spec: RasterSpec, - chunksize: int, + chunksize: int | tuple[int, int], resampling: Resampling = Resampling.nearest, dtype: np.dtype = np.dtype("float64"), fill_value: Union[int, float] = np.nan, @@ -178,17 +178,27 @@ def fetch_raster_window( reader, asset_window = entry # Only read if the window we're fetching actually overlaps with the asset if windows.intersect(current_window, asset_window): - if all_empty: - # Copy to a full-size array so it's writeable - output = np.array(output) - all_empty = False - # NOTE: when there are multiple assets, we _could_ parallelize these reads with our own threadpool. # However, that would probably increase memory usage, since the internal, thread-local GDAL datasets # would end up copied to even more threads. # TODO when the Reader won't be rescaling, support passing `output` to avoid the copy? 
-                output[index] = reader.read(current_window)
+                data = reader.read(current_window)
+
+                if all_empty:
+                    # Turn `output` from a broadcast-trick array into a real array so it's writeable
+                    if (
+                        np.isnan(data)
+                        if np.isnan(fill_value)
+                        else np.equal(data, fill_value)
+                    ).all():
+                        # Unless the data we just read is all empty anyway
+                        continue
+                    output = np.array(output)
+                    all_empty = False
+
+                output[index] = data
+
     return output

From f37e5ede6f129e1909c7c8dad9c28c3955267af1 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Tue, 11 Jan 2022 22:51:15 -0700
Subject: [PATCH 08/21] no need to round

---
 stackstac/tests/test_to_dask.py | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py
index c8da500..a15f795 100644
--- a/stackstac/tests/test_to_dask.py
+++ b/stackstac/tests/test_to_dask.py
@@ -47,17 +47,13 @@ def test_items_to_dask_basic():
                 precision=0.0
                 # ^ https://github.com/rasterio/rasterio/issues/2374
             )
-            # convert to int so `toslices` works for indexing
-            window_int = window.round_lengths().round_offsets()
-            # sanity check; rounding should not have changed anything
-            assert window_int == window
-            asset_windows[url] = window_int
+            asset_windows[url] = window
 
-            chunk = results[(i, j) + window_int.toslices()]
+            chunk = results[(i, j) + windows.window_index(window)]
             if chunk.size:
                 # Asset falls within final bounds
                 chunk[:] = np.random.default_rng().integers(
-                    0, 10000, (int(window_int.height), int(window_int.width)), dtype_
+                    0, 10000, (int(window.height), int(window.width)), dtype_
                 )

From b81527ec8a27a34cfb5f4147b1d75c2e3ed07116 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Tue, 11 Jan 2022 23:03:07 -0700
Subject: [PATCH 09/21] hypothesis for dtype and fill value

small steps!
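The kind of property this sets up (illustrative sketch only; the test below is
made up for this message, not part of the commit):

    import hypothesis.extra.numpy as st_np
    import numpy as np
    import pytest
    from hypothesis import given

    from stackstac.prepare import ASSET_TABLE_DT
    from stackstac.raster_spec import RasterSpec
    from stackstac.to_dask import items_to_dask

    @given(dtype=st_np.scalar_dtypes())
    def test_incompatible_fill_value_raises(dtype: np.dtype):
        # `items_to_dask` validates `fill_value` against `dtype` before building any graph
        if not np.can_cast(np.nan, dtype):
            with pytest.raises(ValueError, match="fill_value"):
                items_to_dask(
                    np.empty((0, 0), ASSET_TABLE_DT),
                    RasterSpec(4326, (0, 0, 1, 1), (0.5, 0.5)),
                    chunksize=2,
                    dtype=dtype,
                    fill_value=np.nan,
                )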
--- .gitignore | 1 + poetry.lock | 93 +++++++--- pyproject.toml | 1 + stackstac/testing/__init__.py | 0 stackstac/testing/strategies.py | 317 ++++++++++++++++++++++++++++++++ stackstac/tests/test_to_dask.py | 17 +- 6 files changed, 397 insertions(+), 32 deletions(-) create mode 100644 stackstac/testing/__init__.py create mode 100644 stackstac/testing/strategies.py diff --git a/.gitignore b/.gitignore index 93c328e..8376004 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ __pycache__ *.pyc +.hypothesis dask-worker-space .ipynb_checkpoints diff --git a/poetry.lock b/poetry.lock index e1a95e5..a47a98a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -721,6 +721,34 @@ category = "main" optional = false python-versions = "*" +[[package]] +name = "hypothesis" +version = "6.35.0" +description = "A library for property-based testing" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +attrs = ">=19.2.0" +sortedcontainers = ">=2.1.0,<3.0.0" + +[package.extras] +all = ["black (>=19.10b0)", "click (>=7.0)", "django (>=2.2)", "dpcontracts (>=0.4)", "lark-parser (>=0.6.5)", "libcst (>=0.3.16)", "numpy (>=1.9.0)", "pandas (>=0.25)", "pytest (>=4.6)", "python-dateutil (>=1.4)", "pytz (>=2014.1)", "redis (>=3.0.0)", "rich (>=9.0.0)", "importlib-metadata (>=3.6)", "backports.zoneinfo (>=0.2.1)", "tzdata (>=2021.5)"] +cli = ["click (>=7.0)", "black (>=19.10b0)", "rich (>=9.0.0)"] +codemods = ["libcst (>=0.3.16)"] +dateutil = ["python-dateutil (>=1.4)"] +django = ["django (>=2.2)"] +dpcontracts = ["dpcontracts (>=0.4)"] +ghostwriter = ["black (>=19.10b0)"] +lark = ["lark-parser (>=0.6.5)"] +numpy = ["numpy (>=1.9.0)"] +pandas = ["pandas (>=0.25)"] +pytest = ["pytest (>=4.6)"] +pytz = ["pytz (>=2014.1)"] +redis = ["redis (>=3.0.0)"] +zoneinfo = ["backports.zoneinfo (>=0.2.1)", "tzdata (>=2021.5)"] + [[package]] name = "idna" version = "3.3" @@ -1715,7 +1743,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [[package]] name = "pydantic" -version = "1.8.2" +version = "1.9.0" description = "Data validation and settings management using python 3.6 type hinting" category = "main" optional = true @@ -2616,7 +2644,7 @@ viz = ["aiohttp", "cachetools", "distributed", "ipyleaflet", "jupyter-server-pro [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "9c41586f3a1e06e9f15e47d9b5772fe06f54d501296976e0311c35f1d54695d7" +content-hash = "41874d16cb9d52431167f79a586ae4f95cdaa2e8cc45e3e2247d881211bf57b2" [metadata.files] affine = [ @@ -3107,6 +3135,10 @@ heapdict = [ {file = "HeapDict-1.0.1-py3-none-any.whl", hash = "sha256:6065f90933ab1bb7e50db403b90cab653c853690c5992e69294c2de2b253fc92"}, {file = "HeapDict-1.0.1.tar.gz", hash = "sha256:8495f57b3e03d8e46d5f1b2cc62ca881aca392fd5cc048dc0aa2e1a6d23ecdb6"}, ] +hypothesis = [ + {file = "hypothesis-6.35.0-py3-none-any.whl", hash = "sha256:b51d1cbfd5c5d38b59435f695a1ff790c5fa7600baa59eba202a7bf8498d9043"}, + {file = "hypothesis-6.35.0.tar.gz", hash = "sha256:ce3961fff61e7353d022608788cbc9876c293d2468749eeba27511adc9565131"}, +] idna = [ {file = "idna-3.3-py3-none-any.whl", hash = "sha256:84d9dd047ffa80596e0f246e2eab0b391788b0503584e8945f2368256d2735ff"}, {file = "idna-3.3.tar.gz", hash = "sha256:9d643ff0a55b762d5cdb124b8eaa99c66322e2157b69160bc32796e824360e6d"}, @@ -3732,28 +3764,41 @@ pycparser = [ {file = "pycparser-2.21.tar.gz", hash = "sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206"}, ] pydantic = [ - {file = "pydantic-1.8.2-cp36-cp36m-macosx_10_9_x86_64.whl", hash = 
"sha256:05ddfd37c1720c392f4e0d43c484217b7521558302e7069ce8d318438d297739"}, - {file = "pydantic-1.8.2-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:a7c6002203fe2c5a1b5cbb141bb85060cbff88c2d78eccbc72d97eb7022c43e4"}, - {file = "pydantic-1.8.2-cp36-cp36m-manylinux2014_i686.whl", hash = "sha256:589eb6cd6361e8ac341db97602eb7f354551482368a37f4fd086c0733548308e"}, - {file = "pydantic-1.8.2-cp36-cp36m-manylinux2014_x86_64.whl", hash = "sha256:10e5622224245941efc193ad1d159887872776df7a8fd592ed746aa25d071840"}, - {file = "pydantic-1.8.2-cp36-cp36m-win_amd64.whl", hash = "sha256:99a9fc39470010c45c161a1dc584997f1feb13f689ecf645f59bb4ba623e586b"}, - {file = "pydantic-1.8.2-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:a83db7205f60c6a86f2c44a61791d993dff4b73135df1973ecd9eed5ea0bda20"}, - {file = "pydantic-1.8.2-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:41b542c0b3c42dc17da70554bc6f38cbc30d7066d2c2815a94499b5684582ecb"}, - {file = "pydantic-1.8.2-cp37-cp37m-manylinux2014_i686.whl", hash = "sha256:ea5cb40a3b23b3265f6325727ddfc45141b08ed665458be8c6285e7b85bd73a1"}, - {file = "pydantic-1.8.2-cp37-cp37m-manylinux2014_x86_64.whl", hash = "sha256:18b5ea242dd3e62dbf89b2b0ec9ba6c7b5abaf6af85b95a97b00279f65845a23"}, - {file = "pydantic-1.8.2-cp37-cp37m-win_amd64.whl", hash = "sha256:234a6c19f1c14e25e362cb05c68afb7f183eb931dd3cd4605eafff055ebbf287"}, - {file = "pydantic-1.8.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:021ea0e4133e8c824775a0cfe098677acf6fa5a3cbf9206a376eed3fc09302cd"}, - {file = "pydantic-1.8.2-cp38-cp38-manylinux1_i686.whl", hash = "sha256:e710876437bc07bd414ff453ac8ec63d219e7690128d925c6e82889d674bb505"}, - {file = "pydantic-1.8.2-cp38-cp38-manylinux2014_i686.whl", hash = "sha256:ac8eed4ca3bd3aadc58a13c2aa93cd8a884bcf21cb019f8cfecaae3b6ce3746e"}, - {file = "pydantic-1.8.2-cp38-cp38-manylinux2014_x86_64.whl", hash = "sha256:4a03cbbe743e9c7247ceae6f0d8898f7a64bb65800a45cbdc52d65e370570820"}, - {file = "pydantic-1.8.2-cp38-cp38-win_amd64.whl", hash = "sha256:8621559dcf5afacf0069ed194278f35c255dc1a1385c28b32dd6c110fd6531b3"}, - {file = "pydantic-1.8.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:8b223557f9510cf0bfd8b01316bf6dd281cf41826607eada99662f5e4963f316"}, - {file = "pydantic-1.8.2-cp39-cp39-manylinux1_i686.whl", hash = "sha256:244ad78eeb388a43b0c927e74d3af78008e944074b7d0f4f696ddd5b2af43c62"}, - {file = "pydantic-1.8.2-cp39-cp39-manylinux2014_i686.whl", hash = "sha256:05ef5246a7ffd2ce12a619cbb29f3307b7c4509307b1b49f456657b43529dc6f"}, - {file = "pydantic-1.8.2-cp39-cp39-manylinux2014_x86_64.whl", hash = "sha256:54cd5121383f4a461ff7644c7ca20c0419d58052db70d8791eacbbe31528916b"}, - {file = "pydantic-1.8.2-cp39-cp39-win_amd64.whl", hash = "sha256:4be75bebf676a5f0f87937c6ddb061fa39cbea067240d98e298508c1bda6f3f3"}, - {file = "pydantic-1.8.2-py3-none-any.whl", hash = "sha256:fec866a0b59f372b7e776f2d7308511784dace622e0992a0b59ea3ccee0ae833"}, - {file = "pydantic-1.8.2.tar.gz", hash = "sha256:26464e57ccaafe72b7ad156fdaa4e9b9ef051f69e175dbbb463283000c05ab7b"}, + {file = "pydantic-1.9.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:cb23bcc093697cdea2708baae4f9ba0e972960a835af22560f6ae4e7e47d33f5"}, + {file = "pydantic-1.9.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:1d5278bd9f0eee04a44c712982343103bba63507480bfd2fc2790fa70cd64cf4"}, + {file = "pydantic-1.9.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ab624700dc145aa809e6f3ec93fb8e7d0f99d9023b713f6a953637429b437d37"}, + {file = 
"pydantic-1.9.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c8d7da6f1c1049eefb718d43d99ad73100c958a5367d30b9321b092771e96c25"}, + {file = "pydantic-1.9.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:3c3b035103bd4e2e4a28da9da7ef2fa47b00ee4a9cf4f1a735214c1bcd05e0f6"}, + {file = "pydantic-1.9.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:3011b975c973819883842c5ab925a4e4298dffccf7782c55ec3580ed17dc464c"}, + {file = "pydantic-1.9.0-cp310-cp310-win_amd64.whl", hash = "sha256:086254884d10d3ba16da0588604ffdc5aab3f7f09557b998373e885c690dd398"}, + {file = "pydantic-1.9.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:0fe476769acaa7fcddd17cadd172b156b53546ec3614a4d880e5d29ea5fbce65"}, + {file = "pydantic-1.9.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c8e9dcf1ac499679aceedac7e7ca6d8641f0193c591a2d090282aaf8e9445a46"}, + {file = "pydantic-1.9.0-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d1e4c28f30e767fd07f2ddc6f74f41f034d1dd6bc526cd59e63a82fe8bb9ef4c"}, + {file = "pydantic-1.9.0-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:c86229333cabaaa8c51cf971496f10318c4734cf7b641f08af0a6fbf17ca3054"}, + {file = "pydantic-1.9.0-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:c0727bda6e38144d464daec31dff936a82917f431d9c39c39c60a26567eae3ed"}, + {file = "pydantic-1.9.0-cp36-cp36m-win_amd64.whl", hash = "sha256:dee5ef83a76ac31ab0c78c10bd7d5437bfdb6358c95b91f1ba7ff7b76f9996a1"}, + {file = "pydantic-1.9.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:d9c9bdb3af48e242838f9f6e6127de9be7063aad17b32215ccc36a09c5cf1070"}, + {file = "pydantic-1.9.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2ee7e3209db1e468341ef41fe263eb655f67f5c5a76c924044314e139a1103a2"}, + {file = "pydantic-1.9.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0b6037175234850ffd094ca77bf60fb54b08b5b22bc85865331dd3bda7a02fa1"}, + {file = "pydantic-1.9.0-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:b2571db88c636d862b35090ccf92bf24004393f85c8870a37f42d9f23d13e032"}, + {file = "pydantic-1.9.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:8b5ac0f1c83d31b324e57a273da59197c83d1bb18171e512908fe5dc7278a1d6"}, + {file = "pydantic-1.9.0-cp37-cp37m-win_amd64.whl", hash = "sha256:bbbc94d0c94dd80b3340fc4f04fd4d701f4b038ebad72c39693c794fd3bc2d9d"}, + {file = "pydantic-1.9.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:e0896200b6a40197405af18828da49f067c2fa1f821491bc8f5bde241ef3f7d7"}, + {file = "pydantic-1.9.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7bdfdadb5994b44bd5579cfa7c9b0e1b0e540c952d56f627eb227851cda9db77"}, + {file = "pydantic-1.9.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:574936363cd4b9eed8acdd6b80d0143162f2eb654d96cb3a8ee91d3e64bf4cf9"}, + {file = "pydantic-1.9.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c556695b699f648c58373b542534308922c46a1cda06ea47bc9ca45ef5b39ae6"}, + {file = "pydantic-1.9.0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:f947352c3434e8b937e3aa8f96f47bdfe6d92779e44bb3f41e4c213ba6a32145"}, + {file = "pydantic-1.9.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:5e48ef4a8b8c066c4a31409d91d7ca372a774d0212da2787c0d32f8045b1e034"}, + {file = "pydantic-1.9.0-cp38-cp38-win_amd64.whl", hash = 
"sha256:96f240bce182ca7fe045c76bcebfa0b0534a1bf402ed05914a6f1dadff91877f"}, + {file = "pydantic-1.9.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:815ddebb2792efd4bba5488bc8fde09c29e8ca3227d27cf1c6990fc830fd292b"}, + {file = "pydantic-1.9.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:6c5b77947b9e85a54848343928b597b4f74fc364b70926b3c4441ff52620640c"}, + {file = "pydantic-1.9.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4c68c3bc88dbda2a6805e9a142ce84782d3930f8fdd9655430d8576315ad97ce"}, + {file = "pydantic-1.9.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5a79330f8571faf71bf93667d3ee054609816f10a259a109a0738dac983b23c3"}, + {file = "pydantic-1.9.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:f5a64b64ddf4c99fe201ac2724daada8595ada0d102ab96d019c1555c2d6441d"}, + {file = "pydantic-1.9.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:a733965f1a2b4090a5238d40d983dcd78f3ecea221c7af1497b845a9709c1721"}, + {file = "pydantic-1.9.0-cp39-cp39-win_amd64.whl", hash = "sha256:2cc6a4cb8a118ffec2ca5fcb47afbacb4f16d0ab8b7350ddea5e8ef7bcc53a16"}, + {file = "pydantic-1.9.0-py3-none-any.whl", hash = "sha256:085ca1de245782e9b46cefcf99deecc67d418737a1fd3f6a4f511344b613a5b3"}, + {file = "pydantic-1.9.0.tar.gz", hash = "sha256:742645059757a56ecd886faf4ed2441b9c0cd406079c2b4bee51bcc3fbcd510a"}, ] pyflakes = [ {file = "pyflakes-2.3.1-py2.py3-none-any.whl", hash = "sha256:7893783d01b8a89811dd72d7dfd4d84ff098e5eed95cfa8905b22bbffe52efc3"}, diff --git a/pyproject.toml b/pyproject.toml index 39c00f3..b514f9a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ snakeviz = "^2.1.0" sphinx-autobuild = "^2021.3.14" twine = "^3.4.1" pytest = "^6.2.5" +hypothesis = "^6.35.0" [tool.poetry.extras] binder = [ diff --git a/stackstac/testing/__init__.py b/stackstac/testing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/stackstac/testing/strategies.py b/stackstac/testing/strategies.py new file mode 100644 index 0000000..da5d36d --- /dev/null +++ b/stackstac/testing/strategies.py @@ -0,0 +1,317 @@ +from __future__ import annotations +# from math import isfinite +# import sys + +# from hypothesis import assume, reject, strategies as st +import hypothesis.extra.numpy as st_np +# import numpy as np + +# from stackstac.geom_utils import Bbox, bounds_width_height +# from stackstac.prepare import ASSET_TABLE_DT +# from stackstac.raster_spec import RasterSpec + + +# @st.composite +# def bboxes( +# draw: st.DrawFn, +# minx: int | float | None = None, +# miny: int | float | None = None, +# maxx: int | float | None = None, +# maxy: int | float | None = None, +# min_width: int | float = sys.float_info.epsilon, +# max_width: int | float | None = None, +# min_height: int | float = sys.float_info.epsilon, +# max_height: int | float | None = None, +# ) -> Bbox: +# if minx is not None and maxx is not None: +# assert maxx > minx +# if miny is not None and maxy is not None: +# assert maxy > miny + +# assert min_width > 0 +# assert min_height > 0 +# if max_width is not None: +# assert max_width > min_width +# if max_height is not None: +# assert max_height > min_height + +# maxx = sys.float_info.max if maxx is None else maxx +# maxy = sys.float_info.max if maxy is None else maxy +# # ^ https://github.com/HypothesisWorks/hypothesis/issues/3208 +# west = draw( +# st.floats( +# min_value=minx, +# max_value=maxx - min_width, +# allow_nan=False, +# allow_infinity=False, +# allow_subnormal=False, +# 
exclude_max=True, +# ), +# label="west", +# ) +# assume(isfinite(west + min_width)) +# if max_width: +# assume(west + max_width > west) +# south = draw( +# st.floats( +# min_value=miny, +# max_value=maxy - min_height, +# allow_nan=False, +# allow_infinity=False, +# allow_subnormal=False, +# exclude_max=True, +# ), +# label="south", +# ) +# assume(isfinite(south + min_height)) +# if max_height: +# assume(south + max_height > south) +# east = draw( +# st.floats( +# min_value=west + min_width, +# max_value=min(maxx, west + max_width) if max_width else maxx, +# allow_nan=False, +# allow_infinity=False, +# allow_subnormal=False, +# exclude_min=True, +# ), +# label="east", +# ) +# assume(east - west >= min_width) +# if max_width: +# assume(east - west <= max_width) +# north = draw( +# st.floats( +# min_value=south + min_height, +# max_value=min(maxy, south + max_height) if max_height else maxy, +# allow_nan=False, +# allow_infinity=False, +# allow_subnormal=False, +# exclude_min=True, +# ), +# label="north", +# ) +# assume(north - south >= min_height) +# if max_height: +# assume(north - south <= max_height) +# return (west, south, east, north) + + +# @st.composite +# def simple_bboxes(draw: st.DrawFn, zero_size: bool = True) -> Bbox: +# west = draw(st.integers(-100, 99)) +# south = draw(st.integers(-100, 99)) +# east = draw(st.integers(west if zero_size else west + 1, 100)) +# north = draw(st.integers(south if zero_size else south + 1, 100)) +# factor = draw(st.integers(1, 4)) +# return (west / factor, south / factor, east / factor, north / factor) + + +# @st.composite +# def asset_tables( +# draw: st.DrawFn, +# max_side: int | None = None, +# bbox: Bbox | None = None, +# ) -> np.ndarray: +# shape = draw( +# st_np.array_shapes(min_dims=2, max_dims=2, max_side=max_side), label="shape" +# ) +# bounds_arr = draw( +# st_np.arrays( +# object, +# shape, +# # elements=bboxes() if not bbox else bboxes(*bbox), +# elements=simple_bboxes(), +# fill=st.none(), +# ), +# label="bounds_arr", +# ) + +# asset_table = np.empty_like(bounds_arr, ASSET_TABLE_DT) +# for (i, j), bounds in np.ndenumerate(bounds_arr): +# if bounds: +# asset_table[i, j] = (f"fake://{i}/{j}", bounds) + +# return asset_table + + +# def resolutions( +# min_x: int | float | None = sys.float_info.epsilon, +# max_x: int | float | None = sys.float_info.max / 2, +# min_y: int | float | None = sys.float_info.epsilon, +# max_y: int | float | None = sys.float_info.max / 2, +# equal: bool = False, +# ) -> st.SearchStrategy[tuple[float, float]]: +# """ +# Strategy that generates tuples of x, y resolutions + +# With ``equal=True``, equal x and y resolutions will be picked +# that satisfy all the min/max constraints. Otherwise, the resolutions +# along x vs y may be different. + +# By default, this strategy does not use the full range of floating-point values, +# because very large or small numbers generally cause numerical errors downstream. +# """ +# # Notes on default values: +# # Floats small enough that `x * -x == 0` will generally cause numerical errors downstream. +# # The machine epsilon is not the smallest _possible_ value that will work here, but since +# # we don't particularly care about these extreme ranges for the sorts of tests we're doing, +# # epsilon will suffice as a "smallest reasonable resolution". +# # Similarly, very large resolutions may overflow the `.shape` calculation on `RasterSpec`, +# # since it uses non-standard GDAL-style rounding. 
+# if equal: +# min_ = max(filter(None, [min_x, min_y]), default=None) +# max_ = min(filter(None, [max_x, max_y]), default=None) +# if min_ is not None and max_ is not None: +# assert min_ < max_, ( +# "Cannot fit an equal resolution within these constraints: " +# f"{min_x=} {max_x=} {min_y=} {max_y=}" +# ) +# return _equal_resolutions(min_, max_) +# return _resolutions_xy(min_x, max_x, min_y, max_y) + + +# def _resolutions_xy( +# min_x: int | float | None = None, +# max_x: int | float | None = None, +# min_y: int | float | None = None, +# max_y: int | float | None = None, +# ) -> st.SearchStrategy[tuple[float, float]]: +# min_x = min_x or 0 +# min_y = min_y or 0 +# assert min_x >= 0, min_x +# assert min_y >= 0, min_y +# return st.tuples( +# st.floats( +# min_x, +# max_x, +# allow_nan=False, +# allow_infinity=False, +# allow_subnormal=False, # tiny resolutions are too small to work with +# exclude_min=min_x == 0, +# ), +# st.floats( +# min_y, +# max_y, +# allow_nan=False, +# allow_infinity=False, +# allow_subnormal=False, # tiny resolutions are too small to work with +# exclude_min=min_y == 0, +# ), +# ).filter(lambda res_xy: res_xy[0] * -res_xy[1] != 0) + + +# def _equal_resolutions( +# min: int | float | None = None, +# max: int | float | None = None, +# ) -> st.SearchStrategy[tuple[float, float]]: +# min = min or sys.float_info.epsilon +# assert min >= 0, min +# return ( +# st.floats( +# min, +# max, +# allow_nan=False, +# allow_infinity=False, +# allow_subnormal=False, +# exclude_min=min == 0, +# ) +# .filter(lambda x: x * -x != 0) +# .map(lambda r: (r, r)) +# ) + + +# @st.composite +# def raster_specs( +# draw: st.DrawFn, +# min_side: int | None = 1, +# max_side: int | None = 10, +# equal_resolution: bool = False, +# ) -> RasterSpec: +# if min_side is not None: +# assert min_side > 0 +# if max_side is not None: +# assert max_side > 0 +# if min_side is not None and max_side is not None: +# assert max_side >= min_side + +# epsg = draw(st.sampled_from([4326, 3857, 32612]), label="epsg") + +# bounds = draw(simple_bboxes(zero_size=False)) +# width, height = bounds_width_height(bounds) +# resolutions_xy = draw( +# resolutions( +# width / max_side if max_side else None, +# width / min_side if min_side else None, +# height / max_side if max_side else None, +# height / min_side if min_side else None, +# equal_resolution, +# ), +# label="resolutions_xy", +# ) + +# # res_x, res_y = draw( +# # resolutions( +# # max_x=sys.float_info.max / 2 / max_side if max_side else None, +# # max_y=sys.float_info.max / 2 / max_side if max_side else None, +# # equal=equal_resolution, +# # ), +# # label="resolutions_xy", +# # ) +# # bounds = draw( +# # bboxes( +# # min_width=res_x * min_side if min_side else None, +# # max_width=res_x * max_side if max_side else None, +# # min_height=res_y * min_side if min_side else None, +# # max_height=res_y * max_side if max_side else None, +# # ), +# # label="bounds", +# # ) + +# # # FIXME something more reasonable than this. We just tend to produce +# # # specs where calculating the shape overflows, because the width/height +# # # or resolution are too close to inf. 
+# # # reasonableness_bound = sys.float_info.max / 4 +# # bounds = draw( +# # bboxes( +# # # -reasonableness_bound, +# # # -reasonableness_bound, +# # # reasonableness_bound, +# # # reasonableness_bound, +# # ), +# # label="bounds", +# # ) + +# # width, height = bounds_width_height(bounds) +# # resolutions_xy = draw( +# # resolutions( +# # width / max_side if max_side else None, +# # width / min_side if min_side else None, +# # height / max_side if max_side else None, +# # height / min_side if min_side else None, +# # equal_resolution, +# # ), +# # label="resolutions_xy", +# # ) + +# spec = RasterSpec(epsg, bounds, resolutions_xy) +# try: +# shape = spec.shape +# except AssertionError: +# # Shape is inf/-inf/nan +# reject() + +# if min_side: +# assert min(shape) >= min_side, f"{shape=}, {min_side=}" +# if max_side: +# assert max(shape) <= max_side, f"{shape=}, {max_side=}" + +# return spec + + +raster_dtypes = ( + st_np.unsigned_integer_dtypes() + | st_np.integer_dtypes() + | st_np.floating_dtypes() + | st_np.complex_number_dtypes() +) diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py index a15f795..9cadecf 100644 --- a/stackstac/tests/test_to_dask.py +++ b/stackstac/tests/test_to_dask.py @@ -2,6 +2,8 @@ from threading import Lock from typing import ClassVar +from hypothesis import given, strategies as st +import hypothesis.extra.numpy as st_np import numpy as np from rasterio import windows from dask.array.utils import assert_eq @@ -9,9 +11,11 @@ from stackstac.raster_spec import RasterSpec from stackstac.prepare import ASSET_TABLE_DT from stackstac.to_dask import items_to_dask +from stackstac.testing import strategies as st_stc -def test_items_to_dask_basic(): +@given(st.data(), st_stc.raster_dtypes) +def test_items_to_dask_basic(data: st.DataObject, dtype_: np.dtype): asset_table = np.array( [ # Encode the (i, j) index in the table in the URL @@ -24,8 +28,7 @@ def test_items_to_dask_basic(): ) spec_ = RasterSpec(4326, (0, 0, 7, 8), (0.5, 0.5)) chunksize = 2 - dtype_ = np.dtype("int32") - fill_value_ = -1 + fill_value_ = data.draw(st_np.from_dtype(dtype_), label="fill_value") # Build expected array of the final stack. # Start with all nodata, then write data in for each asset. 
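The hunks above trade the hard-coded dtype and fill value for Hypothesis-generated ones: `raster_dtypes` picks the dtype, and `st.data()` lets the test body draw a fill value *after* seeing that dtype. A minimal standalone sketch of this interactive-draw pattern (the test name and final assertion are illustrative, not part of this patch):

    import numpy as np
    import hypothesis.extra.numpy as st_np
    from hypothesis import given, strategies as st

    raster_dtypes = st_np.integer_dtypes() | st_np.floating_dtypes()

    @given(st.data(), raster_dtypes)
    def test_fill_matches_dtype(data: st.DataObject, dtype: np.dtype) -> None:
        # st.data() defers drawing into the test body, so this draw can
        # depend on the dtype Hypothesis just generated; a plain @given
        # argument can't depend on another generated argument.
        fill = data.draw(st_np.from_dtype(dtype), label="fill")
        # from_dtype only yields values representable in `dtype`.
        assert np.full((2, 2), fill, dtype).dtype == dtype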
@@ -52,9 +55,7 @@ def test_items_to_dask_basic(): chunk = results[(i, j) + windows.window_index(window)] if chunk.size: # Asset falls within final bounds - chunk[:] = np.random.default_rng().integers( - 0, 10000, (int(window.height), int(window.width)), dtype_ - ) + chunk[:] = np.random.default_rng().uniform(0, 128, chunk.shape) class TestReader: opened: ClassVar[set[str]] = set() @@ -81,7 +82,7 @@ def __init__( assert spec == spec_ assert dtype == dtype_ - assert fill_value == fill_value_ + np.testing.assert_equal(fill_value, fill_value_) def read(self, window: windows.Window) -> np.ndarray: assert (window.height, window.width) == (chunksize, chunksize) @@ -109,4 +110,4 @@ def __setstate__(self, state): assert arr.chunksize == (1, 1, chunksize, chunksize) assert arr.dtype == dtype_ - assert_eq(arr, results) + assert_eq(arr, results, equal_nan=True) From 50e447611626bceb8bbbf497a02ffc9f04012d13 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 11 Jan 2022 23:37:12 -0700 Subject: [PATCH 10/21] hypothesis for asset table and spec bounds --- stackstac/testing/strategies.py | 57 ++++++------------ stackstac/tests/test_to_dask.py | 103 +++++++++++++++++++++++++------- 2 files changed, 102 insertions(+), 58 deletions(-) diff --git a/stackstac/testing/strategies.py b/stackstac/testing/strategies.py index da5d36d..49a0fae 100644 --- a/stackstac/testing/strategies.py +++ b/stackstac/testing/strategies.py @@ -1,12 +1,15 @@ from __future__ import annotations + # from math import isfinite # import sys -# from hypothesis import assume, reject, strategies as st +from hypothesis import assume, reject, strategies as st import hypothesis.extra.numpy as st_np + # import numpy as np -# from stackstac.geom_utils import Bbox, bounds_width_height +from stackstac.geom_utils import Bbox + # from stackstac.prepare import ASSET_TABLE_DT # from stackstac.raster_spec import RasterSpec @@ -97,42 +100,20 @@ # return (west, south, east, north) -# @st.composite -# def simple_bboxes(draw: st.DrawFn, zero_size: bool = True) -> Bbox: -# west = draw(st.integers(-100, 99)) -# south = draw(st.integers(-100, 99)) -# east = draw(st.integers(west if zero_size else west + 1, 100)) -# north = draw(st.integers(south if zero_size else south + 1, 100)) -# factor = draw(st.integers(1, 4)) -# return (west / factor, south / factor, east / factor, north / factor) - - -# @st.composite -# def asset_tables( -# draw: st.DrawFn, -# max_side: int | None = None, -# bbox: Bbox | None = None, -# ) -> np.ndarray: -# shape = draw( -# st_np.array_shapes(min_dims=2, max_dims=2, max_side=max_side), label="shape" -# ) -# bounds_arr = draw( -# st_np.arrays( -# object, -# shape, -# # elements=bboxes() if not bbox else bboxes(*bbox), -# elements=simple_bboxes(), -# fill=st.none(), -# ), -# label="bounds_arr", -# ) - -# asset_table = np.empty_like(bounds_arr, ASSET_TABLE_DT) -# for (i, j), bounds in np.ndenumerate(bounds_arr): -# if bounds: -# asset_table[i, j] = (f"fake://{i}/{j}", bounds) - -# return asset_table +@st.composite +def simple_bboxes( + draw: st.DrawFn, + minx: int = -100, + miny: int = -100, + maxx: int = 100, + maxy: int = 100, + zero_size: bool = True, +) -> Bbox: + west = draw(st.integers(minx, maxx - 1)) + south = draw(st.integers(miny, maxy - 1)) + east = draw(st.integers(west if zero_size else west + 1, maxx)) + north = draw(st.integers(south if zero_size else south + 1, maxy)) + return (west, south, east, north) # def resolutions( diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py index 9cadecf..2729e1f 
100644 --- a/stackstac/tests/test_to_dask.py +++ b/stackstac/tests/test_to_dask.py @@ -2,31 +2,76 @@ from threading import Lock from typing import ClassVar -from hypothesis import given, strategies as st +from affine import Affine +from hypothesis import given, settings, strategies as st import hypothesis.extra.numpy as st_np import numpy as np from rasterio import windows from dask.array.utils import assert_eq -from stackstac.raster_spec import RasterSpec +from stackstac.raster_spec import Bbox, RasterSpec from stackstac.prepare import ASSET_TABLE_DT from stackstac.to_dask import items_to_dask from stackstac.testing import strategies as st_stc -@given(st.data(), st_stc.raster_dtypes) -def test_items_to_dask_basic(data: st.DataObject, dtype_: np.dtype): - asset_table = np.array( - [ - # Encode the (i, j) index in the table in the URL - [("fake://0/0", [0, 0, 2, 2]), ("fake://0/1", [0, 0, 2, 2])], - [("fake://1/0", [0, 3, 2, 5]), ("fake://1/1", [10, 13, 12, 15])], - [("fake://2/0", [1, 3, 2, 6]), ("fake://2/1", [1, 3, 2, 7])], - [(None, None), (None, None)], - ], - dtype=ASSET_TABLE_DT, +@st.composite +def asset_tables( + draw: st.DrawFn, + max_side: int | None = None, +) -> np.ndarray: + """ + Generate asset tables where entries have random bounds, and are randomly missing. + Each URL is of the form ``"fake://{i}/{j}"``, so you can parse it within a Reader + to know the (time, band) coordinates of that asset. Bounds may be zero-size (the min + and max are equal). + + An example of an asset table: + + np.array( + [ + # Encode the (i, j) index in the table in the URL + [("fake://0/0", [0, 0, 2, 2]), ("fake://0/1", [0, 0, 2, 2])], + [("fake://1/0", [0, 3, 2, 5]), ("fake://1/1", [10, 13, 12, 15])], + [("fake://2/0", [1, 3, 2, 6]), ("fake://2/1", [1, 3, 2, 7])], + [(None, None), (None, None)], + ], + dtype=ASSET_TABLE_DT, + ) + """ + shape = draw( + st_np.array_shapes(min_dims=2, max_dims=2, max_side=max_side), label="shape" + ) + bounds_arr = draw( + st_np.arrays( + object, + shape, + elements=st_stc.simple_bboxes(), + fill=st.none(), + ), + label="bounds_arr", ) - spec_ = RasterSpec(4326, (0, 0, 7, 8), (0.5, 0.5)) + + asset_table = np.empty_like(bounds_arr, ASSET_TABLE_DT) + for (i, j), bounds in np.ndenumerate(bounds_arr): + if bounds: + # Encode the (i, j) index in the table in the URL + asset_table[i, j] = (f"fake://{i}/{j}", bounds) + + return asset_table + + +@given( + st.data(), + asset_tables(max_side=5), + st_stc.simple_bboxes(-4, -4, 4, 4, zero_size=False), + st_stc.raster_dtypes, +) +@settings(max_examples=500, print_blob=True) +def test_items_to_dask( + data: st.DataObject, asset_table: np.ndarray, bounds: Bbox, dtype_: np.dtype +): + spec_ = RasterSpec(4326, bounds, (0.5, 0.5)) chunksize = 2 fill_value_ = data.draw(st_np.from_dtype(dtype_), label="fill_value") @@ -44,12 +89,7 @@ def test_items_to_dask_basic(data: st.DataObject, dtype_: np.dtype): continue assert url == f"fake://{i}/{j}" - window = windows.from_bounds( - *asset["bounds"], - transform=spec_.transform, - precision=0.0 - # ^ https://github.com/rasterio/rasterio/issues/2374 - ) + window = window_from_bounds(asset["bounds"], spec_.transform) asset_windows[url] = window chunk = results[(i, j) + windows.window_index(window)] @@ -111,3 +151,26 @@ def __setstate__(self, state): assert arr.dtype == dtype_ assert_eq(arr, results, equal_nan=True) + + +def window_from_bounds(bounds: Bbox, transform: Affine) -> windows.Window: + "Get the window corresponding to the bounding coordinates (correcting for rasterio bugs)" + window = 
windows.from_bounds( + *bounds, + transform=transform, + precision=0.0 + # ^ https://github.com/rasterio/rasterio/issues/2374 + ) + + # Trim negative `row_off`/`col_off` to work around https://github.com/rasterio/rasterio/issues/2378 + window = windows.Window( + max(window.col_off, 0), + max(window.row_off, 0), + (max(window.col_off + window.width, 0) if window.col_off < 0 else window.width), + ( + max(window.row_off + window.height, 0) + if window.row_off < 0 + else window.height + ), + ) + return window From 2a728ec5d225c002ca75f2d69205343d57ff61e1 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 11 Jan 2022 23:49:01 -0700 Subject: [PATCH 11/21] hypothesis for chunksize_yx --- stackstac/tests/test_to_dask.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py index 2729e1f..09ca7bd 100644 --- a/stackstac/tests/test_to_dask.py +++ b/stackstac/tests/test_to_dask.py @@ -66,13 +66,17 @@ def asset_tables( asset_tables(max_side=5), st_stc.simple_bboxes(-4, -4, 4, 4, zero_size=False), st_stc.raster_dtypes, + st_np.array_shapes(min_dims=2, max_dims=2, max_side=10), ) @settings(max_examples=500, print_blob=True) def test_items_to_dask( - data: st.DataObject, asset_table: np.ndarray, bounds: Bbox, dtype_: np.dtype + data: st.DataObject, + asset_table: np.ndarray, + bounds: Bbox, + dtype_: np.dtype, + chunksize_yx: tuple[int, int], ): spec_ = RasterSpec(4326, bounds, (0.5, 0.5)) - chunksize = 2 fill_value_ = data.draw(st_np.from_dtype(dtype_), label="fill_value") # Build expected array of the final stack. @@ -125,7 +129,8 @@ def __init__( np.testing.assert_equal(fill_value, fill_value_) def read(self, window: windows.Window) -> np.ndarray: - assert (window.height, window.width) == (chunksize, chunksize) + assert 0 < window.height <= chunksize_yx[0] + assert 0 < window.width <= chunksize_yx[1] # Read should be bypassed entirely if windows don't intersect assert windows.intersect(window, self.window) return self.full_data[window.toslices()] @@ -142,12 +147,17 @@ def __setstate__(self, state): arr = items_to_dask( asset_table, spec_, - chunksize, + chunksize_yx, dtype=dtype_, fill_value=fill_value_, reader=TestReader, ) - assert arr.chunksize == (1, 1, chunksize, chunksize) + assert arr.chunksize == ( + 1, + 1, + min(chunksize_yx[0], spec_.shape[0]), + min(chunksize_yx[1], spec_.shape[1]), + ) assert arr.dtype == dtype_ assert_eq(arr, results, equal_nan=True) @@ -164,7 +174,7 @@ def window_from_bounds(bounds: Bbox, transform: Affine) -> windows.Window: # Trim negative `row_off`/`col_off` to work around https://github.com/rasterio/rasterio/issues/2378 window = windows.Window( - max(window.col_off, 0), + max(window.col_off, 0), # type: ignore "Expected 0 positional arguments" max(window.row_off, 0), (max(window.col_off + window.width, 0) if window.col_off < 0 else window.width), ( From 87176e5d6d7b437f91d36da7c576237a1c09ca9c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 11 Jan 2022 23:50:36 -0700 Subject: [PATCH 12/21] clean up strategies --- stackstac/testing/strategies.py | 274 +------------------------------- 1 file changed, 2 insertions(+), 272 deletions(-) diff --git a/stackstac/testing/strategies.py b/stackstac/testing/strategies.py index 49a0fae..b7844d9 100644 --- a/stackstac/testing/strategies.py +++ b/stackstac/testing/strategies.py @@ -1,104 +1,8 @@ from __future__ import annotations -# from math import isfinite -# import sys - -from hypothesis import assume, reject, 
strategies as st +from hypothesis import strategies as st import hypothesis.extra.numpy as st_np -# import numpy as np - -from stackstac.geom_utils import Bbox - -# from stackstac.prepare import ASSET_TABLE_DT -# from stackstac.raster_spec import RasterSpec - - -# @st.composite -# def bboxes( -# draw: st.DrawFn, -# minx: int | float | None = None, -# miny: int | float | None = None, -# maxx: int | float | None = None, -# maxy: int | float | None = None, -# min_width: int | float = sys.float_info.epsilon, -# max_width: int | float | None = None, -# min_height: int | float = sys.float_info.epsilon, -# max_height: int | float | None = None, -# ) -> Bbox: -# if minx is not None and maxx is not None: -# assert maxx > minx -# if miny is not None and maxy is not None: -# assert maxy > miny - -# assert min_width > 0 -# assert min_height > 0 -# if max_width is not None: -# assert max_width > min_width -# if max_height is not None: -# assert max_height > min_height - -# maxx = sys.float_info.max if maxx is None else maxx -# maxy = sys.float_info.max if maxy is None else maxy -# # ^ https://github.com/HypothesisWorks/hypothesis/issues/3208 -# west = draw( -# st.floats( -# min_value=minx, -# max_value=maxx - min_width, -# allow_nan=False, -# allow_infinity=False, -# allow_subnormal=False, -# exclude_max=True, -# ), -# label="west", -# ) -# assume(isfinite(west + min_width)) -# if max_width: -# assume(west + max_width > west) -# south = draw( -# st.floats( -# min_value=miny, -# max_value=maxy - min_height, -# allow_nan=False, -# allow_infinity=False, -# allow_subnormal=False, -# exclude_max=True, -# ), -# label="south", -# ) -# assume(isfinite(south + min_height)) -# if max_height: -# assume(south + max_height > south) -# east = draw( -# st.floats( -# min_value=west + min_width, -# max_value=min(maxx, west + max_width) if max_width else maxx, -# allow_nan=False, -# allow_infinity=False, -# allow_subnormal=False, -# exclude_min=True, -# ), -# label="east", -# ) -# assume(east - west >= min_width) -# if max_width: -# assume(east - west <= max_width) -# north = draw( -# st.floats( -# min_value=south + min_height, -# max_value=min(maxy, south + max_height) if max_height else maxy, -# allow_nan=False, -# allow_infinity=False, -# allow_subnormal=False, -# exclude_min=True, -# ), -# label="north", -# ) -# assume(north - south >= min_height) -# if max_height: -# assume(north - south <= max_height) -# return (west, south, east, north) - @st.composite def simple_bboxes( @@ -108,7 +12,7 @@ def simple_bboxes( maxx: int = 100, maxy: int = 100, zero_size: bool = True, -) -> Bbox: +) -> tuple[int, int, int, int]: west = draw(st.integers(minx, maxx - 1)) south = draw(st.integers(miny, maxy - 1)) east = draw(st.integers(west if zero_size else west + 1, maxx)) @@ -116,180 +20,6 @@ def simple_bboxes( return (west, south, east, north) -# def resolutions( -# min_x: int | float | None = sys.float_info.epsilon, -# max_x: int | float | None = sys.float_info.max / 2, -# min_y: int | float | None = sys.float_info.epsilon, -# max_y: int | float | None = sys.float_info.max / 2, -# equal: bool = False, -# ) -> st.SearchStrategy[tuple[float, float]]: -# """ -# Strategy that generates tuples of x, y resolutions - -# With ``equal=True``, equal x and y resolutions will be picked -# that satisfy all the min/max constraints. Otherwise, the resolutions -# along x vs y may be different. 
- -# By default, this strategy does not use the full range of floating-point values, -# because very large or small numbers generally cause numerical errors downstream. -# """ -# # Notes on default values: -# # Floats small enough that `x * -x == 0` will generally cause numerical errors downstream. -# # The machine epsilon is not the smallest _possible_ value that will work here, but since -# # we don't particularly care about these extreme ranges for the sorts of tests we're doing, -# # epsilon will suffice as a "smallest reasonable resolution". -# # Similarly, very large resolutions may overflow the `.shape` calculation on `RasterSpec`, -# # since it uses non-standard GDAL-style rounding. -# if equal: -# min_ = max(filter(None, [min_x, min_y]), default=None) -# max_ = min(filter(None, [max_x, max_y]), default=None) -# if min_ is not None and max_ is not None: -# assert min_ < max_, ( -# "Cannot fit an equal resolution within these constraints: " -# f"{min_x=} {max_x=} {min_y=} {max_y=}" -# ) -# return _equal_resolutions(min_, max_) -# return _resolutions_xy(min_x, max_x, min_y, max_y) - - -# def _resolutions_xy( -# min_x: int | float | None = None, -# max_x: int | float | None = None, -# min_y: int | float | None = None, -# max_y: int | float | None = None, -# ) -> st.SearchStrategy[tuple[float, float]]: -# min_x = min_x or 0 -# min_y = min_y or 0 -# assert min_x >= 0, min_x -# assert min_y >= 0, min_y -# return st.tuples( -# st.floats( -# min_x, -# max_x, -# allow_nan=False, -# allow_infinity=False, -# allow_subnormal=False, # tiny resolutions are too small to work with -# exclude_min=min_x == 0, -# ), -# st.floats( -# min_y, -# max_y, -# allow_nan=False, -# allow_infinity=False, -# allow_subnormal=False, # tiny resolutions are too small to work with -# exclude_min=min_y == 0, -# ), -# ).filter(lambda res_xy: res_xy[0] * -res_xy[1] != 0) - - -# def _equal_resolutions( -# min: int | float | None = None, -# max: int | float | None = None, -# ) -> st.SearchStrategy[tuple[float, float]]: -# min = min or sys.float_info.epsilon -# assert min >= 0, min -# return ( -# st.floats( -# min, -# max, -# allow_nan=False, -# allow_infinity=False, -# allow_subnormal=False, -# exclude_min=min == 0, -# ) -# .filter(lambda x: x * -x != 0) -# .map(lambda r: (r, r)) -# ) - - -# @st.composite -# def raster_specs( -# draw: st.DrawFn, -# min_side: int | None = 1, -# max_side: int | None = 10, -# equal_resolution: bool = False, -# ) -> RasterSpec: -# if min_side is not None: -# assert min_side > 0 -# if max_side is not None: -# assert max_side > 0 -# if min_side is not None and max_side is not None: -# assert max_side >= min_side - -# epsg = draw(st.sampled_from([4326, 3857, 32612]), label="epsg") - -# bounds = draw(simple_bboxes(zero_size=False)) -# width, height = bounds_width_height(bounds) -# resolutions_xy = draw( -# resolutions( -# width / max_side if max_side else None, -# width / min_side if min_side else None, -# height / max_side if max_side else None, -# height / min_side if min_side else None, -# equal_resolution, -# ), -# label="resolutions_xy", -# ) - -# # res_x, res_y = draw( -# # resolutions( -# # max_x=sys.float_info.max / 2 / max_side if max_side else None, -# # max_y=sys.float_info.max / 2 / max_side if max_side else None, -# # equal=equal_resolution, -# # ), -# # label="resolutions_xy", -# # ) -# # bounds = draw( -# # bboxes( -# # min_width=res_x * min_side if min_side else None, -# # max_width=res_x * max_side if max_side else None, -# # min_height=res_y * min_side if min_side else None, -# 
# max_height=res_y * max_side if max_side else None, -# # ), -# # label="bounds", -# # ) - -# # # FIXME something more reasonable than this. We just tend to produce -# # # specs where calculating the shape overflows, because the width/height -# # # or resolution are too close to inf. -# # # reasonableness_bound = sys.float_info.max / 4 -# # bounds = draw( -# # bboxes( -# # # -reasonableness_bound, -# # # -reasonableness_bound, -# # # reasonableness_bound, -# # # reasonableness_bound, -# # ), -# # label="bounds", -# # ) - -# # width, height = bounds_width_height(bounds) -# # resolutions_xy = draw( -# # resolutions( -# # width / max_side if max_side else None, -# # width / min_side if min_side else None, -# # height / max_side if max_side else None, -# # height / min_side if min_side else None, -# # equal_resolution, -# # ), -# # label="resolutions_xy", -# # ) - -# spec = RasterSpec(epsg, bounds, resolutions_xy) -# try: -# shape = spec.shape -# except AssertionError: -# # Shape is inf/-inf/nan -# reject() - -# if min_side: -# assert min(shape) >= min_side, f"{shape=}, {min_side=}" -# if max_side: -# assert max(shape) <= max_side, f"{shape=}, {max_side=}" - -# return spec - - raster_dtypes = ( st_np.unsigned_integer_dtypes() | st_np.integer_dtypes() From 99c3818a346d2c4b6eaf72abeda332550717b1a8 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 12 Jan 2022 02:20:17 -0700 Subject: [PATCH 13/21] support full chunk sizes! --- stackstac/stack.py | 54 ++++++++++++++------------------- stackstac/testing/strategies.py | 51 +++++++++++++++++++++++++++++++ stackstac/tests/test_to_dask.py | 44 ++++++++++++++++++++------- stackstac/to_dask.py | 39 +++++++++++++++++++----- 4 files changed, 139 insertions(+), 49 deletions(-) diff --git a/stackstac/stack.py b/stackstac/stack.py index 7494990..ccc1815 100644 --- a/stackstac/stack.py +++ b/stackstac/stack.py @@ -14,7 +14,7 @@ from .rio_env import LayeredEnv from .rio_reader import AutoParallelRioReader from .stac_types import ItemCollectionIsh, ItemIsh, items_to_plain -from .to_dask import items_to_dask +from .to_dask import items_to_dask, ChunksParam def stack( @@ -28,7 +28,7 @@ def stack( bounds_latlon: Optional[Bbox] = None, snap_bounds: bool = True, resampling: Resampling = Resampling.nearest, - chunksize: int = 1024, + chunksize: ChunksParam = 1024, dtype: np.dtype = np.dtype("float64"), fill_value: Union[int, float] = np.nan, rescale: bool = True, @@ -161,35 +161,27 @@ def stack( The rasterio resampling method to use when reprojecting or rescaling data to a different CRS or resolution. Default: ``rasterio.enums.Resampling.nearest``. chunksize: - The chunksize to use for the spatial component of the Dask array, in pixels. - Default: 1024. Can be given in any format Dask understands for a ``chunks=`` argument, - such as ``1024``, ``(1024, 2048)``, ``15 MB``, etc. - - This is basically the "tile size" all your operations will be parallelized over. - Generally, you should use the internal tile size/block size of whatever data - you're accessing (or a multiple of that). For example, if all the assets are in - Cloud-Optimized GeoTIFF files with an internal tilesize of 512, pass ``chunksize=512``. - - You want the chunks of the Dask array to align with the internal tiles of the data. - Otherwise, if those grids don't line up, processing one chunk of your Dask array could - require reading many tiles from the data, parts of which will then be thrown out. 
- Additionally, those same data tiles might need to be re-read (and re-thrown-out) - for a neighboring Dask chunk, which is just as inefficient as it sounds (though setting - a high ``GDAL_CACHEMAX`` via ``gdal_env`` will help keep more data tiles cached, - at the expense of using more memory). - - Of course, when reprojecting data to a new grid, the internal tilings of each input - almost certainly won't line up anyway, so some misalignment is inevitable, and not - that big of a deal. - - **This is the one parameter we can't pick for you automatically**, because the STAC - specification offers no metadata about the internal tiling of the assets. We'd - have to open the data files to find out, which is very slow. But to make an educated - guess, you should look at ``rasterio.open(url).block_shapes`` for a few sample assets. - - The most important thing to avoid is making your ``chunksize`` here *smaller* than - the internal tilesize of the data. If you want small Dask chunks for other reasons, - don't set it here---instead, call ``.chunk`` on the DataArray to re-chunk it. + The chunksize to use for the Dask array. Default: 1024. Picking a good chunksize will + have significant effects on performance! + + Can be given in any format Dask understands for a ``chunks=`` argument, + such as ``1024``, ``(1024, 2048)``, ``(10, "auto", 512, 512)``, ``15 MB``, etc. + + If only 1 or 2 sizes are given, like ``2048`` or ``(512, 1024)``, this is used to chunk + just the spatial dimensions. The time and band dimensions will have a chunksize of 1, + meaning that every STAC Asset will be its own chunk. (This is the default.) + + If you'll be filtering items somewhat randomly (like ``stack[stack["eo:cloud_cover"] < 20]``), + you want the chunksize to be like ``(1, X, X, X)``. Otherwise, if you had a chunksize like + ``(3, 1, X, X)``, Dask would always load three items per chunk, even if two of them would be + immediately thrown away because they didn't match the cloud-cover filter. + + However, when your graph starts getting too large for Dask to handle, using a larger chunksize + for the time or band dimensions can help a lot. For example, ``chunksize=(10, 1, 512, 512)`` would + process in 512x512 pixel tiles, loading 10 assets at a time per tile. ``chunksize=(-1, 1, 512, 512)`` + would load *every* item within the 512x512 tile into the chunk. + If you'll be immediately compositing the data (like ``.mean("time")``), doing this is + often a good idea because you'll be flattening the assets together anyway. dtype: The NumPy data type of the output array. Default: ``float64``. Must be a data type that's compatible with ``fill_value``. 
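The semantics this docstring describes are implemented by the `normalize_chunks` helper that this patch adds to `stackstac.to_dask` (see the diff below). A quick sketch of how the two short forms expand, using a made-up array shape for illustration:

    import numpy as np
    from stackstac.to_dask import normalize_chunks

    shape = (3, 2, 100, 100)  # (time, band, y, x)
    dtype = np.dtype("float64")

    # A bare int chunks only the spatial dimensions; time and band stay at 1:
    normalize_chunks(1024, shape, dtype)
    # -> ((1, 1, 1), (1, 1), (100,), (100,))

    # A full 4-tuple chunks every dimension; -1 means one chunk spanning the whole axis:
    normalize_chunks((-1, 1, 50, 50), shape, dtype)
    # -> ((3,), (1, 1), (50, 50), (50, 50))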
diff --git a/stackstac/testing/strategies.py b/stackstac/testing/strategies.py index b7844d9..edb5603 100644 --- a/stackstac/testing/strategies.py +++ b/stackstac/testing/strategies.py @@ -3,6 +3,8 @@ from hypothesis import strategies as st import hypothesis.extra.numpy as st_np +from stackstac.to_dask import ChunksParam + @st.composite def simple_bboxes( @@ -11,6 +13,7 @@ def simple_bboxes( miny: int = -100, maxx: int = 100, maxy: int = 100, + *, zero_size: bool = True, ) -> tuple[int, int, int, int]: west = draw(st.integers(minx, maxx - 1)) @@ -26,3 +29,51 @@ def simple_bboxes( return (west, south, east, north) + + +def chunksizes( + ndim: int, + *, + max_side: int | None = None, + ints: bool = True, + auto: bool = True, + bytes: bool = True, + none: bool = True, + minus_one: bool = True, + tuples: bool = True, + dicts: bool = True, + singleton: bool = True, +) -> st.SearchStrategy[ChunksParam]: + "Generates arguments for ``chunks=`` for Dask." + + sizes = st.shared( + st.sampled_from(["8B", f"{max_side*8}B" if max_side else "100KiB"]), key="size" + ) + chunk_val_strategies = [] + if ints: + chunk_val_strategies.append(st.integers(1, max_side)) + if auto: + chunk_val_strategies.append(st.just("auto")) + if bytes: + chunk_val_strategies.append(sizes) + + toplevel_chunk_vals = st.one_of(chunk_val_strategies) + + if none: + chunk_val_strategies.append(st.none()) + if minus_one: + chunk_val_strategies.append(st.just(-1)) + + chunk_vals = st.one_of(chunk_val_strategies) + + final = [] + if singleton: + final.append(toplevel_chunk_vals) + if tuples: + final.append(st.tuples(*(chunk_vals,) * ndim)) + if dicts: + final.append( + st.dictionaries(st.integers(0, ndim - 1), chunk_vals, min_size=1, max_size=ndim) + ) + + return st.one_of(final) diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py index 09ca7bd..18efc07 100644 --- a/stackstac/tests/test_to_dask.py +++ b/stackstac/tests/test_to_dask.py @@ -11,7 +11,7 @@ from stackstac.raster_spec import Bbox, RasterSpec from stackstac.prepare import ASSET_TABLE_DT -from stackstac.to_dask import items_to_dask +from stackstac.to_dask import ChunksParam, items_to_dask, normalize_chunks from stackstac.testing import strategies as st_stc @@ -66,7 +66,16 @@ def asset_tables( asset_tables(max_side=5), st_stc.simple_bboxes(-4, -4, 4, 4, zero_size=False), st_stc.raster_dtypes, - st_np.array_shapes(min_dims=2, max_dims=2, max_side=10), + st_stc.chunksizes( + 4, + max_side=10, + auto=False, + bytes=False, + none=False, + minus_one=False, + dicts=False, + singleton=False, + ), ) @settings(max_examples=500, print_blob=True) def test_items_to_dask( @@ -74,7 +83,7 @@ def test_items_to_dask( asset_table: np.ndarray, bounds: Bbox, dtype_: np.dtype, - chunksize_yx: tuple[int, int], + chunksize: tuple[int, int, int, int], ): spec_ = RasterSpec(4326, bounds, (0.5, 0.5)) fill_value_ = data.draw(st_np.from_dtype(dtype_), label="fill_value") @@ -129,8 +138,8 @@ def __init__( np.testing.assert_equal(fill_value, fill_value_) def read(self, window: windows.Window) -> np.ndarray: - assert 0 < window.height <= chunksize_yx[0] - assert 0 < window.width <= chunksize_yx[1] + assert 0 < window.height <= chunksize[2] + assert 0 < window.width <= chunksize[3] # Read should be bypassed entirely if windows don't intersect assert windows.intersect(window, self.window) return self.full_data[window.toslices()] @@ -147,12 +156,17 @@ def __setstate__(self, state): arr = items_to_dask( asset_table, spec_, - chunksize_yx, + chunksize, 
dtype=dtype_, fill_value=fill_value_, reader=TestReader, ) - assert arr.chunksize == ( - 1, - 1, - min(chunksize_yx[0], spec_.shape[0]), - min(chunksize_yx[1], spec_.shape[1]), + assert arr.chunksize == tuple( + min(x, y) for x, y in zip(asset_table.shape + spec_.shape, chunksize) ) assert arr.dtype == dtype_ @@ -184,3 +190,19 @@ def window_from_bounds(bounds: Bbox, transform: Affine) -> windows.Window: ), ) return window + + +@given( + st_stc.chunksizes(4, max_side=1000), + st_np.array_shapes(min_dims=4, max_dims=4), + st_stc.raster_dtypes, +) +def test_normalize_chunks( + chunksize: ChunksParam, shape: tuple[int, int, int, int], dtype: np.dtype +): + chunks = normalize_chunks(chunksize, shape, dtype) + numblocks = tuple(map(len, chunks)) + assert len(numblocks) == 4 + assert all(x >= 1 for t in chunks for x in t) + if isinstance(chunksize, int) or (isinstance(chunksize, tuple) and len(chunksize) == 2): + assert numblocks[:2] == shape[:2] diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index 820857a..1022a57 100644 --- a/stackstac/to_dask.py +++ b/stackstac/to_dask.py @@ -1,7 +1,7 @@ from __future__ import annotations import itertools -from typing import ClassVar, Optional, Tuple, Type, Union +from typing import ClassVar, Literal, Optional, Tuple, Type, Union import warnings import dask @@ -16,11 +16,14 @@ from .rio_reader import AutoParallelRioReader, LayeredEnv from .reader_protocol import Reader +ChunkVal = Union[int, Literal["auto"], str, None] +ChunksParam = Union[ChunkVal, tuple[ChunkVal, ...], dict[int, ChunkVal]] + def items_to_dask( asset_table: np.ndarray, spec: RasterSpec, - chunksize: int | tuple[int, int], + chunksize: ChunksParam, resampling: Resampling = Resampling.nearest, dtype: np.dtype = np.dtype("float64"), fill_value: Union[int, float] = np.nan, @@ -38,6 +41,9 @@ def items_to_dask( f"Either use `dtype={np.array(fill_value).dtype.name!r}`, or pick a different `fill_value`." ) + chunks = normalize_chunks(chunksize, asset_table.shape + spec.shape, dtype) + chunks_tb, chunks_yx = chunks[:2], chunks[2:] + # The overall strategy in this function is to materialize the outer two dimensions (items, assets) # as one dask array (the "asset table"), then map a function over it which opens each URL as a `Reader` # instance (the "reader table"). @@ -49,7 +55,7 @@ # make URLs into dask array with 1-element chunks (one chunk per asset) asset_table_dask = da.from_array( asset_table, - chunks=1, + chunks=chunks_tb, inline_array=True, name="asset-table-" + dask.base.tokenize(asset_table), ) @@ -73,10 +79,6 @@ dtype=object, ) - shape_yx = spec.shape - chunks_yx = da.core.normalize_chunks(chunksize, shape_yx) - chunks = reader_table.chunks + chunks_yx - with warnings.catch_warnings(): warnings.simplefilter("ignore", category=da.core.PerformanceWarning) @@ -232,3 +234,26 @@ def __dask_distributed_unpack__(cls, state: list[Tuple[int, ...]]) -> Slices: self = cls.__new__(cls) self.starts = state return self + + +def normalize_chunks( + chunks: ChunksParam, shape: tuple[int, int, int, int], dtype: np.dtype +) -> tuple[tuple[int, ...], tuple[int, ...], tuple[int, ...], tuple[int, ...]]: + "Normalize chunks to tuple of tuples, assuming 1D and 2D chunks only apply to spatial coordinates" + # If only 1 or 2 chunks are given, assume they're for the y,x coordinates, + # and that the time,band coordinates should be chunksize 1. 
+ # TODO implement our own auto-chunking that makes the time,band coordinates + # >1 if the spatial chunking would create too many tasks? + if isinstance(chunks, int): + chunks = (1, 1, chunks, chunks) + elif isinstance(chunks, tuple) and len(chunks) == 2: + chunks = (1, 1) + chunks + + return da.core.normalize_chunks( + chunks, + shape, + dtype=dtype, + previous_chunks=((1,) * shape[0], (1,) * shape[1], (shape[2],), (shape[3],)), + # ^ Give dask some hint of the physical layout of the data, so it prefers widening + # the spatial chunks over bundling together items/assets. This isn't totally accurate. + ) From beee6aa23842fdf102086a02598a5e00fb065586 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 12 Jan 2022 02:27:32 -0700 Subject: [PATCH 14/21] comments --- stackstac/tests/test_to_dask.py | 1 + stackstac/to_dask.py | 52 ++++++++++++++++----------------- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py index 18efc07..3083f0b 100644 --- a/stackstac/tests/test_to_dask.py +++ b/stackstac/tests/test_to_dask.py @@ -167,6 +167,7 @@ def __setstate__(self, state): assert arr.dtype == dtype_ assert_eq(arr, results, equal_nan=True) + # TODO test broadcast-trick chunks def window_from_bounds(bounds: Bbox, transform: Affine) -> windows.Window: diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index 1022a57..3e7cc49 100644 --- a/stackstac/to_dask.py +++ b/stackstac/to_dask.py @@ -52,7 +52,7 @@ def items_to_dask( # Materializing the (items, assets) dimensions is unavoidable: every asset has a distinct URL, so that information # has to be included somehow. - # make URLs into dask array with 1-element chunks (one chunk per asset) + # make URLs into dask array, chunked as requested for the time,band dimensions asset_table_dask = da.from_array( asset_table, chunks=chunks_tb, @@ -204,10 +204,33 @@ def fetch_raster_window( return output +def normalize_chunks( + chunks: ChunksParam, shape: tuple[int, int, int, int], dtype: np.dtype +) -> tuple[tuple[int, ...], tuple[int, ...], tuple[int, ...], tuple[int, ...]]: + "Normalize chunks to tuple of tuples, assuming 1D and 2D chunks only apply to spatial coordinates" + # If only 1 or 2 chunks are given, assume they're for the y,x coordinates, + # and that the time,band coordinates should be chunksize 1. + # TODO implement our own auto-chunking that makes the time,band coordinates + # >1 if the spatial chunking would create too many tasks? + if isinstance(chunks, int): + chunks = (1, 1, chunks, chunks) + elif isinstance(chunks, tuple) and len(chunks) == 2: + chunks = (1, 1) + chunks + + return da.core.normalize_chunks( + chunks, + shape, + dtype=dtype, + previous_chunks=((1,) * shape[0], (1,) * shape[1], (shape[2],), (shape[3],)), + # ^ Give dask some hint of the physical layout of the data, so it prefers widening + # the spatial chunks over bundling together items/assets. This isn't totally accurate. + ) + + +# FIXME: Get this from Dask once https://github.com/dask/dask/pull/7417 is merged! +# The scheduler will refuse to import it without passlisting stackstac in `distributed.scheduler.allowed-imports`. class Slices(BlockwiseDep): "Produces the slice into the full-size array corresponding to the current chunk" - # TODO this needs to be in dask/dask, otherwise the scheduler will refuse to import it - # without passlisting stackstac in `distributed.scheduler.allowed-imports`. 
starts: list[tuple[int, ...]] produces_tasks: ClassVar[bool] = False @@ -234,26 +257,3 @@ def __dask_distributed_unpack__(cls, state: list[Tuple[int, ...]]) -> Slices: self = cls.__new__(cls) self.starts = state return self - - -def normalize_chunks( - chunks: ChunksParam, shape: tuple[int, int, int, int], dtype: np.dtype -) -> tuple[tuple[int, ...], tuple[int, ...], tuple[int, ...], tuple[int, ...]]: - "Normalize chunks to tuple of tuples, assuming 1D and 2D chunks only apply to spatial coordinates" - # If only 1 or 2 chunks are given, assume they're for the y,x coordinates, - # and that the time,band coordinates should be chunksize 1. - # TODO implement our own auto-chunking that makes the time,band coordinates - # >1 if the spatial chunking would create too many tasks? - if isinstance(chunks, int): - chunks = (1, 1, chunks, chunks) - elif isinstance(chunks, tuple) and len(chunks) == 2: - chunks = (1, 1) + chunks - - return da.core.normalize_chunks( - chunks, - shape, - dtype=dtype, - previous_chunks=((1,) * shape[0], (1,) * shape[1], (shape[2],), (shape[3],)), - # ^ Give dask some hint of the physical layout of the data, so it prefers widening - # the spatial chunks over bundling together items/assets. This isn't totally accurate. - ) From 8a40e3c7f0084e8694b61cb3f8206685ce489018 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 12 Jan 2022 02:39:27 -0700 Subject: [PATCH 15/21] Use `window_from_bounds` fix --- stackstac/tests/test_to_dask.py | 31 ++++++--------------------- stackstac/to_dask.py | 37 +++++++++++++++++++++++++++------ 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py index 3083f0b..56ce85c 100644 --- a/stackstac/tests/test_to_dask.py +++ b/stackstac/tests/test_to_dask.py @@ -2,7 +2,6 @@ from threading import Lock from typing import ClassVar -from affine import Affine from hypothesis import given, settings, strategies as st import hypothesis.extra.numpy as st_np import numpy as np @@ -11,7 +10,12 @@ from stackstac.raster_spec import Bbox, RasterSpec from stackstac.prepare import ASSET_TABLE_DT -from stackstac.to_dask import ChunksParam, items_to_dask, normalize_chunks +from stackstac.to_dask import ( + ChunksParam, + items_to_dask, + normalize_chunks, + window_from_bounds, +) from stackstac.testing import strategies as st_stc @@ -170,29 +174,6 @@ def __setstate__(self, state): # TODO test broadcast-trick chunks -def window_from_bounds(bounds: Bbox, transform: Affine) -> windows.Window: - "Get the window corresponding to the bounding coordinates (correcting for rasterio bugs)" - window = windows.from_bounds( - *bounds, - transform=transform, - precision=0.0 - # ^ https://github.com/rasterio/rasterio/issues/2374 - ) - - # Trim negative `row_off`/`col_off` to work around https://github.com/rasterio/rasterio/issues/2378 - window = windows.Window( - max(window.col_off, 0), # type: ignore "Expected 0 positional arguments" - max(window.row_off, 0), - (max(window.col_off + window.width, 0) if window.col_off < 0 else window.width), - ( - max(window.row_off + window.height, 0) - if window.row_off < 0 - else window.height - ), - ) - return window - - @given( st_stc.chunksizes(4, max_side=1000), st_np.array_shapes(min_dims=4, max_dims=4), diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index 3e7cc49..f548664 100644 --- a/stackstac/to_dask.py +++ b/stackstac/to_dask.py @@ -4,6 +4,7 @@ from typing import ClassVar, Literal, Optional, Tuple, Type, Union import warnings +from affine import Affine 
import dask import dask.array as da from dask.blockwise import BlockwiseDep, blockwise @@ -130,12 +131,7 @@ def asset_table_to_reader_and_window( url: str | None = asset_entry["url"] if url: asset_bounds: Bbox = asset_entry["bounds"] - asset_window = windows.from_bounds( - *asset_bounds, - transform=spec.transform, - precision=0.0 - # ^ https://github.com/rasterio/rasterio/issues/2374 - ) + asset_window = window_from_bounds(asset_bounds, spec.transform) entry: ReaderTableEntry = ( reader( @@ -227,6 +223,35 @@ def normalize_chunks( ) +# FIXME remove this once rasterio bugs are fixed +def window_from_bounds(bounds: Bbox, transform: Affine) -> windows.Window: + "Get the window corresponding to the bounding coordinates (correcting for rasterio bugs)" + window = windows.from_bounds( + *bounds, + transform=transform, + precision=0.0 + # ^ https://github.com/rasterio/rasterio/issues/2374 + ) + + # Trim negative `row_off`/`col_off` to work around https://github.com/rasterio/rasterio/issues/2378 + # Note this does actually alter the window: it clips off anything that was out-of-bounds to the + # west/north of the `transform`'s origin. So the size and origin of the window is no longer accurate. + # This is okay for our purposes, since we only use these windows for intersection-testing to see if + # an asset intersects our current chunk. We don't care about the parts of the asset that fall + # outside our AOI. + window = windows.Window( + max(window.col_off, 0), # type: ignore "Expected 0 positional arguments" + max(window.row_off, 0), + (max(window.col_off + window.width, 0) if window.col_off < 0 else window.width), + ( + max(window.row_off + window.height, 0) + if window.row_off < 0 + else window.height + ), + ) + return window + + # FIXME: Get this from Dask once https://github.com/dask/dask/pull/7417 is merged! # The scheduler will refuse to import it without passlisting stackstac in `distributed.scheduler.allowed-imports`. class Slices(BlockwiseDep): From 335c36a253fb283ff1ecdedd2c37f369d5bbad7e Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 12 Jan 2022 10:48:25 -0700 Subject: [PATCH 16/21] dtype and fill value in token --- stackstac/to_dask.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index f548664..12f074d 100644 --- a/stackstac/to_dask.py +++ b/stackstac/to_dask.py @@ -83,7 +83,7 @@ def items_to_dask( with warnings.catch_warnings(): warnings.simplefilter("ignore", category=da.core.PerformanceWarning) - name = f"fetch_raster_window-{dask.base.tokenize(reader_table, chunks)}" + name = f"fetch_raster_window-{dask.base.tokenize(reader_table, chunks, dtype, fill_value)}" # TODO use `da.blockwise` once it supports `BlockwiseDep`s as arguments lyr = blockwise( fetch_raster_window, From 34ca1efefe6d484314e387a4d56cc219c8de5f1c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 12 Jan 2022 10:53:53 -0700 Subject: [PATCH 17/21] fix comments --- stackstac/to_dask.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index 12f074d..0fa63fd 100644 --- a/stackstac/to_dask.py +++ b/stackstac/to_dask.py @@ -160,10 +160,11 @@ def fetch_raster_window( assert len(slices) == 2, slices current_window = windows.Window.from_slices(*slices) - assert reader_table.size, "Empty reader_table" + assert reader_table.size, f"Empty reader_table: {reader_table.shape=}" # Start with an empty output array, using the broadcast trick for even fewer memz. 
# If none of the assets end up actually existing, or overlapping the current window, - # we'll just return this 1-element array that's been broadcast to look like a full-size array. + # or containing data, we'll just return this 1-element array that's been broadcast + # to look like a full-size array. output = np.broadcast_to( np.array(fill_value, dtype), reader_table.shape + (current_window.height, current_window.width), @@ -184,7 +185,7 @@ def fetch_raster_window( data = reader.read(current_window) if all_empty: - # Turn `output` from a broadcast-trick array a real array so it's writeable + # Turn `output` from a broadcast-trick array to a real array, so it's writeable if ( np.isnan(data) if np.isnan(fill_value) @@ -203,9 +204,12 @@ def fetch_raster_window( def normalize_chunks( chunks: ChunksParam, shape: tuple[int, int, int, int], dtype: np.dtype ) -> tuple[tuple[int, ...], tuple[int, ...], tuple[int, ...], tuple[int, ...]]: - "Normalize chunks to tuple of tuples, assuming 1D and 2D chunks only apply to spatial coordinates" - # If only 1 or 2 chunks are given, assume they're for the y,x coordinates, - # and that the time,band coordinates should be chunksize 1. + """ + Normalize chunks to tuple of tuples, assuming 1D and 2D chunks only apply to spatial coordinates + + If only 1 or 2 chunks are given, assume they're for the ``y, x`` coordinates, + and that the ``time, band`` coordinates should be chunksize 1. + """ # TODO implement our own auto-chunking that makes the time,band coordinates # >1 if the spatial chunking would create too many tasks? if isinstance(chunks, int): From c4f759a3dae05988dbc2c1a0ba310d334f957343 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 14 Jan 2022 10:09:59 -0700 Subject: [PATCH 18/21] Can't use standard collections as types in 3.8 [PEP 585](https://www.python.org/dev/peps/pep-0585/) is only available in 3.9. 
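Concretely: with `from __future__ import annotations`, annotations stay unevaluated strings, but type *aliases* are ordinary assignments that run at import time. A hypothetical snippet showing what breaks on 3.8 (not part of this diff; the alias is a simplified stand-in for `ChunksParam`):

    from __future__ import annotations
    from typing import Dict, Tuple, Union

    # Fine on 3.8: the __future__ import keeps this annotation a string,
    # so the builtin generics in the signature are never evaluated.
    def nblocks(chunks: tuple[tuple[int, ...], ...]) -> tuple[int, ...]:
        return tuple(len(c) for c in chunks)

    # Broken on 3.8: evaluated at import time, raising
    # TypeError: 'type' object is not subscriptable
    # ChunksParam = Union[int, tuple[int, ...], dict[int, int]]

    # The 3.8-compatible spelling:
    ChunksParam = Union[int, Tuple[int, ...], Dict[int, int]]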
We're still supporting 3.8 for now; planetary computer uses it, as do probably many others --- stackstac/to_dask.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index 0fa63fd..87fc4dc 100644 --- a/stackstac/to_dask.py +++ b/stackstac/to_dask.py @@ -1,7 +1,7 @@ from __future__ import annotations import itertools -from typing import ClassVar, Literal, Optional, Tuple, Type, Union +from typing import ClassVar, Dict, Literal, Optional, Tuple, Type, Union import warnings from affine import Affine @@ -18,7 +18,7 @@ from .reader_protocol import Reader ChunkVal = Union[int, Literal["auto"], str, None] -ChunksParam = Union[ChunkVal, tuple[ChunkVal, ...], dict[int, ChunkVal]] +ChunksParam = Union[ChunkVal, Tuple[ChunkVal, ...], Dict[int, ChunkVal]] def items_to_dask( @@ -105,7 +105,7 @@ def items_to_dask( return rasters -ReaderTableEntry = Optional[tuple[Reader, windows.Window]] +ReaderTableEntry = Optional[Tuple[Reader, windows.Window]] def asset_table_to_reader_and_window( @@ -202,8 +202,8 @@ def fetch_raster_window( def normalize_chunks( - chunks: ChunksParam, shape: tuple[int, int, int, int], dtype: np.dtype -) -> tuple[tuple[int, ...], tuple[int, ...], tuple[int, ...], tuple[int, ...]]: + chunks: ChunksParam, shape: Tuple[int, int, int, int], dtype: np.dtype +) -> Tuple[Tuple[int, ...], Tuple[int, ...], Tuple[int, ...], Tuple[int, ...]]: """ Normalize chunks to tuple of tuples, assuming 1D and 2D chunks only apply to spatial coordinates @@ -261,7 +261,7 @@ def window_from_bounds(bounds: Bbox, transform: Affine) -> windows.Window: class Slices(BlockwiseDep): "Produces the slice into the full-size array corresponding to the current chunk" - starts: list[tuple[int, ...]] + starts: list[Tuple[int, ...]] produces_tasks: ClassVar[bool] = False def __init__(self, chunks: Tuple[Tuple[int, ...], ...]): From 883c8faf91c65bba5f4badf770d7e0b7aa4e0d17 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 31 Jan 2022 16:54:26 -0700 Subject: [PATCH 19/21] Use `ArraySliceDep` from dask --- poetry.lock | 2 +- pyproject.toml | 4 ++-- stackstac/to_dask.py | 42 +++++------------------------------------- 3 files changed, 8 insertions(+), 40 deletions(-) diff --git a/poetry.lock b/poetry.lock index d064f91..962de9d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2637,7 +2637,7 @@ viz = ["aiohttp", "cachetools", "distributed", "ipyleaflet", "jupyter-server-pro [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "10c0d8d7933453a6e975522defe70299bfb7150f0234b5483e6f6ac1cf5f6203" +content-hash = "280d124e3d767339e5a21bb1dc058607c13e9976577283a76ccd587e7c6f192c" [metadata.files] affine = [ diff --git a/pyproject.toml b/pyproject.toml index 2052ed2..fa337ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,8 +15,8 @@ Sphinx = {version = "^3.5.4", optional = true} aiohttp = {version = "^3.7.4", optional = true} cachetools = {version = "^4.2.2", optional = true} coiled = {version = "^0", optional = true} -dask = {extras = ["array"], version = ">= 2021.4.1, < 2023"} -distributed = {version = ">= 2021.4.1, < 2023", optional = true} +dask = {extras = ["array"], version = "^2022.1.1"} +distributed = {version = "^2022.1.1", optional = true} furo = {version = "^2021.4.11-beta.34", optional = true} geogif = {version = "^0", optional = true} ipyleaflet = {version = "^0.13.6", optional = true} diff --git a/stackstac/to_dask.py b/stackstac/to_dask.py index 87fc4dc..513a1c0 100644 --- a/stackstac/to_dask.py +++ 
b/stackstac/to_dask.py @@ -1,14 +1,14 @@ from __future__ import annotations -import itertools -from typing import ClassVar, Dict, Literal, Optional, Tuple, Type, Union +from typing import Dict, Literal, Optional, Tuple, Type, Union import warnings from affine import Affine import dask import dask.array as da -from dask.blockwise import BlockwiseDep, blockwise +from dask.blockwise import blockwise from dask.highlevelgraph import HighLevelGraph +from dask.layers import ArraySliceDep import numpy as np from rasterio import windows from rasterio.enums import Resampling @@ -48,7 +48,7 @@ def items_to_dask( # The overall strategy in this function is to materialize the outer two dimensions (items, assets) # as one dask array (the "asset table"), then map a function over it which opens each URL as a `Reader` # instance (the "reader table"). - # Then, we use the `Slices` `BlockwiseDep` to represent the inner inner two dimensions (y, x), and + # Then, we use the `ArraySliceDep` `BlockwiseDep` to represent the inner two dimensions (y, x), and # `Blockwise` to create the cartesian product between them, avoiding materializing that entire graph. @@ -91,7 +91,7 @@ def items_to_dask( "tbyx", reader_table.name, "tb", - Slices(chunks_yx), + ArraySliceDep(chunks_yx), "yx", dtype, None, @@ -254,35 +254,3 @@ def window_from_bounds(bounds: Bbox, transform: Affine) -> windows.Window: ), ) return window - - -# FIXME: Get this from Dask once https://github.com/dask/dask/pull/7417 is merged! -# The scheduler will refuse to import it without passlisting stackstac in `distributed.scheduler.allowed-imports`. -class Slices(BlockwiseDep): - "Produces the slice into the full-size array corresponding to the current chunk" - - starts: list[Tuple[int, ...]] - produces_tasks: ClassVar[bool] = False - - def __init__(self, chunks: Tuple[Tuple[int, ...], ...]): - self.starts = [tuple(itertools.accumulate(c, initial=0)) for c in chunks] - - def __getitem__(self, idx: Tuple[int, ...]) -> Tuple[slice, ...]: - return tuple( - slice(start[i], start[i + 1]) for i, start in zip(idx, self.starts) - ) - - @property - def numblocks(self) -> list[int]: - return [len(s) - 1 for s in self.starts] - - def __dask_distributed_pack__( - self, required_indices: Optional[list[Tuple[int, ...]]] = None - ) -> list[Tuple[int, ...]]: - return self.starts - - @classmethod - def __dask_distributed_unpack__(cls, state: list[Tuple[int, ...]]) -> Slices: - self = cls.__new__(cls) - self.starts = state - return self From 1b03d4b4f59f806d180aca525fca2f63de99e4f4 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 31 Jan 2022 17:14:12 -0700 Subject: [PATCH 20/21] Test broadcast-trick logic --- stackstac/tests/test_to_dask.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/stackstac/tests/test_to_dask.py b/stackstac/tests/test_to_dask.py index 56ce85c..1a85bfb 100644 --- a/stackstac/tests/test_to_dask.py +++ b/stackstac/tests/test_to_dask.py @@ -6,6 +6,8 @@ import hypothesis.extra.numpy as st_np import numpy as np from rasterio import windows +import dask.core +import dask.threaded from dask.array.utils import assert_eq @@ -171,7 +173,17 @@ def __setstate__(self, state): assert arr.dtype == dtype_ assert_eq(arr, results, equal_nan=True) - # TODO test broadcast-trick chunks + + # Check that entirely-empty chunks are 
broadcast-tricked into being tiny. + # NOTE: unfortunately, this computes the array again, which slows down tests. + # But passing a computed array into `assert_eq` would skip handy checks for chunks, meta, etc. + TestReader.opened.clear() + chunks = dask.threaded.get(arr.dask, list(dask.core.flatten(arr.__dask_keys__()))) + for chunk in chunks: + if ( + np.isnan(chunk) if np.isnan(fill_value_) else np.equal(chunk, fill_value_) + ).all(): + assert chunk.strides == (0, 0, 0, 0) @given( From 04665618cd1c1e054bf449828a1ab01fe7a57e5a Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 31 Jan 2022 17:16:24 -0700 Subject: [PATCH 21/21] update docstring --- stackstac/stack.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stackstac/stack.py b/stackstac/stack.py index ccc1815..6ab51e0 100644 --- a/stackstac/stack.py +++ b/stackstac/stack.py @@ -168,7 +168,7 @@ def stack( such as ``1024``, ``(1024, 2048)``, ``(10, "auto", 512, 512)``, ``15 MB``, etc. If only 1 or 2 sizes are given, like ``2048`` or ``(512, 1024)``, this is used to chunk - just the spatial dimensions. The time and band dimensions will have a chunksize of 1, + just the spatial dimensions (last two). The time and band dimensions will have a chunksize of 1, meaning that every STAC Asset will be its own chunk. (This is the default.) If you'll be filtering items somewhat randomly (like ``stack[stack["eo:cloud_cover"] < 20]``), @@ -180,7 +180,7 @@ def stack( for the time or band dimensions can help a lot. For example, ``chunksize=(10, 1, 512, 512)`` would process in 512x512 pixel tiles, loading 10 assets at a time per tile. ``chunksize=(-1, 1, 512, 512)`` would load *every* item within the 512x512 tile into the chunk. - If you'll be immediately compositing the data (like ``.mean("time")``), doing this is + If you'll be immediately compositing the data (like ``.median("time")``), doing this is often a good idea because you'll be flattening the assets together anyway. dtype: The NumPy data type of the output array. Default: ``float64``. Must be a data type