Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.

Commit 1e9d909

Browse files
committed
chore: complete pubsub tests
1 parent 91dba97 commit 1e9d909

File tree

9 files changed

+410
-164
lines changed

9 files changed

+410
-164
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
"scripts": {
2525
"lint": "aegir lint",
2626
"build": "aegir build",
27+
"prepare": "aegir build --no-bundle",
2728
"test": "aegir test",
2829
"test:node": "aegir test --target node",
2930
"test:browser": "aegir test --target browser",

src/pubsub/tests/api.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
/* eslint-env mocha */
22
'use strict'
33

4-
const chai = require('chai')
5-
const { expect } = chai
4+
const { expect } = require('aegir/utils/chai')
65
const sinon = require('sinon')
76

87
const pDefer = require('p-defer')
Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
/* eslint-env mocha */
2+
'use strict'
3+
4+
const { expect } = require('aegir/utils/chai')
5+
const sinon = require('sinon')
6+
const pDefer = require('p-defer')
7+
const pWaitFor = require('p-wait-for')
8+
const uint8ArrayToString = require('uint8arrays/to-string')
9+
10+
const { expectSet } = require('./utils')
11+
12+
module.exports = (common) => {
13+
describe('pubsub connection handlers', () => {
14+
let psA, psB
15+
16+
describe('nodes send state on connection', () => {
17+
// Create pubsub nodes and connect them
18+
before(async () => {
19+
[psA, psB] = await common.setup(2)
20+
21+
expect(psA.peers.size).to.be.eql(0)
22+
expect(psB.peers.size).to.be.eql(0)
23+
24+
// Start pubsub
25+
psA.start()
26+
psB.start()
27+
})
28+
29+
// Make subscriptions prior to nodes connected
30+
before(() => {
31+
psA.subscribe('Za')
32+
psB.subscribe('Zb')
33+
34+
expect(psA.peers.size).to.equal(0)
35+
expectSet(psA.subscriptions, ['Za'])
36+
expect(psB.peers.size).to.equal(0)
37+
expectSet(psB.subscriptions, ['Zb'])
38+
})
39+
40+
after(async () => {
41+
sinon.restore()
42+
await common.teardown()
43+
})
44+
45+
it('existing subscriptions are sent upon peer connection', async function () {
46+
this.timeout(10e3)
47+
48+
await Promise.all([
49+
psA._libp2p.dial(psB.peerId),
50+
new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)),
51+
new Promise((resolve) => psB.once('pubsub:subscription-change', resolve))
52+
])
53+
54+
expect(psA.peers.size).to.equal(1)
55+
expect(psB.peers.size).to.equal(1)
56+
57+
expectSet(psA.subscriptions, ['Za'])
58+
expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()])
59+
60+
expectSet(psB.subscriptions, ['Zb'])
61+
expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()])
62+
})
63+
})
64+
65+
describe('pubsub started before connect', () => {
66+
// Create pubsub nodes and start them
67+
beforeEach(async () => {
68+
[psA, psB] = await common.setup(2)
69+
70+
psA.start()
71+
psB.start()
72+
})
73+
74+
afterEach(async () => {
75+
sinon.restore()
76+
77+
await common.teardown()
78+
})
79+
80+
it('should get notified of connected peers on dial', async () => {
81+
const connection = await psA._libp2p.dial(psB.peerId)
82+
expect(connection).to.exist()
83+
84+
return Promise.all([
85+
pWaitFor(() => psA.peers.size === 1),
86+
pWaitFor(() => psB.peers.size === 1)
87+
])
88+
})
89+
90+
it('should receive pubsub messages', async () => {
91+
const defer = pDefer()
92+
const topic = 'test-topic'
93+
const data = 'hey!'
94+
95+
await psA._libp2p.dial(psB.peerId)
96+
97+
let subscribedTopics = psA.getTopics()
98+
expect(subscribedTopics).to.not.include(topic)
99+
100+
psA.on(topic, (msg) => {
101+
expect(uint8ArrayToString(msg.data)).to.equal(data)
102+
defer.resolve()
103+
})
104+
psA.subscribe(topic)
105+
106+
subscribedTopics = psA.getTopics()
107+
expect(subscribedTopics).to.include(topic)
108+
109+
// wait for psB to know about psA subscription
110+
await pWaitFor(() => {
111+
const subscribedPeers = psB.getSubscribers(topic)
112+
return subscribedPeers.includes(psA.peerId.toB58String())
113+
})
114+
psB.publish(topic, data)
115+
116+
await defer.promise
117+
})
118+
})
119+
120+
describe('pubsub started after connect', () => {
121+
// Create pubsub nodes
122+
beforeEach(async () => {
123+
[psA, psB] = await common.setup(2)
124+
})
125+
126+
afterEach(async () => {
127+
sinon.restore()
128+
129+
psA && psA.stop()
130+
psB && psB.stop()
131+
132+
await common.teardown()
133+
})
134+
135+
it('should get notified of connected peers after starting', async () => {
136+
const connection = await psA._libp2p.dial(psB.peerId)
137+
expect(connection).to.exist()
138+
expect(psA.peers.size).to.be.eql(0)
139+
expect(psB.peers.size).to.be.eql(0)
140+
141+
psA.start()
142+
psB.start()
143+
144+
return Promise.all([
145+
pWaitFor(() => psA.peers.size === 1),
146+
pWaitFor(() => psB.peers.size === 1)
147+
])
148+
})
149+
150+
it('should receive pubsub messages', async () => {
151+
const defer = pDefer()
152+
const topic = 'test-topic'
153+
const data = 'hey!'
154+
155+
await psA._libp2p.dial(psB.peerId)
156+
157+
psA.start()
158+
psB.start()
159+
160+
await Promise.all([
161+
pWaitFor(() => psA.peers.size === 1),
162+
pWaitFor(() => psB.peers.size === 1)
163+
])
164+
165+
let subscribedTopics = psA.getTopics()
166+
expect(subscribedTopics).to.not.include(topic)
167+
168+
psA.on(topic, (msg) => {
169+
expect(uint8ArrayToString(msg.data)).to.equal(data)
170+
defer.resolve()
171+
})
172+
psA.subscribe(topic)
173+
174+
subscribedTopics = psA.getTopics()
175+
expect(subscribedTopics).to.include(topic)
176+
177+
// wait for psB to know about psA subscription
178+
await pWaitFor(() => {
179+
const subscribedPeers = psB.getSubscribers(topic)
180+
return subscribedPeers.includes(psA.peerId.toB58String())
181+
})
182+
psB.publish(topic, data)
183+
184+
await defer.promise
185+
})
186+
})
187+
188+
describe('pubsub with intermittent connections', () => {
189+
// Create pubsub nodes and start them
190+
beforeEach(async () => {
191+
[psA, psB] = await common.setup(2)
192+
193+
psA.start()
194+
psB.start()
195+
})
196+
197+
afterEach(async () => {
198+
sinon.restore()
199+
200+
psA && psA.stop()
201+
psB && psB.stop()
202+
203+
await common.teardown()
204+
})
205+
206+
it('should receive pubsub messages after a node restart', async function () {
207+
this.timeout(10e3)
208+
const topic = 'test-topic'
209+
const data = 'hey!'
210+
const psAid = psA.peerId.toB58String()
211+
212+
let counter = 0
213+
const defer1 = pDefer()
214+
const defer2 = pDefer()
215+
216+
await psA._libp2p.dial(psB.peerId)
217+
218+
let subscribedTopics = psA.getTopics()
219+
expect(subscribedTopics).to.not.include(topic)
220+
221+
psA.on(topic, (msg) => {
222+
expect(uint8ArrayToString(msg.data)).to.equal(data)
223+
counter++
224+
counter === 1 ? defer1.resolve() : defer2.resolve()
225+
})
226+
psA.subscribe(topic)
227+
228+
subscribedTopics = psA.getTopics()
229+
expect(subscribedTopics).to.include(topic)
230+
231+
// wait for psB to know about psA subscription
232+
await pWaitFor(() => {
233+
const subscribedPeers = psB.getSubscribers(topic)
234+
return subscribedPeers.includes(psAid)
235+
})
236+
psB.publish(topic, data)
237+
238+
await defer1.promise
239+
240+
psB.stop()
241+
await psB._libp2p.stop()
242+
await pWaitFor(() => !psA._libp2p.connectionManager.get(psB.peerId) && !psB._libp2p.connectionManager.get(psA.peerId))
243+
await psB._libp2p.start()
244+
psB.start()
245+
246+
psA._libp2p.peerStore.addressBook.set(psB.peerId, psB._libp2p.multiaddrs)
247+
await psA._libp2p.dial(psB.peerId)
248+
249+
// wait for remoteLibp2p to know about libp2p subscription
250+
await pWaitFor(() => {
251+
const subscribedPeers = psB.getSubscribers(topic)
252+
return subscribedPeers.includes(psAid)
253+
})
254+
255+
psB.publish(topic, data)
256+
257+
await defer2.promise
258+
})
259+
260+
it('should handle quick reconnects with a delayed disconnect', async () => {
261+
// Subscribe on both
262+
const handlerSpy = sinon.spy()
263+
const topic = 'reconnect-channel'
264+
265+
psA.on(topic, handlerSpy)
266+
psB.on(topic, handlerSpy)
267+
await Promise.all([
268+
psA.subscribe(topic),
269+
psB.subscribe(topic)
270+
])
271+
272+
// Create two connections to the remote peer
273+
const originalConnection = await psA._libp2p.dialer.connectToPeer(psB.peerId)
274+
// second connection
275+
await psA._libp2p.dialer.connectToPeer(psB.peerId)
276+
expect(psA._libp2p.connections.get(psB.peerId.toB58String())).to.have.length(2)
277+
278+
// Wait for subscriptions to occur
279+
await pWaitFor(() => {
280+
return psA.getSubscribers(topic).includes(psB.peerId.toB58String()) &&
281+
psB.getSubscribers(topic).includes(psA.peerId.toB58String())
282+
})
283+
284+
// Verify messages go both ways
285+
psA.publish(topic, 'message1')
286+
psB.publish(topic, 'message2')
287+
await pWaitFor(() => handlerSpy.callCount >= 2)
288+
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message1', 'message2'])
289+
290+
// Disconnect the first connection (this acts as a delayed reconnect)
291+
const psAConnUpdateSpy = sinon.spy(psA._libp2p.connectionManager.connections, 'set')
292+
293+
await originalConnection.close()
294+
await pWaitFor(() => psAConnUpdateSpy.callCount === 1)
295+
296+
// Verify messages go both ways after the disconnect
297+
handlerSpy.resetHistory()
298+
psA.publish(topic, 'message3')
299+
psB.publish(topic, 'message4')
300+
await pWaitFor(() => handlerSpy.callCount >= 2)
301+
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message3', 'message4'])
302+
})
303+
})
304+
})
305+
}

