Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ format:
poetry run black rabbitmq_amqp_python_client/
poetry run black tests/
poetry run flake8 --exclude=venv,.venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503
poetry run mypy --exclude=rabbitmq_amqp_python_client/qpid .

test: format
poetry run pytest .
Expand Down
3 changes: 2 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ Client examples
- [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection
- [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection
- [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities
- [Oauth](./oauth/oAuth2.py) - Connection through Oauth token
- [Oauth](./oauth/oAuth2.py) - Connection through Oauth token
- [Streams with filters](./streams_with_filters/example_streams_with_filters.py) - Example supporting stream capabilities with filters
159 changes: 159 additions & 0 deletions examples/streams_with_filters/example_streams_with_filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# type: ignore
import logging
import time

from rabbitmq_amqp_python_client import (
AddressHelper,
AMQPMessagingHandler,
Connection,
ConnectionClosed,
Converter,
Environment,
Event,
Message,
MessageProperties,
OffsetSpecification,
StreamConsumerOptions,
StreamFilterOptions,
StreamSpecification,
)

MESSAGES_TO_PUBLISH = 100


class MyMessageHandler(AMQPMessagingHandler):

def __init__(self):
super().__init__()
self._count = 0

def on_amqp_message(self, event: Event):
# only messages with banana filters and with subject yellow
self._count = self._count + 1
logger.info(
"Received message: {}, subject {}.[Total Consumed: {}]".format(
Converter.bytes_to_string(event.message.body),
event.message.subject,
self._count,
)
)
self.delivery_context.accept(event)

def on_connection_closed(self, event: Event):
# if you want you can add cleanup operations here
print("connection closed")

def on_link_closed(self, event: Event) -> None:
# if you want you can add cleanup operations here
print("link closed")


def create_connection(environment: Environment) -> Connection:
connection = environment.connection()
connection.dial()

return connection


logging.basicConfig()
logger = logging.getLogger("[streams_with_filters]")
logger.setLevel(logging.INFO)


def main() -> None:
"""
In this example we create a stream queue and a consumer with filtering options.
The example combines two filters:
- filter value: banana
- subject: yellow

See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
"""

queue_name = "stream-example-with_filtering-queue"
logger.info("Creating connection")
environment = Environment("amqp://guest:guest@localhost:5672/")
connection = create_connection(environment)
management = connection.management()
# delete the queue if it exists
management.delete_queue(queue_name)
# create a stream queue
management.declare_queue(StreamSpecification(name=queue_name))

addr_queue = AddressHelper.queue_address(queue_name)

consumer_connection = create_connection(environment)

consumer = consumer_connection.consumer(
addr_queue,
message_handler=MyMessageHandler(),
# the consumer will only receive messages with filter value banana and subject yellow
stream_consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first,
filter_options=StreamFilterOptions(
values=["banana"],
message_properties=MessageProperties(
subject="yellow",
),
),
),
)
print(
"create a consumer and consume the test message - press control + c to terminate to consume"
)

# print("create a publisher and publish a test message")
publisher = connection.publisher(addr_queue)

# publish with a filter of apple
for i in range(MESSAGES_TO_PUBLISH):
color = "green" if i % 2 == 0 else "yellow"
publisher.publish(
Message(
Converter.string_to_bytes(body="apple: " + str(i)),
annotations={"x-stream-filter-value": "apple"},
subject=color,
)
)

time.sleep(0.5) # wait a bit to ensure messages are published in different chunks

# publish with a filter of banana
for i in range(MESSAGES_TO_PUBLISH):
color = "green" if i % 2 == 0 else "yellow"
publisher.publish(
Message(
body=Converter.string_to_bytes("banana: " + str(i)),
annotations={"x-stream-filter-value": "banana"},
subject=color,
)
)

publisher.close()

while True:
try:
consumer.run()
except KeyboardInterrupt:
pass
except ConnectionClosed:
print("connection closed")
continue
except Exception as e:
print("consumer exited for exception " + str(e))

break

#
logger.info("consumer exited, deleting queue")
management.delete_queue(queue_name)

print("closing connections")
management.close()
print("after management closing")
environment.close()
print("after connection closing")


if __name__ == "__main__":
main()
4 changes: 4 additions & 0 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
ExchangeSpecification,
ExchangeToExchangeBindingSpecification,
ExchangeToQueueBindingSpecification,
MessageProperties,
OAuth2Options,
OffsetSpecification,
RecoveryConfiguration,
StreamConsumerOptions,
StreamFilterOptions,
)
from .environment import Environment
from .exceptions import (
Expand Down Expand Up @@ -86,6 +88,8 @@
"PKCS12Store",
"ConnectionClosed",
"StreamConsumerOptions",
"StreamFilterOptions",
"MessageProperties",
"OffsetSpecification",
"OutcomeState",
"Environment",
Expand Down
87 changes: 73 additions & 14 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from datetime import timedelta
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, Optional, Union

Expand All @@ -10,6 +10,7 @@
STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
AMQP_PROPERTIES_FILTER = "amqp:properties-filter"


@dataclass
Expand Down Expand Up @@ -149,6 +150,42 @@ class ExchangeToExchangeBindingSpecification:
binding_key: Optional[str] = None


@dataclass
class MessageProperties:
"""
Properties for an AMQP message.

Attributes:
message_id: Uniquely identifies a message within the system (int, UUID, bytes, or str).
user_id: Identity of the user responsible for producing the message.
to: Intended destination node of the message.
subject: Summary information about the message content and purpose.
reply_to: Address of the node to send replies to.
correlation_id: Client-specific id for marking or identifying messages (int, UUID, bytes, or str).
content_type: RFC-2046 MIME type for the message's body.
content_encoding: Modifier to the content-type.
absolute_expiry_time: Absolute time when the message expires.
creation_time: Absolute time when the message was created.
group_id: Group the message belongs to.
group_sequence: Relative position of this message within its group.
reply_to_group_id: Id for sending replies to a specific group.
"""

message_id: Optional[Union[int, str, bytes]] = None
user_id: Optional[bytes] = None
to: Optional[str] = None
subject: Optional[str] = None
reply_to: Optional[str] = None
correlation_id: Optional[Union[int, str, bytes]] = None
content_type: Optional[str] = None
content_encoding: Optional[str] = None
absolute_expiry_time: Optional[datetime] = None
creation_time: Optional[datetime] = None
group_id: Optional[str] = None
group_sequence: Optional[int] = None
reply_to_group_id: Optional[str] = None


"""
StreamFilterOptions defines the filtering options for a stream consumer.
for values and match_unfiltered see: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
Expand All @@ -159,18 +196,21 @@ class StreamFilterOptions:
values: Optional[list[str]] = None
match_unfiltered: bool = False
application_properties: Optional[dict[str, Any]] = None
message_properties: Optional[MessageProperties] = None
sql: str = ""

def __init__(
self,
values: Optional[list[str]] = None,
match_unfiltered: bool = False,
application_properties: Optional[dict[str, Any]] = None,
message_properties: Optional[MessageProperties] = None,
sql: str = "",
):
self.values = values
self.match_unfiltered = match_unfiltered
self.application_properties = application_properties
self.message_properties = message_properties
self.sql = sql


Expand All @@ -195,27 +235,23 @@ def __init__(
filter_options: Optional[StreamFilterOptions] = None,
):

self.streamFilterOptions = filter_options
self._filter_set: Dict[symbol, Described] = {}

if offset_specification is None and self.streamFilterOptions is None:
if offset_specification is None and filter_options is None:
raise ValidationCodeException(
"At least one between offset_specification and filters must be set when setting up filtering"
)
self._filter_set: Dict[symbol, Described] = {}
if offset_specification is not None:
self._offset(offset_specification)

if (
self.streamFilterOptions is not None
and self.streamFilterOptions.values is not None
):
self._filter_values(self.streamFilterOptions.values)
if filter_options is not None and filter_options.values is not None:
self._filter_values(filter_options.values)

if (
self.streamFilterOptions is not None
and self.streamFilterOptions.match_unfiltered
):
self._filter_match_unfiltered(self.streamFilterOptions.match_unfiltered)
if filter_options is not None and filter_options.match_unfiltered:
self._filter_match_unfiltered(filter_options.match_unfiltered)

if filter_options is not None and filter_options.message_properties is not None:
self._filter_message_properties(filter_options.message_properties)

def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
"""
Expand Down Expand Up @@ -257,6 +293,29 @@ def _filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None:
symbol(STREAM_FILTER_MATCH_UNFILTERED), filter_match_unfiltered
)

def _filter_message_properties(
self, message_properties: Optional[MessageProperties]
) -> None:
"""
Set AMQP message properties for filtering.

Args:
message_properties: MessageProperties object containing AMQP message properties
"""
if message_properties is not None:
# dictionary of symbols and described
filter_prop: Dict[symbol, Any] = {}

for key, value in message_properties.__dict__.items():
if value is not None:
# replace _ with - for the key
filter_prop[symbol(key.replace("_", "-"))] = value

if len(filter_prop) > 0:
self._filter_set[symbol(AMQP_PROPERTIES_FILTER)] = Described(
symbol(AMQP_PROPERTIES_FILTER), filter_prop
)

def filter_set(self) -> Dict[symbol, Described]:
"""
Get the current filter set configuration.
Expand Down
Loading