Skip to content

Commit 3bb3273

Browse files
committed
Fix channel crash when draining AMQP 1.0 credits from classic queue
Classic queues used a different format for the `{send_drained, _}` queue type action which was missed originally. This change handles both formats in the channel for backwards compatibility as well as changes classic queues to conform to the same format when sending the queue event. Whilst adding tests for this in the amqp10 plugin another issue around the amqp10_client and filters was discovered and this commit also includes improvements in this area. Such as more leninet support of source filters.
1 parent 149f3e5 commit 3bb3273

File tree

7 files changed

+137
-60
lines changed

7 files changed

+137
-60
lines changed

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
269269
snd_settle_mode(), terminus_durability(), filter(),
270270
properties()) ->
271271
{ok, link_ref()}.
272-
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) ->
272+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
273+
when is_pid(Session) andalso
274+
is_binary(Name) andalso
275+
is_binary(Source) andalso
276+
(SettleMode == unsettled orelse
277+
SettleMode == settled orelse
278+
SettleMode == mixed) andalso
279+
is_atom(Durability) andalso
280+
is_map(Filter) andalso
281+
is_map(Properties) ->
273282
AttachArgs = #{name => Name,
274283
role => {receiver, #{address => Source,
275284
durable => Durability}, self()},

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -648,41 +648,51 @@ translate_terminus_durability(none) -> 0;
648648
translate_terminus_durability(configuration) -> 1;
649649
translate_terminus_durability(unsettled_state) -> 2.
650650

