Skip to content

Commit d5de7fe

Browse files
committed
support res.removeListener('drain'), res.once('drain')
Fixes expressjs#152 Fixes expressjs#135
1 parent becc1c0 commit d5de7fe

File tree

3 files changed

+214
-0
lines changed

3 files changed

+214
-0
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ node_js:
1212
- "7.10"
1313
- "8.11"
1414
- "9.11"
15+
- "10"
1516
sudo: false
1617
cache:
1718
directories:

index.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ function compression (options) {
6464

6565
var _end = res.end
6666
var _on = res.on
67+
var _removeListener = res.removeListener
6768
var _write = res.write
6869

6970
// flush
@@ -131,6 +132,32 @@ function compression (options) {
131132
return this
132133
}
133134

135+
res.addListener = res.on
136+
137+
res.removeListener = function removeListener (type, listener) {
138+
if (!listeners || type !== 'drain') {
139+
return _removeListener.call(this, type, listener)
140+
}
141+
142+
if (stream) {
143+
return stream.removeListener(type, listener)
144+
}
145+
146+
// remove buffered listener
147+
for (var i = listeners.length - 1; i >= 0; i--) {
148+
if (listeners[i][0] === type && listeners[i][1] === listener) {
149+
listeners.splice(i, 1)
150+
}
151+
}
152+
153+
return this
154+
}
155+
156+
if (res.off) {
157+
// emitter.off was added in Node.js v10+; don't add it to earlier versions
158+
res.off = res.removeListener
159+
}
160+
134161
function nocompress (msg) {
135162
debug('no compression: %s', msg)
136163
addListeners(res, _on, listeners)

test/compression.js

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,185 @@ describe('compression()', function () {
301301
.expect(200, done)
302302
})
303303

304+
it('should support removeListener("drain") after on("drain"); stream present', function (done) {
305+
// compression doesn't proxy listenerCount() to the compression stream, so
306+
// instead watch for a MaxListenersExceededWarning
307+
var hasWarned = false
308+
var onWarning = function () {
309+
hasWarned = true
310+
}
311+
process.on('warning', onWarning)
312+
var server = createServer({threshold: 0}, function (req, res) {
313+
res.setHeader('Content-Type', 'text/plain')
314+
var len = bytes('40kb')
315+
var buf = Buffer.alloc(len, '.')
316+
res.write(buf)
317+
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
318+
var listener = function () {}
319+
res.on('drain', listener)
320+
res.removeListener('drain', listener)
321+
}
322+
res.end()
323+
})
324+
325+
request(server)
326+
.get('/')
327+
.set('Accept-Encoding', 'gzip')
328+
.expect(function () {
329+
process.removeListener('warning', onWarning)
330+
assert.ok(!hasWarned)
331+
})
332+
.expect(200, done)
333+
})
334+
335+
it('should support removeListener("drain") after addListener("drain")', function (done) {
336+
var hasWarned = false
337+
var onWarning = function () {
338+
hasWarned = true
339+
}
340+
process.on('warning', onWarning)
341+
var server = createServer({threshold: 0}, function (req, res) {
342+
res.setHeader('Content-Type', 'text/plain')
343+
var len = bytes('40kb')
344+
var buf = Buffer.alloc(len, '.')
345+
res.write(buf)
346+
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
347+
var listener = function () {}
348+
res.addListener('drain', listener)
349+
res.removeListener('drain', listener)
350+
}
351+
res.end()
352+
})
353+
354+
request(server)
355+
.get('/')
356+
.set('Accept-Encoding', 'gzip')
357+
.expect(function () {
358+
process.removeListener('warning', onWarning)
359+
assert.ok(!hasWarned)
360+
})
361+
.expect(200, done)
362+
})
363+
364+
it('should support off("drain") after addListener("drain")', function (done) {
365+
if (!require('events').prototype.off) { // off was added in Node.js v10
366+
this.skip()
367+
}
368+
var hasWarned = false
369+
var onWarning = function () {
370+
hasWarned = true
371+
}
372+
process.on('warning', onWarning)
373+
var server = createServer({threshold: 0}, function (req, res) {
374+
res.setHeader('Content-Type', 'text/plain')
375+
var len = bytes('40kb')
376+
var buf = Buffer.alloc(len, '.')
377+
res.write(buf)
378+
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
379+
var listener = function () {}
380+
res.addListener('drain', listener)
381+
res.off('drain', listener)
382+
}
383+
res.end()
384+
})
385+
386+
request(server)
387+
.get('/')
388+
.set('Accept-Encoding', 'gzip')
389+
.expect(function () {
390+
process.removeListener('warning', onWarning)
391+
assert.ok(!hasWarned)
392+
})
393+
.expect(200, done)
394+
})
395+
396+
it('should support removeListener("drain"); buffered', function (done) {
397+
// Variant of above tests for scenario when the listener is buffered (stream
398+
// is not yet present).
399+
var hasWarned = false
400+
var onWarning = function () {
401+
hasWarned = true
402+
}
403+
process.on('warning', onWarning)
404+
var server = createServer({threshold: 0}, function (req, res) {
405+
res.setHeader('Content-Type', 'text/plain')
406+
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
407+
var listener = function () {}
408+
res.on('drain', listener)
409+
res.removeListener('drain', listener)
410+
}
411+
res.end()
412+
})
413+
414+
request(server)
415+
.get('/')
416+
.set('Accept-Encoding', 'gzip')
417+
.expect(function () {
418+
process.removeListener('warning', onWarning)
419+
assert.ok(!hasWarned)
420+
})
421+
.expect(200, done)
422+
})
423+
424+
it('should support removeListener("drain"); multiple bindings of same listener, buffered', function (done) {
425+
// Variant of above test for scenario when the listener is buffered (stream
426+
// is not yet present) and the same listener is added two or more times.
427+
var hasWarned = false
428+
var onWarning = function () {
429+
hasWarned = true
430+
}
431+
process.on('warning', onWarning)
432+
var server = createServer({threshold: 0}, function (req, res) {
433+
res.setHeader('Content-Type', 'text/plain')
434+
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
435+
var listener = function () {}
436+
res.on('drain', listener)
437+
res.on('drain', listener)
438+
res.removeListener('drain', listener)
439+
}
440+
res.end()
441+
})
442+
443+
request(server)
444+
.get('/')
445+
.set('Accept-Encoding', 'gzip')
446+
.expect(function () {
447+
process.removeListener('warning', onWarning)
448+
assert.ok(!hasWarned)
449+
})
450+
.expect(200, done)
451+
})
452+
453+
it('should not leak event listeners when res.unpipe() is used (#135)', function (done) {
454+
var hasWarned = false
455+
var onWarning = function () {
456+
hasWarned = true
457+
}
458+
var server = createServer({threshold: 0}, function (req, res) {
459+
var times = 0
460+
var int = setInterval(function () {
461+
var rs = require('fs').createReadStream('does not exist')
462+
rs.on('error', function (e) {
463+
rs.unpipe(res)
464+
})
465+
rs.pipe(res)
466+
if (times++ > res.getMaxListeners()) {
467+
clearInterval(int)
468+
res.end('hello, world')
469+
}
470+
})
471+
})
472+
473+
request(server)
474+
.get('/')
475+
.set('Accept-Encoding', 'gzip')
476+
.expect(function () {
477+
process.removeListener('warning', onWarning)
478+
assert.ok(!hasWarned)
479+
})
480+
.expect(200, done)
481+
})
482+
304483
describe('threshold', function () {
305484
it('should not compress responses below the threshold size', function (done) {
306485
var server = createServer({ threshold: '1kb' }, function (req, res) {
@@ -656,6 +835,13 @@ function createServer (opts, fn) {
656835
return
657836
}
658837

838+
if (typeof res.getMaxListeners !== 'function') {
839+
// Added in v0.11.2
840+
res.getMaxListeners = function getMaxListeners () {
841+
return 10
842+
}
843+
}
844+
659845
fn(req, res)
660846
})
661847
})

0 commit comments

Comments
 (0)