From 3c6685d952271c4ff858a11f7514e2bb13b3cd5b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 14:59:27 +0200 Subject: [PATCH 1/3] implement application properties filter complete: https://github.com/rabbitmq/rabbitmq-amqp-python-client/issues/42 Signed-off-by: Gabriele Santomaggio --- .../example_streams_with_filters.py | 10 +++++++++- rabbitmq_amqp_python_client/entities.py | 19 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/examples/streams_with_filters/example_streams_with_filters.py b/examples/streams_with_filters/example_streams_with_filters.py index e95982d..e9721ac 100644 --- a/examples/streams_with_filters/example_streams_with_filters.py +++ b/examples/streams_with_filters/example_streams_with_filters.py @@ -29,11 +29,13 @@ def __init__(self): def on_amqp_message(self, event: Event): # only messages with banana filters and with subject yellow + # and application property from = italy get received self._count = self._count + 1 logger.info( - "Received message: {}, subject {}.[Total Consumed: {}]".format( + "Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format( Converter.bytes_to_string(event.message.body), event.message.subject, + event.message.application_properties, self._count, ) ) @@ -88,6 +90,7 @@ def main() -> None: addr_queue, message_handler=MyMessageHandler(), # the consumer will only receive messages with filter value banana and subject yellow + # and application property from = italy stream_consumer_options=StreamConsumerOptions( offset_specification=OffsetSpecification.first, filter_options=StreamFilterOptions( @@ -95,6 +98,7 @@ def main() -> None: message_properties=MessageProperties( subject="yellow", ), + application_properties={"from": "italy"} ), ), ) @@ -108,11 +112,13 @@ def main() -> None: # publish with a filter of apple for i in range(MESSAGES_TO_PUBLISH): color = "green" if i % 2 == 0 else "yellow" + from_value = "italy" if i % 3 == 0 else "spain" publisher.publish( Message( Converter.string_to_bytes(body="apple: " + str(i)), annotations={"x-stream-filter-value": "apple"}, subject=color, + application_properties={"from": from_value}, ) ) @@ -121,11 +127,13 @@ def main() -> None: # publish with a filter of banana for i in range(MESSAGES_TO_PUBLISH): color = "green" if i % 2 == 0 else "yellow" + from_value = "italy" if i % 3 == 0 else "spain" publisher.publish( Message( body=Converter.string_to_bytes("banana: " + str(i)), annotations={"x-stream-filter-value": "banana"}, subject=color, + application_properties={"from": from_value}, ) ) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index db6d13f..3e8b5a7 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -11,6 +11,7 @@ STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" AMQP_PROPERTIES_FILTER = "amqp:properties-filter" +AMQP_APPLICATION_PROPERTIES_FILTER = "amqp:application-properties-filter" @dataclass @@ -252,6 +253,11 @@ def __init__( if filter_options is not None and filter_options.message_properties is not None: self._filter_message_properties(filter_options.message_properties) + if ( + filter_options is not None + and filter_options.application_properties is not None + ): + self._filter_application_properties(filter_options.application_properties) def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: """ @@ -316,6 +322,19 @@ def _filter_message_properties( symbol(AMQP_PROPERTIES_FILTER), filter_prop ) + def _filter_application_properties( + self, application_properties: Optional[dict[str, Any]] + ) -> None: + app_prop = {} + if application_properties is not None: + for key, value in application_properties.items(): + app_prop[key] = value + + if len(app_prop) > 0: + self._filter_set[symbol(AMQP_APPLICATION_PROPERTIES_FILTER)] = ( + Described(symbol(AMQP_APPLICATION_PROPERTIES_FILTER), app_prop) + ) + def filter_set(self) -> Dict[symbol, Described]: """ Get the current filter set configuration. From 9fb4b41e83ca2efc1941edc0dc4146bacfc8617c Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 15:02:28 +0200 Subject: [PATCH 2/3] formatting Signed-off-by: Gabriele Santomaggio --- examples/streams_with_filters/example_streams_with_filters.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/streams_with_filters/example_streams_with_filters.py b/examples/streams_with_filters/example_streams_with_filters.py index e9721ac..9f5c2dc 100644 --- a/examples/streams_with_filters/example_streams_with_filters.py +++ b/examples/streams_with_filters/example_streams_with_filters.py @@ -98,7 +98,7 @@ def main() -> None: message_properties=MessageProperties( subject="yellow", ), - application_properties={"from": "italy"} + application_properties={"from": "italy"}, ), ), ) From f410ab2ae224b8f103d1f82fe7f40b0346fe3200 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 15:21:50 +0200 Subject: [PATCH 3/3] Update rabbitmq_amqp_python_client/entities.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- rabbitmq_amqp_python_client/entities.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 3e8b5a7..5423006 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -327,8 +327,7 @@ def _filter_application_properties( ) -> None: app_prop = {} if application_properties is not None: - for key, value in application_properties.items(): - app_prop[key] = value + app_prop = application_properties.copy() if len(app_prop) > 0: self._filter_set[symbol(AMQP_APPLICATION_PROPERTIES_FILTER)] = (