@@ -30,8 +30,8 @@ about MQ queues
3030*/
3131
3232import (
33- "github.com/ibm-messaging/mq-golang/ibmmq"
3433 // "fmt"
34+ "github.com/ibm-messaging/mq-golang/ibmmq"
3535 "strings"
3636 "time"
3737)
@@ -48,6 +48,17 @@ const (
4848 ATTR_Q_SINCE_GET = "time_since_get"
4949 ATTR_Q_MAX_DEPTH = "attribute_max_depth"
5050 ATTR_Q_USAGE = "attribute_usage"
51+
52+ // The next two attributes are given the same name
53+ // as the published statistics from the amqsrua-style
54+ // vaues. That allows a dashboard for Distributed and z/OS
55+ // to merge the same query.
56+ ATTR_Q_INTERVAL_PUT = "mqput_mqput1_count"
57+ ATTR_Q_INTERVAL_GET = "mqget_count"
58+ // This is the Highest Depth returned over an interval via the
59+ // RESET QSTATS command. Contrast with the attribute_max_depth
60+ // value which is the DISPLAY QL(x) MAXDEPTH attribute.
61+ ATTR_Q_INTERVAL_HI_DEPTH = "hi_depth"
5162)
5263
5364var QueueStatus StatusSet
@@ -89,9 +100,19 @@ func QueueInitAttributes() {
89100 QueueStatus .Attributes [attr ] = newStatusAttribute (attr , "Queue Depth" , ibmmq .MQIA_CURRENT_Q_DEPTH )
90101 }
91102
103+ if platform == ibmmq .MQPL_ZOS && useResetQStats {
104+ attr = ATTR_Q_INTERVAL_PUT
105+ QueueStatus .Attributes [attr ] = newStatusAttribute (attr , "Put/Put1 Count" , ibmmq .MQIA_MSG_ENQ_COUNT )
106+ attr = ATTR_Q_INTERVAL_GET
107+ QueueStatus .Attributes [attr ] = newStatusAttribute (attr , "Get Count" , ibmmq .MQIA_MSG_DEQ_COUNT )
108+ attr = ATTR_Q_INTERVAL_HI_DEPTH
109+ QueueStatus .Attributes [attr ] = newStatusAttribute (attr , "Highest Depth" , ibmmq .MQIA_HIGH_Q_DEPTH )
110+ }
111+
92112 // This is not really a monitoring metric but it enables calculations to be made such as %full for
93113 // the queue. It's extracted at startup of the program via INQUIRE_Q and not updated later even if the
94- // queue definition is changed. It's not easy to generate the % value in this program as the CurDepth will
114+ // queue definition is changed until rediscovery of the queues on a schedule.
115+ // It's not easy to generate the % value in this program as the CurDepth will
95116 // usually - but not always - come from the published resource stats. So we don't have direct access to it.
96117 // Recording the MaxDepth allows Prometheus etc to do the calculation regardless of how the CurDepth was obtained.
97118 attr = ATTR_Q_MAX_DEPTH
@@ -142,6 +163,9 @@ func CollectQueueStatus(patterns string) error {
142163 }
143164 //fmt.Printf("Collecting qStatus for %s\n",qName)
144165 err = collectQueueStatus (qName , ibmmq .MQOT_Q )
166+ if err == nil && useResetQStats {
167+ err = collectResetQStats (qName )
168+ }
145169 }
146170 } else {
147171 for _ , pattern := range queuePatterns {
@@ -150,6 +174,9 @@ func CollectQueueStatus(patterns string) error {
150174 continue
151175 }
152176 err = collectQueueStatus (pattern , ibmmq .MQOT_Q )
177+ if err == nil && useResetQStats {
178+ err = collectResetQStats (pattern )
179+ }
153180 }
154181 }
155182 return err
@@ -204,6 +231,43 @@ func collectQueueStatus(pattern string, instanceType int32) error {
204231 return err
205232}
206233
234+ func collectResetQStats (pattern string ) error {
235+ var err error
236+
237+ statusClearReplyQ ()
238+ putmqmd , pmo , cfh , buf := statusSetCommandHeaders ()
239+
240+ // Can allow all the other fields to default
241+ cfh .Command = ibmmq .MQCMD_RESET_Q_STATS
242+
243+ // Add the parameters one at a time into a buffer
244+ pcfparm := new (ibmmq.PCFParameter )
245+ pcfparm .Type = ibmmq .MQCFT_STRING
246+ pcfparm .Parameter = ibmmq .MQCA_Q_NAME
247+ pcfparm .String = []string {pattern }
248+ cfh .ParameterCount ++
249+ buf = append (buf , pcfparm .Bytes ()... )
250+
251+ buf = append (cfh .Bytes (), buf ... )
252+
253+ // And now put the command to the queue
254+ err = cmdQObj .Put (putmqmd , pmo , buf )
255+ if err != nil {
256+ return err
257+ }
258+
259+ // Now get the responses - loop until all have been received (one
260+ // per queue) or we run out of time
261+ for allReceived := false ; ! allReceived ; {
262+ cfh , buf , allReceived , err = statusGetReply ()
263+ if buf != nil {
264+ parseResetQStatsData (cfh , buf )
265+ }
266+ }
267+
268+ return err
269+ }
270+
207271// Issue the INQUIRE_Q call for wildcarded queue names and
208272// extract the required attributes - currently, just the
209273// Maximum Queue Depth
@@ -334,16 +398,68 @@ func parseQData(instanceType int32, cfh *ibmmq.MQCFH, buf []byte) string {
334398 now := time .Now ()
335399 QueueStatus .Attributes [ATTR_Q_SINCE_PUT ].Values [key ] = newStatusValueInt64 (statusTimeDiff (now , lastPutDate , lastPutTime ))
336400 QueueStatus .Attributes [ATTR_Q_SINCE_GET ].Values [key ] = newStatusValueInt64 (statusTimeDiff (now , lastGetDate , lastGetTime ))
337-
338401 if s , ok := qInfoMap [key ]; ok {
339- maxDepth := s .MaxDepth
402+ maxDepth := s .AttrMaxDepth
340403 QueueStatus .Attributes [ATTR_Q_MAX_DEPTH ].Values [key ] = newStatusValueInt64 (maxDepth )
341- usage := s .Usage
404+ usage := s .AttrUsage
342405 QueueStatus .Attributes [ATTR_Q_USAGE ].Values [key ] = newStatusValueInt64 (usage )
343406 }
344407 return key
345408}
346409
410+ // Given a PCF response message, parse it to extract the desired statistics
411+ func parseResetQStatsData (cfh * ibmmq.MQCFH , buf []byte ) string {
412+ var elem * ibmmq.PCFParameter
413+
414+ qName := ""
415+ key := ""
416+
417+ parmAvail := true
418+ bytesRead := 0
419+ offset := 0
420+ datalen := len (buf )
421+ if cfh == nil || cfh .ParameterCount == 0 {
422+ return ""
423+ }
424+
425+ // Parse it once to extract the fields that are needed for the map key
426+ for parmAvail && cfh .CompCode != ibmmq .MQCC_FAILED {
427+ elem , bytesRead = ibmmq .ReadPCFParameter (buf [offset :])
428+ offset += bytesRead
429+ // Have we now reached the end of the message
430+ if offset >= datalen {
431+ parmAvail = false
432+ }
433+
434+ // Only one field needed for queues
435+ switch elem .Parameter {
436+ case ibmmq .MQCA_Q_NAME :
437+ qName = strings .TrimSpace (elem .String [0 ])
438+ }
439+ }
440+
441+ // Create a unique key for this instance
442+ key = qName
443+
444+ QueueStatus .Attributes [ATTR_Q_NAME ].Values [key ] = newStatusValueString (qName )
445+
446+ // And then re-parse the message so we can store the metrics now knowing the map key
447+ parmAvail = true
448+ offset = 0
449+ for parmAvail && cfh .CompCode != ibmmq .MQCC_FAILED {
450+ elem , bytesRead = ibmmq .ReadPCFParameter (buf [offset :])
451+ offset += bytesRead
452+ // Have we now reached the end of the message
453+ if offset >= datalen {
454+ parmAvail = false
455+ }
456+
457+ statusGetIntAttributes (QueueStatus , elem , key )
458+ }
459+
460+ return key
461+ }
462+
347463func parseQAttrData (cfh * ibmmq.MQCFH , buf []byte ) {
348464 var elem * ibmmq.PCFParameter
349465
@@ -388,15 +504,15 @@ func parseQAttrData(cfh *ibmmq.MQCFH, buf []byte) {
388504 v := elem .Int64Value [0 ]
389505 if v > 0 {
390506 if qInfo , ok := qInfoMap [qName ]; ok {
391- qInfo .MaxDepth = v
507+ qInfo .AttrMaxDepth = v
392508 }
393509 }
394510 //fmt.Printf("MaxQDepth for %s = %d \n",qName,v)
395511 case ibmmq .MQIA_USAGE :
396512 v := elem .Int64Value [0 ]
397513 if v > 0 {
398514 if qInfo , ok := qInfoMap [qName ]; ok {
399- qInfo .Usage = v
515+ qInfo .AttrUsage = v
400516 }
401517 }
402518 }
0 commit comments