Skip to content

Commit 64e82e3

Browse files
committed
Add a sample for processing poison messages
1 parent 95e9b8b commit 64e82e3

File tree

4 files changed

+262
-2
lines changed

4 files changed

+262
-2
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
# Changelog
22
Newest updates are at the top of this file.
33

4+
## xxx xx 2021 - v5.xxx
5+
* Add a sample amqsbo.go to show how to deal with poison messages
6+
47
## Mar 25 2021 - v5.2.0
58
Scope of mqmetric changes seem to justify new minor number
69
* Update for MQ 9.2.2

ibmmq/mqiDLH.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ func NewMQDLH(md *MQMD) *MQDLH {
5353
dlh.CodedCharSetId = MQCCSI_UNDEFINED
5454
dlh.PutApplType = 0
5555
dlh.PutApplName = ""
56-
dlh.PutTime = ""
57-
dlh.PutDate = ""
56+
dlh.PutTime = md.PutTime // Copy over the original put timestamp
57+
dlh.PutDate = md.PutDate
5858
dlh.Format = ""
5959
dlh.DestQName = ""
6060
dlh.DestQMgrName = ""

samples/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ Allow use of a userid/password for authentication. There are no default values f
3131
* amqsinq.go : Demonstrate the Inq API for inquiring about object attributes
3232
* amqsset.go : Demonstrate how to set attributes of an MQ object using the MQSET verb
3333
* amqscb.go : Demonstrate use of the CALLBACK capability for asynchronous consumption of messages
34+
* amqsbo.go : Show how to deal with poison messages by putting them to a configured backout queue
35+
* amqsdlh.go : Putting a message to a DLQ with a dead-letter header
3436

3537
Some trivial scripts run the sample programs in matching pairs:
3638
* putget.sh : Run amqsput and then use the generated MsgId to get the same message with amqsget

samples/amqsbo.go

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
* This is an example of a Go program that deals with unprocessable or "poison" messages on an IBM MQ
3+
* queue by moving them to a backout queue. As the message is moved to the backout queue
4+
* a dead-letter header is attached both to indicate the reason for the move, and to
5+
* permit a DLQ handler program to further process the message.
6+
*
7+
* The queue and queue manager name can be given as parameters on the
8+
* command line. Defaults are coded in the program.
9+
*
10+
* The input queue should be defined with both BOTHRESH and BOQNAME values and
11+
* the backout queue must exist
12+
* DEF QL(DEV.QUEUE.1) BOTHRESH(3) BOQNAME(DEV.QUEUE.BACKOUT) REPLACE
13+
* DEF QL(DEV.QUEUE.BACKOUT)
14+
*
15+
* Each MQI call prints its success or failure.
16+
*
17+
*/
18+
package main
19+
20+
/*
21+
Copyright (c) IBM Corporation 2018, 2021
22+
23+
Licensed under the Apache License, Version 2.0 (the "License");
24+
you may not use this file except in compliance with the License.
25+
You may obtain a copy of the License at
26+
27+
http://www.apache.org/licenses/LICENSE-2.0
28+
29+
Unless required by applicable law or agreed to in writing, software
30+
distributed under the License is distributed on an "AS IS" BASIS,
31+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
32+
See the License for the specific language governing permissions and
33+
limitations under the license.
34+
35+
Contributors:
36+
Mark Taylor - Initial Contribution
37+
*/
38+
39+
import (
40+
"fmt"
41+
"os"
42+
"strings"
43+
"time"
44+
45+
"github.com/ibm-messaging/mq-golang/v5/ibmmq"
46+
)
47+
48+
var qMgrObject ibmmq.MQObject
49+
var qObject ibmmq.MQObject
50+
51+
func main() {
52+
os.Exit(mainWithRc())
53+
}
54+
55+
// This function puts the message to the nominated backout queue. Adding a Dead Letter Header to the message is
56+
// usually done, although some application designs choose not to do it.
57+
func moveMsg(qMgrObject ibmmq.MQQueueManager, qName string, boQName string, md *ibmmq.MQMD, buffer []byte, reason int32) error {
58+
59+
// Construct the DLH based on the original message descriptor. This also modifies
60+
// the message descriptor.
61+
dlh := ibmmq.NewMQDLH(md)
62+
63+
// Fill in the reason this message needs to be put to a DLQ along with
64+
// any other relevant information.
65+
dlh.Reason = reason
66+
dlh.DestQName = qName
67+
dlh.DestQMgrName = qMgrObject.Name
68+
69+
mqod := ibmmq.NewMQOD()
70+
mqod.ObjectType = ibmmq.MQOT_Q
71+
mqod.ObjectName = boQName
72+
73+
pmo := ibmmq.NewMQPMO()
74+
pmo.Options = ibmmq.MQGMO_SYNCPOINT
75+
76+
// The message is put directly to the backout queue. Since we don't
77+
// expect to use this queue frequently, using Put1 is a better idea
78+
// than separately opening the queue and using Put.
79+
fmt.Printf("About to move poison message to %s queue\n", boQName)
80+
return qMgrObject.Put1(mqod, md, pmo, append(dlh.Bytes(), buffer...))
81+
}
82+
83+
// The real main function is here to set a return code.
84+
func mainWithRc() int {
85+
var values map[int32]interface{}
86+
87+
// These are the attributes that control how a poison message is handled
88+
boQName := ""
89+
boThreshold := int32(0)
90+
91+
// The default queue manager and queue to be used. These can be overridden on command line.
92+
qMgrName := "QM1"
93+
qName := "DEV.QUEUE.1"
94+
95+
fmt.Println("Sample AMQSBO.GO start")
96+
97+
// Get the queue and queue manager names from command line for overriding
98+
// the defaults. Parameters are not required.
99+
if len(os.Args) >= 2 {
100+
qName = os.Args[1]
101+
}
102+
103+
if len(os.Args) >= 3 {
104+
qMgrName = os.Args[2]
105+
}
106+
107+
// This is where we connect to the queue manager. It is assumed
108+
// that the queue manager is either local, or you have set the
109+
// client connection information externally eg via a CCDT or the
110+
// MQSERVER environment variable
111+
qMgrObject, err := ibmmq.Conn(qMgrName)
112+
if err != nil {
113+
fmt.Println(err)
114+
} else {
115+
fmt.Printf("Connected to queue manager %s\n", qMgrName)
116+
defer disc(qMgrObject)
117+
}
118+
119+
// Open of the application queue
120+
if err == nil {
121+
// Create the Object Descriptor that allows us to give the queue name
122+
mqod := ibmmq.NewMQOD()
123+
124+
// We have to say how we are going to use this queue. In this case, to GET
125+
// messages and to look for the backout configuration options associated
126+
// with the queue. That is done in the openOptions parameter.
127+
openOptions := ibmmq.MQOO_INPUT_EXCLUSIVE | ibmmq.MQOO_INQUIRE
128+
129+
// Opening a QUEUE (rather than a Topic or other object type) and give the name
130+
mqod.ObjectType = ibmmq.MQOT_Q
131+
mqod.ObjectName = qName
132+
133+
qObject, err = qMgrObject.Open(mqod, openOptions)
134+
if err != nil {
135+
fmt.Println(err)
136+
} else {
137+
fmt.Println("Opened queue", qObject.Name)
138+
defer close(qObject)
139+
}
140+
141+
// The backout configuration options for the queue are discoverable by the Inq() verb
142+
selectors := []int32{
143+
ibmmq.MQCA_BACKOUT_REQ_Q_NAME,
144+
ibmmq.MQIA_BACKOUT_THRESHOLD,
145+
}
146+
147+
values, err = qObject.Inq(selectors)
148+
if err != nil {
149+
fmt.Println(err)
150+
} else {
151+
// The returned values are extracted and converted to usable
152+
// datatypes. See amqsinq.go for more information on this verb
153+
boQName = (values[ibmmq.MQCA_BACKOUT_REQ_Q_NAME]).(string)
154+
boThreshold = (values[ibmmq.MQIA_BACKOUT_THRESHOLD]).(int32)
155+
fmt.Printf("Backout QName=%s Threshold=%d\n", boQName, boThreshold)
156+
157+
// If the queue doesn't have suitable configuration, then we can't continue
158+
if boQName == "" || boThreshold == 0 {
159+
err = fmt.Errorf("Backout parameters not correctly set")
160+
fmt.Println(err)
161+
}
162+
}
163+
}
164+
165+
msgAvail := true
166+
for msgAvail == true && err == nil {
167+
var datalen int
168+
169+
// The GET requires control structures, the Message Descriptor (MQMD)
170+
// and Get Options (MQGMO). Create those with default values.
171+
mqmd := ibmmq.NewMQMD()
172+
gmo := ibmmq.NewMQGMO()
173+
174+
// Use syncpoint control so that commit/backout works as expected
175+
gmo.Options = ibmmq.MQGMO_SYNCPOINT
176+
gmo.Options |= ibmmq.MQGMO_NO_WAIT
177+
178+
// Create a buffer for the message data. This one is large enough
179+
// for the messages put by the amqsput sample. Note that in this case
180+
// the make() operation is just allocating space - len(buffer)==0 initially.
181+
buffer := make([]byte, 0, 1024)
182+
183+
// Now we can try to get the message. This operation returns
184+
// a buffer that can be used directly.
185+
buffer, datalen, err = qObject.GetSlice(mqmd, gmo, buffer)
186+
187+
if err != nil {
188+
msgAvail = false
189+
fmt.Println(err)
190+
mqret := err.(*ibmmq.MQReturn)
191+
if mqret.MQRC == ibmmq.MQRC_NO_MSG_AVAILABLE {
192+
// If there's no message available, then I won't treat that as a real error as
193+
// it's an expected situation
194+
err = nil
195+
}
196+
} else {
197+
fmt.Printf("Got message of length %d. Backout Count=%d \t%s\n", datalen, mqmd.BackoutCount, strings.TrimSpace(string(buffer)))
198+
199+
// If we have reached the backout threshold then move the message to the backout queue
200+
if mqmd.BackoutCount >= boThreshold {
201+
// Pick an reason for the failure - this can be a user-chosen number
202+
reason := ibmmq.MQRC_UNEXPECTED_ERROR
203+
err = moveMsg(qMgrObject, qName, boQName, mqmd, buffer, reason)
204+
if err != nil {
205+
fmt.Println(err)
206+
}
207+
// For this program, we'll commit even if there is an error putting to the backout queue so we don't
208+
// get into an infinite loop. But there may be more advanced strategies depending on the error code. For
209+
// example, you might count the number of failures here and delay the retries before really giving up.
210+
qMgrObject.Cmit()
211+
} else {
212+
// In real life, there would be some processing of the message here before deciding to backout or
213+
// commit the transaction. But here we will always to the backout.
214+
qMgrObject.Back()
215+
216+
// Adding an increasing delay in here may help with some error conditions so you don't just spin quickly.
217+
// For example if the reason for the failure in processing the message is due to a temporary unavailability
218+
// of another component such as a database.
219+
time.Sleep(1 * time.Second)
220+
}
221+
}
222+
}
223+
224+
// Exit with any return code extracted from the failing MQI call.
225+
// Deferred disconnect will happen after the return
226+
if err != nil {
227+
mqret, ok := err.(*ibmmq.MQReturn)
228+
if ok {
229+
return int(mqret.MQCC)
230+
}
231+
}
232+
return 0
233+
}
234+
235+
// Disconnect from the queue manager
236+
func disc(qMgrObject ibmmq.MQQueueManager) error {
237+
err := qMgrObject.Disc()
238+
if err == nil {
239+
fmt.Printf("Disconnected from queue manager %s\n", qMgrObject.Name)
240+
} else {
241+
fmt.Println(err)
242+
}
243+
return err
244+
}
245+
246+
// Close the queue if it was opened
247+
func close(object ibmmq.MQObject) error {
248+
err := object.Close(0)
249+
if err == nil {
250+
fmt.Println("Closed queue")
251+
} else {
252+
fmt.Println(err)
253+
}
254+
return err
255+
}

0 commit comments

Comments
 (0)