1010import logging
1111from textwrap import dedent
1212
13- from tornado import web
13+ from tornado import web , gen
1414from tornado .concurrent import Future
1515from tornado .ioloop import IOLoop
1616
@@ -122,11 +122,122 @@ def __repr__(self):
def create_stream(self):
    """Open one kernel stream per channel and register it in ``self.channels``.

    For each channel name the matching ``connect_<channel>`` method of the
    kernel manager is called with this handler's kernel id and the session
    identity; the returned stream is tagged with its channel name and stored
    under that name.
    """
    km = self.kernel_manager
    identity = self.session.bsession
    # IOPub is connected first so that its subscription is underway before
    # anything is sent on the request channels.
    for channel in ("iopub", "shell", "control", "stdin"):
        connect = getattr(km, "connect_" + channel)
        stream = connect(self.kernel_id, identity=identity)
        stream.channel = channel
        self.channels[channel] = stream
129129
def nudge(self):
    """Nudge the zmq connections with kernel_info_requests.

    Returns a Future that will resolve when we have received
    a shell reply and at least one iopub message,
    ensuring that zmq subscriptions are established,
    sockets are fully connected, and kernel is responsive.

    Keeps retrying kernel_info_request (every 0.5s) until these are both
    received. Retrying stops early, and the pending futures are resolved,
    when the websocket is closing, the kernel is no longer known to the
    kernel manager, or the transient shell socket has been closed.

    The wait is bounded by ``self.kernel_info_timeout`` through
    ``gen.with_timeout``; when that deadline passes the returned Future is
    failed by ``gen.with_timeout`` and the internal futures are resolved so
    that cleanup still runs.
    """
    kernel = self.kernel_manager.get_kernel(self.kernel_id)

    # Do not nudge busy kernels as kernel info requests sent to shell are
    # queued behind execution requests.
    # Nudging in this case would cause a potentially very long wait
    # before connections are opened,
    # plus it is *very* unlikely that a busy kernel will not finish
    # establishing its zmq subscriptions before processing the next request.
    # A default is supplied so that kernel objects which do not track
    # ``execution_state`` are nudged instead of raising AttributeError.
    if getattr(kernel, "execution_state", None) == "busy":
        self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id)
        already_done = Future()
        already_done.set_result(None)
        return already_done

    # The IOPub used by the client, whose subscriptions we are verifying.
    # Looked up before the transient shell socket is opened so that a
    # missing channel cannot leave that socket dangling.
    iopub_channel = self.channels["iopub"]
    # Use a transient shell channel to prevent leaking
    # shell responses to the front-end.
    shell_channel = kernel.connect_shell()

    info_future = Future()
    iopub_future = Future()
    both_done = gen.multi([info_future, iopub_future])

    def finish(_=None):
        """Ensure all futures are resolved, which in turn triggers cleanup."""
        for pending in (info_future, iopub_future):
            if not pending.done():
                pending.set_result(None)

    def cleanup(_=None):
        """Common cleanup: stop retrying and release the nudge resources."""
        loop.remove_timeout(nudge_handle)
        iopub_channel.stop_on_recv()
        if not shell_channel.closed():
            shell_channel.close()

    # trigger cleanup when both message futures are resolved
    both_done.add_done_callback(cleanup)

    def on_shell_reply(msg):
        self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
        if not info_future.done():
            self.log.debug("Nudge: resolving shell future: %s", self.kernel_id)
            info_future.set_result(None)

    def on_iopub(msg):
        self.log.debug("Nudge: IOPub received: %s", self.kernel_id)
        if not iopub_future.done():
            iopub_channel.stop_on_recv()
            self.log.debug("Nudge: resolving iopub future: %s", self.kernel_id)
            iopub_future.set_result(None)

    iopub_channel.on_recv(on_iopub)
    shell_channel.on_recv(on_shell_reply)
    loop = IOLoop.current()

    # Nudge the kernel with kernel info requests until we get an IOPub message.
    # Named differently from the method so the closure does not shadow it.
    def send_nudge(count):
        nonlocal nudge_handle
        count += 1

        # NOTE: this close check appears to never be True during on_open,
        # even when the peer has closed the connection
        if self.ws_connection is None or self.ws_connection.is_closing():
            self.log.debug(
                "Nudge: cancelling on closed websocket: %s", self.kernel_id
            )
            finish()
            return

        # check for stopped kernel
        if self.kernel_id not in self.kernel_manager:
            self.log.debug(
                "Nudge: cancelling on stopped kernel: %s", self.kernel_id
            )
            finish()
            return

        # check for closed zmq socket
        if shell_channel.closed():
            self.log.debug(
                "Nudge: cancelling on closed zmq socket: %s", self.kernel_id
            )
            finish()
            return

        if not both_done.done():
            # Escalate every tenth attempt to a warning so a stuck kernel
            # is visible without flooding the log.
            log = self.log.warning if count % 10 == 0 else self.log.debug
            log("Nudge: attempt %s on kernel %s", count, self.kernel_id)
            self.session.send(shell_channel, "kernel_info_request")
            nudge_handle = loop.call_later(0.5, send_nudge, count)

    nudge_handle = loop.call_later(0, send_nudge, count=0)

    # resolve with a timeout if we get no response
    future = gen.with_timeout(loop.time() + self.kernel_info_timeout, both_done)
    # ensure we have no dangling resources or unresolved Futures in case of timeout
    future.add_done_callback(finish)
    return future
240+
130241 def request_kernel_info (self ):
131242 """send a request for kernel_info"""
132243 km = self .kernel_manager
@@ -249,7 +360,7 @@ async def _register_session(self):
249360 await stale_handler .close ()
250361 self ._open_sessions [self .session_key ] = self
251362
252- def open (self , kernel_id ):
363+ async def open (self , kernel_id ):
253364 super (ZMQChannelsHandler , self ).open ()
254365 km = self .kernel_manager
255366 km .notify_connect (kernel_id )
@@ -259,15 +370,23 @@ def open(self, kernel_id):
259370 if buffer_info and buffer_info ['session_key' ] == self .session_key :
260371 self .log .info ("Restoring connection for %s" , self .session_key )
261372 self .channels = buffer_info ['channels' ]
262- replay_buffer = buffer_info ['buffer' ]
263- if replay_buffer :
264- self .log .info ("Replaying %s buffered messages" , len (replay_buffer ))
265- for channel , msg_list in replay_buffer :
266- stream = self .channels [channel ]
267- self ._on_zmq_reply (stream , msg_list )
373+
374+ connected = self .nudge ()
375+
376+ def replay (value ):
377+ replay_buffer = buffer_info ['buffer' ]
378+ if replay_buffer :
379+ self .log .info ("Replaying %s buffered messages" , len (replay_buffer ))
380+ for channel , msg_list in replay_buffer :
381+ stream = self .channels [channel ]
382+ self ._on_zmq_reply (stream , msg_list )
383+
384+
385+ connected .add_done_callback (replay )
268386 else :
269387 try :
270388 self .create_stream ()
389+ connected = self .nudge ()
271390 except web .HTTPError as e :
272391 self .log .error ("Error opening stream: %s" , e )
273392 # WebSockets don't response to traditional error codes so we
@@ -281,8 +400,14 @@ def open(self, kernel_id):
281400 km .add_restart_callback (self .kernel_id , self .on_kernel_restarted )
282401 km .add_restart_callback (self .kernel_id , self .on_restart_failed , 'dead' )
283402
284- for channel , stream in self .channels .items ():
285- stream .on_recv_stream (self ._on_zmq_reply )
403+ def subscribe (value ):
404+ for channel , stream in self .channels .items ():
405+ stream .on_recv_stream (self ._on_zmq_reply )
406+
407+ connected .add_done_callback (subscribe )
408+
409+ return connected
410+
286411
287412 def on_message (self , msg ):
288413 if not self .channels :
0 commit comments