diff --git a/docs/stream-filtering.md b/docs/stream-filtering.md index 3e90c1f33..77cd959b1 100644 --- a/docs/stream-filtering.md +++ b/docs/stream-filtering.md @@ -258,7 +258,7 @@ class MyMessageHandler(AMQPMessagingHandler): self.delivery_context.accept(event) stream_address = AddressHelper.queue_address("some-stream") -consumer = consumer_connection.consumer( +consumer = connection.consumer( stream_address, message_handler=MyMessageHandler(), ### This Bloom filter will be evaluated server-side per chunk (Stage 1). @@ -427,6 +427,7 @@ The filter syntax is defined in the AMQP 1.0 extension specification [AMQP Filte RabbitMQ supports a subset of this specification as described below. AMQP filter expressions are either [**Property** Filter Expressions](https://docs.oasis-open.org/amqp/filtex/v1.0/csd01/filtex-v1.0-csd01.html#_Toc67929259) or [**SQL** Filter Expressions](https://docs.oasis-open.org/amqp/filtex/v1.0/csd01/filtex-v1.0-csd01.html#_Toc67929276). + :::note @@ -447,6 +448,7 @@ RabbitMQ implements: As described in the specification, prefix and suffix matching is supported using `&p:` and `&s:` modifiers. + #### Example: Property Filter Expressions The following example causes RabbitMQ to deliver only messages for which **all** of the following apply: @@ -471,6 +473,56 @@ Consumer consumer = connection.consumerBuilder() ``` + +```csharp +IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue"). + MessageHandler((context, message) => + { + // process the messages + } + ).Stream().Offset(StreamOffsetSpecification.First).Filter() + .UserId("John"u8.ToArray()) + .Subject("&p:Order") + .Property("region", "emea") + .Stream().Builder() + .BuildAndStartAsync(); +``` + + + +```go + var subjectPrt = "&p:Order" + consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue", + &rmq.StreamConsumerOptions{ + Offset: &rmq.OffsetFirst{}, + StreamFilterOptions: &rmq.StreamFilterOptions{ + Properties: &amqp.MessageProperties{Subject: &subjectPrt, UserID: []byte("John")}, + ApplicationProperties: map[string]interface{}{"region": "emea"}, + }, + }) +``` + + + +```python + consumer = connection.consumer( + "my-queue", + message_handler=MyMessageHandler(), + consumer_options=StreamConsumerOptions( + offset_specification=OffsetSpecification.first, + filter_options=StreamFilterOptions( + message_properties=MessageProperties( + subject="&p:Order", + user_id= "John".encode("utf-8"), + + ), + application_properties={"region": "emea"}, + ), + ), + ) +``` + + ```erlang Filter = #{<<"filter-name-1">> => @@ -725,6 +777,52 @@ Consumer consumer = connection.consumerBuilder() ``` + +```csharp +IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue"). + MessageHandler((context, message) => + { + // process the messages + } + ).Stream().Offset(StreamOffsetSpecification.First).Filter() + .Sql("properties.user_id = 'John' AND" + + "properties.subject LIKE 'Order%' AND region = 'emea'") + .Stream().Builder() + .Stream().Builder() + .BuildAndStartAsync(); +``` + + + +```go +consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue", + &rmq.StreamConsumerOptions{ + Offset: &rmq.OffsetFirst{}, + StreamFilterOptions: &rmq.StreamFilterOptions{ + SQL: "properties.user_id = 'John' AND " + + "properties.subject LIKE 'Order%' AND region = 'emea'", + }, + }) +``` + + + +```python +consumer = connection.consumer( + "my-queue", + message_handler=MyMessageHandler(), + consumer_options=StreamConsumerOptions( + offset_specification=OffsetSpecification.first, + filter_options=StreamFilterOptions( + sql="properties.user_id = 'John' AND " + + "properties.subject LIKE 'Order%' AND region = 'emea'", + ), + ), + ) +``` + + + ```erlang @@ -829,6 +927,69 @@ Consumer consumer = connection.consumerBuilder() ``` + +```csharp +IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue"). + MessageHandler((context, message) => + // message processing + }).Stream().Offset(StreamOffsetSpecification.First) + // This Bloom filter will be evaluated server-side per chunk (Stage 1). + .FilterValues("order.created") + .Filter() + // This complex SQL filter expression will be evaluted server-side + // per message at stage 2. + .Sql("p.subject = 'order.created' AND " + + "p.creation_time > UTC() - 3600000 AND " + + "region IN ('AMER', 'EMEA', 'APJ') AND " + + "(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)").Stream().Builder() + .BuildAndStartAsync().ConfigureAwait(false); +``` + + + +```csharp +consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue", + &rmq.StreamConsumerOptions{ + Offset: &rmq.OffsetFirst{}, + StreamFilterOptions: &rmq.StreamFilterOptions{ + // This Bloom filter will be evaluated server-side per chunk (Stage 1). + Values: []string{"order.created"}, + // This complex SQL filter expression will be evaluted server-side + // per message at stage 2. + SQL:"p.subject = 'order.created' AND " + + "p.creation_time > UTC() - 3600000 AND " + + "region IN ('AMER', 'EMEA', 'APJ') AND " + + "(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)", + }, + }) +``` + + + +```python + consumer = consumer_connection.consumer( + "my-queue", + message_handler=MyMessageHandler(), + # the consumer will only receive messages with filter value banana and subject yellow + # and application property from = italy + consumer_options=StreamConsumerOptions( + offset_specification=OffsetSpecification.first, + filter_options=StreamFilterOptions( + # This Bloom filter will be evaluated server-side per chunk (Stage 1). + values=["order.created"], + # This complex SQL filter expression will be evaluted server-side + # per message at stage 2. + sql="p.subject = 'order.created' AND " + + "p.creation_time > UTC() - 3600000 AND " + + "region IN ('AMER', 'EMEA', 'APJ') AND " + + "(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)", + ), + ), + ) +``` + + + ```erlang Expression = <<"p.subject = 'order.created' AND "