2121 detach /2 ,
2222 transfer /3 ,
2323 flow /4 ,
24- disposition /6
24+ disposition /5
2525 ]).
2626
2727% % Private API
131131 available = 0 :: non_neg_integer (),
132132 drain = false :: boolean (),
133133 partial_transfers :: undefined | {# 'v1_0.transfer' {}, [binary ()]},
134- auto_flow :: never | {auto , RenewWhenBelow :: pos_integer (), Credit :: pos_integer ()}
135- }).
134+ auto_flow :: never | {auto , RenewWhenBelow :: pos_integer (), Credit :: pos_integer ()},
135+ incoming_unsettled = #{} :: #{delivery_number () => ok }
136+ }).
136137
137138-record (state ,
138139 {channel :: pos_integer (),
155156 connection_config :: amqp10_client_connection :connection_config (),
156157 outgoing_delivery_id = ? INITIAL_OUTGOING_DELIVERY_ID :: delivery_number (),
157158 outgoing_unsettled = #{} :: #{delivery_number () => {amqp10_msg :delivery_tag (), Notify :: pid ()}},
158- incoming_unsettled = #{} :: #{delivery_number () => output_handle ()},
159159 notify :: pid ()
160160 }).
161161
@@ -204,14 +204,18 @@ transfer(Session, Amqp10Msg, Timeout) ->
204204flow (Session , Handle , Flow , RenewWhenBelow ) ->
205205 gen_statem :cast (Session , {flow_link , Handle , Flow , RenewWhenBelow }).
206206
207- -spec disposition (pid (), link_role (), delivery_number (), delivery_number (), boolean (),
207+ % % Sending a disposition on a sender link (with receiver-settle-mode = second)
208+ % % is currently unsupported.
209+ -spec disposition (link_ref (), delivery_number (), delivery_number (), boolean (),
208210 amqp10_client_types :delivery_state ()) -> ok .
209- disposition (Session , Role , First , Last , Settled , DeliveryState ) ->
210- gen_statem :call (Session , {disposition , Role , First , Last , Settled ,
211+ disposition (# link_ref {role = receiver ,
212+ session = Session ,
213+ link_handle = Handle },
214+ First , Last , Settled , DeliveryState ) ->
215+ gen_statem :call (Session , {disposition , Handle , First , Last , Settled ,
211216 DeliveryState }, ? TIMEOUT ).
212217
213218
214-
215219% % -------------------------------------------------------------------
216220% % Private API.
217221% % -------------------------------------------------------------------
@@ -277,7 +281,7 @@ mapped(cast, 'end', State) ->
277281 send_end (State ),
278282 {next_state , end_sent , State };
279283mapped (cast , {flow_link , OutHandle , Flow0 , RenewWhenBelow }, State0 ) ->
280- State = send_flow_link (fun send / 2 , OutHandle , Flow0 , RenewWhenBelow , State0 ),
284+ State = send_flow_link (OutHandle , Flow0 , RenewWhenBelow , State0 ),
281285 {keep_state , State };
282286mapped (cast , {flow_session , Flow0 = # 'v1_0.flow' {incoming_window = {uint , IncomingWindow }}},
283287 # state {next_incoming_id = NII ,
@@ -367,45 +371,43 @@ mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
367371 State = book_partial_transfer_received (
368372 State0 # state {links = Links #{OutHandle => Link1 }}),
369373 {keep_state , State };
370- mapped (cast , {# 'v1_0.transfer' {handle = {uint , InHandle },
371- delivery_id = MaybeDeliveryId ,
372- settled = Settled } = Transfer0 , Payload0 },
373- # state {incoming_unsettled = Unsettled0 } = State0 ) ->
374-
374+ mapped (cast , {Transfer0 = # 'v1_0.transfer' {handle = {uint , InHandle }},
375+ Payload0 }, State0 ) ->
375376 {ok , # link {target = {pid , TargetPid },
376- output_handle = OutHandle ,
377- ref = LinkRef } = Link0 } =
378- find_link_by_input_handle (InHandle , State0 ),
377+ ref = LinkRef ,
378+ incoming_unsettled = Unsettled
379+ } = Link0 } = find_link_by_input_handle (InHandle , State0 ),
379380
380- {Transfer , Payload , Link1 } = complete_partial_transfer (Transfer0 , Payload0 , Link0 ),
381- Msg = decode_as_msg (Transfer , Payload ),
382-
383- % stash the DeliveryId - not sure for what yet
384- Unsettled = case MaybeDeliveryId of
385- {uint , DeliveryId } when Settled =/= true ->
386- Unsettled0 #{DeliveryId => OutHandle };
387- _ ->
388- Unsettled0
389- end ,
381+ {Transfer = # 'v1_0.transfer' {settled = Settled ,
382+ delivery_id = {uint , DeliveryId }},
383+ Payload , Link1 } = complete_partial_transfer (Transfer0 , Payload0 , Link0 ),
390384
385+ Msg = decode_as_msg (Transfer , Payload ),
386+ Link2 = case Settled of
387+ true ->
388+ Link1 ;
389+ _ ->
390+ % % "If not set on the first (or only) transfer for a (multi-transfer) delivery,
391+ % % then the settled flag MUST be interpreted as being false." [2.7.5]
392+ Link1 # link {incoming_unsettled = Unsettled #{DeliveryId => ok }}
393+ end ,
391394 % link bookkeeping
392395 % notify when credit is exhausted (link_credit = 0)
393396 % detach the Link with a transfer-limit-exceeded error code if further
394397 % transfers are received
395- State1 = State0 # state {incoming_unsettled = Unsettled },
396- case book_transfer_received (State1 , Link1 ) of
397- {ok , Link2 , State2 } ->
398+ case book_transfer_received (State0 , Link2 ) of
399+ {ok , Link3 , State1 } ->
398400 % deliver
399401 TargetPid ! {amqp10_msg , LinkRef , Msg },
400- State = auto_flow (Link2 , State2 ),
402+ State = auto_flow (Link3 , State1 ),
401403 {keep_state , State };
402- {credit_exhausted , Link2 , State } ->
404+ {credit_exhausted , Link3 , State } ->
403405 TargetPid ! {amqp10_msg , LinkRef , Msg },
404- notify_credit_exhausted (Link2 ),
406+ notify_credit_exhausted (Link3 ),
405407 {keep_state , State };
406- {transfer_limit_exceeded , Link2 , State } ->
407- logger :warning (" transfer_limit_exceeded for link ~tp " , [Link2 ]),
408- Link = detach_with_error_cond (Link2 , State ,
408+ {transfer_limit_exceeded , Link3 , State } ->
409+ logger :warning (" transfer_limit_exceeded for link ~tp " , [Link3 ]),
410+ Link = detach_with_error_cond (Link3 , State ,
409411 ? V_1_0_LINK_ERROR_TRANSFER_LIMIT_EXCEEDED ),
410412 {keep_state , update_link (Link , State )}
411413 end ;
@@ -501,12 +503,15 @@ mapped({call, From},
501503 end ;
502504
503505mapped ({call , From },
504- {disposition , Role , First , Last , Settled0 , DeliveryState },
505- # state {incoming_unsettled = Unsettled0 } = State0 ) ->
506+ {disposition , OutputHandle , First , Last , Settled0 , DeliveryState },
507+ # state {links = Links } = State0 ) ->
508+ #{OutputHandle := Link0 = # link {incoming_unsettled = Unsettled0 }} = Links ,
506509 Unsettled = serial_number :foldl (fun maps :remove /2 , Unsettled0 , First , Last ),
507- State = State0 # state {incoming_unsettled = Unsettled },
510+ Link = Link0 # link {incoming_unsettled = Unsettled },
511+ State1 = State0 # state {links = Links #{OutputHandle := Link }},
512+ State = auto_flow (Link , State1 ),
508513 Disposition = # 'v1_0.disposition' {
509- role = translate_role (Role ),
514+ role = translate_role (receiver ),
510515 first = {uint , First },
511516 last = {uint , Last },
512517 settled = Settled0 ,
@@ -599,7 +604,7 @@ send_transfer(Transfer0, Parts0, MaxMessageSize, #state{socket = Socket,
599604 {ok , length (Frames )}
600605 end .
601606
602- send_flow_link (Send , OutHandle ,
607+ send_flow_link (OutHandle ,
603608 # 'v1_0.flow' {link_credit = {uint , Credit }} = Flow0 , RenewWhenBelow ,
604609 # state {links = Links ,
605610 next_incoming_id = NII ,
@@ -625,7 +630,7 @@ send_flow_link(Send, OutHandle,
625630 % % initial attach frame from the sender this field MUST NOT be set." [2.7.4]
626631 delivery_count = maybe_uint (DeliveryCount ),
627632 available = uint (Available )},
628- ok = Send (Flow , State ),
633+ ok = send (Flow , State ),
629634 State # state {links = Links #{OutHandle =>
630635 Link # link {link_credit = Credit ,
631636 auto_flow = AutoFlow }}}.
@@ -777,8 +782,9 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
777782 max_message_size = MaxMessageSize },
778783 ok = Send (Attach , State ),
779784
785+ LinkRef = make_link_ref (element (1 , Role ), self (), OutHandle ),
780786 Link = # link {name = Name ,
781- ref = make_link_ref ( element ( 1 , Role ), self (), OutHandle ) ,
787+ ref = LinkRef ,
782788 output_handle = OutHandle ,
783789 state = attach_sent ,
784790 role = element (1 , Role ),
@@ -790,7 +796,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
790796
791797 {State # state {links = Links #{OutHandle => Link },
792798 next_link_handle = NextLinkHandle ,
793- link_index = LinkIndex #{Name => OutHandle }}, Link # link . ref }.
799+ link_index = LinkIndex #{Name => OutHandle }}, LinkRef }.
794800
795801- spec handle_session_flow (# 'v1_0.flow' {}, # state {}) -> # state {}.
796802handle_session_flow (# 'v1_0.flow' {next_incoming_id = MaybeNII ,
@@ -908,7 +914,6 @@ translate_delivery_state({modified,
908914translate_delivery_state (released ) -> # 'v1_0.released' {};
909915translate_delivery_state (received ) -> # 'v1_0.received' {}.
910916
911- translate_role (sender ) -> false ;
912917translate_role (receiver ) -> true .
913918
914919maybe_notify_link_credit (# link {role = sender ,
@@ -987,9 +992,11 @@ book_transfer_received(#state{next_incoming_id = NID,
987992
988993auto_flow (# link {link_credit = LC ,
989994 auto_flow = {auto , RenewWhenBelow , Credit },
990- output_handle = OutHandle }, State )
991- when LC < RenewWhenBelow ->
992- send_flow_link (fun send /2 , OutHandle ,
995+ output_handle = OutHandle ,
996+ incoming_unsettled = Unsettled },
997+ State )
998+ when LC + map_size (Unsettled ) < RenewWhenBelow ->
999+ send_flow_link (OutHandle ,
9931000 # 'v1_0.flow' {link_credit = {uint , Credit }},
9941001 RenewWhenBelow , State );
9951002auto_flow (_ , State ) ->
@@ -1045,7 +1052,8 @@ socket_send0({tcp, Socket}, Data) ->
10451052socket_send0 ({ssl , Socket }, Data ) ->
10461053 ssl :send (Socket , Data ).
10471054
1048- - spec make_link_ref (_ , _ , _ ) -> link_ref ().
1055+ - spec make_link_ref (link_role (), pid (), output_handle ()) ->
1056+ link_ref ().
10491057make_link_ref (Role , Session , Handle ) ->
10501058 # link_ref {role = Role , session = Session , link_handle = Handle }.
10511059
@@ -1100,7 +1108,6 @@ format_status(Status = #{data := Data0}) ->
11001108 connection_config = ConnectionConfig ,
11011109 outgoing_delivery_id = OutgoingDeliveryId ,
11021110 outgoing_unsettled = OutgoingUnsettled ,
1103- incoming_unsettled = IncomingUnsettled ,
11041111 notify = Notify
11051112 } = Data0 ,
11061113 Links = maps :map (
@@ -1119,7 +1126,8 @@ format_status(Status = #{data := Data0}) ->
11191126 available = Available ,
11201127 drain = Drain ,
11211128 partial_transfers = PartialTransfers0 ,
1122- auto_flow = AutoFlow
1129+ auto_flow = AutoFlow ,
1130+ incoming_unsettled = IncomingUnsettled
11231131 }) ->
11241132 PartialTransfers = case PartialTransfers0 of
11251133 undefined ->
@@ -1141,7 +1149,9 @@ format_status(Status = #{data := Data0}) ->
11411149 available => Available ,
11421150 drain => Drain ,
11431151 partial_transfers => PartialTransfers ,
1144- auto_flow => AutoFlow }
1152+ auto_flow => AutoFlow ,
1153+ incoming_unsettled => maps :size (IncomingUnsettled )
1154+ }
11451155 end , Links0 ),
11461156 Data = #{channel => Channel ,
11471157 remote_channel => RemoteChannel ,
@@ -1160,7 +1170,6 @@ format_status(Status = #{data := Data0}) ->
11601170 connection_config => maps :remove (sasl , ConnectionConfig ),
11611171 outgoing_delivery_id => OutgoingDeliveryId ,
11621172 outgoing_unsettled => maps :size (OutgoingUnsettled ),
1163- incoming_unsettled => maps :size (IncomingUnsettled ),
11641173 notify => Notify },
11651174 Status #{data := Data }.
11661175
0 commit comments