diff --git a/lib/socket.js b/lib/socket.js index 5a2f0b91b..c2a3f39e2 100644 --- a/lib/socket.js +++ b/lib/socket.js @@ -24,6 +24,7 @@ function Socket (id, server, transport) { this.readyState = 'opening'; this.writeBuffer = []; this.packetsFn = []; + this.sentCallbackFn = []; this.setTransport(transport); this.onOpen(); @@ -226,6 +227,7 @@ Socket.prototype.clearTransport = function () { Socket.prototype.onClose = function (reason, description) { if ('closed' != this.readyState) { this.packetsFn = []; + this.sentCallbackFn = []; this.clearTransport(); this.readyState = 'closed'; this.emit('close', reason, description); @@ -242,11 +244,18 @@ Socket.prototype.setupSendCallback = function () { var self = this; //the message was sent successfully, execute the callback this.transport.on('drain', function() { - if (self.packetsFn.length > 0) { - var seqFn = self.packetsFn.splice(0,1)[0]; + if (self.sentCallbackFn.length > 0) { + var seqFn = self.sentCallbackFn.splice(0,1)[0]; if ('function' == typeof seqFn) { debug('executing send callback'); seqFn(self.transport); + } else if (Array.isArray(seqFn)) { + debug('executing batch send callback'); + for (var i in seqFn) { + if ('function' == typeof seqFn[i]) { + seqFn[i](self.transport); + } + } } } }); @@ -288,9 +297,8 @@ Socket.prototype.sendPacket = function (type, data, callback) { this.writeBuffer.push(packet); //add send callback to object - if (callback) { - this.packetsFn.push(callback); - } + this.packetsFn.push(callback); + this.flush(); } }; @@ -309,6 +317,12 @@ Socket.prototype.flush = function () { this.server.emit('flush', this, this.writeBuffer); var wbuf = this.writeBuffer; this.writeBuffer = []; + if (!this.transport.supportsFraming) { + this.sentCallbackFn.push(this.packetsFn) + } else { + this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn); + } + this.packetsFn = []; this.transport.send(wbuf); this.emit('drain'); this.server.emit('drain', this); diff --git a/lib/transports/flashsocket.js b/lib/transports/flashsocket.js index 4773c1114..794bb29fa 100644 --- a/lib/transports/flashsocket.js +++ b/lib/transports/flashsocket.js @@ -36,6 +36,14 @@ FlashSocket.prototype.__proto__ = WebSocket.prototype; FlashSocket.prototype.name = 'flashsocket'; +/** + * Advertise framing support. + * + * @api public + */ + +FlashSocket.prototype.supportsFraming = true; + /** * Listens for new configuration changes of the Manager * this way we can enable and disable the flash server. diff --git a/lib/transports/websocket.js b/lib/transports/websocket.js index 6c474c8e6..448699e5e 100644 --- a/lib/transports/websocket.js +++ b/lib/transports/websocket.js @@ -55,6 +55,14 @@ WebSocket.prototype.name = 'websocket'; WebSocket.prototype.handlesUpgrades = true; +/** + * Advertise framing support. + * + * @api public + */ + +WebSocket.prototype.supportsFraming = true; + /** * Processes the incoming data. * diff --git a/test/server.js b/test/server.js index 94254d8ae..bead79a8e 100644 --- a/test/server.js +++ b/test/server.js @@ -883,6 +883,43 @@ describe('server', function () { }); }); + it('should execute in multipart packet (polling)', function (done) { + var engine = listen(function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + var i = 0; + var j = 0; + + engine.on('connection', function (conn) { + conn.send('d', function (transport) { + i++; + }); + + conn.send('c', function (transport) { + i++; + }); + + conn.send('b', function (transport) { + i++; + }); + + conn.send('a', function (transport) { + i++; + }); + + }); + socket.on('open', function () { + socket.on('message', function (msg) { + j++; + }); + }); + + setTimeout(function () { + expect(i).to.be(j); + done(); + }, 200); + }); + }); + it('should clean callback references when socket gets closed with pending callbacks', function (done) { var engine = listen({ allowUpgrades: false }, function (port) { var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); @@ -903,11 +940,35 @@ describe('server', function () { conn.on('close', function (reason) { expect(conn.packetsFn).to.be.empty(); + expect(conn.sentCallbackFn).to.be.empty(); done(); }); }); }); }); + + it('should not execute when it is not actually sent (polling)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + + socket.transport.on('pollComplete', function(msg) { + socket.close(); + }); + + engine.on('connection', function (conn) { + var err = undefined; + + conn.send('a'); + conn.send('b', function (transport) { + err = new Error('Test invalidation'); + }); + + conn.on('close', function (reason) { + done(err); + }); + }); + }); + }); }); });