Skip to content

Commit e128da5

Browse files
committed
destroyable EventStreams; closes #302
- add reference to long-polling timer on class - add `EventStream` method `_destroy()`, which clears any existing long-polling timer, and deletes it. Destroys stream if not already destroyed. Implements `Readable#_destroy` in Node.js v8.x and newer. - add polyfill for `Readable#destroy`, which isn't present pre-Node.js-v8.x - add checks for `destroyed` stream state which effectively abort various tasks in `EventStream` - consolidate long-polling retry logic into its own private method, `EventStream#retryLongPoll()`. - add tests for logic changes - add note in docs
1 parent b673062 commit e128da5

File tree

3 files changed

+206
-3
lines changed

3 files changed

+206
-3
lines changed

docs/events.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,16 @@ point in time:
8989
client.events.get({stream_position: '1408838928446360'}, callback);
9090
```
9191

92+
### Destroying the Stream
93+
94+
If you ever need to *stop* long-polling, use:
95+
96+
```js
97+
client.events.destroy();
98+
```
99+
100+
This *will not* cancel in-process network requests. It *will* ensure no further long-polling nor event fetching takes place.
101+
92102
Enterprise Events
93103
-----------------
94104

lib/event-stream.js

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ util.inherits(EventStream, Readable);
9393
* @private
9494
*/
9595
EventStream.prototype.getLongPollInfo = function() {
96+
if (this.destroyed) {
97+
return Promise.resolve(false);
98+
}
9699

97100
return this._client.events.getLongPollInfo()
98101
.then(longPollInfo => {
@@ -108,7 +111,7 @@ EventStream.prototype.getLongPollInfo = function() {
108111

109112
// Only retry on resolvable errors
110113
if (!err.authExpired) {
111-
setTimeout(() => this.getLongPollInfo(), this._options.retryDelay);
114+
this.retryPollInfo();
112115
}
113116
});
114117
};
@@ -121,6 +124,9 @@ EventStream.prototype.getLongPollInfo = function() {
121124
* @private
122125
*/
123126
EventStream.prototype.doLongPoll = function() {
127+
if (this.destroyed) {
128+
return Promise.resolve(false);
129+
}
124130

125131
// If we're over the max number of retries, reset
126132
if (this._longPollRetries > this._longPollInfo.max_retries) {
@@ -147,6 +153,9 @@ EventStream.prototype.doLongPoll = function() {
147153
this._longPollRetries += 1;
148154
return this._client.wrapWithDefaultHandler(this._client.get)(url, options)
149155
.then(data => {
156+
if (this.destroyed) {
157+
return false;
158+
}
150159

151160
if (data.message === 'reconnect') {
152161
return this.getLongPollInfo();
@@ -161,17 +170,34 @@ EventStream.prototype.doLongPoll = function() {
161170
return this.fetchEvents();
162171
})
163172
.catch(() => {
164-
setTimeout(() => this.getLongPollInfo(), this._options.retryDelay);
173+
this.retryPollInfo();
165174
});
166175
};
167176

177+
/**
178+
* Retries long-polling after a delay.
179+
* Does not attempt if stream is already destroyed.
180+
* @returns {void}
181+
* @private
182+
*/
183+
EventStream.prototype.retryPollInfo = function() {
184+
185+
if (!this.destroyed) {
186+
this._retryTimer = setTimeout(() => this.getLongPollInfo(), this._options.retryDelay);
187+
}
188+
};
189+
168190
/**
169191
* Fetch the latest group of events and push them into the stream
170192
* @returns {Promise} Promise for testing purposes
171193
* @private
172194
*/
173195
EventStream.prototype.fetchEvents = function() {
174196

197+
if (this.destroyed) {
198+
return Promise.resolve(false);
199+
}
200+
175201
var eventParams = {
176202
stream_position: this._streamPosition,
177203
limit: 500
@@ -229,7 +255,8 @@ EventStream.prototype.fetchEvents = function() {
229255
.catch(err => {
230256

231257
this.emit('error', err);
232-
setTimeout(() => this.getLongPollInfo(), this._options.retryDelay);
258+
259+
this.retryPollInfo();
233260
})
234261
);
235262
};
@@ -269,4 +296,34 @@ EventStream.prototype._read = function() {
269296
this.getLongPollInfo();
270297
};
271298

299+
/**
300+
* Implementation of stream-internal `_destroy` function (v8.0.0 and later).
301+
* Called by stream consumers to effectively stop polling via the public
302+
* `destroy()`.
303+
* @returns {void}
304+
* @private
305+
*/
306+
EventStream.prototype._destroy = function() {
307+
clearTimeout(this._retryTimer);
308+
delete this._retryTimer;
309+
};
310+
311+
// backwards-compat for Node.js pre-v8.0.0
312+
if (!(typeof Readable.destroy === 'function')) {
313+
/**
314+
* Destroys the stream. Rough polyfill for `Readable#destroy`.
315+
* @returns {void}
316+
* @public
317+
*/
318+
EventStream.prototype.destroy = function() {
319+
if (!this.destroyed) {
320+
process.nextTick(() => {
321+
this.emit('close');
322+
});
323+
this.destroyed = true;
324+
this._destroy();
325+
}
326+
};
327+
}
328+
272329
module.exports = EventStream;

