Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion examples/streams_with_filters/example_streams_with_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ def __init__(self):

def on_amqp_message(self, event: Event):
# only messages with banana filters and with subject yellow
# and application property from = italy get received
self._count = self._count + 1
logger.info(
"Received message: {}, subject {}.[Total Consumed: {}]".format(
"Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format(
Converter.bytes_to_string(event.message.body),
event.message.subject,
event.message.application_properties,
self._count,
)
)
Expand Down Expand Up @@ -88,13 +90,15 @@ def main() -> None:
addr_queue,
message_handler=MyMessageHandler(),
# the consumer will only receive messages with filter value banana and subject yellow
# and application property from = italy
stream_consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first,
filter_options=StreamFilterOptions(
values=["banana"],
message_properties=MessageProperties(
subject="yellow",
),
application_properties={"from": "italy"},
),
),
)
Expand All @@ -108,11 +112,13 @@ def main() -> None:
# publish with a filter of apple
for i in range(MESSAGES_TO_PUBLISH):
color = "green" if i % 2 == 0 else "yellow"
from_value = "italy" if i % 3 == 0 else "spain"
publisher.publish(
Message(
Converter.string_to_bytes(body="apple: " + str(i)),
annotations={"x-stream-filter-value": "apple"},
subject=color,
application_properties={"from": from_value},
)
)

Expand All @@ -121,11 +127,13 @@ def main() -> None:
# publish with a filter of banana
for i in range(MESSAGES_TO_PUBLISH):
color = "green" if i % 2 == 0 else "yellow"
from_value = "italy" if i % 3 == 0 else "spain"
publisher.publish(
Message(
body=Converter.string_to_bytes("banana: " + str(i)),
annotations={"x-stream-filter-value": "banana"},
subject=color,
application_properties={"from": from_value},
)
)

Expand Down
18 changes: 18 additions & 0 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
AMQP_PROPERTIES_FILTER = "amqp:properties-filter"
AMQP_APPLICATION_PROPERTIES_FILTER = "amqp:application-properties-filter"


@dataclass
Expand Down Expand Up @@ -252,6 +253,11 @@ def __init__(

if filter_options is not None and filter_options.message_properties is not None:
self._filter_message_properties(filter_options.message_properties)
if (
filter_options is not None
and filter_options.application_properties is not None
):
self._filter_application_properties(filter_options.application_properties)

def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
"""
Expand Down Expand Up @@ -316,6 +322,18 @@ def _filter_message_properties(
symbol(AMQP_PROPERTIES_FILTER), filter_prop
)

def _filter_application_properties(
self, application_properties: Optional[dict[str, Any]]
) -> None:
app_prop = {}
if application_properties is not None:
app_prop = application_properties.copy()

if len(app_prop) > 0:
self._filter_set[symbol(AMQP_APPLICATION_PROPERTIES_FILTER)] = (
Described(symbol(AMQP_APPLICATION_PROPERTIES_FILTER), app_prop)
)

def filter_set(self) -> Dict[symbol, Described]:
"""
Get the current filter set configuration.
Expand Down