Skip to content

BlockingConnectionPool deadlock (double condition.acquire) #3056

Closed as not planned
@TsaplinIA

Description

@TsaplinIA

Version: 5.0.1

Platform: Ubuntu 22.04.3 LTS

Description: Got a deadlock when using BlockingConnectionPool

Here is an example:

import asyncio
import time
import uuid

import redis.asyncio as aioredis
import sys

if sys.version_info >= (3, 11, 3):
    from asyncio import timeout as async_timeout
else:
    from async_timeout import timeout as async_timeout

# Id of the task that most recently entered a ``MyCondition`` and has not yet
# left it through ``__aexit__``; ``None`` when no such task is recorded.
# It must exist before the first ``__aenter__`` call, which reads it.
last_acquire_id = None


class MyCondition(asyncio.Condition):
    """``asyncio.Condition`` that traces ``async with`` entry and exit.

    Every entry and exit is printed together with ``id()`` of the current
    task. The id of the last task that entered is kept in the module-level
    ``last_acquire_id``; if that same task tries to enter again before its
    ``__aexit__`` cleared the marker, the "is deadlock" message is printed
    before the task waits on ``acquire()``.
    """

    async def __aenter__(self):
        """Log the attempt, acquire the underlying lock, record the owner.

        Returns:
            None
        """
        global last_acquire_id
        current_task_id = id(asyncio.current_task())
        # The conditional covers the *whole* concatenated message: the
        # "wanna acquire ... is deadlock" line is printed only when this task
        # is still recorded as the last one to acquire; otherwise an empty
        # line is printed.
        print(
            (
                f"Task({current_task_id}) wanna acquire condition "
                f"is deadlock"
            )
            if last_acquire_id == current_task_id
            else ""
        )
        await self.acquire()
        print(f"Task({current_task_id}) acquire condition")
        last_acquire_id = current_task_id
        return None

    async def __aexit__(self, exc_type, exc, tb):
        """Log the release, release the lock and clear the owner marker.

        The marker is reset only if it still refers to the current task, so
        a marker written by another task in the meantime is left untouched.
        """
        global last_acquire_id
        current_task_id = id(asyncio.current_task())

        print(f"Task({current_task_id}) wanna release condition")
        self.release()
        if last_acquire_id == current_task_id:
            last_acquire_id = None
        print(f"Task({id(asyncio.current_task())}) release condition")
​
​
class MyBlockingConnectionPool(aioredis.BlockingConnectionPool):
    """Blocking pool whose ``_condition`` attribute is a ``MyCondition``."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent, then swap in the tracer."""
        super().__init__(*args, **kwargs)
        # Assigned after the parent initialiser so that this value is the
        # one left on the instance.
        tracing_condition = MyCondition()
        self._condition = tracing_condition
​
​
async def get_something_from_redis(redis: aioredis.Redis):
    """Issue a ``GET`` for a freshly generated random key.

    Returns:
        Whatever ``redis.get`` returns, or ``None`` when the call raises
        ``TimeoutError`` or ``aioredis.ConnectionError`` (both are
        deliberately ignored).
    """
    key = uuid.uuid4().hex
    try:
        value = await redis.get(key)
    except (TimeoutError, aioredis.ConnectionError):
        # Best effort: these failures are swallowed on purpose.
        return None
    return value
​
​
async def start_loop(redis: aioredis.Redis, iter_size: int = 100):
    """Keep firing batches of concurrent ``get_something_from_redis`` calls.

    Each round schedules ``iter_size`` tasks and waits for all of them
    before starting the next round. The loop has no exit condition, so the
    coroutine only ends if an exception propagates or it is cancelled.
    """
    while True:
        batch = []
        for _ in range(iter_size):
            batch.append(
                asyncio.create_task(get_something_from_redis(redis))
            )
        await asyncio.gather(*batch)
​
​
async def main():
    """Build a client on top of ``MyBlockingConnectionPool`` and run the loop.

    The pool is created from a local Redis URL with ``max_connections=10``
    and ``timeout=0.01`` while each round of ``start_loop`` is asked to run
    100 concurrent requests.
    NOTE(review): ``timeout`` is presumably the number of seconds a caller
    waits for a free connection - confirm against the redis-py docs.
    """
    pool = MyBlockingConnectionPool.from_url(
        "redis://127.0.0.1:6379/0",
        max_connections=10,
        timeout=0.01,
    )
    redis_blocking_pool: aioredis.Redis = aioredis.Redis(connection_pool=pool)

    print("START LOOP")
    await start_loop(redis_blocking_pool, 100)
    # Reached only if start_loop() returns.
    print("END LOOP")
​
# Run the reproduction only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

result:

OUTPUT:
START LOOP
​
Task(140051711422144) acquire condition
Task(140051711422144) wanna release condition
Task(140051711422144) release condition
Task(140051711422528) acquire condition
Task(140051711422528) wanna release condition
Task(140051711422528) release condition
Task(140051711422336) acquire condition
Task(140051711422336) wanna release condition
Task(140051711422336) release condition
Task(140051711422912) acquire condition
Task(140051711422912) wanna release condition
Task(140051711422912) release condition
Task(140051711423104) acquire condition
Task(140051711423104) wanna acquire condition is deadlock

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions