Skip to content

Commit 22b272c

Browse files
author
Alex Valiushko
committed
Newly added followers do not participate in quorum until they catch up with the log
An opt-in ability of a cluster to ignore newly joined member until it catches up with the log: New = #{id => Id, ini_non_voter => ra:new_nvid()}, {ok, _, _} = ra:add_member(ServerRef, New), ok = ra:start_server(default, ClusterName, New, add_machine(), [ServerRef]), Voter status is stored in the cluster map of the server state and is part of every $ra_cluster_update. Additionally, nodes store their own status at the top level for ease of matching. Nodes also store their own satus in ra_state ETS table (breaking change), and present in overview. On every #append_entries_reply leader may choose to promote non-voter by issuing new `$ra_join` with desired voter status. Currently, only one promotion condition is implemented `{nonvoter, #{target := ra_index()}`. Non-voter will be promoted when it reaches the leaders log index at the time of joining.
1 parent 08bfcc4 commit 22b272c

File tree

11 files changed

+735
-66
lines changed

11 files changed

+735
-66
lines changed

src/ra.erl

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
overview/1,
7272
%% helpers
7373
new_uid/1,
74+
new_nvid/0,
7475
%% rebalancing
7576
transfer_leadership/2,
7677
%% auxiliary commands
@@ -455,7 +456,7 @@ start_cluster(System, [#{cluster_name := ClusterName} | _] = ServerConfigs,
455456

456457
%% @doc Starts a new distributed ra cluster.
457458
%% @param ClusterName the name of the cluster.
458-
%% @param ServerId the ra_server_id() of the server
459+
%% @param ServerId the ra_server_id() of the server, or a map with server id and settings.
459460
%% @param Machine The {@link ra_machine:machine/0} configuration.
460461
%% @param ServerIds a list of initial (seed) server configurations
461462
%% @returns
@@ -470,19 +471,20 @@ start_cluster(System, [#{cluster_name := ClusterName} | _] = ServerConfigs,
470471
%% forcefully deleted.
471472
%% @see start_server/1
472473
%% @end
473-
-spec start_server(atom(), ra_cluster_name(), ra_server_id(),
474+
-spec start_server(atom(), ra_cluster_name(), ra_server_id() | ra_new_server(),
474475
ra_server:machine_conf(), [ra_server_id()]) ->
475476
ok | {error, term()}.
476-
start_server(System, ClusterName, {_, _} = ServerId, Machine, ServerIds)
477+
start_server(System, ClusterName, {_, _} = ServerId, Machine, ServerIds) ->
478+
start_server(System, ClusterName, #{id => ServerId}, Machine, ServerIds);
479+
start_server(System, ClusterName, #{id := {_, _}} = Conf0, Machine, ServerIds)
477480
when is_atom(System) ->
478481
UId = new_uid(ra_lib:to_binary(ClusterName)),
479482
Conf = #{cluster_name => ClusterName,
480-
id => ServerId,
481483
uid => UId,
482484
initial_members => ServerIds,
483485
log_init_args => #{uid => UId},
484486
machine => Machine},
485-
start_server(System, Conf).
487+
start_server(System, maps:merge(Conf0, Conf)).
486488

487489
%% @doc Starts a ra server in the default system
488490
%% @param Conf a ra_server_config() configuration map.
@@ -558,9 +560,10 @@ delete_cluster(ServerIds, Timeout) ->
558560
%% affect said cluster's availability characteristics (by increasing quorum node count).
559561
%%
560562
%% @param ServerLoc the ra server or servers to try to send the command to
561-
%% @param ServerId the ra server id of the new server.
563+
%% @param ServerId the ra server id of the new server, or a map with server id and settings.
562564
%% @end
563-
-spec add_member(ra_server_id() | [ra_server_id()], ra_server_id()) ->
565+
-spec add_member(ra_server_id() | [ra_server_id()],
566+
ra_server_id() | ra_new_server()) ->
564567
ra_cmd_ret() |
565568
{error, already_member} |
566569
{error, cluster_change_not_permitted}.
@@ -571,7 +574,8 @@ add_member(ServerLoc, ServerId) ->
571574
%% @see add_member/2
572575
%% @end
573576
-spec add_member(ra_server_id() | [ra_server_id()],
574-
ra_server_id(), timeout()) ->
577+
ra_server_id() | ra_new_server(),
578+
timeout()) ->
575579
ra_cmd_ret() |
576580
{error, already_member} |
577581
{error, cluster_change_not_permitted}.
@@ -580,7 +584,6 @@ add_member(ServerLoc, ServerId, Timeout) ->
580584
{'$ra_join', ServerId, after_log_append},
581585
Timeout).
582586

583-
584587
%% @doc Removes a server from the cluster's membership configuration.
585588
%% This function returns after appending a cluster membership change
586589
%% command to the log.
@@ -716,6 +719,13 @@ new_uid(Source) when is_binary(Source) ->
716719
Prefix = ra_lib:derive_safe_string(Source, 6),
717720
ra_lib:make_uid(string:uppercase(Prefix)).
718721

722+
%% @doc generates a random uid using timestamp for the first
723+
%% 6 characters.
724+
%% @end
725+
new_nvid() ->
726+
Millis = erlang:system_time(millisecond),
727+
Prefix = base64:encode(<<Millis:32/little>>),
728+
new_uid(Prefix).
719729

720730
%% @doc Returns a map of overview data of the default Ra system on the current Erlang
721731
%% node.
@@ -1132,13 +1142,16 @@ key_metrics({Name, N} = ServerId) when N == node() ->
11321142
end,
11331143
case whereis(Name) of
11341144
undefined ->
1135-
Counters#{state => noproc};
1145+
Counters#{state => noproc,
1146+
voter_status => noproc};
11361147
_ ->
11371148
case ets:lookup(ra_state, Name) of
11381149
[] ->
1139-
Counters#{state => unknown};
1140-
[{_, State}] ->
1141-
Counters#{state => State}
1150+
Counters#{state => unknown,
1151+
voter_status => unknown};
1152+
[{_, State, Voter}] ->
1153+
Counters#{state => State,
1154+
voter_status => Voter}
11421155
end
11431156
end;
11441157
key_metrics({_, N} = ServerId) ->

src/ra.hrl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,42 @@
3333
%% used for on disk resources and local name to pid mapping
3434
-type ra_uid() :: binary().
3535

36+
%% Transient ID that uniquely identifies any new non-voter.
37+
-type ra_nvid() :: binary().
38+
3639
%% Identifies a Ra server (node) in a Ra cluster.
3740
%%
3841
%% Ra servers need to be registered stable names (names that are reachable
3942
%% after node restart). Pids are not stable in this sense.
4043
-type ra_server_id() :: {Name :: atom(), Node :: node()}.
4144

45+
%% Specifies server configuration for a new cluster member.
46+
%% Both `ra:add_member` and `ra:start_server` must be called with the same value.
47+
-type ra_new_server() :: #{id := ra_server_id(),
48+
49+
%% If set, server will start as non-voter until later promoted by the
50+
%% leader.
51+
init_non_voter => ra_nvid()}.
52+
4253
-type ra_peer_status() :: normal |
4354
{sending_snapshot, pid()} |
4455
suspended |
4556
disconnected.
4657

