Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

fix: make pubsub.unsubscribe async and alter pubsub.subscribe signature #260

Merged
merged 8 commits into from
May 11, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions SPEC/PUBSUB.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ pubsub API

##### `Go` **WIP**

##### `JavaScript` - ipfs.pubsub.subscribe(topic, options, handler, callback)
##### `JavaScript` - ipfs.pubsub.subscribe(options, callback)

- `topic: string`
- `options: Object` - (Optional), might contain the following properties:
- `discover`: type: Boolean - Will use the DHT to find other peers.
- `handler: (msg) => ()` - Event handler which will be called with a message object everytime one is received. The `msg` has the format `{from: string, seqno: Buffer, data: Buffer, topicIDs: Array<string>}`.
- `callback: (Error) => ()` (Optional) Called once the subscription is established.
- `options: Object`: Object containing the following properties:
- `topic: string`
- `discover: Boolean` - Will use the DHT to find other peers.
- `handler: (msg) => ()` - Event handler which will be called with a message object everytime one is received. The `msg` has the format `{from: string, seqno: Buffer, data: Buffer, topicIDs: Array<string>}`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handler moves into the options object? That's a bit of a weird pattern, much nicer when it was separated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm torn here. The thing is that promisify'ing things forces a weird arg check case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @victorbjelkholm on this one, I'd prefer that the implementation choices in js-ipfs didn't affect the public API.

i.e. I don't think promisify should dictate what our public API looks like even if it means handling some difficult argument combinations.

IMHO the most intuitive API would be this:

ipfs.pubsub.subscribe(topic, handler, options, callback)
ipfs.pubsub.unsubscribe(topic, handler, callback)

Most important and non-optional arguments first, followed by optional args.

That said, if you just renamed the options arg to something like details then that would be better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe your proposal of:

ipfs.pubsub.subscribe(topic, handler, options, callback)
ipfs.pubsub.unsubscribe(topic, handler, callback)

Pretty much solves the main issue. I like it!

- `callback: (Error) => ()` Called once the subscription is established.

If no `callback` is passed, a [promise][] is returned.

Expand All @@ -28,7 +28,10 @@ const receiveMsg = (msg) => {
console.log(msg.data.toString())
}

ipfs.pubsub.subscribe(topic, receiveMsg)
ipfs.pubsub.subscribe({
topic: topic,
handler: receiveMsg
})
```

A great source of [examples][] can be found in the tests for this API.
Expand All @@ -39,10 +42,14 @@ A great source of [examples][] can be found in the tests for this API.

##### `Go` **WIP**

##### `JavaScript` - `ipfs.pubsub.unsubscribe(topic, handler)`
##### `JavaScript` - `ipfs.pubsub.unsubscribe(options, callback)`

- `options: Object`: Object containing the following properties:
- `topic: string` - The topic to unsubscribe from
- `handler: (msg) => ()` - The handler to remove.
- `callback: (Error) => ()` (Optional) Called once the unsubscribe is done.

- `topic: string` - The topic to unsubscribe from
- `handler: (msg) => ()` - The handler to remove.
If no `callback` is passed, a [promise][] is returned.

This works like `EventEmitter.removeListener`, as that only the `handler` passed to a `subscribe` call before is removed from listening. The underlying subscription will only be canceled once all listeners for a topic have been removed.

Expand All @@ -55,12 +62,18 @@ const receiveMsg = (msg) => {
console.log(msg.toString())
}

ipfs.pubsub.subscribe(topic, receiveMsg)

setTimeout(() => {
// unsubscribe a second later
ipfs.pubsub.unsubscribe(topic, receiveMsg)
}, 1000)
ipfs.pubsub.subscribe({
topic: topic,
handler: receiveMsg
}, () => {
setTimeout(() => {
// unsubscribe a second later
ipfs.pubsub.unsubscribe({
topic: topic,
handler: receiveMsg
})
}, 1000)
})
```

