1717-compile ([export_all , nowarn_export_all ]).
1818
1919suite () ->
20- [{timetrap , {seconds , 120 }}].
20+ [{timetrap , {minutes , 4 }}].
2121
2222all () ->
2323 [
@@ -64,7 +64,8 @@ shared() ->
6464 split_transfer ,
6565 transfer_unsettled ,
6666 subscribe ,
67- subscribe_with_auto_flow ,
67+ subscribe_with_auto_flow_settled ,
68+ subscribe_with_auto_flow_unsettled ,
6869 outgoing_heartbeat ,
6970 roundtrip_large_messages ,
7071 transfer_id_vs_delivery_id
@@ -290,12 +291,15 @@ roundtrip_large_messages(Config) ->
290291 Hostname = ? config (rmq_hostname , Config ),
291292 Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
292293 OpenConf = #{address => Hostname , port => Port , sasl => anon },
293- DataKb = crypto :strong_rand_bytes (1024 ),
294- roundtrip (OpenConf , DataKb ),
295- Data1Mb = binary :copy (DataKb , 1024 ),
296- roundtrip (OpenConf , Data1Mb ),
297- roundtrip (OpenConf , binary :copy (Data1Mb , 8 )),
298- ok = roundtrip (OpenConf , binary :copy (Data1Mb , 64 )).
294+
295+ DataKb = rand :bytes (1024 ),
296+ DataMb = rand :bytes (1024 * 1024 ),
297+ Data8Mb = rand :bytes (8 * 1024 * 1024 ),
298+ Data64Mb = rand :bytes (64 * 1024 * 1024 ),
299+ ok = roundtrip (OpenConf , DataKb ),
300+ ok = roundtrip (OpenConf , DataMb ),
301+ ok = roundtrip (OpenConf , Data8Mb ),
302+ ok = roundtrip (OpenConf , Data64Mb ).
299303
300304roundtrip (OpenConf ) ->
301305 roundtrip (OpenConf , <<" banana" >>).
@@ -321,9 +325,10 @@ roundtrip(OpenConf, Body) ->
321325 {error , link_not_found } = amqp10_client :detach_link (Sender ),
322326 {ok , Receiver } = amqp10_client :attach_receiver_link (
323327 Session , <<" banana-receiver" >>, <<" test1" >>, settled , unsettled_state ),
324- {ok , OutMsg } = amqp10_client :get_msg (Receiver , 60_000 * 4 ),
328+ {ok , OutMsg } = amqp10_client :get_msg (Receiver , 4 * 60_000 ),
325329 ok = amqp10_client :end_session (Session ),
326330 ok = amqp10_client :close_connection (Connection ),
331+
327332 % ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]),
328333 #{creation_time := Now } = amqp10_msg :properties (OutMsg ),
329334 #{<<" a_key" >> := <<" a_value" >>} = amqp10_msg :application_properties (OutMsg ),
@@ -502,7 +507,7 @@ transfer_unsettled(Config) ->
502507subscribe (Config ) ->
503508 Hostname = ? config (rmq_hostname , Config ),
504509 Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
505- QueueName = << " test-sub " >> ,
510+ QueueName = atom_to_binary ( ? FUNCTION_NAME ) ,
506511 {ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
507512 {ok , Session } = amqp10_client :begin_session (Connection ),
508513 {ok , Sender } = amqp10_client :attach_sender_link_sync (Session ,
@@ -530,104 +535,121 @@ subscribe(Config) ->
530535 ok = amqp10_client :end_session (Session ),
531536 ok = amqp10_client :close_connection (Connection ).
532537
533- subscribe_with_auto_flow (Config ) ->
538+ subscribe_with_auto_flow_settled (Config ) ->
539+ SenderSettleMode = settled ,
534540 Hostname = ? config (rmq_hostname , Config ),
535541 Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
536- QueueName = << " test-sub " >> ,
542+ QueueName = atom_to_binary ( ? FUNCTION_NAME ) ,
537543 {ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
538544 {ok , Session } = amqp10_client :begin_session (Connection ),
539545 {ok , Sender } = amqp10_client :attach_sender_link_sync (Session ,
540546 <<" sub-sender" >>,
541547 QueueName ),
542548 await_link (Sender , credited , link_credit_timeout ),
543549
544- _ = publish_messages (Sender , <<" banana" >>, 20 ),
545- % % Use sender settle mode 'settled'.
546- {ok , R1 } = amqp10_client :attach_receiver_link (
547- Session , <<" sub-receiver-1" >>, QueueName , settled ),
548- await_link (R1 , attached , attached_timeout ),
549- ok = amqp10_client :flow_link_credit (R1 , 5 , 2 ),
550- ? assertEqual (20 , count_received_messages (R1 )),
551- ok = amqp10_client :detach_link (R1 ),
550+ publish_messages (Sender , <<" banana" >>, 20 ),
551+ {ok , Receiver } = amqp10_client :attach_receiver_link (
552+ Session , <<" sub-receiver" >>, QueueName , SenderSettleMode ),
553+ await_link (Receiver , attached , attached_timeout ),
554+
555+ ok = amqp10_client :flow_link_credit (Receiver , 5 , 2 ),
556+ ? assertEqual (20 , count_received_messages (Receiver )),
557+
558+ ok = amqp10_client :detach_link (Receiver ),
559+ ok = amqp10_client :detach_link (Sender ),
560+ ok = amqp10_client :end_session (Session ),
561+ ok = amqp10_client :close_connection (Connection ).
562+
563+ subscribe_with_auto_flow_unsettled (Config ) ->
564+ SenderSettleMode = unsettled ,
565+ Hostname = ? config (rmq_hostname , Config ),
566+ Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
567+ QueueName = atom_to_binary (? FUNCTION_NAME ),
568+ {ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
569+ {ok , Session } = amqp10_client :begin_session (Connection ),
570+ {ok , Sender } = amqp10_client :attach_sender_link_sync (Session ,
571+ <<" sub-sender" >>,
572+ QueueName ),
573+ await_link (Sender , credited , link_credit_timeout ),
552574
553- _ = publish_messages (Sender , <<" banana " >>, 30 ),
575+ _ = publish_messages (Sender , <<" 1- " >>, 30 ),
554576 % % Use sender settle mode 'unsettled'.
555577 % % This should require us to manually settle message in order to receive more messages.
556- {ok , R2 } = amqp10_client :attach_receiver_link (Session , <<" sub-receiver-2" >>, QueueName , unsettled ),
557- await_link (R2 , attached , attached_timeout ),
558- ok = amqp10_client :flow_link_credit (R2 , 5 , 2 ),
578+ {ok , Receiver } = amqp10_client :attach_receiver_link (Session , <<" sub-receiver-2" >>, QueueName , SenderSettleMode ),
579+ await_link (Receiver , attached , attached_timeout ),
580+ ok = amqp10_client :flow_link_credit (Receiver , 5 , 2 ),
559581 % % We should receive exactly 5 messages.
560- [M1 , _M2 , M3 , M4 , M5 ] = receive_messages (R2 , 5 ),
561- ok = assert_no_message (R2 ),
582+ [M1 , _M2 , M3 , M4 , M5 ] = receive_messages (Receiver , 5 ),
583+ ok = assert_no_message (Receiver ),
562584
563585 % % Even when we accept the first 3 messages, the number of unsettled messages has not yet fallen below 2.
564586 % % Therefore, the client should not yet grant more credits to the sender.
565587 ok = amqp10_client_session :disposition (
566- R2 , amqp10_msg :delivery_id (M1 ), amqp10_msg :delivery_id (M3 ), true , accepted ),
567- ok = assert_no_message (R2 ),
588+ Receiver , amqp10_msg :delivery_id (M1 ), amqp10_msg :delivery_id (M3 ), true , accepted ),
589+ ok = assert_no_message (Receiver ),
568590
569591 % % When we accept 1 more message (the order in which we accept shouldn't matter, here we accept M5 before M4),
570592 % % the number of unsettled messages now falls below 2 (since only M4 is left unsettled).
571593 % % Therefore, the client should grant 5 credits to the sender.
572594 % % Therefore, we should receive 5 more messages.
573- ok = amqp10_client :accept_msg (R2 , M5 ),
574- [_M6 , _M7 , _M8 , _M9 , M10 ] = receive_messages (R2 , 5 ),
575- ok = assert_no_message (R2 ),
595+ ok = amqp10_client :accept_msg (Receiver , M5 ),
596+ [_M6 , _M7 , _M8 , _M9 , M10 ] = receive_messages (Receiver , 5 ),
597+ ok = assert_no_message (Receiver ),
576598
577599 % % It shouldn't matter how we settle messages, therefore we use 'rejected' this time.
578600 % % Settling all in flight messages should cause us to receive exactly 5 more messages.
579601 ok = amqp10_client_session :disposition (
580- R2 , amqp10_msg :delivery_id (M4 ), amqp10_msg :delivery_id (M10 ), true , rejected ),
581- [M11 , _M12 , _M13 , _M14 , M15 ] = receive_messages (R2 , 5 ),
582- ok = assert_no_message (R2 ),
602+ Receiver , amqp10_msg :delivery_id (M4 ), amqp10_msg :delivery_id (M10 ), true , rejected ),
603+ [M11 , _M12 , _M13 , _M14 , M15 ] = receive_messages (Receiver , 5 ),
604+ ok = assert_no_message (Receiver ),
583605
584606 % % Dynamically decrease link credit.
585607 % % Since we explicitly tell to grant 3 new credits now, we expect to receive 3 more messages.
586- ok = amqp10_client :flow_link_credit (R2 , 3 , 3 ),
587- [M16 , _M17 , M18 ] = receive_messages (R2 , 3 ),
588- ok = assert_no_message (R2 ),
608+ ok = amqp10_client :flow_link_credit (Receiver , 3 , 3 ),
609+ [M16 , _M17 , M18 ] = receive_messages (Receiver , 3 ),
610+ ok = assert_no_message (Receiver ),
589611
590612 ok = amqp10_client_session :disposition (
591- R2 , amqp10_msg :delivery_id (M11 ), amqp10_msg :delivery_id (M15 ), true , accepted ),
613+ Receiver , amqp10_msg :delivery_id (M11 ), amqp10_msg :delivery_id (M15 ), true , accepted ),
592614 % % However, the RenewWhenBelow=3 still refers to all unsettled messages.
593615 % % Right now we have 3 messages (M16, M17, M18) unsettled.
594- ok = assert_no_message (R2 ),
616+ ok = assert_no_message (Receiver ),
595617
596618 % % Settling 1 out of these 3 messages causes RenewWhenBelow to fall below 3 resulting
597619 % % in 3 new messages to be received.
598- ok = amqp10_client :accept_msg (R2 , M18 ),
599- [_M19 , _M20 , _M21 ] = receive_messages (R2 , 3 ),
600- ok = assert_no_message (R2 ),
620+ ok = amqp10_client :accept_msg (Receiver , M18 ),
621+ [_M19 , _M20 , _M21 ] = receive_messages (Receiver , 3 ),
622+ ok = assert_no_message (Receiver ),
601623
602- ok = amqp10_client :flow_link_credit (R2 , 3 , never , true ),
603- [_M22 , _M23 , M24 ] = receive_messages (R2 , 3 ),
604- ok = assert_no_message (R2 ),
624+ ok = amqp10_client :flow_link_credit (Receiver , 3 , never , true ),
625+ [_M22 , _M23 , M24 ] = receive_messages (Receiver , 3 ),
626+ ok = assert_no_message (Receiver ),
605627
606628 % % Since RenewWhenBelow = never, we expect to receive no new messages despite settling.
607629 ok = amqp10_client_session :disposition (
608- R2 , amqp10_msg :delivery_id (M16 ), amqp10_msg :delivery_id (M24 ), true , rejected ),
609- ok = assert_no_message (R2 ),
630+ Receiver , amqp10_msg :delivery_id (M16 ), amqp10_msg :delivery_id (M24 ), true , rejected ),
631+ ok = assert_no_message (Receiver ),
610632
611- ok = amqp10_client :flow_link_credit (R2 , 2 , never , false ),
612- [M25 , _M26 ] = receive_messages (R2 , 2 ),
613- ok = assert_no_message (R2 ),
633+ ok = amqp10_client :flow_link_credit (Receiver , 2 , never , false ),
634+ [M25 , _M26 ] = receive_messages (Receiver , 2 ),
635+ ok = assert_no_message (Receiver ),
614636
615- ok = amqp10_client :flow_link_credit (R2 , 3 , 3 ),
616- [_M27 , _M28 , M29 ] = receive_messages (R2 , 3 ),
617- ok = assert_no_message (R2 ),
637+ ok = amqp10_client :flow_link_credit (Receiver , 3 , 3 ),
638+ [_M27 , _M28 , M29 ] = receive_messages (Receiver , 3 ),
639+ ok = assert_no_message (Receiver ),
618640
619641 ok = amqp10_client_session :disposition (
620- R2 , amqp10_msg :delivery_id (M25 ), amqp10_msg :delivery_id (M29 ), true , accepted ),
621- [M30 ] = receive_messages (R2 , 1 ),
622- ok = assert_no_message (R2 ),
623- ok = amqp10_client :accept_msg (R2 , M30 ),
642+ Receiver , amqp10_msg :delivery_id (M25 ), amqp10_msg :delivery_id (M29 ), true , accepted ),
643+ [M30 ] = receive_messages (Receiver , 1 ),
644+ ok = assert_no_message (Receiver ),
645+ ok = amqp10_client :accept_msg (Receiver , M30 ),
624646 % % The sender queue is empty now.
625- ok = assert_no_message (R2 ),
647+ ok = assert_no_message (Receiver ),
626648
627- ok = amqp10_client :flow_link_credit (R2 , 3 , 1 ),
628- _ = publish_messages (Sender , <<" banana " >>, 1 ),
629- [M31 ] = receive_messages (R2 , 1 ),
630- ok = amqp10_client :accept_msg (R2 , M31 ),
649+ ok = amqp10_client :flow_link_credit (Receiver , 3 , 1 ),
650+ _ = publish_messages (Sender , <<" 2- " >>, 1 ),
651+ [M31 ] = receive_messages (Receiver , 1 ),
652+ ok = amqp10_client :accept_msg (Receiver , M31 ),
631653
632654 % % Since function flow_link_credit/3 documents
633655 % % "if RenewWhenBelow is an integer, the amqp10_client will automatically grant more
@@ -637,24 +659,25 @@ subscribe_with_auto_flow(Config) ->
637659 % % remaining link credit (2) and unsettled messages (0) is 2.
638660 % %
639661 % % Therefore, when we publish another 3 messages, we expect to only receive only 2 messages!
640- _ = publish_messages (Sender , <<" banana " >>, 5 ),
641- [M32 , M33 ] = receive_messages (R2 , 2 ),
642- ok = assert_no_message (R2 ),
662+ _ = publish_messages (Sender , <<" 3- " >>, 5 ),
663+ [M32 , M33 ] = receive_messages (Receiver , 2 ),
664+ ok = assert_no_message (Receiver ),
643665
644666 % % When we accept both messages, the sum of the remaining link credit (0) and unsettled messages (0)
645667 % % falls below RenewWhenBelow=1 causing the amqp10_client to grant 3 new credits.
646- ok = amqp10_client :accept_msg (R2 , M32 ),
647- ok = assert_no_message (R2 ),
648- ok = amqp10_client :accept_msg (R2 , M33 ),
649-
650- [M35 , M36 , M37 ] = receive_messages (R2 , 3 ),
651- ok = amqp10_client :accept_msg (R2 , M35 ),
652- ok = amqp10_client :accept_msg (R2 , M36 ),
653- ok = amqp10_client :accept_msg (R2 , M37 ),
668+ ok = amqp10_client :accept_msg (Receiver , M32 ),
669+ ok = assert_no_message (Receiver ),
670+ ok = amqp10_client :accept_msg (Receiver , M33 ),
671+
672+ [M35 , M36 , M37 ] = receive_messages (Receiver , 3 ),
673+ ok = amqp10_client :accept_msg (Receiver , M35 ),
674+ ok = amqp10_client :accept_msg (Receiver , M36 ),
675+ ok = amqp10_client :accept_msg (Receiver , M37 ),
654676 % % The sender queue is empty now.
655- ok = assert_no_message (R2 ),
677+ ok = assert_no_message (Receiver ),
656678
657- ok = amqp10_client :detach_link (R2 ),
679+ ok = amqp10_client :detach_link (Receiver ),
680+ ok = amqp10_client :detach_link (Sender ),
658681 ok = amqp10_client :end_session (Session ),
659682 ok = amqp10_client :close_connection (Connection ).
660683
@@ -817,18 +840,18 @@ await_link(Who, What, Err) ->
817840 ok ;
818841 {amqp10_event , {link , Who0 , {detached , Why }}}
819842 when Who0 =:= Who ->
820- exit (Why )
843+ ct : fail (Why )
821844 after 5000 ->
822845 flush (),
823- exit (Err )
846+ ct : fail (Err )
824847 end .
825848
826- publish_messages (Sender , Data , Num ) ->
849+ publish_messages (Sender , BodyPrefix , Num ) ->
827850 [begin
828- Tag = integer_to_binary (T ),
829- Msg = amqp10_msg :new (Tag , Data , false ),
830- ok = amqp10_client :send_msg (Sender , Msg ),
831- ok = await_disposition (Tag )
851+ Tag = integer_to_binary (T ),
852+ Msg = amqp10_msg :new (Tag , << BodyPrefix / binary , Tag / binary >> , false ),
853+ ok = amqp10_client :send_msg (Sender , Msg ),
854+ ok = await_disposition (Tag )
832855 end || T <- lists :seq (1 , Num )].
833856
834857await_disposition (DeliveryTag ) ->
@@ -847,7 +870,7 @@ count_received_messages0(Receiver, Count) ->
847870 receive
848871 {amqp10_msg , Receiver , _Msg } ->
849872 count_received_messages0 (Receiver , Count + 1 )
850- after 200 ->
873+ after 500 ->
851874 Count
852875 end .
853876
@@ -861,7 +884,15 @@ receive_messages0(Receiver, N, Acc) ->
861884 {amqp10_msg , Receiver , Msg } ->
862885 receive_messages0 (Receiver , N - 1 , [Msg | Acc ])
863886 after 5000 ->
864- ct :fail ({timeout , {num_received , length (Acc )}, {num_missing , N }})
887+ LastReceivedMsg = case Acc of
888+ [] -> none ;
889+ [M | _ ] -> M
890+ end ,
891+ ct :fail ({timeout ,
892+ {num_received , length (Acc )},
893+ {num_missing , N },
894+ {last_received_msg , LastReceivedMsg }
895+ })
865896 end .
866897
867898assert_no_message (Receiver ) ->
0 commit comments