Skip to content

Commit 6b3e0b4

Browse files
Dev/no lock (#2308)
* Remove async lock in asyncio.Connection.read_response * Skip concurrent-commands test on non-pooled connections
1 parent 7c6a812 commit 6b3e0b4

File tree

4 files changed

+29
-66
lines changed

4 files changed

+29
-66
lines changed

redis/asyncio/cluster.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ async def on_connect(self, connection: Connection) -> None:
390390
# regardless of the server type. If this is a primary connection,
391391
# READONLY would not affect executing write commands.
392392
await connection.send_command("READONLY")
393-
if str_if_bytes(await connection.read_response_without_lock()) != "OK":
393+
if str_if_bytes(await connection.read_response()) != "OK":
394394
raise ConnectionError("READONLY command failed")
395395

396396
def get_nodes(self) -> List["ClusterNode"]:
@@ -866,11 +866,9 @@ async def parse_response(
866866
) -> Any:
867867
try:
868868
if NEVER_DECODE in kwargs:
869-
response = await connection.read_response_without_lock(
870-
disable_decoding=True
871-
)
869+
response = await connection.read_response(disable_decoding=True)
872870
else:
873-
response = await connection.read_response_without_lock()
871+
response = await connection.read_response()
874872
except ResponseError:
875873
if EMPTY_RESPONSE in kwargs:
876874
return kwargs[EMPTY_RESPONSE]

redis/asyncio/connection.py

-35
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,6 @@ def __init__(
671671
self.set_parser(parser_class)
672672
self._connect_callbacks: List[weakref.WeakMethod[ConnectCallbackT]] = []
673673
self._buffer_cutoff = 6000
674-
self._lock = asyncio.Lock()
675674

676675
def __repr__(self):
677676
repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces()))
@@ -940,39 +939,6 @@ async def can_read(self, timeout: float = 0):
940939
)
941940

