Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions docs/providers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,15 @@ asynchronous context manager, can be found in the `websockets connection`_ docs.
... # subscribe to new block headers
... subscription_id = await w3.eth.subscribe("newHeads")
...
... unsubscribed = False
... while not unsubscribed:
... async for response in w3.ws.listen_to_websocket():
... print(f"{response}\n")
... # handle responses here
... async for response in w3.ws.listen_to_websocket():
... print(f"{response}\n")
... # handle responses here
...
... if some_condition:
... # unsubscribe from new block headers
... unsubscribed = await w3.eth.unsubscribe(subscription_id)
... break
... if some_condition:
... # unsubscribe from new block headers and break out of
... # iterator
... await w3.eth.unsubscribe(subscription_id)
... break
...
... # still an open connection, make any other requests and get
... # responses via send / receive
Expand Down
1 change: 1 addition & 0 deletions newsfragments/3116.breaking.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor the async iterator pattern for message streams from the websocket connection for ``WebsocketProviderV2`` to a proper async iterator. This allows for a more natural usage of the iterator pattern and mimics the behavior of the underlying ``websockets`` library.
1 change: 1 addition & 0 deletions newsfragments/3116.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix issues with formatting middleware, such as ``async_geth_poa_middleware`` and subscription responses for ``WebsocketProviderV2``.
1 change: 1 addition & 0 deletions newsfragments/3116.docs.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Updates to the ``WebsocketProviderV2`` documentation async iterator example for iterating over a persistent stream of messages from the websocket connection via ``async for``.
121 changes: 121 additions & 0 deletions tests/core/providers/test_wsv2_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import json
import pytest
import sys

from eth_utils import (
to_bytes,
)

from web3.exceptions import (
TimeExhausted,
)
from web3.providers.websocket import (
WebsocketProviderV2,
)
from web3.types import (
RPCEndpoint,
)


def _mock_ws(provider):
# move to top of file when python 3.7 is no longer supported in web3.py
from unittest.mock import (
AsyncMock,
)

provider._ws = AsyncMock()


@pytest.mark.asyncio
@pytest.mark.skipif(
# TODO: remove when python 3.7 is no longer supported in web3.py
# python 3.7 is already sunset so this feels like a reasonable tradeoff
sys.version_info < (3, 8),
reason="Uses AsyncMock, not supported by python 3.7",
)
async def test_async_make_request_caches_all_undesired_responses_and_returns_desired():
provider = WebsocketProviderV2("ws://mocked")

method_under_test = provider.make_request

_mock_ws(provider)
undesired_responses_count = 10
ws_recv_responses = [
to_bytes(
text=json.dumps(
{
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {"subscription": "0x1", "result": f"0x{i}"},
}
)
)
for i in range(0, undesired_responses_count)
]
# The first request we make should have an id of `0`, expect the response to match
# that id. Append it as the last response in the list.
ws_recv_responses.append(b'{"jsonrpc": "2.0", "id":0, "result": "0x1337"}')
provider._ws.recv.side_effect = ws_recv_responses

response = await method_under_test(RPCEndpoint("some_method"), ["desired_params"])
assert response == json.loads(ws_recv_responses.pop()) # pop the expected response

assert (
len(provider._request_processor._raw_response_cache)
== len(ws_recv_responses)
== undesired_responses_count
)

for (
_cache_key,
cached_response,
) in provider._request_processor._raw_response_cache.items():
# assert all cached responses are in the list of responses we received
assert to_bytes(text=json.dumps(cached_response)) in ws_recv_responses


@pytest.mark.asyncio
@pytest.mark.skipif(
# TODO: remove when python 3.7 is no longer supported in web3.py
# python 3.7 is already sunset so this feels like a reasonable tradeoff
sys.version_info < (3, 8),
reason="Uses AsyncMock, not supported by python 3.7",
)
async def test_async_make_request_returns_cached_response_with_no_recv_if_cached():
provider = WebsocketProviderV2("ws://mocked")

method_under_test = provider.make_request

_mock_ws(provider)

# cache the response, so we should get it immediately & should never call `recv()`
desired_response = {"jsonrpc": "2.0", "id": 0, "result": "0x1337"}
await provider._request_processor.cache_raw_response(desired_response)

response = await method_under_test(RPCEndpoint("some_method"), ["desired_params"])
assert response == desired_response