58+
-type ra_voter_status() :: {voter, ra_voter_state()} |
59+
{nonvoter, ra_voter_state()}.
60+
61+
-type ra_voter_state() :: #{nvid => ra_nvid(),
62+
target => ra_index()}.
63+
4764
-type ra_peer_state() :: #{next_index := non_neg_integer(),
4865
match_index := non_neg_integer(),
4966
query_index := non_neg_integer(),
5067
% the commit index last sent
5168
% used for evaluating pipeline status
5269
commit_index_sent := non_neg_integer(),
70+
%% whether the peer is part of the consensus
71+
voter_status := ra_voter_status(),
5372
%% indicates that a snapshot is being sent
5473
%% to the peer
5574
status := ra_peer_status()}.
@@ -139,6 +158,7 @@
139158
-type snapshot_meta() :: #{index := ra_index(),
140159
term := ra_term(),
141160
cluster := ra_cluster_servers(),
161+
cluster_state => ra_cluster(), %% TODO replace `cluster`
142162
machine_version := ra_machine:version()}.
143163

144164
-record(install_snapshot_rpc,

src/ra_directory.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,14 +175,20 @@ overview(System) when is_atom(System) ->
175175
#{directory := Tbl,
176176
directory_rev := _TblRev} = get_names(System),
177177
Dir = ets:tab2list(Tbl),
178-
States = maps:from_list(ets:tab2list(ra_state)),
178+
Rows = lists:map(fun({K, S, V}) ->
179+
{K, {S, V}}
180+
end,
181+
ets:tab2list(ra_state)),
182+
States = maps:from_list(Rows),
179183
Snaps = maps:from_list(ets:tab2list(ra_log_snapshot_state)),
180184
lists:foldl(fun ({UId, Pid, Parent, ServerName, ClusterName}, Acc) ->
185+
{S, V} = maps:get(ServerName, States, {undefined, undefined}),
181186
Acc#{ServerName =>
182187
#{uid => UId,
183188
pid => Pid,
184189
parent => Parent,
185-
state => maps:get(ServerName, States, undefined),
190+
state => S,
191+
voter_status => V,
186192
cluster_name => ClusterName,
187193
snapshot_state => maps:get(UId, Snaps,
188194
undefined)}}

src/ra_log.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,7 @@ update_release_cursor0(Idx, Cluster, MacVersion, MacState,
644644
end,
645645
Meta = #{index => Idx,
646646
cluster => ClusterServerIds,
647+
cluster_state => Cluster,
647648
machine_version => MacVersion},
648649
% The release cursor index is the last entry _not_ contributing
649650
% to the current state. I.e. the last entry that can be discarded.

0 commit comments

Comments
 (0)