diff --git a/lib/statsd.js b/lib/statsd.js index f27383b..739142c 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -12,9 +12,11 @@ var dgram = require('dgram'), * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. * @option global_tags {Array=} Optional tags that will be added to every metric + * @maxBufferSize {Number} An optional value for aggregating metrics to send, mainly for performance improvement + * @bufferFlushInterval {Number} the time out value to flush out buffer if not * @constructor */ -var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags) { +var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, maxBufferSize, bufferFlushInterval) { var options = host || {}, self = this; @@ -27,7 +29,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl globalize : globalize, cacheDns : cacheDns, mock : mock === true, - global_tags : global_tags + global_tags : global_tags, + maxBufferSize : maxBufferSize, + bufferFlushInterval: bufferFlushInterval }; } @@ -38,6 +42,13 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl this.socket = dgram.createSocket('udp4'); this.mock = options.mock; this.global_tags = options.global_tags || []; + this.maxBufferSize = options.maxBufferSize || 0; + this.bufferFlushInterval = options.bufferFlushInterval || 1000; + this.buffer = ""; + + if(this.maxBufferSize > 0) { + this.intervalHandle = setInterval(this.timeoutCallback.bind(this), this.bufferFlushInterval); + } if(options.cacheDns === true){ dns.lookup(options.host, function(err, address, family){ @@ -193,7 +204,6 @@ Client.prototype.sendAll = function(stat, value, type, sampleRate, tags, callbac */ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) { var message = this.prefix + stat + this.suffix + ':' + value + '|' + type, - buf, merged_tags = []; if(sampleRate && sampleRate < 1){ @@ -217,21 +227,68 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) // Only send this stat if we're not a mock Client. if(!this.mock) { - buf = new Buffer(message); - this.socket.send(buf, 0, buf.length, this.port, this.host, callback); - } else { + if(this.maxBufferSize === 0) { + this.sendMessage(message, callback); + } + else { + this.enqueue(message); + } + } + else { if(typeof callback === 'function'){ callback(null, 0); } } }; +/** + * + * @param message {String} + */ +Client.prototype.enqueue = function(message){ + this.buffer += message + "\n"; + if(this.buffer.length >= this.maxBufferSize) { + this.flushQueue(); + } +} + +/** + * + */ +Client.prototype.flushQueue = function(){ + this.sendMessage(this.buffer); + this.buffer = ""; +} + +/** + * + * @param message {String} + * @param callback {Function} + */ +Client.prototype.sendMessage = function(message, callback){ + var buf = new Buffer(message); + this.socket.send(buf, 0, buf.length, this.port, this.host, callback); +} + +/** + * + */ +Client.prototype.timeoutCallback = function(){ + if(this.buffer !== "") { + this.flushQueue(); + } +} + /** * Close the underlying socket and stop listening for data on it. */ Client.prototype.close = function(){ - this.socket.close(); + if(this.intervalHandle) { + clearInterval(this.intervalHandle); + } + this.socket.close(); } exports = module.exports = Client; exports.StatsD = Client; + diff --git a/perfTest/test.js b/perfTest/test.js new file mode 100644 index 0000000..73ffd6c --- /dev/null +++ b/perfTest/test.js @@ -0,0 +1,21 @@ +var statsD = require('../lib/statsd'); +var count = 0; +var options = { + maxBufferSize: process.argv[2] +}; +var statsd = new statsD(options); + +var start = new Date(); + +function sendPacket() { + count++; + statsd.increment('abc.cde.efg.ghk.klm', 1); + if(count %100000 === 0) { + var stop = new Date(); + console.log(stop - start); + start = stop; + } + setImmediate(sendPacket); +} + +sendPacket(); diff --git a/test/test_statsd.js b/test/test_statsd.js index 0fbb314..f66ce86 100644 --- a/test/test_statsd.js +++ b/test/test_statsd.js @@ -679,5 +679,77 @@ describe('StatsD', function(){ assertMockClientMethod('set', finished); }); }); + describe('buffer', function() { + it('should aggregate packets when maxBufferSize is set to non-zero', function (finished) { + udpTest(function (message, server) { + assert.equal(message, 'a:1|c\nb:2|c\n'); + server.close(); + finished(); + }, function (server) { + var address = server.address(); + var options = { + host: address.host, + port: address.port, + maxBufferSize: 8 + }; + var statsd = new StatsD(options); + + statsd.increment('a', 1); + statsd.increment('b', 2); + }); + }); + it('should not aggregate packets when maxBufferSize is set to zero', function (finished) { + var results = [ + 'a:1|c', + 'b:2|c' + ]; + var msgCount = 0; + udpTest(function (message, server) { + var index = results.indexOf(message); + assert.equal(index >= 0, true); + results.splice(index, 1); + msgCount++; + if (msgCount >= 2) { + assert.equal(results.length, 0); + server.close(); + finished(); + } + }, function (server) { + var address = server.address(); + var options = { + host: address.host, + port: address.port, + maxBufferSize: 0 + }; + var statsd = new StatsD(options); + + statsd.increment('a', 1); + statsd.increment('b', 2); + }); + }); + + it('should flush the buffer when timeout value elapsed', function (finished) { + var timestamp; + udpTest(function (message, server) { + assert.equal(message, 'a:1|c\n'); + var elapsed = Date.now() - timestamp; + assert.equal(elapsed > 1000, true); + server.close(); + finished(); + }, function (server) { + var address = server.address(); + var options = { + host: address.host, + port: address.port, + maxBufferSize: 1220, + bufferFlushInterval: 1100 + }; + var statsd = new StatsD(options); + + timestamp = new Date(); + statsd.increment('a', 1); + }); + }); + }); });