diff --git a/examples/streams_with_filters/example_streams_with_filters.py b/examples/streams_with_filters/example_streams_with_filters.py index e95982d..9f5c2dc 100644 --- a/examples/streams_with_filters/example_streams_with_filters.py +++ b/examples/streams_with_filters/example_streams_with_filters.py @@ -29,11 +29,13 @@ def __init__(self): def on_amqp_message(self, event: Event): # only messages with banana filters and with subject yellow + # and application property from = italy get received self._count = self._count + 1 logger.info( - "Received message: {}, subject {}.[Total Consumed: {}]".format( + "Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format( Converter.bytes_to_string(event.message.body), event.message.subject, + event.message.application_properties, self._count, ) ) @@ -88,6 +90,7 @@ def main() -> None: addr_queue, message_handler=MyMessageHandler(), # the consumer will only receive messages with filter value banana and subject yellow + # and application property from = italy stream_consumer_options=StreamConsumerOptions( offset_specification=OffsetSpecification.first, filter_options=StreamFilterOptions( @@ -95,6 +98,7 @@ def main() -> None: message_properties=MessageProperties( subject="yellow", ), + application_properties={"from": "italy"}, ), ), ) @@ -108,11 +112,13 @@ def main() -> None: # publish with a filter of apple for i in range(MESSAGES_TO_PUBLISH): color = "green" if i % 2 == 0 else "yellow" + from_value = "italy" if i % 3 == 0 else "spain" publisher.publish( Message( Converter.string_to_bytes(body="apple: " + str(i)), annotations={"x-stream-filter-value": "apple"}, subject=color, + application_properties={"from": from_value}, ) ) @@ -121,11 +127,13 @@ def main() -> None: # publish with a filter of banana for i in range(MESSAGES_TO_PUBLISH): color = "green" if i % 2 == 0 else "yellow" + from_value = "italy" if i % 3 == 0 else "spain" publisher.publish( Message( body=Converter.string_to_bytes("banana: " + str(i)), annotations={"x-stream-filter-value": "banana"}, subject=color, + application_properties={"from": from_value}, ) ) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index db6d13f..5423006 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -11,6 +11,7 @@ STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" AMQP_PROPERTIES_FILTER = "amqp:properties-filter" +AMQP_APPLICATION_PROPERTIES_FILTER = "amqp:application-properties-filter" @dataclass @@ -252,6 +253,11 @@ def __init__( if filter_options is not None and filter_options.message_properties is not None: self._filter_message_properties(filter_options.message_properties) + if ( + filter_options is not None + and filter_options.application_properties is not None + ): + self._filter_application_properties(filter_options.application_properties) def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: """ @@ -316,6 +322,18 @@ def _filter_message_properties( symbol(AMQP_PROPERTIES_FILTER), filter_prop ) + def _filter_application_properties( + self, application_properties: Optional[dict[str, Any]] + ) -> None: + app_prop = {} + if application_properties is not None: + app_prop = application_properties.copy() + + if len(app_prop) > 0: + self._filter_set[symbol(AMQP_APPLICATION_PROPERTIES_FILTER)] = ( + Described(symbol(AMQP_APPLICATION_PROPERTIES_FILTER), app_prop) + ) + def filter_set(self) -> Dict[symbol, Described]: """ Get the current filter set configuration.