gh-107219: Fix concurrent.futures terminate_broken() #109244
| @serhiy-storchaka @methane @ambv @gpshead @pitrou: Would you mind having a look? I would like to merge this fix as soon as possible, since bug #107219 is badly affecting the Python workflow. The CI failure rate is very high because of this  For now, I prefer to use  | 
| With this change, I can no longer reproduce the bug. On my Windows VM, which has 2 CPUs, I can easily reproduce the hang in around 30 seconds on the Python main branch: 
 I stressed the test with: 
 In 8 minutes, I could not reproduce the bug with this change. Bonus: I can no longer hang the test when I interrupt it with CTRL+C. | 
| 
 Oh! For the first time in like 2 weeks,  Note: There are only these two unrelated failures: These 2 tests passed when re-run in verbose mode (Result: FAILURE then SUCCESS). | 
| ov = self._send_ov | ||
| if ov is not None: | ||
| # Interrupt WaitForMultipleObjects() in _send_bytes() | ||
| ov.cancel() | 
asyncio uses similar code in ProactorEventLoop:
cpython/Lib/asyncio/windows_events.py
Lines 67 to 81 in 1ec4537
| def _cancel_overlapped(self): | |
| if self._ov is None: | |
| return | |
| try: | |
| self._ov.cancel() | |
| except OSError as exc: | |
| context = { | |
| 'message': 'Cancelling an overlapped future failed', | |
| 'exception': exc, | |
| 'future': self, | |
| } | |
| if self._source_traceback: | |
| context['source_traceback'] = self._source_traceback | |
| self._loop.call_exception_handler(context) | |
| self._ov = None | 
asyncio wraps this in more advanced code to handle more cases. For example, in asyncio, cancel() is part of the public API.
Here the cancellation is a standard action in the Windows Overlapped API. The cancellation is synchronous, so it's easy!
Fortunately, we are not in the very complicated RegisterWaitWithQueue() case! That case requires an asynchronous cancellation, which is really complicated to handle: the completion of the cancellation itself has to be awaited!? See this horror story: https://vstinner.github.io/asyncio-proactor-cancellation-from-hell.html
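To illustrate the idea of a synchronous cancellation waking up a blocked sender, here is a minimal, portable sketch. It is a toy model only: `ToyPipeConnection` and `demo()` are hypothetical names, a `threading.Event` stands in for the overlapped operation and for WaitForMultipleObjects(), and none of this is the real PipeConnection code.

```python
import errno
import threading


class ToyPipeConnection:
    """Toy stand-in for a pipe connection (hypothetical, not the real class)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._closed = False
        self._cancelled = False
        # Event of the pending "send", or None when no send is in progress.
        self._send_done = None

    def send_bytes(self, data):
        done = threading.Event()
        with self._lock:
            if self._closed:
                raise OSError(errno.EPIPE, "handle is closed")
            self._send_done = done
        try:
            # Stand-in for the blocking wait: nobody reads from this toy
            # pipe, so only close() can wake us up.
            done.wait()
        finally:
            with self._lock:
                self._send_done = None
        if self._cancelled:
            # close() was called by another thread while we were waiting.
            raise OSError(errno.EPIPE, "handle is closed")
        return len(data)

    def close(self):
        with self._lock:
            self._closed = True
            done = self._send_done
            if done is not None:
                # Stand-in for ov.cancel(): interrupt the pending send.
                self._cancelled = True
                done.set()


def demo():
    """Block a sender thread, close the connection, report what happened."""
    conn = ToyPipeConnection()
    errors = []

    def sender():
        try:
            conn.send_bytes(b"work item")
        except OSError as exc:
            errors.append(exc)

    thread = threading.Thread(target=sender)
    thread.start()
    conn.close()
    thread.join(timeout=5)
    return thread.is_alive(), errors
```

Whichever thread wins the race, the sender ends up with an EPIPE error instead of hanging: either it sees the closed flag before waiting, or close() cancels the wait.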
| # close() was called by another thread while | ||
| # WaitForMultipleObjects() was waiting for the overlapped | ||
| # operation. | ||
| raise OSError(errno.EPIPE, "handle is closed") | 
I chose to raise BrokenPipeError here, since Queue._feed() has a special code path that silently ignores EPIPE errors:
cpython/Lib/multiprocessing/queues.py
Lines 255 to 257 in 1ec4537
| except Exception as e: | |
| if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: | |
| return | 
concurrent.futures uses this code path for its "call queue", which is what is causing trouble here:
cpython/Lib/concurrent/futures/process.py
Lines 724 to 732 in 1ec4537
| self._call_queue = _SafeQueue( | |
| max_size=queue_size, ctx=self._mp_context, | |
| pending_work_items=self._pending_work_items, | |
| shutdown_lock=self._shutdown_lock, | |
| thread_wakeup=self._executor_manager_thread_wakeup) | |
| # Killed worker processes can produce spurious "broken pipe" | |
| # tracebacks in the queue's own worker thread. But we detect killed | |
| # processes anyway, so silence the tracebacks. | |
| self._call_queue._ignore_epipe = True | 
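A small sketch of why this works, under simplifying assumptions: `feed_once()` and `closed_send()` are hypothetical helpers that only mimic the `ignore_epipe` check quoted above, not the real feeder thread. It also relies on the fact that constructing `OSError` with `errno.EPIPE` yields a `BrokenPipeError` instance.

```python
import errno


def feed_once(send, obj, ignore_epipe=False, on_error=print):
    """Hypothetical, simplified version of one feeder-loop iteration."""
    try:
        send(obj)
    except Exception as e:
        # Same test as in the quoted Queue._feed() excerpt.
        if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
            return "ignored"
        on_error(e)
        return "reported"
    return "sent"


def closed_send(obj):
    """Stand-in for a send on a connection closed by another thread."""
    raise OSError(errno.EPIPE, "handle is closed")
```

With `ignore_epipe=True`, as set on the call queue, the error from `closed_send()` is dropped silently; with the default, it is handed to `on_error`.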
sounds like we got lucky that callers were handling one thing we could raise! :)
At first, I added a new exception, but then I chose to reuse the existing code path instead. IMO BrokenPipeError makes perfect sense for a PipeConnection.
LGTM.
But I have one suggestion and one question/suggestion.
| finally: | ||
| self._send_ov = None | ||
| nwritten, err = ov.GetOverlappedResult(True) | ||
| if err == WSA_OPERATION_ABORTED: | 
What other value can it be? There is assert err == 0 below, so I guess that any error was unexpected.
Could we simply check that err is not zero here?
I chose to write a minimal change: change as little code as possible. I introduced one new error, added a check for it, and that's all. I don't know the code well enough to answer your question. I'm not a multiprocessing or Windows API expert at all :-(
Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed.

Changes:

* _ExecutorManagerThread.terminate_broken() now closes call_queue._writer.
* multiprocessing PipeConnection.close() now interrupts WaitForMultipleObjects() in _send_bytes() by cancelling the overlapped operation.
Address Serhiy's review.
9987dc7 to 069fbfa
    | BUFSIZE = 8192 | ||
| # A very generous timeout when it comes to local connections... | ||
| CONNECTION_TIMEOUT = 20. | ||
| WSA_OPERATION_ABORTED = 995 | 
It is the same as _winapi.ERROR_OPERATION_ABORTED.
Now I'm confused. I don't recall which doc I was looking at. WriteFile() is documented to return ERROR_OPERATION_ABORTED when it's canceled: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefile
| Thanks @vstinner for the PR 🌮🎉.. I'm working now to backport this PR to: 3.11, 3.12. | 
| There's a new commit after the PR has been approved. @serhiy-storchaka: please review the changes made to this pull request. | 
…109244)

Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed.

Changes:

* _ExecutorManagerThread.terminate_broken() now closes call_queue._writer.
* multiprocessing PipeConnection.close() now interrupts WaitForMultipleObjects() in _send_bytes() by cancelling the overlapped operation.

(cherry picked from commit a9b1f84)

Co-authored-by: Victor Stinner <[email protected]>
| GH-109254 is a backport of this pull request to the 3.12 branch. | 
| GH-109255 is a backport of this pull request to the 3.11 branch. | 
| PR merged, thanks for the review @serhiy-storchaka. I wanted to merge this fix ASAP since the bug was preventing other PRs from being merged. | 
According to the source of GetOverlappedResult() in _winapi.c, the only possible values of err are ERROR_SUCCESS (0), ERROR_MORE_DATA, ERROR_OPERATION_ABORTED, and ERROR_IO_INCOMPLETE.
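For reference, a hedged sketch of how a caller could tell these values apart. `check_send_result()` is a hypothetical helper, not code from the PR; the constants are the standard Win32 numeric values, defined locally so the sketch runs on any platform; and raising RuntimeError for unexpected values is an assumption made here (the code under review uses an assert instead).

```python
import errno

# Standard Win32 error codes, defined locally for portability.
ERROR_SUCCESS = 0
ERROR_MORE_DATA = 234
ERROR_OPERATION_ABORTED = 995
ERROR_IO_INCOMPLETE = 996


def check_send_result(nwritten, err, expected):
    """Hypothetical helper: interpret an (nwritten, err) pair after a send."""
    if err == ERROR_OPERATION_ABORTED:
        # The overlapped write was cancelled, e.g. by close() in
        # another thread: report it as a broken pipe.
        raise OSError(errno.EPIPE, "handle is closed")
    if err != ERROR_SUCCESS:
        # ERROR_MORE_DATA and ERROR_IO_INCOMPLETE are not expected
        # after a completed, blocking write.
        raise RuntimeError(f"unexpected overlapped result: {err}")
    if nwritten != expected:
        raise RuntimeError(f"short write: {nwritten} of {expected} bytes")
    return nwritten
```

Checking only for the aborted case keeps the existing behaviour for every other value, which matches the minimal-change approach described above.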
| Great work, @vstinner! | 
… (#109255)

gh-107219: Fix concurrent.futures terminate_broken() (GH-109244)

Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed.

Changes:

* _ExecutorManagerThread.terminate_broken() now closes call_queue._writer.
* multiprocessing PipeConnection.close() now interrupts WaitForMultipleObjects() in _send_bytes() by cancelling the overlapped operation.

(cherry picked from commit a9b1f84)

Co-authored-by: Victor Stinner <[email protected]>
| 
 Well, if you're confident, you can modify the  By the way, having  
 Thanks. | 
… (#109254)

gh-107219: Fix concurrent.futures terminate_broken() (GH-109244)

Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed.

Changes:

* _ExecutorManagerThread.terminate_broken() now closes call_queue._writer.
* multiprocessing PipeConnection.close() now interrupts WaitForMultipleObjects() in _send_bytes() by cancelling the overlapped operation.

(cherry picked from commit a9b1f84)

Co-authored-by: Victor Stinner <[email protected]>
Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed.
Changes: