@@ -75,7 +75,8 @@ groups() ->
7575 purge_stream
7676 ]},
7777 {cluster_size_3 , [shuffle ],
78- [classic_queue_stopped
78+ [classic_queue_stopped ,
79+ queue_topology
7980 ]}
8081 ].
8182
@@ -563,19 +564,15 @@ declare_exchange_inequivalent_fields(Config) ->
563564 ok = cleanup (Init ).
564565
565566classic_queue_stopped (Config ) ->
566- OpnConf2 = connection_config (Config , 2 ),
567- {ok , Conn2 } = amqp10_client :open_connection (OpnConf2 ),
568- {ok , Session2 } = amqp10_client :begin_session_sync (Conn2 ),
569- {ok , LinkPair2 } = rabbitmq_amqp_client :attach_management_link_pair_sync (Session2 , <<" my link pair" >>),
570-
567+ Init2 = {_ , _ , LinkPair2 } = init (Config , 2 ),
571568 QName = <<" 👌" /utf8 >>,
572569 {ok , #{durable := true ,
573570 type := <<" classic" >>}} = rabbitmq_amqp_client :declare_queue (LinkPair2 , QName , #{}),
574- ok = amqp10_client : close_connection ( Conn2 ),
571+ ok = cleanup ( Init2 ),
575572 ok = rabbit_ct_broker_helpers :stop_node (Config , 2 ),
576573 % % Classic queue is now stopped.
577574
578- Init = {_ , _ , LinkPair0 } = init (Config ),
575+ Init0 = {_ , _ , LinkPair0 } = init (Config ),
579576 {error , Resp0 } = rabbitmq_amqp_client :declare_queue (LinkPair0 , QName , #{}),
580577 ? assertMatch (#{subject := <<" 400" >>}, amqp10_msg :properties (Resp0 )),
581578 ExpectedResponseBody = # 'v1_0.amqp_value' {
@@ -591,7 +588,7 @@ classic_queue_stopped(Config) ->
591588
592589 ok = rabbit_ct_broker_helpers :start_node (Config , 2 ),
593590 {ok , #{}} = rabbitmq_amqp_client :delete_queue (LinkPair0 , QName ),
594- ok = cleanup (Init ).
591+ ok = cleanup (Init0 ).
595592
596593delete_default_exchange (Config ) ->
597594 Init = {_ , _ , LinkPair } = init (Config ),
@@ -731,8 +728,69 @@ purge_stream(Config) ->
731728 {ok , #{}} = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
732729 ok = cleanup (Init ).
733730
731+ queue_topology (Config ) ->
732+ NodeNames = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
733+ Nodes = [N0 , N1 , N2 ] = lists :map (fun erlang :atom_to_binary /1 , NodeNames ),
734+ Init0 = {_ , _ , LinkPair0 } = init (Config , 0 ),
735+
736+ CQName = <<" my classic queue" >>,
737+ QQName = <<" my quorum queue" >>,
738+ SQName = <<" my stream queue" >>,
739+
740+ CQProps = #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" classic" >>}}},
741+ QQProps = #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>}}},
742+ SQProps = #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" stream" >>}}},
743+
744+ {ok , CQInfo0 } = rabbitmq_amqp_client :declare_queue (LinkPair0 , CQName , CQProps ),
745+ {ok , QQInfo0 } = rabbitmq_amqp_client :declare_queue (LinkPair0 , QQName , QQProps ),
746+ {ok , SQInfo0 } = rabbitmq_amqp_client :declare_queue (LinkPair0 , SQName , SQProps ),
747+
748+ % % The default queue leader strategy is client-local.
749+ ? assertEqual ({ok , N0 }, maps :find (leader , CQInfo0 )),
750+ ? assertEqual ({ok , N0 }, maps :find (leader , QQInfo0 )),
751+ ? assertEqual ({ok , N0 }, maps :find (leader , SQInfo0 )),
752+
753+ ? assertEqual ({ok , [N0 ]}, maps :find (replicas , CQInfo0 )),
754+ {ok , QQReplicas0 } = maps :find (replicas , QQInfo0 ),
755+ ? assertEqual (Nodes , lists :usort (QQReplicas0 )),
756+ {ok , SQReplicas0 } = maps :find (replicas , SQInfo0 ),
757+ ? assertEqual (Nodes , lists :usort (SQReplicas0 )),
758+
759+ ok = cleanup (Init0 ),
760+ ok = rabbit_ct_broker_helpers :stop_node (Config , 0 ),
761+
762+ Init2 = {_ , _ , LinkPair2 } = init (Config , 2 ),
763+ {ok , QQInfo2 } = rabbitmq_amqp_client :get_queue (LinkPair2 , QQName ),
764+ {ok , SQInfo2 } = rabbitmq_amqp_client :get_queue (LinkPair2 , SQName ),
765+
766+ case maps :get (leader , QQInfo2 ) of
767+ N1 -> ok ;
768+ N2 -> ok ;
769+ Other0 -> ct :fail ({? LINE , Other0 })
770+ end ,
771+ case maps :get (leader , SQInfo2 ) of
772+ N1 -> ok ;
773+ N2 -> ok ;
774+ Other1 -> ct :fail ({? LINE , Other1 })
775+ end ,
776+
777+ % % Replicas should include both online and offline replicas.
778+ {ok , QQReplicas2 } = maps :find (replicas , QQInfo2 ),
779+ ? assertEqual (Nodes , lists :usort (QQReplicas2 )),
780+ {ok , SQReplicas2 } = maps :find (replicas , SQInfo2 ),
781+ ? assertEqual (Nodes , lists :usort (SQReplicas2 )),
782+
783+ ok = rabbit_ct_broker_helpers :start_node (Config , 0 ),
784+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair2 , CQName ),
785+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair2 , QQName ),
786+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair2 , SQName ),
787+ ok = cleanup (Init2 ).
788+
734789init (Config ) ->
735- OpnConf = connection_config (Config ),
790+ init (Config , 0 ).
791+
792+ init (Config , Node ) ->
793+ OpnConf = connection_config (Config , Node ),
736794 {ok , Connection } = amqp10_client :open_connection (OpnConf ),
737795 {ok , Session } = amqp10_client :begin_session_sync (Connection ),
738796 {ok , LinkPair } = rabbitmq_amqp_client :attach_management_link_pair_sync (Session , <<" my link pair" >>),
0 commit comments