From 5e8706f0e07d22dd57dc9f865ea7c02d0c7b55fd Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 29 Sep 2023 23:35:55 +0200 Subject: [PATCH 1/4] gh-109917, gh-105829: Fix concurrent.futures _ThreadWakeup.wakeup() Replace test_gh105829_should_not_deadlock_if_wakeup_pipe_full() test which was mocking too many concurrent.futures internals with a new test_wakeup() functional test. Co-Authored-By: elfstrom --- Lib/concurrent/futures/process.py | 32 +++++--- .../test_concurrent_futures/test_deadlock.py | 75 +------------------ .../test_process_pool.py | 46 ++++++++++++ 3 files changed, 72 insertions(+), 81 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 3990e6b1833d78..51fd953d65bc85 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -66,9 +66,13 @@ class _ThreadWakeup: + # Constant overridden by tests to make them faster + _wakeup_msg = b'x' + def __init__(self): self._closed = False self._reader, self._writer = mp.Pipe(duplex=False) + self._awaken = False def close(self): # Please note that we do not take the shutdown lock when @@ -76,19 +80,29 @@ def close(self): # only be called safely from the same thread as all calls to # clear() even if you hold the shutdown lock. Otherwise we # might try to read from the closed pipe. - if not self._closed: - self._closed = True - self._writer.close() - self._reader.close() + if self._closed: + return + self._closed = True + self._writer.close() + self._reader.close() def wakeup(self): - if not self._closed: - self._writer.send_bytes(b"") + if self._closed: + return + if self._awaken: + # gh-105829: Send a single message to not block if the pipe is + # full. wait_result_broken_or_wakeup() ignores the message anyway, + # it just calls clear(). 
+ return + self._awaken = True + self._writer.send_bytes(self._wakeup_msg) def clear(self): - if not self._closed: - while self._reader.poll(): - self._reader.recv_bytes() + if self._closed: + return + while self._reader.poll(): + self._reader.recv_bytes() + self._awaken = False def _python_exit(): diff --git a/Lib/test/test_concurrent_futures/test_deadlock.py b/Lib/test/test_concurrent_futures/test_deadlock.py index af702542081ad9..340eca23fcea3e 100644 --- a/Lib/test/test_concurrent_futures/test_deadlock.py +++ b/Lib/test/test_concurrent_futures/test_deadlock.py @@ -1,6 +1,4 @@ import contextlib -import queue -import signal import sys import time import unittest @@ -203,7 +201,7 @@ def test_shutdown_deadlock(self): self.executor.shutdown(wait=True) with self.executor_type(max_workers=2, mp_context=self.get_context()) as executor: - self.executor = executor # Allow clean up in fail_on_deadlock + self.executor = executor # Allow clean up in _fail_on_deadlock f = executor.submit(_crash, delay=.1) executor.shutdown(wait=True) with self.assertRaises(BrokenProcessPool): @@ -216,7 +214,7 @@ def test_shutdown_deadlock_pickle(self): self.executor.shutdown(wait=True) with self.executor_type(max_workers=2, mp_context=self.get_context()) as executor: - self.executor = executor # Allow clean up in fail_on_deadlock + self.executor = executor # Allow clean up in _fail_on_deadlock # Start the executor and get the executor_manager_thread to collect # the threads and avoid dangling thread that should be cleaned up @@ -244,79 +242,12 @@ def test_crash_big_data(self): data = "a" * support.PIPE_MAX_SIZE with self.executor_type(max_workers=2, mp_context=self.get_context()) as executor: - self.executor = executor # Allow clean up in fail_on_deadlock + self.executor = executor # Allow clean up in _fail_on_deadlock with self.assertRaises(BrokenProcessPool): list(executor.map(_crash_with_data, [data] * 10)) executor.shutdown(wait=True) - def 
test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self): - # Issue #105829: The _ExecutorManagerThread wakeup pipe could - # fill up and block. See: https://github.com/python/cpython/issues/105829 - - # Lots of cargo culting while writing this test, apologies if - # something is really stupid... - - self.executor.shutdown(wait=True) - - if not hasattr(signal, 'alarm'): - raise unittest.SkipTest( - "Tested platform does not support the alarm signal") - - def timeout(_signum, _frame): - import faulthandler - faulthandler.dump_traceback() - - raise RuntimeError("timed out while submitting jobs?") - - thread_run = futures.process._ExecutorManagerThread.run - def mock_run(self): - # Delay thread startup so the wakeup pipe can fill up and block - time.sleep(3) - thread_run(self) - - class MockWakeup(_ThreadWakeup): - """Mock wakeup object to force the wakeup to block""" - def __init__(self): - super().__init__() - self._dummy_queue = queue.Queue(maxsize=1) - - def wakeup(self): - self._dummy_queue.put(None, block=True) - super().wakeup() - - def clear(self): - try: - while True: - self._dummy_queue.get_nowait() - except queue.Empty: - super().clear() - - with (unittest.mock.patch.object(futures.process._ExecutorManagerThread, - 'run', mock_run), - unittest.mock.patch('concurrent.futures.process._ThreadWakeup', - MockWakeup)): - with self.executor_type(max_workers=2, - mp_context=self.get_context()) as executor: - self.executor = executor # Allow clean up in fail_on_deadlock - - job_num = 100 - job_data = range(job_num) - - # Need to use sigalarm for timeout detection because - # Executor.submit is not guarded by any timeout (both - # self._work_ids.put(self._queue_count) and - # self._executor_manager_thread_wakeup.wakeup() might - # timeout, maybe more?). In this specific case it was - # the wakeup call that deadlocked on a blocking pipe. 
- old_handler = signal.signal(signal.SIGALRM, timeout) - try: - signal.alarm(int(self.TIMEOUT)) - self.assertEqual(job_num, len(list(executor.map(int, job_data)))) - finally: - signal.alarm(0) - signal.signal(signal.SIGALRM, old_handler) - create_executor_tests(globals(), ExecutorDeadlockTest, executor_mixins=(ProcessPoolForkMixin, diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index c73c2da1a01088..c22dc633ec7483 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -216,6 +216,52 @@ def mock_start_new_thread(func, *args): list(executor.map(mul, [(2, 3)] * 10)) executor.shutdown() + def test_wakeup(self): + # gh-105829: Check that calling _ExecutorManagerThread wakeup() many + # times in ProcessPoolExecutor.submit() does not block if the + # _ThreadWakeup pipe becomes full. + + def get_pipe_size(connection): + try: + import fcntl + return fcntl.fcntl(connection.fileno(), fcntl.F_GETPIPE_SZ) + except ImportError: + # Assume 64 KiB pipe if we fail, makes test take longer + return 65_536 + + executor = self.executor + with executor: + # Submit a job to start the executor manager thread + # future = self.executor.submit(str, 12) + # future.result() + + # Wrap _ThreadWakeup.wakeup() to count how many times it has been + # called + thread_wakeup = executor._executor_manager_thread_wakeup + orig_wakeup = thread_wakeup.wakeup + nwakeup = 0 + def wrap_wakeup(): + nonlocal nwakeup + nwakeup += 1 + orig_wakeup() + thread_wakeup.wakeup = wrap_wakeup + + # Use longer "wakeup message" to make the hang more likely + # and to speed up the test + njob = self.worker_count * 2 # at least 2 jobs per worker + pipe_size = get_pipe_size(thread_wakeup._writer) + msg_len = min(pipe_size // njob, 512) + thread_wakeup._wakeup_msg = b'x' * msg_len + msg_size = 4 + len(thread_wakeup._wakeup_msg) + + njob = pipe_size // msg_size + job_data = 
range(njob) + if support.verbose: + print(f"run {njob:,} jobs") + + self.assertEqual(len(list(executor.map(int, job_data))), njob) + self.assertGreaterEqual(nwakeup, njob) + create_executor_tests(globals(), ProcessPoolExecutorTest, executor_mixins=(ProcessPoolForkMixin, From 2949c3d175f979f7db3fe7c397a7c79ef82162c7 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Sat, 30 Sep 2023 10:57:11 +0200 Subject: [PATCH 2/4] Add some margin to the number of jobs --- Lib/test/test_concurrent_futures/test_process_pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index c22dc633ec7483..203509745aed7c 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -254,7 +254,7 @@ def wrap_wakeup(): thread_wakeup._wakeup_msg = b'x' * msg_len msg_size = 4 + len(thread_wakeup._wakeup_msg) - njob = pipe_size // msg_size + njob = pipe_size // msg_size + 10 # Add some margin job_data = range(njob) if support.verbose: print(f"run {njob:,} jobs") From fb905f512e212d7d775c5d1cee94bff206a3f634 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Sat, 30 Sep 2023 11:19:25 +0200 Subject: [PATCH 3/4] gh-109917: Fix concurrent.futures ProcessPoolExecutor locks Add a lock to _ThreadWakeup which is used internally by _ThreadWakeup methods to serialize method calls to make the API thread safe. No longer use the shutdown lock to access _ThreadWakeup. 
--- Lib/concurrent/futures/process.py | 86 ++++++++++++++----------------- 1 file changed, 40 insertions(+), 46 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 51fd953d65bc85..e786d9c5130893 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -72,37 +72,41 @@ class _ThreadWakeup: def __init__(self): self._closed = False self._reader, self._writer = mp.Pipe(duplex=False) + # True if self._wakeup_msg was sent to self._writer once. + # Cleared by clear() method. self._awaken = False + # Lock to protect _ThreadWakeup and make it thread safe. Use the lock + # to serialize method calls to make sure that the reader and the writer + # remain usable (i.e. not closed) during a method call. + self._lock = threading.Lock() def close(self): - # Please note that we do not take the shutdown lock when - # calling clear() (to avoid deadlocking) so this method can - # only be called safely from the same thread as all calls to - # clear() even if you hold the shutdown lock. Otherwise we - # might try to read from the closed pipe. - if self._closed: - return - self._closed = True - self._writer.close() - self._reader.close() + with self._lock: + if self._closed: + return + self._closed = True + self._writer.close() + self._reader.close() def wakeup(self): - if self._closed: - return - if self._awaken: - # gh-105829: Send a single message to not block if the pipe is - # full. wait_result_broken_or_wakeup() ignores the message anyway, - # it just calls clear(). - return - self._awaken = True - self._writer.send_bytes(self._wakeup_msg) + with self._lock: + if self._closed: + return + if self._awaken: + # gh-105829: Send a single message to not block if the pipe is + # full. wait_result_broken_or_wakeup() ignores the message anyway, + # it just calls clear(). 
+ return + self._awaken = True + self._writer.send_bytes(self._wakeup_msg) def clear(self): - if self._closed: - return - while self._reader.poll(): - self._reader.recv_bytes() - self._awaken = False + with self._lock: + if self._closed: + return + while self._reader.poll(): + self._reader.recv_bytes() + self._awaken = False def _python_exit(): @@ -110,7 +114,6 @@ def _python_exit(): _global_shutdown = True items = list(_threads_wakeups.items()) for _, thread_wakeup in items: - # call not protected by ProcessPoolExecutor._shutdown_lock thread_wakeup.wakeup() for t, _ in items: t.join() @@ -181,10 +184,8 @@ def __init__(self, work_id, fn, args, kwargs): class _SafeQueue(Queue): """Safe Queue set exception to the future object linked to a job""" - def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, - thread_wakeup): + def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup): self.pending_work_items = pending_work_items - self.shutdown_lock = shutdown_lock self.thread_wakeup = thread_wakeup super().__init__(max_size, ctx=ctx) @@ -193,8 +194,7 @@ def _on_queue_feeder_error(self, e, obj): tb = format_exception(type(e), e, e.__traceback__) e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) work_item = self.pending_work_items.pop(obj.work_id, None) - with self.shutdown_lock: - self.thread_wakeup.wakeup() + self.thread_wakeup.wakeup() # work_item can be None if another process terminated. In this # case, the executor_manager_thread fails all work_items # with BrokenProcessPool @@ -318,13 +318,10 @@ def __init__(self, executor): # When the executor gets garbage collected, the weakref callback # will wake up the queue management thread so that it can terminate # if there is no pending work item. 
- def weakref_cb(_, - thread_wakeup=self.thread_wakeup, - shutdown_lock=self.shutdown_lock): + def weakref_cb(_, thread_wakeup=self.thread_wakeup): mp.util.debug('Executor collected: triggering callback for' ' QueueManager wakeup') - with shutdown_lock: - thread_wakeup.wakeup() + thread_wakeup.wakeup() self.executor_reference = weakref.ref(executor, weakref_cb) @@ -452,11 +449,6 @@ def wait_result_broken_or_wakeup(self): elif wakeup_reader in ready: is_broken = False - # No need to hold the _shutdown_lock here because: - # 1. we're the only thread to use the wakeup reader - # 2. we're also the only thread to call thread_wakeup.close() - # 3. we want to avoid a possible deadlock when both reader and writer - # would block (gh-105829) self.thread_wakeup.clear() return result_item, is_broken, cause @@ -728,6 +720,12 @@ def __init__(self, max_workers=None, mp_context=None, # Map of pids to processes self._processes = {} + # The shutdown lock protects ProcessPoolExecutor. It is used to shut + # down the pool and to mark the pool as broken in a reliable way. It + # makes sure that pool remains usable during submit(). It is also used + # by _ExecutorManagerThread to adjust the process count if a worker + # process exited. + # # Shutdown is a two-step process. self._shutdown_thread = False self._shutdown_lock = threading.Lock() @@ -743,11 +741,6 @@ def __init__(self, max_workers=None, mp_context=None, # _result_queue to send wakeup signals to the executor_manager_thread # as it could result in a deadlock if a worker process dies with the # _result_queue write lock still acquired. - # - # _shutdown_lock must be locked to access _ThreadWakeup.close() and - # .wakeup(). 
Care must also be taken to not call clear or close from - # more than one thread since _ThreadWakeup.clear() is not protected by - # the _shutdown_lock self._executor_manager_thread_wakeup = _ThreadWakeup() # Create communication channels for the executor @@ -758,7 +751,6 @@ def __init__(self, max_workers=None, mp_context=None, self._call_queue = _SafeQueue( max_size=queue_size, ctx=self._mp_context, pending_work_items=self._pending_work_items, - shutdown_lock=self._shutdown_lock, thread_wakeup=self._executor_manager_thread_wakeup) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed @@ -812,6 +804,8 @@ def _spawn_process(self): self._processes[p.pid] = p def submit(self, fn, /, *args, **kwargs): + # The lock makes sure that the pool remains usable while we are + # submitting the task. with self._shutdown_lock: if self._broken: raise BrokenProcessPool(self._broken) From 84dc6ac181e84078eac5b18fc745b2a59b840b6e Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Sat, 30 Sep 2023 21:45:53 +0200 Subject: [PATCH 4/4] Fix macOS which doesn't have fcntl.F_GETPIPE_SZ --- Lib/test/test_concurrent_futures/test_process_pool.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 203509745aed7c..47757027e7631b 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -224,10 +224,14 @@ def test_wakeup(self): def get_pipe_size(connection): try: import fcntl - return fcntl.fcntl(connection.fileno(), fcntl.F_GETPIPE_SZ) + if hasattr(fcntl, 'F_GETPIPE_SZ'): + return fcntl.fcntl(connection.fileno(), + fcntl.F_GETPIPE_SZ) except ImportError: - # Assume 64 KiB pipe if we fail, makes test take longer - return 65_536 + pass + + # Assume 64 KiB pipe if we fail, makes test take longer + return 65_536 
executor = self.executor with executor: