From 7d7e4c16413d070fd683ceada6c514676b9fc91e Mon Sep 17 00:00:00 2001 From: haad Date: Sun, 11 Sep 2016 20:32:03 +0200 Subject: [PATCH] feat(pubsub): Add publish/subscribe This commit adds publish/subscribe methods to js-ipfs-api. These changes requires special version of go-ipfs currently, with pub/sub activated. --- README.md | 27 ++++++++ package.json | 6 +- src/api/pubsub.js | 106 ++++++++++++++++++++++++++++ src/index.js | 2 +- src/load-commands.js | 5 +- test/ipfs-api/pubsub.spec.js | 130 +++++++++++++++++++++++++++++++++++ 6 files changed, 271 insertions(+), 5 deletions(-) create mode 100644 src/api/pubsub.js create mode 100644 test/ipfs-api/pubsub.spec.js diff --git a/README.md b/README.md index cba5bde23..20815fcc6 100644 --- a/README.md +++ b/README.md @@ -187,6 +187,33 @@ ipfs.id() This relies on a global `Promise` object. If you are in an environment where that is not yet available you need to bring your own polyfill. +### Publish/Subscribe (experimental) + +js-ipfs-api supports the up-and-coming feature publish/subscribe. This requires you +to use a development build of go-ipfs from the `feat/floodsub` branch (issue for tracking here: https://github.com/ipfs/go-ipfs/pull/3202). + +Usage: + +```js +const subscription = ipfsApi.pubsub.subscribe('my-topic') +subscription.on('data', (msg) => { + console.log('message', msg.data) + // => 'Hello there!' +}) +setTimeout(() => { + // Stop subscription after 10 seconds + subscription.cancel() +}, 1000 * 10) + +ipfsApi.pubsub.publish('my-topic', 'Hello there!', (err, successful) => { + if (err) { + console.log('Something went wrong publishing a message') + throw err + } + // successful = true/false +}) +``` + ## Development ### Testing diff --git a/package.json b/package.json index 7d0535aaa..160b63328 100644 --- a/package.json +++ b/package.json @@ -49,10 +49,12 @@ "devDependencies": { "aegir": "^8.0.0", "chai": "^3.5.0", + "go-ipfs-dep": "https://github.com/haadcode/go-ipfs-dep.git#0a5229816b3a41f17876361314a90c1d4dec79b3", "gulp": "^3.9.1", "hapi": "^15.0.2", "interface-ipfs-core": "^0.15.0", - "ipfsd-ctl": "^0.14.0", + "ipfsd-ctl": "https://github.com/haadcode/js-ipfsd-ctl.git#257712108919a05c625b02399b82d873675bc559", + "js-base64": "^2.1.9", "pre-commit": "^1.1.3", "socket.io": "^1.4.8", "socket.io-client": "^1.4.8", @@ -103,4 +105,4 @@ "url": "https://github.com/ipfs/js-ipfs-api/issues" }, "homepage": "https://github.com/ipfs/js-ipfs-api" -} \ No newline at end of file +} diff --git a/src/api/pubsub.js b/src/api/pubsub.js new file mode 100644 index 000000000..582a9dd8b --- /dev/null +++ b/src/api/pubsub.js @@ -0,0 +1,106 @@ +'use strict' + +const promisify = require('promisify-es6') +const bs58 = require('bs58') +const Base64 = require('js-base64').Base64 +const Stream = require('stream') +const Readable = Stream.Readable +const http = require('http') + +let activeSubscriptions = [] + +const subscriptionExists = (subscriptions, topic) => { + return subscriptions.indexOf(topic) !== -1 +} +const removeSubscription = (subscriptions, topic) => { + const indexToRemove = subscriptions.indexOf(topic) + return subscriptions.filter((el, index) => { + return index !== indexToRemove + }) +} +const addSubscription = (subscriptions, topic) => { + return subscriptions.concat([topic]) +} +const parseMessage = (message) => { + return Object.assign({}, message, { + from: bs58.encode(message.from), + data: Base64.decode(message.data), + seqno: Base64.decode(message.seqno) + }) +} + +module.exports = (send, config) => { + return { + subscribe: (topic, options) => { + if (!options) { + options = {} + } + + var rs = new Readable({objectMode: true}) + rs._read = () => {} + + if (!subscriptionExists(activeSubscriptions, topic)) { + activeSubscriptions = addSubscription(activeSubscriptions, topic) + } else { + throw new Error('Already subscribed to ' + topic) + } + + let url = '/api/v0/pubsub/sub/' + topic + if (options.discover) { + url = url + '?discover=true' + } + // we're using http.get here to have more control over the request + // and avoid refactoring of the request-api where wreck is gonna be + // replaced by fetch (https://github.com/ipfs/js-ipfs-api/pull/355) + const request = http.get({ + host: config.host, + port: config.port, + path: url + }, (response) => { + response.on('data', function (d) { + let data + try { + data = JSON.parse(d) + } catch (err) { + return rs.emit('error', err) + } + + // skip "double subscription" error + if (!data.Message) { + rs.emit('data', parseMessage(data)) + } + }) + response.on('end', function () { + rs.emit('end') + }) + }) + rs.cancel = () => { + request.abort() + activeSubscriptions = removeSubscription(activeSubscriptions, topic) + } + return rs + }, + publish: promisify((topic, data, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + if (!options) { + options = {} + } + + const isBuffer = Buffer.isBuffer(data) + const buf = isBuffer ? data : new Buffer(data) + + send({ + path: 'pubsub/pub', + args: [topic, buf] + }, (err, result) => { + if (err) { + return callback(err) + } + callback(null, true) + }) + }) + } +} diff --git a/src/index.js b/src/index.js index e5b4f3399..7aa19790f 100644 --- a/src/index.js +++ b/src/index.js @@ -36,7 +36,7 @@ function IpfsAPI (hostOrMultiaddr, port, opts) { } const requestAPI = getRequestAPI(config) - const cmds = loadCommands(requestAPI) + const cmds = loadCommands(requestAPI, config) cmds.send = requestAPI cmds.Buffer = Buffer diff --git a/src/load-commands.js b/src/load-commands.js index b69c197cb..537286c4a 100644 --- a/src/load-commands.js +++ b/src/load-commands.js @@ -22,6 +22,7 @@ function requireCommands () { object: require('./api/object'), pin: require('./api/pin'), ping: require('./api/ping'), + pubsub: require('./api/pubsub'), refs: require('./api/refs'), repo: require('./api/repo'), swarm: require('./api/swarm'), @@ -53,12 +54,12 @@ function requireCommands () { return cmds } -function loadCommands (send) { +function loadCommands (send, config) { const files = requireCommands() const cmds = {} Object.keys(files).forEach((file) => { - cmds[file] = files[file](send) + cmds[file] = files[file](send, config) }) return cmds diff --git a/test/ipfs-api/pubsub.spec.js b/test/ipfs-api/pubsub.spec.js new file mode 100644 index 000000000..862a072c2 --- /dev/null +++ b/test/ipfs-api/pubsub.spec.js @@ -0,0 +1,130 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ['error', 8] */ +'use strict' + +const expect = require('chai').expect +const isNode = require('detect-node') +const FactoryClient = require('../factory/factory-client') +const map = require('async/map') + +const topicName = 'js-ipfs-api-tests' + +const publish = (ipfs, data, callback) => { + ipfs.pubsub.publish(topicName, data, (err, successful) => { + expect(err).to.not.exist + expect(successful).to.equal(true) + callback() + }) +} + +describe('.pubsub', () => { + if (!isNode) { + return + } + + let ipfs + let fc + + before(function (done) { + fc = new FactoryClient() + fc.spawnNode((err, node) => { + expect(err).to.not.exist + if (err) done(err) + ipfs = node + done() + }) + }) + + after((done) => { + fc.dismantle(done) + }) + + describe('.publish', () => { + it('message from string', (done) => { + publish(ipfs, 'hello friend', done) + }) + it('message from buffer', (done) => { + publish(ipfs, new Buffer('hello friend'), done) + }) + }) + + describe('.subscribe', () => { + it('one topic', (done) => { + const subscription = ipfs.pubsub.subscribe(topicName) + subscription.on('data', (d) => { + expect(d.data).to.equal('hi') + subscription.cancel() + }) + subscription.on('end', () => { + done() + }) + setTimeout(publish.bind(null, ipfs, 'hi', () => {}), 0) + }) + it('fails when already subscribed', () => { + const firstSub = ipfs.pubsub.subscribe(topicName) + let caughtErr = null + try { + ipfs.pubsub.subscribe(topicName) + } catch (err) { + caughtErr = err + } + expect(caughtErr.toString()).to.equal('Error: Already subscribed to ' + topicName) + firstSub.cancel() + }) + it('receive multiple messages', (done) => { + let receivedMessages = [] + let interval = null + const expectedMessages = 2 + const subscription = ipfs.pubsub.subscribe(topicName) + subscription.on('data', (d) => { + receivedMessages.push(d.data) + if (receivedMessages.length === expectedMessages) { + receivedMessages.forEach((msg) => { + expect(msg).to.be.equal('hi') + }) + clearInterval(interval) + subscription.cancel() + done() + } + }) + + setTimeout(() => { + interval = setInterval(publish.bind(null, ipfs, 'hi', () => {}), 10) + }, 10) + }) + }) + describe('multiple nodes pub/sub', () => { + let clients = {} + before(function (done) { + const keys = ['a', 'b'] + fc = new FactoryClient() + map(['a', 'b'], (_, cb) => { + return fc.spawnNode(cb) + }, (err, nodes) => { + if (err) return done(err) + keys.forEach((key, i) => { + clients[key] = nodes[i] + }) + done() + }) + }) + after((done) => { + fc.dismantle(done) + }) + it('receive messages from different node', (done) => { + const expectedString = 'hello from the other side' + const subscription = clients.a.pubsub.subscribe(topicName) + subscription.on('data', (d) => { + expect(d.data).to.be.equal(expectedString) + subscription.cancel() + done() + }) + setTimeout(() => { + clients.b.pubsub.publish(topicName, expectedString, (err, result) => { + expect(err).to.not.exist + expect(result).to.equal(true) + }) + }, 100) + }) + }) +})