@@ -126,6 +126,9 @@ def __init__(self, exc, tb):
126
126
tb = traceback .format_exception (type (exc ), exc , tb )
127
127
tb = '' .join (tb )
128
128
self .exc = exc
129
+ # Traceback object needs to be garbage-collected as its frames
130
+ # contain references to all the objects in the exception scope
131
+ self .exc .__traceback__ = None
129
132
self .tb = '\n """\n %s"""' % tb
130
133
def __reduce__ (self ):
131
134
return _rebuild_exc , (self .exc , self .tb )
@@ -612,6 +615,10 @@ def __init__(self, max_workers=None, mp_context=None,
612
615
mp_context = mp .get_context ()
613
616
self ._mp_context = mp_context
614
617
618
+ # https://github.com/python/cpython/issues/90622
619
+ self ._safe_to_dynamically_spawn_children = (
620
+ self ._mp_context .get_start_method (allow_none = False ) != "fork" )
621
+
615
622
if initializer is not None and not callable (initializer ):
616
623
raise TypeError ("initializer must be a callable" )
617
624
self ._initializer = initializer
@@ -662,6 +669,8 @@ def __init__(self, max_workers=None, mp_context=None,
662
669
def _start_executor_manager_thread (self ):
663
670
if self ._executor_manager_thread is None :
664
671
# Start the processes so that their sentinels are known.
672
+ if not self ._safe_to_dynamically_spawn_children : # ie, using fork.
673
+ self ._launch_processes ()
665
674
self ._executor_manager_thread = _ExecutorManagerThread (self )
666
675
self ._executor_manager_thread .start ()
667
676
_threads_wakeups [self ._executor_manager_thread ] = \
@@ -674,14 +683,31 @@ def _adjust_process_count(self):
674
683
675
684
process_count = len (self ._processes )
676
685
if process_count < self ._max_workers :
677
- p = self ._mp_context .Process (
678
- target = _process_worker ,
679
- args = (self ._call_queue ,
680
- self ._result_queue ,
681
- self ._initializer ,
682
- self ._initargs ))
683
- p .start ()
684
- self ._processes [p .pid ] = p
686
+ # Assertion disabled as this codepath is also used to replace a
687
+ # worker that unexpectedly dies, even when using the 'fork' start
688
+ # method. That means there is still a potential deadlock bug. If a
689
+ # 'fork' mp_context worker dies, we'll be forking a new one when
690
+ # we know a thread is running (self._executor_manager_thread).
691
+ #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
692
+ self ._spawn_process ()
693
+
694
+ def _launch_processes (self ):
695
+ # https://github.com/python/cpython/issues/90622
696
+ assert not self ._executor_manager_thread , (
697
+ 'Processes cannot be fork()ed after the thread has started, '
698
+ 'deadlock in the child processes could result.' )
699
+ for _ in range (len (self ._processes ), self ._max_workers ):
700
+ self ._spawn_process ()
701
+
702
+ def _spawn_process (self ):
703
+ p = self ._mp_context .Process (
704
+ target = _process_worker ,
705
+ args = (self ._call_queue ,
706
+ self ._result_queue ,
707
+ self ._initializer ,
708
+ self ._initargs ))
709
+ p .start ()
710
+ self ._processes [p .pid ] = p
685
711
686
712
def submit (self , fn , / , * args , ** kwargs ):
687
713
with self ._shutdown_lock :
@@ -702,7 +728,8 @@ def submit(self, fn, /, *args, **kwargs):
702
728
# Wake up queue management thread
703
729
self ._executor_manager_thread_wakeup .wakeup ()
704
730
705
- self ._adjust_process_count ()
731
+ if self ._safe_to_dynamically_spawn_children :
732
+ self ._adjust_process_count ()
706
733
self ._start_executor_manager_thread ()
707
734
return f
708
735
submit .__doc__ = _base .Executor .submit .__doc__
0 commit comments