Skip to content

Commit 03b9bbe

Browse files
committed
Remove async lock in asyncio.Connection.read_response
1 parent 48f5aca commit 03b9bbe

File tree

3 files changed

+25
-66
lines changed

3 files changed

+25
-66
lines changed

redis/asyncio/cluster.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ async def on_connect(self, connection: Connection) -> None:
390390
# regardless of the server type. If this is a primary connection,
391391
# READONLY would not affect executing write commands.
392392
await connection.send_command("READONLY")
393-
if str_if_bytes(await connection.read_response_without_lock()) != "OK":
393+
if str_if_bytes(await connection.read_response()) != "OK":
394394
raise ConnectionError("READONLY command failed")
395395

396396
def get_nodes(self) -> List["ClusterNode"]:
@@ -866,11 +866,9 @@ async def parse_response(
866866
) -> Any:
867867
try:
868868
if NEVER_DECODE in kwargs:
869-
response = await connection.read_response_without_lock(
870-
disable_decoding=True
871-
)
869+
response = await connection.read_response(disable_decoding=True)
872870
else:
873-
response = await connection.read_response_without_lock()
871+
response = await connection.read_response()
874872
except ResponseError:
875873
if EMPTY_RESPONSE in kwargs:
876874
return kwargs[EMPTY_RESPONSE]

redis/asyncio/connection.py

-35
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,6 @@ def __init__(
660660
self.set_parser(parser_class)
661661
self._connect_callbacks: List[weakref.WeakMethod[ConnectCallbackT]] = []
662662
self._buffer_cutoff = 6000
663-
self._lock = asyncio.Lock()
664663

665664
def __repr__(self):
666665
repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces()))
@@ -929,39 +928,6 @@ async def can_read(self, timeout: float = 0):
929928
)
930929

