@@ -25,18 +25,16 @@ use std::{
2525 collections:: { BTreeSet , HashMap } ,
2626 fmt,
2727 net:: IpAddr ,
28- num:: NonZeroUsize ,
2928 task:: { Context , Poll } ,
30- time:: Duration ,
29+ time:: { Duration , Instant } ,
3130} ;
3231
3332use futures:: StreamExt ;
3433use futures_ticker:: Ticker ;
35- use lru :: LruCache ;
34+ use hashlink :: LinkedHashMap ;
3635use prometheus_client:: registry:: Registry ;
3736use rand:: { seq:: SliceRandom , thread_rng} ;
3837
39- use instant:: Instant ;
4038use libp2p:: core:: { multiaddr:: Protocol :: Ip4 , multiaddr:: Protocol :: Ip6 , Endpoint , Multiaddr } ;
4139use libp2p:: identity:: Keypair ;
4240use libp2p:: identity:: PeerId ;
@@ -78,8 +76,11 @@ use std::{cmp::Ordering::Equal, fmt::Debug};
7876#[ cfg( test) ]
7977mod tests;
8078
81- /// IDONTWANT Cache capacity.
82- const IDONTWANT_CAP : usize = 100 ;
79+ /// IDONTWANT cache capacity.
80+ const IDONTWANT_CAP : usize = 10_000 ;
81+
82+ /// IDONTWANT timeout before removal.
83+ const IDONTWANT_TIMEOUT : Duration = Duration :: new ( 3 , 0 ) ;
8384
8485/// Determines if published messages should be signed or not.
8586///
@@ -1377,6 +1378,11 @@ where
13771378 "IWANT: Peer has asked for message too many times; ignoring request"
13781379 ) ;
13791380 } else if let Some ( peer) = & mut self . connected_peers . get_mut ( peer_id) {
1381+ if peer. dont_send . get ( & id) . is_some ( ) {
1382+ tracing:: debug!( %peer_id, message=%id, "Peer already sent IDONTWANT for this message" ) ;
1383+ continue ;
1384+ }
1385+
13801386 tracing:: debug!( peer=%peer_id, "IWANT: Sending cached messages to peer" ) ;
13811387 if peer
13821388 . sender
@@ -1970,7 +1976,10 @@ where
19701976 }
19711977 // if the mesh needs peers add the peer to the mesh
19721978 if !self . explicit_peers . contains ( propagation_source)
1973- && matches ! ( peer. kind, PeerKind :: Gossipsubv1_1 | PeerKind :: Gossipsub )
1979+ && matches ! (
1980+ peer. kind,
1981+ PeerKind :: Gossipsubv1_1 | PeerKind :: Gossipsub | PeerKind :: Gossipsubv1_2
1982+ )
19741983 && !Self :: score_below_threshold_from_scores (
19751984 & self . peer_score ,
19761985 propagation_source,
@@ -2485,6 +2494,17 @@ where
24852494 }
24862495 self . failed_messages . shrink_to_fit ( ) ;
24872496
2497+ // Clear stale IDONTWANTs.
2498+ for peer in self . connected_peers . values_mut ( ) {
2499+ while let Some ( ( _front, instant) ) = peer. dont_send . front ( ) {
2500+ if ( * instant + IDONTWANT_TIMEOUT ) >= Instant :: now ( ) {
2501+ break ;
2502+ } else {
2503+ peer. dont_send . pop_front ( ) ;
2504+ }
2505+ }
2506+ }
2507+
24882508 tracing:: debug!( "Completed Heartbeat" ) ;
24892509 if let Some ( metrics) = self . metrics . as_mut ( ) {
24902510 let duration = u64:: try_from ( start. elapsed ( ) . as_millis ( ) ) . unwrap_or ( u64:: MAX ) ;
@@ -2677,9 +2697,18 @@ where
26772697 return ;
26782698 } ;
26792699
2680- let recipient_peers = mesh_peers. iter ( ) . filter ( |peer_id| {
2681- * peer_id != propagation_source && Some ( * peer_id) != message. source . as_ref ( )
2682- } ) ;
2700+ let iwant_peers = self
2701+ . peer_score
2702+ . as_ref ( )
2703+ . map ( |( _peer_score, .., gossip_promises) | gossip_promises. peers_for_message ( msg_id) )
2704+ . unwrap_or ( vec ! [ ] ) ;
2705+
2706+ let recipient_peers = mesh_peers
2707+ . iter ( )
2708+ . chain ( iwant_peers. iter ( ) )
2709+ . filter ( |peer_id| {
2710+ * peer_id != propagation_source && Some ( * peer_id) != message. source . as_ref ( )
2711+ } ) ;
26832712
26842713 for peer_id in recipient_peers {
26852714 let Some ( peer) = self . connected_peers . get_mut ( peer_id) else {
@@ -2689,7 +2718,7 @@ where
26892718 } ;
26902719
26912720 // Only gossipsub 1.2 peers support IDONTWANT.
2692- if peer. kind = = PeerKind :: Gossipsubv1_2 {
2721+ if peer. kind ! = PeerKind :: Gossipsubv1_2 {
26932722 continue ;
26942723 }
26952724
@@ -3121,7 +3150,7 @@ where
31213150 connections : vec ! [ ] ,
31223151 sender : RpcSender :: new ( self . config . connection_handler_queue_len ( ) ) ,
31233152 topics : Default :: default ( ) ,
3124- dont_send : LruCache :: new ( NonZeroUsize :: new ( IDONTWANT_CAP ) . unwrap ( ) ) ,
3153+ dont_send : LinkedHashMap :: new ( ) ,
31253154 } ) ;
31263155 // Add the new connection
31273156 connected_peer. connections . push ( connection_id) ;
@@ -3152,7 +3181,7 @@ where
31523181 connections : vec ! [ ] ,
31533182 sender : RpcSender :: new ( self . config . connection_handler_queue_len ( ) ) ,
31543183 topics : Default :: default ( ) ,
3155- dont_send : LruCache :: new ( NonZeroUsize :: new ( IDONTWANT_CAP ) . unwrap ( ) ) ,
3184+ dont_send : LinkedHashMap :: new ( ) ,
31563185 } ) ;
31573186 // Add the new connection
31583187 connected_peer. connections . push ( connection_id) ;
@@ -3319,7 +3348,11 @@ where
33193348 continue ;
33203349 } ;
33213350 for message_id in message_ids {
3322- peer. dont_send . push ( message_id, ( ) ) ;
3351+ peer. dont_send . insert ( message_id, Instant :: now ( ) ) ;
3352+ // Don't exceed capacity.
3353+ if peer. dont_send . len ( ) > IDONTWANT_CAP {
3354+ peer. dont_send . pop_front ( ) ;
3355+ }
33233356 }
33243357 }
33253358 }
@@ -3472,7 +3505,11 @@ fn get_random_peers_dynamic(
34723505 . iter ( )
34733506 . filter ( |( _, p) | p. topics . contains ( topic_hash) )
34743507 . filter ( |( peer_id, _) | f ( peer_id) )
3475- . filter ( |( _, p) | p. kind == PeerKind :: Gossipsub || p. kind == PeerKind :: Gossipsubv1_1 )
3508+ . filter ( |( _, p) | {
3509+ p. kind == PeerKind :: Gossipsub
3510+ || p. kind == PeerKind :: Gossipsubv1_1
3511+ || p. kind == PeerKind :: Gossipsubv1_2
3512+ } )
34763513 . map ( |( peer_id, _) | * peer_id)
34773514 . collect :: < Vec < PeerId > > ( ) ;
34783515
0 commit comments