@@ -3666,3 +3666,58 @@ async def test_replica_of_self(async_client):
36663666
36673667 with pytest .raises (redis .exceptions .ResponseError ):
36683668 await async_client .execute_command (f"replicaof 127.0.0.1 { port } " )
3669+
3670+
3671+ @dfly_args ({"proactor_threads" : 2 })
3672+ async def test_takeover_with_stuck_connections (df_factory : DflyInstanceFactory ):
3673+ master = df_factory .create ()
3674+ master .start ()
3675+
3676+ async_client = master .client ()
3677+ await async_client .execute_command ("debug populate 2000" )
3678+
3679+ reader , writer = await asyncio .open_connection ("127.0.0.1" , master .port )
3680+ writer .write (f"client setname writer_test\n " .encode ())
3681+ await writer .drain ()
3682+ assert "OK" in (await reader .readline ()).decode ()
3683+ size = 1024 * 1024
3684+ writer .write (f"SET a { 'v' * size } \n " .encode ())
3685+ await writer .drain ()
3686+
3687+ replica = df_factory .create ()
3688+ replica .start ()
3689+
3690+ replica_cl = replica .client ()
3691+
3692+ res = await replica_cl .execute_command (f"replicaof localhost { master .port } " )
3693+ assert res == "OK"
3694+
3695+ # Wait for all replicas to transition into stable sync
3696+ async with async_timeout .timeout (240 ):
3697+ await wait_for_replicas_state (replica_cl )
3698+
3699+ async def get_task ():
3700+ while True :
3701+ writer .write (f"GET a\n " .encode ())
3702+ await writer .drain ()
3703+ await asyncio .sleep (0.1 )
3704+
3705+ get = asyncio .create_task (get_task ())
3706+
3707+ @assert_eventually (times = 600 )
3708+ async def wait_for_stuck_on_send ():
3709+ clients = await async_client .client_list ()
3710+ logging .info ("wait_for_stuck_on_send clients: %s" , clients )
3711+ phase = next (
3712+ (client ["phase" ] for client in clients if client ["name" ] == "writer_test" ), None
3713+ )
3714+ assert phase == "send"
3715+
3716+ await wait_for_stuck_on_send ()
3717+
3718+ # with pytest.raises(redis.exceptions.ResponseError) as e:
3719+ try :
3720+ res = await replica_cl .execute_command ("REPLTAKEOVER 2" )
3721+ assert res != "OK"
3722+ except redis .exceptions .ResponseError as e :
3723+ pass
0 commit comments