Skip to content

Removing the threading.Lock locks and replacing them with RLock objects to avoid deadlocks. #3677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,7 @@ def __init__(
]:
raise RedisError("Client caching is only supported with RESP version 3")

# TODO: To avoid breaking changes during the bug fix, we have to keep non-reentrant lock.
# TODO: Remove this before next major version (7.0.0)
self.single_connection_lock = threading.Lock()
self.single_connection_lock = threading.RLock()
self.connection = None
self._single_connection_client = single_connection_client
if self._single_connection_client:
Expand Down Expand Up @@ -776,9 +774,7 @@ def __init__(
else:
self._event_dispatcher = event_dispatcher

# TODO: To avoid breaking changes during the bug fix, we have to keep non-reentrant lock.
# TODO: Remove this before next major version (7.0.0)
self._lock = threading.Lock()
self._lock = threading.RLock()
if self.encoder is None:
self.encoder = self.connection_pool.get_encoder()
self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
Expand Down
10 changes: 2 additions & 8 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ def __init__(
self,
conn: ConnectionInterface,
cache: CacheInterface,
pool_lock: threading.Lock,
pool_lock: threading.RLock,
):
self.pid = os.getpid()
self._conn = conn
Expand Down Expand Up @@ -1422,13 +1422,7 @@ def __init__(
# release the lock.

self._fork_lock = threading.RLock()

if self.cache is None:
self._lock = threading.RLock()
else:
# TODO: To avoid breaking changes during the bug fix, we have to keep non-reentrant lock.
# TODO: Remove this before next major version (7.0.0)
self._lock = threading.Lock()
self._lock = threading.RLock()

self.reset()

Expand Down
8 changes: 4 additions & 4 deletions redis/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def __init__(
self,
connection,
client_type: ClientType,
connection_lock: Union[threading.Lock, asyncio.Lock],
connection_lock: Union[threading.RLock, asyncio.Lock],
):
self._connection = connection
self._client_type = client_type
Expand All @@ -167,7 +167,7 @@ def client_type(self) -> ClientType:
return self._client_type

@property
def connection_lock(self) -> Union[threading.Lock, asyncio.Lock]:
def connection_lock(self) -> Union[threading.RLock, asyncio.Lock]:
return self._connection_lock


Expand All @@ -177,7 +177,7 @@ def __init__(
pubsub_connection,
connection_pool,
client_type: ClientType,
connection_lock: Union[threading.Lock, asyncio.Lock],
connection_lock: Union[threading.RLock, asyncio.Lock],
):
self._pubsub_connection = pubsub_connection
self._connection_pool = connection_pool
Expand All @@ -197,7 +197,7 @@ def client_type(self) -> ClientType:
return self._client_type

@property
def connection_lock(self) -> Union[threading.Lock, asyncio.Lock]:
def connection_lock(self) -> Union[threading.RLock, asyncio.Lock]:
return self._connection_lock


Expand Down
4 changes: 2 additions & 2 deletions tests/test_cluster_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def test_retry_transaction_on_connection_error(self, r, mock_connection):
mock_pool = Mock(spec=ConnectionPool)
mock_pool.get_connection.return_value = mock_connection
mock_pool._available_connections = [mock_connection]
mock_pool._lock = threading.Lock()
mock_pool._lock = threading.RLock()

_node_migrating, node_importing = _find_source_and_target_node_for_slot(r, slot)
node_importing.redis_connection.connection_pool = mock_pool
Expand All @@ -310,7 +310,7 @@ def test_retry_transaction_on_connection_error_with_watched_keys(
mock_pool = Mock(spec=ConnectionPool)
mock_pool.get_connection.return_value = mock_connection
mock_pool._available_connections = [mock_connection]
mock_pool._lock = threading.Lock()
mock_pool._lock = threading.RLock()

_node_migrating, node_importing = _find_source_and_target_node_for_slot(r, slot)
node_importing.redis_connection.connection_pool = mock_pool
Expand Down
6 changes: 3 additions & 3 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ def test_clears_cache_on_disconnect(self, mock_connection, cache_conf):
mock_connection.credential_provider = UsernamePasswordCredentialProvider()

proxy_connection = CacheProxyConnection(
mock_connection, cache, threading.Lock()
mock_connection, cache, threading.RLock()
)
proxy_connection.disconnect()

Expand Down Expand Up @@ -492,7 +492,7 @@ def test_read_response_returns_cached_reply(self, mock_cache, mock_connection):
mock_connection.can_read.return_value = False

proxy_connection = CacheProxyConnection(
mock_connection, mock_cache, threading.Lock()
mock_connection, mock_cache, threading.RLock()
)
proxy_connection.send_command(*["GET", "foo"], **{"keys": ["foo"]})
assert proxy_connection.read_response() == b"bar"
Expand Down Expand Up @@ -554,7 +554,7 @@ def test_triggers_invalidation_processing_on_another_connection(
mock_connection.can_read.return_value = False

proxy_connection = CacheProxyConnection(
mock_connection, mock_cache, threading.Lock()
mock_connection, mock_cache, threading.RLock()
)
proxy_connection.send_command(*["GET", "foo"], **{"keys": ["foo"]})

Expand Down
10 changes: 5 additions & 5 deletions tests/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def test_re_auth_all_connections(self, credential_provider):
}
mock_pool.get_connection.return_value = mock_connection
mock_pool._available_connections = [mock_connection, mock_another_connection]
mock_pool._lock = threading.Lock()
mock_pool._lock = threading.RLock()
auth_token = None

def re_auth_callback(token):
Expand Down Expand Up @@ -382,7 +382,7 @@ def test_re_auth_partial_connections(self, credential_provider):
mock_another_connection,
mock_failed_connection,
]
mock_pool._lock = threading.Lock()
mock_pool._lock = threading.RLock()

def _raise(error: RedisError):
pass
Expand Down Expand Up @@ -442,7 +442,7 @@ def test_re_auth_pub_sub_in_resp3(self, credential_provider):
mock_another_connection,
]
mock_pool._available_connections = [mock_another_connection]
mock_pool._lock = threading.Lock()
mock_pool._lock = threading.RLock()
auth_token = None

def re_auth_callback(token):
Expand Down Expand Up @@ -502,7 +502,7 @@ def test_do_not_re_auth_pub_sub_in_resp2(self, credential_provider):
mock_another_connection,
]
mock_pool._available_connections = [mock_another_connection]
mock_pool._lock = threading.Lock()
mock_pool._lock = threading.RLock()
auth_token = None

def re_auth_callback(token):
Expand Down Expand Up @@ -560,7 +560,7 @@ def test_fails_on_token_renewal(self, credential_provider):
}
mock_pool.get_connection.return_value = mock_connection
mock_pool._available_connections = [mock_connection, mock_another_connection]
mock_pool._lock = threading.Lock()
mock_pool._lock = threading.RLock()

Redis(
connection_pool=mock_pool,
Expand Down
Loading