From 369214ffd4cb86419df8559f80136d987f9d74f6 Mon Sep 17 00:00:00 2001 From: Eugene Morozov Date: Sun, 7 Nov 2021 21:58:54 +0300 Subject: [PATCH] Fixes garbage collection deadlock. --- dev_requirements.txt | 1 + redis/connection.py | 9 ++++++--- tests/test_pubsub.py | 12 ++++++++++++ 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/dev_requirements.txt b/dev_requirements.txt index aa9d8f9eee..0ca7727049 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,5 +1,6 @@ flake8>=3.9.2 pytest==6.2.5 +pytest-timeout==2.0.1 tox==3.24.4 tox-docker==3.1.0 invoke==1.6.0 diff --git a/redis/connection.py b/redis/connection.py index f5d6a38221..cb9acb4595 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -10,6 +10,7 @@ import socket import threading import warnings +import weakref from redis.exceptions import ( AuthenticationError, @@ -562,7 +563,7 @@ def __del__(self): pass def register_connect_callback(self, callback): - self._connect_callbacks.append(callback) + self._connect_callbacks.append(weakref.WeakMethod(callback)) def clear_connect_callbacks(self): self._connect_callbacks = [] @@ -588,8 +589,10 @@ def connect(self): # run any user callbacks. right now the only internal callback # is for pubsub channel/pattern resubscription - for callback in self._connect_callbacks: - callback(self) + for ref in self._connect_callbacks: + callback = ref() + if callback: + callback(self) def _connect(self): "Create a TCP socket connection" diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 6a4f0aafa4..4be6c7a305 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -570,3 +570,15 @@ def exception_handler(ex, pubsub, thread): assert event.wait(timeout=1.0) pubsub_thread.join(timeout=1.0) assert not pubsub_thread.is_alive() + + +class TestPubSubDeadlock: + @pytest.mark.timeout(30, method='thread') + def test_pubsub_deadlock(self, master_host): + pool = redis.ConnectionPool(host=master_host) + r = redis.Redis(connection_pool=pool) + + for i in range(60): + p = r.pubsub() + p.subscribe("my-channel-1", "my-channel-2") + pool.reset()