diff --git a/tests/test_streams.py b/tests/test_streams.py index dce875c..a3b49f2 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -1,3 +1,5 @@ +import time + from rabbitmq_amqp_python_client import ( AddressHelper, AMQPMessagingHandler, @@ -471,3 +473,140 @@ def test_stream_filter_message_properties( consumer.close() management.delete_queue(stream_name) + + +class MyMessageHandlerApplicationPropertiesFilter(AMQPMessagingHandler): + def __init__( + self, + ): + super().__init__() + + def on_message(self, event: Event): + self.delivery_context.accept(event) + assert event.message.application_properties == {"key": "value_17"} + raise ConsumerTestException("consumed") + + +def test_stream_filter_application_properties( + connection: Connection, environment: Environment +) -> None: + consumer = None + stream_name = "test_stream_application_message_properties" + messages_to_send = 30 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + try: + connection_consumer = environment.connection() + connection_consumer.dial() + consumer = connection_consumer.consumer( + addr_queue, + message_handler=MyMessageHandlerApplicationPropertiesFilter(), + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions( + application_properties={"key": "value_17"}, + ) + ), + ) + publisher = connection.publisher(addr_queue) + for i in range(messages_to_send): + msg = Message( + body=Converter.string_to_bytes("hello_{}".format(i)), + application_properties={"key": "value_{}".format(i)}, + ) + publisher.publish(msg) + + publisher.close() + + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + if consumer is not None: + consumer.close() + + management.delete_queue(stream_name) + + +class MyMessageHandlerMixingDifferentFilters(AMQPMessagingHandler): + def __init__( + self, + ): + super().__init__() + + def on_message(self, event: Event): + self.delivery_context.accept(event) + assert event.message.annotations["x-stream-filter-value"] == "the_value_filter" + assert event.message.application_properties == {"key": "app_value_9999"} + assert event.message.subject == "important_9999" + assert event.message.body == Converter.string_to_bytes("the_right_one_9999") + raise ConsumerTestException("consumed") + + +def test_stream_filter_mixing_different( + connection: Connection, environment: Environment +) -> None: + consumer = None + stream_name = "test_stream_filter_mixing_different" + messages_to_send = 30 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.delete_queue(stream_name) + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + try: + connection_consumer = environment.connection() + connection_consumer.dial() + consumer = connection_consumer.consumer( + addr_queue, + message_handler=MyMessageHandlerMixingDifferentFilters(), + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions( + values=["the_value_filter"], + application_properties={"key": "app_value_9999"}, + message_properties=MessageProperties(subject="important_9999"), + ) + ), + ) + publisher = connection.publisher(addr_queue) + # all these messages will be filtered out + for i in range(messages_to_send): + msg = Message( + body=Converter.string_to_bytes("hello_{}".format(i)), + ) + publisher.publish(msg) + + time.sleep(1) # wait a bit to ensure messages are published in different chunks + msg = Message( + body=Converter.string_to_bytes("the_right_one_9999"), + annotations={"x-stream-filter-value": "the_value_filter"}, + application_properties={"key": "app_value_9999"}, + subject="important_9999", + ) + publisher.publish(msg) + + publisher.close() + + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + if consumer is not None: + consumer.close() + + management.delete_queue(stream_name)