651-
translate_filters(Filters) when is_map(Filters) andalso map_size(Filters) =< 0 -> undefined;
652-
translate_filters(Filters) when is_map(Filters) -> {
653-
map,
654-
maps:fold(
655-
fun(<<"apache.org:legacy-amqp-direct-binding:string">> = K, V, Acc) when is_binary(V) ->
656-
[{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc];
657-
(<<"apache.org:legacy-amqp-topic-binding:string">> = K, V, Acc) when is_binary(V) ->
658-
[{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc];
659-
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
660-
[{{symbol, K}, {described, {symbol, K}, translate_legacy_amqp_headers_binding(V)}} | Acc];
661-
(<<"apache.org:no-local-filter:list">> = K, V, Acc) when is_list(V) ->
662-
[{{symbol, K}, {described, {symbol, K}, lists:map(fun(Id) -> {utf8, Id} end, V)}} | Acc];
663-
(<<"apache.org:selector-filter:string">> = K, V, Acc) when is_binary(V) ->
664-
[{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc]
665-
end,
666-
[],
667-
Filters)
668-
}.
651+
translate_filters(Filters)
652+
when is_map(Filters) andalso
653+
map_size(Filters) == 0 ->
654+
undefined;
655+
translate_filters(Filters)
656+
when is_map(Filters) ->
657+
{map,
658+
maps:fold(
659+
fun
660+
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
661+
%% special case conversion
662+
Key = sym(K),
663+
[{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc];
664+
(K, V, Acc) when is_binary(K) ->
665+
%% try treat any filter value generically
666+
Key = sym(K),
667+
Value = filter_value_type(V),
668+
[{Key, {described, Key, Value}} | Acc]
669+
end, [], Filters)}.
670+
671+
filter_value_type(V) when is_binary(V) ->
672+
%% this is clearly not always correct
673+
{utf8, V};
674+
filter_value_type(V)
675+
when is_integer(V) andalso V >= 0 ->
676+
{uint, V};
677+
filter_value_type(VList) when is_list(VList) ->
678+
[filter_value_type(V) || V <- VList];
679+
filter_value_type({T, _} = V) when is_atom(T) ->
680+
%% looks like an already tagged type, just pass it through
681+
V.
669682

670683
% https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html
671-
translate_legacy_amqp_headers_binding(LegacyHeaders) -> {
672-
map,
673-
maps:fold(
674-
fun(<<"x-match">> = K, <<"any">> = V, Acc) ->
675-
[{{utf8, K}, {utf8, V}} | Acc];
676-
(<<"x-match">> = K, <<"all">> = V, Acc) ->
677-
[{{utf8, K}, {utf8, V}} | Acc];
678-
(<<"x-",_/binary>>, _, Acc) ->
679-
Acc;
680-
(K, V, Acc) ->
681-
[{{utf8, K}, {utf8, V}} | Acc]
682-
end,
683-
[],
684-
LegacyHeaders)
685-
}.
684+
translate_legacy_amqp_headers_binding(LegacyHeaders) ->
685+
{map,
686+
maps:fold(
687+
fun(<<"x-match">> = K, <<"any">> = V, Acc) ->
688+
[{{utf8, K}, {utf8, V}} | Acc];
689+
(<<"x-match">> = K, <<"all">> = V, Acc) ->
690+
[{{utf8, K}, {utf8, V}} | Acc];
691+
(<<"x-", _/binary>>, _, Acc) ->
692+
Acc;
693+
(K, V, Acc) ->
694+
[{{utf8, K}, filter_value_type(V)} | Acc]
695+
end, [], LegacyHeaders)}.
686696

687697
send_detach(Send, {detach, OutHandle}, _From, State = #state{links = Links}) ->
688698
case Links of
@@ -1011,8 +1021,10 @@ wrap_map_value(V) when is_atom(V) ->
10111021
utf8(atom_to_list(V)).
10121022

10131023
utf8(V) -> amqp10_client_types:utf8(V).
1024+
1025+
sym(B) when is_binary(B) -> {symbol, B};
10141026
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
1015-
sym(B) when is_binary(B) -> {symbol, B}.
1027+
sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}.
10161028

10171029
-ifdef(TEST).
10181030
-include_lib("eunit/include/eunit.hrl").

deps/rabbit/src/rabbit_channel.erl

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2831,16 +2831,7 @@ evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
28312831
handle_queue_actions(Actions, #ch{} = State0) ->
28322832
WriterPid = State0#ch.cfg#conf.writer_pid,
28332833
lists:foldl(
2834-
fun ({send_credit_reply, Avail}, S0) ->
2835-
ok = rabbit_writer:send_command(WriterPid,
2836-
#'basic.credit_ok'{available = Avail}),
2837-
S0;
2838-
({send_drained, {CTag, Credit}}, S0) ->
2839-
ok = rabbit_writer:send_command(
2840-
WriterPid,
2841-
#'basic.credit_drained'{consumer_tag = CTag,
2842-
credit_drained = Credit}),
2843-
S0;
2834+
fun
28442835
({settled, QRef, MsgSeqNos}, S0) ->
28452836
confirm(MsgSeqNos, QRef, S0);
28462837
({rejected, _QRef, MsgSeqNos}, S0) ->
@@ -2865,9 +2856,28 @@ handle_queue_actions(Actions, #ch{} = State0) ->
28652856
S0;
28662857
({unblock, QName}, S0) ->
28672858
credit_flow:unblock(QName),
2859+
S0;
2860+
({send_credit_reply, Avail}, S0) ->
2861+
ok = rabbit_writer:send_command(WriterPid,
2862+
#'basic.credit_ok'{available = Avail}),
2863+
S0;
2864+
({send_drained, {CTag, Credit}}, S0) ->
2865+
ok = send_drained_to_writer(WriterPid, CTag, Credit),
2866+
S0;
2867+
({send_drained, CTagCredits}, S0) when is_list(CTagCredits) ->
2868+
%% this is the backwards compatible option that classic queues
2869+
%% used to send, this can be removed in 4.0
2870+
[ok = send_drained_to_writer(WriterPid, CTag, Credit)
2871+
|| {CTag, Credit} <- CTagCredits],
28682872
S0
28692873
end, State0, Actions).
28702874

2875+
send_drained_to_writer(WriterPid, CTag, Credit) ->
2876+
ok = rabbit_writer:send_command(
2877+
WriterPid,
2878+
#'basic.credit_drained'{consumer_tag = CTag,
2879+
credit_drained = Credit}).
2880+
28712881
maybe_increase_global_publishers(#ch{publishing_mode = true} = State0) ->
28722882
State0;
28732883
maybe_increase_global_publishers(State0) ->

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,9 +551,14 @@ deliver_to_consumer(Pid, QName, CTag, AckRequired, Message) ->
551551
Evt = {queue_event, QName, Deliver},
552552
gen_server:cast(Pid, Evt).
553553

554-
send_drained(Pid, QName, CTagCredits) ->
554+
send_drained(Pid, QName, CTagCredits) when is_list(CTagCredits) ->
555+
[_ = gen_server:cast(Pid, {queue_event, QName,
556+
{send_drained, CTagCredit}})
557+
|| CTagCredit <- CTagCredits],
558+
ok;
559+
send_drained(Pid, QName, CTagCredit) when is_tuple(CTagCredit) ->
555560
gen_server:cast(Pid, {queue_event, QName,
556-
{send_drained, CTagCredits}}).
561+
{send_drained, CTagCredit}}).
557562

558563
send_credit_reply(Pid, QName, Len) when is_integer(Len) ->
559564
gen_server:cast(Pid, {queue_event, QName,

deps/rabbit/src/rabbit_queue_consumers.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,9 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
193193
end.
194194

195195
-spec send_drained(rabbit_amqqueue:name()) -> 'ok'.
196-
197-
send_drained(QName) -> [update_ch_record(send_drained(QName, C)) || C <- all_ch_record()],
198-
ok.
196+
send_drained(QName) ->
197+
[update_ch_record(send_drained(QName, C)) || C <- all_ch_record()],
198+
ok.
199199

200200
-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
201201
rabbit_amqqueue:name(), state(), boolean(),
@@ -528,15 +528,15 @@ send_drained(QName, C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
528528
case rabbit_limiter:drained(Limiter) of
529529
{[], Limiter} -> C;
530530
{CTagCredits, Limiter2} ->
531-
rabbit_classic_queue:send_drained(ChPid, QName, CTagCredits),
531+
ok = rabbit_classic_queue:send_drained(ChPid, QName, CTagCredits),
532532
C#cr{limiter = Limiter2}
533533
end.
534534

535535
credit_and_drain(QName, C = #cr{ch_pid = ChPid, limiter = Limiter},
536536
CTag, Credit, Mode, IsEmpty) ->
537537
case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of
538538
{true, Limiter1} ->
539-
rabbit_classic_queue:send_drained(ChPid, QName, [{CTag, Credit}]),
539+
ok = rabbit_classic_queue:send_drained(ChPid, QName, [{CTag, Credit}]),
540540
C#cr{limiter = Limiter1};
541541
{false, Limiter1} -> C#cr{limiter = Limiter1}
542542
end.

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ transferred(DeliveryTag, Channel,
250250

251251
source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
252252
Key = {symbol, <<"rabbitmq:stream-offset-spec">>},
253-
case lists:keyfind(Key, 1, KVList) of
253+
case keyfind_unpack_described(Key, KVList) of
254254
{_, {timestamp, Ts}} ->
255255
[{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps
256256
{_, {utf8, Spec}} ->
@@ -262,3 +262,17 @@ source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
262262
end;
263263
source_filters_to_consumer_args(_Source) ->
264264
[].
265+
266+
keyfind_unpack_described(Key, KvList) ->
267+
%% filterset values _should_ be described values
268+
%% they aren't always however for historical reasons so we need this bit of
269+
%% code to return a plain value for the given filter key
270+
case lists:keyfind(Key, 1, KvList) of
271+
{Key, {described, Key, Value}} ->
272+
{Key, Value};
273+
{Key, _} = Kv ->
274+
Kv;
275+
false ->
276+
false
277+
end.
278+

deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ groups() ->
2424
[
2525
{tests, [], [
2626
reliable_send_receive_with_outcomes,
27+
roundtrip_classic_queue_with_drain,
2728
roundtrip_quorum_queue_with_drain,
29+
roundtrip_stream_queue_with_drain,
2830
message_headers_conversion
2931
]},
3032
{metrics, [], [
@@ -149,16 +151,28 @@ reliable_send_receive(Config, Outcome) ->
149151

150152
ok.
151153

154+
roundtrip_classic_queue_with_drain(Config) ->
155+
QName = atom_to_binary(?FUNCTION_NAME, utf8),
156+
roundtrip_queue_with_drain(Config, <<"classic">>, QName).
157+
152158
roundtrip_quorum_queue_with_drain(Config) ->
159+
QName = atom_to_binary(?FUNCTION_NAME, utf8),
160+
roundtrip_queue_with_drain(Config, <<"quorum">>, QName).
161+
162+
roundtrip_stream_queue_with_drain(Config) ->
163+
QName = atom_to_binary(?FUNCTION_NAME, utf8),
164+
roundtrip_queue_with_drain(Config, <<"stream">>, QName).
165+
166+
roundtrip_queue_with_drain(Config, QueueType, QName) when is_binary(QueueType) ->
153167
Host = ?config(rmq_hostname, Config),
154168
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
155-
QName = atom_to_binary(?FUNCTION_NAME, utf8),
156169
Address = <<"/amq/queue/", QName/binary>>,
157-
%% declare a quorum queue
170+
%% declare a queue
158171
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
172+
Args = [{<<"x-queue-type">>, longstr, QueueType}],
159173
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
160174
durable = true,
161-
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
175+
arguments = Args}),
162176
% create a configuration map
163177
OpnConf = #{address => Host,
164178
port => Port,
@@ -182,16 +196,29 @@ roundtrip_quorum_queue_with_drain(Config) ->
182196

183197
flush("pre-receive"),
184198
% create a receiver link
185-
{ok, Receiver} = amqp10_client:attach_receiver_link(Session,
186-
<<"test-receiver">>,
187-
Address),
199+
200+
TerminusDurability = none,
201+
Filter = case QueueType of
202+
<<"stream">> ->
203+
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>};
204+
_ ->
205+
#{}
206+
end,
207+
Properties = #{},
208+
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>,
209+
Address, unsettled,
210+
TerminusDurability,
211+
Filter, Properties),
188212

189213
% grant credit and drain
190214
ok = amqp10_client:flow_link_credit(Receiver, 1, never, true),
191215

192216
% wait for a delivery
193217
receive
194-
{amqp10_msg, Receiver, _InMsg} -> ok
218+
{amqp10_msg, Receiver, InMsg} ->
219+
ok = amqp10_client:accept_msg(Receiver, InMsg),
220+
wait_for_accepts(1),
221+
ok
195222
after 2000 ->
196223
exit(delivery_timeout)
197224
end,

0 commit comments

Comments
 (0)