Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
- [TLS](tls) - An example of how to use TLS with the AMQP 1.0 client.
- [Advanced Settings](advanced_settings) - An example of how to use the advanced connection settings of the AMQP 1.0 client.
- [Broadcast](broadcast) - An example of how to use fanout to broadcast messages to multiple auto-deleted queues.
- [RPC Echo](rpc_echo_server) - An example of how to implement RPC with the AMQP 1.0 client.
- [RPC Echo](rpc_echo_server) - An example of how to implement RPC with the AMQP 1.0 client.
- [SQL stream Filtering](sql_stream_filter) - An example of how to use SQL stream filtering with RabbitMQ Streams.
120 changes: 120 additions & 0 deletions docs/examples/sql_stream_filter/sql_stream_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package main

import (
"context"
"fmt"
"github.com/Azure/go-amqp"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)

func ptr[T any](v T) *T {
return &v
}

func main() {
// see: https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions
queueName := "stream-sql-filter-example"

rmq.Info("AMQP 1.0 Client SQL Stream Filter Example")

env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)

amqpConnection, err := env.NewConnection(context.Background())
if err != nil {
rmq.Error("Error opening connection", err)
return
}

_, err = amqpConnection.Management().DeclareQueue(context.Background(), &rmq.StreamQueueSpecification{
Name: queueName,
})
if err != nil {
rmq.Error("Error declaring stream queue", err)
return
}

publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.QueueAddress{
Queue: queueName,
}, nil)
if err != nil {
rmq.Error("Error creating publisher", err)
return
}

// this message will be stored on the queue but not received by the consumer
// because it does not match the filter
msgNotIntTheFilter := amqp.NewMessage([]byte("Message that does not match the filter"))
msgNotIntTheFilter.Properties = &amqp.MessageProperties{
Subject: ptr("No"),
}
msgNotIntTheFilter.ApplicationProperties = map[string]interface{}{"keyNo": "valueNO"}

pr, err := publisher.Publish(context.Background(), msgNotIntTheFilter)
if err != nil {
rmq.Error("Error publishing message", err)
return
}
rmq.Info("Published message that does not match the filter", "publish result", pr.Outcome)

// this message will be stored on the queue and received by the consumer
// because it matches the filter
msgInTheFilter := amqp.NewMessage([]byte("Message that matches the filter"))
msgInTheFilter.Properties = &amqp.MessageProperties{
Subject: ptr("Yes_I_am_in_the_filter"),
To: ptr("the_id"),
}
msgInTheFilter.ApplicationProperties = map[string]interface{}{"keyYes": "valueYES"}

pr, err = publisher.Publish(context.Background(), msgInTheFilter)
if err != nil {
rmq.Error("Error publishing message", err)
return
}
rmq.Info("Published message that matches the filter", "publish result", pr.Outcome)

consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, &rmq.StreamConsumerOptions{
InitialCredits: 10,
Offset: &rmq.OffsetFirst{},
StreamFilterOptions: &rmq.StreamFilterOptions{
// the SQL expression to filter messages
SQL: "properties.subject LIKE '%Yes_I_am_in_the_filter%' AND properties.to = 'the_id' AND keyYes = 'valueYES'"},
})

if err != nil {
rmq.Error("Error creating consumer", err)
return
}

consumerContext, cancel := context.WithCancel(context.Background())
defer cancel()

// Consume messages from the queue. It should only receive the message that matches the filter
// the second
deliveryContext, err := consumer.Receive(consumerContext)
if err != nil {
rmq.Error("[Consumer]", "Error receiving message", err)
return
}
rmq.Info("[Consumer]", "Body",
fmt.Sprintf("%s", deliveryContext.Message().Data),
"Subject", *deliveryContext.Message().Properties.Subject, "To", *deliveryContext.Message().Properties.To)
err = deliveryContext.Accept(context.Background())
if err != nil {
rmq.Error("Error accepting message", err)
return
}

err = amqpConnection.Management().DeleteQueue(context.Background(), queueName)
if err != nil {
rmq.Error("Error deleting stream queue", err)
return
}
err = amqpConnection.Close(context.Background())
if err != nil {
rmq.Error("Error closing connection", err)
return
}
_ = env.CloseConnections(context.Background())
rmq.Info("AMQP 1.0 Client SQL Stream Filter Example Completed")

}
92 changes: 92 additions & 0 deletions pkg/rabbitmqamqp/amqp_consumer_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,98 @@ var _ = Describe("Consumer stream test", func() {
}()
})

