Skip to content

Commit 2eb822c

Browse files
DanielePalaiaDanielePalaiaGsantomaggio
authored
improve binding/unbinding operarations (#44)
* improve binding/unbinding operarations * fixes for code review * Typo and remove two warnings Signed-off-by: Gabriele Santomaggio <[email protected]> --------- Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: DanielePalaia <daniele985@@gmail.com> Co-authored-by: Gabriele Santomaggio <[email protected]>
1 parent d6722a0 commit 2eb822c

File tree

9 files changed

+284
-41
lines changed

9 files changed

+284
-41
lines changed

examples/getting_started/getting_started.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44
from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
55
AddressHelper,
66
AMQPMessagingHandler,
7-
BindingSpecification,
87
Connection,
98
Environment,
109
Event,
1110
ExchangeSpecification,
11+
ExchangeToQueueBindingSpecification,
1212
Message,
1313
OutcomeState,
1414
QuorumQueueSpecification,
@@ -102,7 +102,7 @@ def main() -> None:
102102

103103
print("binding queue to exchange")
104104
bind_name = management.bind(
105-
BindingSpecification(
105+
ExchangeToQueueBindingSpecification(
106106
source_exchange=exchange_name,
107107
destination_queue=queue_name,
108108
binding_key=routing_key,

examples/reconnection/reconnection_example.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
from rabbitmq_amqp_python_client import (
99
AddressHelper,
1010
AMQPMessagingHandler,
11-
BindingSpecification,
1211
Connection,
1312
ConnectionClosed,
1413
Consumer,
1514
Environment,
1615
Event,
1716
ExchangeSpecification,
17+
ExchangeToQueueBindingSpecification,
1818
Management,
1919
Message,
2020
Publisher,
@@ -160,7 +160,7 @@ def main() -> None:
160160

161161
print("binding queue to exchange")
162162
bind_name = connection_configuration.management.bind(
163-
BindingSpecification(
163+
ExchangeToQueueBindingSpecification(
164164
source_exchange=exchange_name,
165165
destination_queue=queue_name,
166166
binding_key=routing_key,

examples/tls/tls_example.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44
from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
55
AddressHelper,
66
AMQPMessagingHandler,
7-
BindingSpecification,
87
ClientCert,
98
Connection,
109
Environment,
1110
Event,
1211
ExchangeSpecification,
12+
ExchangeToQueueBindingSpecification,
1313
Message,
1414
QuorumQueueSpecification,
1515
SslConfigurationContext,
@@ -102,7 +102,7 @@ def main() -> None:
102102

103103
print("binding queue to exchange")
104104
bind_name = management.bind(
105-
BindingSpecification(
105+
ExchangeToQueueBindingSpecification(
106106
source_exchange=exchange_name,
107107
destination_queue=queue_name,
108108
binding_key=routing_key,

rabbitmq_amqp_python_client/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
from .connection import Connection
77
from .consumer import Consumer
88
from .entities import (
9-
BindingSpecification,
109
ExchangeSpecification,
10+
ExchangeToExchangeBindingSpecification,
11+
ExchangeToQueueBindingSpecification,
1112
OffsetSpecification,
1213
StreamOptions,
1314
)
@@ -52,7 +53,8 @@
5253
"QuorumQueueSpecification",
5354
"ClassicQueueSpecification",
5455
"StreamSpecification",
55-
"BindingSpecification",
56+
"ExchangeToQueueBindingSpecification",
57+
"ExchangeToExchangeBindingSpecification",
5658
"QueueType",
5759
"Publisher",
5860
"Message",

rabbitmq_amqp_python_client/address_helper.py

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
from .entities import BindingSpecification
1+
from typing import Optional
2+
3+
from .entities import (
4+
ExchangeToExchangeBindingSpecification,
5+
ExchangeToQueueBindingSpecification,
6+
)
27
from .qpid.proton._message import Message
38

49

@@ -7,19 +12,22 @@ def _is_unreserved(char: str) -> bool:
712
return char.isalnum() or char in "-._~"
813

914

10-
def encode_path_segment(input_string: str) -> str:
15+
def encode_path_segment(input_string: Optional[str]) -> str:
1116
encoded = []
1217

1318
# Iterate over each character in the input string
14-
for char in input_string:
15-
# Check if the character is an unreserved character
16-
if _is_unreserved(char):
17-
encoded.append(char) # Append as is
18-
else:
19-
# Encode character to %HH format
20-
encoded.append(f"%{ord(char):02X}")
19+
if input_string is not None:
20+
for char in input_string:
21+
# Check if the character is an unreserved character
22+
if _is_unreserved(char):
23+
encoded.append(char) # Append as is
24+
else:
25+
# Encode character to %HH format
26+
encoded.append(f"%{ord(char):02X}")
27+
28+
return "".join(encoded)
2129

22-
return "".join(encoded)
30+
return ""
2331

2432

2533
class AddressHelper:
@@ -58,8 +66,13 @@ def path_address() -> str:
5866

5967
@staticmethod
6068
def binding_path_with_exchange_queue(
61-
bind_specification: BindingSpecification,
69+
bind_specification: ExchangeToQueueBindingSpecification,
6270
) -> str:
71+
if bind_specification.binding_key is not None:
72+
key = ";key=" + encode_path_segment(bind_specification.binding_key)
73+
else:
74+
key = ";key="
75+
6376
binding_path_wth_exchange_queue_key = (
6477
"/bindings"
6578
+ "/"
@@ -68,11 +81,28 @@ def binding_path_with_exchange_queue(
6881
+ ";"
6982
+ "dstq="
7083
+ encode_path_segment(bind_specification.destination_queue)
84+
+ key
85+
+ ";args="
86+
)
87+
return binding_path_wth_exchange_queue_key
88+
89+
@staticmethod
90+
def binding_path_with_exchange_exchange(
91+
bind_specification: ExchangeToExchangeBindingSpecification,
92+
) -> str:
93+
binding_path_wth_exchange_exchange_key = (
94+
"/bindings"
95+
+ "/"
96+
+ "src="
97+
+ encode_path_segment(bind_specification.source_exchange)
98+
+ ";"
99+
+ "dstq="
100+
+ encode_path_segment(bind_specification.destination_exchange)
71101
+ ";key="
72102
+ encode_path_segment(bind_specification.binding_key)
73103
+ ";args="
74104
)
75-
return binding_path_wth_exchange_queue_key
105+
return binding_path_wth_exchange_exchange_key
76106

77107
@staticmethod
78108
def message_to_address_helper(message: Message, address: str) -> Message:

rabbitmq_amqp_python_client/entities.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,25 +41,32 @@ class OffsetSpecification(Enum):
4141

4242

4343
@dataclass
44-
class BindingSpecification:
44+
class ExchangeToQueueBindingSpecification:
4545
source_exchange: str
4646
destination_queue: str
47-
binding_key: str
47+
binding_key: Optional[str] = None
48+
49+
50+
@dataclass
51+
class ExchangeToExchangeBindingSpecification:
52+
source_exchange: str
53+
destination_exchange: str
54+
binding_key: Optional[str] = None
4855

4956

5057
class StreamOptions:
5158

5259
def __init__(self): # type: ignore
5360
self._filter_set: Dict[symbol, Described] = {}
5461

55-
def offset(self, offset_spefication: Union[OffsetSpecification, int]) -> None:
56-
if isinstance(offset_spefication, int):
62+
def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
63+
if isinstance(offset_specification, int):
5764
self._filter_set[symbol(STREAM_OFFSET_SPEC)] = Described(
58-
symbol(STREAM_OFFSET_SPEC), offset_spefication
65+
symbol(STREAM_OFFSET_SPEC), offset_specification
5966
)
6067
else:
6168
self._filter_set[symbol(STREAM_OFFSET_SPEC)] = Described(
62-
symbol(STREAM_OFFSET_SPEC), offset_spefication.name
69+
symbol(STREAM_OFFSET_SPEC), offset_specification.name
6370
)
6471

6572
def filter_values(self, filters: list[str]) -> None:

rabbitmq_amqp_python_client/management.py

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
from .address_helper import AddressHelper
66
from .common import CommonValues, QueueType
77
from .entities import (
8-
BindingSpecification,
98
ExchangeSpecification,
9+
ExchangeToExchangeBindingSpecification,
10+
ExchangeToQueueBindingSpecification,
1011
QueueInfo,
1112
)
1213
from .exceptions import ValidationCodeException
@@ -301,12 +302,25 @@ def _validate_reponse_code(
301302
"wrong response code received: " + str(response_code)
302303
)
303304

304-
def bind(self, bind_specification: BindingSpecification) -> str:
305+
def bind(
306+
self,
307+
bind_specification: Union[
308+
ExchangeToQueueBindingSpecification, ExchangeToExchangeBindingSpecification
309+
],
310+
) -> str:
305311
logger.debug("Bind Operation called")
312+
306313
body = {}
307-
body["binding_key"] = bind_specification.binding_key
314+
if bind_specification.binding_key is not None:
315+
body["binding_key"] = bind_specification.binding_key
316+
else:
317+
body["binding_key"] = ""
308318
body["source"] = bind_specification.source_exchange
309-
body["destination_queue"] = bind_specification.destination_queue
319+
if isinstance(bind_specification, ExchangeToQueueBindingSpecification):
320+
body["destination_queue"] = bind_specification.destination_queue
321+
elif isinstance(bind_specification, ExchangeToExchangeBindingSpecification):
322+
body["destination_exchange"] = bind_specification.destination_exchange
323+
310324
body["arguments"] = {} # type: ignore
311325

312326
path = AddressHelper.path_address()
@@ -319,17 +333,43 @@ def bind(self, bind_specification: BindingSpecification) -> str:
319333
CommonValues.response_code_204.value,
320334
],
321335
)
336+
binding_path = ""
322337

323-
binding_path_with_queue = AddressHelper.binding_path_with_exchange_queue(
324-
bind_specification
325-
)
326-
return binding_path_with_queue
338+
if isinstance(bind_specification, ExchangeToQueueBindingSpecification):
339+
binding_path = AddressHelper.binding_path_with_exchange_queue(
340+
bind_specification
341+
)
342+
elif isinstance(bind_specification, ExchangeToExchangeBindingSpecification):
343+
binding_path = AddressHelper.binding_path_with_exchange_exchange(
344+
bind_specification
345+
)
327346

328-
def unbind(self, binding_exchange_queue_path: str) -> None:
347+
return binding_path
348+
349+
def unbind(
350+
self,
351+
bind_specification: Union[
352+
str,
353+
ExchangeToQueueBindingSpecification,
354+
ExchangeToExchangeBindingSpecification,
355+
],
356+
) -> None:
329357
logger.debug("UnBind Operation called")
358+
binding_name = ""
359+
if isinstance(bind_specification, str):
360+
binding_name = bind_specification
361+
else:
362+
if isinstance(bind_specification, ExchangeToQueueBindingSpecification):
363+
binding_name = AddressHelper.binding_path_with_exchange_queue(
364+
bind_specification
365+
)
366+
elif isinstance(bind_specification, ExchangeToExchangeBindingSpecification):
367+
binding_name = AddressHelper.binding_path_with_exchange_exchange(
368+
bind_specification
369+
)
330370
self.request(
331371
None,
332-
binding_exchange_queue_path,
372+
binding_name,
333373
CommonValues.command_delete.value,
334374
[
335375
CommonValues.response_code_204.value,

0 commit comments

Comments
 (0)