22
33import time
44import _interpchannels as _channels
5+ from . import _crossinterp
56
67# aliases:
78from _interpchannels import (
89 ChannelError , ChannelNotFoundError , ChannelClosedError ,
910 ChannelEmptyError , ChannelNotEmptyError ,
1011)
12+ from ._crossinterp import (
13+ UNBOUND_ERROR , UNBOUND_REMOVE ,
14+ )
1115
1216
1317__all__ = [
18+ 'UNBOUND' , 'UNBOUND_ERROR' , 'UNBOUND_REMOVE' ,
1419 'create' , 'list_all' ,
1520 'SendChannel' , 'RecvChannel' ,
1621 'ChannelError' , 'ChannelNotFoundError' , 'ChannelEmptyError' ,
22+ 'ItemInterpreterDestroyed' ,
1723]
1824
1925
20- def create ():
26+ class ItemInterpreterDestroyed (ChannelError ,
27+ _crossinterp .ItemInterpreterDestroyed ):
28+ """Raised from get() and get_nowait()."""
29+
30+
31+ UNBOUND = _crossinterp .UnboundItem .singleton ('queue' , __name__ )
32+
33+
34+ def _serialize_unbound (unbound ):
35+ if unbound is UNBOUND :
36+ unbound = _crossinterp .UNBOUND
37+ return _crossinterp .serialize_unbound (unbound )
38+
39+
40+ def _resolve_unbound (flag ):
41+ resolved = _crossinterp .resolve_unbound (flag , ItemInterpreterDestroyed )
42+ if resolved is _crossinterp .UNBOUND :
43+ resolved = UNBOUND
44+ return resolved
45+
46+
47+ def create (* , unbounditems = UNBOUND ):
2148 """Return (recv, send) for a new cross-interpreter channel.
2249
2350 The channel may be used to pass data safely between interpreters.
51+
52+ "unbounditems" sets the default for the send end of the channel.
53+ See SendChannel.send() for supported values. The default value
54+ is UNBOUND, which replaces the unbound item when received.
2455 """
25- cid = _channels .create ()
26- recv , send = RecvChannel (cid ), SendChannel (cid )
56+ unbound = _serialize_unbound (unbounditems )
57+ unboundop , = unbound
58+ cid = _channels .create (unboundop )
59+ recv , send = RecvChannel (cid ), SendChannel (cid , _unbound = unbound )
2760 return recv , send
2861
2962
3063def list_all ():
3164 """Return a list of (recv, send) for all open channels."""
32- return [(RecvChannel (cid ), SendChannel (cid ))
33- for cid in _channels .list_all ()]
65+ return [(RecvChannel (cid ), SendChannel (cid , _unbound = unbound ))
66+ for cid , unbound in _channels .list_all ()]
3467
3568
3669class _ChannelEnd :
@@ -106,12 +139,15 @@ def recv(self, timeout=None, *,
106139 if timeout < 0 :
107140 raise ValueError (f'timeout value must be non-negative' )
108141 end = time .time () + timeout
109- obj = _channels .recv (self ._id , _sentinel )
142+ obj , unboundop = _channels .recv (self ._id , _sentinel )
110143 while obj is _sentinel :
111144 time .sleep (_delay )
112145 if timeout is not None and time .time () >= end :
113146 raise TimeoutError
114- obj = _channels .recv (self ._id , _sentinel )
147+ obj , unboundop = _channels .recv (self ._id , _sentinel )
148+ if unboundop is not None :
149+ assert obj is None , repr (obj )
150+ return _resolve_unbound (unboundop )
115151 return obj
116152
117153 def recv_nowait (self , default = _NOT_SET ):
@@ -122,9 +158,13 @@ def recv_nowait(self, default=_NOT_SET):
122158 is the same as recv().
123159 """
124160 if default is _NOT_SET :
125- return _channels .recv (self ._id )
161+ obj , unboundop = _channels .recv (self ._id )
126162 else :
127- return _channels .recv (self ._id , default )
163+ obj , unboundop = _channels .recv (self ._id , default )
164+ if unboundop is not None :
165+ assert obj is None , repr (obj )
166+ return _resolve_unbound (unboundop )
167+ return obj
128168
129169 def close (self ):
130170 _channels .close (self ._id , recv = True )
@@ -135,43 +175,79 @@ class SendChannel(_ChannelEnd):
135175
136176 _end = 'send'
137177
178+ def __new__ (cls , cid , * , _unbound = None ):
179+ if _unbound is None :
180+ try :
181+ op = _channels .get_channel_defaults (cid )
182+ _unbound = (op ,)
183+ except ChannelNotFoundError :
184+ _unbound = _serialize_unbound (UNBOUND )
185+ self = super ().__new__ (cls , cid )
186+ self ._unbound = _unbound
187+ return self
188+
138189 @property
139190 def is_closed (self ):
140191 info = self ._info
141192 return info .closed or info .closing
142193
143- def send (self , obj , timeout = None ):
194+ def send (self , obj , timeout = None , * ,
195+ unbound = None ,
196+ ):
144197 """Send the object (i.e. its data) to the channel's receiving end.
145198
146199 This blocks until the object is received.
147200 """
148- _channels .send (self ._id , obj , timeout = timeout , blocking = True )
201+ if unbound is None :
202+ unboundop , = self ._unbound
203+ else :
204+ unboundop , = _serialize_unbound (unbound )
205+ _channels .send (self ._id , obj , unboundop , timeout = timeout , blocking = True )
149206
150- def send_nowait (self , obj ):
207+ def send_nowait (self , obj , * ,
208+ unbound = None ,
209+ ):
151210 """Send the object to the channel's receiving end.
152211
153212 If the object is immediately received then return True
154213 (else False). Otherwise this is the same as send().
155214 """
215+ if unbound is None :
216+ unboundop , = self ._unbound
217+ else :
218+ unboundop , = _serialize_unbound (unbound )
156219 # XXX Note that at the moment channel_send() only ever returns
157220 # None. This should be fixed when channel_send_wait() is added.
158221 # See bpo-32604 and gh-19829.
159- return _channels .send (self ._id , obj , blocking = False )
222+ return _channels .send (self ._id , obj , unboundop , blocking = False )
160223
161- def send_buffer (self , obj , timeout = None ):
224+ def send_buffer (self , obj , timeout = None , * ,
225+ unbound = None ,
226+ ):
162227 """Send the object's buffer to the channel's receiving end.
163228
164229 This blocks until the object is received.
165230 """
166- _channels .send_buffer (self ._id , obj , timeout = timeout , blocking = True )
231+ if unbound is None :
232+ unboundop , = self ._unbound
233+ else :
234+ unboundop , = _serialize_unbound (unbound )
235+ _channels .send_buffer (self ._id , obj , unboundop ,
236+ timeout = timeout , blocking = True )
167237
168- def send_buffer_nowait (self , obj ):
238+ def send_buffer_nowait (self , obj , * ,
239+ unbound = None ,
240+ ):
169241 """Send the object's buffer to the channel's receiving end.
170242
171243 If the object is immediately received then return True
172244 (else False). Otherwise this is the same as send().
173245 """
174- return _channels .send_buffer (self ._id , obj , blocking = False )
246+ if unbound is None :
247+ unboundop , = self ._unbound
248+ else :
249+ unboundop , = _serialize_unbound (unbound )
250+ return _channels .send_buffer (self ._id , obj , unboundop , blocking = False )
175251
176252 def close (self ):
177253 _channels .close (self ._id , send = True )
0 commit comments