Skip to content

Commit f1031ed

Browse files
committed
Nudge kernel with info request until we receive IOPub messages
1 parent 6fef2a8 commit f1031ed

File tree

3 files changed

+125
-23
lines changed

3 files changed

+125
-23
lines changed

jupyter_server/gateway/managers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ async def shutdown_kernel(self, kernel_id, now=False, restart=False):
442442
self.log.debug("Shutdown kernel response: %d %s", response.code, response.reason)
443443
self.remove_kernel(kernel_id)
444444

445-
async def restart_kernel(self, kernel_id, now=False, **kwargs):
445+
async def restart_kernel(self, kernel_id, channels=None, now=False, **kwargs):
446446
"""Restart a kernel by its kernel uuid.
447447
448448
Parameters

jupyter_server/services/kernels/handlers.py

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,8 @@ async def post(self, kernel_id, action):
7777
await ensure_async(km.interrupt_kernel(kernel_id))
7878
self.set_status(204)
7979
if action == 'restart':
80-
8180
try:
82-
await km.restart_kernel(kernel_id)
81+
await km.restart_kernel(kernel_id, km.channels)
8382
except Exception as e:
8483
self.log.error("Exception restarting kernel", exc_info=True)
8584
self.set_status(500)
@@ -127,6 +126,64 @@ def create_stream(self):
127126
self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
128127
stream.channel = channel
129128

129+
shell_channel = self.channels['shell']
130+
iopub_channel = self.channels['iopub']
131+
132+
future = Future()
133+
info_future = Future()
134+
iopub_future = Future()
135+
136+
def finish():
137+
"""Common cleanup"""
138+
loop.remove_timeout(timeout)
139+
loop.remove_timeout(nudge_handle)
140+
iopub_channel.stop_on_recv()
141+
shell_channel.stop_on_recv()
142+
143+
def on_shell_reply(msg):
144+
if not info_future.done():
145+
self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
146+
shell_channel.stop_on_recv()
147+
self.log.debug("Nudge: resolving shell future")
148+
info_future.set_result(msg)
149+
if iopub_future.done():
150+
finish()
151+
self.log.debug("Nudge: resolving main future in shell handler")
152+
future.set_result(info_future.result())
153+
154+
def on_iopub(msg):
155+
if not iopub_future.done():
156+
self.log.debug("Nudge: first IOPub received: %s", self.kernel_id)
157+
iopub_channel.stop_on_recv()
158+
self.log.debug("Nudge: resolving iopub future")
159+
iopub_future.set_result(None)
160+
if info_future.done():
161+
finish()
162+
self.log.debug("Nudge: resolving main future in iopub handler")
163+
future.set_result(info_future.result())
164+
165+
def on_timeout():
166+
self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id)
167+
finish()
168+
if not future.done():
169+
future.set_exception(TimeoutError("Timeout waiting for nudge"))
170+
171+
iopub_channel.on_recv(on_iopub)
172+
shell_channel.on_recv(on_shell_reply)
173+
loop = IOLoop.current()
174+
175+
# Nudge the kernel with kernel info requests until we get an IOPub message
176+
def nudge():
177+
self.log.debug("Nudge")
178+
if not future.done():
179+
self.log.debug("nudging")
180+
self.session.send(shell_channel, "kernel_info_request")
181+
nudge_handle = loop.call_later(0.5, nudge)
182+
nudge_handle = loop.call_later(0, nudge)
183+
184+
timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
185+
return future
186+
130187
def request_kernel_info(self):
131188
"""send a request for kernel_info"""
132189
km = self.kernel_manager
@@ -192,6 +249,7 @@ def initialize(self):
192249
super(ZMQChannelsHandler, self).initialize()
193250
self.zmq_stream = None
194251
self.channels = {}
252+
self.kernel_manager.channels = self.channels
195253
self.kernel_id = None
196254
self.kernel_info_channel = None
197255
self._kernel_info_future = Future()
@@ -249,7 +307,7 @@ async def _register_session(self):
249307
await stale_handler.close()
250308
self._open_sessions[self.session_key] = self
251309

252-
def open(self, kernel_id):
310+
async def open(self, kernel_id):
253311
super(ZMQChannelsHandler, self).open()
254312
km = self.kernel_manager
255313
km.notify_connect(kernel_id)
@@ -265,9 +323,11 @@ def open(self, kernel_id):
265323
for channel, msg_list in replay_buffer:
266324
stream = self.channels[channel]
267325
self._on_zmq_reply(stream, msg_list)
326+
connected = Future()
327+
connected.set_result(None)
268328
else:
269329
try:
270-
self.create_stream()
330+
connected = self.create_stream()
271331
except web.HTTPError as e:
272332
self.log.error("Error opening stream: %s", e)
273333
# WebSockets don't response to traditional error codes so we
@@ -281,8 +341,14 @@ def open(self, kernel_id):
281341
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
282342
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
283343

284-
for channel, stream in self.channels.items():
285-
stream.on_recv_stream(self._on_zmq_reply)
344+
def subscribe(value):
345+
for channel, stream in self.channels.items():
346+
stream.on_recv_stream(self._on_zmq_reply)
347+
348+
connected.add_done_callback(subscribe)
349+
350+
return connected
351+
286352

287353
def on_message(self, msg):
288354
if not self.channels:

jupyter_server/services/kernels/kernelmanager.py

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -314,33 +314,59 @@ def shutdown_kernel(self, kernel_id, now=False, restart=False):
314314

315315
return self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart)
316316

317-
async def restart_kernel(self, kernel_id):
317+
async def restart_kernel(self, kernel_id, channels):
318318
"""Restart a kernel by kernel_id"""
319319
self._check_kernel_id(kernel_id)
320-
await ensure_async(self.pinned_superclass.restart_kernel(self, kernel_id))
320+
await ensure_async(self.pinned_superclass.restart_kernel(self, kernel_id, channels))
321321
kernel = self.get_kernel(kernel_id)
322322
# return a Future that will resolve when the kernel has successfully restarted
323-
channel = kernel.connect_shell()
323+
shell_channel = self.channels['shell']
324+
iopub_channel = self.channels['iopub']
325+
326+
session = Session(
327+
config=kernel.session.config,
328+
key=kernel.session.key,
329+
)
330+
324331
future = Future()
332+
info_future = Future()
333+
iopub_future = Future()
325334

326335
def finish():
327-
"""Common cleanup when restart finishes/fails for any reason."""
328-
if not channel.closed():
329-
channel.close()
336+
"""Common cleanup"""
330337
loop.remove_timeout(timeout)
338+
loop.remove_timeout(nudge_handle)
339+
iopub_channel.stop_on_recv()
340+
shell_channel.stop_on_recv()
331341
kernel.remove_restart_callback(on_restart_failed, 'dead')
332342

333-
def on_reply(msg):
334-
self.log.debug("Kernel info reply received: %s", kernel_id)
335-
finish()
336-
if not future.done():
337-
future.set_result(msg)
343+
def on_shell_reply(msg):
344+
if not info_future.done():
345+
self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
346+
shell_channel.stop_on_recv()
347+
self.log.debug("Nudge: resolving shell future")
348+
info_future.set_result(msg)
349+
if iopub_future.done():
350+
finish()
351+
self.log.debug("Nudge: resolving main future in shell handler")
352+
future.set_result(info_future.result())
353+
354+
def on_iopub(msg):
355+
if not iopub_future.done():
356+
self.log.debug("Nudge: first IOPub received: %s", self.kernel_id)
357+
iopub_channel.stop_on_recv()
358+
self.log.debug("Nudge: resolving iopub future")
359+
iopub_future.set_result(None)
360+
if info_future.done():
361+
finish()
362+
self.log.debug("Nudge: resolving main future in iopub handler")
363+
future.set_result(info_future.result())
338364

339365
def on_timeout():
340-
self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id)
366+
self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id)
341367
finish()
342368
if not future.done():
343-
future.set_exception(TimeoutError("Timeout waiting for restart"))
369+
future.set_exception(TimeoutError("Timeout waiting for nudge"))
344370

345371
def on_restart_failed():
346372
self.log.warning("Restarting kernel failed: %s", kernel_id)
@@ -349,10 +375,20 @@ def on_restart_failed():
349375
future.set_exception(RuntimeError("Restart failed"))
350376

351377
kernel.add_restart_callback(on_restart_failed, 'dead')
352-
kernel.session.send(channel, "kernel_info_request")
353-
channel.on_recv(on_reply)
378+
379+
iopub_channel.on_recv(on_iopub)
380+
shell_channel.on_recv(on_shell_reply)
354381
loop = IOLoop.current()
355-
timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
382+
383+
# Nudge the kernel with kernel info requests until we get an IOPub message
384+
def nudge():
385+
self.log.debug("Nudge")
386+
if not future.done():
387+
self.log.debug("nudging")
388+
session.send(shell_channel, "kernel_info_request")
389+
nudge_handle = loop.call_later(0.5, nudge)
390+
nudge_handle = loop.call_later(0, nudge)
391+
356392
return future
357393

358394
def notify_connect(self, kernel_id):

0 commit comments

Comments
 (0)