Skip to content

Commit 93626cd

Browse files
committed
address review
1 parent 64364d1 commit 93626cd

File tree

7 files changed

+72
-23
lines changed

7 files changed

+72
-23
lines changed

Cargo.lock

Lines changed: 12 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ fnv = "1"
124124
fs2 = "0.4"
125125
futures = "0.3"
126126
hex = "0.4"
127+
hashlink = "0.9.0"
127128
hyper = "1"
128129
itertools = "0.10"
129130
lazy_static = "1"

beacon_node/gossipsub/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ tracing = "0.1.37"
3838
void = "1.0.2"
3939

4040
prometheus-client = "0.22.0"
41-
lru.workspace = true
41+
hashlink.workspace = true
4242

4343
[dev-dependencies]
4444
quickcheck = { workspace = true }

beacon_node/gossipsub/src/behaviour.rs

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,16 @@ use std::{
2525
collections::{BTreeSet, HashMap},
2626
fmt,
2727
net::IpAddr,
28-
num::NonZeroUsize,
2928
task::{Context, Poll},
30-
time::Duration,
29+
time::{Duration, Instant},
3130
};
3231

3332
use futures::StreamExt;
3433
use futures_ticker::Ticker;
35-
use lru::LruCache;
34+
use hashlink::LinkedHashMap;
3635
use prometheus_client::registry::Registry;
3736
use rand::{seq::SliceRandom, thread_rng};
3837

39-
use instant::Instant;
4038
use libp2p::core::{multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, Endpoint, Multiaddr};
4139
use libp2p::identity::Keypair;
4240
use libp2p::identity::PeerId;
@@ -78,8 +76,11 @@ use std::{cmp::Ordering::Equal, fmt::Debug};
7876
#[cfg(test)]
7977
mod tests;
8078

81-
/// IDONTWANT Cache capacity.
82-
const IDONTWANT_CAP: usize = 100;
79+
/// IDONTWANT cache capacity.
80+
const IDONTWANT_CAP: usize = 10_000;
81+
82+
/// IDONTWANT timeout before removal.
83+
const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0);
8384

8485
/// Determines if published messages should be signed or not.
8586
///
@@ -1377,6 +1378,11 @@ where
13771378
"IWANT: Peer has asked for message too many times; ignoring request"
13781379
);
13791380
} else if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) {
1381+
if peer.dont_send.get(&id).is_some() {
1382+
tracing::debug!(%peer_id, message=%id, "Peer already sent IDONTWANT for this message");
1383+
continue;
1384+
}
1385+
13801386
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
13811387
if peer
13821388
.sender
@@ -2485,6 +2491,17 @@ where
24852491
}
24862492
self.failed_messages.shrink_to_fit();
24872493

2494+
// Clear stale IDONTWANTs.
2495+
for peer in self.connected_peers.values_mut() {
2496+
while let Some((_front, instant)) = peer.dont_send.front() {
2497+
if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
2498+
break;
2499+
} else {
2500+
peer.dont_send.pop_front();
2501+
}
2502+
}
2503+
}
2504+
24882505
tracing::debug!("Completed Heartbeat");
24892506
if let Some(metrics) = self.metrics.as_mut() {
24902507
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
@@ -2677,9 +2694,18 @@ where
26772694
return;
26782695
};
26792696

2680-
let recipient_peers = mesh_peers.iter().filter(|peer_id| {
2681-
*peer_id != propagation_source && Some(*peer_id) != message.source.as_ref()
2682-
});
2697+
let iwant_peers = self
2698+
.peer_score
2699+
.as_ref()
2700+
.map(|(_peer_score, .., gossip_promises)| gossip_promises.peers_for_message(msg_id))
2701+
.unwrap_or(vec![]);
2702+
2703+
let recipient_peers = mesh_peers
2704+
.iter()
2705+
.chain(iwant_peers.iter())
2706+
.filter(|peer_id| {
2707+
*peer_id != propagation_source && Some(*peer_id) != message.source.as_ref()
2708+
});
26832709

