From 0e08b559a6462e676d407a7d322b28dc6de9ca36 Mon Sep 17 00:00:00 2001 From: Links Date: Sun, 21 Apr 2019 08:43:11 +0200 Subject: [PATCH] add support for grouping messages in on UDP requests this is very helpfull if many data points are send over the Network. In my usecase reducing the UDP packets send by 19 times --- lib/statsd.js | 43 ++++++++++++++++++++++++++++++++----- test/test_statsd.js | 52 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 88 insertions(+), 7 deletions(-) diff --git a/lib/statsd.js b/lib/statsd.js index f27383b..4bc162f 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -12,9 +12,11 @@ var dgram = require('dgram'), * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. * @option global_tags {Array=} Optional tags that will be added to every metric + * @option groupTime {Number} An optional number for grouping all requests in a timeframe + * @option maxGroupSize{Number} An optional number for max packet sieze if grouping is used * @constructor */ -var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags) { +var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, groupTime, maxGroupSize) { var options = host || {}, self = this; @@ -27,7 +29,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl globalize : globalize, cacheDns : cacheDns, mock : mock === true, - global_tags : global_tags + global_tags : global_tags, + groupTime : groupTime, + maxGroupSize: maxGroupSize }; } @@ -38,6 +42,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl this.socket = dgram.createSocket('udp4'); this.mock = options.mock; this.global_tags = options.global_tags || []; + this.groupTime = options.groupTime || 0; + this.maxGroupSize= options.maxGroupSize || 1300; + if(options.cacheDns === true){ dns.lookup(options.host, function(err, address, family){ @@ -193,7 +200,6 @@ Client.prototype.sendAll = function(stat, value, type, sampleRate, tags, callbac */ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) { var message = this.prefix + stat + this.suffix + ':' + value + '|' + type, - buf, merged_tags = []; if(sampleRate && sampleRate < 1){ @@ -217,8 +223,35 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) // Only send this stat if we're not a mock Client. if(!this.mock) { - buf = new Buffer(message); - this.socket.send(buf, 0, buf.length, this.port, this.host, callback); + if(typeof callback !== 'function' && this.groupTime > 0){ + if(!this.send_message) { + this.send_message = message; + } else { + var newSize = (this.send_message.length + 1 + message.length); + if(newSize > this.maxGroupSize) { + var buf = new Buffer(this.send_message); + this.send_message = message; + this.socket.send(buf, 0, buf.length, this.port, this.host, callback); + } else { + this.send_message += "\n" + message; + } + } + + if(!this.timmer_running) { + var self = this; + function send_timer() { + var buf = new Buffer(self.send_message); + self.send_message = undefined; + self.timmer_running = false; + self.socket.send(buf, 0, buf.length, self.port, self.host, callback); + } + setTimeout(send_timer, this.groupTime); + this.timmer_running = true; + } + } else { + var buf = new Buffer(message); + this.socket.send(buf, 0, buf.length, this.port, this.host, callback); + } } else { if(typeof callback === 'function'){ callback(null, 0); diff --git a/test/test_statsd.js b/test/test_statsd.js index 0fbb314..dbc1e31 100644 --- a/test/test_statsd.js +++ b/test/test_statsd.js @@ -84,12 +84,14 @@ describe('StatsD', function(){ assert.equal(global.statsd, undefined); assert.equal(statsd.mock, undefined); assert.deepEqual(statsd.global_tags, []); + assert.equal(statsd.groupTime, 0); + assert.equal(statsd.maxGroupSize, 1300); assert.ok(!statsd.mock); }); it('should set the proper values when specified', function(){ // cachedDns isn't tested here; see below - var statsd = new StatsD('host', 1234, 'prefix', 'suffix', true, null, true, ['gtag']); + var statsd = new StatsD('host', 1234, 'prefix', 'suffix', true, null, true, ['gtag'], 5, 1500); assert.equal(statsd.host, 'host'); assert.equal(statsd.port, 1234); assert.equal(statsd.prefix, 'prefix'); @@ -97,6 +99,8 @@ describe('StatsD', function(){ assert.equal(statsd, global.statsd); assert.equal(statsd.mock, true); assert.deepEqual(statsd.global_tags, ['gtag']); + assert.equal(statsd.groupTime, 5); + assert.equal(statsd.maxGroupSize, 1500); }); it('should set the proper values with options hash format', function(){ @@ -108,7 +112,9 @@ describe('StatsD', function(){ suffix: 'suffix', globalize: true, mock: true, - global_tags: ['gtag'] + global_tags: ['gtag'], + groupTime: 5, + maxGroupSize: 1500 }); assert.equal(statsd.host, 'host'); assert.equal(statsd.port, 1234); @@ -117,6 +123,8 @@ describe('StatsD', function(){ assert.equal(statsd, global.statsd); assert.equal(statsd.mock, true); assert.deepEqual(statsd.global_tags, ['gtag']); + assert.equal(statsd.groupTime, 5); + assert.equal(statsd.maxGroupSize, 1500); }); it('should attempt to cache a dns record if dnsCache is specified', function(done){ @@ -680,4 +688,44 @@ describe('StatsD', function(){ }); }); + + describe('#groupTime', function(finished){ + it('check if messages are grouped', function(finished){ + udpTest(function(message, server){ + assert.equal(message, 'test:42|g\ntest2:42|g\ntest3:42|g'); + server.close(); + finished(); + }, function(server){ + var address = server.address(), + statsd = new StatsD({ + host: address.address, + port: address.port, + groupTime: 1 + }); + statsd.gauge('test', 42); + statsd.gauge('test2', 42); + statsd.gauge('test3', 42); + }); + }); + + it('check if messages are split at size', function(finished){ + udpTest(function(message, server){ + assert.equal(message, 'test:42|g\ntest2:42|g'); + server.close(); + finished(); + }, function(server){ + var address = server.address(), + statsd = new StatsD({ + host: address.address, + port: address.port, + groupTime: 1, + maxGroupSize: 20, + }); + statsd.gauge('test', 42); + statsd.gauge('test2', 42); + statsd.gauge('this will be in the next message', 42); + }); + }); + }); + });