Skip to content

Commit 4125fa4

Browse files
Merge pull request #528 from SylvainCorlay/async-subclasses
Async subclasses
2 parents fa0c78b + 6beb0f9 commit 4125fa4

File tree

9 files changed

+782
-37
lines changed

9 files changed

+782
-37
lines changed

jupyter_client/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from .connect import *
55
from .launcher import *
66
from .client import KernelClient
7-
from .manager import KernelManager, run_kernel
7+
from .manager import KernelManager, AsyncKernelManager, run_kernel
88
from .blocking import BlockingKernelClient
99
from .asynchronous import AsyncKernelClient
10-
from .multikernelmanager import MultiKernelManager
10+
from .multikernelmanager import MultiKernelManager, AsyncMultiKernelManager

jupyter_client/asynchronous/client.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ async def wait_for_ready(self, timeout=None):
135135
self._handle_kernel_info_reply(msg)
136136
break
137137

138-
if not self.is_alive():
138+
if not await self.is_alive():
139139
raise RuntimeError('Kernel died before replying to kernel_info')
140140

141141
# Check if current time is ready check time plus timeout
@@ -234,6 +234,24 @@ def _output_hook_kernel(self, session, socket, parent_header, msg):
234234
else:
235235
self._output_hook_default(msg)
236236

237+
async def is_alive(self):
238+
"""Is the kernel process still running?"""
239+
from ..manager import KernelManager, AsyncKernelManager
240+
if isinstance(self.parent, KernelManager):
241+
# This KernelClient was created by a KernelManager,
242+
# we can ask the parent KernelManager:
243+
if isinstance(self.parent, AsyncKernelManager):
244+
return await self.parent.is_alive()
245+
return self.parent.is_alive()
246+
if self._hb_channel is not None:
247+
# We don't have access to the KernelManager,
248+
# so we use the heartbeat.
249+
return self._hb_channel.is_beating()
250+
else:
251+
# no heartbeat and not local, we can't tell if it's running,
252+
# so naively return True
253+
return True
254+
237255
async def execute_interactive(self, code, silent=False, store_history=True,
238256
user_expressions=None, allow_stdin=None, stop_on_error=True,
239257
timeout=None, output_hook=None, stdin_hook=None,

jupyter_client/ioloop/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
from .manager import IOLoopKernelManager
2-
from .restarter import IOLoopKernelRestarter
1+
from .manager import IOLoopKernelManager, AsyncIOLoopKernelManager
2+
from .restarter import IOLoopKernelRestarter, AsyncIOLoopKernelRestarter

jupyter_client/ioloop/manager.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
Type,
1212
)
1313

14-
from jupyter_client.manager import KernelManager
15-
from .restarter import IOLoopKernelRestarter
14+
from jupyter_client.manager import KernelManager, AsyncKernelManager
15+
from .restarter import IOLoopKernelRestarter, AsyncIOLoopKernelRestarter
1616

1717

1818
def as_zmqstream(f):
@@ -21,9 +21,11 @@ def wrapped(self, *args, **kwargs):
2121
return ZMQStream(socket, self.loop)
2222
return wrapped
2323

24+
2425
class IOLoopKernelManager(KernelManager):
2526

2627
loop = Instance('tornado.ioloop.IOLoop')
28+
2729
def _loop_default(self):
2830
return ioloop.IOLoop.current()
2931

@@ -59,3 +61,43 @@ def stop_restarter(self):
5961
connect_iopub = as_zmqstream(KernelManager.connect_iopub)
6062
connect_stdin = as_zmqstream(KernelManager.connect_stdin)
6163
connect_hb = as_zmqstream(KernelManager.connect_hb)
64+
65+
66+
class AsyncIOLoopKernelManager(AsyncKernelManager):
67+
68+
loop = Instance('tornado.ioloop.IOLoop')
69+
70+
def _loop_default(self):
71+
return ioloop.IOLoop.current()
72+
73+
restarter_class = Type(
74+
default_value=AsyncIOLoopKernelRestarter,
75+
klass=AsyncIOLoopKernelRestarter,
76+
help=(
77+
'Type of KernelRestarter to use. '
78+
'Must be a subclass of AsyncIOLoopKernelManager.\n'
79+
'Override this to customize how kernel restarts are managed.'
80+
),
81+
config=True,
82+
)
83+
_restarter = Instance('jupyter_client.ioloop.AsyncIOLoopKernelRestarter', allow_none=True)
84+
85+
def start_restarter(self):
86+
if self.autorestart and self.has_kernel:
87+
if self._restarter is None:
88+
self._restarter = self.restarter_class(
89+
kernel_manager=self, loop=self.loop,
90+
parent=self, log=self.log
91+
)
92+
self._restarter.start()
93+
94+
def stop_restarter(self):
95+
if self.autorestart:
96+
if self._restarter is not None:
97+
self._restarter.stop()
98+
self._restarter = None
99+
100+
connect_shell = as_zmqstream(AsyncKernelManager.connect_shell)
101+
connect_iopub = as_zmqstream(AsyncKernelManager.connect_iopub)
102+
connect_stdin = as_zmqstream(AsyncKernelManager.connect_stdin)
103+
connect_hb = as_zmqstream(AsyncKernelManager.connect_hb)

jupyter_client/ioloop/restarter.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
Instance,
1717
)
1818

19+
1920
class IOLoopKernelRestarter(KernelRestarter):
2021
"""Monitor and autorestart a kernel."""
2122

2223
loop = Instance('tornado.ioloop.IOLoop')
24+
2325
def _loop_default(self):
2426
warnings.warn("IOLoopKernelRestarter.loop is deprecated in jupyter-client 5.2",
2527
DeprecationWarning, stacklevel=4,
@@ -41,3 +43,39 @@ def stop(self):
4143
if self._pcallback is not None:
4244
self._pcallback.stop()
4345
self._pcallback = None
46+
47+
48+
class AsyncIOLoopKernelRestarter(IOLoopKernelRestarter):
49+
50+
async def poll(self):
51+
if self.debug:
52+
self.log.debug('Polling kernel...')
53+
is_alive = await self.kernel_manager.is_alive()
54+
if not is_alive:
55+
if self._restarting:
56+
self._restart_count += 1
57+
else:
58+
self._restart_count = 1
59+
60+
if self._restart_count >= self.restart_limit:
61+
self.log.warning("AsyncIOLoopKernelRestarter: restart failed")
62+
self._fire_callbacks('dead')
63+
self._restarting = False
64+
self._restart_count = 0
65+
self.stop()
66+
else:
67+
newports = self.random_ports_until_alive and self._initial_startup
68+
self.log.info('AsyncIOLoopKernelRestarter: restarting kernel (%i/%i), %s random ports',
69+
self._restart_count,
70+
self.restart_limit,
71+
'new' if newports else 'keep'
72+
)
73+
self._fire_callbacks('restart')
74+
await self.kernel_manager.restart_kernel(now=True, newports=newports)
75+
self._restarting = True
76+
else:
77+
if self._initial_startup:
78+
self._initial_startup = False
79+
if self._restarting:
80+
self.log.debug("AsyncIOLoopKernelRestarter: restart apparently succeeded")
81+
self._restarting = False

0 commit comments

Comments
 (0)