It("SQL filter consumer", func() {
qName := generateName("SQL filter consumer")
connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.name).To(Equal(qName))

publishMessagesWithMessageLogic(qName, "not_in_the_filter", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{Subject: ptr("not")}
})

publishMessagesWithMessageLogic(qName, "in_the_filter", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{Subject: ptr("in_the_filter")}
})

publishMessagesWithMessageLogic(qName, "in_the_filter_with_more", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{Subject: ptr("in_the_filter_with_more")}
})

consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
InitialCredits: 200,
Offset: &OffsetFirst{},
StreamFilterOptions: &StreamFilterOptions{
SQL: "properties.subject LIKE '%in_the_filter%'",
},
})
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
for i := 0; i < 20; i++ {
dc, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
if i < 10 {
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, "in_the_filter")))
} else {
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i-10, "in_the_filter_with_more")))
}
Expect(dc.Accept(context.Background())).To(BeNil())
}

Expect(consumer.Close(context.Background())).To(BeNil())
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})

It("SQL filter consumer combined to other fields", func() {
qName := generateName("SQL filter consumer combined to other fields")
connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.name).To(Equal(qName))
publishMessagesWithMessageLogic(qName, "not_in_the_filter", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{Subject: ptr("not")}
})

publishMessagesWithMessageLogic(qName, "in_the_filter", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{Subject: ptr("p_in_the_filter"), To: ptr("the_id")}
msg.ApplicationProperties = map[string]interface{}{"a_in_the_filter_key": "a_in_the_filter_value"}
})

consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
InitialCredits: 200,
Offset: &OffsetFirst{},
StreamFilterOptions: &StreamFilterOptions{
SQL: "properties.subject LIKE '%in_the_filter%' AND properties.to = 'the_id' AND a_in_the_filter_key = 'a_in_the_filter_value'",
},
})

Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
for i := 0; i < 10; i++ {
dc, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, "in_the_filter")))
Expect(dc.Accept(context.Background())).To(BeNil())
}

Expect(consumer.Close(context.Background())).To(BeNil())
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
})

type msgLogic = func(*amqp.Message)
Expand Down
34 changes: 32 additions & 2 deletions pkg/rabbitmqamqp/amqp_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ type IOffsetSpecification interface {
toLinkFilter() amqp.LinkFilter
}

// DescriptorCodeSqlFilter see:
// https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/amqp10_common/include/amqp10_filter.hrl
// see DESCRIPTOR_CODE_SQL_FILTER in rabbitmq-server
// DESCRIPTOR_CODE_SQL_FILTER is the uint64 code for amqpSqlFilter = "amqp:sql-filter"
const DescriptorCodeSqlFilter = 0x120

const sqlFilter = "sql-filter"
const rmqStreamFilter = "rabbitmq:stream-filter"
const rmqStreamOffsetSpec = "rabbitmq:stream-offset-spec"
const rmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered"
Expand Down Expand Up @@ -172,7 +179,27 @@ type StreamFilterOptions struct {
// Filter the data based on Message Properties
Properties *amqp.MessageProperties

// SQLFilter
/* SQLFilter: documentation https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions
It requires RabbitMQ 4.2 or later

Example:
<code>
Define a message like:
var msg := NewMessage([]byte(..))
msg.Properties = &amqp.MessageProperties{Subject: ptr("mySubject"), To: ptr("To")}
msg.ApplicationProperties = map[string]interface{}{"filter_key": "filter_value"}

publisher.Publish(context.Background(), msg)
Then you can create a consumer with a SQL filter like:
consumer, err := connection.NewConsumer(context.Background(), "myQueue", &StreamConsumerOptions{
InitialCredits: 200,
Offset: &OffsetFirst{},
StreamFilterOptions: &StreamFilterOptions{
SQL: "properties.subject LIKE '%mySubject%' AND properties.to = 'To' AND filter_key = 'filter_value'",
},
})
</code>
*/
SQL string
}

Expand Down Expand Up @@ -206,7 +233,10 @@ func (sco *StreamConsumerOptions) linkFilters() []amqp.LinkFilter {

filters = append(filters, sco.Offset.toLinkFilter())
if sco.StreamFilterOptions != nil && !isStringNilOrEmpty(&sco.StreamFilterOptions.SQL) {

// here we use DescriptorCodeSqlFilter as the code for the sql filter
// since we need to create a simple DescribedType
// see DescriptorCodeSqlFilter const for more information
filters = append(filters, amqp.NewLinkFilter(sqlFilter, DescriptorCodeSqlFilter, sco.StreamFilterOptions.SQL))
}

if sco.StreamFilterOptions != nil && sco.StreamFilterOptions.Values != nil {
Expand Down