Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit 5695f39

Browse files
fix issues due to the new event pubsub api
1 parent af2b096 commit 5695f39

File tree

2 files changed

+13
-30
lines changed

2 files changed

+13
-30
lines changed

src/api/pubsub.js

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ const eos = require('end-of-stream')
66
const PubsubMessageStream = require('../pubsub-message-stream')
77
const stringlistToArray = require('../stringlist-to-array')
88

9-
/* Internal subscriptions state and functions */
10-
const ps = new EventEmitter()
11-
const subscriptions = {}
12-
139
/* Public API */
1410
module.exports = (send) => {
11+
/* Internal subscriptions state and functions */
12+
const ps = new EventEmitter()
13+
const subscriptions = {}
14+
ps.id = Math.random()
1515
return {
1616
subscribe: (topic, options, handler, callback) => {
1717
const defaultOptions = {
@@ -44,9 +44,16 @@ module.exports = (send) => {
4444
subscribe(topic, options, handler, callback)
4545
},
4646
unsubscribe (topic, handler) {
47+
if (ps.listenerCount(topic) === 0 || !subscriptions[topic]) {
48+
throw new Error(`Not subscribed to '${topic}'`)
49+
}
50+
4751
ps.removeListener(topic, handler)
52+
53+
// Drop the request once we are actualy done
4854
if (ps.listenerCount(topic) === 0) {
4955
subscriptions[topic].abort()
56+
subscriptions[topic] = null
5057
}
5158
},
5259
publish: promisify((topic, data, callback) => {
@@ -85,7 +92,6 @@ module.exports = (send) => {
8592

8693
function subscribe (topic, options, handler, callback) {
8794
ps.on(topic, handler)
88-
8995
if (subscriptions[topic]) {
9096
return callback()
9197
}
@@ -104,6 +110,7 @@ module.exports = (send) => {
104110
subscriptions[topic] = send.andTransform(request, PubsubMessageStream.from, (err, stream) => {
105111
if (err) {
106112
subscriptions[topic] = null
113+
ps.removeListener(topic, handler)
107114
return callback(err)
108115
}
109116

@@ -121,6 +128,7 @@ module.exports = (send) => {
121128
}
122129

123130
subscriptions[topic] = null
131+
ps.removeListener(topic, handler)
124132
})
125133

126134
callback()

test/ipfs-api/pubsub-message.spec.js

Lines changed: 0 additions & 25 deletions
This file was deleted.

0 commit comments

Comments
 (0)