Skip to content

Commit 250e700

Browse files
author
Alex Valiushko
committed
Newly added followers do not participate in quorum until they catch up with the log
1 parent 54860c4 commit 250e700

File tree

9 files changed

+390
-43
lines changed

9 files changed

+390
-43
lines changed

src/ra.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,6 @@ add_member(ServerLoc, ServerId, Timeout) ->
579579
{'$ra_join', ServerId, after_log_append},
580580
Timeout).
581581

582-
583582
%% @doc Removes a server from the cluster's membership configuration.
584583
%% This function returns after appending a cluster membership change
585584
%% command to the log.

src/ra.hrl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,18 @@
4444
suspended |
4545
disconnected.
4646

47+
%% Voting role of a cluster member: either a full `voter', or a
%% `{nonvoter, Reason}' where Reason says why the member is not voting.
-type ra_voter_status() :: voter | {nonvoter, ra_nonvoter_reason()}.
48+
49+
%% Reason attached to a non-voter status: the atom `init', or a map holding
%% the `target' log index associated with the non-voter.
-type ra_nonvoter_reason() :: init | #{target := ra_index()}.
50+
4751
-type ra_peer_state() :: #{next_index := non_neg_integer(),
4852
match_index := non_neg_integer(),
4953
query_index := non_neg_integer(),
5054
% the commit index last sent
5155
% used for evaluating pipeline status
5256
commit_index_sent := non_neg_integer(),
57+
%% whether the peer is part of the consensus
58+
voter_status := ra_voter_status(),
5359
%% indicates that a snapshot is being sent
5460
%% to the peer
5561
status := ra_peer_status()}.

src/ra_directory.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,14 +175,20 @@ overview(System) when is_atom(System) ->
175175
#{directory := Tbl,
176176
directory_rev := _TblRev} = get_names(System),
177177
Dir = ets:tab2list(Tbl),
178-
States = maps:from_list(ets:tab2list(ra_state)),
178+
Rows = lists:map(fun({K, S, V}) ->
179+
{K, {S, V}}
180+
end,
181+
ets:tab2list(ra_state)),
182+
States = maps:from_list(Rows),
179183
Snaps = maps:from_list(ets:tab2list(ra_log_snapshot_state)),
180184
lists:foldl(fun ({UId, Pid, Parent, ServerName, ClusterName}, Acc) ->
185+
{S, V} = maps:get(ServerName, States, {undefined, undefined}),
181186
Acc#{ServerName =>
182187
#{uid => UId,
183188
pid => Pid,
184189
parent => Parent,
185-
state => maps:get(ServerName, States, undefined),
190+
state => S,
191+
voter_status => V,
186192
cluster_name => ClusterName,
187193
snapshot_state => maps:get(UId, Snaps,
188194
undefined)}}

src/ra_server.erl

Lines changed: 138 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
terminate/2,
5656
log_fold/3,
5757
log_read/2,
58+
voter_status/1,
5859
recover/1
5960
]).
6061

@@ -72,6 +73,7 @@
7273
log := term(),
7374
voted_for => 'maybe'(ra_server_id()), % persistent
7475
votes => non_neg_integer(),
76+
voter_status => ra_voter_status(),
7577
commit_index := ra_index(),
7678
last_applied := ra_index(),
7779
persisted_last_applied => ra_index(),
@@ -325,16 +327,27 @@ init(#{id := Id,
325327
counter = maps:get(counter, Config, undefined),
326328
system_config = SystemConfig},
327329

