@@ -64,7 +64,8 @@ shared() ->
6464 split_transfer ,
6565 transfer_unsettled ,
6666 subscribe ,
67- subscribe_with_auto_flow ,
67+ subscribe_with_auto_flow_settled ,
68+ subscribe_with_auto_flow_unsettled ,
6869 outgoing_heartbeat ,
6970 roundtrip_large_messages ,
7071 transfer_id_vs_delivery_id
@@ -502,7 +503,7 @@ transfer_unsettled(Config) ->
502503subscribe (Config ) ->
503504 Hostname = ? config (rmq_hostname , Config ),
504505 Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
505- QueueName = << " test-sub " >> ,
506+ QueueName = atom_to_binary ( ? FUNCTION_NAME ) ,
506507 {ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
507508 {ok , Session } = amqp10_client :begin_session (Connection ),
508509 {ok , Sender } = amqp10_client :attach_sender_link_sync (Session ,
@@ -530,104 +531,121 @@ subscribe(Config) ->
530531 ok = amqp10_client :end_session (Session ),
531532 ok = amqp10_client :close_connection (Connection ).
532533
533- subscribe_with_auto_flow (Config ) ->
534+ subscribe_with_auto_flow_settled (Config ) ->
535+ SenderSettleMode = settled ,
534536 Hostname = ? config (rmq_hostname , Config ),
535537 Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
536- QueueName = << " test-sub " >> ,
538+ QueueName = atom_to_binary ( ? FUNCTION_NAME ) ,
537539 {ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
538540 {ok , Session } = amqp10_client :begin_session (Connection ),
539541 {ok , Sender } = amqp10_client :attach_sender_link_sync (Session ,
540542 <<" sub-sender" >>,
541543 QueueName ),
542544 await_link (Sender , credited , link_credit_timeout ),
543545
544- _ = publish_messages (Sender , <<" banana" >>, 20 ),
545- % % Use sender settle mode 'settled'.
546- {ok , R1 } = amqp10_client :attach_receiver_link (
547- Session , <<" sub-receiver-1" >>, QueueName , settled ),
548- await_link (R1 , attached , attached_timeout ),
549- ok = amqp10_client :flow_link_credit (R1 , 5 , 2 ),
550- ? assertEqual (20 , count_received_messages (R1 )),
551- ok = amqp10_client :detach_link (R1 ),
546+ publish_messages (Sender , <<" banana" >>, 20 ),
547+ {ok , Receiver } = amqp10_client :attach_receiver_link (
548+ Session , <<" sub-receiver" >>, QueueName , SenderSettleMode ),
549+ await_link (Receiver , attached , attached_timeout ),
550+
551+ ok = amqp10_client :flow_link_credit (Receiver , 5 , 2 ),
552+ ? assertEqual (20 , count_received_messages (Receiver )),
552553
553- _ = publish_messages (Sender , <<" banana" >>, 30 ),
554+ ok = amqp10_client :detach_link (Receiver ),
555+ ok = amqp10_client :detach_link (Sender ),
556+ ok = amqp10_client :end_session (Session ),
557+ ok = amqp10_client :close_connection (Connection ).
558+
559+ subscribe_with_auto_flow_unsettled (Config ) ->
560+ SenderSettleMode = unsettled ,
561+ Hostname = ? config (rmq_hostname , Config ),
562+ Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
563+ QueueName = atom_to_binary (? FUNCTION_NAME ),
564+ {ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
565+ {ok , Session } = amqp10_client :begin_session (Connection ),
566+ {ok , Sender } = amqp10_client :attach_sender_link_sync (Session ,
567+ <<" sub-sender" >>,
568+ QueueName ),
569+ await_link (Sender , credited , link_credit_timeout ),
570+
571+ _ = publish_messages (Sender , <<" 1-" >>, 30 ),
554572 % % Use sender settle mode 'unsettled'.
555573 % % This should require us to manually settle message in order to receive more messages.
556- {ok , R2 } = amqp10_client :attach_receiver_link (Session , <<" sub-receiver-2" >>, QueueName , unsettled ),
557- await_link (R2 , attached , attached_timeout ),
558- ok = amqp10_client :flow_link_credit (R2 , 5 , 2 ),
574+ {ok , Receiver } = amqp10_client :attach_receiver_link (Session , <<" sub-receiver-2" >>, QueueName , SenderSettleMode ),
575+ await_link (Receiver , attached , attached_timeout ),
576+ ok = amqp10_client :flow_link_credit (Receiver , 5 , 2 ),
559577 % % We should receive exactly 5 messages.
560- [M1 , _M2 , M3 , M4 , M5 ] = receive_messages (R2 , 5 ),
561- ok = assert_no_message (R2 ),
578+ [M1 , _M2 , M3 , M4 , M5 ] = receive_messages (Receiver , 5 ),
579+ ok = assert_no_message (Receiver ),
562580
563581 % % Even when we accept the first 3 messages, the number of unsettled messages has not yet fallen below 2.
564582 % % Therefore, the client should not yet grant more credits to the sender.
565583 ok = amqp10_client_session :disposition (
566- R2 , amqp10_msg :delivery_id (M1 ), amqp10_msg :delivery_id (M3 ), true , accepted ),
567- ok = assert_no_message (R2 ),
584+ Receiver , amqp10_msg :delivery_id (M1 ), amqp10_msg :delivery_id (M3 ), true , accepted ),
585+ ok = assert_no_message (Receiver ),
568586
569587 % % When we accept 1 more message (the order in which we accept shouldn't matter, here we accept M5 before M4),
570588 % % the number of unsettled messages now falls below 2 (since only M4 is left unsettled).
571589 % % Therefore, the client should grant 5 credits to the sender.
572590 % % Therefore, we should receive 5 more messages.
573- ok = amqp10_client :accept_msg (R2 , M5 ),
574- [_M6 , _M7 , _M8 , _M9 , M10 ] = receive_messages (R2 , 5 ),
575- ok = assert_no_message (R2 ),
591+ ok = amqp10_client :accept_msg (Receiver , M5 ),
592+ [_M6 , _M7 , _M8 , _M9 , M10 ] = receive_messages (Receiver , 5 ),
593+ ok = assert_no_message (Receiver ),
576594
577595 % % It shouldn't matter how we settle messages, therefore we use 'rejected' this time.
578596 % % Settling all in flight messages should cause us to receive exactly 5 more messages.
579597 ok = amqp10_client_session :disposition (
580- R2 , amqp10_msg :delivery_id (M4 ), amqp10_msg :delivery_id (M10 ), true , rejected ),
581- [M11 , _M12 , _M13 , _M14 , M15 ] = receive_messages (R2 , 5 ),
582- ok = assert_no_message (R2 ),
598+ Receiver , amqp10_msg :delivery_id (M4 ), amqp10_msg :delivery_id (M10 ), true , rejected ),
599+ [M11 , _M12 , _M13 , _M14 , M15 ] = receive_messages (Receiver , 5 ),
600+ ok = assert_no_message (Receiver ),
583601
584602 % % Dynamically decrease link credit.
585603 % % Since we explicitly tell to grant 3 new credits now, we expect to receive 3 more messages.
586- ok = amqp10_client :flow_link_credit (R2 , 3 , 3 ),
587- [M16 , _M17 , M18 ] = receive_messages (R2 , 3 ),
588- ok = assert_no_message (R2 ),
604+ ok = amqp10_client :flow_link_credit (Receiver , 3 , 3 ),
605+ [M16 , _M17 , M18 ] = receive_messages (Receiver , 3 ),
606+ ok = assert_no_message (Receiver ),
589607
590608 ok = amqp10_client_session :disposition (
591- R2 , amqp10_msg :delivery_id (M11 ), amqp10_msg :delivery_id (M15 ), true , accepted ),
609+ Receiver , amqp10_msg :delivery_id (M11 ), amqp10_msg :delivery_id (M15 ), true , accepted ),
592610 % % However, the RenewWhenBelow=3 still refers to all unsettled messages.
593611 % % Right now we have 3 messages (M16, M17, M18) unsettled.
594- ok = assert_no_message (R2 ),
612+ ok = assert_no_message (Receiver ),
595613
596614 % % Settling 1 out of these 3 messages causes RenewWhenBelow to fall below 3 resulting
597615 % % in 3 new messages to be received.
598- ok = amqp10_client :accept_msg (R2 , M18 ),
599- [_M19 , _M20 , _M21 ] = receive_messages (R2 , 3 ),
600- ok = assert_no_message (R2 ),
616+ ok = amqp10_client :accept_msg (Receiver , M18 ),
617+ [_M19 , _M20 , _M21 ] = receive_messages (Receiver , 3 ),
618+ ok = assert_no_message (Receiver ),
601619
602- ok = amqp10_client :flow_link_credit (R2 , 3 , never , true ),
603- [_M22 , _M23 , M24 ] = receive_messages (R2 , 3 ),
604- ok = assert_no_message (R2 ),
620+ ok = amqp10_client :flow_link_credit (Receiver , 3 , never , true ),
621+ [_M22 , _M23 , M24 ] = receive_messages (Receiver , 3 ),
622+ ok = assert_no_message (Receiver ),
605623
606624 % % Since RenewWhenBelow = never, we expect to receive no new messages despite settling.
607625 ok = amqp10_client_session :disposition (
608- R2 , amqp10_msg :delivery_id (M16 ), amqp10_msg :delivery_id (M24 ), true , rejected ),
609- ok = assert_no_message (R2 ),
626+ Receiver , amqp10_msg :delivery_id (M16 ), amqp10_msg :delivery_id (M24 ), true , rejected ),
627+ ok = assert_no_message (Receiver ),
610628
611- ok = amqp10_client :flow_link_credit (R2 , 2 , never , false ),
612- [M25 , _M26 ] = receive_messages (R2 , 2 ),
613- ok = assert_no_message (R2 ),
629+ ok = amqp10_client :flow_link_credit (Receiver , 2 , never , false ),
630+ [M25 , _M26 ] = receive_messages (Receiver , 2 ),
631+ ok = assert_no_message (Receiver ),
614632
615- ok = amqp10_client :flow_link_credit (R2 , 3 , 3 ),
616- [_M27 , _M28 , M29 ] = receive_messages (R2 , 3 ),
617- ok = assert_no_message (R2 ),
633+ ok = amqp10_client :flow_link_credit (Receiver , 3 , 3 ),
634+ [_M27 , _M28 , M29 ] = receive_messages (Receiver , 3 ),
635+ ok = assert_no_message (Receiver ),
618636
619637 ok = amqp10_client_session :disposition (
620- R2 , amqp10_msg :delivery_id (M25 ), amqp10_msg :delivery_id (M29 ), true , accepted ),
621- [M30 ] = receive_messages (R2 , 1 ),
622- ok = assert_no_message (R2 ),
623- ok = amqp10_client :accept_msg (R2 , M30 ),
638+ Receiver , amqp10_msg :delivery_id (M25 ), amqp10_msg :delivery_id (M29 ), true , accepted ),
639+ [M30 ] = receive_messages (Receiver , 1 ),
640+ ok = assert_no_message (Receiver ),
641+ ok = amqp10_client :accept_msg (Receiver , M30 ),
624642 % % The sender queue is empty now.
625- ok = assert_no_message (R2 ),
643+ ok = assert_no_message (Receiver ),
626644
627- ok = amqp10_client :flow_link_credit (R2 , 3 , 1 ),
628- _ = publish_messages (Sender , <<" banana " >>, 1 ),
629- [M31 ] = receive_messages (R2 , 1 ),
630- ok = amqp10_client :accept_msg (R2 , M31 ),
645+ ok = amqp10_client :flow_link_credit (Receiver , 3 , 1 ),
646+ _ = publish_messages (Sender , <<" 2- " >>, 1 ),
647+ [M31 ] = receive_messages (Receiver , 1 ),
648+ ok = amqp10_client :accept_msg (Receiver , M31 ),
631649
632650 % % Since function flow_link_credit/3 documents
633651 % % "if RenewWhenBelow is an integer, the amqp10_client will automatically grant more
@@ -637,24 +655,25 @@ subscribe_with_auto_flow(Config) ->
637655 % % remaining link credit (2) and unsettled messages (0) is 2.
638656 % %
639657 % % Therefore, when we publish another 3 messages, we expect to only receive only 2 messages!
640- _ = publish_messages (Sender , <<" banana " >>, 5 ),
641- [M32 , M33 ] = receive_messages (R2 , 2 ),
642- ok = assert_no_message (R2 ),
658+ _ = publish_messages (Sender , <<" 3- " >>, 5 ),
659+ [M32 , M33 ] = receive_messages (Receiver , 2 ),
660+ ok = assert_no_message (Receiver ),
643661
644662 % % When we accept both messages, the sum of the remaining link credit (0) and unsettled messages (0)
645663 % % falls below RenewWhenBelow=1 causing the amqp10_client to grant 3 new credits.
646- ok = amqp10_client :accept_msg (R2 , M32 ),
647- ok = assert_no_message (R2 ),
648- ok = amqp10_client :accept_msg (R2 , M33 ),
649-
650- [M35 , M36 , M37 ] = receive_messages (R2 , 3 ),
651- ok = amqp10_client :accept_msg (R2 , M35 ),
652- ok = amqp10_client :accept_msg (R2 , M36 ),
653- ok = amqp10_client :accept_msg (R2 , M37 ),
664+ ok = amqp10_client :accept_msg (Receiver , M32 ),
665+ ok = assert_no_message (Receiver ),
666+ ok = amqp10_client :accept_msg (Receiver , M33 ),
667+
668+ [M35 , M36 , M37 ] = receive_messages (Receiver , 3 ),
669+ ok = amqp10_client :accept_msg (Receiver , M35 ),
670+ ok = amqp10_client :accept_msg (Receiver , M36 ),
671+ ok = amqp10_client :accept_msg (Receiver , M37 ),
654672 % % The sender queue is empty now.
655- ok = assert_no_message (R2 ),
673+ ok = assert_no_message (Receiver ),
656674
657- ok = amqp10_client :detach_link (R2 ),
675+ ok = amqp10_client :detach_link (Receiver ),
676+ ok = amqp10_client :detach_link (Sender ),
658677 ok = amqp10_client :end_session (Session ),
659678 ok = amqp10_client :close_connection (Connection ).
660679
@@ -817,18 +836,18 @@ await_link(Who, What, Err) ->
817836 ok ;
818837 {amqp10_event , {link , Who0 , {detached , Why }}}
819838 when Who0 =:= Who ->
820- exit (Why )
839+ ct : fail (Why )
821840 after 5000 ->
822841 flush (),
823- exit (Err )
842+ ct : fail (Err )
824843 end .
825844
826- publish_messages (Sender , Data , Num ) ->
845+ publish_messages (Sender , BodyPrefix , Num ) ->
827846 [begin
828- Tag = integer_to_binary (T ),
829- Msg = amqp10_msg :new (Tag , Data , false ),
830- ok = amqp10_client :send_msg (Sender , Msg ),
831- ok = await_disposition (Tag )
847+ Tag = integer_to_binary (T ),
848+ Msg = amqp10_msg :new (Tag , << BodyPrefix / binary , Tag / binary >> , false ),
849+ ok = amqp10_client :send_msg (Sender , Msg ),
850+ ok = await_disposition (Tag )
832851 end || T <- lists :seq (1 , Num )].
833852
834853await_disposition (DeliveryTag ) ->
@@ -847,7 +866,7 @@ count_received_messages0(Receiver, Count) ->
847866 receive
848867 {amqp10_msg , Receiver , _Msg } ->
849868 count_received_messages0 (Receiver , Count + 1 )
850- after 200 ->
869+ after 500 ->
851870 Count
852871 end .
853872
@@ -861,7 +880,15 @@ receive_messages0(Receiver, N, Acc) ->
861880 {amqp10_msg , Receiver , Msg } ->
862881 receive_messages0 (Receiver , N - 1 , [Msg | Acc ])
863882 after 5000 ->
864- ct :fail ({timeout , {num_received , length (Acc )}, {num_missing , N }})
883+ LastReceivedMsg = case Acc of
884+ [] -> none ;
885+ [M | _ ] -> M
886+ end ,
887+ ct :fail ({timeout ,
888+ {num_received , length (Acc )},
889+ {num_missing , N },
890+ {last_received_msg , LastReceivedMsg }
891+ })
865892 end .
866893
867894assert_no_message (Receiver ) ->
0 commit comments