Skip to content

Commit 7e0f488

Browse files
author
Alex Valiushko
committed
extend CLI & HTTP API to pass membership
1 parent 767cb99 commit 7e0f488

File tree

7 files changed

+118
-40
lines changed

7 files changed

+118
-40
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
-export([format_ra_event/3]).
4949
-export([cleanup_data_dir/0]).
5050
-export([shrink_all/1,
51-
grow/4]).
51+
grow/5]).
5252
-export([transfer_leadership/2, get_replicas/1, queue_length/1]).
5353
-export([file_handle_leader_reservation/1,
5454
file_handle_other_reservation/0]).
@@ -86,6 +86,7 @@
8686
-type msg_id() :: non_neg_integer().
8787
-type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(),
8888
mc:state()}.
89+
-type membership() :: voter | non_voter | promotable. %% see ra_membership() in Ra.
8990

9091
-define(RA_SYSTEM, quorum_queues).
9192
-define(RA_WAL_NAME, ra_log_wal).
@@ -1179,10 +1180,10 @@ add_member(VHost, Name, Node, VoterStatus, Timeout) ->
11791180
add_member(Q, Node) ->
11801181
add_member(Q, Node, promotable).
11811182

1182-
add_member(Q, Node, VoterStatus) ->
1183-
add_member(Q, Node, VoterStatus, ?ADD_MEMBER_TIMEOUT).
1183+
add_member(Q, Node, Membership) ->
1184+
add_member(Q, Node, Membership, ?ADD_MEMBER_TIMEOUT).
11841185

1185-
add_member(Q, Node, VoterStatus, Timeout) when ?amqqueue_is_quorum(Q) ->
1186+
add_member(Q, Node, Membership, Timeout) when ?amqqueue_is_quorum(Q) ->
11861187
{RaName, _} = amqqueue:get_pid(Q),
11871188
QName = amqqueue:get_name(Q),
11881189
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
@@ -1192,7 +1193,7 @@ add_member(Q, Node, VoterStatus, Timeout) when ?amqqueue_is_quorum(Q) ->
11921193
?TICK_TIMEOUT),
11931194
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
11941195
?SNAPSHOT_INTERVAL),
1195-
Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, VoterStatus),
1196+
Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, Membership),
11961197
case ra:start_server(?RA_SYSTEM, Conf) of
11971198
ok ->
11981199
ServerIdSpec = maps:with([id, uid, membership], Conf),
@@ -1304,17 +1305,17 @@ shrink_all(Node) ->
13041305
amqqueue:get_type(Q) == ?MODULE,
13051306
lists:member(Node, get_nodes(Q))].
13061307

1307-
-spec grow(node(), binary(), binary(), all | even) ->
1308+
-spec grow(node(), binary(), binary(), all | even, membership()) ->
13081309
[{rabbit_amqqueue:name(),
13091310
{ok, pos_integer()} | {error, pos_integer(), term()}}].
1310-
grow(Node, VhostSpec, QueueSpec, Strategy) ->
1311+
grow(Node, VhostSpec, QueueSpec, Membership, Strategy) ->
13111312
Running = rabbit_nodes:list_running(),
13121313
[begin
13131314
Size = length(get_nodes(Q)),
13141315
QName = amqqueue:get_name(Q),
13151316
rabbit_log:info("~ts: adding a new member (replica) on node ~w",
13161317
[rabbit_misc:rs(QName), Node]),
1317-
case add_member(Q, Node) of
1318+
case add_member(Q, Node, Membership) of
13181319
ok ->
13191320
{QName, {ok, Size + 1}};
13201321
{error, Err} ->

deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,45 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do
1010

1111
@behaviour RabbitMQ.CLI.CommandBehaviour
1212

13-
@default_timeout 5_000
13+
defp default_opts, do: %{vhost: "/",
14+
membership: "promotable",
15+
timeout: 5_000}
1416

1517
def merge_defaults(args, opts) do
16-
timeout =
17-
case opts[:timeout] do
18-
nil -> @default_timeout
19-
:infinity -> @default_timeout
20-
other -> other
21-
end
22-
23-
{args, Map.merge(%{vhost: "/", timeout: timeout}, opts)}
18+
default = default_opts()
19+
opts = Map.update(opts, :timeout, :infinity,
20+
&(case &1 do
21+
:infinity -> default.timeout
22+
other -> other
23+
end))
24+
{args, Map.merge(default, opts)}
2425
end
2526

26-
use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout
27-
use RabbitMQ.CLI.Core.AcceptsTwoPositionalArguments
27+
def switches(),
28+
do: [
29+
timeout: :integer,
30+
membership: :string
31+
]
32+
33+
def aliases(), do: [t: :timeout]
34+
35+
def validate(args, _) when length(args) < 2 do
36+
{:validation_failure, :not_enough_args}
37+
end
38+
39+
def validate(args, _) when length(args) > 2 do
40+
{:validation_failure, :too_many_args}
41+
end
42+
43+
def validate(_, %{membership: m}) when not (m == "promotable" or
44+
m == "non_voter" or
45+
m == "voter") do
46+
{:validation_failure, "voter status '#{m}' is not recognised."}
47+
end
48+
49+
def validate(_, _) do
50+
:ok
51+
end
2852

2953
def validate_execution_environment(args, opts) do
3054
Validators.chain(
@@ -39,11 +63,13 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do
3963
)
4064
end
4165

42-
def run([name, node] = _args, %{vhost: vhost, node: node_name, timeout: timeout}) do
66+
def run([name, node] = _args,
67+
%{vhost: vhost, node: node_name, timeout: timeout, membership: membership}) do
4368
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :add_member, [
4469
vhost,
4570
name,
4671
to_atom(node),
72+
to_atom(membership),
4773
timeout
4874
]) do
4975
{:error, :classic_queue_not_supported} ->
@@ -59,12 +85,13 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do
5985

