Skip to content

Commit 7a3ae6e

Browse files
committed
Nudge kernel with info request until we receive IOPub messages
1 parent 0c83c9d commit 7a3ae6e

File tree

1 file changed

+79
-8
lines changed

1 file changed

+79
-8
lines changed

notebook/services/kernels/handlers.py

Lines changed: 79 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,65 @@ def create_stream(self):
128128
self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
129129
stream.channel = channel
130130

131+
def nudge(self):
132+
shell_channel = self.channels['shell']
133+
iopub_channel = self.channels['iopub']
134+
135+
future = Future()
136+
info_future = Future()
137+
iopub_future = Future()
138+
139+
def finish():
140+
"""Common cleanup"""
141+
loop.remove_timeout(timeout)
142+
loop.remove_timeout(nudge_handle)
143+
iopub_channel.stop_on_recv()
144+
shell_channel.stop_on_recv()
145+
146+
def on_shell_reply(msg):
147+
if not info_future.done():
148+
self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
149+
shell_channel.stop_on_recv()
150+
self.log.debug("Nudge: resolving shell future")
151+
info_future.set_result(msg)
152+
if iopub_future.done():
153+
finish()
154+
self.log.debug("Nudge: resolving main future in shell handler")
155+
future.set_result(info_future.result())
156+
157+
def on_iopub(msg):
158+
if not iopub_future.done():
159+
self.log.debug("Nudge: first IOPub received: %s", self.kernel_id)
160+
iopub_channel.stop_on_recv()
161+
self.log.debug("Nudge: resolving iopub future")
162+
iopub_future.set_result(None)
163+
if info_future.done():
164+
finish()
165+
self.log.debug("Nudge: resolving main future in iopub handler")
166+
future.set_result(info_future.result())
167+
168+
def on_timeout():
169+
self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id)
170+
finish()
171+
if not future.done():
172+
future.set_exception(TimeoutError("Timeout waiting for nudge"))
173+
174+
iopub_channel.on_recv(on_iopub)
175+
shell_channel.on_recv(on_shell_reply)
176+
loop = IOLoop.current()
177+
178+
# Nudge the kernel with kernel info requests until we get an IOPub message
179+
def nudge():
180+
self.log.debug("Nudge")
181+
if not future.done():
182+
self.log.debug("nudging")
183+
self.session.send(shell_channel, "kernel_info_request")
184+
nudge_handle = loop.call_later(0.5, nudge)
185+
nudge_handle = loop.call_later(0, nudge)
186+
187+
timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
188+
return future
189+
131190
def request_kernel_info(self):
132191
"""send a request for kernel_info"""
133192
km = self.kernel_manager
@@ -253,6 +312,7 @@ def _register_session(self):
253312
yield stale_handler.close()
254313
self._open_sessions[self.session_key] = self
255314

315+
@gen.coroutine
256316
def open(self, kernel_id):
257317
super().open()
258318
km = self.kernel_manager
@@ -263,15 +323,21 @@ def open(self, kernel_id):
263323
if buffer_info and buffer_info['session_key'] == self.session_key:
264324
self.log.info("Restoring connection for %s", self.session_key)
265325
self.channels = buffer_info['channels']
266-
replay_buffer = buffer_info['buffer']
267-
if replay_buffer:
268-
self.log.info("Replaying %s buffered messages", len(replay_buffer))
269-
for channel, msg_list in replay_buffer:
270-
stream = self.channels[channel]
271-
self._on_zmq_reply(stream, msg_list)
326+
connected = self.nudge()
327+
328+
def replay(value):
329+
replay_buffer = buffer_info['buffer']
330+
if replay_buffer:
331+
self.log.info("Replaying %s buffered messages", len(replay_buffer))
332+
for channel, msg_list in replay_buffer:
333+
stream = self.channels[channel]
334+
self._on_zmq_reply(stream, msg_list)
335+
336+
connected.add_done_callback(replay)
272337
else:
273338
try:
274339
self.create_stream()
340+
connected = self.nudge()
275341
except web.HTTPError as e:
276342
self.log.error("Error opening stream: %s", e)
277343
# WebSockets don't response to traditional error codes so we
@@ -285,8 +351,13 @@ def open(self, kernel_id):
285351
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
286352
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
287353

288-
for channel, stream in self.channels.items():
289-
stream.on_recv_stream(self._on_zmq_reply)
354+
def subscribe(value):
355+
for channel, stream in self.channels.items():
356+
stream.on_recv_stream(self._on_zmq_reply)
357+
358+
connected.add_done_callback(subscribe)
359+
360+
return connected
290361

291362
def on_message(self, msg):
292363
if not self.channels:

0 commit comments

Comments
 (0)