2626
2727from .util import debug , info , Finalize , register_after_fork , is_exiting
2828
29- _queue_alive = 0
30- _queue_shutdown = 1
31- _queue_shutdown_immediate = 2
32-
3329#
3430# Queue type using a pipe, buffer and thread
3531#
@@ -52,7 +48,7 @@ def __init__(self, maxsize=0, *, ctx):
5248 # For use by concurrent.futures
5349 self ._ignore_epipe = False
5450 self ._reset ()
55- self ._shutdown_state = ctx .Value ('i ' , _queue_alive )
51+ self ._is_shutdown = ctx .Value ('B ' , False , lock = self . _rlock )
5652
5753 if sys .platform != 'win32' :
5854 register_after_fork (self , Queue ._after_fork )
@@ -61,12 +57,12 @@ def __getstate__(self):
6157 context .assert_spawning (self )
6258 return (self ._ignore_epipe , self ._maxsize , self ._reader , self ._writer ,
6359 self ._rlock , self ._wlock , self ._sem , self ._opid ,
64- self ._shutdown_state )
60+ self ._is_shutdown )
6561
6662 def __setstate__ (self , state ):
6763 (self ._ignore_epipe , self ._maxsize , self ._reader , self ._writer ,
6864 self ._rlock , self ._wlock , self ._sem , self ._opid ,
69- self ._shutdown_state ) = state
65+ self ._is_shutdown ) = state
7066 self ._reset ()
7167
7268 def _after_fork (self ):
@@ -88,32 +84,19 @@ def _reset(self, after_fork=False):
8884 self ._recv_bytes = self ._reader .recv_bytes
8985 self ._poll = self ._reader .poll
9086
91- def _is_alive (self ):
92- return self ._shutdown_state .value == _queue_alive
93-
94- def _is_shutdown (self ):
95- return self ._shutdown_state .value == _queue_shutdown
96-
97- def _is_shutdown_immediate (self ):
98- return self ._shutdown_state .value == _queue_shutdown_immediate
99-
100- def _set_shutdown (self ):
101- self ._shutdown_state .value = _queue_shutdown
102-
103- def _set_shutdown_immediate (self ):
104- self ._shutdown_state .value = _queue_shutdown_immediate
105-
10687 def put (self , obj , block = True , timeout = None ):
10788 if self ._closed :
10889 raise ValueError (f"Queue { self !r} is closed" )
109- if not self ._is_alive () :
90+ if self ._is_shutdown . value :
11091 raise ShutDown
11192 if not self ._sem .acquire (block , timeout ):
112- if not self ._is_alive () :
93+ if self ._is_shutdown . value :
11394 raise ShutDown
11495 raise Full
11596
11697 with self ._notempty :
98+ if self ._is_shutdown .value :
99+ raise ShutDown
117100 if self ._thread is None :
118101 self ._start_thread ()
119102 self ._buffer .append (obj )
@@ -124,36 +107,29 @@ def get(self, block=True, timeout=None):
124107 raise ValueError (f"Queue { self !r} is closed" )
125108 if block and timeout is None :
126109 with self ._rlock :
127- # checks shutdown state
128- if (self ._is_shutdown_immediate ()
129- or (self ._is_shutdown () and self .empty ())):
110+ if self ._is_shutdown .value and self .empty ():
130111 raise ShutDown
131112 res = self ._recv_bytes ()
132113 self ._sem .release ()
133114 else :
134115 if block :
135116 deadline = time .monotonic () + timeout
136117 if not self ._rlock .acquire (block , timeout ):
137- if (self ._is_shutdown_immediate ()
138- or (self ._is_shutdown () and self .empty ())):
118+ if self ._is_shutdown .value and self .empty ():
139119 raise ShutDown
140120 raise Empty
141121 try :
142122 if block :
143123 timeout = deadline - time .monotonic ()
144124 if not self ._poll (timeout ):
145- if not self ._is_alive () :
125+ if self ._is_shutdown . value :
146126 raise ShutDown
147127 raise Empty
148128 elif not self ._poll ():
149- if not self ._is_alive () :
129+ if self ._is_shutdown . value :
150130 raise ShutDown
151131 raise Empty
152132
153- # here queue is not empty
154- if self ._is_shutdown_immediate ():
155- raise ShutDown
156- # here shutdown state queue is alive or shutdown
157133 res = self ._recv_bytes ()
158134 self ._sem .release ()
159135 finally :
@@ -178,18 +154,21 @@ def get_nowait(self):
178154 def put_nowait (self , obj ):
179155 return self .put (obj , False )
180156
157+ def _clear (self ):
158+ with self ._rlock :
159+ while self ._poll ():
160+ self ._recv_bytes ()
161+
181162 def shutdown (self , immediate = False ):
182163 if self ._closed :
183164 raise ValueError (f"Queue { self !r} is closed" )
184- with self ._shutdown_state .get_lock ():
185- if self ._is_shutdown_immediate ():
186- return
165+ with self ._is_shutdown .get_lock ():
166+ self ._is_shutdown .value = True
187167 if immediate :
188- self ._set_shutdown_immediate ()
168+ self ._clear ()
189169 with self ._notempty :
190170 self ._notempty .notify_all ()
191- else :
192- self ._set_shutdown ()
171+ self ._sem .release (self .qsize ())
193172
194173 def close (self ):
195174 self ._closed = True
@@ -384,14 +363,16 @@ def __setstate__(self, state):
384363 def put (self , obj , block = True , timeout = None ):
385364 if self ._closed :
386365 raise ValueError (f"Queue { self !r} is closed" )
387- if not self ._is_alive () :
366+ if self ._is_shutdown . value :
388367 raise ShutDown
389368 if not self ._sem .acquire (block , timeout ):
390- if not self ._is_alive () :
369+ if self ._is_shutdown . value :
391370 raise ShutDown
392371 raise Full
393372
394373 with self ._notempty , self ._cond :
374+ if self ._is_shutdown .value :
375+ raise ShutDown
395376 if self ._thread is None :
396377 self ._start_thread ()
397378 self ._buffer .append (obj )
@@ -400,27 +381,22 @@ def put(self, obj, block=True, timeout=None):
400381
401382 def task_done (self ):
402383 with self ._cond :
403- if self ._is_shutdown_immediate ():
404- raise ShutDown
405384 if not self ._unfinished_tasks .acquire (False ):
406385 raise ValueError ('task_done() called too many times' )
407386 if self ._unfinished_tasks ._semlock ._is_zero ():
408387 self ._cond .notify_all ()
409388
410389 def join (self ):
411390 with self ._cond :
412- if self ._is_shutdown_immediate ():
413- raise ShutDown
414391 if not self ._unfinished_tasks ._semlock ._is_zero ():
415392 self ._cond .wait ()
416- if self ._is_shutdown_immediate ():
417- raise ShutDown
418393
419- def shutdown (self , immediate = False ):
420- with self ._cond :
421- is_alive = self ._is_alive ()
422- super ().shutdown (immediate )
423- if is_alive :
394+ def _clear (self ):
395+ with self ._rlock :
396+ while self ._poll ():
397+ self ._recv_bytes ()
398+ self ._unfinished_tasks .acquire (block = False )
399+ with self ._cond :
424400 self ._cond .notify_all ()
425401
426402#
0 commit comments