Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit af998bb

Browse files
refactor to new event based pubsub api
1 parent 5198b96 commit af998bb

8 files changed

+107
-173
lines changed

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
"ipld-dag-pb": "^0.9.3",
3535
"is-ipfs": "^0.2.1",
3636
"isstream": "^0.1.2",
37-
"js-base64": "^2.1.9",
3837
"lru-cache": "^4.0.2",
3938
"multiaddr": "^2.1.1",
4039
"multipart-stream": "^2.0.1",

src/api/pubsub.js

Lines changed: 72 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,54 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const EventEmitter = require('events')
5+
const eos = require('end-of-stream')
46
const PubsubMessageStream = require('../pubsub-message-stream')
57
const stringlistToArray = require('../stringlist-to-array')
68

79
/* Internal subscriptions state and functions */
8-
let subscriptions = {}
9-
10-
const addSubscription = (topic, request) => {
11-
subscriptions[topic] = { request: request }
12-
}
13-
14-
const removeSubscription = promisify((topic, callback) => {
15-
if (!subscriptions[topic]) {
16-
return callback(new Error(`Not subscribed to ${topic}`))
17-
}
18-
19-
subscriptions[topic].request.abort()
20-
delete subscriptions[topic]
21-
22-
if (callback) {
23-
callback(null)
24-
}
25-
})
10+
const ps = new EventEmitter()
11+
const subscriptions = {}
2612

