Skip to content

Improve upon issues with session caching #2409

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions newsfragments/2409.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve upon issues with session caching - better support for multithreading and make sure session eviction from cache does not happen prematurely.
4 changes: 2 additions & 2 deletions tests/core/providers/test_async_http_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

@pytest.mark.asyncio
async def test_user_provided_session() -> None:

session = ClientSession()
provider = AsyncHTTPProvider(endpoint_uri="http://mynode.local:8545")
await provider.cache_async_session(session)
cached_session = await provider.cache_async_session(session)
assert len(request._async_session_cache) == 1
assert cached_session == session
2 changes: 1 addition & 1 deletion tests/core/providers/test_http_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_user_provided_session():
w3 = Web3(provider)
assert w3.manager.provider == provider

session = request.get_session(URI)
session = request.cache_and_return_session(URI)
adapter = session.get_adapter(URI)
assert isinstance(adapter, HTTPAdapter)
assert adapter._pool_connections == 20
Expand Down
182 changes: 156 additions & 26 deletions tests/core/utilities/test_request.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import asyncio
from concurrent.futures import (
ThreadPoolExecutor,
)
import pytest
import time

from aiohttp import (
ClientSession,
)
from eth_typing import (
URI,
)
from requests import (
Session,
adapters,
Expand All @@ -15,8 +23,13 @@
from web3._utils import (
request,
)
from web3._utils.caching import (
generate_cache_key,
)
from web3._utils.request import (
SessionCache,
cache_and_return_async_session,
cache_and_return_session,
)


Expand All @@ -34,7 +47,14 @@ def raise_for_status(self):
pass


URI = "http://mynode.local:8545"
TEST_URI = URI("http://mynode.local:8545")
UNIQUE_URIS = [
"https://www.test1.com",
"https://www.test2.com",
"https://www.test3.com",
"https://www.test4.com",
"https://www.test5.com",
]


def check_adapters_mounted(session: Session):
Expand All @@ -48,19 +68,23 @@ def test_make_post_request_no_args(mocker):

# Submit a first request to create a session with default parameters
assert len(request._session_cache) == 0
response = request.make_post_request(URI, data=b"request")
response = request.make_post_request(TEST_URI, data=b"request")
assert response == "content"
assert len(request._session_cache) == 1
session = request._session_cache.values()[0]
session.post.assert_called_once_with(URI, data=b"request", timeout=10)
cache_key = generate_cache_key(TEST_URI)
session = request._session_cache.get_cache_entry(cache_key)
session.post.assert_called_once_with(TEST_URI, data=b"request", timeout=10)

# Ensure the adapter was created with default values
check_adapters_mounted(session)
adapter = session.get_adapter(URI)
adapter = session.get_adapter(TEST_URI)
assert isinstance(adapter, HTTPAdapter)
assert adapter._pool_connections == DEFAULT_POOLSIZE
assert adapter._pool_maxsize == DEFAULT_POOLSIZE

# clear cache
request._session_cache.clear()


def test_precached_session(mocker):
mocker.patch("requests.Session.post", return_value=MockedResponse())
Expand All @@ -70,44 +94,30 @@ def test_precached_session(mocker):
session = Session()
session.mount("http://", adapter)
session.mount("https://", adapter)
request.cache_session(URI, session)
request.cache_and_return_session(TEST_URI, session)

# Submit a second request with different arguments
assert len(request._session_cache) == 1
response = request.make_post_request(URI, data=b"request", timeout=60)
response = request.make_post_request(TEST_URI, data=b"request", timeout=60)
assert response == "content"
assert len(request._session_cache) == 1

# Ensure the timeout was passed to the request
session = request.get_session(URI)
session.post.assert_called_once_with(URI, data=b"request", timeout=60)
session = request.cache_and_return_session(TEST_URI)
session.post.assert_called_once_with(TEST_URI, data=b"request", timeout=60)

# Ensure the adapter parameters match those we specified
check_adapters_mounted(session)
adapter = session.get_adapter(URI)
adapter = session.get_adapter(TEST_URI)
assert isinstance(adapter, HTTPAdapter)
assert adapter._pool_connections == 100
assert adapter._pool_maxsize == 100


@pytest.mark.asyncio
async def test_async_precached_session(mocker):
# Add a session
session = ClientSession()
await request.cache_async_session(URI, session)
assert len(request._async_session_cache) == 1

# Make sure the session isn't duplicated
await request.cache_async_session(URI, session)
assert len(request._async_session_cache) == 1

# Make sure a request with a different URI adds another cached session
await request.cache_async_session(f"{URI}/test", session)
assert len(request._async_session_cache) == 2
# clear cache
request._session_cache.clear()


def test_cache_session_class():

cache = SessionCache(2)
evicted_items = cache.cache("1", "Hello1")
assert cache.get_cache_entry("1") == "Hello1"
Expand All @@ -126,9 +136,129 @@ def test_cache_session_class():
evicted_items = cache.cache("3", "Hello3")
assert "2" in cache
assert "3" in cache

assert "1" not in cache
assert "1" in evicted_items

with pytest.raises(KeyError):
# This should throw a KeyError since the cache size was 2 and 3 were inserted
# the first inserted cached item was removed and returned in evicted items
cache.get_cache_entry("1")

# clear cache
request._session_cache.clear()


def test_cache_does_not_close_session_before_a_call_when_multithreading():
    """
    Cache one session per unique URI from several threads against a cache of
    size 1 and check that only the last submitted session stays cached.

    The module-level ``request._session_cache`` and ``request.DEFAULT_TIMEOUT``
    are replaced for the duration of the test and always restored afterwards.
    """
    # save default values (the *sync* cache is the one replaced below, so it is
    # the one that must be saved and restored)
    session_cache_default = request._session_cache
    timeout_default = request.DEFAULT_TIMEOUT

    # set cache size to 1 + set future session close thread time to 0.01s
    request._session_cache = SessionCache(1)
    _timeout_for_testing = 0.01
    request.DEFAULT_TIMEOUT = _timeout_for_testing

    def _simulate_call(uri):
        _session = cache_and_return_session(uri)

        # simulate a call taking 0.01s to return a response
        time.sleep(0.01)
        return _session

    cached_session = None
    try:
        with ThreadPoolExecutor(max_workers=len(UNIQUE_URIS)) as exc:
            all_sessions = [exc.submit(_simulate_call, uri) for uri in UNIQUE_URIS]

        # assert last session remains in cache, all others evicted
        cache_data = request._session_cache._data
        assert len(cache_data) == 1
        _key, cached_session = cache_data.popitem()
        assert cached_session == all_sessions[-1].result()  # result of the `Future`
    finally:
        # -- teardown -- #

        # close the cached session before exiting test
        if cached_session is not None:
            cached_session.close()

        # reset default values, even when an assertion above failed, so the
        # size-1 cache and the short timeout never leak into other tests
        request._session_cache = session_cache_default
        request.DEFAULT_TIMEOUT = timeout_default

        # clear cache
        request._session_cache.clear()


# -- async -- #


@pytest.mark.asyncio
async def test_async_precached_session():
    """
    Check the async session cache size after caching a user-provided session:
    re-caching under the same URI keeps one entry, a different URI adds a second.
    """
    # Add a session
    session = ClientSession()
    try:
        await request.cache_and_return_async_session(TEST_URI, session)
        assert len(request._async_session_cache) == 1

        # Make sure the session isn't duplicated
        await request.cache_and_return_async_session(TEST_URI, session)
        assert len(request._async_session_cache) == 1

        # Make sure a request with a different URI adds another cached session
        await request.cache_and_return_async_session(URI(f"{TEST_URI}/test"), session)
        assert len(request._async_session_cache) == 2
    finally:
        # -- teardown -- #

        # appropriately close the cached sessions; iterate over a snapshot so
        # the awaits cannot observe the mapping changing underneath the loop
        for cached_session in list(request._async_session_cache._data.values()):
            await cached_session.close()

        # the session created above may never have reached the cache if the
        # first call failed, so make sure it is closed as well
        if not session.closed:
            await session.close()

        # clear cache
        request._async_session_cache.clear()


@pytest.mark.asyncio
async def test_async_cache_does_not_close_session_before_a_call_when_multithreading():
    """
    Cache one async session per unique URI concurrently against a cache of
    size 1; check that no session is closed while its simulated call is still
    in flight, that only the last session stays cached, and that the evicted
    sessions are closed shortly afterwards.

    The module-level ``request._async_session_cache`` and
    ``request.DEFAULT_TIMEOUT`` are replaced for the duration of the test and
    always restored afterwards.
    """
    # save default values
    session_cache_default = request._async_session_cache
    timeout_default = request.DEFAULT_TIMEOUT

    # set cache size to 1 + set future session close thread time to 0.01s
    request._async_session_cache = SessionCache(1)
    _timeout_for_testing = 0.01
    request.DEFAULT_TIMEOUT = _timeout_for_testing

    async def cache_uri_and_return_session(uri):
        _session = await cache_and_return_async_session(uri)

        # simulate a call taking 0.01s to return a response
        await asyncio.sleep(0.01)

        assert not _session.closed
        return _session

    cached_session = None
    try:
        tasks = [cache_uri_and_return_session(uri) for uri in UNIQUE_URIS]

        all_sessions = await asyncio.gather(*tasks)
        assert len(all_sessions) == len(UNIQUE_URIS)
        assert all(isinstance(s, ClientSession) for s in all_sessions)

        # last session remains in cache, all others evicted
        cache_data = request._async_session_cache._data
        assert len(cache_data) == 1
        _key, cached_session = cache_data.popitem()
        assert cached_session == all_sessions[-1]

        # assert all evicted sessions were closed
        await asyncio.sleep(_timeout_for_testing + 0.1)
        assert all(session.closed for session in all_sessions[:-1])
    finally:
        # -- teardown -- #

        # appropriately close the cached session
        if cached_session is not None:
            await cached_session.close()

        # reset default values, even when an assertion above failed, so the
        # size-1 cache and the short timeout never leak into other tests
        request._async_session_cache = session_cache_default
        request.DEFAULT_TIMEOUT = timeout_default

        # clear cache
        request._async_session_cache.clear()
8 changes: 4 additions & 4 deletions web3/_utils/module_testing/module_testing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
Literal,
)
from web3._utils.request import (
get_async_session,
get_session,
cache_and_return_async_session,
cache_and_return_session,
)
from web3.types import (
BlockData,
Expand Down Expand Up @@ -103,7 +103,7 @@ def _mock_specific_request(
return MockedResponse()

# else, make a normal request (no mocking)
session = get_session(url_from_args)
session = cache_and_return_session(url_from_args)
return session.request(method=http_method.upper(), url=url_from_args, **kwargs)

monkeypatch.setattr(
Expand Down Expand Up @@ -153,7 +153,7 @@ async def _mock_specific_request(
return AsyncMockedResponse()

# else, make a normal request (no mocking)
session = await get_async_session(url_from_args)
session = await cache_and_return_async_session(url_from_args)
return await session.request(
method=http_method.upper(), url=url_from_args, **kwargs
)
Expand Down
Loading