1212)
1313
1414__all__ = [
15+ 'UNBOUND' , 'UNBOUND_ERROR' , 'UNBOUND_REMOVE' ,
1516 'create' , 'list_all' ,
1617 'Queue' ,
1718 'QueueError' , 'QueueNotFoundError' , 'QueueEmpty' , 'QueueFull' ,
19+ 'ItemInterpreterDestroyed' ,
1820]
1921
2022
@@ -32,47 +34,119 @@ class QueueFull(QueueError, queue.Full):
3234 """
3335
3436
37+ class ItemInterpreterDestroyed (QueueError ):
38+ """Raised from get() and get_nowait()."""
39+
40+
3541_SHARED_ONLY = 0
3642_PICKLED = 1
3743
38- def create (maxsize = 0 , * , syncobj = False ):
44+
45+ class UnboundItem :
46+ """Represents a Queue item no longer bound to an interpreter.
47+
48+ An item is unbound when the interpreter that added it to the queue
49+ is destroyed.
50+ """
51+
52+ __slots__ = ()
53+
54+ def __new__ (cls ):
55+ return UNBOUND
56+
57+ def __repr__ (self ):
58+ return f'interpreters.queues.UNBOUND'
59+
60+
61+ UNBOUND = object .__new__ (UnboundItem )
62+ UNBOUND_ERROR = object ()
63+ UNBOUND_REMOVE = object ()
64+
65+ _UNBOUND_CONSTANT_TO_FLAG = {
66+ UNBOUND_REMOVE : 1 ,
67+ UNBOUND_ERROR : 2 ,
68+ UNBOUND : 3 ,
69+ }
70+ _UNBOUND_FLAG_TO_CONSTANT = {v : k
71+ for k , v in _UNBOUND_CONSTANT_TO_FLAG .items ()}
72+
73+ def _serialize_unbound (unbound ):
74+ op = unbound
75+ try :
76+ flag = _UNBOUND_CONSTANT_TO_FLAG [op ]
77+ except KeyError :
78+ raise NotImplementedError (f'unsupported unbound replacement op { op !r} ' )
79+ return flag ,
80+
81+
82+ def _resolve_unbound (flag ):
83+ try :
84+ op = _UNBOUND_FLAG_TO_CONSTANT [flag ]
85+ except KeyError :
86+ raise NotImplementedError (f'unsupported unbound replacement op { flag !r} ' )
87+ if op is UNBOUND_REMOVE :
88+ # "remove" not possible here
89+ raise NotImplementedError
90+ elif op is UNBOUND_ERROR :
91+ raise ItemInterpreterDestroyed ("item's original interpreter destroyed" )
92+ elif op is UNBOUND :
93+ return UNBOUND
94+ else :
95+ raise NotImplementedError (repr (op ))
96+
97+
98+ def create (maxsize = 0 , * , syncobj = False , unbounditems = UNBOUND ):
3999 """Return a new cross-interpreter queue.
40100
41101 The queue may be used to pass data safely between interpreters.
42102
43103 "syncobj" sets the default for Queue.put()
44104 and Queue.put_nowait().
105+
106+ "unbounditems" likewise sets the default. See Queue.put() for
107+ supported values. The default value is UNBOUND, which replaces
108+ the unbound item.
45109 """
46110 fmt = _SHARED_ONLY if syncobj else _PICKLED
47- qid = _queues .create (maxsize , fmt )
48- return Queue (qid , _fmt = fmt )
111+ unbound = _serialize_unbound (unbounditems )
112+ unboundop , = unbound
113+ qid = _queues .create (maxsize , fmt , unboundop )
114+ return Queue (qid , _fmt = fmt , _unbound = unbound )
49115
50116
51117def list_all ():
52118 """Return a list of all open queues."""
53- return [Queue (qid , _fmt = fmt )
54- for qid , fmt in _queues .list_all ()]
119+ return [Queue (qid , _fmt = fmt , _unbound = ( unboundop ,) )
120+ for qid , fmt , unboundop in _queues .list_all ()]
55121
56122
57123_known_queues = weakref .WeakValueDictionary ()
58124
59125class Queue :
60126 """A cross-interpreter queue."""
61127
62- def __new__ (cls , id , / , * , _fmt = None ):
128+ def __new__ (cls , id , / , * , _fmt = None , _unbound = None ):
63129 # There is only one instance for any given ID.
64130 if isinstance (id , int ):
65131 id = int (id )
66132 else :
67133 raise TypeError (f'id must be an int, got { id !r} ' )
68134 if _fmt is None :
69- _fmt , = _queues .get_queue_defaults (id )
135+ if _unbound is None :
136+ _fmt , op = _queues .get_queue_defaults (id )
137+ _unbound = (op ,)
138+ else :
139+ _fmt , _ = _queues .get_queue_defaults (id )
140+ elif _unbound is None :
141+ _ , op = _queues .get_queue_defaults (id )
142+ _unbound = (op ,)
70143 try :
71144 self = _known_queues [id ]
72145 except KeyError :
73146 self = super ().__new__ (cls )
74147 self ._id = id
75148 self ._fmt = _fmt
149+ self ._unbound = _unbound
76150 _known_queues [id ] = self
77151 _queues .bind (id )
78152 return self
@@ -124,14 +198,15 @@ def qsize(self):
124198
125199 def put (self , obj , timeout = None , * ,
126200 syncobj = None ,
201+ unbound = None ,
127202 _delay = 10 / 1000 , # 10 milliseconds
128203 ):
129204 """Add the object to the queue.
130205
131206 This blocks while the queue is full.
132207
133208 If "syncobj" is None (the default) then it uses the
134- queue's default, set with create_queue()..
209+ queue's default, set with create_queue().
135210
136211 If "syncobj" is false then all objects are supported,
137212 at the expense of worse performance.
@@ -152,11 +227,37 @@ def put(self, obj, timeout=None, *,
152227 actually is. That's a slightly different and stronger promise
153228 than just (initial) equality, which is all "syncobj=False"
154229 can promise.
230+
231+ "unbound" controls the behavior of Queue.get() for the given
232+ object if the current interpreter (calling put()) is later
233+ destroyed.
234+
235+ If "unbound" is None (the default) then it uses the
236+ queue's default, set with create_queue(),
237+ which is usually UNBOUND.
238+
239+ If "unbound" is UNBOUND_ERROR then get() will raise an
240+ ItemInterpreterDestroyed exception if the original interpreter
241+ has been destroyed. This does not otherwise affect the queue;
242+ the next call to put() will work like normal, returning the next
243+ item in the queue.
244+
245+ If "unbound" is UNBOUND_REMOVE then the item will be removed
246+ from the queue as soon as the original interpreter is destroyed.
247+ Be aware that this will introduce an imbalance between put()
248+ and get() calls.
249+
250+ If "unbound" is UNBOUND then it is returned by get() in place
251+ of the unbound item.
155252 """
156253 if syncobj is None :
157254 fmt = self ._fmt
158255 else :
159256 fmt = _SHARED_ONLY if syncobj else _PICKLED
257+ if unbound is None :
258+ unboundop , = self ._unbound
259+ else :
260+ unboundop , = _serialize_unbound (unbound )
160261 if timeout is not None :
161262 timeout = int (timeout )
162263 if timeout < 0 :
@@ -166,29 +267,37 @@ def put(self, obj, timeout=None, *,
166267 obj = pickle .dumps (obj )
167268 while True :
168269 try :
169- _queues .put (self ._id , obj , fmt )
270+ _queues .put (self ._id , obj , fmt , unboundop )
170271 except QueueFull as exc :
171272 if timeout is not None and time .time () >= end :
172273 raise # re-raise
173274 time .sleep (_delay )
174275 else :
175276 break
176277
177- def put_nowait (self , obj , * , syncobj = None ):
278+ def put_nowait (self , obj , * , syncobj = None , unbound = None ):
178279 if syncobj is None :
179280 fmt = self ._fmt
180281 else :
181282 fmt = _SHARED_ONLY if syncobj else _PICKLED
283+ if unbound is None :
284+ unboundop , = self ._unbound
285+ else :
286+ unboundop , = _serialize_unbound (unbound )
182287 if fmt is _PICKLED :
183288 obj = pickle .dumps (obj )
184- _queues .put (self ._id , obj , fmt )
289+ _queues .put (self ._id , obj , fmt , unboundop )
185290
186291 def get (self , timeout = None , * ,
187292 _delay = 10 / 1000 , # 10 milliseconds
188293 ):
189294 """Return the next object from the queue.
190295
191296 This blocks while the queue is empty.
297+
298+ If the next item's original interpreter has been destroyed
299+ then the "next object" is determined by the value of the
300+ "unbound" argument to put().
192301 """
193302 if timeout is not None :
194303 timeout = int (timeout )
@@ -197,13 +306,16 @@ def get(self, timeout=None, *,
197306 end = time .time () + timeout
198307 while True :
199308 try :
200- obj , fmt = _queues .get (self ._id )
309+ obj , fmt , unboundop = _queues .get (self ._id )
201310 except QueueEmpty as exc :
202311 if timeout is not None and time .time () >= end :
203312 raise # re-raise
204313 time .sleep (_delay )
205314 else :
206315 break
316+ if unboundop is not None :
317+ assert obj is None , repr (obj )
318+ return _resolve_unbound (unboundop )
207319 if fmt == _PICKLED :
208320 obj = pickle .loads (obj )
209321 else :
@@ -217,9 +329,12 @@ def get_nowait(self):
217329 is the same as get().
218330 """
219331 try :
220- obj , fmt = _queues .get (self ._id )
332+ obj , fmt , unboundop = _queues .get (self ._id )
221333 except QueueEmpty as exc :
222334 raise # re-raise
335+ if unboundop is not None :
336+ assert obj is None , repr (obj )
337+ return _resolve_unbound (unboundop )
223338 if fmt == _PICKLED :
224339 obj = pickle .loads (obj )
225340 else :
0 commit comments