@@ -95,6 +95,7 @@ const {
9595 ArrayBufferViewGetByteOffset,
9696 ArrayBufferGetByteLength,
9797 AsyncIterator,
98+ cloneAsUint8Array,
9899 copyArrayBuffer,
99100 customInspect,
100101 dequeueValue,
@@ -215,6 +216,7 @@ class ReadableStream {
215216 throw new ERR_INVALID_ARG_VALUE ( 'source' , 'Object' , source ) ;
216217 this [ kState ] = {
217218 disturbed : false ,
219+ reader : undefined ,
218220 state : 'readable' ,
219221 storedError : undefined ,
220222 stream : undefined ,
@@ -1111,7 +1113,6 @@ class ReadableByteStreamController {
11111113 chunk ) ;
11121114 }
11131115 const chunkByteLength = ArrayBufferViewGetByteLength ( chunk ) ;
1114- const chunkByteOffset = ArrayBufferViewGetByteOffset ( chunk ) ;
11151116 const chunkBuffer = ArrayBufferViewGetBuffer ( chunk ) ;
11161117 const chunkBufferByteLength = ArrayBufferGetByteLength ( chunkBuffer ) ;
11171118 if ( chunkByteLength === 0 || chunkBufferByteLength === 0 ) {
@@ -1122,11 +1123,7 @@ class ReadableByteStreamController {
11221123 throw new ERR_INVALID_STATE . TypeError ( 'Controller is already closed' ) ;
11231124 if ( this [ kState ] . stream [ kState ] . state !== 'readable' )
11241125 throw new ERR_INVALID_STATE . TypeError ( 'ReadableStream is already closed' ) ;
1125- readableByteStreamControllerEnqueue (
1126- this ,
1127- chunkBuffer ,
1128- chunkByteLength ,
1129- chunkByteOffset ) ;
1126+ readableByteStreamControllerEnqueue ( this , chunk ) ;
11301127 }
11311128
11321129 /**
@@ -1430,6 +1427,13 @@ function readableStreamPipeTo(
14301427}
14311428
14321429function readableStreamTee ( stream , cloneForBranch2 ) {
1430+ if ( isReadableByteStreamController ( stream [ kState ] . controller ) ) {
1431+ return readableByteStreamTee ( stream ) ;
1432+ }
1433+ return readableStreamDefaultTee ( stream , cloneForBranch2 ) ;
1434+ }
1435+
1436+ function readableStreamDefaultTee ( stream , cloneForBranch2 ) {
14331437 const reader = new ReadableStreamDefaultReader ( stream ) ;
14341438 let reading = false ;
14351439 let canceled1 = false ;
@@ -1524,6 +1528,284 @@ function readableStreamTee(stream, cloneForBranch2) {
15241528 return [ branch1 , branch2 ] ;
15251529}
15261530
1531+ function readableByteStreamTee ( stream ) {
1532+ assert ( isReadableStream ( stream ) ) ;
1533+ assert ( isReadableByteStreamController ( stream [ kState ] . controller ) ) ;
1534+
1535+ let reader = new ReadableStreamDefaultReader ( stream ) ;
1536+ let reading = false ;
1537+ let readAgainForBranch1 = false ;
1538+ let readAgainForBranch2 = false ;
1539+ let canceled1 = false ;
1540+ let canceled2 = false ;
1541+ let reason1 ;
1542+ let reason2 ;
1543+ let branch1 ;
1544+ let branch2 ;
1545+ const cancelDeferred = createDeferredPromise ( ) ;
1546+
1547+ function forwardReaderError ( thisReader ) {
1548+ PromisePrototypeThen (
1549+ thisReader [ kState ] . close . promise ,
1550+ undefined ,
1551+ ( error ) => {
1552+ if ( thisReader !== reader ) {
1553+ return ;
1554+ }
1555+ readableStreamDefaultControllerError ( branch1 [ kState ] . controller , error ) ;
1556+ readableStreamDefaultControllerError ( branch2 [ kState ] . controller , error ) ;
1557+ if ( ! canceled1 || ! canceled2 ) {
1558+ cancelDeferred . resolve ( ) ;
1559+ }
1560+ }
1561+ ) ;
1562+ }
1563+
1564+ function pullWithDefaultReader ( ) {
1565+ if ( isReadableStreamBYOBReader ( reader ) ) {
1566+ readableStreamBYOBReaderRelease ( reader ) ;
1567+ reader = new ReadableStreamDefaultReader ( stream ) ;
1568+ forwardReaderError ( reader ) ;
1569+ }
1570+
1571+ const readRequest = {
1572+ [ kChunk ] ( chunk ) {
1573+ queueMicrotask ( ( ) => {
1574+ readAgainForBranch1 = false ;
1575+ readAgainForBranch2 = false ;
1576+ const chunk1 = chunk ;
1577+ let chunk2 = chunk ;
1578+
1579+ if ( ! canceled1 && ! canceled2 ) {
1580+ try {
1581+ chunk2 = cloneAsUint8Array ( chunk ) ;
1582+ } catch ( error ) {
1583+ readableByteStreamControllerError (
1584+ branch1 [ kState ] . controller ,
1585+ error
1586+ ) ;
1587+ readableByteStreamControllerError (
1588+ branch2 [ kState ] . controller ,
1589+ error
1590+ ) ;
1591+ cancelDeferred . resolve ( readableStreamCancel ( stream , error ) ) ;
1592+ return ;
1593+ }
1594+ }
1595+ if ( ! canceled1 ) {
1596+ readableByteStreamControllerEnqueue (
1597+ branch1 [ kState ] . controller ,
1598+ chunk1
1599+ ) ;
1600+ }
1601+ if ( ! canceled2 ) {
1602+ readableByteStreamControllerEnqueue (
1603+ branch2 [ kState ] . controller ,
1604+ chunk2
1605+ ) ;
1606+ }
1607+ reading = false ;
1608+
1609+ if ( readAgainForBranch1 ) {
1610+ pull1Algorithm ( ) ;
1611+ } else if ( readAgainForBranch2 ) {
1612+ pull2Algorithm ( ) ;
1613+ }
1614+ } ) ;
1615+ } ,
1616+ [ kClose ] ( ) {
1617+ reading = false ;
1618+
1619+ if ( ! canceled1 ) {
1620+ readableByteStreamControllerClose ( branch1 [ kState ] . controller ) ;
1621+ }
1622+ if ( ! canceled2 ) {
1623+ readableByteStreamControllerClose ( branch2 [ kState ] . controller ) ;
1624+ }
1625+ if ( branch1 [ kState ] . controller [ kState ] . pendingPullIntos . length > 0 ) {
1626+ readableByteStreamControllerRespond ( branch1 [ kState ] . controller , 0 ) ;
1627+ }
1628+ if ( branch2 [ kState ] . controller [ kState ] . pendingPullIntos . length > 0 ) {
1629+ readableByteStreamControllerRespond ( branch2 [ kState ] . controller , 0 ) ;
1630+ }
1631+ if ( ! canceled1 || ! canceled2 ) {
1632+ cancelDeferred . resolve ( ) ;
1633+ }
1634+ } ,
1635+ [ kError ] ( ) {
1636+ reading = false ;
1637+ } ,
1638+ } ;
1639+
1640+ readableStreamDefaultReaderRead ( reader , readRequest ) ;
1641+ }
1642+
1643+ function pullWithBYOBReader ( view , forBranch2 ) {
1644+ if ( isReadableStreamDefaultReader ( reader ) ) {
1645+ readableStreamDefaultReaderRelease ( reader ) ;
1646+ reader = new ReadableStreamBYOBReader ( stream ) ;
1647+ forwardReaderError ( reader ) ;
1648+ }
1649+
1650+ const byobBranch = forBranch2 === true ? branch2 : branch1 ;
1651+ const otherBranch = forBranch2 === false ? branch2 : branch1 ;
1652+ const readIntoRequest = {
1653+ [ kChunk ] ( chunk ) {
1654+ queueMicrotask ( ( ) => {
1655+ readAgainForBranch1 = false ;
1656+ readAgainForBranch2 = false ;
1657+ const byobCanceled = forBranch2 === true ? canceled2 : canceled1 ;
1658+ const otherCanceled = forBranch2 === false ? canceled2 : canceled1 ;
1659+
1660+ if ( ! otherCanceled ) {
1661+ let clonedChunk ;
1662+
1663+ try {
1664+ clonedChunk = cloneAsUint8Array ( chunk ) ;
1665+ } catch ( error ) {
1666+ readableByteStreamControllerError (
1667+ byobBranch [ kState ] . controller ,
1668+ error
1669+ ) ;
1670+ readableByteStreamControllerError (
1671+ otherBranch [ kState ] . controller ,
1672+ error
1673+ ) ;
1674+ cancelDeferred . resolve ( readableStreamCancel ( stream , error ) ) ;
1675+ return ;
1676+ }
1677+ if ( ! byobCanceled ) {
1678+ readableByteStreamControllerRespondWithNewView (
1679+ byobBranch [ kState ] . controller ,
1680+ chunk
1681+ ) ;
1682+ }
1683+
1684+ readableByteStreamControllerEnqueue (
1685+ otherBranch [ kState ] . controller ,
1686+ clonedChunk
1687+ ) ;
1688+ } else if ( ! byobCanceled ) {
1689+ readableByteStreamControllerRespondWithNewView (
1690+ byobBranch [ kState ] . controller ,
1691+ chunk
1692+ ) ;
1693+ }
1694+ reading = false ;
1695+
1696+ if ( readAgainForBranch1 ) {
1697+ pull1Algorithm ( ) ;
1698+ } else if ( readAgainForBranch2 ) {
1699+ pull2Algorithm ( ) ;
1700+ }
1701+ } ) ;
1702+ } ,
1703+ [ kClose ] ( chunk ) {
1704+ reading = false ;
1705+
1706+ const byobCanceled = forBranch2 === true ? canceled2 : canceled1 ;
1707+ const otherCanceled = forBranch2 === false ? canceled2 : canceled1 ;
1708+
1709+ if ( ! byobCanceled ) {
1710+ readableByteStreamControllerClose ( byobBranch [ kState ] . controller ) ;
1711+ }
1712+ if ( ! otherCanceled ) {
1713+ readableByteStreamControllerClose ( otherBranch [ kState ] . controller ) ;
1714+ }
1715+ if ( chunk !== undefined ) {
1716+ if ( ! byobCanceled ) {
1717+ readableByteStreamControllerRespondWithNewView (
1718+ byobBranch [ kState ] . controller ,
1719+ chunk
1720+ ) ;
1721+ }
1722+ if (
1723+ ! otherCanceled &&
1724+ otherBranch [ kState ] . controller [ kState ] . pendingPullIntos . length > 0
1725+ ) {
1726+ readableByteStreamControllerRespond (
1727+ otherBranch [ kState ] . controller ,
1728+ 0
1729+ ) ;
1730+ }
1731+ }
1732+ if ( ! byobCanceled || ! otherCanceled ) {
1733+ cancelDeferred . resolve ( ) ;
1734+ }
1735+ } ,
1736+ [ kError ] ( ) {
1737+ reading = false ;
1738+ } ,
1739+ } ;
1740+ readableStreamBYOBReaderRead ( reader , view , readIntoRequest ) ;
1741+ }
1742+
1743+ function pull1Algorithm ( ) {
1744+ if ( reading ) {
1745+ readAgainForBranch1 = true ;
1746+ return PromiseResolve ( ) ;
1747+ }
1748+ reading = true ;
1749+
1750+ const byobRequest = branch1 [ kState ] . controller . byobRequest ;
1751+ if ( byobRequest === null ) {
1752+ pullWithDefaultReader ( ) ;
1753+ } else {
1754+ pullWithBYOBReader ( byobRequest [ kState ] . view , false ) ;
1755+ }
1756+ return PromiseResolve ( ) ;
1757+ }
1758+
1759+ function pull2Algorithm ( ) {
1760+ if ( reading ) {
1761+ readAgainForBranch2 = true ;
1762+ return PromiseResolve ( ) ;
1763+ }
1764+ reading = true ;
1765+
1766+ const byobRequest = branch2 [ kState ] . controller . byobRequest ;
1767+ if ( byobRequest === null ) {
1768+ pullWithDefaultReader ( ) ;
1769+ } else {
1770+ pullWithBYOBReader ( byobRequest [ kState ] . view , true ) ;
1771+ }
1772+ return PromiseResolve ( ) ;
1773+ }
1774+
1775+ function cancel1Algorithm ( reason ) {
1776+ canceled1 = true ;
1777+ reason1 = reason ;
1778+ if ( canceled2 ) {
1779+ cancelDeferred . resolve ( readableStreamCancel ( stream , [ reason1 , reason2 ] ) ) ;
1780+ }
1781+ return cancelDeferred . promise ;
1782+ }
1783+
1784+ function cancel2Algorithm ( reason ) {
1785+ canceled2 = true ;
1786+ reason2 = reason ;
1787+ if ( canceled1 ) {
1788+ cancelDeferred . resolve ( readableStreamCancel ( stream , [ reason1 , reason2 ] ) ) ;
1789+ }
1790+ return cancelDeferred . promise ;
1791+ }
1792+
1793+ branch1 = new ReadableStream ( {
1794+ type : 'bytes' ,
1795+ pull : pull1Algorithm ,
1796+ cancel : cancel1Algorithm ,
1797+ } ) ;
1798+ branch2 = new ReadableStream ( {
1799+ type : 'bytes' ,
1800+ pull : pull2Algorithm ,
1801+ cancel : cancel2Algorithm ,
1802+ } ) ;
1803+
1804+ forwardReaderError ( reader ) ;
1805+
1806+ return [ branch1 , branch2 ] ;
1807+ }
1808+
15271809function readableByteStreamControllerConvertPullIntoDescriptor ( desc ) {
15281810 const {
15291811 buffer,
@@ -2317,18 +2599,18 @@ function readableByteStreamControllerFillHeadPullIntoDescriptor(
23172599 desc . bytesFilled += size ;
23182600}
23192601
2320- function readableByteStreamControllerEnqueue (
2321- controller ,
2322- buffer ,
2323- byteLength ,
2324- byteOffset ) {
2602+ function readableByteStreamControllerEnqueue ( controller , chunk ) {
23252603 const {
23262604 closeRequested,
23272605 pendingPullIntos,
23282606 queue,
23292607 stream,
23302608 } = controller [ kState ] ;
23312609
2610+ const buffer = ArrayBufferViewGetBuffer ( chunk ) ;
2611+ const byteOffset = ArrayBufferViewGetByteOffset ( chunk ) ;
2612+ const byteLength = ArrayBufferViewGetByteLength ( chunk ) ;
2613+
23322614 if ( closeRequested || stream [ kState ] . state !== 'readable' )
23332615 return ;
23342616
0 commit comments