@@ -108,7 +108,9 @@ all_tests() ->
108108     cancel_sync_queue ,
109109     basic_recover ,
110110     idempotent_recover ,
111-      vhost_with_quorum_queue_is_deleted 
111+      vhost_with_quorum_queue_is_deleted ,
112+      consume_redelivery_count ,
113+      subscribe_redelivery_count 
112114    ].
113115
114116% % -------------------------------------------------------------------
@@ -1633,6 +1635,99 @@ basic_recover(Config) ->
16331635    amqp_channel :cast (Ch , # 'basic.recover' {requeue  =  true }),
16341636    wait_for_messages_ready (Servers , RaName , 1 ),
16351637    wait_for_messages_pending_ack (Servers , RaName , 0 ).
1638+ 
1639+ subscribe_redelivery_count (Config ) -> 
1640+     [Server  | _ ] =  Servers  =  rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1641+ 
1642+     Ch  =  rabbit_ct_client_helpers :open_channel (Config , Server ),
1643+     QQ  =  ? config (queue_name , Config ),
1644+     ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1645+                  declare (Ch , QQ , [{<<" x-queue-type" longstr , <<" quorum" 
1646+ 
1647+     RaName  =  ra_name (QQ ),
1648+     publish (Ch , QQ ),
1649+     wait_for_messages_ready (Servers , RaName , 1 ),
1650+     wait_for_messages_pending_ack (Servers , RaName , 0 ),
1651+     subscribe (Ch , QQ , false ),
1652+ 
1653+     DTag  =  <<" x-redelivery-count" 
1654+     receive 
1655+         {# 'basic.deliver' {delivery_tag  =  DeliveryTag ,
1656+                           redelivered   =  false },
1657+          # amqp_msg {props  =  # 'P_basic' {headers  =  H0 }}} ->
1658+             ? assertMatch ({DTag , _ , 0 }, rabbit_basic :header (DTag , H0 )),
1659+             amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag  =  DeliveryTag ,
1660+                                                 multiple      =  false ,
1661+                                                 requeue       =  true })
1662+     end ,
1663+ 
1664+     receive 
1665+         {# 'basic.deliver' {delivery_tag  =  DeliveryTag1 ,
1666+                           redelivered   =  true },
1667+          # amqp_msg {props  =  # 'P_basic' {headers  =  H1 }}} ->
1668+             ? assertMatch ({DTag , _ , 1 }, rabbit_basic :header (DTag , H1 )),
1669+             amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag  =  DeliveryTag1 ,
1670+                                                 multiple      =  false ,
1671+                                                 requeue       =  true })
1672+     end ,
1673+ 
1674+     receive 
1675+         {# 'basic.deliver' {delivery_tag  =  DeliveryTag2 ,
1676+                           redelivered   =  true },
1677+          # amqp_msg {props  =  # 'P_basic' {headers  =  H2 }}} ->
1678+             ? assertMatch ({DTag , _ , 2 }, rabbit_basic :header (DTag , H2 )),
1679+             amqp_channel :cast (Ch , # 'basic.ack' {delivery_tag  =  DeliveryTag2 ,
1680+                                                multiple      =  false }),
1681+             wait_for_messages_ready (Servers , RaName , 0 ),
1682+             wait_for_messages_pending_ack (Servers , RaName , 0 )
1683+     end .
1684+ 
1685+ consume_redelivery_count (Config ) -> 
1686+     [Server  | _ ] =  Servers  =  rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1687+ 
1688+     Ch  =  rabbit_ct_client_helpers :open_channel (Config , Server ),
1689+     QQ  =  ? config (queue_name , Config ),
1690+     ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1691+                  declare (Ch , QQ , [{<<" x-queue-type" longstr , <<" quorum" 
1692+ 
1693+     RaName  =  ra_name (QQ ),
1694+     publish (Ch , QQ ),
1695+     wait_for_messages_ready (Servers , RaName , 1 ),
1696+     wait_for_messages_pending_ack (Servers , RaName , 0 ),
1697+ 
1698+     DTag  =  <<" x-redelivery-count" 
1699+ 
1700+     {# 'basic.get_ok' {delivery_tag  =  DeliveryTag ,
1701+                      redelivered  =  false },
1702+      # amqp_msg {props  =  # 'P_basic' {headers  =  H0 }}} = 
1703+         amqp_channel :call (Ch , # 'basic.get' {queue  =  QQ ,
1704+                                            no_ack  =  false }),
1705+     ? assertMatch ({DTag , _ , 0 }, rabbit_basic :header (DTag , H0 )),
1706+     amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag  =  DeliveryTag ,
1707+                                         multiple      =  false ,
1708+                                         requeue       =  true }),
1709+ 
1710+     {# 'basic.get_ok' {delivery_tag  =  DeliveryTag1 ,
1711+                      redelivered  =  true },
1712+      # amqp_msg {props  =  # 'P_basic' {headers  =  H1 }}} = 
1713+         amqp_channel :call (Ch , # 'basic.get' {queue  =  QQ ,
1714+                                            no_ack  =  false }),
1715+     ? assertMatch ({DTag , _ , 1 }, rabbit_basic :header (DTag , H1 )),
1716+     amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag  =  DeliveryTag1 ,
1717+                                         multiple      =  false ,
1718+                                         requeue       =  true }),
1719+ 
1720+     {# 'basic.get_ok' {delivery_tag  =  DeliveryTag2 ,
1721+                      redelivered  =  true },
1722+      # amqp_msg {props  =  # 'P_basic' {headers  =  H2 }}} = 
1723+         amqp_channel :call (Ch , # 'basic.get' {queue  =  QQ ,
1724+                                            no_ack  =  false }),
1725+     ? assertMatch ({DTag , _ , 2 }, rabbit_basic :header (DTag , H2 )),
1726+     amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag  =  DeliveryTag2 ,
1727+                                         multiple      =  false ,
1728+                                         requeue       =  true }).
1729+ 
1730+ 
16361731% %----------------------------------------------------------------------------
16371732
16381733declare (Ch , Q ) -> 
0 commit comments