Description
Version: redis-py 3.5.3, redis 6.2.5
Platform: Python 3.9.6 on macOS
Reading through the code and the issues, it seems PubSub is supposed to reconnect automatically if the connection breaks. However, calling PubSub.get_message()
does not reconnect; instead, it raises a ConnectionError.
To reproduce, here's a simple script that publishes a message to a topic once per second while another thread calls get_message()
in a loop.
import redis
import itertools
import threading
from time import sleep
# NOTE: this rebinds the name `redis` from the imported module to a client
# instance; every later use of `redis` in this script refers to the client.
redis = redis.Redis()
pubsub = redis.pubsub()
# Keyword form of subscribe: presumably registers `print` as the handler for
# the channel named "topic" (the same channel the publish loop writes to) --
# confirm against the redis-py PubSub documentation.
pubsub.subscribe(topic=print)
class PubSubWorkerThread(threading.Thread):
    """Background thread that polls a pubsub object until stopped.

    Unlike a plain polling loop, any exception raised by ``get_message`` is
    caught and reported so the loop keeps running; this lets the script show
    whether polling recovers once the server is reachable again.
    """

    def __init__(self, pubsub, sleep_time, daemon=False):
        """Store the polling configuration.

        pubsub: object exposing ``get_message(...)`` and ``close()``.
        sleep_time: value passed as ``timeout`` to each ``get_message`` call.
        daemon: whether the thread is flagged as a daemon thread.
        """
        super().__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        self.sleep_time = sleep_time
        # Set while the polling loop should keep going; cleared by stop().
        self._running = threading.Event()

    def run(self):
        """Poll ``get_message`` until stop() is called, then close the pubsub."""
        # Already running (flag set): do not start a second polling loop.
        if self._running.is_set():
            return
        self._running.set()
        pubsub = self.pubsub
        sleep_time = self.sleep_time
        while self._running.is_set():
            try:
                pubsub.get_message(ignore_subscribe_messages=True,
                                   timeout=sleep_time)
            except Exception:
                # Deliberately broad: the point of the script is to keep
                # polling after a failure and observe whether it recovers.
                print('get_message failed')
                sleep(2)
        pubsub.close()

    def stop(self):
        """Ask the polling loop to exit."""
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()
# Start the subscriber side: poll the pubsub in a background thread, passing
# 1 as the get_message timeout.
thread = PubSubWorkerThread(pubsub, 1)
thread.start()

# Publisher side: send an increasing counter to 'topic' once per second,
# forever. Failures are reported and the loop carries on, so the output shows
# when publishing starts working again.
for i in itertools.count():
    try:
        redis.publish('topic', str(i))
    except Exception:
        print('publish failed')
    sleep(1)
Run this script and, while it is running, stop your Redis server, then start it up again.
Expected behavior: Publish and get_message will fail while Redis is stopped, then resume working when Redis is started.
Observed behavior: After restarting Redis, publish resumes working, get_message continues to fail.
Stack trace: (Same exception before and after restarting server)
Traceback (most recent call last):
File "/Users/luhn/Code/redis-py/test.py", line 24, in run
pubsub.get_message(ignore_subscribe_messages=True,
File "/Users/luhn/Code/redis-py/redis/client.py", line 3617, in get_message
response = self.parse_response(block=False, timeout=timeout)
File "/Users/luhn/Code/redis-py/redis/client.py", line 3503, in parse_response
if not block and not conn.can_read(timeout=timeout):
File "/Users/luhn/Code/redis-py/redis/connection.py", line 734, in can_read
return self._parser.can_read(timeout)
File "/Users/luhn/Code/redis-py/redis/connection.py", line 321, in can_read
return self._buffer and self._buffer.can_read(timeout)
File "/Users/luhn/Code/redis-py/redis/connection.py", line 230, in can_read
self._read_from_socket(timeout=timeout,
File "/Users/luhn/Code/redis-py/redis/connection.py", line 201, in _read_from_socket
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.