Skip to content
This repository was archived by the owner on Sep 19, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 64 additions & 7 deletions lib/statsd.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ var dgram = require('dgram'),
* @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once
* @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent.
* @option global_tags {Array=} Optional tags that will be added to every metric
* @maxBufferSize {Number} An optional value for aggregating metrics to send, mainly for performance improvement
* @bufferFlushInterval {Number} the time out value to flush out buffer if not
* @constructor
*/
var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags) {
var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, maxBufferSize, bufferFlushInterval) {
var options = host || {},
self = this;

Expand All @@ -27,7 +29,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl
globalize : globalize,
cacheDns : cacheDns,
mock : mock === true,
global_tags : global_tags
global_tags : global_tags,
maxBufferSize : maxBufferSize,
bufferFlushInterval: bufferFlushInterval
};
}

Expand All @@ -38,6 +42,13 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl
this.socket = dgram.createSocket('udp4');
this.mock = options.mock;
this.global_tags = options.global_tags || [];
this.maxBufferSize = options.maxBufferSize || 0;
this.bufferFlushInterval = options.bufferFlushInterval || 1000;
this.buffer = "";

if(this.maxBufferSize > 0) {
this.intervalHandle = setInterval(this.timeoutCallback.bind(this), this.bufferFlushInterval);
}

if(options.cacheDns === true){
dns.lookup(options.host, function(err, address, family){
Expand Down Expand Up @@ -193,7 +204,6 @@ Client.prototype.sendAll = function(stat, value, type, sampleRate, tags, callbac
*/
Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) {
var message = this.prefix + stat + this.suffix + ':' + value + '|' + type,
buf,
merged_tags = [];

if(sampleRate && sampleRate < 1){
Expand All @@ -217,21 +227,68 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback)

// Only send this stat if we're not a mock Client.
if(!this.mock) {
buf = new Buffer(message);
this.socket.send(buf, 0, buf.length, this.port, this.host, callback);
} else {
if(this.maxBufferSize === 0) {
this.sendMessage(message, callback);
}
else {
this.enqueue(message);
}
}
else {
if(typeof callback === 'function'){
callback(null, 0);
}
}
};

/**
*
* @param message {String}
*/
Client.prototype.enqueue = function(message){
this.buffer += message + "\n";
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to have an array and loop on each entry on flush? I don't really the like the string thing with new lines... here's an example with a PHP implementation: https://github.com/beberlei/metrics/blob/master/src/Beberlei/Metrics/Collector/StatsD.php#L95-L107

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would defeat the purpose. We aggregate packets here to reduce the number of calls to socket.send, which is an expensive call.

if(this.buffer.length >= this.maxBufferSize) {
this.flushQueue();
}
}

/**
*
*/
Client.prototype.flushQueue = function(){
this.sendMessage(this.buffer);
this.buffer = "";
}

/**
*
* @param message {String}
* @param callback {Function}
*/
Client.prototype.sendMessage = function(message, callback){
var buf = new Buffer(message);
this.socket.send(buf, 0, buf.length, this.port, this.host, callback);
}

/**
*
*/
Client.prototype.timeoutCallback = function(){
if(this.buffer !== "") {
this.flushQueue();
}
}

/**
* Close the underlying socket and stop listening for data on it.
*/
Client.prototype.close = function(){
this.socket.close();
if(this.intervalHandle) {
clearInterval(this.intervalHandle);
}
this.socket.close();
}

exports = module.exports = Client;
exports.StatsD = Client;

21 changes: 21 additions & 0 deletions perfTest/test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
var statsD = require('../lib/statsd');
var count = 0;
var options = {
maxBufferSize: process.argv[2]
};
var statsd = new statsD(options);

var start = new Date();

function sendPacket() {
count++;
statsd.increment('abc.cde.efg.ghk.klm', 1);
if(count %100000 === 0) {
var stop = new Date();
console.log(stop - start);
start = stop;
}
setImmediate(sendPacket);
}

sendPacket();
72 changes: 72 additions & 0 deletions test/test_statsd.js
Original file line number Diff line number Diff line change
Expand Up @@ -679,5 +679,77 @@ describe('StatsD', function(){
assertMockClientMethod('set', finished);
});
});
describe('buffer', function() {
it('should aggregate packets when maxBufferSize is set to non-zero', function (finished) {
udpTest(function (message, server) {
assert.equal(message, 'a:1|c\nb:2|c\n');
server.close();
finished();
}, function (server) {
var address = server.address();
var options = {
host: address.host,
port: address.port,
maxBufferSize: 8
};
var statsd = new StatsD(options);

statsd.increment('a', 1);
statsd.increment('b', 2);
});
});

it('should not aggregate packets when maxBufferSize is set to zero', function (finished) {
var results = [
'a:1|c',
'b:2|c'
];
var msgCount = 0;
udpTest(function (message, server) {
var index = results.indexOf(message);
assert.equal(index >= 0, true);
results.splice(index, 1);
msgCount++;
if (msgCount >= 2) {
assert.equal(results.length, 0);
server.close();
finished();
}
}, function (server) {
var address = server.address();
var options = {
host: address.host,
port: address.port,
maxBufferSize: 0
};
var statsd = new StatsD(options);

statsd.increment('a', 1);
statsd.increment('b', 2);
});
});

it('should flush the buffer when timeout value elapsed', function (finished) {
var timestamp;
udpTest(function (message, server) {
assert.equal(message, 'a:1|c\n');
var elapsed = Date.now() - timestamp;
assert.equal(elapsed > 1000, true);
server.close();
finished();
}, function (server) {
var address = server.address();
var options = {
host: address.host,
port: address.port,
maxBufferSize: 1220,
bufferFlushInterval: 1100
};
var statsd = new StatsD(options);

timestamp = new Date();
statsd.increment('a', 1);
});
});
});
});