Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 60 additions & 28 deletions lib/internal/streams/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

const {
SymbolAsyncIterator,
SymbolIterator
SymbolIterator,
Promise
} = primordials;
const { Buffer } = require('buffer');

Expand All @@ -11,7 +12,6 @@ const {
} = require('internal/errors').codes;

function from(Readable, iterable, opts) {
let iterator;
if (typeof iterable === 'string' || iterable instanceof Buffer) {
return new Readable({
objectMode: true,
Expand All @@ -23,41 +23,73 @@ function from(Readable, iterable, opts) {
});
}

if (iterable && iterable[SymbolAsyncIterator])
iterator = iterable[SymbolAsyncIterator]();
else if (iterable && iterable[SymbolIterator])
iterator = iterable[SymbolIterator]();
else
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
let onDataNeeded;

const readable = new Readable({
objectMode: true,
...opts
...opts,
read() {
onDataNeeded && onDataNeeded();
},
async destroy(error, cb) {
onDataNeeded && onDataNeeded();
try {
await pumping;
} catch (e) {
// Do not hide present error
if (!error) error = e;
}
cb(error);
},
});
// Reading boolean to protect against _read
// being called before last iteration completion.
let reading = false;
readable._read = function() {
if (!reading) {
reading = true;
next();
}
};
async function next() {

if (!iterable[SymbolAsyncIterator] && !iterable[SymbolIterator])
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);

const pumping = pump();

return readable;

async function pump() {
/*
We're iterating over sync or async iterator with the appropriate sync
or async version of the `for-of` loop.

`for-await-of` loop has an edge case when looping over synchronous
iterator.

It does not close synchronous iterator with .return() if that iterator
yields rejected Promise, so finally blocks within such an iterator are
never executed.

In the application code developers can choose between async and sync
forms of the loop depending on their needs, but in the library code we
have to handle such edge cases properly and close iterators anyway.
*/
try {
const { value, done } = await iterator.next();
if (done) {
readable.push(null);
} else if (readable.push(await value)) {
next();
if (iterable[SymbolAsyncIterator]) {
for await (const data of iterable) {
if (readable.destroyed) return;
if (!readable.push(data)) {
await new Promise((resolve) => { onDataNeeded = resolve; });
if (readable.destroyed) return;
}
}
} else {
reading = false;
for (const data of iterable) {
const value = await data;
if (readable.destroyed) return;
if (!readable.push(value)) {
await new Promise((resolve) => { onDataNeeded = resolve; });
if (readable.destroyed) return;
}
}
}
} catch (err) {
readable.destroy(err);
if (!readable.destroyed) readable.push(null);
} catch (error) {
if (!readable.destroyed) readable.destroy(error);
}
}
return readable;
}

module.exports = from;
117 changes: 117 additions & 0 deletions test/parallel/test-readable-from-iterator-closing.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
'use strict';

const { mustCall, mustNotCall } = require('../common');
const { Readable } = require('stream');
const { strictEqual } = require('assert');

async function asyncSupport() {
const finallyMustCall = mustCall();
async function* generate() {
try {
yield 'a';
mustNotCall('only first item is read');
} finally {
finallyMustCall();
}
}

const stream = Readable.from(generate());

for await (const chunk of stream) {
strictEqual(chunk, 'a');
break;
}
}

asyncSupport().then(mustCall());

async function syncSupport() {
const finallyMustCall = mustCall();
function* generate() {
try {
yield 'a';
mustNotCall('only first item is read');
} finally {
finallyMustCall();
}
}

const stream = Readable.from(generate());

for await (const chunk of stream) {
strictEqual(chunk, 'a');
break;
}
}

syncSupport().then(mustCall());

async function syncPromiseSupport() {
const finallyMustCall = mustCall();
function* generate() {
try {
yield Promise.resolve('a');
mustNotCall('only first item is read');
} finally {
finallyMustCall();
}
}

const stream = Readable.from(generate());

for await (const chunk of stream) {
strictEqual(chunk, 'a');
break;
}
}

syncPromiseSupport().then(mustCall());

async function syncRejectedSupport() {
const finallyMustCall = mustCall();
const noBodyCall = mustNotCall();
const catchMustCall = mustCall();

function* generate() {
try {
yield Promise.reject('a');
mustNotCall();
} finally {
finallyMustCall();
}
}

const stream = Readable.from(generate());

try {
for await (const chunk of stream) {
noBodyCall(chunk);
}
} catch {
catchMustCall();
}
}

syncRejectedSupport().then(mustCall());

async function noReturnAfterThrow() {
const returnMustNotCall = mustNotCall();
const noBodyCall = mustNotCall();
const catchMustCall = mustCall();

const stream = Readable.from({
[Symbol.asyncIterator]() { return this; },
async next() { throw new Error('a'); },
async return() { returnMustNotCall(); return { done: true }; },
});

try {
for await (const chunk of stream) {
noBodyCall(chunk);
}
} catch {
catchMustCall();
}
}

noReturnAfterThrow().then(mustCall());