942941
async def read_response(self, disable_decoding: bool = False):
943-
"""Read the response from a previously sent command"""
944-
try:
945-
async with self._lock:
946-
if self.socket_timeout:
947-
async with async_timeout.timeout(self.socket_timeout):
948-
response = await self._parser.read_response(
949-
disable_decoding=disable_decoding
950-
)
951-
else:
952-
response = await self._parser.read_response(
953-
disable_decoding=disable_decoding
954-
)
955-
except asyncio.TimeoutError:
956-
await self.disconnect()
957-
raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
958-
except OSError as e:
959-
await self.disconnect()
960-
raise ConnectionError(
961-
f"Error while reading from {self.host}:{self.port} : {e.args}"
962-
)
963-
except BaseException:
964-
await self.disconnect()
965-
raise
966-
967-
if self.health_check_interval:
968-
next_time = asyncio.get_running_loop().time() + self.health_check_interval
969-
self.next_health_check = next_time
970-
971-
if isinstance(response, ResponseError):
972-
raise response from None
973-
return response
974-
975-
async def read_response_without_lock(self, disable_decoding: bool = False):
976942
"""Read the response from a previously sent command"""
977943
try:
978944
if self.socket_timeout:
@@ -1241,7 +1207,6 @@ def __init__(
12411207
self.set_parser(parser_class)
12421208
self._connect_callbacks = []
12431209
self._buffer_cutoff = 6000
1244-
self._lock = asyncio.Lock()
12451210

12461211
def repr_pieces(self) -> Iterable[Tuple[str, Union[str, int]]]:
12471212
pieces = [("path", self.path), ("db", self.db)]

tests/test_asyncio/test_cluster.py

+22-26
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def cmd_init_mock(self, r: ClusterNode) -> None:
120120
def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode:
121121
connection = mock.AsyncMock()
122122
connection.is_connected = True
123-
connection.read_response_without_lock.return_value = response
123+
connection.read_response.return_value = response
124124
while node._free:
125125
node._free.pop()
126126
node._free.append(connection)
@@ -130,7 +130,7 @@ def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode:
130130
def mock_node_resp_exc(node: ClusterNode, exc: Exception) -> ClusterNode:
131131
connection = mock.AsyncMock()
132132
connection.is_connected = True
133-
connection.read_response_without_lock.side_effect = exc
133+
connection.read_response.side_effect = exc
134134
while node._free:
135135
node._free.pop()
136136
node._free.append(connection)
@@ -275,16 +275,12 @@ async def test_max_connections(
275275
for node in rc.get_nodes():
276276
assert node.max_connections == 10
277277

278-
with mock.patch.object(
279-
Connection, "read_response_without_lock"
280-
) as read_response_without_lock:
278+
with mock.patch.object(Connection, "read_response") as read_response:
281279

282-
async def read_response_without_lock_mocked(
283-
*args: Any, **kwargs: Any
284-
) -> None:
280+
async def read_response_mocked(*args: Any, **kwargs: Any) -> None:
285281
await asyncio.sleep(10)
286282

287-
read_response_without_lock.side_effect = read_response_without_lock_mocked
283+
read_response.side_effect = read_response_mocked
288284

289285
with pytest.raises(MaxConnectionsError):
290286
await asyncio.gather(
@@ -316,10 +312,10 @@ async def test_execute_command_node_flag_primaries(self, r: RedisCluster) -> Non
316312
assert await r.ping(target_nodes=RedisCluster.PRIMARIES) is True
317313
for primary in primaries:
318314
conn = primary._free.pop()
319-
assert conn.read_response_without_lock.called is True
315+
assert conn.read_response.called is True
320316
for replica in replicas:
321317
conn = replica._free.pop()
322-
assert conn.read_response_without_lock.called is not True
318+
assert conn.read_response.called is not True
323319

324320
async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None:
325321
"""
@@ -333,10 +329,10 @@ async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None
333329
assert await r.ping(target_nodes=RedisCluster.REPLICAS) is True
334330
for replica in replicas:
335331
conn = replica._free.pop()
336-
assert conn.read_response_without_lock.called is True
332+
assert conn.read_response.called is True
337333
for primary in primaries:
338334
conn = primary._free.pop()
339-
assert conn.read_response_without_lock.called is not True
335+
assert conn.read_response.called is not True
340336

341337
await r.close()
342338

@@ -348,7 +344,7 @@ async def test_execute_command_node_flag_all_nodes(self, r: RedisCluster) -> Non
348344
assert await r.ping(target_nodes=RedisCluster.ALL_NODES) is True
349345
for node in r.get_nodes():
350346
conn = node._free.pop()
351-
assert conn.read_response_without_lock.called is True
347+
assert conn.read_response.called is True
352348

353349
async def test_execute_command_node_flag_random(self, r: RedisCluster) -> None:
354350
"""
@@ -359,7 +355,7 @@ async def test_execute_command_node_flag_random(self, r: RedisCluster) -> None:
359355
called_count = 0
360356
for node in r.get_nodes():
361357
conn = node._free.pop()
362-
if conn.read_response_without_lock.called is True:
358+
if conn.read_response.called is True:
363359
called_count += 1
364360
assert called_count == 1
365361

@@ -372,7 +368,7 @@ async def test_execute_command_default_node(self, r: RedisCluster) -> None:
372368
mock_node_resp(def_node, "PONG")
373369
assert await r.ping() is True
374370
conn = def_node._free.pop()
375-
assert conn.read_response_without_lock.called
371+
assert conn.read_response.called
376372

377373
async def test_ask_redirection(self, r: RedisCluster) -> None:
378374
"""
@@ -516,7 +512,7 @@ async def test_reading_from_replicas_in_round_robin(self) -> None:
516512
with mock.patch.multiple(
517513
Connection,
518514
send_command=mock.DEFAULT,
519-
read_response_without_lock=mock.DEFAULT,
515+
read_response=mock.DEFAULT,
520516
_connect=mock.DEFAULT,
521517
can_read=mock.DEFAULT,
522518
on_connect=mock.DEFAULT,
@@ -548,7 +544,7 @@ def execute_command_mock_third(self, *args, **options):
548544
# so we'll mock some of the Connection's functions to allow it
549545
execute_command.side_effect = execute_command_mock_first
550546
mocks["send_command"].return_value = True
551-
mocks["read_response_without_lock"].return_value = "OK"
547+
mocks["read_response"].return_value = "OK"
552548
mocks["_connect"].return_value = True
553549
mocks["can_read"].return_value = False
554550
mocks["on_connect"].return_value = True
@@ -857,8 +853,8 @@ async def test_cluster_delslots(self) -> None:
857853
node0 = r.get_node(default_host, 7000)
858854
node1 = r.get_node(default_host, 7001)
859855
assert await r.cluster_delslots(0, 8192) == [True, True]
860-
assert node0._free.pop().read_response_without_lock.called
861-
assert node1._free.pop().read_response_without_lock.called
856+
assert node0._free.pop().read_response.called
857+
assert node1._free.pop().read_response.called
862858

863859
await r.close()
864860

@@ -1027,7 +1023,7 @@ async def test_cluster_setslot_stable(self, r: RedisCluster) -> None:
10271023
node = r.nodes_manager.get_node_from_slot(12182)
10281024
mock_node_resp(node, "OK")
10291025
assert await r.cluster_setslot_stable(12182) is True
1030-
assert node._free.pop().read_response_without_lock.called
1026+
assert node._free.pop().read_response.called
10311027

10321028
@skip_if_redis_enterprise()
10331029
async def test_cluster_replicas(self, r: RedisCluster) -> None:
@@ -1069,7 +1065,7 @@ async def test_readonly(self) -> None:
10691065
for res in all_replicas_results.values():
10701066
assert res is True
10711067
for replica in r.get_replicas():
1072-
assert replica._free.pop().read_response_without_lock.called
1068+
assert replica._free.pop().read_response.called
10731069

10741070
await r.close()
10751071

@@ -1082,7 +1078,7 @@ async def test_readwrite(self) -> None:
10821078
for res in all_replicas_results.values():
10831079
assert res is True
10841080
for replica in r.get_replicas():
1085-
assert replica._free.pop().read_response_without_lock.called
1081+
assert replica._free.pop().read_response.called
10861082

10871083
await r.close()
10881084

@@ -2441,8 +2437,8 @@ async def test_asking_error(self, r: RedisCluster) -> None:
24412437
mock_node_resp_exc(first_node, AskError(ask_msg))
24422438
mock_node_resp(ask_node, "MOCK_OK")
24432439
res = await pipe.get(key).execute()
2444-
assert first_node._free.pop().read_response_without_lock.await_count
2445-
assert ask_node._free.pop().read_response_without_lock.await_count
2440+
assert first_node._free.pop().read_response.await_count
2441+
assert ask_node._free.pop().read_response.await_count
24462442
assert res == ["MOCK_OK"]
24472443

24482444
async def test_moved_redirection_on_slave_with_default(
@@ -2497,7 +2493,7 @@ async def test_readonly_pipeline_from_readonly_client(
24972493
executed_on_replica = False
24982494
for node in slot_nodes:
24992495
if node.server_type == REPLICA:
2500-
if node._free.pop().read_response_without_lock.await_count:
2496+
if node._free.pop().read_response.await_count:
25012497
executed_on_replica = True
25022498
break
25032499
assert executed_on_replica

tests/test_asyncio/test_connection.py

+4
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ async def test_socket_param_regression(r):
6464

6565

6666
async def test_can_run_concurrent_commands(r):
67+
if getattr(r, "connection", None) is not None:
68+
# Concurrent commands are only supported on pooled or cluster connections
69+
# since there is no synchronization on a single connection.
70+
pytest.skip("pool only")
6771
assert await r.ping() is True
6872
assert all(await asyncio.gather(*(r.ping() for _ in range(10))))
6973

0 commit comments

Comments
 (0)