From 5fb6db0f6b8fa82bef0c1752798ccbb8312bfe73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Tue, 21 Jan 2025 09:18:06 +0100 Subject: [PATCH 1/4] Use zarr-fixture to prevent thread leakage errors --- xarray/tests/test_distributed.py | 64 +++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 18 deletions(-) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 9d68d1899d8..c6f9392ec46 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -21,7 +21,9 @@ from distributed.client import futures_of from distributed.utils_test import ( # noqa: F401 cleanup, + client, cluster, + cluster_fixture, gen_cluster, loop, loop_in_thread, @@ -46,6 +48,7 @@ from xarray.tests.test_dataset import create_test_data loop = loop # loop is an imported fixture, which flake8 has issues ack-ing +client = client # loop is an imported fixture, which flake8 has issues ack-ing @pytest.fixture @@ -214,11 +217,40 @@ def test_dask_distributed_read_netcdf_integration_test( assert_allclose(original, computed) +# fixture vendored from dask +# https://github.com/dask/dask/blob/e04734b4d8959ba259801f2e2a490cb4ee8d891f/dask/tests/test_distributed.py#L338-L358 +@pytest.fixture(scope="function") +def zarr(client): + zarr_lib = pytest.importorskip("zarr") + # Zarr-Python 3 lazily allocates a dedicated thread/IO loop + # for to execute async tasks. To avoid having this thread + # be picked up as a "leaked thread", we manually trigger it's + # creation before using zarr + try: + _ = zarr_lib.core.sync._get_loop() + _ = zarr_lib.core.sync._get_executor() + yield zarr_lib + except AttributeError: + yield zarr_lib + finally: + # Zarr-Python 3 lazily allocates a IO thread, a thread pool executor, and + # an IO loop. Here we clean up these resources to avoid leaking threads + # In normal operations, this is done as by an atexit handler when Zarr + # is shutting down. + try: + zarr_lib.core.sync.cleanup_resources() + except AttributeError: + pass + + @requires_zarr @pytest.mark.parametrize("consolidated", [True, False]) @pytest.mark.parametrize("compute", [True, False]) def test_dask_distributed_zarr_integration_test( - loop, consolidated: bool, compute: bool + client, + zarr, + consolidated: bool, + compute: bool, ) -> None: if consolidated: write_kwargs: dict[str, Any] = {"consolidated": True} @@ -226,23 +258,19 @@ def test_dask_distributed_zarr_integration_test( else: write_kwargs = read_kwargs = {} chunks = {"dim1": 4, "dim2": 3, "dim3": 5} - with cluster() as (s, [a, b]): - with Client(s["address"], loop=loop): - original = create_test_data().chunk(chunks) - with create_tmp_file( - allow_cleanup_failure=ON_WINDOWS, suffix=".zarrc" - ) as filename: - maybe_futures = original.to_zarr( # type: ignore[call-overload] #mypy bug? - filename, compute=compute, **write_kwargs - ) - if not compute: - maybe_futures.compute() - with xr.open_dataset( - filename, chunks="auto", engine="zarr", **read_kwargs - ) as restored: - assert isinstance(restored.var1.data, da.Array) - computed = restored.compute() - assert_allclose(original, computed) + original = create_test_data().chunk(chunks) + with create_tmp_file(allow_cleanup_failure=ON_WINDOWS, suffix=".zarrc") as filename: + maybe_futures = original.to_zarr( # type: ignore[call-overload] #mypy bug? + filename, compute=compute, **write_kwargs + ) + if not compute: + maybe_futures.compute() + with xr.open_dataset( + filename, chunks="auto", engine="zarr", **read_kwargs + ) as restored: + assert isinstance(restored.var1.data, da.Array) + computed = restored.compute() + assert_allclose(original, computed) @gen_cluster(client=True) From 55590438352185ecb310ba738295577b89bed24c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Wed, 22 Jan 2025 07:26:27 +0100 Subject: [PATCH 2/4] Apply suggestions from code review Co-authored-by: Joe Hamman --- xarray/tests/test_distributed.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index c6f9392ec46..e4fdf08d0b4 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -48,7 +48,7 @@ from xarray.tests.test_dataset import create_test_data loop = loop # loop is an imported fixture, which flake8 has issues ack-ing -client = client # loop is an imported fixture, which flake8 has issues ack-ing +client = client # client is an imported fixture, which flake8 has issues ack-ing @pytest.fixture @@ -218,6 +218,7 @@ def test_dask_distributed_read_netcdf_integration_test( # fixture vendored from dask +# heads-up, this is using quite private zarr API # https://github.com/dask/dask/blob/e04734b4d8959ba259801f2e2a490cb4ee8d891f/dask/tests/test_distributed.py#L338-L358 @pytest.fixture(scope="function") def zarr(client): From 739310f0a25ed4937f33a082d5a092448df3005c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Wed, 22 Jan 2025 07:32:39 +0100 Subject: [PATCH 3/4] Add whats-new.rst entry --- doc/whats-new.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index fe698bc358b..de90b77bfa8 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -67,7 +67,8 @@ Bug fixes ~~~~~~~~~ - Fix issues related to Pandas v3 ("us" vs. "ns" for python datetime, copy on write) and handling of 0d-numpy arrays in datetime/timedelta decoding (:pull:`9953`). By `Kai Mühlbauer `_. - +- Use zarr-fixture to prevent thread leakage errors (:pull:`9967`). + By `Kai Mühlbauer `_. Documentation ~~~~~~~~~~~~~ From bc79046fffa023f594e51c47e7f18b4450b6185c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Wed, 22 Jan 2025 08:30:52 +0100 Subject: [PATCH 4/4] Explicitely add pyarrow to windows builds, as importing dask.dataframe (dask>=2025.1.0) raises ImportError when missing. --- ci/requirements/environment-windows-3.13.yml | 1 + ci/requirements/environment-windows.yml | 1 + 2 files changed, 2 insertions(+) diff --git a/ci/requirements/environment-windows-3.13.yml b/ci/requirements/environment-windows-3.13.yml index 448e3f70c0c..14a843a88be 100644 --- a/ci/requirements/environment-windows-3.13.yml +++ b/ci/requirements/environment-windows-3.13.yml @@ -29,6 +29,7 @@ dependencies: # - pint>=0.22 - pip - pre-commit + - pyarrow # importing dask.dataframe raises an ImportError without this - pydap - pytest - pytest-cov diff --git a/ci/requirements/environment-windows.yml b/ci/requirements/environment-windows.yml index 3b2e6dc62e6..5052b0fdc6c 100644 --- a/ci/requirements/environment-windows.yml +++ b/ci/requirements/environment-windows.yml @@ -29,6 +29,7 @@ dependencies: # - pint>=0.22 - pip - pre-commit + - pyarrow # importing dask.dataframe raises an ImportError without this - pydap - pytest - pytest-cov