assert len(provider._request_processor._raw_response_cache) == 0
assert not provider._ws.recv.called # type: ignore


@pytest.mark.asyncio
@pytest.mark.skipif(
# TODO: remove when python 3.7 is no longer supported in web3.py
# python 3.7 is already sunset so this feels like a reasonable tradeoff
sys.version_info < (3, 8),
reason="Uses AsyncMock, not supported by python 3.7",
)
async def test_async_make_request_times_out_of_while_loop_looking_for_response():
provider = WebsocketProviderV2("ws://mocked", call_timeout=0.1)

method_under_test = provider.make_request

_mock_ws(provider)
provider._ws.recv.side_effect = lambda *args, **kwargs: b'{"jsonrpc": "2.0"}'

with pytest.raises(
TimeExhausted,
match="Timed out waiting for response with request id `0` after 0.1 seconds.",
):
await method_under_test(RPCEndpoint("some_method"), ["desired_params"])
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from web3._utils.module_testing.go_ethereum_personal_module import (
GoEthereumAsyncPersonalModuleTest,
)
from web3._utils.module_testing.persistent_connection_provider import (
PersistentConnectionProviderTest,
)

from ..common import (
GoEthereumAsyncEthModuleTest,
Expand All @@ -25,6 +28,8 @@
@pytest_asyncio.fixture(scope="module")
async def async_w3(geth_process, endpoint_uri):
await wait_for_aiohttp(endpoint_uri)

# async context manager pattern
async with AsyncWeb3.persistent_websocket(
WebsocketProviderV2(endpoint_uri, call_timeout=30)
) as w3:
Expand Down Expand Up @@ -57,6 +62,10 @@ async def test_admin_start_stop_ws(self, async_w3: "AsyncWeb3") -> None:
await super().test_admin_start_stop_ws(async_w3)


class TestPersistentConnectionProviderTest(PersistentConnectionProviderTest):
pass


class TestGoEthereumAsyncEthModuleTest(GoEthereumAsyncEthModuleTest):
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from web3._utils.module_testing.go_ethereum_personal_module import (
GoEthereumAsyncPersonalModuleTest,
)
from web3._utils.module_testing.persistent_connection_provider import (
PersistentConnectionProviderTest,
)

from ..common import (
GoEthereumAsyncEthModuleTest,
Expand All @@ -25,6 +28,8 @@
@pytest_asyncio.fixture(scope="module")
async def async_w3(geth_process, endpoint_uri):
await wait_for_aiohttp(endpoint_uri)

# async iterator pattern
async for w3 in AsyncWeb3.persistent_websocket(
WebsocketProviderV2(endpoint_uri, call_timeout=30)
):
Expand Down Expand Up @@ -57,6 +62,10 @@ async def test_admin_start_stop_ws(self, async_w3: "AsyncWeb3") -> None:
await super().test_admin_start_stop_ws(async_w3)


class TestPersistentConnectionProviderTest(PersistentConnectionProviderTest):
pass


class TestGoEthereumAsyncEthModuleTest(GoEthereumAsyncEthModuleTest):
pass

Expand Down
6 changes: 3 additions & 3 deletions web3/_utils/method_formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,13 +643,13 @@ def subscription_formatter(value: Any) -> Union[HexBytes, HexStr, Dict[str, Any]
result_formatter = block_formatter

elif either_set_is_a_subset(
result_key_set, set(LOG_ENTRY_FORMATTERS.keys()), percentage=90
result_key_set, set(LOG_ENTRY_FORMATTERS.keys()), percentage=75
):
# logs
result_formatter = log_entry_formatter

elif either_set_is_a_subset(
result_key_set, set(TRANSACTION_RESULT_FORMATTERS.keys()), percentage=90
result_key_set, set(TRANSACTION_RESULT_FORMATTERS.keys()), percentage=75
):
# newPendingTransactions, full transactions
result_formatter = transaction_result_formatter
Expand All @@ -663,7 +663,7 @@ def subscription_formatter(value: Any) -> Union[HexBytes, HexStr, Dict[str, Any]
elif either_set_is_a_subset(
result_key_set,
set(SYNCING_FORMATTERS.keys()),
percentage=90,
percentage=75,
):
# syncing response object
result_formatter = syncing_formatter
Expand Down
Loading