A great source of [examples][] can be found in the tests for this API.
Expand Down
85 changes: 41 additions & 44 deletions js/src/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)
const series = require('async/series')
const each = require('async/each')
const waterfall = require('async/waterfall')
const parallel = require('async/parallel')
const whilst = require('async/whilst')
Expand Down Expand Up @@ -137,12 +138,12 @@ module.exports = (common) => {
expect(msg).to.have.property('topicIDs').eql([topic])
expect(msg).to.have.property('from', ipfs1.peerId.id)

ipfs1.pubsub.unsubscribe(topic, handler)

ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.empty()
check()
ipfs1.pubsub.unsubscribe(topic, handler, () => {
ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.empty()
check()
})
})
}

Expand All @@ -163,12 +164,12 @@ module.exports = (common) => {
expect(msg).to.have.property('topicIDs').eql([topic])
expect(msg).to.have.property('from', ipfs1.peerId.id)

ipfs1.pubsub.unsubscribe(topic, handler)

ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.empty()
check()
ipfs1.pubsub.unsubscribe(topic, handler, () => {
ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.empty()
check()
})
})
}

Expand All @@ -189,12 +190,12 @@ module.exports = (common) => {
expect(msg).to.have.property('topicIDs').eql([topic])
expect(msg).to.have.property('from', ipfs1.peerId.id)

ipfs1.pubsub.unsubscribe(topic, handler)

ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.empty()
check()
ipfs1.pubsub.unsubscribe(topic, handler, () => {
ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.empty()
check()
})
})
}

Expand All @@ -211,14 +212,10 @@ module.exports = (common) => {
const handler1 = (msg) => {
expect(msg.data.toString()).to.eql('hello')

ipfs1.pubsub.unsubscribe(topic, handler1)

series([
(cb) => ipfs1.pubsub.unsubscribe(topic, handler1, cb)
(cb) => ipfs1.pubsub.ls(cb),
(cb) => {
ipfs1.pubsub.unsubscribe(topic, handler2)
cb()
},
(cb) => ipfs1.pubsub.unsubscribe(topic, handler2, cb),
(cb) => ipfs1.pubsub.ls(cb)
], (err, res) => {
expect(err).to.not.exist()
Expand Down Expand Up @@ -251,8 +248,7 @@ module.exports = (common) => {

const handler = (msg) => {
expect(msg.data.toString()).to.eql('hi')
ipfs1.pubsub.unsubscribe(topic, handler)
check()
ipfs1.pubsub.unsubscribe(topic, handler, check)
}

ipfs1.pubsub.subscribe(topic, {
Expand Down Expand Up @@ -389,8 +385,7 @@ module.exports = (common) => {
expect(err).to.not.exist()
expect(topics).to.be.eql([topic])

ipfs1.pubsub.unsubscribe(topic, sub1)
done()
ipfs1.pubsub.unsubscribe(topic, sub1, done)
})
})
})
Expand All @@ -414,11 +409,8 @@ module.exports = (common) => {
ipfs1.pubsub.ls((err, list) => {
expect(err).to.not.exist()

expect(
list.sort()
).to.be.eql(
topics.map((t) => t.name).sort()
)
expect(list.sort())
.to.eql(topics.map((t) => t.name).sort())

topics.forEach((t) => {
ipfs1.pubsub.unsubscribe(t.name, t.handler)
Expand All @@ -439,9 +431,11 @@ module.exports = (common) => {
topic = getTopic()
})

afterEach(() => {
ipfs1.pubsub.unsubscribe(topic, sub1)
ipfs2.pubsub.unsubscribe(topic, sub2)
afterEach((done) => {
parallel([
(cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb)
(cb) => ipfs2.pubsub.unsubscribe(topic, sub2, cb)
], done)
})

it('receive messages from different node', (done) => {
Expand Down Expand Up @@ -673,14 +667,17 @@ module.exports = (common) => {
},
(err) => {
expect(err).to.not.exist()
handlers.forEach((handler) => {
ipfs1.pubsub.unsubscribe(someTopic, handler)
})

ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.eql([])
done()
each(
handlers,
(handler, cb) => ipfs1.pubsub.unsubscribe(someTopic, handler, cb)
(err) => {
expect(err).to.not.exist()
ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.eql([])
done()
})
})
})
}
)
Expand Down