44-include (" amqqueue.hrl" ).
55-include_lib (" rabbit_common/include/rabbit.hrl" ).
66
7+ % % TODO possible to use sets / maps instead of lists?
8+ % % Check performance with QoS 1 and 1 million target queues.
79-record (msg_status , {pending :: [pid ()],
810 confirmed = [] :: [pid ()]}).
911
1012-define (STATE , ? MODULE ).
11- -record (? STATE , {pid :: undefined | pid (), % % the current master pid
12- qref :: term (), % % TODO
13- unconfirmed = #{} ::
14- #{non_neg_integer () => # msg_status {}}}).
13+ -record (? STATE , {
14+ % % the current master pid
15+ pid :: undefined | pid (),
16+ % % undefined if feature flag no_queue_name_in_classic_queue_client enabled
17+ qref :: term (),
18+ unconfirmed = #{} :: #{non_neg_integer () => # msg_status {}},
19+ monitored = #{} :: #{pid () => ok }
20+ }).
1521
1622
1723-opaque state () :: #? STATE {}.
@@ -156,9 +162,14 @@ stat(Q) ->
156162
157163-spec init (amqqueue :amqqueue ()) -> {ok , state ()}.
158164init (Q ) when ? amqqueue_is_classic (Q ) ->
159- QName = amqqueue :get_name (Q ),
165+ QRef = case rabbit_feature_flags :is_enabled (no_queue_name_in_classic_queue_client ) of
166+ true ->
167+ undefined ;
168+ false ->
169+ amqqueue :get_name (Q )
170+ end ,
160171 {ok , #? STATE {pid = amqqueue :get_pid (Q ),
161- qref = QName }}.
172+ qref = QRef }}.
162173
163174-spec close (state ()) -> ok .
164175close (_State ) ->
@@ -174,7 +185,7 @@ update(Q, #?STATE{pid = Pid} = State) when ?amqqueue_is_classic(Q) ->
174185 State #? STATE {pid = NewPid }
175186 end .
176187
177- consume (Q , Spec , State ) when ? amqqueue_is_classic (Q ) ->
188+ consume (Q , Spec , State0 ) when ? amqqueue_is_classic (Q ) ->
178189 QPid = amqqueue :get_pid (Q ),
179190 QRef = amqqueue :get_name (Q ),
180191 #{no_ack := NoAck ,
@@ -194,9 +205,9 @@ consume(Q, Spec, State) when ?amqqueue_is_classic(Q) ->
194205 ExclusiveConsume , Args , OkMsg , ActingUser },
195206 infinity ]}) of
196207 ok ->
197- % % ask the host process to monitor this pid
198208 % % TODO: track pids as they change
199- {ok , State #? STATE {pid = QPid }, [{monitor , QPid , QRef }]};
209+ State = ensure_monitor (QPid , QRef , State0 ),
210+ {ok , State #? STATE {pid = QPid }};
200211 Err ->
201212 Err
202213 end .
@@ -233,8 +244,10 @@ credit(CTag, Credit, Drain, State) ->
233244 [{credit , ChPid , CTag , Credit , Drain }]}),
234245 {State , []}.
235246
236- handle_event ({confirm , MsgSeqNos , Pid }, #? STATE {qref = QRef ,
237- unconfirmed = U0 } = State ) ->
247+ handle_event ({confirm , MsgSeqNos , Pid }, #? STATE {qref = QRef } = State ) ->
248+ % % backwards compatibility when feature flag no_queue_name_in_classic_queue_client disabled
249+ handle_event ({confirm , MsgSeqNos , Pid , QRef }, State );
250+ handle_event ({confirm , MsgSeqNos , Pid , QRef }, #? STATE {unconfirmed = U0 } = State ) ->
238251 % % confirms should never result in rejections
239252 {Unconfirmed , ConfirmedSeqNos , []} =
240253 settle_seq_nos (MsgSeqNos , Pid , U0 , confirm ),
@@ -247,17 +260,20 @@ handle_event({confirm, MsgSeqNos, Pid}, #?STATE{qref = QRef,
247260 {ok , State #? STATE {unconfirmed = Unconfirmed }, Actions };
248261handle_event ({deliver , _ , _ , _ } = Delivery , #? STATE {} = State ) ->
249262 {ok , State , [Delivery ]};
250- handle_event ({reject_publish , SeqNo , _QPid },
251- #? STATE {qref = QRef ,
252- unconfirmed = U0 } = State ) ->
263+ handle_event ({reject_publish , SeqNo , QPid }, #? STATE {qref = QRef } = State ) ->
264+ % % backwards compatibility when feature flag no_queue_name_in_classic_queue_client disabled
265+ handle_event ({reject_publish , SeqNo , QPid , QRef }, State );
266+ handle_event ({reject_publish , SeqNo , _QPid , QRef },
267+ #? STATE {unconfirmed = U0 } = State ) ->
253268 % % It does not matter which queue rejected the message,
254269 % % if any queue did, it should not be confirmed.
255270 {U , Rejected } = reject_seq_no (SeqNo , U0 ),
256271 Actions = [{rejected , QRef , Rejected }],
257272 {ok , State #? STATE {unconfirmed = U }, Actions };
258- handle_event ({down , Pid , Info }, #? STATE {qref = QRef ,
259- pid = MasterPid ,
260- unconfirmed = U0 } = State0 ) ->
273+ handle_event ({down , Pid , QRef , Info }, #? STATE {monitored = Monitored ,
274+ pid = MasterPid ,
275+ unconfirmed = U0 } = State0 ) ->
276+ State = State0 #? STATE {monitored = maps :remove (Pid , Monitored )},
261277 Actions0 = case Pid =:= MasterPid of
262278 true ->
263279 [{queue_down , QRef }];
@@ -279,7 +295,7 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef,
279295 Actions = settlement_action (
280296 settled , QRef , Settled ,
281297 settlement_action (rejected , QRef , Rejected , Actions0 )),
282- {ok , State0 #? STATE {unconfirmed = Unconfirmed }, Actions };
298+ {ok , State #? STATE {unconfirmed = Unconfirmed }, Actions };
283299 true ->
284300 % % any abnormal exit should be considered a full reject of the
285301 % % oustanding message ids - If the message didn't get to all
@@ -294,7 +310,7 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef,
294310 end
295311 end , [], U0 ),
296312 U = maps :without (MsgIds , U0 ),
297- {ok , State0 #? STATE {unconfirmed = U },
313+ {ok , State #? STATE {unconfirmed = U },
298314 [{rejected , QRef , MsgIds } | Actions0 ]}
299315 end ;
300316handle_event ({send_drained , _ } = Action , State ) ->
@@ -319,7 +335,7 @@ deliver(Qs0, #delivery{flow = Flow,
319335 Msg = Msg0 # basic_message {id = rabbit_guid :gen ()},
320336 Delivery = Delivery0 # delivery {message = Msg },
321337
322- {MPids , SPids , Qs , Actions } = qpids (Qs0 , Confirm , MsgNo ),
338+ {MPids , SPids , Qs } = qpids (Qs0 , Confirm , MsgNo ),
323339 case Flow of
324340 % % Here we are tracking messages sent by the rabbit_channel
325341 % % process. We are accessing the rabbit_channel process
@@ -333,7 +349,7 @@ deliver(Qs0, #delivery{flow = Flow,
333349 SMsg = {deliver , Delivery , true },
334350 delegate :invoke_no_result (MPids , {gen_server2 , cast , [MMsg ]}),
335351 delegate :invoke_no_result (SPids , {gen_server2 , cast , [SMsg ]}),
336- {Qs , Actions }.
352+ {Qs , [] }.
337353
338354
339355-spec dequeue (NoAck :: boolean (), LimiterPid :: pid (),
@@ -381,14 +397,16 @@ purge(Q) when ?is_amqqueue(Q) ->
381397
382398qpids (Qs , Confirm , MsgNo ) ->
383399 lists :foldl (
384- fun ({Q , S0 }, {MPidAcc , SPidAcc , Qs0 , Actions0 }) ->
400+ fun ({Q , S0 }, {MPidAcc , SPidAcc , Qs0 }) ->
385401 QPid = amqqueue :get_pid (Q ),
386402 SPids = amqqueue :get_slave_pids (Q ),
387403 QRef = amqqueue :get_name (Q ),
388- Actions = [{monitor , QPid , QRef }
389- | [{monitor , P , QRef } || P <- SPids ]] ++ Actions0 ,
404+ S1 = ensure_monitor (QPid , QRef , S0 ),
405+ S2 = lists :foldl (fun (SPid , Acc ) ->
406+ ensure_monitor (SPid , QRef , Acc )
407+ end , S1 , SPids ),
390408 % % confirm record only if necessary
391- S = case S0 of
409+ S = case S2 of
392410 #? STATE {unconfirmed = U0 } ->
393411 Rec = [QPid | SPids ],
394412 U = case Confirm of
@@ -397,14 +415,14 @@ qpids(Qs, Confirm, MsgNo) ->
397415 true ->
398416 U0 #{MsgNo => # msg_status {pending = Rec }}
399417 end ,
400- S0 #? STATE {pid = QPid ,
418+ S2 #? STATE {pid = QPid ,
401419 unconfirmed = U };
402420 stateless ->
403- S0
421+ S2
404422 end ,
405423 {[QPid | MPidAcc ], SPidAcc ++ SPids ,
406- [{Q , S } | Qs0 ], Actions }
407- end , {[], [], [], [] }, Qs ).
424+ [{Q , S } | Qs0 ]}
425+ end , {[], [], []}, Qs ).
408426
409427% % internal-ish
410428-spec wait_for_promoted_or_stopped (amqqueue :amqqueue ()) ->
@@ -521,59 +539,43 @@ update_msg_status(confirm, Pid, #msg_status{pending = P,
521539update_msg_status (down , Pid , # msg_status {pending = P } = S ) ->
522540 S # msg_status {pending = lists :delete (Pid , P )}.
523541
542+ ensure_monitor (_ , _ , State = stateless ) ->
543+ State ;
544+ ensure_monitor (Pid , _ , State = #? STATE {monitored = Monitored })
545+ when is_map_key (Pid , Monitored ) ->
546+ State ;
547+ ensure_monitor (Pid , QName , State = #? STATE {monitored = Monitored }) ->
548+ _ = erlang :monitor (process , Pid , [{tag , {'DOWN' , QName }}]),
549+ State #? STATE {monitored = Monitored #{Pid => ok }}.
550+
524551% % part of channel <-> queue api
525552confirm_to_sender (Pid , QName , MsgSeqNos ) ->
526- % % the stream queue included the queue type refactoring and thus requires
527- % % a different message format
528- case rabbit_queue_type :is_supported () of
529- true ->
530- gen_server :cast (Pid ,
531- {queue_event , QName ,
532- {confirm , MsgSeqNos , self ()}});
533- false ->
534- gen_server2 :cast (Pid , {confirm , MsgSeqNos , self ()})
535- end .
553+ Msg = case rabbit_feature_flags :is_enabled (no_queue_name_in_classic_queue_client ) of
554+ true ->
555+ {confirm , MsgSeqNos , self (), QName };
556+ false ->
557+ {confirm , MsgSeqNos , self ()}
558+ end ,
559+ gen_server :cast (Pid , {queue_event , QName , Msg }).
536560
537561send_rejection (Pid , QName , MsgSeqNo ) ->
538- case rabbit_queue_type : is_supported ( ) of
539- true ->
540- gen_server : cast ( Pid , { queue_event , QName ,
541- { reject_publish , MsgSeqNo , self ()}});
542- false ->
543- gen_server2 : cast ( Pid , { reject_publish , MsgSeqNo , self ()})
544- end .
562+ Msg = case rabbit_feature_flags : is_enabled ( no_queue_name_in_classic_queue_client ) of
563+ true ->
564+ { reject_publish , MsgSeqNo , self (), QName };
565+ false ->
566+ { reject_publish , MsgSeqNo , self ()}
567+ end ,
568+ gen_server : cast ( Pid , { queue_event , QName , Msg }) .
545569
546570deliver_to_consumer (Pid , QName , CTag , AckRequired , Message ) ->
547- case has_classic_queue_type_delivery_support () of
548- true ->
549- Deliver = {deliver , CTag , AckRequired , [Message ]},
550- Evt = {queue_event , QName , Deliver },
551- gen_server :cast (Pid , Evt );
552- false ->
553- Deliver = {deliver , CTag , AckRequired , Message },
554- gen_server2 :cast (Pid , Deliver )
555- end .
571+ Deliver = {deliver , CTag , AckRequired , [Message ]},
572+ Evt = {queue_event , QName , Deliver },
573+ gen_server :cast (Pid , Evt ).
556574
557575send_drained (Pid , QName , CTagCredits ) ->
558- case has_classic_queue_type_delivery_support () of
559- true ->
560- gen_server :cast (Pid , {queue_event , QName ,
561- {send_drained , CTagCredits }});
562- false ->
563- gen_server2 :cast (Pid , {send_drained , CTagCredits })
564- end .
576+ gen_server :cast (Pid , {queue_event , QName ,
577+ {send_drained , CTagCredits }}).
565578
566579send_credit_reply (Pid , QName , Len ) when is_integer (Len ) ->
567- case rabbit_queue_type :is_supported () of
568- true ->
569- gen_server :cast (Pid , {queue_event , QName ,
570- {send_credit_reply , Len }});
571- false ->
572- gen_server2 :cast (Pid , {send_credit_reply , Len })
573- end .
574-
575- has_classic_queue_type_delivery_support () ->
576- % % some queue_events were missed in the initial queue_type implementation
577- % % this feature flag enables those and completes the initial queue type
578- % % API for classic queues
579- rabbit_feature_flags :is_enabled (classic_queue_type_delivery_support ).
580+ gen_server :cast (Pid , {queue_event , QName ,
581+ {send_credit_reply , Len }}).
0 commit comments