@@ -6,12 +6,12 @@ const eos = require('end-of-stream')
6
6
const PubsubMessageStream = require ( '../pubsub-message-stream' )
7
7
const stringlistToArray = require ( '../stringlist-to-array' )
8
8
9
- /* Internal subscriptions state and functions */
10
- const ps = new EventEmitter ( )
11
- const subscriptions = { }
12
-
13
9
/* Public API */
14
10
module . exports = ( send ) => {
11
+ /* Internal subscriptions state and functions */
12
+ const ps = new EventEmitter ( )
13
+ const subscriptions = { }
14
+ ps . id = Math . random ( )
15
15
return {
16
16
subscribe : ( topic , options , handler , callback ) => {
17
17
const defaultOptions = {
@@ -44,9 +44,16 @@ module.exports = (send) => {
44
44
subscribe ( topic , options , handler , callback )
45
45
} ,
46
46
unsubscribe ( topic , handler ) {
47
+ if ( ps . listenerCount ( topic ) === 0 || ! subscriptions [ topic ] ) {
48
+ throw new Error ( `Not subscribed to '${ topic } '` )
49
+ }
50
+
47
51
ps . removeListener ( topic , handler )
52
+
53
+ // Drop the request once we are actualy done
48
54
if ( ps . listenerCount ( topic ) === 0 ) {
49
55
subscriptions [ topic ] . abort ( )
56
+ subscriptions [ topic ] = null
50
57
}
51
58
} ,
52
59
publish : promisify ( ( topic , data , callback ) => {
@@ -85,7 +92,6 @@ module.exports = (send) => {
85
92
86
93
function subscribe ( topic , options , handler , callback ) {
87
94
ps . on ( topic , handler )
88
-
89
95
if ( subscriptions [ topic ] ) {
90
96
return callback ( )
91
97
}
@@ -104,6 +110,7 @@ module.exports = (send) => {
104
110
subscriptions [ topic ] = send . andTransform ( request , PubsubMessageStream . from , ( err , stream ) => {
105
111
if ( err ) {
106
112
subscriptions [ topic ] = null
113
+ ps . removeListener ( topic , handler )
107
114
return callback ( err )
108
115
}
109
116
@@ -121,6 +128,7 @@ module.exports = (send) => {
121
128
}
122
129
123
130
subscriptions [ topic ] = null
131
+ ps . removeListener ( topic , handler )
124
132
} )
125
133
126
134
callback ( )
0 commit comments