Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 35 additions & 22 deletions rabbitmq_amqp_python_client/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,34 +163,39 @@ def _declare_queue(
body = {}
args: dict[str, Any] = {}

body["auto_delete"] = queue_specification.is_auto_delete
body["durable"] = queue_specification.is_durable

if queue_specification.dead_letter_exchange is not None:
args["x-dead-letter-exchange"] = queue_specification.dead_letter_exchange
if queue_specification.dead_letter_routing_key is not None:
args["x-dead-letter-routing-key"] = (
queue_specification.dead_letter_routing_key
)
if queue_specification.overflow is not None:
args["x-overflow"] = queue_specification.overflow
if queue_specification.overflow_behaviour is not None:
args["x-overflow"] = queue_specification.overflow_behaviour
if queue_specification.max_len is not None:
args["x-max-length"] = queue_specification.max_len
if queue_specification.max_len_bytes is not None:
args["x-max-length-bytes"] = queue_specification.max_len_bytes
if queue_specification.message_ttl is not None:
args["x-message-ttl"] = queue_specification.message_ttl
if queue_specification.expires is not None:
args["x-expires"] = queue_specification.expires
args["x-message-ttl"] = int(
queue_specification.message_ttl.total_seconds() * 1000
)
if queue_specification.auto_expires is not None:
args["x-expires"] = int(
queue_specification.auto_expires.total_seconds() * 1000
)
if queue_specification.single_active_consumer is not None:
args["x-single-active-consumer"] = (
queue_specification.single_active_consumer
)

if isinstance(queue_specification, ClassicQueueSpecification):
body["auto_delete"] = queue_specification.is_auto_delete
body["durable"] = queue_specification.is_durable
body["exclusive"] = queue_specification.is_exclusive

args["x-queue-type"] = QueueType.classic.value
if queue_specification.maximum_priority is not None:
args["x-maximum-priority"] = queue_specification.maximum_priority
if queue_specification.max_priority is not None:
args["x-max-priority"] = queue_specification.max_priority

if isinstance(queue_specification, QuorumQueueSpecification):
args["x-queue-type"] = QueueType.quorum.value
Expand All @@ -203,12 +208,17 @@ def _declare_queue(
)

if queue_specification.quorum_initial_group_size is not None:
args["x-initial-quorum-group-size"] = (
args["x-quorum-initial-group-size"] = (
queue_specification.quorum_initial_group_size
)

if queue_specification.cluster_target_size is not None:
args["cluster_target_size"] = queue_specification.cluster_target_size
if queue_specification.cluster_target_group_size is not None:
args["x-quorum-target-group-size"] = (
queue_specification.cluster_target_group_size
)

if queue_specification.leader_locator is not None:
args["x-queue-leader-locator"] = queue_specification.leader_locator

body["arguments"] = args # type: ignore

Expand All @@ -226,22 +236,26 @@ def _declare_stream(
if stream_specification.max_len_bytes is not None:
args["x-max-length-bytes"] = stream_specification.max_len_bytes

if stream_specification.max_time_retention is not None:
args["x-max-time-retention"] = stream_specification.max_time_retention
if stream_specification.max_age is not None:
args["x-max-age"] = (
str(int(stream_specification.max_age.total_seconds())) + "s"
)

if stream_specification.max_segment_size_in_bytes is not None:
args["x-max-segment-size-in-bytes"] = (
stream_specification.max_segment_size_in_bytes
if stream_specification.stream_max_segment_size_bytes is not None:
args["x-stream-max-segment-size-bytes"] = (
stream_specification.stream_max_segment_size_bytes
)

if stream_specification.filter_size is not None:
args["x-filter-size"] = stream_specification.filter_size
if stream_specification.stream_filter_size_bytes is not None:
args["x-stream-filter-size-bytes"] = (
stream_specification.stream_filter_size_bytes
)

if stream_specification.initial_group_size is not None:
args["x-initial-group-size"] = stream_specification.initial_group_size

if stream_specification.leader_locator is not None:
args["x-leader-locator"] = stream_specification.leader_locator
args["x-queue-leader-locator"] = stream_specification.leader_locator

body["arguments"] = args

Expand Down Expand Up @@ -276,7 +290,6 @@ def delete_queue(self, name: str) -> None:
def _validate_reponse_code(
self, response_code: int, expected_response_codes: list[int]
) -> None:
logger.debug("response_code received: " + str(response_code))
if response_code == CommonValues.response_code_409.value:
raise ValidationCodeException("ErrPreconditionFailed")

Expand Down
22 changes: 12 additions & 10 deletions rabbitmq_amqp_python_client/queues.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,44 @@
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass
class QueueSpecification:
name: str
expires: Optional[int] = None
message_ttl: Optional[int] = None
overflow: Optional[str] = None
auto_expires: Optional[timedelta] = None
message_ttl: Optional[timedelta] = None
overflow_behaviour: Optional[str] = None
single_active_consumer: Optional[bool] = None
dead_letter_exchange: Optional[str] = None
dead_letter_routing_key: Optional[str] = None
max_len: Optional[int] = None
max_len_bytes: Optional[int] = None
leader_locator: Optional[str] = None
is_auto_delete: bool = False
is_durable: bool = True


@dataclass
class ClassicQueueSpecification(QueueSpecification):
maximum_priority: Optional[int] = None
max_priority: Optional[int] = None
is_auto_delete: bool = False
is_exclusive: bool = False
is_durable: bool = True


@dataclass
class QuorumQueueSpecification(QueueSpecification):
deliver_limit: Optional[int] = None
dead_letter_strategy: Optional[str] = None
quorum_initial_group_size: Optional[int] = None
cluster_target_size: Optional[int] = None
cluster_target_group_size: Optional[int] = None


@dataclass
class StreamSpecification:
name: str
max_len_bytes: Optional[int] = None
max_time_retention: Optional[int] = None
max_segment_size_in_bytes: Optional[int] = None
filter_size: Optional[int] = None
max_age: Optional[timedelta] = None
stream_max_segment_size_bytes: Optional[int] = None
stream_filter_size_bytes: Optional[int] = None
initial_group_size: Optional[int] = None
leader_locator: Optional[str] = None
Loading