Skip to content

Commit d78e14a

Browse files
committed
Allow #amqp_error{} responses in channel interceptors
1 parent 0dc1501 commit d78e14a

File tree

3 files changed

+69
-4
lines changed

3 files changed

+69
-4
lines changed

deps/rabbit/src/rabbit_channel_interceptor.erl

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
-callback init(rabbit_channel:channel()) -> interceptor_state().
3030
-callback intercept(original_method(), original_content(),
3131
interceptor_state()) ->
32-
{processed_method(), processed_content()} |
32+
{processed_method(), processed_content()} | rabbit_types:amqp_error() |
3333
rabbit_misc:channel_or_connection_exit().
3434
-callback applies_to() -> list(method_name()).
3535

@@ -88,7 +88,9 @@ validate_response(Mod, M1, C1, R = {M2, C2}) ->
8888
"content iff content is provided but "
8989
"content in = ~p; content out = ~p",
9090
[Mod, C1, C2])
91-
end.
91+
end;
92+
validate_response(_Mod, _M1, _C1, AMQPError = #amqp_error{}) ->
93+
internal_error(AMQPError).
9294

9395
validate_method(M, M2) ->
9496
rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
@@ -98,6 +100,12 @@ validate_content(#content{}, #content{}) -> true;
98100
validate_content(_, _) -> false.
99101

100102
%% keep dialyzer happy
101-
-spec internal_error(string(), [any()]) -> no_return().
103+
-spec internal_error(rabbit_types:amqp_error()) ->
104+
rabbit_misc:channel_or_connection_exit().
105+
internal_error(AMQPError = #amqp_error{}) ->
106+
rabbit_misc:protocol_error(AMQPError).
107+
108+
-spec internal_error(string(), [any()]) ->
109+
rabbit_misc:channel_or_connection_exit().
102110
internal_error(Format, Args) ->
103111
rabbit_misc:protocol_error(internal_error, Format, Args).

deps/rabbit/test/channel_interceptor_SUITE.erl

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
-include_lib("common_test/include/ct.hrl").
1111
-include_lib("amqp_client/include/amqp_client.hrl").
12+
-include_lib("eunit/include/eunit.hrl").
1213

1314
-compile(export_all).
1415

@@ -21,6 +22,7 @@ groups() ->
2122
[
2223
{non_parallel_tests, [], [
2324
register_interceptor,
25+
register_interceptor_failing_with_amqp_error,
2426
register_failing_interceptors
2527
]}
2628
].
@@ -94,6 +96,55 @@ register_interceptor1(Config, Interceptor) ->
9496
check_send_receive(Ch1, QName, <<"bar">>, <<"bar">>),
9597
passed.
9698

99+
register_interceptor_failing_with_amqp_error(Config) ->
100+
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
101+
?MODULE, register_interceptor_failing_with_amqp_error1,
102+
[Config, dummy_interceptor]).
103+
104+
register_interceptor_failing_with_amqp_error1(Config, Interceptor) ->
105+
PredefinedChannels = rabbit_channel:list(),
106+
107+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, 0),
108+
109+
[ChannelProc] = rabbit_channel:list() -- PredefinedChannels,
110+
111+
[{interceptors, []}] = rabbit_channel:info(ChannelProc, [interceptors]),
112+
113+
ok = rabbit_registry:register(channel_interceptor,
114+
<<"dummy interceptor">>,
115+
Interceptor),
116+
[{interceptors, [{Interceptor, undefined}]}] =
117+
rabbit_channel:info(ChannelProc, [interceptors]),
118+
119+
Q1 = <<"succeeding-q">>,
120+
#'queue.declare_ok'{} =
121+
amqp_channel:call(Ch1, #'queue.declare'{queue = Q1}),
122+
123+
Q2 = <<"failing-q">>,
124+
try
125+
amqp_channel:call(Ch1, #'queue.declare'{queue = Q2})
126+
catch
127+
_:Reason ->
128+
?assertMatch(
129+
{{shutdown, {_, _, <<"PRECONDITION_FAILED - operation not allowed">>}}, _},
130+
Reason)
131+
end,
132+
133+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0),
134+
[ChannelProc1] = rabbit_channel:list() -- PredefinedChannels,
135+
136+
ok = rabbit_registry:unregister(channel_interceptor,
137+
<<"dummy interceptor">>),
138+
[{interceptors, []}] = rabbit_channel:info(ChannelProc1, [interceptors]),
139+
140+
#'queue.declare_ok'{} =
141+
amqp_channel:call(Ch2, #'queue.declare'{queue = Q2}),
142+
143+
#'queue.delete_ok'{} = amqp_channel:call(Ch2, #'queue.delete' {queue = Q1}),
144+
#'queue.delete_ok'{} = amqp_channel:call(Ch2, #'queue.delete' {queue = Q2}),
145+
146+
passed.
147+
97148
register_failing_interceptors(Config) ->
98149
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
99150
?MODULE, register_interceptor1, [Config, failing_dummy_interceptor]).

deps/rabbit/test/dummy_interceptor.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,14 @@ intercept(#'basic.publish'{} = Method, Content, _IState) ->
1919
Content2 = Content#content{payload_fragments_rev = []},
2020
{Method, Content2};
2121

22+
%% Use 'queue.declare' to test #amqp_error{} handling
23+
intercept(#'queue.declare'{queue = <<"failing-q">>}, _Content, _IState) ->
24+
rabbit_misc:amqp_error(
25+
'precondition_failed', "operation not allowed", [],
26+
'queue.declare');
27+
2228
intercept(Method, Content, _VHost) ->
2329
{Method, Content}.
2430

2531
applies_to() ->
26-
['basic.publish'].
32+
['basic.publish', 'queue.declare'].

0 commit comments

Comments
 (0)