7676_threads_queues = weakref .WeakKeyDictionary ()
7777_global_shutdown = False
7878
79+
80+ # This constants control the maximal wakeup. If a job is submitted to the
81+ # Executor, it might take up to _POLL_TIMEOUT for the executor to notice and
82+ # start launching the job. This _POLL_TIMEOUT is to be cumulated with the
83+ # communication overhead.
84+ _POLL_TIMEOUT = .001
85+
86+
87+ class _Sentinel :
88+ __slot__ = ["_state" ]
89+
90+ def __init__ (self ):
91+ self ._state = False
92+
93+ def set (self ):
94+ self ._state = True
95+
96+ def get_and_unset (self ):
97+ s = self ._state
98+ if s :
99+ self ._state = False
100+ return s
101+
102+
79103def _python_exit ():
80104 global _global_shutdown
81105 _global_shutdown = True
82106 items = list (_threads_queues .items ())
83- for t , q in items :
84- q . put ( None )
107+ for t , wakeup in items :
108+ wakeup . set ( )
85109 for t , q in items :
86110 t .join ()
87111
@@ -266,7 +290,8 @@ def _queue_management_worker(executor_reference,
266290 pending_work_items ,
267291 work_ids_queue ,
268292 call_queue ,
269- result_queue ):
293+ result_queue ,
294+ wakeup ):
270295 """Manages the communication between this process and the worker processes.
271296
272297 This function is run in a local thread.
@@ -284,6 +309,8 @@ def _queue_management_worker(executor_reference,
284309 derived from _WorkItems for processing by the process workers.
285310 result_queue: A ctx.SimpleQueue of _ResultItems generated by the
286311 process workers.
312+ wakeup: A _Sentinel to allow waking up the queue_manager_thread from
313+ the main Thread and avoid deadlocks caused by broken queues.
287314 """
288315 executor = None
289316
@@ -292,38 +319,56 @@ def shutting_down():
292319 or executor ._shutdown_thread )
293320
294321 def shutdown_worker ():
295- # This is an upper bound
296- nb_children_alive = sum (p .is_alive () for p in processes .values ())
297- for i in range (0 , nb_children_alive ):
298- try :
299- call_queue .put_nowait (None )
300- except Full :
301- pass
322+ # This is an upper bound on the number of children alive.
323+ n_children_alive = sum (p .is_alive () for p in processes .values ())
324+ n_children_to_stop = n_children_alive
325+ n_sentinels_sent = 0
326+ # Sent the right number of sentinels, to make sure all children are
327+ # properly terminated.
328+ while n_sentinels_sent < n_children_to_stop and n_children_alive > 0 :
329+ for i in range (n_children_to_stop - n_sentinels_sent ):
330+ try :
331+ call_queue .put_nowait (None )
332+ n_sentinels_sent += 1
333+ except Full :
334+ break
335+ n_children_alive = sum (p .is_alive () for p in processes .values ())
336+
302337 # Release the queue's resources as soon as possible.
303338 call_queue .close ()
304339 # If .join() is not called on the created processes then
305340 # some ctx.Queue methods may deadlock on Mac OS X.
306341 for p in processes .values ():
307342 p .join ()
308343
309- reader = result_queue ._reader
344+ result_reader = result_queue ._reader
310345
311346 while True :
312347 _add_call_item_to_queue (pending_work_items ,
313348 work_ids_queue ,
314349 call_queue )
315350
316- sentinels = [p .sentinel for p in processes .values ()]
317- assert sentinels
318- ready = wait ([reader ] + sentinels )
319-
351+ # Wait for a result to be ready in the result_queue while checking
352+ # that worker process are still running.
353+ worker_sentinels = [p .sentinel for p in processes .values ()]
320354 received_item = False
321- if reader in ready :
355+ while not wakeup .get_and_unset ():
356+ ready = wait ([result_reader ] + worker_sentinels ,
357+ timeout = _POLL_TIMEOUT )
358+ if len (ready ) > 0 :
359+ break
360+ else :
361+ # The thread has been woken up by the main thread or the gc.
362+ ready = []
363+ result_item = None
364+ received_item = True
365+
366+ if result_reader in ready :
322367 try :
323- result_item = reader .recv ()
368+ result_item = result_reader .recv ()
324369 received_item = True
325- except :
326- pass
370+ except BaseException as e :
371+ traceback . print_exc ()
327372 if not received_item :
328373 # Mark the process pool broken so that submits fail right now.
329374 executor = executor_reference ()
@@ -499,12 +544,20 @@ def __init__(self, max_workers=None, mp_context=None,
499544 self ._result_queue = mp_context .SimpleQueue ()
500545 self ._work_ids = queue .Queue ()
501546
547+ # Permits to wake_up the queue_manager_thread independently of
548+ # result_queue state. This avoid deadlocks caused by the non
549+ # transmission of wakeup signal when a worker died with the
550+ # _result_queue write lock.
551+ self ._wakeup = _Sentinel ()
552+
502553 def _start_queue_management_thread (self ):
503- # When the executor gets lost, the weakref callback will wake up
504- # the queue management thread.
505- def weakref_cb (_ , q = self ._result_queue ):
506- q .put (None )
507554 if self ._queue_management_thread is None :
555+ # When the executor gets lost, the weakref callback will wake up
556+ # the queue management thread.
557+ def weakref_cb (_ , wakeup = self ._wakeup ):
558+ mp .util .debug ('Executor collected: triggering callback for'
559+ ' QueueManager wakeup' )
560+ wakeup .set ()
508561 # Start the processes so that their sentinels are known.
509562 self ._adjust_process_count ()
510563 self ._queue_management_thread = threading .Thread (
@@ -514,11 +567,12 @@ def weakref_cb(_, q=self._result_queue):
514567 self ._pending_work_items ,
515568 self ._work_ids ,
516569 self ._call_queue ,
517- self ._result_queue ),
570+ self ._result_queue ,
571+ self ._wakeup ),
518572 name = "QueueManagerThread" )
519573 self ._queue_management_thread .daemon = True
520574 self ._queue_management_thread .start ()
521- _threads_queues [self ._queue_management_thread ] = self ._result_queue
575+ _threads_queues [self ._queue_management_thread ] = self ._wakeup
522576
523577 def _adjust_process_count (self ):
524578 for _ in range (len (self ._processes ), self ._max_workers ):
@@ -545,7 +599,7 @@ def submit(self, fn, *args, **kwargs):
545599 self ._work_ids .put (self ._queue_count )
546600 self ._queue_count += 1
547601 # Wake up queue management thread
548- self ._result_queue . put ( None )
602+ self ._wakeup . set ( )
549603
550604 self ._start_queue_management_thread ()
551605 return f
@@ -585,7 +639,7 @@ def shutdown(self, wait=True):
585639 self ._shutdown_thread = True
586640 if self ._queue_management_thread :
587641 # Wake up queue management thread
588- self ._result_queue . put ( None )
642+ self ._wakeup . set ( )
589643 if wait :
590644 self ._queue_management_thread .join ()
591645 # To reduce the risk of opening too many files, remove references to
0 commit comments