@@ -1127,9 +1127,8 @@ send_pending(#state{remote_incoming_window = Space,
11271127 {{value , # pending_transfer {
11281128 frames = Frames ,
11291129 queue_pid = QPid ,
1130- outgoing_unsettled = # outgoing_unsettled {
1131- consumer_tag = Ctag ,
1132- queue_name = QName }} = Pending }, Buf1 }
1130+ outgoing_unsettled = # outgoing_unsettled {queue_name = QName }
1131+ } = Pending }, Buf1 }
11331132 when Space > 0 ->
11341133 SendFun = case rabbit_queue_type :module (QName , State0 # state .queue_states ) of
11351134 {ok , rabbit_classic_queue } ->
@@ -1143,28 +1142,20 @@ send_pending(#state{remote_incoming_window = Space,
11431142 WriterPid , Ch , Transfer , Sections )
11441143 end
11451144 end ,
1146- % % rabbit_basic:maybe_gc_large_msg(Content, GCThreshold)
1145+ { NumTransfersSent , Buf , State1 } =
11471146 case send_frames (SendFun , Frames , Space ) of
11481147 {all , SpaceLeft } ->
1149- State1 = # state {outgoing_links = OutgoingLinks0 } = session_flow_control_sent_transfers (
1150- Space - SpaceLeft , State0 ),
1151- HandleInt = ctag_to_handle (Ctag ),
1152- OutgoingLinks = maps :update_with (
1153- HandleInt ,
1154- fun (# outgoing_link {delivery_count = {credit_api_v1 , C }} = Link ) ->
1155- Link # outgoing_link {delivery_count = {credit_api_v1 , add (C , 1 )}};
1156- (# outgoing_link {delivery_count = credit_api_v2 } = Link ) ->
1157- Link
1158- end ,
1159- OutgoingLinks0 ),
1160- State2 = State1 # state {outgoing_links = OutgoingLinks },
1161- State = record_outgoing_unsettled (Pending , State2 ),
1162- send_pending (State # state {outgoing_pending = Buf1 });
1148+ {Space - SpaceLeft ,
1149+ Buf1 ,
1150+ record_outgoing_unsettled (Pending , State0 )};
11631151 {some , Rest } ->
1164- State = session_flow_control_sent_transfers (Space , State0 ),
1165- Buf = queue :in_r (Pending # pending_transfer {frames = Rest }, Buf1 ),
1166- send_pending (State # state {outgoing_pending = Buf })
1167- end ;
1152+ {Space ,
1153+ queue :in_r (Pending # pending_transfer {frames = Rest }, Buf1 ),
1154+ State0 }
1155+ end ,
1156+ State2 = session_flow_control_sent_transfers (NumTransfersSent , State1 ),
1157+ State = State2 # state {outgoing_pending = Buf },
1158+ send_pending (State );
11681159 {{value , # pending_transfer {}}, _ }
11691160 when Space =:= 0 ->
11701161 State0
@@ -1415,17 +1406,18 @@ handle_deliver(ConsumerTag, AckRequired,
14151406 Msg = {QName , QPid , MsgId , Redelivered , Mc0 },
14161407 State = # state {outgoing_pending = Pending ,
14171408 outgoing_delivery_id = DeliveryId ,
1418- outgoing_links = OutgoingLinks ,
1409+ outgoing_links = OutgoingLinks0 ,
14191410 cfg = # cfg {outgoing_max_frame_size = MaxFrameSize ,
14201411 conn_name = ConnName ,
14211412 channel_num = ChannelNum ,
14221413 user = # user {username = Username },
14231414 trace_state = Trace }}) ->
14241415 Handle = ctag_to_handle (ConsumerTag ),
1425- case OutgoingLinks of
1416+ case OutgoingLinks0 of
14261417 #{Handle := # outgoing_link {queue_type = QType ,
14271418 send_settled = SendSettled ,
1428- max_message_size = MaxMessageSize }} ->
1419+ max_message_size = MaxMessageSize ,
1420+ delivery_count = DelCount } = Link0 } ->
14291421 Dtag = delivery_tag (MsgId , SendSettled ),
14301422 Transfer = # 'v1_0.transfer' {
14311423 handle = ? UINT (Handle ),
@@ -1451,6 +1443,13 @@ handle_deliver(ConsumerTag, AckRequired,
14511443 end ,
14521444 messages_delivered (Redelivered , QType ),
14531445 rabbit_trace :tap_out (Msg , ConnName , ChannelNum , Username , Trace ),
1446+ OutgoingLinks = case DelCount of
1447+ credit_api_v2 ->
1448+ OutgoingLinks0 ;
1449+ {credit_api_v1 , C } ->
1450+ Link = Link0 # outgoing_link {delivery_count = {credit_api_v1 , add (C , 1 )}},
1451+ maps :update (Handle , Link , OutgoingLinks0 )
1452+ end ,
14541453 Del = # outgoing_unsettled {
14551454 msg_id = MsgId ,
14561455 consumer_tag = ConsumerTag ,
@@ -1465,8 +1464,9 @@ handle_deliver(ConsumerTag, AckRequired,
14651464 queue_pid = QPid ,
14661465 delivery_id = DeliveryId ,
14671466 outgoing_unsettled = Del },
1468- State # state {outgoing_delivery_id = add (DeliveryId , 1 ),
1469- outgoing_pending = queue :in (PendingTransfer , Pending )};
1467+ State # state {outgoing_pending = queue :in (PendingTransfer , Pending ),
1468+ outgoing_delivery_id = add (DeliveryId , 1 ),
1469+ outgoing_links = OutgoingLinks };
14701470 _ ->
14711471 % % TODO handle missing link -- why does the queue think it's there?
14721472 rabbit_log :warning (
0 commit comments