Skip to content

Commit 84cf22a

Browse files
Merge pull request #1988 from rabbitmq/rabbitmq-server-1904
Track messages that were not routed anywhere and also not published as mandatory
2 parents 0d67fc1 + a359b43 commit 84cf22a

File tree

4 files changed

+16
-6
lines changed

4 files changed

+16
-6
lines changed

src/rabbit_channel.erl

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2111,8 +2111,9 @@ notify_limiter(Limiter, Acked) ->
21112111
deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
21122112
confirm = false,
21132113
mandatory = false},
2114-
[]}, State) -> %% optimisation
2114+
_RoutedToQs = []}, State) -> %% optimisation
21152115
?INCR_STATS(exchange_stats, XName, 1, publish, State),
2116+
?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State),
21162117
State;
21172118
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
21182119
exchange_name = XName},
@@ -2165,9 +2166,16 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
21652166
end,
21662167
State2#ch{queue_states = QueueStates}.
21672168

2168-
process_routing_mandatory(true, [], Msg, State) ->
2169+
process_routing_mandatory(_Mandatory = true,
2170+
_RoutedToQs = [],
2171+
Msg, State) ->
21692172
ok = basic_return(Msg, State, no_route),
21702173
ok;
2174+
process_routing_mandatory(_Mandatory = false,
2175+
_RoutedToQs = [],
2176+
#basic_message{exchange_name = ExchangeName}, State) ->
2177+
?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State),
2178+
ok;
21712179
process_routing_mandatory(_, _, _, _) ->
21722180
ok.
21732181

src/rabbit_core_metrics_gc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ gc_process_and_entity(Table, GbSet) ->
157157
ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _, _, _}, none)
158158
when Table == channel_queue_metrics ->
159159
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
160-
({{Pid, Id} = Key, _, _, _, _}, none)
160+
({{Pid, Id} = Key, _, _, _, _, _}, none)
161161
when Table == channel_exchange_metrics ->
162162
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
163163
({{Id, Pid, _} = Key, _, _, _, _, _, _}, none)

test/rabbit_core_metrics_gc_SUITE.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ channel_metrics(Config) ->
177177
amqp_channel:call(Ch, #'queue.declare'{queue = <<"queue_metrics">>}),
178178
amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"queue_metrics">>},
179179
#amqp_msg{payload = <<"hello">>}),
180+
amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"won't route $¢% anywhere">>},
181+
#amqp_msg{payload = <<"hello">>}),
180182
{#'basic.get_ok'{}, _} = amqp_channel:call(Ch, #'basic.get'{queue = <<"queue_metrics">>,
181183
no_ack=true}),
182184
timer:sleep(150),

test/unit_inbroker_non_parallel_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ channel_statistics1(_Config) ->
510510
[{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0, 0}] = ets:lookup(
511511
channel_queue_metrics,
512512
{Ch, QRes}),
513-
[{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
513+
[{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup(
514514
channel_exchange_metrics,
515515
{Ch, X}),
516516
[{{Ch, {QRes, X}}, 1, 0}] = ets:lookup(
@@ -525,7 +525,7 @@ channel_statistics1(_Config) ->
525525
[{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0, 1}] = ets:lookup(
526526
channel_queue_metrics,
527527
{Ch, QRes}),
528-
[{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
528+
[{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup(
529529
channel_exchange_metrics,
530530
{Ch, X}),
531531
[{{Ch, {QRes, X}}, 1, 1}] = ets:lookup(
@@ -538,7 +538,7 @@ channel_statistics1(_Config) ->
538538
force_metric_gc(),
539539
Check4 = fun() ->
540540
[] = ets:lookup(channel_queue_metrics, {Ch, QRes}),
541-
[{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
541+
[{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup(
542542
channel_exchange_metrics,
543543
{Ch, X}),
544544
[] = ets:lookup(channel_queue_exchange_metrics,

0 commit comments

Comments
 (0)