Skip to content

Commit 224c5ae

Browse files
committed
feat: 实现RabbitMQ连接工厂和多channel管理功能
新增RabbitConnectionFactory和ConnectionManager类,支持以下功能: 1. 设置client_name以在RabbitMQ管理界面标识连接 2. 单个连接管理多个channel 3. 线程安全的连接和channel操作 4. 自动重试机制确保连接可靠性 5. 装饰器模式简化消费者代码 添加相关示例、测试和文档说明
1 parent 0a91696 commit 224c5ae

File tree

7 files changed

+1678
-0
lines changed

7 files changed

+1678
-0
lines changed
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
"""
4+
RabbitConnectionFactory 使用示例
5+
6+
本示例展示了如何使用 RabbitConnectionFactory 来管理 RabbitMQ 连接,
7+
特别是如何设置 client_name 和在一个连接上创建多个 channel。
8+
9+
特性演示:
10+
- 设置 client_name(类似Java客户端)
11+
- 一个连接创建多个 channel
12+
- 连接和 channel 的管理
13+
- 线程安全操作
14+
"""
15+
16+
import time
17+
import threading
18+
from use_rabbitmq import (
19+
RabbitConnectionFactory,
20+
rabbitConnectionFactory,
21+
get_default_factory,
22+
create_rabbit_connection,
23+
get_connection_manager,
24+
ConnectionManager
25+
)
26+
27+
28+
def example_client_name():
29+
"""演示设置client_name功能"""
30+
print("=== Client Name 设置示例 ===")
31+
32+
# 创建带有自定义client_name的工厂
33+
factory = RabbitConnectionFactory(client_name="MyPythonApp")
34+
35+
# 获取连接管理器
36+
conn_manager = factory.get_connection_manager("main")
37+
38+
print(f"连接已创建,client_name: MyPythonApp#main")
39+
print("你可以在RabbitMQ管理界面的Connections页面看到这个名称")
40+
41+
42+
# 创建一个channel测试连接
43+
channel = conn_manager.create_channel("test")
44+
channel = conn_manager.create_channel("test2")
45+
channel = conn_manager.create_channel("test4")
46+
print(f"Channel创建成功: {channel}")
47+
48+
# 清理
49+
factory.shutdown_all()
50+
print("连接已关闭")
51+
52+
53+
def example_multiple_channels():
54+
"""演示一个连接创建多个channel"""
55+
print("\n=== 多Channel管理示例 ===")
56+
57+
factory = RabbitConnectionFactory(client_name="MultiChannelApp")
58+
conn_manager = factory.get_connection_manager("production")
59+
60+
# 创建多个不同用途的channel
61+
publisher_channel = conn_manager.create_channel("publisher", confirm_delivery=True)
62+
consumer_channel = conn_manager.create_channel("consumer", confirm_delivery=False)
63+
admin_channel = conn_manager.create_channel("admin", confirm_delivery=False)
64+
65+
print(f"已创建的channels: {conn_manager.list_channels()}")
66+
67+
# 使用不同的channel进行操作
68+
try:
69+
# 声明队列
70+
publisher_channel.queue.declare("test_queue", durable=True)
71+
72+
# 发送消息
73+
publisher_channel.basic.publish(
74+
"Hello from publisher channel!",
75+
"test_queue"
76+
)
77+
print("消息已通过publisher channel发送")
78+
79+
# 获取队列信息
80+
queue_info = admin_channel.queue.declare("test_queue", passive=True)
81+
print(f"队列信息: {queue_info}")
82+
83+
except Exception as e:
84+
print(f"操作出错: {e}")
85+
86+
# 关闭特定channel
87+
conn_manager.close_channel("admin")
88+
print(f"关闭admin channel后的channels: {conn_manager.list_channels()}")
89+
90+
# 清理所有资源
91+
factory.shutdown_all()
92+
print("所有连接和channel已关闭")
93+
94+
95+
def example_connection_reuse():
96+
"""演示连接复用"""
97+
print("\n=== 连接复用示例 ===")
98+
99+
factory = RabbitConnectionFactory(client_name="ReuseApp")
100+
101+
# 获取同名连接管理器(应该返回同一个实例)
102+
manager1 = factory.get_connection_manager("shared")
103+
manager2 = factory.get_connection_manager("shared")
104+
105+
print(f"manager1 和 manager2 是同一个实例: {manager1 is manager2}")
106+
107+
# 在同一个连接上创建不同的channel
108+
channel1 = manager1.create_channel("worker1")
109+
channel2 = manager2.create_channel("worker2")
110+
111+
print(f"两个channel都来自同一个连接管理器: {manager1 is manager2}")
112+
print(f"当前channels: {manager1.list_channels()}")
113+
114+
factory.shutdown_all()
115+
116+
117+
def example_thread_safety():
118+
"""演示线程安全操作"""
119+
print("\n=== 线程安全示例 ===")
120+
121+
factory = RabbitConnectionFactory(client_name="ThreadSafeApp")
122+
conn_manager = factory.get_connection_manager("threaded")
123+
124+
def worker(worker_id):
125+
"""工作线程函数"""
126+
try:
127+
# 每个线程创建自己的channel
128+
channel = conn_manager.create_channel(f"worker_{worker_id}")
129+
130+
# 声明队列
131+
queue_name = f"queue_{worker_id}"
132+
channel.queue.declare(queue_name, durable=True)
133+
134+
# 发送消息
135+
for i in range(3):
136+
message = f"Message {i} from worker {worker_id}"
137+
channel.basic.publish(message, queue_name)
138+
139+
print(f"Worker {worker_id} 完成任务")
140+
141+
except Exception as e:
142+
print(f"Worker {worker_id} 出错: {e}")
143+
144+
# 创建多个线程
145+
threads = []
146+
for i in range(3):
147+
thread = threading.Thread(target=worker, args=(i,))
148+
threads.append(thread)
149+
thread.start()
150+
151+
# 等待所有线程完成
152+
for thread in threads:
153+
thread.join()
154+
155+
print(f"所有线程完成,当前channels: {conn_manager.list_channels()}")
156+
157+
factory.shutdown_all()
158+
159+
160+
def example_context_manager():
161+
"""演示上下文管理器使用"""
162+
print("\n=== 上下文管理器示例 ===")
163+
164+
# 工厂级别的上下文管理器
165+
with RabbitConnectionFactory(client_name="ContextApp") as factory:
166+
conn_manager = factory.get_connection_manager("context_test")
167+
168+
# 连接管理器级别的上下文管理器
169+
with conn_manager as manager:
170+
channel1 = manager.create_channel("temp1")
171+
channel2 = manager.create_channel("temp2")
172+
173+
print(f"在上下文中创建的channels: {manager.list_channels()}")
174+
175+
# 进行一些操作
176+
channel1.queue.declare("temp_queue")
177+
channel1.basic.publish("Test message", "temp_queue")
178+
179+
print("连接管理器上下文退出,资源已自动清理")
180+
181+
print("工厂上下文退出,所有资源已自动清理")
182+
183+
184+
def example_error_handling():
185+
"""演示错误处理"""
186+
print("\n=== 错误处理示例 ===")
187+
188+
factory = RabbitConnectionFactory(client_name="ErrorHandlingApp")
189+
190+
try:
191+
# 使用错误的配置
192+
conn_manager = factory.get_connection_manager(
193+
"error_test",
194+
host="nonexistent_host",
195+
port=9999
196+
)
197+
198+
# 尝试创建channel(这会触发连接)
199+
channel = conn_manager.create_channel("test")
200+
201+
except Exception as e:
202+
print(f"预期的连接错误: {e}")
203+
204+
# 测试已关闭的管理器
205+
conn_manager = factory.get_connection_manager("normal")
206+
conn_manager.shutdown()
207+
208+
try:
209+
# 尝试在已关闭的管理器上创建channel
210+
channel = conn_manager.create_channel("test")
211+
except RuntimeError as e:
212+
print(f"预期的运行时错误: {e}")
213+
214+
factory.shutdown_all()
215+
216+
217+
def example_advanced_usage():
218+
"""演示高级用法"""
219+
print("\n=== 高级用法示例 ===")
220+
221+
# 使用自定义配置创建工厂
222+
custom_config = {
223+
"host": "localhost",
224+
"port": 5672,
225+
"username": "guest",
226+
"password": "guest",
227+
"heartbeat": 60,
228+
"virtual_host": "/"
229+
}
230+
231+
factory = RabbitConnectionFactory(
232+
client_name="AdvancedApp",
233+
default_config=custom_config
234+
)
235+
236+
# 创建不同环境的连接
237+
prod_manager = factory.get_connection_manager("production")
238+
dev_manager = factory.get_connection_manager(
239+
"development",
240+
virtual_host="/dev"
241+
)
242+
243+
print(f"活跃连接: {factory.list_connections()}")
244+
245+
# 为不同用途创建专门的channel
246+
# 生产环境
247+
prod_publisher = prod_manager.create_channel("publisher")
248+
prod_consumer = prod_manager.create_channel("consumer")
249+
250+
# 开发环境
251+
dev_tester = dev_manager.create_channel("tester")
252+
253+
print(f"生产环境channels: {prod_manager.list_channels()}")
254+
print(f"开发环境channels: {dev_manager.list_channels()}")
255+
256+
# 选择性关闭
257+
factory.remove_connection_manager("development")
258+
print(f"关闭开发环境后的连接: {factory.list_connections()}")
259+
260+
factory.shutdown_all()
261+
262+
263+
if __name__ == "__main__":
264+
print("RabbitConnectionFactory 高级功能演示")
265+
print("=" * 60)
266+
267+
try:
268+
example_client_name()
269+
example_multiple_channels()
270+
example_connection_reuse()
271+
example_thread_safety()
272+
example_context_manager()
273+
example_error_handling()
274+
example_advanced_usage()
275+
276+
print("\n所有示例执行完成!")
277+
print("\n主要特性总结:")
278+
print("1. ✅ 支持设置client_name,在RabbitMQ管理界面可见")
279+
print("2. ✅ 一个连接可以创建多个channel")
280+
print("3. ✅ 连接和channel的复用管理")
281+
print("4. ✅ 线程安全操作")
282+
print("5. ✅ 上下文管理器自动资源清理")
283+
print("6. ✅ 完善的错误处理")
284+
285+
except Exception as e:
286+
print(f"示例执行出错: {e}")
287+
print("请确保 RabbitMQ 服务正在运行,并且连接配置正确。")
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import logging
2+
import time
3+
from use_rabbitmq import RabbitConnectionFactory, get_connection_manager
4+
5+
logging.basicConfig(level=logging.INFO)
6+
7+
# 方式1: 使用RabbitConnectionFactory的装饰器
8+
factory = RabbitConnectionFactory(
9+
client_name="MultiChannelApp",
10+
default_config={
11+
"host": "localhost",
12+
"port": 5672,
13+
"username": "admin",
14+
"password": "admin",
15+
}
16+
)
17+
18+
# 在默认连接的不同channel上监听不同队列
19+
@factory.listener(queue_name="orders", channel_name="order_consumer", connection_name="default")
20+
def handle_orders(message):
21+
print(f"[Order Consumer] Received order: {message.body}")
22+
message.ack()
23+
24+
@factory.listener(queue_name="notifications", channel_name="notification_consumer", connection_name="default")
25+
def handle_notifications(message):
26+
print(f"[Notification Consumer] Received notification: {message.body}")
27+
message.ack()
28+
29+
@factory.listener(queue_name="logs", channel_name="log_consumer", connection_name="default")
30+
def handle_logs(message):
31+
print(f"[Log Consumer] Received log: {message.body}")
32+
message.ack()
33+
34+
# 方式2: 使用ConnectionManager的装饰器
35+
conn_manager = get_connection_manager("app_connection",
36+
host="localhost",
37+
port=5672,
38+
username="admin",
39+
password="admin")
40+
41+
@conn_manager.listener(queue_name="tasks", channel_name="task_consumer")
42+
def handle_tasks(message):
43+
print(f"[Task Consumer] Processing task: {message.body}")
44+
# 模拟任务处理
45+
time.sleep(1)
46+
message.ack()
47+
48+
@conn_manager.listener(queue_name="events", channel_name="event_consumer")
49+
def handle_events(message):
50+
print(f"[Event Consumer] Processing event: {message.body}")
51+
message.ack()
52+
53+
if __name__ == "__main__":
54+
print("Starting multi-channel consumers...")
55+
print("Factory connections:", factory.list_connections())
56+
print("Connection manager channels:", conn_manager.list_channels())
57+
58+
# 发送一些测试消息
59+
try:
60+
# 使用publisher channel发送消息
61+
publisher_channel = conn_manager.create_channel("publisher")
62+
63+
# 声明队列
64+
for queue in ["orders", "notifications", "logs", "tasks", "events"]:
65+
publisher_channel.queue.declare(queue, durable=True)
66+
67+
# 发送测试消息
68+
test_messages = {
69+
"orders": "New order #12345",
70+
"notifications": "User login notification",
71+
"logs": "Application started",
72+
"tasks": "Process payment for order #12345",
73+
"events": "User registered event"
74+
}
75+
76+
for queue, message in test_messages.items():
77+
publisher_channel.basic.publish(
78+
body=message,
79+
routing_key=queue,
80+
properties={'delivery_mode': 2} # 持久化消息
81+
)
82+
print(f"Published to {queue}: {message}")
83+
84+
# 让消费者运行一段时间
85+
print("\nConsumers are running. Press Ctrl+C to stop...")
86+
time.sleep(3000)
87+
88+
except KeyboardInterrupt:
89+
print("\nShutting down...")
90+
finally:
91+
# 清理资源
92+
factory.shutdown_all()
93+
conn_manager.shutdown()
94+
print("All connections closed.")

0 commit comments

Comments
 (0)