Skip to content

Commit be23f0e

Browse files
committed
Document the RequestProcessor class and caches.
- Add the beginnings of the documentation for why and how ``PersistentConnectionProvider`` instances cache request information and responses in order to process them appropriately. - Link to the ``RequestProcessor`` documentation from the ``WebsocketProviderV2`` docs.
1 parent 375166a commit be23f0e

File tree

2 files changed

+120
-0
lines changed

2 files changed

+120
-0
lines changed

docs/internals.rst

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,3 +185,118 @@ Managers
185185
The Manager acts as a gatekeeper for the request/response lifecycle. It is
186186
unlikely that you will need to change the Manager as most functionality can be
187187
implemented in the Middleware layer.
188+
189+
.. _request_processor:
190+
191+
RequestProcessor
192+
----------------
193+
194+
The ``RequestProcessor`` class is responsible for the storing and syncing up of
195+
asynchronous requests to responses for a ``PersistentConnectionProvider``. The best
196+
example of one such provider is the ``WebsocketProviderV2``. In order to send a
197+
websocket message and receive a response to that particular request,
198+
``PersistentConnectionProvider`` instances have to match request *id* values to
199+
response *id* values coming back from the websocket connection. Any provider that does
200+
not adhere to the `JSON-RPC 2.0 specification <https://www.jsonrpc.org/specification>`_
201+
in this way will not work with ``PersistentConnectionProvider`` instances. The specifics
202+
of how the request processor handles this is outlined below.
203+
204+
One-To-One Requests
205+
~~~~~~~~~~~~~~~~~~~
206+
207+
One-to-one requests can be summarized as any request that expects one response back.
208+
An example is using the ``eth`` module API to request the latest block number.
209+
210+
.. code-block:: python
211+
212+
>>> async def wsV2_example():
213+
... async with AsyncWeb3.persistent_websocket(
214+
... WebsocketProviderV2(f"ws://127.0.0.1:8546")
215+
... ) as w3:
216+
... # make a request and expect a single response returned on the same line
217+
... latest_block_num = await w3.eth.block_number
218+
219+
With websockets we have to call ``ws_send()`` and asynchronously receive responses via
220+
``ws.recv()``. In order to make the one-to-one request-to-response call work, we
221+
have to save the request information somewhere so that, when the response is received,
222+
we can match it to the original request that was made (the request with a matching *id*
223+
to the response that was received), and use that request information to process the
224+
response. Processing the response, in this case, means running it through the
225+
formatters and middlewares internal to the *web3.py* library.
226+
227+
In order to store the request information, the ``RequestProcessor`` class has an
228+
internal ``RequestInformation`` cache. The ``RequestInformation`` class saves important
229+
information about a request, such as:
230+
231+
- ``method``: The name of the method - e.g. "eth_subscribe".
232+
- ``params``: The params used when the call was made - e.g.
233+
("newPendingTransactions", True).
234+
- ``response_formatters``: The formatters that will be used to process the response.
235+
- ``middleware_response_processors``: Any middleware that processes responses that
236+
is present on the instance at the time of the request is appended here, in order,
237+
so the response may be piped through that logic when it comes in.
238+
- ``subscription_id``: If the request is an ``eth_subscribe`` request, rather than
239+
popping this information from the cache when the response to the subscription call
240+
comes in (i.e. the subscription *id*), we save the subscription id with the
241+
request information so that we can correctly process all subscription messages
242+
that come in with that subscription *id*. For one-to-one request-to-response
243+
calls, this value is always ``None``.
244+
245+
One-to-one responses, those that include a JSON-RPC *id* in the response object, are
246+
stored in an internal ``SimpleCache`` class, isolated from any one-to-many responses.
247+
When the ``PersistentConnectionProvider`` is looking for a response internally, it will
248+
cycle within a ``while`` loop, alternating between checking this cache (in case
249+
somewhere else in the code our desired response was cached by another call) and calling
250+
``recv()`` on the websocket connection to see if it is yet to come.
251+
252+
One-To-Many Requests
253+
~~~~~~~~~~~~~~~~~~~~
254+
255+
One-to-many requests can be summarized by any request that expects many responses as a
256+
result of the initial request. An example is the ``eth_subscribe`` request. The initial
257+
``eth_subscribe`` request expects only one response, the subscription *id* value, but
258+
it also expects to receive many ``eth_subscription`` messages if and when the request is
259+
successful. For this reason, the original request is considered a one-to-one request
260+
so that a subscription *id* can be returned to the user on the same line, but the
261+
``listen_to_websocket()`` method on the ``WebsocketConnection`` class, the public API
262+
for interacting with the active websocket connection, is set up to receive many-to-one
263+
``eth_subscription`` responses over an asynchronous interator pattern.
264+
265+
.. code-block:: python
266+
267+
>>> async def ws_v2_subscription_example():
268+
... async with AsyncWeb3.persistent_websocket(
269+
... WebsocketProviderV2(f"ws://127.0.0.1:8546")
270+
... ) as w3:
271+
... # Subscribe to new block headers and receive the subscription_id.
272+
... # A one-to-one call with a trigger for many responses
273+
... subscription_id = await w3.eth.subscribe("newHeads")
274+
...
275+
... # Listen to the websocket for the many responses utilizing the ``w3.ws``
276+
... # ``WebsocketConnection`` public API method ``listen_to_websocket()``
277+
... async for response in w3.ws.listen_to_websocket():
278+
... # Receive only one-to-many responses here so that we don't
279+
... # accidentally return the response for a one-to-one request in this
280+
... # block
281+
...
282+
... print(f"{response}\n")
283+
...
284+
... if some_condition:
285+
... # unsubscribe from new block headers, another one-to-one request
286+
... is_unsubscribed = await w3.eth.unsubscribe(subscription_id)
287+
... if is_unsubscribed:
288+
... break
289+
290+
>>> asyncio.run(ws_v2_subscription_example())
291+
292+
One-to-many responses, those that do not include a JSON-RPC *id* in the response object,
293+
are stored in an internal ``collections.deque`` instance, isolated from any one-to-one
294+
responses. When the ``PersistentConnectionProvider`` is looking for a one-to-many
295+
response internally, it will cycle within a ``while`` loop, alternating between
296+
checking this deque (in case somewhere else in the code, subscriptions responses were
297+
put in the deque by another call looking for another response type) and calling
298+
``recv()`` on the websocket connection to see if the response is yet to be received.
299+
With each iteration on this iterator, if the ``deque`` is non-empty, it will yield
300+
messages from the ``deque`` as FIFO order until the deque is empty before receiving any
301+
new messages from the websocket connection, guaranteeing the messages are yielded in the
302+
order they were received.

docs/providers.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,11 @@ provider in separate lines. Both of these examples are shown below.
348348
>>> asyncio.run(ws_v2_alternate_init_example_1)
349349
>>> asyncio.run(ws_v2_alternate_init_example_2)
350350
351+
The ``WebsocketProviderV2`` class uses the :ref:`RequestProcessor <request_processor>`
352+
class under the hood to sync up the receiving and response processing for one-to-one and
353+
one-to-many request-to-response requests. Refer to the linked documentation for the
354+
``RequestProcessor`` for details on why and how it handles this processing.
355+
351356
_PersistentConnectionWeb3 via AsyncWeb3.persistent_websocket()
352357
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
353358

0 commit comments

Comments
 (0)