diff --git a/.circleci/config.yml b/.circleci/config.yml index dcfb35e437..9be90c1275 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -288,6 +288,13 @@ jobs: environment: TOXENV: py38-integration-goethereum-ipc + py38-integration-goethereum-ipc_async: + <<: *geth_steps + docker: + - image: cimg/python:3.8 + environment: + TOXENV: py38-integration-goethereum-ipc_async + py38-integration-goethereum-ipc_flaky: <<: *geth_steps docker: @@ -427,6 +434,13 @@ jobs: environment: TOXENV: py39-integration-goethereum-ipc + py39-integration-goethereum-ipc_async: + <<: *geth_steps + docker: + - image: cimg/python:3.9 + environment: + TOXENV: py39-integration-goethereum-ipc_async + py39-integration-goethereum-ipc_flaky: <<: *geth_steps docker: @@ -566,6 +580,13 @@ jobs: environment: TOXENV: py310-integration-goethereum-ipc + py310-integration-goethereum-ipc_async: + <<: *geth_steps + docker: + - image: cimg/python:3.10 + environment: + TOXENV: py310-integration-goethereum-ipc_async + py310-integration-goethereum-ipc_flaky: <<: *geth_steps docker: @@ -711,6 +732,13 @@ jobs: environment: TOXENV: py311-integration-goethereum-ipc + py311-integration-goethereum-ipc_async: + <<: *geth_steps + docker: + - image: cimg/python:3.11 + environment: + TOXENV: py311-integration-goethereum-ipc_async + py311-integration-goethereum-ipc_flaky: <<: *geth_steps docker: @@ -824,6 +852,7 @@ workflows: - py38-ensip15 - py38-ethpm - py38-integration-goethereum-ipc + - py38-integration-goethereum-ipc_async - py38-integration-goethereum-ipc_flaky - py38-integration-goethereum-http - py38-integration-goethereum-http_async @@ -841,6 +870,7 @@ workflows: - py39-ensip15 - py39-ethpm - py39-integration-goethereum-ipc + - py39-integration-goethereum-ipc_async - py39-integration-goethereum-ipc_flaky - py39-integration-goethereum-http - py39-integration-goethereum-http_async @@ -858,6 +888,7 @@ workflows: - py310-ensip15 - py310-ethpm - py310-integration-goethereum-ipc + - py310-integration-goethereum-ipc_async - py310-integration-goethereum-ipc_flaky - py310-integration-goethereum-http - py310-integration-goethereum-http_async @@ -875,6 +906,7 @@ workflows: - py311-ensip15 - py311-ethpm - py311-integration-goethereum-ipc + - py311-integration-goethereum-ipc_async - py311-integration-goethereum-ipc_flaky - py311-integration-goethereum-http - py311-integration-goethereum-http_async diff --git a/.gitignore b/.gitignore index 87e55458ea..ca0fee18d1 100644 --- a/.gitignore +++ b/.gitignore @@ -51,6 +51,7 @@ docs/web3.providers.eth_tester.rst docs/web3.providers.rst docs/web3.providers.rpc.rst docs/web3.providers.websocket.rst +docs/web3.providers.persistent.rst docs/web3.rst docs/web3.scripts.release.rst docs/web3.scripts.rst diff --git a/docs/conf.py b/docs/conf.py index 210918e4f6..7d7f71970f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -88,6 +88,7 @@ "web3.providers.rst", "web3.providers.rpc.rst", "web3.providers.websocket.rst", + "web3.providers.persistent.rst", "web3.providers.eth_tester.rst", "web3.scripts.*", "web3.testing.rst", diff --git a/docs/internals.rst b/docs/internals.rst index 00ec97aa45..513694e159 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -304,12 +304,12 @@ implemented in the Middleware layer. Request Processing for Persistent Connection Providers ------------------------------------------------------ -.. py:class:: web3.providers.websocket.request_processor.RequestProcessor +.. py:class:: web3.providers.persistent.request_processor.RequestProcessor The ``RequestProcessor`` class is responsible for the storing and syncing up of asynchronous requests to responses for a ``PersistentConnectionProvider``. The best example of one such provider is the -:class:`~web3.providers.websocket.WebsocketProviderV2`. In order to send a websocket +:class:`~web3.providers.persistent.WebsocketProviderV2`. In order to send a websocket message and receive a response to that particular request, ``PersistentConnectionProvider`` instances have to match request *id* values to response *id* values coming back from the websocket connection. Any provider that does @@ -339,7 +339,7 @@ back. An example is using the ``eth`` module API to request the latest block num .. code-block:: python >>> async def wsV2_one_to_one_example(): - ... async with AsyncWeb3.persistent_websocket( + ... async with AsyncWeb3.persistent_connection( ... WebsocketProviderV2(f"ws://127.0.0.1:8546") ... ) as w3: ... # make a request and expect a single response returned on the same line @@ -414,23 +414,24 @@ subscription *id* value, but it also expects to receive many ``eth_subscription` messages if and when the request is successful. For this reason, the original request is considered a one-to-one request so that a subscription *id* can be returned to the user on the same line, but the ``process_subscriptions()`` method on the -:class:`~web3.providers.websocket.WebsocketConnection` class, the public API for +:class:`~web3.providers.persistent.PersistentConnection` class, the public API for interacting with the active websocket connection, is set up to receive ``eth_subscription`` responses over an asynchronous interator pattern. .. code-block:: python >>> async def ws_v2_subscription_example(): - ... async with AsyncWeb3.persistent_websocket( + ... async with AsyncWeb3.persistent_connection( ... WebsocketProviderV2(f"ws://127.0.0.1:8546") ... ) as w3: ... # Subscribe to new block headers and receive the subscription_id. ... # A one-to-one call with a trigger for many responses ... subscription_id = await w3.eth.subscribe("newHeads") ... - ... # Listen to the websocket for the many responses utilizing the ``w3.ws`` - ... # ``WebsocketConnection`` public API method ``process_subscriptions()`` - ... async for response in w3.ws.process_subscriptions(): + ... # Listen to the websocket for the many responses utilizing the + ... # ``w3.socket`` ``PersistentConnection`` public API method + ... # ``process_subscriptions()`` + ... async for response in w3.socket.process_subscriptions(): ... # Receive only one-to-many responses here so that we don't ... # accidentally return the response for a one-to-one request in this ... # block @@ -450,7 +451,7 @@ are stored in an internal ``asyncio.Queue`` instance, isolated from any one-to-o responses. When the ``PersistentConnectionProvider`` is looking for one-to-many responses internally, it will expect the message listener task to store these messages in this queue. Since the order of the messages is important, the queue is a FIFO queue. -The ``process_subscriptions()`` method on the ``WebsocketConnection`` class is set up +The ``process_subscriptions()`` method on the ``PersistentConnection`` class is set up to pop messages from this queue as FIFO over an asynchronous iterator pattern. If the stream of messages from the websocket is not being interrupted by any other diff --git a/docs/overview.rst b/docs/overview.rst index 421bf02d6d..483f7a9665 100644 --- a/docs/overview.rst +++ b/docs/overview.rst @@ -26,8 +26,9 @@ following built-in providers: - :class:`~web3.providers.ipc.IPCProvider` for connecting to ipc socket based JSON-RPC servers. - :class:`~web3.providers.rpc.HTTPProvider` for connecting to http and https based JSON-RPC servers. - :class:`~web3.providers.websocket.WebsocketProvider` for connecting to ws and wss websocket based JSON-RPC servers. -- :class:`~web3.providers.async_rpc.AsyncHTTPProvider` for connecting to http and https based JSON-RPC servers. - +- :class:`~web3.providers.async_rpc.AsyncHTTPProvider` for connecting to http and https based JSON-RPC servers asynchronously. +- :class:`~web3.providers.persistent.WebsocketProviderV2` (beta) for connecting to websocket based JSON-RPC servers asynchronously. +- :class:`~web3.providers.persistent.AsyncIPCProvider` (beta) for connecting to ipc socket based JSON-RPC servers asynchronously. Examples ^^^^^^^^ @@ -323,7 +324,7 @@ ENS `Ethereum Name Service (ENS) `_ provides the infrastructure for human-readable addresses. If an address is registered with the ENS registry, -the domain name can be used in place of the address itself. For example, the registered domain +the domain name can be used in place of the address itself. For example, the registered domain name ``ethereum.eth`` will resolve to the address ``0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe``. web3.py has support for ENS, documented :ref:`here `. diff --git a/docs/providers.rst b/docs/providers.rst index 811e515cc1..973fd23077 100644 --- a/docs/providers.rst +++ b/docs/providers.rst @@ -47,8 +47,9 @@ Providers are web3.py classes that are configured for the kind of connection you See: - :class:`~web3.providers.ipc.IPCProvider` +- :class:`~web3.providers.persistent.AsyncIPCProvider` - :class:`~web3.providers.websocket.WebsocketProvider` -- :class:`~web3.providers.websocket.WebsocketProviderV2` +- :class:`~web3.providers.persistent.WebsocketProviderV2` - :class:`~web3.providers.rpc.HTTPProvider` - :class:`~web3.providers.async_rpc.AsyncHTTPProvider` @@ -87,7 +88,8 @@ Auto-initialization Provider Shortcuts Geth dev Proof of Authority ~~~~~~~~~~~~~~~~~~~~~~~~~~~ -To connect to a ``geth --dev`` Proof of Authority instance with defaults: +To connect to a ``geth --dev`` Proof of Authority instance with +the POA middleware loaded by default: .. code-block:: python @@ -97,6 +99,18 @@ To connect to a ``geth --dev`` Proof of Authority instance with defaults: >>> w3.is_connected() True +Or, connect to an async web3 instance: + +.. code-block:: python + + >>> from web3.auto.gethdev import async_w3 + >>> await async_w3.provider.connect() + + # confirm that the connection succeeded + >>> await async_w3.is_connected() + True + + Built In Providers ------------------ @@ -157,7 +171,7 @@ HTTPProvider IPCProvider ~~~~~~~~~~~ -.. py:class:: web3.providers.ipc.IPCProvider(ipc_path=None, testnet=False, timeout=10) +.. py:class:: web3.providers.ipc.IPCProvider(ipc_path=None, timeout=10) This provider handles interaction with an IPC Socket based JSON-RPC server. @@ -177,6 +191,120 @@ IPCProvider - On Windows: ``\\.\pipe\geth.ipc`` +AsyncIPCProvider (beta) +~~~~~~~~~~~~~~~~~~~~~~~ + +.. warning:: This provider is still in beta. However, it is being actively developed + and supported and is expected to be stable in the next major version of *web3.py* + (v7). + +.. py:class:: web3.providers.persistent.AsyncIPCProvider(ipc_path=None, request_timeout=10, max_connection_retries=5) + + This provider handles asynchronous, persistent interaction + with an IPC Socket based JSON-RPC server. + + * ``ipc_path`` is the filesystem path to the IPC socket: + + This provider inherits from the + :class:`~web3.providers.persistent.persistent_connection.PersistentConnectionProvider` class. Refer to + the :class:`~web3.providers.persistent.persistent_connection.PersistentConnectionProvider` documentation + for details on additional configuration options available for this provider. + + If no ``ipc_path`` is specified, it will use a default depending on your operating + system. + + - On Linux and FreeBSD: ``~/.ethereum/geth.ipc`` + - On Mac OS: ``~/Library/Ethereum/geth.ipc`` + - On Windows: ``\\.\pipe\geth.ipc`` + +AsyncIPCProvider Usage +++++++++++++++++++++++ + +The ``AsyncWeb3`` class may be used as a context manager, utilizing the ``async with`` +syntax, when connecting via ``persistent_connection()`` using the +``AsyncIPCProvider``. This will automatically close the connection when the context +manager exits and is the recommended way to initiate a persistent connection to the +provider. + +.. code-block:: python + + >>> import asyncio + >>> from web3 import AsyncWeb3, AsyncIPCProvider + + >>> LOG = True # toggle debug logging + >>> if LOG: + ... import logging + ... logger = logging.getLogger("web3.providers.AsyncIPCProvider") + ... logger.setLevel(logging.DEBUG) + ... logger.addHandler(logging.StreamHandler()) + + >>> async def subscription_context_manager_example(): + ... async with AsyncWeb3.persistent_connection( + ... AsyncIPCProvider('path/to/ipc') + ... ) as w3: + ... # subscribe to new block headers + ... subscription_id = await w3.eth.subscribe("newHeads") + ... + ... async for response in w3.socket.process_subscriptions(): + ... print(f"{response}\n") + ... # handle responses here + ... + ... if some_condition: + ... # unsubscribe from new block headers and break out of + ... # iterator + ... await w3.eth.unsubscribe(subscription_id) + ... break + ... + ... # still an open connection, make any other requests and get + ... # responses via send / receive + ... latest_block = await w3.eth.get_block("latest") + ... print(f"Latest block: {latest_block}") + ... + ... # the connection closes automatically when exiting the context + ... # manager (the `async with` block) + + >>> asyncio.run(subscription_context_manager_example()) + +If the above initilization pattern doesn't work for your application, the ``__await__()`` +method is defined on the ``persistent_connection()`` connection in a manner that awaits +connecting to the socket. You may also choose to instantiate and connect via the +provider in separate lines. Both of these examples are shown below. + +.. code-block:: python + + >>> async def alternate_init_example_1(): + ... # awaiting the persistent connection itself will connect to the socket + ... w3 = await AsyncWeb3.persistent_connection(AsyncIPCProvider('path/to/ipc')) + ... + ... # some code here + ... + ... # manual cleanup + ... await w3.provider.disconnect() + + # run the example + >>> asyncio.run(alternate_init_example_1) + + >>> async def alternate_init_example_2(): + ... # instantiation and connection via the provider as separate lines + ... w3 = AsyncWeb3.persistent_connection(AsyncIPCProvider('path/to/ipc')) + ... await w3.provider.connect() + ... + ... # some code here + ... + ... # manual cleanup + ... await w3.provider.disconnect() + + # run the example + >>> asyncio.run(alternate_init_example_2) + +The ``AsyncIPCProvider`` class uses the +:class:`~web3.providers.persistent.request_processor.RequestProcessor` class under the +hood to sync up the receiving of responses and response processing for one-to-one and +one-to-many request-to-response requests. Refer to the +:class:`~web3.providers.persistent.request_processor.RequestProcessor` +documentation for details. + + WebsocketProvider ~~~~~~~~~~~~~~~~~ @@ -222,12 +350,13 @@ WebsocketProvider Persistent Connection Providers ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. py:class:: web3.providers.persistent.PersistentConnectionProvider(endpoint_uri: str, request_timeout: float = 50.0, subscription_response_queue_size: int = 500) +.. py:class:: web3.providers.persistent.persistent_connection.PersistentConnectionProvider(endpoint_uri: str, request_timeout: float = 50.0, subscription_response_queue_size: int = 500) - This is a base provider class, currently inherited by the ``WebsocketProviderV2``. + This is a base provider class, currently inherited by the ``WebsocketProviderV2``, + and the ``AsyncIPCProvider``. It handles interactions with a persistent connection to a JSON-RPC server. Among its configuration, it houses all of the - :class:`~web3.providers.websocket.request_processor.RequestProcessor` logic for + :class:`~web3.providers.persistent.request_processor.RequestProcessor` logic for handling the asynchronous sending and receiving of requests and responses. See the :ref:`internals__persistent_connection_providers` section for more details on the internals of persistent connection providers. @@ -242,15 +371,20 @@ Persistent Connection Providers asynchronous receiving and processing of responses. When in sync with the websocket stream, this queue should only ever store 1 to a few messages at a time. + * ``silence_listener_task_exceptions`` is a boolean that determines whether + exceptions raised by the listener task are silenced. Defaults to ``False``, + raising any exceptions that occur in the listener task. + + WebsocketProviderV2 (beta) -`````````````````````````` +~~~~~~~~~~~~~~~~~~~~~~~~~~ .. warning:: This provider is still in beta. However, it is being actively developed and supported and is expected to be stable in the next major version of *web3.py* (v7). -.. py:class:: web3.providers.websocket.WebsocketProviderV2(endpoint_uri: str, websocket_kwargs: Dict[str, Any] = {}, silence_listener_task_exceptions: bool = False) +.. py:class:: web3.providers.persistent.WebsocketProviderV2(endpoint_uri: str, websocket_kwargs: Dict[str, Any] = {}, silence_listener_task_exceptions: bool = False) This provider handles interactions with an WS or WSS based JSON-RPC server. @@ -258,13 +392,10 @@ WebsocketProviderV2 (beta) ``'ws://localhost:8546'``. * ``websocket_kwargs`` this should be a dictionary of keyword arguments which will be passed onto the ws/wss websocket connection. - * ``silence_listener_task_exceptions`` is a boolean that determines whether - exceptions raised by the listener task are silenced. Defaults to ``False``, - raising any exceptions that occur in the listener task. This provider inherits from the - :class:`~web3.providers.persistent.PersistentConnectionProvider` class. Refer to - the :class:`~web3.providers.persistent.PersistentConnectionProvider` documentation + :class:`~web3.providers.persistent.persistent_connection.PersistentConnectionProvider` class. Refer to + the :class:`~web3.providers.persistent.persistent_connection.PersistentConnectionProvider` documentation for details on additional configuration options available for this provider. Under the hood, the ``WebsocketProviderV2`` uses the python websockets library for @@ -273,11 +404,11 @@ WebsocketProviderV2 (beta) available arguments. -Usage -+++++ +WebsocketProviderV2 Usage ++++++++++++++++++++++++++ The ``AsyncWeb3`` class may be used as a context manager, utilizing the ``async with`` -syntax, when connecting via ``persistent_websocket()`` using the +syntax, when connecting via ``persistent_connection()`` using the ``WebsocketProviderV2``. This will automatically close the connection when the context manager exits and is the recommended way to initiate a persistent connection to the websocket provider. A similar example, using the ``websockets`` connection as an @@ -297,13 +428,13 @@ asynchronous context manager, can be found in the `websockets connection`_ docs. ... logger.addHandler(logging.StreamHandler()) >>> async def ws_v2_subscription_context_manager_example(): - ... async with AsyncWeb3.persistent_websocket( + ... async with AsyncWeb3.persistent_connection( ... WebsocketProviderV2(f"ws://127.0.0.1:8546") ... ) as w3: ... # subscribe to new block headers ... subscription_id = await w3.eth.subscribe("newHeads") ... - ... async for response in w3.ws.process_subscriptions(): + ... async for response in w3.socket.process_subscriptions(): ... print(f"{response}\n") ... # handle responses here ... @@ -325,7 +456,7 @@ asynchronous context manager, can be found in the `websockets connection`_ docs. The ``AsyncWeb3`` class may also be used as an asynchronous iterator, utilizing the -``async for`` syntax, when connecting via ``persistent_websocket()`` using the +``async for`` syntax, when connecting via ``persistent_connection()`` using the ``WebsocketProviderV2``. This may be used to set up an indefinite websocket connection and reconnect automatically if the connection is lost. A similar example, using the ``websockets`` connection as an asynchronous iterator, can be found in the @@ -341,7 +472,7 @@ and reconnect automatically if the connection is lost. A similar example, using >>> import websockets >>> async def ws_v2_subscription_iterator_example(): - ... async for w3 in AsyncWeb3.persistent_websocket( + ... async for w3 in AsyncWeb3.persistent_connection( ... WebsocketProviderV2(f"ws://127.0.0.1:8546") ... ): ... try: @@ -354,7 +485,7 @@ and reconnect automatically if the connection is lost. A similar example, using If neither of the two init patterns above work for your application, the ``__await__()`` -method is defined on the ``persistent_websocket()`` connection in a manner that awaits +method is defined on the ``persistent_connection()`` connection in a manner that awaits connecting to the websocket. You may also choose to instantiate and connect via the provider in separate lines. Both of these examples are shown below. @@ -362,7 +493,7 @@ provider in separate lines. Both of these examples are shown below. >>> async def ws_v2_alternate_init_example_1(): ... # awaiting the persistent connection itself will connect to the websocket - ... w3 = await AsyncWeb3.persistent_websocket(WebsocketProviderV2(f"ws://127.0.0.1:8546")) + ... w3 = await AsyncWeb3.persistent_connection(WebsocketProviderV2(f"ws://127.0.0.1:8546")) ... ... # some code here ... @@ -374,7 +505,7 @@ provider in separate lines. Both of these examples are shown below. >>> async def ws_v2_alternate_init_example_2(): ... # instantiation and connection via the provider as separate lines - ... w3 = AsyncWeb3.persistent_websocket(WebsocketProviderV2(f"ws://127.0.0.1:8546")) + ... w3 = AsyncWeb3.persistent_connection(WebsocketProviderV2(f"ws://127.0.0.1:8546")) ... await w3.provider.connect() ... ... # some code here @@ -386,17 +517,17 @@ provider in separate lines. Both of these examples are shown below. >>> asyncio.run(ws_v2_alternate_init_example_2) The ``WebsocketProviderV2`` class uses the -:class:`~web3.providers.websocket.request_processor.RequestProcessor` class under the +:class:`~web3.providers.persistent.request_processor.RequestProcessor` class under the hood to sync up the receiving of responses and response processing for one-to-one and one-to-many request-to-response requests. Refer to the -:class:`~web3.providers.websocket.request_processor.RequestProcessor` +:class:`~web3.providers.persistent.request_processor.RequestProcessor` documentation for details. -_PersistentConnectionWeb3 via AsyncWeb3.persistent_websocket() -++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +_PersistentConnectionWeb3 via AsyncWeb3.persistent_connection() ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ When an ``AsyncWeb3`` class is connected to a persistent websocket connection, via the -``persistent_websocket()`` method, it becomes an instance of the +``persistent_connection()`` method, it becomes an instance of the ``_PersistentConnectionWeb3`` class. This class has a few additional methods and attributes that are not available on the ``AsyncWeb3`` class. @@ -407,18 +538,18 @@ attributes that are not available on the ``AsyncWeb3`` class. The public API for interacting with the websocket connection is available via the ``ws`` attribute of the ``_PersistentConnectionWeb3`` class. This attribute is an instance of the - :class:`~web3.providers.websocket.WebsocketConnection` class and is the main + :class:`~web3.providers.persistent.persistent_connection.PersistentConnection` class and is the main interface for interacting with the websocket connection. Interacting with the Websocket Connection +++++++++++++++++++++++++++++++++++++++++ -.. py:class:: web3.providers.websocket.WebsocketConnection +.. py:class:: web3.providers.persistent.persistent_connection.PersistentConnection This class handles interactions with a websocket connection. It is available via the ``ws`` attribute of the ``_PersistentConnectionWeb3`` class. The - ``WebsocketConnection`` class has the following methods and attributes: + ``PersistentConnection`` class has the following methods and attributes: .. py:attribute:: subscriptions @@ -437,7 +568,7 @@ Interacting with the Websocket Connection The responses from this method are formatted by web3.py formatters and run through the middlewares that were present at the time of subscription. - An example of its use can be seen above in the `Usage`_ section. + An example of its use can be seen above in the `WebsocketProviderV2 Usage`_ section. .. py:method:: recv() @@ -457,7 +588,7 @@ Interacting with the Websocket Connection responses will not be formatted by web3.py formatters or run through the middlewares. Instead, use the methods available on the respective web3 module. For example, use ``w3.eth.get_block("latest")`` instead of - ``w3.ws.send("eth_getBlockByNumber", ["latest", True])``. + ``w3.socket.send("eth_getBlockByNumber", ["latest", True])``. AutoProvider diff --git a/newsfragments/2984.feature.rst b/newsfragments/2984.feature.rst new file mode 100644 index 0000000000..1ac37350d0 --- /dev/null +++ b/newsfragments/2984.feature.rst @@ -0,0 +1 @@ +Add AsyncIPCProvider diff --git a/tests/core/providers/test_async_ipc_provider.py b/tests/core/providers/test_async_ipc_provider.py new file mode 100644 index 0000000000..847ceacba8 --- /dev/null +++ b/tests/core/providers/test_async_ipc_provider.py @@ -0,0 +1,186 @@ +import json +import os +import pathlib +import pytest +import socket +import tempfile +from threading import ( + Thread, +) +import time +import uuid + +from web3 import ( + AsyncWeb3, +) +from web3.datastructures import ( + AttributeDict, +) +from web3.exceptions import ( + ProviderConnectionError, +) +from web3.providers import ( + AsyncIPCProvider, +) + +ETH_SUBSCRIBE_RESPONSE = { + "jsonrpc": "2.0", + "id": 1, + "method": "eth_subscription", + "params": { + "result": { + "removed": "false", + "transaction": { + "hash": "0xa8f2cf69e302da6c8100b80298ed77c37b6e75eed1177ca22acd5772c9fb9876", # noqa: E501 + }, + }, + "subscription": "0xf13f7073ddef66a8c1b0c9c9f0e543c3", + }, +} + + +@pytest.fixture +def jsonrpc_ipc_pipe_path(): + with tempfile.TemporaryDirectory() as temp_dir: + ipc_path = os.path.join(temp_dir, f"{uuid.uuid4()}.ipc") + try: + yield ipc_path + finally: + if os.path.exists(ipc_path): + os.remove(ipc_path) + + +@pytest.fixture +def simple_ipc_server(jsonrpc_ipc_pipe_path): + serv = socket.socket(socket.AF_UNIX) + serv.bind(jsonrpc_ipc_pipe_path) + serv.listen(1) + try: + yield serv + finally: + serv.close() + + +@pytest.fixture +def serve_empty_result(simple_ipc_server): + def reply(): + connection, client_address = simple_ipc_server.accept() + try: + connection.recv(1024) + connection.sendall(b'{"id": 0, "result": {}') + time.sleep(0.1) + connection.sendall(b"}") + except BrokenPipeError: + pass + finally: + # Clean up the connection + connection.close() + simple_ipc_server.close() + + thd = Thread(target=reply, daemon=True) + thd.start() + + try: + yield + finally: + thd.join() + + +@pytest.fixture +def serve_subscription_result(simple_ipc_server): + def reply(): + connection, client_address = simple_ipc_server.accept() + try: + connection.recv(1024) + connection.sendall( + b'{"jsonrpc": "2.0", "id": 0, "result": "0xf13f7073ddef66a8c1b0c9c9f0e543c3"}' # noqa: E501 + ) + connection.sendall(json.dumps(ETH_SUBSCRIBE_RESPONSE).encode("utf-8")) + finally: + # Clean up the connection + connection.close() + simple_ipc_server.close() + + thd = Thread(target=reply, daemon=True) + thd.start() + + try: + yield + finally: + thd.join() + + +def test_ipc_tilde_in_path(): + expected_path = str(pathlib.Path.home()) + "/foo" + assert AsyncIPCProvider("~/foo").ipc_path == expected_path + assert AsyncIPCProvider(pathlib.Path("~/foo")).ipc_path == expected_path + + +@pytest.mark.asyncio +async def test_provider_is_connected(jsonrpc_ipc_pipe_path, serve_empty_result): + w3 = await AsyncWeb3.persistent_connection( + AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path)) + ) + await w3.provider.disconnect() + assert await w3.is_connected() is False + with pytest.raises(ProviderConnectionError): + await w3.is_connected(show_traceback=True) + + +@pytest.mark.asyncio +async def test_async_waits_for_full_result(jsonrpc_ipc_pipe_path, serve_empty_result): + async with AsyncWeb3.persistent_connection( + AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path)) + ) as w3: + result = await w3.provider.make_request("method", []) + assert result == {"id": 0, "result": {}} + await w3.provider.disconnect() + + +@pytest.mark.asyncio +async def test_await_persistent_connection(jsonrpc_ipc_pipe_path, serve_empty_result): + w3 = await AsyncWeb3.persistent_connection( + AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path)) + ) + result = await w3.provider.make_request("method", []) + assert result == {"id": 0, "result": {}} + await w3.provider.disconnect() + + +@pytest.mark.asyncio +async def test_persistent_connection(jsonrpc_ipc_pipe_path, serve_empty_result): + w3 = AsyncWeb3.persistent_connection( + AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path)) + ) + await w3.provider.connect() + result = await w3.provider.make_request("method", []) + assert result == {"id": 0, "result": {}} + await w3.provider.disconnect() + + +@pytest.mark.asyncio +async def test_eth_subscription(jsonrpc_ipc_pipe_path, serve_subscription_result): + async with AsyncWeb3.persistent_connection( + AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path)) + ) as w3: + subscribe_response = await w3.eth.subscribe("newHeads") + subscription_id = "0xf13f7073ddef66a8c1b0c9c9f0e543c3" + assert subscribe_response == subscription_id + + subscription_response = { + "result": AttributeDict( + { + "removed": "false", + "transaction": AttributeDict( + { + "hash": "0xa8f2cf69e302da6c8100b80298ed77c37b6e75eed1177ca22acd5772c9fb9876" # noqa: E501 + } + ), + } + ), + "subscription": "0xf13f7073ddef66a8c1b0c9c9f0e543c3", + } + async for response in w3.socket.process_subscriptions(): + assert response == subscription_response + break + await w3.provider.disconnect() diff --git a/tests/core/providers/test_http_request_retry.py b/tests/core/providers/test_http_request_retry.py index 9fea423884..d139149fae 100644 --- a/tests/core/providers/test_http_request_retry.py +++ b/tests/core/providers/test_http_request_retry.py @@ -101,7 +101,7 @@ def test_exception_retry_config_is_strictly_on_http_provider(): w3 = Web3(IPCProvider()) assert not hasattr(w3.provider, "exception_retry_configuration") - w3 = AsyncWeb3.persistent_websocket(WebsocketProviderV2("ws://localhost:8546")) + w3 = AsyncWeb3.persistent_connection(WebsocketProviderV2("ws://localhost:8546")) assert not hasattr(w3.provider, "exception_retry_configuration") diff --git a/tests/core/providers/test_wsv2_provider.py b/tests/core/providers/test_wsv2_provider.py index 4ec6e5265c..3996ac3a7e 100644 --- a/tests/core/providers/test_wsv2_provider.py +++ b/tests/core/providers/test_wsv2_provider.py @@ -23,7 +23,7 @@ from web3.exceptions import ( TimeExhausted, ) -from web3.providers.websocket import ( +from web3.providers.persistent import ( WebsocketProviderV2, ) from web3.types import ( @@ -48,7 +48,7 @@ async def test_async_make_request_returns_desired_response(): provider = WebsocketProviderV2("ws://mocked") with patch( - "web3.providers.websocket.websocket_v2.connect", new=lambda *_1, **_2: _coro() + "web3.providers.persistent.websocket_v2.connect", new=lambda *_1, **_2: _coro() ): await provider.connect() @@ -116,7 +116,7 @@ async def test_msg_listener_task_starts_on_provider_connect_and_cancels_on_disco assert provider._message_listener_task is None with patch( - "web3.providers.websocket.websocket_v2.connect", new=lambda *_1, **_2: _coro() + "web3.providers.persistent.websocket_v2.connect", new=lambda *_1, **_2: _coro() ): await provider.connect() # connect @@ -135,7 +135,7 @@ async def test_msg_listener_task_raises_exceptions_by_default(): _mock_ws(provider) with patch( - "web3.providers.websocket.websocket_v2.connect", new=lambda *_1, **_2: _coro() + "web3.providers.persistent.websocket_v2.connect", new=lambda *_1, **_2: _coro() ): await provider.connect() assert provider._message_listener_task is not None @@ -158,7 +158,7 @@ async def test_msg_listener_task_silences_exceptions_and_error_logs_when_configu _mock_ws(provider) with patch( - "web3.providers.websocket.websocket_v2.connect", new=lambda *_1, **_2: _coro() + "web3.providers.persistent.websocket_v2.connect", new=lambda *_1, **_2: _coro() ): await provider.connect() assert provider._message_listener_task is not None @@ -190,9 +190,9 @@ async def test_listen_event_awaits_msg_processing_when_subscription_queue_is_ful is full. """ with patch( - "web3.providers.websocket.websocket_v2.connect", new=lambda *_1, **_2: _coro() + "web3.providers.persistent.websocket_v2.connect", new=lambda *_1, **_2: _coro() ): - async_w3 = await AsyncWeb3.persistent_websocket( + async_w3 = await AsyncWeb3.persistent_connection( WebsocketProviderV2("ws://mocked") ) @@ -254,7 +254,7 @@ async def test_listen_event_awaits_msg_processing_when_subscription_queue_is_ful # set is not called until we start consuming messages async_w3.provider._listen_event.set.assert_not_called() - async for message in async_w3.ws.process_subscriptions(): + async for message in async_w3.socket.process_subscriptions(): # assert the very next message is the mocked subscription assert message == mocked_sub break diff --git a/tests/integration/go_ethereum/test_goethereum_ipc.py b/tests/integration/go_ethereum/test_goethereum_ipc.py index 1120329b19..06f86ed180 100644 --- a/tests/integration/go_ethereum/test_goethereum_ipc.py +++ b/tests/integration/go_ethereum/test_goethereum_ipc.py @@ -2,21 +2,28 @@ import pytest import tempfile +import pytest_asyncio + from tests.integration.common import ( COINBASE, ) from web3 import ( + AsyncIPCProvider, + AsyncWeb3, Web3, ) from .common import ( GoEthereumAdminModuleTest, + GoEthereumAsyncEthModuleTest, + GoEthereumAsyncNetModuleTest, GoEthereumEthModuleTest, GoEthereumNetModuleTest, GoEthereumPersonalModuleTest, GoEthereumTest, ) from .utils import ( + wait_for_async_socket, wait_for_socket, ) @@ -54,6 +61,13 @@ def w3(geth_process, geth_ipc_path): return _w3 +@pytest_asyncio.fixture(scope="module") +async def async_w3(geth_process, geth_ipc_path): + await wait_for_async_socket(geth_ipc_path) + async with AsyncWeb3.persistent_connection(AsyncIPCProvider(geth_ipc_path)) as _aw3: + yield _aw3 + + class TestGoEthereumTest(GoEthereumTest): pass @@ -82,9 +96,17 @@ class TestGoEthereumEthModuleTest(GoEthereumEthModuleTest): pass +class TestGoEthereumAsyncEthModuleTest(GoEthereumAsyncEthModuleTest): + pass + + class TestGoEthereumNetModuleTest(GoEthereumNetModuleTest): pass +class TestGoEthereumAsyncNetModuleTest(GoEthereumAsyncNetModuleTest): + pass + + class TestGoEthereumPersonalModuleTest(GoEthereumPersonalModuleTest): pass diff --git a/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_await_w3.py b/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_await_w3.py index 9a7dfe4aeb..b4ffb256af 100644 --- a/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_await_w3.py +++ b/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_await_w3.py @@ -30,7 +30,7 @@ async def async_w3(geth_process, endpoint_uri): await wait_for_aiohttp(endpoint_uri) # await the persistent connection itself - return await AsyncWeb3.persistent_websocket(WebsocketProviderV2(endpoint_uri)) + return await AsyncWeb3.persistent_connection(WebsocketProviderV2(endpoint_uri)) class TestGoEthereumAsyncAdminModuleTest(GoEthereumAsyncAdminModuleTest): diff --git a/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_ctx_manager_w3.py b/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_ctx_manager_w3.py index 2829f05119..fcad0f0c94 100644 --- a/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_ctx_manager_w3.py +++ b/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_ctx_manager_w3.py @@ -30,7 +30,7 @@ async def async_w3(geth_process, endpoint_uri): await wait_for_aiohttp(endpoint_uri) # async context manager pattern - async with AsyncWeb3.persistent_websocket(WebsocketProviderV2(endpoint_uri)) as w3: + async with AsyncWeb3.persistent_connection(WebsocketProviderV2(endpoint_uri)) as w3: yield w3 diff --git a/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_iterator_w3.py b/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_iterator_w3.py index 67b99a6c49..fb391af68d 100644 --- a/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_iterator_w3.py +++ b/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_iterator_w3.py @@ -30,7 +30,7 @@ async def async_w3(geth_process, endpoint_uri): await wait_for_aiohttp(endpoint_uri) # async iterator pattern - async for w3 in AsyncWeb3.persistent_websocket(WebsocketProviderV2(endpoint_uri)): + async for w3 in AsyncWeb3.persistent_connection(WebsocketProviderV2(endpoint_uri)): return w3 diff --git a/tests/integration/go_ethereum/utils.py b/tests/integration/go_ethereum/utils.py index a9db62c41b..4cf40200d6 100644 --- a/tests/integration/go_ethereum/utils.py +++ b/tests/integration/go_ethereum/utils.py @@ -1,3 +1,4 @@ +import asyncio import signal import socket import time @@ -19,6 +20,17 @@ def wait_for_socket(ipc_path, timeout=30): break +async def wait_for_async_socket(ipc_path, timeout=30): + start = time.time() + while time.time() < start + timeout: + try: + await asyncio.open_unix_connection(ipc_path) + except OSError: + time.sleep(0.01) + else: + break + + def wait_for_http(endpoint_uri, timeout=60): start = time.time() while time.time() < start + timeout: diff --git a/tox.ini b/tox.ini index f33770cb6f..8ab25bbcc6 100644 --- a/tox.ini +++ b/tox.ini @@ -25,7 +25,8 @@ commands= ens: pytest {posargs:tests/ens --ignore=tests/ens/normalization/test_normalize_name_ensip15.py} ensip15: pytest {posargs:tests/ens/normalization/test_normalize_name_ensip15.py -q} ethpm: pytest {posargs:tests/ethpm} - integration-goethereum-ipc: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ipc.py} + integration-goethereum-ipc: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ipc.py -k "not Async"} + integration-goethereum-ipc_async: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ipc.py -k Async} integration-goethereum-ipc_flaky: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ipc.py --flaky} integration-goethereum-http: pytest {posargs:tests/integration/go_ethereum/test_goethereum_http.py -k "not Async"} integration-goethereum-http_async: pytest {posargs:tests/integration/go_ethereum/test_goethereum_http.py -k Async} diff --git a/web3/__init__.py b/web3/__init__.py index 4406207076..d1974529b7 100644 --- a/web3/__init__.py +++ b/web3/__init__.py @@ -15,6 +15,11 @@ AsyncWeb3, Web3, ) +from web3.providers.persistent import ( # noqa: E402 + AsyncIPCProvider, + PersistentConnectionProvider, + WebsocketProviderV2, +) from web3.providers.eth_tester import ( # noqa: E402 EthereumTesterProvider, ) @@ -27,7 +32,6 @@ ) from web3.providers.websocket import ( # noqa: E402 WebsocketProvider, - WebsocketProviderV2, ) @@ -42,4 +46,5 @@ "EthereumTesterProvider", "Account", "AsyncHTTPProvider", + "AsyncIPCProvider", ] diff --git a/web3/_utils/module_testing/persistent_connection_provider.py b/web3/_utils/module_testing/persistent_connection_provider.py index af996dbf05..bf1f989710 100644 --- a/web3/_utils/module_testing/persistent_connection_provider.py +++ b/web3/_utils/module_testing/persistent_connection_provider.py @@ -295,7 +295,7 @@ async def test_async_eth_subscribe_mocked( ws_subscription_response, subscription=True ) - async for msg in async_w3.ws.process_subscriptions(): + async for msg in async_w3.socket.process_subscriptions(): response = cast(FormattedEthSubscriptionResponse, msg) assert response["subscription"] == sub_id assert response["result"] == expected_formatted_result @@ -331,7 +331,7 @@ async def test_async_extradata_poa_middleware_on_eth_subscription( subscription=True, ) - async for msg in async_w3.ws.process_subscriptions(): + async for msg in async_w3.socket.process_subscriptions(): response = cast(FormattedEthSubscriptionResponse, msg) assert response.keys() == {"subscription", "result"} assert response["subscription"] == sub_id diff --git a/web3/auto/gethdev.py b/web3/auto/gethdev.py index 8ff878c38e..e7387ed2e1 100644 --- a/web3/auto/gethdev.py +++ b/web3/auto/gethdev.py @@ -1,4 +1,6 @@ from web3 import ( + AsyncIPCProvider, + AsyncWeb3, IPCProvider, Web3, ) @@ -11,3 +13,6 @@ w3 = Web3(IPCProvider(get_dev_ipc_path())) w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0) + +async_w3 = AsyncWeb3.persistent_connection(AsyncIPCProvider(get_dev_ipc_path())) +async_w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0) diff --git a/web3/main.py b/web3/main.py index 604ea4fcdf..b1282f9d8f 100644 --- a/web3/main.py +++ b/web3/main.py @@ -130,8 +130,8 @@ from web3.providers.websocket import ( WebsocketProvider, ) -from web3.providers.websocket.websocket_connection import ( - WebsocketConnection, +from web3.providers.persistent.persistent_connection import ( + PersistentConnection, ) from web3.testing import ( Testing, @@ -496,7 +496,7 @@ def ens(self, new_ens: Union[AsyncENS, "Empty"]) -> None: self._ens = new_ens @staticmethod - def persistent_websocket( + def persistent_connection( provider: PersistentConnectionProvider, middlewares: Optional[Sequence[Any]] = None, modules: Optional[Dict[str, Union[Type[Module], Sequence[Any]]]] = None, @@ -506,7 +506,7 @@ def persistent_websocket( ens: Union[AsyncENS, "Empty"] = empty, ) -> "_PersistentConnectionWeb3": """ - Establish a persistent connection via websockets to a websocket provider using + Establish a persistent connection using a ``PersistentConnectionProvider`` instance. """ return _PersistentConnectionWeb3( @@ -521,7 +521,7 @@ def persistent_websocket( class _PersistentConnectionWeb3(AsyncWeb3): provider: PersistentConnectionProvider - # w3 = AsyncWeb3.persistent_websocket(provider) + # w3 = AsyncWeb3.persistent_connection(provider) # await w3.provider.connect() def __init__( self, @@ -538,9 +538,9 @@ def __init__( "Provider must inherit from PersistentConnectionProvider class." ) AsyncWeb3.__init__(self, provider, middlewares, modules, external_modules, ens) - self.ws = WebsocketConnection(self) + self.socket = PersistentConnection(self) - # w3 = await AsyncWeb3.persistent_websocket(provider) + # w3 = await AsyncWeb3.persistent_connection(provider) def __await__( self, ) -> Generator[Any, None, Self]: @@ -552,7 +552,7 @@ async def __async_init__() -> Self: return __async_init__().__await__() - # async with w3.persistent_websocket(provider) as w3 + # async with w3.persistent_connection(provider) as w3 async def __aenter__(self) -> Self: await self.provider.connect() return self @@ -565,7 +565,7 @@ async def __aexit__( ) -> None: await self.provider.disconnect() - # async for w3 in w3.persistent_websocket(provider) + # async for w3 in w3.persistent_connection(provider) async def __aiter__(self) -> AsyncIterator[Self]: if not await self.provider.is_connected(): await self.provider.connect() diff --git a/web3/manager.py b/web3/manager.py index b0d777ff2b..7567119d65 100644 --- a/web3/manager.py +++ b/web3/manager.py @@ -72,7 +72,7 @@ AsyncBaseProvider, BaseProvider, ) - from web3.providers.websocket.request_processor import ( # noqa: F401 + from web3.providers.persistent.request_processor import ( # noqa: F401 RequestProcessor, ) @@ -314,33 +314,35 @@ async def coro_request( # -- persistent connection -- # - async def ws_send(self, method: RPCEndpoint, params: Any) -> RPCResponse: + async def send(self, method: RPCEndpoint, params: Any) -> RPCResponse: provider = cast(PersistentConnectionProvider, self._provider) request_func = await provider.request_func( cast("AsyncWeb3", self.w3), cast("MiddlewareOnion", self.middleware_onion) ) self.logger.debug( - "Making request to open websocket connection - " + "Making request to open socket connection - " f"uri: {provider.endpoint_uri}, method: {method}" ) response = await request_func(method, params) - return await self._process_ws_response(response) + return await self._process_response(response) def _persistent_message_stream(self) -> "_AsyncPersistentMessageStream": return _AsyncPersistentMessageStream(self) - async def _get_next_ws_message(self) -> Any: - return await self._ws_message_stream().__anext__() + async def _get_next_message(self) -> Any: + return await self._message_stream().__anext__() - async def _ws_message_stream(self) -> AsyncGenerator[RPCResponse, None]: + async def _message_stream(self) -> AsyncGenerator[RPCResponse, None]: if not isinstance(self._provider, PersistentConnectionProvider): raise TypeError( - "Only websocket providers that maintain an open, persistent connection " - "can listen to websocket recv streams." + "Only providers that maintain an open, persistent connection " + "can listen to streams." ) if self._provider._message_listener_task is None: - raise ProviderConnectionError("No listener found for websocket connection.") + raise ProviderConnectionError( + "No listener found for persistent connection." + ) while True: # sleep(0) here seems to be the most efficient way to yield control @@ -354,9 +356,9 @@ async def _ws_message_stream(self) -> AsyncGenerator[RPCResponse, None]: in self._request_processor.active_subscriptions ): # if response is an active subscription response, process it - yield await self._process_ws_response(response) + yield await self._process_response(response) - async def _process_ws_response(self, response: RPCResponse) -> RPCResponse: + async def _process_response(self, response: RPCResponse) -> RPCResponse: provider = cast(PersistentConnectionProvider, self._provider) request_info = self._request_processor.get_request_information_for_response( response @@ -421,6 +423,6 @@ def __aiter__(self) -> Self: async def __anext__(self) -> RPCResponse: try: - return await self.manager._get_next_ws_message() + return await self.manager._get_next_message() except ConnectionClosedOK: raise StopAsyncIteration diff --git a/web3/module.py b/web3/module.py index e04cf5ea31..cb990df944 100644 --- a/web3/module.py +++ b/web3/module.py @@ -97,11 +97,11 @@ async def caller(*args: Any, **kwargs: Any) -> Union[RPCResponse, AsyncLogFilter # For now, keep the expected typing but ignore it here. provider = async_w3.provider cache_key = provider._request_processor.cache_request_information( - method_str, params, response_formatters # type: ignore + cast(RPCEndpoint, method_str), params, response_formatters # type: ignore # noqa: E501 ) try: method_str = cast(RPCEndpoint, method_str) - return await async_w3.manager.ws_send(method_str, params) + return await async_w3.manager.send(method_str, params) except Exception as e: if ( cache_key is not None diff --git a/web3/providers/__init__.py b/web3/providers/__init__.py index e3a74a4d93..48df0c8a32 100644 --- a/web3/providers/__init__.py +++ b/web3/providers/__init__.py @@ -16,10 +16,12 @@ ) from .websocket import ( WebsocketProvider, - WebsocketProviderV2, ) from .persistent import ( + AsyncIPCProvider, + PersistentConnection, PersistentConnectionProvider, + WebsocketProviderV2, ) from .auto import ( AutoProvider, diff --git a/web3/providers/async_base.py b/web3/providers/async_base.py index 471e44e66e..5ee26d3385 100644 --- a/web3/providers/async_base.py +++ b/web3/providers/async_base.py @@ -44,7 +44,6 @@ if TYPE_CHECKING: from web3 import ( # noqa: F401 AsyncWeb3, - WebsocketProviderV2, ) @@ -134,7 +133,7 @@ def decode_rpc_response(self, raw_response: bytes) -> RPCResponse: async def is_connected(self, show_traceback: bool = False) -> bool: try: response = await self.make_request(RPCEndpoint("web3_clientVersion"), []) - except OSError as e: + except (OSError, ProviderConnectionError) as e: if show_traceback: raise ProviderConnectionError( f"Problem connecting to provider with error: {type(e)}: {e}" @@ -148,7 +147,7 @@ async def is_connected(self, show_traceback: bool = False) -> bool: ) return False - if response["jsonrpc"] == "2.0": + if response.get("jsonrpc") == "2.0": return True else: if show_traceback: diff --git a/web3/providers/persistent/__init__.py b/web3/providers/persistent/__init__.py new file mode 100644 index 0000000000..2de74846a9 --- /dev/null +++ b/web3/providers/persistent/__init__.py @@ -0,0 +1,15 @@ +from .persistent import ( + PersistentConnectionProvider, +) +from .persistent_connection import ( + PersistentConnection, +) +from .request_processor import ( + RequestProcessor, +) +from .async_ipc import ( + AsyncIPCProvider, +) +from .websocket_v2 import ( + WebsocketProviderV2, +) diff --git a/web3/providers/persistent/async_ipc.py b/web3/providers/persistent/async_ipc.py new file mode 100644 index 0000000000..90c0f6274f --- /dev/null +++ b/web3/providers/persistent/async_ipc.py @@ -0,0 +1,246 @@ +import asyncio +import errno +import json +from json import ( + JSONDecodeError, +) +import logging +from pathlib import ( + Path, +) +import sys +from typing import ( + Any, + Optional, + Tuple, + Union, +) + +from eth_utils import ( + to_text, +) + +from web3._utils.caching import ( + async_handle_request_caching, + generate_cache_key, +) +from web3.exceptions import ( + ProviderConnectionError, + TimeExhausted, +) +from web3.types import ( + RPCEndpoint, + RPCId, + RPCResponse, +) + +from . import ( + PersistentConnectionProvider, +) +from ..ipc import ( + get_default_ipc_path, +) + + +async def async_get_ipc_socket( + ipc_path: str, +) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]: + if sys.platform == "win32": + # On Windows named pipe is used. Simulate socket with it. + from web3._utils.windows import ( + NamedPipe, + ) + + return NamedPipe(ipc_path) + else: + return await asyncio.open_unix_connection(ipc_path) + + +class AsyncIPCProvider(PersistentConnectionProvider): + logger = logging.getLogger("web3.providers.AsyncIPCProvider") + + reader: Optional[asyncio.StreamReader] = None + writer: Optional[asyncio.StreamWriter] = None + + def __init__( + self, + ipc_path: Optional[Union[str, Path]] = None, + request_timeout: int = 10, + max_connection_retries: int = 5, + **kwargs: Any, + ) -> None: + if ipc_path is None: + self.ipc_path = get_default_ipc_path() + elif isinstance(ipc_path, str) or isinstance(ipc_path, Path): + self.ipc_path = str(Path(ipc_path).expanduser().resolve()) + else: + raise TypeError("ipc_path must be of type string or pathlib.Path") + + self.request_timeout = request_timeout + self._max_connection_retries = max_connection_retries + super().__init__(request_timeout, max_connection_retries, **kwargs) + + def __str__(self) -> str: + return f"<{self.__class__.__name__} {self.ipc_path}>" + + async def is_connected(self, show_traceback: bool = False) -> bool: + try: + await self.make_request(RPCEndpoint("web3_clientVersion"), []) + return True + except (OSError, BrokenPipeError, ProviderConnectionError) as e: + if show_traceback: + raise ProviderConnectionError( + f"Problem connecting to provider with error: {type(e)}: {e}" + ) + return False + + async def connect(self) -> None: + _connection_attempts = 0 + _backoff_rate_change = 1.75 + _backoff_time = 1.75 + + while _connection_attempts != self._max_connection_retries: + try: + _connection_attempts += 1 + self.reader, self.writer = await async_get_ipc_socket(self.ipc_path) + self._message_listener_task = asyncio.create_task( + self._message_listener() + ) + break + except OSError as e: + if _connection_attempts == self._max_connection_retries: + raise ProviderConnectionError( + f"Could not connect to endpoint: {self.endpoint_uri}. " + f"Retries exceeded max of {self._max_connection_retries}." + ) from e + self.logger.info( + f"Could not connect to endpoint: {self.endpoint_uri}. Retrying in " + f"{round(_backoff_time, 1)} seconds.", + exc_info=True, + ) + await asyncio.sleep(_backoff_time) + _backoff_time *= _backoff_rate_change + + async def disconnect(self) -> None: + if self.writer and not self.writer.is_closing(): + self.writer.close() + await self.writer.wait_closed() + self.writer = None + self.logger.debug( + f'Successfully disconnected from endpoint: "{self.endpoint_uri}' + ) + + try: + self._message_listener_task.cancel() + await self._message_listener_task + self.reader = None + except (asyncio.CancelledError, StopAsyncIteration): + pass + + self._request_processor.clear_caches() + + async def _reset_socket(self) -> None: + self.writer.close() + await self.writer.wait_closed() + self.reader, self.writer = await async_get_ipc_socket(self.ipc_path) + + @async_handle_request_caching + async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: + request_data = self.encode_rpc_request(method, params) + + if self.writer is None: + raise ProviderConnectionError( + "Connection to ipc socket has not been initiated for the provider." + ) + + try: + self.writer.write(request_data) + await self.writer.drain() + except OSError as e: + # Broken pipe + if e.errno == errno.EPIPE: + # one extra attempt, then give up + await self._reset_socket() + self.writer.write(request_data) + await self.writer.drain() + + current_request_id = json.loads(request_data)["id"] + response = await self._get_response_for_request_id(current_request_id) + + return response + + async def _get_response_for_request_id(self, request_id: RPCId) -> RPCResponse: + async def _match_response_id_to_request_id() -> RPCResponse: + request_cache_key = generate_cache_key(request_id) + + while True: + # sleep(0) here seems to be the most efficient way to yield control + # back to the event loop while waiting for the response to be in the + # queue. + await asyncio.sleep(0) + + if request_cache_key in self._request_processor._request_response_cache: + self.logger.debug( + f"Popping response for id {request_id} from cache." + ) + popped_response = self._request_processor.pop_raw_response( + cache_key=request_cache_key, + ) + return popped_response + + try: + # Add the request timeout around the while loop that checks the request + # cache and tried to recv(). If the request is neither in the cache, nor + # received within the request_timeout, raise ``TimeExhausted``. + return await asyncio.wait_for( + _match_response_id_to_request_id(), self.request_timeout + ) + except asyncio.TimeoutError: + raise TimeExhausted( + f"Timed out waiting for response with request id `{request_id}` after " + f"{self.request_timeout} second(s). This may be due to the provider " + "not returning a response with the same id that was sent in the " + "request or an exception raised during the request was caught and " + "allowed to continue." + ) + + async def _message_listener(self) -> None: + self.logger.info( + "IPC socket listener background task started. Storing all messages in " + "appropriate request processor queues / caches to be processed." + ) + raw_message = "" + decoder = json.JSONDecoder() + + while True: + # the use of sleep(0) seems to be the most efficient way to yield control + # back to the event loop to share the loop with other tasks. + await asyncio.sleep(0) + + try: + raw_message += to_text(await self.reader.read(4096)).lstrip() + + while raw_message: + try: + response, pos = decoder.raw_decode(raw_message) + except JSONDecodeError: + break + + is_subscription = response.get("method") == "eth_subscription" + await self._request_processor.cache_raw_response( + response, subscription=is_subscription + ) + raw_message = raw_message[pos:].lstrip() + except Exception as e: + if not self.silence_listener_task_exceptions: + loop = asyncio.get_event_loop() + for task in asyncio.all_tasks(loop=loop): + task.cancel() + raise e + + self.logger.error( + "Exception caught in listener, error logging and keeping listener " + f"background task alive.\n error={e}" + ) + # if only error logging, reset the ``raw_message`` buffer and continue + raw_message = "" diff --git a/web3/providers/persistent.py b/web3/providers/persistent/persistent.py similarity index 85% rename from web3/providers/persistent.py rename to web3/providers/persistent/persistent.py index 1dd059f6a3..4c6b6e49b3 100644 --- a/web3/providers/persistent.py +++ b/web3/providers/persistent/persistent.py @@ -14,7 +14,7 @@ from web3.providers.async_base import ( AsyncJSONBaseProvider, ) -from web3.providers.websocket.request_processor import ( +from web3.providers.persistent.request_processor import ( RequestProcessor, ) @@ -35,6 +35,7 @@ def __init__( self, request_timeout: float = DEFAULT_PERSISTENT_CONNECTION_TIMEOUT, subscription_response_queue_size: int = 500, + silence_listener_task_exceptions: bool = False, ) -> None: super().__init__() self._request_processor = RequestProcessor( @@ -42,6 +43,7 @@ def __init__( subscription_response_queue_size=subscription_response_queue_size, ) self.request_timeout = request_timeout + self.silence_listener_task_exceptions = silence_listener_task_exceptions async def connect(self) -> None: raise NotImplementedError("Must be implemented by subclasses") @@ -49,5 +51,5 @@ async def connect(self) -> None: async def disconnect(self) -> None: raise NotImplementedError("Must be implemented by subclasses") - async def _ws_message_listener(self) -> None: + async def _message_listener(self) -> None: raise NotImplementedError("Must be implemented by subclasses") diff --git a/web3/providers/websocket/websocket_connection.py b/web3/providers/persistent/persistent_connection.py similarity index 85% rename from web3/providers/websocket/websocket_connection.py rename to web3/providers/persistent/persistent_connection.py index c2fd50e6c5..45cb8d3bcb 100644 --- a/web3/providers/websocket/websocket_connection.py +++ b/web3/providers/persistent/persistent_connection.py @@ -18,9 +18,9 @@ ) -class WebsocketConnection: +class PersistentConnection: """ - A class that houses the public API for interacting with the websocket connection + A class that houses the public API for interacting with the persistent connection via a `_PersistentConnectionWeb3` instance. """ @@ -33,10 +33,10 @@ def subscriptions(self) -> Dict[str, Any]: return self._manager._request_processor.active_subscriptions async def send(self, method: RPCEndpoint, params: Any) -> RPCResponse: - return await self._manager.ws_send(method, params) + return await self._manager.send(method, params) async def recv(self) -> Any: - return await self._manager._get_next_ws_message() + return await self._manager._get_next_message() def process_subscriptions(self) -> "_AsyncPersistentMessageStream": return self._manager._persistent_message_stream() diff --git a/web3/providers/websocket/request_processor.py b/web3/providers/persistent/request_processor.py similarity index 99% rename from web3/providers/websocket/request_processor.py rename to web3/providers/persistent/request_processor.py index d1641c8caf..1db616448a 100644 --- a/web3/providers/websocket/request_processor.py +++ b/web3/providers/persistent/request_processor.py @@ -219,6 +219,7 @@ async def cache_raw_response( "Subscription queue is full. Waiting for provider to consume " "messages before caching." ) + self._provider._listen_event.clear() await self._provider._listen_event.wait() self._provider.logger.debug( diff --git a/web3/providers/websocket/websocket_v2.py b/web3/providers/persistent/websocket_v2.py similarity index 97% rename from web3/providers/websocket/websocket_v2.py rename to web3/providers/persistent/websocket_v2.py index b294baff1b..b280cdc7c9 100644 --- a/web3/providers/websocket/websocket_v2.py +++ b/web3/providers/persistent/websocket_v2.py @@ -66,7 +66,6 @@ def __init__( self, endpoint_uri: Optional[Union[URI, str]] = None, websocket_kwargs: Optional[Dict[str, Any]] = None, - silence_listener_task_exceptions: bool = False, # `PersistentConnectionProvider` kwargs can be passed through **kwargs: Any, ) -> None: @@ -94,7 +93,6 @@ def __init__( ) self.websocket_kwargs = merge(DEFAULT_WEBSOCKET_KWARGS, websocket_kwargs or {}) - self.silence_listener_task_exceptions = silence_listener_task_exceptions super().__init__(**kwargs) @@ -126,7 +124,7 @@ async def connect(self) -> None: _connection_attempts += 1 self._ws = await connect(self.endpoint_uri, **self.websocket_kwargs) self._message_listener_task = asyncio.create_task( - self._ws_message_listener() + self._message_listener() ) break except WebSocketException as e: @@ -211,7 +209,7 @@ async def _match_response_id_to_request_id() -> RPCResponse: "allowed to continue." ) - async def _ws_message_listener(self) -> None: + async def _message_listener(self) -> None: self.logger.info( "Websocket listener background task started. Storing all messages in " "appropriate request processor queues / caches to be processed." diff --git a/web3/providers/websocket/websocket.py b/web3/providers/websocket.py similarity index 100% rename from web3/providers/websocket/websocket.py rename to web3/providers/websocket.py diff --git a/web3/providers/websocket/__init__.py b/web3/providers/websocket/__init__.py deleted file mode 100644 index 1afa41f867..0000000000 --- a/web3/providers/websocket/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -from .websocket import ( - DEFAULT_WEBSOCKET_TIMEOUT, - RESTRICTED_WEBSOCKET_KWARGS, - WebsocketProvider, -) -from .websocket_connection import ( - WebsocketConnection, -) -from .websocket_v2 import ( - WebsocketProviderV2, -)