330+
VoterStatus = case maps:get(voter_status, Config, true) of
331+
false ->
332+
{nonvoter, init};
333+
true ->
334+
voter
335+
end,
336+
337+
Peer = maps:get(Id, Cluster0),
338+
Cluster1 = Cluster0#{Id => Peer#{voter_status => VoterStatus}},
339+
328340
#{cfg => Cfg,
329341
current_term => CurrentTerm,
330-
cluster => Cluster0,
342+
cluster => Cluster1,
331343
% There may be scenarios when a single server
332344
% starts up but hasn't
333345
% yet re-applied its noop command that we may receive other join
334346
% commands that can't be applied.
335347
cluster_change_permitted => false,
336348
cluster_index_term => SnapshotIndexTerm,
337349
voted_for => VotedFor,
350+
voter_status => VoterStatus,
338351
commit_index => CommitIndex,
339352
%% set this to the first index so that we can apply all entries
340353
%% up to the commit index during recovery
@@ -394,8 +407,8 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
394407
Peer = Peer0#{match_index => max(MI, LastIdx),
395408
next_index => max(NI, NextIdx)},
396409
State1 = put_peer(PeerId, Peer, State0),
397-
{State2, Effects0} = evaluate_quorum(State1, []),
398-
410+
Effects00 = maybe_promote_voter(PeerId, State1, []),
411+
{State2, Effects0} = evaluate_quorum(State1, Effects00),
399412
{State, Effects1} = process_pending_consistent_queries(State2,
400413
Effects0),
401414
Effects = [{next_event, info, pipeline_rpcs} | Effects1],
@@ -776,7 +789,7 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true},
776789
NewVotes = Votes + 1,
777790
?DEBUG("~ts: vote granted for term ~b votes ~b",
778791
[LogId, Term, NewVotes]),
779-
case trunc(maps:size(Nodes) / 2) + 1 of
792+
case required_quorum(Nodes) of
780793
NewVotes ->
781794
{State1, Effects} = make_all_rpcs(initialise_peers(State0)),
782795
Noop = {noop, #{ts => erlang:system_time(millisecond)},
@@ -922,7 +935,7 @@ handle_pre_vote(#pre_vote_result{term = Term, vote_granted = true,
922935
[LogId, Token, Term, Votes + 1]),
923936
NewVotes = Votes + 1,
924937
State = update_term(Term, State0),
925-
case trunc(maps:size(Nodes) / 2) + 1 of
938+
case required_quorum(Nodes) of
926939
NewVotes ->
927940
call_for_election(candidate, State);
928941
_ ->
@@ -1103,8 +1116,18 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
11031116
% simply forward all other events to ra_log
11041117
{Log, Effects} = ra_log:handle_event(Evt, Log0),
11051118
{follower, State#{log => Log}, Effects};
1119+
%% A non-voting follower ignores pre-vote requests: the state is returned
%% unchanged and no effects are produced.
handle_follower(#pre_vote_rpc{},
                #{cfg := #cfg{log_id = LogId},
                  voter_status := {nonvoter, _} = Voter} = State) ->
    %% log_id is unicode chardata, so it needs ~ts rather than ~s
    ?DEBUG("~ts: follower ignored pre_vote_rpc, non-voter: ~p",
           [LogId, Voter]),
    {follower, State, []};
11061124
handle_follower(#pre_vote_rpc{} = PreVote, State) ->
11071125
process_pre_vote(follower, PreVote, State);
1126+
%% A non-voting follower ignores vote requests: the state is returned
%% unchanged and no effects are produced.
handle_follower(#request_vote_rpc{},
                #{cfg := #cfg{log_id = LogId},
                  voter_status := {nonvoter, _} = Voter} = State) ->
    %% log_id is unicode chardata, so it needs ~ts rather than ~s
    ?DEBUG("~ts: follower ignored request_vote_rpc, non-voter: ~p",
           [LogId, Voter]),
    {follower, State, []};
11081131
handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term},
11091132
#{current_term := Term, voted_for := VotedFor,
11101133
cfg := #cfg{log_id = LogId}} = State)
@@ -1202,6 +1225,11 @@ handle_follower(#append_entries_reply{}, State) ->
12021225
%% handle to avoid logging as unhandled
12031226
%% could receive a lot of these shortly after standing down as leader
12041227
{follower, State, []};
1228+
%% A non-voting follower does not start an election when its election
%% timeout fires: the state is returned unchanged and no effects are produced.
handle_follower(election_timeout,
                #{cfg := #cfg{log_id = LogId},
                  voter_status := {nonvoter, _} = Voter} = State) ->
    %% log_id is unicode chardata, so it needs ~ts rather than ~s
    ?DEBUG("~ts: follower ignored election_timeout, non-voter: ~p",
           [LogId, Voter]),
    {follower, State, []};
