Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions test/parallel/test-stream-forEach.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');
const { once } = require('events');

{
// forEach works on synchronous streams with a synchronous predicate
Expand Down Expand Up @@ -43,14 +43,59 @@ const { setTimeout } = require('timers/promises');
})().then(common.mustCall());
}

{
// forEach works on an infinite stream
const ac = new AbortController();
const { signal } = ac;
const stream = Readable.from(async function* () {
while (true) yield 1;
}(), { signal });
let i = 0;
assert.rejects(stream.forEach(common.mustCall((x) => {
i++;
if (i === 10) ac.abort();
assert.strictEqual(x, 1);
}, 10)), { name: 'AbortError' }).then(common.mustCall());
}

{
// Emitting an error during `forEach`
const stream = Readable.from([1, 2, 3, 4, 5]);
assert.rejects(stream.forEach(async (x) => {
if (x === 3) {
stream.emit('error', new Error('boom'));
}
}), /boom/).then(common.mustCall());
}

{
// Throwing an error during `forEach` (sync)
const stream = Readable.from([1, 2, 3, 4, 5]);
assert.rejects(stream.forEach((x) => {
if (x === 3) {
throw new Error('boom');
}
}), /boom/).then(common.mustCall());
}

{
// Throwing an error during `forEach` (async)
const stream = Readable.from([1, 2, 3, 4, 5]);
assert.rejects(stream.forEach(async (x) => {
if (x === 3) {
return Promise.reject(new Error('boom'));
}
}), /boom/).then(common.mustCall());
}

{
// Concurrency + AbortSignal
const ac = new AbortController();
let calls = 0;
const forEachPromise =
Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
calls++;
await setTimeout(100, { signal });
await once(signal, 'abort');
}, { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
Expand Down