Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions p2p/kademlia.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,8 @@ async def wait_ping(self, remote: Node, cancel_token: CancelToken) -> bool:
self.logger.debug('got expected ping from %s', remote)
except TimeoutError:
self.logger.debug('timed out waiting for ping from %s', remote)
# TODO: Use a contextmanager to ensure we always delete the callback from the list.
del self.ping_callbacks[remote]
finally:
del self.ping_callbacks[remote]
return got_ping

async def wait_pong(self, remote: Node, token: bytes, cancel_token: CancelToken) -> bool:
Expand All @@ -445,11 +445,11 @@ async def wait_pong(self, remote: Node, token: bytes, cancel_token: CancelToken)
except TimeoutError:
self.logger.debug(
'timed out waiting for pong from %s (token == %s)', remote, encode_hex(token))
# TODO: Use a contextmanager to ensure we always delete the callback from the list.
del self.pong_callbacks[pingid]
finally:
del self.pong_callbacks[pingid]
return got_pong

async def wait_neighbours(self, remote: Node, cancel_token: CancelToken) -> List[Node]:
async def wait_neighbours(self, remote: Node, cancel_token: CancelToken) -> Tuple[Node, ...]:
"""Wait for a neihgbours packet from the given node.

Returns the list of neighbours received.
Expand Down Expand Up @@ -479,7 +479,7 @@ def process(response):

# TODO: Use a contextmanager to ensure we always delete the callback from the list.
del self.neighbours_callbacks[remote]
return [n for n in neighbours if n != self.this_node]
return tuple(n for n in neighbours if n != self.this_node)

def ping(self, node: Node) -> bytes:
if node == self.this_node:
Expand Down Expand Up @@ -516,7 +516,12 @@ async def bond(self, node: Node, cancel_token: CancelToken) -> bool:
return True

async def bootstrap(self, bootstrap_nodes: List[Node], cancel_token: CancelToken) -> None:
bonded = await asyncio.gather(*[self.bond(n, cancel_token) for n in bootstrap_nodes])
bonded = await asyncio.gather(*(
self.bond(n, cancel_token)
for n
in bootstrap_nodes
if (n not in self.ping_callbacks and n not in self.pong_callbacks)
))
if not any(bonded):
self.logger.info("Failed to bond with bootstrap nodes %s", bootstrap_nodes)
return
Expand All @@ -539,15 +544,19 @@ async def _find_node(node_id, remote):
candidates = await self.wait_neighbours(remote, cancel_token)
if not candidates:
self.logger.debug("got no candidates from %s, returning", remote)
return candidates
candidates = [c for c in candidates if c not in nodes_seen]
return tuple()
all_candidates = tuple(c for c in candidates if c not in nodes_seen)
candidates = tuple(
c for c in all_candidates
if (c not in self.ping_callbacks and c not in self.pong_callbacks)
)
self.logger.debug("got %s new candidates", len(candidates))
# Add new candidates to nodes_seen so that we don't attempt to bond with failing ones
# in the future.
nodes_seen.update(candidates)
bonded = await asyncio.gather(*[self.bond(c, cancel_token) for c in candidates])
bonded = await asyncio.gather(*(self.bond(c, cancel_token) for c in candidates))
self.logger.debug("bonded with %s candidates", bonded.count(True))
return [c for c in candidates if bonded[candidates.index(c)]]
return tuple(c for c in candidates if bonded[candidates.index(c)])

def _exclude_if_asked(nodes):
nodes_to_ask = list(set(nodes).difference(nodes_asked))
Expand All @@ -559,8 +568,7 @@ def _exclude_if_asked(nodes):
while nodes_to_ask:
self.logger.debug("node lookup; querying %s", nodes_to_ask)
nodes_asked.update(nodes_to_ask)
results = await asyncio.gather(
*[_find_node(node_id, n) for n in nodes_to_ask])
results = await asyncio.gather(*(_find_node(node_id, n) for n in nodes_to_ask))
for candidates in results:
closest.extend(candidates)
closest = sort_by_distance(closest, node_id)[:k_bucket_size]
Expand Down
4 changes: 2 additions & 2 deletions tests/p2p/test_kademlia.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ async def test_wait_neighbours(cancel_token):

# Schedule a call to proto.recv_neighbours() simulating a neighbours response from the node we
# expect.
neighbours = [random_node(), random_node(), random_node()]
neighbours = (random_node(), random_node(), random_node())
recv_neighbours_coroutine = asyncio.coroutine(lambda: proto.recv_neighbours(node, neighbours))
asyncio.ensure_future(recv_neighbours_coroutine())

Expand All @@ -125,7 +125,7 @@ async def test_wait_neighbours(cancel_token):
# If wait_neighbours() times out, we get an empty list of neighbours.
received_neighbours = await proto.wait_neighbours(node, cancel_token)

assert received_neighbours == []
assert received_neighbours == tuple()
assert node not in proto.neighbours_callbacks


Expand Down