Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ Object.defineProperty(Readable.prototype, 'readableEnded', {
Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
Readable.prototype._destroy = function(err, cb) {
const state = this._readableState;
state.length = 0;
state.buffer.clear();
cb(err);
};

Expand Down Expand Up @@ -414,6 +417,10 @@ function howMuchToRead(n, state) {

// You can override either this method, or the async _read(n) below.
Readable.prototype.read = function(n) {
const state = this._readableState;
if (state.destroyed) {
return null;
}
debug('read', n);
// Same as parseInt(undefined, 10), however V8 7.3 performance regressed
// in this scenario, so we are doing it manually.
Expand All @@ -422,7 +429,6 @@ Readable.prototype.read = function(n) {
} else if (!Number.isInteger(n)) {
n = parseInt(n, 10);
}
const state = this._readableState;
const nOrig = n;

// If we're asking for more than the current hwm, then raise the hwm.
Expand Down Expand Up @@ -491,9 +497,8 @@ Readable.prototype.read = function(n) {
}

// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, and if we're destroyed, then it's
// not allowed.
if (state.ended || state.reading || state.destroyed) {
// reading, then it's unnecessary.
if (state.ended || state.reading) {
doRead = false;
debug('reading or ended', doRead);
} else if (doRead) {
Expand Down
2 changes: 0 additions & 2 deletions test/parallel/test-stream-duplex-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ const assert = require('assert');

duplex.destroy();

duplex.removeListener('end', fail);
duplex.removeListener('finish', fail);
duplex.on('end', common.mustCall());
duplex.on('finish', common.mustCall());
assert.strictEqual(duplex.destroyed, true);
}
Expand Down
3 changes: 1 addition & 2 deletions test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,14 @@ async function tests() {
let err = null;
try {
for await (const k of readable) {
assert.strictEqual(k, 'hello');
received++;
}
} catch (e) {
err = e;
}

assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(received, 1);
assert.strictEqual(received, 0);
}

{
Expand Down
15 changes: 12 additions & 3 deletions test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ const assert = require('assert');
read.on('close', common.mustCall());

read.destroy();

read.removeListener('end', fail);
read.on('end', common.mustCall());
assert.strictEqual(read.destroyed, true);
}

Expand Down Expand Up @@ -198,3 +195,15 @@ const assert = require('assert');
assert.strictEqual(read.destroyed, true);
read.read();
}

{
// Don't emit 'data' after destroy
const read = new Readable({
read() {
read.push('asd');
}
});
read.on('data', common.mustCall(() => {
read.destroy();
}));
}
2 changes: 0 additions & 2 deletions test/parallel/test-stream-transform-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ const assert = require('assert');

transform.destroy();

transform.removeListener('end', fail);
transform.removeListener('finish', fail);
transform.on('end', common.mustCall());
transform.on('finish', common.mustCall());
}

Expand Down