Skip to content

Commit bca0fef

Browse files
author
DanielePalaia
committed
improving disconnection management
1 parent 4980be8 commit bca0fef

File tree

9 files changed

+119
-160
lines changed

9 files changed

+119
-160
lines changed

examples/getting_started/getting_started.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def __init__(self):
2323
super().__init__()
2424
self._count = 0
2525

26-
def on_message(self, event: Event):
26+
def on_amqp_message(self, event: Event):
2727
print("received message: " + str(event.message.body))
2828

2929
# accepting
@@ -147,7 +147,7 @@ def main() -> None:
147147
consumer.close()
148148
# once we finish consuming if we close the connection we need to create a new one
149149
# connection = create_connection()
150-
# management = connection.management()
150+
management = connection.management()
151151

152152
print("unbind")
153153
management.unbind(bind_name)

examples/reconnection/reconnection_example.py

Lines changed: 31 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,22 @@
11
# type: ignore
2-
3-
4-
import time
5-
from dataclasses import dataclass
6-
from typing import Optional
7-
82
from rabbitmq_amqp_python_client import (
93
AddressHelper,
104
AMQPMessagingHandler,
115
Connection,
126
ConnectionClosed,
13-
Consumer,
147
Environment,
158
Event,
169
ExchangeSpecification,
1710
ExchangeToQueueBindingSpecification,
18-
Management,
1911
Message,
20-
Publisher,
2112
QuorumQueueSpecification,
2213
)
2314

24-
2515
# here we keep track of the objects we need to reconnect
26-
@dataclass
27-
class ConnectionConfiguration:
28-
connection: Optional[Connection] = None
29-
management: Optional[Management] = None
30-
publisher: Optional[Publisher] = None
31-
consumer: Optional[Consumer] = None
32-
16+
MESSAGES_TO_PUBLISH = 50000
3317

34-
connection_configuration = ConnectionConfiguration()
35-
MESSAGES_TO_PUBLSH = 50000
3618

37-
38-
# disconnection callback
39-
# here you can cleanup or reconnect
40-
def on_disconnection():
41-
42-
print("disconnected")
43-
global environment
44-
exchange_name = "test-exchange"
45-
queue_name = "example-queue"
46-
routing_key = "routing-key"
47-
48-
global connection_configuration
49-
50-
addr = AddressHelper.exchange_address(exchange_name, routing_key)
51-
addr_queue = AddressHelper.queue_address(queue_name)
52-
53-
if connection_configuration.connection is not None:
54-
connection_configuration.connection = create_connection()
55-
if connection_configuration.management is not None:
56-
connection_configuration.management = (
57-
connection_configuration.connection.management()
58-
)
59-
if connection_configuration.publisher is not None:
60-
connection_configuration.publisher = (
61-
connection_configuration.connection.publisher(addr)
62-
)
63-
if connection_configuration.consumer is not None:
64-
connection_configuration.consumer = (
65-
connection_configuration.connection.consumer(
66-
addr_queue, message_handler=MyMessageHandler()
67-
)
68-
)
69-
70-
71-
environment = Environment(
72-
uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection
73-
)
19+
environment = Environment(uri="amqp://guest:guest@localhost:5672/", reconnect=True)
7420

7521

7622
class MyMessageHandler(AMQPMessagingHandler):
@@ -102,7 +48,7 @@ def on_message(self, event: Event):
10248

10349
self._count = self._count + 1
10450

105-
if self._count == MESSAGES_TO_PUBLSH:
51+
if self._count == MESSAGES_TO_PUBLISH:
10652
print("closing receiver")
10753
# if you want you can add cleanup operations here
10854

@@ -136,29 +82,22 @@ def main() -> None:
13682
queue_name = "example-queue"
13783
routing_key = "routing-key"
13884

139-
global connection_configuration
140-
14185
print("connection to amqp server")
142-
if connection_configuration.connection is None:
143-
connection_configuration.connection = create_connection()
144-
145-
if connection_configuration.management is None:
146-
connection_configuration.management = (
147-
connection_configuration.connection.management()
148-
)
86+
connection = create_connection()
87+
management = connection.management()
88+
publisher = None
89+
consumer = None
14990

