From 2b7718214bf6b407a9d74c769f4765e58b850a67 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 12 Mar 2025 13:37:11 +0100 Subject: [PATCH 1/2] improving stream options API --- examples/streams/example_with_streams.py | 16 ++++---- rabbitmq_amqp_python_client/entities.py | 32 ++++++++++++++-- tests/test_streams.py | 48 ++++++++---------------- 3 files changed, 50 insertions(+), 46 deletions(-) diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index 57cc7c0..bfc1e8f 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -13,7 +13,7 @@ StreamSpecification, ) -MESSAGES_TO_PUBLISH = 1 +MESSAGES_TO_PUBLISH = 100 class MyMessageHandler(AMQPMessagingHandler): @@ -87,7 +87,7 @@ def main() -> None: queue_name = "example-queue" print("connection to amqp server") - environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True) + environment = Environment("amqp://guest:guest@localhost:5672/") connection = create_connection(environment) management = connection.management() @@ -98,16 +98,14 @@ def main() -> None: consumer_connection = create_connection(environment) - stream_filter_options = StreamOptions() - # can be first, last, next or an offset long - # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered - stream_filter_options.offset(OffsetSpecification.first) - stream_filter_options.filter_values(["banana"]) - consumer = consumer_connection.consumer( addr_queue, message_handler=MyMessageHandler(), - stream_filter_options=stream_filter_options, + # can be first, last, next or an offset long + # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered + stream_filter_options=StreamOptions( + offset_specification=OffsetSpecification.first, stream_filters=["banana"] + ), ) print( "create a consumer and consume the test message - press control + c to terminate to consume" diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 9878a6a..ad3000b 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -4,6 +4,7 @@ from typing import Any, Dict, Optional, Union from .common import ExchangeType, QueueType +from .exceptions import ValidationCodeException from .qpid.proton._data import Described, symbol STREAM_FILTER_SPEC = "rabbitmq:stream-filter" @@ -156,12 +157,35 @@ class StreamOptions: Attributes: _filter_set: Dictionary of stream filter specifications + + Args: + offset_specification: Either an OffsetSpecification enum value or + an integer offset + filters: List of filter strings to apply to the stream """ - def __init__(self): # type: ignore + def __init__( + self, + offset_specification: Optional[Union[OffsetSpecification, int]] = None, + stream_filters: Optional[list[str]] = None, + filter_match_unfiltered: bool = False, + ): + + if offset_specification is None and stream_filters is None: + raise ValidationCodeException( + "At least one between offset_specification and filters must be set when setting up filtering" + ) self._filter_set: Dict[symbol, Described] = {} + if offset_specification is not None: + self._offset(offset_specification) + + if stream_filters is not None: + self._filter_values(stream_filters) + + if filter_match_unfiltered is True: + self._filter_match_unfiltered(filter_match_unfiltered) - def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: + def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: """ Set the offset specification for the stream. @@ -178,7 +202,7 @@ def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: symbol(STREAM_OFFSET_SPEC), offset_specification.name ) - def filter_values(self, filters: list[str]) -> None: + def _filter_values(self, filters: list[str]) -> None: """ Set the filter values for the stream. @@ -189,7 +213,7 @@ def filter_values(self, filters: list[str]) -> None: symbol(STREAM_FILTER_SPEC), filters ) - def filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None: + def _filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None: """ Set whether to match unfiltered messages. diff --git a/tests/test_streams.py b/tests/test_streams.py index 890ab94..6c22f4f 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -65,9 +65,6 @@ def test_stream_read_from_last( addr_queue = AddressHelper.queue_address(stream_name) - stream_filter_options = StreamOptions() - stream_filter_options.offset(OffsetSpecification.last) - # consume and then publish try: connection_consumer = environment.connection() @@ -75,7 +72,9 @@ def test_stream_read_from_last( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions( + offset_specification=OffsetSpecification.last + ), ) publish_messages(connection, messages_to_send, stream_name) consumer.run() @@ -107,16 +106,13 @@ def test_stream_read_from_offset_zero( # publish and then consume publish_messages(connection, messages_to_send, stream_name) - stream_filter_options = StreamOptions() - stream_filter_options.offset(0) - try: connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(0), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions(offset_specification=0), ) consumer.run() @@ -148,16 +144,13 @@ def test_stream_read_from_offset_first( # publish and then consume publish_messages(connection, messages_to_send, stream_name) - stream_filter_options = StreamOptions() - stream_filter_options.offset(OffsetSpecification.first) - try: connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(0), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions(OffsetSpecification.first), ) consumer.run() @@ -189,16 +182,13 @@ def test_stream_read_from_offset_ten( # publish and then consume publish_messages(connection, messages_to_send, stream_name) - stream_filter_options = StreamOptions() - stream_filter_options.offset(10) - try: connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(10), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions(offset_specification=10), ) consumer.run() @@ -228,15 +218,13 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N # consume and then publish try: - stream_filter_options = StreamOptions() - stream_filter_options.filter_values(["banana"]) connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions(stream_filters=["banana"]), ) # send with annotations filter banana publish_messages(connection, messages_to_send, stream_name, ["banana"]) @@ -268,15 +256,13 @@ def test_stream_filtering_mixed( # consume and then publish try: - stream_filter_options = StreamOptions() - stream_filter_options.filter_values(["banana"]) connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, # check we are reading just from offset 10 as just banana filtering applies message_handler=MyMessageHandlerAcceptStreamOffset(10), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions(stream_filters=["banana"]), ) # send with annotations filter apple and then banana # consumer will read just from offset 10 @@ -309,13 +295,11 @@ def test_stream_filtering_not_present( addr_queue = AddressHelper.queue_address(stream_name) # consume and then publish - stream_filter_options = StreamOptions() - stream_filter_options.filter_values(["apple"]) connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( - addr_queue, stream_filter_options=stream_filter_options + addr_queue, stream_filter_options=StreamOptions(stream_filters=["apple"]) ) # send with annotations filter banana publish_messages(connection, messages_to_send, stream_name, ["banana"]) @@ -351,15 +335,14 @@ def test_stream_match_unfiltered( # consume and then publish try: - stream_filter_options = StreamOptions() - stream_filter_options.filter_values(["banana"]) - stream_filter_options.filter_match_unfiltered(True) connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions( + stream_filters=["banana"], filter_match_unfiltered=True + ), ) # send with annotations filter banana publish_messages(connection, messages_to_send, stream_name) @@ -391,16 +374,15 @@ def test_stream_reconnection( # consume and then publish try: - stream_filter_options = StreamOptions() - stream_filter_options.filter_values(["banana"]) - stream_filter_options.filter_match_unfiltered(True) connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, # disconnection and check happens here message_handler=MyMessageHandlerAcceptStreamOffsetReconnect(), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions( + stream_filters=["banana"], filter_match_unfiltered=True + ), ) # send with annotations filter banana publish_messages(connection_with_reconnect, messages_to_send, stream_name) From c6967d58f202d524e71cbfc2e12d298353a84db5 Mon Sep 17 00:00:00 2001 From: Daniele Date: Thu, 13 Mar 2025 09:44:01 +0100 Subject: [PATCH 2/2] name convention --- examples/streams/example_with_streams.py | 2 +- rabbitmq_amqp_python_client/entities.py | 8 ++++---- tests/test_streams.py | 10 +++++----- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index bfc1e8f..cb53e73 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -104,7 +104,7 @@ def main() -> None: # can be first, last, next or an offset long # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered stream_filter_options=StreamOptions( - offset_specification=OffsetSpecification.first, stream_filters=["banana"] + offset_specification=OffsetSpecification.first, filters=["banana"] ), ) print( diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index ad3000b..0577375 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -167,11 +167,11 @@ class StreamOptions: def __init__( self, offset_specification: Optional[Union[OffsetSpecification, int]] = None, - stream_filters: Optional[list[str]] = None, + filters: Optional[list[str]] = None, filter_match_unfiltered: bool = False, ): - if offset_specification is None and stream_filters is None: + if offset_specification is None and filters is None: raise ValidationCodeException( "At least one between offset_specification and filters must be set when setting up filtering" ) @@ -179,8 +179,8 @@ def __init__( if offset_specification is not None: self._offset(offset_specification) - if stream_filters is not None: - self._filter_values(stream_filters) + if filters is not None: + self._filter_values(filters) if filter_match_unfiltered is True: self._filter_match_unfiltered(filter_match_unfiltered) diff --git a/tests/test_streams.py b/tests/test_streams.py index 6c22f4f..280805d 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -224,7 +224,7 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_filter_options=StreamOptions(stream_filters=["banana"]), + stream_filter_options=StreamOptions(filters=["banana"]), ) # send with annotations filter banana publish_messages(connection, messages_to_send, stream_name, ["banana"]) @@ -262,7 +262,7 @@ def test_stream_filtering_mixed( addr_queue, # check we are reading just from offset 10 as just banana filtering applies message_handler=MyMessageHandlerAcceptStreamOffset(10), - stream_filter_options=StreamOptions(stream_filters=["banana"]), + stream_filter_options=StreamOptions(filters=["banana"]), ) # send with annotations filter apple and then banana # consumer will read just from offset 10 @@ -299,7 +299,7 @@ def test_stream_filtering_not_present( connection_consumer.dial() consumer = connection_consumer.consumer( - addr_queue, stream_filter_options=StreamOptions(stream_filters=["apple"]) + addr_queue, stream_filter_options=StreamOptions(filters=["apple"]) ) # send with annotations filter banana publish_messages(connection, messages_to_send, stream_name, ["banana"]) @@ -341,7 +341,7 @@ def test_stream_match_unfiltered( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), stream_filter_options=StreamOptions( - stream_filters=["banana"], filter_match_unfiltered=True + filters=["banana"], filter_match_unfiltered=True ), ) # send with annotations filter banana @@ -381,7 +381,7 @@ def test_stream_reconnection( # disconnection and check happens here message_handler=MyMessageHandlerAcceptStreamOffsetReconnect(), stream_filter_options=StreamOptions( - stream_filters=["banana"], filter_match_unfiltered=True + filters=["banana"], filter_match_unfiltered=True ), ) # send with annotations filter banana