From 515e7feb652342ecb4c0f730b07becd7b9b110c2 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 18 Feb 2025 15:32:38 +0100 Subject: [PATCH 1/3] improve binding/unbinding operarations --- rabbitmq_amqp_python_client/__init__.py | 2 + rabbitmq_amqp_python_client/address_helper.py | 47 +++++-- rabbitmq_amqp_python_client/entities.py | 5 +- rabbitmq_amqp_python_client/exceptions.py | 9 ++ rabbitmq_amqp_python_client/management.py | 67 ++++++++-- tests/test_management.py | 121 ++++++++++++++++++ 6 files changed, 230 insertions(+), 21 deletions(-) diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 6bf5811..a7a7e88 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -13,6 +13,7 @@ ) from .environment import Environment from .exceptions import ( + AmqpValidationException, ArgumentOutOfRangeException, ValidationCodeException, ) @@ -73,4 +74,5 @@ "OffsetSpecification", "OutcomeState", "Environment", + "AmqpValidationException", ] diff --git a/rabbitmq_amqp_python_client/address_helper.py b/rabbitmq_amqp_python_client/address_helper.py index 4e05470..fa26f46 100644 --- a/rabbitmq_amqp_python_client/address_helper.py +++ b/rabbitmq_amqp_python_client/address_helper.py @@ -1,3 +1,5 @@ +from typing import Optional + from .entities import BindingSpecification from .qpid.proton._message import Message @@ -7,19 +9,22 @@ def _is_unreserved(char: str) -> bool: return char.isalnum() or char in "-._~" -def encode_path_segment(input_string: str) -> str: +def encode_path_segment(input_string: Optional[str]) -> str: encoded = [] # Iterate over each character in the input string - for char in input_string: - # Check if the character is an unreserved character - if _is_unreserved(char): - encoded.append(char) # Append as is - else: - # Encode character to %HH format - encoded.append(f"%{ord(char):02X}") + if input_string is not None: + for char in input_string: + # Check if the character is an unreserved character + if _is_unreserved(char): + encoded.append(char) # Append as is + else: + # Encode character to %HH format + encoded.append(f"%{ord(char):02X}") + + return "".join(encoded) - return "".join(encoded) + return "" class AddressHelper: @@ -60,6 +65,11 @@ def path_address() -> str: def binding_path_with_exchange_queue( bind_specification: BindingSpecification, ) -> str: + if bind_specification.binding_key is not None: + key = ";key=" + encode_path_segment(bind_specification.binding_key) + else: + key = ";key=" + binding_path_wth_exchange_queue_key = ( "/bindings" + "/" @@ -68,11 +78,28 @@ def binding_path_with_exchange_queue( + ";" + "dstq=" + encode_path_segment(bind_specification.destination_queue) + + key + + ";args=" + ) + return binding_path_wth_exchange_queue_key + + @staticmethod + def binding_path_with_exchange_exchange( + bind_specification: BindingSpecification, + ) -> str: + binding_path_wth_exchange_exchange_key = ( + "/bindings" + + "/" + + "src=" + + encode_path_segment(bind_specification.source_exchange) + + ";" + + "dstq=" + + encode_path_segment(bind_specification.destination_exchange) + ";key=" + encode_path_segment(bind_specification.binding_key) + ";args=" ) - return binding_path_wth_exchange_queue_key + return binding_path_wth_exchange_exchange_key @staticmethod def message_to_address_helper(message: Message, address: str) -> Message: diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 54c3e4a..f9852f6 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -43,8 +43,9 @@ class OffsetSpecification(Enum): @dataclass class BindingSpecification: source_exchange: str - destination_queue: str - binding_key: str + binding_key: Optional[str] = None + destination_exchange: Optional[str] = None + destination_queue: Optional[str] = None class StreamOptions: diff --git a/rabbitmq_amqp_python_client/exceptions.py b/rabbitmq_amqp_python_client/exceptions.py index 9544dcc..1c212f4 100644 --- a/rabbitmq_amqp_python_client/exceptions.py +++ b/rabbitmq_amqp_python_client/exceptions.py @@ -14,3 +14,12 @@ def __init__(self, msg: str): def __str__(self) -> str: return repr(self.msg) + + +class AmqpValidationException(BaseException): + # Constructor or Initializer + def __init__(self, msg: str): + self.msg = msg + + def __str__(self) -> str: + return repr(self.msg) diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 70f9d46..c7818aa 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -9,7 +9,10 @@ ExchangeSpecification, QueueInfo, ) -from .exceptions import ValidationCodeException +from .exceptions import ( + AmqpValidationException, + ValidationCodeException, +) from .options import ReceiverOption, SenderOption from .qpid.proton._message import Message from .qpid.proton.utils import ( @@ -303,10 +306,19 @@ def _validate_reponse_code( def bind(self, bind_specification: BindingSpecification) -> str: logger.debug("Bind Operation called") + self._validate_binding(bind_specification) + body = {} - body["binding_key"] = bind_specification.binding_key + if bind_specification.binding_key is not None: + body["binding_key"] = bind_specification.binding_key + else: + body["binding_key"] = "" body["source"] = bind_specification.source_exchange - body["destination_queue"] = bind_specification.destination_queue + if bind_specification.destination_queue is not None: + body["destination_queue"] = bind_specification.destination_queue + elif bind_specification.destination_exchange is not None: + body["destination_exchange"] = bind_specification.destination_exchange + body["arguments"] = {} # type: ignore path = AddressHelper.path_address() @@ -320,16 +332,53 @@ def bind(self, bind_specification: BindingSpecification) -> str: ], ) - binding_path_with_queue = AddressHelper.binding_path_with_exchange_queue( - bind_specification - ) - return binding_path_with_queue + if bind_specification.destination_queue is not None: + binding_path = AddressHelper.binding_path_with_exchange_queue( + bind_specification + ) + elif bind_specification.destination_exchange is not None: + binding_path = AddressHelper.binding_path_with_exchange_exchange( + bind_specification + ) - def unbind(self, binding_exchange_queue_path: str) -> None: + return binding_path + + def _validate_binding(self, bind_specification: BindingSpecification) -> None: + if ( + bind_specification.destination_queue is not None + and bind_specification.destination_exchange is not None + ): + raise AmqpValidationException( + "just one of destination_queue and destination_exchange of BindingSpecification must be set " + "for a binding operation" + ) + + if ( + bind_specification.destination_queue is None + and bind_specification.destination_exchange is None + ): + raise AmqpValidationException( + "at least one of destination_queue and destination_exchange of BindingSpecification must be set " + "for a binding operation" + ) + + def unbind(self, bind_specification: Union[BindingSpecification, str]) -> None: logger.debug("UnBind Operation called") + if isinstance(bind_specification, str): + binding_name = bind_specification + else: + self._validate_binding(bind_specification) + if bind_specification.destination_queue is not None: + binding_name = AddressHelper.binding_path_with_exchange_queue( + bind_specification + ) + elif bind_specification.destination_exchange is not None: + binding_name = AddressHelper.binding_path_with_exchange_exchange( + bind_specification + ) self.request( None, - binding_exchange_queue_path, + binding_name, CommonValues.command_delete.value, [ CommonValues.response_code_204.value, diff --git a/tests/test_management.py b/tests/test_management.py index e91a131..0e06016 100644 --- a/tests/test_management.py +++ b/tests/test_management.py @@ -1,6 +1,7 @@ from datetime import timedelta from rabbitmq_amqp_python_client import ( + AmqpValidationException, BindingSpecification, ClassicQueueSpecification, ExchangeSpecification, @@ -98,6 +99,126 @@ def test_bind_exchange_to_queue(management: Management) -> None: management.unbind(binding_exchange_queue_path) +def test_bind_no_destination(management: Management) -> None: + + exchange_name = "test-bind-exchange-to-queue-exchange" + queue_name = "test-bind-exchange-to-queue-queue" + routing_key = "routing-key" + raised = False + + management.declare_exchange(ExchangeSpecification(name=exchange_name)) + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + try: + management.bind( + BindingSpecification( + source_exchange=exchange_name, + binding_key=routing_key, + ) + ) + except AmqpValidationException: + raised = True + + assert raised is True + + management.delete_exchange(exchange_name) + + management.delete_queue(queue_name) + + +def test_bind_exchange_to_queue_without_key(management: Management) -> None: + + exchange_name = "test-bind-exchange-to-queue-exchange" + queue_name = "test-bind-exchange-to-queue-queue" + + management.declare_exchange(ExchangeSpecification(name=exchange_name)) + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + binding_exchange_queue_path = management.bind( + BindingSpecification( + source_exchange=exchange_name, + destination_queue=queue_name, + ) + ) + + assert ( + binding_exchange_queue_path + == "/bindings/src=" + exchange_name + ";dstq=" + queue_name + ";key=" + ";args=" + ) + + management.unbind(binding_exchange_queue_path) + + management.delete_exchange(exchange_name) + + management.delete_queue(queue_name) + + +def test_bind_unbind_by_binding_spec(management: Management) -> None: + + exchange_name = "test-bind-exchange-to-queue-exchange" + queue_name = "test-bind-exchange-to-queue-queue" + + management.declare_exchange(ExchangeSpecification(name=exchange_name)) + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + management.bind( + BindingSpecification( + source_exchange=exchange_name, + destination_queue=queue_name, + ) + ) + + management.unbind( + BindingSpecification( + source_exchange=exchange_name, + destination_queue=queue_name, + ) + ) + + management.delete_exchange(exchange_name) + + management.delete_queue(queue_name) + + +def test_bind_exchange_to_exchange(management: Management) -> None: + + source_exchange_name = "source_exchange" + destination_exchange_name = "destination_exchange" + routing_key = "routing-key" + + management.declare_exchange(ExchangeSpecification(name=source_exchange_name)) + + management.declare_exchange(ExchangeSpecification(name=destination_exchange_name)) + + binding_exchange_exchange_path = management.bind( + BindingSpecification( + source_exchange=source_exchange_name, + destination_exchange=destination_exchange_name, + binding_key=routing_key, + ) + ) + + assert ( + binding_exchange_exchange_path + == "/bindings/src=" + + source_exchange_name + + ";dstq=" + + destination_exchange_name + + ";key=" + + routing_key + + ";args=" + ) + + management.unbind(binding_exchange_exchange_path) + + management.delete_exchange(source_exchange_name) + + management.delete_exchange(destination_exchange_name) + + def test_queue_info_with_validations(management: Management) -> None: queue_name = "test_queue_info_with_validation" From 4d5949cedc397ba9cc070f05c23ed858a353d69e Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 18 Feb 2025 16:52:44 +0100 Subject: [PATCH 2/3] fixes for code review --- examples/getting_started/getting_started.py | 4 +- examples/reconnection/reconnection_example.py | 4 +- examples/tls/tls_example.py | 4 +- rabbitmq_amqp_python_client/__init__.py | 8 +- rabbitmq_amqp_python_client/address_helper.py | 9 +- rabbitmq_amqp_python_client/entities.py | 12 ++- rabbitmq_amqp_python_client/exceptions.py | 9 -- rabbitmq_amqp_python_client/management.py | 57 ++++------ tests/test_management.py | 101 +++++++++++++----- tests/utils.py | 6 +- 10 files changed, 123 insertions(+), 91 deletions(-) diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index ecbbc4a..9960a7b 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -4,11 +4,11 @@ from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert, AddressHelper, AMQPMessagingHandler, - BindingSpecification, Connection, Environment, Event, ExchangeSpecification, + ExchangeToQueueBindingSpecification, Message, OutcomeState, QuorumQueueSpecification, @@ -102,7 +102,7 @@ def main() -> None: print("binding queue to exchange") bind_name = management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, binding_key=routing_key, diff --git a/examples/reconnection/reconnection_example.py b/examples/reconnection/reconnection_example.py index 34bfb40..3cf851b 100644 --- a/examples/reconnection/reconnection_example.py +++ b/examples/reconnection/reconnection_example.py @@ -8,13 +8,13 @@ from rabbitmq_amqp_python_client import ( AddressHelper, AMQPMessagingHandler, - BindingSpecification, Connection, ConnectionClosed, Consumer, Environment, Event, ExchangeSpecification, + ExchangeToQueueBindingSpecification, Management, Message, Publisher, @@ -160,7 +160,7 @@ def main() -> None: print("binding queue to exchange") bind_name = connection_configuration.management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, binding_key=routing_key, diff --git a/examples/tls/tls_example.py b/examples/tls/tls_example.py index 48ec488..d118c71 100644 --- a/examples/tls/tls_example.py +++ b/examples/tls/tls_example.py @@ -4,12 +4,12 @@ from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert, AddressHelper, AMQPMessagingHandler, - BindingSpecification, ClientCert, Connection, Environment, Event, ExchangeSpecification, + ExchangeToQueueBindingSpecification, Message, QuorumQueueSpecification, SslConfigurationContext, @@ -102,7 +102,7 @@ def main() -> None: print("binding queue to exchange") bind_name = management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, binding_key=routing_key, diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index a7a7e88..dca93f4 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -6,14 +6,14 @@ from .connection import Connection from .consumer import Consumer from .entities import ( - BindingSpecification, ExchangeSpecification, + ExchangeToExchangeBindingSpecification, + ExchangeToQueueBindingSpecification, OffsetSpecification, StreamOptions, ) from .environment import Environment from .exceptions import ( - AmqpValidationException, ArgumentOutOfRangeException, ValidationCodeException, ) @@ -53,7 +53,8 @@ "QuorumQueueSpecification", "ClassicQueueSpecification", "StreamSpecification", - "BindingSpecification", + "ExchangeToQueueBindingSpecification", + "ExchangeToExchangeBindingSpecification", "QueueType", "Publisher", "Message", @@ -74,5 +75,4 @@ "OffsetSpecification", "OutcomeState", "Environment", - "AmqpValidationException", ] diff --git a/rabbitmq_amqp_python_client/address_helper.py b/rabbitmq_amqp_python_client/address_helper.py index fa26f46..138f724 100644 --- a/rabbitmq_amqp_python_client/address_helper.py +++ b/rabbitmq_amqp_python_client/address_helper.py @@ -1,6 +1,9 @@ from typing import Optional -from .entities import BindingSpecification +from .entities import ( + ExchangeToExchangeBindingSpecification, + ExchangeToQueueBindingSpecification, +) from .qpid.proton._message import Message @@ -63,7 +66,7 @@ def path_address() -> str: @staticmethod def binding_path_with_exchange_queue( - bind_specification: BindingSpecification, + bind_specification: ExchangeToQueueBindingSpecification, ) -> str: if bind_specification.binding_key is not None: key = ";key=" + encode_path_segment(bind_specification.binding_key) @@ -85,7 +88,7 @@ def binding_path_with_exchange_queue( @staticmethod def binding_path_with_exchange_exchange( - bind_specification: BindingSpecification, + bind_specification: ExchangeToExchangeBindingSpecification, ) -> str: binding_path_wth_exchange_exchange_key = ( "/bindings" diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index f9852f6..05b3501 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -41,11 +41,17 @@ class OffsetSpecification(Enum): @dataclass -class BindingSpecification: +class ExchangeToQueueBindingSpecification: source_exchange: str + destination_queue: str + binding_key: Optional[str] = None + + +@dataclass +class ExchangeToExchangeBindingSpecification: + source_exchange: str + destination_exchange: str binding_key: Optional[str] = None - destination_exchange: Optional[str] = None - destination_queue: Optional[str] = None class StreamOptions: diff --git a/rabbitmq_amqp_python_client/exceptions.py b/rabbitmq_amqp_python_client/exceptions.py index 1c212f4..9544dcc 100644 --- a/rabbitmq_amqp_python_client/exceptions.py +++ b/rabbitmq_amqp_python_client/exceptions.py @@ -14,12 +14,3 @@ def __init__(self, msg: str): def __str__(self) -> str: return repr(self.msg) - - -class AmqpValidationException(BaseException): - # Constructor or Initializer - def __init__(self, msg: str): - self.msg = msg - - def __str__(self) -> str: - return repr(self.msg) diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index c7818aa..fa377a6 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -5,14 +5,12 @@ from .address_helper import AddressHelper from .common import CommonValues, QueueType from .entities import ( - BindingSpecification, ExchangeSpecification, + ExchangeToExchangeBindingSpecification, + ExchangeToQueueBindingSpecification, QueueInfo, ) -from .exceptions import ( - AmqpValidationException, - ValidationCodeException, -) +from .exceptions import ValidationCodeException from .options import ReceiverOption, SenderOption from .qpid.proton._message import Message from .qpid.proton.utils import ( @@ -304,9 +302,13 @@ def _validate_reponse_code( "wrong response code received: " + str(response_code) ) - def bind(self, bind_specification: BindingSpecification) -> str: + def bind( + self, + bind_specification: Union[ + ExchangeToQueueBindingSpecification, ExchangeToExchangeBindingSpecification + ], + ) -> str: logger.debug("Bind Operation called") - self._validate_binding(bind_specification) body = {} if bind_specification.binding_key is not None: @@ -314,9 +316,9 @@ def bind(self, bind_specification: BindingSpecification) -> str: else: body["binding_key"] = "" body["source"] = bind_specification.source_exchange - if bind_specification.destination_queue is not None: + if isinstance(bind_specification, ExchangeToQueueBindingSpecification): body["destination_queue"] = bind_specification.destination_queue - elif bind_specification.destination_exchange is not None: + elif isinstance(bind_specification, ExchangeToExchangeBindingSpecification): body["destination_exchange"] = bind_specification.destination_exchange body["arguments"] = {} # type: ignore @@ -332,47 +334,34 @@ def bind(self, bind_specification: BindingSpecification) -> str: ], ) - if bind_specification.destination_queue is not None: + if isinstance(bind_specification, ExchangeToQueueBindingSpecification): binding_path = AddressHelper.binding_path_with_exchange_queue( bind_specification ) - elif bind_specification.destination_exchange is not None: + elif isinstance(bind_specification, ExchangeToExchangeBindingSpecification): binding_path = AddressHelper.binding_path_with_exchange_exchange( bind_specification ) return binding_path - def _validate_binding(self, bind_specification: BindingSpecification) -> None: - if ( - bind_specification.destination_queue is not None - and bind_specification.destination_exchange is not None - ): - raise AmqpValidationException( - "just one of destination_queue and destination_exchange of BindingSpecification must be set " - "for a binding operation" - ) - - if ( - bind_specification.destination_queue is None - and bind_specification.destination_exchange is None - ): - raise AmqpValidationException( - "at least one of destination_queue and destination_exchange of BindingSpecification must be set " - "for a binding operation" - ) - - def unbind(self, bind_specification: Union[BindingSpecification, str]) -> None: + def unbind( + self, + bind_specification: Union[ + str, + ExchangeToQueueBindingSpecification, + ExchangeToExchangeBindingSpecification, + ], + ) -> None: logger.debug("UnBind Operation called") if isinstance(bind_specification, str): binding_name = bind_specification else: - self._validate_binding(bind_specification) - if bind_specification.destination_queue is not None: + if isinstance(bind_specification, ExchangeToQueueBindingSpecification): binding_name = AddressHelper.binding_path_with_exchange_queue( bind_specification ) - elif bind_specification.destination_exchange is not None: + elif isinstance(bind_specification, ExchangeToExchangeBindingSpecification): binding_name = AddressHelper.binding_path_with_exchange_exchange( bind_specification ) diff --git a/tests/test_management.py b/tests/test_management.py index 0e06016..6396780 100644 --- a/tests/test_management.py +++ b/tests/test_management.py @@ -1,10 +1,10 @@ from datetime import timedelta from rabbitmq_amqp_python_client import ( - AmqpValidationException, - BindingSpecification, ClassicQueueSpecification, ExchangeSpecification, + ExchangeToExchangeBindingSpecification, + ExchangeToQueueBindingSpecification, ExchangeType, Management, QueueType, @@ -74,7 +74,7 @@ def test_bind_exchange_to_queue(management: Management) -> None: management.declare_queue(QuorumQueueSpecification(name=queue_name)) binding_exchange_queue_path = management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, binding_key=routing_key, @@ -99,60 +99,65 @@ def test_bind_exchange_to_queue(management: Management) -> None: management.unbind(binding_exchange_queue_path) -def test_bind_no_destination(management: Management) -> None: +def test_bind_exchange_to_queue_without_key(management: Management) -> None: exchange_name = "test-bind-exchange-to-queue-exchange" queue_name = "test-bind-exchange-to-queue-queue" - routing_key = "routing-key" - raised = False management.declare_exchange(ExchangeSpecification(name=exchange_name)) management.declare_queue(QuorumQueueSpecification(name=queue_name)) - try: - management.bind( - BindingSpecification( - source_exchange=exchange_name, - binding_key=routing_key, - ) + binding_exchange_queue_path = management.bind( + ExchangeToQueueBindingSpecification( + source_exchange=exchange_name, + destination_queue=queue_name, ) - except AmqpValidationException: - raised = True + ) - assert raised is True + assert ( + binding_exchange_queue_path + == "/bindings/src=" + exchange_name + ";dstq=" + queue_name + ";key=" + ";args=" + ) + + management.unbind(binding_exchange_queue_path) management.delete_exchange(exchange_name) management.delete_queue(queue_name) -def test_bind_exchange_to_queue_without_key(management: Management) -> None: +def test_bind_exchange_to_exchange_without_key(management: Management) -> None: - exchange_name = "test-bind-exchange-to-queue-exchange" - queue_name = "test-bind-exchange-to-queue-queue" + source_exchange_name = "test-bind-exchange-to-queue-exchange" + destination_exchange_name = "test-bind-exchange-to-queue-queue" - management.declare_exchange(ExchangeSpecification(name=exchange_name)) + management.declare_exchange(ExchangeSpecification(name=source_exchange_name)) - management.declare_queue(QuorumQueueSpecification(name=queue_name)) + management.declare_exchange(ExchangeSpecification(name=destination_exchange_name)) binding_exchange_queue_path = management.bind( - BindingSpecification( - source_exchange=exchange_name, - destination_queue=queue_name, + ExchangeToExchangeBindingSpecification( + source_exchange=source_exchange_name, + destination_exchange=destination_exchange_name, ) ) assert ( binding_exchange_queue_path - == "/bindings/src=" + exchange_name + ";dstq=" + queue_name + ";key=" + ";args=" + == "/bindings/src=" + + source_exchange_name + + ";dstq=" + + destination_exchange_name + + ";key=" + + ";args=" ) management.unbind(binding_exchange_queue_path) - management.delete_exchange(exchange_name) + management.delete_exchange(source_exchange_name) - management.delete_queue(queue_name) + management.delete_exchange(destination_exchange_name) def test_bind_unbind_by_binding_spec(management: Management) -> None: @@ -165,14 +170,14 @@ def test_bind_unbind_by_binding_spec(management: Management) -> None: management.declare_queue(QuorumQueueSpecification(name=queue_name)) management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, ) ) management.unbind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, ) @@ -183,6 +188,44 @@ def test_bind_unbind_by_binding_spec(management: Management) -> None: management.delete_queue(queue_name) +def test_bind_unbind_exchange_by_exchange_spec(management: Management) -> None: + + source_exchange_name = "test-bind-exchange-to-queue-exchange" + destination_exchange_name = "test-bind-exchange-to-queue-queue" + + management.declare_exchange(ExchangeSpecification(name=source_exchange_name)) + + management.declare_exchange(ExchangeSpecification(name=destination_exchange_name)) + + binding_exchange_queue_path = management.bind( + ExchangeToExchangeBindingSpecification( + source_exchange=source_exchange_name, + destination_exchange=destination_exchange_name, + ) + ) + + assert ( + binding_exchange_queue_path + == "/bindings/src=" + + source_exchange_name + + ";dstq=" + + destination_exchange_name + + ";key=" + + ";args=" + ) + + management.unbind( + ExchangeToExchangeBindingSpecification( + source_exchange=source_exchange_name, + destination_exchange=destination_exchange_name, + ) + ) + + management.delete_exchange(source_exchange_name) + + management.delete_exchange(destination_exchange_name) + + def test_bind_exchange_to_exchange(management: Management) -> None: source_exchange_name = "source_exchange" @@ -194,7 +237,7 @@ def test_bind_exchange_to_exchange(management: Management) -> None: management.declare_exchange(ExchangeSpecification(name=destination_exchange_name)) binding_exchange_exchange_path = management.bind( - BindingSpecification( + ExchangeToExchangeBindingSpecification( source_exchange=source_exchange_name, destination_exchange=destination_exchange_name, binding_key=routing_key, diff --git a/tests/utils.py b/tests/utils.py index a70ece0..bc4ecca 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,10 +2,10 @@ from rabbitmq_amqp_python_client import ( AddressHelper, - BindingSpecification, Connection, Delivery, ExchangeSpecification, + ExchangeToQueueBindingSpecification, ExchangeType, Management, Message, @@ -55,7 +55,7 @@ def setup_dead_lettering(management: Management) -> str: ) management.declare_queue(QuorumQueueSpecification(name=queue_dead_lettering)) bind_path = management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_dead_lettering, destination_queue=queue_dead_lettering, binding_key=binding_key, @@ -74,7 +74,7 @@ def create_binding( management.declare_queue(QuorumQueueSpecification(name=queue_name)) bind_name = management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, binding_key=routing_key, From 48e256791d18635a9c3e395c899ce1dc998389d6 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 18 Feb 2025 19:22:57 +0100 Subject: [PATCH 3/3] Typo and remove two warnings Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp_python_client/entities.py | 8 ++++---- rabbitmq_amqp_python_client/management.py | 2 ++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 05b3501..8f15a3c 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -59,14 +59,14 @@ class StreamOptions: def __init__(self): # type: ignore self._filter_set: Dict[symbol, Described] = {} - def offset(self, offset_spefication: Union[OffsetSpecification, int]) -> None: - if isinstance(offset_spefication, int): + def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: + if isinstance(offset_specification, int): self._filter_set[symbol(STREAM_OFFSET_SPEC)] = Described( - symbol(STREAM_OFFSET_SPEC), offset_spefication + symbol(STREAM_OFFSET_SPEC), offset_specification ) else: self._filter_set[symbol(STREAM_OFFSET_SPEC)] = Described( - symbol(STREAM_OFFSET_SPEC), offset_spefication.name + symbol(STREAM_OFFSET_SPEC), offset_specification.name ) def filter_values(self, filters: list[str]) -> None: diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index fa377a6..333f0a6 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -333,6 +333,7 @@ def bind( CommonValues.response_code_204.value, ], ) + binding_path = "" if isinstance(bind_specification, ExchangeToQueueBindingSpecification): binding_path = AddressHelper.binding_path_with_exchange_queue( @@ -354,6 +355,7 @@ def unbind( ], ) -> None: logger.debug("UnBind Operation called") + binding_name = "" if isinstance(bind_specification, str): binding_name = bind_specification else: