1717-include (" rabbit_stomp_headers.hrl" ).
1818
1919-define (QUEUE , <<" TestQueue" >>).
20+ -define (QUEUE_QQ , <<" TestQueueQQ" >>).
2021-define (DESTINATION , " /amq/queue/TestQueue" ).
22+ -define (DESTINATION_QQ , " /amq/queue/TestQueueQQ" ).
2123
2224all () ->
2325 [{group , version_to_group_name (V )} || V <- ? SUPPORTED_VERSIONS ].
@@ -28,6 +30,7 @@ groups() ->
2830 publish_unauthorized_error ,
2931 subscribe_error ,
3032 subscribe ,
33+ subscribe_with_x_priority ,
3134 unsubscribe_ack ,
3235 subscribe_ack ,
3336 send ,
@@ -161,6 +164,44 @@ subscribe(Config) ->
161164 {ok , _Client2 , _ , [<<" hello" >>]} = stomp_receive (Client1 , " MESSAGE" ),
162165 ok .
163166
167+ subscribe_with_x_priority (Config ) ->
168+ Version = ? config (version , Config ),
169+ StompPort = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_stomp ),
170+ Channel = ? config (amqp_channel , Config ),
171+ ClientA = ? config (stomp_client , Config ),
172+ # 'queue.declare_ok' {} =
173+ amqp_channel :call (Channel , # 'queue.declare' {queue = ? QUEUE_QQ ,
174+ durable = true ,
175+ arguments = [{<<" x-queue-type" >>, longstr , <<" quorum" >>},
176+ {<<" x-single-active-consumer" >>, bool , true }
177+ ]}),
178+
179+ % % subscribe and wait for receipt
180+ rabbit_stomp_client :send (
181+ ClientA , " SUBSCRIBE" , [{" destination" , ? DESTINATION_QQ }, {" receipt" , " foo" }]),
182+ {ok , _ClientA1 , _ , _ } = stomp_receive (ClientA , " RECEIPT" ),
183+
184+ % % subscribe with a higher priority and wait for receipt
185+ {ok , ClientB } = rabbit_stomp_client :connect (Version , StompPort ),
186+ rabbit_stomp_client :send (
187+ ClientB , " SUBSCRIBE" , [{" destination" , ? DESTINATION_QQ },
188+ {" receipt" , " foo" },
189+ {" x-priority" , 10 }
190+ ]),
191+ {ok , ClientB1 , _ , _ } = stomp_receive (ClientB , " RECEIPT" ),
192+
193+ % % send from amqp
194+ Method = # 'basic.publish' {exchange = <<" " >>, routing_key = ? QUEUE_QQ },
195+
196+ amqp_channel :call (Channel , Method , # amqp_msg {props = # 'P_basic' {},
197+ payload = <<" hello" >>}),
198+
199+ % % ClientB should receive the message since it has a higher priority
200+ {ok , _ClientB2 , _ , [<<" hello" >>]} = stomp_receive (ClientB1 , " MESSAGE" ),
201+ # 'queue.delete_ok' {} =
202+ amqp_channel :call (Channel , # 'queue.delete' {queue = ? QUEUE_QQ }),
203+ ok .
204+
164205unsubscribe_ack (Config ) ->
165206 Channel = ? config (amqp_channel , Config ),
166207 Client = ? config (stomp_client , Config ),
0 commit comments