diff --git a/p2p/kademlia.py b/p2p/kademlia.py index 104c19883f..2cbda9761d 100644 --- a/p2p/kademlia.py +++ b/p2p/kademlia.py @@ -500,10 +500,7 @@ async def bond(self, node: Node, cancel_token: CancelToken) -> bool: got_pong = await self.wait_pong(node, token, cancel_token) if not got_pong: self.logger.debug("bonding failed, didn't receive pong from %s", node) - # Drop the failing node and schedule a populate_not_full_buckets() call to try and - # fill its spot. self.routing.remove_node(node) - asyncio.ensure_future(self.populate_not_full_buckets()) return False # Give the remote node a chance to ping us before we move on and start sending find_node diff --git a/p2p/peer.py b/p2p/peer.py index 02321fe542..6734d776cd 100644 --- a/p2p/peer.py +++ b/p2p/peer.py @@ -647,8 +647,9 @@ class PeerPool(BaseService): """PeerPool attempts to keep connections to at least min_peers on the given network.""" logger = logging.getLogger("p2p.peer.PeerPool") _connect_loop_sleep = 2 - _last_lookup: float = 0 - _lookup_interval: int = 20 + _discovery_lookup_running = asyncio.Lock() + _discovery_last_lookup: float = 0 + _discovery_lookup_interval: int = 30 def __init__(self, peer_class: Type[BasePeer], @@ -738,14 +739,21 @@ async def connect(self, remote: Node) -> BasePeer: self.logger.exception("Unexpected error during auth/p2p handshake with %s", remote) return None - async def lookup_random_node(self) -> None: - # This method runs in the background, so we must catch OperationCancelled here otherwise - # asyncio will warn that its exception was never retrieved. - try: - await self.discovery.lookup_random(self.cancel_token) - except OperationCancelled: - pass - self._last_lookup = time.time() + async def maybe_lookup_random_node(self) -> None: + if self._discovery_last_lookup + self._discovery_lookup_interval > time.time(): + return + elif self._discovery_lookup_running.locked(): + self.logger.debug("Node discovery lookup already in progress, not running another") + return + async with self._discovery_lookup_running: + # This method runs in the background, so we must catch OperationCancelled here + # otherwise asyncio will warn that its exception was never retrieved. + try: + await self.discovery.lookup_random(self.cancel_token) + except OperationCancelled: + pass + finally: + self._discovery_last_lookup = time.time() async def maybe_connect_to_more_peers(self) -> None: """Connect to more peers if we're not yet connected to at least self.min_peers.""" @@ -756,9 +764,7 @@ async def maybe_connect_to_more_peers(self) -> None: [remote for remote in self.connected_nodes]) return - if self._last_lookup + self._lookup_interval < time.time(): - self.logger.debug("Last node discovery lookup too long ago, triggering another") - asyncio.ensure_future(self.lookup_random_node()) + asyncio.ensure_future(self.maybe_lookup_random_node()) await self._connect_to_nodes(self.get_nodes_to_connect()) @@ -863,7 +869,7 @@ def _get_random_bootnode(self) -> Generator[Node, None, None]: else: self.logger.warning('No bootnodes available') - async def lookup_random_node(self) -> None: + async def maybe_lookup_random_node(self) -> None: # Do nothing as we don't have a DiscoveryProtocol pass