diff --git a/index.js b/index.js index 5dd09c7b..87c31e24 100644 --- a/index.js +++ b/index.js @@ -75,9 +75,39 @@ function compression (options) { var stream var _end = res.end - var _on = res.on var _write = res.write + // proxy drain events from stream + var _addListener = interceptAddListener(res, function (type, listener) { + if (!listeners || type !== 'drain') { + // skip intercept + return false + } else if (stream) { + // add listener to stream instead + stream.on(type, listener) + } else { + // buffer listeners for future stream + listeners.push([type, listener]) + } + }) + + interceptRemoveListener(res, function (type, listener) { + if (!listeners || type !== 'drain') { + // skip intercept + return false + } else if (stream) { + // remove listener from stream + stream.removeListener(type, listener) + } else { + // remove buffered listener + for (var i = listeners.length - 1; i >= 0; i--) { + if (listeners[i][0] === type && listeners[i][1] === listener) { + listeners.splice(i, 1) + } + } + } + }) + // flush res.flush = function flush (cb) { if (stream) { @@ -128,24 +158,9 @@ function compression (options) { : stream.end() } - res.on = function on (type, listener) { - if (!listeners || type !== 'drain') { - return _on.call(this, type, listener) - } - - if (stream) { - return stream.on(type, listener) - } - - // buffer listeners for future stream - listeners.push([type, listener]) - - return this - } - function nocompress (msg) { debug('no compression: %s', msg) - addListeners(res, _on, listeners) + addListeners(res, _addListener, listeners) listeners = null } @@ -231,7 +246,7 @@ function compression (options) { _end.call(res) }) - _on.call(res, 'drain', function onResponseDrain () { + _addListener.call(res, 'drain', function onResponseDrain () { stream.resume() }) @@ -254,9 +269,9 @@ function compression (options) { * @private */ -function addListeners (stream, on, listeners) { +function addListeners (stream, addListener, listeners) { for (var i = 0; i < listeners.length; i++) { - on.apply(stream, listeners[i]) + addListener.apply(stream, listeners[i]) } } @@ -274,6 +289,69 @@ function chunkLength (chunk, encoding) { : Buffer.byteLength(chunk, encoding) } +/** + * Intercept add listener on event emitter. + * @private + */ + +function interceptAddListener (ee, fn) { + var _addListener = ee.addListener + var _on = ee.on + + if (_addListener) { + ee.addListener = function addListener (type, listener) { + return fn.call(this, type, listener) === false + ? _addListener.call(this, type, listener) + : this + } + } + + if (_on) { + ee.on = function on (type, listener) { + return fn.call(this, type, listener) === false + ? _on.call(this, type, listener) + : this + } + } + + return _addListener || _on || noop +} + +/** + * Intercept remove listener on event emitter. + * @private + */ + +function interceptRemoveListener (ee, fn) { + var _removeListener = ee.removeListener + var _off = ee.off + + if (_removeListener) { + ee.removeListener = function removeListener (type, listener) { + return fn.call(this, type, listener) === false + ? _removeListener.call(this, type, listener) + : this + } + } + + if (_off) { + ee.off = function off (type, listener) { + return fn.call(this, type, listener) === false + ? _off.call(this, type, listener) + : this + } + } + + return _removeListener || _off || noop +} + +/** + * Reusable no-op function. + * @private + */ + +function noop () {} + /** * Default filter function. * @private diff --git a/test/compression.js b/test/compression.js index 78173c33..13074437 100644 --- a/test/compression.js +++ b/test/compression.js @@ -306,6 +306,186 @@ describe('compression()', function () { .expect(200, done) }) + describe('listeners', function () { + it('should support removeListener("drain") after on("drain"); stream present', function (done) { + // compression doesn't proxy listenerCount() to the compression stream, so + // instead watch for a MaxListenersExceededWarning + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({ threshold: 0 }, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + var len = bytes('40kb') + var buf = Buffer.alloc(len, '.') + res.write(buf) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.on('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support removeListener("drain") after addListener("drain")', function (done) { + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({ threshold: 0 }, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + var len = bytes('40kb') + var buf = Buffer.alloc(len, '.') + res.write(buf) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.addListener('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support off("drain") after addListener("drain")', function (done) { + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({ threshold: 0 }, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + var len = bytes('40kb') + var buf = Buffer.alloc(len, '.') + res.write(buf) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.addListener('drain', listener) + res.off('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support removeListener("drain"); buffered', function (done) { + // Variant of above tests for scenario when the listener is buffered (stream + // is not yet present). + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({ threshold: 0 }, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + res.on('end', function () {}) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.on('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support removeListener("drain"); multiple bindings of same listener, buffered', function (done) { + // Variant of above test for scenario when the listener is buffered (stream + // is not yet present) and the same listener is added two or more times. + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({ threshold: 0 }, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.on('drain', listener) + res.on('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + // https://github.com/expressjs/compression/issues/135 + it('should not leak event listeners when res.unpipe()', function (done) { + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + var server = createServer({ threshold: 0 }, function (req, res) { + var times = 0 + var int = setInterval(function () { + var rs = require('fs').createReadStream('does not exist') + rs.on('error', function (e) { + rs.unpipe(res) + }) + rs.pipe(res) + if (times++ > res.getMaxListeners()) { + clearInterval(int) + res.end('hello, world') + } + }) + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + }) + describe('http2', function () { it('should work with http2 server', function (done) { var server = createHttp2Server({ threshold: 0 }, function (req, res) {