2626
2727from .util import debug , info , Finalize , register_after_fork , is_exiting
2828
29- _queue_alive = 0
30- _queue_shutdown = 1
31- _queue_shutdown_immediate = 2
32-
3329#
3430# Queue type using a pipe, buffer and thread
3531#
@@ -52,7 +48,7 @@ def __init__(self, maxsize=0, *, ctx):
5248 # For use by concurrent.futures
5349 self ._ignore_epipe = False
5450 self ._reset ()
55- self ._shutdown_state = ctx .Value ('i ' , _queue_alive )
51+ self ._is_shutdown = ctx .Value ('B ' , False )
5652
5753 if sys .platform != 'win32' :
5854 register_after_fork (self , Queue ._after_fork )
@@ -61,12 +57,12 @@ def __getstate__(self):
6157 context .assert_spawning (self )
6258 return (self ._ignore_epipe , self ._maxsize , self ._reader , self ._writer ,
6359 self ._rlock , self ._wlock , self ._sem , self ._opid ,
64- self ._shutdown_state )
60+ self ._is_shutdown )
6561
6662 def __setstate__ (self , state ):
6763 (self ._ignore_epipe , self ._maxsize , self ._reader , self ._writer ,
6864 self ._rlock , self ._wlock , self ._sem , self ._opid ,
69- self ._shutdown_state ) = state
65+ self ._is_shutdown ) = state
7066 self ._reset ()
7167
7268 def _after_fork (self ):
@@ -88,32 +84,19 @@ def _reset(self, after_fork=False):
8884 self ._recv_bytes = self ._reader .recv_bytes
8985 self ._poll = self ._reader .poll
9086
91- def _is_alive (self ):
92- return self ._shutdown_state .value == _queue_alive
93-
94- def _is_shutdown (self ):
95- return self ._shutdown_state .value == _queue_shutdown
96-
97- def _is_shutdown_immediate (self ):
98- return self ._shutdown_state .value == _queue_shutdown_immediate
99-
100- def _set_shutdown (self ):
101- self ._shutdown_state .value = _queue_shutdown
102-
103- def _set_shutdown_immediate (self ):
104- self ._shutdown_state .value = _queue_shutdown_immediate
105-
10687 def put (self , obj , block = True , timeout = None ):
10788 if self ._closed :
10889 raise ValueError (f"Queue { self !r} is closed" )
109- if not self ._is_alive () :
90+ if self ._is_shutdown . value :
11091 raise ShutDown
11192 if not self ._sem .acquire (block , timeout ):
112- if not self ._is_alive () :
93+ if self ._is_shutdown . value :
11394 raise ShutDown
11495 raise Full
11596
11697 with self ._notempty :
98+ if self ._is_shutdown .value :
99+ raise ShutDown
117100 if self ._thread is None :
118101 self ._start_thread ()
119102 self ._buffer .append (obj )
@@ -124,36 +107,29 @@ def get(self, block=True, timeout=None):
124107 raise ValueError (f"Queue { self !r} is closed" )
125108 if block and timeout is None :
126109 with self ._rlock :
127- # checks shutdown state
128- if (self ._is_shutdown_immediate ()
129- or (self ._is_shutdown () and self .empty ())):
110+ if self ._is_shutdown .value and self .empty ():
130111 raise ShutDown
131112 res = self ._recv_bytes ()
132113 self ._sem .release ()
133114 else :
134115 if block :
135116 deadline = time .monotonic () + timeout
136117 if not self ._rlock .acquire (block , timeout ):
137- if (self ._is_shutdown_immediate ()
138- or (self ._is_shutdown () and self .empty ())):
118+ if self ._is_shutdown .value and self .empty ():
139119 raise ShutDown
140120 raise Empty
141121 try :
142122 if block :
143123 timeout = deadline - time .monotonic ()
144124 if not self ._poll (timeout ):
145- if not self ._is_alive () :
125+ if self ._is_shutdown . value :
146126 raise ShutDown
147127 raise Empty
148128 elif not self ._poll ():
149- if not self ._is_alive () :
129+ if self ._is_shutdown . value :
150130 raise ShutDown
151131 raise Empty
152132
153- # here queue is not empty
154- if self ._is_shutdown_immediate ():
155- raise ShutDown
156- # here shutdown state queue is alive or shutdown
157133 res = self ._recv_bytes ()
158134 self ._sem .release ()
159135 finally :
@@ -178,18 +154,19 @@ def get_nowait(self):
178154 def put_nowait (self , obj ):
179155 return self .put (obj , False )
180156
157+ def _clear (self ):
158+ with self ._rlock :
159+ while self ._poll ():
160+ self ._recv_bytes ()
161+
181162 def shutdown (self , immediate = False ):
182163 if self ._closed :
183164 raise ValueError (f"Queue { self !r} is closed" )
184- with self ._shutdown_state .get_lock ():
185- if self ._is_shutdown_immediate ():
186- return
165+ with self ._is_shutdown .get_lock ():
166+ self ._is_shutdown .value = True
187167 if immediate :
188- self ._set_shutdown_immediate ()
189- with self ._notempty :
190- self ._notempty .notify_all ()
191- else :
192- self ._set_shutdown ()
168+ self ._clear ()
169+ self ._sem .release (self .qsize ())
193170
194171 def close (self ):
195172 self ._closed = True
@@ -384,14 +361,16 @@ def __setstate__(self, state):
384361 def put (self , obj , block = True , timeout = None ):
385362 if self ._closed :
386363 raise ValueError (f"Queue { self !r} is closed" )
387- if not self ._is_alive () :
364+ if self ._is_shutdown . value :
388365 raise ShutDown
389366 if not self ._sem .acquire (block , timeout ):
390- if not self ._is_alive () :
367+ if self ._is_shutdown . value :
391368 raise ShutDown
392369 raise Full
393370
394371 with self ._notempty , self ._cond :
372+ if self ._is_shutdown .value :
373+ raise ShutDown
395374 if self ._thread is None :
396375 self ._start_thread ()
397376 self ._buffer .append (obj )
@@ -400,27 +379,22 @@ def put(self, obj, block=True, timeout=None):
400379
401380 def task_done (self ):
402381 with self ._cond :
403- if self ._is_shutdown_immediate ():
404- raise ShutDown
405382 if not self ._unfinished_tasks .acquire (False ):
406383 raise ValueError ('task_done() called too many times' )
407384 if self ._unfinished_tasks ._semlock ._is_zero ():
408385 self ._cond .notify_all ()
409386
410387 def join (self ):
411388 with self ._cond :
412- if self ._is_shutdown_immediate ():
413- raise ShutDown
414389 if not self ._unfinished_tasks ._semlock ._is_zero ():
415390 self ._cond .wait ()
416- if self ._is_shutdown_immediate ():
417- raise ShutDown
418391
419- def shutdown (self , immediate = False ):
420- with self ._cond :
421- is_alive = self ._is_alive ()
422- super ().shutdown (immediate )
423- if is_alive :
392+ def _clear (self ):
393+ with self ._rlock :
394+ while self ._poll ():
395+ self ._recv_bytes ()
396+ self ._unfinished_tasks .acquire (block = False )
397+ with self ._cond :
424398 self ._cond .notify_all ()
425399
426400#
0 commit comments