diff --git a/package.json b/package.json index 2c8719c618..f64d6b18a8 100644 --- a/package.json +++ b/package.json @@ -61,6 +61,7 @@ "aegir": "^9.3.0", "buffer-loader": "0.0.1", "chai": "^3.5.0", + "delay": "^1.3.1", "detect-node": "^2.0.3", "eslint-plugin-react": "^6.9.0", "execa": "^0.6.0", @@ -68,10 +69,10 @@ "form-data": "^2.1.2", "fs-pull-blob-store": "^0.4.1", "gulp": "^3.9.1", - "interface-ipfs-core": "^0.23.0", + "interface-ipfs-core": "^0.23.3", "ipfsd-ctl": "^0.18.1", "left-pad": "^1.1.3", - "lodash": "^4.17.2", + "lodash": "^4.17.4", "mocha": "^3.2.0", "ncp": "^2.0.0", "nexpect": "^0.5.0", @@ -85,11 +86,12 @@ "async": "^2.1.4", "bl": "^1.2.0", "boom": "^4.2.0", - "debug": "^2.5.1", + "debug": "^2.6.0", "fs-pull-blob-store": "^0.3.0", "glob": "^7.1.1", "hapi": "^16.1.0", "hapi-set-header": "^1.0.2", + "hoek": "^4.1.0", "idb-pull-blob-store": "^0.5.1", "ipfs-api": "^12.1.4", "ipfs-bitswap": "^0.9.0", @@ -101,9 +103,10 @@ "ipfs-unixfs-engine": "^0.15.0", "ipld-resolver": "^0.4.1", "isstream": "^0.1.2", - "joi": "^10.0.6", - "libp2p-ipfs-nodejs": "^0.17.1", - "libp2p-ipfs-browser": "^0.17.3", + "libp2p-floodsub": "0.7.1", + "joi": "^10.1.0", + "libp2p-ipfs-nodejs": "^0.17.3", + "libp2p-ipfs-browser": "^0.17.4", "lodash.flatmap": "^4.5.0", "lodash.get": "^4.4.2", "lodash.has": "^4.5.2", @@ -132,7 +135,7 @@ "temp": "^0.8.3", "through2": "^2.0.3", "update-notifier": "^1.0.3", - "yargs": "^6.5.0" + "yargs": "^6.6.0" }, "contributors": [ "Andrew de Andrade ", diff --git a/src/cli/commands/pubsub.js b/src/cli/commands/pubsub.js new file mode 100644 index 0000000000..dac25c7690 --- /dev/null +++ b/src/cli/commands/pubsub.js @@ -0,0 +1,14 @@ +'use strict' + +module.exports = { + command: 'pubsub', + + description: 'pubsub commands', + + builder (yargs) { + return yargs + .commandDir('pubsub') + }, + + handler (argv) {} +} diff --git a/src/cli/commands/pubsub/ls.js b/src/cli/commands/pubsub/ls.js new file mode 100644 index 0000000000..82ca7cbb54 --- /dev/null +++ b/src/cli/commands/pubsub/ls.js @@ -0,0 +1,32 @@ +'use strict' + +const utils = require('../../utils') +const debug = require('debug') +const log = debug('cli:pubsub') +log.error = debug('cli:pubsub:error') + +module.exports = { + command: 'ls', + + describe: 'Get your list of subscriptions', + + builder: {}, + + handler (argv) { + utils.getIPFS((err, ipfs) => { + if (err) { + throw err + } + + ipfs.pubsub.ls((err, subscriptions) => { + if (err) { + throw err + } + + subscriptions.forEach((sub) => { + console.log(sub) + }) + }) + }) + } +} diff --git a/src/cli/commands/pubsub/peers.js b/src/cli/commands/pubsub/peers.js new file mode 100644 index 0000000000..4519677b64 --- /dev/null +++ b/src/cli/commands/pubsub/peers.js @@ -0,0 +1,32 @@ +'use strict' + +const utils = require('../../utils') +const debug = require('debug') +const log = debug('cli:pubsub') +log.error = debug('cli:pubsub:error') + +module.exports = { + command: 'peers ', + + describe: 'Get all peers subscribed to a topic', + + builder: {}, + + handler (argv) { + utils.getIPFS((err, ipfs) => { + if (err) { + throw err + } + + ipfs.pubsub.peers(argv.topic, (err, peers) => { + if (err) { + throw err + } + + peers.forEach((peer) => { + console.log(peer) + }) + }) + }) + } +} diff --git a/src/cli/commands/pubsub/pub.js b/src/cli/commands/pubsub/pub.js new file mode 100644 index 0000000000..ef303d1c04 --- /dev/null +++ b/src/cli/commands/pubsub/pub.js @@ -0,0 +1,30 @@ +'use strict' + +const utils = require('../../utils') +const debug = require('debug') +const log = debug('cli:pubsub') +log.error = debug('cli:pubsub:error') + +module.exports = { + command: 'pub ', + + describe: 'Publish data to a topic', + + builder: {}, + + handler (argv) { + utils.getIPFS((err, ipfs) => { + if (err) { + throw err + } + + const data = new Buffer(String(argv.data)) + + ipfs.pubsub.publish(argv.topic, data, (err) => { + if (err) { + throw err + } + }) + }) + } +} diff --git a/src/cli/commands/pubsub/sub.js b/src/cli/commands/pubsub/sub.js new file mode 100644 index 0000000000..f88aeed727 --- /dev/null +++ b/src/cli/commands/pubsub/sub.js @@ -0,0 +1,32 @@ +'use strict' + +const utils = require('../../utils') +const debug = require('debug') +const log = debug('cli:pubsub') +log.error = debug('cli:pubsub:error') + +module.exports = { + command: 'sub ', + + describe: 'Subscribe to a topic', + + builder: {}, + + handler (argv) { + utils.getIPFS((err, ipfs) => { + if (err) { + throw err + } + + const handler = (msg) => { + console.log(msg.data.toString()) + } + + ipfs.pubsub.subscribe(argv.topic, handler, (err) => { + if (err) { + throw err + } + }) + }) + } +} diff --git a/src/core/components/go-offline.js b/src/core/components/go-offline.js index 0c5d084c0e..f0786702da 100644 --- a/src/core/components/go-offline.js +++ b/src/core/components/go-offline.js @@ -1,9 +1,14 @@ 'use strict' -module.exports = function goOffline (self) { - return (cb) => { +module.exports = (self) => { + return (callback) => { self._blockService.goOffline() self._bitswap.stop() - self.libp2p.stop(cb) + self._pubsub.stop((err) => { + if (err) { + return callback(err) + } + self.libp2p.stop(callback) + }) } } diff --git a/src/core/components/go-online.js b/src/core/components/go-online.js index 28894f7e76..d710478899 100644 --- a/src/core/components/go-online.js +++ b/src/core/components/go-online.js @@ -2,15 +2,16 @@ const series = require('async/series') const Bitswap = require('ipfs-bitswap') +const FloodSub = require('libp2p-floodsub') -module.exports = function goOnline (self) { - return (cb) => { +module.exports = (self) => { + return (callback) => { series([ - self.load, - self.libp2p.start + (cb) => self.load(cb), + (cb) => self.libp2p.start(cb) ], (err) => { if (err) { - return cb(err) + return callback(err) } self._bitswap = new Bitswap( @@ -18,9 +19,22 @@ module.exports = function goOnline (self) { self._repo.blockstore, self._libp2pNode.peerBook ) - self._bitswap.start() - self._blockService.goOnline(self._bitswap) - cb() + + self._pubsub = new FloodSub(self._libp2pNode) + + series([ + (cb) => { + self._bitswap.start() + cb() + }, + (cb) => { + self._blockService.goOnline(self._bitswap) + cb() + }, + (cb) => self._pubsub.start(cb) // , + // For all of the protocols to handshake with each other + // (cb) => setTimeout(cb, 1000) // Still not decided if we want this + ], callback) }) } } diff --git a/src/core/components/pubsub.js b/src/core/components/pubsub.js new file mode 100644 index 0000000000..b0a1eb283d --- /dev/null +++ b/src/core/components/pubsub.js @@ -0,0 +1,97 @@ +'use strict' + +const promisify = require('promisify-es6') +const setImmediate = require('async/setImmediate') + +const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR + +module.exports = function pubsub (self) { + return { + subscribe: (topic, options, handler, callback) => { + if (!self.isOnline()) { + throw OFFLINE_ERROR + } + + if (typeof options === 'function') { + callback = handler + handler = options + options = {} + } + + if (!callback) { + return new Promise((resolve, reject) => { + subscribe(topic, options, handler, (err) => { + if (err) { + return reject(err) + } + resolve() + }) + }) + } + + subscribe(topic, options, handler, callback) + }, + + unsubscribe: (topic, handler) => { + const ps = self._pubsub + + ps.removeListener(topic, handler) + + if (ps.listenerCount(topic) === 0) { + ps.unsubscribe(topic) + } + }, + + publish: promisify((topic, data, callback) => { + if (!self.isOnline()) { + return setImmediate(() => callback(OFFLINE_ERROR)) + } + + if (!Buffer.isBuffer(data)) { + return setImmediate(() => callback(new Error('data must be a Buffer'))) + } + + self._pubsub.publish(topic, data) + setImmediate(() => callback()) + }), + + ls: promisify((callback) => { + if (!self.isOnline()) { + return setImmediate(() => callback(OFFLINE_ERROR)) + } + + const subscriptions = Array.from( + self._pubsub.subscriptions + ) + + setImmediate(() => callback(null, subscriptions)) + }), + + peers: promisify((topic, callback) => { + if (!self.isOnline()) { + return setImmediate(() => callback(OFFLINE_ERROR)) + } + + const peers = Array.from(self._pubsub.peers.values()) + .filter((peer) => peer.topics.has(topic)) + .map((peer) => peer.info.id.toB58String()) + + setImmediate(() => callback(null, peers)) + }), + + setMaxListeners (n) { + return self._pubsub.setMaxListeners(n) + } + } + + function subscribe (topic, options, handler, callback) { + const ps = self._pubsub + + if (ps.listenerCount(topic) === 0) { + ps.subscribe(topic) + } + + ps.on(topic, handler) + setImmediate(() => callback()) + } +} diff --git a/src/core/index.js b/src/core/index.js index a58836c10c..fa4c2e8ae2 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -23,6 +23,7 @@ const swarm = require('./components/swarm') const ping = require('./components/ping') const files = require('./components/files') const bitswap = require('./components/bitswap') +const pubsub = require('./components/pubsub') exports = module.exports = IPFS @@ -44,6 +45,7 @@ function IPFS (repoInstance) { this._bitswap = null this._blockService = new BlockService(this._repo) this._ipldResolver = new IPLDResolver(this._blockService) + this._pubsub = null // IPFS Core exposed components @@ -67,4 +69,5 @@ function IPFS (repoInstance) { this.files = files(this) this.bitswap = bitswap(this) this.ping = ping(this) + this.pubsub = pubsub(this) } diff --git a/src/http-api/error-handler.js b/src/http-api/error-handler.js new file mode 100644 index 0000000000..82062a492b --- /dev/null +++ b/src/http-api/error-handler.js @@ -0,0 +1,53 @@ +'use strict' + +const Hoek = require('hoek') + +module.exports = (server) => { + server.ext('onRequest', (request, reply) => { + request.handleError = handleError + reply.continue() + }) + + server.ext('onPreResponse', (request, reply) => { + const res = request.response + const req = request.raw.req + + let statusCode = 200 + let msg = 'Sorry, something went wrong, please retrace your steps.' + + if (res.isBoom) { + statusCode = res.output.payload.statusCode + + if (res.message && res.isDeveloperError) { + msg = res.message.replace('Uncaught error: ', '') + } + + const debug = { + method: req.method, + url: request.url.path, + headers: request.raw.req.headers, + info: request.info, + payload: request.payload, + response: res.output.payload + } + + server.log('error', debug) + + reply({ + Message: msg, + Code: 1 + }).code(statusCode) + return + } + + reply.continue() + }) +} + +function handleError (error, errorMessage) { + if (errorMessage) { + return Hoek.assert(!error, errorMessage) + } + + return Hoek.assert(!error, error) +} diff --git a/src/http-api/index.js b/src/http-api/index.js index 9cd866a5cb..21a6bc5e44 100644 --- a/src/http-api/index.js +++ b/src/http-api/index.js @@ -14,6 +14,7 @@ const log = debug('api') log.error = debug('api:error') const IPFS = require('../core') +const errorHandler = require('./error-handler') exports = module.exports = function HttpApi (repo) { this.ipfs = null @@ -80,6 +81,9 @@ exports = module.exports = function HttpApi (repo) { labels: 'Gateway' }) + // Nicer errors + errorHandler(this.server) + // load routes require('./routes')(this.server) diff --git a/src/http-api/resources/bitswap.js b/src/http-api/resources/bitswap.js index 78ad4649dc..ed749ea004 100644 --- a/src/http-api/resources/bitswap.js +++ b/src/http-api/resources/bitswap.js @@ -42,6 +42,6 @@ exports.unwant = { parseArgs: parseKey, handler: (request, reply) => { - reply(boom.badRequrest(new Error('Not implemented yet'))) + reply(boom.badRequest(new Error('Not implemented yet'))) } } diff --git a/src/http-api/resources/index.js b/src/http-api/resources/index.js index b90a9d912c..671a349952 100644 --- a/src/http-api/resources/index.js +++ b/src/http-api/resources/index.js @@ -10,3 +10,4 @@ exports.block = require('./block') exports.swarm = require('./swarm') exports.bitswap = require('./bitswap') exports.files = require('./files') +exports.pubsub = require('./pubsub') diff --git a/src/http-api/resources/pubsub.js b/src/http-api/resources/pubsub.js new file mode 100644 index 0000000000..39ece9935c --- /dev/null +++ b/src/http-api/resources/pubsub.js @@ -0,0 +1,112 @@ +'use strict' + +const PassThrough = require('stream').PassThrough + +exports = module.exports + +exports.subscribe = { + handler: (request, reply) => { + const query = request.query + const discover = query.discover === 'true' + const topic = query.arg + + if (!topic) { + return reply(new Error('Missing topic')) + } + + const ipfs = request.server.app.ipfs + + const res = new PassThrough({highWaterMark: 1}) + + const handler = (msg) => { + res.write(JSON.stringify({ + from: msg.from, + data: msg.data.toString('base64'), + seqno: msg.seqno.toString('base64'), + topicCIDs: msg.topicCIDs + }) + '\n', 'utf8') + } + + // js-ipfs-api needs a reply, and go-ipfs does the same thing + res.write('{}\n') + + const unsubscribe = () => { + ipfs.pubsub.unsubscribe(topic, handler) + res.end() + } + + request.once('disconnect', unsubscribe) + request.once('finish', unsubscribe) + + ipfs.pubsub.subscribe(topic, { + discover: discover + }, handler, (err) => { + if (err) { + return reply(err) + } + + reply(res) + .header('X-Chunked-Output', '1') + .header('content-type', 'application/json') + }) + } +} + +exports.publish = { + handler: (request, reply) => { + const arg = request.query.arg + const topic = arg[0] + const buf = arg[1] + + const ipfs = request.server.app.ipfs + + if (!topic) { + return reply(new Error('Missing topic')) + } + + if (!buf) { + return reply(new Error('Missing buf')) + } + + ipfs.pubsub.publish(topic, new Buffer(String(buf)), (err) => { + if (err) { + return reply(new Error(`Failed to publish to topic ${topic}: ${err}`)) + } + + reply() + }) + } +} + +exports.ls = { + handler: (request, reply) => { + const ipfs = request.server.app.ipfs + + ipfs.pubsub.ls((err, subscriptions) => { + if (err) { + return reply(new Error(`Failed to list subscriptions: ${err}`)) + } + + reply({Strings: subscriptions}) + }) + } +} + +exports.peers = { + handler: (request, reply) => { + const topic = request.query.arg + const ipfs = request.server.app.ipfs + + if (!topic) { + return reply(new Error('Missing topic')) + } + + ipfs.pubsub.peers(topic, (err, peers) => { + if (err) { + return reply(new Error(`Failed to find peers subscribed to ${topic}: ${err}`)) + } + + reply({Strings: peers}) + }) + } +} diff --git a/src/http-api/routes/index.js b/src/http-api/routes/index.js index 587f25de77..7b0afa885d 100644 --- a/src/http-api/routes/index.js +++ b/src/http-api/routes/index.js @@ -11,4 +11,5 @@ module.exports = (server) => { require('./swarm')(server) require('./bitswap')(server) require('./files')(server) + require('./pubsub')(server) } diff --git a/src/http-api/routes/pubsub.js b/src/http-api/routes/pubsub.js new file mode 100644 index 0000000000..6f55b52b65 --- /dev/null +++ b/src/http-api/routes/pubsub.js @@ -0,0 +1,39 @@ +'use strict' + +const resources = require('./../resources') + +module.exports = (server) => { + const api = server.select('API') + + api.route({ + method: '*', + path: '/api/v0/pubsub/sub', + config: { + handler: resources.pubsub.subscribe.handler + } + }) + + api.route({ + method: '*', + path: '/api/v0/pubsub/pub', + config: { + handler: resources.pubsub.publish.handler + } + }) + + api.route({ + method: '*', + path: '/api/v0/pubsub/ls', + config: { + handler: resources.pubsub.ls.handler + } + }) + + api.route({ + method: '*', + path: '/api/v0/pubsub/peers', + config: { + handler: resources.pubsub.peers.handler + } + }) +} diff --git a/test/cli/test-commands.js b/test/cli/test-commands.js index da484580d1..4399d6c520 100644 --- a/test/cli/test-commands.js +++ b/test/cli/test-commands.js @@ -7,11 +7,13 @@ const ipfsBase = require('../utils/ipfs-exec') const ipfs = ipfsBase(repoPath) const describeOnlineAndOffline = require('../utils/on-and-off') +const commandCount = 61 + describe('commands', () => { describeOnlineAndOffline(repoPath, () => { it('list the commands', () => { return ipfs('commands').then((out) => { - expect(out.split('\n')).to.have.length(56) + expect(out.split('\n')).to.have.length(commandCount) }) }) }) @@ -20,7 +22,7 @@ describe('commands', () => { return ipfsBase(repoPath, { cwd: '/tmp' })('commands').then((out) => { - expect(out.split('\n').length).to.equal(56) + expect(out.split('\n').length).to.equal(commandCount) }) }) }) diff --git a/test/cli/test-pubsub.js b/test/cli/test-pubsub.js new file mode 100644 index 0000000000..3fc60968a1 --- /dev/null +++ b/test/cli/test-pubsub.js @@ -0,0 +1,119 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const delay = require('delay') +const waterfall = require('async/waterfall') +const HttpAPI = require('../../src/http-api') +const createTempNode = require('../utils/temp-node') +const repoPath = require('./index').repoPath +const ipfs = require('../utils/ipfs-exec')(repoPath) + +describe.skip('pubsub', () => { + const topicA = 'nonscentsA' + const topicB = 'nonscentsB' + const topicC = 'nonscentsC' + let node + let id + + before((done) => { + createTempNode(1, (err, _node) => { + expect(err).to.not.exist + node = _node + node.goOnline(done) + }) + }) + + after((done) => { + node.goOffline(done) + }) + + describe('api running', () => { + let httpAPI + + before((done) => { + httpAPI = new HttpAPI(repoPath) + + waterfall([ + (cb) => httpAPI.start(cb), + (cb) => node.id(cb), + (_id, cb) => { + id = _id + ipfs(`swarm connect ${id.addresses[0]}`) + .then(() => cb()) + .catch(cb) + } + ], done) + }) + + after((done) => { + httpAPI.stop(done) + }) + + it('subscribe and publish', () => { + const sub = ipfs(`pubsub sub ${topicA}`) + + sub.stdout.on('data', (c) => { + expect(c.toString()).to.be.eql('world\n') + sub.kill() + }) + + return Promise.all([ + sub.catch(ignoreKill), + delay(200) + .then(() => ipfs(`pubsub pub ${topicA} world`)) + .then((out) => { + expect(out).to.be.eql('') + }) + ]) + }) + + it('ls', () => { + const sub = ipfs(`pubsub sub ${topicB}`) + + sub.stdout.once('data', (data) => { + expect(data.toString()).to.be.eql('world\n') + ipfs('pubsub ls') + .then((out) => { + expect(out).to.be.eql(topicB) + sub.kill() + }) + }) + + return Promise.all([ + sub.catch(ignoreKill), + delay(200) + .then(() => ipfs(`pubsub pub ${topicB} world`)) + ]) + }) + + it('peers', () => { + const handler = (msg) => { + expect(msg.data.toString()).to.be.eql('world') + ipfs(`pubsub peers ${topicC}`) + .then((out) => { + expect(out).to.be.eql(id.id) + sub2.kill() + node.pubsub.unsubscribe(topicC, handler) + }) + } + + const sub1 = node.pubsub.subscribe(topicC, handler) + const sub2 = ipfs(`pubsub sub ${topicC}`) + + return Promise.all([ + sub1, + sub2.catch(ignoreKill), + delay(200) + .then(() => ipfs(`pubsub pub ${topicC} world`)) + ]) + }) + }) +}) + +function ignoreKill (err) { + if (!err.killed) { + throw err + } +} diff --git a/test/core/node-only/test-swarm-2.js b/test/core/node-only/test-pubsub.js similarity index 93% rename from test/core/node-only/test-swarm-2.js rename to test/core/node-only/test-pubsub.js index bf81c71e60..bdc83ad572 100644 --- a/test/core/node-only/test-swarm-2.js +++ b/test/core/node-only/test-pubsub.js @@ -1,5 +1,4 @@ /* eslint-env mocha */ - 'use strict' const test = require('interface-ipfs-core') @@ -17,4 +16,4 @@ const common = { } } -test.swarm(common) +test.pubsub(common) diff --git a/test/core/node-only/test-swarm.js b/test/core/node-only/test-swarm.js index 5c0224187e..bf81c71e60 100644 --- a/test/core/node-only/test-swarm.js +++ b/test/core/node-only/test-swarm.js @@ -1,131 +1,20 @@ /* eslint-env mocha */ -'use strict' - -const expect = require('chai').expect -const parallel = require('async/parallel') - -const createTempNode = require('../../utils/temp-node') - -describe('swarm', function () { - this.timeout(40 * 1000) - - let nodeA - let nodeB - - // let nodeAMultiaddr - let nodeBMultiaddr - it('create 2 temporary nodes', (done) => { - parallel([ - (cb) => { - createTempNode(2, (err, tmpNode) => { - expect(err).to.not.exist - nodeA = tmpNode - cb() - }) - }, - (cb) => { - createTempNode(3, (err, tmpNode) => { - expect(err).to.not.exist - nodeB = tmpNode - cb() - }) - } - ], done) - }) - - it('get each peer addr', (done) => { - parallel([ - (cb) => { - nodeA.id((err, res) => { - expect(err).to.not.exist - // nodeAMultiaddr = `${res.addresses[0]}/ipfs/${res.id}` - cb() - }) - }, - (cb) => { - nodeB.id((err, res) => { - expect(err).to.not.exist - nodeBMultiaddr = res.addresses[0] - cb() - }) - } - ], done) - }) - - it('start 2 nodes', (done) => { - parallel([ - nodeA.goOnline, - nodeB.goOnline - ], done) - }) - - it('libp2p.swarm.connect nodeA to nodeB', (done) => { - nodeA.swarm.connect(nodeBMultiaddr, (err) => { - expect(err).to.not.exist - // So that identify has time to execute - setTimeout(done, 500) - }) - }) - - it('libp2p.swarm.peers on nodeA and nodeB match each other', (done) => { - parallel([ - (cb) => { - nodeA.swarm.peers((err, res) => { - expect(err).to.not.exist - expect(Object.keys(res)).to.have.length.above(0) - cb() - }) - }, - (cb) => { - nodeB.swarm.peers((err, res) => { - expect(err).to.not.exist - expect(Object.keys(res)).to.have.length.above(0) - cb() - }) - } - ], done) - }) +'use strict' - it('libp2p.swarm.localAddrs', (done) => { - nodeB.swarm.localAddrs((err, res) => { - expect(err).to.not.exist - expect(res.length > 1).to.equal(true) - done() - }) - }) +const test = require('interface-ipfs-core') +const IPFSFactory = require('../../utils/factory-core') - it('libp2p.swarm.disconnect nodeB from nodeA', (done) => { - nodeA.swarm.disconnect(nodeBMultiaddr, (err) => { - expect(err).to.not.exist - // So that identify has time to execute - setTimeout(check, 500) +let factory - function check () { - parallel([ - (cb) => { - nodeA.swarm.peers((err, res) => { - expect(err).to.not.exist - expect(Object.keys(res)).to.have.length(0) - cb() - }) - }, - (cb) => { - nodeB.swarm.peers((err, res) => { - expect(err).to.not.exist - expect(Object.keys(res)).to.have.length(0) - cb() - }) - } - ], done) - } - }) - }) +const common = { + setup: function (cb) { + factory = new IPFSFactory() + cb(null, factory) + }, + teardown: function (cb) { + factory.dismantle(cb) + } +} - it('stop', (done) => { - parallel([ - nodeA.goOffline, - nodeB.goOffline - ], done) - }) -}) +test.swarm(common) diff --git a/test/http-api/inject/test-pubsub.js b/test/http-api/inject/test-pubsub.js new file mode 100644 index 0000000000..29976046ad --- /dev/null +++ b/test/http-api/inject/test-pubsub.js @@ -0,0 +1,142 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const createTempNode = require('./../../utils/temp-node') + +module.exports = (http) => { + describe('/pubsub', () => { + let api + let tmpNode + + const buf = new Buffer('some message') + const topic = 'nonScents' + const topicNotSubscribed = 'somethingRandom' + + before((done) => { + api = http.api.server.select('API') + + createTempNode(47, (err, _ipfs) => { + expect(err).to.not.exist + tmpNode = _ipfs + tmpNode.goOnline((err) => { + expect(err).to.not.exist + done() + }) + }) + }) + + after((done) => { + setTimeout(() => { + tmpNode.goOffline(done) + }, 1000) + }) + + describe('/sub', () => { + it('returns 500 if no topic is provided', (done) => { + api.inject({ + method: 'GET', + url: `/api/v0/pubsub/sub` + }, (res) => { + expect(res.statusCode).to.equal(500) + expect(res.result.Code).to.be.eql(1) + done() + }) + }) + + it('returns 200 with topic', (done) => { + // TODO: Agree on a better way to test this (currently this hangs) + // Regarding: https://github.com/ipfs/js-ipfs/pull/644#issuecomment-267687194 + // Current Patch: Subscribe to a topic so the other tests run as expected + const ipfs = api.app.ipfs + const handler = (msg) => {} + ipfs.pubsub.subscribe(topic, handler, () => { + setTimeout(() => { + ipfs.pubsub.unsubscribe(topic, handler) + done() + }, 100) + }) + // api.inject({ + // method: 'GET', + // url: `/api/v0/pubsub/sub/${topic}` + // }, (res) => { + // console.log(res.result) + // expect(res.statusCode).to.equal(200) + // done() + // }) + }) + }) + + describe('/pub', () => { + it('returns 500 if no buffer is provided', (done) => { + api.inject({ + method: 'POST', + url: `/api/v0/pubsub/pub?arg=${topic}&arg=` + }, (res) => { + expect(res.statusCode).to.equal(500) + expect(res.result.Code).to.be.eql(1) + done() + }) + }) + + it('returns 200 with topic and buffer', (done) => { + api.inject({ + method: 'POST', + url: `/api/v0/pubsub/pub?arg=${topic}&arg=${buf}` + }, (res) => { + expect(res.statusCode).to.equal(200) + done() + }) + }) + }) + + describe.skip('/ls', () => { + it('returns 200', (done) => { + api.inject({ + method: 'GET', + url: `/api/v0/pubsub/ls` + }, (res) => { + expect(res.statusCode).to.equal(200) + expect(res.result.Strings).to.be.eql([topic]) + done() + }) + }) + }) + + describe('/peers', () => { + it('returns 500 if no topic is provided', (done) => { + api.inject({ + method: 'GET', + url: `/api/v0/pubsub/peers` + }, (res) => { + expect(res.statusCode).to.equal(500) + expect(res.result.Code).to.be.eql(1) + done() + }) + }) + + it('returns 200 if not subscribed to a topic', (done) => { + api.inject({ + method: 'GET', + url: `/api/v0/pubsub/peers?arg=${topicNotSubscribed}` + }, (res) => { + expect(res.statusCode).to.equal(200) + expect(res.result.Strings).to.be.eql([]) + done() + }) + }) + + it('returns 200 with topic', (done) => { + api.inject({ + method: 'GET', + url: `/api/v0/pubsub/peers?arg=${topic}` + }, (res) => { + expect(res.statusCode).to.equal(200) + expect(res.result.Strings).to.be.eql([]) + done() + }) + }) + }) + }) +} diff --git a/test/http-api/interface-ipfs-core-over-ipfs-api/test-pubsub.js b/test/http-api/interface-ipfs-core-over-ipfs-api/test-pubsub.js new file mode 100644 index 0000000000..721c09607f --- /dev/null +++ b/test/http-api/interface-ipfs-core-over-ipfs-api/test-pubsub.js @@ -0,0 +1,24 @@ +/* eslint-env mocha */ + +'use strict' + +/* +const test = require('interface-ipfs-core') +const FactoryClient = require('./../../utils/factory-http') + +let fc + +const common = { + setup: function (callback) { + fc = new FactoryClient() + callback(null, fc) + }, + teardown: function (callback) { + fc.dismantle(callback) + } +} +*/ + +// TODO +// needs: https://github.com/ipfs/js-ipfs-api/pull/493 +// test.pubsub(common) diff --git a/test/utils/ipfs-exec.js b/test/utils/ipfs-exec.js index 0525d0c7fa..d761502e61 100644 --- a/test/utils/ipfs-exec.js +++ b/test/utils/ipfs-exec.js @@ -20,7 +20,7 @@ module.exports = (repoPath, opts) => { env.IPFS_PATH = repoPath const config = Object.assign({}, { - stipEof: true, + stripEof: true, env: env, timeout: 60 * 1000 }, opts) @@ -33,13 +33,19 @@ module.exports = (repoPath, opts) => { args = args[0].split(' ') } - return exec(args).then((res) => { + const cp = exec(args) + const res = cp.then((res) => { // We can't escape the os.tmpDir warning due to: // https://github.com/shelljs/shelljs/blob/master/src/tempdir.js#L43 // expect(res.stderr).to.be.eql('') - return res.stdout }) + + res.kill = cp.kill.bind(cp) + res.stdout = cp.stdout + res.stderr = cp.stderr + + return res } ipfs.fail = function ipfsFail () {