Skip to content

Commit 54d0715

Browse files
author
DanielePalaia
committed
implement BackOff and MaxReconnectAttempts
1 parent 3f1ac5c commit 54d0715

File tree

11 files changed

+127
-33
lines changed

11 files changed

+127
-33
lines changed

examples/reconnection/reconnection_example.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,17 @@
1010
ExchangeToQueueBindingSpecification,
1111
Message,
1212
QuorumQueueSpecification,
13+
RecoveryConfiguration,
1314
)
1415

1516
# here we keep track of the objects we need to reconnect
1617
MESSAGES_TO_PUBLISH = 50000
1718

1819

19-
environment = Environment(uri="amqp://guest:guest@localhost:5672/", reconnect=True)
20+
environment = Environment(
21+
uri="amqp://guest:guest@localhost:5672/",
22+
recovery_configuration=RecoveryConfiguration(active_recovery=True),
23+
)
2024

2125

2226
class MyMessageHandler(AMQPMessagingHandler):

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
ExchangeToExchangeBindingSpecification,
1212
ExchangeToQueueBindingSpecification,
1313
OffsetSpecification,
14+
RecoveryConfiguration,
1415
StreamOptions,
1516
)
1617
from .environment import Environment
@@ -87,4 +88,5 @@
8788
"OutcomeState",
8889
"Environment",
8990
"ExchangeCustomSpecification",
91+
"RecoveryConfiguration",
9092
]

