Skip to content

Commit cf5e60b

Browse files
committed
feat(rabbitmq): 添加多channel管理功能支持
实现RabbitMQ连接的多channel管理,提高并发性能和资源利用率 - 新增create_channel/close_channel/list_channels等接口 - 支持在队列操作中指定使用特定channel - 保持原有API的向后兼容性 - 添加多channel使用示例和测试用例
1 parent 8251414 commit cf5e60b

File tree

2 files changed

+376
-70
lines changed

2 files changed

+376
-70
lines changed

example/example_multi_channel.py

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
#!/usr/bin/env python3
2+
"""
3+
多channel管理示例
4+
5+
这个示例展示了如何使用多个channel来处理不同的队列操作,
6+
提高并发性能和资源利用率。
7+
"""
8+
9+
import logging
10+
import threading
11+
import time
12+
from use_rabbitmq import RabbitMQStore
13+
14+
logging.basicConfig(level=logging.INFO)
15+
16+
def test_multi_channel_basic():
17+
"""基本的多channel测试"""
18+
print("=== 基本多channel测试 ===")
19+
20+
with RabbitMQStore(
21+
host="localhost",
22+
port=5672,
23+
username="guest",
24+
password="guest"
25+
) as mq:
26+
# 创建多个channel
27+
channel1_id = mq.create_channel()
28+
channel2_id = mq.create_channel()
29+
30+
print(f"创建了两个channel: {channel1_id[:8]}..., {channel2_id[:8]}...")
31+
32+
# 使用不同的channel声明队列
33+
mq.declare_queue("queue1", channel_id=channel1_id)
34+
mq.declare_queue("queue2", channel_id=channel2_id)
35+
36+
# 使用不同的channel发送消息
37+
mq.send("queue1", "Message to queue1 via channel1", channel_id=channel1_id)
38+
mq.send("queue2", "Message to queue2 via channel2", channel_id=channel2_id)
39+
40+
# 检查消息数量
41+
count1 = mq.get_message_counts("queue1", channel_id=channel1_id)
42+
count2 = mq.get_message_counts("queue2", channel_id=channel2_id)
43+
44+
print(f"Queue1 消息数量: {count1}")
45+
print(f"Queue2 消息数量: {count2}")
46+
47+
# 列出所有channel状态
48+
channels = mq.list_channels()
49+
print(f"所有channel状态: {channels}")
50+
51+
# 清理
52+
mq.flush_queue("queue1", channel_id=channel1_id)
53+
mq.flush_queue("queue2", channel_id=channel2_id)
54+
55+
# 关闭channel
56+
mq.close_channel(channel1_id)
57+
mq.close_channel(channel2_id)
58+
59+
print("多channel基本测试完成")
60+
61+
def test_backward_compatibility():
62+
"""测试向后兼容性"""
63+
print("\n=== 向后兼容性测试 ===")
64+
65+
with RabbitMQStore(
66+
host="localhost",
67+
port=5672,
68+
username="guest",
69+
password="guest"
70+
) as mq:
71+
# 使用原有的API(不指定channel_id)
72+
mq.declare_queue("test_queue")
73+
mq.send("test_queue", "Backward compatibility test message")
74+
75+
count = mq.get_message_counts("test_queue")
76+
print(f"使用默认channel发送消息,队列消息数量: {count}")
77+
78+
# 清理
79+
mq.flush_queue("test_queue")
80+
81+
print("向后兼容性测试通过")
82+
83+
def test_concurrent_channels():
84+
"""测试并发使用多个channel"""
85+
print("\n=== 并发channel测试 ===")
86+
87+
def worker(mq, worker_id, channel_id):
88+
"""工作线程函数"""
89+
queue_name = f"worker_queue_{worker_id}"
90+
91+
# 声明队列
92+
mq.declare_queue(queue_name, channel_id=channel_id)
93+
94+
# 发送多条消息
95+
for i in range(5):
96+
message = f"Worker {worker_id} - Message {i}"
97+
mq.send(queue_name, message, channel_id=channel_id)
98+
time.sleep(0.1) # 模拟处理时间
99+
100+
# 检查消息数量
101+
count = mq.get_message_counts(queue_name, channel_id=channel_id)
102+
print(f"Worker {worker_id} (Channel {channel_id[:8]}...) 发送了 {count} 条消息")
103+
104+
# 清理
105+
mq.flush_queue(queue_name, channel_id=channel_id)
106+
107+
with RabbitMQStore(
108+
host="localhost",
109+
port=5672,
110+
username="guest",
111+
password="guest"
112+
) as mq:
113+
# 创建多个channel
114+
channels = []
115+
threads = []
116+
117+
for i in range(3):
118+
channel_id = mq.create_channel()
119+
channels.append(channel_id)
120+
121+
# 创建工作线程
122+
thread = threading.Thread(target=worker, args=(mq, i, channel_id))
123+
threads.append(thread)
124+
thread.start()
125+
126+
# 等待所有线程完成
127+
for thread in threads:
128+
thread.join()
129+
130+
# 关闭所有channel
131+
for channel_id in channels:
132+
mq.close_channel(channel_id)
133+
134+
print("并发channel测试完成")
135+
136+
def test_channel_recovery():
137+
"""测试channel恢复机制"""
138+
print("\n=== Channel恢复测试 ===")
139+
140+
with RabbitMQStore(
141+
host="localhost",
142+
port=5672,
143+
username="guest",
144+
password="guest"
145+
) as mq:
146+
# 创建channel
147+
channel_id = mq.create_channel()
148+
149+
# 使用channel
150+
mq.declare_queue("recovery_test", channel_id=channel_id)
151+
mq.send("recovery_test", "Test message", channel_id=channel_id)
152+
153+
print(f"Channel {channel_id[:8]}... 创建并使用成功")
154+
155+
# 模拟channel状态检查
156+
channels_status = mq.list_channels()
157+
print(f"Channel状态: {channels_status}")
158+
159+
# 清理
160+
mq.flush_queue("recovery_test", channel_id=channel_id)
161+
mq.close_channel(channel_id)
162+
163+
print("Channel恢复测试完成")
164+
165+
if __name__ == "__main__":
166+
print("RabbitMQ 多Channel管理示例")
167+
print("=" * 50)
168+
169+
try:
170+
test_multi_channel_basic()
171+
test_backward_compatibility()
172+
test_concurrent_channels()
173+
test_channel_recovery()
174+
175+
print("\n" + "=" * 50)
176+
print("所有多channel测试完成!")
177+
print("新功能特性:")
178+
print("1. 支持创建多个channel")
179+
print("2. 每个操作可以指定使用的channel")
180+
print("3. 保持向后兼容性")
181+
print("4. 支持并发使用多个channel")
182+
print("5. 自动管理channel生命周期")
183+
184+
except Exception as e:
185+
print(f"测试运行出错: {e}")
186+
import traceback
187+
traceback.print_exc()

0 commit comments

Comments
 (0)