diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 75edee1..70f9d46 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -163,34 +163,39 @@ def _declare_queue( body = {} args: dict[str, Any] = {} - body["auto_delete"] = queue_specification.is_auto_delete - body["durable"] = queue_specification.is_durable - if queue_specification.dead_letter_exchange is not None: args["x-dead-letter-exchange"] = queue_specification.dead_letter_exchange if queue_specification.dead_letter_routing_key is not None: args["x-dead-letter-routing-key"] = ( queue_specification.dead_letter_routing_key ) - if queue_specification.overflow is not None: - args["x-overflow"] = queue_specification.overflow + if queue_specification.overflow_behaviour is not None: + args["x-overflow"] = queue_specification.overflow_behaviour if queue_specification.max_len is not None: args["x-max-length"] = queue_specification.max_len if queue_specification.max_len_bytes is not None: args["x-max-length-bytes"] = queue_specification.max_len_bytes if queue_specification.message_ttl is not None: - args["x-message-ttl"] = queue_specification.message_ttl - if queue_specification.expires is not None: - args["x-expires"] = queue_specification.expires + args["x-message-ttl"] = int( + queue_specification.message_ttl.total_seconds() * 1000 + ) + if queue_specification.auto_expires is not None: + args["x-expires"] = int( + queue_specification.auto_expires.total_seconds() * 1000 + ) if queue_specification.single_active_consumer is not None: args["x-single-active-consumer"] = ( queue_specification.single_active_consumer ) if isinstance(queue_specification, ClassicQueueSpecification): + body["auto_delete"] = queue_specification.is_auto_delete + body["durable"] = queue_specification.is_durable + body["exclusive"] = queue_specification.is_exclusive + args["x-queue-type"] = QueueType.classic.value - if queue_specification.maximum_priority is not None: - args["x-maximum-priority"] = queue_specification.maximum_priority + if queue_specification.max_priority is not None: + args["x-max-priority"] = queue_specification.max_priority if isinstance(queue_specification, QuorumQueueSpecification): args["x-queue-type"] = QueueType.quorum.value @@ -203,12 +208,17 @@ def _declare_queue( ) if queue_specification.quorum_initial_group_size is not None: - args["x-initial-quorum-group-size"] = ( + args["x-quorum-initial-group-size"] = ( queue_specification.quorum_initial_group_size ) - if queue_specification.cluster_target_size is not None: - args["cluster_target_size"] = queue_specification.cluster_target_size + if queue_specification.cluster_target_group_size is not None: + args["x-quorum-target-group-size"] = ( + queue_specification.cluster_target_group_size + ) + + if queue_specification.leader_locator is not None: + args["x-queue-leader-locator"] = queue_specification.leader_locator body["arguments"] = args # type: ignore @@ -226,22 +236,26 @@ def _declare_stream( if stream_specification.max_len_bytes is not None: args["x-max-length-bytes"] = stream_specification.max_len_bytes - if stream_specification.max_time_retention is not None: - args["x-max-time-retention"] = stream_specification.max_time_retention + if stream_specification.max_age is not None: + args["x-max-age"] = ( + str(int(stream_specification.max_age.total_seconds())) + "s" + ) - if stream_specification.max_segment_size_in_bytes is not None: - args["x-max-segment-size-in-bytes"] = ( - stream_specification.max_segment_size_in_bytes + if stream_specification.stream_max_segment_size_bytes is not None: + args["x-stream-max-segment-size-bytes"] = ( + stream_specification.stream_max_segment_size_bytes ) - if stream_specification.filter_size is not None: - args["x-filter-size"] = stream_specification.filter_size + if stream_specification.stream_filter_size_bytes is not None: + args["x-stream-filter-size-bytes"] = ( + stream_specification.stream_filter_size_bytes + ) if stream_specification.initial_group_size is not None: args["x-initial-group-size"] = stream_specification.initial_group_size if stream_specification.leader_locator is not None: - args["x-leader-locator"] = stream_specification.leader_locator + args["x-queue-leader-locator"] = stream_specification.leader_locator body["arguments"] = args @@ -276,7 +290,6 @@ def delete_queue(self, name: str) -> None: def _validate_reponse_code( self, response_code: int, expected_response_codes: list[int] ) -> None: - logger.debug("response_code received: " + str(response_code)) if response_code == CommonValues.response_code_409.value: raise ValidationCodeException("ErrPreconditionFailed") diff --git a/rabbitmq_amqp_python_client/queues.py b/rabbitmq_amqp_python_client/queues.py index 8b4c4ba..f1bda53 100644 --- a/rabbitmq_amqp_python_client/queues.py +++ b/rabbitmq_amqp_python_client/queues.py @@ -1,26 +1,28 @@ from dataclasses import dataclass +from datetime import timedelta from typing import Optional @dataclass class QueueSpecification: name: str - expires: Optional[int] = None - message_ttl: Optional[int] = None - overflow: Optional[str] = None + auto_expires: Optional[timedelta] = None + message_ttl: Optional[timedelta] = None + overflow_behaviour: Optional[str] = None single_active_consumer: Optional[bool] = None dead_letter_exchange: Optional[str] = None dead_letter_routing_key: Optional[str] = None max_len: Optional[int] = None max_len_bytes: Optional[int] = None leader_locator: Optional[str] = None - is_auto_delete: bool = False - is_durable: bool = True @dataclass class ClassicQueueSpecification(QueueSpecification): - maximum_priority: Optional[int] = None + max_priority: Optional[int] = None + is_auto_delete: bool = False + is_exclusive: bool = False + is_durable: bool = True @dataclass @@ -28,15 +30,15 @@ class QuorumQueueSpecification(QueueSpecification): deliver_limit: Optional[int] = None dead_letter_strategy: Optional[str] = None quorum_initial_group_size: Optional[int] = None - cluster_target_size: Optional[int] = None + cluster_target_group_size: Optional[int] = None @dataclass class StreamSpecification: name: str max_len_bytes: Optional[int] = None - max_time_retention: Optional[int] = None - max_segment_size_in_bytes: Optional[int] = None - filter_size: Optional[int] = None + max_age: Optional[timedelta] = None + stream_max_segment_size_bytes: Optional[int] = None + stream_filter_size_bytes: Optional[int] = None initial_group_size: Optional[int] = None leader_locator: Optional[str] = None diff --git a/tests/test_management.py b/tests/test_management.py index f3aba80..e91a131 100644 --- a/tests/test_management.py +++ b/tests/test_management.py @@ -1,7 +1,10 @@ +from datetime import timedelta + from rabbitmq_amqp_python_client import ( BindingSpecification, ClassicQueueSpecification, ExchangeSpecification, + ExchangeType, Management, QueueType, QuorumQueueSpecification, @@ -25,6 +28,28 @@ def test_declare_delete_exchange(management: Management) -> None: management.delete_exchange(exchange_name) +def test_declare_delete_exchange_with_args(management: Management) -> None: + + exchange_name = "test-exchange-with-args" + + exchange_arguments = {} + exchange_arguments["test"] = "test" + + exchange_info = management.declare_exchange( + ExchangeSpecification( + name=exchange_name, + exchange_type=ExchangeType.topic, + arguments=exchange_arguments, + ) + ) + + assert exchange_info.name == exchange_name + assert exchange_info.exchange_type == ExchangeType.topic + assert exchange_info.arguments == exchange_arguments + + management.delete_exchange(exchange_name) + + def test_declare_purge_delete_queue(management: Management) -> None: queue_name = "my_queue" @@ -88,7 +113,7 @@ def test_queue_info_with_validations(management: Management) -> None: assert queue_info.name == queue_name assert queue_info.queue_type == QueueType.quorum - assert queue_info.is_durable == queue_specification.is_durable + assert queue_info.is_durable is True assert queue_info.message_count == 0 @@ -115,25 +140,23 @@ def test_queue_precondition_fail(management: Management) -> None: queue_name = "test-queue_precondition_fail" - queue_specification = QuorumQueueSpecification( - name=queue_name, is_auto_delete=False - ) + queue_specification = QuorumQueueSpecification(name=queue_name, max_len_bytes=100) management.declare_queue(queue_specification) management.declare_queue(queue_specification) queue_specification = QuorumQueueSpecification( name=queue_name, - is_auto_delete=True, + max_len_bytes=200, ) - management.delete_queue(queue_name) - try: management.declare_queue(queue_specification) except ValidationCodeException: test_failure = False + management.delete_queue(queue_name) + assert test_failure is False @@ -141,7 +164,7 @@ def test_declare_classic_queue(management: Management) -> None: queue_name = "test-declare_classic_queue" - queue_specification = QuorumQueueSpecification( + queue_specification = ClassicQueueSpecification( name=queue_name, is_auto_delete=False, ) @@ -154,81 +177,192 @@ def test_declare_classic_queue(management: Management) -> None: def test_declare_classic_queue_with_args(management: Management) -> None: - queue_name = "test-queue_with_args" + queue_name = "test-queue_with_args-2" queue_specification = ClassicQueueSpecification( name=queue_name, is_auto_delete=False, + is_exclusive=False, + is_durable=True, dead_letter_exchange="my_exchange", dead_letter_routing_key="my_key", - max_len=50000000, + max_len=500000, max_len_bytes=1000000000, - expires=2000, + message_ttl=timedelta(seconds=2), + overflow_behaviour="reject-publish", + auto_expires=timedelta(seconds=10), single_active_consumer=True, + max_priority=100, ) - queue_info = management.declare_queue(queue_specification) + management.declare_queue(queue_specification) + + queue_info = management.queue_info(queue_name) assert queue_specification.name == queue_info.name assert queue_specification.is_auto_delete == queue_info.is_auto_delete - assert queue_specification.dead_letter_exchange == queue_info.dead_letter_exchange + assert queue_specification.is_exclusive == queue_info.is_exclusive + assert queue_specification.is_durable == queue_info.is_durable + assert ( + queue_specification.message_ttl.total_seconds() * 1000 + ) == queue_info.arguments["x-message-ttl"] + assert queue_specification.overflow_behaviour == queue_info.arguments["x-overflow"] + assert ( + queue_specification.auto_expires.total_seconds() * 1000 + ) == queue_info.arguments["x-expires"] + assert queue_specification.max_priority == queue_info.arguments["x-max-priority"] + + assert ( + queue_specification.dead_letter_exchange + == queue_info.arguments["x-dead-letter-exchange"] + ) assert ( queue_specification.dead_letter_routing_key - == queue_info.dead_letter_routing_key + == queue_info.arguments["x-dead-letter-routing-key"] ) - assert queue_specification.max_len == queue_info.max_len - assert queue_specification.max_len_bytes == queue_info.max_len_bytes - assert queue_specification.expires == queue_info.expires + assert queue_specification.max_len == queue_info.arguments["x-max-length"] + assert ( + queue_specification.max_len_bytes == queue_info.arguments["x-max-length-bytes"] + ) + assert ( - queue_specification.single_active_consumer == queue_info.single_active_consumer + queue_specification.single_active_consumer + == queue_info.arguments["x-single-active-consumer"] ) management.delete_queue(queue_name) -def test_declare_classic_queue_with_invalid_args(management: Management) -> None: +def test_declare_quorum_queue_with_args(management: Management) -> None: + queue_name = "test-queue_with_args" - test_failure = True - queue_specification = ClassicQueueSpecification( + queue_specification = QuorumQueueSpecification( name=queue_name, - max_len=-5, + dead_letter_exchange="my_exchange", + dead_letter_routing_key="my_key", + max_len=500000, + max_len_bytes=1000000000, + message_ttl=timedelta(seconds=2), + overflow_behaviour="reject-publish", + auto_expires=timedelta(seconds=2), + single_active_consumer=True, + deliver_limit=10, + dead_letter_strategy="at-least-once", + quorum_initial_group_size=5, + cluster_target_group_size=5, ) - try: - management.declare_queue(queue_specification) - except ValidationCodeException: - test_failure = False + management.declare_queue(queue_specification) - management.delete_queue(queue_name) + queue_info = management.queue_info(queue_name) - assert test_failure is False + assert queue_specification.name == queue_info.name + assert queue_info.is_auto_delete is False + assert queue_info.is_exclusive is False + assert queue_info.is_durable is True + assert ( + queue_specification.message_ttl.total_seconds() * 1000 + ) == queue_info.arguments["x-message-ttl"] + assert queue_specification.overflow_behaviour == queue_info.arguments["x-overflow"] + assert ( + queue_specification.auto_expires.total_seconds() * 1000 + ) == queue_info.arguments["x-expires"] + + assert ( + queue_specification.dead_letter_exchange + == queue_info.arguments["x-dead-letter-exchange"] + ) + assert ( + queue_specification.dead_letter_routing_key + == queue_info.arguments["x-dead-letter-routing-key"] + ) + assert queue_specification.max_len == queue_info.arguments["x-max-length"] + assert ( + queue_specification.max_len_bytes == queue_info.arguments["x-max-length-bytes"] + ) + + assert ( + queue_specification.single_active_consumer + == queue_info.arguments["x-single-active-consumer"] + ) + + assert queue_specification.deliver_limit == queue_info.arguments["x-deliver-limit"] + assert ( + queue_specification.dead_letter_strategy + == queue_info.arguments["x-dead-letter-strategy"] + ) + assert ( + queue_specification.quorum_initial_group_size + == queue_info.arguments["x-quorum-initial-group-size"] + ) + assert ( + queue_specification.cluster_target_group_size + == queue_info.arguments["x-quorum-target-group-size"] + ) + + management.delete_queue(queue_name) def test_declare_stream_with_args(management: Management) -> None: + stream_name = "test-stream_with_args" stream_specification = StreamSpecification( name=stream_name, - max_len_bytes=1000000000, - max_time_retention=10000000, - max_segment_size_in_bytes=100000000, - filter_size=1000, - initial_group_size=3, - leader_locator="node1", + max_len_bytes=1000, + max_age=timedelta(seconds=200000), + stream_max_segment_size_bytes=200, + stream_filter_size_bytes=100, + initial_group_size=5, ) - stream_info = management.declare_queue(stream_specification) + management.declare_queue(stream_specification) + + stream_info = management.queue_info(stream_name) assert stream_specification.name == stream_info.name - assert stream_specification.max_len_bytes == stream_info.max_len_bytes - assert stream_specification.max_time_retention == stream_info.max_time_retention + assert stream_info.is_auto_delete is False + assert stream_info.is_exclusive is False + assert stream_info.is_durable is True + assert ( + stream_specification.max_len_bytes + == stream_info.arguments["x-max-length-bytes"] + ) assert ( - stream_specification.max_segment_size_in_bytes - == stream_info.max_segment_size_in_bytes + str(int(stream_specification.max_age.total_seconds())) + "s" + == stream_info.arguments["x-max-age"] + ) + assert ( + stream_specification.stream_max_segment_size_bytes + == stream_info.arguments["x-stream-max-segment-size-bytes"] + ) + assert ( + stream_specification.stream_filter_size_bytes + == stream_info.arguments["x-stream-filter-size-bytes"] + ) + assert ( + stream_specification.initial_group_size + == stream_info.arguments["x-initial-group-size"] ) - assert stream_specification.filter_size == stream_info.filter_size - assert stream_specification.initial_group_size == stream_info.initial_group_size - assert stream_specification.leader_locator == stream_info.leader_locator management.delete_queue(stream_name) + + +def test_declare_classic_queue_with_invalid_args(management: Management) -> None: + queue_name = "test-queue_with_args" + test_failure = True + + queue_specification = ClassicQueueSpecification( + name=queue_name, + max_len=-5, + ) + + try: + management.declare_queue(queue_specification) + except ValidationCodeException: + test_failure = False + + management.delete_queue(queue_name) + + assert test_failure is False