diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index dca93f4..bc4afb7 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -6,6 +6,7 @@ from .connection import Connection from .consumer import Consumer from .entities import ( + ExchangeCustomSpecification, ExchangeSpecification, ExchangeToExchangeBindingSpecification, ExchangeToQueueBindingSpecification, @@ -75,4 +76,5 @@ "OffsetSpecification", "OutcomeState", "Environment", + "ExchangeCustomSpecification", ] diff --git a/rabbitmq_amqp_python_client/common.py b/rabbitmq_amqp_python_client/common.py index eeef54f..1aa8f73 100644 --- a/rabbitmq_amqp_python_client/common.py +++ b/rabbitmq_amqp_python_client/common.py @@ -24,6 +24,7 @@ class ExchangeType(enum.Enum): direct = "direct" topic = "topic" fanout = "fanout" + headers = "headers" class QueueType(enum.Enum): diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 8f15a3c..e123c59 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -20,6 +20,16 @@ class ExchangeSpecification: is_durable: bool = True +@dataclass +class ExchangeCustomSpecification: + name: str + exchange_type: str + arguments: dict[str, str] = field(default_factory=dict) + is_auto_delete: bool = False + is_internal: bool = False + is_durable: bool = True + + @dataclass class QueueInfo: name: str diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 333f0a6..429f89e 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -5,6 +5,7 @@ from .address_helper import AddressHelper from .common import CommonValues, QueueType from .entities import ( + ExchangeCustomSpecification, ExchangeSpecification, ExchangeToExchangeBindingSpecification, ExchangeToQueueBindingSpecification, @@ -98,15 +99,21 @@ def _request( return msg def declare_exchange( - self, exchange_specification: ExchangeSpecification - ) -> ExchangeSpecification: + self, + exchange_specification: Union[ + ExchangeSpecification, ExchangeCustomSpecification + ], + ) -> Union[ExchangeSpecification, ExchangeCustomSpecification]: logger.debug("declare_exchange operation called") - body = {} + body: dict[str, Any] = {} body["auto_delete"] = exchange_specification.is_auto_delete body["durable"] = exchange_specification.is_durable - body["type"] = exchange_specification.exchange_type.value # type: ignore + if isinstance(exchange_specification, ExchangeSpecification): + body["type"] = exchange_specification.exchange_type.value + elif isinstance(exchange_specification, ExchangeCustomSpecification): + body["type"] = exchange_specification.exchange_type body["internal"] = exchange_specification.is_internal - body["arguments"] = exchange_specification.arguments # type: ignore + body["arguments"] = exchange_specification.arguments path = AddressHelper.exchange_address(exchange_specification.name) diff --git a/tests/test_management.py b/tests/test_management.py index 6396780..7d3338a 100644 --- a/tests/test_management.py +++ b/tests/test_management.py @@ -29,6 +29,19 @@ def test_declare_delete_exchange(management: Management) -> None: management.delete_exchange(exchange_name) +def test_declare_delete_exchange_headers(management: Management) -> None: + + exchange_name = "test-exchange" + + exchange_info = management.declare_exchange( + ExchangeSpecification(name=exchange_name, exchange_type=ExchangeType.headers) + ) + + assert exchange_info.name == exchange_name + + management.delete_exchange(exchange_name) + + def test_declare_delete_exchange_with_args(management: Management) -> None: exchange_name = "test-exchange-with-args"