diff --git a/docs/examples/README.md b/docs/examples/README.md index d8e523b..adfa0a9 100644 --- a/docs/examples/README.md +++ b/docs/examples/README.md @@ -10,4 +10,5 @@ - [TLS](tls) - An example of how to use TLS with the AMQP 1.0 client. - [Advanced Settings](advanced_settings) - An example of how to use the advanced connection settings of the AMQP 1.0 client. - [Broadcast](broadcast) - An example of how to use fanout to broadcast messages to multiple auto-deleted queues. -- [RPC Echo](rpc_echo_server) - An example of how to implement RPC with the AMQP 1.0 client. \ No newline at end of file +- [RPC Echo](rpc_echo_server) - An example of how to implement RPC with the AMQP 1.0 client. +- [SQL stream Filtering](sql_stream_filter) - An example of how to use SQL stream filtering with RabbitMQ Streams. \ No newline at end of file diff --git a/docs/examples/sql_stream_filter/sql_stream_filter.go b/docs/examples/sql_stream_filter/sql_stream_filter.go new file mode 100644 index 0000000..9039ebc --- /dev/null +++ b/docs/examples/sql_stream_filter/sql_stream_filter.go @@ -0,0 +1,120 @@ +package main + +import ( + "context" + "fmt" + "github.com/Azure/go-amqp" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +func ptr[T any](v T) *T { + return &v +} + +func main() { + // see: https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions + queueName := "stream-sql-filter-example" + + rmq.Info("AMQP 1.0 Client SQL Stream Filter Example") + + env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil) + + amqpConnection, err := env.NewConnection(context.Background()) + if err != nil { + rmq.Error("Error opening connection", err) + return + } + + _, err = amqpConnection.Management().DeclareQueue(context.Background(), &rmq.StreamQueueSpecification{ + Name: queueName, + }) + if err != nil { + rmq.Error("Error declaring stream queue", err) + return + } + + publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.QueueAddress{ + Queue: queueName, + }, nil) + if err != nil { + rmq.Error("Error creating publisher", err) + return + } + + // this message will be stored on the queue but not received by the consumer + // because it does not match the filter + msgNotIntTheFilter := amqp.NewMessage([]byte("Message that does not match the filter")) + msgNotIntTheFilter.Properties = &amqp.MessageProperties{ + Subject: ptr("No"), + } + msgNotIntTheFilter.ApplicationProperties = map[string]interface{}{"keyNo": "valueNO"} + + pr, err := publisher.Publish(context.Background(), msgNotIntTheFilter) + if err != nil { + rmq.Error("Error publishing message", err) + return + } + rmq.Info("Published message that does not match the filter", "publish result", pr.Outcome) + + // this message will be stored on the queue and received by the consumer + // because it matches the filter + msgInTheFilter := amqp.NewMessage([]byte("Message that matches the filter")) + msgInTheFilter.Properties = &amqp.MessageProperties{ + Subject: ptr("Yes_I_am_in_the_filter"), + To: ptr("the_id"), + } + msgInTheFilter.ApplicationProperties = map[string]interface{}{"keyYes": "valueYES"} + + pr, err = publisher.Publish(context.Background(), msgInTheFilter) + if err != nil { + rmq.Error("Error publishing message", err) + return + } + rmq.Info("Published message that matches the filter", "publish result", pr.Outcome) + + consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, &rmq.StreamConsumerOptions{ + InitialCredits: 10, + Offset: &rmq.OffsetFirst{}, + StreamFilterOptions: &rmq.StreamFilterOptions{ + // the SQL expression to filter messages + SQL: "properties.subject LIKE '%Yes_I_am_in_the_filter%' AND properties.to = 'the_id' AND keyYes = 'valueYES'"}, + }) + + if err != nil { + rmq.Error("Error creating consumer", err) + return + } + + consumerContext, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Consume messages from the queue. It should only receive the message that matches the filter + // the second + deliveryContext, err := consumer.Receive(consumerContext) + if err != nil { + rmq.Error("[Consumer]", "Error receiving message", err) + return + } + rmq.Info("[Consumer]", "Body", + fmt.Sprintf("%s", deliveryContext.Message().Data), + "Subject", *deliveryContext.Message().Properties.Subject, "To", *deliveryContext.Message().Properties.To) + err = deliveryContext.Accept(context.Background()) + if err != nil { + rmq.Error("Error accepting message", err) + return + } + + err = amqpConnection.Management().DeleteQueue(context.Background(), queueName) + if err != nil { + rmq.Error("Error deleting stream queue", err) + return + } + err = amqpConnection.Close(context.Background()) + if err != nil { + rmq.Error("Error closing connection", err) + return + } + _ = env.CloseConnections(context.Background()) + rmq.Info("AMQP 1.0 Client SQL Stream Filter Example Completed") + +} diff --git a/pkg/rabbitmqamqp/amqp_consumer_stream_test.go b/pkg/rabbitmqamqp/amqp_consumer_stream_test.go index 3372996..b846242 100644 --- a/pkg/rabbitmqamqp/amqp_consumer_stream_test.go +++ b/pkg/rabbitmqamqp/amqp_consumer_stream_test.go @@ -555,6 +555,98 @@ var _ = Describe("Consumer stream test", func() { }() }) + It("SQL filter consumer", func() { + qName := generateName("SQL filter consumer") + connection, err := Dial(context.Background(), "amqp://", nil) + Expect(err).To(BeNil()) + queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{ + Name: qName, + }) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.name).To(Equal(qName)) + + publishMessagesWithMessageLogic(qName, "not_in_the_filter", 10, func(msg *amqp.Message) { + msg.Properties = &amqp.MessageProperties{Subject: ptr("not")} + }) + + publishMessagesWithMessageLogic(qName, "in_the_filter", 10, func(msg *amqp.Message) { + msg.Properties = &amqp.MessageProperties{Subject: ptr("in_the_filter")} + }) + + publishMessagesWithMessageLogic(qName, "in_the_filter_with_more", 10, func(msg *amqp.Message) { + msg.Properties = &amqp.MessageProperties{Subject: ptr("in_the_filter_with_more")} + }) + + consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{ + InitialCredits: 200, + Offset: &OffsetFirst{}, + StreamFilterOptions: &StreamFilterOptions{ + SQL: "properties.subject LIKE '%in_the_filter%'", + }, + }) + Expect(err).To(BeNil()) + Expect(consumer).NotTo(BeNil()) + Expect(consumer).To(BeAssignableToTypeOf(&Consumer{})) + for i := 0; i < 20; i++ { + dc, err := consumer.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + if i < 10 { + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, "in_the_filter"))) + } else { + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i-10, "in_the_filter_with_more"))) + } + Expect(dc.Accept(context.Background())).To(BeNil()) + } + + Expect(consumer.Close(context.Background())).To(BeNil()) + Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("SQL filter consumer combined to other fields", func() { + qName := generateName("SQL filter consumer combined to other fields") + connection, err := Dial(context.Background(), "amqp://", nil) + Expect(err).To(BeNil()) + queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{ + Name: qName, + }) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.name).To(Equal(qName)) + publishMessagesWithMessageLogic(qName, "not_in_the_filter", 10, func(msg *amqp.Message) { + msg.Properties = &amqp.MessageProperties{Subject: ptr("not")} + }) + + publishMessagesWithMessageLogic(qName, "in_the_filter", 10, func(msg *amqp.Message) { + msg.Properties = &amqp.MessageProperties{Subject: ptr("p_in_the_filter"), To: ptr("the_id")} + msg.ApplicationProperties = map[string]interface{}{"a_in_the_filter_key": "a_in_the_filter_value"} + }) + + consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{ + InitialCredits: 200, + Offset: &OffsetFirst{}, + StreamFilterOptions: &StreamFilterOptions{ + SQL: "properties.subject LIKE '%in_the_filter%' AND properties.to = 'the_id' AND a_in_the_filter_key = 'a_in_the_filter_value'", + }, + }) + + Expect(err).To(BeNil()) + Expect(consumer).NotTo(BeNil()) + Expect(consumer).To(BeAssignableToTypeOf(&Consumer{})) + for i := 0; i < 10; i++ { + dc, err := consumer.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, "in_the_filter"))) + Expect(dc.Accept(context.Background())).To(BeNil()) + } + + Expect(consumer.Close(context.Background())).To(BeNil()) + Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) + }) }) type msgLogic = func(*amqp.Message) diff --git a/pkg/rabbitmqamqp/amqp_types.go b/pkg/rabbitmqamqp/amqp_types.go index 6687b0f..04f5b52 100644 --- a/pkg/rabbitmqamqp/amqp_types.go +++ b/pkg/rabbitmqamqp/amqp_types.go @@ -118,6 +118,13 @@ type IOffsetSpecification interface { toLinkFilter() amqp.LinkFilter } +// DescriptorCodeSqlFilter see: +// https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/amqp10_common/include/amqp10_filter.hrl +// see DESCRIPTOR_CODE_SQL_FILTER in rabbitmq-server +// DESCRIPTOR_CODE_SQL_FILTER is the uint64 code for amqpSqlFilter = "amqp:sql-filter" +const DescriptorCodeSqlFilter = 0x120 + +const sqlFilter = "sql-filter" const rmqStreamFilter = "rabbitmq:stream-filter" const rmqStreamOffsetSpec = "rabbitmq:stream-offset-spec" const rmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered" @@ -172,7 +179,27 @@ type StreamFilterOptions struct { // Filter the data based on Message Properties Properties *amqp.MessageProperties - // SQLFilter + /* SQLFilter: documentation https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions + It requires RabbitMQ 4.2 or later + + Example: + + Define a message like: + var msg := NewMessage([]byte(..)) + msg.Properties = &amqp.MessageProperties{Subject: ptr("mySubject"), To: ptr("To")} + msg.ApplicationProperties = map[string]interface{}{"filter_key": "filter_value"} + + publisher.Publish(context.Background(), msg) + Then you can create a consumer with a SQL filter like: + consumer, err := connection.NewConsumer(context.Background(), "myQueue", &StreamConsumerOptions{ + InitialCredits: 200, + Offset: &OffsetFirst{}, + StreamFilterOptions: &StreamFilterOptions{ + SQL: "properties.subject LIKE '%mySubject%' AND properties.to = 'To' AND filter_key = 'filter_value'", + }, + }) + + */ SQL string } @@ -206,7 +233,10 @@ func (sco *StreamConsumerOptions) linkFilters() []amqp.LinkFilter { filters = append(filters, sco.Offset.toLinkFilter()) if sco.StreamFilterOptions != nil && !isStringNilOrEmpty(&sco.StreamFilterOptions.SQL) { - + // here we use DescriptorCodeSqlFilter as the code for the sql filter + // since we need to create a simple DescribedType + // see DescriptorCodeSqlFilter const for more information + filters = append(filters, amqp.NewLinkFilter(sqlFilter, DescriptorCodeSqlFilter, sco.StreamFilterOptions.SQL)) } if sco.StreamFilterOptions != nil && sco.StreamFilterOptions.Values != nil {