From 2603b4195c8cc146f8c96c387d2d5093ee589fc1 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 17 Nov 2016 19:57:07 +0100 Subject: [PATCH 1/4] Use forkserver on Unix and Python 3 --- distributed/bokeh/application.py | 1 - distributed/cli/dask_scheduler.py | 1 - distributed/nanny.py | 21 ++-- distributed/tests/test_client.py | 5 +- distributed/tests/test_core.py | 2 +- distributed/tests/test_hdfs.py | 155 +++++++++++++++--------------- distributed/utils.py | 7 ++ distributed/utils_test.py | 25 ++--- distributed/worker.py | 5 +- 9 files changed, 115 insertions(+), 107 deletions(-) diff --git a/distributed/bokeh/application.py b/distributed/bokeh/application.py index 6b614b04c62..44b84ca5b63 100644 --- a/distributed/bokeh/application.py +++ b/distributed/bokeh/application.py @@ -3,7 +3,6 @@ import atexit import json import logging -import multiprocessing import os import socket import sys diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index b83ce970ab0..1a9fbd75b5d 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -3,7 +3,6 @@ import atexit import json import logging -import multiprocessing import os import socket import subprocess diff --git a/distributed/nanny.py b/distributed/nanny.py index d3c87dfed7a..1713998b219 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta import json import logging -from multiprocessing import Process, Queue, queues +from multiprocessing.queues import Empty import os import shutil import subprocess @@ -18,7 +18,7 @@ from .compatibility import JSONDecodeError from .core import Server, rpc, write, RPCClosed from .protocol import to_serialize -from .utils import get_ip, ignoring, log_errors, tmpfile +from .utils import get_ip, ignoring, log_errors, mp_context, tmpfile from .worker import _ncores, Worker, run, TOTAL_MEMORY nanny_environment = os.path.dirname(sys.executable) @@ -190,13 +190,14 @@ def instantiate(self, stream=None, environment=None): except JSONDecodeError: yield gen.sleep(0.01) else: - q = Queue() - self.process = Process(target=run_worker_fork, - args=(q, self.ip, self.scheduler.ip, - self.scheduler.port, self.ncores, - self.port, self._given_worker_port, - self.local_dir, self.services, self.name, - self.memory_limit, self.reconnect)) + q = mp_context.Queue() + self.process = mp_context.Process( + target=run_worker_fork, + args=(q, self.ip, self.scheduler.ip, + self.scheduler.port, self.ncores, + self.port, self._given_worker_port, + self.local_dir, self.services, self.name, + self.memory_limit, self.reconnect)) self.process.daemon = True self.process.start() while True: @@ -208,7 +209,7 @@ def instantiate(self, stream=None, environment=None): self.worker_dir = msg['dir'] assert self.worker_port break - except queues.Empty: + except Empty: yield gen.sleep(0.1) logger.info("Nanny %s:%d starts worker process %s:%d", diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 4c46632f98f..c7c124ab33a 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -6,7 +6,6 @@ from concurrent.futures import CancelledError from datetime import timedelta import itertools -from multiprocessing import Process import os import pickle from random import random, choice @@ -34,7 +33,7 @@ temp_default_client, get_restrictions) from distributed.scheduler import Scheduler, KilledWorker from distributed.sizeof import sizeof -from distributed.utils import sync, 
tmp_text, ignoring, tokey, All +from distributed.utils import sync, tmp_text, ignoring, tokey, All, mp_context from distributed.utils_test import (cluster, slow, slowinc, slowadd, randominc, loop, inc, dec, div, throws, gen_cluster, gen_test, double, deep) @@ -1614,7 +1613,7 @@ def long_running_client_connection(ip, port): @gen_cluster() def test_cleanup_after_broken_client_connection(s, a, b): - proc = Process(target=long_running_client_connection, args=(s.ip, s.port)) + proc = mp_context.Process(target=long_running_client_connection, args=(s.ip, s.port)) proc.daemon = True proc.start() diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index bb3be242687..6e76654b3dd 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -1,7 +1,6 @@ from __future__ import print_function, division, absolute_import from functools import partial -from multiprocessing import Process import socket from time import time @@ -13,6 +12,7 @@ coerce_to_rpc, send_recv, coerce_to_address, ConnectionPool) from distributed.utils_test import slow, loop, gen_test + def test_server(loop): @gen.coroutine def f(): diff --git a/distributed/tests/test_hdfs.py b/distributed/tests/test_hdfs.py index 1efcccd41b9..5fd78d1f609 100644 --- a/distributed/tests/test_hdfs.py +++ b/distributed/tests/test_hdfs.py @@ -36,10 +36,10 @@ def setup_module(module): def test_get_block_locations(): - with make_hdfs() as hdfs: + with make_hdfs() as (hdfs, basedir): data = b'a' * int(1e8) # todo: reduce block size to speed up test - fn_1 = '/tmp/test/file1' - fn_2 = '/tmp/test/file2' + fn_1 = '%s/file1' % basedir + fn_2 = '%s/file2' % basedir with hdfs.open(fn_1, 'wb', replication=1) as f: f.write(data) @@ -60,8 +60,8 @@ def test_get_block_locations(): def dont_test_dataframes(e, s, a): # slow pytest.importorskip('pandas') n = 3000000 - fn = '/tmp/test/file.csv' - with make_hdfs() as hdfs: + with make_hdfs() as (hdfs, basedir): + fn = '%s/file.csv' % basedir data = (b'name,amount,id\r\n' + b'Alice,100,1\r\nBob,200,2\r\n' * n) with hdfs.open(fn, 'wb') as f: @@ -83,27 +83,27 @@ def load(b, **kwargs): def test_get_block_locations_nested(): - with make_hdfs() as hdfs: + with make_hdfs() as (hdfs, basedir): data = b'a' for i in range(3): - hdfs.mkdir('/tmp/test/data-%d' % i) + hdfs.mkdir('%s/data-%d' % (basedir, i)) for j in range(2): - fn = '/tmp/test/data-%d/file-%d.csv' % (i, j) + fn = '%s/data-%d/file-%d.csv' % (basedir, i, j) with hdfs.open(fn, 'wb', replication=1) as f: f.write(data) L = [hdfs.get_block_locations(fn) - for fn in hdfs.glob('/tmp/test/*/*.csv')] + for fn in hdfs.glob('%s/*/*.csv' % basedir)] L = list(concat(L)) assert len(L) == 6 @gen_cluster([(ip, 1), (ip, 2)], timeout=60, client=True) def test_read_bytes(c, s, a, b): - with make_hdfs() as hdfs: + with make_hdfs() as (hdfs, basedir): data = b'a' * int(1e8) - fn = '/tmp/test/file' + fn = '%s/file' % basedir with hdfs.open(fn, 'wb', replication=1) as f: f.write(data) @@ -131,36 +131,36 @@ def test_read_bytes(c, s, a, b): @pytest.mark.parametrize('nworkers', [1, 3]) def test_read_bytes_sync(loop, nworkers): with cluster(nworkers=nworkers) as (s, workers): - with make_hdfs() as hdfs: + with make_hdfs() as (hdfs, basedir): data = b'a' * int(1e3) - for fn in ['/tmp/test/file.%d' % i for i in range(100)]: + for fn in ['%s/file.%d' % (basedir, i) for i in range(100)]: with hdfs.open(fn, 'wb', replication=1) as f: f.write(data) with Client(('127.0.0.1', s['port']), loop=loop) as e: - sample, values = 
read_bytes('hdfs:///tmp/test/file.*') + sample, values = read_bytes('hdfs://%s/file.*' % basedir) results = delayed(values).compute() assert [b''.join(r) for r in results] == 100 * [data] @gen_cluster([(ip, 1), (ip, 2)], timeout=60, client=True) def test_get_block_locations_nested_2(e, s, a, b): - with make_hdfs() as hdfs: + with make_hdfs() as (hdfs, basedir): data = b'a' for i in range(3): - hdfs.mkdir('/tmp/test/data-%d' % i) + hdfs.mkdir('%s/data-%d' % (basedir, i)) for j in range(2): - fn = '/tmp/test/data-%d/file-%d.csv' % (i, j) + fn = '%s/data-%d/file-%d.csv' % (basedir, i, j) with hdfs.open(fn, 'wb', replication=1) as f: f.write(data) L = list(concat(hdfs.get_block_locations(fn) - for fn in hdfs.glob('/tmp/test/data-*/*.csv'))) + for fn in hdfs.glob('%s/data-*/*.csv' % basedir))) assert len(L) == 6 - sample, values = read_bytes('hdfs:///tmp/test/*/*.csv') + sample, values = read_bytes('hdfs://%s/*/*.csv' % basedir) futures = e.compute(list(concat(values))) results = yield e._gather(futures) assert len(results) == 6 @@ -169,17 +169,17 @@ def test_get_block_locations_nested_2(e, s, a, b): @gen_cluster([(ip, 1), (ip, 2)], timeout=60, client=True) def test_lazy_values(e, s, a, b): - with make_hdfs() as hdfs: + with make_hdfs() as (hdfs, basedir): data = b'a' for i in range(3): - hdfs.mkdir('/tmp/test/data-%d' % i) + hdfs.mkdir('%s/data-%d' % (basedir, i)) for j in range(2): - fn = '/tmp/test/data-%d/file-%d.csv' % (i, j) + fn = '%s/data-%d/file-%d.csv' % (basedir, i, j) with hdfs.open(fn, 'wb', replication=1) as f: f.write(data) - sample, values = read_bytes('hdfs:///tmp/test/*/*.csv') + sample, values = read_bytes('hdfs://%s/*/*.csv' % basedir) assert all(isinstance(v, list) for v in values) assert all(isinstance(v, Delayed) for vv in values for v in vv) @@ -195,50 +195,52 @@ def test_lazy_values(e, s, a, b): @gen_cluster([(ip, 1), (ip, 2)], timeout=60, client=True) def test_write_bytes(c, s, a, b): - with make_hdfs() as hdfs: - hdfs.mkdir('/tmp/test/data/') + with make_hdfs() as (hdfs, basedir): + hdfs.mkdir('%s/data/' % basedir) data = [b'123', b'456', b'789'] remote_data = yield c._scatter(data) futures = c.compute(write_bytes(remote_data, - 'hdfs:///tmp/test/data/file.*.dat')) + 'hdfs://%s/data/file.*.dat' % basedir)) yield _wait(futures) yield futures[0]._result() - assert len(hdfs.ls('/tmp/test/data/')) == 3 - with hdfs.open('/tmp/test/data/file.1.dat') as f: + assert len(hdfs.ls('%s/data/' % basedir)) == 3 + with hdfs.open('%s/data/file.1.dat' % basedir) as f: assert f.read() == b'456' - hdfs.mkdir('/tmp/test/data2/') + hdfs.mkdir('%s/data2/' % basedir) futures = c.compute(write_bytes(remote_data, - 'hdfs:///tmp/test/data2/')) + 'hdfs://%s/data2/' % basedir)) yield _wait(futures) - assert len(hdfs.ls('/tmp/test/data2/')) == 3 + assert len(hdfs.ls('%s/data2/' % basedir)) == 3 def test_read_csv_sync(loop): import dask.dataframe as dd import pandas as pd with cluster(nworkers=3) as (s, [a, b, c]): - with make_hdfs() as hdfs: - with hdfs.open('/tmp/test/1.csv', 'wb') as f: + with make_hdfs() as (hdfs, basedir): + with hdfs.open('%s/1.csv' % basedir, 'wb') as f: f.write(b'name,amount,id\nAlice,100,1\nBob,200,2') - with hdfs.open('/tmp/test/2.csv', 'wb') as f: + with hdfs.open('%s/2.csv' % basedir, 'wb') as f: f.write(b'name,amount,id\nCharlie,300,3\nDennis,400,4') with Client(('127.0.0.1', s['port']), loop=loop) as e: - values = dd.read_csv('hdfs:///tmp/test/*.csv', lineterminator='\n', - collection=False, header=0) + values = dd.read_csv('hdfs://%s/*.csv' % basedir, + 
lineterminator='\n', + collection=False, header=0) futures = e.compute(values) assert all(isinstance(f, Future) for f in futures) L = e.gather(futures) assert isinstance(L[0], pd.DataFrame) assert list(L[0].columns) == ['name', 'amount', 'id'] - df = dd.read_csv('hdfs:///tmp/test/*.csv', lineterminator='\n', + df = dd.read_csv('hdfs://%s/*.csv' % basedir, + lineterminator='\n', collection=True, header=0) assert isinstance(df, dd.DataFrame) assert list(df.head().iloc[0]) == ['Alice', 100, 1] @@ -246,28 +248,28 @@ def test_read_csv_sync(loop): def test_read_csv_sync_compute(loop): with cluster(nworkers=1) as (s, [a]): - with make_hdfs() as hdfs: - with hdfs.open('/tmp/test/1.csv', 'wb') as f: + with make_hdfs() as (hdfs, basedir): + with hdfs.open('%s/1.csv' % basedir, 'wb') as f: f.write(b'name,amount,id\nAlice,100,1\nBob,200,2') - with hdfs.open('/tmp/test/2.csv', 'wb') as f: + with hdfs.open('%s/2.csv' % basedir, 'wb') as f: f.write(b'name,amount,id\nCharlie,300,3\nDennis,400,4') with Client(('127.0.0.1', s['port']), loop=loop) as e: - df = dd.read_csv('hdfs:///tmp/test/*.csv', collection=True) + df = dd.read_csv('hdfs://%s/*.csv' % basedir, collection=True) assert df.amount.sum().compute(get=e.get) == 1000 @gen_cluster([(ip, 1), (ip, 1)], timeout=60, client=True) def test_read_csv(e, s, a, b): - with make_hdfs() as hdfs: - with hdfs.open('/tmp/test/1.csv', 'wb') as f: + with make_hdfs() as (hdfs, basedir): + with hdfs.open('%s/1.csv' % basedir, 'wb') as f: f.write(b'name,amount,id\nAlice,100,1\nBob,200,2') - with hdfs.open('/tmp/test/2.csv', 'wb') as f: + with hdfs.open('%s/2.csv' % basedir, 'wb') as f: f.write(b'name,amount,id\nCharlie,300,3\nDennis,400,4') - df = dd.read_csv('hdfs:///tmp/test/*.csv', lineterminator='\n') + df = dd.read_csv('hdfs://%s/*.csv' % basedir, lineterminator='\n') result = e.compute(df.id.sum(), sync=False) result = yield result._result() assert result == 1 + 2 + 3 + 4 @@ -275,25 +277,26 @@ def test_read_csv(e, s, a, b): @gen_cluster([(ip, 1), (ip, 1)], timeout=60, client=True) def test_read_csv_with_names(e, s, a, b): - with make_hdfs() as hdfs: - with hdfs.open('/tmp/test/1.csv', 'wb') as f: + with make_hdfs() as (hdfs, basedir): + with hdfs.open('%s/1.csv' % basedir, 'wb') as f: f.write(b'name,amount,id\nAlice,100,1\nBob,200,2') - df = dd.read_csv('hdfs:///tmp/test/*.csv', names=['amount', 'name'], - lineterminator='\n') + df = dd.read_csv('hdfs://%s/*.csv' % basedir, + names=['amount', 'name'], + lineterminator='\n') assert list(df.columns) == ['amount', 'name'] @gen_cluster([(ip, 1), (ip, 1)], timeout=60, client=True) def test_read_csv_lazy(e, s, a, b): - with make_hdfs() as hdfs: - with hdfs.open('/tmp/test/1.csv', 'wb') as f: + with make_hdfs() as (hdfs, basedir): + with hdfs.open('%s/1.csv' % basedir, 'wb') as f: f.write(b'name,amount,id\nAlice,100,1\nBob,200,2') - with hdfs.open('/tmp/test/2.csv', 'wb') as f: + with hdfs.open('%s/2.csv' % basedir, 'wb') as f: f.write(b'name,amount,id\nCharlie,300,3\nDennis,400,4') - df = dd.read_csv('hdfs:///tmp/test/*.csv', lineterminator='\n') + df = dd.read_csv('hdfs://%s/*.csv' % basedir, lineterminator='\n') yield gen.sleep(0.5) assert not s.tasks @@ -303,17 +306,17 @@ def test_read_csv_lazy(e, s, a, b): @gen_cluster([(ip, 1), (ip, 1)], timeout=60, client=True) def test__read_text(c, s, a, b): - with make_hdfs() as hdfs: - with hdfs.open('/tmp/test/text.1.txt', 'wb') as f: + with make_hdfs() as (hdfs, basedir): + with hdfs.open('%s/text.1.txt' % basedir, 'wb') as f: f.write('Alice 100\nBob 200\nCharlie 
300'.encode()) - with hdfs.open('/tmp/test/text.2.txt', 'wb') as f: + with hdfs.open('%s/text.2.txt' % basedir, 'wb') as f: f.write('Dan 400\nEdith 500\nFrank 600'.encode()) - with hdfs.open('/tmp/test/other.txt', 'wb') as f: + with hdfs.open('%s/other.txt' % basedir, 'wb') as f: f.write('a b\nc d'.encode()) - b = db.read_text('hdfs:///tmp/test/text.*.txt') + b = db.read_text('hdfs://%s/text.*.txt' % basedir) yield gen.sleep(0.5) assert not s.tasks @@ -327,7 +330,7 @@ def test__read_text(c, s, a, b): result = yield future._result() assert result == [2, 2, 2, 2, 2, 2] - b = db.read_text('hdfs:///tmp/test/other.txt') + b = db.read_text('hdfs://%s/other.txt' % basedir) b = c.persist(b) future = c.compute(b.str.split().concat()) result = yield future._result() @@ -337,11 +340,11 @@ def test__read_text(c, s, a, b): @gen_cluster([(ip, 1)], timeout=60, client=True) def test__read_text_json_endline(e, s, a): import json - with make_hdfs() as hdfs: - with hdfs.open('/tmp/test/text.1.txt', 'wb') as f: + with make_hdfs() as (hdfs, basedir): + with hdfs.open('%s/text.1.txt' % basedir, 'wb') as f: f.write(b'{"x": 1}\n{"x": 2}\n') - b = db.read_text('hdfs:///tmp/test/text.1.txt').map(json.loads) + b = db.read_text('hdfs://%s/text.1.txt' % basedir).map(json.loads) result = yield e.compute(b)._result() assert result == [{"x": 1}, {"x": 2}] @@ -350,9 +353,9 @@ def test__read_text_json_endline(e, s, a): @gen_cluster([(ip, 1), (ip, 1)], timeout=60, client=True) def test__read_text_unicode(e, s, a, b): - fn = '/tmp/test/data.txt' data = b'abcd\xc3\xa9' - with make_hdfs() as hdfs: + with make_hdfs() as (hdfs, basedir): + fn = '%s/data.txt' % basedir with hdfs.open(fn, 'wb') as f: f.write(b'\n'.join([data, data])) @@ -364,28 +367,28 @@ def test__read_text_unicode(e, s, a, b): def test_read_text_sync(loop): - with make_hdfs() as hdfs: - with hdfs.open('/tmp/test/data.txt', 'wb') as f: + with make_hdfs() as (hdfs, basedir): + with hdfs.open('%s/data.txt' % basedir, 'wb') as f: f.write(b'hello\nworld') with cluster(nworkers=3) as (s, [a, b, c]): with Client(('127.0.0.1', s['port']), loop=loop): - b = db.read_text('hdfs:///tmp/test/*.txt') + b = db.read_text('hdfs://%s/*.txt' % basedir) assert list(b.str.strip().str.upper()) == ['HELLO', 'WORLD'] @gen_cluster([(ip, 1), (ip, 2)], timeout=60, client=True) def test_deterministic_key_names(e, s, a, b): - with make_hdfs() as hdfs: + with make_hdfs() as (hdfs, basedir): data = b'abc\n' * int(1e3) - fn = '/tmp/test/file' + fn = '%s/file' % basedir with hdfs.open(fn, 'wb', replication=1) as f: f.write(data) - _, x = read_bytes('hdfs:///tmp/test/*', delimiter=b'\n') - _, y = read_bytes('hdfs:///tmp/test/*', delimiter=b'\n') - _, z = read_bytes('hdfs:///tmp/test/*', delimiter=b'c') + _, x = read_bytes('hdfs://%s/*' % basedir, delimiter=b'\n') + _, y = read_bytes('hdfs://%s/*' % basedir, delimiter=b'\n') + _, z = read_bytes('hdfs://%s/*' % basedir, delimiter=b'c') assert [f.key for f in concat(x)] == [f.key for f in concat(y)] assert [f.key for f in concat(x)] != [f.key for f in concat(z)] @@ -393,16 +396,16 @@ def test_deterministic_key_names(e, s, a, b): @gen_cluster([(ip, 1), (ip, 2)], timeout=60, client=True) def test_write_bytes_2(c, s, a, b): - with make_hdfs() as hdfs: - path = 'hdfs:///tmp/test/' + with make_hdfs() as (hdfs, basedir): + path = 'hdfs://%s/' % basedir data = [b'test data %i' % i for i in range(5)] values = [delayed(d) for d in data] out = write_bytes(values, path) futures = c.compute(out) results = yield c._gather(futures) - assert 
len(hdfs.ls('/tmp/test/')) == 5 + assert len(hdfs.ls(basedir)) == 5 - sample, vals = read_bytes('hdfs:///tmp/test/*.part') + sample, vals = read_bytes('hdfs://%s/*.part' % basedir) futures = c.compute(list(concat(vals))) results = yield c._gather(futures) assert data == results diff --git a/distributed/utils.py b/distributed/utils.py index ab5210dce79..33a90fdbc7e 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -3,6 +3,7 @@ from collections import Iterable from contextlib import contextmanager import logging +import multiprocessing import os import re import shutil @@ -29,6 +30,12 @@ logger = logging.getLogger(__name__) +if PY3 and not sys.platform.startswith('win'): + mp_context = multiprocessing.get_context('forkserver') +else: + mp_context = multiprocessing + + def funcname(func): """Get the name of a function.""" while hasattr(func, 'func'): diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 16546e5da48..5ba079d35c8 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -3,7 +3,6 @@ from contextlib import contextmanager from glob import glob import logging -from multiprocessing import Process, Queue import os import shutil import signal @@ -21,7 +20,7 @@ from tornado.iostream import StreamClosedError from .core import connect, read, write, close, rpc -from .utils import ignoring, log_errors, sync +from .utils import ignoring, log_errors, sync, mp_context import pytest @@ -276,17 +275,18 @@ def cluster(nworkers=2, nanny=False, worker_kwargs={}, active_rpc_timeout=0): _run_worker = run_nanny else: _run_worker = run_worker - scheduler_q = Queue() - scheduler = Process(target=run_scheduler, args=(scheduler_q,)) + scheduler_q = mp_context.Queue() + scheduler = mp_context.Process( + target=run_scheduler, args=(scheduler_q,)) scheduler.daemon = True scheduler.start() sport = scheduler_q.get() workers = [] for i in range(nworkers): - q = Queue() + q = mp_context.Queue() fn = '_test_worker-%s' % uuid.uuid1() - proc = Process(target=_run_worker, args=(q, sport), + proc = mp_context.Process(target=_run_worker, args=(q, sport), kwargs=merge({'ncores': 1, 'local_dir': fn}, worker_kwargs)) workers.append({'proc': proc, 'queue': q, 'dir': fn}) @@ -453,16 +453,17 @@ def test_func(): def make_hdfs(): from hdfs3 import HDFileSystem # from .hdfs import DaskHDFileSystem + basedir = '/tmp/test-distributed' hdfs = HDFileSystem(host='localhost', port=8020) - if hdfs.exists('/tmp/test'): - hdfs.rm('/tmp/test') - hdfs.mkdir('/tmp/test') + if hdfs.exists(basedir): + hdfs.rm(basedir) + hdfs.mkdir(basedir) try: - yield hdfs + yield hdfs, basedir finally: - if hdfs.exists('/tmp/test'): - hdfs.rm('/tmp/test') + if hdfs.exists(basedir): + hdfs.rm(basedir) def raises(func, exc=Exception): diff --git a/distributed/worker.py b/distributed/worker.py index 60278a25d8d..47c5abc2d26 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3,7 +3,6 @@ from datetime import timedelta from importlib import import_module import logging -import multiprocessing import os import pkg_resources import tempfile @@ -33,9 +32,9 @@ from .sizeof import sizeof from .threadpoolexecutor import ThreadPoolExecutor from .utils import (funcname, get_ip, _maybe_complex, log_errors, All, - ignoring, validate_key) + ignoring, validate_key, mp_context) -_ncores = multiprocessing.cpu_count() +_ncores = mp_context.cpu_count() thread_state = local() From 8ae6d2f1d019a1bff64f5acc7d33db2674d500e5 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 21 Nov 2016 16:12:00 +0100 Subject: 
[PATCH 2/4] Improve test suite speed --- .travis.yml | 7 ++++--- distributed/utils.py | 2 ++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 42115d09072..4b707d9b7fc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -63,9 +63,10 @@ install: - python setup.py install script: + - export PYTEST_OPTIONS="--verbose -r s --timeout-method=thread --timeout=300" - | if [[ $HDFS == true ]]; then - py.test distributed/tests/test_hdfs.py --verbose -r s --timeout-method=thread --timeout=30 + py.test distributed/tests/test_hdfs.py $PYTEST_OPTIONS if [ $? -ne 0 ]; then # Diagnose test error echo "--" @@ -75,9 +76,9 @@ script: (exit 1) fi elif [[ $COVERAGE == true ]]; then - coverage run $(which py.test) distributed -m "not avoid_travis" --verbose; + coverage run $(which py.test) distributed -m "not avoid_travis" $PYTEST_OPTIONS; else - py.test -m "not avoid_travis" distributed --verbose; + py.test -m "not avoid_travis" distributed $PYTEST_OPTIONS; fi; after_success: diff --git a/distributed/utils.py b/distributed/utils.py index 33a90fdbc7e..e7e7245036c 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -32,6 +32,8 @@ if PY3 and not sys.platform.startswith('win'): mp_context = multiprocessing.get_context('forkserver') + # Makes the test suite much faster + mp_context.set_forkserver_preload(['distributed']) else: mp_context = multiprocessing From b0c1d57c86e77b1dc1613f728d9b8f4afa87341c Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 21 Nov 2016 16:22:13 +0100 Subject: [PATCH 3/4] Install lz4 and paramiko --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 4b707d9b7fc..610cc58923c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -48,7 +48,7 @@ install: # Install dependencies - conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION - source activate test-environment - - conda install -q pytest pytest-timeout coverage tornado toolz dill futures dask ipywidgets psutil bokeh requests joblib mock ipykernel jupyter_client h5py netcdf4 + - conda install -q pytest pytest-timeout coverage tornado toolz dill futures dask ipywidgets psutil bokeh requests joblib mock ipykernel jupyter_client h5py netcdf4 lz4 paramiko - | if [[ $HDFS == true ]]; then conda install -q libxml2 krb5 boost From fc685972f2a3d07fbb873cd7135b52ce201e73fa Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 21 Nov 2016 21:08:00 +0100 Subject: [PATCH 4/4] Skip forking HDFS tests on py2 --- distributed/tests/test_hdfs.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/distributed/tests/test_hdfs.py b/distributed/tests/test_hdfs.py index 5fd78d1f609..8edbcb2d11a 100644 --- a/distributed/tests/test_hdfs.py +++ b/distributed/tests/test_hdfs.py @@ -1,6 +1,7 @@ from __future__ import print_function, division, absolute_import import socket +import sys from toolz import concat from tornado import gen @@ -22,6 +23,14 @@ from dask.bytes.core import read_bytes, write_bytes +_orig_cluster = cluster + +def cluster(*args, **kwargs): + if sys.version_info < (3,) and not sys.platform.startswith('win'): + pytest.skip("hdfs3 is not fork safe, test can hang on Python 2") + return _orig_cluster(*args, **kwargs) + + ip = get_ip() def setup_module(module):
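
For reference, the mechanism that patches 1 and 2 spread across distributed/utils.py, nanny.py, utils_test.py and worker.py condenses to the sketch below: select a multiprocessing start-method context once (forkserver on Unix under Python 3, with the package preloaded so worker startup stays cheap), then hand out mp_context.Process and mp_context.Queue wherever the bare multiprocessing module was used before. The run_worker stub and the __main__ demo are illustrative stand-ins, not code taken from the patch.

    from __future__ import print_function

    import multiprocessing
    import os
    import sys

    if sys.version_info[0] >= 3 and not sys.platform.startswith('win'):
        # forkserver starts children from a small, single-threaded server
        # process instead of fork()-ing the large, threaded parent.
        mp_context = multiprocessing.get_context('forkserver')
        # Import the package once in the forkserver so each child starts
        # fast (modules that fail to import are ignored by the forkserver).
        mp_context.set_forkserver_preload(['distributed'])
    else:
        # Python 2 and Windows keep the default start method.
        mp_context = multiprocessing

    def run_worker(q):
        # Illustrative stand-in for run_worker/run_scheduler in
        # utils_test.py: report the child pid back through a
        # context-created queue.
        q.put(os.getpid())

    if __name__ == '__main__':
        q = mp_context.Queue()
        proc = mp_context.Process(target=run_worker, args=(q,))
        proc.daemon = True
        proc.start()
        print('worker pid:', q.get())
        proc.join()

Because callers only ever touch mp_context, the fallback branch keeps Python 2 and Windows on their existing behaviour while Unix/Python 3 picks up forkserver transparently, which is also what lets patch 4 skip only the fork-unsafe hdfs3 tests on Python 2 instead of changing the tests themselves.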