rabbitmq_amqp_python_client/connection.py

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
import logging
2+
import random
3+
import time
4+
from datetime import timedelta
25
from typing import (
36
Annotated,
47
Callable,
@@ -11,10 +14,11 @@
1114

1215
from .address_helper import validate_address
1316
from .consumer import Consumer
14-
from .entities import StreamOptions
17+
from .entities import RecoveryConfiguration, StreamOptions
1518
from .exceptions import ArgumentOutOfRangeException
1619
from .management import Management
1720
from .publisher import Publisher
21+
from .qpid.proton._exceptions import ConnectionException
1822
from .qpid.proton._handlers import MessagingHandler
1923
from .qpid.proton._transport import SSLDomain
2024
from .qpid.proton.utils import BlockingConnection
@@ -53,7 +57,7 @@ def __init__(
5357
ssl_context: Union[
5458
PosixSslConfigurationContext, WinSslConfigurationContext, None
5559
] = None,
56-
reconnect: bool = False,
60+
recovery_configuration: Optional[RecoveryConfiguration] = None,
5761
):
5862
"""
5963
Initialize a new Connection instance.
@@ -80,7 +84,7 @@ def __init__(
8084
PosixSslConfigurationContext, WinSslConfigurationContext, None
8185
] = ssl_context
8286
self._managements: list[Management] = []
83-
self._reconnect = reconnect
87+
self._recovery_configuration = recovery_configuration
8488
self._ssl_domain = None
8589
self._connections = [] # type: ignore
8690
self._index: int = -1
@@ -144,7 +148,10 @@ def dial(self) -> None:
144148
password,
145149
)
146150

147-
if self._reconnect is False:
151+
if (
152+
self._recovery_configuration is None
153+
or self._recovery_configuration.active_recovery is False
154+
):
148155
self._conn = BlockingConnection(
149156
url=self._addr,
150157
urls=self._addrs,
@@ -274,26 +281,53 @@ def _on_disconnection(self) -> None:
274281
if self in self._connections:
275282
self._connections.remove(self)
276283

277-
self._conn = BlockingConnection(
278-
url=self._addr,
279-
urls=self._addrs,
280-
ssl_domain=self._ssl_domain,
281-
on_disconnection_handler=self._on_disconnection,
282-
)
284+
base_delay = self._recovery_configuration.back_off_reconnect_interval # type: ignore
285+
max_delay = timedelta(minutes=1)
286+
287+
for attempt in range(self._recovery_configuration.MaxReconnectAttempts): # type: ignore
288+
289+
jitter = timedelta(milliseconds=random.randint(0, 500))
290+
delay = base_delay + jitter
291+
292+
if delay > max_delay:
293+
delay = max_delay
294+
295+
time.sleep(delay.total_seconds())
283296

284-
self._connections.append(self)
297+
try:
298+
self._conn = BlockingConnection(
299+
url=self._addr,
300+
urls=self._addrs,
301+
ssl_domain=self._ssl_domain,
302+
on_disconnection_handler=self._on_disconnection,
303+
)
304+
305+
self._connections.append(self)
306+
307+
for i, management in enumerate(self._managements):
308+
# Update the broken connection and sender in the management
309+
self._managements[i]._update_connection(self._conn)
285310

286-
for i, management in enumerate(self._managements):
287-
# Update the broken connection and sender in the management
288-
self._managements[i]._update_connection(self._conn)
311+
for i, publisher in enumerate(self._publishers):
312+
# Update the broken connection and sender in the publisher
313+
self._publishers[i]._update_connection(self._conn)
289314

290-
for i, publisher in enumerate(self._publishers):
291-
# Update the broken connection and sender in the publisher
292-
self._publishers[i]._update_connection(self._conn)
315+
for i, consumer in enumerate(self._consumers):
316+
# Update the broken connection and sender in the consumer
317+
self._consumers[i]._update_connection(self._conn)
318+
319+
except ConnectionException as e:
320+
base_delay *= 2
321+
logger.error(
322+
"Reconnection attempt failed",
323+
"attempt",
324+
attempt,
325+
"error",
326+
str(e),
327+
)
328+
continue
293329

294-
for i, consumer in enumerate(self._consumers):
295-
# Update the broken connection and sender in the consumer
296-
self._consumers[i]._update_connection(self._conn)
330+
break
297331

298332
@property
299333
def active_producers(self) -> int:

rabbitmq_amqp_python_client/entities.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from dataclasses import dataclass, field
2+
from datetime import timedelta
23
from enum import Enum
34
from typing import Any, Dict, Optional, Union
45

@@ -208,3 +209,27 @@ def filter_set(self) -> Dict[symbol, Described]:
208209
Dict[symbol, Described]: The current filter set configuration
209210
"""
210211
return self._filter_set
212+
213+
214+
@dataclass
215+
class RecoveryConfiguration:
216+
"""
217+
Configuration options for automatic reconnection.
218+
219+
This dataclass contains parameters to manage automatic reconnection
220+
221+
Attributes:
222+
active_recovery: Define if the recovery is activated. If is not activated the connection will not try to reconnect
223+
back_off_reconnect_interval: the time to wait before trying to createSender after a connection is closed.
224+
time will be increased exponentially with each attempt.
225+
Default is 5 seconds, each attempt will double the time.
226+
The minimum value is 1 second. Avoid setting a value low values since it can cause a high
227+
number of reconnection attempts.
228+
MaxReconnectAttempts: MaxReconnectAttempts The maximum number of reconnection attempts.
229+
Default is 5.
230+
The minimum value is 1.
231+
"""
232+
233+
active_recovery: bool = True
234+
back_off_reconnect_interval: timedelta = timedelta(0.5)
235+
MaxReconnectAttempts: int = 5

rabbitmq_amqp_python_client/environment.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
)
1010

