Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 42 additions & 50 deletions p2p/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
from p2p import les
from p2p.cancellable import CancellableMixin
from p2p.constants import MAX_REORG_DEPTH, SEAL_CHECK_RANDOM_SAMPLE_RATE
from p2p.exceptions import NoEligiblePeers
from p2p.exceptions import NoEligiblePeers, ValidationError
from p2p.p2p_proto import DisconnectReason
from p2p.peer import BasePeer, ETHPeer, LESPeer, HeaderRequest, PeerPool, PeerSubscriber
from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerSubscriber
from p2p.rlp import BlockBody
from p2p.service import BaseService
from p2p.utils import (
Expand Down Expand Up @@ -91,7 +91,6 @@ def __init__(self,
self._syncing = False
self._sync_complete = asyncio.Event()
self._sync_requests: asyncio.Queue[HeaderRequestingPeer] = asyncio.Queue()
self._new_headers: asyncio.Queue[Tuple[BlockHeader, ...]] = asyncio.Queue()
self._executor = get_asyncio_executor()

@property
Expand Down Expand Up @@ -207,7 +206,7 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
self.logger.warn("Timeout waiting for header batch from %s, aborting sync", peer)
await peer.disconnect(DisconnectReason.timeout)
break
except ValueError as err:
except ValidationError as err:
self.logger.warn(
"Invalid header response sent by peer %s disconnecting: %s",
peer, err,
Expand Down Expand Up @@ -253,47 +252,37 @@ async def _fetch_missing_headers(
self, peer: HeaderRequestingPeer, start_at: int) -> Tuple[BlockHeader, ...]:
"""Fetch a batch of headers starting at start_at and return the ones we're missing."""
self.logger.debug("Fetching chain segment starting at #%d", start_at)
request = peer.request_block_headers(

headers = await peer.get_block_headers(
start_at,
peer.max_headers_fetch,
skip=0,
reverse=False,
)

# Pass the peer's token to self.wait() because we want to abort if either we
# or the peer terminates.
headers = tuple(await self.wait(
self._new_headers.get(),
token=peer.cancel_token,
timeout=self._reply_timeout))

# check that the response headers are a valid match for our
# requested headers.
request.validate_headers(headers)

# the inner list comprehension is required to get python to evaluate
# the asynchronous comprehension
missing_headers = tuple([
header
for header
in headers
if not (await self.wait(self.db.coro_header_exists(header.hash)))
])
if len(missing_headers) != len(headers):
self.logger.debug(
"Discarding %d / %d headers that we already have",
len(headers) - len(missing_headers),
len(headers),
)
return headers

def _handle_block_headers(self, headers: Tuple[BlockHeader, ...]) -> None:
if not headers:
self.logger.warn("Got an empty BlockHeaders msg")
return
self.logger.debug(
"Got BlockHeaders from %d to %d", headers[0].block_number, headers[-1].block_number)
self._new_headers.put_nowait(headers)
# We only want headers that are missing, so we iterate over the list
# until we find the first missing header, after which we return all of
# the remaining headers.
async def get_missing_tail(self: 'BaseHeaderChainSyncer',
headers: Tuple[BlockHeader, ...]
) -> AsyncGenerator[BlockHeader, None]:
iter_headers = iter(headers)
for header in iter_headers:
is_missing = not await self.wait(self.db.coro_header_exists(header.hash))
if is_missing:
yield header
break
else:
self.logger.debug("Discarding header that we already have: %s", header)

for header in iter_headers:
yield header

# The inner list comprehension is needed because async_generators
# cannot be cast to a tuple.
tail_headers = tuple([header async for header in get_missing_tail(self, headers)])

return tail_headers

@abstractmethod
async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
Expand All @@ -313,26 +302,27 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
msg: protocol._DecodedMsgType) -> None:
if isinstance(cmd, les.Announce):
self._sync_requests.put_nowait(peer)
elif isinstance(cmd, les.BlockHeaders):
msg = cast(Dict[str, Any], msg)
self._handle_block_headers(tuple(cast(Tuple[BlockHeader, ...], msg['headers'])))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's worth leaving a comment here that BlockHeaders messages are not handled here because they're handled in a Peer method? Otherwise someone reading this code might have a hard time figuring out why those messages are not handled like the others

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we should probably leave the elif block here otherwise it will be handled in the else block, which logs a debug msg and may be confusing to someone reading the logs

elif isinstance(cmd, les.GetBlockHeaders):
msg = cast(Dict[str, Any], msg)
await self._handle_get_block_headers(cast(LESPeer, peer), msg)
elif isinstance(cmd, les.BlockHeaders):
# `BlockHeaders` messages are handled at the peer level.
pass
else:
self.logger.debug("Ignoring %s message from %s", cmd, peer)

async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) -> None:
self.logger.debug("Peer %s made header request: %s", peer, msg)
request = HeaderRequest(
request = les.HeaderRequest(
msg['query'].block_number_or_hash,
msg['query'].max_headers,
msg['query'].skip,
msg['query'].reverse,
msg['request_id'],
)
headers = await self._handler.lookup_headers(request)
self.logger.trace("Replying to %s with %d headers", peer, len(headers))
peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=msg['request_id'])
peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=request.request_id)

async def _process_headers(
self, peer: HeaderRequestingPeer, headers: Tuple[BlockHeader, ...]) -> int:
Expand Down Expand Up @@ -538,16 +528,17 @@ def request_receipts(self, target_td: int, headers: List[BlockHeader]) -> int:
async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
msg: protocol._DecodedMsgType) -> None:
peer = cast(ETHPeer, peer)
if isinstance(cmd, eth.BlockHeaders):
self._handle_block_headers(tuple(cast(Tuple[BlockHeader, ...], msg)))
elif isinstance(cmd, eth.BlockBodies):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

