From 853e20e9bd5f6c7381894de08af70ebf0918c41d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 10 Sep 2025 15:24:17 +0200 Subject: [PATCH 1/6] refactor the stream consumer options and the stream filter options. This implementation is similar to the golang implementation. So the idea is to have the same API ( as much as possible ) for all the amqp 1.0. clients. Add a new class StreamFilterOptions to configure only the filtering and leave the StreamConsumerOptions for offset and other future implementations Signed-off-by: Gabriele Santomaggio --- examples/streams/example_with_streams.py | 6 +-- rabbitmq_amqp_python_client/__init__.py | 4 +- rabbitmq_amqp_python_client/connection.py | 12 ++--- rabbitmq_amqp_python_client/consumer.py | 6 +-- rabbitmq_amqp_python_client/entities.py | 44 +++++++++++++---- rabbitmq_amqp_python_client/options.py | 4 +- .../qpid/proton/_message.py | 21 ++++---- tests/test_streams.py | 48 ++++++++++--------- 8 files changed, 88 insertions(+), 57 deletions(-) diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index dc031ca..60ccd6e 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -10,7 +10,7 @@ Event, Message, OffsetSpecification, - StreamOptions, + StreamConsumerOptions, StreamSpecification, ) @@ -104,9 +104,7 @@ def main() -> None: message_handler=MyMessageHandler(), # can be first, last, next or an offset long # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered - stream_filter_options=StreamOptions( - offset_specification=OffsetSpecification.first, filters=["banana"] - ), + stream_consumer_options=StreamConsumerOptions(offset_specification=OffsetSpecification.first), ) print( "create a consumer and consume the test message - press control + c to terminate to consume" diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 487e878..0ebd3e7 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -13,7 +13,7 @@ OAuth2Options, OffsetSpecification, RecoveryConfiguration, - StreamOptions, + StreamConsumerOptions, ) from .environment import Environment from .exceptions import ( @@ -85,7 +85,7 @@ "CurrentUserStore", "PKCS12Store", "ConnectionClosed", - "StreamOptions", + "StreamConsumerOptions", "OffsetSpecification", "OutcomeState", "Environment", diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 9672801..b08843a 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -18,7 +18,7 @@ from .entities import ( OAuth2Options, RecoveryConfiguration, - StreamOptions, + StreamConsumerOptions, ) from .exceptions import ( ArgumentOutOfRangeException, @@ -363,7 +363,7 @@ def publisher(self, destination: str = "") -> Publisher: ArgumentOutOfRangeException: If destination address format is invalid """ if destination != "": - if validate_address(destination) is False: + if not validate_address(destination): raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" ) @@ -376,7 +376,7 @@ def consumer( self, destination: str, message_handler: Optional[MessagingHandler] = None, - stream_filter_options: Optional[StreamOptions] = None, + stream_consumer_options: Optional[StreamConsumerOptions] = None, credit: Optional[int] = None, ) -> Consumer: """ @@ -385,7 +385,7 @@ def consumer( Args: destination: The address to consume from message_handler: Optional handler for processing messages - stream_filter_options: Optional configuration for stream consumption + stream_consumer_options: Optional configuration for stream consumption credit: Optional credit value for flow control Returns: @@ -394,12 +394,12 @@ def consumer( Raises: ArgumentOutOfRangeException: If destination address format is invalid """ - if validate_address(destination) is False: + if not validate_address(destination): raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" ) consumer = Consumer( - self._conn, destination, message_handler, stream_filter_options, credit + self._conn, destination, message_handler, stream_consumer_options, credit ) self._consumers.append(consumer) return consumer diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 15ef4c9..e8b2791 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -2,7 +2,7 @@ from typing import Literal, Optional, Union, cast from .amqp_consumer_handler import AMQPMessagingHandler -from .entities import StreamOptions +from .entities import StreamConsumerOptions from .options import ( ReceiverOptionUnsettled, ReceiverOptionUnsettledWithFilters, @@ -29,7 +29,7 @@ class Consumer: _conn (BlockingConnection): The underlying connection to RabbitMQ _addr (str): The address to consume from _handler (Optional[MessagingHandler]): Optional message handling callback - _stream_options (Optional[StreamOptions]): Configuration for stream consumption + _stream_options (Optional[StreamConsumerOptions]): Configuration for stream consumption _credit (Optional[int]): Flow control credit value """ @@ -38,7 +38,7 @@ def __init__( conn: BlockingConnection, addr: str, handler: Optional[AMQPMessagingHandler] = None, - stream_options: Optional[StreamOptions] = None, + stream_options: Optional[StreamConsumerOptions] = None, credit: Optional[int] = None, ): """ diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index a9c6a65..7215d2c 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -149,7 +149,26 @@ class ExchangeToExchangeBindingSpecification: binding_key: Optional[str] = None -class StreamOptions: +class StreamFilterOptions: + values: Optional[list[str]] = (None,) + match_unfiltered: bool = (False,) + application_properties: dict = (field(default_factory=dict),) + sql: str = "" + + def __init__( + self, + values: Optional[list[str]] = None, + match_unfiltered: bool = False, + application_properties: Optional[dict] = None, + sql: str = "", + ): + self.values = values + self.match_unfiltered = match_unfiltered + self.application_properties = application_properties + self.sql = sql + + +class StreamConsumerOptions: """ Configuration options for stream queues. @@ -167,11 +186,12 @@ class StreamOptions: def __init__( self, offset_specification: Optional[Union[OffsetSpecification, int]] = None, - filters: Optional[list[str]] = None, - filter_match_unfiltered: bool = False, + filter_options: Optional[StreamFilterOptions] = None, ): - if offset_specification is None and filters is None: + self.streamFilterOptions = filter_options + + if offset_specification is None and self.streamFilterOptions is None: raise ValidationCodeException( "At least one between offset_specification and filters must be set when setting up filtering" ) @@ -179,11 +199,17 @@ def __init__( if offset_specification is not None: self._offset(offset_specification) - if filters is not None: - self._filter_values(filters) - - if filter_match_unfiltered is True: - self._filter_match_unfiltered(filter_match_unfiltered) + if ( + self.streamFilterOptions is not None + and self.streamFilterOptions.values is not None + ): + self._filter_values(self.streamFilterOptions.values) + + if ( + self.streamFilterOptions is not None + and self.streamFilterOptions.match_unfiltered + ): + self._filter_match_unfiltered(self.streamFilterOptions.match_unfiltered) def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: """ diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index 2617dff..d6ccba2 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -1,4 +1,4 @@ -from .entities import StreamOptions +from .entities import StreamConsumerOptions from .qpid.proton._data import ( # noqa: E402 PropertyDict, symbol, @@ -68,7 +68,7 @@ def test(self, link: Link) -> bool: class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore - def __init__(self, addr: str, filter_options: StreamOptions): + def __init__(self, addr: str, filter_options: StreamConsumerOptions): super().__init__(filter_options.filter_set()) self._addr = addr diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index 7968447..00690d2 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -117,7 +117,7 @@ def __init__( self._msg = pn_message() self.instructions = None self.annotations = None - self.properties = None + self.application_properties = None self.body = body self.inferred = inferred @@ -149,7 +149,7 @@ def _check_property_keys(self) -> None: # We cannot make changes to the dict while iterating, so we # must save and make the changes afterwards changed_keys = [] - for k in self.properties.keys(): + for k in self.application_properties.keys(): if isinstance(k, str): # strings and their subclasses if type(k) is symbol or type(k) is char: @@ -169,9 +169,12 @@ def _check_property_keys(self) -> None: ) # Make the key changes for old_key, new_key in changed_keys: - self.properties[new_key] = self.properties.pop(old_key) + self.application_properties[new_key] = self.application_properties.pop( + old_key + ) def _pre_encode(self) -> None: + inst = Data(pn_message_instructions(self._msg)) ann = Data(pn_message_annotations(self._msg)) props = Data(pn_message_properties(self._msg)) @@ -184,9 +187,9 @@ def _pre_encode(self) -> None: if self.annotations is not None: ann.put_object(self.annotations) props.clear() - if self.properties is not None: + if self.application_properties is not None: self._check_property_keys() - props.put_object(self.properties) + props.put_object(self.application_properties) body.clear() if self.body is not None: body.put_object(self.body) @@ -206,9 +209,9 @@ def _post_decode(self) -> None: else: self.annotations = None if props.next(): - self.properties = props.get_object() + self.application_properties = props.get_object() else: - self.properties = None + self.application_properties = None if body.next(): self.body = body.get_object() else: @@ -222,7 +225,7 @@ def clear(self) -> None: pn_message_clear(self._msg) self.instructions = None self.annotations = None - self.properties = None + self.application_properties = None self.body = None @property @@ -641,7 +644,7 @@ def __repr__(self) -> str: "reply_to_group_id", "instructions", "annotations", - "properties", + "application_properties", "body", ): value = getattr(self, attr) diff --git a/tests/test_streams.py b/tests/test_streams.py index 280805d..e7da2a8 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -3,9 +3,12 @@ Connection, Environment, OffsetSpecification, - StreamOptions, + StreamConsumerOptions, StreamSpecification, ) +from rabbitmq_amqp_python_client.entities import ( + StreamFilterOptions, +) from .conftest import ( ConsumerTestException, @@ -18,7 +21,6 @@ def test_stream_read_from_last_default( connection: Connection, environment: Environment ) -> None: - consumer = None stream_name = "test_stream_info_with_validation" messages_to_send = 10 @@ -52,7 +54,6 @@ def test_stream_read_from_last_default( def test_stream_read_from_last( connection: Connection, environment: Environment ) -> None: - consumer = None stream_name = "test_stream_info_with_validation" messages_to_send = 10 @@ -72,7 +73,7 @@ def test_stream_read_from_last( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_filter_options=StreamOptions( + stream_consumer_options=StreamConsumerOptions( offset_specification=OffsetSpecification.last ), ) @@ -90,7 +91,6 @@ def test_stream_read_from_last( def test_stream_read_from_offset_zero( connection: Connection, environment: Environment ) -> None: - consumer = None stream_name = "test_stream_info_with_validation" messages_to_send = 10 @@ -112,7 +112,7 @@ def test_stream_read_from_offset_zero( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(0), - stream_filter_options=StreamOptions(offset_specification=0), + stream_consumer_options=StreamConsumerOptions(offset_specification=0), ) consumer.run() @@ -128,7 +128,6 @@ def test_stream_read_from_offset_zero( def test_stream_read_from_offset_first( connection: Connection, environment: Environment ) -> None: - consumer = None stream_name = "test_stream_info_with_validation" messages_to_send = 10 @@ -150,7 +149,7 @@ def test_stream_read_from_offset_first( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(0), - stream_filter_options=StreamOptions(OffsetSpecification.first), + stream_consumer_options=StreamConsumerOptions(OffsetSpecification.first), ) consumer.run() @@ -166,7 +165,6 @@ def test_stream_read_from_offset_first( def test_stream_read_from_offset_ten( connection: Connection, environment: Environment ) -> None: - consumer = None stream_name = "test_stream_info_with_validation" messages_to_send = 20 @@ -188,7 +186,7 @@ def test_stream_read_from_offset_ten( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(10), - stream_filter_options=StreamOptions(offset_specification=10), + stream_consumer_options=StreamConsumerOptions(offset_specification=10), ) consumer.run() @@ -203,7 +201,6 @@ def test_stream_read_from_offset_ten( def test_stream_filtering(connection: Connection, environment: Environment) -> None: - consumer = None stream_name = "test_stream_info_with_filtering" messages_to_send = 10 @@ -224,7 +221,9 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_filter_options=StreamOptions(filters=["banana"]), + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions(values=["banana"]) + ), ) # send with annotations filter banana publish_messages(connection, messages_to_send, stream_name, ["banana"]) @@ -241,7 +240,6 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N def test_stream_filtering_mixed( connection: Connection, environment: Environment ) -> None: - consumer = None stream_name = "test_stream_info_with_filtering" messages_to_send = 10 @@ -262,7 +260,9 @@ def test_stream_filtering_mixed( addr_queue, # check we are reading just from offset 10 as just banana filtering applies message_handler=MyMessageHandlerAcceptStreamOffset(10), - stream_filter_options=StreamOptions(filters=["banana"]), + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions(values=["banana"]) + ), ) # send with annotations filter apple and then banana # consumer will read just from offset 10 @@ -281,7 +281,6 @@ def test_stream_filtering_mixed( def test_stream_filtering_not_present( connection: Connection, environment: Environment ) -> None: - raised = False stream_name = "test_stream_info_with_filtering" messages_to_send = 10 @@ -299,7 +298,10 @@ def test_stream_filtering_not_present( connection_consumer.dial() consumer = connection_consumer.consumer( - addr_queue, stream_filter_options=StreamOptions(filters=["apple"]) + addr_queue, + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions(values=["apple"]) + ), ) # send with annotations filter banana publish_messages(connection, messages_to_send, stream_name, ["banana"]) @@ -320,7 +322,6 @@ def test_stream_filtering_not_present( def test_stream_match_unfiltered( connection: Connection, environment: Environment ) -> None: - consumer = None stream_name = "test_stream_info_with_filtering" messages_to_send = 10 @@ -340,8 +341,10 @@ def test_stream_match_unfiltered( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_filter_options=StreamOptions( - filters=["banana"], filter_match_unfiltered=True + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions( + values=["banana"], match_unfiltered=True + ) ), ) # send with annotations filter banana @@ -359,7 +362,6 @@ def test_stream_match_unfiltered( def test_stream_reconnection( connection_with_reconnect: Connection, environment: Environment ) -> None: - consumer = None stream_name = "test_stream_info_with_filtering" messages_to_send = 10 @@ -380,8 +382,10 @@ def test_stream_reconnection( addr_queue, # disconnection and check happens here message_handler=MyMessageHandlerAcceptStreamOffsetReconnect(), - stream_filter_options=StreamOptions( - filters=["banana"], filter_match_unfiltered=True + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions( + values=["banana"], match_unfiltered=True + ) ), ) # send with annotations filter banana From 65e5c2cb8905934de86f82930e533be2e4c97e11 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 10 Sep 2025 15:55:49 +0200 Subject: [PATCH 2/6] formatting Signed-off-by: Gabriele Santomaggio --- examples/streams/example_with_streams.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index 60ccd6e..207af6a 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -104,7 +104,9 @@ def main() -> None: message_handler=MyMessageHandler(), # can be first, last, next or an offset long # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered - stream_consumer_options=StreamConsumerOptions(offset_specification=OffsetSpecification.first), + stream_consumer_options=StreamConsumerOptions( + offset_specification=OffsetSpecification.first + ), ) print( "create a consumer and consume the test message - press control + c to terminate to consume" From dad91ebfc4c8e56f2b6ed54f87ba3ebcce29080f Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 11 Sep 2025 09:22:43 +0200 Subject: [PATCH 3/6] formatting Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp_python_client/entities.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 7215d2c..ca636e9 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -150,16 +150,16 @@ class ExchangeToExchangeBindingSpecification: class StreamFilterOptions: - values: Optional[list[str]] = (None,) - match_unfiltered: bool = (False,) - application_properties: dict = (field(default_factory=dict),) + values: Optional[list[str]] = None + match_unfiltered: bool = False + application_properties: Optional[dict[str, Any]] = None sql: str = "" def __init__( self, values: Optional[list[str]] = None, match_unfiltered: bool = False, - application_properties: Optional[dict] = None, + application_properties: Optional[dict[str, Any]] = None, sql: str = "", ): self.values = values From 60b854ff04710db4016e71491222a596ecccc9d7 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 11 Sep 2025 11:41:48 +0200 Subject: [PATCH 4/6] formatting Signed-off-by: Gabriele Santomaggio --- examples/streams/example_with_streams.py | 4 +--- rabbitmq_amqp_python_client/entities.py | 7 +++++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index 207af6a..60ccd6e 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -104,9 +104,7 @@ def main() -> None: message_handler=MyMessageHandler(), # can be first, last, next or an offset long # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered - stream_consumer_options=StreamConsumerOptions( - offset_specification=OffsetSpecification.first - ), + stream_consumer_options=StreamConsumerOptions(offset_specification=OffsetSpecification.first), ) print( "create a consumer and consume the test message - press control + c to terminate to consume" diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index ca636e9..627d979 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -148,7 +148,10 @@ class ExchangeToExchangeBindingSpecification: destination_exchange: str binding_key: Optional[str] = None - +""" + StreamFilterOptions defines the filtering options for a stream consumer. + for values and match_unfiltered see: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering +""" class StreamFilterOptions: values: Optional[list[str]] = None match_unfiltered: bool = False @@ -180,7 +183,7 @@ class StreamConsumerOptions: Args: offset_specification: Either an OffsetSpecification enum value or an integer offset - filters: List of filter strings to apply to the stream + filter_options: Filter options for the stream consumer. See StreamFilterOptions """ def __init__( From 8ec62b8e2cf6761f9b8b1b1f99ce32dd1fcff5a4 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 11 Sep 2025 11:42:14 +0200 Subject: [PATCH 5/6] formatting Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp_python_client/entities.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 627d979..9dcf9c0 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -148,10 +148,13 @@ class ExchangeToExchangeBindingSpecification: destination_exchange: str binding_key: Optional[str] = None + """ StreamFilterOptions defines the filtering options for a stream consumer. for values and match_unfiltered see: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering """ + + class StreamFilterOptions: values: Optional[list[str]] = None match_unfiltered: bool = False From b3845b3fb0dbb6d1a7f532d66effb9e7bfadd298 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 11 Sep 2025 11:42:37 +0200 Subject: [PATCH 6/6] formatting Signed-off-by: Gabriele Santomaggio --- examples/streams/example_with_streams.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index 60ccd6e..207af6a 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -104,7 +104,9 @@ def main() -> None: message_handler=MyMessageHandler(), # can be first, last, next or an offset long # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered - stream_consumer_options=StreamConsumerOptions(offset_specification=OffsetSpecification.first), + stream_consumer_options=StreamConsumerOptions( + offset_specification=OffsetSpecification.first + ), ) print( "create a consumer and consume the test message - press control + c to terminate to consume"