From 862d8537078982766ff844368ae234d302a76ac5 Mon Sep 17 00:00:00 2001 From: Stephen McAuley Date: Sat, 11 Feb 2017 21:27:35 +0000 Subject: [PATCH 1/2] Declare passive * Update declarer interface with new passive methods * Add DeclareQueuePassive * Add DeclareExchangePassive * Add tests for new methods --- declaration.go | 35 +++++++++++++++++++++ declaration_test.go | 74 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 106 insertions(+), 3 deletions(-) diff --git a/declaration.go b/declaration.go index ea5784a..be5081c 100644 --- a/declaration.go +++ b/declaration.go @@ -8,7 +8,9 @@ type Declaration func(Declarer) error // Declarer is implemented by *amqp.Channel type Declarer interface { QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) + QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error + ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error } @@ -31,6 +33,25 @@ func DeclareQueue(q *Queue) Declaration { } } +// DeclareQueuePassive is a way to declare AMQP queue +func DeclareQueuePassive(q *Queue) Declaration { + name := q.Name + return func(c Declarer) error { + q.Name = name + realQ, err := c.QueueDeclarePassive(q.Name, + q.Durable, + q.AutoDelete, + q.Exclusive, + false, + q.Args, + ) + q.l.Lock() + q.Name = realQ.Name + q.l.Unlock() + return err + } +} + // DeclareExchange is a way to declare AMQP exchange func DeclareExchange(e Exchange) Declaration { return func(c Declarer) error { @@ -45,6 +66,20 @@ func DeclareExchange(e Exchange) Declaration { } } +// DeclareExchange is a way to declare AMQP exchange +func DeclareExchangePassive(e Exchange) Declaration { + return func(c Declarer) error { + return c.ExchangeDeclarePassive(e.Name, + e.Kind, + e.Durable, + e.AutoDelete, + false, + false, + e.Args, + ) + } +} + // DeclareBinding is a way to declare AMQP binding between AMQP queue and exchange func DeclareBinding(b Binding) Declaration { return func(c Declarer) error { diff --git a/declaration_test.go b/declaration_test.go index 776d120..493cb47 100644 --- a/declaration_test.go +++ b/declaration_test.go @@ -7,9 +7,11 @@ import ( ) type testDeclarer struct { - _QueueDeclare func(string) (amqp.Queue, error) - _ExchangeDeclare func() error - _QueueBind func() error + _QueueDeclare func(string) (amqp.Queue, error) + _QueueDeclarePassive func(string) (amqp.Queue, error) + _ExchangeDeclare func() error + _ExchangeDeclarePassive func() error + _QueueBind func() error } func (td *testDeclarer) QueueDeclare(name string, durable, autoDelete, @@ -17,11 +19,21 @@ func (td *testDeclarer) QueueDeclare(name string, durable, autoDelete, return td._QueueDeclare(name) } +func (td *testDeclarer) QueueDeclarePassive(name string, durable, autoDelete, + exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) { + return td._QueueDeclarePassive(name) +} + func (td *testDeclarer) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error { return td._ExchangeDeclare() } +func (td *testDeclarer) ExchangeDeclarePassive(name, kind string, durable, autoDelete, + internal, noWait bool, args amqp.Table) error { + return td._ExchangeDeclarePassive() +} + func (td *testDeclarer) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error { return td._QueueBind() @@ -64,6 +76,43 @@ func TestDeclareQueue(t *testing.T) { } } +func TestDeclareQueuePassive(t *testing.T) { + var ( + callOK, nameOK bool + ) + + q := &Queue{ + Name: "Q1 Passive", + } + + td := &testDeclarer{ + _QueueDeclarePassive: func(name string) (amqp.Queue, error) { + callOK = true + if name == "Q1 Passive" { + nameOK = true + } + return amqp.Queue{Name: "Q1_REAL_PASSIVE"}, nil + }, + } + + testDec := DeclareQueuePassive(q) + testDec(td) + + if !callOK { + t.Error("DeclareQueuePassive() should call declarer.QueueDeclarePassive()") + } + + if q.Name != "Q1_REAL_PASSIVE" { + t.Error("DeclareQueuePassive() should update queue name from AMQP reply") + } + + // call it another time (like reconnect event happened) + testDec(td) + if !nameOK { + t.Error("queue name should be preserved") + } +} + func TestDeclareExchange(t *testing.T) { var ok bool @@ -83,6 +132,25 @@ func TestDeclareExchange(t *testing.T) { } } +func TestDeclareExchangePassive(t *testing.T) { + var ok bool + + e := Exchange{Name: "ex1passive"} + + td := &testDeclarer{ + _ExchangeDeclarePassive: func() error { + ok = true + return nil + }, + } + + DeclareExchangePassive(e)(td) + + if !ok { + t.Error("DeclareExchangePassive() should call declarer.ExchangeDeclarePassive()") + } +} + func TestDeclareBinding(t *testing.T) { var ok bool From 64d7cdd161834d6b98c15f88c16b130523511695 Mon Sep 17 00:00:00 2001 From: Stephen McAuley Date: Wed, 8 Mar 2017 22:33:46 +0000 Subject: [PATCH 2/2] Interface for declare passive * Add separate interface for passive methods * Refactor passive methods and type assert to ensure they are similar types --- declaration.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/declaration.go b/declaration.go index be5081c..1b5ad60 100644 --- a/declaration.go +++ b/declaration.go @@ -1,6 +1,10 @@ package cony -import "github.com/streadway/amqp" +import ( + "errors" + + "github.com/streadway/amqp" +) // Declaration is a callback type to declare AMQP queue/exchange/binding type Declaration func(Declarer) error @@ -8,12 +12,16 @@ type Declaration func(Declarer) error // Declarer is implemented by *amqp.Channel type Declarer interface { QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) - QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error - ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error } +// DeclarerPassive is implemented by *amqp.Channel +type DeclarerPassive interface { + QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) + ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error +} + // DeclareQueue is a way to declare AMQP queue func DeclareQueue(q *Queue) Declaration { name := q.Name @@ -37,8 +45,13 @@ func DeclareQueue(q *Queue) Declaration { func DeclareQueuePassive(q *Queue) Declaration { name := q.Name return func(c Declarer) error { + cp, found := c.(DeclarerPassive) + if !found { + return errors.New("Type not found.") + } + q.Name = name - realQ, err := c.QueueDeclarePassive(q.Name, + realQ, err := cp.QueueDeclarePassive(q.Name, q.Durable, q.AutoDelete, q.Exclusive, @@ -69,7 +82,12 @@ func DeclareExchange(e Exchange) Declaration { // DeclareExchange is a way to declare AMQP exchange func DeclareExchangePassive(e Exchange) Declaration { return func(c Declarer) error { - return c.ExchangeDeclarePassive(e.Name, + cp, found := c.(DeclarerPassive) + if !found { + return errors.New("Type not found.") + } + + return cp.ExchangeDeclarePassive(e.Name, e.Kind, e.Durable, e.AutoDelete,