if isinstance(cmd, eth.BlockBodies):
await self._handle_block_bodies(peer, list(cast(Tuple[BlockBody], msg)))
elif isinstance(cmd, eth.Receipts):
await self._handle_block_receipts(peer, cast(List[List[Receipt]], msg))
elif isinstance(cmd, eth.NewBlock):
await self._handle_new_block(peer, cast(Dict[str, Any], msg))
elif isinstance(cmd, eth.GetBlockHeaders):
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
elif isinstance(cmd, eth.BlockHeaders):
# `BlockHeaders` messages are handled at the peer level.
pass
elif isinstance(cmd, eth.GetBlockBodies):
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH]
Expand Down Expand Up @@ -613,7 +604,7 @@ async def _handle_get_block_headers(
peer: ETHPeer,
query: Dict[str, Any]) -> None:
self.logger.debug("Peer %s made header request: %s", peer, query)
request = HeaderRequest(
request = eth.HeaderRequest(
query['block_number_or_hash'],
query['max_headers'],
query['skip'],
Expand Down Expand Up @@ -732,7 +723,7 @@ async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -
peer.sub_proto.send_node_data(nodes)

async def lookup_headers(self,
request: HeaderRequest) -> Tuple[BlockHeader, ...]:
request: protocol.BaseHeaderRequest) -> Tuple[BlockHeader, ...]:
"""
Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items
between each, in reverse order if :reverse: is True.
Expand All @@ -753,7 +744,8 @@ async def lookup_headers(self,
return headers

async def _get_block_numbers_for_request(self,
request: HeaderRequest) -> Tuple[BlockNumber, ...]:
request: protocol.BaseHeaderRequest
) -> Tuple[BlockNumber, ...]:
"""
Generate the block numbers for a given `HeaderRequest`.
"""
Expand Down
28 changes: 28 additions & 0 deletions p2p/eth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from typing import (
Any,
cast,
List,
Tuple,
Expand All @@ -16,8 +17,10 @@
from eth.rlp.receipts import Receipt
from eth.rlp.transactions import BaseTransactionFields

from p2p.exceptions import ValidationError
from p2p.protocol import (
BaseBlockHeaders,
BaseHeaderRequest,
Command,
Protocol,
_DecodedMsgType,
Expand Down Expand Up @@ -70,6 +73,31 @@ class GetBlockHeaders(Command):
]


class HeaderRequest(BaseHeaderRequest):
    """
    Tracks an in-flight ETH ``GetBlockHeaders`` request so the corresponding
    ``BlockHeaders`` response can be validated against what was asked for.
    """
    # Upper bound on how many headers a single request may ask for.
    max_size = MAX_HEADERS_FETCH

    def __init__(self,
                 block_number_or_hash: BlockIdentifier,
                 max_headers: int,
                 skip: int,
                 reverse: bool) -> None:
        self.block_number_or_hash = block_number_or_hash
        self.max_headers = max_headers
        self.skip = skip
        self.reverse = reverse

    def validate_response(self, response: Any) -> None:
        """
        Core `Request` API used for validation.

        ``response`` is deliberately typed as ``Any``: anything may be passed
        in, and a :exc:`ValidationError` is raised if it is not a tuple of
        :class:`BlockHeader` objects or does not match the request parameters.

        :raises ValidationError: if the response is malformed or does not
            correspond to this request.
        """
        if not isinstance(response, tuple):
            raise ValidationError("Response to `HeaderRequest` must be a tuple")
        elif not all(isinstance(item, BlockHeader) for item in response):
            raise ValidationError("Response must be a tuple of `BlockHeader` objects")

        # Delegate the semantic checks (count, ordering, start point) to the base class.
        return self.validate_headers(cast(Tuple[BlockHeader, ...], response))


class BlockHeaders(BaseBlockHeaders):
_cmd_id = 4
structure = sedes.CountableList(BlockHeader)
Expand Down
7 changes: 7 additions & 0 deletions p2p/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,10 @@ class NoInternalAddressMatchesDevice(BaseP2PError):
def __init__(self, *args: Any, device_hostname: str=None, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.device_hostname = device_hostname


class ValidationError(BaseP2PError):
    """
    Raised when something does not pass a validation check.

    NOTE(review): this shadows the name of ``eth``'s ``ValidationError``;
    callers must be careful to catch the intended one (see PR discussion
    about a shared validation library).
    """
    pass
42 changes: 42 additions & 0 deletions p2p/les.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
from eth.rlp.headers import BlockHeader
from eth.rlp.receipts import Receipt

from p2p.exceptions import (
ValidationError,
)
from p2p.protocol import (
BaseBlockHeaders,
BaseHeaderRequest,
Command,
Protocol,
_DecodedMsgType,
Expand Down Expand Up @@ -163,6 +167,44 @@ class GetBlockHeadersQuery(rlp.Serializable):
]


class HeaderRequest(BaseHeaderRequest):
    """
    Tracks an in-flight LES ``GetBlockHeaders`` request.

    Unlike the ETH variant, LES responses carry a ``request_id`` which must
    match the id sent with the request.
    """
    # The request id echoed back by the peer in its response.
    request_id: int

    # Upper bound on how many headers a single request may ask for.
    max_size = MAX_HEADERS_FETCH

    def __init__(self,
                 block_number_or_hash: BlockIdentifier,
                 max_headers: int,
                 skip: int,
                 reverse: bool,
                 request_id: int) -> None:
        self.block_number_or_hash = block_number_or_hash
        self.max_headers = max_headers
        self.skip = skip
        self.reverse = reverse
        self.request_id = request_id

    def validate_response(self, response: Any) -> None:
        """
        Core `Request` API used for validation.

        ``response`` is expected to be the decoded LES message dict with
        ``request_id`` and ``headers`` keys.

        :raises ValidationError: if the response is malformed, carries a
            mismatched ``request_id``, or does not correspond to this request.
        """
        if not isinstance(response, dict):
            raise ValidationError("Response to `HeaderRequest` must be a dict")

        request_id = response['request_id']
        if request_id != self.request_id:
            # Bug fix: the original used printf-style `%s` placeholders with
            # str.format(), so the ids were never interpolated into the message.
            raise ValidationError(
                "Response `request_id` does not match. expected: {} | got: {}".format(
                    self.request_id,
                    request_id,
                )
            )
        elif not all(isinstance(item, BlockHeader) for item in response['headers']):
            raise ValidationError("Response must be a tuple of `BlockHeader` objects")

        # Delegate the semantic checks (count, ordering, start point) to the base class.
        return self.validate_headers(cast(Tuple[BlockHeader, ...], response['headers']))


class GetBlockHeaders(Command):
_cmd_id = 2
structure = [
Expand Down
Loading