1111
from .connection import Connection
12+
from .entities import RecoveryConfiguration
1213
from .ssl_configuration import (
1314
PosixSslConfigurationContext,
1415
WinSslConfigurationContext,
@@ -40,7 +41,7 @@ def __init__(
4041
ssl_context: Union[
4142
PosixSslConfigurationContext, WinSslConfigurationContext, None
4243
] = None,
43-
reconnect: bool = False,
44+
recovery_configuration: Optional[RecoveryConfiguration] = None,
4445
):
4546
"""
4647
Initialize a new Environment instance.
@@ -63,7 +64,7 @@ def __init__(
6364
self._uri = uri
6465
self._uris = uris
6566
self._ssl_context = ssl_context
66-
self._reconnect = reconnect
67+
self._recovery_configuration = recovery_configuration
6768
self._connections: list[Connection] = []
6869

6970
def connection(
@@ -85,7 +86,7 @@ def connection(
8586
uri=self._uri,
8687
uris=self._uris,
8788
ssl_context=self._ssl_context,
88-
reconnect=self._reconnect,
89+
recovery_configuration=self._recovery_configuration,
8990
)
9091
logger.debug("Environment: Creating and returning a new connection")
9192
self._connections.append(connection)

rabbitmq_amqp_python_client/qpid/proton/_tracing.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232

3333
import proton
3434
from proton import Sender as ProtonSender
35-
from proton.handlers import IncomingMessageHandler as ProtonIncomingMessageHandler
36-
from proton.handlers import OutgoingMessageHandler as ProtonOutgoingMessageHandler
35+
from proton.handlers import \
36+
IncomingMessageHandler as ProtonIncomingMessageHandler
37+
from proton.handlers import \
38+
OutgoingMessageHandler as ProtonOutgoingMessageHandler
3739

3840
_tracer = None
3941
_trace_key = proton.symbol("x-opt-qpid-tracestate")

rabbitmq_amqp_python_client/qpid/proton/_transport.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@
139139

140140
if TYPE_CHECKING:
141141
from ._condition import Condition
142-
from ._endpoints import Connection # would produce circular import
142+
from ._endpoints import \
143+
Connection # would produce circular import
143144

144145

145146
class TraceAdapter:

tests/conftest.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
PKCS12Store,
1313
PosixClientCert,
1414
PosixSslConfigurationContext,
15+
RecoveryConfiguration,
1516
WinClientCert,
1617
WinSslConfigurationContext,
1718
symbol,
@@ -47,6 +48,21 @@ def connection(pytestconfig):
4748
environment.close()
4849

4950

51+
@pytest.fixture()
52+
def connection_with_reconnect(pytestconfig):
53+
environment = Environment(
54+
uri="amqp://guest:guest@localhost:5672/",
55+
recovery_configuration=RecoveryConfiguration(active_recovery=True),
56+
)
57+
connection = environment.connection()
58+
connection.dial()
59+
try:
60+
yield connection
61+
62+
finally:
63+
environment.close()
64+
65+
5066
@pytest.fixture()
5167
def ssl_context(pytestconfig):
5268
if sys.platform == "win32":

tests/test_connection.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from rabbitmq_amqp_python_client import (
44
ConnectionClosed,
55
Environment,
6+
RecoveryConfiguration,
67
StreamSpecification,
78
)
89

@@ -11,7 +12,6 @@
1112

1213
def on_disconnected():
1314

14-
print("disconnected")
1515
global disconnected
1616
disconnected = True
1717

@@ -73,7 +73,10 @@ def test_connection_reconnection() -> None:
7373

7474
disconnected = False
7575

76-
environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True)
76+
environment = Environment(
77+
"amqp://guest:guest@localhost:5672/",
78+
recovery_configuration=RecoveryConfiguration(active_recovery=True),
79+
)
7780

7881
connection = environment.connection()
7982
connection.dial()

tests/test_publisher.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
Message,
1010
OutcomeState,
1111
QuorumQueueSpecification,
12+
RecoveryConfiguration,
1213
StreamSpecification,
1314
ValidationCodeException,
1415
)
@@ -262,7 +263,10 @@ def test_disconnection_reconnection() -> None:
262263
disconnected = False
263264
generic_exception_raised = False
264265

265-
environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True)
266+
environment = Environment(
267+
"amqp://guest:guest@localhost:5672/",
268+
recovery_configuration=RecoveryConfiguration(active_recovery=True),
269+
)
266270

267271
connection_test = environment.connection()
268272

0 commit comments

Comments
 (0)