@@ -94,7 +94,8 @@ groups() ->
9494 single_active_consumer_priority_take_over ,
9595 single_active_consumer_priority ,
9696 force_shrink_member_to_current_member ,
97- force_all_queues_shrink_member_to_current_member
97+ force_all_queues_shrink_member_to_current_member ,
98+ force_vhost_queues_shrink_member_to_current_member
9899 ]
99100 ++ all_tests ()},
100101 {cluster_size_5 , [], [start_queue ,
@@ -1233,6 +1234,72 @@ force_all_queues_shrink_member_to_current_member(Config) ->
12331234 ? assertEqual (3 , length (Nodes0 ))
12341235 end || Q <- QQs ].
12351236
1237+ force_vhost_queues_shrink_member_to_current_member (Config ) ->
1238+ [Server0 , Server1 , Server2 ] =
1239+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1240+
1241+ Ch0 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1242+ QQ = ? config (queue_name , Config ),
1243+ AQ = ? config (alt_queue_name , Config ),
1244+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1245+ declare (Ch0 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1246+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1247+ declare (Ch0 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1248+
1249+ QQs = [QQ , AQ ],
1250+
1251+ VHost1 = <<" /" >>,
1252+ VHost2 = <<" another-vhost" >>,
1253+ VHosts = [VHost1 , VHost2 ],
1254+
1255+ User = ? config (rmq_username , Config ),
1256+ ok = rabbit_ct_broker_helpers :add_vhost (Config , Server0 , VHost2 , User ),
1257+ ok = rabbit_ct_broker_helpers :set_full_permissions (Config , User , VHost2 ),
1258+ Conn1 = rabbit_ct_client_helpers :open_unmanaged_connection (Config , Server0 , VHost2 ),
1259+ {ok , Ch1 } = amqp_connection :open_channel (Conn1 ),
1260+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1261+ declare (Ch1 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1262+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1263+ declare (Ch1 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1264+
1265+ [rabbit_ct_client_helpers :publish (Ch , Q , 3 ) || Q <- QQs , Ch <- [Ch0 , Ch1 ]],
1266+
1267+ [begin
1268+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1269+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1270+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1271+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1272+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1273+ ? assertEqual (3 , length (Nodes0 ))
1274+ end || Q <- QQs , VHost <- VHosts ],
1275+
1276+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1277+ force_vhost_queues_shrink_member_to_current_member , [VHost2 ]),
1278+
1279+ [begin
1280+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1281+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1282+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1283+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1284+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1285+ case VHost of
1286+ VHost1 -> ? assertEqual (3 , length (Nodes0 ));
1287+ VHost2 -> ? assertEqual (1 , length (Nodes0 ))
1288+ end
1289+ end || Q <- QQs , VHost <- VHosts ],
1290+
1291+ % % grow queues back to all nodes in VHost2 only
1292+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , VHost2 , <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1293+
1294+ [begin
1295+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1296+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1297+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1298+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1299+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1300+ ? assertEqual (3 , length (Nodes0 ))
1301+ end || Q <- QQs , VHost <- VHosts ].
1302+
12361303priority_queue_fifo (Config ) ->
12371304 % % testing: if hi priority messages are published before lo priority
12381305 % % messages they are always consumed first (fifo)
0 commit comments