Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 96 additions & 24 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
-export([rebalance/3]).
-export([collect_info_all/2]).

-export([is_policy_applicable/2]).
-export([is_policy_applicable/2, declare_args/0]).
-export([is_server_named_allowed/1]).

-export([check_max_age/1]).
Expand Down Expand Up @@ -793,50 +793,85 @@ consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
false -> {error, {unacceptable_type, Type}}
end.
false -> {error, rabbit_misc:format("expected integer, got ~p", [Type])}
end;
check_int_arg(Val, _) when is_integer(Val) ->
ok;
check_int_arg(_Val, _) ->
{error, {unacceptable_type, "expected integer"}}.

check_bool_arg({bool, _}, _) -> ok;
check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}.
check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}};
check_bool_arg(true, _) -> ok;
check_bool_arg(false, _) -> ok;
check_bool_arg(_Val, _) -> {error, {unacceptable_type, "expected boolean"}}.

check_non_neg_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val >= 0 -> ok;
ok -> {error, {value_negative, Val}};
Error -> Error
end;
check_non_neg_int_arg(Val, Args) ->
case check_int_arg(Val, Args) of
ok when Val >= 0 -> ok;
ok -> {error, {value_negative, Val}};
Error -> Error
end.

check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
ok -> rabbit_misc:check_expiry(Val);
Error -> Error
end;
check_expires_arg(Val, Args) ->
case check_int_arg(Val, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
ok -> rabbit_misc:check_expiry(Val);
Error -> Error
end.

check_message_ttl_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok -> rabbit_misc:check_expiry(Val);
Error -> Error
end;
check_message_ttl_arg(Val, Args) ->
case check_int_arg(Val, Args) of
ok -> rabbit_misc:check_expiry(Val);
Error -> Error
end.

check_max_priority_arg({Type, Val}, Args) ->
case check_non_neg_int_arg({Type, Val}, Args) of
ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok;
ok -> {error, {max_value_exceeded, Val}};
Error -> Error
end;
check_max_priority_arg(Val, Args) ->
case check_non_neg_int_arg(Val, Args) of
ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok;
ok -> {error, {max_value_exceeded, Val}};
Error -> Error
end.

check_single_active_consumer_arg({Type, Val}, Args) ->
case check_bool_arg({Type, Val}, Args) of
ok -> ok;
Error -> Error
end.
check_bool_arg({Type, Val}, Args);
check_single_active_consumer_arg(Val, Args) ->
check_bool_arg(Val, Args).

check_initial_cluster_size_arg({Type, Val}, Args) ->
case check_non_neg_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
ok -> ok;
Error -> Error
end;
check_initial_cluster_size_arg(Val, Args) ->
case check_non_neg_int_arg(Val, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
ok -> ok;
Error -> Error
end.

check_max_age_arg({longstr, Val}, _Args) ->
Expand Down Expand Up @@ -884,51 +919,88 @@ unit_value_in_ms("s") ->
%% Note that the validity of x-dead-letter-exchange is already verified
%% by rabbit_channel's queue.declare handler.
check_dlxname_arg({longstr, _}, _) -> ok;
check_dlxname_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}.
check_dlxname_arg({Type, _}, _) -> {error, {unacceptable_type, Type}};
check_dlxname_arg(Val, _) when is_list(Val) or is_binary(Val) -> ok;
check_dlxname_arg(_Val, _) -> {error, {unacceptable_type, "expected a string (valid exchange name)"}}.

check_dlxrk_arg({longstr, _}, Args) ->
case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
undefined -> {error, routing_key_but_no_dlx_defined};
_ -> ok
end;
check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
{error, {unacceptable_type, Type}};
check_dlxrk_arg(Val, Args) when is_binary(Val) ->
case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
undefined -> {error, routing_key_but_no_dlx_defined};
_ -> ok
end;
check_dlxrk_arg(_Val, _Args) ->
{error, {unacceptable_type, "expected a string"}}.

-define(KNOWN_OVERFLOW_MODES, [<<"drop-head">>, <<"reject-publish">>, <<"reject-publish-dlx">>]).
check_overflow({longstr, Val}, _Args) ->
case lists:member(Val, [<<"drop-head">>,
<<"reject-publish">>,
<<"reject-publish-dlx">>]) of
case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of
true -> ok;
false -> {error, invalid_overflow}
end;
check_overflow({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
{error, {unacceptable_type, Type}};
check_overflow(Val, _Args) when is_binary(Val) ->
case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of
true -> ok;
false -> {error, invalid_overflow}
end;
check_overflow(_Val, _Args) ->
{error, invalid_overflow}.

-define(KNOWN_LEADER_LOCATORS, [<<"client-local">>, <<"random">>, <<"least-leaders">>]).
check_queue_leader_locator_arg({longstr, Val}, _Args) ->
case lists:member(Val, [<<"client-local">>,
<<"random">>,
<<"least-leaders">>]) of
case lists:member(Val, ?KNOWN_LEADER_LOCATORS) of
true -> ok;
false -> {error, invalid_queue_locator_arg}
end;
check_queue_leader_locator_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
{error, {unacceptable_type, Type}};
check_queue_leader_locator_arg(Val, _Args) when is_binary(Val) ->
case lists:member(Val, ?KNOWN_LEADER_LOCATORS) of
true -> ok;
false -> {error, invalid_queue_locator_arg}
end;
check_queue_leader_locator_arg(_Val, _Args) ->
{error, invalid_queue_locator_arg}.

-define(KNOWN_QUEUE_MODES, [<<"default">>, <<"lazy">>]).
check_queue_mode({longstr, Val}, _Args) ->
case lists:member(Val, [<<"default">>, <<"lazy">>]) of
case lists:member(Val, ?KNOWN_QUEUE_MODES) of
true -> ok;
false -> {error, invalid_queue_mode}
false -> {error, rabbit_misc:format("unsupported queue mode '~s'", [Val])}
end;
check_queue_mode({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
{error, {unacceptable_type, Type}};
check_queue_mode(Val, _Args) when is_binary(Val) ->
case lists:member(Val, ?KNOWN_QUEUE_MODES) of
true -> ok;
false -> {error, rabbit_misc:format("unsupported queue mode '~s'", [Val])}
end;
check_queue_mode(_Val, _Args) ->
{error, invalid_queue_mode}.

-define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<"stream">>]).
check_queue_type({longstr, Val}, _Args) ->
case lists:member(Val, [<<"classic">>, <<"quorum">>, <<"stream">>]) of
case lists:member(Val, ?KNOWN_QUEUE_TYPES) of
true -> ok;
false -> {error, invalid_queue_type}
false -> {error, rabbit_misc:format("unsupported queue type '~s'", [Val])}
end;
check_queue_type({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
{error, {unacceptable_type, Type}};
check_queue_type(Val, _Args) when is_binary(Val) ->
case lists:member(Val, ?KNOWN_QUEUE_TYPES) of
true -> ok;
false -> {error, rabbit_misc:format("unsupported queue type '~s'", [Val])}
end;
check_queue_type(_Val, _Args) ->
{error, invalid_queue_type}.

-spec list() -> [amqqueue:amqqueue()].

Expand Down
15 changes: 14 additions & 1 deletion deps/rabbit/src/rabbit_parameter_validation.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,20 @@ regex(Name, Term) ->
proplist(Name, Constraints, Term) when is_list(Term) ->
{Results, Remainder}
= lists:foldl(
fun ({Key, Fun, Needed}, {Results0, Term0}) ->
%% if the optional/mandatory flag is not provided in a constraint tuple,
%% assume 'optional'
fun ({Key, Fun}, {Results0, Term0}) ->
case lists:keytake(Key, 1, Term0) of
{value, {Key, Value}, Term1} ->
{[Fun(Key, Value) | Results0],
Term1};
{value, {Key, Type, Value}, Term1} ->
{[Fun(Key, Type, Value) | Results0],
Term1};
false ->
{Results0, Term0}
end;
({Key, Fun, Needed}, {Results0, Term0}) ->
case {lists:keytake(Key, 1, Term0), Needed} of
{{value, {Key, Value}, Term1}, _} ->
{[Fun(Key, Value) | Results0],
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
ack_mode => ack_mode(),
atom() => term()}.

-export_type([state/0, source_config/0, dest_config/0, uri/0]).
-export_type([state/0, source_config/0, dest_config/0, uri/0, tag/0]).

-callback parse(binary(), {source | destination, Conf :: proplists:proplist()}) ->
source_config() | dest_config().
Expand Down
48 changes: 29 additions & 19 deletions deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,13 @@ amqp10_src_validation(_Def, User) ->

amqp091_src_validation(_Def, User) ->
[
{<<"src-uri">>, validate_uri_fun(User), mandatory},
{<<"src-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
{<<"src-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
{<<"src-queue">>, fun rabbit_parameter_validation:binary/2,optional},
{<<"prefetch-count">>, fun rabbit_parameter_validation:number/2,optional},
{<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2,optional},
{<<"src-uri">>, validate_uri_fun(User), mandatory},
{<<"src-exchange">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-queue-args">>, fun validate_queue_args/2, optional},
{<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
{<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
%% a deprecated pre-3.7 setting
{<<"delete-after">>, fun validate_delete_after/2, optional},
%% currently used multi-protocol friend name, introduced in 3.7
Expand Down Expand Up @@ -151,6 +152,7 @@ amqp091_dest_validation(_Def, User) ->
{<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
{<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
{<<"dest-queue">>, fun rabbit_parameter_validation:binary/2,optional},
{<<"dest-queue-args">>, fun validate_queue_args/2, optional},
{<<"add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
{<<"add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional},
{<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
Expand Down Expand Up @@ -206,6 +208,11 @@ validate_delete_after(Name, Term) ->
{error, "~s should be number, \"never\" or \"queue-length\", actually was "
"~p", [Name, Term]}.

validate_queue_args(Name, Term0) ->
Term = rabbit_data_coercion:to_proplist(Term0),

rabbit_parameter_validation:proplist(Name, rabbit_amqqueue:declare_args(), Term).

validate_amqp10_map(Name, Terms0) ->
Terms = rabbit_data_coercion:to_proplist(Terms0),
Str = fun rabbit_parameter_validation:binary/2,
Expand Down Expand Up @@ -292,14 +299,15 @@ parse_amqp10_dest({_VHost, _Name}, _ClusterName, Def, SourceHeaders) ->
}.

parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
DestURIs = get_uris(<<"dest-uri">>, Def),
DestX = pget(<<"dest-exchange">>, Def, none),
DestXKey = pget(<<"dest-exchange-key">>, Def, none),
DestQ = pget(<<"dest-queue">>, Def, none),
DestURIs = get_uris(<<"dest-uri">>, Def),
DestX = pget(<<"dest-exchange">>, Def, none),
DestXKey = pget(<<"dest-exchange-key">>, Def, none),
DestQ = pget(<<"dest-queue">>, Def, none),
DestQArgs = pget(<<"dest-queue-args">>, Def, #{}),
DestDeclFun = fun (Conn, _Ch) ->
case DestQ of
none -> ok;
_ -> ensure_queue(Conn, DestQ)
_ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
end
end,
{X, Key} = case DestQ of
Expand Down Expand Up @@ -371,10 +379,11 @@ parse_amqp10_source(Def) ->
prefetch_count => PrefetchCount}, Headers}.

parse_amqp091_source(Def) ->
SrcURIs = get_uris(<<"src-uri">>, Def),
SrcX = pget(<<"src-exchange">>,Def, none),
SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1]
SrcQ = pget(<<"src-queue">>, Def, none),
SrcURIs = get_uris(<<"src-uri">>, Def),
SrcX = pget(<<"src-exchange">>,Def, none),
SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1]
SrcQ = pget(<<"src-queue">>, Def, none),
SrcQArgs = pget(<<"src-queue-args">>, Def, #{}),
{SrcDeclFun, Queue, DestHeaders} =
case SrcQ of
none -> {fun (_Conn, Ch) ->
Expand All @@ -385,7 +394,7 @@ parse_amqp091_source(Def) ->
end, <<>>, [{<<"src-exchange">>, SrcX},
{<<"src-exchange-key">>, SrcXKey}]};
_ -> {fun (Conn, _Ch) ->
ensure_queue(Conn, SrcQ)
ensure_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs))
end, SrcQ, [{<<"src-queue">>, SrcQ}]}
end,
DeleteAfter = pget(<<"src-delete-after">>, Def,
Expand Down Expand Up @@ -416,15 +425,16 @@ translate_ack_mode(<<"on-confirm">>) -> on_confirm;
translate_ack_mode(<<"on-publish">>) -> on_publish;
translate_ack_mode(<<"no-ack">>) -> no_ack.

ensure_queue(Conn, Queue) ->
ensure_queue(Conn, Queue, XArgs) ->
{ok, Ch} = amqp_connection:open_channel(Conn),
try
amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
passive = true})
catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} ->
{ok, Ch2} = amqp_connection:open_channel(Conn),
amqp_channel:call(Ch2, #'queue.declare'{queue = Queue,
durable = true}),
amqp_channel:call(Ch2, #'queue.declare'{queue = Queue,
durable = true,
arguments = XArgs}),
catch amqp_channel:close(Ch2)

after
Expand Down
8 changes: 8 additions & 0 deletions deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ terminate({shutdown, restart}, State = #state{name = Name}) ->
{terminated, "needed a restart"}),
close_connections(State),
ok;
terminate({{shutdown, {server_initiated_close, Code, Reason}}, _}, State = #state{name = Name}) ->
rabbit_log_shovel:error("Shovel ~s is stopping: one of its connections closed "
"with code ~b, reason: ~s",
[human_readable_name(Name), Code, Reason]),
rabbit_shovel_status:report(State#state.name, State#state.type,
{terminated, "needed a restart"}),
close_connections(State),
ok;
terminate(Reason, State = #state{name = Name}) ->
rabbit_log_shovel:error("Shovel ~s is stopping, reason: ~p", [human_readable_name(Name), Reason]),
rabbit_shovel_status:report(State#state.name, State#state.type,
Expand Down
15 changes: 15 additions & 0 deletions deps/rabbitmq_shovel/test/dynamic_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ groups() ->
[
{non_parallel_tests, [], [
simple,
quorum_queues,
set_properties_using_proplist,
set_properties_using_map,
set_empty_properties_using_proplist,
Expand Down Expand Up @@ -80,6 +81,20 @@ simple(Config) ->
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
end).

quorum_queues(Config) ->
with_ch(Config,
fun (Ch) ->
shovel_test_utils:set_param(
Config,
<<"test">>, [
{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"src-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}},
{<<"dest-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}}
]),
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
end).

set_properties_using_map(Config) ->
with_ch(Config,
fun (Ch) ->
Expand Down