931930
async def read_response(self, disable_decoding: bool = False):
932-
"""Read the response from a previously sent command"""
933-
try:
934-
async with self._lock:
935-
if self.socket_timeout:
936-
async with async_timeout.timeout(self.socket_timeout):
937-
response = await self._parser.read_response(
938-
disable_decoding=disable_decoding
939-
)
940-
else:
941-
response = await self._parser.read_response(
942-
disable_decoding=disable_decoding
943-
)
944-
except asyncio.TimeoutError:
945-
await self.disconnect()
946-
raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
947-
except OSError as e:
948-
await self.disconnect()
949-
raise ConnectionError(
950-
f"Error while reading from {self.host}:{self.port} : {e.args}"
951-
)
952-
except BaseException:
953-
await self.disconnect()
954-
raise
955-
956-
if self.health_check_interval:
957-
next_time = asyncio.get_running_loop().time() + self.health_check_interval
958-
self.next_health_check = next_time
959-
960-
if isinstance(response, ResponseError):
961-
raise response from None
962-
return response
963-
964-
async def read_response_without_lock(self, disable_decoding: bool = False):
965931
"""Read the response from a previously sent command"""
966932
try:
967933
if self.socket_timeout:
@@ -1230,7 +1196,6 @@ def __init__(
12301196
self.set_parser(parser_class)
12311197
self._connect_callbacks = []
12321198
self._buffer_cutoff = 6000
1233-
self._lock = asyncio.Lock()
12341199

12351200
def repr_pieces(self) -> Iterable[Tuple[str, Union[str, int]]]:
12361201
pieces = [("path", self.path), ("db", self.db)]

tests/test_asyncio/test_cluster.py

+22-26
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def cmd_init_mock(self, r: ClusterNode) -> None:
120120
def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode:
121121
connection = mock.AsyncMock()
122122
connection.is_connected = True
123-
connection.read_response_without_lock.return_value = response
123+
connection.read_response.return_value = response
124124
while node._free:
125125
node._free.pop()
126126
node._free.append(connection)
@@ -130,7 +130,7 @@ def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode:
130130
def mock_node_resp_exc(node: ClusterNode, exc: Exception) -> ClusterNode:
131131
connection = mock.AsyncMock()
132132
connection.is_connected = True
133-
connection.read_response_without_lock.side_effect = exc
133+
connection.read_response.side_effect = exc
134134
while node._free:
135135
node._free.pop()
136136
node._free.append(connection)
@@ -275,16 +275,12 @@ async def test_max_connections(
275275
for node in rc.get_nodes():
276276
assert node.max_connections == 10
277277

278-
with mock.patch.object(
279-
Connection, "read_response_without_lock"
280-
) as read_response_without_lock:
278+
with mock.patch.object(Connection, "read_response") as read_response:
281279

282-
async def read_response_without_lock_mocked(
283-
*args: Any, **kwargs: Any
284-
) -> None:
280+
async def read_response_mocked(*args: Any, **kwargs: Any) -> None:
285281
await asyncio.sleep(10)
286282

287-
read_response_without_lock.side_effect = read_response_without_lock_mocked
283+
read_response.side_effect = read_response_mocked
288284

289285
with pytest.raises(MaxConnectionsError):
290286
await asyncio.gather(
@@ -316,10 +312,10 @@ async def test_execute_command_node_flag_primaries(self, r: RedisCluster) -> Non
316312
assert await r.ping(target_nodes=RedisCluster.PRIMARIES) is True
317313
for primary in primaries:
318314
conn = primary._free.pop()
319-
assert conn.read_response_without_lock.called is True
315+
assert conn.read_response.called is True
320316
for replica in replicas:
321317
conn = replica._free.pop()
322-
assert conn.read_response_without_lock.called is not True
318+
assert conn.read_response.called is not True
323319

324320
async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None:
325321
"""
@@ -333,10 +329,10 @@ async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None
333329
assert await r.ping(target_nodes=RedisCluster.REPLICAS) is True
334330
for replica in replicas:
335331
conn = replica._free.pop()
336-
assert conn.read_response_without_lock.called is True
332+
assert conn.read_response.called is True
337333
for primary in primaries:
338334
conn = primary._free.pop()
339-
assert conn.read_response_without_lock.called is not True
335+
assert conn.read_response.called is not True
340336

341337
await r.close()
342338

@@ -348,7 +344,7 @@ async def test_execute_command_node_flag_all_nodes(self, r: RedisCluster) -> Non
348344
assert await r.ping(target_nodes=RedisCluster.ALL_NODES) is True
349345
for node in r.get_nodes():
350346
conn = node._free.pop()
351-
assert conn.read_response_without_lock.called is True
347+
assert conn.read_response.called is True
352348

353349
async def test_execute_command_node_flag_random(self, r: RedisCluster) -> None:
354350
"""
@@ -359,7 +355,7 @@ async def test_execute_command_node_flag_random(self, r: RedisCluster) -> None:
359355
called_count = 0
360356
for node in r.get_nodes():
361357
conn = node._free.pop()
362-
if conn.read_response_without_lock.called is True:
358+
if conn.read_response.called is True:
363359
called_count += 1
364360
assert called_count == 1
365361

@@ -372,7 +368,7 @@ async def test_execute_command_default_node(self, r: RedisCluster) -> None:
372368
mock_node_resp(def_node, "PONG")
373369
assert await r.ping() is True
374370
conn = def_node._free.pop()
375-
assert conn.read_response_without_lock.called
371+
assert conn.read_response.called
376372

377373
async def test_ask_redirection(self, r: RedisCluster) -> None:
378374
"""
@@ -516,7 +512,7 @@ async def test_reading_from_replicas_in_round_robin(self) -> None:
516512
with mock.patch.multiple(
517513
Connection,
518514
send_command=mock.DEFAULT,
519-
read_response_without_lock=mock.DEFAULT,
515+
read_response=mock.DEFAULT,
520516
_connect=mock.DEFAULT,
521517
can_read=mock.DEFAULT,
522518
on_connect=mock.DEFAULT,
@@ -548,7 +544,7 @@ def execute_command_mock_third(self, *args, **options):
548544
# so we'll mock some of the Connection's functions to allow it
549545
execute_command.side_effect = execute_command_mock_first
550546
mocks["send_command"].return_value = True
551-
mocks["read_response_without_lock"].return_value = "OK"
547+
mocks["read_response"].return_value = "OK"
552548
mocks["_connect"].return_value = True
553549
mocks["can_read"].return_value = False
554550
mocks["on_connect"].return_value = True
@@ -857,8 +853,8 @@ async def test_cluster_delslots(self) -> None:
857853
node0 = r.get_node(default_host, 7000)
858854
node1 = r.get_node(default_host, 7001)
859855
assert await r.cluster_delslots(0, 8192) == [True, True]
860-
assert node0._free.pop().read_response_without_lock.called
861-
assert node1._free.pop().read_response_without_lock.called
856+
assert node0._free.pop().read_response.called
857+
assert node1._free.pop().read_response.called
862858

863859
await r.close()
864860

@@ -1027,7 +1023,7 @@ async def test_cluster_setslot_stable(self, r: RedisCluster) -> None:
10271023
node = r.nodes_manager.get_node_from_slot(12182)
10281024
mock_node_resp(node, "OK")
10291025
assert await r.cluster_setslot_stable(12182) is True
1030-
assert node._free.pop().read_response_without_lock.called
1026+
assert node._free.pop().read_response.called
10311027

10321028
@skip_if_redis_enterprise()
10331029
async def test_cluster_replicas(self, r: RedisCluster) -> None:
@@ -1069,7 +1065,7 @@ async def test_readonly(self) -> None:
10691065
for res in all_replicas_results.values():
10701066
assert res is True
10711067
for replica in r.get_replicas():
1072-
assert replica._free.pop().read_response_without_lock.called
1068+
assert replica._free.pop().read_response.called
10731069

10741070
await r.close()
10751071

@@ -1082,7 +1078,7 @@ async def test_readwrite(self) -> None:
10821078
for res in all_replicas_results.values():
10831079
assert res is True
10841080
for replica in r.get_replicas():
1085-
assert replica._free.pop().read_response_without_lock.called
1081+
assert replica._free.pop().read_response.called
10861082

10871083
await r.close()
10881084

@@ -2441,8 +2437,8 @@ async def test_asking_error(self, r: RedisCluster) -> None:
24412437
mock_node_resp_exc(first_node, AskError(ask_msg))
24422438
mock_node_resp(ask_node, "MOCK_OK")
24432439
res = await pipe.get(key).execute()
2444-
assert first_node._free.pop().read_response_without_lock.await_count
2445-
assert ask_node._free.pop().read_response_without_lock.await_count
2440+
assert first_node._free.pop().read_response.await_count
2441+
assert ask_node._free.pop().read_response.await_count
24462442
assert res == ["MOCK_OK"]
24472443

24482444
async def test_moved_redirection_on_slave_with_default(
@@ -2497,7 +2493,7 @@ async def test_readonly_pipeline_from_readonly_client(
24972493
executed_on_replica = False
24982494
for node in slot_nodes:
24992495
if node.server_type == REPLICA:
2500-
if node._free.pop().read_response_without_lock.await_count:
2496+
if node._free.pop().read_response.await_count:
25012497
executed_on_replica = True
25022498
break
25032499
assert executed_on_replica

0 commit comments

Comments
 (0)