12051233
handle_follower(election_timeout, State) ->
12061234
call_for_election(pre_vote, State);
12071235
handle_follower(try_become_leader, State) ->
@@ -1374,7 +1402,8 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
13741402
cluster_index_term,
13751403
query_index
13761404
], State),
1377-
O = maps:merge(O0, cfg_to_map(Cfg)),
1405+
O1 = O0#{voter_status => voter_status(State)},
1406+
O = maps:merge(O1, cfg_to_map(Cfg)),
13781407
LogOverview = ra_log:overview(Log),
13791408
MacOverview = ra_machine:overview(MacMod, MacState),
13801409
O#{log => LogOverview,
@@ -2087,6 +2116,7 @@ new_peer() ->
20872116
match_index => 0,
20882117
commit_index_sent => 0,
20892118
query_index => 0,
2119+
voter_status => voter,
20902120
status => normal}.
20912121

20922122
new_peer_with(Map) ->
@@ -2218,13 +2248,13 @@ make_cluster(Self, Nodes) ->
22182248
Cluster#{Self => new_peer()}
22192249
end.
22202250

2221-
initialise_peers(State = #{log := Log, cluster := Cluster0}) ->
2222-
PeerIds = peer_ids(State),
2251+
initialise_peers(State = #{cfg := #cfg{id = Id}, log := Log, cluster := Cluster0}) ->
22232252
NextIdx = ra_log:next_index(Log),
2224-
Cluster = lists:foldl(fun(PeerId, Acc) ->
2225-
Acc#{PeerId =>
2226-
new_peer_with(#{next_index => NextIdx})}
2227-
end, Cluster0, PeerIds),
2253+
Cluster = maps:map(fun (PeerId, Self) when PeerId =:= Id ->
2254+
Self;
2255+
(_, #{voter_status := Voter} = _Other) ->
2256+
new_peer_with(#{next_index => NextIdx, voter_status => Voter})
2257+
end, Cluster0),
22282258
State#{cluster => Cluster}.
22292259

22302260
apply_to(ApplyTo, State, Effs) ->
@@ -2318,6 +2348,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
23182348
[log_id(State0), maps:keys(NewCluster)]),
23192349
%% we are recovering and should apply the cluster change
23202350
State0#{cluster => NewCluster,
2351+
voter_status => voter_status(id(State0), NewCluster),
23212352
cluster_change_permitted => true,
23222353
cluster_index_term => {Idx, Term}};
23232354
_ ->
@@ -2450,16 +2481,33 @@ append_log_leader({CmdTag, _, _, _},
24502481
when CmdTag == '$ra_join' orelse
24512482
CmdTag == '$ra_leave' ->
24522483
{not_appended, cluster_change_not_permitted, State};
2484+
%% '$ra_join' carrying an explicit voter status for the joining node.
%% Three cases, decided by what the current cluster holds for that node:
%%   * same voter status already recorded -> nothing to append;
%%   * node present with a different status -> append a cluster change
%%     that only rewrites its voter status;
%%   * node absent -> append a cluster change that adds it as a new peer.
append_log_leader({'$ra_join', From,
                   #{node := JoiningNode, voter_status := Voter},
                   ReplyMode},
                  State = #{cluster := OldCluster}) ->
    case maps:find(JoiningNode, OldCluster) of
        {ok, #{voter_status := Voter}} ->
            %% Voter is already bound, so this matches only an equal status
            already_member(State);
        {ok, Member} ->
            Updated = Member#{voter_status => Voter},
            append_cluster_change(OldCluster#{JoiningNode => Updated},
                                  From, ReplyMode, State);
        error ->
            Added = new_peer_with(#{voter_status => Voter}),
            append_cluster_change(OldCluster#{JoiningNode => Added},
                                  From, ReplyMode, State)
    end;
24532500
append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
24542501
State = #{cluster := OldCluster}) ->
2502+
% Legacy $ra_join, join as non-voter iff no such member in the cluster.
24552503
case OldCluster of
24562504
#{JoiningNode := _} ->
2457-
% already a member do nothing
2458-
% TODO: reply? If we don't reply the caller may block until timeout
2459-
{not_appended, already_member, State};
2505+
already_member(State);
24602506
_ ->
2461-
Cluster = OldCluster#{JoiningNode => new_peer()},
2462-
append_cluster_change(Cluster, From, ReplyMode, State)
2507+
append_log_leader({'$ra_join', From,
2508+
#{node => JoiningNode, voter_status => new_nonvoter(State)},
2509+
ReplyMode},
2510+
State)
24632511
end;
24642512
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
24652513
State = #{cfg := #cfg{log_id = LogId},
@@ -2501,6 +2549,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
25012549
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
25022550
State) ->
25032551
State#{cluster => Cluster,
2552+
voter_status => voter_status(id(State), Cluster),
25042553
cluster_index_term => {Idx, Term}};
25052554
pre_append_log_follower(_, State) ->
25062555
State.
@@ -2577,6 +2626,8 @@ query_indexes(#{cfg := #cfg{id = Id},
25772626
query_index := QueryIndex}) ->
25782627
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
25792628
Acc;
2629+
(_K, #{voter_status := {nonvoter, _}}, Acc) ->
2630+
Acc;
25802631
(_K, #{query_index := Idx}, Acc) ->
25812632
[Idx | Acc]
25822633
end, [QueryIndex], Cluster).
@@ -2587,6 +2638,8 @@ match_indexes(#{cfg := #cfg{id = Id},
25872638
{LWIdx, _} = ra_log:last_written(Log),
25882639
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
25892640
Acc;
2641+
(_K, #{voter_status := {nonvoter, _}}, Acc) ->
2642+
Acc;
25902643
(_K, #{match_index := Idx}, Acc) ->
25912644
[Idx | Acc]
25922645
end, [LWIdx], Cluster).
@@ -2803,6 +2856,74 @@ meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) ->
28032856
Name;
28042857
meta_name(#{names := #{log_meta := Name}}) ->
28052858
Name.
2859+
2860+
%% Result for a join request that requires no cluster change: nothing is
%% appended to the log and the server state is handed back untouched.
%% TODO: reply? If we don't reply the caller may block until timeout
already_member(ServerState) ->
    {not_appended, already_member, ServerState}.
2864+
2865+
%%% ====================
2866+
%%% Voter status helpers
2867+
%%% ====================
2868+
2869+
%% Builds a non-voter status whose promotion target is the commit index
%% currently recorded in the given server state.
-spec new_nonvoter(ra_server_state()) -> ra_voter_status().
new_nonvoter(#{commit_index := CommitIdx}) ->
    Reason = #{target => CommitIdx},
    {nonvoter, Reason}.
2872+
2873+
%% Checks whether `PeerID' should be promoted to voter, based on the
%% match_index currently stored for it in the cluster map.
%% Returns `Effects' unchanged when the peer's status stays the same;
%% otherwise prepends a next_event that issues a '$ra_join' command
%% setting the peer's voter_status to `voter'.
%% Crashes with badmatch when the peer is not in the cluster.
-spec maybe_promote_voter(ra_server_id(), ra_server_state(), effects()) -> effects().
maybe_promote_voter(PeerID, #{cluster := Cluster} = _State, Effects) ->
    % Unknown peer handled in the caller.
    #{PeerID := #{match_index := MI} = Peer} = Cluster,
    %% A peer map may lack the voter_status key; default to voter, the same
    %% default voter_status/2 applies, instead of crashing on it.
    OldStatus = maps:get(voter_status, Peer, voter),
    case update_voter_status(OldStatus, MI) of
        OldStatus ->
            Effects;
        voter ->
            [{next_event,
              {command, {'$ra_join',
                         #{ts => erlang:system_time(millisecond)},
                         #{node => PeerID, voter_status => voter},
                         noreply}}} |
             Effects]
    end.
2888+
2889+
%% Computes the next voter status of a peer from its match index.
%% A `{nonvoter, #{target := T}}' becomes `voter' once the match index has
%% reached T; every other status (including `voter' and `{nonvoter, init}')
%% is returned as given.
update_voter_status(Status, MatchIdx) ->
    case Status of
        {nonvoter, #{target := Target}} when MatchIdx >= Target ->
            voter;
        _ ->
            Status
    end.
2894+
2895+
%% Voter status of this server.
%% Uses the `voter_status' key of the server state when it is present and
%% not `undefined'; otherwise looks the server's own id up in the cluster
%% map. That lookup yields `undefined' when the id is not a cluster member,
%% so `undefined' is a possible result.
-spec voter_status(ra_server_state()) -> ra_voter_status() | undefined.
voter_status(#{cluster := Cluster} = State) ->
    case maps:get(voter_status, State, undefined) of
        undefined ->
            voter_status(id(State), Cluster);
        Voter ->
            Voter
    end.
2903+
2904+
%% Voter status of `PeerId' as recorded in `Cluster'.
%% Returns `undefined' when the peer is not in the cluster. A peer whose
%% map has no `voter_status' key is reported as `voter'.
-spec voter_status(ra_server_id(), ra_cluster()) ->
    ra_voter_status() | undefined.
voter_status(PeerId, Cluster) ->
    case maps:get(PeerId, Cluster, undefined) of
        undefined ->
            undefined;
        Peer ->
            maps:get(voter_status, Peer, voter)
    end.
2912+
2913+
%% Number of votes that make up a majority of the voting members of
%% `Cluster'. Non-voters are left out of the count.
-spec required_quorum(ra_cluster()) -> pos_integer().
required_quorum(Cluster) ->
    count_voters(Cluster) div 2 + 1.
2917+
2918+
%% Counts the members of `Cluster' that are not marked `{nonvoter, _}'.
%% Members without a `voter_status' key are counted as voters.
count_voters(Cluster) ->
    IsVoter = fun (#{voter_status := {nonvoter, _}}) -> false;
                  (_) -> true
              end,
    length(lists:filter(IsVoter, maps:values(Cluster))).
2926+
28062927
%%% ===================
28072928
%%% Internal unit tests
28082929
%%% ===================

src/ra_server.hrl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@
99
-define(DEFAULT_SNAPSHOT_CHUNK_SIZE, 1000000). % 1MB
1010
-define(DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT, 30000).
1111
-define(FLUSH_COMMANDS_SIZE, 16).
12+
-define(MAX_NONVOTER_ROUNDS, 4).
1213

1314
-record(cfg,
1415
{id :: ra_server_id(),
1516
uid :: ra_uid(),
1617
log_id :: unicode:chardata(),
1718
metrics_key :: term(),
19+
tick_timeout :: non_neg_integer(),
1820
machine :: ra_machine:machine(),
1921
machine_version :: ra_machine:version(),
2022
machine_versions :: [{ra_index(), ra_machine:version()}, ...],

src/ra_server_proc.erl

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -783,9 +783,17 @@ follower(_, tick_timeout, State0) ->
783783
set_tick_timer(State, Actions)};
784784
follower({call, From}, {log_fold, Fun, Term}, State) ->
785785
fold_log(From, Fun, Term, State);
786-
follower(EventType, Msg, State0) ->
786+
follower(EventType, Msg, #state{conf = #conf{name = Name},
787+
server_state = SS0} = State0) ->
788+
Voter0 = ra_server:voter_status(SS0),
787789
case handle_follower(Msg, State0) of
788-
{follower, State1, Effects} ->
790+
{follower, #state{server_state = SS1} = State1, Effects} ->
791+
case ra_server:voter_status(SS1) of
792+
Voter0 ->
793+
ok;
794+
Voter ->
795+
true = ets:update_element(ra_state, Name, {3, Voter})
796+
end,
789797
{State2, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
790798
State = follower_leader_change(State0, State2),
791799
{keep_state, State, Actions};
@@ -1028,7 +1036,8 @@ format_status(Opt, [_PDict, StateName,
10281036
handle_enter(RaftState, OldRaftState,
10291037
#state{conf = #conf{name = Name},
10301038
server_state = ServerState0} = State) ->
1031-
true = ets:insert(ra_state, {Name, RaftState}),
1039+
Voter = ra_server:voter_status(ServerState0),
1040+
true = ets:insert(ra_state, {Name, RaftState, Voter}),
10321041
{ServerState, Effects} = ra_server:handle_state_enter(RaftState,
10331042
ServerState0),
10341043
case RaftState == leader orelse OldRaftState == leader of
@@ -1716,9 +1725,11 @@ handle_tick_metrics(State) ->
17161725
_ = ets:insert(ra_metrics, Metrics),
17171726
State.
17181727

1719-
can_execute_locally(RaftState, TargetNode, State) ->
1728+
can_execute_locally(RaftState, TargetNode,
1729+
#state{server_state = ServerState} = State) ->
1730+
Voter = ra_server:voter_status(ServerState),
17201731
case RaftState of
1721-
follower ->
1732+
follower when Voter =:= voter ->
17221733
TargetNode == node();
17231734
leader when TargetNode =/= node() ->
17241735
%% We need to evaluate whether to send the message.

0 commit comments

Comments
 (0)