Skip to content

Commit 9f873b3

Browse files
committed
Revert "stream: make finished call the callback if the stream is closed"
This reverts commit b03845b. PR-URL: #29717 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Rich Trott <[email protected]> Reviewed-By: Beth Griggs <[email protected]>
1 parent 8c60685 commit 9f873b3

File tree

4 files changed

+32
-330
lines changed

4 files changed

+32
-330
lines changed

lib/internal/streams/async_iterator.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,16 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
112112
return() {
113113
return new Promise((resolve, reject) => {
114114
const stream = this[kStream];
115+
116+
// TODO(ronag): Remove this check once finished() handles
117+
// already ended and/or destroyed streams.
118+
const ended = stream.destroyed || stream.readableEnded ||
119+
(stream._readableState && stream._readableState.endEmitted);
120+
if (ended) {
121+
resolve(createIterResult(undefined, true));
122+
return;
123+
}
124+
115125
finished(stream, (err) => {
116126
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
117127
reject(err);

lib/internal/streams/end-of-stream.js

Lines changed: 19 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,6 @@ function isRequest(stream) {
1313
return stream.setHeader && typeof stream.abort === 'function';
1414
}
1515

16-
function isReadable(stream) {
17-
return typeof stream.readable === 'boolean' ||
18-
typeof stream.readableEnded === 'boolean' ||
19-
!!stream._readableState;
20-
}
21-
22-
function isWritable(stream) {
23-
return typeof stream.writable === 'boolean' ||
24-
typeof stream.writableEnded === 'boolean' ||
25-
!!stream._writableState;
26-
}
27-
2816
function eos(stream, opts, callback) {
2917
if (arguments.length === 2) {
3018
callback = opts;
@@ -40,51 +28,43 @@ function eos(stream, opts, callback) {
4028

4129
callback = once(callback);
4230

43-
const onerror = (err) => {
44-
callback.call(stream, err);
45-
};
46-
47-
let writableFinished = stream.writableFinished ||
48-
(stream._writableState && stream._writableState.finished);
49-
let readableEnded = stream.readableEnded ||
50-
(stream._readableState && stream._readableState.endEmitted);
51-
52-
if (writableFinished || readableEnded || stream.destroyed ||
53-
stream.aborted) {
54-
if (opts.error !== false) stream.on('error', onerror);
55-
// A destroy(err) call emits error in nextTick.
56-
process.nextTick(callback.bind(stream));
57-
return () => {
58-
stream.removeListener('error', onerror);
59-
};
60-
}
61-
62-
let readable = opts.readable ||
63-
(opts.readable !== false && isReadable(stream));
64-
let writable = opts.writable ||
65-
(opts.writable !== false && isWritable(stream));
31+
let readable = opts.readable || (opts.readable !== false && stream.readable);
32+
let writable = opts.writable || (opts.writable !== false && stream.writable);
6633

6734
const onlegacyfinish = () => {
6835
if (!stream.writable) onfinish();
6936
};
7037

38+
var writableEnded = stream._writableState && stream._writableState.finished;
7139
const onfinish = () => {
7240
writable = false;
73-
writableFinished = true;
41+
writableEnded = true;
7442
if (!readable) callback.call(stream);
7543
};
7644

45+
var readableEnded = stream.readableEnded ||
46+
(stream._readableState && stream._readableState.endEmitted);
7747
const onend = () => {
7848
readable = false;
7949
readableEnded = true;
8050
if (!writable) callback.call(stream);
8151
};
8252

53+
const onerror = (err) => {
54+
callback.call(stream, err);
55+
};
56+
8357
const onclose = () => {
58+
let err;
8459
if (readable && !readableEnded) {
85-
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
86-
} else if (writable && !writableFinished) {
87-
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
60+
if (!stream._readableState || !stream._readableState.ended)
61+
err = new ERR_STREAM_PREMATURE_CLOSE();
62+
return callback.call(stream, err);
63+
}
64+
if (writable && !writableEnded) {
65+
if (!stream._writableState || !stream._writableState.ended)
66+
err = new ERR_STREAM_PREMATURE_CLOSE();
67+
return callback.call(stream, err);
8868
}
8969
};
9070

test/parallel/test-http-client-finished.js

Lines changed: 0 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -25,109 +25,3 @@ const { finished } = require('stream');
2525
.end();
2626
}));
2727
}
28-
29-
{
30-
// Test abort before finished.
31-
32-
const server = http.createServer(function(req, res) {
33-
});
34-
35-
server.listen(0, common.mustCall(function() {
36-
const req = http.request({
37-
port: this.address().port
38-
}, common.mustNotCall());
39-
req.abort();
40-
finished(req, common.mustCall(() => {
41-
server.close();
42-
}));
43-
}));
44-
}
45-
46-
{
47-
// Test abort after request.
48-
49-
const server = http.createServer(function(req, res) {
50-
});
51-
52-
server.listen(0, common.mustCall(function() {
53-
const req = http.request({
54-
port: this.address().port
55-
}).end();
56-
finished(req, (err) => {
57-
common.expectsError({
58-
type: Error,
59-
code: 'ERR_STREAM_PREMATURE_CLOSE'
60-
})(err);
61-
finished(req, common.mustCall(() => {
62-
server.close();
63-
}));
64-
});
65-
req.abort();
66-
}));
67-
}
68-
69-
{
70-
// Test abort before end.
71-
72-
const server = http.createServer(function(req, res) {
73-
res.write('test');
74-
});
75-
76-
server.listen(0, common.mustCall(function() {
77-
const req = http.request({
78-
port: this.address().port
79-
}).on('response', common.mustCall((res) => {
80-
req.abort();
81-
finished(res, common.mustCall(() => {
82-
finished(res, common.mustCall(() => {
83-
server.close();
84-
}));
85-
}));
86-
})).end();
87-
}));
88-
}
89-
90-
{
91-
// Test destroy before end.
92-
93-
const server = http.createServer(function(req, res) {
94-
res.write('test');
95-
});
96-
97-
server.listen(0, common.mustCall(function() {
98-
http.request({
99-
port: this.address().port
100-
}).on('response', common.mustCall((res) => {
101-
// TODO(ronag): Bug? Won't emit 'close' unless read.
102-
res.on('data', () => {});
103-
res.destroy();
104-
finished(res, common.mustCall(() => {
105-
finished(res, common.mustCall(() => {
106-
server.close();
107-
}));
108-
}));
109-
})).end();
110-
}));
111-
}
112-
113-
{
114-
// Test finish after end.
115-
116-
const server = http.createServer(function(req, res) {
117-
res.end('asd');
118-
});
119-
120-
server.listen(0, common.mustCall(function() {
121-
http.request({
122-
port: this.address().port
123-
}).on('response', common.mustCall((res) => {
124-
// TODO(ronag): Bug? Won't emit 'close' unless read.
125-
res.on('data', () => {});
126-
finished(res, common.mustCall(() => {
127-
finished(res, common.mustCall(() => {
128-
server.close();
129-
}));
130-
}));
131-
})).end();
132-
}));
133-
}

0 commit comments

Comments
 (0)