@@ -45,7 +45,7 @@ def __init__(self, addr, redis_addr, delay: float):
4545
4646 async def start (self ):
4747 # test that we can connect to redis
48- with async_timeout (2 ):
48+ async with async_timeout (2 ):
4949 redis_reader , redis_writer = await asyncio .open_connection (* self .redis_addr )
5050 redis_writer .close ()
5151 self .server = await asyncio .start_server (self .handle , * self .addr )
@@ -67,15 +67,23 @@ def override(self, delay: float = 0.0):
6767 async def handle (self , reader , writer ):
6868 # establish connection to redis
6969 redis_reader , redis_writer = await asyncio .open_connection (* self .redis_addr )
70- pipe1 = asyncio .create_task (
71- pipe (reader , redis_writer , self , "to redis:" , self .send_event )
72- )
73- pipe2 = asyncio .create_task (pipe (redis_reader , writer , self , "from redis:" ))
74- await asyncio .gather (pipe1 , pipe2 )
70+ try :
71+ pipe1 = asyncio .create_task (
72+ pipe (reader , redis_writer , self , "to redis:" , self .send_event )
73+ )
74+ pipe2 = asyncio .create_task (pipe (redis_reader , writer , self , "from redis:" ))
75+ await asyncio .gather (pipe1 , pipe2 )
76+ finally :
77+ redis_writer .close ()
78+ redis_reader .close ()
7579
7680 async def stop (self ):
7781 # clean up enough so that we can reuse the looper
7882 self .ROUTINE .cancel ()
83+ try :
84+ await self .ROUTINE
85+ except asyncio .CancelledError :
86+ pass
7987 loop = self .server .get_loop ()
8088 await loop .shutdown_asyncgens ()
8189
@@ -181,7 +189,7 @@ async def test_cluster(request, redis_addr):
181189 remap .append ({"from_port" : port , "to_port" : remapped })
182190 forward_addr = redis_addr [0 ], port
183191 proxy = DelayProxy (
184- addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr , delay = delay
192+ addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr , delay = 0
185193 )
186194 proxies .append (proxy )
187195
@@ -198,30 +206,29 @@ async def wait_for_send():
198206 )
199207
200208 @contextlib .contextmanager
201- def override ():
209+ def override (delay : int = 0 ):
202210 with contextlib .ExitStack () as stack :
203211 for p in proxies :
204- stack .enter_context (p .override ())
212+ stack .enter_context (p .override (delay = delay ))
205213 yield
206214
207- with override ():
208- r = RedisCluster .from_url (
209- f"redis://127.0.0.1:{ remap_base } " , host_port_remap = remap
210- )
215+ with contextlib .closing (
216+ RedisCluster .from_url (f"redis://127.0.0.1:{ remap_base } " , host_port_remap = remap )
217+ ) as r :
211218 await r .initialize ()
212219 await r .set ("foo" , "foo" )
213220 await r .set ("bar" , "bar" )
214221
215- all_clear ()
216- t = asyncio .create_task (r .get ("foo" ))
217- # cannot wait on the send event, we don't know which node will be used
218- await wait_for_send ()
219- await asyncio .sleep (delay )
220- t .cancel ()
221- with pytest .raises (asyncio .CancelledError ):
222- await t
222+ all_clear ()
223+ with override (delay = delay ):
224+ t = asyncio .create_task (r .get ("foo" ))
225+ # cannot wait on the send event, we don't know which node will be used
226+ await wait_for_send ()
227+ await asyncio .sleep (delay )
228+ t .cancel ()
229+ with pytest .raises (asyncio .CancelledError ):
230+ await t
223231
224- with override ():
225232 # try a number of requests to excercise all the connections
226233 async def doit ():
227234 assert await r .get ("bar" ) == b"bar"
0 commit comments