Closed
43 commits
d957541
stream: add Transform.by utility function
Jun 25, 2019
9005e46
docs: stream.Transform.by typo
Jul 1, 2019
944f228
stream: Transform.by SourceIterator next optimization
Jul 1, 2019
84c84c9
docs: Transform.by doc tweaks
davidmarkclements Jul 2, 2019
a1937c7
docs: sort type-parser types alphabetically within their groups
davidmarkclements Jul 2, 2019
ad35382
docs: sort type-parser types alphabetically within their groups
davidmarkclements Jul 2, 2019
8d82c5b
docs: typo
davidmarkclements Jul 2, 2019
7a1ef77
docs: Transform.by clarify as async iterable
davidmarkclements Jul 2, 2019
5707445
stream: Transform. by remove unnecessary defensive code
davidmarkclements Jul 2, 2019
970ed3d
stream: Transform.by code style
davidmarkclements Jul 2, 2019
6e67a85
stream: Transform.by minor refactoring
Jul 3, 2019
98aebc5
streams: Transform.by check fn return value instead of fn instance
Jul 5, 2019
eee19c5
docs: emphasize Transform.by objectMode default behaviour
Jul 7, 2019
26e96be
docs: Transform.by function naming convention
davidmarkclements Jul 7, 2019
185a6f4
docs: add transform content to streams <-> async generators compatibi…
Jul 7, 2019
8e73c51
docs: includemissing parens
Jul 7, 2019
b73347f
tests: preempt conflict with #28566
Jul 7, 2019
9789a5b
tests: fix transform async iterator test
Jul 12, 2019
bad0bfd
docs: add meta data to Transform.by
davidmarkclements Jul 15, 2019
f2c8b22
stream: error handling bug fix, ensure stream is destroyed after proc…
Jul 15, 2019
001fe01
Update doc/api/stream.md
davidmarkclements Aug 5, 2019
9545ee0
Update doc/api/stream.md
davidmarkclements Aug 5, 2019
f0fa8b6
Update doc/api/stream.md
davidmarkclements Aug 5, 2019
b504b15
Update doc/api/stream.md
davidmarkclements Aug 5, 2019
447a895
streams: Transform.by, rm unncessary check
Sep 9, 2019
2fe96e4
lint fixes
Dec 13, 2019
1115d52
Merge branch 'transform-by' of github.com:davidmarkclements/node into…
safareli May 15, 2021
a8151fe
use implementation from @ronag
safareli May 15, 2021
3049db3
remove ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE
safareli May 15, 2021
3ce77ac
fix review comment
safareli May 15, 2021
59791d0
fix lint errors
safareli May 15, 2021
6480563
fix md lint errors
safareli May 15, 2021
307e926
fix review comment
safareli May 17, 2021
382b9b5
rename test file
safareli May 17, 2021
58dc92d
try fix doc build error
safareli May 17, 2021
b4b5441
fix other doc error
safareli May 17, 2021
a985441
Apply stream.md suggestions from code review
safareli Jun 4, 2021
e0c1f0e
apply @aduh95 suggestions
safareli Jun 6, 2021
c5c862d
Apply suggestions from code review
safareli Jun 6, 2021
adc6059
Update doc/api/stream.md
safareli Jun 6, 2021
967989e
assign `encoding` to the async generator
safareli Jun 8, 2021
6ccafab
add encoding to yield
safareli Jun 9, 2021
9bccc23
try fix some tests
safareli Jun 15, 2021
82 changes: 78 additions & 4 deletions doc/api/stream.md
@@ -45,8 +45,8 @@ There are four fundamental stream types within Node.js:
is written and read (for example, [`zlib.createDeflate()`][]).

Additionally, this module includes the utility functions
[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][]
and [`stream.addAbortSignal()`][].
[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][],
[`stream.addAbortSignal()`][] and [`stream.Transform.by()`][].

### Streams Promises API
<!-- YAML
@@ -1889,7 +1889,51 @@ const stream = addAbortSignal(
}
})();
```
## API for stream implementers

### `stream.Transform.by(asyncGeneratorFunction[, options])`
<!-- YAML
added: REPLACEME
-->

* `asyncGeneratorFunction` {AsyncGeneratorFunction} A mapping function that
  accepts a `source` async iterable, from which incoming data can be read;
  transformed data is pushed to the stream with the `yield` keyword.
* `options` {Object} Options provided to `new stream.Transform([options])`.
  By default, `Transform.by()` sets `options.objectMode` to `true`, unless
  `options.objectMode` is explicitly set to `false`.
* Returns: {stream.Transform}

A utility method for creating `Transform` streams with async generator
functions. The async generator is supplied a single argument, `source`, which
is used to read incoming chunks.

Yielded values become the data chunks emitted from the stream.

```js
const { Readable, Transform } = require('stream');

const readable = Readable.from(['hello', 'streams']);
async function * mapper(source) {
  for await (const chunk of source) {
    // If objectMode were set to false, the buffer would have to be converted
    // to a string here, but since it is true by default for both
    // Readable.from() and Transform.by(), each chunk is already a string.
    yield chunk.toUpperCase();
  }
}
const transform = Transform.by(mapper);
readable.pipe(transform);
transform.on('data', (chunk) => {
  console.log(chunk);
});
```

The `source` parameter has an `encoding` property which represents the encoding
of the `Writable` side of the transform. This is the same `encoding` value that
would be passed as the second parameter to the `transform()` function option
(or `_transform()` method) supplied to `stream.Transform`.

## API for Stream Implementers

<!--type=misc-->

@@ -1928,7 +1972,7 @@ on the type of stream being created, as detailed in the chart below:
| Reading only | [`Readable`][] | [`_read()`][stream-_read] |
| Writing only | [`Writable`][] | [`_write()`][stream-_write], [`_writev()`][stream-_writev], [`_final()`][stream-_final] |
| Reading and writing | [`Duplex`][] | [`_read()`][stream-_read], [`_write()`][stream-_write], [`_writev()`][stream-_writev], [`_final()`][stream-_final] |
| Operate on written data, then read the result | [`Transform`][] | [`_transform()`][stream-_transform], [`_flush()`][stream-_flush], [`_final()`][stream-_final] |
| Operate on written data, then read the result | [`Transform`][] | [`_transform()`][], [`_flush()`][stream-_flush], [`_final()`][stream-_final] |

The implementation code for a stream should *never* call the "public" methods
of a stream that are intended for use by consumers (as described in the
@@ -3095,6 +3139,35 @@ readable.on('data', (chunk) => {
});
```

#### Creating transform streams with async generator functions

A Node.js transform stream can be constructed from an asynchronous generator
function using the `Transform.by()` utility method.

```js
const { Readable, Transform } = require('stream');

async function * toUpperCase(source) {
  for await (const chunk of source) {
    yield chunk.toUpperCase();
  }
}
const transform = Transform.by(toUpperCase);

async function * generate() {
  yield 'a';
  yield 'b';
  yield 'c';
}

const readable = Readable.from(generate());

readable.pipe(transform);
transform.on('data', (chunk) => {
  console.log(chunk);
});
```

#### Piping to writable streams from async iterators

When writing to a writable stream from an async iterator, ensure correct
@@ -3259,6 +3332,7 @@ contain multi-byte characters.
[`readable.push('')`]: #stream_readable_push
[`readable.setEncoding()`]: #stream_readable_setencoding_encoding
[`stream.Readable.from()`]: #stream_stream_readable_from_iterable_options
[`stream.Transform.by()`]: #stream_stream_transform_by_asyncgeneratorfunction_options
[`stream.cork()`]: #stream_writable_cork
[`stream.finished()`]: #stream_stream_finished_stream_options_callback
[`stream.pipe()`]: #stream_readable_pipe_destination_options
24 changes: 24 additions & 0 deletions lib/internal/streams/transform.js
@@ -73,6 +73,8 @@ const {
ERR_METHOD_NOT_IMPLEMENTED
} = require('internal/errors').codes;
const Duplex = require('internal/streams/duplex');
const from = require('internal/streams/from');
const { createDeferredPromise } = require('internal/util');
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);

@@ -244,3 +246,25 @@ Transform.prototype._read = function() {
    callback();
  }
};

Transform.by = function by(asyncGeneratorFn, options) {
  let { promise, resolve } = createDeferredPromise();
  const asyncGenerator = async function*() {
    while (true) {
      const { chunk, done, encoding, cb } = await promise;
      process.nextTick(cb);
      if (done) return;
      yield { chunk, encoding };
      ({ promise, resolve } = createDeferredPromise());
    }
  }();
  return from(Duplex, asyncGeneratorFn(asyncGenerator), {
    objectMode: true,
    autoDestroy: true,
    ...options,
    write: (chunk, encoding, cb) => {
      resolve({ chunk, done: false, encoding, cb });
    },
    final: (cb) => resolve({ done: true, cb })
  });
};
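
`createDeferredPromise()` above comes from `internal/util`, which is not a
public API. A minimal userland equivalent (an assumption about its shape, not
the exact internal code) would be:

```js
// Userland sketch of a deferred promise: a promise whose resolve/reject
// functions are exposed to the caller.
function createDeferredPromise() {
  let resolve;
  let reject;
  const promise = new Promise((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}

// Transform.by() resolves one deferred per written chunk, handing
// { chunk, encoding } objects to the async generator.
const deferred = createDeferredPromise();
let received;
deferred.promise.then((value) => { received = value; });
deferred.resolve({ chunk: 'hi', encoding: 'utf8', done: false });
```

Re-creating a fresh deferred after each `yield` is what lets the generator
apply backpressure: the next `write()` cannot complete until the previous
chunk has been consumed.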
24 changes: 24 additions & 0 deletions test/parallel/test-stream-readable-async-iterators.js
@@ -497,6 +497,30 @@ async function tests() {
mustReach[1]();
}

{
  console.log('readable side of a transform stream pushes null');
  const transform = new Transform({
    objectMode: true,
    transform: (chunk, enc, cb) => { cb(null, chunk); }
  });
  transform.push(0);
  transform.push(1);
  process.nextTick(() => {
    transform.push(null);
  });

  const mustReach = [ common.mustCall(), common.mustCall() ];

  const iter = transform[Symbol.asyncIterator]();
  assert.strictEqual((await iter.next()).value, 0);

  for await (const d of iter) {
    assert.strictEqual(d, 1);
    mustReach[0]();
  }
  mustReach[1]();
}
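
`common.mustCall()` in the test above is a node-core test helper that fails
the test if the wrapped function is not called the expected number of times; a
minimal userland sketch of the idea (not the actual helper):

```js
// Returns a wrapper that counts its calls and verifies the count at exit.
function mustCall(fn = () => {}, expected = 1) {
  let actual = 0;
  process.on('exit', () => {
    if (actual !== expected) {
      throw new Error(`expected ${expected} call(s), got ${actual}`);
    }
  });
  return (...args) => {
    actual += 1;
    return fn(...args);
  };
}
```

Checking the count in an `'exit'` handler means a callback that is silently
never invoked still fails the test run.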

{
  console.log('all next promises must be resolved on end');
  const r = new Readable({