From ee0e5dd610564d7d5fd309c4cc1c952916dd3074 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 19 Feb 2025 13:23:41 +0100 Subject: [PATCH 1/2] adding headers and custom ExchangeTypes --- rabbitmq_amqp_python_client/common.py | 1 + rabbitmq_amqp_python_client/entities.py | 2 +- rabbitmq_amqp_python_client/management.py | 11 +++++--- tests/test_management.py | 33 +++++++++++++++++++++++ 4 files changed, 42 insertions(+), 5 deletions(-) diff --git a/rabbitmq_amqp_python_client/common.py b/rabbitmq_amqp_python_client/common.py index eeef54f..1aa8f73 100644 --- a/rabbitmq_amqp_python_client/common.py +++ b/rabbitmq_amqp_python_client/common.py @@ -24,6 +24,7 @@ class ExchangeType(enum.Enum): direct = "direct" topic = "topic" fanout = "fanout" + headers = "headers" class QueueType(enum.Enum): diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 8f15a3c..ae1d361 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -14,7 +14,7 @@ class ExchangeSpecification: name: str arguments: dict[str, str] = field(default_factory=dict) - exchange_type: ExchangeType = ExchangeType.direct + exchange_type: Union[ExchangeType, str] = ExchangeType.direct is_auto_delete: bool = False is_internal: bool = False is_durable: bool = True diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 333f0a6..a3c80ca 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -3,7 +3,7 @@ from typing import Any, Optional, Union from .address_helper import AddressHelper -from .common import CommonValues, QueueType +from .common import CommonValues, ExchangeType, QueueType from .entities import ( ExchangeSpecification, ExchangeToExchangeBindingSpecification, @@ -101,12 +101,15 @@ def declare_exchange( self, exchange_specification: ExchangeSpecification ) -> ExchangeSpecification: logger.debug("declare_exchange operation called") - body = {} + body: dict[str, Any] = {} body["auto_delete"] = exchange_specification.is_auto_delete body["durable"] = exchange_specification.is_durable - body["type"] = exchange_specification.exchange_type.value # type: ignore + if isinstance(exchange_specification.exchange_type, ExchangeType): + body["type"] = exchange_specification.exchange_type.value + else: + body["type"] = str(exchange_specification.exchange_type) body["internal"] = exchange_specification.is_internal - body["arguments"] = exchange_specification.arguments # type: ignore + body["arguments"] = exchange_specification.arguments path = AddressHelper.exchange_address(exchange_specification.name) diff --git a/tests/test_management.py b/tests/test_management.py index 6396780..a28165e 100644 --- a/tests/test_management.py +++ b/tests/test_management.py @@ -29,6 +29,39 @@ def test_declare_delete_exchange(management: Management) -> None: management.delete_exchange(exchange_name) +def test_declare_delete_exchange_headers(management: Management) -> None: + + exchange_name = "test-exchange" + + exchange_info = management.declare_exchange( + ExchangeSpecification(name=exchange_name, exchange_type=ExchangeType.headers) + ) + + assert exchange_info.name == exchange_name + + management.delete_exchange(exchange_name) + + +def test_declare_delete_exchange_custom(management: Management) -> None: + + exchange_name = "test-exchange" + + exchange_arguments = {} + exchange_arguments["x-delayed-type"] = "direct" + + exchange_info = management.declare_exchange( + ExchangeSpecification( + name=exchange_name, + exchange_type="x-local-random", + arguments=exchange_arguments, + ) + ) + + assert exchange_info.name == exchange_name + + management.delete_exchange(exchange_name) + + def test_declare_delete_exchange_with_args(management: Management) -> None: exchange_name = "test-exchange-with-args" From b79036499e555c15c4045bbd63ab9e1c90569f65 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 19 Feb 2025 13:50:11 +0100 Subject: [PATCH 2/2] adding a class for CustomExchangeSpec --- rabbitmq_amqp_python_client/__init__.py | 2 ++ rabbitmq_amqp_python_client/entities.py | 12 +++++++++++- rabbitmq_amqp_python_client/management.py | 16 ++++++++++------ tests/test_management.py | 20 -------------------- 4 files changed, 23 insertions(+), 27 deletions(-) diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index dca93f4..bc4afb7 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -6,6 +6,7 @@ from .connection import Connection from .consumer import Consumer from .entities import ( + ExchangeCustomSpecification, ExchangeSpecification, ExchangeToExchangeBindingSpecification, ExchangeToQueueBindingSpecification, @@ -75,4 +76,5 @@ "OffsetSpecification", "OutcomeState", "Environment", + "ExchangeCustomSpecification", ] diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index ae1d361..e123c59 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -14,7 +14,17 @@ class ExchangeSpecification: name: str arguments: dict[str, str] = field(default_factory=dict) - exchange_type: Union[ExchangeType, str] = ExchangeType.direct + exchange_type: ExchangeType = ExchangeType.direct + is_auto_delete: bool = False + is_internal: bool = False + is_durable: bool = True + + +@dataclass +class ExchangeCustomSpecification: + name: str + exchange_type: str + arguments: dict[str, str] = field(default_factory=dict) is_auto_delete: bool = False is_internal: bool = False is_durable: bool = True diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index a3c80ca..429f89e 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -3,8 +3,9 @@ from typing import Any, Optional, Union from .address_helper import AddressHelper -from .common import CommonValues, ExchangeType, QueueType +from .common import CommonValues, QueueType from .entities import ( + ExchangeCustomSpecification, ExchangeSpecification, ExchangeToExchangeBindingSpecification, ExchangeToQueueBindingSpecification, @@ -98,16 +99,19 @@ def _request( return msg def declare_exchange( - self, exchange_specification: ExchangeSpecification - ) -> ExchangeSpecification: + self, + exchange_specification: Union[ + ExchangeSpecification, ExchangeCustomSpecification + ], + ) -> Union[ExchangeSpecification, ExchangeCustomSpecification]: logger.debug("declare_exchange operation called") body: dict[str, Any] = {} body["auto_delete"] = exchange_specification.is_auto_delete body["durable"] = exchange_specification.is_durable - if isinstance(exchange_specification.exchange_type, ExchangeType): + if isinstance(exchange_specification, ExchangeSpecification): body["type"] = exchange_specification.exchange_type.value - else: - body["type"] = str(exchange_specification.exchange_type) + elif isinstance(exchange_specification, ExchangeCustomSpecification): + body["type"] = exchange_specification.exchange_type body["internal"] = exchange_specification.is_internal body["arguments"] = exchange_specification.arguments diff --git a/tests/test_management.py b/tests/test_management.py index a28165e..7d3338a 100644 --- a/tests/test_management.py +++ b/tests/test_management.py @@ -42,26 +42,6 @@ def test_declare_delete_exchange_headers(management: Management) -> None: management.delete_exchange(exchange_name) -def test_declare_delete_exchange_custom(management: Management) -> None: - - exchange_name = "test-exchange" - - exchange_arguments = {} - exchange_arguments["x-delayed-type"] = "direct" - - exchange_info = management.declare_exchange( - ExchangeSpecification( - name=exchange_name, - exchange_type="x-local-random", - arguments=exchange_arguments, - ) - ) - - assert exchange_info.name == exchange_name - - management.delete_exchange(exchange_name) - - def test_declare_delete_exchange_with_args(management: Management) -> None: exchange_name = "test-exchange-with-args"