@@ -78,7 +78,6 @@ def post(self, kernel_id, action):
7878 yield maybe_future (km .interrupt_kernel (kernel_id ))
7979 self .set_status (204 )
8080 if action == 'restart' :
81-
8281 try :
8382 yield maybe_future (km .restart_kernel (kernel_id ))
8483 except Exception as e :
@@ -121,12 +120,71 @@ def __repr__(self):
121120 return "%s(%s)" % (self .__class__ .__name__ , getattr (self , 'kernel_id' , 'uninitialized' ))
122121
123122 def create_stream (self ):
123+ self .log .debug ("Create stream" )
124124 km = self .kernel_manager
125125 identity = self .session .bsession
126126 for channel in ('shell' , 'control' , 'iopub' , 'stdin' ):
127127 meth = getattr (km , 'connect_' + channel )
128128 self .channels [channel ] = stream = meth (self .kernel_id , identity = identity )
129129 stream .channel = channel
130+
131+ shell_channel = self .channels ['shell' ]
132+ iopub_channel = self .channels ['iopub' ]
133+
134+ future = Future ()
135+ info_future = Future ()
136+ iopub_future = Future ()
137+
138+ def finish ():
139+ """Common cleanup"""
140+ loop .remove_timeout (timeout )
141+ loop .remove_timeout (nudge_handle )
142+ iopub_channel .stop_on_recv ()
143+ shell_channel .stop_on_recv ()
144+
145+ def on_shell_reply (msg ):
146+ if not info_future .done ():
147+ self .log .debug ("Nudge: shell info reply received: %s" , self .kernel_id )
148+ shell_channel .stop_on_recv ()
149+ self .log .debug ("Nudge: resolving shell future" )
150+ info_future .set_result (msg )
151+ if iopub_future .done ():
152+ finish ()
153+ self .log .debug ("Nudge: resolving main future in shell handler" )
154+ future .set_result (info_future .result ())
155+
156+ def on_iopub (msg ):
157+ if not iopub_future .done ():
158+ self .log .debug ("Nudge: first IOPub received: %s" , self .kernel_id )
159+ iopub_channel .stop_on_recv ()
160+ self .log .debug ("Nudge: resolving iopub future" )
161+ iopub_future .set_result (None )
162+ if info_future .done ():
163+ finish ()
164+ self .log .debug ("Nudge: resolving main future in iopub handler" )
165+ future .set_result (info_future .result ())
166+
167+ def on_timeout ():
168+ self .log .warning ("Nudge: Timeout waiting for kernel_info_reply: %s" , self .kernel_id )
169+ finish ()
170+ if not future .done ():
171+ future .set_exception (TimeoutError ("Timeout waiting for nudge" ))
172+
173+ iopub_channel .on_recv (on_iopub )
174+ shell_channel .on_recv (on_shell_reply )
175+ loop = IOLoop .current ()
176+
177+ # Nudge the kernel with kernel info requests until we get an IOPub message
178+ def nudge ():
179+ self .log .debug ("Nudge" )
180+ if not future .done ():
181+ self .log .debug ("nudging" )
182+ self .session .send (shell_channel , "kernel_info_request" )
183+ nudge_handle = loop .call_later (0.5 , nudge )
184+ nudge_handle = loop .call_later (0 , nudge )
185+
186+ timeout = loop .add_timeout (loop .time () + self .kernel_info_timeout , on_timeout )
187+ return future
130188
131189 def request_kernel_info (self ):
132190 """send a request for kernel_info"""
@@ -193,6 +251,7 @@ def initialize(self):
193251 super ().initialize ()
194252 self .zmq_stream = None
195253 self .channels = {}
254+ self .kernel_manager .channels = self .channels
196255 self .kernel_id = None
197256 self .kernel_info_channel = None
198257 self ._kernel_info_future = Future ()
@@ -253,6 +312,7 @@ def _register_session(self):
253312 yield stale_handler .close ()
254313 self ._open_sessions [self .session_key ] = self
255314
315+ @gen .coroutine
256316 def open (self , kernel_id ):
257317 super ().open ()
258318 km = self .kernel_manager
@@ -269,9 +329,11 @@ def open(self, kernel_id):
269329 for channel , msg_list in replay_buffer :
270330 stream = self .channels [channel ]
271331 self ._on_zmq_reply (stream , msg_list )
332+ connected = Future ()
333+ connected .set_result (None )
272334 else :
273335 try :
274- self .create_stream ()
336+ connected = self .create_stream ()
275337 except web .HTTPError as e :
276338 self .log .error ("Error opening stream: %s" , e )
277339 # WebSockets don't response to traditional error codes so we
@@ -285,8 +347,13 @@ def open(self, kernel_id):
285347 km .add_restart_callback (self .kernel_id , self .on_kernel_restarted )
286348 km .add_restart_callback (self .kernel_id , self .on_restart_failed , 'dead' )
287349
288- for channel , stream in self .channels .items ():
289- stream .on_recv_stream (self ._on_zmq_reply )
350+ def subscribe (value ):
351+ for channel , stream in self .channels .items ():
352+ stream .on_recv_stream (self ._on_zmq_reply )
353+
354+ connected .add_done_callback (subscribe )
355+
356+ return connected
290357
291358 def on_message (self , msg ):
292359 if not self .channels :
0 commit comments