From b0e264c8e4ce472dc5ecd6b9e6c13fd302c5b4da Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 15 Sep 2025 16:52:18 +0200 Subject: [PATCH 1/3] refactor consumer option add an abstract class to define the consumer options. Each Queue type can define the consumer options type: Signed-off-by: Gabriele Santomaggio --- examples/streams/example_with_streams.py | 2 +- .../example_streams_with_filters.py | 2 +- .../example_streams_with_sql_filters.py | 2 +- rabbitmq_amqp_python_client/__init__.py | 2 + rabbitmq_amqp_python_client/connection.py | 31 +++++++--- rabbitmq_amqp_python_client/consumer.py | 4 +- rabbitmq_amqp_python_client/entities.py | 47 ++++++++++++++- rabbitmq_amqp_python_client/options.py | 6 +- tests/test_server_validation.py | 25 ++++---- tests/test_streams.py | 58 ++++++++++++++----- 10 files changed, 136 insertions(+), 43 deletions(-) diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index 207af6a..91bcc7f 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -104,7 +104,7 @@ def main() -> None: message_handler=MyMessageHandler(), # can be first, last, next or an offset long # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( offset_specification=OffsetSpecification.first ), ) diff --git a/examples/streams_with_filters/example_streams_with_filters.py b/examples/streams_with_filters/example_streams_with_filters.py index 1365866..c05585b 100644 --- a/examples/streams_with_filters/example_streams_with_filters.py +++ b/examples/streams_with_filters/example_streams_with_filters.py @@ -91,7 +91,7 @@ def main() -> None: message_handler=MyMessageHandler(), # the consumer will only receive messages with filter value banana and subject yellow # and application property from = italy - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( offset_specification=OffsetSpecification.first, filter_options=StreamFilterOptions( values=["banana"], diff --git a/examples/streams_with_sql_filters/example_streams_with_sql_filters.py b/examples/streams_with_sql_filters/example_streams_with_sql_filters.py index 435b8f9..de1d105 100644 --- a/examples/streams_with_sql_filters/example_streams_with_sql_filters.py +++ b/examples/streams_with_sql_filters/example_streams_with_sql_filters.py @@ -88,7 +88,7 @@ def main() -> None: consumer = consumer_connection.consumer( addr_queue, message_handler=MyMessageHandler(), - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( offset_specification=OffsetSpecification.first, filter_options=StreamFilterOptions(sql=sql), ), diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 5dc2c02..e787f23 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -6,6 +6,7 @@ from .connection import Connection from .consumer import Consumer from .entities import ( + ConsumerOptions, ExchangeCustomSpecification, ExchangeSpecification, ExchangeToExchangeBindingSpecification, @@ -89,6 +90,7 @@ "ConnectionClosed", "StreamConsumerOptions", "StreamFilterOptions", + "ConsumerOptions", "MessageProperties", "OffsetSpecification", "OutcomeState", diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index b08843a..e3fe076 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -16,9 +16,9 @@ from .address_helper import validate_address from .consumer import Consumer from .entities import ( + ConsumerOptions, OAuth2Options, RecoveryConfiguration, - StreamConsumerOptions, ) from .exceptions import ( ArgumentOutOfRangeException, @@ -211,15 +211,15 @@ def _validate_server_properties(self) -> None: logger.debug(f"Connected to RabbitMQ server version {server_version}") - def _is_server_version_gte_4_2_0(self) -> bool: + def _is_server_version_gte(self, target_version: str) -> bool: """ - Check if the server version is greater than or equal to 4.2.0. + Check if the server version is greater than or equal to version. This is an internal method that can be used to conditionally enable - features that require RabbitMQ 4.2.0 or higher. + features that require RabbitMQ version or higher. Returns: - bool: True if server version >= 4.2.0, False otherwise + bool: True if server version >= version, False otherwise Raises: ValidationCodeException: If connection is not established or @@ -237,7 +237,12 @@ def _is_server_version_gte_4_2_0(self) -> bool: raise ValidationCodeException("Server version not provided") try: - return version.parse(str(server_version)) >= version.parse("4.2.0") + srv = version.parse(str(server_version)) + trg = version.parse(target_version) + # compare the version even if it contains pre-release or build metadata + return ( + version.parse("{}.{}.{}".format(srv.major, srv.minor, srv.micro)) >= trg + ) except Exception as e: raise ValidationCodeException( f"Failed to parse server version '{server_version}': {e}" @@ -376,7 +381,7 @@ def consumer( self, destination: str, message_handler: Optional[MessagingHandler] = None, - stream_consumer_options: Optional[StreamConsumerOptions] = None, + consumer_options: Optional[ConsumerOptions] = None, credit: Optional[int] = None, ) -> Consumer: """ @@ -385,7 +390,7 @@ def consumer( Args: destination: The address to consume from message_handler: Optional handler for processing messages - stream_consumer_options: Optional configuration for stream consumption + consumer_options: Optional configuration for queue consumption. Each queue has its own consumer options.co credit: Optional credit value for flow control Returns: @@ -398,8 +403,16 @@ def consumer( raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" ) + if consumer_options is not None: + consumer_options.validate( + { + "4.0.0": self._is_server_version_gte("4.0.0"), + "4.1.0": self._is_server_version_gte("4.1.0"), + "4.2.0": self._is_server_version_gte("4.2.0"), + } + ) consumer = Consumer( - self._conn, destination, message_handler, stream_consumer_options, credit + self._conn, destination, message_handler, consumer_options, credit ) self._consumers.append(consumer) return consumer diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index e8b2791..d57782e 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -2,7 +2,7 @@ from typing import Literal, Optional, Union, cast from .amqp_consumer_handler import AMQPMessagingHandler -from .entities import StreamConsumerOptions +from .entities import ConsumerOptions from .options import ( ReceiverOptionUnsettled, ReceiverOptionUnsettledWithFilters, @@ -38,7 +38,7 @@ def __init__( conn: BlockingConnection, addr: str, handler: Optional[AMQPMessagingHandler] = None, - stream_options: Optional[StreamConsumerOptions] = None, + stream_options: Optional[ConsumerOptions] = None, credit: Optional[int] = None, ): """ diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 54491ba..df7b933 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -153,6 +153,14 @@ class ExchangeToExchangeBindingSpecification: binding_key: Optional[str] = None +class ConsumerOptions: + def validate(self, versions: Dict[str, bool]) -> None: + raise NotImplementedError("Subclasses should implement this method") + + def filter_set(self) -> Dict[symbol, Described]: + raise NotImplementedError("Subclasses should implement this method") + + @dataclass class MessageProperties: """ @@ -215,7 +223,7 @@ def __init__( self.sql = sql -class StreamConsumerOptions: +class StreamConsumerOptions(ConsumerOptions): """ Configuration options for stream queues. @@ -237,6 +245,7 @@ def __init__( ): self._filter_set: Dict[symbol, Described] = {} + self._filter_option = filter_options if offset_specification is None and filter_options is None: raise ValidationCodeException( @@ -329,7 +338,6 @@ def _filter_message_properties( def _filter_application_properties( self, application_properties: Optional[dict[str, Any]] ) -> None: - app_prop = {} if application_properties is not None: app_prop = application_properties.copy() @@ -356,6 +364,41 @@ def filter_set(self) -> Dict[symbol, Described]: """ return self._filter_set + def validate(self, versions: Dict[str, bool]) -> None: + """ + Validates stream filter options against supported RabbitMQ server versions. + + Args: + versions: Dictionary mapping version strings to boolean indicating support. + + Raises: + ValidationCodeException: If a filter option requires a higher RabbitMQ version. + """ + if self._filter_option is None: + return + if self._filter_option.values and not versions.get("4.1.0", False): + raise ValidationCodeException( + "Stream filter by values requires RabbitMQ 4.1.0 or higher" + ) + if self._filter_option.match_unfiltered and not versions.get("4.1.0", False): + raise ValidationCodeException( + "Stream filter by match_unfiltered requires RabbitMQ 4.1.0 or higher" + ) + if self._filter_option.sql and not versions.get("4.2.0", False): + raise ValidationCodeException( + "Stream filter by SQL requires RabbitMQ 4.2.0 or higher" + ) + if self._filter_option.message_properties and not versions.get("4.1.0", False): + raise ValidationCodeException( + "Stream filter by SQL requires RabbitMQ 4.1.0 or higher" + ) + if self._filter_option.application_properties and not versions.get( + "4.1.0", False + ): + raise ValidationCodeException( + "Stream filter by SQL requires RabbitMQ 4.1.0 or higher" + ) + @dataclass class RecoveryConfiguration: diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index d6ccba2..9d49ebe 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -1,4 +1,4 @@ -from .entities import StreamConsumerOptions +from .entities import ConsumerOptions from .qpid.proton._data import ( # noqa: E402 PropertyDict, symbol, @@ -68,8 +68,8 @@ def test(self, link: Link) -> bool: class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore - def __init__(self, addr: str, filter_options: StreamConsumerOptions): - super().__init__(filter_options.filter_set()) + def __init__(self, addr: str, consumer_options: ConsumerOptions): + super().__init__(consumer_options.filter_set()) self._addr = addr def apply(self, link: Link) -> None: diff --git a/tests/test_server_validation.py b/tests/test_server_validation.py index e6c9d58..a075cd8 100644 --- a/tests/test_server_validation.py +++ b/tests/test_server_validation.py @@ -306,7 +306,7 @@ def test_is_server_version_gte_4_2_0_exact_version(self): mock_blocking_conn.conn = mock_proton_conn self.connection._conn = mock_blocking_conn - result = self.connection._is_server_version_gte_4_2_0() + result = self.connection._is_server_version_gte("4.2.0") assert result is True def test_is_server_version_gte_4_2_0_higher_versions(self): @@ -322,7 +322,7 @@ def test_is_server_version_gte_4_2_0_higher_versions(self): mock_blocking_conn.conn = mock_proton_conn self.connection._conn = mock_blocking_conn - result = self.connection._is_server_version_gte_4_2_0() + result = self.connection._is_server_version_gte("4.2.0") assert result is True, f"Version {version_str} should return True" def test_is_server_version_gte_4_2_0_lower_versions(self): @@ -338,7 +338,7 @@ def test_is_server_version_gte_4_2_0_lower_versions(self): mock_blocking_conn.conn = mock_proton_conn self.connection._conn = mock_blocking_conn - result = self.connection._is_server_version_gte_4_2_0() + result = self.connection._is_server_version_gte("4.2.0") assert result is False, f"Version {version_str} should return False" def test_is_server_version_gte_4_2_0_no_connection(self): @@ -346,7 +346,7 @@ def test_is_server_version_gte_4_2_0_no_connection(self): self.connection._conn = None with pytest.raises(ValidationCodeException) as exc_info: - self.connection._is_server_version_gte_4_2_0() + self.connection._is_server_version_gte("4.2.0") assert "Connection not established" in str(exc_info.value) @@ -357,7 +357,7 @@ def test_is_server_version_gte_4_2_0_no_proton_connection(self): self.connection._conn = mock_blocking_conn with pytest.raises(ValidationCodeException) as exc_info: - self.connection._is_server_version_gte_4_2_0() + self.connection._is_server_version_gte("4.2.0") assert "Connection not established" in str(exc_info.value) @@ -370,7 +370,7 @@ def test_is_server_version_gte_4_2_0_no_remote_properties(self): self.connection._conn = mock_blocking_conn with pytest.raises(ValidationCodeException) as exc_info: - self.connection._is_server_version_gte_4_2_0() + self.connection._is_server_version_gte("4.2.0") assert "No remote properties received from server" in str(exc_info.value) @@ -388,7 +388,7 @@ def test_is_server_version_gte_4_2_0_missing_version(self): self.connection._conn = mock_blocking_conn with pytest.raises(ValidationCodeException) as exc_info: - self.connection._is_server_version_gte_4_2_0() + self.connection._is_server_version_gte("4.2.0") assert "Server version not provided" in str(exc_info.value) @@ -406,7 +406,7 @@ def test_is_server_version_gte_4_2_0_invalid_version_format(self): self.connection._conn = mock_blocking_conn with pytest.raises(ValidationCodeException) as exc_info: - self.connection._is_server_version_gte_4_2_0() + self.connection._is_server_version_gte("4.2.0") error_msg = str(exc_info.value) assert "Failed to parse server version" in error_msg @@ -419,7 +419,10 @@ def test_is_server_version_gte_4_2_0_edge_cases(self): ("4.2.0", True), # Exact match ("4.2.0.0", True), # With extra zeroes ("v4.2.0", True), # With v prefix - ("4.2.0-rc1", False), # Pre-release should be less than 4.2.0 + ( + "4.2.0-rc1", + True, + ), # Pre-release should be less than 4.2.0 but accepted it equal ] for version_str, expected in test_cases: @@ -433,12 +436,12 @@ def test_is_server_version_gte_4_2_0_edge_cases(self): if version_str == "4.2.0-rc1": # Pre-release versions should be handled correctly - result = self.connection._is_server_version_gte_4_2_0() + result = self.connection._is_server_version_gte("4.2.0") assert ( result == expected ), f"Version {version_str} should return {expected}" else: - result = self.connection._is_server_version_gte_4_2_0() + result = self.connection._is_server_version_gte("4.2.0") assert ( result == expected ), f"Version {version_str} should return {expected}" diff --git a/tests/test_streams.py b/tests/test_streams.py index 2e503fb..23b349b 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -9,6 +9,7 @@ OffsetSpecification, StreamConsumerOptions, StreamSpecification, + ValidationCodeException, ) from rabbitmq_amqp_python_client.entities import ( MessageProperties, @@ -81,7 +82,7 @@ def test_stream_read_from_last( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( offset_specification=OffsetSpecification.last ), ) @@ -119,7 +120,7 @@ def test_stream_read_from_offset_zero( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(0), - stream_consumer_options=StreamConsumerOptions(offset_specification=0), + consumer_options=StreamConsumerOptions(offset_specification=0), ) consumer.run() @@ -155,7 +156,7 @@ def test_stream_read_from_offset_first( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(0), - stream_consumer_options=StreamConsumerOptions(OffsetSpecification.first), + consumer_options=StreamConsumerOptions(OffsetSpecification.first), ) consumer.run() @@ -191,7 +192,7 @@ def test_stream_read_from_offset_ten( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(10), - stream_consumer_options=StreamConsumerOptions(offset_specification=10), + consumer_options=StreamConsumerOptions(offset_specification=10), ) consumer.run() @@ -225,7 +226,7 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( filter_options=StreamFilterOptions(values=["banana"]) ), ) @@ -263,7 +264,7 @@ def test_stream_filtering_mixed( addr_queue, # check we are reading just from offset 10 as just banana filtering applies message_handler=MyMessageHandlerAcceptStreamOffset(10), - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( filter_options=StreamFilterOptions(values=["banana"]) ), ) @@ -301,7 +302,7 @@ def test_stream_filtering_not_present( consumer = connection_consumer.consumer( addr_queue, - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( filter_options=StreamFilterOptions(values=["apple"]) ), ) @@ -343,7 +344,7 @@ def test_stream_match_unfiltered( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( filter_options=StreamFilterOptions( values=["banana"], match_unfiltered=True ) @@ -383,7 +384,7 @@ def test_stream_reconnection( addr_queue, # disconnection and check happens here message_handler=MyMessageHandlerAcceptStreamOffsetReconnect(), - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( filter_options=StreamFilterOptions( values=["banana"], match_unfiltered=True ) @@ -436,7 +437,7 @@ def test_stream_filter_message_properties( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerMessagePropertiesFilter(), - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( filter_options=StreamFilterOptions( message_properties=MessageProperties( subject="important_15", group_id="group_15" @@ -499,7 +500,7 @@ def test_stream_filter_application_properties( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerApplicationPropertiesFilter(), - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( filter_options=StreamFilterOptions( application_properties={"key": "value_17"}, ) @@ -567,7 +568,7 @@ def test_stream_filter_sql(connection: Connection, environment: Environment) -> consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerSQLFilter(), - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( filter_options=StreamFilterOptions(sql=sql) ), ) @@ -639,7 +640,7 @@ def test_stream_filter_mixing_different( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerMixingDifferentFilters(), - stream_consumer_options=StreamConsumerOptions( + consumer_options=StreamConsumerOptions( filter_options=StreamFilterOptions( values=["the_value_filter"], application_properties={"key": "app_value_9999"}, @@ -674,3 +675,34 @@ def test_stream_filter_mixing_different( if consumer is not None: consumer.close() management.delete_queue(stream_name) + + +def test_consumer_options_validation() -> None: + try: + x = StreamConsumerOptions(filter_options=StreamFilterOptions(sql="test")) + x.validate({"4.0.0": True, "4.1.0": False, "4.2.0": False}) + assert False + except ValidationCodeException: + assert True + + try: + x = StreamConsumerOptions( + filter_options=StreamFilterOptions( + message_properties=MessageProperties(subject="important_9999") + ) + ) + x.validate({"4.0.0": True, "4.1.0": True, "4.2.0": False}) + assert True + except ValidationCodeException: + assert False + + try: + x = StreamConsumerOptions( + filter_options=StreamFilterOptions( + application_properties={"key": "app_value_9999"} + ) + ) + x.validate({"4.0.0": True, "4.1.0": True, "4.2.0": False}) + assert True + except ValidationCodeException: + assert False From d7f9f91f27b3727dff9d57e38e6013838c8ace72 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 15 Sep 2025 16:56:06 +0200 Subject: [PATCH 2/3] Update rabbitmq_amqp_python_client/entities.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- rabbitmq_amqp_python_client/entities.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index df7b933..8b82e4f 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -390,13 +390,13 @@ def validate(self, versions: Dict[str, bool]) -> None: ) if self._filter_option.message_properties and not versions.get("4.1.0", False): raise ValidationCodeException( - "Stream filter by SQL requires RabbitMQ 4.1.0 or higher" + "Stream filter by message_properties requires RabbitMQ 4.1.0 or higher" ) if self._filter_option.application_properties and not versions.get( "4.1.0", False ): raise ValidationCodeException( - "Stream filter by SQL requires RabbitMQ 4.1.0 or higher" + "Stream filter by application_properties requires RabbitMQ 4.1.0 or higher" ) From ac6e30cbe7a3dc0aeb51c9f0af178b0e10c4fbaa Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 15 Sep 2025 16:56:20 +0200 Subject: [PATCH 3/3] Update rabbitmq_amqp_python_client/connection.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- rabbitmq_amqp_python_client/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index e3fe076..e5f347d 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -390,7 +390,7 @@ def consumer( Args: destination: The address to consume from message_handler: Optional handler for processing messages - consumer_options: Optional configuration for queue consumption. Each queue has its own consumer options.co + consumer_options: Optional configuration for queue consumption. Each queue has its own consumer options. credit: Optional credit value for flow control Returns: