Skip to content

Performance updates to SimpleCache, session cache, cache middlewares #2719

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions newsfragments/2719.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Some minor performance improvements to the ``SimpleCache`` class and simple cache middlewares (sync and async).
28 changes: 14 additions & 14 deletions tests/core/utilities/test_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
generate_cache_key,
)
from web3._utils.request import (
cache_and_return_async_session,
async_cache_and_return_session,
cache_and_return_session,
)
from web3.utils.caching import (
Expand Down Expand Up @@ -159,21 +159,21 @@ def test_precached_session(mocker):

def test_cache_session_class():
cache = SimpleCache(2)
evicted_items = cache.cache("1", "Hello1")
_, evicted_items = cache.cache("1", "Hello1")
assert cache.get_cache_entry("1") == "Hello1"
assert evicted_items is None

evicted_items = cache.cache("2", "Hello2")
_, evicted_items = cache.cache("2", "Hello2")
assert cache.get_cache_entry("2") == "Hello2"
assert evicted_items is None

# Changing what is stored at a given cache key should not cause the
# anything to be evicted
evicted_items = cache.cache("1", "HelloChanged")
_, evicted_items = cache.cache("1", "HelloChanged")
assert cache.get_cache_entry("1") == "HelloChanged"
assert evicted_items is None

evicted_items = cache.cache("3", "Hello3")
_, evicted_items = cache.cache("3", "Hello3")
assert "2" in cache
assert "3" in cache

Expand Down Expand Up @@ -297,15 +297,15 @@ async def test_async_make_post_request(mocker):
async def test_async_precached_session():
# Add a session
session = ClientSession()
await request.cache_and_return_async_session(TEST_URI, session)
await request.async_cache_and_return_session(TEST_URI, session)
assert len(request._async_session_cache) == 1

# Make sure the session isn't duplicated
await request.cache_and_return_async_session(TEST_URI, session)
await request.async_cache_and_return_session(TEST_URI, session)
assert len(request._async_session_cache) == 1

# Make sure a request with a different URI adds another cached session
await request.cache_and_return_async_session(URI(f"{TEST_URI}/test"), session)
await request.async_cache_and_return_session(URI(f"{TEST_URI}/test"), session)
assert len(request._async_session_cache) == 2

# -- teardown -- #
Expand All @@ -326,7 +326,7 @@ async def test_async_cache_does_not_close_session_before_a_call_when_multithread
request.DEFAULT_TIMEOUT = _timeout_for_testing

async def cache_uri_and_return_session(uri):
_session = await cache_and_return_async_session(uri)
_session = await async_cache_and_return_session(uri)

# simulate a call taking 0.01s to return a response
await asyncio.sleep(0.01)
Expand Down Expand Up @@ -371,7 +371,7 @@ async def test_async_unique_cache_keys_created_per_thread_with_same_uri():
def target_function(endpoint_uri):
event_loop = asyncio.new_event_loop()
unique_session = event_loop.run_until_complete(
cache_and_return_async_session(endpoint_uri)
async_cache_and_return_session(endpoint_uri)
)
test_sessions.append(unique_session)

Expand Down Expand Up @@ -406,7 +406,7 @@ async def test_async_use_new_session_if_loop_closed_for_cached_session():
session1 = ClientSession(raise_for_status=True)
session1._loop = loop1

await cache_and_return_async_session(TEST_URI, session=session1)
await async_cache_and_return_session(TEST_URI, session=session1)

# assert session1 was cached
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")
Expand All @@ -420,7 +420,7 @@ async def test_async_use_new_session_if_loop_closed_for_cached_session():

# assert we create a new session when trying to retrieve the session at the
# cache key for TEST_URI
session2 = await cache_and_return_async_session(TEST_URI)
session2 = await async_cache_and_return_session(TEST_URI)
assert not session2._loop.is_closed()
assert session2 != session1

Expand All @@ -442,7 +442,7 @@ async def test_async_use_new_session_if_session_closed_for_cached_session():
# create a session, close it, and cache it at the cache key for TEST_URI
session1 = ClientSession(raise_for_status=True)
await session1.close()
await cache_and_return_async_session(TEST_URI, session=session1)
await async_cache_and_return_session(TEST_URI, session=session1)

# assert session1 was cached
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")
Expand All @@ -452,7 +452,7 @@ async def test_async_use_new_session_if_session_closed_for_cached_session():
assert cached_session == session1

# assert we create a new session when trying to retrieve closed session from cache
session2 = await cache_and_return_async_session(TEST_URI)
session2 = await async_cache_and_return_session(TEST_URI)
assert not session2.closed
assert session2 != session1

Expand Down
4 changes: 2 additions & 2 deletions web3/_utils/module_testing/module_testing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
Literal,
)
from web3._utils.request import (
cache_and_return_async_session,
async_cache_and_return_session,
cache_and_return_session,
)
from web3.types import (
Expand Down Expand Up @@ -153,7 +153,7 @@ async def _mock_specific_request(
return AsyncMockedResponse()

# else, make a normal request (no mocking)
session = await cache_and_return_async_session(url_from_args)
session = await async_cache_and_return_session(url_from_args)
return await session.request(
method=http_method.upper(), url=url_from_args, **kwargs
)
Expand Down
41 changes: 23 additions & 18 deletions web3/_utils/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,20 @@ def cache_and_return_session(
# cache key should have a unique thread identifier
cache_key = generate_cache_key(f"{threading.get_ident()}:{endpoint_uri}")

evicted_items = None
with _session_cache_lock:
if cache_key not in _session_cache:
if session is None:
session = requests.Session()
cached_session = _session_cache.get_cache_entry(cache_key)
if cached_session is not None:
# If read from cache yields a session, no need to lock; return the session.
# Sync is a bit simpler in this way since a `requests.Session` doesn't really
# "close" in the same way that an async `ClientSession` does. When "closed", it
# still uses http / https adapters successfully if a request is made.
return cached_session

evicted_items = _session_cache.cache(cache_key, session)
logger.debug(f"Session cached: {endpoint_uri}, {session}")
if session is None:
session = requests.Session()

cached_session = _session_cache.get_cache_entry(cache_key)
with _session_cache_lock:
cached_session, evicted_items = _session_cache.cache(cache_key, session)
logger.debug(f"Session cached: {endpoint_uri}, {cached_session}")

if evicted_items is not None:
evicted_sessions = evicted_items.values()
Expand Down Expand Up @@ -126,7 +130,7 @@ def _close_evicted_sessions(evicted_sessions: List[requests.Session]) -> None:
_async_session_pool = ThreadPoolExecutor(max_workers=1)


async def cache_and_return_async_session(
async def async_cache_and_return_session(
endpoint_uri: URI,
session: Optional[ClientSession] = None,
) -> ClientSession:
Expand All @@ -139,8 +143,10 @@ async def cache_and_return_async_session(
if session is None:
session = ClientSession(raise_for_status=True)

evicted_items = _async_session_cache.cache(cache_key, session)
logger.debug(f"Async session cached: {endpoint_uri}, {session}")
cached_session, evicted_items = _async_session_cache.cache(
cache_key, session
)
logger.debug(f"Async session cached: {endpoint_uri}, {cached_session}")

else:
# get the cached session
Expand Down Expand Up @@ -171,11 +177,10 @@ async def cache_and_return_async_session(

# replace stale session with a new session at the cache key
_session = ClientSession(raise_for_status=True)
evicted_items = _async_session_cache.cache(cache_key, _session)
logger.debug(f"Async session cached: {endpoint_uri}, {_session}")

# get the cached session
cached_session = _async_session_cache.get_cache_entry(cache_key)
cached_session, evicted_items = _async_session_cache.cache(
cache_key, _session
)
logger.debug(f"Async session cached: {endpoint_uri}, {cached_session}")

if evicted_items is not None:
# At this point the evicted sessions are already popped out of the cache and
Expand Down Expand Up @@ -206,7 +211,7 @@ async def async_get_response_from_get_request(
endpoint_uri: URI, *args: Any, **kwargs: Any
) -> ClientResponse:
kwargs.setdefault("timeout", ClientTimeout(DEFAULT_TIMEOUT))
session = await cache_and_return_async_session(endpoint_uri)
session = await async_cache_and_return_session(endpoint_uri)
response = await session.get(endpoint_uri, *args, **kwargs)
return response

Expand All @@ -223,7 +228,7 @@ async def async_get_response_from_post_request(
endpoint_uri: URI, *args: Any, **kwargs: Any
) -> ClientResponse:
kwargs.setdefault("timeout", ClientTimeout(DEFAULT_TIMEOUT))
session = await cache_and_return_async_session(endpoint_uri)
session = await async_cache_and_return_session(endpoint_uri)
response = await session.post(endpoint_uri, *args, **kwargs)
return response

Expand Down
19 changes: 10 additions & 9 deletions web3/middleware/async_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,18 @@ async def async_simple_cache_middleware(

async def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
if method in rpc_whitelist:
async with async_lock(_async_request_thread_pool, lock):
cache_key = generate_cache_key(
f"{threading.get_ident()}:{(method, params)}"
)
if cache.__contains__(cache_key):
return cache.get_cache_entry(cache_key)
cache_key = generate_cache_key(
f"{threading.get_ident()}:{(method, params)}"
)
cached_request = cache.get_cache_entry(cache_key)
if cached_request is not None:
return cached_request

response = await make_request(method, params)
if should_cache_fn(method, params, response):
response = await make_request(method, params)
if should_cache_fn(method, params, response):
async with async_lock(_async_request_thread_pool, lock):
cache.cache(cache_key, response)
return response
return response
else:
return await make_request(method, params)

Expand Down
37 changes: 17 additions & 20 deletions web3/middleware/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,27 +93,24 @@ def simple_cache_middleware(
lock = threading.Lock()

def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
lock_acquired = (
lock.acquire(blocking=False) if method in rpc_whitelist else False
)

try:
if lock_acquired and method in rpc_whitelist:
cache_key = generate_cache_key(
f"{threading.get_ident()}:{(method, params)}"
)
if cache.__contains__(cache_key):
return cache.get_cache_entry(cache_key)

response = make_request(method, params)
if should_cache_fn(method, params, response):
if method in rpc_whitelist:
cache_key = generate_cache_key(
f"{threading.get_ident()}:{(method, params)}"
)
cached_request = cache.get_cache_entry(cache_key)
if cached_request is not None:
return cached_request

response = make_request(method, params)
if should_cache_fn(method, params, response):
lock.acquire(blocking=False)
try:
cache.cache(cache_key, response)
return response
else:
return make_request(method, params)
finally:
if lock_acquired:
lock.release()
finally:
lock.release()
return response
else:
return make_request(method, params)

return middleware

Expand Down
4 changes: 2 additions & 2 deletions web3/providers/async_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
construct_user_agent,
)
from web3._utils.request import (
async_cache_and_return_session as _async_cache_and_return_session,
async_make_post_request,
cache_and_return_async_session as _cache_and_return_async_session,
get_default_http_endpoint,
)
from web3.types import (
Expand Down Expand Up @@ -56,7 +56,7 @@ def __init__(
super().__init__()

async def cache_async_session(self, session: ClientSession) -> ClientSession:
return await _cache_and_return_async_session(self.endpoint_uri, session)
return await _async_cache_and_return_session(self.endpoint_uri, session)

def __str__(self) -> str:
return f"RPC connection {self.endpoint_uri}"
Expand Down
8 changes: 6 additions & 2 deletions web3/utils/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Any,
Dict,
Optional,
Tuple,
)


Expand All @@ -13,7 +14,7 @@ def __init__(self, size: int = 100):
self._size = size
self._data: OrderedDict[str, Any] = OrderedDict()

def cache(self, key: str, value: Any) -> Dict[str, Any]:
def cache(self, key: str, value: Any) -> Tuple[Any, Dict[str, Any]]:
evicted_items = None
# If the key is already in the OrderedDict just update it
# and don't evict any values. Ideally, we could still check to see
Expand All @@ -26,7 +27,10 @@ def cache(self, key: str, value: Any) -> Dict[str, Any]:
k, v = self._data.popitem(last=False)
evicted_items[k] = v
self._data[key] = value
return evicted_items

# Return the cached value along with the evicted items at the same time. No
# need to reach back into the cache to grab the value.
return value, evicted_items

def get_cache_entry(self, key: str) -> Optional[Any]:
return self._data[key] if key in self._data else None
Expand Down