Skip to content

Commit 308ee6b

Browse files
committed
chore: add takeover test with stuck connections
Signed-off-by: Kostas Kyrimis <[email protected]>
1 parent eaedc9a commit 308ee6b

File tree

1 file changed

+51
-0
lines changed

1 file changed

+51
-0
lines changed

tests/dragonfly/replication_test.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3666,3 +3666,54 @@ async def test_replica_of_self(async_client):
36663666

36673667
with pytest.raises(redis.exceptions.ResponseError):
36683668
await async_client.execute_command(f"replicaof 127.0.0.1 {port}")
3669+
3670+
3671+
@dfly_args({"proactor_threads": 2})
3672+
async def test_takeover_with_stuck_connections(df_factory: DflyInstanceFactory):
3673+
master = df_factory.create()
3674+
master.start()
3675+
3676+
async_client = master.client()
3677+
await async_client.execute_command("debug populate 2000")
3678+
3679+
reader, writer = await asyncio.open_connection("127.0.0.1", master.port)
3680+
writer.write(f"client setname writer_test\n".encode())
3681+
await writer.drain()
3682+
assert "OK" in (await reader.readline()).decode()
3683+
size = 1024 * 1024
3684+
writer.write(f"SET a {'v'*size}\n".encode())
3685+
await writer.drain()
3686+
3687+
async def get_task():
3688+
while True:
3689+
writer.write(f"GET a\n".encode())
3690+
await writer.drain()
3691+
await asyncio.sleep(0.1)
3692+
3693+
get = asyncio.create_task(get_task())
3694+
3695+
@assert_eventually(times=600)
3696+
async def wait_for_stuck_on_send():
3697+
clients = await async_client.client_list()
3698+
logging.info("wait_for_stuck_on_send clients: %s", clients)
3699+
phase = next(
3700+
(client["phase"] for client in clients if client["name"] == "writer_test"), None
3701+
)
3702+
assert phase == "send"
3703+
3704+
await wait_for_stuck_on_send()
3705+
3706+
replica = df_factory.create()
3707+
replica.start()
3708+
3709+
replica_cl = replica.client()
3710+
3711+
res = await replica_cl.execute_command(f"replicaof localhost {master.port}")
3712+
assert res == "OK"
3713+
3714+
# Wait for all replicas to transition into stable sync
3715+
async with async_timeout.timeout(240):
3716+
await wait_for_replicas_state(replica_cl)
3717+
3718+
with pytest.raises(redis.exceptions.ResponseError) as e:
3719+
await replica_cl.execute_command("REPLTAKEOVER 5")

0 commit comments

Comments
 (0)