diff --git a/ethpm/package.py b/ethpm/package.py index 4e9b47d371..bb224edf54 100644 --- a/ethpm/package.py +++ b/ethpm/package.py @@ -115,7 +115,7 @@ def __init__( validate_w3_instance(w3) self.w3 = w3 - self.w3.eth.defaultContractFactory = cast(Type[Contract], LinkableContract) + self.w3.eth._default_contract_factory = cast(Type[Contract], LinkableContract) self.manifest = manifest self._uri = uri diff --git a/newsfragments/2753.internal.rst b/newsfragments/2753.internal.rst new file mode 100644 index 0000000000..dccc136098 --- /dev/null +++ b/newsfragments/2753.internal.rst @@ -0,0 +1 @@ +Organize the ``eth`` module into separate files for better readability. \ No newline at end of file diff --git a/tests/core/pm-module/conftest.py b/tests/core/pm-module/conftest.py index fe188bcdf1..1f326c3a99 100644 --- a/tests/core/pm-module/conftest.py +++ b/tests/core/pm-module/conftest.py @@ -57,7 +57,7 @@ def setup_w3(): t = EthereumTester(backend=pyevm_backend) w3 = Web3(Web3.EthereumTesterProvider(ethereum_tester=t)) w3.eth.default_account = w3.eth.accounts[0] - w3.eth.defaultContractFactory = LinkableContract + w3.eth._default_contract_factory = LinkableContract w3.enable_unstable_package_management_api() return w3 diff --git a/tests/core/pm-module/test_registry_integration.py b/tests/core/pm-module/test_registry_integration.py index f58b6f0555..40a9b214cf 100644 --- a/tests/core/pm-module/test_registry_integration.py +++ b/tests/core/pm-module/test_registry_integration.py @@ -25,7 +25,7 @@ def fresh_w3(): w3 = Web3(Web3.EthereumTesterProvider()) w3.eth.default_account = w3.eth.accounts[0] - w3.eth.defaultContractFactory = LinkableContract + w3.eth._default_contract_factory = LinkableContract w3.enable_unstable_package_management_api() return w3 diff --git a/web3/_utils/blocks.py b/web3/_utils/blocks.py index b2ae0f7e0f..238e1fa964 100644 --- a/web3/_utils/blocks.py +++ b/web3/_utils/blocks.py @@ -1,5 +1,6 @@ from typing import ( Any, + Optional, ) from eth_utils import ( @@ -57,7 +58,7 @@ def is_hex_encoded_block_number(value: Any) -> bool: @curry def select_method_for_block_identifier( value: Any, if_hash: RPCEndpoint, if_number: RPCEndpoint, if_predefined: RPCEndpoint -) -> RPCEndpoint: +) -> Optional[RPCEndpoint]: if is_predefined_block_number(value): return if_predefined elif isinstance(value, bytes): diff --git a/web3/_utils/filters.py b/web3/_utils/filters.py index 72cc16003a..d82ab2d83e 100644 --- a/web3/_utils/filters.py +++ b/web3/_utils/filters.py @@ -376,7 +376,7 @@ def select_filter_method( if_new_block_filter: RPCEndpoint, if_new_pending_transaction_filter: RPCEndpoint, if_new_filter: RPCEndpoint, -) -> RPCEndpoint: +) -> Optional[RPCEndpoint]: if is_string(value): if value == "latest": diff --git a/web3/_utils/module_testing/eth_module.py b/web3/_utils/module_testing/eth_module.py index 717e9bcf7b..2ee3d64fb6 100644 --- a/web3/_utils/module_testing/eth_module.py +++ b/web3/_utils/module_testing/eth_module.py @@ -75,6 +75,7 @@ construct_error_generator_middleware, ) from web3.types import ( # noqa: F401 + ENS, BlockData, FilterParams, LogReceipt, @@ -1374,7 +1375,7 @@ async def test_async_eth_get_storage_at_ens_name( ) -> None: with ens_addresses(async_w3, {"emitter.eth": emitter_contract_address}): storage = await async_w3.eth.get_storage_at( # type: ignore - "emitter.eth", 0 + ENS("emitter.eth"), 0 ) assert isinstance(storage, HexBytes) @@ -1679,7 +1680,7 @@ def test_eth_get_storage_at_ens_name( self, w3: "Web3", emitter_contract_address: ChecksumAddress ) -> None: with ens_addresses(w3, {"emitter.eth": emitter_contract_address}): - storage = w3.eth.get_storage_at("emitter.eth", 0) + storage = w3.eth.get_storage_at(ENS("emitter.eth"), 0) assert isinstance(storage, HexBytes) def test_eth_get_storage_at_invalid_address(self, w3: "Web3") -> None: @@ -1703,7 +1704,7 @@ def test_eth_get_transaction_count_ens_name( w3, {"unlocked-acct-dual-type.eth": unlocked_account_dual_type} ): transaction_count = w3.eth.get_transaction_count( - "unlocked-acct-dual-type.eth" + ENS("unlocked-acct-dual-type.eth") ) assert is_integer(transaction_count) assert transaction_count >= 0 @@ -1774,7 +1775,7 @@ def test_eth_get_code_ens_address( self, w3: "Web3", math_contract_address: ChecksumAddress ) -> None: with ens_addresses(w3, {"mathcontract.eth": math_contract_address}): - code = w3.eth.get_code("mathcontract.eth") + code = w3.eth.get_code(ENS("mathcontract.eth")) assert isinstance(code, HexBytes) assert len(code) > 0 diff --git a/web3/eth.py b/web3/eth.py deleted file mode 100644 index f4333ee5b4..0000000000 --- a/web3/eth.py +++ /dev/null @@ -1,1005 +0,0 @@ -import asyncio -from typing import ( - Any, - Awaitable, - Callable, - List, - NoReturn, - Optional, - Sequence, - Tuple, - Type, - Union, - cast, - overload, -) -import warnings - -from eth_account import ( - Account, -) -from eth_typing import ( - Address, - BlockNumber, - ChecksumAddress, - HexStr, -) -from eth_utils import ( - is_checksum_address, - is_string, -) -from eth_utils.toolz import ( - assoc, - merge, -) -from hexbytes import ( - HexBytes, -) - -from web3._utils.blocks import ( - select_method_for_block_identifier, -) -from web3._utils.empty import ( - Empty, - empty, -) -from web3._utils.encoding import ( - to_hex, -) -from web3._utils.fee_utils import ( - async_fee_history_priority_fee, - fee_history_priority_fee, -) -from web3._utils.filters import ( - AsyncFilter, - Filter, - select_filter_method, -) -from web3._utils.rpc_abi import ( - RPC, -) -from web3._utils.threads import ( - Timeout, -) -from web3._utils.transactions import ( - assert_valid_transaction_params, - extract_valid_transaction_params, - get_required_transaction, - replace_transaction, -) -from web3.contract import ( - AsyncContract, - AsyncContractCaller, - Contract, - ContractCaller, -) -from web3.exceptions import ( - OffchainLookup, - TimeExhausted, - TooManyRequests, - TransactionNotFound, -) -from web3.method import ( - Method, - default_root_munger, -) -from web3.module import ( - Module, -) -from web3.types import ( - ENS, - BlockData, - BlockIdentifier, - BlockParams, - CallOverride, - FeeHistory, - FilterParams, - GasPriceStrategy, - LogReceipt, - MerkleProof, - Nonce, - SignedTx, - SyncStatus, - TxData, - TxParams, - TxReceipt, - Uncle, - Wei, - _Hash32, -) -from web3.utils import ( - async_handle_offchain_lookup, - handle_offchain_lookup, -) - - -class BaseEth(Module): - _default_account: Union[ChecksumAddress, Empty] = empty - _default_block: BlockIdentifier = "latest" - gasPriceStrategy = None - account = Account() - defaultContractFactory: Any = None - - _gas_price: Method[Callable[[], Wei]] = Method( - RPC.eth_gasPrice, - is_property=True, - ) - - @property - def default_block(self) -> BlockIdentifier: - return self._default_block - - @default_block.setter - def default_block(self, value: BlockIdentifier) -> None: - self._default_block = value - - @property - def default_account(self) -> Union[ChecksumAddress, Empty]: - return self._default_account - - @default_account.setter - def default_account(self, account: Union[ChecksumAddress, Empty]) -> None: - self._default_account = account - - def send_transaction_munger(self, transaction: TxParams) -> Tuple[TxParams]: - if "from" not in transaction and is_checksum_address(self.default_account): - transaction = assoc(transaction, "from", self.default_account) - - return (transaction,) - - _send_transaction: Method[Callable[[TxParams], HexBytes]] = Method( - RPC.eth_sendTransaction, mungers=[send_transaction_munger] - ) - - _send_raw_transaction: Method[Callable[[Union[HexStr, bytes]], HexBytes]] = Method( - RPC.eth_sendRawTransaction, - mungers=[default_root_munger], - ) - - _get_transaction: Method[Callable[[_Hash32], TxData]] = Method( - RPC.eth_getTransactionByHash, mungers=[default_root_munger] - ) - - _get_raw_transaction: Method[Callable[[_Hash32], HexBytes]] = Method( - RPC.eth_getRawTransactionByHash, mungers=[default_root_munger] - ) - - """ - `eth_getRawTransactionByBlockHashAndIndex` - `eth_getRawTransactionByBlockNumberAndIndex` - """ - _get_raw_transaction_by_block: Method[ - Callable[[BlockIdentifier, int], HexBytes] - ] = Method( - method_choice_depends_on_args=select_method_for_block_identifier( - if_predefined=RPC.eth_getRawTransactionByBlockNumberAndIndex, - if_hash=RPC.eth_getRawTransactionByBlockHashAndIndex, - if_number=RPC.eth_getRawTransactionByBlockNumberAndIndex, - ), - mungers=[default_root_munger], - ) - - def generate_gas_price( - self, transaction_params: Optional[TxParams] = None - ) -> Optional[Wei]: - if self.gasPriceStrategy: - return self.gasPriceStrategy(self.w3, transaction_params) - return None - - def set_gas_price_strategy(self, gas_price_strategy: GasPriceStrategy) -> None: - self.gasPriceStrategy = gas_price_strategy - - def estimate_gas_munger( - self, transaction: TxParams, block_identifier: Optional[BlockIdentifier] = None - ) -> Sequence[Union[TxParams, BlockIdentifier]]: - if "from" not in transaction and is_checksum_address(self.default_account): - transaction = assoc(transaction, "from", self.default_account) - - if block_identifier is None: - params: Sequence[Union[TxParams, BlockIdentifier]] = [transaction] - else: - params = [transaction, block_identifier] - - return params - - _estimate_gas: Method[Callable[..., int]] = Method( - RPC.eth_estimateGas, mungers=[estimate_gas_munger] - ) - - _fee_history: Method[Callable[..., FeeHistory]] = Method( - RPC.eth_feeHistory, mungers=[default_root_munger] - ) - - _max_priority_fee: Method[Callable[..., Wei]] = Method( - RPC.eth_maxPriorityFeePerGas, - is_property=True, - ) - - def get_block_munger( - self, block_identifier: BlockIdentifier, full_transactions: bool = False - ) -> Tuple[BlockIdentifier, bool]: - return (block_identifier, full_transactions) - - """ - `eth_getBlockByHash` - `eth_getBlockByNumber` - """ - _get_block: Method[Callable[..., BlockData]] = Method( - method_choice_depends_on_args=select_method_for_block_identifier( - if_predefined=RPC.eth_getBlockByNumber, - if_hash=RPC.eth_getBlockByHash, - if_number=RPC.eth_getBlockByNumber, - ), - mungers=[get_block_munger], - ) - - get_block_number: Method[Callable[[], BlockNumber]] = Method( - RPC.eth_blockNumber, - is_property=True, - ) - - get_coinbase: Method[Callable[[], ChecksumAddress]] = Method( - RPC.eth_coinbase, - is_property=True, - ) - - def block_id_munger( - self, - account: Union[Address, ChecksumAddress, ENS], - block_identifier: Optional[BlockIdentifier] = None, - ) -> Tuple[Union[Address, ChecksumAddress, ENS], BlockIdentifier]: - if block_identifier is None: - block_identifier = self.default_block - return (account, block_identifier) - - def get_storage_at_munger( - self, - account: Union[Address, ChecksumAddress, ENS], - position: int, - block_identifier: Optional[BlockIdentifier] = None, - ) -> Tuple[Union[Address, ChecksumAddress, ENS], int, BlockIdentifier]: - if block_identifier is None: - block_identifier = self.default_block - return (account, position, block_identifier) - - def call_munger( - self, - transaction: TxParams, - block_identifier: Optional[BlockIdentifier] = None, - state_override: Optional[CallOverride] = None, - ) -> Union[ - Tuple[TxParams, BlockIdentifier], Tuple[TxParams, BlockIdentifier, CallOverride] - ]: - # TODO: move to middleware - if "from" not in transaction and is_checksum_address(self.default_account): - transaction = assoc(transaction, "from", self.default_account) - - # TODO: move to middleware - if block_identifier is None: - block_identifier = self.default_block - - if state_override is None: - return (transaction, block_identifier) - else: - return (transaction, block_identifier, state_override) - - _get_accounts: Method[Callable[[], Tuple[ChecksumAddress]]] = Method( - RPC.eth_accounts, - is_property=True, - ) - - _get_hashrate: Method[Callable[[], int]] = Method( - RPC.eth_hashrate, - is_property=True, - ) - - _chain_id: Method[Callable[[], int]] = Method( - RPC.eth_chainId, - is_property=True, - ) - - _is_mining: Method[Callable[[], bool]] = Method( - RPC.eth_mining, - is_property=True, - ) - - _is_syncing: Method[Callable[[], Union[SyncStatus, bool]]] = Method( - RPC.eth_syncing, - is_property=True, - ) - - _get_transaction_receipt: Method[Callable[[_Hash32], TxReceipt]] = Method( - RPC.eth_getTransactionReceipt, mungers=[default_root_munger] - ) - - """ - `eth_getBlockTransactionCountByHash` - `eth_getBlockTransactionCountByNumber` - """ - get_block_transaction_count: Method[Callable[[BlockIdentifier], int]] = Method( - method_choice_depends_on_args=select_method_for_block_identifier( - if_predefined=RPC.eth_getBlockTransactionCountByNumber, - if_hash=RPC.eth_getBlockTransactionCountByHash, - if_number=RPC.eth_getBlockTransactionCountByNumber, - ), - mungers=[default_root_munger], - ) - - @overload - def contract( - self, address: None = None, **kwargs: Any - ) -> Union[Type[Contract], Type[AsyncContract]]: - ... # noqa: E704,E501 - - @overload # noqa: F811 - def contract( - self, address: Union[Address, ChecksumAddress, ENS], **kwargs: Any - ) -> Union[Contract, AsyncContract]: - ... # noqa: E704,E501 - - def contract( # noqa: F811 - self, - address: Optional[Union[Address, ChecksumAddress, ENS]] = None, - **kwargs: Any, - ) -> Union[Type[Contract], Contract, Type[AsyncContract], AsyncContract]: - ContractFactoryClass = kwargs.pop( - "ContractFactoryClass", self.defaultContractFactory - ) - - ContractFactory = ContractFactoryClass.factory(self.w3, **kwargs) - - if address: - return ContractFactory(address) - else: - return ContractFactory - - def set_contract_factory( - self, - contractFactory: Type[ - Union[Contract, AsyncContract, ContractCaller, AsyncContractCaller] - ], - ) -> None: - self.defaultContractFactory = contractFactory - - def filter_munger( - self, - filter_params: Optional[Union[str, FilterParams]] = None, - filter_id: Optional[HexStr] = None, - ) -> Union[List[FilterParams], List[HexStr], List[str]]: - if filter_id and filter_params: - raise TypeError( - "Ambiguous invocation: provide either a `filter_params` or a " - "`filter_id` argument. Both were supplied." - ) - if isinstance(filter_params, dict): - return [filter_params] - elif is_string(filter_params): - if filter_params in ["latest", "pending"]: - return [filter_params] - else: - raise ValueError( - "The filter API only accepts the values of `pending` or " - "`latest` for string based filters" - ) - elif filter_id and not filter_params: - return [filter_id] - else: - raise TypeError( - "Must provide either filter_params as a string or " - "a valid filter object, or a filter_id as a string " - "or hex." - ) - - -class AsyncEth(BaseEth): - is_async = True - defaultContractFactory: Type[ - Union[AsyncContract, AsyncContractCaller] - ] = AsyncContract - - @property - async def accounts(self) -> Tuple[ChecksumAddress]: - return await self._get_accounts() # type: ignore - - @property - async def block_number(self) -> BlockNumber: - # types ignored b/c mypy conflict with BlockingEth properties - return await self.get_block_number() # type: ignore - - @property - async def chain_id(self) -> int: - return await self._chain_id() # type: ignore - - @property - async def coinbase(self) -> ChecksumAddress: - # types ignored b/c mypy conflict with BlockingEth properties - return await self.get_coinbase() # type: ignore - - @property - async def gas_price(self) -> Wei: - # types ignored b/c mypy conflict with BlockingEth properties - return await self._gas_price() # type: ignore - - @property - async def hashrate(self) -> int: - return await self._get_hashrate() # type: ignore - - @property - async def max_priority_fee(self) -> Wei: - """ - Try to use eth_maxPriorityFeePerGas but, since this is not part - of the spec and is only supported by some clients, fall back to - an eth_feeHistory calculation with min and max caps. - """ - try: - return await self._max_priority_fee() # type: ignore - except ValueError: - warnings.warn( - "There was an issue with the method eth_maxPriorityFeePerGas. " - "Calculating using eth_feeHistory." - ) - return await async_fee_history_priority_fee(self) - - @property - async def mining(self) -> bool: - return await self._is_mining() # type: ignore - - @property - async def syncing(self) -> Union[SyncStatus, bool]: - return await self._is_syncing() # type: ignore - - async def fee_history( - self, - block_count: int, - newest_block: Union[BlockParams, BlockNumber], - reward_percentiles: Optional[List[float]] = None, - ) -> FeeHistory: - return await self._fee_history( # type: ignore - block_count, newest_block, reward_percentiles - ) - - _call: Method[Callable[..., Awaitable[Union[bytes, bytearray]]]] = Method( - RPC.eth_call, mungers=[BaseEth.call_munger] - ) - - async def call( - self, - transaction: TxParams, - block_identifier: Optional[BlockIdentifier] = None, - state_override: Optional[CallOverride] = None, - ccip_read_enabled: Optional[bool] = None, - ) -> Union[bytes, bytearray]: - ccip_read_enabled_on_provider = self.w3.provider.global_ccip_read_enabled - if ( - # default conditions: - ccip_read_enabled_on_provider - and ccip_read_enabled is not False - # explicit call flag overrides provider flag, - # enabling ccip read for specific calls: - or not ccip_read_enabled_on_provider - and ccip_read_enabled is True - ): - return await self._durin_call(transaction, block_identifier, state_override) - - return await self._call(transaction, block_identifier, state_override) - - async def _durin_call( - self, - transaction: TxParams, - block_identifier: Optional[BlockIdentifier] = None, - state_override: Optional[CallOverride] = None, - ) -> Union[bytes, bytearray]: - max_redirects = self.w3.provider.ccip_read_max_redirects - - if not max_redirects or max_redirects < 4: - raise ValueError( - "ccip_read_max_redirects property on provider must be at least 4." - ) - - for _ in range(max_redirects): - try: - return await self._call(transaction, block_identifier, state_override) - except OffchainLookup as offchain_lookup: - durin_calldata = await async_handle_offchain_lookup( - offchain_lookup.payload, - transaction, - ) - transaction["data"] = durin_calldata - - raise TooManyRequests("Too many CCIP read redirects") - - async def send_transaction(self, transaction: TxParams) -> HexBytes: - # types ignored b/c mypy conflict with BlockingEth properties - return await self._send_transaction(transaction) # type: ignore - - async def send_raw_transaction(self, transaction: Union[HexStr, bytes]) -> HexBytes: - # types ignored b/c mypy conflict with BlockingEth properties - return await self._send_raw_transaction(transaction) # type: ignore - - async def get_transaction(self, transaction_hash: _Hash32) -> TxData: - # types ignored b/c mypy conflict with BlockingEth properties - return await self._get_transaction(transaction_hash) # type: ignore - - async def get_raw_transaction(self, transaction_hash: _Hash32) -> TxData: - # types ignored b/c mypy conflict with BlockingEth properties - return await self._get_raw_transaction(transaction_hash) # type: ignore - - async def get_raw_transaction_by_block( - self, block_identifier: BlockIdentifier, index: int - ) -> HexBytes: - # types ignored b/c mypy conflict with BlockingEth properties - return await self._get_raw_transaction_by_block( # type: ignore - block_identifier, index - ) - - async def estimate_gas( - self, transaction: TxParams, block_identifier: Optional[BlockIdentifier] = None - ) -> int: - # types ignored b/c mypy conflict with BlockingEth properties - return await self._estimate_gas(transaction, block_identifier) # type: ignore - - async def get_block( - self, block_identifier: BlockIdentifier, full_transactions: bool = False - ) -> BlockData: - # types ignored b/c mypy conflict with BlockingEth properties - return await self._get_block( # type: ignore - block_identifier, full_transactions - ) - - _get_balance: Method[Callable[..., Awaitable[Wei]]] = Method( - RPC.eth_getBalance, - mungers=[BaseEth.block_id_munger], - ) - - async def get_balance( - self, - account: Union[Address, ChecksumAddress, ENS], - block_identifier: Optional[BlockIdentifier] = None, - ) -> Wei: - return await self._get_balance(account, block_identifier) - - _get_code: Method[Callable[..., Awaitable[HexBytes]]] = Method( - RPC.eth_getCode, mungers=[BaseEth.block_id_munger] - ) - - async def get_code( - self, - account: Union[Address, ChecksumAddress, ENS], - block_identifier: Optional[BlockIdentifier] = None, - ) -> HexBytes: - return await self._get_code(account, block_identifier) - - _get_logs: Method[Callable[[FilterParams], Awaitable[List[LogReceipt]]]] = Method( - RPC.eth_getLogs, mungers=[default_root_munger] - ) - - async def get_logs( - self, - filter_params: FilterParams, - ) -> List[LogReceipt]: - return await self._get_logs(filter_params) - - _get_transaction_count: Method[Callable[..., Awaitable[Nonce]]] = Method( - RPC.eth_getTransactionCount, - mungers=[BaseEth.block_id_munger], - ) - - async def get_transaction_count( - self, - account: Union[Address, ChecksumAddress, ENS], - block_identifier: Optional[BlockIdentifier] = None, - ) -> Nonce: - return await self._get_transaction_count(account, block_identifier) - - async def get_transaction_receipt(self, transaction_hash: _Hash32) -> TxReceipt: - return await self._get_transaction_receipt(transaction_hash) # type: ignore - - async def wait_for_transaction_receipt( - self, transaction_hash: _Hash32, timeout: float = 120, poll_latency: float = 0.1 - ) -> TxReceipt: - async def _wait_for_tx_receipt_with_timeout( - _tx_hash: _Hash32, _poll_latence: float - ) -> TxReceipt: - while True: - try: - tx_receipt = await self._get_transaction_receipt( # type: ignore - _tx_hash - ) - except TransactionNotFound: - tx_receipt = None - if tx_receipt is not None: - break - await asyncio.sleep(poll_latency) - return tx_receipt - - try: - return await asyncio.wait_for( - _wait_for_tx_receipt_with_timeout(transaction_hash, poll_latency), - timeout=timeout, - ) - except asyncio.TimeoutError: - raise TimeExhausted( - f"Transaction {HexBytes(transaction_hash) !r} is not in the chain " - f"after {timeout} seconds" - ) - - _get_storage_at: Method[Callable[..., Awaitable[HexBytes]]] = Method( - RPC.eth_getStorageAt, - mungers=[BaseEth.get_storage_at_munger], - ) - - async def get_storage_at( - self, - account: Union[Address, ChecksumAddress, ENS], - position: int, - block_identifier: Optional[BlockIdentifier] = None, - ) -> HexBytes: - return await self._get_storage_at(account, position, block_identifier) - - filter: Method[ - Callable[[Optional[Union[str, FilterParams, HexStr]]], Awaitable[AsyncFilter]] - ] = Method( - method_choice_depends_on_args=select_filter_method( - if_new_block_filter=RPC.eth_newBlockFilter, - if_new_pending_transaction_filter=RPC.eth_newPendingTransactionFilter, - if_new_filter=RPC.eth_newFilter, - ), - mungers=[BaseEth.filter_munger], - ) - - _get_filter_changes: Method[ - Callable[[HexStr], Awaitable[List[LogReceipt]]] - ] = Method(RPC.eth_getFilterChanges, mungers=[default_root_munger]) - - async def get_filter_changes(self, filter_id: HexStr) -> List[LogReceipt]: - return await self._get_filter_changes(filter_id) - - _get_filter_logs: Method[Callable[[HexStr], Awaitable[List[LogReceipt]]]] = Method( - RPC.eth_getFilterLogs, mungers=[default_root_munger] - ) - - async def get_filter_logs(self, filter_id: HexStr) -> List[LogReceipt]: - return await self._get_filter_logs(filter_id) - - _uninstall_filter: Method[Callable[[HexStr], Awaitable[bool]]] = Method( - RPC.eth_uninstallFilter, - mungers=[default_root_munger], - ) - - async def uninstall_filter(self, filter_id: HexStr) -> bool: - return await self._uninstall_filter(filter_id) - - -class Eth(BaseEth): - defaultContractFactory: Type[Union[Contract, ContractCaller]] = Contract - - def namereg(self) -> NoReturn: - raise NotImplementedError() - - def icapNamereg(self) -> NoReturn: - raise NotImplementedError() - - @property - def syncing(self) -> Union[SyncStatus, bool]: - return self._is_syncing() - - @property - def coinbase(self) -> ChecksumAddress: - return self.get_coinbase() - - @property - def mining(self) -> bool: - return self._is_mining() - - @property - def hashrate(self) -> int: - return self._get_hashrate() - - @property - def gas_price(self) -> Wei: - return self._gas_price() - - @property - def accounts(self) -> Tuple[ChecksumAddress]: - return self._get_accounts() - - @property - def block_number(self) -> BlockNumber: - return self.get_block_number() - - @property - def chain_id(self) -> int: - return self._chain_id() - - get_balance: Method[Callable[..., Wei]] = Method( - RPC.eth_getBalance, - mungers=[BaseEth.block_id_munger], - ) - - @property - def max_priority_fee(self) -> Wei: - """ - Try to use eth_maxPriorityFeePerGas but, since this is not part - of the spec and is only supported by some clients, fall back to - an eth_feeHistory calculation with min and max caps. - """ - try: - return self._max_priority_fee() - except ValueError: - warnings.warn( - "There was an issue with the method eth_maxPriorityFeePerGas. " - "Calculating using eth_feeHistory." - ) - return fee_history_priority_fee(self) - - get_storage_at: Method[Callable[..., HexBytes]] = Method( - RPC.eth_getStorageAt, - mungers=[BaseEth.get_storage_at_munger], - ) - - def get_proof_munger( - self, - account: Union[Address, ChecksumAddress, ENS], - positions: Sequence[int], - block_identifier: Optional[BlockIdentifier] = None, - ) -> Tuple[ - Union[Address, ChecksumAddress, ENS], Sequence[int], Optional[BlockIdentifier] - ]: - if block_identifier is None: - block_identifier = self.default_block - return (account, positions, block_identifier) - - get_proof: Method[ - Callable[ - [ - Tuple[ - Union[Address, ChecksumAddress, ENS], - Sequence[int], - Optional[BlockIdentifier], - ] - ], - MerkleProof, - ] - ] = Method( - RPC.eth_getProof, - mungers=[get_proof_munger], - ) - - def get_block( - self, block_identifier: BlockIdentifier, full_transactions: bool = False - ) -> BlockData: - return self._get_block(block_identifier, full_transactions) - - get_code: Method[Callable[..., HexBytes]] = Method( - RPC.eth_getCode, mungers=[BaseEth.block_id_munger] - ) - - """ - `eth_getUncleCountByBlockHash` - `eth_getUncleCountByBlockNumber` - """ - get_uncle_count: Method[Callable[[BlockIdentifier], int]] = Method( - method_choice_depends_on_args=select_method_for_block_identifier( - if_predefined=RPC.eth_getUncleCountByBlockNumber, - if_hash=RPC.eth_getUncleCountByBlockHash, - if_number=RPC.eth_getUncleCountByBlockNumber, - ), - mungers=[default_root_munger], - ) - - """ - `eth_getUncleByBlockHashAndIndex` - `eth_getUncleByBlockNumberAndIndex` - """ - get_uncle_by_block: Method[Callable[[BlockIdentifier, int], Uncle]] = Method( - method_choice_depends_on_args=select_method_for_block_identifier( - if_predefined=RPC.eth_getUncleByBlockNumberAndIndex, - if_hash=RPC.eth_getUncleByBlockHashAndIndex, - if_number=RPC.eth_getUncleByBlockNumberAndIndex, - ), - mungers=[default_root_munger], - ) - - def get_transaction(self, transaction_hash: _Hash32) -> TxData: - return self._get_transaction(transaction_hash) - - def get_raw_transaction(self, transaction_hash: _Hash32) -> _Hash32: - return self._get_raw_transaction(transaction_hash) - - def get_raw_transaction_by_block( - self, block_identifier: BlockIdentifier, index: int - ) -> HexBytes: - return self._get_raw_transaction_by_block(block_identifier, index) - - get_transaction_by_block: Method[Callable[[BlockIdentifier, int], TxData]] = Method( - method_choice_depends_on_args=select_method_for_block_identifier( - if_predefined=RPC.eth_getTransactionByBlockNumberAndIndex, - if_hash=RPC.eth_getTransactionByBlockHashAndIndex, - if_number=RPC.eth_getTransactionByBlockNumberAndIndex, - ), - mungers=[default_root_munger], - ) - - def wait_for_transaction_receipt( - self, transaction_hash: _Hash32, timeout: float = 120, poll_latency: float = 0.1 - ) -> TxReceipt: - try: - with Timeout(timeout) as _timeout: - while True: - try: - tx_receipt = self._get_transaction_receipt(transaction_hash) - except TransactionNotFound: - tx_receipt = None - if tx_receipt is not None: - break - _timeout.sleep(poll_latency) - return tx_receipt - - except Timeout: - raise TimeExhausted( - f"Transaction {HexBytes(transaction_hash) !r} is not in the chain " - f"after {timeout} seconds" - ) - - def get_transaction_receipt(self, transaction_hash: _Hash32) -> TxReceipt: - return self._get_transaction_receipt(transaction_hash) - - get_transaction_count: Method[Callable[..., Nonce]] = Method( - RPC.eth_getTransactionCount, - mungers=[BaseEth.block_id_munger], - ) - - def replace_transaction( - self, transaction_hash: _Hash32, new_transaction: TxParams - ) -> HexBytes: - current_transaction = get_required_transaction(self.w3, transaction_hash) - return replace_transaction(self.w3, current_transaction, new_transaction) - - # todo: Update Any to stricter kwarg checking with TxParams - # https://github.com/python/mypy/issues/4441 - def modify_transaction( - self, transaction_hash: _Hash32, **transaction_params: Any - ) -> HexBytes: - assert_valid_transaction_params(cast(TxParams, transaction_params)) - current_transaction = get_required_transaction(self.w3, transaction_hash) - current_transaction_params = extract_valid_transaction_params( - current_transaction - ) - new_transaction = merge(current_transaction_params, transaction_params) - return replace_transaction(self.w3, current_transaction, new_transaction) - - def send_transaction(self, transaction: TxParams) -> HexBytes: - return self._send_transaction(transaction) - - def send_raw_transaction(self, transaction: Union[HexStr, bytes]) -> HexBytes: - return self._send_raw_transaction(transaction) - - def sign_munger( - self, - account: Union[Address, ChecksumAddress, ENS], - data: Union[int, bytes] = None, - hexstr: HexStr = None, - text: str = None, - ) -> Tuple[Union[Address, ChecksumAddress, ENS], HexStr]: - message_hex = to_hex(data, hexstr=hexstr, text=text) - return (account, message_hex) - - sign: Method[Callable[..., HexStr]] = Method( - RPC.eth_sign, - mungers=[sign_munger], - ) - - sign_transaction: Method[Callable[[TxParams], SignedTx]] = Method( - RPC.eth_signTransaction, - mungers=[default_root_munger], - ) - - sign_typed_data: Method[Callable[..., HexStr]] = Method( - RPC.eth_signTypedData, - mungers=[default_root_munger], - ) - - _call: Method[Callable[..., Union[bytes, bytearray]]] = Method( - RPC.eth_call, mungers=[BaseEth.call_munger] - ) - - def call( - self, - transaction: TxParams, - block_identifier: Optional[BlockIdentifier] = None, - state_override: Optional[CallOverride] = None, - ccip_read_enabled: Optional[bool] = None, - ) -> Union[bytes, bytearray]: - ccip_read_enabled_on_provider = self.w3.provider.global_ccip_read_enabled - if ( - # default conditions: - ccip_read_enabled_on_provider - and ccip_read_enabled is not False - # explicit call flag overrides provider flag, - # enabling ccip read for specific calls: - or not ccip_read_enabled_on_provider - and ccip_read_enabled is True - ): - return self._durin_call(transaction, block_identifier, state_override) - - return self._call(transaction, block_identifier, state_override) - - def _durin_call( - self, - transaction: TxParams, - block_identifier: Optional[BlockIdentifier] = None, - state_override: Optional[CallOverride] = None, - ) -> Union[bytes, bytearray]: - max_redirects = self.w3.provider.ccip_read_max_redirects - - if not max_redirects or max_redirects < 4: - raise ValueError( - "ccip_read_max_redirects property on provider must be at least 4." - ) - - for _ in range(max_redirects): - try: - return self._call(transaction, block_identifier, state_override) - except OffchainLookup as offchain_lookup: - durin_calldata = handle_offchain_lookup( - offchain_lookup.payload, transaction - ) - transaction["data"] = durin_calldata - - raise TooManyRequests("Too many CCIP read redirects") - - def estimate_gas( - self, transaction: TxParams, block_identifier: Optional[BlockIdentifier] = None - ) -> int: - return self._estimate_gas(transaction, block_identifier) - - def fee_history( - self, - block_count: int, - newest_block: Union[BlockParams, BlockNumber], - reward_percentiles: Optional[List[float]] = None, - ) -> FeeHistory: - return self._fee_history(block_count, newest_block, reward_percentiles) - - filter: Method[ - Callable[[Optional[Union[str, FilterParams, HexStr]]], Filter] - ] = Method( - method_choice_depends_on_args=select_filter_method( - if_new_block_filter=RPC.eth_newBlockFilter, - if_new_pending_transaction_filter=RPC.eth_newPendingTransactionFilter, - if_new_filter=RPC.eth_newFilter, - ), - mungers=[BaseEth.filter_munger], - ) - - get_filter_changes: Method[Callable[[HexStr], List[LogReceipt]]] = Method( - RPC.eth_getFilterChanges, mungers=[default_root_munger] - ) - - get_filter_logs: Method[Callable[[HexStr], List[LogReceipt]]] = Method( - RPC.eth_getFilterLogs, mungers=[default_root_munger] - ) - - uninstall_filter: Method[Callable[[HexStr], bool]] = Method( - RPC.eth_uninstallFilter, - mungers=[default_root_munger], - ) - - get_logs: Method[Callable[[FilterParams], List[LogReceipt]]] = Method( - RPC.eth_getLogs, mungers=[default_root_munger] - ) - - submit_hashrate: Method[Callable[[int, _Hash32], bool]] = Method( - RPC.eth_submitHashrate, - mungers=[default_root_munger], - ) - - submit_work: Method[Callable[[int, _Hash32, _Hash32], bool]] = Method( - RPC.eth_submitWork, - mungers=[default_root_munger], - ) - - get_work: Method[Callable[[], List[HexBytes]]] = Method( - RPC.eth_getWork, - is_property=True, - ) diff --git a/web3/eth/__init__.py b/web3/eth/__init__.py new file mode 100644 index 0000000000..7526a177e1 --- /dev/null +++ b/web3/eth/__init__.py @@ -0,0 +1,10 @@ +from .async_eth import ( # noqa: F401 + AsyncEth, +) +from .base import ( # noqa: F401 + BaseEth, +) +from .main import ( # noqa: F401 + Eth, + Contract, +) diff --git a/web3/eth/async_eth.py b/web3/eth/async_eth.py new file mode 100644 index 0000000000..ae243799bd --- /dev/null +++ b/web3/eth/async_eth.py @@ -0,0 +1,547 @@ +import asyncio +from typing import ( + Awaitable, + Callable, + List, + Optional, + Tuple, + Type, + Union, +) +import warnings + +from eth_typing import ( + Address, + BlockNumber, + ChecksumAddress, + HexStr, +) +from hexbytes import ( + HexBytes, +) + +from web3._utils.blocks import ( + select_method_for_block_identifier, +) +from web3._utils.fee_utils import ( + async_fee_history_priority_fee, +) +from web3._utils.filters import ( + AsyncFilter, + select_filter_method, +) +from web3._utils.rpc_abi import ( + RPC, +) +from web3.contract import ( + AsyncContract, + AsyncContractCaller, +) +from web3.eth.base import ( + BaseEth, +) +from web3.exceptions import ( + OffchainLookup, + TimeExhausted, + TooManyRequests, + TransactionNotFound, +) +from web3.method import ( + Method, + default_root_munger, +) +from web3.types import ( + ENS, + BlockData, + BlockIdentifier, + BlockParams, + CallOverride, + FeeHistory, + FilterParams, + LogReceipt, + Nonce, + SyncStatus, + TxData, + TxParams, + TxReceipt, + Wei, + _Hash32, +) +from web3.utils import ( + async_handle_offchain_lookup, +) + + +class AsyncEth(BaseEth): + is_async = True + + _default_contract_factory: Type[ + Union[AsyncContract, AsyncContractCaller] + ] = AsyncContract + + # eth_accounts + + _accounts: Method[Callable[[], Awaitable[Tuple[ChecksumAddress]]]] = Method( + RPC.eth_accounts, + is_property=True, + ) + + @property + async def accounts(self) -> Tuple[ChecksumAddress]: + return await self._accounts() + + # eth_hashrate + + _hashrate: Method[Callable[[], Awaitable[int]]] = Method( + RPC.eth_hashrate, + is_property=True, + ) + + @property + async def hashrate(self) -> int: + return await self._hashrate() + + # eth_blockNumber + + get_block_number: Method[Callable[[], Awaitable[BlockNumber]]] = Method( + RPC.eth_blockNumber, + is_property=True, + ) + + @property + async def block_number(self) -> BlockNumber: + return await self.get_block_number() + + # eth_chainId + + _chain_id: Method[Callable[[], Awaitable[int]]] = Method( + RPC.eth_chainId, + is_property=True, + ) + + @property + async def chain_id(self) -> int: + return await self._chain_id() + + # eth_coinbase + + _coinbase: Method[Callable[[], Awaitable[ChecksumAddress]]] = Method( + RPC.eth_coinbase, + is_property=True, + ) + + @property + async def coinbase(self) -> ChecksumAddress: + return await self._coinbase() + + # eth_gasPrice + + _gas_price: Method[Callable[[], Awaitable[Wei]]] = Method( + RPC.eth_gasPrice, + is_property=True, + ) + + @property + async def gas_price(self) -> Wei: + return await self._gas_price() + + # eth_maxPriorityFeePerGas + + _max_priority_fee: Method[Callable[[], Awaitable[Wei]]] = Method( + RPC.eth_maxPriorityFeePerGas, + is_property=True, + ) + + @property + async def max_priority_fee(self) -> Wei: + """ + Try to use eth_maxPriorityFeePerGas but, since this is not part + of the spec and is only supported by some clients, fall back to + an eth_feeHistory calculation with min and max caps. + """ + try: + return await self._max_priority_fee() + except ValueError: + warnings.warn( + "There was an issue with the method eth_maxPriorityFeePerGas. " + "Calculating using eth_feeHistory." + ) + return await async_fee_history_priority_fee(self) + + # eth_mining + + _mining: Method[Callable[[], Awaitable[bool]]] = Method( + RPC.eth_mining, + is_property=True, + ) + + @property + async def mining(self) -> bool: + return await self._mining() + + # eth_syncing + + _syncing: Method[Callable[[], Awaitable[Union[SyncStatus, bool]]]] = Method( + RPC.eth_syncing, + is_property=True, + ) + + @property + async def syncing(self) -> Union[SyncStatus, bool]: + return await self._syncing() + + # eth_feeHistory + + _fee_history: Method[ + Callable[ + [int, Union[BlockParams, BlockNumber], Optional[List[float]]], + Awaitable[FeeHistory], + ] + ] = Method(RPC.eth_feeHistory, mungers=[default_root_munger]) + + async def fee_history( + self, + block_count: int, + newest_block: Union[BlockParams, BlockNumber], + reward_percentiles: Optional[List[float]] = None, + ) -> FeeHistory: + return await self._fee_history(block_count, newest_block, reward_percentiles) + + # eth_call + + _call: Method[ + Callable[ + [ + TxParams, + Optional[BlockIdentifier], + Optional[CallOverride], + ], + Awaitable[Union[bytes, bytearray]], + ] + ] = Method(RPC.eth_call, mungers=[BaseEth.call_munger]) + + async def call( + self, + transaction: TxParams, + block_identifier: Optional[BlockIdentifier] = None, + state_override: Optional[CallOverride] = None, + ccip_read_enabled: Optional[bool] = None, + ) -> Union[bytes, bytearray]: + ccip_read_enabled_on_provider = self.w3.provider.global_ccip_read_enabled + if ( + # default conditions: + ccip_read_enabled_on_provider + and ccip_read_enabled is not False + # explicit call flag overrides provider flag, + # enabling ccip read for specific calls: + or not ccip_read_enabled_on_provider + and ccip_read_enabled is True + ): + return await self._durin_call(transaction, block_identifier, state_override) + + return await self._call(transaction, block_identifier, state_override) + + async def _durin_call( + self, + transaction: TxParams, + block_identifier: Optional[BlockIdentifier] = None, + state_override: Optional[CallOverride] = None, + ) -> Union[bytes, bytearray]: + max_redirects = self.w3.provider.ccip_read_max_redirects + + if not max_redirects or max_redirects < 4: + raise ValueError( + "ccip_read_max_redirects property on provider must be at least 4." + ) + + for _ in range(max_redirects): + try: + return await self._call(transaction, block_identifier, state_override) + except OffchainLookup as offchain_lookup: + durin_calldata = await async_handle_offchain_lookup( + offchain_lookup.payload, + transaction, + ) + transaction["data"] = durin_calldata + + raise TooManyRequests("Too many CCIP read redirects") + + # eth_estimateGas + + _estimate_gas: Method[ + Callable[[TxParams, Optional[BlockIdentifier]], Awaitable[int]] + ] = Method(RPC.eth_estimateGas, mungers=[BaseEth.estimate_gas_munger]) + + async def estimate_gas( + self, transaction: TxParams, block_identifier: Optional[BlockIdentifier] = None + ) -> int: + return await self._estimate_gas(transaction, block_identifier) + + # eth_getTransactionByHash + + _get_transaction: Method[Callable[[_Hash32], Awaitable[TxData]]] = Method( + RPC.eth_getTransactionByHash, mungers=[default_root_munger] + ) + + async def get_transaction(self, transaction_hash: _Hash32) -> TxData: + return await self._get_transaction(transaction_hash) + + # eth_getRawTransactionByHash + + _get_raw_transaction: Method[Callable[[_Hash32], Awaitable[HexBytes]]] = Method( + RPC.eth_getRawTransactionByHash, mungers=[default_root_munger] + ) + + async def get_raw_transaction(self, transaction_hash: _Hash32) -> HexBytes: + return await self._get_raw_transaction(transaction_hash) + + # eth_getTransactionByBlockNumberAndIndex + # eth_getTransactionByBlockHashAndIndex + + _get_transaction_by_block: Method[ + Callable[[BlockIdentifier, int], Awaitable[TxData]] + ] = Method( + method_choice_depends_on_args=select_method_for_block_identifier( + if_predefined=RPC.eth_getTransactionByBlockNumberAndIndex, + if_hash=RPC.eth_getTransactionByBlockHashAndIndex, + if_number=RPC.eth_getTransactionByBlockNumberAndIndex, + ), + mungers=[default_root_munger], + ) + + async def get_transaction_by_block( + self, block_identifier: BlockIdentifier, index: int + ) -> TxData: + return await self._get_transaction_by_block(block_identifier, index) + + # eth_getRawTransactionByBlockHashAndIndex + # eth_getRawTransactionByBlockNumberAndIndex + + _get_raw_transaction_by_block: Method[ + Callable[[BlockIdentifier, int], Awaitable[HexBytes]] + ] = Method( + method_choice_depends_on_args=select_method_for_block_identifier( + if_predefined=RPC.eth_getRawTransactionByBlockNumberAndIndex, + if_hash=RPC.eth_getRawTransactionByBlockHashAndIndex, + if_number=RPC.eth_getRawTransactionByBlockNumberAndIndex, + ), + mungers=[default_root_munger], + ) + + async def get_raw_transaction_by_block( + self, block_identifier: BlockIdentifier, index: int + ) -> HexBytes: + return await self._get_raw_transaction_by_block(block_identifier, index) + + # eth_getBlockTransactionCountByHash + # eth_getBlockTransactionCountByNumber + + get_block_transaction_count: Method[ + Callable[[BlockIdentifier], Awaitable[int]] + ] = Method( + method_choice_depends_on_args=select_method_for_block_identifier( + if_predefined=RPC.eth_getBlockTransactionCountByNumber, + if_hash=RPC.eth_getBlockTransactionCountByHash, + if_number=RPC.eth_getBlockTransactionCountByNumber, + ), + mungers=[default_root_munger], + ) + + # eth_sendTransaction + + _send_transaction: Method[Callable[[TxParams], Awaitable[HexBytes]]] = Method( + RPC.eth_sendTransaction, mungers=[BaseEth.send_transaction_munger] + ) + + async def send_transaction(self, transaction: TxParams) -> HexBytes: + return await self._send_transaction(transaction) + + # eth_sendRawTransaction + + _send_raw_transaction: Method[ + Callable[[Union[HexStr, bytes]], Awaitable[HexBytes]] + ] = Method( + RPC.eth_sendRawTransaction, + mungers=[default_root_munger], + ) + + async def send_raw_transaction(self, transaction: Union[HexStr, bytes]) -> HexBytes: + return await self._send_raw_transaction(transaction) + + # eth_getBlockByHash + # eth_getBlockByNumber + + _get_block: Method[ + Callable[[BlockIdentifier, bool], Awaitable[BlockData]] + ] = Method( + method_choice_depends_on_args=select_method_for_block_identifier( + if_predefined=RPC.eth_getBlockByNumber, + if_hash=RPC.eth_getBlockByHash, + if_number=RPC.eth_getBlockByNumber, + ), + mungers=[BaseEth.get_block_munger], + ) + + async def get_block( + self, block_identifier: BlockIdentifier, full_transactions: bool = False + ) -> BlockData: + return await self._get_block(block_identifier, full_transactions) + + # eth_getBalance + + _get_balance: Method[ + Callable[ + [Union[Address, ChecksumAddress, ENS], Optional[BlockIdentifier]], + Awaitable[Wei], + ] + ] = Method( + RPC.eth_getBalance, + mungers=[BaseEth.block_id_munger], + ) + + async def get_balance( + self, + account: Union[Address, ChecksumAddress, ENS], + block_identifier: Optional[BlockIdentifier] = None, + ) -> Wei: + return await self._get_balance(account, block_identifier) + + # eth_getCode + + _get_code: Method[ + Callable[ + [Union[Address, ChecksumAddress, ENS], Optional[BlockIdentifier]], + Awaitable[HexBytes], + ] + ] = Method(RPC.eth_getCode, mungers=[BaseEth.block_id_munger]) + + async def get_code( + self, + account: Union[Address, ChecksumAddress, ENS], + block_identifier: Optional[BlockIdentifier] = None, + ) -> HexBytes: + return await self._get_code(account, block_identifier) + + # eth_getLogs + + _get_logs: Method[Callable[[FilterParams], Awaitable[List[LogReceipt]]]] = Method( + RPC.eth_getLogs, mungers=[default_root_munger] + ) + + async def get_logs( + self, + filter_params: FilterParams, + ) -> List[LogReceipt]: + return await self._get_logs(filter_params) + + # eth_getTransactionCount + + _get_transaction_count: Method[ + Callable[ + [Union[Address, ChecksumAddress, ENS], Optional[BlockIdentifier]], + Awaitable[Nonce], + ] + ] = Method( + RPC.eth_getTransactionCount, + mungers=[BaseEth.block_id_munger], + ) + + async def get_transaction_count( + self, + account: Union[Address, ChecksumAddress, ENS], + block_identifier: Optional[BlockIdentifier] = None, + ) -> Nonce: + return await self._get_transaction_count(account, block_identifier) + + # eth_getTransactionReceipt + + _transaction_receipt: Method[Callable[[_Hash32], Awaitable[TxReceipt]]] = Method( + RPC.eth_getTransactionReceipt, mungers=[default_root_munger] + ) + + async def get_transaction_receipt(self, transaction_hash: _Hash32) -> TxReceipt: + return await self._transaction_receipt(transaction_hash) + + async def wait_for_transaction_receipt( + self, transaction_hash: _Hash32, timeout: float = 120, poll_latency: float = 0.1 + ) -> TxReceipt: + async def _wait_for_tx_receipt_with_timeout( + _tx_hash: _Hash32, _poll_latency: float + ) -> TxReceipt: + while True: + try: + tx_receipt = await self._transaction_receipt(_tx_hash) + except TransactionNotFound: + tx_receipt = None + if tx_receipt is not None: + break + await asyncio.sleep(poll_latency) + return tx_receipt + + try: + return await asyncio.wait_for( + _wait_for_tx_receipt_with_timeout(transaction_hash, poll_latency), + timeout=timeout, + ) + except asyncio.TimeoutError: + raise TimeExhausted( + f"Transaction {HexBytes(transaction_hash) !r} is not in the chain " + f"after {timeout} seconds" + ) + + # eth_getStorageAt + + _get_storage_at: Method[ + Callable[ + [Union[Address, ChecksumAddress, ENS], int, Optional[BlockIdentifier]], + Awaitable[HexBytes], + ] + ] = Method( + RPC.eth_getStorageAt, + mungers=[BaseEth.get_storage_at_munger], + ) + + async def get_storage_at( + self, + account: Union[Address, ChecksumAddress, ENS], + position: int, + block_identifier: Optional[BlockIdentifier] = None, + ) -> HexBytes: + return await self._get_storage_at(account, position, block_identifier) + + # eth_newFilter, eth_newBlockFilter, eth_newPendingTransactionFilter + + filter: Method[ + Callable[[Optional[Union[str, FilterParams, HexStr]]], Awaitable[AsyncFilter]] + ] = Method( + method_choice_depends_on_args=select_filter_method( + if_new_block_filter=RPC.eth_newBlockFilter, + if_new_pending_transaction_filter=RPC.eth_newPendingTransactionFilter, + if_new_filter=RPC.eth_newFilter, + ), + mungers=[BaseEth.filter_munger], + ) + + # eth_getFilterChanges, eth_getFilterLogs, eth_uninstallFilter + + _get_filter_changes: Method[ + Callable[[HexStr], Awaitable[List[LogReceipt]]] + ] = Method(RPC.eth_getFilterChanges, mungers=[default_root_munger]) + + async def get_filter_changes(self, filter_id: HexStr) -> List[LogReceipt]: + return await self._get_filter_changes(filter_id) + + _get_filter_logs: Method[Callable[[HexStr], Awaitable[List[LogReceipt]]]] = Method( + RPC.eth_getFilterLogs, mungers=[default_root_munger] + ) + + async def get_filter_logs(self, filter_id: HexStr) -> List[LogReceipt]: + return await self._get_filter_logs(filter_id) + + _uninstall_filter: Method[Callable[[HexStr], Awaitable[bool]]] = Method( + RPC.eth_uninstallFilter, + mungers=[default_root_munger], + ) + + async def uninstall_filter(self, filter_id: HexStr) -> bool: + return await self._uninstall_filter(filter_id) diff --git a/web3/eth/base.py b/web3/eth/base.py new file mode 100644 index 0000000000..ac9429b8a3 --- /dev/null +++ b/web3/eth/base.py @@ -0,0 +1,223 @@ +from typing import ( + Any, + List, + NoReturn, + Optional, + Sequence, + Tuple, + Type, + Union, + overload, +) + +from eth_account import ( + Account, +) +from eth_typing import ( + Address, + ChecksumAddress, + HexStr, +) +from eth_utils import ( + is_checksum_address, + is_string, +) +from eth_utils.toolz import ( + assoc, +) + +from web3._utils.empty import ( + Empty, + empty, +) +from web3.contract import ( + AsyncContract, + AsyncContractCaller, + Contract, + ContractCaller, +) +from web3.module import ( + Module, +) +from web3.types import ( + ENS, + BlockIdentifier, + CallOverride, + FilterParams, + GasPriceStrategy, + TxParams, + Wei, +) + + +class BaseEth(Module): + _default_account: Union[ChecksumAddress, Empty] = empty + _default_block: BlockIdentifier = "latest" + _default_contract_factory: Any = None + _gas_price_strategy = None + + is_async = False + account = Account() + + def namereg(self) -> NoReturn: + raise NotImplementedError() + + def icap_namereg(self) -> NoReturn: + raise NotImplementedError() + + @property + def default_block(self) -> BlockIdentifier: + return self._default_block + + @default_block.setter + def default_block(self, value: BlockIdentifier) -> None: + self._default_block = value + + @property + def default_account(self) -> Union[ChecksumAddress, Empty]: + return self._default_account + + @default_account.setter + def default_account(self, account: Union[ChecksumAddress, Empty]) -> None: + self._default_account = account + + def send_transaction_munger(self, transaction: TxParams) -> Tuple[TxParams]: + if "from" not in transaction and is_checksum_address(self.default_account): + transaction = assoc(transaction, "from", self.default_account) + + return (transaction,) + + def generate_gas_price( + self, transaction_params: Optional[TxParams] = None + ) -> Optional[Wei]: + if self._gas_price_strategy: + return self._gas_price_strategy(self.w3, transaction_params) + return None + + def set_gas_price_strategy( + self, gas_price_strategy: Optional[GasPriceStrategy] + ) -> None: + self._gas_price_strategy = gas_price_strategy + + def estimate_gas_munger( + self, transaction: TxParams, block_identifier: Optional[BlockIdentifier] = None + ) -> Sequence[Union[TxParams, BlockIdentifier]]: + if "from" not in transaction and is_checksum_address(self.default_account): + transaction = assoc(transaction, "from", self.default_account) + + if block_identifier is None: + params: Sequence[Union[TxParams, BlockIdentifier]] = [transaction] + else: + params = [transaction, block_identifier] + + return params + + def get_block_munger( + self, block_identifier: BlockIdentifier, full_transactions: bool = False + ) -> Tuple[BlockIdentifier, bool]: + return (block_identifier, full_transactions) + + def block_id_munger( + self, + account: Union[Address, ChecksumAddress, ENS], + block_identifier: Optional[BlockIdentifier] = None, + ) -> Tuple[Union[Address, ChecksumAddress, ENS], BlockIdentifier]: + if block_identifier is None: + block_identifier = self.default_block + return (account, block_identifier) + + def get_storage_at_munger( + self, + account: Union[Address, ChecksumAddress, ENS], + position: int, + block_identifier: Optional[BlockIdentifier] = None, + ) -> Tuple[Union[Address, ChecksumAddress, ENS], int, BlockIdentifier]: + if block_identifier is None: + block_identifier = self.default_block + return (account, position, block_identifier) + + def call_munger( + self, + transaction: TxParams, + block_identifier: Optional[BlockIdentifier] = None, + state_override: Optional[CallOverride] = None, + ) -> Union[ + Tuple[TxParams, BlockIdentifier], Tuple[TxParams, BlockIdentifier, CallOverride] + ]: + # TODO: move to middleware + if "from" not in transaction and is_checksum_address(self.default_account): + transaction = assoc(transaction, "from", self.default_account) + + # TODO: move to middleware + if block_identifier is None: + block_identifier = self.default_block + + if state_override is None: + return (transaction, block_identifier) + else: + return (transaction, block_identifier, state_override) + + def filter_munger( + self, + filter_params: Optional[Union[str, FilterParams]] = None, + filter_id: Optional[HexStr] = None, + ) -> Union[List[FilterParams], List[HexStr], List[str]]: + if filter_id and filter_params: + raise TypeError( + "Ambiguous invocation: provide either a `filter_params` or a " + "`filter_id` argument. Both were supplied." + ) + if isinstance(filter_params, dict): + return [filter_params] + elif is_string(filter_params): + if filter_params in {"latest", "pending"}: + return [filter_params] + else: + raise ValueError( + "The filter API only accepts the values of `pending` or " + "`latest` for string based filters" + ) + elif filter_id and not filter_params: + return [filter_id] + else: + raise TypeError( + "Must provide either filter_params as a string or " + "a valid filter object, or a filter_id as a string " + "or hex." + ) + + @overload + def contract( + self, address: None = None, **kwargs: Any + ) -> Union[Type[Contract], Type[AsyncContract]]: + ... # noqa: E704,E501 + + @overload # noqa: F811 + def contract( + self, address: Union[Address, ChecksumAddress, ENS], **kwargs: Any + ) -> Union[Contract, AsyncContract]: + ... # noqa: E704,E501 + + def contract( # noqa: F811 + self, + address: Optional[Union[Address, ChecksumAddress, ENS]] = None, + **kwargs: Any, + ) -> Union[Type[Contract], Contract, Type[AsyncContract], AsyncContract]: + ContractFactoryClass = kwargs.pop( + "ContractFactoryClass", self._default_contract_factory + ) + + ContractFactory = ContractFactoryClass.factory(self.w3, **kwargs) + + if address: + return ContractFactory(address) + else: + return ContractFactory + + def set_contract_factory( + self, + contract_factory: Type[ + Union[Contract, AsyncContract, ContractCaller, AsyncContractCaller] + ], + ) -> None: + self._default_contract_factory = contract_factory diff --git a/web3/eth/main.py b/web3/eth/main.py new file mode 100644 index 0000000000..642ad532af --- /dev/null +++ b/web3/eth/main.py @@ -0,0 +1,634 @@ +from typing import ( + Any, + Callable, + List, + Optional, + Sequence, + Tuple, + Type, + Union, + cast, +) +import warnings + +from eth_typing import ( + Address, + BlockNumber, + ChecksumAddress, + HexStr, +) +from eth_utils.toolz import ( + merge, +) +from hexbytes import ( + HexBytes, +) + +from web3._utils.blocks import ( + select_method_for_block_identifier, +) +from web3._utils.encoding import ( + to_hex, +) +from web3._utils.fee_utils import ( + fee_history_priority_fee, +) +from web3._utils.filters import ( + Filter, + select_filter_method, +) +from web3._utils.rpc_abi import ( + RPC, +) +from web3._utils.threads import ( + Timeout, +) +from web3._utils.transactions import ( + assert_valid_transaction_params, + extract_valid_transaction_params, + get_required_transaction, + replace_transaction, +) +from web3.contract import ( + Contract, + ContractCaller, +) +from web3.eth.base import ( + BaseEth, +) +from web3.exceptions import ( + OffchainLookup, + TimeExhausted, + TooManyRequests, + TransactionNotFound, +) +from web3.method import ( + Method, + default_root_munger, +) +from web3.types import ( + ENS, + BlockData, + BlockIdentifier, + BlockParams, + CallOverride, + FeeHistory, + FilterParams, + LogReceipt, + MerkleProof, + Nonce, + SignedTx, + SyncStatus, + TxData, + TxParams, + TxReceipt, + Uncle, + Wei, + _Hash32, +) +from web3.utils import ( + handle_offchain_lookup, +) + + +class Eth(BaseEth): + _default_contract_factory: Type[Union[Contract, ContractCaller]] = Contract + + # eth_accounts + + _accounts: Method[Callable[[], Tuple[ChecksumAddress]]] = Method( + RPC.eth_accounts, + is_property=True, + ) + + @property + def accounts(self) -> Tuple[ChecksumAddress]: + return self._accounts() + + # eth_hashrate + + _hashrate: Method[Callable[[], int]] = Method( + RPC.eth_hashrate, + is_property=True, + ) + + @property + def hashrate(self) -> int: + return self._hashrate() + + # eth_blockNumber + + get_block_number: Method[Callable[[], BlockNumber]] = Method( + RPC.eth_blockNumber, + is_property=True, + ) + + @property + def block_number(self) -> BlockNumber: + return self.get_block_number() + + # eth_chainId + + _chain_id: Method[Callable[[], int]] = Method( + RPC.eth_chainId, + is_property=True, + ) + + @property + def chain_id(self) -> int: + return self._chain_id() + + # eth_coinbase + + _coinbase: Method[Callable[[], ChecksumAddress]] = Method( + RPC.eth_coinbase, + is_property=True, + ) + + @property + def coinbase(self) -> ChecksumAddress: + return self._coinbase() + + # eth_gasPrice + + _gas_price: Method[Callable[[], Wei]] = Method( + RPC.eth_gasPrice, + is_property=True, + ) + + @property + def gas_price(self) -> Wei: + return self._gas_price() + + # eth_maxPriorityFeePerGas + + _max_priority_fee: Method[Callable[[], Wei]] = Method( + RPC.eth_maxPriorityFeePerGas, + is_property=True, + ) + + @property + def max_priority_fee(self) -> Wei: + """ + Try to use eth_maxPriorityFeePerGas but, since this is not part + of the spec and is only supported by some clients, fall back to + an eth_feeHistory calculation with min and max caps. + """ + try: + return self._max_priority_fee() + except ValueError: + warnings.warn( + "There was an issue with the method eth_maxPriorityFeePerGas. " + "Calculating using eth_feeHistory." + ) + return fee_history_priority_fee(self) + + # eth_mining + + _mining: Method[Callable[[], bool]] = Method( + RPC.eth_mining, + is_property=True, + ) + + @property + def mining(self) -> bool: + return self._mining() + + # eth_syncing + + _syncing: Method[Callable[[], Union[SyncStatus, bool]]] = Method( + RPC.eth_syncing, + is_property=True, + ) + + @property + def syncing(self) -> Union[SyncStatus, bool]: + return self._syncing() + + # eth_feeHistory + + _fee_history: Method[ + Callable[ + [int, Union[BlockParams, BlockNumber], Optional[List[float]]], FeeHistory + ] + ] = Method(RPC.eth_feeHistory, mungers=[default_root_munger]) + + def fee_history( + self, + block_count: int, + newest_block: Union[BlockParams, BlockNumber], + reward_percentiles: Optional[List[float]] = None, + ) -> FeeHistory: + return self._fee_history(block_count, newest_block, reward_percentiles) + + # eth_call + + _call: Method[ + Callable[ + [TxParams, Optional[BlockIdentifier], Optional[CallOverride]], + Union[bytes, bytearray], + ] + ] = Method(RPC.eth_call, mungers=[BaseEth.call_munger]) + + def call( + self, + transaction: TxParams, + block_identifier: Optional[BlockIdentifier] = None, + state_override: Optional[CallOverride] = None, + ccip_read_enabled: Optional[bool] = None, + ) -> Union[bytes, bytearray]: + ccip_read_enabled_on_provider = self.w3.provider.global_ccip_read_enabled + if ( + # default conditions: + ccip_read_enabled_on_provider + and ccip_read_enabled is not False + # explicit call flag overrides provider flag, + # enabling ccip read for specific calls: + or not ccip_read_enabled_on_provider + and ccip_read_enabled is True + ): + return self._durin_call(transaction, block_identifier, state_override) + + return self._call(transaction, block_identifier, state_override) + + def _durin_call( + self, + transaction: TxParams, + block_identifier: Optional[BlockIdentifier] = None, + state_override: Optional[CallOverride] = None, + ) -> Union[bytes, bytearray]: + max_redirects = self.w3.provider.ccip_read_max_redirects + + if not max_redirects or max_redirects < 4: + raise ValueError( + "ccip_read_max_redirects property on provider must be at least 4." + ) + + for _ in range(max_redirects): + try: + return self._call(transaction, block_identifier, state_override) + except OffchainLookup as offchain_lookup: + durin_calldata = handle_offchain_lookup( + offchain_lookup.payload, transaction + ) + transaction["data"] = durin_calldata + + raise TooManyRequests("Too many CCIP read redirects") + + # eth_estimateGas + + _estimate_gas: Method[ + Callable[[TxParams, Optional[BlockIdentifier]], int] + ] = Method(RPC.eth_estimateGas, mungers=[BaseEth.estimate_gas_munger]) + + def estimate_gas( + self, transaction: TxParams, block_identifier: Optional[BlockIdentifier] = None + ) -> int: + return self._estimate_gas(transaction, block_identifier) + + # eth_getTransactionByHash + + _get_transaction: Method[Callable[[_Hash32], TxData]] = Method( + RPC.eth_getTransactionByHash, mungers=[default_root_munger] + ) + + def get_transaction(self, transaction_hash: _Hash32) -> TxData: + return self._get_transaction(transaction_hash) + + # eth_getRawTransactionByHash + + _get_raw_transaction: Method[Callable[[_Hash32], HexBytes]] = Method( + RPC.eth_getRawTransactionByHash, mungers=[default_root_munger] + ) + + def get_raw_transaction(self, transaction_hash: _Hash32) -> HexBytes: + return self._get_raw_transaction(transaction_hash) + + # eth_getTransactionByBlockNumberAndIndex + # eth_getTransactionByBlockHashAndIndex + + get_transaction_by_block: Method[Callable[[BlockIdentifier, int], TxData]] = Method( + method_choice_depends_on_args=select_method_for_block_identifier( + if_predefined=RPC.eth_getTransactionByBlockNumberAndIndex, + if_hash=RPC.eth_getTransactionByBlockHashAndIndex, + if_number=RPC.eth_getTransactionByBlockNumberAndIndex, + ), + mungers=[default_root_munger], + ) + + # eth_getRawTransactionByBlockHashAndIndex + # eth_getRawTransactionByBlockNumberAndIndex + + _get_raw_transaction_by_block: Method[ + Callable[[BlockIdentifier, int], HexBytes] + ] = Method( + method_choice_depends_on_args=select_method_for_block_identifier( + if_predefined=RPC.eth_getRawTransactionByBlockNumberAndIndex, + if_hash=RPC.eth_getRawTransactionByBlockHashAndIndex, + if_number=RPC.eth_getRawTransactionByBlockNumberAndIndex, + ), + mungers=[default_root_munger], + ) + + def get_raw_transaction_by_block( + self, block_identifier: BlockIdentifier, index: int + ) -> HexBytes: + return self._get_raw_transaction_by_block(block_identifier, index) + + # eth_getBlockTransactionCountByHash + # eth_getBlockTransactionCountByNumber + + get_block_transaction_count: Method[Callable[[BlockIdentifier], int]] = Method( + method_choice_depends_on_args=select_method_for_block_identifier( + if_predefined=RPC.eth_getBlockTransactionCountByNumber, + if_hash=RPC.eth_getBlockTransactionCountByHash, + if_number=RPC.eth_getBlockTransactionCountByNumber, + ), + mungers=[default_root_munger], + ) + + # eth_sendTransaction + + _send_transaction: Method[Callable[[TxParams], HexBytes]] = Method( + RPC.eth_sendTransaction, mungers=[BaseEth.send_transaction_munger] + ) + + def send_transaction(self, transaction: TxParams) -> HexBytes: + return self._send_transaction(transaction) + + # eth_sendRawTransaction + + _send_raw_transaction: Method[Callable[[Union[HexStr, bytes]], HexBytes]] = Method( + RPC.eth_sendRawTransaction, + mungers=[default_root_munger], + ) + + def send_raw_transaction(self, transaction: Union[HexStr, bytes]) -> HexBytes: + return self._send_raw_transaction(transaction) + + # eth_getBlockByHash + # eth_getBlockByNumber + + _get_block: Method[Callable[[BlockIdentifier, bool], BlockData]] = Method( + method_choice_depends_on_args=select_method_for_block_identifier( + if_predefined=RPC.eth_getBlockByNumber, + if_hash=RPC.eth_getBlockByHash, + if_number=RPC.eth_getBlockByNumber, + ), + mungers=[BaseEth.get_block_munger], + ) + + def get_block( + self, block_identifier: BlockIdentifier, full_transactions: bool = False + ) -> BlockData: + return self._get_block(block_identifier, full_transactions) + + # eth_getBalance + + _get_balance: Method[ + Callable[[Union[Address, ChecksumAddress, ENS], Optional[BlockIdentifier]], Wei] + ] = Method( + RPC.eth_getBalance, + mungers=[BaseEth.block_id_munger], + ) + + def get_balance( + self, + account: Union[Address, ChecksumAddress, ENS], + block_identifier: Optional[BlockIdentifier] = None, + ) -> Wei: + return self._get_balance(account, block_identifier) + + # eth_getCode + + _get_code: Method[ + Callable[ + [Union[Address, ChecksumAddress, ENS], Optional[BlockIdentifier]], HexBytes + ] + ] = Method(RPC.eth_getCode, mungers=[BaseEth.block_id_munger]) + + def get_code( + self, + account: Union[Address, ChecksumAddress, ENS], + block_identifier: Optional[BlockIdentifier] = None, + ) -> HexBytes: + return self._get_code(account, block_identifier) + + # eth_getLogs + + _get_logs: Method[Callable[[FilterParams], List[LogReceipt]]] = Method( + RPC.eth_getLogs, mungers=[default_root_munger] + ) + + def get_logs( + self, + filter_params: FilterParams, + ) -> List[LogReceipt]: + return self._get_logs(filter_params) + + # eth_getTransactionCount + + _get_transaction_count: Method[ + Callable[ + [Union[Address, ChecksumAddress, ENS], Optional[BlockIdentifier]], Nonce + ] + ] = Method( + RPC.eth_getTransactionCount, + mungers=[BaseEth.block_id_munger], + ) + + def get_transaction_count( + self, + account: Union[Address, ChecksumAddress, ENS], + block_identifier: Optional[BlockIdentifier] = None, + ) -> Nonce: + return self._get_transaction_count(account, block_identifier) + + # eth_getTransactionReceipt + + _transaction_receipt: Method[Callable[[_Hash32], TxReceipt]] = Method( + RPC.eth_getTransactionReceipt, mungers=[default_root_munger] + ) + + def get_transaction_receipt(self, transaction_hash: _Hash32) -> TxReceipt: + return self._transaction_receipt(transaction_hash) + + def wait_for_transaction_receipt( + self, transaction_hash: _Hash32, timeout: float = 120, poll_latency: float = 0.1 + ) -> TxReceipt: + try: + with Timeout(timeout) as _timeout: + while True: + try: + tx_receipt = self._transaction_receipt(transaction_hash) + except TransactionNotFound: + tx_receipt = None + if tx_receipt is not None: + break + _timeout.sleep(poll_latency) + return tx_receipt + + except Timeout: + raise TimeExhausted( + f"Transaction {HexBytes(transaction_hash) !r} is not in the chain " + f"after {timeout} seconds" + ) + + # eth_getStorageAt + + get_storage_at: Method[ + Callable[[Union[Address, ChecksumAddress, ENS], int], HexBytes] + ] = Method( + RPC.eth_getStorageAt, + mungers=[BaseEth.get_storage_at_munger], + ) + + # eth_getProof + + def get_proof_munger( + self, + account: Union[Address, ChecksumAddress, ENS], + positions: Sequence[int], + block_identifier: Optional[BlockIdentifier] = None, + ) -> Tuple[ + Union[Address, ChecksumAddress, ENS], Sequence[int], Optional[BlockIdentifier] + ]: + if block_identifier is None: + block_identifier = self.default_block + return (account, positions, block_identifier) + + get_proof: Method[ + Callable[ + [ + Tuple[ + Union[Address, ChecksumAddress, ENS], + Sequence[int], + Optional[BlockIdentifier], + ] + ], + MerkleProof, + ] + ] = Method( + RPC.eth_getProof, + mungers=[get_proof_munger], + ) + + # eth_getUncleCountByBlockHash + # eth_getUncleCountByBlockNumber + + get_uncle_count: Method[Callable[[BlockIdentifier], int]] = Method( + method_choice_depends_on_args=select_method_for_block_identifier( + if_predefined=RPC.eth_getUncleCountByBlockNumber, + if_hash=RPC.eth_getUncleCountByBlockHash, + if_number=RPC.eth_getUncleCountByBlockNumber, + ), + mungers=[default_root_munger], + ) + + # eth_getUncleByBlockHashAndIndex + # eth_getUncleByBlockNumberAndIndex + + get_uncle_by_block: Method[Callable[[BlockIdentifier, int], Uncle]] = Method( + method_choice_depends_on_args=select_method_for_block_identifier( + if_predefined=RPC.eth_getUncleByBlockNumberAndIndex, + if_hash=RPC.eth_getUncleByBlockHashAndIndex, + if_number=RPC.eth_getUncleByBlockNumberAndIndex, + ), + mungers=[default_root_munger], + ) + + def replace_transaction( + self, transaction_hash: _Hash32, new_transaction: TxParams + ) -> HexBytes: + current_transaction = get_required_transaction(self.w3, transaction_hash) + return replace_transaction(self.w3, current_transaction, new_transaction) + + # todo: Update Any to stricter kwarg checking with TxParams + # https://github.com/python/mypy/issues/4441 + def modify_transaction( + self, transaction_hash: _Hash32, **transaction_params: Any + ) -> HexBytes: + assert_valid_transaction_params(cast(TxParams, transaction_params)) + current_transaction = get_required_transaction(self.w3, transaction_hash) + current_transaction_params = extract_valid_transaction_params( + current_transaction + ) + new_transaction = merge(current_transaction_params, transaction_params) + return replace_transaction(self.w3, current_transaction, new_transaction) + + # eth_sign + + def sign_munger( + self, + account: Union[Address, ChecksumAddress, ENS], + data: Union[int, bytes] = None, + hexstr: HexStr = None, + text: str = None, + ) -> Tuple[Union[Address, ChecksumAddress, ENS], HexStr]: + message_hex = to_hex(data, hexstr=hexstr, text=text) + return (account, message_hex) + + sign: Method[Callable[..., HexStr]] = Method(RPC.eth_sign, mungers=[sign_munger]) + + # eth_signTransaction + + sign_transaction: Method[Callable[[TxParams], SignedTx]] = Method( + RPC.eth_signTransaction, + mungers=[default_root_munger], + ) + + # eth_signTypedData + + sign_typed_data: Method[ + Callable[[Union[Address, ChecksumAddress, ENS], str], HexStr] + ] = Method( + RPC.eth_signTypedData, + mungers=[default_root_munger], + ) + + # eth_newFilter, eth_newBlockFilter, eth_newPendingTransactionFilter + + filter: Method[ + Callable[[Optional[Union[str, FilterParams, HexStr]]], Filter] + ] = Method( + method_choice_depends_on_args=select_filter_method( + if_new_block_filter=RPC.eth_newBlockFilter, + if_new_pending_transaction_filter=RPC.eth_newPendingTransactionFilter, + if_new_filter=RPC.eth_newFilter, + ), + mungers=[BaseEth.filter_munger], + ) + + # eth_getFilterChanges, eth_getFilterLogs, eth_uninstallFilter + + get_filter_changes: Method[Callable[[HexStr], List[LogReceipt]]] = Method( + RPC.eth_getFilterChanges, mungers=[default_root_munger] + ) + + get_filter_logs: Method[Callable[[HexStr], List[LogReceipt]]] = Method( + RPC.eth_getFilterLogs, mungers=[default_root_munger] + ) + + uninstall_filter: Method[Callable[[HexStr], bool]] = Method( + RPC.eth_uninstallFilter, + mungers=[default_root_munger], + ) + + # eth_submitHashrate + + submit_hashrate: Method[Callable[[int, _Hash32], bool]] = Method( + RPC.eth_submitHashrate, + mungers=[default_root_munger], + ) + + # eth_getWork, eth_submitWork + + get_work: Method[Callable[[], List[HexBytes]]] = Method( + RPC.eth_getWork, + is_property=True, + ) + + submit_work: Method[Callable[[int, _Hash32, _Hash32], bool]] = Method( + RPC.eth_submitWork, + mungers=[default_root_munger], + )