Skip to content

Commit d74a32e

Browse files
authored
[fix] Take into account the data queued in the sender (#971)
This makes the `bufferedAmount` getter take into account the data queued in the sender.
1 parent bc35fa4 commit d74a32e

File tree

4 files changed

+94
-33
lines changed

4 files changed

+94
-33
lines changed

lib/Sender.js

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,16 @@ class Sender {
2424
*/
2525
constructor (socket, extensions) {
2626
this.perMessageDeflate = (extensions || {})[PerMessageDeflate.extensionName];
27+
this._socket = socket;
28+
2729
this.firstFragment = true;
28-
this.processing = false;
2930
this.compress = false;
30-
this._socket = socket;
31-
this.onerror = null;
31+
32+
this.processing = false;
33+
this.bufferedBytes = 0;
3234
this.queue = [];
35+
36+
this.onerror = null;
3337
}
3438

3539
/**
@@ -86,10 +90,23 @@ class Sender {
8690
* @public
8791
*/
8892
ping (data, mask) {
93+
var readOnly = true;
94+
95+
if (data && !Buffer.isBuffer(data)) {
96+
if (data instanceof ArrayBuffer) {
97+
data = Buffer.from(data);
98+
} else if (ArrayBuffer.isView(data)) {
99+
data = viewToBuffer(data);
100+
} else {
101+
data = Buffer.from(data);
102+
readOnly = false;
103+
}
104+
}
105+
89106
if (this.perMessageDeflate) {
90-
this.enqueue([this.doPing, data, mask]);
107+
this.enqueue([this.doPing, data, mask, readOnly]);
91108
} else {
92-
this.doPing(data, mask);
109+
this.doPing(data, mask, readOnly);
93110
}
94111
}
95112

@@ -98,14 +115,15 @@ class Sender {
98115
*
99116
* @param {*} data The message to send
100117
* @param {Boolean} mask Specifies whether or not to mask `data`
118+
* @param {Boolean} readOnly Specifies whether `data` can be modified
101119
* @private
102120
*/
103-
doPing (data, mask) {
121+
doPing (data, mask, readOnly) {
104122
this.frameAndSend(data, {
105-
readOnly: true,
106123
opcode: 0x09,
107124
rsv1: false,
108125
fin: true,
126+
readOnly,
109127
mask
110128
});
111129

@@ -120,10 +138,23 @@ class Sender {
120138
* @public
121139
*/
122140
pong (data, mask) {
141+
var readOnly = true;
142+
143+
if (data && !Buffer.isBuffer(data)) {
144+
if (data instanceof ArrayBuffer) {
145+
data = Buffer.from(data);
146+
} else if (ArrayBuffer.isView(data)) {
147+
data = viewToBuffer(data);
148+
} else {
149+
data = Buffer.from(data);
150+
readOnly = false;
151+
}
152+
}
153+
123154
if (this.perMessageDeflate) {
124-
this.enqueue([this.doPong, data, mask]);
155+
this.enqueue([this.doPong, data, mask, readOnly]);
125156
} else {
126-
this.doPong(data, mask);
157+
this.doPong(data, mask, readOnly);
127158
}
128159
}
129160

@@ -132,14 +163,15 @@ class Sender {
132163
*
133164
* @param {*} data The message to send
134165
* @param {Boolean} mask Specifies whether or not to mask `data`
166+
* @param {Boolean} readOnly Specifies whether `data` can be modified
135167
* @private
136168
*/
137-
doPong (data, mask) {
169+
doPong (data, mask, readOnly) {
138170
this.frameAndSend(data, {
139-
readOnly: true,
140171
opcode: 0x0a,
141172
rsv1: false,
142173
fin: true,
174+
readOnly,
143175
mask
144176
});
145177

@@ -243,7 +275,7 @@ class Sender {
243275
/**
244276
* Frames and sends a piece of data according to the HyBi WebSocket protocol.
245277
*
246-
* @param {*} data The data to send
278+
* @param {Buffer} data The data to send
247279
* @param {Object} options Options object
248280
* @param {Number} options.opcode The opcode
249281
* @param {Boolean} options.readOnly Specifies whether `data` can be modified
@@ -267,17 +299,6 @@ class Sender {
267299
return;
268300
}
269301

270-
if (!Buffer.isBuffer(data)) {
271-
if (data instanceof ArrayBuffer) {
272-
data = Buffer.from(data);
273-
} else if (ArrayBuffer.isView(data)) {
274-
data = viewToBuffer(data);
275-
} else {
276-
data = Buffer.from(data);
277-
options.readOnly = false;
278-
}
279-
}
280-
281302
const mergeBuffers = data.length < 1024 || options.mask && options.readOnly;
282303
var dataOffset = options.mask ? 6 : 2;
283304
var payloadLength = data.length;
@@ -334,12 +355,13 @@ class Sender {
334355
dequeue () {
335356
if (this.processing) return;
336357

337-
const handler = this.queue.shift();
338-
if (!handler) return;
358+
const params = this.queue.shift();
359+
if (!params) return;
339360

361+
if (params[1]) this.bufferedBytes -= params[1].length;
340362
this.processing = true;
341363

342-
handler[0].apply(this, handler.slice(1));
364+
params[0].apply(this, params.slice(1));
343365
}
344366

345367
/**
@@ -361,6 +383,7 @@ class Sender {
361383
* @private
362384
*/
363385
enqueue (params) {
386+
if (params[1]) this.bufferedBytes += params[1].length;
364387
this.queue.push(params);
365388
this.dequeue();
366389
}

lib/WebSocket.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ class WebSocket extends EventEmitter {
8282
get bufferedAmount () {
8383
var amount = 0;
8484

85-
if (this._socket) amount = this._socket.bufferSize || 0;
85+
if (this._socket) {
86+
amount = this._socket.bufferSize + this._sender.bufferedBytes;
87+
}
8688
return amount;
8789
}
8890

test/Sender.test.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,24 @@ describe('Sender', function () {
7373
});
7474
});
7575

76+
describe('#pong', function () {
77+
it('works with multiple types of data', function (done) {
78+
let count = 0;
79+
const sender = new Sender({
80+
write: (data) => {
81+
assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69])));
82+
if (++count === 3) done();
83+
}
84+
});
85+
86+
const array = new Uint8Array([0x68, 0x69]);
87+
88+
sender.pong(array.buffer, false);
89+
sender.pong(array, false);
90+
sender.pong('hi', false);
91+
});
92+
});
93+
7694
describe('#send', function () {
7795
it('compresses data if compress option is enabled', function (done) {
7896
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });

test/WebSocket.test.js

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,19 +131,34 @@ describe('WebSocket', function () {
131131
});
132132

133133
it('defaults to zero upon "open"', function (done) {
134-
server.createServer(++port, (srv) => {
134+
const wss = new WebSocketServer({ port: ++port }, () => {
135135
const ws = new WebSocket(`ws://localhost:${port}`);
136136

137137
ws.onopen = () => {
138138
assert.strictEqual(ws.bufferedAmount, 0);
139-
140-
ws.on('close', () => srv.close(done));
141-
ws.close();
139+
wss.close(done);
142140
};
143141
});
144142
});
145143

146-
it('stress kernel write buffer', function (done) {
144+
it('takes into account the data in the sender queue', function (done) {
145+
const wss = new WebSocketServer({ port: ++port }, () => {
146+
const ws = new WebSocket(`ws://localhost:${port}`);
147+
148+
ws.on('open', () => {
149+
ws.send('foo');
150+
ws.send('bar', (err) => {
151+
assert.ifError(err);
152+
assert.strictEqual(ws.bufferedAmount, 0);
153+
wss.close(done);
154+
});
155+
156+
assert.strictEqual(ws.bufferedAmount, 3);
157+
});
158+
});
159+
});
160+
161+
it('takes into account the data in the socket queue', function (done) {
147162
const wss = new WebSocketServer({ port: ++port }, () => {
148163
const ws = new WebSocket(`ws://localhost:${port}`, {
149164
perMessageDeflate: false
@@ -152,7 +167,10 @@ describe('WebSocket', function () {
152167

153168
wss.on('connection', (ws) => {
154169
while (true) {
155-
if (ws.bufferedAmount > 0) break;
170+
if (ws._socket.bufferSize > 0) {
171+
assert.strictEqual(ws.bufferedAmount, ws._socket.bufferSize);
172+
break;
173+
}
156174
ws.send('hello'.repeat(1e4));
157175
}
158176
wss.close(done);

0 commit comments

Comments
 (0)