Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions .npmignore

This file was deleted.

15 changes: 15 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
type LimiterCallback = () => void;
type AsyncJobCallback = (cb: LimiterCallback) => void | any;
type OnDoneCallback = (...args: any) => any;

declare class Queue {
constructor(options?: {concurrency?: number});
readonly length: number;
push(...items: AsyncJobCallback[]): number;
splice(start: number, deleteCount?: number): AsyncJobCallback[];
splice(start: number, deleteCount: number, ...items: AsyncJobCallback[]): AsyncJobCallback[];
unshift(...items: AsyncJobCallback[]): number;
onDone(cb: OnDoneCallback): void;
}

export default Queue;
132 changes: 82 additions & 50 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,64 +1,96 @@
'use strict';

function Queue(options) {
if (!(this instanceof Queue)) {
return new Queue(options);
}
/**
* @typedef {() => void} LimiterCallback
* @typedef {(cb: LimiterCallback) => void | any} AsyncJobCallback
* @typedef {(...args: any) => any} OnDoneCallback
*
* @extends Array<AsyncJobCallback>
*/
class Queue {
/**
* @param {{ concurrency: number; }} [options={ concurrency: Infinity }]
*/
constructor(options) {
/** @readonly @type {number} */
this.concurrency = (options || {}).concurrency || Infinity;

options = options || {};
this.concurrency = options.concurrency || Infinity;
this.pending = 0;
this.jobs = [];
this.onDoneCbs = [];
this._done = done.bind(this);
this._run = run.bind(this);
}
/** @private @type {number} */
this.pending = 0;

// Called upon completion of a job. Calls run() again
// to pluck the next job off the queue, if it exists.
function done() {
this.pending--;
this._run();
}
/** @private @type {AsyncJobCallback[]} */
this.jobs = [];

/** @private @type {OnDoneCallback[]} */
this.onDoneCbs = [];

function run() {
// Do we have capacity for jobs?
// If so, start them, uip to the concurrency limit
while (this.pending < this.concurrency && this.jobs.length) {
this.pending++;
var job = this.jobs.shift();
job(this._done);
this.push = this._extendFromArray('push');
this.splice = this._extendFromArray('splice');
this.unshift = this._extendFromArray('unshift');

this._run = this._run.bind(this);
this._done = this._done.bind(this);
}

// Are we done processing all jobs? If so, call onDone callbacks
while (this.length === 0 && this.onDoneCbs.length) {
var cb = this.onDoneCbs.pop();
cb();
get length() {
return this.pending + this.jobs.length;
}
}

// Replicate popular array methods to queue up jobs.
['push', 'splice', 'unshift'].forEach(function(method) {
Queue.prototype[method] = function() {
var methodResult = Array.prototype[method].apply(this.jobs, arguments);
process.nextTick(this._run);
return methodResult;
};
});
/**
* Called upon completion of a job. Calls _run() again
* to pluck the next job off the queue, if it exists.
* @private
*/
_done() {
this.pending--;
this._run();
}

Object.defineProperty(Queue.prototype, 'length', {
get: function() {
return this.pending + this.jobs.length;
/** @private */
_run() {
// Do we have capacity for jobs?
// If so, start them, uip to the concurrency limit
while (this.pending < this.concurrency && this.jobs.length) {
this.pending++;
const job = this.jobs.shift();
job && job(this._done);
}

// Are we done processing all jobs? If so, call onDone callbacks
while (this.length === 0 && this.onDoneCbs.length) {
var cb = this.onDoneCbs.pop();
cb && cb();
}
}

/**
* Replicate popular array methods to queue up jobs.
* @private
* @param {string} method
*/
_extendFromArray(method) {
return function() {
// @ts-ignore
const methodResult = Array.prototype[method].apply(this.jobs, arguments);
// @ts-ignore
process.nextTick(this._run);
return methodResult;
};
}
});

// Simply adds a callback to the end of the job list
Queue.prototype.onDone = function(cb) {
if (typeof cb === 'function') this.onDoneCbs.push(cb);
// If there are no jobs in the queue, this will call `cb()` in the next tick.
// This is intended for that there is predictable behavior even when running a
// job list of length 0.
process.nextTick(this._run);
};
/**
* Simply adds a callback to the end of the job list
* @param {any} cb
*/
onDone(cb) {
if (typeof cb === 'function') this.onDoneCbs.push(cb);
// If there are no jobs in the queue,
// this will call `cb()` in the next tick.
// This is intended for that there is predictable
// behavior even when running a job list of length 0.
// @ts-ignore
process.nextTick(this._run);
}
}

module.exports = Queue;
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
"example": "node example",
"lint": "eslint ."
},
"files": [
"index.js",
"index.d.ts"
],
"main": "index.js",
"types": "index.d.ts",
"repository": "https://github.com/strml/async-limiter.git",
"author": "Samuel Reed <[email protected]",
"license": "MIT"
Expand Down
2 changes: 1 addition & 1 deletion test/throttleSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ describe('Async-Limiter', function(endTest) {
});

it('has a length property that follows concurrency', function(endTest) {
var t = Limiter({ concurrency: 1 });
var t = new Limiter({ concurrency: 1 });

t.push(function(cb) {
setTimeout(function() {
Expand Down