@@ -188,7 +188,8 @@ all_tests() ->
188188 priority_queue_fifo ,
189189 priority_queue_2_1_ratio ,
190190 requeue_multiple_true ,
191- requeue_multiple_false
191+ requeue_multiple_false ,
192+ leader_health_check
192193 ].
193194
194195memory_tests () ->
@@ -4106,6 +4107,129 @@ amqpl_headers(Config) ->
41064107 ok = amqp_channel :cast (Ch , # 'basic.ack' {delivery_tag = DeliveryTag ,
41074108 multiple = true }).
41084109
4110+ leader_health_check (Config ) ->
4111+ VHost1 = <<" vhost1" >>,
4112+ VHost2 = <<" vhost2" >>,
4113+
4114+ set_up_vhost (Config , VHost1 ),
4115+ set_up_vhost (Config , VHost2 ),
4116+
4117+ % % check empty vhost
4118+ ? assertEqual ([],
4119+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4120+ [<<" .*" >>, VHost1 ])),
4121+ ? assertEqual ([],
4122+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4123+ [<<" .*" >>, across_all_vhosts ])),
4124+
4125+ Conn1 = rabbit_ct_client_helpers :open_unmanaged_connection (Config , 0 , VHost1 ),
4126+ {ok , Ch1 } = amqp_connection :open_channel (Conn1 ),
4127+
4128+ Conn2 = rabbit_ct_client_helpers :open_unmanaged_connection (Config , 0 , VHost2 ),
4129+ {ok , Ch2 } = amqp_connection :open_channel (Conn2 ),
4130+
4131+ Qs1 = [<<" Q.1" >>, <<" Q.2" >>, <<" Q.3" >>],
4132+ Qs2 = [<<" Q.4" >>, <<" Q.5" >>, <<" Q.6" >>],
4133+
4134+ % % in vhost1
4135+ [? assertEqual ({'queue.declare_ok' , Q , 0 , 0 },
4136+ declare (Ch1 , Q , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}]))
4137+ || Q <- Qs1 ],
4138+
4139+ % % in vhost2
4140+ [? assertEqual ({'queue.declare_ok' , Q , 0 , 0 },
4141+ declare (Ch2 , Q , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}]))
4142+ || Q <- Qs2 ],
4143+
4144+ % % test sucessful health checks in vhost1, vhost2, across_all_vhosts
4145+ ? assertEqual ([], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4146+ [<<" .*" >>, VHost1 ])),
4147+ ? assertEqual ([], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4148+ [<<" Q.*" >>, VHost1 ])),
4149+ [? assertEqual ([], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4150+ [Q , VHost1 ])) || Q <- Qs1 ],
4151+
4152+ ? assertEqual ([], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4153+ [<<" .*" >>, VHost2 ])),
4154+ ? assertEqual ([], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4155+ [<<" Q.*" >>, VHost2 ])),
4156+ [? assertEqual ([], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4157+ [Q , VHost2 ])) || Q <- Qs2 ],
4158+
4159+ ? assertEqual ([], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4160+ [<<" .*" >>, across_all_vhosts ])),
4161+ ? assertEqual ([], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4162+ [<<" Q.*" >>, across_all_vhosts ])),
4163+
4164+ % % clear leaderboard
4165+ Qs = rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_amqqueue , list , []),
4166+
4167+ [{_Q1_ClusterName , _Q1Res },
4168+ {_Q2_ClusterName , _Q2Res },
4169+ {_Q3_ClusterName , _Q3Res },
4170+ {_Q4_ClusterName , _Q4Res },
4171+ {_Q5_ClusterName , _Q5Res },
4172+ {_Q6_ClusterName , _Q6Res }] = QQ_Clusters =
4173+ lists :usort (
4174+ [begin
4175+ {ClusterName , _ } = amqqueue :get_pid (Q ),
4176+ {ClusterName , amqqueue :get_name (Q )}
4177+ end
4178+ || Q <- Qs , amqqueue :get_type (Q ) == rabbit_quorum_queue ]),
4179+
4180+ [Q1Data , Q2Data , Q3Data , Q4Data , Q5Data , Q6Data ] = QQ_Data =
4181+ [begin
4182+ rabbit_ct_broker_helpers :rpc (Config , 0 , ra_leaderboard , clear , [Q_ClusterName ]),
4183+ _QData = amqqueue :to_printable (Q_Res , rabbit_quorum_queue )
4184+ end
4185+ || {Q_ClusterName , Q_Res } <- QQ_Clusters ],
4186+
4187+ % % test failed health checks in vhost1, vhost2, across_all_vhosts
4188+ ? assertEqual ([Q1Data ], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4189+ [<<" Q.1" >>, VHost1 ])),
4190+ ? assertEqual ([Q2Data ], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4191+ [<<" Q.2" >>, VHost1 ])),
4192+ ? assertEqual ([Q3Data ], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4193+ [<<" Q.3" >>, VHost1 ])),
4194+ ? assertEqual ([Q1Data , Q2Data , Q3Data ],
4195+ lists :usort (rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4196+ [<<" .*" >>, VHost1 ]))),
4197+ ? assertEqual ([Q1Data , Q2Data , Q3Data ],
4198+ lists :usort (rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4199+ [<<" Q.*" >>, VHost1 ]))),
4200+
4201+ ? assertEqual ([Q4Data ], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4202+ [<<" Q.4" >>, VHost2 ])),
4203+ ? assertEqual ([Q5Data ], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4204+ [<<" Q.5" >>, VHost2 ])),
4205+ ? assertEqual ([Q6Data ], rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4206+ [<<" Q.6" >>, VHost2 ])),
4207+ ? assertEqual ([Q4Data , Q5Data , Q6Data ],
4208+ lists :usort (rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4209+ [<<" .*" >>, VHost2 ]))),
4210+ ? assertEqual ([Q4Data , Q5Data , Q6Data ],
4211+ lists :usort (rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4212+ [<<" Q.*" >>, VHost2 ]))),
4213+
4214+ ? assertEqual (QQ_Data ,
4215+ lists :usort (rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4216+ [<<" Q.*" >>, across_all_vhosts ]))),
4217+ ? assertEqual (QQ_Data ,
4218+ lists :usort (rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue , leader_health_check ,
4219+ [<<" Q.*" >>, across_all_vhosts ]))),
4220+
4221+ % % cleanup
4222+ [? assertMatch (# 'queue.delete_ok' {},
4223+ amqp_channel :call (Ch1 , # 'queue.delete' {queue = Q }))
4224+ || Q <- Qs1 ],
4225+ [? assertMatch (# 'queue.delete_ok' {},
4226+ amqp_channel :call (Ch1 , # 'queue.delete' {queue = Q }))
4227+ || Q <- Qs2 ],
4228+
4229+ amqp_connection :close (Conn1 ),
4230+ amqp_connection :close (Conn2 ).
4231+
4232+
41094233leader_locator_client_local (Config ) ->
41104234 [Server1 | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
41114235 Q = ? config (queue_name , Config ),
@@ -4426,6 +4550,11 @@ declare_passive(Ch, Q, Args) ->
44264550 auto_delete = false ,
44274551 passive = true ,
44284552 arguments = Args }).
4553+
4554+ set_up_vhost (Config , VHost ) ->
4555+ rabbit_ct_broker_helpers :add_vhost (Config , VHost ),
4556+ rabbit_ct_broker_helpers :set_full_permissions (Config , <<" guest" >>, VHost ).
4557+
44294558assert_queue_type (Server , Q , Expected ) ->
44304559 assert_queue_type (Server , <<" /" >>, Q , Expected ).
44314560
0 commit comments