Skip to content

Commit b8f5fb9

Browse files
committed
stream: add readableDidRead
Adds did read accessor used to determine whether a readable has been read from. Refs: nodejs/undici#907
1 parent e605c86 commit b8f5fb9

File tree

3 files changed

+138
-1
lines changed

3 files changed

+138
-1
lines changed

doc/api/stream.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1222,6 +1222,17 @@ added: v11.4.0
12221222
Is `true` if it is safe to call [`readable.read()`][stream-read], which means
12231223
the stream has not been destroyed or emitted `'error'` or `'end'`.
12241224

1225+
##### `readable.readableDidRead`
1226+
<!-- YAML
1227+
added: REPLACEME
1228+
-->
1229+
1230+
* {boolean}
1231+
1232+
Allows determining if the stream has been or is about to be read.
1233+
Returns true if `'data'`, `'end'`, `'error'` or `'close'` has been
1234+
emitted.
1235+
12251236
##### `readable.readableEncoding`
12261237
<!-- YAML
12271238
added: v12.7.0

lib/internal/streams/readable.js

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ function ReadableState(options, stream, isDuplex) {
171171
// If true, a maybeReadMore has been scheduled.
172172
this.readingMore = false;
173173

174+
this.dataEmitted = false;
175+
174176
this.decoder = null;
175177
this.encoding = null;
176178
if (options && options.encoding) {
@@ -314,6 +316,7 @@ function addChunk(stream, state, chunk, addToFront) {
314316
} else {
315317
state.awaitDrainWriters = null;
316318
}
319+
state.dataEmitted = true;
317320
stream.emit('data', chunk);
318321
} else {
319322
// Update the buffer info.
@@ -539,8 +542,10 @@ Readable.prototype.read = function(n) {
539542
endReadable(this);
540543
}
541544

542-
if (ret !== null)
545+
if (ret !== null) {
546+
state.dataEmitted = true;
543547
this.emit('data', ret);
548+
}
544549

545550
return ret;
546551
};
@@ -1177,6 +1182,18 @@ ObjectDefineProperties(Readable.prototype, {
11771182
}
11781183
},
11791184

1185+
readableDidRead: {
1186+
enumerable: false,
1187+
get: function() {
1188+
return (
1189+
this._readableState.dataEmitted ||
1190+
this._readableState.endEmitted ||
1191+
this._readableState.errorEmitted ||
1192+
this._readableState.closeEmitted
1193+
);
1194+
}
1195+
},
1196+
11801197
readableHighWaterMark: {
11811198
enumerable: false,
11821199
get: function() {
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const Readable = require('stream').Readable;
5+
6+
function noop() {}
7+
8+
function check(readable, data, fn) {
9+
assert.strictEqual(readable.readableDidRead, false);
10+
assert.strictEqual(readable.readableUsed, false);
11+
if (data === -1) {
12+
readable.on('error', common.mustCall());
13+
readable.on('data', common.mustNotCall());
14+
assert.strictEqual(readable.readableUsed, true);
15+
readable.on('end', common.mustNotCall());
16+
} else {
17+
readable.on('error', common.mustNotCall());
18+
if (data === -2) {
19+
readable.on('end', common.mustNotCall());
20+
} else {
21+
readable.on('end', common.mustCall());
22+
}
23+
if (data > 0) {
24+
readable.on('data', common.mustCallAtLeast(data));
25+
assert.strictEqual(readable.readableUsed, true);
26+
} else {
27+
readable.on('data', common.mustNotCall());
28+
assert.strictEqual(readable.readableUsed, true);
29+
}
30+
}
31+
readable.on('close', common.mustCall());
32+
fn();
33+
setImmediate(() => {
34+
assert.strictEqual(readable.readableDidRead, true);
35+
assert.strictEqual(readable.readableUsed, true);
36+
});
37+
}
38+
39+
{
40+
const readable = new Readable({
41+
read() {
42+
this.push(null);
43+
}
44+
});
45+
check(readable, 0, () => {
46+
readable.read();
47+
});
48+
}
49+
50+
{
51+
const readable = new Readable({
52+
read() {
53+
this.push(null);
54+
}
55+
});
56+
check(readable, 0, () => {
57+
readable.resume();
58+
});
59+
}
60+
61+
{
62+
const readable = new Readable({
63+
read() {
64+
this.push(null);
65+
}
66+
});
67+
check(readable, -2, () => {
68+
readable.destroy();
69+
});
70+
}
71+
72+
{
73+
const readable = new Readable({
74+
read() {
75+
this.push(null);
76+
}
77+
});
78+
79+
check(readable, -1, () => {
80+
readable.destroy(new Error());
81+
});
82+
}
83+
84+
{
85+
const readable = new Readable({
86+
read() {
87+
this.push('data');
88+
this.push(null);
89+
}
90+
});
91+
92+
check(readable, 1, () => {
93+
readable.on('data', noop);
94+
});
95+
}
96+
97+
{
98+
const readable = new Readable({
99+
read() {
100+
this.push('data');
101+
this.push(null);
102+
}
103+
});
104+
105+
check(readable, 1, () => {
106+
readable.on('data', noop);
107+
readable.off('data', noop);
108+
});
109+
}

0 commit comments

Comments
 (0)