From 41ee77e3c6683cc6c111ecc4087fc02cdc548371 Mon Sep 17 00:00:00 2001 From: roam Date: Tue, 15 Jan 2013 17:17:47 +0800 Subject: [PATCH 1/4] Revert "Revert "Fixed packet send callback design issue"" This reverts commit ead08d6e5b5eb66012950805ef088e1a8efd47ea. --- lib/socket.js | 15 ++++++++++++--- lib/transports/flashsocket.js | 8 ++++++++ lib/transports/websocket.js | 8 ++++++++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/lib/socket.js b/lib/socket.js index 5a2f0b91b..81ffbb316 100644 --- a/lib/socket.js +++ b/lib/socket.js @@ -247,6 +247,13 @@ Socket.prototype.setupSendCallback = function () { if ('function' == typeof seqFn) { debug('executing send callback'); seqFn(self.transport); + } else if (Array.isArray(seqFn)) { + debug('executing batch send callback'); + for (var i in seqFn) { + if ('function' == typeof seqFn) { + seqFn[i](self.transport); + } + } } } }); @@ -288,9 +295,8 @@ Socket.prototype.sendPacket = function (type, data, callback) { this.writeBuffer.push(packet); //add send callback to object - if (callback) { - this.packetsFn.push(callback); - } + this.packetsFn.push(callback); + this.flush(); } }; @@ -309,6 +315,9 @@ Socket.prototype.flush = function () { this.server.emit('flush', this, this.writeBuffer); var wbuf = this.writeBuffer; this.writeBuffer = []; + if (!this.transport.supportsFraming) { + this.packetsFn = [this.packetsFn]; + } this.transport.send(wbuf); this.emit('drain'); this.server.emit('drain', this); diff --git a/lib/transports/flashsocket.js b/lib/transports/flashsocket.js index 4773c1114..794bb29fa 100644 --- a/lib/transports/flashsocket.js +++ b/lib/transports/flashsocket.js @@ -36,6 +36,14 @@ FlashSocket.prototype.__proto__ = WebSocket.prototype; FlashSocket.prototype.name = 'flashsocket'; +/** + * Advertise framing support. + * + * @api public + */ + +FlashSocket.prototype.supportsFraming = true; + /** * Listens for new configuration changes of the Manager * this way we can enable and disable the flash server. diff --git a/lib/transports/websocket.js b/lib/transports/websocket.js index 6c474c8e6..448699e5e 100644 --- a/lib/transports/websocket.js +++ b/lib/transports/websocket.js @@ -55,6 +55,14 @@ WebSocket.prototype.name = 'websocket'; WebSocket.prototype.handlesUpgrades = true; +/** + * Advertise framing support. + * + * @api public + */ + +WebSocket.prototype.supportsFraming = true; + /** * Processes the incoming data. * From 06b39687ebce3d7370827855515541692071aa31 Mon Sep 17 00:00:00 2001 From: roam Date: Wed, 16 Jan 2013 14:35:56 +0800 Subject: [PATCH 2/4] Fixed send callback test failures --- lib/socket.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/socket.js b/lib/socket.js index 81ffbb316..42f502b32 100644 --- a/lib/socket.js +++ b/lib/socket.js @@ -250,7 +250,7 @@ Socket.prototype.setupSendCallback = function () { } else if (Array.isArray(seqFn)) { debug('executing batch send callback'); for (var i in seqFn) { - if ('function' == typeof seqFn) { + if ('function' == typeof seqFn[i]) { seqFn[i](self.transport); } } From 72861f3d49bf5ac0d8d7671903fcbfbc6f1464e2 Mon Sep 17 00:00:00 2001 From: roam Date: Wed, 16 Jan 2013 15:13:50 +0800 Subject: [PATCH 3/4] Added a test case which makes the code before the send callback fix fail --- test/server.js | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/test/server.js b/test/server.js index 94254d8ae..c9d7cdae3 100644 --- a/test/server.js +++ b/test/server.js @@ -908,6 +908,29 @@ describe('server', function () { }); }); }); + + it('should not execute when it is not actually sent (polling)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + + socket.on('message', function(msg) { + socket.close(); + }); + + engine.on('connection', function (conn) { + var err = undefined; + + conn.send('a'); + conn.send('b', function (transport) { + err = new Error('Test invalidation'); + }); + + conn.on('close', function (reason) { + done(err); + }); + }); + }); + }); }); }); From 148f654c92c8ad56c859941b530562fca981a048 Mon Sep 17 00:00:00 2001 From: roam Date: Wed, 16 Jan 2013 17:31:42 +0800 Subject: [PATCH 4/4] Fixed bugs in previous send callback fix and updated test cases --- lib/socket.js | 11 ++++++++--- test/server.js | 40 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/lib/socket.js b/lib/socket.js index 42f502b32..c2a3f39e2 100644 --- a/lib/socket.js +++ b/lib/socket.js @@ -24,6 +24,7 @@ function Socket (id, server, transport) { this.readyState = 'opening'; this.writeBuffer = []; this.packetsFn = []; + this.sentCallbackFn = []; this.setTransport(transport); this.onOpen(); @@ -226,6 +227,7 @@ Socket.prototype.clearTransport = function () { Socket.prototype.onClose = function (reason, description) { if ('closed' != this.readyState) { this.packetsFn = []; + this.sentCallbackFn = []; this.clearTransport(); this.readyState = 'closed'; this.emit('close', reason, description); @@ -242,8 +244,8 @@ Socket.prototype.setupSendCallback = function () { var self = this; //the message was sent successfully, execute the callback this.transport.on('drain', function() { - if (self.packetsFn.length > 0) { - var seqFn = self.packetsFn.splice(0,1)[0]; + if (self.sentCallbackFn.length > 0) { + var seqFn = self.sentCallbackFn.splice(0,1)[0]; if ('function' == typeof seqFn) { debug('executing send callback'); seqFn(self.transport); @@ -316,8 +318,11 @@ Socket.prototype.flush = function () { var wbuf = this.writeBuffer; this.writeBuffer = []; if (!this.transport.supportsFraming) { - this.packetsFn = [this.packetsFn]; + this.sentCallbackFn.push(this.packetsFn) + } else { + this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn); } + this.packetsFn = []; this.transport.send(wbuf); this.emit('drain'); this.server.emit('drain', this); diff --git a/test/server.js b/test/server.js index c9d7cdae3..bead79a8e 100644 --- a/test/server.js +++ b/test/server.js @@ -883,6 +883,43 @@ describe('server', function () { }); }); + it('should execute in multipart packet (polling)', function (done) { + var engine = listen(function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + var i = 0; + var j = 0; + + engine.on('connection', function (conn) { + conn.send('d', function (transport) { + i++; + }); + + conn.send('c', function (transport) { + i++; + }); + + conn.send('b', function (transport) { + i++; + }); + + conn.send('a', function (transport) { + i++; + }); + + }); + socket.on('open', function () { + socket.on('message', function (msg) { + j++; + }); + }); + + setTimeout(function () { + expect(i).to.be(j); + done(); + }, 200); + }); + }); + it('should clean callback references when socket gets closed with pending callbacks', function (done) { var engine = listen({ allowUpgrades: false }, function (port) { var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); @@ -903,6 +940,7 @@ describe('server', function () { conn.on('close', function (reason) { expect(conn.packetsFn).to.be.empty(); + expect(conn.sentCallbackFn).to.be.empty(); done(); }); }); @@ -913,7 +951,7 @@ describe('server', function () { var engine = listen({ allowUpgrades: false }, function (port) { var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); - socket.on('message', function(msg) { + socket.transport.on('pollComplete', function(msg) { socket.close(); });