Skip to content

Commit 8251414

Browse files
authored
V0.31 (#9)
* feat(rabbitmq): 添加上下文管理器支持并修复连接泄漏问题 实现上下文管理器接口,支持with语法自动资源清理 重构连接和通道关闭逻辑,确保线程安全和正确关闭顺序 添加连接泄漏测试脚本和示例代码 更新文档说明推荐使用上下文管理器模式
1 parent 0c54239 commit 8251414

File tree

5 files changed

+323
-25
lines changed

5 files changed

+323
-25
lines changed

.vscode/launch.json

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"version": "0.2.0",
3+
"configurations": [
4+
{
5+
"name": "Python Debugger: Current File",
6+
"type": "debugpy",
7+
"request": "launch",
8+
"program": "${file}",
9+
"console": "integratedTerminal",
10+
"env": {
11+
"PYTHONPATH": "${workspaceFolder}/src"
12+
}
13+
}
14+
]
15+
}

README.md

Lines changed: 74 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
- 线程安全设计
2121
- 完善的错误处理和日志记录
2222
- 支持消息确认机制
23+
- **修复连接泄漏问题**,确保生产环境稳定性
24+
- 支持上下文管理器,自动资源清理
2325

2426
## 安装
2527

@@ -37,6 +39,29 @@ poetry add use-rabbitmq
3739

3840
### 基本使用
3941

42+
#### 推荐方式:使用上下文管理器
43+
44+
```python
45+
from use_rabbitmq import RabbitMQStore
46+
47+
# 推荐:使用上下文管理器,自动清理资源
48+
with RabbitMQStore(
49+
host="localhost",
50+
port=5672,
51+
username="guest",
52+
password="guest"
53+
) as rmq:
54+
# 发送消息
55+
rmq.send(queue_name="test_queue", message="Hello, RabbitMQ!")
56+
57+
# 获取消息数量
58+
count = rmq.get_message_counts("test_queue")
59+
print(f"队列中有 {count} 条消息")
60+
# 连接会自动关闭,避免连接泄漏
61+
```
62+
63+
#### 传统方式:手动管理资源
64+
4065
```python
4166
from use_rabbitmq import RabbitMQStore
4267

@@ -48,14 +73,18 @@ rmq = RabbitMQStore(
4873
password="guest"
4974
)
5075

51-
# 发送消息
52-
rmq.send(queue_name="test_queue", message="Hello, RabbitMQ!")
53-
54-
# 使用装饰器监听队列
55-
@rmq.listener(queue_name="test_queue")
56-
def process_message(message):
57-
print(f"收到消息: {message.body}")
58-
message.ack() # 确认消息已处理
76+
try:
77+
# 发送消息
78+
rmq.send(queue_name="test_queue", message="Hello, RabbitMQ!")
79+
80+
# 使用装饰器监听队列
81+
@rmq.listener(queue_name="test_queue")
82+
def process_message(message):
83+
print(f"收到消息: {message.body}")
84+
message.ack() # 确认消息已处理
85+
finally:
86+
# 重要:手动关闭连接,避免连接泄漏
87+
rmq.shutdown()
5988
```
6089

