Skip to content

Commit 430247d

Browse files
committed
Nudge kernel with info request until we receive IOPub messages
1 parent 0c83c9d commit 430247d

File tree

2 files changed

+118
-16
lines changed

2 files changed

+118
-16
lines changed

notebook/services/kernels/handlers.py

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,64 @@ def create_stream(self):
127127
meth = getattr(km, 'connect_' + channel)
128128
self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
129129
stream.channel = channel
130+
131+
shell_channel = self.channels['shell']
132+
iopub_channel = self.channels['iopub']
133+
134+
future = Future()
135+
info_future = Future()
136+
iopub_future = Future()
137+
138+
def finish():
139+
"""Common cleanup"""
140+
loop.remove_timeout(timeout)
141+
loop.remove_timeout(nudge_handle)
142+
iopub_channel.stop_on_recv()
143+
shell_channel.stop_on_recv()
144+
145+
def on_shell_reply(msg):
146+
if not info_future.done():
147+
self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
148+
shell_channel.stop_on_recv()
149+
self.log.debug("Nudge: resolving shell future")
150+
info_future.set_result(msg)
151+
if iopub_future.done():
152+
finish()
153+
self.log.debug("Nudge: resolving main future in shell handler")
154+
future.set_result(info_future.result())
155+
156+
def on_iopub(msg):
157+
if not iopub_future.done():
158+
self.log.debug("Nudge: first IOPub received: %s", self.kernel_id)
159+
iopub_channel.stop_on_recv()
160+
self.log.debug("Nudge: resolving iopub future")
161+
iopub_future.set_result(None)
162+
if info_future.done():
163+
finish()
164+
self.log.debug("Nudge: resolving main future in iopub handler")
165+
future.set_result(info_future.result())
166+
167+
def on_timeout():
168+
self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id)
169+
finish()
170+
if not future.done():
171+
future.set_exception(TimeoutError("Timeout waiting for nudge"))
172+
173+
iopub_channel.on_recv(on_iopub)
174+
shell_channel.on_recv(on_shell_reply)
175+
loop = IOLoop.current()
176+
177+
# Nudge the kernel with kernel info requests until we get an IOPub message
178+
def nudge():
179+
self.log.debug("Nudge")
180+
if not future.done():
181+
self.log.debug("nudging")
182+
self.session.send(shell_channel, "kernel_info_request")
183+
nudge_handle = loop.call_later(0.5, nudge)
184+
nudge_handle = loop.call_later(0, nudge)
185+
186+
timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
187+
return future
130188

131189
def request_kernel_info(self):
132190
"""send a request for kernel_info"""
@@ -253,6 +311,7 @@ def _register_session(self):
253311
yield stale_handler.close()
254312
self._open_sessions[self.session_key] = self
255313

314+
@gen.coroutine
256315
def open(self, kernel_id):
257316
super().open()
258317
km = self.kernel_manager
@@ -269,9 +328,11 @@ def open(self, kernel_id):
269328
for channel, msg_list in replay_buffer:
270329
stream = self.channels[channel]
271330
self._on_zmq_reply(stream, msg_list)
331+
connected = Future()
332+
connected.set_result(None)
272333
else:
273334
try:
274-
self.create_stream()
335+
connected = self.create_stream()
275336
except web.HTTPError as e:
276337
self.log.error("Error opening stream: %s", e)
277338
# WebSockets don't respond to traditional error codes so we
@@ -285,8 +346,13 @@ def open(self, kernel_id):
285346
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
286347
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
287348

288-
for channel, stream in self.channels.items():
289-
stream.on_recv_stream(self._on_zmq_reply)
349+
def subscribe(value):
350+
for channel, stream in self.channels.items():
351+
stream.on_recv_stream(self._on_zmq_reply)
352+
353+
connected.add_done_callback(subscribe)
354+
355+
return connected
290356

291357
def on_message(self, msg):
292358
if not self.channels:

notebook/services/kernels/kernelmanager.py

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -310,27 +310,52 @@ async def restart_kernel(self, kernel_id, now=False):
310310
await maybe_future(self.pinned_superclass.restart_kernel(self, kernel_id, now=now))
311311
kernel = self.get_kernel(kernel_id)
312312
# return a Future that will resolve when the kernel has successfully restarted
313-
channel = kernel.connect_shell()
313+
shell_channel = kernel.connect_shell() # TODO: do not use transient channels
314+
iopub_channel = kernel.connect_iopub()
315+
314316
future = Future()
317+
info_future = Future()
318+
iopub_future = Future()
315319

316320
def finish():
317-
"""Common cleanup when restart finishes/fails for any reason."""
318-
if not channel.closed():
319-
channel.close()
321+
"""Common cleanup"""
320322
loop.remove_timeout(timeout)
323+
loop.remove_timeout(nudge_handle)
324+
# iopub_channel.stop_on_recv() # TODO: do not use transient channels
325+
# shell_channel.stop_on_recv()
326+
if not iopub_channel.closed():
327+
iopub_channel.close()
328+
if not shell_channel.closed():
329+
shell_channel.close()
321330
kernel.remove_restart_callback(on_restart_failed, 'dead')
322331

323-
def on_reply(msg):
324-
self.log.debug("Kernel info reply received: %s", kernel_id)
325-
finish()
326-
if not future.done():
327-
future.set_result(msg)
332+
def on_shell_reply(msg):
333+
if not info_future.done():
334+
self.log.debug("Nudge: shell info reply received: %s", kernel_id)
335+
shell_channel.stop_on_recv()
336+
self.log.debug("Nudge: resolving shell future")
337+
info_future.set_result(msg)
338+
if iopub_future.done():
339+
finish()
340+
self.log.debug("Nudge: resolving main future in shell handler")
341+
future.set_result(info_future.result())
342+
343+
def on_iopub(msg):
344+
if not iopub_future.done():
345+
self.log.debug("Nudge: first IOPub received: %s", kernel_id)
346+
iopub_channel.stop_on_recv()
347+
self.log.debug("Nudge: resolving iopub future")
348+
iopub_future.set_result(None)
349+
if info_future.done():
350+
finish()
351+
self.log.debug("Nudge: resolving main future in iopub handler")
352+
future.set_result(info_future.result())
328353

329354
def on_timeout():
330-
self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id)
355+
self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", kernel_id)
331356
finish()
332357
if not future.done():
333-
future.set_exception(TimeoutError("Timeout waiting for restart"))
358+
future.set_exception(TimeoutError("Timeout waiting for nudge"))
334359

335360
def on_restart_failed():
336361
self.log.warning("Restarting kernel failed: %s", kernel_id)
@@ -339,10 +364,21 @@ def on_restart_failed():
339364
future.set_exception(RuntimeError("Restart failed"))
340365

341366
kernel.add_restart_callback(on_restart_failed, 'dead')
342-
kernel.session.send(channel, "kernel_info_request")
343-
channel.on_recv(on_reply)
367+
368+
iopub_channel.on_recv(on_iopub)
369+
shell_channel.on_recv(on_shell_reply)
344370
loop = IOLoop.current()
371+
372+
# Nudge the kernel with kernel info requests until we get an IOPub message
373+
def nudge():
374+
self.log.debug("Nudge")
375+
if not future.done():
376+
self.log.debug("nudging")
377+
kernel.session.send(shell_channel, "kernel_info_request")
378+
nudge_handle = loop.call_later(0.5, nudge)
379+
345380
timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
381+
nudge_handle = loop.call_later(0, nudge)
346382
return future
347383

348384
def notify_connect(self, kernel_id):

0 commit comments

Comments
 (0)