Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -2621,6 +2621,12 @@ or a pipeline ends non gracefully with no explicit error.
An attempt was made to call [`stream.push()`][] after a `null`(EOF) had been
pushed to the stream.

<a id="ERR_STREAM_UNABLE_TO_PIPE"></a>

### `ERR_STREAM_UNABLE_TO_PIPE`

An attempt was made to pipe to a closed or destroyed stream in a pipeline.

<a id="ERR_STREAM_UNSHIFT_AFTER_END_EVENT"></a>

### `ERR_STREAM_UNSHIFT_AFTER_END_EVENT`
Expand Down
1 change: 1 addition & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1716,6 +1716,7 @@ E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_UNABLE_TO_PIPE', 'Connot pipe to a closed or destroyed stream', Error);
E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
'stream.unshift() after end event', Error);
E('ERR_STREAM_WRAP', 'Stream has StringDecoder set or is in objectMode', Error);
Expand Down
6 changes: 6 additions & 0 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const {
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED,
ERR_STREAM_PREMATURE_CLOSE,
ERR_STREAM_UNABLE_TO_PIPE,
},
} = require('internal/errors');

Expand Down Expand Up @@ -253,10 +254,15 @@ function pipelineImpl(streams, callback, opts) {
const stream = streams[i];
const reading = i < streams.length - 1;
const writing = i > 0;
const next = i + 1 < streams.length ? streams[i + 1] : null;
const end = reading || opts?.end !== false;
const isLastStream = i === streams.length - 1;

if (isNodeStream(stream)) {
if (next !== null && (next?.closed || next?.destroyed)) {
throw new ERR_STREAM_UNABLE_TO_PIPE();
}

if (end) {
const { destroy, cleanup } = destroyer(stream, reading, writing);
destroys.push(destroy);
Expand Down
13 changes: 13 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const http = require('http');
const { promisify } = require('util');
const net = require('net');
const tsp = require('timers/promises');
const tmpdir = require('../common/tmpdir');
const fs = require('fs');

{
let finished = false;
Expand Down Expand Up @@ -69,6 +71,17 @@ const tsp = require('timers/promises');
}, /ERR_INVALID_ARG_TYPE/);
}

tmpdir.refresh();
{
assert.rejects(async () => {
const read = fs.createReadStream(__filename);
const write = fs.createWriteStream(tmpdir.resolve('a'));
const close = promisify(write.close);
await close.call(write);
await pipelinep(read, write);
}, /ERR_STREAM_UNABLE_TO_PIPE/).then(common.mustCall());
}

{
const read = new Readable({
read() {}
Expand Down