From c59c9410382663f32f44a0ac7ab2d115eb587cdc Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 17 Sep 2025 14:56:32 +0200 Subject: [PATCH 1/3] Filters documentation for amqp 1.0 clients. Csharp Signed-off-by: Gabriele Santomaggio --- docs/stream-filtering.md | 52 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/docs/stream-filtering.md b/docs/stream-filtering.md index 3e90c1f33..cd5d79019 100644 --- a/docs/stream-filtering.md +++ b/docs/stream-filtering.md @@ -471,6 +471,22 @@ Consumer consumer = connection.consumerBuilder() ``` + +```csharp +IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue"). + MessageHandler((context, message) => + { + // process the messages + } + ).Stream().Offset(StreamOffsetSpecification.First).Filter() + .UserId("John"u8.ToArray()) + .Subject("&p:Order") + .Property("region", "emea") + .Stream().Builder() + .BuildAndStartAsync(); +``` + + ```erlang Filter = #{<<"filter-name-1">> => @@ -725,6 +741,23 @@ Consumer consumer = connection.consumerBuilder() ``` + +```csharp +IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue"). + MessageHandler((context, message) => + { + // process the messages + } + ).Stream().Offset(StreamOffsetSpecification.First).Filter() + .Sql("properties.user_id = 'John' AND" + + "properties.subject LIKE 'Order%' AND region = 'emea'") + .Stream().Builder() + .Stream().Builder() + .BuildAndStartAsync(); +``` + + + ```erlang @@ -829,6 +862,25 @@ Consumer consumer = connection.consumerBuilder() ``` + +```csharp +IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue"). + MessageHandler((context, message) => + // message processing + }).Stream().Offset(StreamOffsetSpecification.First) + // This Bloom filter will be evaluated server-side per chunk (Stage 1). + .FilterValues("order.created") + .Filter() + // This complex SQL filter expression will be evaluted server-side + // per message at stage 2. + .Sql("p.subject = 'order.created' AND " + + "p.creation_time > UTC() - 3600000 AND " + + "region IN ('AMER', 'EMEA', 'APJ') AND " + + "(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)").Stream().Builder() + .BuildAndStartAsync().ConfigureAwait(false); +``` + + ```erlang Expression = <<"p.subject = 'order.created' AND " From ff85d5fdd166c7a7cd70d1080f3fd6b315d39aac Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 17 Sep 2025 17:27:59 +0200 Subject: [PATCH 2/3] amqp1.0 stream filter for golang client Signed-off-by: Gabriele Santomaggio --- docs/stream-filtering.md | 48 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/docs/stream-filtering.md b/docs/stream-filtering.md index cd5d79019..d5e02a730 100644 --- a/docs/stream-filtering.md +++ b/docs/stream-filtering.md @@ -487,6 +487,21 @@ IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue"). ``` + +```go + var subjectPrt = "&p:Order" + consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue", + &rmq.StreamConsumerOptions{ + Offset: &rmq.OffsetFirst{}, + StreamFilterOptions: &rmq.StreamFilterOptions{ + Properties: &amqp.MessageProperties{Subject: &subjectPrt, UserID: []byte("John")}, + ApplicationProperties: map[string]interface{}{"region": "emea"}, + }, + }) +``` + + + ```erlang Filter = #{<<"filter-name-1">> => @@ -757,6 +772,19 @@ IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue"). ``` + +```go +consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue", + &rmq.StreamConsumerOptions{ + Offset: &rmq.OffsetFirst{}, + StreamFilterOptions: &rmq.StreamFilterOptions{ + SQL: "properties.user_id = 'John' AND " + + "properties.subject LIKE 'Order%' AND region = 'emea'", + }, + }) +``` + + @@ -881,6 +909,26 @@ IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue"). ``` + +```csharp +consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue", + &rmq.StreamConsumerOptions{ + Offset: &rmq.OffsetFirst{}, + StreamFilterOptions: &rmq.StreamFilterOptions{ + // This Bloom filter will be evaluated server-side per chunk (Stage 1). + Values: []string{"order.created"}, + // This complex SQL filter expression will be evaluted server-side + // per message at stage 2. + SQL:"p.subject = 'order.created' AND " + + "p.creation_time > UTC() - 3600000 AND " + + "region IN ('AMER', 'EMEA', 'APJ') AND " + + "(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)", + }, + }) +``` + + + ```erlang Expression = <<"p.subject = 'order.created' AND " From 63d1f5fdad3f8705945e62b6884786749a0b2416 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 18 Sep 2025 13:55:30 +0200 Subject: [PATCH 3/3] add python documentation Signed-off-by: Gabriele Santomaggio --- docs/stream-filtering.md | 63 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/docs/stream-filtering.md b/docs/stream-filtering.md index d5e02a730..77cd959b1 100644 --- a/docs/stream-filtering.md +++ b/docs/stream-filtering.md @@ -258,7 +258,7 @@ class MyMessageHandler(AMQPMessagingHandler): self.delivery_context.accept(event) stream_address = AddressHelper.queue_address("some-stream") -consumer = consumer_connection.consumer( +consumer = connection.consumer( stream_address, message_handler=MyMessageHandler(), ### This Bloom filter will be evaluated server-side per chunk (Stage 1). @@ -427,6 +427,7 @@ The filter syntax is defined in the AMQP 1.0 extension specification [AMQP Filte RabbitMQ supports a subset of this specification as described below. AMQP filter expressions are either [**Property** Filter Expressions](https://docs.oasis-open.org/amqp/filtex/v1.0/csd01/filtex-v1.0-csd01.html#_Toc67929259) or [**SQL** Filter Expressions](https://docs.oasis-open.org/amqp/filtex/v1.0/csd01/filtex-v1.0-csd01.html#_Toc67929276). + :::note @@ -447,6 +448,7 @@ RabbitMQ implements: As described in the specification, prefix and suffix matching is supported using `&p:` and `&s:` modifiers. + #### Example: Property Filter Expressions The following example causes RabbitMQ to deliver only messages for which **all** of the following apply: @@ -501,6 +503,25 @@ IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue"). ``` + +```python + consumer = connection.consumer( + "my-queue", + message_handler=MyMessageHandler(), + consumer_options=StreamConsumerOptions( + offset_specification=OffsetSpecification.first, + filter_options=StreamFilterOptions( + message_properties=MessageProperties( + subject="&p:Order", + user_id= "John".encode("utf-8"), + + ), + application_properties={"region": "emea"}, + ), + ), + ) +``` + ```erlang @@ -785,6 +806,22 @@ consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue", ``` + +```python +consumer = connection.consumer( + "my-queue", + message_handler=MyMessageHandler(), + consumer_options=StreamConsumerOptions( + offset_specification=OffsetSpecification.first, + filter_options=StreamFilterOptions( + sql="properties.user_id = 'John' AND " + + "properties.subject LIKE 'Order%' AND region = 'emea'", + ), + ), + ) +``` + + @@ -928,6 +965,30 @@ consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue", ``` + +```python + consumer = consumer_connection.consumer( + "my-queue", + message_handler=MyMessageHandler(), + # the consumer will only receive messages with filter value banana and subject yellow + # and application property from = italy + consumer_options=StreamConsumerOptions( + offset_specification=OffsetSpecification.first, + filter_options=StreamFilterOptions( + # This Bloom filter will be evaluated server-side per chunk (Stage 1). + values=["order.created"], + # This complex SQL filter expression will be evaluted server-side + # per message at stage 2. + sql="p.subject = 'order.created' AND " + + "p.creation_time > UTC() - 3600000 AND " + + "region IN ('AMER', 'EMEA', 'APJ') AND " + + "(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)", + ), + ), + ) +``` + + ```erlang