diff --git a/example.js b/example.js index aa61bdb..3deda0d 100644 --- a/example.js +++ b/example.js @@ -7,21 +7,16 @@ const SetBool = rosnodejs.require('std_srvs').srv.SetBool; rosnodejs.initNode('/my_node') .then((rosNode) => { // EXP 1) Service Server - let service = rosNode.advertiseService({ - service: '/set_bool', - type: 'std_srvs/SetBool' - }, (req, resp) => { - console.log('Handling request! ' + JSON.stringify(req)); - resp.success = !req.data; - resp.message = 'Inverted!'; - return true; + let service = rosNode.advertiseService('/set_bool', SetBool, + (req, resp) => { + console.log('Handling request! ' + JSON.stringify(req)); + resp.success = !req.data; + resp.message = 'Inverted!'; + return true; }); // EXP 2) Service Client - let serviceClient = rosNode.serviceClient({ - service: '/set_bool', - type: 'std_srvs/SetBool' - }); + let serviceClient = rosNode.serviceClient('/set_bool', 'std_srvs/SetBool'); rosNode.waitForService(serviceClient.getService(), 2000) .then((available) => { if (available) { @@ -39,13 +34,13 @@ rosnodejs.initNode('/my_node') }); // EXP 4) Publisher - let pub = rosNode.advertise({ - topic: '/my_topic', - type: 'std_msgs/String', - queueSize: 1, - latching: true, - throttleMs: 9 - }); + let pub = rosNode.advertise( '/my_topic', std_msgs.String, + { + queueSize: 1, + latching: true, + throttleMs: 9 + } + ); let msgStart = 'my message '; let iter = 0; @@ -60,12 +55,16 @@ rosnodejs.initNode('/my_node') }, 5); // EXP 5) Subscriber - let sub = rosNode.subscribe({ - topic: '/my_topic', - type: 'std_msgs/String', - queueSize: 1, - throttleMs: 1000}, + let sub = rosNode.subscribe('/my_topic', 'std_msgs/String', (data) => { console.log('SUB DATA ' + data.data); - }); + }, + { + queueSize: 1, + throttleMs: 1000 + } + ); +}) +.catch((err) => { + console.log(err); }); diff --git a/example2.js b/example2.js index a48f13a..f4e01a7 100644 --- a/example2.js +++ b/example2.js @@ -4,7 +4,7 @@ let rosnodejs = require('./index.js'); // const std_msgs = rosnodejs.require('std_msgs').msg; // const SetBool = rosnodejs.require('std_srvs').srv.SetBool; -rosnodejs.use(['std_msgs/String'], +rosnodejs.use(['std_msgs/String'], ['std_srvs/SetBool'], function() { const msg = new (rosnodejs.message('std_msgs/String'))( @@ -14,10 +14,8 @@ rosnodejs.use(['std_msgs/String'], .then((rosNode) => { // EXP 1) Service Server - let service = rosNode.advertiseService({ - service: '/set_bool', - type: 'std_srvs/SetBool' - }, (req, resp) => { + let service = rosNode.advertiseService('/set_bool','std_srvs/SetBool', + (req, resp) => { console.log('Handling request! ' + JSON.stringify(req)); resp.success = !req.data; resp.message = 'Inverted!'; @@ -26,36 +24,31 @@ rosnodejs.use(['std_msgs/String'], // EXP 2) Service Client setTimeout(function() { - let serviceClient = rosNode.serviceClient({ - service: '/set_bool', - type: 'std_srvs/SetBool' - }); + let serviceClient = rosNode.serviceClient('/set_bool','std_srvs/SetBool'); rosNode.waitForService(serviceClient.getService(), 2000) .then((available) => { if (available) { - const request = - new (rosnodejs.serviceRequest('std_srvs/SetBool'))({ + const request = + new (rosnodejs.serviceRequest('std_srvs/SetBool'))({ data: false }); serviceClient.call(request, (resp) => { console.log('Service response ' + JSON.stringify(resp)); }); } else { - console.log('Service not available'); + console.log('Service not available'); } }); }, 1000); // wait a second before calling our service // EXP 3) Params rosNode.setParam('~junk', {'hi': 2}).then(() => { - rosNode.getParam('~junk').then((val) => { - console.log('Got Param!!! ' + JSON.stringify(val)); + rosNode.getParam('~junk').then((val) => { + console.log('Got Param!!! ' + JSON.stringify(val)); }); }); // // EXP 4) Publisher - let pub = rosNode.advertise({ - topic: '/my_topic', - type: 'std_msgs/String', + let pub = rosNode.advertise('/my_topic','std_msgs/String', { queueSize: 1, latching: true, throttleMs: 9 @@ -73,15 +66,12 @@ rosnodejs.use(['std_msgs/String'], }, 5); // EXP 5) Subscriber - let sub = rosNode.subscribe({ - topic: '/my_topic', - type: 'std_msgs/String', - queueSize: 1, - throttleMs: 1000}, + let sub = rosNode.subscribe('/my_topic', 'std_msgs/String', (data) => { console.log('SUB DATA ', data, data.data); - }); + }, + {queueSize: 1, + throttleMs: 1000}); }); - -}); +}); diff --git a/index.js b/index.js index 2d876f2..f5538bb 100644 --- a/index.js +++ b/index.js @@ -149,8 +149,10 @@ let Rosnodejs = { /** create message classes for all the given types */ _useMessages(types, callback) { var Messages = []; - types.forEach(function(type) { + types.forEach(function(type) { + console.log('get message type! ' + type); messages.getMessage(type, function(error, Message) { + console.log('got it!'); Messages.push(Message); if (Messages.length === types.length) { callback(); @@ -162,9 +164,11 @@ let Rosnodejs = { /** create message classes for all the given types */ _useServices(types, callback) { var count = types.length; - types.forEach(function(type) { + types.forEach(function(type) { + console.log('get service type! ' + type); messages.getServiceRequest(type, function() { messages.getServiceResponse(type, function() { + console.log('got it!'); if (--count == 0) { callback(); } diff --git a/lib/ActionClient.js b/lib/ActionClient.js index da617a9..cb20e00 100644 --- a/lib/ActionClient.js +++ b/lib/ActionClient.js @@ -30,37 +30,25 @@ class ActionClient extends EventEmitter { const nh = rosnodejs.nh; // FIXME: support user options for these parameters - this._goalPub = nh.advertise({ - topic: this._actionServer + '/goal', - type: this._actionType + 'Goal', - queueSize: 1 - }); + this._goalPub = nh.advertise(this._actionServer + '/goal', this._actionType + 'Goal', + { queueSize: 1 }); - this._cancelPub = nh.advertise({ - topic: this._actionServer + '/goal', - type: 'actionlib_msgs/GoalID', - queueSize: 1 - }); + this._cancelPub = nh.advertise(this._actionServer + '/cancel', 'actionlib_msgs/GoalID', + { queueSize: 1 }); - this._statusSub = nh.subscribe({ - topic: this._actionServer + '/status', - type: 'actionlib_msgs/GoalStatusArray', - queueSize: 1}, - (msg) => { this._handleStatus(msg); } + this._statusSub = nh.subscribe(this._actionServer + '/status', 'actionlib_msgs/GoalStatusArray', + (msg) => { this._handleStatus(msg); }, + { queueSize: 1 } ); - this._feedbackSub = nh.subscribe({ - topic: this._actionServer + '/feedback', - type: this._actionType + 'Feedback', - queueSize: 1}, - (msg) => { this._handleFeedback(msg); } + this._feedbackSub = nh.subscribe(this._actionServer + '/feedback', this._actionType + 'Feedback', + (msg) => { this._handleFeedback(msg); }, + { queueSize: 1 } ); - this._statusSub = nh.subscribe({ - topic: this._actionServer + '/result', - type: this._actionType + 'Result', - queueSize: 1}, - (msg) => { this._handleResult(msg); } + this._statusSub = nh.subscribe(this._actionServer + '/result', this._actionType + 'Result', + (msg) => { this._handleResult(msg); }, + { queueSize: 1 } ); this._goals = {}; diff --git a/lib/NodeHandle.js b/lib/NodeHandle.js index b8a1261..38a2447 100644 --- a/lib/NodeHandle.js +++ b/lib/NodeHandle.js @@ -18,6 +18,7 @@ 'use strict'; let RosNode = require('./RosNode.js'); +const messageUtils = require('../utils/message_utils.js'); class NodeHandle { constructor(node) { @@ -36,52 +37,96 @@ class NodeHandle { // Pubs, Subs, Services //------------------------------------------------------------------ /** - * Create a ros publisher with the provided options - * @param options {object} - * @param options.topic {string} topic to publish on ('/chatter') - * @param options.type {string} type of message to publish ('std_msgs/String') - * @param options.latching {boolean} latch messages - * @param options.tpcNoDelay {boolean} set TCP no delay option on Socket - * @param options.queueSize {number} number of messages to queue when publishing - * @param options.throttleMs {number} milliseconds to throttle when publishing + * Creates a ros publisher with the provided options + * @param topic {string} + * @param type {string|Object} string representing message type or instance + * @param [options] {object} + * @param [options.latching] {boolean} latch messages + * @param [options.tpcNoDelay] {boolean} set TCP no delay option on Socket + * @param [options.queueSize] {number} number of messages to queue when publishing + * @param [options.throttleMs] {number} milliseconds to throttle when publishing * @return {Publisher} */ - advertise(options) { + advertise(topic, type, options) { + options = options || {}; + options.topic = topic; + if (typeof type === 'string' || type instanceof String) { + options.type = type; + options.typeClass = messageUtils.getHandlerForMsgType(type); + } + else { + options.typeClass = type; + options.type = type.datatype(); + } return this._node.advertise(options); } /** - * Create a ros subscriber with the provided options - * @param options {object} - * @param options.topic {string} topic to publish on ('/chatter') - * @param options.type {string} type of message to publish ('std_msgs/String') - * @param options.queueSize {number} number of messages to queue when subscribing - * @param options.throttleMs {number} milliseconds to throttle when subscribing + * Creates a ros subscriber with the provided options + * @param topic {string} + * @param type {string|Object} string representing message type or instance + * @param callback {function} function to call when message is received + * @param [options] {object} + * @param [options.queueSize] {number} number of messages to queue when subscribing + * @param [options.throttleMs] {number} milliseconds to throttle when subscribing * @return {Subscriber} */ - subscribe(options, callback) { + subscribe(topic, type, callback, options) { + options = options || {}; + options.topic = topic; + if (typeof type === 'string' || type instanceof String) { + options.type = type; + options.typeClass = messageUtils.getHandlerForMsgType(type); + } + else { + options.typeClass = type; + options.type = type.datatype(); + } return this._node.subscribe(options, callback); } /** - * Create a ros Service server with the provided options - * @param options {object} - * @param options.service {string} service to provide e.g ('/add_two_ints') - * @param options.type {string} type of service ('tutorial_msgs/AddTwoInts') + * Creates a ros Service server with the provided options + * @param service {string} + * @param type {string|Object} string representing service type or instance + * @param callback {function} function to call when this service is called + * e.g. + * (request, response) => { + * response.data = !request.data; + * return true; + * } * @return {ServiceServer} */ - advertiseService(options, callback) { + advertiseService(service, type, callback) { + let options = {service: service}; + if (typeof type === 'string' || type instanceof String) { + options.type = type; + options.typeClass = messageUtils.getHandlerForSrvType(type); + } + else { + options.typeClass = type; + // TODO: this is not terribly robust... + options.type = type.Request.datatype().slice('Request'.length); + } return this._node.advertiseService(options, callback); } /** - * Create a ros Service server with the provided options - * @param options {object} - * @param options.service {string} service to provide e.g ('/add_two_ints') - * @param options.type {string} type of service ('tutorial_msgs/AddTwoInts') + * Creates a ros Service client with the provided options + * @param service {string} + * @param type {string|Object} string representing service type or instance * @return {ServiceClient} */ - serviceClient(options) { + serviceClient(service, type) { + let options = {service: service}; + if (typeof type === 'string' || type instanceof String) { + options.type = type; + options.typeClass = messageUtils.getHandlerForSrvType(type); + } + else { + options.typeClass = type; + options.type = type.Request.datatype().slice('Request'.length); + } return this._node.serviceClient(options); } diff --git a/lib/Publisher.js b/lib/Publisher.js index eac2ddb..125aac2 100644 --- a/lib/Publisher.js +++ b/lib/Publisher.js @@ -24,7 +24,6 @@ let DeserializeStream = SerializationUtils.DeserializeStream; let Deserialize = SerializationUtils.Deserialize; let Serialize = SerializationUtils.Serialize; let TcprosUtils = require('../utils/tcpros_utils.js'); -let messageUtils = require('../utils/message_utils.js'); let EventEmitter = require('events'); let log = require('../utils/logger.js'); @@ -80,7 +79,7 @@ class Publisher extends EventEmitter { this._server = null; - this._messageHandler = messageUtils.getHandlerForMsgType(this._type); + this._messageHandler = options.typeClass; // messages published before this publisher // was registered will be held here diff --git a/lib/ServiceClient.js b/lib/ServiceClient.js index b4efa13..c54189a 100644 --- a/lib/ServiceClient.js +++ b/lib/ServiceClient.js @@ -24,7 +24,6 @@ let DeserializeStream = SerializationUtils.DeserializeStream; let Deserialize = SerializationUtils.Deserialize; let Serialize = SerializationUtils.Serialize; let TcprosUtils = require('../utils/tcpros_utils.js'); -let messageUtils = require('../utils/message_utils.js'); let EventEmitter = require('events'); let log = require('../utils/logger.js'); @@ -44,7 +43,7 @@ class ServiceClient extends EventEmitter { this._nodeHandle = nodeHandle; - this._messageHandler = messageUtils.getHandlerForSrvType(this._type); + this._messageHandler = options.typeClass; }; getService() { diff --git a/lib/ServiceServer.js b/lib/ServiceServer.js index 1d7704f..0db5c37 100644 --- a/lib/ServiceServer.js +++ b/lib/ServiceServer.js @@ -24,7 +24,6 @@ let DeserializeStream = SerializationUtils.DeserializeStream; let Deserialize = SerializationUtils.Deserialize; let Serialize = SerializationUtils.Serialize; let TcprosUtils = require('../utils/tcpros_utils.js'); -let messageUtils = require('../utils/message_utils.js'); let EventEmitter = require('events'); let log = require('../utils/logger.js'); @@ -45,7 +44,7 @@ class ServiceServer extends EventEmitter { this._requestCallback = callback; - this._messageHandler = messageUtils.getHandlerForSrvType(this._type); + this._messageHandler = options.typeClass; this._register(); }; diff --git a/lib/Subscriber.js b/lib/Subscriber.js index 15b110a..8e54b7a 100644 --- a/lib/Subscriber.js +++ b/lib/Subscriber.js @@ -24,7 +24,6 @@ let Deserialize = SerializationUtils.Deserialize; let Serialize = SerializationUtils.Serialize; let TcprosUtils = require('../utils/tcpros_utils.js'); let Socket = require('net').Socket; -let messageUtils = require('../utils/message_utils.js'); let EventEmitter = require('events'); let log = require('../utils/logger.js'); @@ -65,7 +64,7 @@ class Subscriber extends EventEmitter { this._nodeHandle = nodeHandle; this._log = log.createLogger({name: 'sub' + this.getTopic()}); - this._messageHandler = messageUtils.getHandlerForMsgType(this._type); + this._messageHandler = options.typeClass; this._ready = false; diff --git a/package.json b/package.json index a9d4bb4..b131872 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,8 @@ "js" ], "scripts": { - "test": "mocha --recursive", + "test": "mocha --recursive --grep gennodejsTest --invert", + "gennodejsTest": "mocha test/gennodejsTest", "flatten": "node flatten.js" }, "author": "chris smith", diff --git a/test/gennodejsTest.js b/test/gennodejsTest.js index 1fdf1af..b55c854 100644 --- a/test/gennodejsTest.js +++ b/test/gennodejsTest.js @@ -4,7 +4,7 @@ const chai = require('chai'); const expect = chai.expect; const msgUtils = require('../utils/message_utils.js'); -describe('genjsTests', () => { +describe('gennodejsTests', () => { msgUtils.findMessageFiles(); msgUtils.loadMessagePackage('std_msgs'); msgUtils.loadMessagePackage('test_msgs'); diff --git a/test/xmlrpcTest.js b/test/xmlrpcTest.js index 127bec7..15917fd 100644 --- a/test/xmlrpcTest.js +++ b/test/xmlrpcTest.js @@ -6,223 +6,344 @@ const rosnodejs = require('../index.js'); const xmlrpc = require('xmlrpc'); const netUtils = require('../utils/network_utils.js'); -describe('XmlrpcTests', () => { +describe('Protocol Test', () => { // NOTE: make sure a roscore is not running (or something else at this address) rosnodejs.require('std_msgs'); rosnodejs.require('std_srvs'); - let masterStub = xmlrpc.createServer({host: 'localhost', port: 11311}); + let masterStub; const nodeName = '/testNode'; - beforeEach(() => { - masterStub.on('getUri', (err, params, callback) => { - const resp = [ 1, '', 'localhost:11311/' ] - callback(null, resp); - }); - - return rosnodejs.initNode(nodeName); + before((done) => { + masterStub = xmlrpc.createServer({host: 'localhost', port: 11311}, () => { done(); }); }); - it('registerSubscriber', (done) => { - const topic = '/test_topic'; - const msgType = 'std_msgs/String'; - masterStub.on('registerSubscriber', (err, params, callback) => { - expect(params.length).to.equal(4); - expect(params[0]).to.equal(nodeName); - expect(params[1]).to.equal(topic); - expect(params[2]).to.equal(msgType); - expect(params[3].startsWith('http://')).to.be.true; - - const info = netUtils.getAddressAndPortFromUri(params[3]); - expect(info.host).to.be.a('string'); - expect(info.host.length).to.not.equal(0); - expect(info.port).to.be.a('string'); - expect(info.port.length).to.not.equal(0); - + after((done) => { + if (masterStub.httpServer.listening) { + masterStub.close(() => { done(); }); + } + else { done(); - }); - - const nh = rosnodejs.nh; - const sub = nh.subscribe({ - topic: topic, - type: msgType, - queueSize: 1, - throttleMs: 1000}, - (data) => {} - ); + } }); - it('unregisterSubscriber', (done) => { - const topic = '/test_topic'; - const msgType = 'std_msgs/String'; - let nodeUri; + describe('Xmlrpc', () => { - masterStub.on('registerSubscriber', (err, params, callback) => { - nodeUri = params[3]; + beforeEach(() => { + masterStub.on('getUri', (err, params, callback) => { + const resp = [ 1, '', 'localhost:11311/' ] + callback(null, resp); + }); - const resp = [ 1, 'registered!', [] ]; - callback(null, resp); + return rosnodejs.initNode(nodeName); }); - masterStub.on('unregisterSubscriber', (err, params, callback) => { - expect(params.length).to.equal(3); - expect(params[0]).to.equal(nodeName); - expect(params[1]).to.equal(topic); - expect(params[2]).to.equal(nodeUri); + afterEach(() => { + const nh = rosnodejs.nh; - done(); + // clear out any service, subs, pubs + nh._node._services = {}; + nh._node._subscribers = {}; + nh._node._publishers = {}; + + // remove any master api handlers we set up + masterStub.removeAllListeners(); }); - const nh = rosnodejs.nh; - const sub = nh.subscribe({ - topic: topic, - type: msgType, - queueSize: 1, - throttleMs: 1000}, - (data) => {} - ); - - sub.on('registered', () => { - nh.unsubscribe(topic); + it('registerSubscriber', (done) => { + const topic = '/test_topic'; + const msgType = 'std_msgs/String'; + masterStub.on('registerSubscriber', (err, params, callback) => { + expect(params.length).to.equal(4); + expect(params[0]).to.equal(nodeName); + expect(params[1]).to.equal(topic); + expect(params[2]).to.equal(msgType); + expect(params[3].startsWith('http://')).to.be.true; + + const info = netUtils.getAddressAndPortFromUri(params[3]); + expect(info.host).to.be.a('string'); + expect(info.host.length).to.not.equal(0); + expect(info.port).to.be.a('string'); + expect(info.port.length).to.not.equal(0); + + done(); + }); + + const nh = rosnodejs.nh; + const sub = nh.subscribe(topic, msgType, + (data) => {}, + { queueSize: 1, throttleMs: 1000 } + ); }); - }); - it('registerPublisher', (done) => { - const topic = '/test_topic'; - const msgType = 'std_msgs/String'; - masterStub.on('registerPublisher', (err, params, callback) => { - expect(params.length).to.equal(4); - expect(params[0]).to.equal(nodeName); - expect(params[1]).to.equal(topic); - expect(params[2]).to.equal(msgType); - expect(params[3].startsWith('http://')).to.be.true; - - const info = netUtils.getAddressAndPortFromUri(params[3]); - expect(info.host).to.be.a('string'); - expect(info.host.length).to.not.equal(0); - expect(info.port).to.be.a('string'); - expect(info.port.length).to.not.equal(0); + it('unregisterSubscriber', (done) => { + const topic = '/test_topic'; + const msgType = 'std_msgs/String'; + let nodeUri; - done(); - }); + masterStub.on('registerSubscriber', (err, params, callback) => { + nodeUri = params[3]; - const nh = rosnodejs.getNodeHandle(); - const sub = nh.advertise({ - topic: topic, - type: msgType, - latching: true, - queueSize: 1, - throttleMs: 1000} - ); - }); + const resp = [ 1, 'registered!', [] ]; + callback(null, resp); + }); - it('unregisterPublisher', (done) => { - const topic = '/test_topic'; - const msgType = 'std_msgs/String'; - let nodeUri; + masterStub.on('unregisterSubscriber', (err, params, callback) => { + expect(params.length).to.equal(3); + expect(params[0]).to.equal(nodeName); + expect(params[1]).to.equal(topic); + expect(params[2]).to.equal(nodeUri); - masterStub.on('registerPublisher', (err, params, callback) => { - nodeUri = params[3]; + done(); + }); - const resp = [ 1, 'registered!', [] ]; - callback(null, resp); + const nh = rosnodejs.nh; + const sub = nh.subscribe(topic, msgType, + (data) => {}, + { queueSize: 1, throttleMs: 1000 } + ); + + sub.on('registered', () => { + nh.unsubscribe(topic); + }); }); - masterStub.on('unregisterPublisher', (err, params, callback) => { - expect(params.length).to.equal(3); - expect(params[0]).to.equal(nodeName); - expect(params[1]).to.equal(topic); - expect(params[2]).to.equal(nodeUri); + it('registerPublisher', (done) => { + const topic = '/test_topic'; + const msgType = 'std_msgs/String'; + masterStub.on('registerPublisher', (err, params, callback) => { + expect(params.length).to.equal(4); + expect(params[0]).to.equal(nodeName); + expect(params[1]).to.equal(topic); + expect(params[2]).to.equal(msgType); + expect(params[3].startsWith('http://')).to.be.true; + + const info = netUtils.getAddressAndPortFromUri(params[3]); + expect(info.host).to.be.a('string'); + expect(info.host.length).to.not.equal(0); + expect(info.port).to.be.a('string'); + expect(info.port.length).to.not.equal(0); + + done(); + }); + + const nh = rosnodejs.getNodeHandle(); + const pub = nh.advertise(topic, msgType, { latching: true, + queueSize: 1, + throttleMs: 1000 } + ); + }); - done(); + it('unregisterPublisher', (done) => { + const topic = '/test_topic'; + const msgType = 'std_msgs/String'; + let nodeUri; + + masterStub.on('registerPublisher', (err, params, callback) => { + nodeUri = params[3]; + + const resp = [ 1, 'registered!', [] ]; + callback(null, resp); + }); + + masterStub.on('unregisterPublisher', (err, params, callback) => { + expect(params.length).to.equal(3); + expect(params[0]).to.equal(nodeName); + expect(params[1]).to.equal(topic); + expect(params[2]).to.equal(nodeUri); + + done(); + }); + + const nh = rosnodejs.nh; + const pub = nh.advertise(topic, msgType, { latching: true, + queueSize: 1, + throttleMs: 1000 } + ); + + pub.on('registered', () => { + nh.unadvertise(topic); + }); }); - const nh = rosnodejs.nh; - const pub = nh.advertise({ - topic: topic, - type: msgType, - latching: true, - queueSize: 1, - throttleMs: 1000} - ); - - pub.on('registered', () => { - nh.unadvertise(topic); + it('registerService', (done) => { + const service = '/test_service'; + const srvType = 'std_srvs/Empty'; + masterStub.on('registerService', (err, params, callback) => { + expect(params.length).to.equal(4); + expect(params[0]).to.equal(nodeName); + expect(params[1]).to.equal(service); + expect(params[2].startsWith('rosrpc://')).to.be.true; + + let info = netUtils.getAddressAndPortFromUri(params[2]); + expect(info.host).to.be.a('string'); + expect(info.host.length).to.not.equal(0); + expect(info.port).to.be.a('string'); + expect(info.port.length).to.not.equal(0); + + expect(params[3].startsWith('http://')).to.be.true; + + info = netUtils.getAddressAndPortFromUri(params[3]); + expect(info.host).to.be.a('string'); + expect(info.host.length).to.not.equal(0); + expect(info.port).to.be.a('string'); + expect(info.port.length).to.not.equal(0); + + done(); + }); + + const nh = rosnodejs.nh; + const serv = nh.advertiseService(service, srvType, (req, resp) => {}); }); - }); - it('registerService', (done) => { - const service = '/test_service'; - const srvType = 'std_srvs/Empty'; - masterStub.on('registerService', (err, params, callback) => { - expect(params.length).to.equal(4); - expect(params[0]).to.equal(nodeName); - expect(params[1]).to.equal(service); - expect(params[2].startsWith('rosrpc://')).to.be.true; - - let info = netUtils.getAddressAndPortFromUri(params[2]); - expect(info.host).to.be.a('string'); - expect(info.host.length).to.not.equal(0); - expect(info.port).to.be.a('string'); - expect(info.port.length).to.not.equal(0); - - expect(params[3].startsWith('http://')).to.be.true; - - info = netUtils.getAddressAndPortFromUri(params[3]); - expect(info.host).to.be.a('string'); - expect(info.host.length).to.not.equal(0); - expect(info.port).to.be.a('string'); - expect(info.port.length).to.not.equal(0); + it('unregisterService', (done) => { + const service = '/test_service'; + const srvType = 'std_srvs/Empty'; + let serviceUri = null; + masterStub.on('registerService', (err, params, callback) => { + serviceUri = params[2]; - done(); - }); + const resp = [1, 'registered!', '']; + callback(null, resp); + }); - const nh = rosnodejs.nh; - const serv = nh.advertiseService({ - service: service, - type: srvType - }, (req, resp) => {}); - }); + masterStub.on('unregisterService', (err, params, callback) => { + expect(params.length).to.equal(3); + expect(params[0]).to.equal(nodeName); + expect(params[1]).to.equal(service); + expect(params[2]).to.equal(serviceUri); - it('unregisterService', (done) => { - const service = '/test_service'; - const srvType = 'std_srvs/Empty'; - let serviceUri = null; - masterStub.on('registerService', (err, params, callback) => { - serviceUri = params[2]; + done(); + }); - const resp = [1, 'registered!', '']; - callback(null, resp); - }); + const nh = rosnodejs.nh; + const serv = nh.advertiseService(service, srvType, (req, resp) => {}); - masterStub.on('unregisterService', (err, params, callback) => { - expect(params.length).to.equal(3); - expect(params[0]).to.equal(nodeName); - expect(params[1]).to.equal(service); - expect(params[2]).to.equal(serviceUri); + serv.on('registered', () => { + nh.unadvertiseService(service); + }); + }); + }); - done(); + describe('Pub-Sub', () => { + const topic = '/test_topic'; + const msgType = 'std_msgs/Int8'; + + beforeEach(() => { + let pubInfo = null; + let subInfo = null; + + masterStub.on('getUri', (err, params, callback) => { + const resp = [ 1, '', 'localhost:11311/' ] + callback(null, resp); + }); + + masterStub.on('registerSubscriber', (err, params, callback) => { + subInfo = params[3]; + //console.log('sub reg ' + params); + //console.log(pubInfo); + + const resp = [1, 'You did it!', []]; + if (pubInfo) { + resp[2].push(pubInfo); + } + callback(null, resp); + }); + + masterStub.on('unregisterSubscriber', (err, params, callback) => { + const resp = [1, 'You did it!', subInfo ? 1 : 0]; + callback(null, resp); + subInfo = null; + }); + + masterStub.on('registerPublisher', (err, params, callback) => { + //console.log('pub reg'); + pubInfo = params[3]; + const resp = [1, 'You did it!', []]; + if (subInfo) { + resp[2].push(pubInfo); + let subAddrParts = subInfo.replace('http://', '').split(':'); + let client = xmlrpc.createClient({host: subAddrParts[0], port: subAddrParts[1]}); + let data = [1, topic, [pubInfo]]; + client.methodCall('publisherUpdate', data, (err, response) => { }); + } + callback(null, resp); + }); + + masterStub.on('unregisterPublisher', (err, params, callback) => { + const resp = [1, 'You did it!', pubInfo ? 1 : 0]; + callback(null, resp); + pubInfo = null; + }); + + return rosnodejs.initNode(nodeName); }); - const nh = rosnodejs.nh; - const serv = nh.advertiseService({ - service: service, - type: srvType - }, (req, resp) => {}); + afterEach(() => { + const nh = rosnodejs.nh; - serv.on('registered', () => { - nh.unadvertiseService(service); + // clear out any service, subs, pubs + nh._node._services = {}; + nh._node._subscribers = {}; + nh._node._publishers = {}; + + // remove any master api handlers we set up + masterStub.removeAllListeners(); }); - }); - afterEach(() => { - const nh = rosnodejs.nh; + it('Basic', (done) => { + const nh = rosnodejs.nh; + const valsToSend = [1,2,3]; + const valsReceived = new Set(valsToSend); + const pub = nh.advertise(topic, msgType, { queueSize: 3 }); + + const sub = nh.subscribe(topic, msgType, (data) => { + valsReceived.delete(data.data); + if (valsReceived.size === 0) { + done(); + } + }, {queueSize: 3}); + + pub.on('connection', () => { + valsToSend.forEach((val) => { + pub.publish({data: val}); + }); + }); + }); + + it('Latch', (done) => { + const nh = rosnodejs.nh; + const pub = nh.advertise(topic, msgType, { latching: true }); - // clear out any service, subs, pubs - nh._node._services = {}; - nh._node._subscribers = {}; - nh._node._publishers = {}; + pub.publish({data: 1}); - // remove any master api handlers we set up - masterStub.removeAllListeners(); + pub.on('registered', () => { + const sub = nh.subscribe(topic, msgType, (data) => { + done(); + }); + }); + }); + + it('Throttle Pub', (done) => { + const nh = rosnodejs.nh; + const valsToSend = [1,2,3,4,5,6,7,8,9,10]; + const pub = nh.advertise(topic, msgType, { queueSize: 1, throttleMs: 100}); + let numMsgsReceived = 0; + + const sub = nh.subscribe(topic, msgType, (data) => { + ++numMsgsReceived; + if (data.data === valsToSend[valsToSend.length -1]) { + expect(numMsgsReceived).to.equal(valsToSend.length/2 + 1); + done(); + } + }, {queueSize: 1}); + + pub.on('connection', () => { + valsToSend.forEach((val, index) => { + setTimeout(() => { + pub.publish({data: val}); + }, 50*index); + }); + }); + }); }); }); diff --git a/utils/message_utils.js b/utils/message_utils.js index b323757..a8a072c 100644 --- a/utils/message_utils.js +++ b/utils/message_utils.js @@ -227,18 +227,18 @@ let MessageUtils = { let type = parts[1]; return messagePackage.srv[type]; } else { - // console.log("get service from registry"); + // console.log("get service %s from registry", rosDataType); let request = messages.getFromRegistry(rosDataType, "request"); let response = messages.getFromRegistry(rosDataType, "response"); if (request && response) { - return { + return { // Request: new request(), // Response: new response() Request: request, Response: response }; } else { - console.error('Unable to find message package ' + msgPackage); + console.error('Unable to find service package %s: %j %j', msgPackage, request, response); throw new Error(); } } diff --git a/utils/messages.js b/utils/messages.js index 0390c1d..a91a1dd 100644 --- a/utils/messages.js +++ b/utils/messages.js @@ -243,7 +243,7 @@ function extractFields(content, details, type, callback) { } else { // response lines = lines.slice(divider+1); - } + } } async.forEachSeries(lines, parseLine, function(error) { @@ -338,7 +338,7 @@ function buildMessageClass(details) { }; Message.constants = Message.prototype.constants = details.constants; Message.fields = Message.prototype.fields = details.fields; - Message.serialize = Message.prototype.serialize = + Message.serialize = Message.prototype.serialize = function(obj, bufferInfo) { return serializeMessage(obj, bufferInfo); } @@ -358,6 +358,7 @@ function getMessageFromRegistry(messageType, type) { } function setMessageInRegistry(messageType, type, message) { + console.log('add %s to registry', messageType); registry[messageType + "-" + type] = message; } @@ -375,7 +376,7 @@ var isNormalizedMessageType = /.*\/.*$/; function normalizeMessageType(messageType, packageName) { var normalizedMessageType = messageType; if (messageType == "Header") { - normalizedMessageType = getMessageType("std_msgs", messageType); + normalizedMessageType = getMessageType("std_msgs", messageType); } else if (messageType.match(isNormalizedMessageType) === null) { normalizedMessageType = getMessageType(packageName, messageType); } @@ -406,7 +407,7 @@ function serializeMessage(message, bufferInfo) { function serializeInnerMessage(message, buffer, bufferOffset) { message.fields.forEach(function(field) { var fieldValue = message[field.name]; - + if (fieldsUtil.isPrimitive(field.type)) { fieldsUtil.serializePrimitive( field.type, fieldValue, buffer, bufferOffset); diff --git a/utils/xmlrpc_utils.js b/utils/xmlrpc_utils.js index 27aca20..93f3d2b 100644 --- a/utils/xmlrpc_utils.js +++ b/utils/xmlrpc_utils.js @@ -22,8 +22,8 @@ module.exports = { this.call(method, data, resolve, reject, timeout); } else if (err || resp[0] !== 1) { - log.warn('Some other error during %s: %s, %j', method, err, resp); - log.warn((new Error()).stack); + log.debug('Some other error during %s: %s, %j', method, err, resp); + log.debug((new Error()).stack); reject(err, resp); } else {