@@ -168,34 +168,66 @@ async def test_standalone_pipeline(delay, redis_addr):
168168@pytest .mark .onlycluster
169169async def test_cluster (request , redis_addr ):
170170
171- # TODO: This test actually doesn't work. Once the RedisCluster initializes,
172- # it will re-connect to the nodes as advertised by the cluster, bypassing
173- # the single DelayProxy we set up.
174- # to work around this, we really would nedd a port-remapper for the RedisCluster
171+ delay = 0.1
172+ cluster_port = 6372
173+ remap_base = 7372
174+ n_nodes = 6
175+
176+ remap = []
177+ proxies = []
178+ for i in range (n_nodes ):
179+ port = cluster_port + i
180+ remapped = remap_base + i
181+ remap .append ({"from_port" : port , "to_port" : remapped })
182+ forward_addr = redis_addr [0 ], port
183+ proxy = DelayProxy (
184+ addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr , delay = delay
185+ )
186+ proxies .append (proxy )
175187
176- redis_addr = redis_addr [0 ], 6372 # use the cluster port
177- dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr , delay = 0.1 )
178- await dp .start ()
188+ # start proxies
189+ await asyncio .gather (* [p .start () for p in proxies ])
190+
191+ def all_clear ():
192+ for p in proxies :
193+ p .send_event .clear ()
179194
180- r = RedisCluster .from_url ("redis://127.0.0.1:5381" )
181- await r .initialize ()
182- with dp .override ():
195+ async def wait_for_send ():
196+ asyncio .wait (
197+ [p .send_event .wait () for p in proxies ], return_when = asyncio .FIRST_COMPLETED
198+ )
199+
200+ @contextlib .contextmanager
201+ def override ():
202+ with contextlib .ExitStack () as stack :
203+ for p in proxies :
204+ stack .enter_context (p .override ())
205+ yield
206+
207+ with override ():
208+ r = RedisCluster .from_url (
209+ f"redis://127.0.0.1:{ remap_base } " , host_port_remap = remap
210+ )
211+ await r .initialize ()
183212 await r .set ("foo" , "foo" )
184213 await r .set ("bar" , "bar" )
185214
186- dp . send_event . clear ()
215+ all_clear ()
187216 t = asyncio .create_task (r .get ("foo" ))
188- # await dp.send_event.wait() # won"t work, because DelayProxy is by-passed
189- await asyncio .sleep (0.05 )
217+ # cannot wait on the send event, we don't know which node will be used
218+ await wait_for_send ()
219+ await asyncio .sleep (delay )
190220 t .cancel ()
191- try :
221+ with pytest . raises ( asyncio . CancelledError ) :
192222 await t
193- except asyncio .CancelledError :
194- pass
195223
196- with dp .override ():
197- assert await r .get ("bar" ) == b"bar"
198- assert await r .ping ()
199- assert await r .get ("foo" ) == b"foo"
224+ with override ():
225+ # try a number of requests to excercise all the connections
226+ async def doit ():
227+ assert await r .get ("bar" ) == b"bar"
228+ assert await r .ping ()
229+ assert await r .get ("foo" ) == b"foo"
200230
201- await dp .stop ()
231+ await asyncio .gather (* [doit () for _ in range (10 )])
232+
233+ await asyncio .gather (* (p .stop () for p in proxies ))
0 commit comments