From aefab6b005d664c0c6d629f3e7844e947282ffd1 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Wed, 16 Nov 2022 18:06:26 +0000 Subject: [PATCH 1/6] fix forking --- Lib/asyncio/events.py | 8 +++ Lib/test/test_asyncio/test_unix_events.py | 59 +++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index a327ba54a323a8..dec48e656dc992 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -17,6 +17,7 @@ import subprocess import sys import threading +import signal from . import format_helpers @@ -665,6 +666,13 @@ class _Local(threading.local): def __init__(self): self._local = self._Local() + if hasattr(os, 'fork'): + def on_fork(): + self._local = self._Local() + signal.set_wakeup_fd(-1) + + os.register_at_fork(after_in_child=on_fork) + def get_event_loop(self): """Get the event loop for the current context. diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 93e8611f184d25..82b7a48b4eddc4 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -11,10 +11,13 @@ import sys import threading import unittest +import time from unittest import mock import warnings +import multiprocessing from test.support import os_helper from test.support import socket_helper +from test.support import wait_process if sys.platform == 'win32': raise unittest.SkipTest('UNIX only') @@ -1867,5 +1870,61 @@ async def runner(): wsock.close() +@unittest.skipUnless(hasattr(os, 'fork'), 'requires os.fork()') +class TestFork(unittest.IsolatedAsyncioTestCase): + + async def test_fork(self): + loop = asyncio.get_running_loop() + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + pid = os.fork() + if pid == 0: + # child + try: + loop = asyncio.get_event_loop_policy().get_event_loop() + os.write(w, str(id(loop)).encode()) + finally: + os._exit(0) + else: + # parent + child_loop = int(os.read(r, 100).decode()) + self.assertNotEqual(child_loop, id(loop)) + wait_process(pid, exitcode=0) + + def test_fork_signal_handling(self): + multiprocessing.set_start_method('fork') + manager = multiprocessing.Manager() + self.addCleanup(manager.shutdown) + child_started = manager.Event() + child_handler_called = manager.Event() + parent_handler_called = manager.Event() + + def parent_handler(*args): + parent_handler_called.set() + + def child_handler(*args): + child_handler_called.set() + + def child_main(): + signal.signal(signal.SIGTERM, child_handler) + child_started.set() + time.sleep(1) + + async def main(): + loop = asyncio.get_running_loop() + loop.add_signal_handler(signal.SIGTERM, parent_handler) + + process = multiprocessing.Process(target=child_main) + process.start() + child_started.wait() + os.kill(process.pid, signal.SIGTERM) + process.join() + + asyncio.run(main()) + + self.assertFalse(parent_handler_called.is_set()) + self.assertTrue(child_handler_called.is_set()) + if __name__ == '__main__': unittest.main() From 5bd9cddb01eff54ea72ea819e48bb03200e2e082 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Wed, 16 Nov 2022 18:13:13 +0000 Subject: [PATCH 2/6] add comments --- Lib/asyncio/events.py | 1 + Lib/test/test_asyncio/test_unix_events.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index dec48e656dc992..39a01048b4c8dd 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -668,6 +668,7 @@ def __init__(self): self._local = self._Local() if hasattr(os, 'fork'): def on_fork(): + # Reset the loop and wakeupfd in the forked child process. self._local = self._Local() signal.set_wakeup_fd(-1) diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 82b7a48b4eddc4..026fc2f62afc43 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1873,7 +1873,8 @@ async def runner(): @unittest.skipUnless(hasattr(os, 'fork'), 'requires os.fork()') class TestFork(unittest.IsolatedAsyncioTestCase): - async def test_fork(self): + async def test_fork_not_share_event_loop(self): + # The forked process should not share the event loop with the parent loop = asyncio.get_running_loop() r, w = os.pipe() self.addCleanup(os.close, r) @@ -1893,6 +1894,8 @@ async def test_fork(self): wait_process(pid, exitcode=0) def test_fork_signal_handling(self): + # Sending signals to the forked process should not affect the parent + # process. multiprocessing.set_start_method('fork') manager = multiprocessing.Manager() self.addCleanup(manager.shutdown) From cce8038474e72bfad3638503ff5eb0502988da39 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Wed, 16 Nov 2022 18:19:11 +0000 Subject: [PATCH 3/6] simplify test --- Lib/test/test_asyncio/test_unix_events.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 026fc2f62afc43..5f899757d367d3 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1900,23 +1900,17 @@ def test_fork_signal_handling(self): manager = multiprocessing.Manager() self.addCleanup(manager.shutdown) child_started = manager.Event() - child_handler_called = manager.Event() - parent_handler_called = manager.Event() - - def parent_handler(*args): - parent_handler_called.set() - - def child_handler(*args): - child_handler_called.set() + child_handled = manager.Event() + parent_handled = manager.Event() def child_main(): - signal.signal(signal.SIGTERM, child_handler) + signal.signal(signal.SIGTERM, lambda *args: child_handled.set()) child_started.set() time.sleep(1) async def main(): loop = asyncio.get_running_loop() - loop.add_signal_handler(signal.SIGTERM, parent_handler) + loop.add_signal_handler(signal.SIGTERM, lambda *args: parent_handled.set()) process = multiprocessing.Process(target=child_main) process.start() @@ -1926,8 +1920,8 @@ async def main(): asyncio.run(main()) - self.assertFalse(parent_handler_called.is_set()) - self.assertTrue(child_handler_called.is_set()) + self.assertFalse(parent_handled.is_set()) + self.assertTrue(child_handled.is_set()) if __name__ == '__main__': unittest.main() From 39d158f2a2850b889b64090b3e7780b71ec94204 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Thu, 17 Nov 2022 10:56:49 +0000 Subject: [PATCH 4/6] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2022-11-17-10-56-47.gh-issue-66285.KvjlaB.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2022-11-17-10-56-47.gh-issue-66285.KvjlaB.rst diff --git a/Misc/NEWS.d/next/Library/2022-11-17-10-56-47.gh-issue-66285.KvjlaB.rst b/Misc/NEWS.d/next/Library/2022-11-17-10-56-47.gh-issue-66285.KvjlaB.rst new file mode 100644 index 00000000000000..ebd82173882726 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-11-17-10-56-47.gh-issue-66285.KvjlaB.rst @@ -0,0 +1 @@ +Fix :mod:`asyncio` to not share event loop and signal wakeupfd in forked processes. Patch by Kumar Aditya. From cff34601695b6104b392fa5b9a5d9c83a59830c6 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Sat, 19 Nov 2022 07:57:42 +0000 Subject: [PATCH 5/6] add more tests --- Lib/test/test_asyncio/test_unix_events.py | 33 +++++++++++++++++++---- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 5f899757d367d3..5db02edbf12471 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1894,10 +1894,10 @@ async def test_fork_not_share_event_loop(self): wait_process(pid, exitcode=0) def test_fork_signal_handling(self): - # Sending signals to the forked process should not affect the parent - # process. - multiprocessing.set_start_method('fork') - manager = multiprocessing.Manager() + # Sending signal to the forked process should not affect the parent + # process + ctx = multiprocessing.get_context('fork') + manager = ctx.Manager() self.addCleanup(manager.shutdown) child_started = manager.Event() child_handled = manager.Event() @@ -1912,16 +1912,39 @@ async def main(): loop = asyncio.get_running_loop() loop.add_signal_handler(signal.SIGTERM, lambda *args: parent_handled.set()) - process = multiprocessing.Process(target=child_main) + process = ctx.Process(target=child_main) process.start() child_started.wait() os.kill(process.pid, signal.SIGTERM) process.join() + async def func(): + await asyncio.sleep(0.1) + return 42 + + # Test parent's loop is still functional + self.assertEqual(await asyncio.create_task(func()), 42) + asyncio.run(main()) self.assertFalse(parent_handled.is_set()) self.assertTrue(child_handled.is_set()) + def test_fork_asyncio_run(self): + ctx = multiprocessing.get_context('fork') + manager = ctx.Manager() + self.addCleanup(manager.shutdown) + result = manager.Value('i', 0) + + async def child_main(): + await asyncio.sleep(0.1) + result.value = 42 + + process = ctx.Process(target=lambda: asyncio.run(child_main())) + process.start() + process.join() + + self.assertEqual(result.value, 42) + if __name__ == '__main__': unittest.main() From c3b522bb564988513fb4710676f15c74a1c14526 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Sat, 19 Nov 2022 08:12:40 +0000 Subject: [PATCH 6/6] more tests --- Lib/test/test_asyncio/test_unix_events.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 5db02edbf12471..4e1dab2f86b4dd 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1946,5 +1946,21 @@ async def child_main(): self.assertEqual(result.value, 42) + def test_fork_asyncio_subprocess(self): + ctx = multiprocessing.get_context('fork') + manager = ctx.Manager() + self.addCleanup(manager.shutdown) + result = manager.Value('i', 1) + + async def child_main(): + proc = await asyncio.create_subprocess_exec(sys.executable, '-c', 'pass') + result.value = await proc.wait() + + process = ctx.Process(target=lambda: asyncio.run(child_main())) + process.start() + process.join() + + self.assertEqual(result.value, 0) + if __name__ == '__main__': unittest.main()