From a8f5ad26be2ab6ab625315e4d9bf93d400f45ed7 Mon Sep 17 00:00:00 2001 From: Vadim Sabirov Date: Mon, 9 Nov 2020 01:27:12 +0300 Subject: [PATCH 1/5] Add the ability to subscribe to different topics with their own handlers. --- pkg/broker.go | 8 +------- pkg/broker_test.go | 8 +++----- pkg/mocks/BrokerInterface.go | 9 +++------ pkg/subscriber.go | 18 ++++++++++++++++-- pkg/subscriber_test.go | 2 +- 5 files changed, 24 insertions(+), 21 deletions(-) diff --git a/pkg/broker.go b/pkg/broker.go index 880b9f3..9a75b78 100644 --- a/pkg/broker.go +++ b/pkg/broker.go @@ -165,13 +165,7 @@ func (b *Broker) RegisterSubscriber(topic string, fn interface{}) error { return errors.New("handler func must have outcome argument") } - if len(b.subscriber.handlers) > 0 { - if b.subscriber.handlers[0].reqEl != reqType.Elem() { - return errors.New("first arguments for all handlers must have equal types") - } - } - - h := &handler{method: refFn, reqEl: reqType.Elem()} + h := &handler{method: refFn, reqEl: reqType.Elem(), topic: topic} b.subscriber.handlers = append(b.subscriber.handlers, h) b.subscriber.ext[key] = true diff --git a/pkg/broker_test.go b/pkg/broker_test.go index f7f1c4d..9200955 100644 --- a/pkg/broker_test.go +++ b/pkg/broker_test.go @@ -11,7 +11,7 @@ import ( ) const ( - defaultAmqpUrl = "amqp://127.0.0.1:5672" + defaultAmqpUrl = "amqp://rabbitmq:rabbitmq@127.0.0.1:5672" ) type TestStruct struct{} @@ -200,13 +200,12 @@ func TestBroker_RegisterSubscriber_HandlerIncorrectFirstArgTypes(t *testing.T) { } err = b.RegisterSubscriber("test", fn2) - assert.Error(t, err) - assert.Equal(t, "first arguments for all handlers must have equal types", err.Error()) + assert.NoError(t, err) broker, ok := b.(*Broker) assert.True(t, ok) - assert.Len(t, broker.subscriber.handlers, 1) + assert.Len(t, broker.subscriber.handlers, 2) } func TestBroker_RegisterSubscriber_HandlerMoreOneHandlersCorrect(t *testing.T) { @@ -267,7 +266,6 @@ func TestBroker_InitSubscriber(t *testing.T) { sub := broker.initSubscriber(topic) assert.Equal(t, broker.rabbitMQ, sub.rabbit) - assert.Equal(t, topic, sub.topic) assert.Len(t, sub.handlers, 0) assert.Len(t, sub.ext, 0) diff --git a/pkg/mocks/BrokerInterface.go b/pkg/mocks/BrokerInterface.go index 2888a0e..fae47bc 100644 --- a/pkg/mocks/BrokerInterface.go +++ b/pkg/mocks/BrokerInterface.go @@ -2,12 +2,9 @@ package mocks -import ( - amqp "github.com/streadway/amqp" - mock "github.com/stretchr/testify/mock" - - proto "github.com/golang/protobuf/proto" -) +import amqp "github.com/streadway/amqp" +import mock "github.com/stretchr/testify/mock" +import proto "github.com/golang/protobuf/proto" // BrokerInterface is an autogenerated mock type for the BrokerInterface type type BrokerInterface struct { diff --git a/pkg/subscriber.go b/pkg/subscriber.go index cec6d08..d1dd5ac 100644 --- a/pkg/subscriber.go +++ b/pkg/subscriber.go @@ -24,10 +24,10 @@ const ( type handler struct { method reflect.Value reqEl reflect.Type + topic string } type subscriber struct { - topic string handlers []*handler fn func(msg amqp.Delivery) @@ -47,7 +47,6 @@ type subscriber struct { func (b *Broker) initSubscriber(topic string) (subs *subscriber) { subs = &subscriber{ - topic: topic, rabbit: b.rabbitMQ, handlers: []*handler{}, ext: make(map[string]bool), @@ -87,6 +86,8 @@ func (s *subscriber) Subscribe() (err error) { return } + var handlerExists = false + for _, h := range s.handlers { st := reflect.New(h.reqEl).Interface().(proto.Message) @@ -107,6 +108,12 @@ func (s *subscriber) Subscribe() (err error) { continue } + if msg.RoutingKey == h.topic { + handlerExists = true + } else { + continue + } + returnValues := h.method.Call([]reflect.Value{reflect.ValueOf(st), reflect.ValueOf(msg)}) if err := returnValues[0].Interface(); err != nil { @@ -127,6 +134,13 @@ func (s *subscriber) Subscribe() (err error) { } } } + + if !handlerExists { + log.Printf("[*] Unable to find handler for the routing key %s. Message skipped. \n Message: %s \n", msg.RoutingKey, string(msg.Body)) + if s.opts.ConsumeOpts.Opts[OptAutoAck] == false { + _ = msg.Reject(false) + } + } } s.fn = fn diff --git a/pkg/subscriber_test.go b/pkg/subscriber_test.go index fac6b8d..4943957 100644 --- a/pkg/subscriber_test.go +++ b/pkg/subscriber_test.go @@ -130,4 +130,4 @@ func TestSubscriber_Consume_Error(t *testing.T) { _, err = broker.subscriber.consume() assert.NotNil(t, err) -} \ No newline at end of file +} From 9537bb378b62080922f1065132fd1cf4952f7200 Mon Sep 17 00:00:00 2001 From: Vadim Sabirov Date: Tue, 10 Nov 2020 16:05:26 +0300 Subject: [PATCH 2/5] Remove credentials in test --- pkg/broker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/broker_test.go b/pkg/broker_test.go index 9200955..a6abdaf 100644 --- a/pkg/broker_test.go +++ b/pkg/broker_test.go @@ -11,7 +11,7 @@ import ( ) const ( - defaultAmqpUrl = "amqp://rabbitmq:rabbitmq@127.0.0.1:5672" + defaultAmqpUrl = "amqp://127.0.0.1:5672" ) type TestStruct struct{} From 4358ac4b6ed011839a9e06d8d4d8eac65cbc185a Mon Sep 17 00:00:00 2001 From: Vadim Sabirov Date: Sun, 3 Jan 2021 15:51:28 +0300 Subject: [PATCH 3/5] Move check of the handler for the topic to the beginning of the cycle. --- pkg/subscriber.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/subscriber.go b/pkg/subscriber.go index d1dd5ac..32aebfe 100644 --- a/pkg/subscriber.go +++ b/pkg/subscriber.go @@ -89,6 +89,12 @@ func (s *subscriber) Subscribe() (err error) { var handlerExists = false for _, h := range s.handlers { + if msg.RoutingKey == h.topic { + handlerExists = true + } else { + continue + } + st := reflect.New(h.reqEl).Interface().(proto.Message) if msg.ContentType == protobufContentType { @@ -108,12 +114,6 @@ func (s *subscriber) Subscribe() (err error) { continue } - if msg.RoutingKey == h.topic { - handlerExists = true - } else { - continue - } - returnValues := h.method.Call([]reflect.Value{reflect.ValueOf(st), reflect.ValueOf(msg)}) if err := returnValues[0].Interface(); err != nil { From 8211028b8899004cc7e1064af99cbfdd714c6f78 Mon Sep 17 00:00:00 2001 From: Vadim Sabirov Date: Sat, 16 Jan 2021 04:59:46 +0300 Subject: [PATCH 4/5] Add constants for message header --- pkg/rabbitmq.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/rabbitmq.go b/pkg/rabbitmq.go index c98a0d8..20810c9 100644 --- a/pkg/rabbitmq.go +++ b/pkg/rabbitmq.go @@ -18,6 +18,9 @@ const ( OptImmediate = "immediate" OptInternal = "internal" + HeaderXDeath = "x-death" + HeaderRoutingKeys = "routing-keys" + errorNicConnection = "connection not open" errorNilChannel = "channel not open" ) From 400705194f842065c2a13c948ed28f5ec2480403 Mon Sep 17 00:00:00 2001 From: Vadim Sabirov Date: Sat, 23 Jan 2021 00:03:30 +0300 Subject: [PATCH 5/5] Detect routing key for death message --- pkg/subscriber.go | 38 +++++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/pkg/subscriber.go b/pkg/subscriber.go index 32aebfe..92b2d51 100644 --- a/pkg/subscriber.go +++ b/pkg/subscriber.go @@ -3,6 +3,7 @@ package rabbitmq import ( "bytes" "errors" + "fmt" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/streadway/amqp" @@ -91,18 +92,24 @@ func (s *subscriber) Subscribe() (err error) { for _, h := range s.handlers { if msg.RoutingKey == h.topic { handlerExists = true - } else { + } else if deathHeaders, ok := msg.Headers[HeaderXDeath]; ok && msg.RoutingKey == defaultQueueBindKey { + routingKey := s.getFirstRoutingKeyFromDeathLetter(deathHeaders) + + if routingKey == h.topic { + msg.RoutingKey = routingKey + handlerExists = true + } + } + + if !handlerExists { continue } st := reflect.New(h.reqEl).Interface().(proto.Message) if msg.ContentType == protobufContentType { - err = proto.Unmarshal(msg.Body, st) - } else if msg.ContentType == jsonContentType { - err = jsonpb.Unmarshal(bytes.NewReader(msg.Body), st) } @@ -111,7 +118,7 @@ func (s *subscriber) Subscribe() (err error) { _ = msg.Nack(false, false) } log.Printf("[*] Cannot unmarshal message, message skipped. \n Error: %s \n Message: %s \n", err.Error(), string(msg.Body)) - continue + break } returnValues := h.method.Call([]reflect.Value{reflect.ValueOf(st), reflect.ValueOf(msg)}) @@ -133,6 +140,8 @@ func (s *subscriber) Subscribe() (err error) { _ = msg.Ack(false) } } + + break } if !handlerExists { @@ -242,3 +251,22 @@ func (s *subscriber) consume() (dls <-chan amqp.Delivery, err error) { return } + +func (s *subscriber) getFirstRoutingKeyFromDeathLetter(deathHeaders interface{}) string { + if len(deathHeaders.([]interface{})) < 1 { + return "" + } + + deathHeader := deathHeaders.([]interface{})[0] + routingKeys, ok := deathHeader.(amqp.Table)[HeaderRoutingKeys] + + if !ok { + return "" + } + + if len(routingKeys.([]interface{})) < 1 { + return "" + } + + return fmt.Sprint(routingKeys.([]interface{})[0]) +}