diff --git a/Makefile b/Makefile index 0282241..18277d3 100644 --- a/Makefile +++ b/Makefile @@ -12,6 +12,7 @@ format: poetry run black rabbitmq_amqp_python_client/ poetry run black tests/ poetry run flake8 --exclude=venv,.venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503 + poetry run mypy --exclude=rabbitmq_amqp_python_client/qpid . test: format poetry run pytest . diff --git a/examples/README.md b/examples/README.md index c9442f2..24962d1 100644 --- a/examples/README.md +++ b/examples/README.md @@ -4,4 +4,5 @@ Client examples - [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection - [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection - [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities - - [Oauth](./oauth/oAuth2.py) - Connection through Oauth token \ No newline at end of file + - [Oauth](./oauth/oAuth2.py) - Connection through Oauth token + - [Streams with filters](./streams_with_filters/example_streams_with_filters.py) - Example supporting stream capabilities with filters \ No newline at end of file diff --git a/examples/streams_with_filters/example_streams_with_filters.py b/examples/streams_with_filters/example_streams_with_filters.py new file mode 100644 index 0000000..e95982d --- /dev/null +++ b/examples/streams_with_filters/example_streams_with_filters.py @@ -0,0 +1,159 @@ +# type: ignore +import logging +import time + +from rabbitmq_amqp_python_client import ( + AddressHelper, + AMQPMessagingHandler, + Connection, + ConnectionClosed, + Converter, + Environment, + Event, + Message, + MessageProperties, + OffsetSpecification, + StreamConsumerOptions, + StreamFilterOptions, + StreamSpecification, +) + +MESSAGES_TO_PUBLISH = 100 + + +class MyMessageHandler(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._count = 0 + + def on_amqp_message(self, event: Event): + # only messages with banana filters and with subject yellow + self._count = self._count + 1 + logger.info( + "Received message: {}, subject {}.[Total Consumed: {}]".format( + Converter.bytes_to_string(event.message.body), + event.message.subject, + self._count, + ) + ) + self.delivery_context.accept(event) + + def on_connection_closed(self, event: Event): + # if you want you can add cleanup operations here + print("connection closed") + + def on_link_closed(self, event: Event) -> None: + # if you want you can add cleanup operations here + print("link closed") + + +def create_connection(environment: Environment) -> Connection: + connection = environment.connection() + connection.dial() + + return connection + + +logging.basicConfig() +logger = logging.getLogger("[streams_with_filters]") +logger.setLevel(logging.INFO) + + +def main() -> None: + """ + In this example we create a stream queue and a consumer with filtering options. + The example combines two filters: + - filter value: banana + - subject: yellow + + See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions + """ + + queue_name = "stream-example-with_filtering-queue" + logger.info("Creating connection") + environment = Environment("amqp://guest:guest@localhost:5672/") + connection = create_connection(environment) + management = connection.management() + # delete the queue if it exists + management.delete_queue(queue_name) + # create a stream queue + management.declare_queue(StreamSpecification(name=queue_name)) + + addr_queue = AddressHelper.queue_address(queue_name) + + consumer_connection = create_connection(environment) + + consumer = consumer_connection.consumer( + addr_queue, + message_handler=MyMessageHandler(), + # the consumer will only receive messages with filter value banana and subject yellow + stream_consumer_options=StreamConsumerOptions( + offset_specification=OffsetSpecification.first, + filter_options=StreamFilterOptions( + values=["banana"], + message_properties=MessageProperties( + subject="yellow", + ), + ), + ), + ) + print( + "create a consumer and consume the test message - press control + c to terminate to consume" + ) + + # print("create a publisher and publish a test message") + publisher = connection.publisher(addr_queue) + + # publish with a filter of apple + for i in range(MESSAGES_TO_PUBLISH): + color = "green" if i % 2 == 0 else "yellow" + publisher.publish( + Message( + Converter.string_to_bytes(body="apple: " + str(i)), + annotations={"x-stream-filter-value": "apple"}, + subject=color, + ) + ) + + time.sleep(0.5) # wait a bit to ensure messages are published in different chunks + + # publish with a filter of banana + for i in range(MESSAGES_TO_PUBLISH): + color = "green" if i % 2 == 0 else "yellow" + publisher.publish( + Message( + body=Converter.string_to_bytes("banana: " + str(i)), + annotations={"x-stream-filter-value": "banana"}, + subject=color, + ) + ) + + publisher.close() + + while True: + try: + consumer.run() + except KeyboardInterrupt: + pass + except ConnectionClosed: + print("connection closed") + continue + except Exception as e: + print("consumer exited for exception " + str(e)) + + break + + # + logger.info("consumer exited, deleting queue") + management.delete_queue(queue_name) + + print("closing connections") + management.close() + print("after management closing") + environment.close() + print("after connection closing") + + +if __name__ == "__main__": + main() diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 0ebd3e7..5dc2c02 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -10,10 +10,12 @@ ExchangeSpecification, ExchangeToExchangeBindingSpecification, ExchangeToQueueBindingSpecification, + MessageProperties, OAuth2Options, OffsetSpecification, RecoveryConfiguration, StreamConsumerOptions, + StreamFilterOptions, ) from .environment import Environment from .exceptions import ( @@ -86,6 +88,8 @@ "PKCS12Store", "ConnectionClosed", "StreamConsumerOptions", + "StreamFilterOptions", + "MessageProperties", "OffsetSpecification", "OutcomeState", "Environment", diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 9dcf9c0..db6d13f 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field -from datetime import timedelta +from datetime import datetime, timedelta from enum import Enum from typing import Any, Dict, Optional, Union @@ -10,6 +10,7 @@ STREAM_FILTER_SPEC = "rabbitmq:stream-filter" STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" +AMQP_PROPERTIES_FILTER = "amqp:properties-filter" @dataclass @@ -149,6 +150,42 @@ class ExchangeToExchangeBindingSpecification: binding_key: Optional[str] = None +@dataclass +class MessageProperties: + """ + Properties for an AMQP message. + + Attributes: + message_id: Uniquely identifies a message within the system (int, UUID, bytes, or str). + user_id: Identity of the user responsible for producing the message. + to: Intended destination node of the message. + subject: Summary information about the message content and purpose. + reply_to: Address of the node to send replies to. + correlation_id: Client-specific id for marking or identifying messages (int, UUID, bytes, or str). + content_type: RFC-2046 MIME type for the message's body. + content_encoding: Modifier to the content-type. + absolute_expiry_time: Absolute time when the message expires. + creation_time: Absolute time when the message was created. + group_id: Group the message belongs to. + group_sequence: Relative position of this message within its group. + reply_to_group_id: Id for sending replies to a specific group. + """ + + message_id: Optional[Union[int, str, bytes]] = None + user_id: Optional[bytes] = None + to: Optional[str] = None + subject: Optional[str] = None + reply_to: Optional[str] = None + correlation_id: Optional[Union[int, str, bytes]] = None + content_type: Optional[str] = None + content_encoding: Optional[str] = None + absolute_expiry_time: Optional[datetime] = None + creation_time: Optional[datetime] = None + group_id: Optional[str] = None + group_sequence: Optional[int] = None + reply_to_group_id: Optional[str] = None + + """ StreamFilterOptions defines the filtering options for a stream consumer. for values and match_unfiltered see: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering @@ -159,6 +196,7 @@ class StreamFilterOptions: values: Optional[list[str]] = None match_unfiltered: bool = False application_properties: Optional[dict[str, Any]] = None + message_properties: Optional[MessageProperties] = None sql: str = "" def __init__( @@ -166,11 +204,13 @@ def __init__( values: Optional[list[str]] = None, match_unfiltered: bool = False, application_properties: Optional[dict[str, Any]] = None, + message_properties: Optional[MessageProperties] = None, sql: str = "", ): self.values = values self.match_unfiltered = match_unfiltered self.application_properties = application_properties + self.message_properties = message_properties self.sql = sql @@ -195,27 +235,23 @@ def __init__( filter_options: Optional[StreamFilterOptions] = None, ): - self.streamFilterOptions = filter_options + self._filter_set: Dict[symbol, Described] = {} - if offset_specification is None and self.streamFilterOptions is None: + if offset_specification is None and filter_options is None: raise ValidationCodeException( "At least one between offset_specification and filters must be set when setting up filtering" ) - self._filter_set: Dict[symbol, Described] = {} if offset_specification is not None: self._offset(offset_specification) - if ( - self.streamFilterOptions is not None - and self.streamFilterOptions.values is not None - ): - self._filter_values(self.streamFilterOptions.values) + if filter_options is not None and filter_options.values is not None: + self._filter_values(filter_options.values) - if ( - self.streamFilterOptions is not None - and self.streamFilterOptions.match_unfiltered - ): - self._filter_match_unfiltered(self.streamFilterOptions.match_unfiltered) + if filter_options is not None and filter_options.match_unfiltered: + self._filter_match_unfiltered(filter_options.match_unfiltered) + + if filter_options is not None and filter_options.message_properties is not None: + self._filter_message_properties(filter_options.message_properties) def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: """ @@ -257,6 +293,29 @@ def _filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None: symbol(STREAM_FILTER_MATCH_UNFILTERED), filter_match_unfiltered ) + def _filter_message_properties( + self, message_properties: Optional[MessageProperties] + ) -> None: + """ + Set AMQP message properties for filtering. + + Args: + message_properties: MessageProperties object containing AMQP message properties + """ + if message_properties is not None: + # dictionary of symbols and described + filter_prop: Dict[symbol, Any] = {} + + for key, value in message_properties.__dict__.items(): + if value is not None: + # replace _ with - for the key + filter_prop[symbol(key.replace("_", "-"))] = value + + if len(filter_prop) > 0: + self._filter_set[symbol(AMQP_PROPERTIES_FILTER)] = Described( + symbol(AMQP_PROPERTIES_FILTER), filter_prop + ) + def filter_set(self) -> Dict[symbol, Described]: """ Get the current filter set configuration. diff --git a/tests/test_streams.py b/tests/test_streams.py index e7da2a8..dce875c 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -1,14 +1,21 @@ from rabbitmq_amqp_python_client import ( AddressHelper, + AMQPMessagingHandler, Connection, + Converter, Environment, OffsetSpecification, StreamConsumerOptions, StreamSpecification, ) from rabbitmq_amqp_python_client.entities import ( + MessageProperties, StreamFilterOptions, ) +from rabbitmq_amqp_python_client.qpid.proton import ( + Event, + Message, +) from .conftest import ( ConsumerTestException, @@ -398,3 +405,69 @@ def test_stream_reconnection( consumer.close() management.delete_queue(stream_name) + + +class MyMessageHandlerMessagePropertiesFilter(AMQPMessagingHandler): + def __init__( + self, + ): + super().__init__() + + def on_message(self, event: Event): + self.delivery_context.accept(event) + assert event.message.subject == "important_15" + assert event.message.group_id == "group_15" + assert event.message.body == Converter.string_to_bytes("hello_15") + raise ConsumerTestException("consumed") + + +def test_stream_filter_message_properties( + connection: Connection, environment: Environment +) -> None: + consumer = None + stream_name = "test_stream_filter_message_properties" + messages_to_send = 30 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + try: + connection_consumer = environment.connection() + connection_consumer.dial() + consumer = connection_consumer.consumer( + addr_queue, + message_handler=MyMessageHandlerMessagePropertiesFilter(), + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions( + message_properties=MessageProperties( + subject="important_15", group_id="group_15" + ) + ) + ), + ) + publisher = connection.publisher(addr_queue) + for i in range(messages_to_send): + msg = Message( + body=Converter.string_to_bytes("hello_{}".format(i)), + subject="important_{}".format(i), + group_id="group_{}".format(i), + ) + publisher.publish(msg) + + publisher.close() + + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + if consumer is not None: + consumer.close() + + management.delete_queue(stream_name)