Skip to content

Commit 2593ee9

Browse files
committed
Make mandatory based on route data only
Instead of waiting for a mandatory_received message from the queue the mandatory result is calculated in the channel based on the routing result only. This may seem like a weakening of the mandatory semantics but considering that the mandatory_received message is returned _before_ the message is enqueued and/or persisted in the queue it doesn't actually open up any further failure scenarios. [#163222515]
1 parent 4563228 commit 2593ee9

File tree

3 files changed

+18
-54
lines changed

3 files changed

+18
-54
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
%%
1616

1717
-module(rabbit_amqqueue_process).
18-
-include("rabbit.hrl").
19-
-include("rabbit_framing.hrl").
18+
-include_lib("rabbit_common/include/rabbit.hrl").
19+
-include_lib("rabbit_common/include/rabbit_framing.hrl").
2020

2121
-behaviour(gen_server2).
2222

@@ -604,13 +604,6 @@ send_or_record_confirm(#delivery{confirm = true,
604604
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
605605
{immediately, State}.
606606

607-
send_mandatory(#delivery{mandatory = false}) ->
608-
ok;
609-
send_mandatory(#delivery{mandatory = true,
610-
sender = SenderPid,
611-
msg_seq_no = MsgSeqNo}) ->
612-
gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
613-
614607
discard(#delivery{confirm = Confirm,
615608
sender = SenderPid,
616609
flow = Flow,
@@ -674,7 +667,6 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
674667
State = #q{overflow = Overflow,
675668
backing_queue = BQ,
676669
backing_queue_state = BQS}) ->
677-
send_mandatory(Delivery), %% must do this before confirms
678670
case {will_overflow(Delivery, State), Overflow} of
679671
{true, 'reject-publish'} ->
680672
%% Drop publish and nack to publisher

src/rabbit_channel.erl

Lines changed: 14 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
%% * Keeping track of consumers
3535
%% * Keeping track of unacknowledged deliveries to consumers
3636
%% * Keeping track of publisher confirms
37-
%% * Keeping track of mandatory message routing confirmations
38-
%% and returns
3937
%% * Transaction management
4038
%% * Authorisation (enforcing permissions)
4139
%% * Publishing trace events if tracing is enabled
@@ -143,9 +141,6 @@
143141
%% a list of tags for published messages that were
144142
%% rejected but are yet to be sent to the client
145143
rejected,
146-
%% a dtree used to track oustanding notifications
147-
%% for messages published as mandatory
148-
mandatory,
149144
%% same as capabilities in the reader
150145
capabilities,
151146
%% tracing exchange resource if tracing is enabled,
@@ -469,7 +464,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
469464
unconfirmed = dtree:empty(),
470465
rejected = [],
471466
confirmed = [],
472-
mandatory = dtree:empty(),
473467
capabilities = Capabilities,
474468
trace_state = rabbit_trace:init(VHost),
475469
consumer_prefetch = Prefetch,
@@ -502,7 +496,6 @@ prioritise_cast(Msg, _Len, _State) ->
502496
case Msg of
503497
{confirm, _MsgSeqNos, _QPid} -> 5;
504498
{reject_publish, _MsgSeqNos, _QPid} -> 5;
505-
{mandatory_received, _MsgSeqNo, _QPid} -> 5;
506499
_ -> 0
507500
end.
508501

@@ -637,10 +630,6 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
637630
|| {ConsumerTag, CreditDrained} <- CTagCredit],
638631
noreply(State);
639632

640-
handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
641-
%% NB: don't call noreply/1 since we don't want to send confirms.
642-
noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});
643-
644633
handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
645634
%% It does not matter which queue rejected the message,
646635
%% if any queue rejected it - it should not be confirmed.
@@ -1707,17 +1696,13 @@ track_delivering_queue(NoAck, QPid, QName,
17071696
false -> sets:add_element(QRef, DQ)
17081697
end}.
17091698

