
Conversation

@DPeterK (Member) commented Mar 28, 2017

Introduces a new module, `iris.options`. Within this module we can provide classes that control certain elements of Iris' behaviour within a limited scope, such as the lifetime of the session or within a context manager.

Specifically, this PR introduces the Parallel class, used to control dask parallel processing options for Iris. The Parallel class allows for control of:

  • the dask scheduler used to process dask graphs when compute is called (defaults to threaded), and
  • the number of workers used to process graphs in parallel (defaults to 1, only available for the threaded and multiprocessing scheduler options).
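The dual usage described above (session-wide settings plus a scoped context manager) can be sketched as a small self-contained class. All names here (`Parallel`, `context`, the option keys) are illustrative of the proposal, not the actual Iris API:

```python
import contextlib


class Parallel:
    """Options holder usable directly or as a context manager (sketch)."""

    _defaults = {'scheduler': 'threaded', 'num_workers': 1}

    def __init__(self):
        self._settings = dict(self._defaults)

    def __call__(self, **kwargs):
        # Set options for the remainder of the session.
        for name, value in kwargs.items():
            if name not in self._defaults:
                raise ValueError('Unknown option: {!r}'.format(name))
            self._settings[name] = value
        return self

    def get(self, name):
        return self._settings[name]

    @contextlib.contextmanager
    def context(self, **kwargs):
        # Apply options only within the `with` block, then restore.
        saved = dict(self._settings)
        try:
            self(**kwargs)
            yield
        finally:
            self._settings = saved


parallel = Parallel()
parallel(scheduler='multiprocessing', num_workers=6)  # session-wide
with parallel.context(num_workers=4):
    pass  # previous settings restored on exit
```

A real implementation would additionally translate these settings into dask scheduler configuration; this sketch only shows the scoping behaviour.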

Note: no tests yet (I'll do them next); I was keen to get eyes on the functional code I've proposed here.

The pattern proposed here also has an obvious application to the proposal in #2457.

Addresses #2403.

@DPeterK (Member Author) commented Mar 28, 2017

Ping @bjlittle @lbdreyer -- I'd appreciate your thoughts on this proposal.

@lbdreyer lbdreyer added the dask label Mar 29, 2017
@lbdreyer lbdreyer added this to the dask milestone Mar 29, 2017
from iris._deprecation import IrisDeprecation, warn_deprecated
import iris.fileformats
import iris.io
import iris.options
Member:

iris.options is not being used in this module, so by importing it here, you are catering to someone being able to do:

import iris
iris.options.parallel(scheduler='multiprocessing', num_workers=6)

I don't think this is a good thing. I would prefer users to have to import `iris.options` directly.

Member Author:

I disagree; I think this is a good thing. We want to make options as easy to use as possible. It makes complete sense that the controlling options for Iris come as part of the primary import.

And it's not just me who thinks so -- dask does this, as does iris.FUTURE, by dint of being defined in the Iris __init__ already. Adding options to the __init__ was actually a design consideration I made, which I only dropped because of the potential size of the options module.

Member:

@dkillick It's a moot point at the moment since iris.options doesn't actually exist 😜

using multiprocessing with six worker processes::

    >>> iris.options.parallel(scheduler='multiprocessing', num_workers=6)
    >>> iris.load('my_dataset.nc')
Member:

To be consistent with the other code examples, you should remove the Python interpreter prompts, i.e. the `>>>` and the `...`

Member Author:

👍 And indent by four spaces? I didn't look very hard into how this is done elsewhere...

Member Author:

To be consistent with the other code examples

Actually, can you provide an example of this please? All the examples I just found in cube.py use the chevrons...

@lbdreyer (Member) commented Mar 29, 2017:

I may be wrong. I didn't realise it would be so difficult for you to find examples

Member Author:

There's no need to get snippy. All I was after was the example you found, as they were different to the examples I found...

@DPeterK DPeterK mentioned this pull request Mar 29, 2017
@pp-mo (Member) commented Apr 3, 2017

Discussed between @dkillick and @pp-mo in an offline conversation...
Probably this can all go in the existing iris.config instead of making a new module.
As at #2467 (comment)

@DPeterK DPeterK changed the title WIP: Iris options (dask parallel processing) Iris options (dask parallel processing) Apr 11, 2017
@DPeterK (Member Author) commented Apr 12, 2017

Mocky tests added with some much appreciated help from @bjlittle! Think this is ready to go now, so long as Travis is happy...

.. note::
    Specific dask options for computation are controlled by
    :class:`iris.options.Parallel`.
Member:

@dkillick This is stale, should be iris.config.Parallel or should it be iris.config.parallel ?

Member Author:

@bjlittle it should be :class:`iris.config.Parallel` as Parallel is (a) actually a class that can be linked to by Sphinx, and (b) has a docstring. I'll change this, and all of the similar below...

* Specify that we want to load a cube with dask parallel processing
  using multiprocessing with six worker processes::

      iris.options.parallel(scheduler='multiprocessing', num_workers=6)
Member:

@dkillick Nope ...

* Specify, with a context manager, that we want to load a cube with
  dask parallel processing using four worker threads::

      with iris.options.parallel(scheduler='threaded', num_workers=4):
Member:

@dkillick iris.config.parallel

* Run dask parallel processing using a distributed scheduler that has
  been set up at the IP address and port at ``192.168.0.219:8786``::

      iris.options.parallel(scheduler='192.168.0.219:8786')
Member:

@dkillick Again ...

Member Author:

I thought I'd fixed all of them 😒

# NOTE: tests that call the option class directly and as a contextmgr.

def test_bad_name__contextmgr(self):
    # Check we can't do `with iris.options.parallel.context('foo'='bar')`.
Member:

@dkillick

try:
    import distributed
except ImportError:
    DISTRIBUTED_AVAILABLE = False
@bjlittle (Member) commented Apr 12, 2017:

@dkillick Would you consider this slightly neater pattern (IMHO):

try:
    import distributed
except ImportError:
    distributed = None

Then later in the code, rather than checking for

if DISTRIBUTED_AVAILABLE:

it's simply

if distributed is not None:

or

if distributed:

See here ...

import contextlib
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
import re
Member:

@dkillick Import order, move below the following line ...

if scheduler in ['threaded', 'multiprocessing']:
    num_workers = self.get('num_workers')
    pool = ThreadPool(num_workers)
if scheduler == 'distributed':
Member:

@dkillick elif ?

default = self._defaults_dict['scheduler']['default']
if value is None:
    value = default
elif re.match(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$', value):
Member:

@dkillick You could compile this re ...
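For illustration, the suggestion amounts to compiling the pattern once at module scope instead of re-parsing it on every call; the names `_ADDRESS_PATTERN` and `looks_like_address` below are hypothetical, not from the PR:

```python
import re

# Compile the scheduler-address pattern once at module level
# (pattern taken from the review context above).
_ADDRESS_PATTERN = re.compile(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$')


def looks_like_address(value):
    """Return True if `value` looks like an `ip:port` scheduler address."""
    return _ADDRESS_PATTERN.match(value) is not None
```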

@corinnebosley (Member) left a comment:

Nice proposal, Pete; I think that a lot of people will appreciate the ability to manipulate these options.

https://distributed.readthedocs.io/en/latest/index.html.
Example usages:
Member:

@dkillick I'm really glad you put some example usages in here. And I like the amount of detail you have put into the descriptions of the args.

if value >= cpu_count():
    # Limit maximum CPUs used to 1 fewer than all available CPUs.
    wmsg = ('Requested more CPUs ({}) than total available ({}). '
            'Limiting number of used CPUs to {}.')
Member:

@dkillick Is this going to be a bit confusing? What if somebody requests that maximum number of CPUs available, and then is only allowed to use one less than the max available, but the warning message states that the total available is the same number that they requested.

For example, let's say my CPU count is 8, and as a parallelisation beginner I want to see how fast my script is when I run it on all 8 CPUs, so I request 8. This isn't allowed by set_num_workers, so I get a warning message which says:
'Requested more CPUs (8) than total available (8). Limiting number of used CPUs to 7'

I would find this message confusing, and it would make me angry. Maybe it just needs a little clarification that you can't use all the CPUs available because it's silly.
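One way to address this, sketched under the assumption that one CPU is deliberately held back for the main process; the helper name and the reworded warning message are suggestions, not the PR's code:

```python
import warnings
from multiprocessing import cpu_count


def clamp_num_workers(requested):
    # Hypothetical helper: hold one CPU back for the main process and
    # say so explicitly, so the message explains *why* the cap exists.
    total = cpu_count()
    limit = total - 1
    if requested >= total:
        wmsg = ('Requested {} workers, but one of the {} available CPUs '
                'is reserved for the main process. Limiting the number '
                'of workers to {}.')
        warnings.warn(wmsg.format(requested, total, limit))
        return limit
    return requested
```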


pool = self.pool
get = dask.async.get_sync
self.patch_set_options.assert_called_once_with(pool=None, get=get)
Member:

@dkillick I'm confused, if pool=None in the assert on L.137, why do you need 'pool = self.pool' on L.135?

Member Author:

Because we want to make sure it isn't used in this case! Nice spot though 👍

@djkirkham (Contributor) commented Apr 19, 2017

I'm not sure I like the idea of overriding __setattr__. It seems like it would be better to use __setitem__ instead and have it set values in an internal dictionary. There's not much difference in syntax (option['key'] = value instead of option.key = value), but it avoids the complications of overriding __setattr__, such as not being able to use instance attributes.
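A minimal sketch of the dict-backed `__setitem__` alternative being suggested; the class and option names are illustrative, not Iris API:

```python
class Options:
    """Dict-backed options store with validation on item assignment."""

    def __init__(self, **defaults):
        # Plain attribute assignment works normally here, since
        # __setattr__ is not overridden.
        self._settings = dict(defaults)

    def __setitem__(self, key, value):
        if key not in self._settings:
            raise KeyError('Unknown option: {!r}'.format(key))
        self._settings[key] = value

    def __getitem__(self, key):
        return self._settings[key]


options = Options(scheduler='threaded', num_workers=1)
options['num_workers'] = 4
```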

def set__scheduler(self, value):
    return value

def set_scheduler(self, value):
Contributor:

These set_<key> methods are confusingly named. They don't actually set anything. A better name would be convert_<key>, IMO.

@DPeterK (Member Author) commented Apr 20, 2017:

They define values that are set for keys in the instance's underlying __dict__. In this sense they behave very similarly to the @property.setter in a more traditional class.

I think renaming them to convert_xxx would be more confusing as I think "converting" is further from what they're doing than "setting" is.

@djkirkham (Contributor) commented Apr 21, 2017:

they behave very similarly to the @property.setter in a more traditional class

But they don't behave like a setter because they don't set anything!

# Distributed not available.
wmsg = 'Cannot import distributed. Defaulting to {}.'
warnings.warn(wmsg.format(default))
self.set_scheduler(default)
Contributor:

This doesn't actually do anything, assuming the default scheduler is always the same. Did you mean to return this?

Member Author:

My intention is that the scheduler setter is re-run on the default value in case it throws up any problems with setting the scheduler to the default in this case. Either way, the default value will very likely be returned in the next call of set_scheduler.

Contributor:

But what is returned from this function is value, and that isn't changed if this else section is reached. So ultimately what will be set in the attribute is the original value passed in, despite what the warning says.

@djkirkham (Contributor) commented Apr 21, 2017:

Actually, I see what happens. When Option.__setattr__ is called, it doesn't recognise the address as a legitimate value, and sets the attribute to the default. But I still don't understand the point of this self.set_scheduler(default) call.

    value = default
elif re.match(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$', value):
    if DISTRIBUTED_AVAILABLE:
        value = 'distributed'
Contributor:

If someone tries to set the scheduler to a new address, aren't you just discarding it here?

Member Author:

The scheduler value will be set to distributed, true, which is used as a check on L407, for example. The actual IP address is maintained in _scheduler, which is set on L283 and then used on L408.

Contributor:

Right, so it's only set in __init__. If I do iris.options.parallel.scheduler = <address>, that won't change what's saved in _scheduler.

def __setattr__(self, name, value):
    if value is None:
        value = self._defaults_dict[name]['default']
    attr_setter = getattr(self, 'set_{}'.format(name))
Contributor:

This is pretty ugly, IMO. I like the way Matplotlib handles this. In addition to storing default values, they also store a validator/converter function for each parameter (See matplotlib/rcsetup.py)
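A rough sketch of that validator-per-parameter pattern, loosely modelled on matplotlib's rcsetup approach; all names and the allowed-scheduler list below are illustrative:

```python
def validate_scheduler(value):
    # Illustrative allowed set, not the PR's actual list.
    allowed = ('threaded', 'multiprocessing', 'distributed')
    if value not in allowed:
        raise ValueError('Invalid scheduler: {!r}'.format(value))
    return value


def validate_num_workers(value):
    return max(1, int(value))


# One (default, validator) pair per option, so setting an option reduces
# to a dictionary lookup plus a single function call.
_DEFAULTS = {
    'scheduler': ('threaded', validate_scheduler),
    'num_workers': (1, validate_num_workers),
}


def set_option(settings, name, value):
    default, validator = _DEFAULTS[name]
    settings[name] = validator(default if value is None else value)
```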

Member Author:

@djkirkham nice idea, I'll implement that.

@DPeterK (Member Author) commented Apr 25, 2017

Closed on account of excessive complexity and replaced by #2511.

@DPeterK DPeterK closed this Apr 25, 2017
@DPeterK DPeterK deleted the iris_options branch April 25, 2017 11:08
@QuLogic QuLogic modified the milestones: dask, v2.0 Aug 2, 2017