Skip to content

Commit c9d25ab

Browse files
benjamingrronag
andcommitted
stream: add toArray
Add the toArray method from the TC39 iterator helper proposal to Readable streams. This also enables a common-use case of converting a stream to an array. Co-Authored-By: Robert Nagy <[email protected]>
1 parent 3f0bcfb commit c9d25ab

File tree

3 files changed

+140
-0
lines changed

3 files changed

+140
-0
lines changed

doc/api/stream.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1889,6 +1889,46 @@ await dnsResults.forEach((result) => {
18891889
console.log('done'); // Stream has finished
18901890
```
18911891

1892+
### `readable.toArray([options])`
1893+
1894+
<!-- YAML
1895+
added: REPLACEME
1896+
-->
1897+
1898+
> Stability: 1 - Experimental
1899+
1900+
* `options` {Object}
1901+
* `signal` {AbortSignal} allows cancelling the toArray operation if the
1902+
signal is aborted.
1903+
* Returns: {Promise} a promise containing an array (if the stream is in
1904+
object mode) or Buffer with the contents of the stream.
1905+
1906+
This method allows easily obtaining the contents of a stream. If the
1907+
stream is in [object mode][object-mode] an array of its contents is returned.
1908+
If the stream is not in object mode a Buffer containing its data is returned.
1909+
1910+
As this method reads the entire stream into memory, it negates the benefits of
1911+
streams. It's intended for interoperability and convenience, not as the primary
1912+
way to consume streams.
1913+
1914+
```mjs
1915+
import { Readable } from 'stream';
1916+
import { Resolver } from 'dns/promises';
1917+
1918+
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
1919+
1920+
// Make dns queries concurrently using .map and collect
1921+
// the results into an aray using toArray
1922+
const dnsResults = await Readable.from([
1923+
'nodejs.org',
1924+
'openjsf.org',
1925+
'www.linuxfoundation.org',
1926+
]).map(async (domain) => {
1927+
const { address } = await resolver.resolve4(domain, { ttl: true });
1928+
return address;
1929+
}, { concurrency: 2 }).toArray();
1930+
```
1931+
18921932
### Duplex and transform streams
18931933

18941934
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
'use strict';
22

33
const { AbortController } = require('internal/abort_controller');
4+
const { Buffer } = require('buffer');
5+
46
const {
57
codes: {
68
ERR_INVALID_ARG_TYPE,
@@ -10,6 +12,7 @@ const {
1012
const { validateInteger } = require('internal/validators');
1113

1214
const {
15+
ArrayPrototypePush,
1316
MathFloor,
1417
Promise,
1518
PromiseReject,
@@ -174,11 +177,25 @@ async function * filter(fn, options) {
174177
yield* this.map(filterFn, options);
175178
}
176179

180+
async function toArray(options) {
181+
const result = [];
182+
for await (const val of this) {
183+
if (options?.signal?.aborted) {
184+
throw new AbortError({ cause: options.signal.reason });
185+
}
186+
ArrayPrototypePush(result, val);
187+
}
188+
if (!this.readableObjectMode) {
189+
return Buffer.concat(result);
190+
}
191+
return result;
192+
}
177193
module.exports.streamReturningOperators = {
178194
filter,
179195
map,
180196
};
181197

182198
module.exports.promiseReturningOperators = {
183199
forEach,
200+
toArray,
184201
};
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
const { setTimeout } = require('timers/promises');
9+
10+
{
11+
// Works on a synchronous stream
12+
(async () => {
13+
const tests = [
14+
[],
15+
[1],
16+
[1, 2, 3],
17+
Array(100).fill().map((_, i) => i),
18+
];
19+
for (const test of tests) {
20+
const stream = Readable.from(test);
21+
const result = await stream.toArray();
22+
assert.deepStrictEqual(result, test);
23+
}
24+
})().then(common.mustCall());
25+
}
26+
27+
{
28+
// Works on a non-object-mode stream and flattens it
29+
(async () => {
30+
const stream = Readable.from(
31+
[Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6])]
32+
, { objectMode: false });
33+
const result = await stream.toArray();
34+
assert.strictEqual(Buffer.isBuffer(result), true);
35+
assert.deepStrictEqual(Array.from(result), [1, 2, 3, 4, 5, 6]);
36+
})().then(common.mustCall());
37+
}
38+
39+
{
40+
// Works on an asynchronous stream
41+
(async () => {
42+
const tests = [
43+
[],
44+
[1],
45+
[1, 2, 3],
46+
Array(100).fill().map((_, i) => i),
47+
];
48+
for (const test of tests) {
49+
const stream = Readable.from(test).map((x) => Promise.resolve(x));
50+
const result = await stream.toArray();
51+
assert.deepStrictEqual(result, test);
52+
}
53+
})().then(common.mustCall());
54+
}
55+
56+
{
57+
// Support for AbortSignal
58+
const ac = new AbortController();
59+
let stream;
60+
assert.rejects(async () => {
61+
stream = Readable.from([1, 2, 3]).map(async (x) => {
62+
if (x === 3) {
63+
await setTimeout(100, {}); // Explicitly do not pass signal here
64+
}
65+
Promise.resolve(x);
66+
});
67+
await stream.toArray({ signal: ac.signal });
68+
}, {
69+
name: 'AbortError',
70+
}).then(common.mustCall(() => {
71+
// Only stops toArray, does not destory the stream
72+
assert(stream.destroyed, false);
73+
}));
74+
75+
setImmediate(() => {
76+
ac.abort();
77+
});
78+
}
79+
{
80+
// Test result is a Promise
81+
const result = Readable.from([1, 2, 3, 4, 5]).toArray();
82+
assert.strictEqual(result instanceof Promise, true);
83+
}

0 commit comments

Comments
 (0)