
async iterators/generators in stream.pipeline() #27140

@boneskull

Apologies for all the code here.

We can consume a Readable stream using an async iterator:

// source: http://2ality.com/2018/04/async-iter-nodejs.html
const fs = require('fs');

async function printAsyncIterable(iterable) {
  for await (const chunk of iterable) {
    console.log('>>> '+chunk);
  }
}

printAsyncIterable(fs.createReadStream('my-file.txt', 'utf8'));

And we can use async generators similarly to how one would use a Transform stream:

/**
 * Parameter: async iterable of chunks (strings)
 * Result: async iterable of lines (incl. newlines)
 */
async function* chunksToLines(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) {
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      // line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

/**
 * Parameter: async iterable of lines
 * Result: async iterable of numbered lines
 */
async function* numberLines(linesAsync) {
  let counter = 1;
  for await (const line of linesAsync) {
    yield counter + ': ' + line;
    counter++;
  }
}

Then, we can "pipe" these together like so:

async function main() {
  await printAsyncIterable(
    numberLines(
      chunksToLines(
        fs.createReadStream('my-file.txt', 'utf8')
      )
    )
  );
}
main();
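
To check what this composition produces without touching the filesystem, here is a minimal sketch that feeds the same two generators from an in-memory source. `fromArray`, `collect`, and `demo` are hypothetical helpers introduced only for this illustration; the chunk boundaries are chosen so that they do not line up with line boundaries.

```javascript
// Hypothetical helper: turns an array into an async iterable of chunks.
async function* fromArray(items) {
  for (const item of items) yield item;
}

// Same logic as chunksToLines above, condensed.
async function* chunksToLines(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) {
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      yield previous.slice(0, eolIndex + 1); // line includes the EOL
      previous = previous.slice(eolIndex + 1);
    }
  }
  if (previous.length > 0) yield previous; // trailing text without an EOL
}

// Same logic as numberLines above, condensed.
async function* numberLines(linesAsync) {
  let counter = 1;
  for await (const line of linesAsync) yield `${counter++}: ${line}`;
}

// Hypothetical helper: drains an async iterable into an array.
async function collect(iterable) {
  const out = [];
  for await (const item of iterable) out.push(item);
  return out;
}

async function demo() {
  const chunks = ['first\nsec', 'ond\nthi', 'rd'];
  return collect(numberLines(chunksToLines(fromArray(chunks))));
}

demo().then((lines) => console.log(lines));
// logs [ '1: first\n', '2: second\n', '3: third' ]
```

The nesting still reads inside-out here, which is the readability problem the rest of this issue is about.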

That's neat, but also kind of hideous. What if we could leverage stream.pipeline() to do something like this?

async function main() {
  stream.pipeline(
    fs.createReadStream('my-file.txt', 'utf8'),
    chunksToLines,
    numberLines,
    printAsyncIterable
  );
}
main();

I'm unfamiliar with the guts of stream.pipeline(), and completely new to async iterators and generators, so I don't know how feasible something like this is.
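
For illustration, here is a rough sketch of the requested shape. It assumes a Node.js version in which `pipeline()` from `stream/promises` accepts async generator functions as intermediate stages and an async function as the final stage, which was not the case when this was written. `upperCase`, `numberChunks`, `sink`, and `runPipeline` are hypothetical names, and `Readable.from()` stands in for the file stream so the example does not depend on a file.

```javascript
const { Readable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical stage: upper-cases each chunk it receives.
async function* upperCase(source) {
  for await (const chunk of source) yield String(chunk).toUpperCase();
}

// Hypothetical stage: prefixes each chunk with a running number.
async function* numberChunks(source) {
  let counter = 1;
  for await (const chunk of source) yield `${counter++}: ${chunk}`;
}

async function runPipeline(chunks) {
  const seen = [];
  await pipeline(
    Readable.from(chunks), // in-memory stand-in for fs.createReadStream()
    upperCase,
    numberChunks,
    // Final stage: an async function that consumes the iterable.
    async function sink(source) {
      for await (const chunk of source) seen.push(chunk);
    }
  );
  return seen;
}

runPipeline(['a', 'b']).then((seen) => console.log(seen));
```

Each stage receives the previous stage's output as an async iterable, so generators like `chunksToLines` and `numberLines` would slot in without changes under the same assumption.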

FWIW, the "hideous nested function calls" can be naively replaced by use of the godlike Array.prototype.reduce():

const pipeline = async (...args) => args.reduce((acc, arg) => arg(acc));

async function main() {
  await pipeline(
    fs.createReadStream('my-file.txt', 'utf8'),
    chunksToLines,
    numberLines,
    printAsyncIterable
  );
}
main();
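
A slightly more explicit sketch of the same reduce idea, with the source separated from the stages and the result awaited so that the value returned by the last stage (or an error thrown by any stage) reaches the caller. `fromArray`, `double`, and `sum` are hypothetical stand-ins used only to keep the example self-contained.

```javascript
// Each stage receives the previous stage's result; `source` seeds the reduction.
const pipeline = async (source, ...stages) =>
  stages.reduce((acc, stage) => stage(acc), source);

// Hypothetical source: an async iterable over an in-memory array.
async function* fromArray(items) {
  for (const item of items) yield item;
}

// Hypothetical transform stage: doubles every number.
async function* double(numbers) {
  for await (const n of numbers) yield n * 2;
}

// Hypothetical final stage: consumes the iterable and returns a total.
async function sum(numbers) {
  let total = 0;
  for await (const n of numbers) total += n;
  return total;
}

async function main() {
  // Awaiting here means a failing stage rejects main() instead of
  // surfacing as an unhandled rejection.
  return await pipeline(fromArray([1, 2, 3]), double, sum);
}

main().then((total) => console.log(total)); // logs 12
```

Unlike stream.pipeline(), this does no cleanup of the underlying streams on error; it only composes the function calls.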

Reference: https://twitter.com/b0neskull/status/1115325542566227968


Labels: feature request, stream
