From f5dfa004d9b34fa94e1454421adb7deac4c9121b Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Mon, 15 Nov 2021 15:39:05 +0200 Subject: [PATCH 01/19] stream: add map method to Readable Implement the map method on readable stream. This starts the alignment with the tc39-iterator-helpers proposal and adds a `.map` method to every Node.js readable stream. Co-Authored-By: Robert Nagy --- doc/api/stream.md | 43 ++++++++ lib/internal/streams/operators.js | 161 ++++++++++++++++++++++++++++++ lib/stream.js | 8 ++ test/parallel/test-stream-map.js | 108 ++++++++++++++++++++ 4 files changed, 320 insertions(+) create mode 100644 lib/internal/streams/operators.js create mode 100644 test/parallel/test-stream-map.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 81d9d660dc675b..fa2225d316939a 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1737,6 +1737,49 @@ async function showBoth() { showBoth(); ``` +### `readable.map(fn[, options])` + + + +> Stability: 1 - Experimental +* `fn` {Function|AsyncFunction} a function to map over every item in the stream. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `options` {Object} + * `concurrency` {number} the maximal concurrent invocation of `fn` to call + on the stream at once. **Default:** `1`. + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Readable} a stream mapped with the function `fn`. + +This method allows mapping over the stream. The `fn` function will be called +for every item in the stream. If the `fn` function returns a promise - that +promise will be `await`ed before being passed to the result stream. + +```mjs +import { Readable } from 'stream'; +import { Resolver } from 'dns/promises'; + +// With a synchronous mapper. +for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) { + console.log(item); // 2, 4, 6, 8 +} +// With an asynchronous mapper, making at most 2 queries at a time. +const resolver = new Resolver(); +const dnsResults = await Readable.from([ + 'nodejs.org', + 'openjsf.org', + 'www.linuxfoundation.org', +]).map((domain) => resolver.resolve4(domain), { concurrency: 2 }); +for await (const result of dnsResults) { + console.log(result); // Kogs the DNS result of resolver.resolve4. +} +``` + ### Duplex and transform streams #### Class: `stream.Duplex` diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js new file mode 100644 index 00000000000000..9c24d3ec9cbcf1 --- /dev/null +++ b/lib/internal/streams/operators.js @@ -0,0 +1,161 @@ +'use strict'; + +const { AbortController } = require('internal/abort_controller'); +const { kWeakHandler } = require('internal/event_target'); +const { + codes: { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_ARG_VALUE, + }, AbortError, +} = require('internal/errors'); + +const { + MathFloor, + NumberIsInteger, + Promise, + PromiseReject, + Symbol, +} = primordials; + +const kEmpty = Symbol('kEmpty'); +const kEof = Symbol('kEof'); + +async function * map(fn, options) { + if (typeof fn !== 'function') { + throw new ERR_INVALID_ARG_TYPE( + 'fn', ['Function', 'AsyncFunction'], this); + } + + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + } + + let concurrency = 1; + if (options?.concurrency != null) { + concurrency = MathFloor(options.concurrency); + } + + if (!NumberIsInteger(concurrency)) { + throw new ERR_INVALID_ARG_TYPE( + 'concurrency', ['Integer'], concurrency); + } + + if (concurrency < 1) { + throw new ERR_INVALID_ARG_VALUE('concurrency', concurrency); + } + + const ac = new AbortController(); + const stream = this; + const queue = []; + const signal = ac.signal; + const signalOpt = { signal }; + + options?.signal?.addEventListener('abort', () => ac.abort(), { + [kWeakHandler]: signalOpt + }); + + let next; + let resume; + let done = false; + + function onDone() { + done = true; + } + + async function pump() { + try { + for await (let val of stream) { + if (done) { + return; + } + + if (signal.aborted) { + throw new AbortError(); + } + + try { + val = fn(val, signalOpt); + } catch (err) { + val = PromiseReject(err); + } + + if (val === kEmpty) { + continue; + } + + if (typeof val?.catch === 'function') { + val.catch(onDone); + } + + queue.push(val); + if (next) { + next(); + next = null; + } + + if (!done && queue.length >= concurrency) { + await new Promise((resolve) => { + resume = resolve; + }); + } + } + queue.push(kEof); + } catch (err) { + const val = PromiseReject(err); + val.catch(onDone); + queue.push(val); + } finally { + done = true; + if (next) { + next(); + next = null; + } + } + } + + pump(); + + try { + while (true) { + while (queue.length > 0) { + let val = queue[0]; + + if (val === kEof) { + return; + } + + val = await val; + + if (signal.aborted) { + throw new AbortError(); + } + + if (val !== kEmpty) { + yield val; + } + + queue.shift(); + if (resume) { + resume(); + resume = null; + } + } + + await new Promise((resolve) => { + next = resolve; + }); + } + } finally { + ac.abort(); + + done = true; + if (resume) { + resume(); + resume = null; + } + } +} + +module.exports = { + map +}; diff --git a/lib/stream.js b/lib/stream.js index c7f61cf8873786..e87c19dd4d7dc7 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -23,12 +23,14 @@ const { ObjectDefineProperty, + ObjectKeys, } = primordials; const { promisify: { custom: customPromisify }, } = require('internal/util'); +const operators = require('internal/streams/operators'); const compose = require('internal/streams/compose'); const { pipeline } = require('internal/streams/pipeline'); const { destroyer } = require('internal/streams/destroy'); @@ -42,6 +44,12 @@ const Stream = module.exports = require('internal/streams/legacy').Stream; Stream.isDisturbed = utils.isDisturbed; Stream.isErrored = utils.isErrored; Stream.Readable = require('internal/streams/readable'); +for (const key of ObjectKeys(operators)) { + const op = operators[key]; + Stream.Readable.prototype[key] = function(...args) { + return Stream.Readable.from(op.apply(this, args)); + }; +} Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); Stream.Transform = require('internal/streams/transform'); diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js new file mode 100644 index 00000000000000..dbfa1ef4d569b3 --- /dev/null +++ b/test/parallel/test-stream-map.js @@ -0,0 +1,108 @@ +'use strict'; + +const common = require('../common'); +const { + Readable, +} = require('stream'); +const assert = require('assert'); +const { setTimeout } = require('timers/promises'); + +{ + // Map works on synchronous streams with a synchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x); + const result = [2, 4, 6, 8, 10]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Map works on synchronous streams with an asynchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + await Promise.resolve(); + return x + x; + }); + const result = [2, 4, 6, 8, 10]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Map works on asynchronous streams with a asynchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + return x + x; + }).map((x) => x + x); + const result = [4, 8, 12, 16, 20]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Concurrency + AbortSignal + const ac = new AbortController(); + let calls = 0; + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => { + calls++; + await setTimeout(100, { signal }); + }, { signal: ac.signal, concurrency: 2 }); + // pump + (async () => { + for await (const item of stream) { + // nope + console.log(item); + } + })().catch(common.mustCall((e) => { + assert.strictEqual(e.name, 'AbortError'); + })); + + setImmediate(() => { + ac.abort(); + assert.strictEqual(calls, 2); + }); +} + +{ + // Concurrency result order + const stream = Readable.from([1, 2]).map(async (item, { signal }) => { + await setTimeout(10 - item, { signal }); + return item; + }, { concurrency: 2 }); + + (async () => { + const expected = [1, 2]; + for await (const item of stream) { + assert.strictEqual(item, expected.shift()); + } + })().then(common.mustCall()); +} + +{ + // Error cases + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const unused of Readable.from([1]).map(1)); + }, /ERR_INVALID_ARG_TYPE/); + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of Readable.from([1]).map((x) => x, { + concurrency: 'Foo' + })); + }, /ERR_INVALID_ARG_TYPE/); + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of Readable.from([1]).map((x) => x, 1)); + }, /ERR_INVALID_ARG_TYPE/); +} +{ + // Test result is a Readable + const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x); + assert.strictEqual(stream.readable, true); +} From eae40bc3f198f83f099ef6c7d0839a7ddbcd922c Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Sun, 21 Nov 2021 15:33:40 +0200 Subject: [PATCH 02/19] test code review --- test/parallel/test-stream-map.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js index dbfa1ef4d569b3..6cb9b01e69c581 100644 --- a/test/parallel/test-stream-map.js +++ b/test/parallel/test-stream-map.js @@ -89,17 +89,17 @@ const { setTimeout } = require('timers/promises'); assert.rejects(async () => { // eslint-disable-next-line no-unused-vars for await (const unused of Readable.from([1]).map(1)); - }, /ERR_INVALID_ARG_TYPE/); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); assert.rejects(async () => { // eslint-disable-next-line no-unused-vars for await (const _ of Readable.from([1]).map((x) => x, { concurrency: 'Foo' })); - }, /ERR_INVALID_ARG_TYPE/); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); assert.rejects(async () => { // eslint-disable-next-line no-unused-vars for await (const _ of Readable.from([1]).map((x) => x, 1)); - }, /ERR_INVALID_ARG_TYPE/); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); } { // Test result is a Readable From 7678628bb083ea193ec25a5e043458c4d7ce81bc Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 29 Nov 2021 12:03:07 +0100 Subject: [PATCH 03/19] fixup --- lib/internal/streams/operators.js | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 9c24d3ec9cbcf1..5002534ae3deac 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -8,10 +8,13 @@ const { ERR_INVALID_ARG_VALUE, }, AbortError, } = require('internal/errors'); +const { + validateInteger, + validateObject, +} = require('internal/validators'); const { MathFloor, - NumberIsInteger, Promise, PromiseReject, Symbol, @@ -30,15 +33,14 @@ async function * map(fn, options) { throw new ERR_INVALID_ARG_TYPE('options', ['Object']); } + validateObject(options, 'options', { nullable: true }); + let concurrency = 1; if (options?.concurrency != null) { concurrency = MathFloor(options.concurrency); } - if (!NumberIsInteger(concurrency)) { - throw new ERR_INVALID_ARG_TYPE( - 'concurrency', ['Integer'], concurrency); - } + validateInteger(concurrency, 'concurrency', 1); if (concurrency < 1) { throw new ERR_INVALID_ARG_VALUE('concurrency', concurrency); From 55eb7ff44b63c4271a466b433343a7701c9a5ac5 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 29 Nov 2021 12:11:45 +0100 Subject: [PATCH 04/19] fixup: explicit listener --- lib/internal/streams/operators.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 5002534ae3deac..7687d3b26e7415 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -1,7 +1,6 @@ 'use strict'; const { AbortController } = require('internal/abort_controller'); -const { kWeakHandler } = require('internal/event_target'); const { codes: { ERR_INVALID_ARG_TYPE, @@ -52,9 +51,8 @@ async function * map(fn, options) { const signal = ac.signal; const signalOpt = { signal }; - options?.signal?.addEventListener('abort', () => ac.abort(), { - [kWeakHandler]: signalOpt - }); + const abort = () => ac.abort() + options?.signal?.addEventListener('abort', abort); let next; let resume; @@ -112,6 +110,7 @@ async function * map(fn, options) { next(); next = null; } + options?.signal?.removeEventListener('abort', abort); } } From 66fbd9ff14dbdfc64d8bb790b34c39bec73c3ba3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 29 Nov 2021 12:13:43 +0100 Subject: [PATCH 05/19] fixup --- lib/internal/streams/operators.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 7687d3b26e7415..6ec6f25324f61f 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -119,14 +119,12 @@ async function * map(fn, options) { try { while (true) { while (queue.length > 0) { - let val = queue[0]; + const val = await queue[0]; if (val === kEof) { return; } - val = await val; - if (signal.aborted) { throw new AbortError(); } From 416e46224d22e1a039555501b76c2c5394cb051a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 29 Nov 2021 12:16:57 +0100 Subject: [PATCH 06/19] fixup --- lib/internal/streams/operators.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 6ec6f25324f61f..d0060c88f5ee20 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -93,7 +93,7 @@ async function * map(fn, options) { next = null; } - if (!done && queue.length >= concurrency) { + if (!done && queue.length && queue.length >= concurrency) { await new Promise((resolve) => { resume = resolve; }); From 5a22655f8425757e049201ff799608458a6e51d1 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 30 Nov 2021 10:47:09 +0100 Subject: [PATCH 07/19] Update lib/internal/streams/operators.js --- lib/internal/streams/operators.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index d0060c88f5ee20..50c4434d3b06d2 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -51,7 +51,7 @@ async function * map(fn, options) { const signal = ac.signal; const signalOpt = { signal }; - const abort = () => ac.abort() + const abort = () => ac.abort(); options?.signal?.addEventListener('abort', abort); let next; From 2e7c469ec7843e63c52f1507afbe69b22a07719b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 3 Dec 2021 11:55:46 +0100 Subject: [PATCH 08/19] fixup --- doc/api/stream.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index fa2225d316939a..3bb9dc07b3dedb 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1744,6 +1744,7 @@ added: REPLACEME --> > Stability: 1 - Experimental + * `fn` {Function|AsyncFunction} a function to map over every item in the stream. * `data` {any} a chunk of data from the stream. * `options` {Object} From 751b4297a0d2c86a5070bbd3ad49f733c63023fe Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 3 Dec 2021 12:11:29 +0100 Subject: [PATCH 09/19] fixup --- tools/doc/type-parser.mjs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/doc/type-parser.mjs b/tools/doc/type-parser.mjs index 566a1c92030584..e38363cbacfcdd 100644 --- a/tools/doc/type-parser.mjs +++ b/tools/doc/type-parser.mjs @@ -208,12 +208,12 @@ const customTypesMap = { 'Stream': 'stream.html#stream', 'stream.Duplex': 'stream.html#class-streamduplex', - 'stream.Readable': 'stream.html#class-streamreadable', - 'stream.Transform': 'stream.html#class-streamtransform', - 'stream.Writable': 'stream.html#class-streamwritable', 'Duplex': 'stream.html#class-streamduplex', + 'stream.Readable': 'stream.html#class-streamreadable', 'Readable': 'stream.html#class-streamreadable', + 'stream.Transform': 'stream.html#class-streamtransform', 'Transform': 'stream.html#class-streamtransform', + 'stream.Writable': 'stream.html#class-streamwritable',, 'Writable': 'stream.html#class-streamwritable', 'Immediate': 'timers.html#class-immediate', From d91dddafca319e9b6449db9f68a213f0bd0f6bdd Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 3 Dec 2021 21:39:38 +0100 Subject: [PATCH 10/19] fixup --- lib/internal/streams/operators.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 50c4434d3b06d2..43f6cd9ea00399 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -32,8 +32,6 @@ async function * map(fn, options) { throw new ERR_INVALID_ARG_TYPE('options', ['Object']); } - validateObject(options, 'options', { nullable: true }); - let concurrency = 1; if (options?.concurrency != null) { concurrency = MathFloor(options.concurrency); From 0679c796aa05c3fe80de82eedfacbf9c5881e042 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 4 Dec 2021 01:03:17 +0100 Subject: [PATCH 11/19] fixup --- lib/internal/streams/operators.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 43f6cd9ea00399..478642d4bf27ff 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -9,7 +9,6 @@ const { } = require('internal/errors'); const { validateInteger, - validateObject, } = require('internal/validators'); const { From a8e9dd6357d33e75a033a41f39aaebe97ca0ca7f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 4 Dec 2021 01:10:01 +0100 Subject: [PATCH 12/19] fixup --- lib/internal/streams/operators.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 478642d4bf27ff..494d4c29ad44c5 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -7,12 +7,10 @@ const { ERR_INVALID_ARG_VALUE, }, AbortError, } = require('internal/errors'); -const { - validateInteger, -} = require('internal/validators'); const { MathFloor, + NumberIsInteger, Promise, PromiseReject, Symbol, @@ -36,7 +34,9 @@ async function * map(fn, options) { concurrency = MathFloor(options.concurrency); } - validateInteger(concurrency, 'concurrency', 1); + if (!NumberIsInteger(concurrency)) { + throw new ERR_INVALID_ARG_TYPE('concurrency', ['Integer']); + } if (concurrency < 1) { throw new ERR_INVALID_ARG_VALUE('concurrency', concurrency); From 2db55013365df6ef1be1aacdc2f29290c2ffa54c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 4 Dec 2021 01:40:10 +0100 Subject: [PATCH 13/19] fixup --- test/parallel/test-bootstrap-modules.js | 1 + 1 file changed, 1 insertion(+) diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index 19ba89a290b6d7..9a687eede28ca9 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -110,6 +110,7 @@ const expectedModules = new Set([ 'NativeModule internal/streams/end-of-stream', 'NativeModule internal/streams/from', 'NativeModule internal/streams/legacy', + 'NativeModule internal/streams/operators', 'NativeModule internal/streams/passthrough', 'NativeModule internal/streams/pipeline', 'NativeModule internal/streams/readable', From 7045b682f4de3eb61c51a738800a208a756ffb07 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 27 Dec 2021 08:58:58 +0100 Subject: [PATCH 14/19] Apply suggestions from code review Co-authored-by: Antoine du Hamel --- doc/api/stream.md | 2 +- lib/internal/streams/operators.js | 15 +++++---------- lib/stream.js | 2 +- test/parallel/test-stream-map.js | 8 ++++---- 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 3bb9dc07b3dedb..ab285301e1afb0 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1777,7 +1777,7 @@ const dnsResults = await Readable.from([ 'www.linuxfoundation.org', ]).map((domain) => resolver.resolve4(domain), { concurrency: 2 }); for await (const result of dnsResults) { - console.log(result); // Kogs the DNS result of resolver.resolve4. + console.log(result); // Logs the DNS result of resolver.resolve4. } ``` diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 494d4c29ad44c5..1cbadbce6802d6 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -5,7 +5,8 @@ const { codes: { ERR_INVALID_ARG_TYPE, ERR_INVALID_ARG_VALUE, - }, AbortError, + }, + AbortError, } = require('internal/errors'); const { @@ -34,13 +35,7 @@ async function * map(fn, options) { concurrency = MathFloor(options.concurrency); } - if (!NumberIsInteger(concurrency)) { - throw new ERR_INVALID_ARG_TYPE('concurrency', ['Integer']); - } - - if (concurrency < 1) { - throw new ERR_INVALID_ARG_VALUE('concurrency', concurrency); - } + validateInteger(concurrency, 'concurrency', 1); const ac = new AbortController(); const stream = this; @@ -99,7 +94,7 @@ async function * map(fn, options) { queue.push(kEof); } catch (err) { const val = PromiseReject(err); - val.catch(onDone); + PromisePrototypeCatch(val, onDone); queue.push(val); } finally { done = true; @@ -153,5 +148,5 @@ async function * map(fn, options) { } module.exports = { - map + map, }; diff --git a/lib/stream.js b/lib/stream.js index e87c19dd4d7dc7..ce28cdbe2efe1a 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -47,7 +47,7 @@ Stream.Readable = require('internal/streams/readable'); for (const key of ObjectKeys(operators)) { const op = operators[key]; Stream.Readable.prototype[key] = function(...args) { - return Stream.Readable.from(op.apply(this, args)); + return Stream.Readable.from(ReflectApply(op, this, arguments)); }; } Stream.Writable = require('internal/streams/writable'); diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js index 6cb9b01e69c581..e89dea194b6716 100644 --- a/test/parallel/test-stream-map.js +++ b/test/parallel/test-stream-map.js @@ -54,14 +54,14 @@ const { setTimeout } = require('timers/promises'); await setTimeout(100, { signal }); }, { signal: ac.signal, concurrency: 2 }); // pump - (async () => { + assert.rejects(async () => { for await (const item of stream) { // nope console.log(item); } - })().catch(common.mustCall((e) => { - assert.strictEqual(e.name, 'AbortError'); - })); + }, { + name: 'AbortError', + }).then(common.mustCall()); setImmediate(() => { ac.abort(); From c61f6bad00de0fcc8eb8bcdbeca694a4537d8974 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 29 Dec 2021 17:47:09 +0100 Subject: [PATCH 15/19] fixup --- tools/doc/type-parser.mjs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/doc/type-parser.mjs b/tools/doc/type-parser.mjs index e38363cbacfcdd..b476316f6aed9a 100644 --- a/tools/doc/type-parser.mjs +++ b/tools/doc/type-parser.mjs @@ -213,7 +213,7 @@ const customTypesMap = { 'Readable': 'stream.html#class-streamreadable', 'stream.Transform': 'stream.html#class-streamtransform', 'Transform': 'stream.html#class-streamtransform', - 'stream.Writable': 'stream.html#class-streamwritable',, + 'stream.Writable': 'stream.html#class-streamwritable', 'Writable': 'stream.html#class-streamwritable', 'Immediate': 'timers.html#class-immediate', From 7e617c26a37f8d4d69296e9b8dbaf61da6ad24f1 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 29 Dec 2021 17:57:39 +0100 Subject: [PATCH 16/19] Update lib/stream.js --- lib/stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/stream.js b/lib/stream.js index ce28cdbe2efe1a..e580efe87ed33a 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -47,7 +47,7 @@ Stream.Readable = require('internal/streams/readable'); for (const key of ObjectKeys(operators)) { const op = operators[key]; Stream.Readable.prototype[key] = function(...args) { - return Stream.Readable.from(ReflectApply(op, this, arguments)); + return Stream.Readable.from(ReflectApply(op, this, args)); }; } Stream.Writable = require('internal/streams/writable'); From 47d9891a21dbdde9aec5b5d7acc8c8f802069f4f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 29 Dec 2021 18:08:21 +0100 Subject: [PATCH 17/19] fixup --- lib/stream.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/stream.js b/lib/stream.js index e580efe87ed33a..0c94011a532bd1 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -24,6 +24,7 @@ const { ObjectDefineProperty, ObjectKeys, + ReflectApply, } = primordials; const { From 2d7cd8c4277d1b7cf1d2c031b5713893923b8de3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 29 Dec 2021 18:19:21 +0100 Subject: [PATCH 18/19] fixup --- lib/internal/streams/operators.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 1cbadbce6802d6..7ffbbebd332d9e 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -4,16 +4,16 @@ const { AbortController } = require('internal/abort_controller'); const { codes: { ERR_INVALID_ARG_TYPE, - ERR_INVALID_ARG_VALUE, }, AbortError, } = require('internal/errors'); +const { validateInteger } = require('internal/validators'); const { MathFloor, - NumberIsInteger, Promise, PromiseReject, + PromisePrototypeCatch, Symbol, } = primordials; From 686d9e67140cd7e73292f67be33e48f508f7a039 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 29 Dec 2021 18:58:12 +0100 Subject: [PATCH 19/19] ifxuo --- test/parallel/test-stream-map.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js index e89dea194b6716..2d5c5894e1eb0f 100644 --- a/test/parallel/test-stream-map.js +++ b/test/parallel/test-stream-map.js @@ -95,7 +95,7 @@ const { setTimeout } = require('timers/promises'); for await (const _ of Readable.from([1]).map((x) => x, { concurrency: 'Foo' })); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); assert.rejects(async () => { // eslint-disable-next-line no-unused-vars for await (const _ of Readable.from([1]).map((x) => x, 1));