6086
use RabbitMQ.CLI.DefaultOutput
6187

62-
def usage, do: "add_member [--vhost <vhost>] <queue> <node>"
88+
def usage, do: "add_member [--vhost <vhost>] <queue> <node> [--membership <promotable|voter>]"
6389

6490
def usage_additional do
6591
[
6692
["<queue>", "quorum queue name"],
67-
["<node>", "node to add a new replica on"]
93+
["<node>", "node to add a new replica on"],
94+
["--membership <promotable|voter>", "add a promotable non-voter (default) or full voter"]
6895
]
6996
end
7097

deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,16 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
1010

1111
@behaviour RabbitMQ.CLI.CommandBehaviour
1212

13-
defp default_opts, do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}
13+
defp default_opts, do: %{vhost_pattern: ".*",
14+
queue_pattern: ".*",
15+
membership: "promotable",
16+
errors_only: false}
1417

1518
def switches(),
1619
do: [
1720
vhost_pattern: :string,
1821
queue_pattern: :string,
22+
membership: :string,
1923
errors_only: :boolean
2024
]
2125

@@ -31,17 +35,19 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
3135
{:validation_failure, :too_many_args}
3236
end
3337

34-
def validate([_, s], _) do
35-
case s do
36-
"all" ->
37-
:ok
38+
def validate([_, s], _) when not (s == "all" or
39+
s == "even") do
40+
{:validation_failure, "strategy '#{s}' is not recognised."}
41+
end
3842

39-
"even" ->
40-
:ok
43+
def validate(_, %{membership: m}) when not (m == "promotable" or
44+
m == "non_voter" or
45+
m == "voter") do
46+
{:validation_failure, "voter status '#{m}' is not recognised."}
47+
end
4148

42-
_ ->
43-
{:validation_failure, "strategy '#{s}' is not recognised."}
44-
end
49+
def validate(_, _) do
50+
:ok
4551
end
4652

4753
def validate_execution_environment(args, opts) do
@@ -58,12 +64,14 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
5864
node: node_name,
5965
vhost_pattern: vhost_pat,
6066
queue_pattern: queue_pat,
67+
membership: membership,
6168
errors_only: errors_only
6269
}) do
6370
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, [
6471
to_atom(node),
6572
vhost_pat,
6673
queue_pat,
74+
to_atom(membership),
6775
to_atom(strategy)
6876
]) do
6977
{:error, _} = error ->
@@ -97,7 +105,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
97105
def formatter(), do: RabbitMQ.CLI.Formatters.Table
98106

99107
def usage,
100-
do: "grow <node> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>]"
108+
do: "grow <node> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--membership <promotable|voter>]"
101109

102110
def usage_additional do
103111
[
@@ -108,6 +116,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
108116
],
109117
["--queue-pattern <pattern>", "regular expression to match queue names"],
110118
["--vhost-pattern <pattern>", "regular expression to match virtual host names"],
119+
["--membership <promotable|voter>", "add a promotable non-voter (default) or full voter"],
111120
["--errors-only", "only list queues which reported an error"]
112121
]
113122
end

deps/rabbitmq_cli/test/queues/add_member_command_test.exs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommandTest do
2020
{:ok,
2121
opts: %{
2222
node: get_rabbit_hostname(),
23+
membership: "voter",
24+
vhost: "/",
2325
timeout: context[:test_timeout] || 30000
2426
}}
2527
end
@@ -42,17 +44,35 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommandTest do
4244
) == {:validation_failure, :too_many_args}
4345
end
4446