src/pubsub/tests/emit-self.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
/* eslint-env mocha */
22
'use strict'
33

4-
const chai = require('chai')
5-
const { expect } = chai
4+
const { expect } = require('aegir/utils/chai')
65
const sinon = require('sinon')
76

87
const uint8ArrayFromString = require('uint8arrays/from-string')

src/pubsub/tests/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
const apiTest = require('./api')
55
const emitSelfTest = require('./emit-self')
66
const messagesTest = require('./messages')
7+
const connectionHandlersTest = require('./connection-handlers')
78
const twoNodesTest = require('./two-nodes')
89
const multipleNodesTest = require('./multiple-nodes')
910

@@ -12,6 +13,7 @@ module.exports = (common) => {
1213
apiTest(common)
1314
emitSelfTest(common)
1415
messagesTest(common)
16+
connectionHandlersTest(common)
1517
twoNodesTest(common)
1618
multipleNodesTest(common)
1719
})

src/pubsub/tests/messages.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
/* eslint-env mocha */
22
'use strict'
33

4-
const chai = require('chai')
5-
const { expect } = chai
4+
const { expect } = require('aegir/utils/chai')
65
const sinon = require('sinon')
76

87
const PeerId = require('peer-id')

src/pubsub/tests/multiple-nodes.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
/* eslint max-nested-callbacks: ["error", 6] */
33
'use strict'
44

5-
const chai = require('chai')
6-
const { expect } = chai
5+
const { expect } = require('aegir/utils/chai')
76
const sinon = require('sinon')
87

98
const delay = require('delay')

0 commit comments

Comments
 (0)