Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/streams/example_with_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def main() -> None:
message_handler=MyMessageHandler(),
# can be first, last, next or an offset long
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
stream_consumer_options=StreamConsumerOptions(
consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def main() -> None:
message_handler=MyMessageHandler(),
# the consumer will only receive messages with filter value banana and subject yellow
# and application property from = italy
stream_consumer_options=StreamConsumerOptions(
consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first,
filter_options=StreamFilterOptions(
values=["banana"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def main() -> None:
consumer = consumer_connection.consumer(
addr_queue,
message_handler=MyMessageHandler(),
stream_consumer_options=StreamConsumerOptions(
consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first,
filter_options=StreamFilterOptions(sql=sql),
),
Expand Down
2 changes: 2 additions & 0 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .connection import Connection
from .consumer import Consumer
from .entities import (
ConsumerOptions,
ExchangeCustomSpecification,
ExchangeSpecification,
ExchangeToExchangeBindingSpecification,
Expand Down Expand Up @@ -89,6 +90,7 @@
"ConnectionClosed",
"StreamConsumerOptions",
"StreamFilterOptions",
"ConsumerOptions",
"MessageProperties",
"OffsetSpecification",
"OutcomeState",
Expand Down
31 changes: 22 additions & 9 deletions rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
from .address_helper import validate_address
from .consumer import Consumer
from .entities import (
ConsumerOptions,
OAuth2Options,
RecoveryConfiguration,
StreamConsumerOptions,
)
from .exceptions import (
ArgumentOutOfRangeException,
Expand Down Expand Up @@ -211,15 +211,15 @@ def _validate_server_properties(self) -> None:

logger.debug(f"Connected to RabbitMQ server version {server_version}")

def _is_server_version_gte_4_2_0(self) -> bool:
def _is_server_version_gte(self, target_version: str) -> bool:
"""
Check if the server version is greater than or equal to 4.2.0.
Check if the server version is greater than or equal to version.

This is an internal method that can be used to conditionally enable
features that require RabbitMQ 4.2.0 or higher.
features that require RabbitMQ version or higher.

Returns:
bool: True if server version >= 4.2.0, False otherwise
bool: True if server version >= version, False otherwise

Raises:
ValidationCodeException: If connection is not established or
Expand All @@ -237,7 +237,12 @@ def _is_server_version_gte_4_2_0(self) -> bool:
raise ValidationCodeException("Server version not provided")

try:
return version.parse(str(server_version)) >= version.parse("4.2.0")
srv = version.parse(str(server_version))
trg = version.parse(target_version)
# compare the version even if it contains pre-release or build metadata
return (
version.parse("{}.{}.{}".format(srv.major, srv.minor, srv.micro)) >= trg
)
except Exception as e:
raise ValidationCodeException(
f"Failed to parse server version '{server_version}': {e}"
Expand Down Expand Up @@ -376,7 +381,7 @@ def consumer(
self,
destination: str,
message_handler: Optional[MessagingHandler] = None,
stream_consumer_options: Optional[StreamConsumerOptions] = None,
consumer_options: Optional[ConsumerOptions] = None,
credit: Optional[int] = None,
) -> Consumer:
"""
Expand All @@ -385,7 +390,7 @@ def consumer(
Args:
destination: The address to consume from
message_handler: Optional handler for processing messages
stream_consumer_options: Optional configuration for stream consumption
consumer_options: Optional configuration for queue consumption. Each queue has its own consumer options.
credit: Optional credit value for flow control

Returns:
Expand All @@ -398,8 +403,16 @@ def consumer(
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
)
if consumer_options is not None:
consumer_options.validate(
{
"4.0.0": self._is_server_version_gte("4.0.0"),
"4.1.0": self._is_server_version_gte("4.1.0"),
"4.2.0": self._is_server_version_gte("4.2.0"),
}
)
consumer = Consumer(
self._conn, destination, message_handler, stream_consumer_options, credit
self._conn, destination, message_handler, consumer_options, credit
)
self._consumers.append(consumer)
return consumer
Expand Down
4 changes: 2 additions & 2 deletions rabbitmq_amqp_python_client/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Literal, Optional, Union, cast

from .amqp_consumer_handler import AMQPMessagingHandler
from .entities import StreamConsumerOptions
from .entities import ConsumerOptions
from .options import (
ReceiverOptionUnsettled,
ReceiverOptionUnsettledWithFilters,
Expand Down Expand Up @@ -38,7 +38,7 @@ def __init__(
conn: BlockingConnection,
addr: str,
handler: Optional[AMQPMessagingHandler] = None,
stream_options: Optional[StreamConsumerOptions] = None,
stream_options: Optional[ConsumerOptions] = None,
credit: Optional[int] = None,
):
"""
Expand Down
47 changes: 45 additions & 2 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ class ExchangeToExchangeBindingSpecification:
binding_key: Optional[str] = None


class ConsumerOptions:
def validate(self, versions: Dict[str, bool]) -> None:
raise NotImplementedError("Subclasses should implement this method")

def filter_set(self) -> Dict[symbol, Described]:
raise NotImplementedError("Subclasses should implement this method")


@dataclass
class MessageProperties:
"""
Expand Down Expand Up @@ -215,7 +223,7 @@ def __init__(
self.sql = sql


class StreamConsumerOptions:
class StreamConsumerOptions(ConsumerOptions):
"""
Configuration options for stream queues.

Expand All @@ -237,6 +245,7 @@ def __init__(
):

self._filter_set: Dict[symbol, Described] = {}
self._filter_option = filter_options

if offset_specification is None and filter_options is None:
raise ValidationCodeException(
Expand Down Expand Up @@ -329,7 +338,6 @@ def _filter_message_properties(
def _filter_application_properties(
self, application_properties: Optional[dict[str, Any]]
) -> None:
app_prop = {}
if application_properties is not None:
app_prop = application_properties.copy()
Comment on lines 341 to 342
Copy link

Copilot AI Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable app_prop is declared but never used in this method. The original code at line 340 that was removed likely used this variable. Either use app_prop in the method or remove the unnecessary assignment.

Copilot uses AI. Check for mistakes.

Expand All @@ -356,6 +364,41 @@ def filter_set(self) -> Dict[symbol, Described]:
"""
return self._filter_set

def validate(self, versions: Dict[str, bool]) -> None:
"""
Validates stream filter options against supported RabbitMQ server versions.

Args:
versions: Dictionary mapping version strings to boolean indicating support.

Raises:
ValidationCodeException: If a filter option requires a higher RabbitMQ version.
"""
if self._filter_option is None:
return
if self._filter_option.values and not versions.get("4.1.0", False):
raise ValidationCodeException(
"Stream filter by values requires RabbitMQ 4.1.0 or higher"
)
if self._filter_option.match_unfiltered and not versions.get("4.1.0", False):
raise ValidationCodeException(
"Stream filter by match_unfiltered requires RabbitMQ 4.1.0 or higher"
)
if self._filter_option.sql and not versions.get("4.2.0", False):
raise ValidationCodeException(
"Stream filter by SQL requires RabbitMQ 4.2.0 or higher"
)
if self._filter_option.message_properties and not versions.get("4.1.0", False):
raise ValidationCodeException(
"Stream filter by message_properties requires RabbitMQ 4.1.0 or higher"
)
if self._filter_option.application_properties and not versions.get(
"4.1.0", False
):
raise ValidationCodeException(
"Stream filter by application_properties requires RabbitMQ 4.1.0 or higher"
)


@dataclass
class RecoveryConfiguration:
Expand Down
6 changes: 3 additions & 3 deletions rabbitmq_amqp_python_client/options.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .entities import StreamConsumerOptions
from .entities import ConsumerOptions
from .qpid.proton._data import ( # noqa: E402
PropertyDict,
symbol,
Expand Down Expand Up @@ -68,8 +68,8 @@ def test(self, link: Link) -> bool:


class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore
def __init__(self, addr: str, filter_options: StreamConsumerOptions):
super().__init__(filter_options.filter_set())
def __init__(self, addr: str, consumer_options: ConsumerOptions):
super().__init__(consumer_options.filter_set())
self._addr = addr

def apply(self, link: Link) -> None:
Expand Down
25 changes: 14 additions & 11 deletions tests/test_server_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ def test_is_server_version_gte_4_2_0_exact_version(self):
mock_blocking_conn.conn = mock_proton_conn
self.connection._conn = mock_blocking_conn

result = self.connection._is_server_version_gte_4_2_0()
result = self.connection._is_server_version_gte("4.2.0")
assert result is True

def test_is_server_version_gte_4_2_0_higher_versions(self):
Expand All @@ -322,7 +322,7 @@ def test_is_server_version_gte_4_2_0_higher_versions(self):
mock_blocking_conn.conn = mock_proton_conn
self.connection._conn = mock_blocking_conn

result = self.connection._is_server_version_gte_4_2_0()
result = self.connection._is_server_version_gte("4.2.0")
assert result is True, f"Version {version_str} should return True"

def test_is_server_version_gte_4_2_0_lower_versions(self):
Expand All @@ -338,15 +338,15 @@ def test_is_server_version_gte_4_2_0_lower_versions(self):
mock_blocking_conn.conn = mock_proton_conn
self.connection._conn = mock_blocking_conn

result = self.connection._is_server_version_gte_4_2_0()
result = self.connection._is_server_version_gte("4.2.0")
assert result is False, f"Version {version_str} should return False"

def test_is_server_version_gte_4_2_0_no_connection(self):
"""Test when connection is None."""
self.connection._conn = None

with pytest.raises(ValidationCodeException) as exc_info:
self.connection._is_server_version_gte_4_2_0()
self.connection._is_server_version_gte("4.2.0")

assert "Connection not established" in str(exc_info.value)

Expand All @@ -357,7 +357,7 @@ def test_is_server_version_gte_4_2_0_no_proton_connection(self):
self.connection._conn = mock_blocking_conn

with pytest.raises(ValidationCodeException) as exc_info:
self.connection._is_server_version_gte_4_2_0()
self.connection._is_server_version_gte("4.2.0")

assert "Connection not established" in str(exc_info.value)

Expand All @@ -370,7 +370,7 @@ def test_is_server_version_gte_4_2_0_no_remote_properties(self):
self.connection._conn = mock_blocking_conn

with pytest.raises(ValidationCodeException) as exc_info:
self.connection._is_server_version_gte_4_2_0()
self.connection._is_server_version_gte("4.2.0")

assert "No remote properties received from server" in str(exc_info.value)

Expand All @@ -388,7 +388,7 @@ def test_is_server_version_gte_4_2_0_missing_version(self):
self.connection._conn = mock_blocking_conn

with pytest.raises(ValidationCodeException) as exc_info:
self.connection._is_server_version_gte_4_2_0()
self.connection._is_server_version_gte("4.2.0")

assert "Server version not provided" in str(exc_info.value)

Expand All @@ -406,7 +406,7 @@ def test_is_server_version_gte_4_2_0_invalid_version_format(self):
self.connection._conn = mock_blocking_conn

with pytest.raises(ValidationCodeException) as exc_info:
self.connection._is_server_version_gte_4_2_0()
self.connection._is_server_version_gte("4.2.0")

error_msg = str(exc_info.value)
assert "Failed to parse server version" in error_msg
Expand All @@ -419,7 +419,10 @@ def test_is_server_version_gte_4_2_0_edge_cases(self):
("4.2.0", True), # Exact match
("4.2.0.0", True), # With extra zeroes
("v4.2.0", True), # With v prefix
("4.2.0-rc1", False), # Pre-release should be less than 4.2.0
(
"4.2.0-rc1",
True,
), # Pre-release should be less than 4.2.0 but accepted it equal
]

for version_str, expected in test_cases:
Expand All @@ -433,12 +436,12 @@ def test_is_server_version_gte_4_2_0_edge_cases(self):

if version_str == "4.2.0-rc1":
# Pre-release versions should be handled correctly
result = self.connection._is_server_version_gte_4_2_0()
result = self.connection._is_server_version_gte("4.2.0")
assert (
result == expected
), f"Version {version_str} should return {expected}"
else:
result = self.connection._is_server_version_gte_4_2_0()
result = self.connection._is_server_version_gte("4.2.0")
assert (
result == expected
), f"Version {version_str} should return {expected}"
Loading