47+
test "validate: when membership promotable is provided, returns a success" do
48+
assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "promotable"}) == :ok
49+
end
50+
51+
test "validate: when membership voter is provided, returns a success" do
52+
assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "voter"}) == :ok
53+
end
54+
55+
test "validate: when membership non_voter is provided, returns a success" do
56+
assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "non_voter"}) == :ok
57+
end
58+
59+
test "validate: when wrong membership is provided, returns failure" do
60+
assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "banana"}) ==
61+
{:validation_failure, "voter status 'banana' is not recognised."}
62+
end
63+
64+
4565
test "validate: treats two positional arguments and default switches as a success" do
4666
assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{}) == :ok
4767
end
4868

4969
@tag test_timeout: 3000
50-
test "run: targeting an unreachable node throws a badrpc" do
70+
test "run: targeting an unreachable node throws a badrpc", context do
5171
assert match?(
5272
{:badrpc, _},
5373
@command.run(
5474
["quorum-queue-a", "rabbit@new-node"],
55-
%{node: :jake@thedog, vhost: "/", timeout: 200}
75+
Map.merge(context[:opts], %{node: :jake@thedog})
5676
)
5777
)
5878
end

deps/rabbitmq_cli/test/queues/grow_command_test.exs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do
2323
timeout: context[:test_timeout] || 30000,
2424
vhost_pattern: ".*",
2525
queue_pattern: ".*",
26+
membership: "promotable",
2627
errors_only: false
2728
}}
2829
end
2930

3031
test "merge_defaults: defaults to reporting complete results" do
3132
assert @command.merge_defaults([], %{}) ==
32-
{[], %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}}
33+
{[], %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false, membership: "promotable"}}
3334
end
3435

3536
test "validate: when no arguments are provided, returns a failure" do
@@ -58,13 +59,30 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do
5859
{:validation_failure, :too_many_args}
5960
end
6061

62+
test "validate: when membership promotable is provided, returns a success" do
63+
assert @command.validate(["quorum-queue-a", "all"], %{membership: "promotable"}) == :ok
64+
end
65+
66+
test "validate: when membership voter is provided, returns a success" do
67+
assert @command.validate(["quorum-queue-a", "all"], %{membership: "voter"}) == :ok
68+
end
69+
70+
test "validate: when membership non_voter is provided, returns a success" do
71+
assert @command.validate(["quorum-queue-a", "all"], %{membership: "non_voter"}) == :ok
72+
end
73+
74+
test "validate: when wrong membership is provided, returns failure" do
75+
assert @command.validate(["quorum-queue-a", "all"], %{membership: "banana"}) ==
76+
{:validation_failure, "voter status 'banana' is not recognised."}
77+
end
78+
6179
@tag test_timeout: 3000
6280
test "run: targeting an unreachable node throws a badrpc", context do
6381
assert match?(
6482
{:badrpc, _},
6583
@command.run(
6684
["quorum-queue-a", "all"],
67-
Map.merge(context[:opts], %{node: :jake@thedog, timeout: 200})
85+
Map.merge(context[:opts], %{node: :jake@thedog})
6886
)
6987
)
7088
end

deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@ accept_content(ReqData, Context) ->
3838
QName = rabbit_mgmt_util:id(queue, ReqData),
3939
Res = rabbit_mgmt_util:with_decode(
4040
[node], ReqData, Context,
41-
fun([NewReplicaNode], _Body, _ReqData) ->
41+
fun([NewReplicaNode], Body, _ReqData) ->
42+
Membership = maps:get(<<"membership">>, Body, promotable),
4243
rabbit_amqqueue:with(
4344
rabbit_misc:r(VHost, queue, QName),
4445
fun(_Q) ->
45-
rabbit_quorum_queue:add_member(VHost, QName, rabbit_data_coercion:to_atom(NewReplicaNode), promotable, ?TIMEOUT)
46+
rabbit_quorum_queue:add_member(VHost, QName, rabbit_data_coercion:to_atom(NewReplicaNode), Membership, ?TIMEOUT)
4647
end)
4748
end),
4849
case Res of

deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,13 @@ accept_content(ReqData, Context) ->
3939
NewReplicaNode = rabbit_mgmt_util:id(node, ReqData),
4040
rabbit_mgmt_util:with_decode(
4141
[vhost_pattern, queue_pattern, strategy], ReqData, Context,
42-
fun([VHPattern, QPattern, Strategy], _Body, _ReqData) ->
42+
fun([VHPattern, QPattern, Strategy], Body, _ReqData) ->
43+
Membership = maps:get(<<"membership">>, Body, promotable),
4344
rabbit_quorum_queue:grow(
4445
rabbit_data_coercion:to_atom(NewReplicaNode),
4546
VHPattern,
4647
QPattern,
48+
rabbit_data_coercion:to_atom(Membership),
4749
rabbit_data_coercion:to_atom(Strategy))
4850
end),
4951
{true, ReqData, Context}.

0 commit comments

Comments
 (0)