From 683bb12833a7463ef7baed36073c373f80b1ff5e Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 11:25:58 +0200 Subject: [PATCH 1/7] Implement filters based on message properites. Closes: https://github.com/rabbitmq/rabbitmq-amqp-python-client/issues/42 Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp_python_client/__init__.py | 4 + rabbitmq_amqp_python_client/entities.py | 120 +++++++++++++++++++++--- tests/test_streams.py | 73 ++++++++++++++ 3 files changed, 183 insertions(+), 14 deletions(-) diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 0ebd3e7..5dc2c02 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -10,10 +10,12 @@ ExchangeSpecification, ExchangeToExchangeBindingSpecification, ExchangeToQueueBindingSpecification, + MessageProperties, OAuth2Options, OffsetSpecification, RecoveryConfiguration, StreamConsumerOptions, + StreamFilterOptions, ) from .environment import Environment from .exceptions import ( @@ -86,6 +88,8 @@ "PKCS12Store", "ConnectionClosed", "StreamConsumerOptions", + "StreamFilterOptions", + "MessageProperties", "OffsetSpecification", "OutcomeState", "Environment", diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 9dcf9c0..37c07cf 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field -from datetime import timedelta +from datetime import datetime, timedelta from enum import Enum from typing import Any, Dict, Optional, Union @@ -10,6 +10,7 @@ STREAM_FILTER_SPEC = "rabbitmq:stream-filter" STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" +AMQP_PROPERTIES_FILTER = "amqp:properties-filter" @dataclass @@ -149,6 +150,77 @@ class ExchangeToExchangeBindingSpecification: binding_key: Optional[str] = None +class MessageProperties: + def __init__( + self, + message_id: Optional[Any] = None, + user_id: Optional[bytes] = None, + to: Optional[str] = None, + subject: Optional[str] = None, + reply_to: Optional[str] = None, + correlation_id: Optional[Any] = None, + content_type: Optional[str] = None, + content_encoding: Optional[str] = None, + absolute_expiry_time: Optional[datetime] = None, + creation_time: Optional[datetime] = None, + group_id: Optional[str] = None, + group_sequence: Optional[int] = None, + reply_to_group_id: Optional[str] = None, + ): + # Message-id, if set, uniquely identifies a message within the message system. + # The message producer is usually responsible for setting the message-id in + # such a way that it is assured to be globally unique. A broker MAY discard a + # message as a duplicate if the value of the message-id matches that of a + # previously received message sent to the same node. + # + # The value is restricted to the following types: + # - int (for uint64), UUID, bytes, or str + self.message_id: Optional[Any] = message_id + + # The identity of the user responsible for producing the message. + # The client sets this value, and it MAY be authenticated by intermediaries. + self.user_id: Optional[bytes] = user_id + + # The to field identifies the node that is the intended destination of the message. + # On any given transfer this might not be the node at the receiving end of the link. + self.to: Optional[str] = to + + # A common field for summary information about the message content and purpose. + self.subject: Optional[str] = subject + + # The address of the node to send replies to. + self.reply_to: Optional[str] = reply_to + + # This is a client-specific id that can be used to mark or identify messages + # between clients. + # + # The value is restricted to the following types: + # - int (for uint64), UUID, bytes, or str + self.correlation_id: Optional[Any] = correlation_id + + # The RFC-2046 [RFC2046] MIME type for the message's application-data section (body). + self.content_type: Optional[str] = content_type + + # The content-encoding property is used as a modifier to the content-type. + self.content_encoding: Optional[str] = content_encoding + + # An absolute time when this message is considered to be expired. + self.absolute_expiry_time: Optional[datetime] = absolute_expiry_time + + # An absolute time when this message was created. + self.creation_time: Optional[datetime] = creation_time + + # Identifies the group the message belongs to. + self.group_id: Optional[str] = group_id + + # The relative position of this message within its group. + self.group_sequence: Optional[int] = group_sequence + + # This is a client-specific id that is used so that client can send replies to this + # message to a specific group. + self.reply_to_group_id: Optional[str] = reply_to_group_id + + """ StreamFilterOptions defines the filtering options for a stream consumer. for values and match_unfiltered see: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering @@ -159,6 +231,7 @@ class StreamFilterOptions: values: Optional[list[str]] = None match_unfiltered: bool = False application_properties: Optional[dict[str, Any]] = None + message_properties: Optional[MessageProperties] = None sql: str = "" def __init__( @@ -166,11 +239,13 @@ def __init__( values: Optional[list[str]] = None, match_unfiltered: bool = False, application_properties: Optional[dict[str, Any]] = None, + message_properties: Optional[MessageProperties] = None, sql: str = "", ): self.values = values self.match_unfiltered = match_unfiltered self.application_properties = application_properties + self.message_properties = message_properties self.sql = sql @@ -195,27 +270,22 @@ def __init__( filter_options: Optional[StreamFilterOptions] = None, ): - self.streamFilterOptions = filter_options + self._filter_set: Dict[symbol, Described] = {} - if offset_specification is None and self.streamFilterOptions is None: + if offset_specification is None and filter_options is None: raise ValidationCodeException( "At least one between offset_specification and filters must be set when setting up filtering" ) - self._filter_set: Dict[symbol, Described] = {} if offset_specification is not None: self._offset(offset_specification) - if ( - self.streamFilterOptions is not None - and self.streamFilterOptions.values is not None - ): - self._filter_values(self.streamFilterOptions.values) + if filter_options is not None and filter_options.values is not None: + self._filter_values(filter_options.values) - if ( - self.streamFilterOptions is not None - and self.streamFilterOptions.match_unfiltered - ): - self._filter_match_unfiltered(self.streamFilterOptions.match_unfiltered) + if filter_options is not None and filter_options.match_unfiltered: + self._filter_match_unfiltered(filter_options.match_unfiltered) + + self._filter_message_properties(filter_options.message_properties) def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: """ @@ -257,6 +327,28 @@ def _filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None: symbol(STREAM_FILTER_MATCH_UNFILTERED), filter_match_unfiltered ) + def _filter_message_properties(self, message_properties: MessageProperties) -> None: + """ + Set application properties for filtering. + + Args: + message_properties: MessageProperties object containing application properties + """ + if message_properties.__dict__ is not None: + # dictionary of symbols and described + filter_prop: Dict[symbol, Any] = {} + + for key, value in message_properties.__dict__.items(): + if key is not None: + if message_properties.__dict__[key] is not None: + # replace _ with - for the key + filter_prop[symbol(key.replace("_", "-"))] = value + + if len(filter_prop) > 0: + self._filter_set[symbol(AMQP_PROPERTIES_FILTER)] = Described( + symbol(AMQP_PROPERTIES_FILTER), filter_prop + ) + def filter_set(self) -> Dict[symbol, Described]: """ Get the current filter set configuration. diff --git a/tests/test_streams.py b/tests/test_streams.py index e7da2a8..dce875c 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -1,14 +1,21 @@ from rabbitmq_amqp_python_client import ( AddressHelper, + AMQPMessagingHandler, Connection, + Converter, Environment, OffsetSpecification, StreamConsumerOptions, StreamSpecification, ) from rabbitmq_amqp_python_client.entities import ( + MessageProperties, StreamFilterOptions, ) +from rabbitmq_amqp_python_client.qpid.proton import ( + Event, + Message, +) from .conftest import ( ConsumerTestException, @@ -398,3 +405,69 @@ def test_stream_reconnection( consumer.close() management.delete_queue(stream_name) + + +class MyMessageHandlerMessagePropertiesFilter(AMQPMessagingHandler): + def __init__( + self, + ): + super().__init__() + + def on_message(self, event: Event): + self.delivery_context.accept(event) + assert event.message.subject == "important_15" + assert event.message.group_id == "group_15" + assert event.message.body == Converter.string_to_bytes("hello_15") + raise ConsumerTestException("consumed") + + +def test_stream_filter_message_properties( + connection: Connection, environment: Environment +) -> None: + consumer = None + stream_name = "test_stream_filter_message_properties" + messages_to_send = 30 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + try: + connection_consumer = environment.connection() + connection_consumer.dial() + consumer = connection_consumer.consumer( + addr_queue, + message_handler=MyMessageHandlerMessagePropertiesFilter(), + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions( + message_properties=MessageProperties( + subject="important_15", group_id="group_15" + ) + ) + ), + ) + publisher = connection.publisher(addr_queue) + for i in range(messages_to_send): + msg = Message( + body=Converter.string_to_bytes("hello_{}".format(i)), + subject="important_{}".format(i), + group_id="group_{}".format(i), + ) + publisher.publish(msg) + + publisher.close() + + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + if consumer is not None: + consumer.close() + + management.delete_queue(stream_name) From 7f8919a774a9ae822ba2b15ada8b39798971cd39 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 11:48:21 +0200 Subject: [PATCH 2/7] formatting Signed-off-by: Gabriele Santomaggio --- Makefile | 1 + rabbitmq_amqp_python_client/entities.py | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 0282241..cf62243 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,7 @@ rabbitmq-server-stop: format: poetry run isort --skip rabbitmq_amqp_python_client/qpid --skip .venv . + poetry run mypy --exclude=rabbitmq_amqp_python_client/qpid . poetry run black rabbitmq_amqp_python_client/ poetry run black tests/ poetry run flake8 --exclude=venv,.venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503 diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 37c07cf..c29f51f 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -285,7 +285,8 @@ def __init__( if filter_options is not None and filter_options.match_unfiltered: self._filter_match_unfiltered(filter_options.match_unfiltered) - self._filter_message_properties(filter_options.message_properties) + if filter_options is not None and filter_options.message_properties is not None: + self._filter_message_properties(filter_options.message_properties) def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: """ @@ -327,7 +328,9 @@ def _filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None: symbol(STREAM_FILTER_MATCH_UNFILTERED), filter_match_unfiltered ) - def _filter_message_properties(self, message_properties: MessageProperties) -> None: + def _filter_message_properties( + self, message_properties: Optional[MessageProperties] + ) -> None: """ Set application properties for filtering. From 46d3a2e68196b66cf0a736bf1189a58a3cea4f35 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 13:32:21 +0200 Subject: [PATCH 3/7] Add examples Signed-off-by: Gabriele Santomaggio --- Makefile | 2 +- examples/README.md | 3 +- .../example_streams_with_filters.py | 153 ++++++++++++++++++ 3 files changed, 156 insertions(+), 2 deletions(-) create mode 100644 examples/streams_with_filters/example_streams_with_filters.py diff --git a/Makefile b/Makefile index cf62243..18277d3 100644 --- a/Makefile +++ b/Makefile @@ -9,10 +9,10 @@ rabbitmq-server-stop: format: poetry run isort --skip rabbitmq_amqp_python_client/qpid --skip .venv . - poetry run mypy --exclude=rabbitmq_amqp_python_client/qpid . poetry run black rabbitmq_amqp_python_client/ poetry run black tests/ poetry run flake8 --exclude=venv,.venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503 + poetry run mypy --exclude=rabbitmq_amqp_python_client/qpid . test: format poetry run pytest . diff --git a/examples/README.md b/examples/README.md index c9442f2..24962d1 100644 --- a/examples/README.md +++ b/examples/README.md @@ -4,4 +4,5 @@ Client examples - [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection - [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection - [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities - - [Oauth](./oauth/oAuth2.py) - Connection through Oauth token \ No newline at end of file + - [Oauth](./oauth/oAuth2.py) - Connection through Oauth token + - [Streams with filters](./streams_with_filters/example_streams_with_filters.py) - Example supporting stream capabilities with filters \ No newline at end of file diff --git a/examples/streams_with_filters/example_streams_with_filters.py b/examples/streams_with_filters/example_streams_with_filters.py new file mode 100644 index 0000000..e855de1 --- /dev/null +++ b/examples/streams_with_filters/example_streams_with_filters.py @@ -0,0 +1,153 @@ +# type: ignore +import logging +import time + +from rabbitmq_amqp_python_client import ( + AddressHelper, + AMQPMessagingHandler, + Connection, + ConnectionClosed, + Converter, + Environment, + Event, + Message, + MessageProperties, + OffsetSpecification, + StreamConsumerOptions, + StreamFilterOptions, + StreamSpecification, +) + +MESSAGES_TO_PUBLISH = 100 + + +class MyMessageHandler(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._count = 0 + + def on_amqp_message(self, event: Event): + # only messages with banana filters and with subject yellow + self._count = self._count + 1 + logger.info("Received message: {}, subject {}.[Total Consumed: {}]". + format(Converter.bytes_to_string(event.message.body), event.message.subject, self._count)) + self.delivery_context.accept(event) + + def on_connection_closed(self, event: Event): + # if you want you can add cleanup operations here + print("connection closed") + + def on_link_closed(self, event: Event) -> None: + # if you want you can add cleanup operations here + print("link closed") + + +def create_connection(environment: Environment) -> Connection: + connection = environment.connection() + connection.dial() + + return connection + + +logging.basicConfig() +logger = logging.getLogger("[streams_with_filters]") +logger.setLevel(logging.INFO) + + +def main() -> None: + """ + In this example we create a stream queue and a consumer with filtering options. + The example combines two filters: + - filter value: banana + - subject: yellow + + See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions + """ + + queue_name = "stream-example-with_filtering-queue" + logger.info("Creating connection") + environment = Environment("amqp://guest:guest@localhost:5672/") + connection = create_connection(environment) + management = connection.management() + # delete the queue if it exists + management.delete_queue(queue_name) + # create a stream queue + management.declare_queue(StreamSpecification(name=queue_name)) + + addr_queue = AddressHelper.queue_address(queue_name) + + consumer_connection = create_connection(environment) + + consumer = consumer_connection.consumer( + addr_queue, + message_handler=MyMessageHandler(), + + # the consumer will only receive messages with filter value banana and subject yellow + + stream_consumer_options=StreamConsumerOptions( + offset_specification=OffsetSpecification.first, + filter_options=StreamFilterOptions( + values=["banana"], + message_properties=MessageProperties(subject="yellow", ))) + + ) + print( + "create a consumer and consume the test message - press control + c to terminate to consume" + ) + + # print("create a publisher and publish a test message") + publisher = connection.publisher(addr_queue) + + # publish with a filter of apple + for i in range(MESSAGES_TO_PUBLISH): + color = "green" if i % 2 == 0 else "yellow" + publisher.publish( + Message( + Converter.string_to_bytes(body="apple: " + str(i)), + annotations={"x-stream-filter-value": "apple"}, + subject=color, + ) + ) + + time.sleep(0.5) # wait a bit to ensure messages are published in different chunks + + # publish with a filter of banana + for i in range(MESSAGES_TO_PUBLISH): + color = "green" if i % 2 == 0 else "yellow" + publisher.publish( + Message( + body=Converter.string_to_bytes("banana: " + str(i)), + annotations={"x-stream-filter-value": "banana"}, + subject=color, + ) + ) + + publisher.close() + + while True: + try: + consumer.run() + except KeyboardInterrupt: + pass + except ConnectionClosed: + print("connection closed") + continue + except Exception as e: + print("consumer exited for exception " + str(e)) + + break + + # + logger.info("consumer exited, deleting queue") + management.delete_queue(queue_name) + + print("closing connections") + management.close() + print("after management closing") + environment.close() + print("after connection closing") + + +if __name__ == "__main__": + main() From c0af23223096be43a4852c3c2323a772dd6f191c Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 13:39:39 +0200 Subject: [PATCH 4/7] formatting Signed-off-by: Gabriele Santomaggio --- .../example_streams_with_filters.py | 18 ++-- rabbitmq_amqp_python_client/entities.py | 101 ++++++------------ 2 files changed, 45 insertions(+), 74 deletions(-) diff --git a/examples/streams_with_filters/example_streams_with_filters.py b/examples/streams_with_filters/example_streams_with_filters.py index e855de1..e95982d 100644 --- a/examples/streams_with_filters/example_streams_with_filters.py +++ b/examples/streams_with_filters/example_streams_with_filters.py @@ -30,8 +30,13 @@ def __init__(self): def on_amqp_message(self, event: Event): # only messages with banana filters and with subject yellow self._count = self._count + 1 - logger.info("Received message: {}, subject {}.[Total Consumed: {}]". - format(Converter.bytes_to_string(event.message.body), event.message.subject, self._count)) + logger.info( + "Received message: {}, subject {}.[Total Consumed: {}]".format( + Converter.bytes_to_string(event.message.body), + event.message.subject, + self._count, + ) + ) self.delivery_context.accept(event) def on_connection_closed(self, event: Event): @@ -82,15 +87,16 @@ def main() -> None: consumer = consumer_connection.consumer( addr_queue, message_handler=MyMessageHandler(), - # the consumer will only receive messages with filter value banana and subject yellow - stream_consumer_options=StreamConsumerOptions( offset_specification=OffsetSpecification.first, filter_options=StreamFilterOptions( values=["banana"], - message_properties=MessageProperties(subject="yellow", ))) - + message_properties=MessageProperties( + subject="yellow", + ), + ), + ), ) print( "create a consumer and consume the test message - press control + c to terminate to consume" diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index c29f51f..c05ab30 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -150,75 +150,40 @@ class ExchangeToExchangeBindingSpecification: binding_key: Optional[str] = None +@dataclass class MessageProperties: - def __init__( - self, - message_id: Optional[Any] = None, - user_id: Optional[bytes] = None, - to: Optional[str] = None, - subject: Optional[str] = None, - reply_to: Optional[str] = None, - correlation_id: Optional[Any] = None, - content_type: Optional[str] = None, - content_encoding: Optional[str] = None, - absolute_expiry_time: Optional[datetime] = None, - creation_time: Optional[datetime] = None, - group_id: Optional[str] = None, - group_sequence: Optional[int] = None, - reply_to_group_id: Optional[str] = None, - ): - # Message-id, if set, uniquely identifies a message within the message system. - # The message producer is usually responsible for setting the message-id in - # such a way that it is assured to be globally unique. A broker MAY discard a - # message as a duplicate if the value of the message-id matches that of a - # previously received message sent to the same node. - # - # The value is restricted to the following types: - # - int (for uint64), UUID, bytes, or str - self.message_id: Optional[Any] = message_id - - # The identity of the user responsible for producing the message. - # The client sets this value, and it MAY be authenticated by intermediaries. - self.user_id: Optional[bytes] = user_id - - # The to field identifies the node that is the intended destination of the message. - # On any given transfer this might not be the node at the receiving end of the link. - self.to: Optional[str] = to - - # A common field for summary information about the message content and purpose. - self.subject: Optional[str] = subject - - # The address of the node to send replies to. - self.reply_to: Optional[str] = reply_to - - # This is a client-specific id that can be used to mark or identify messages - # between clients. - # - # The value is restricted to the following types: - # - int (for uint64), UUID, bytes, or str - self.correlation_id: Optional[Any] = correlation_id - - # The RFC-2046 [RFC2046] MIME type for the message's application-data section (body). - self.content_type: Optional[str] = content_type - - # The content-encoding property is used as a modifier to the content-type. - self.content_encoding: Optional[str] = content_encoding - - # An absolute time when this message is considered to be expired. - self.absolute_expiry_time: Optional[datetime] = absolute_expiry_time - - # An absolute time when this message was created. - self.creation_time: Optional[datetime] = creation_time - - # Identifies the group the message belongs to. - self.group_id: Optional[str] = group_id - - # The relative position of this message within its group. - self.group_sequence: Optional[int] = group_sequence - - # This is a client-specific id that is used so that client can send replies to this - # message to a specific group. - self.reply_to_group_id: Optional[str] = reply_to_group_id + """ + Properties for an AMQP message. + + Attributes: + message_id: Uniquely identifies a message within the system (int, UUID, bytes, or str). + user_id: Identity of the user responsible for producing the message. + to: Intended destination node of the message. + subject: Summary information about the message content and purpose. + reply_to: Address of the node to send replies to. + correlation_id: Client-specific id for marking or identifying messages (int, UUID, bytes, or str). + content_type: RFC-2046 MIME type for the message's body. + content_encoding: Modifier to the content-type. + absolute_expiry_time: Absolute time when the message expires. + creation_time: Absolute time when the message was created. + group_id: Group the message belongs to. + group_sequence: Relative position of this message within its group. + reply_to_group_id: Id for sending replies to a specific group. + """ + + message_id: Optional[Any] = None + user_id: Optional[bytes] = None + to: Optional[str] = None + subject: Optional[str] = None + reply_to: Optional[str] = None + correlation_id: Optional[Any] = None + content_type: Optional[str] = None + content_encoding: Optional[str] = None + absolute_expiry_time: Optional[datetime] = None + creation_time: Optional[datetime] = None + group_id: Optional[str] = None + group_sequence: Optional[int] = None + reply_to_group_id: Optional[str] = None """ From 410b6f89d3e7098368d20f44a24c791bbf30871e Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 13:57:09 +0200 Subject: [PATCH 5/7] Update rabbitmq_amqp_python_client/entities.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- rabbitmq_amqp_python_client/entities.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index c05ab30..aa998bf 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -302,7 +302,7 @@ def _filter_message_properties( Args: message_properties: MessageProperties object containing application properties """ - if message_properties.__dict__ is not None: + if message_properties is not None: # dictionary of symbols and described filter_prop: Dict[symbol, Any] = {} From fedeab61453f1a824f66bbe7a87a55552ab48dde Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 13:57:53 +0200 Subject: [PATCH 6/7] Update rabbitmq_amqp_python_client/entities.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- rabbitmq_amqp_python_client/entities.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index aa998bf..099bb31 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -297,10 +297,10 @@ def _filter_message_properties( self, message_properties: Optional[MessageProperties] ) -> None: """ - Set application properties for filtering. + Set AMQP message properties for filtering. Args: - message_properties: MessageProperties object containing application properties + message_properties: MessageProperties object containing AMQP message properties """ if message_properties is not None: # dictionary of symbols and described From 2bfb2c524153485bd6911d0c2fe978629075cfa2 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 14:02:36 +0200 Subject: [PATCH 7/7] formatting Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp_python_client/entities.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 099bb31..db6d13f 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -171,12 +171,12 @@ class MessageProperties: reply_to_group_id: Id for sending replies to a specific group. """ - message_id: Optional[Any] = None + message_id: Optional[Union[int, str, bytes]] = None user_id: Optional[bytes] = None to: Optional[str] = None subject: Optional[str] = None reply_to: Optional[str] = None - correlation_id: Optional[Any] = None + correlation_id: Optional[Union[int, str, bytes]] = None content_type: Optional[str] = None content_encoding: Optional[str] = None absolute_expiry_time: Optional[datetime] = None @@ -307,10 +307,9 @@ def _filter_message_properties( filter_prop: Dict[symbol, Any] = {} for key, value in message_properties.__dict__.items(): - if key is not None: - if message_properties.__dict__[key] is not None: - # replace _ with - for the key - filter_prop[symbol(key.replace("_", "-"))] = value + if value is not None: + # replace _ with - for the key + filter_prop[symbol(key.replace("_", "-"))] = value if len(filter_prop) > 0: self._filter_set[symbol(AMQP_PROPERTIES_FILTER)] = Described(