1710-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
1711-
mandatory = Mand})
1699+
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC})
17121700
when ?IS_CLASSIC(QPid) ->
1713-
{MMsgs, Mand1} = dtree:take(QPid, Mand),
1714-
[basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs],
1715-
State1 = State#ch{mandatory = Mand1},
17161701
case rabbit_misc:is_abnormal_exit(Reason) of
17171702
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
1718-
record_rejects(MXs, State1#ch{unconfirmed = UC1});
1703+
record_rejects(MXs, State#ch{unconfirmed = UC1});
17191704
false -> {MXs, UC1} = dtree:take(QPid, UC),
1720-
record_confirms(MXs, State1#ch{unconfirmed = UC1})
1705+
record_confirms(MXs, State#ch{unconfirmed = UC1})
17211706

17221707
end;
17231708
handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
@@ -1972,7 +1957,7 @@ foreach_per_queue(F, UAL, Acc) ->
19721957

19731958
consumer_queue_refs(Consumers) ->
19741959
lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}}
1975-
<- maps:to_list(Consumers)]).
1960+
<- maps:to_list(Consumers)]).
19761961

19771962
%% tell the limiter about the number of acks that have been received
19781963
%% for messages delivered to subscribed consumers, but not acks for
@@ -2038,11 +2023,11 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
20382023
queue_monitors = QMons1},
20392024
%% NB: the order here is important since basic.returns must be
20402025
%% sent before confirms.
2041-
State2 = process_routing_mandatory(Mandatory, AllDeliveredQRefs , MsgSeqNo,
2042-
Message, State1),
2043-
State3 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo,
2044-
XName, State2),
2045-
case rabbit_event:stats_level(State3, #ch.stats_timer) of
2026+
ok = process_routing_mandatory(Mandatory, AllDeliveredQRefs,
2027+
Message, State1),
2028+
State2 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo,
2029+
XName, State1),
2030+
case rabbit_event:stats_level(State, #ch.stats_timer) of
20462031
fine ->
20472032
?INCR_STATS(exchange_stats, XName, 1, publish),
20482033
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) ||
@@ -2051,16 +2036,13 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
20512036
_ ->
20522037
ok
20532038
end,
2054-
State3#ch{queue_states = QueueStates}.
2039+
State2#ch{queue_states = QueueStates}.
20552040

2056-
process_routing_mandatory(false, _, _, _, State) ->
2057-
State;
2058-
process_routing_mandatory(true, [], _, Msg, State) ->
2041+
process_routing_mandatory(true, [], Msg, State) ->
20592042
ok = basic_return(Msg, State, no_route),
2060-
State;
2061-
process_routing_mandatory(true, QRefs, MsgSeqNo, Msg, State) ->
2062-
State#ch{mandatory = dtree:insert(MsgSeqNo, QRefs, Msg,
2063-
State#ch.mandatory)}.
2043+
ok;
2044+
process_routing_mandatory(_, _, _, _) ->
2045+
ok.
20642046

20652047
process_routing_confirm(false, _, _, _, State) ->
20662048
State;

src/rabbit_mirror_queue_slave.erl

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -542,13 +542,6 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
542542
backing_queue_state = BQS }) ->
543543
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
544544

545-
send_mandatory(#delivery{mandatory = false}) ->
546-
ok;
547-
send_mandatory(#delivery{mandatory = true,
548-
sender = SenderPid,
549-
msg_seq_no = MsgSeqNo}) ->
550-
gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
551-
552545
send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) ->
553546
MS;
554547
send_or_record_confirm(published, #delivery { sender = ChPid,
@@ -707,13 +700,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
707700
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
708701
MTC).
709702

710-
%% We reset mandatory to false here because we will have sent the
711-
%% mandatory_received already as soon as we got the message. We also
712-
%% need to send an ack for these messages since the channel is waiting
703+
%% We need to send an ack for these messages since the channel is waiting
713704
%% for one for the via-GM case and we will not now receive one.
714705
promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) ->
715706
maybe_flow_ack(Sender, Flow),
716-
Delivery#delivery{mandatory = false}.
707+
Delivery.
717708

718709
noreply(State) ->
719710
{NewState, Timeout} = next_state(State),
@@ -832,7 +823,6 @@ maybe_enqueue_message(
832823
Delivery = #delivery { message = #basic_message { id = MsgId },
833824
sender = ChPid },
834825
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
835-
send_mandatory(Delivery), %% must do this before confirms
836826
State1 = ensure_monitoring(ChPid, State),
837827
%% We will never see {published, ChPid, MsgSeqNo} here.
838828
case maps:find(MsgId, MS) of

0 commit comments

Comments
 (0)