From 1988314b5e461b3ee078fdb54a03be3a9e823fe1 Mon Sep 17 00:00:00 2001 From: Peter Killick Date: Tue, 25 Apr 2017 11:42:17 +0100 Subject: [PATCH 1/3] Dask opts controlled by simple function --- lib/iris/_lazy_data.py | 23 ++++++ .../unit/lazy_data/test_iris_dask_defaults.py | 73 +++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 lib/iris/tests/unit/lazy_data/test_iris_dask_defaults.py diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 7d525477d6..fd9f8d2e16 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -23,11 +23,34 @@ from __future__ import (absolute_import, division, print_function) from six.moves import (filter, input, map, range, zip) # noqa +import dask import dask.array as da +import dask.context import numpy as np import numpy.ma as ma +def _iris_dask_defaults(): + """ + Set dask defaults for Iris. The current default dask operation mode for + Iris is running single-threaded using `dask.async.get_sync`. This default + ensures that running Iris under "normal" conditions will not use up all + available computational resource. + + .. note:: + We only want Iris to set dask options in the case where doing so will + not change user-specified options that have already been set. + + """ + if 'pool' not in dask.context._globals and \ + 'get' not in dask.context._globals: + dask.set_options(get=dask.async.get_sync) + + +# Run this at import time to set dask options for Iris. +_iris_dask_defaults() + + def is_lazy_data(data): """ Return whether the argument is an Iris 'lazy' data array. diff --git a/lib/iris/tests/unit/lazy_data/test_iris_dask_defaults.py b/lib/iris/tests/unit/lazy_data/test_iris_dask_defaults.py new file mode 100644 index 0000000000..4dd80f944b --- /dev/null +++ b/lib/iris/tests/unit/lazy_data/test_iris_dask_defaults.py @@ -0,0 +1,73 @@ +# (C) British Crown Copyright 2017, Met Office +# +# This file is part of Iris. 
+# +# Iris is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Iris is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Iris. If not, see <http://www.gnu.org/licenses/>. +""" +Test :func:`iris._lazy_data._iris_dask_defaults` function. + +""" + +from __future__ import (absolute_import, division, print_function) +from six.moves import (filter, input, map, range, zip) # noqa + +# Import iris.tests first so that some things can be initialised before +# importing anything else. +import iris.tests as tests + +import dask.context +from iris._lazy_data import _iris_dask_defaults + + +class Test__iris_dask_defaults(tests.IrisTest): + def setUp(self): + self.context = 'dask.context' + self._globals = 'iris._lazy_data._globals' + set_options = 'dask.set_options' + self.patch_set_options = self.patch(set_options) + get_sync = 'dask.async.get_sync' + self.patch_get_sync = self.patch(get_sync) + + def test_no_user_options(self): + test_dict = {} + with self.patch(self.context, _globals=test_dict): + _iris_dask_defaults() + self.assertEqual(dask.context._globals, test_dict) + self.patch_set_options.assert_called_once_with(get=self.patch_get_sync) + + def test_user_options__pool(self): + test_dict = {'pool': 5} + with self.patch(self.context, _globals=test_dict): + _iris_dask_defaults() + self.assertEqual(dask.context._globals, test_dict) + self.assertEqual(self.patch_set_options.call_count, 0) + + def test_user_options__get(self): + test_dict = {'get': 'threaded'} + with self.patch(self.context, _globals=test_dict): + _iris_dask_defaults() +
self.assertEqual(dask.context._globals, test_dict) + self.assertEqual(self.patch_set_options.call_count, 0) + + def test_user_options__wibble(self): + # Test a user-specified dask option that does not affect Iris. + test_dict = {'wibble': 'foo'} + with self.patch(self.context, _globals=test_dict): + _iris_dask_defaults() + self.assertEqual(dask.context._globals, test_dict) + self.patch_set_options.assert_called_once_with(get=self.patch_get_sync) + + +if __name__ == '__main__': + tests.main() From 60a9c9730774f4085d58ffc44797dfda9ce71992 Mon Sep 17 00:00:00 2001 From: Peter Killick Date: Tue, 25 Apr 2017 16:45:30 +0100 Subject: [PATCH 2/3] Review actions: simpler tests --- .../unit/lazy_data/test_iris_dask_defaults.py | 26 ++++++------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/lib/iris/tests/unit/lazy_data/test_iris_dask_defaults.py b/lib/iris/tests/unit/lazy_data/test_iris_dask_defaults.py index 4dd80f944b..4f25a40c82 100644 --- a/lib/iris/tests/unit/lazy_data/test_iris_dask_defaults.py +++ b/lib/iris/tests/unit/lazy_data/test_iris_dask_defaults.py @@ -32,40 +32,30 @@ class Test__iris_dask_defaults(tests.IrisTest): def setUp(self): - self.context = 'dask.context' - self._globals = 'iris._lazy_data._globals' set_options = 'dask.set_options' self.patch_set_options = self.patch(set_options) get_sync = 'dask.async.get_sync' self.patch_get_sync = self.patch(get_sync) def test_no_user_options(self): - test_dict = {} - with self.patch(self.context, _globals=test_dict): - _iris_dask_defaults() - self.assertEqual(dask.context._globals, test_dict) + self.patch('dask.context._globals', {}) + _iris_dask_defaults() self.patch_set_options.assert_called_once_with(get=self.patch_get_sync) def test_user_options__pool(self): - test_dict = {'pool': 5} - with self.patch(self.context, _globals=test_dict): - _iris_dask_defaults() - self.assertEqual(dask.context._globals, test_dict) + self.patch('dask.context._globals', {'pool': 5}) + _iris_dask_defaults() 
self.assertEqual(self.patch_set_options.call_count, 0) def test_user_options__get(self): - test_dict = {'get': 'threaded'} - with self.patch(self.context, _globals=test_dict): - _iris_dask_defaults() - self.assertEqual(dask.context._globals, test_dict) + self.patch('dask.context._globals', {'get': 'threaded'}) + _iris_dask_defaults() self.assertEqual(self.patch_set_options.call_count, 0) def test_user_options__wibble(self): # Test a user-specified dask option that does not affect Iris. - test_dict = {'wibble': 'foo'} - with self.patch(self.context, _globals=test_dict): - _iris_dask_defaults() - self.assertEqual(dask.context._globals, test_dict) + self.patch('dask.context._globals', {'wibble': 'foo'}) + _iris_dask_defaults() self.patch_set_options.assert_called_once_with(get=self.patch_get_sync) From 20494b0df1151fd2b5b672c5c18dfdff42e43504 Mon Sep 17 00:00:00 2001 From: Peter Killick Date: Tue, 25 Apr 2017 16:48:40 +0100 Subject: [PATCH 3/3] Review actions: update docstring --- lib/iris/_lazy_data.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index fd9f8d2e16..da13ae1b9b 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -37,6 +37,9 @@ def _iris_dask_defaults(): ensures that running Iris under "normal" conditions will not use up all available computational resource. + Otherwise, by default, `dask` will use a multi-threaded scheduler that uses + all available CPUs. + .. note:: We only want Iris to set dask options in the case where doing so will not change user-specified options that have already been set.