|
4 | 4 | const { expect } = require('aegir/utils/chai')
|
5 | 5 | const sinon = require('sinon')
|
6 | 6 | const uint8ArrayFromString = require('uint8arrays/from-string')
|
7 |
| - |
| 7 | +const uint8ArrayToString = require('uint8arrays/to-string') |
| 8 | +const { sha256 } = require('multiformats/hashes/sha2') |
8 | 9 | const { utils } = require('libp2p-interfaces/src/pubsub')
|
9 |
| -const pWaitFor = require('p-wait-for') |
| 10 | +const { SignaturePolicy } = require('libp2p-interfaces/src/pubsub/signature-policy') |
| 11 | +const PeerStreams = require('libp2p-interfaces/src/pubsub/peer-streams') |
| 12 | +const PeerId = require('peer-id') |
10 | 13 |
|
11 | 14 | const Floodsub = require('../src')
|
12 |
| -const { createPeers } = require('./utils/create-peer') |
13 | 15 |
|
14 | 16 | const defOptions = {
|
15 |
| - emitSelf: true |
| 17 | + emitSelf: true, |
| 18 | + globalSignaturePolicy: SignaturePolicy.StrictNoSign |
16 | 19 | }
|
17 | 20 |
|
18 | 21 | const topic = 'my-topic'
|
19 | 22 | const message = uint8ArrayFromString('a neat message')
|
20 | 23 |
|
21 | 24 | describe('floodsub', () => {
|
22 |
| - let floodsub1, floodsub2 |
23 |
| - let peer1, peer2 |
| 25 | + let floodsub |
24 | 26 |
|
25 | 27 | before(async () => {
|
26 | 28 | expect(Floodsub.multicodec).to.exist()
|
27 | 29 |
|
28 |
| - ;[peer1, peer2] = await createPeers({ number: 2 }) |
29 |
| - floodsub1 = new Floodsub(peer1, defOptions) |
30 |
| - floodsub2 = new Floodsub(peer2, defOptions) |
| 30 | + const libp2p = { |
| 31 | + peerId: await PeerId.create(), |
| 32 | + registrar: { |
| 33 | + handle: () => {}, |
| 34 | + register: () => {}, |
| 35 | + unregister: () => {} |
| 36 | + } |
| 37 | + } |
| 38 | + |
| 39 | + floodsub = new Floodsub(libp2p, defOptions) |
31 | 40 | })
|
32 | 41 |
|
33 | 42 | beforeEach(() => {
|
34 |
| - return Promise.all([ |
35 |
| - floodsub1.start(), |
36 |
| - floodsub2.start() |
37 |
| - ]) |
| 43 | + floodsub.start() |
38 | 44 | })
|
39 | 45 |
|
40 |
| - afterEach(async () => { |
| 46 | + afterEach(() => { |
41 | 47 | sinon.restore()
|
42 |
| - await floodsub1.stop() |
43 |
| - await floodsub2.stop() |
44 |
| - await peer1.stop() |
45 |
| - await peer2.stop() |
| 48 | + floodsub.stop() |
46 | 49 | })
|
47 | 50 |
|
48 |
| - it('checks cache when processing incoming message', async () => { |
49 |
| - sinon.spy(floodsub2.seenCache, 'has') |
50 |
| - sinon.spy(floodsub2.seenCache, 'put') |
51 |
| - sinon.spy(floodsub2, '_processRpcMessage') |
52 |
| - sinon.spy(floodsub2, '_publish') |
| 51 | + it('checks cache when processing incoming message', async function () { |
| 52 | + const otherPeer = await PeerId.create() |
| 53 | + const sig = await sha256.encode(message) |
| 54 | + const key = uint8ArrayToString(sig, 'base64') |
| 55 | + let callCount = 0 |
53 | 56 |
|
54 |
| - let messageReceived = false |
55 |
| - function checkMessage (msg) { |
56 |
| - messageReceived = true |
| 57 | + const peerStream = new PeerStreams({ |
| 58 | + id: otherPeer, |
| 59 | + protocol: 'test' |
| 60 | + }) |
| 61 | + const rpc = { |
| 62 | + subscriptions: [], |
| 63 | + msgs: [{ |
| 64 | + receivedFrom: peerStream.id.toB58String(), |
| 65 | + data: message, |
| 66 | + topicIDs: [topic] |
| 67 | + }] |
57 | 68 | }
|
58 | 69 |
|
59 |
| - // connect peers |
60 |
| - await floodsub1._libp2p.dial(floodsub2._libp2p.peerId) |
61 |
| - |
62 |
| - // subscribe and wait for subscription to be received in the other peer |
63 |
| - floodsub2.subscribe(topic) |
64 |
| - floodsub2.on(topic, checkMessage) |
65 |
| - await pWaitFor(() => { |
66 |
| - const subs = floodsub1.getSubscribers(topic) |
67 |
| - |
68 |
| - return subs.length === 1 |
| 70 | + floodsub.subscribe(topic) |
| 71 | + floodsub.on(topic, () => { |
| 72 | + callCount++ |
69 | 73 | })
|
70 | 74 |
|
71 |
| - await floodsub1.publish(topic, message) |
72 |
| - await pWaitFor(() => messageReceived === true) |
| 75 | + // the message should not be in the cache |
| 76 | + expect(floodsub.seenCache.has(key)).to.be.false() |
| 77 | + |
| 78 | + // receive the message once |
| 79 | + await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc) |
73 | 80 |
|
74 |
| - expect(floodsub2.seenCache.has.callCount).to.eql(2) // Put also calls .has |
75 |
| - expect(floodsub2.seenCache.put.callCount).to.eql(1) |
76 |
| - expect(floodsub2._publish.callCount).to.eql(1) // Forward message |
| 81 | + // should have received the message |
| 82 | + expect(callCount).to.equal(1) |
77 | 83 |
|
78 |
| - const [msgProcessed] = floodsub2._processRpcMessage.getCall(0).args |
| 84 | + // should be in the cache now |
| 85 | + expect(floodsub.seenCache.has(key)).to.be.true() |
79 | 86 |
|
80 |
| - // Force a second process for the message |
81 |
| - await floodsub2._processRpcMessage(msgProcessed) |
| 87 | + // receive the message multiple times |
| 88 | + await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc) |
| 89 | + await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc) |
| 90 | + await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc) |
82 | 91 |
|
83 |
| - expect(floodsub2.seenCache.has.callCount).to.eql(3) |
84 |
| - expect(floodsub2.seenCache.put.callCount).to.eql(1) // No new put |
85 |
| - expect(floodsub2._publish.callCount).to.eql(1) // Not forwarded |
| 92 | + // should only have emitted the message once |
| 93 | + expect(callCount).to.equal(1) |
86 | 94 | })
|
87 | 95 |
|
88 | 96 | it('forwards normalized messages on publish', async () => {
|
89 |
| - sinon.spy(floodsub1, '_forwardMessage') |
90 |
| - sinon.spy(utils, 'randomSeqno') |
| 97 | + sinon.spy(floodsub, '_forwardMessage') |
91 | 98 |
|
92 |
| - await floodsub1.publish(topic, message) |
93 |
| - expect(floodsub1._forwardMessage.callCount).to.eql(1) |
94 |
| - const [messageToEmit] = floodsub1._forwardMessage.getCall(0).args |
95 |
| - |
96 |
| - const computedSeqno = utils.randomSeqno.getCall(0).returnValue |
97 |
| - utils.randomSeqno.restore() |
98 |
| - sinon.stub(utils, 'randomSeqno').returns(computedSeqno) |
| 99 | + await floodsub.publish(topic, message) |
| 100 | + expect(floodsub._forwardMessage.callCount).to.equal(1) |
| 101 | + const [messageToEmit] = floodsub._forwardMessage.getCall(0).args |
99 | 102 |
|
100 | 103 | const expected = utils.normalizeInRpcMessage(
|
101 |
| - await floodsub1._buildMessage({ |
102 |
| - receivedFrom: peer1.peerId.toB58String(), |
103 |
| - from: peer1.peerId.toB58String(), |
| 104 | + await floodsub._buildMessage({ |
| 105 | + receivedFrom: floodsub.peerId.toB58String(), |
104 | 106 | data: message,
|
105 | 107 | topicIDs: [topic]
|
106 | 108 | }))
|
107 | 109 |
|
108 | 110 | expect(messageToEmit).to.eql(expected)
|
109 | 111 | })
|
| 112 | + |
| 113 | + it('does not send received message back to original sender', async () => { |
| 114 | + sinon.spy(floodsub, '_sendRpc') |
| 115 | + |
| 116 | + const sender = await PeerId.create() |
| 117 | + |
| 118 | + const peerStream = new PeerStreams({ |
| 119 | + id: sender, |
| 120 | + protocol: 'test' |
| 121 | + }) |
| 122 | + const rpc = { |
| 123 | + subscriptions: [], |
| 124 | + msgs: [{ |
| 125 | + receivedFrom: peerStream.id.toB58String(), |
| 126 | + data: message, |
| 127 | + topicIDs: [topic] |
| 128 | + }] |
| 129 | + } |
| 130 | + |
| 131 | + // otherPeer is subscribed to the topic |
| 132 | + floodsub.topics.set(topic, new Set([sender.toB58String()])) |
| 133 | + |
| 134 | + // receive the message |
| 135 | + await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc) |
| 136 | + |
| 137 | + // should not forward back to the sender |
| 138 | + expect(floodsub._sendRpc.called).to.be.false() |
| 139 | + }) |
110 | 140 | })
|
0 commit comments