Skip to content

Add unittest for PubSub.connect() #2167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 8, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions tests/test_asyncio/test_pubsub.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import functools
import sys
from typing import Optional

Expand All @@ -20,6 +21,18 @@
pytestmark = pytest.mark.asyncio(forbid_global_loop=True)


def with_timeout(t):
def wrapper(corofunc):
@functools.wraps(corofunc)
async def run(*args, **kwargs):
async with async_timeout.timeout(t):
return await corofunc(*args, **kwargs)

return run

return wrapper


async def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False):
now = asyncio.get_event_loop().time()
timeout = now + timeout
Expand Down Expand Up @@ -603,6 +616,75 @@ async def test_get_message_with_timeout_returns_none(self, r: redis.Redis):
assert await p.get_message(timeout=0.01) is None


@pytest.mark.onlynoncluster
class TestPubSubReconnect:
# @pytest.mark.xfail
@with_timeout(2)
async def test_reconnect_listen(self, r: redis.Redis):
"""
Test that a loop processing PubSub messages can survive
a disconnect, by issuing a connect() call.
"""
messages = asyncio.Queue()
pubsub = r.pubsub()
interrupt = False

async def loop():
# must make sure the task exits
async with async_timeout.timeout(2):
nonlocal interrupt
await pubsub.subscribe("foo")
while True:
# print("loop")
try:
try:
await pubsub.connect()
await loop_step()
# print("succ")
except redis.ConnectionError:
await asyncio.sleep(0.1)
except asyncio.CancelledError:
# we use a cancel to interrupt the "listen"
# when we perform a disconnect
# print("cancel", interrupt)
if interrupt:
interrupt = False
else:
raise

async def loop_step():
# get a single message via listen()
async for message in pubsub.listen():
await messages.put(message)
break

task = asyncio.get_event_loop().create_task(loop())
# get the initial connect message
async with async_timeout.timeout(1):
message = await messages.get()
assert message == {
"channel": b"foo",
"data": 1,
"pattern": None,
"type": "subscribe",
}
# now, disconnect the connection.
await pubsub.connection.disconnect()
interrupt = True
task.cancel() # interrupt the listen call
# await another auto-connect message
message = await messages.get()
assert message == {
"channel": b"foo",
"data": 1,
"pattern": None,
"type": "subscribe",
}
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task


@pytest.mark.onlynoncluster
class TestPubSubRun:
async def _subscribe(self, p, *args, **kwargs):
Expand Down