2713
/* Public API */
2814
module.exports = (send) => {
2915
return {
30-
subscribe: promisify((topic, options, callback) => {
16+
subscribe: (topic, options, handler, callback) => {
3117
const defaultOptions = {
3218
discover: false
3319
}
3420

3521
if (typeof options === 'function') {
36-
callback = options
22+
callback = handler
23+
handler = options
3724
options = defaultOptions
3825
}
3926

4027
if (!options) {
4128
options = defaultOptions
4229
}
4330

44-
// If we're already subscribed, return an error
45-
if (subscriptions[topic]) {
46-
return callback(new Error(`Already subscribed to '${topic}'`))
31+
// promisify doesn't work as we always pass a
32+
// function as last argument (`handler`)
33+
if (!callback) {
34+
return new Promise((resolve, reject) => {
35+
subscribe(topic, options, handler, (err) => {
36+
if (err) {
37+
return reject(err)
38+
}
39+
resolve()
40+
})
41+
})
4742
}
4843

49-
// Request params
50-
const request = {
51-
path: 'pubsub/sub',
52-
args: [topic],
53-
qs: { discover: options.discover }
44+
subscribe(topic, options, handler, callback)
45+
},
46+
unsubscribe (topic, handler) {
47+
ps.removeListener(topic, handler)
48+
if (ps.listenerCount(topic) === 0) {
49+
subscriptions[topic].abort()
5450
}
55-
56-
// Start the request and transform the response stream to Pubsub messages stream
57-
const req = send.andTransform(request, PubsubMessageStream.from, (err, stream) => {
58-
if (err) {
59-
return callback(err)
60-
}
61-
// Add a cancel method to the stream so that the subscription can be cleanly cancelled
62-
stream.cancel = promisify((cb) => removeSubscription(topic, cb))
63-
// Add the request to the active subscriptions and return the stream
64-
addSubscription(topic, req)
65-
callback(null, stream)
66-
})
67-
}),
51+
},
6852
publish: promisify((topic, data, callback) => {
6953
const buf = Buffer.isBuffer(data) ? data : new Buffer(data)
7054

@@ -93,6 +77,53 @@ module.exports = (send) => {
9377
}
9478

9579
send.andTransform(request, stringlistToArray, callback)
80+
}),
81+
setMaxListeners (n) {
82+
return ps.setMaxListeners(n)
83+
}
84+
}
85+
86+
function subscribe (topic, options, handler, callback) {
87+
ps.on(topic, handler)
88+
89+
if (subscriptions[topic]) {
90+
return callback()
91+
}
92+
93+
// Request params
94+
const request = {
95+
path: 'pubsub/sub',
96+
args: [topic],
97+
qs: {
98+
discover: options.discover
99+
}
100+
}
101+
102+
// Start the request and transform the response
103+
// stream to Pubsub messages stream
104+
subscriptions[topic] = send.andTransform(request, PubsubMessageStream.from, (err, stream) => {
105+
if (err) {
106+
subscriptions[topic] = null
107+
return callback(err)
108+
}
109+
110+
stream.on('data', (msg) => {
111+
ps.emit(topic, msg)
112+
})
113+
114+
stream.on('error', (err) => {
115+
ps.emit('error', err)
116+
})
117+
118+
eos(stream, (err) => {
119+
if (err) {
120+
ps.emit('error', err)
121+
}
122+
123+
subscriptions[topic] = null
124+
})
125+
126+
callback()
96127
})
97128
}
98129
}

src/pubsub-message-stream.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@ class PubsubMessageStream extends TransformStream {
1616
}
1717

1818
_transform (obj, enc, callback) {
19+
let msg
1920
try {
20-
const message = PubsubMessage.deserialize(obj, 'base64')
21-
this.push(message)
21+
msg = PubsubMessage.deserialize(obj, 'base64')
2222
} catch (e) {
2323
// Not a valid pubsub message
2424
// go-ipfs returns '{}' as the very first object atm, we skip that
25+
return callback()
2526
}
27+
28+
this.push(msg)
2629
callback()
2730
}
2831
}

src/pubsub-message-utils.js

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,38 @@
11
'use strict'
22

3-
const Base58 = require('bs58')
4-
const Base64 = require('js-base64').Base64
5-
const PubsubMessage = require('./pubsub-message')
6-
7-
class PubsubMessageUtils {
8-
static create (senderId, data, seqNo, topics) {
9-
return new PubsubMessage(senderId, data, seqNo, topics)
10-
}
11-
12-
static deserialize (data, enc = 'json') {
3+
module.exports = {
4+
deserialize (data, enc = 'json') {
135
enc = enc ? enc.toLowerCase() : null
146

157
if (enc === 'json') {
16-
return PubsubMessageUtils._deserializeFromJson(data)
8+
return deserializeFromJson(data)
179
} else if (enc === 'base64') {
18-
return PubsubMessageUtils._deserializeFromBase64(data)
10+
return deserializeFromBase64(data)
1911
}
2012

2113
throw new Error(`Unsupported encoding: '${enc}'`)
2214
}
15+
}
2316

24-
static _deserializeFromJson (data) {
25-
const json = JSON.parse(data)
26-
return PubsubMessageUtils._deserializeFromBase64(json)
27-
}
28-
29-
static _deserializeFromBase64 (obj) {
30-
if (!PubsubMessageUtils._isPubsubMessage(obj)) {
31-
throw new Error(`Not a pubsub message`)
32-
}
33-
34-
const senderId = Base58.encode(obj.from)
35-
const payload = Base64.decode(obj.data)
36-
const seqno = Base64.decode(obj.seqno)
37-
const topics = obj.topicIDs || obj.topicCIDs
17+
function deserializeFromJson (data) {
18+
const json = JSON.parse(data)
19+
return deserializeFromBase64(json)
20+
}
3821

39-
return PubsubMessageUtils.create(senderId, payload, seqno, topics)
22+
function deserializeFromBase64 (obj) {
23+
if (!isPubsubMessage(obj)) {
24+
throw new Error(`Not a pubsub message`)
4025
}
4126

42-
static _isPubsubMessage (obj) {
43-
return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs)
27+
return {
28+
// TODO: broken see https://github.com/ipfs/go-ipfs/issues/3522
29+
from: obj.from,
30+
seqno: new Buffer(obj.seqno, 'base64'),
31+
data: new Buffer(obj.data, 'base64'),
32+
topicCIDs: obj.topicIDs || obj.topicCIDs
4433
}
4534
}
4635

47-
module.exports = PubsubMessageUtils
36+
function isPubsubMessage (obj) {
37+
return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs)
38+
}

src/pubsub-message.js

Lines changed: 0 additions & 28 deletions
This file was deleted.

test/factory/daemon-spawner.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ function spawnEphemeralNode (callback) {
7878
Discovery: {},
7979
API: {
8080
'HTTPHeaders.Access-Control-Allow-Origin': ['*'],
81-
'HTTPHeaders.Access-Control-Allow-Credentials': 'true',
81+
'HTTPHeaders.Access-Control-Allow-Credentials': ['true'],
8282
'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET']
8383
}
8484
}

test/ipfs-api/pubsub-message.spec.js

Lines changed: 7 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -2,86 +2,24 @@
22
'use strict'
33

44
const expect = require('chai').expect
5-
// const PubsubMessage = require('../../src/pubsub-message')
65
const PubsubMessageUtils = require('../../src/pubsub-message-utils')
76

8-
// NOTE!
9-
// (Most of) these tests are skipped for now until we figure out the
10-
// final data types for the messages coming over the wire
11-
127
const topicName = 'js-ipfs-api-tests'
138

149
describe('.pubsub-message', () => {
15-
it.skip('create message', () => {
16-
// TODO
17-
})
18-
19-
it.skip('deserialize message from JSON object', () => {
10+
it('deserialize message from JSON object', () => {
2011
const obj = {
2112
from: 'BI:ۛv�m�uyѱ����tU�+��#���V',
2213
data: 'aGk=',
2314
seqno: 'FIlj2BpyEgI=',
2415
topicIDs: [ topicName ]
2516
}
26-
try {
27-
const message = PubsubMessageUtils.deserialize(obj)
28-
expect(message.from).to.equal('AAA')
29-
expect(message.data).to.equal('hi')
30-
expect(message.seqno).to.equal('\u0014�c�\u001ar\u0012\u0002')
31-
expect(message.topicIDs.length).to.equal(1)
32-
expect(message.topicIDs[0]).to.equal(topicName)
33-
} catch (e) {
34-
expect(e).to.not.exist
35-
}
36-
})
37-
38-
describe('immutable properties', () => {
39-
const sender = 'A'
40-
const data = 'hello'
41-
const seqno = '123'
42-
const topicIDs = ['hello world']
43-
44-
const message = PubsubMessageUtils.create(sender, data, seqno, topicIDs)
45-
46-
it('from', () => {
47-
try {
48-
message.from = 'not allowed'
49-
} catch (e) {
50-
expect(e).to.be.an('error')
51-
expect(e.toString()).to.equal(`TypeError: Cannot set property from of #<PubsubMessage> which has only a getter`)
52-
}
53-
expect(message.from).to.equal(sender)
54-
})
55-
56-
it('data', () => {
57-
try {
58-
message.data = 'not allowed'
59-
} catch (e) {
60-
expect(e).to.be.an('error')
61-
expect(e.toString()).to.equal(`TypeError: Cannot set property data of #<PubsubMessage> which has only a getter`)
62-
}
63-
expect(message.data).to.equal(data)
64-
})
65-
66-
it('seqno', () => {
67-
try {
68-
message.seqno = 'not allowed'
69-
} catch (e) {
70-
expect(e).to.be.an('error')
71-
expect(e.toString()).to.equal(`TypeError: Cannot set property seqno of #<PubsubMessage> which has only a getter`)
72-
}
73-
expect(message.seqno).to.equal(seqno)
74-
})
7517

76-
it('topicIDs', () => {
77-
try {
78-
message.topicIDs = ['not allowed']
79-
} catch (e) {
80-
expect(e).to.be.an('error')
81-
expect(e.toString()).to.equal(`TypeError: Cannot set property topicIDs of #<PubsubMessage> which has only a getter`)
82-
}
83-
expect(message.topicIDs[0]).to.equal(topicIDs[0])
84-
expect(message.topicIDs.length).to.equal(topicIDs.length)
85-
})
18+
const message = PubsubMessageUtils.deserialize(obj)
19+
expect(message.from).to.equal('AAA')
20+
expect(message.data).to.equal('hi')
21+
expect(message.seqno).to.equal('\u0014�c�\u001ar\u0012\u0002')
22+
expect(message.topicIDs.length).to.equal(1)
23+
expect(message.topicIDs[0]).to.equal(topicName)
8624
})
8725
})

test/setup/spawn-daemons.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ function startDisposableDaemons (callback) {
2929
Bootstrap: [],
3030
Discovery: {},
3131
'API.HTTPHeaders.Access-Control-Allow-Origin': ['*'],
32-
'API.HTTPHeaders.Access-Control-Allow-Credentials': 'true',
32+
'API.HTTPHeaders.Access-Control-Allow-Credentials': ['true'],
3333
'API.HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET']
3434
}
3535

0 commit comments

Comments
 (0)