1
1
import Pubsub , { InMessage , utils } from 'libp2p-interfaces/src/pubsub'
2
2
import { MessageCache } from './message-cache'
3
- import {
4
- RPCCodec ,
5
- RPC , Message ,
6
- ControlMessage , ControlIHave , ControlGraft , ControlIWant , ControlPrune , PeerInfo
7
- } from './message'
3
+ import { RPC , IRPC } from './message/rpc'
8
4
import * as constants from './constants'
9
5
import { Heartbeat } from './heartbeat'
10
6
import { getGossipPeers } from './get-gossip-peers'
@@ -76,8 +72,8 @@ class Gossipsub extends Pubsub {
76
72
mesh : Map < string , Set < string > >
77
73
fanout : Map < string , Set < string > >
78
74
lastpub : Map < string , number >
79
- gossip : Map < string , ControlIHave [ ] >
80
- control : Map < string , ControlMessage >
75
+ gossip : Map < string , RPC . IControlIHave [ ] >
76
+ control : Map < string , RPC . IControlMessage >
81
77
peerhave :Map < string , number >
82
78
iasked :Map < string , number >
83
79
backoff : Map < string , Map < string , number > >
@@ -96,7 +92,7 @@ class Gossipsub extends Pubsub {
96
92
_libp2p : Libp2p
97
93
_options : GossipOptions
98
94
_directPeerInitial : NodeJS . Timeout
99
- log : Debugger
95
+ log : Debugger & { err : Debugger }
100
96
// eslint-disable-next-line @typescript-eslint/ban-types
101
97
emit : ( event : string | symbol , ...args : any [ ] ) => boolean
102
98
@@ -201,15 +197,15 @@ class Gossipsub extends Pubsub {
201
197
* Map of pending messages to gossip
202
198
* peer id => control messages
203
199
*
204
- * @type {Map<string, Array<ControlIHave object>> }
200
+ * @type {Map<string, Array<RPC.IControlIHave object>> }
205
201
*/
206
202
this . gossip = new Map ( )
207
203
208
204
/**
209
205
* Map of control messages
210
206
* peer id => control message
211
207
*
212
- * @type {Map<string, ControlMessage object> }
208
+ * @type {Map<string, RPC.IControlMessage object> }
213
209
*/
214
210
this . control = new Map ( )
215
211
@@ -279,7 +275,7 @@ class Gossipsub extends Pubsub {
279
275
* @returns {RPC }
280
276
*/
281
277
_decodeRpc ( bytes : Uint8Array ) {
282
- return RPCCodec . decode ( bytes )
278
+ return RPC . decode ( bytes )
283
279
}
284
280
285
281
/**
@@ -290,7 +286,7 @@ class Gossipsub extends Pubsub {
290
286
* @returns {Uint8Array }
291
287
*/
292
288
_encodeRpc ( rpc : RPC ) {
293
- return RPCCodec . encode ( rpc )
289
+ return RPC . encode ( rpc ) . finish ( )
294
290
}
295
291
296
292
/**
@@ -378,18 +374,18 @@ class Gossipsub extends Pubsub {
378
374
/**
379
375
* Handles an rpc control message from a peer
380
376
* @param {string } id peer id
381
- * @param {ControlMessage } controlMsg
377
+ * @param {RPC.IControlMessage } controlMsg
382
378
* @returns {void }
383
379
*/
384
- _processRpcControlMessage ( id : string , controlMsg : ControlMessage ) : void {
380
+ _processRpcControlMessage ( id : string , controlMsg : RPC . IControlMessage ) : void {
385
381
if ( ! controlMsg ) {
386
382
return
387
383
}
388
384
389
- const iwant = this . _handleIHave ( id , controlMsg . ihave )
390
- const ihave = this . _handleIWant ( id , controlMsg . iwant )
391
- const prune = this . _handleGraft ( id , controlMsg . graft )
392
- this . _handlePrune ( id , controlMsg . prune )
385
+ const iwant = controlMsg . ihave ? this . _handleIHave ( id , controlMsg . ihave ) : [ ]
386
+ const ihave = controlMsg . iwant ? this . _handleIWant ( id , controlMsg . iwant ) : [ ]
387
+ const prune = controlMsg . graft ? this . _handleGraft ( id , controlMsg . graft ) : [ ]
388
+ controlMsg . prune && this . _handlePrune ( id , controlMsg . prune )
393
389
394
390
if ( ! iwant . length && ! ihave . length && ! prune . length ) {
395
391
return
@@ -450,10 +446,10 @@ class Gossipsub extends Pubsub {
450
446
/**
451
447
* Handles IHAVE messages
452
448
* @param {string } id peer id
453
- * @param {Array<ControlIHave > } ihave
454
- * @returns {ControlIWant }
449
+ * @param {Array<RPC.IControlIHave > } ihave
450
+ * @returns {RPC.IControlIWant }
455
451
*/
456
- _handleIHave ( id : string , ihave : ControlIHave [ ] ) : ControlIWant [ ] {
452
+ _handleIHave ( id : string , ihave : RPC . IControlIHave [ ] ) : RPC . IControlIWant [ ] {
457
453
if ( ! ihave . length ) {
458
454
return [ ]
459
455
}
@@ -491,7 +487,7 @@ class Gossipsub extends Pubsub {
491
487
const iwant = new Map < string , Uint8Array > ( )
492
488
493
489
ihave . forEach ( ( { topicID, messageIDs } ) => {
494
- if ( ! topicID || ! this . mesh . has ( topicID ) ) {
490
+ if ( ! topicID || ! messageIDs || ! this . mesh . has ( topicID ) ) {
495
491
return
496
492
}
497
493
@@ -537,10 +533,10 @@ class Gossipsub extends Pubsub {
537
533
* Handles IWANT messages
538
534
* Returns messages to send back to peer
539
535
* @param {string } id peer id
540
- * @param {Array<ControlIWant > } iwant
541
- * @returns {Array<Message > }
536
+ * @param {Array<RPC.IControlIWant > } iwant
537
+ * @returns {Array<RPC.IMessage > }
542
538
*/
543
- _handleIWant ( id : string , iwant : ControlIWant [ ] ) : Message [ ] {
539
+ _handleIWant ( id : string , iwant : RPC . IControlIWant [ ] ) : RPC . IMessage [ ] {
544
540
if ( ! iwant . length ) {
545
541
return [ ]
546
542
}
@@ -557,7 +553,7 @@ class Gossipsub extends Pubsub {
557
553
const ihave = new Map < string , InMessage > ( )
558
554
559
555
iwant . forEach ( ( { messageIDs } ) => {
560
- messageIDs . forEach ( ( msgID ) => {
556
+ messageIDs && messageIDs . forEach ( ( msgID ) => {
561
557
const [ msg , count ] = this . messageCache . getForPeer ( msgID , id )
562
558
if ( ! msg ) {
563
559
return
@@ -586,10 +582,10 @@ class Gossipsub extends Pubsub {
586
582
/**
587
583
* Handles Graft messages
588
584
* @param {string } id peer id
589
- * @param {Array<ControlGraft > } graft
590
- * @return {Array<ControlPrune > }
585
+ * @param {Array<RPC.IControlGraft > } graft
586
+ * @return {Array<RPC.IControlPrune > }
591
587
*/
592
- _handleGraft ( id : string , graft : ControlGraft [ ] ) : ControlPrune [ ] {
588
+ _handleGraft ( id : string , graft : RPC . IControlGraft [ ] ) : RPC . IControlPrune [ ] {
593
589
const prune : string [ ] = [ ]
594
590
const score = this . score . score ( id )
595
591
const now = this . _now ( )
@@ -682,10 +678,10 @@ class Gossipsub extends Pubsub {
682
678
/**
683
679
* Handles Prune messages
684
680
* @param {string } id peer id
685
- * @param {Array<ControlPrune > } prune
681
+ * @param {Array<RPC.IControlPrune > } prune
686
682
* @returns {void }
687
683
*/
688
- _handlePrune ( id : string , prune : ControlPrune [ ] ) : void {
684
+ _handlePrune ( id : string , prune : RPC . IControlPrune [ ] ) : void {
689
685
const score = this . score . score ( id )
690
686
prune . forEach ( ( { topicID, backoff, peers } ) => {
691
687
if ( ! topicID ) {
@@ -811,10 +807,10 @@ class Gossipsub extends Pubsub {
811
807
812
808
/**
813
809
* Maybe attempt connection given signed peer records
814
- * @param {PeerInfo [] } peers
810
+ * @param {RPC.IPeerInfo [] } peers
815
811
* @returns {Promise<void> }
816
812
*/
817
- async _pxConnect ( peers : PeerInfo [ ] ) : Promise < void > {
813
+ async _pxConnect ( peers : RPC . IPeerInfo [ ] ) : Promise < void > {
818
814
if ( peers . length > constants . GossipsubPrunePeers ) {
819
815
shuffle ( peers )
820
816
peers = peers . slice ( 0 , constants . GossipsubPrunePeers )
@@ -1137,7 +1133,7 @@ class Gossipsub extends Pubsub {
1137
1133
/**
1138
1134
* @override
1139
1135
*/
1140
- _sendRpc ( id : string , outRpc : RPC ) : void {
1136
+ _sendRpc ( id : string , outRpc : IRPC ) : void {
1141
1137
const peerStreams = this . peers . get ( id )
1142
1138
if ( ! peerStreams || ! peerStreams . isWritable ) {
1143
1139
return
@@ -1157,10 +1153,10 @@ class Gossipsub extends Pubsub {
1157
1153
this . gossip . delete ( id )
1158
1154
}
1159
1155
1160
- peerStreams . write ( RPCCodec . encode ( outRpc ) )
1156
+ peerStreams . write ( RPC . encode ( outRpc ) . finish ( ) )
1161
1157
}
1162
1158
1163
- _piggybackControl ( id : string , outRpc : RPC , ctrl : ControlMessage ) : void {
1159
+ _piggybackControl ( id : string , outRpc : IRPC , ctrl : RPC . IControlMessage ) : void {
1164
1160
const tograft = ( ctrl . graft || [ ] )
1165
1161
. filter ( ( { topicID } ) => ( topicID && this . mesh . get ( topicID ) || new Set ( ) ) . has ( id ) )
1166
1162
const toprune = ( ctrl . prune || [ ] )
@@ -1171,14 +1167,14 @@ class Gossipsub extends Pubsub {
1171
1167
}
1172
1168
1173
1169
if ( outRpc . control ) {
1174
- outRpc . control . graft = outRpc . control . graft . concat ( tograft )
1175
- outRpc . control . prune = outRpc . control . prune . concat ( toprune )
1170
+ outRpc . control . graft = outRpc . control . graft && outRpc . control . graft . concat ( tograft )
1171
+ outRpc . control . prune = outRpc . control . prune && outRpc . control . prune . concat ( toprune )
1176
1172
} else {
1177
1173
outRpc . control = { ihave : [ ] , iwant : [ ] , graft : tograft , prune : toprune }
1178
1174
}
1179
1175
}
1180
1176
1181
- _piggybackGossip ( id : string , outRpc : RPC , ihave : ControlIHave [ ] ) : void {
1177
+ _piggybackGossip ( id : string , outRpc : IRPC , ihave : RPC . IControlIHave [ ] ) : void {
1182
1178
if ( ! outRpc . control ) {
1183
1179
outRpc . control = { ihave : [ ] , iwant : [ ] , graft : [ ] , prune : [ ] }
1184
1180
}
@@ -1194,7 +1190,7 @@ class Gossipsub extends Pubsub {
1194
1190
const doPX = this . _options . doPX
1195
1191
for ( const [ id , topics ] of tograft ) {
1196
1192
const graft = topics . map ( ( topicID ) => ( { topicID } ) )
1197
- let prune : ControlPrune [ ] = [ ]
1193
+ let prune : RPC . IControlPrune [ ] = [ ]
1198
1194
// If a peer also has prunes, process them now
1199
1195
const pruning = toprune . get ( id )
1200
1196
if ( pruning ) {
@@ -1305,10 +1301,10 @@ class Gossipsub extends Pubsub {
1305
1301
/**
1306
1302
* Adds new IHAVE messages to pending gossip
1307
1303
* @param {PeerStreams } peerStreams
1308
- * @param {Array<ControlIHave > } controlIHaveMsgs
1304
+ * @param {Array<RPC.IControlIHave > } controlIHaveMsgs
1309
1305
* @returns {void }
1310
1306
*/
1311
- _pushGossip ( id : string , controlIHaveMsgs : ControlIHave ) : void {
1307
+ _pushGossip ( id : string , controlIHaveMsgs : RPC . IControlIHave ) : void {
1312
1308
this . log ( 'Add gossip to %s' , id )
1313
1309
const gossip = this . gossip . get ( id ) || [ ]
1314
1310
this . gossip . set ( id , gossip . concat ( controlIHaveMsgs ) )
@@ -1327,9 +1323,9 @@ class Gossipsub extends Pubsub {
1327
1323
* @param {string } id
1328
1324
* @param {string } topic
1329
1325
* @param {boolean } doPX
1330
- * @returns {ControlPrune }
1326
+ * @returns {RPC.IControlPrune }
1331
1327
*/
1332
- _makePrune ( id : string , topic : string , doPX : boolean ) : ControlPrune {
1328
+ _makePrune ( id : string , topic : string , doPX : boolean ) : RPC . IControlPrune {
1333
1329
if ( this . peers . get ( id ) ! . protocol === constants . GossipsubIDv10 ) {
1334
1330
// Gossipsub v1.0 -- no backoff, the peer won't be able to parse it anyway
1335
1331
return {
@@ -1340,7 +1336,7 @@ class Gossipsub extends Pubsub {
1340
1336
// backoff is measured in seconds
1341
1337
// GossipsubPruneBackoff is measured in milliseconds
1342
1338
const backoff = constants . GossipsubPruneBackoff / 1000
1343
- const px : PeerInfo [ ] = [ ]
1339
+ const px : RPC . IPeerInfo [ ] = [ ]
1344
1340
if ( doPX ) {
1345
1341
// select peers for Peer eXchange
1346
1342
const peers = getGossipPeers ( this , topic , constants . GossipsubPrunePeers , ( xid : string ) : boolean => {
0 commit comments