Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 101 additions & 47 deletions lib/connection.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
var net = require('net');
var util = require('util');
var tls = require('tls');
var EventEmitter = require('events').EventEmitter;
var Queue = require('double-ended-queue');

Expand Down Expand Up @@ -75,20 +76,8 @@ function Connection (opts)

this._outOfOrderPackets = [];

this.stream.once('error', function (err) {
err.fatal = true;
// stop receiving packets
connection.stream.removeAllListeners('data');
connection.addCommand = function (cmd) {
if (cmd.onResult) {
cmd.onResult(err);
} else {
connection.emit('error', err);
}
return;
};
connection._notifyError(err);
connection._fatalError = err;
this.stream.once('error', function(err) {
connection._handleNetworkError(err);
});

// see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind
Expand Down Expand Up @@ -124,6 +113,23 @@ function Connection (opts)
}
util.inherits(Connection, EventEmitter);

Connection.prototype._handleNetworkError = function(err) {
var connection = this;
err.fatal = true;
// stop receiving packets
connection.stream.removeAllListeners('data');
connection.addCommand = function (cmd) {
if (cmd.onResult) {
cmd.onResult(err);
} else {
connection.emit('error', err);
}
return;
};
connection._notifyError(err);
connection._fatalError = err;
};

// notify all commands in the queue and bubble error as connection "error"
// called on stream error or unexpected termination
Connection.prototype._notifyError = function (err) {
Expand Down Expand Up @@ -175,42 +181,90 @@ Connection.prototype.writePacket = function (packet) {
this.write(packet.buffer);
};

Connection.prototype.startTLS = function _startTLS (onSecure) {
if (this.config.debug) {
console.log('Upgrading connection to TLS');
}
var connection = this;
var tls = require('tls');
var crypto = require('crypto');
var config = this.config;
var stream = this.stream;
var rejectUnauthorized = this.config.ssl.rejectUnauthorized;
var credentials = crypto.createCredentials({
key : config.ssl.key,
cert : config.ssl.cert,
passphrase : config.ssl.passphrase,
ca : config.ssl.ca,
ciphers : config.ssl.ciphers
});
var securePair = tls.createSecurePair(credentials, false, true, rejectUnauthorized);
if (tls.TLSSocket) {
// 0.11+ environment
Connection.prototype.startTLS = function _startTLS (onSecure) {
if (this.config.debug) {
console.log('Upgrading connection to TLS');
}
var connection = this;
var stream = this.stream;
var secureContext = tls.createSecureContext({
ca : this.config.ssl.ca,
cert : this.config.ssl.cert,
ciphers : this.config.ssl.ciphers,
key : this.config.ssl.key,
passphrase : this.config.ssl.passphrase
});

if (stream.ondata) {
stream.ondata = null;
}
stream.removeAllListeners('data');
stream.pipe(securePair.encrypted);
securePair.encrypted.pipe(stream);
securePair.cleartext.on('data', function (data) {
connection.packetParser.execute(data);
});
connection.write = function (buffer) {
securePair.cleartext.write(buffer);
var rejectUnauthorized = this.config.ssl.rejectUnauthorized;
var secureEstablished = false;
var secureSocket = new tls.TLSSocket(connection.stream, {
rejectUnauthorized : rejectUnauthorized,
requestCert : true,
secureContext : secureContext,
isServer : false
});

// error handler for secure socket
secureSocket.on('_tlsError', function(err) {
if (secureEstablished) {
connection._handleNetworkError(err);
} else {
onSecure(err);
}
});

secureSocket.on('secure', function() {
secureEstablished = true;
onSecure(rejectUnauthorized ? this.ssl.verifyError() : null);
});
secureSocket.on('data', function (data) {
connection.packetParser.execute(data);
});
connection.write = function (buffer) {
secureSocket.write(buffer);
};
// start TLS communications
secureSocket._start();
};
securePair.on('secure', function () {
onSecure(rejectUnauthorized ? this.ssl.verifyError() : null);
});
};
} else {
Connection.prototype.startTLS = function _startTLS (onSecure) {
if (this.config.debug) {
console.log('Upgrading connection to TLS');
}
var connection = this;
var tls = require('tls');
var crypto = require('crypto');
var config = this.config;
var stream = this.stream;
var rejectUnauthorized = this.config.ssl.rejectUnauthorized;
var credentials = crypto.createCredentials({
key : config.ssl.key,
cert : config.ssl.cert,
passphrase : config.ssl.passphrase,
ca : config.ssl.ca,
ciphers : config.ssl.ciphers
});
var securePair = tls.createSecurePair(credentials, false, true, rejectUnauthorized);

if (stream.ondata) {
stream.ondata = null;
}
stream.removeAllListeners('data');
stream.pipe(securePair.encrypted);
securePair.encrypted.pipe(stream);
securePair.cleartext.on('data', function (data) {
connection.packetParser.execute(data);
});
connection.write = function (buffer) {
securePair.cleartext.write(buffer);
};
securePair.on('secure', function () {
onSecure(rejectUnauthorized ? this.ssl.verifyError() : null);
});
};
}

Connection.prototype.pipe = function () {
var connection = this;
Expand Down