Skip to content

Commit c47dacb

Browse files
jmf-mordis1st1
authored andcommitted
bpo-32574: Fix leaks in asyncio.Queue.put() and .get() (#5208)
1 parent c9070d0 commit c47dacb

File tree

3 files changed

+63
-2
lines changed

3 files changed

+63
-2
lines changed

Lib/asyncio/queues.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,13 @@ async def put(self, item):
121121
await putter
122122
except:
123123
putter.cancel() # Just in case putter is not done yet.
124+
try:
125+
# Clean self._putters from canceled putters.
126+
self._putters.remove(putter)
127+
except ValueError:
128+
# The putter could be removed from self._putters by a
129+
# previous get_nowait call.
130+
pass
124131
if not self.full() and not putter.cancelled():
125132
# We were woken up by get_nowait(), but can't take
126133
# the call. Wake up the next in line.
@@ -152,12 +159,13 @@ async def get(self):
152159
await getter
153160
except:
154161
getter.cancel() # Just in case getter is not done yet.
155-
156162
try:
163+
# Clean self._getters from canceled getters.
157164
self._getters.remove(getter)
158165
except ValueError:
166+
# The getter could be removed from self._getters by a
167+
# previous put_nowait call.
159168
pass
160-
161169
if not self.empty() and not getter.cancelled():
162170
# We were woken up by put_nowait(), but can't take
163171
# the call. Wake up the next in line.

Lib/test/test_asyncio/test_queues.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,56 @@ async def getter():
520520
self.loop.run_until_complete(
521521
asyncio.gather(getter(), t0, t1, t2, t3, loop=self.loop))
522522

523+
def test_cancelled_puts_not_being_held_in_self_putters(self):
524+
def a_generator():
525+
yield 0.01
526+
yield 0.1
527+
528+
loop = self.new_test_loop(a_generator)
529+
530+
# Full queue.
531+
queue = asyncio.Queue(loop=loop, maxsize=1)
532+
queue.put_nowait(1)
533+
534+
# Task waiting for space to put an item in the queue.
535+
put_task = loop.create_task(queue.put(1))
536+
loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
537+
538+
# Check that the putter is correctly removed from queue._putters when
539+
# the task is canceled.
540+
self.assertEqual(len(queue._putters), 1)
541+
put_task.cancel()
542+
with self.assertRaises(asyncio.CancelledError):
543+
loop.run_until_complete(put_task)
544+
self.assertEqual(len(queue._putters), 0)
545+
546+
def test_cancelled_put_silence_value_error_exception(self):
547+
def gen():
548+
yield 0.01
549+
yield 0.1
550+
551+
loop = self.new_test_loop(gen)
552+
553+
# Full Queue.
554+
queue = asyncio.Queue(1, loop=loop)
555+
queue.put_nowait(1)
556+
557+
# Task waiting for space to put a item in the queue.
558+
put_task = loop.create_task(queue.put(1))
559+
loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
560+
561+
# get_nowait() remove the future of put_task from queue._putters.
562+
queue.get_nowait()
563+
# When canceled, queue.put is going to remove its future from
564+
# self._putters but it was removed previously by queue.get_nowait().
565+
put_task.cancel()
566+
567+
# The ValueError exception triggered by queue._putters.remove(putter)
568+
# inside queue.put should be silenced.
569+
# If the ValueError is silenced we should catch a CancelledError.
570+
with self.assertRaises(asyncio.CancelledError):
571+
loop.run_until_complete(put_task)
572+
523573

524574
class LifoQueueTests(_QueueTestBase):
525575

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fix memory leak in asyncio.Queue, when the queue has limited size and it is
2+
full, the cancelation of queue.put() can cause a memory leak. Patch by: José
3+
Melero.

0 commit comments

Comments
 (0)