1+ #!/usr/bin/env python3
2+ """
3+ 多channel管理示例
4+
5+ 这个示例展示了如何使用多个channel来处理不同的队列操作,
6+ 提高并发性能和资源利用率。
7+ """
8+
9+ import logging
10+ import threading
11+ import time
12+ from use_rabbitmq import RabbitMQStore
13+
14+ logging .basicConfig (level = logging .INFO )
15+
16+ def test_multi_channel_basic ():
17+ """基本的多channel测试"""
18+ print ("=== 基本多channel测试 ===" )
19+
20+ with RabbitMQStore (
21+ host = "localhost" ,
22+ port = 5672 ,
23+ username = "guest" ,
24+ password = "guest"
25+ ) as mq :
26+ # 创建多个channel
27+ channel1_id = mq .create_channel ()
28+ channel2_id = mq .create_channel ()
29+
30+ print (f"创建了两个channel: { channel1_id [:8 ]} ..., { channel2_id [:8 ]} ..." )
31+
32+ # 使用不同的channel声明队列
33+ mq .declare_queue ("queue1" , channel_id = channel1_id )
34+ mq .declare_queue ("queue2" , channel_id = channel2_id )
35+
36+ # 使用不同的channel发送消息
37+ mq .send ("queue1" , "Message to queue1 via channel1" , channel_id = channel1_id )
38+ mq .send ("queue2" , "Message to queue2 via channel2" , channel_id = channel2_id )
39+
40+ # 检查消息数量
41+ count1 = mq .get_message_counts ("queue1" , channel_id = channel1_id )
42+ count2 = mq .get_message_counts ("queue2" , channel_id = channel2_id )
43+
44+ print (f"Queue1 消息数量: { count1 } " )
45+ print (f"Queue2 消息数量: { count2 } " )
46+
47+ # 列出所有channel状态
48+ channels = mq .list_channels ()
49+ print (f"所有channel状态: { channels } " )
50+
51+ # 清理
52+ mq .flush_queue ("queue1" , channel_id = channel1_id )
53+ mq .flush_queue ("queue2" , channel_id = channel2_id )
54+
55+ # 关闭channel
56+ mq .close_channel (channel1_id )
57+ mq .close_channel (channel2_id )
58+
59+ print ("多channel基本测试完成" )
60+
61+ def test_backward_compatibility ():
62+ """测试向后兼容性"""
63+ print ("\n === 向后兼容性测试 ===" )
64+
65+ with RabbitMQStore (
66+ host = "localhost" ,
67+ port = 5672 ,
68+ username = "guest" ,
69+ password = "guest"
70+ ) as mq :
71+ # 使用原有的API(不指定channel_id)
72+ mq .declare_queue ("test_queue" )
73+ mq .send ("test_queue" , "Backward compatibility test message" )
74+
75+ count = mq .get_message_counts ("test_queue" )
76+ print (f"使用默认channel发送消息,队列消息数量: { count } " )
77+
78+ # 清理
79+ mq .flush_queue ("test_queue" )
80+
81+ print ("向后兼容性测试通过" )
82+
83+ def test_concurrent_channels ():
84+ """测试并发使用多个channel"""
85+ print ("\n === 并发channel测试 ===" )
86+
87+ def worker (mq , worker_id , channel_id ):
88+ """工作线程函数"""
89+ queue_name = f"worker_queue_{ worker_id } "
90+
91+ # 声明队列
92+ mq .declare_queue (queue_name , channel_id = channel_id )
93+
94+ # 发送多条消息
95+ for i in range (5 ):
96+ message = f"Worker { worker_id } - Message { i } "
97+ mq .send (queue_name , message , channel_id = channel_id )
98+ time .sleep (0.1 ) # 模拟处理时间
99+
100+ # 检查消息数量
101+ count = mq .get_message_counts (queue_name , channel_id = channel_id )
102+ print (f"Worker { worker_id } (Channel { channel_id [:8 ]} ...) 发送了 { count } 条消息" )
103+
104+ # 清理
105+ mq .flush_queue (queue_name , channel_id = channel_id )
106+
107+ with RabbitMQStore (
108+ host = "localhost" ,
109+ port = 5672 ,
110+ username = "guest" ,
111+ password = "guest"
112+ ) as mq :
113+ # 创建多个channel
114+ channels = []
115+ threads = []
116+
117+ for i in range (3 ):
118+ channel_id = mq .create_channel ()
119+ channels .append (channel_id )
120+
121+ # 创建工作线程
122+ thread = threading .Thread (target = worker , args = (mq , i , channel_id ))
123+ threads .append (thread )
124+ thread .start ()
125+
126+ # 等待所有线程完成
127+ for thread in threads :
128+ thread .join ()
129+
130+ # 关闭所有channel
131+ for channel_id in channels :
132+ mq .close_channel (channel_id )
133+
134+ print ("并发channel测试完成" )
135+
136+ def test_channel_recovery ():
137+ """测试channel恢复机制"""
138+ print ("\n === Channel恢复测试 ===" )
139+
140+ with RabbitMQStore (
141+ host = "localhost" ,
142+ port = 5672 ,
143+ username = "guest" ,
144+ password = "guest"
145+ ) as mq :
146+ # 创建channel
147+ channel_id = mq .create_channel ()
148+
149+ # 使用channel
150+ mq .declare_queue ("recovery_test" , channel_id = channel_id )
151+ mq .send ("recovery_test" , "Test message" , channel_id = channel_id )
152+
153+ print (f"Channel { channel_id [:8 ]} ... 创建并使用成功" )
154+
155+ # 模拟channel状态检查
156+ channels_status = mq .list_channels ()
157+ print (f"Channel状态: { channels_status } " )
158+
159+ # 清理
160+ mq .flush_queue ("recovery_test" , channel_id = channel_id )
161+ mq .close_channel (channel_id )
162+
163+ print ("Channel恢复测试完成" )
164+
165+ if __name__ == "__main__" :
166+ print ("RabbitMQ 多Channel管理示例" )
167+ print ("=" * 50 )
168+
169+ try :
170+ test_multi_channel_basic ()
171+ test_backward_compatibility ()
172+ test_concurrent_channels ()
173+ test_channel_recovery ()
174+
175+ print ("\n " + "=" * 50 )
176+ print ("所有多channel测试完成!" )
177+ print ("新功能特性:" )
178+ print ("1. 支持创建多个channel" )
179+ print ("2. 每个操作可以指定使用的channel" )
180+ print ("3. 保持向后兼容性" )
181+ print ("4. 支持并发使用多个channel" )
182+ print ("5. 自动管理channel生命周期" )
183+
184+ except Exception as e :
185+ print (f"测试运行出错: { e } " )
186+ import traceback
187+ traceback .print_exc ()
0 commit comments