tests/lib/event-stream-test.js

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,25 @@ describe('EventStream', function() {
170170
clock.tick(1000);
171171
});
172172
});
173+
174+
describe('when stream is destroyed', function() {
175+
176+
beforeEach(function() {
177+
eventStream.destroy();
178+
});
179+
180+
it('should do nothing and resolve `false`', function() {
181+
182+
sandbox.mock(eventStream._client.events)
183+
.expects('getLongPollInfo')
184+
.never();
185+
186+
return eventStream.getLongPollInfo()
187+
.then(result => {
188+
assert.strictEqual(result, false);
189+
});
190+
});
191+
});
173192
});
174193

175194
describe('doLongPoll()', function() {
@@ -274,6 +293,54 @@ describe('EventStream', function() {
274293
return eventStream.doLongPoll();
275294
});
276295

296+
describe('when stream is destroyed', function() {
297+
298+
describe('before client fetch', function() {
299+
300+
beforeEach(function() {
301+
eventStream.destroy();
302+
});
303+
304+
it('should do nothing and resolve `false`', function() {
305+
306+
sandbox.mock(eventStream)
307+
.expects('getLongPollInfo')
308+
.never();
309+
sandbox.mock(boxClientFake)
310+
.expects('wrapWithDefaultHandler')
311+
.never();
312+
313+
return eventStream.doLongPoll()
314+
.then(result => {
315+
assert.strictEqual(result, false);
316+
});
317+
});
318+
});
319+
320+
describe('after client fetch', function() {
321+
322+
it('should resolve `false`', function() {
323+
324+
sandbox.mock(eventStream)
325+
.expects('getLongPollInfo')
326+
.never();
327+
sandbox.mock(boxClientFake).expects('wrapWithDefaultHandler')
328+
.returnsArg(0);
329+
sandbox.mock(boxClientFake).expects('get')
330+
.returns(Promise.resolve({
331+
message: 'reconnect'
332+
}));
333+
334+
const promise = eventStream.doLongPoll();
335+
336+
eventStream.destroy();
337+
338+
return promise.then(result => {
339+
assert.strictEqual(result, false);
340+
});
341+
});
342+
});
343+
});
277344
});
278345

279346
describe('fetchEvents()', function() {
@@ -550,6 +617,25 @@ describe('EventStream', function() {
550617
]);
551618
});
552619

620+
describe('when stream is destroyed before fetch', function() {
621+
622+
beforeEach(function() {
623+
624+
eventStream.destroy();
625+
});
626+
627+
it('should do nothing and resolve `false`', function() {
628+
629+
sandbox.mock(eventStream._rateLimiter)
630+
.expects('then')
631+
.never();
632+
633+
return eventStream.fetchEvents()
634+
.then(result => {
635+
assert.strictEqual(result, false);
636+
});
637+
});
638+
});
553639
});
554640

555641
describe('cleanupDedupFilter()', function() {
@@ -583,4 +669,54 @@ describe('EventStream', function() {
583669
});
584670
});
585671

672+
describe('destroy()', function() {
673+
674+
afterEach(function() {
675+
676+
eventStream.destroy();
677+
});
678+
679+
it('should cancel any active polling retry timer', function() {
680+
681+
eventStream.retryPollInfo();
682+
assert.property(eventStream, '_retryTimer');
683+
684+
eventStream.destroy();
685+
assert.notProperty(eventStream, '_retryTimer');
686+
});
687+
688+
it('should set `destroyed` prop if not already set', function() {
689+
690+
eventStream.destroy();
691+
assert.propertyVal(eventStream, 'destroyed', true);
692+
});
693+
});
694+
695+
describe('retryPollInfo()', function() {
696+
697+
describe('when not destroyed', function() {
698+
699+
afterEach(function() {
700+
701+
eventStream.destroy();
702+
});
703+
704+
it('should create a `_retryTimer` property', function() {
705+
706+
assert.notProperty(eventStream, '_retryTimer');
707+
eventStream.retryPollInfo();
708+
assert.property(eventStream, '_retryTimer');
709+
});
710+
});
711+
712+
describe('when destroyed', function() {
713+
714+
it('should not create a `_retryTimer` property', function() {
715+
716+
eventStream.destroy();
717+
eventStream.retryPollInfo();
718+
assert.notProperty(eventStream, '_retryTimer');
719+
});
720+
});
721+
});
586722
});

0 commit comments

Comments
 (0)