15091
print("declaring exchange and queue")
151-
connection_configuration.management.declare_exchange(
152-
ExchangeSpecification(name=exchange_name)
153-
)
92+
management.declare_exchange(ExchangeSpecification(name=exchange_name))
15493

155-
connection_configuration.management.declare_queue(
94+
management.declare_queue(
15695
QuorumQueueSpecification(name=queue_name)
15796
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
15897
)
15998

16099
print("binding queue to exchange")
161-
bind_name = connection_configuration.management.bind(
100+
bind_name = management.bind(
162101
ExchangeToQueueBindingSpecification(
163102
source_exchange=exchange_name,
164103
destination_queue=queue_name,
@@ -171,34 +110,32 @@ def main() -> None:
171110
addr_queue = AddressHelper.queue_address(queue_name)
172111

173112
print("create a publisher and publish a test message")
174-
if connection_configuration.publisher is None:
175-
connection_configuration.publisher = (
176-
connection_configuration.connection.publisher(addr)
177-
)
113+
if publisher is None:
114+
publisher = connection.publisher(addr)
178115

179116
print("purging the queue")
180-
messages_purged = connection_configuration.management.purge_queue(queue_name)
117+
messages_purged = management.purge_queue(queue_name)
181118

182119
print("messages purged: " + str(messages_purged))
183-
# management.close()
184120

185121
# publishing messages
186122
while True:
187-
for i in range(MESSAGES_TO_PUBLSH):
123+
for i in range(MESSAGES_TO_PUBLISH):
188124

189125
if i % 1000 == 0:
190126
print("published 1000 messages...")
191127
try:
192-
if connection_configuration.publisher is not None:
193-
connection_configuration.publisher.publish(Message(body="test"))
128+
if publisher is not None:
129+
publisher.publish(Message(body="test"))
194130
except ConnectionClosed:
195131
print("publisher closing exception, resubmitting")
132+
publisher = connection.publisher(addr)
196133
continue
197134

198135
print("closing publisher")
199136
try:
200-
if connection_configuration.publisher is not None:
201-
connection_configuration.publisher.close()
137+
if publisher is not None:
138+
publisher.close()
202139
except ConnectionClosed:
203140
print("publisher closing exception, resubmitting")
204141
continue
@@ -207,43 +144,39 @@ def main() -> None:
207144
print(
208145
"create a consumer and consume the test message - press control + c to terminate to consume"
209146
)
210-
if connection_configuration.consumer is None:
211-
connection_configuration.consumer = (
212-
connection_configuration.connection.consumer(
213-
addr_queue, message_handler=MyMessageHandler()
214-
)
215-
)
147+
if consumer is None:
148+
consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler())
216149

217150
while True:
218151
try:
219-
connection_configuration.consumer.run()
152+
consumer.run()
220153
except KeyboardInterrupt:
221154
pass
222155
except ConnectionClosed:
223-
time.sleep(1)
156+
consumer = connection.consumer(
157+
addr_queue, message_handler=MyMessageHandler()
158+
)
224159
continue
225160
except Exception as e:
226161
print("consumer exited for exception " + str(e))
227162

228163
break
229164

230165
print("cleanup")
231-
connection_configuration.consumer.close()
232-
# once we finish consuming if we close the connection we need to create a new one
233-
# connection = create_connection()
234-
# management = connection.management()
166+
consumer.close()
235167

168+
management = connection.management()
236169
print("unbind")
237-
connection_configuration.management.unbind(bind_name)
170+
management.unbind(bind_name)
238171

239172
print("delete queue")
240-
connection_configuration.management.delete_queue(queue_name)
173+
management.delete_queue(queue_name)
241174

242175
print("delete exchange")
243-
connection_configuration.management.delete_exchange(exchange_name)
176+
management.delete_exchange(exchange_name)
244177

245178
print("closing connections")
246-
connection_configuration.management.close()
179+
management.close()
247180
print("after management closing")
248181
environment.close()
249182
print("after connection closing")

rabbitmq_amqp_python_client/amqp_consumer_handler.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from .delivery_context import DeliveryContext
2+
from .qpid.proton._events import Event
23
from .qpid.proton.handlers import MessagingHandler
34

45
"""
@@ -20,3 +21,10 @@ def __init__(self, auto_accept: bool = False, auto_settle: bool = True):
2021
"""
2122
super().__init__(auto_accept=auto_accept, auto_settle=auto_settle)
2223
self.delivery_context: DeliveryContext = DeliveryContext()
24+
25+
def on_amqp_message(self, event: Event) -> None:
26+
pass
27+
28+
def on_message(self, event: Event) -> None:
29+
print("first level callback")
30+
self.on_amqp_message(event)

rabbitmq_amqp_python_client/connection.py

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def __init__(
3535
# multi-node mode
3636
uris: Optional[list[str]] = None,
3737
ssl_context: Optional[SslConfigurationContext] = None,
38-
on_disconnection_handler: Optional[CB] = None, # type: ignore
38+
reconnect: bool = False,
3939
):
4040
"""
4141
Initialize a new Connection instance.
@@ -59,11 +59,13 @@ def __init__(
5959
self._addrs: Optional[list[str]] = uris
6060
self._conn: BlockingConnection
6161
self._management: Management
62-
self._on_disconnection_handler = on_disconnection_handler
62+
self._reconnect = reconnect
6363
self._conf_ssl_context: Optional[SslConfigurationContext] = ssl_context
6464
self._ssl_domain = None
6565
self._connections = [] # type: ignore
6666
self._index: int = -1
67+
self._publishers: list[Publisher] = []
68+
self._consumers: list[Consumer] = []
6769

6870
def _set_environment_connection_list(self, connections: []): # type: ignore
6971
self._connections = connections
@@ -91,12 +93,21 @@ def dial(self) -> None:
9193
self._conf_ssl_context.client_cert.client_key,
9294
self._conf_ssl_context.client_cert.password,
9395
)
94-
self._conn = BlockingConnection(
95-
url=self._addr,
96-
urls=self._addrs,
97-
ssl_domain=self._ssl_domain,
98-
on_disconnection_handler=self._on_disconnection_handler,
99-
)
96+
97+
if self._reconnect is False:
98+
self._conn = BlockingConnection(
99+
url=self._addr,
100+
urls=self._addrs,
101+
ssl_domain=self._ssl_domain,
102+
)
103+
else:
104+
self._conn = BlockingConnection(
105+
url=self._addr,
106+
urls=self._addrs,
107+
ssl_domain=self._ssl_domain,
108+
on_disconnection_handler=self._on_disconnection,
109+
)
110+
100111
self._open()
101112
logger.debug("Connection to the server established")
102113

@@ -122,6 +133,10 @@ def close(self) -> None:
122133
"""
123134
logger.debug("Closing connection")
124135
try:
136+
for publisher in self._publishers:
137+
publisher.close()
138+
for consumer in self._consumers:
139+
consumer.close()
125140
self._conn.close()
126141
except Exception as e:
127142
logger.error(f"Error closing connection: {e}")
@@ -150,7 +165,8 @@ def publisher(self, destination: str = "") -> Publisher:
150165
"destination address must start with /queues or /exchanges"
151166
)
152167
publisher = Publisher(self._conn, destination)
153-
return publisher
168+
self._publishers.append(publisher)
169+
return self._publishers[self._publishers.index(publisher)]
154170

155171
def consumer(
156172
self,
@@ -181,4 +197,31 @@ def consumer(
181197
consumer = Consumer(
182198
self._conn, destination, message_handler, stream_filter_options, credit
183199
)
200+
self._consumers.append(consumer)
184201
return consumer
202+
203+
def _on_disconnection(self) -> None:
204+
205+
print("disconnected")
206+
207+
if self in self._connections:
208+
self._connections.remove(self)
209+
210+
print("reconnecting")
211+
self._conn = BlockingConnection(
212+
url=self._addr,
213+
urls=self._addrs,
214+
ssl_domain=self._ssl_domain,
215+
on_disconnection_handler=self._on_disconnection,
216+
)
217+
self._open()
218+
self._connections.append(self)
219+
220+
for index, publisher in enumerate(self._publishers):
221+
# publisher = self._publishers.pop(index)
222+
# address = publisher.address
223+
self._publishers.remove(publisher)
224+
# self._publishers.insert(index, Publisher(self._conn, address))
225+
226+
for i, consumer in enumerate(self._consumers):
227+
self._consumers.remove(consumer)

0 commit comments

Comments
 (0)