99
1010-behaviour (gen_server ).
1111
12- -include_lib (" amqp_client /include/amqp_client .hrl" ).
12+ -include_lib (" rabbit_common /include/rabbit .hrl" ).
1313-include (" rabbit_amqp1_0.hrl" ).
1414
1515-define (MAX_SESSION_WINDOW_SIZE , 65_535 ).
2626 ? V_1_0_SYMBOL_MODIFIED ]).
2727
2828% % Just make these constant for the time being.
29- -define (INCOMING_CREDIT , 65536 ).
29+ -define (INCOMING_CREDIT , 65_536 ).
3030
3131-define (MAX_PERMISSION_CACHE_SIZE , 12 ).
3232
4040 get_info /1 ]).
4141-export ([init /1 ,
4242 terminate /2 ,
43- code_change /3 ,
4443 handle_call /3 ,
4544 handle_cast /2 ,
4645 handle_info /2 ]).
5352 exchange :: rabbit_exchange :name (),
5453 routing_key :: undefined | rabbit_types :routing_key (),
5554 delivery_id :: undefined | delivery_number (),
56- delivery_count = 0 ,
55+ delivery_count = 0 :: sequence_no () ,
5756 send_settle_mode = undefined ,
5857 recv_settle_mode = undefined ,
5958 credit_used = ? INCOMING_CREDIT div 2 ,
6059 msg_acc = []}).
6160
6261-record (outgoing_link , {
6362 queue :: undefined | rabbit_misc :resource_name (),
64- delivery_count = 0 ,
65- % % TODO below field is not needed?
66- send_settled }).
63+ delivery_count = 0 :: sequence_no (),
64+ send_settled :: boolean ()}).
6765
6866-record (outgoing_unsettled , {
6967 % % The queue sent us this consumer scoped sequence number.
8583
8684% %TODO put rarely used fields into separate #cfg{}
8785-record (state , {frame_max ,
88- reader_pid ,
89- writer_pid ,
86+ reader_pid :: pid () ,
87+ writer_pid :: pid () ,
9088 % % These messages were received from queues thanks to sufficient link credit.
9189 % % However, they are buffered here due to session flow control before being sent to the client.
9290 pending_transfers = queue :new () :: queue :queue (# pending_transfer {}),
103101 outgoing_window_max ,
104102 next_publish_id , % % the 0-9-1-side counter for confirms
105103 next_delivery_id = 0 :: delivery_number (),
106- incoming_unsettled_map , % %TODO delete
107- outgoing_unsettled_map = gb_trees :empty () :: gb_trees :tree (delivery_number (), # outgoing_unsettled {}),
108- queue_states = rabbit_queue_type :init () :: rabbit_queue_type :state (),
109104 % % TRANSFER delivery IDs published to queues but not yet confirmed by queues
110105 % % TODO Use a different data structure because
111106 % % 1. we don't need to record exchanges since we don't emit channel stats,
112107 % % 2. mixed mode can result in large gaps across delivery_ids that need to be confirmed. Use a tree?
113108 % % 3. handle wrap around of 32-bit RFC-1982 serial number
114- unconfirmed = rabbit_confirms :init () :: rabbit_confirms :state ()
109+ incoming_unsettled_map = rabbit_confirms :init () :: rabbit_confirms :state (),
110+ outgoing_unsettled_map = gb_trees :empty () :: gb_trees :tree (delivery_number (), # outgoing_unsettled {}),
111+ queue_states = rabbit_queue_type :init () :: rabbit_queue_type :state ()
115112 % % TRANSFER delivery IDs confirmed by queues but yet to be sent to the client
116113 % %TODO accumulate confirms and send DISPOSITIONs after processing the mailbox
117114 % %(see rabbit_channel:noreply_coalesce/1
@@ -138,63 +135,18 @@ init({Channel, ReaderPid, WriterPid, User, Vhost, FrameMax}) ->
138135 user = User ,
139136 vhost = Vhost ,
140137 channel_num = Channel ,
141- next_publish_id = 0 ,
142- incoming_unsettled_map = gb_trees :empty ()
138+ next_publish_id = 0
143139 }}.
144140
145141terminate (_Reason , _State ) ->
146142 ok .
147143
148- code_change (_OldVsn , State , _Extra ) ->
149- {ok , State }.
150-
151144handle_call (info , _From , # state {reader_pid = ReaderPid } = State ) ->
152145 Info = [{reader , ReaderPid }],
153146 {reply , Info , State };
154147handle_call (Msg , _From , State ) ->
155148 {reply , {error , not_understood , Msg }, State }.
156149
157- handle_info (# 'basic.consume_ok' {}, State ) ->
158- % % Handled above
159- {noreply , State };
160-
161- handle_info (# 'basic.cancel_ok' {}, State ) ->
162- % % just ignore this for now,
163- % % At some point we should send the detach here but then we'd need to track
164- % % consumer tags -> link handle somewhere
165- {noreply , State };
166-
167- % % A message from the queue saying that there are no more messages
168- % handle_info(#'basic.credit_drained'{consumer_tag = CTag} = CreditDrained,
169- % State = #state{writer_pid = WriterPid,
170- % session = Session}) ->
171- % Handle = ctag_to_handle(CTag),
172- % Link = get({out, Handle}),
173- % {Flow0, Link1} = rabbit_amqp1_0_session:outgoing_link_credit_drained(
174- % CreditDrained, Handle, Link),
175- % Flow = rabbit_amqp1_0_session:flow_fields(Flow0, Session),
176- % rabbit_amqp1_0_writer:send_command(WriterPid, Flow),
177- % put({out, Handle}, Link1),
178- % {noreply, State};
179-
180- % handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
181- % session = Session}) ->
182- % {Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
183- % case Reply of
184- % undefined ->
185- % ok;
186- % _ ->
187- % rabbit_amqp1_0_writer:send_command(
188- % WriterPid,
189- % rabbit_amqp1_0_session:flow_fields(Reply, Session)
190- % )
191- % end,
192- % {noreply, state(Session1, State)};
193-
194- % handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
195- % rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
196- % {noreply, state(Session, State)};
197-
198150handle_info ({bump_credit , Msg }, State ) ->
199151 credit_flow :handle_bump_msg (Msg ),
200152 {noreply , State };
@@ -830,12 +782,6 @@ flow(#'v1_0.flow'{next_incoming_id = FlowNextIn0,
830782 end
831783 end .
832784
833- acknowledgement (DeliveryIds , Disposition ) ->
834- Disposition # 'v1_0.disposition' {first = {uint , hd (DeliveryIds )},
835- last = {uint , lists :last (DeliveryIds )},
836- settled = true ,
837- state = # 'v1_0.accepted' {}}.
838-
839785set_delivery_id ({uint , D }, # incoming_link {delivery_id = undefined } = Link ) ->
840786 % % "The delivery-id MUST be supplied on the first transfer of a multi-transfer delivery.
841787 Link # incoming_link {delivery_id = D };
@@ -893,20 +839,43 @@ handle_queue_event({queue_event, QRef, Evt},
893839 % rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
894840 end .
895841
842+ % handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
843+ % session = Session}) ->
844+ % {Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
845+ % case Reply of
846+ % undefined ->
847+ % ok;
848+ % _ ->
849+ % rabbit_amqp1_0_writer:send_command(
850+ % WriterPid,
851+ % rabbit_amqp1_0_session:flow_fields(Reply, Session)
852+ % )
853+ % end,
854+ % {noreply, state(Session1, State)};
855+
856+ % handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
857+ % rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
858+ % {noreply, state(Session, State)};
859+
896860handle_queue_actions (Actions , State0 ) ->
897861 {ReplyRev , State } =
898862 lists :foldl (
899- fun ({settled , QName , DelIds }, {Reply , S0 = # state {unconfirmed = U0 }}) ->
863+ fun ({settled , QName , DelIds }, {Reply , S0 = # state {incoming_unsettled_map = U0 }}) ->
900864 {ConfirmMXs , U } = rabbit_confirms :confirm (DelIds , QName , U0 ),
901- S = S0 # state {unconfirmed = U },
865+ S = S0 # state {incoming_unsettled_map = U },
902866 R = if ConfirmMXs =:= [] ->
903867 Reply ;
904868 ConfirmMXs =/= [] ->
905869 ConfirmDelIds = lists :map (fun ({Id , _Exchange }) -> Id end , ConfirmMXs ),
906870 Ids = lists :usort (ConfirmDelIds ),
907871 % %TODO defer sending confirms as done in rabbit_channel
908872 % record_confirms(ConfirmDelIds, S)
909- Disposition = acknowledgement (Ids , # 'v1_0.disposition' {role = ? RECV_ROLE }),
873+ Disposition = # 'v1_0.disposition' {
874+ role = ? RECV_ROLE ,
875+ settled = true ,
876+ state = # 'v1_0.accepted' {},
877+ first = {uint , hd (Ids )},
878+ last = {uint , lists :last (Ids )}},
910879 [Disposition | Reply ]
911880 end ,
912881 {R , S };
@@ -960,7 +929,7 @@ handle_deliver(ConsumerTag, AckRequired,
960929 Dtag = if is_integer (MsgId ) ->
961930 % % delivery-tag must be unique only per link (not per session)
962931 <<MsgId :64 >>;
963- MsgId =:= undefined ->
932+ MsgId =:= undefined andalso SendSettled ->
964933 % % Both ends of the link will always consider this message settled because
965934 % % "the sender will send all deliveries settled to the receiver" [3.8.2].
966935 % % Hence, the delivery tag does not have to be unique on this link.
@@ -1039,7 +1008,7 @@ incoming_link_transfer(#'v1_0.transfer'{delivery_id = DeliveryId0,
10391008 send_settle_mode = SSM ,
10401009 recv_settle_mode = RSM } = Link ,
10411010 # state {queue_states = QStates0 ,
1042- unconfirmed = U0 ,
1011+ incoming_unsettled_map = U0 ,
10431012 next_publish_id = NextPublishId0
10441013 } = State0 ) ->
10451014 MsgBin = iolist_to_binary (lists :reverse ([MsgPart | MsgAcc ])),
@@ -1094,7 +1063,7 @@ incoming_link_transfer(#'v1_0.transfer'{delivery_id = DeliveryId0,
10941063 {U , Reply0 } = process_routing_confirm (Qs , EffectiveSendSettleMode , DeliveryId , XName , U0 ),
10951064 State1 = State0 # state {queue_states = QStates ,
10961065 next_publish_id = NextPublishId ,
1097- unconfirmed = U },
1066+ incoming_unsettled_map = U },
10981067 {Reply1 , State } = handle_queue_actions (Actions , State1 ),
10991068 {SendFlow , CreditUsed1 } = case CreditUsed - 1 of
11001069 C when C =< 0 ->
0 commit comments