@@ -94,7 +94,8 @@ groups() ->
9494 single_active_consumer_priority_take_over ,
9595 single_active_consumer_priority ,
9696 force_shrink_member_to_current_member ,
97- force_all_queues_shrink_member_to_current_member
97+ force_all_queues_shrink_member_to_current_member ,
98+ force_vhost_queues_shrink_member_to_current_member
9899 ]
99100 ++ all_tests ()},
100101 {cluster_size_5 , [], [start_queue ,
@@ -1232,6 +1233,72 @@ force_all_queues_shrink_member_to_current_member(Config) ->
12321233 ? assertEqual (3 , length (Nodes0 ))
12331234 end || Q <- QQs ].
12341235
1236+ force_vhost_queues_shrink_member_to_current_member (Config ) ->
1237+ [Server0 , Server1 , Server2 ] =
1238+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1239+
1240+ Ch0 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1241+ QQ = ? config (queue_name , Config ),
1242+ AQ = ? config (alt_queue_name , Config ),
1243+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1244+ declare (Ch0 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1245+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1246+ declare (Ch0 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1247+
1248+ QQs = [QQ , AQ ],
1249+
1250+ VHost1 = <<" /" >>,
1251+ VHost2 = <<" another-vhost" >>,
1252+ VHosts = [VHost1 , VHost2 ],
1253+
1254+ User = ? config (rmq_username , Config ),
1255+ ok = rabbit_ct_broker_helpers :add_vhost (Config , Server0 , VHost2 , User ),
1256+ ok = rabbit_ct_broker_helpers :set_full_permissions (Config , User , VHost2 ),
1257+ Conn1 = rabbit_ct_client_helpers :open_unmanaged_connection (Config , Server0 , VHost2 ),
1258+ {ok , Ch1 } = amqp_connection :open_channel (Conn1 ),
1259+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1260+ declare (Ch1 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1261+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1262+ declare (Ch1 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1263+
1264+ [rabbit_ct_client_helpers :publish (Ch , Q , 3 ) || Q <- QQs , Ch <- [Ch0 , Ch1 ]],
1265+
1266+ [begin
1267+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1268+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1269+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1270+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1271+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1272+ ? assertEqual (3 , length (Nodes0 ))
1273+ end || Q <- QQs , VHost <- VHosts ],
1274+
1275+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1276+ force_vhost_queues_shrink_member_to_current_member , [VHost2 ]),
1277+
1278+ [begin
1279+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1280+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1281+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1282+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1283+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1284+ case VHost of
1285+ VHost1 -> ? assertEqual (3 , length (Nodes0 ));
1286+ VHost2 -> ? assertEqual (1 , length (Nodes0 ))
1287+ end
1288+ end || Q <- QQs , VHost <- VHosts ],
1289+
1290+ % % grow queues back to all nodes in VHost2 only
1291+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , VHost2 , <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1292+
1293+ [begin
1294+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1295+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1296+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1297+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1298+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1299+ ? assertEqual (3 , length (Nodes0 ))
1300+ end || Q <- QQs , VHost <- VHosts ].
1301+
12351302priority_queue_fifo (Config ) ->
12361303 % % testing: if hi priority messages are published before lo priority
12371304 % % messages they are always consumed first (fifo)
0 commit comments