Skip to content

Commit bef1708

Browse files
committed
stream: add ReadableByteStream.tee()
This supports teeing readable byte streams to meet the latest web streams standards. Signed-off-by: Daeyeon Jeong [email protected]
1 parent aa90e7a commit bef1708

File tree

4 files changed

+339
-53
lines changed

4 files changed

+339
-53
lines changed

lib/internal/webstreams/readablestream.js

Lines changed: 304 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ class ReadableStream {
215215
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
216216
this[kState] = {
217217
disturbed: false,
218+
reader: undefined,
218219
state: 'readable',
219220
storedError: undefined,
220221
stream: undefined,
@@ -1111,7 +1112,6 @@ class ReadableByteStreamController {
11111112
chunk);
11121113
}
11131114
const chunkByteLength = ArrayBufferViewGetByteLength(chunk);
1114-
const chunkByteOffset = ArrayBufferViewGetByteOffset(chunk);
11151115
const chunkBuffer = ArrayBufferViewGetBuffer(chunk);
11161116
const chunkBufferByteLength = ArrayBufferGetByteLength(chunkBuffer);
11171117
if (chunkByteLength === 0 || chunkBufferByteLength === 0) {
@@ -1122,11 +1122,7 @@ class ReadableByteStreamController {
11221122
throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
11231123
if (this[kState].stream[kState].state !== 'readable')
11241124
throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed');
1125-
readableByteStreamControllerEnqueue(
1126-
this,
1127-
chunkBuffer,
1128-
chunkByteLength,
1129-
chunkByteOffset);
1125+
readableByteStreamControllerEnqueue(this, chunk);
11301126
}
11311127

11321128
/**
@@ -1430,6 +1426,13 @@ function readableStreamPipeTo(
14301426
}
14311427

14321428
function readableStreamTee(stream, cloneForBranch2) {
1429+
if (isReadableByteStreamController(stream[kState].controller)) {
1430+
return readableByteStreamTee(stream);
1431+
}
1432+
return readableStreamDefaultTee(stream, cloneForBranch2);
1433+
}
1434+
1435+
function readableStreamDefaultTee(stream, cloneForBranch2) {
14331436
const reader = new ReadableStreamDefaultReader(stream);
14341437
let reading = false;
14351438
let canceled1 = false;
@@ -1524,6 +1527,296 @@ function readableStreamTee(stream, cloneForBranch2) {
15241527
return [branch1, branch2];
15251528
}
15261529

1530+
function readableByteStreamTee(stream) {
1531+
assert(isReadableStream(stream));
1532+
assert(isReadableByteStreamController(stream[kState].controller));
1533+
1534+
let reader = new ReadableStreamDefaultReader(stream);
1535+
let reading = false;
1536+
let readAgainForBranch1 = false;
1537+
let readAgainForBranch2 = false;
1538+
let canceled1 = false;
1539+
let canceled2 = false;
1540+
let reason1;
1541+
let reason2;
1542+
let branch1;
1543+
let branch2;
1544+
const cancelDeferred = createDeferredPromise();
1545+
1546+
function forwardReaderError(thisReader) {
1547+
PromisePrototypeThen(
1548+
thisReader[kState].close.promise,
1549+
undefined,
1550+
(error) => {
1551+
if (thisReader !== reader) {
1552+
return;
1553+
}
1554+
readableStreamDefaultControllerError(branch1[kState].controller, error);
1555+
readableStreamDefaultControllerError(branch2[kState].controller, error);
1556+
if (!canceled1 || !canceled2) {
1557+
cancelDeferred.resolve();
1558+
}
1559+
}
1560+
);
1561+
}
1562+
1563+
function pullWithDefaultReader() {
1564+
if (isReadableStreamBYOBReader(reader)) {
1565+
readableStreamBYOBReaderRelease(reader);
1566+
reader = new ReadableStreamDefaultReader(stream);
1567+
forwardReaderError(reader);
1568+
}
1569+
1570+
const readRequest = {
1571+
[kChunk](chunk) {
1572+
queueMicrotask(() => {
1573+
readAgainForBranch1 = false;
1574+
readAgainForBranch2 = false;
1575+
const chunk1 = chunk;
1576+
let chunk2 = chunk;
1577+
1578+
if (!canceled1 && !canceled2) {
1579+
try {
1580+
chunk2 = new Uint8Array(
1581+
ArrayBufferPrototypeSlice(
1582+
chunk.buffer,
1583+
chunk.byteOffset,
1584+
chunk.byteOffset + chunk.byteLength
1585+
)
1586+
);
1587+
} catch (error) {
1588+
readableByteStreamControllerError(
1589+
branch1[kState].controller,
1590+
error
1591+
);
1592+
readableByteStreamControllerError(
1593+
branch2[kState].controller,
1594+
error
1595+
);
1596+
cancelDeferred.resolve(readableStreamCancel(stream, error));
1597+
return;
1598+
}
1599+
}
1600+
if (!canceled1) {
1601+
readableByteStreamControllerEnqueue(
1602+
branch1[kState].controller,
1603+
chunk1
1604+
);
1605+
}
1606+
if (!canceled2) {
1607+
readableByteStreamControllerEnqueue(
1608+
branch2[kState].controller,
1609+
chunk2
1610+
);
1611+
}
1612+
reading = false;
1613+
1614+
if (readAgainForBranch1) {
1615+
pull1Algorithm();
1616+
} else if (readAgainForBranch2) {
1617+
pull2Algorithm();
1618+
}
1619+
});
1620+
},
1621+
[kClose]() {
1622+
reading = false;
1623+
1624+
if (!canceled1) {
1625+
readableByteStreamControllerClose(branch1[kState].controller);
1626+
}
1627+
if (!canceled2) {
1628+
readableByteStreamControllerClose(branch2[kState].controller);
1629+
}
1630+
if (branch1[kState].controller[kState].pendingPullIntos.length > 0) {
1631+
readableByteStreamControllerRespond(branch1[kState].controller, 0);
1632+
}
1633+
if (branch2[kState].controller[kState].pendingPullIntos.length > 0) {
1634+
readableByteStreamControllerRespond(branch2[kState].controller, 0);
1635+
}
1636+
if (!canceled1 || !canceled2) {
1637+
cancelDeferred.resolve();
1638+
}
1639+
},
1640+
[kError]() {
1641+
reading = false;
1642+
},
1643+
};
1644+
1645+
readableStreamDefaultReaderRead(reader, readRequest);
1646+
}
1647+
1648+
function pullWithBYOBReader(view, forBranch2) {
1649+
if (isReadableStreamDefaultReader(reader)) {
1650+
readableStreamDefaultReaderRelease(reader);
1651+
reader = new ReadableStreamBYOBReader(stream);
1652+
forwardReaderError(reader);
1653+
}
1654+
1655+
const byobBranch = forBranch2 === true ? branch2 : branch1;
1656+
const otherBranch = forBranch2 === false ? branch2 : branch1;
1657+
const readIntoRequest = {
1658+
[kChunk](chunk) {
1659+
queueMicrotask(() => {
1660+
readAgainForBranch1 = false;
1661+
readAgainForBranch2 = false;
1662+
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
1663+
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
1664+
1665+
if (!otherCanceled) {
1666+
let clonedChunk;
1667+
1668+
try {
1669+
clonedChunk = new Uint8Array(
1670+
ArrayBufferPrototypeSlice(
1671+
chunk.buffer,
1672+
chunk.byteOffset,
1673+
chunk.byteOffset + chunk.byteLength
1674+
)
1675+
);
1676+
} catch (error) {
1677+
readableByteStreamControllerError(
1678+
byobBranch[kState].controller,
1679+
error
1680+
);
1681+
readableByteStreamControllerError(
1682+
otherBranch[kState].controller,
1683+
error
1684+
);
1685+
cancelDeferred.resolve(readableStreamCancel(stream, error));
1686+
return;
1687+
}
1688+
if (!byobCanceled) {
1689+
readableByteStreamControllerRespondWithNewView(
1690+
byobBranch[kState].controller,
1691+
chunk
1692+
);
1693+
}
1694+
1695+
readableByteStreamControllerEnqueue(
1696+
otherBranch[kState].controller,
1697+
clonedChunk
1698+
);
1699+
} else if (!byobCanceled) {
1700+
readableByteStreamControllerRespondWithNewView(
1701+
byobBranch[kState].controller,
1702+
chunk
1703+
);
1704+
}
1705+
reading = false;
1706+
1707+
if (readAgainForBranch1) {
1708+
pull1Algorithm();
1709+
} else if (readAgainForBranch2) {
1710+
pull2Algorithm();
1711+
}
1712+
});
1713+
},
1714+
[kClose](chunk) {
1715+
reading = false;
1716+
1717+
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
1718+
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
1719+
1720+
if (!byobCanceled) {
1721+
readableByteStreamControllerClose(byobBranch[kState].controller);
1722+
}
1723+
if (!otherCanceled) {
1724+
readableByteStreamControllerClose(otherBranch[kState].controller);
1725+
}
1726+
if (chunk !== undefined) {
1727+
if (!byobCanceled) {
1728+
readableByteStreamControllerRespondWithNewView(
1729+
byobBranch[kState].controller,
1730+
chunk
1731+
);
1732+
}
1733+
if (
1734+
!otherCanceled &&
1735+
otherBranch[kState].controller[kState].pendingPullIntos.length > 0
1736+
) {
1737+
readableByteStreamControllerRespond(
1738+
otherBranch[kState].controller,
1739+
0
1740+
);
1741+
}
1742+
}
1743+
if (!byobCanceled || !otherCanceled) {
1744+
cancelDeferred.resolve();
1745+
}
1746+
},
1747+
[kError]() {
1748+
reading = false;
1749+
},
1750+
};
1751+
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
1752+
}
1753+
1754+
function pull1Algorithm() {
1755+
if (reading) {
1756+
readAgainForBranch1 = true;
1757+
return PromiseResolve();
1758+
}
1759+
reading = true;
1760+
1761+
const byobRequest = branch1[kState].controller.byobRequest;
1762+
if (byobRequest === null) {
1763+
pullWithDefaultReader();
1764+
} else {
1765+
pullWithBYOBReader(byobRequest[kState].view, false);
1766+
}
1767+
return PromiseResolve();
1768+
}
1769+
1770+
function pull2Algorithm() {
1771+
if (reading) {
1772+
readAgainForBranch2 = true;
1773+
return PromiseResolve();
1774+
}
1775+
reading = true;
1776+
1777+
const byobRequest = branch2[kState].controller.byobRequest;
1778+
if (byobRequest === null) {
1779+
pullWithDefaultReader();
1780+
} else {
1781+
pullWithBYOBReader(byobRequest[kState].view, true);
1782+
}
1783+
return PromiseResolve();
1784+
}
1785+
1786+
function cancel1Algorithm(reason) {
1787+
canceled1 = true;
1788+
reason1 = reason;
1789+
if (canceled2) {
1790+
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
1791+
}
1792+
return cancelDeferred.promise;
1793+
}
1794+
1795+
function cancel2Algorithm(reason) {
1796+
canceled2 = true;
1797+
reason2 = reason;
1798+
if (canceled1) {
1799+
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
1800+
}
1801+
return cancelDeferred.promise;
1802+
}
1803+
1804+
branch1 = new ReadableStream({
1805+
type: 'bytes',
1806+
pull: pull1Algorithm,
1807+
cancel: cancel1Algorithm,
1808+
});
1809+
branch2 = new ReadableStream({
1810+
type: 'bytes',
1811+
pull: pull2Algorithm,
1812+
cancel: cancel2Algorithm,
1813+
});
1814+
1815+
forwardReaderError(reader);
1816+
1817+
return [branch1, branch2];
1818+
}
1819+
15271820
function readableByteStreamControllerConvertPullIntoDescriptor(desc) {
15281821
const {
15291822
buffer,
@@ -2317,18 +2610,18 @@ function readableByteStreamControllerFillHeadPullIntoDescriptor(
23172610
desc.bytesFilled += size;
23182611
}
23192612

2320-
function readableByteStreamControllerEnqueue(
2321-
controller,
2322-
buffer,
2323-
byteLength,
2324-
byteOffset) {
2613+
function readableByteStreamControllerEnqueue(controller, chunk) {
23252614
const {
23262615
closeRequested,
23272616
pendingPullIntos,
23282617
queue,
23292618
stream,
23302619
} = controller[kState];
23312620

2621+
const buffer = ArrayBufferViewGetBuffer(chunk);
2622+
const byteOffset = ArrayBufferViewGetByteOffset(chunk);
2623+
const byteLength = ArrayBufferViewGetByteLength(chunk);
2624+
23322625
if (closeRequested || stream[kState].state !== 'readable')
23332626
return;
23342627

test/parallel/test-whatwg-readablestream.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1561,7 +1561,7 @@ class Source {
15611561
assert(!readableStreamDefaultControllerCanCloseOrEnqueue(controller));
15621562
readableStreamDefaultControllerEnqueue(controller);
15631563
readableByteStreamControllerClose(controller);
1564-
readableByteStreamControllerEnqueue(controller);
1564+
readableByteStreamControllerEnqueue(controller, new Uint8Array(1));
15651565
}
15661566

15671567
{

0 commit comments

Comments
 (0)