Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 162 additions & 1 deletion docs/stream-filtering.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ class MyMessageHandler(AMQPMessagingHandler):
self.delivery_context.accept(event)

stream_address = AddressHelper.queue_address("some-stream")
consumer = consumer_connection.consumer(
consumer = connection.consumer(
stream_address,
message_handler=MyMessageHandler(),
### This Bloom filter will be evaluated server-side per chunk (Stage 1).
Expand Down Expand Up @@ -427,6 +427,7 @@ The filter syntax is defined in the AMQP 1.0 extension specification [AMQP Filte
RabbitMQ supports a subset of this specification as described below.

AMQP filter expressions are either [**Property** Filter Expressions](https://docs.oasis-open.org/amqp/filtex/v1.0/csd01/filtex-v1.0-csd01.html#_Toc67929259) or [**SQL** Filter Expressions](https://docs.oasis-open.org/amqp/filtex/v1.0/csd01/filtex-v1.0-csd01.html#_Toc67929276).


:::note

Expand All @@ -447,6 +448,7 @@ RabbitMQ implements:

As described in the specification, prefix and suffix matching is supported using `&p:<prefix>` and `&s:<suffix>` modifiers.


#### Example: Property Filter Expressions

The following example causes RabbitMQ to deliver only messages for which **all** of the following apply:
Expand All @@ -471,6 +473,56 @@ Consumer consumer = connection.consumerBuilder()
```
</TabItem>

<TabItem value="csharp" label="C#">
```csharp
IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue").
MessageHandler((context, message) =>
{
// process the messages
}
).Stream().Offset(StreamOffsetSpecification.First).Filter()
.UserId("John"u8.ToArray())
.Subject("&p:Order")
.Property("region", "emea")
.Stream().Builder()
.BuildAndStartAsync();
```
</TabItem>

<TabItem value="go" label="Go">
```go
var subjectPrt = "&p:Order"
consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue",
&rmq.StreamConsumerOptions{
Offset: &rmq.OffsetFirst{},
StreamFilterOptions: &rmq.StreamFilterOptions{
Properties: &amqp.MessageProperties{Subject: &subjectPrt, UserID: []byte("John")},
ApplicationProperties: map[string]interface{}{"region": "emea"},
},
})
```
</TabItem>

<TabItem value="python" label="Python">
```python
consumer = connection.consumer(
"my-queue",
message_handler=MyMessageHandler(),
consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first,
filter_options=StreamFilterOptions(
message_properties=MessageProperties(
subject="&p:Order",
user_id= "John".encode("utf-8"),

),
application_properties={"region": "emea"},
),
),
)
```
</TabItem>

<TabItem value="Erlang" label="Erlang">
```erlang
Filter = #{<<"filter-name-1">> =>
Expand Down Expand Up @@ -725,6 +777,52 @@ Consumer consumer = connection.consumerBuilder()
```
</TabItem>

<TabItem value="csharp" label="C#">
```csharp
IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue").
MessageHandler((context, message) =>
{
// process the messages
}
).Stream().Offset(StreamOffsetSpecification.First).Filter()
.Sql("properties.user_id = 'John' AND"
+ "properties.subject LIKE 'Order%' AND region = 'emea'")
.Stream().Builder()
.Stream().Builder()
.BuildAndStartAsync();
```
</TabItem>

<TabItem value="go" label="Go">
```go
consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue",
&rmq.StreamConsumerOptions{
Offset: &rmq.OffsetFirst{},
StreamFilterOptions: &rmq.StreamFilterOptions{
SQL: "properties.user_id = 'John' AND "
+ "properties.subject LIKE 'Order%' AND region = 'emea'",
},
})
```
</TabItem>

<TabItem value="python" label="Python">
```python
consumer = connection.consumer(
"my-queue",
message_handler=MyMessageHandler(),
consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first,
filter_options=StreamFilterOptions(
sql="properties.user_id = 'John' AND "
+ "properties.subject LIKE 'Order%' AND region = 'emea'",
),
),
)
```
</TabItem>



<TabItem value="Erlang" label="Erlang">
```erlang
Expand Down Expand Up @@ -829,6 +927,69 @@ Consumer consumer = connection.consumerBuilder()
```
</TabItem>

<TabItem value="csharp" label="C#">
```csharp
IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue").
MessageHandler((context, message) =>
// message processing
}).Stream().Offset(StreamOffsetSpecification.First)
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
.FilterValues("order.created")
.Filter()
// This complex SQL filter expression will be evaluted server-side
// per message at stage 2.
.Sql("p.subject = 'order.created' AND " +
"p.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)").Stream().Builder()
.BuildAndStartAsync().ConfigureAwait(false);
```
</TabItem>

<TabItem value="go" label="Go">
```csharp
consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue",
&rmq.StreamConsumerOptions{
Offset: &rmq.OffsetFirst{},
StreamFilterOptions: &rmq.StreamFilterOptions{
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
Values: []string{"order.created"},
// This complex SQL filter expression will be evaluted server-side
// per message at stage 2.
SQL:"p.subject = 'order.created' AND " +
"p.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)",
},
})
```
</TabItem>

<TabItem value="python" label="Python">
```python
consumer = consumer_connection.consumer(
"my-queue",
message_handler=MyMessageHandler(),
# the consumer will only receive messages with filter value banana and subject yellow
# and application property from = italy
consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first,
filter_options=StreamFilterOptions(
# This Bloom filter will be evaluated server-side per chunk (Stage 1).
values=["order.created"],
# This complex SQL filter expression will be evaluted server-side
# per message at stage 2.
sql="p.subject = 'order.created' AND " +
"p.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)",
),
),
)
```
</TabItem>


<TabItem value="Erlang" label="Erlang">
```erlang
Expression = <<"p.subject = 'order.created' AND "
Expand Down