Skip to content

Commit 81807c7

Browse files
committed
Performance updates to SimpleCache, session cache, cache middlewares
- Don't lock caches until we have to. If reading from the cache yields a value and a stale value is not an issue, return it without locking. Lock the cache only when writing to it, or when reading in cases where a stale value matters (e.g. the async session cache, where a stale session is a concern).
1 parent ff3e023 commit 81807c7

File tree

7 files changed

+75
-66
lines changed

7 files changed

+75
-66
lines changed

tests/core/utilities/test_request.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
generate_cache_key,
3131
)
3232
from web3._utils.request import (
33-
cache_and_return_async_session,
33+
async_cache_and_return_session,
3434
cache_and_return_session,
3535
)
3636
from web3.utils.caching import (
@@ -159,21 +159,21 @@ def test_precached_session(mocker):
159159

160160
def test_cache_session_class():
161161
cache = SimpleCache(2)
162-
evicted_items = cache.cache("1", "Hello1")
162+
_, evicted_items = cache.cache("1", "Hello1")
163163
assert cache.get_cache_entry("1") == "Hello1"
164164
assert evicted_items is None
165165

166-
evicted_items = cache.cache("2", "Hello2")
166+
_, evicted_items = cache.cache("2", "Hello2")
167167
assert cache.get_cache_entry("2") == "Hello2"
168168
assert evicted_items is None
169169

170170
# Changing what is stored at a given cache key should not cause
171171
# anything to be evicted
172-
evicted_items = cache.cache("1", "HelloChanged")
172+
_, evicted_items = cache.cache("1", "HelloChanged")
173173
assert cache.get_cache_entry("1") == "HelloChanged"
174174
assert evicted_items is None
175175

176-
evicted_items = cache.cache("3", "Hello3")
176+
_, evicted_items = cache.cache("3", "Hello3")
177177
assert "2" in cache
178178
assert "3" in cache
179179

@@ -297,15 +297,15 @@ async def test_async_make_post_request(mocker):
297297
async def test_async_precached_session():
298298
# Add a session
299299
session = ClientSession()
300-
await request.cache_and_return_async_session(TEST_URI, session)
300+
await request.async_cache_and_return_session(TEST_URI, session)
301301
assert len(request._async_session_cache) == 1
302302

303303
# Make sure the session isn't duplicated
304-
await request.cache_and_return_async_session(TEST_URI, session)
304+
await request.async_cache_and_return_session(TEST_URI, session)
305305
assert len(request._async_session_cache) == 1
306306

307307
# Make sure a request with a different URI adds another cached session
308-
await request.cache_and_return_async_session(URI(f"{TEST_URI}/test"), session)
308+
await request.async_cache_and_return_session(URI(f"{TEST_URI}/test"), session)
309309
assert len(request._async_session_cache) == 2
310310

311311
# -- teardown -- #
@@ -326,7 +326,7 @@ async def test_async_cache_does_not_close_session_before_a_call_when_multithread
326326
request.DEFAULT_TIMEOUT = _timeout_for_testing
327327

328328
async def cache_uri_and_return_session(uri):
329-
_session = await cache_and_return_async_session(uri)
329+
_session = await async_cache_and_return_session(uri)
330330

331331
# simulate a call taking 0.01s to return a response
332332
await asyncio.sleep(0.01)
@@ -371,7 +371,7 @@ async def test_async_unique_cache_keys_created_per_thread_with_same_uri():
371371
def target_function(endpoint_uri):
372372
event_loop = asyncio.new_event_loop()
373373
unique_session = event_loop.run_until_complete(
374-
cache_and_return_async_session(endpoint_uri)
374+
async_cache_and_return_session(endpoint_uri)
375375
)
376376
test_sessions.append(unique_session)
377377

@@ -406,7 +406,7 @@ async def test_async_use_new_session_if_loop_closed_for_cached_session():
406406
session1 = ClientSession(raise_for_status=True)
407407
session1._loop = loop1
408408

409-
await cache_and_return_async_session(TEST_URI, session=session1)
409+
await async_cache_and_return_session(TEST_URI, session=session1)
410410

411411
# assert session1 was cached
412412
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")
@@ -420,7 +420,7 @@ async def test_async_use_new_session_if_loop_closed_for_cached_session():
420420

421421
# assert we create a new session when trying to retrieve the session at the
422422
# cache key for TEST_URI
423-
session2 = await cache_and_return_async_session(TEST_URI)
423+
session2 = await async_cache_and_return_session(TEST_URI)
424424
assert not session2._loop.is_closed()
425425
assert session2 != session1
426426

@@ -442,7 +442,7 @@ async def test_async_use_new_session_if_session_closed_for_cached_session():
442442
# create a session, close it, and cache it at the cache key for TEST_URI
443443
session1 = ClientSession(raise_for_status=True)
444444
await session1.close()
445-
await cache_and_return_async_session(TEST_URI, session=session1)
445+
await async_cache_and_return_session(TEST_URI, session=session1)
446446

447447
# assert session1 was cached
448448
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")
@@ -452,7 +452,7 @@ async def test_async_use_new_session_if_session_closed_for_cached_session():
452452
assert cached_session == session1
453453

454454
# assert we create a new session when trying to retrieve closed session from cache
455-
session2 = await cache_and_return_async_session(TEST_URI)
455+
session2 = await async_cache_and_return_session(TEST_URI)
456456
assert not session2.closed
457457
assert session2 != session1
458458

web3/_utils/module_testing/module_testing_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
Literal,
2727
)
2828
from web3._utils.request import (
29-
cache_and_return_async_session,
29+
async_cache_and_return_session,
3030
cache_and_return_session,
3131
)
3232
from web3.types import (
@@ -153,7 +153,7 @@ async def _mock_specific_request(
153153
return AsyncMockedResponse()
154154

155155
# else, make a normal request (no mocking)
156-
session = await cache_and_return_async_session(url_from_args)
156+
session = await async_cache_and_return_session(url_from_args)
157157
return await session.request(
158158
method=http_method.upper(), url=url_from_args, **kwargs
159159
)

web3/_utils/request.py

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,20 @@ def cache_and_return_session(
5252
# cache key should have a unique thread identifier
5353
cache_key = generate_cache_key(f"{threading.get_ident()}:{endpoint_uri}")
5454

55-
evicted_items = None
56-
with _session_cache_lock:
57-
if cache_key not in _session_cache:
58-
if session is None:
59-
session = requests.Session()
55+
cached_session = _session_cache.get_cache_entry(cache_key)
56+
if cached_session is not None:
57+
# If read from cache yields a session, no need to lock; return the session.
58+
# Sync is a bit simpler in this way since a `requests.Session` doesn't really
59+
# "close" in the same way that an async `ClientSession` does. When "closed", it
60+
# still uses http / https adapters successfully if a request is made.
61+
return cached_session
6062

61-
evicted_items = _session_cache.cache(cache_key, session)
62-
logger.debug(f"Session cached: {endpoint_uri}, {session}")
63+
if session is None:
64+
session = requests.Session()
6365

64-
cached_session = _session_cache.get_cache_entry(cache_key)
66+
with _session_cache_lock:
67+
cached_session, evicted_items = _session_cache.cache(cache_key, session)
68+
logger.debug(f"Session cached: {endpoint_uri}, {cached_session}")
6569

6670
if evicted_items is not None:
6771
evicted_sessions = evicted_items.values()
@@ -126,7 +130,7 @@ def _close_evicted_sessions(evicted_sessions: List[requests.Session]) -> None:
126130
_async_session_pool = ThreadPoolExecutor(max_workers=1)
127131

128132

129-
async def cache_and_return_async_session(
133+
async def async_cache_and_return_session(
130134
endpoint_uri: URI,
131135
session: Optional[ClientSession] = None,
132136
) -> ClientSession:
@@ -139,8 +143,10 @@ async def cache_and_return_async_session(
139143
if session is None:
140144
session = ClientSession(raise_for_status=True)
141145

142-
evicted_items = _async_session_cache.cache(cache_key, session)
143-
logger.debug(f"Async session cached: {endpoint_uri}, {session}")
146+
cached_session, evicted_items = _async_session_cache.cache(
147+
cache_key, session
148+
)
149+
logger.debug(f"Async session cached: {endpoint_uri}, {cached_session}")
144150

145151
else:
146152
# get the cached session
@@ -171,11 +177,10 @@ async def cache_and_return_async_session(
171177

172178
# replace stale session with a new session at the cache key
173179
_session = ClientSession(raise_for_status=True)
174-
evicted_items = _async_session_cache.cache(cache_key, _session)
175-
logger.debug(f"Async session cached: {endpoint_uri}, {_session}")
176-
177-
# get the cached session
178-
cached_session = _async_session_cache.get_cache_entry(cache_key)
180+
cached_session, evicted_items = _async_session_cache.cache(
181+
cache_key, _session
182+
)
183+
logger.debug(f"Async session cached: {endpoint_uri}, {cached_session}")
179184

180185
if evicted_items is not None:
181186
# At this point the evicted sessions are already popped out of the cache and
@@ -206,7 +211,7 @@ async def async_get_response_from_get_request(
206211
endpoint_uri: URI, *args: Any, **kwargs: Any
207212
) -> ClientResponse:
208213
kwargs.setdefault("timeout", ClientTimeout(DEFAULT_TIMEOUT))
209-
session = await cache_and_return_async_session(endpoint_uri)
214+
session = await async_cache_and_return_session(endpoint_uri)
210215
response = await session.get(endpoint_uri, *args, **kwargs)
211216
return response
212217

@@ -223,7 +228,7 @@ async def async_get_response_from_post_request(
223228
endpoint_uri: URI, *args: Any, **kwargs: Any
224229
) -> ClientResponse:
225230
kwargs.setdefault("timeout", ClientTimeout(DEFAULT_TIMEOUT))
226-
session = await cache_and_return_async_session(endpoint_uri)
231+
session = await async_cache_and_return_session(endpoint_uri)
227232
response = await session.post(endpoint_uri, *args, **kwargs)
228233
return response
229234

web3/middleware/async_cache.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,18 @@ async def async_simple_cache_middleware(
6262

6363
async def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
6464
if method in rpc_whitelist:
65-
async with async_lock(_async_request_thread_pool, lock):
66-
cache_key = generate_cache_key(
67-
f"{threading.get_ident()}:{(method, params)}"
68-
)
69-
if cache.__contains__(cache_key):
70-
return cache.get_cache_entry(cache_key)
65+
cache_key = generate_cache_key(
66+
f"{threading.get_ident()}:{(method, params)}"
67+
)
68+
cached_request = cache.get_cache_entry(cache_key)
69+
if cached_request is not None:
70+
return cached_request
7171

72-
response = await make_request(method, params)
73-
if should_cache_fn(method, params, response):
72+
response = await make_request(method, params)
73+
if should_cache_fn(method, params, response):
74+
async with async_lock(_async_request_thread_pool, lock):
7475
cache.cache(cache_key, response)
75-
return response
76+
return response
7677
else:
7778
return await make_request(method, params)
7879

web3/middleware/cache.py

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -93,27 +93,24 @@ def simple_cache_middleware(
9393
lock = threading.Lock()
9494

9595
def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
96-
lock_acquired = (
97-
lock.acquire(blocking=False) if method in rpc_whitelist else False
98-
)
99-
100-
try:
101-
if lock_acquired and method in rpc_whitelist:
102-
cache_key = generate_cache_key(
103-
f"{threading.get_ident()}:{(method, params)}"
104-
)
105-
if cache.__contains__(cache_key):
106-
return cache.get_cache_entry(cache_key)
107-
108-
response = make_request(method, params)
109-
if should_cache_fn(method, params, response):
96+
if method in rpc_whitelist:
97+
cache_key = generate_cache_key(
98+
f"{threading.get_ident()}:{(method, params)}"
99+
)
100+
cached_request = cache.get_cache_entry(cache_key)
101+
if cached_request is not None:
102+
return cached_request
103+
104+
response = make_request(method, params)
105+
if should_cache_fn(method, params, response):
106+
lock.acquire(blocking=False)
107+
try:
110108
cache.cache(cache_key, response)
111-
return response
112-
else:
113-
return make_request(method, params)
114-
finally:
115-
if lock_acquired:
116-
lock.release()
109+
finally:
110+
lock.release()
111+
return response
112+
else:
113+
return make_request(method, params)
117114

118115
return middleware
119116

web3/providers/async_rpc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
construct_user_agent,
2323
)
2424
from web3._utils.request import (
25+
async_cache_and_return_session as _cache_and_return_async_session,
2526
async_make_post_request,
26-
cache_and_return_async_session as _cache_and_return_async_session,
2727
get_default_http_endpoint,
2828
)
2929
from web3.types import (

web3/utils/caching.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
Any,
66
Dict,
77
Optional,
8+
Tuple,
89
)
910

1011

@@ -13,7 +14,7 @@ def __init__(self, size: int = 100):
1314
self._size = size
1415
self._data: OrderedDict[str, Any] = OrderedDict()
1516

16-
def cache(self, key: str, value: Any) -> Dict[str, Any]:
17+
def cache(self, key: str, value: Any) -> Tuple[Any, Dict[str, Any]]:
1718
evicted_items = None
1819
# If the key is already in the OrderedDict just update it
1920
# and don't evict any values. Ideally, we could still check to see
@@ -26,7 +27,12 @@ def cache(self, key: str, value: Any) -> Dict[str, Any]:
2627
k, v = self._data.popitem(last=False)
2728
evicted_items[k] = v
2829
self._data[key] = value
29-
return evicted_items
30+
31+
# Return the cached value along with the evicted items at the same time. If we
32+
# only return the evicted items and try to pull the cached entry back out of the
33+
# cache, by the time we reach back into the cache it may have been evicted
34+
# from the cache.
35+
return value, evicted_items
3036

3137
def get_cache_entry(self, key: str) -> Optional[Any]:
3238
return self._data[key] if key in self._data else None

0 commit comments

Comments
 (0)