6190
### 使用别名
@@ -146,13 +175,50 @@ def process_batch(message):
146175
message.ack()
147176
```
148177

178+
## 连接管理和资源清理
179+
180+
### 重要提醒:避免连接泄漏
181+
182+
在生产环境中,**强烈建议**使用以下方式之一来确保连接被正确关闭:
183+
184+
#### 方式1:上下文管理器(推荐)
185+
186+
```python
187+
with RabbitMQStore() as rmq:
188+
# 你的代码
189+
pass
190+
# 连接自动关闭
191+
```
192+
193+
#### 方式2:显式调用shutdown
194+
195+
```python
196+
rmq = RabbitMQStore()
197+
try:
198+
# 你的代码
199+
pass
200+
finally:
201+
rmq.shutdown() # 确保连接被关闭
202+
```
203+
204+
### 连接泄漏修复
205+
206+
本版本修复了以下连接泄漏问题:
207+
208+
- ✅ 线程安全的连接管理
209+
- ✅ 异常情况下的资源清理
210+
- ✅ 正确的连接和通道关闭顺序
211+
- ✅ 消费者停止机制优化
212+
- ✅ 析构函数异常处理
213+
149214
## 异常处理
150215

151216
该库内置了自动重试和重连机制:
152217

153218
- 连接错误自动重试
154219
- 发送消息失败自动重试
155220
- 消费者断线自动重连
221+
- 资源清理异常处理
156222

157223
## 贡献
158224

example/example_context_manager.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
#!/usr/bin/env python3
2+
"""
3+
使用上下文管理器的RabbitMQ示例
4+
5+
这个示例展示了如何使用上下文管理器模式来确保RabbitMQ连接被正确关闭,
6+
避免连接泄漏问题。
7+
"""
8+
9+
import logging
10+
from use_rabbitmq import RabbitMQStore
11+
12+
logging.basicConfig(level=logging.INFO)
13+
14+
def example_with_context_manager():
15+
"""使用上下文管理器的示例"""
16+
print("=== 使用上下文管理器 ===")
17+
18+
# 推荐的使用方式:使用上下文管理器
19+
with RabbitMQStore(
20+
host="localhost",
21+
port=5672,
22+
username="guest",
23+
password="guest"
24+
) as mq:
25+
# 声明队列
26+
mq.declare_queue("test_queue", durable=True)
27+
28+
# 发送消息
29+
message = "Hello from context manager!"
30+
mq.send("test_queue", message)
31+
print(f"发送消息: {message}")
32+
33+
# 获取消息数量
34+
count = mq.get_message_counts("test_queue")
35+
print(f"队列中的消息数量: {count}")
36+
37+
# 清空队列
38+
mq.flush_queue("test_queue")
39+
print("队列已清空")
40+
41+
# 连接会在这里自动关闭
42+
print("连接已自动关闭")
43+
44+
def example_manual_cleanup():
45+
"""手动清理的示例"""
46+
print("\n=== 手动清理资源 ===")
47+
48+
mq = RabbitMQStore(
49+
host="localhost",
50+
port=5672,
51+
username="guest",
52+
password="guest"
53+
)
54+
55+
try:
56+
# 声明队列
57+
mq.declare_queue("test_queue2", durable=True)
58+
59+
# 发送消息
60+
message = "Hello from manual cleanup!"
61+
mq.send("test_queue2", message)
62+
print(f"发送消息: {message}")
63+
64+
# 获取消息数量
65+
count = mq.get_message_counts("test_queue2")
66+
print(f"队列中的消息数量: {count}")
67+
68+
except Exception as e:
69+
print(f"操作失败: {e}")
70+
finally:
71+
# 手动清理资源
72+
mq.shutdown()
73+
print("连接已手动关闭")
74+
75+
def example_multiple_operations():
76+
"""多个操作的示例"""
77+
print("\n=== 多个操作示例 ===")
78+
79+
with RabbitMQStore(
80+
host="localhost",
81+
port=5672,
82+
username="guest",
83+
password="guest"
84+
) as mq:
85+
# 创建多个队列
86+
queues = ["queue1", "queue2", "queue3"]
87+
88+
for queue_name in queues:
89+
mq.declare_queue(queue_name, durable=True)
90+
91+
# 发送多条消息
92+
for i in range(3):
93+
message = f"Message {i+1} to {queue_name}"
94+
mq.send(queue_name, message)
95+
print(f"发送到 {queue_name}: {message}")
96+
97+
# 检查消息数量
98+
count = mq.get_message_counts(queue_name)
99+
print(f"{queue_name} 中有 {count} 条消息")
100+
101+
# 清空队列
102+
mq.flush_queue(queue_name)
103+
print(f"{queue_name} 已清空")
104+
105+
print("所有操作完成,连接已自动关闭")
106+
107+
def example_error_handling():
108+
"""错误处理示例"""
109+
print("\n=== 错误处理示例 ===")
110+
111+
try:
112+
with RabbitMQStore(
113+
host="localhost",
114+
port=5672,
115+
username="guest",
116+
password="guest"
117+
) as mq:
118+
# 正常操作
119+
mq.declare_queue("error_test_queue", durable=True)
120+
mq.send("error_test_queue", "Test message")
121+
122+
# 模拟错误(这里故意引发异常)
123+
# raise ValueError("模拟的错误")
124+
125+
print("操作成功完成")
126+
127+
except Exception as e:
128+
print(f"捕获到异常: {e}")
129+
print("即使出现异常,连接也会被正确关闭")
130+
131+
if __name__ == "__main__":
132+
print("RabbitMQ 上下文管理器示例")
133+
print("=" * 50)
134+
135+
try:
136+
example_with_context_manager()
137+
example_manual_cleanup()
138+
example_multiple_operations()
139+
example_error_handling()
140+
141+
print("\n" + "=" * 50)
142+
print("所有示例完成!")
143+
print("建议使用上下文管理器模式以确保资源正确清理")
144+
while True:
145+
pass
146+
147+
except Exception as e:
148+
print(f"示例运行出错: {e}")

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "use-rabbitmq"
3-
version = "0.2.5"
3+
version = "0.2.6"
44
description = ""
55
authors = ["miclon <[email protected]>"]
66
readme = "README.md"

0 commit comments

Comments
 (0)