26842710
for peer_id in recipient_peers {
26852711
let Some(peer) = self.connected_peers.get_mut(peer_id) else {
@@ -3121,7 +3147,7 @@ where
31213147
connections: vec![],
31223148
sender: RpcSender::new(self.config.connection_handler_queue_len()),
31233149
topics: Default::default(),
3124-
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
3150+
dont_send: LinkedHashMap::new(),
31253151
});
31263152
// Add the new connection
31273153
connected_peer.connections.push(connection_id);
@@ -3152,7 +3178,7 @@ where
31523178
connections: vec![],
31533179
sender: RpcSender::new(self.config.connection_handler_queue_len()),
31543180
topics: Default::default(),
3155-
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
3181+
dont_send: LinkedHashMap::new(),
31563182
});
31573183
// Add the new connection
31583184
connected_peer.connections.push(connection_id);
@@ -3319,7 +3345,11 @@ where
33193345
continue;
33203346
};
33213347
for message_id in message_ids {
3322-
peer.dont_send.push(message_id, ());
3348+
peer.dont_send.insert(message_id, Instant::now());
3349+
// Don't exceed capacity.
3350+
if peer.dont_send.len() > IDONTWANT_CAP {
3351+
peer.dont_send.pop_front();
3352+
}
33233353
}
33243354
}
33253355
}

beacon_node/gossipsub/src/gossip_promises.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ impl GossipPromises {
4141
self.promises.contains_key(message)
4242
}
4343

44+
///Get the peers we sent IWANT the input message id.
45+
pub(crate) fn peers_for_message(&self, message_id: &MessageId) -> Vec<PeerId> {
46+
self.promises
47+
.get(message_id)
48+
.map(|peers| peers.keys().copied().collect())
49+
.unwrap_or(vec![])
50+
}
51+
4452
/// Track a promise to deliver a message from a list of [`MessageId`]s we are requesting.
4553
pub(crate) fn add_promise(&mut self, peer: PeerId, messages: &[MessageId], expires: Instant) {
4654
for message_id in messages {

beacon_node/gossipsub/src/protocol.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";
4343

4444
pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId {
4545
protocol: StreamProtocol::new("/meshsub/1.2.0"),
46-
kind: PeerKind::Gossipsubv1_1,
46+
kind: PeerKind::Gossipsubv1_2,
4747
};
4848
pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId {
4949
protocol: StreamProtocol::new("/meshsub/1.1.0"),

beacon_node/gossipsub/src/types.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,18 @@ use async_channel::{Receiver, Sender};
2525
use futures::stream::Peekable;
2626
use futures::{Future, Stream, StreamExt};
2727
use futures_timer::Delay;
28+
use hashlink::LinkedHashMap;
2829
use instant::Duration;
2930
use libp2p::identity::PeerId;
3031
use libp2p::swarm::ConnectionId;
31-
use lru::LruCache;
3232
use prometheus_client::encoding::EncodeLabelValue;
3333
use quick_protobuf::MessageWrite;
3434
use std::collections::BTreeSet;
3535
use std::fmt::Debug;
3636
use std::sync::atomic::{AtomicUsize, Ordering};
3737
use std::sync::Arc;
3838
use std::task::{Context, Poll};
39+
use std::time::Instant;
3940
use std::{fmt, pin::Pin};
4041

4142
use crate::rpc_proto::proto;
@@ -123,7 +124,7 @@ pub(crate) struct PeerConnections {
123124
/// Subscribed topics.
124125
pub(crate) topics: BTreeSet<TopicHash>,
125126
/// Don't send messages.
126-
pub(crate) dont_send: LruCache<MessageId, ()>,
127+
pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
127128
}
128129

129130
/// Describes the types of peers that can exist in the gossipsub context.
@@ -300,10 +301,10 @@ pub struct Prune {
300301
pub(crate) backoff: Option<u64>,
301302
}
302303

303-
/// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant control message.
304+
/// The node requests us to not forward message ids - IDontWant control message.
304305
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
305306
pub struct IDontWant {
306-
/// A list of known message ids (peer_id + sequence _number) as a string.
307+
/// A list of known message ids.
307308
pub(crate) message_ids: Vec<MessageId>,
308309
}
309310

@@ -568,10 +569,10 @@ impl From<Rpc> for proto::RPC {
568569
control.prune.push(rpc_prune);
569570
}
570571
ControlAction::IDontWant(IDontWant { message_ids }) => {
571-
let rpc_iwant = proto::ControlIDontWant {
572+
let rpc_idontwant = proto::ControlIDontWant {
572573
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
573574
};
574-
control.idontwant.push(rpc_iwant);
575+
control.idontwant.push(rpc_idontwant);
575576
}
576577
}
577578
}

0 commit comments

Comments
 (0)