Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 49 additions & 110 deletions lib/Publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,13 @@ class Publisher extends EventEmitter {

this._subClients = {};

this._port = 0;

this._server = null;

this._messageHandler = options.typeClass;

// messages published before this publisher
// was registered will be held here
this._msgQueue = [];

this._setupTcp()
.then(() => {
this._register();
});
this._register();
};

getTopic() {
Expand Down Expand Up @@ -213,110 +206,60 @@ class Publisher extends EventEmitter {
this._msgQueue = [];
}

_setupTcp() {
// recursively tries to setup server on open port
// calls callback when setup is done
let _createServer = (callback) => {
NetworkUtils.getFreePort()
.then((port) => {
let server = net.createServer((subscriber) => {
let subName = subscriber.remoteAddress + ":"
+ subscriber.remotePort;
subscriber.name = subName;
this._log.debug('Publisher ' + this.getTopic()
+ ' got connection from ' + subName);

// subscriber will send us tcpros handshake before we can
// start publishing to it.
subscriber.$handshake =
this._handleHandshake.bind(this, subscriber);

// handshake will be TCPROS encoded, so use a DeserializeStream to
// handle any chunking
let deserializeStream = new DeserializeStream();
subscriber.pipe(deserializeStream);
deserializeStream.on('message', subscriber.$handshake);

// if this publisher had the tcpNoDelay option set
// disable the nagle algorithm
if (this._tcpNoDelay) {
subscriber.setNoDelay(true);
}
handleSubscriberConnection(subscriber, deserializeStream, header) {
let error = TcprosUtils.validateSubHeader(
header, this.getTopic(), this.getType(),
this._messageHandler.md5sum());
if (error !== null) {
this._log.error('Unable to validate connection header '
+ JSON.stringify(header));
subscriber.end(Serialize(error));
return;
}
// else
this._log.debug('Pub %s got connection header %s', this.getTopic(), JSON.stringify(header));

// create and send response
let respHeader =
TcprosUtils.createPubHeader(
this._nodeHandle.getNodeName(),
this._messageHandler.md5sum(),
this.getType(),
this.getLatching());
subscriber.write(respHeader);

// remove connections to deserializeStream - it's unnecessary now
deserializeStream.removeAllListeners();

// if this publisher had the tcpNoDelay option set
// disable the nagle algorithm
if (this._tcpNoDelay) {
subscriber.setNoDelay(true);
}

subscriber.on('close', () => {
this._log.info('Publisher ' + this.getTopic() + ' client '
+ subscriber.name + ' disconnected!');
delete this._subClients[subscriber.name];
});

subscriber.on('end', () => {
this._log.info('Sub %s sent END', subscriber.name);
});

subscriber.on('error', () => {
this._log.info('Sub %s had error', subscriber.name);
});
}).listen(port);

// it's possible the port was taken before we could use it
server.on('error', (err) => {
if (err.code === 'EADDRINUSE') {
_createServer(callback);
}
});
subscriber.on('close', () => {
this._log.info('Publisher %s' + this.getTopic() + ' client '
+ subscriber.name + ' disconnected!');
delete this._subClients[subscriber.name];
});

// the port was available
server.on('listening', () => {
this._log.debug('Listening on port ' + port);
this._port = port;
this._server = server;
callback(port);
});
});
};
subscriber.on('end', () => {
this._log.info('Sub %s sent END', subscriber.name);
});

return new Promise((resolve, reject) => {
_createServer(resolve);
subscriber.on('error', () => {
this._log.warn('Sub %s had error', subscriber.name);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems a bit funny to me that this error is reported as a warning. Was that intended?

});
}

_handleHandshake(subscriber, data) {
if (!subscriber.$initialized) {
let header = TcprosUtils.parseSubHeader(data);
let valid = TcprosUtils.validateSubHeader(
header, this.getTopic(), this.getType(),
this._messageHandler.md5sum());
if (valid !== null) {
this._log.error('Unable to validate connection header '
+ JSON.stringify(header));
subscriber.write(Serialize(valid));
return;
}
this._log.debug('Pub ' + this.getTopic()
+ ' got connection header ' + JSON.stringify(header));

let respHeader =
TcprosUtils.createPubHeader(
this._nodeHandle.getNodeName(),
this._messageHandler.md5sum(),
this.getType(),
this.getLatching());
subscriber.write(respHeader);

if (this._lastSentMsg !== null) {
this._log.debug('Sending latched msg to new subscriber');
subscriber.write(this._lastSentMsg);
}
if (this._lastSentMsg !== null) {
this._log.debug('Sending latched msg to new subscriber');
subscriber.write(this._lastSentMsg);
}

// if handshake good, add to list, we'll start publishing to it
this._subClients[subscriber.name] = subscriber;
// if handshake good, add to list, we'll start publishing to it
this._subClients[subscriber.name] = subscriber;

this.emit('connection', subscriber.name);
}
else {
this._log.error(
'Got message from subscriber after handshake - what gives!!');
}
this.emit('connection', header, subscriber.name);
}

_register() {
Expand All @@ -332,14 +275,10 @@ class Publisher extends EventEmitter {
}
})
.catch((err, resp) => {
this._log.error('reg pub err ' + err + ' resp: '
this._log.error('reg pub err ' + err + ' resp: '
+ JSON.stringify(resp));
})
}

getSubPort() {
return this._port;
}
};

module.exports = Publisher;
92 changes: 85 additions & 7 deletions lib/RosNode.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ let ServiceClient = require('./ServiceClient.js');
let ServiceServer = require('./ServiceServer.js');
let NetworkUtils = require('../utils/network_utils.js');
let messageUtils = require('../utils/message_utils.js');
let tcprosUtils = require('../utils/tcpros_utils.js');
let SerializationUtils = require('../utils/serialization_utils.js');
let DeserializeStream = SerializationUtils.DeserializeStream;
let Deserialize = SerializationUtils.Deserialize;
let Serialize = SerializationUtils.Serialize;
let EventEmitter = require('events');
let logger = require('../utils/logger.js');

Expand All @@ -43,8 +48,12 @@ class RosNode extends EventEmitter {

this._log = logger.createLogger({name: nodeName});

this._slaveApiServer = null;
this._xmlrpcPort = null;

this._tcprosServer = null;
this._tcprosPort = null;

this._nodeName = nodeName;

this._rosMasterAddress = rosMaster;
Expand All @@ -60,7 +69,8 @@ class RosNode extends EventEmitter {

this._services = {};

this._setupSlaveApi();
this._setupTcprosServer()
.then(this._setupSlaveApi.bind(this));

this._setupExitHandler();
}
Expand Down Expand Up @@ -145,12 +155,12 @@ class RosNode extends EventEmitter {
// Master API
//------------------------------------------------------------------

registerService(service, serviceUri) {
return this._masterApi.registerService(this._nodeName, service, serviceUri, this._getXmlrpcUri());
registerService(service) {
return this._masterApi.registerService(this._nodeName, service, NetworkUtils.formatServiceUri(this._tcprosPort), this._getXmlrpcUri());
}

unregisterService(service, serviceUri) {
return this._masterApi.unregisterService(this._nodeName, service, serviceUri);
unregisterService(service) {
return this._masterApi.unregisterService(this._nodeName, service, NetworkUtils.formatServiceUri(this._tcprosPort));
}

registerSubscriber(topic, topicType) {
Expand Down Expand Up @@ -270,13 +280,81 @@ class RosNode extends EventEmitter {
});
}

_setupTcprosServer() {
// recursively tries to setup server on open port
// calls callback when setup is done
let _createServer = (callback) => {
NetworkUtils.getFreePort()
.then((port) => {
let server = net.createServer((connection) => {
let conName = connection.remoteAddress + ":"
+ connection.remotePort;
connection.name = conName;
this._log.debug('Node %s got connection from %s', this.getNodeName(), conName);

// data from connections will be TCPROS encoded, so use a
// DeserializeStream to handle any chunking
let deserializeStream = new DeserializeStream();
connection.pipe(deserializeStream);

const checkConnectionHeader = (headerData) => {
const header = tcprosUtils.parseTcpRosHeader(headerData);
if (!header) {
this._log.error('Unable to validate connection header %s', headerData);
connection.end(Serialize(String('Unable to validate connection header')));
return;
}
this._log.debug('Got connection header: %j', header);

if (header.hasOwnProperty('topic')) {
// this is a subscriber, validate header and pass off connection to appropriate publisher
const topic = header.topic;
const pub = this._publishers[topic];
if (pub) {
pub.handleSubscriberConnection(connection, deserializeStream, header);
}
}
else if (header.hasOwnProperty('service')) {
// this is a service client, validate header and pass off connection to appropriate service provider
const service = header.service;
const serviceProvider = this._services[service];
if (serviceProvider) {
serviceProvider.handleClientConnection(connection, deserializeStream, header);
}
}
};
deserializeStream.on('message', checkConnectionHeader);
}).listen(port);

// it's possible the port was taken before we could use it
server.on('error', (err) => {
if (err.code === 'EADDRINUSE') {
_createServer(callback);
}
});

// the port was available
server.on('listening', () => {
this._log.debug('Listening on port ' + port);
this._tcprosPort = port;
this._server = server;
callback(port);
});
});
};

return new Promise((resolve, reject) => {
_createServer(resolve);
});
}

_handleTopicRequest(err, params, callback) {
this._log.debug('Got topic request ' + JSON.stringify(params));
if (!err) {
let topic = params[1];
let pub = this._publishers[topic];
if (pub) {
let port = pub.getSubPort();
let port = this._tcprosPort;
let resp = [
1,
'Allocated topic connection on port ' + port,
Expand Down Expand Up @@ -416,7 +494,7 @@ class RosNode extends EventEmitter {

Object.keys(this._services).forEach((service) => {
let serv = this._services[service];
promises.push(this.unregisterService(service, serv.getServiceUri()));
promises.push(this.unregisterService(service));
});

if (!fromExit) {
Expand Down
Loading