Skip to content

Commit 1073dd4

Browse files
author
Alex Valiushko
committed
Newly added followers do not participate in quorum until they catch up with the log
1 parent c1d2036 commit 1073dd4

File tree

9 files changed

+360
-36
lines changed

9 files changed

+360
-36
lines changed

src/ra.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,6 @@ add_member(ServerLoc, ServerId, Timeout) ->
579579
{'$ra_join', ServerId, after_log_append},
580580
Timeout).
581581

582-
583582
%% @doc Removes a server from the cluster's membership configuration.
584583
%% This function returns after appending a cluster membership change
585584
%% command to the log.

src/ra.hrl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,18 @@
4444
suspended |
4545
disconnected.
4646

47+
-type ra_voter_status() :: voter | {nonvoter, ra_nonvoter_reason()}.
48+
49+
-type ra_nonvoter_reason() :: init | #{target := ra_index()}.
50+
4751
-type ra_peer_state() :: #{next_index := non_neg_integer(),
4852
match_index := non_neg_integer(),
4953
query_index := non_neg_integer(),
5054
% the commit index last sent
5155
% used for evaluating pipeline status
5256
commit_index_sent := non_neg_integer(),
57+
%% whether the peer is part of the consensus
58+
voter_status := ra_voter_status(),
5359
%% indicates that a snapshot is being sent
5460
%% to the peer
5561
status := ra_peer_status()}.

src/ra_directory.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,14 +175,20 @@ overview(System) when is_atom(System) ->
175175
#{directory := Tbl,
176176
directory_rev := _TblRev} = get_names(System),
177177
Dir = ets:tab2list(Tbl),
178-
States = maps:from_list(ets:tab2list(ra_state)),
178+
Rows = lists:map(fun({K, S, V}) ->
179+
{K, {S, V}}
180+
end,
181+
ets:tab2list(ra_state)),
182+
States = maps:from_list(Rows),
179183
Snaps = maps:from_list(ets:tab2list(ra_log_snapshot_state)),
180184
lists:foldl(fun ({UId, Pid, Parent, ServerName, ClusterName}, Acc) ->
185+
{S, V} = maps:get(ServerName, States, {undefined, undefined}),
181186
Acc#{ServerName =>
182187
#{uid => UId,
183188
pid => Pid,
184189
parent => Parent,
185-
state => maps:get(ServerName, States, undefined),
190+
state => S,
191+
voter_status => V,
186192
cluster_name => ClusterName,
187193
snapshot_state => maps:get(UId, Snaps,
188194
undefined)}}

src/ra_server.erl

Lines changed: 136 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
terminate/2,
5656
log_fold/3,
5757
log_read/2,
58+
voter_status/1,
5859
recover/1
5960
]).
6061

@@ -72,6 +73,7 @@
7273
log := term(),
7374
voted_for => 'maybe'(ra_server_id()), % persistent
7475
votes => non_neg_integer(),
76+
voter_status => ra_voter_status(),
7577
commit_index := ra_index(),
7678
last_applied := ra_index(),
7779
persisted_last_applied => ra_index(),
@@ -182,6 +184,7 @@
182184
log_init_args := ra_log:ra_log_init_args(),
183185
initial_members := [ra_server_id()],
184186
machine := machine_conf(),
187+
voter => boolean(),
185188
friendly_name => unicode:chardata(),
186189
metrics_key => term(),
187190
% TODO: review - only really used for
@@ -236,6 +239,7 @@ init(#{id := Id,
236239
cluster_name := _ClusterName,
237240
initial_members := InitialNodes,
238241
log_init_args := LogInitArgs,
242+
tick_timeout := Timeout,
239243
machine := MachineConf} = Config) ->
240244
SystemConfig = maps:get(system_config, Config,
241245
ra_system:default_config()),
@@ -315,6 +319,7 @@ init(#{id := Id,
315319
uid = UId,
316320
log_id = LogId,
317321
metrics_key = MetricKey,
322+
tick_timeout = Timeout,
318323
machine = Machine,
319324
machine_version = LatestMacVer,
320325
machine_versions = [{SnapshotIdx, MacVer}],
@@ -325,6 +330,13 @@ init(#{id := Id,
325330
counter = maps:get(counter, Config, undefined),
326331
system_config = SystemConfig},
327332

333+
VoterStatus = case maps:get(voter, Config, false) of
334+
false ->
335+
{nonvoter, init};
336+
true ->
337+
voter
338+
end,
339+
328340
#{cfg => Cfg,
329341
current_term => CurrentTerm,
330342
cluster => Cluster0,
@@ -335,6 +347,7 @@ init(#{id := Id,
335347
cluster_change_permitted => false,
336348
cluster_index_term => SnapshotIndexTerm,
337349
voted_for => VotedFor,
350+
voter_status => VoterStatus,
338351
commit_index => CommitIndex,
339352
%% set this to the first index so that we can apply all entries
340353
%% up to the commit index during recovery
@@ -394,11 +407,16 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
394407
Peer = Peer0#{match_index => max(MI, LastIdx),
395408
next_index => max(NI, NextIdx)},
396409
State1 = put_peer(PeerId, Peer, State0),
397-
{State2, Effects0} = evaluate_quorum(State1, []),
410+
411+
Effects00 = maybe_promote_voter(PeerId, State1, []),
412+
413+
{State2, Effects0} = evaluate_quorum(State1, Effects00),
398414

399415
{State, Effects1} = process_pending_consistent_queries(State2,
400416
Effects0),
417+
401418
Effects = [{next_event, info, pipeline_rpcs} | Effects1],
419+
402420
case State of
403421
#{cluster := #{Id := _}} ->
404422
% leader is in the cluster
@@ -776,7 +794,7 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true},
776794
NewVotes = Votes + 1,
777795
?DEBUG("~ts: vote granted for term ~b votes ~b",
778796
[LogId, Term, NewVotes]),
779-
case trunc(maps:size(Nodes) / 2) + 1 of
797+
case required_quorum(Nodes) of
780798
NewVotes ->
781799
{State1, Effects} = make_all_rpcs(initialise_peers(State0)),
782800
Noop = {noop, #{ts => erlang:system_time(millisecond)},
@@ -922,7 +940,7 @@ handle_pre_vote(#pre_vote_result{term = Term, vote_granted = true,
922940
[LogId, Token, Term, Votes + 1]),
923941
NewVotes = Votes + 1,
924942
State = update_term(Term, State0),
925-
case trunc(maps:size(Nodes) / 2) + 1 of
943+
case required_quorum(Nodes) of
926944
NewVotes ->
927945
call_for_election(candidate, State);
928946
_ ->
@@ -1103,8 +1121,18 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
11031121
% simply forward all other events to ra_log
11041122
{Log, Effects} = ra_log:handle_event(Evt, Log0),
11051123
{follower, State#{log => Log}, Effects};
1124+
handle_follower(#pre_vote_rpc{},
1125+
#{cfg := #cfg{log_id = LogId}, voter_status := {nonvoter, _} = Voter} = State) ->
1126+
?DEBUG("~s: follower ignored pre_vote_rpc, non-voter: ~p0",
1127+
[LogId, Voter]),
1128+
{follower, State, []};
11061129
handle_follower(#pre_vote_rpc{} = PreVote, State) ->
11071130
process_pre_vote(follower, PreVote, State);
1131+
handle_follower(#request_vote_rpc{},
1132+
#{cfg := #cfg{log_id = LogId}, voter_status := {nonvoter, _} = Voter} = State) ->
1133+
?DEBUG("~s: follower ignored request_vote_rpc, non-voter: ~p0",
1134+
[LogId, Voter]),
1135+
{follower, State, []};
11081136
handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term},
11091137
#{current_term := Term, voted_for := VotedFor,
11101138
cfg := #cfg{log_id = LogId}} = State)
@@ -1161,7 +1189,7 @@ handle_follower(#install_snapshot_rpc{term = Term,
11611189
State = #{cfg := #cfg{log_id = LogId}, current_term := CurTerm})
11621190
when Term < CurTerm ->
11631191
?DEBUG("~ts: install_snapshot old term ~b in ~b",
1164-
[LogId, LastIndex, LastTerm]),
1192+
[LogId, LastIndex, LastTerm]),
11651193
% follower receives a snapshot from an old term
11661194
Reply = #install_snapshot_result{term = CurTerm,
11671195
last_term = LastTerm,
@@ -1202,6 +1230,11 @@ handle_follower(#append_entries_reply{}, State) ->
12021230
%% handle to avoid logging as unhandled
12031231
%% could receive a lot of these shortly after standing down as leader
12041232
{follower, State, []};
1233+
handle_follower(election_timeout,
1234+
#{cfg := #cfg{log_id = LogId}, voter_status := {nonvoter, _} = Voter} = State) ->
1235+
?DEBUG("~s: follower ignored election_timeout, non-voter: ~p0",
1236+
[LogId, Voter]),
1237+
{follower, State, []};
12051238
handle_follower(election_timeout, State) ->
12061239
call_for_election(pre_vote, State);
12071240
handle_follower(try_become_leader, State) ->
@@ -1374,7 +1407,8 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
13741407
cluster_index_term,
13751408
query_index
13761409
], State),
1377-
O = maps:merge(O0, cfg_to_map(Cfg)),
1410+
O1 = O0#{voter_status => voter_status(State)},
1411+
O = maps:merge(O1, cfg_to_map(Cfg)),
13781412
LogOverview = ra_log:overview(Log),
13791413
MacOverview = ra_machine:overview(MacMod, MacState),
13801414
O#{log => LogOverview,
@@ -2087,6 +2121,7 @@ new_peer() ->
20872121
match_index => 0,
20882122
commit_index_sent => 0,
20892123
query_index => 0,
2124+
voter_status => voter,
20902125
status => normal}.
20912126

20922127
new_peer_with(Map) ->
@@ -2318,6 +2353,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
23182353
[log_id(State0), maps:keys(NewCluster)]),
23192354
%% we are recovering and should apply the cluster change
23202355
State0#{cluster => NewCluster,
2356+
voter_status => voter_status(id(State0), NewCluster),
23212357
cluster_change_permitted => true,
23222358
cluster_index_term => {Idx, Term}};
23232359
_ ->
@@ -2450,16 +2486,33 @@ append_log_leader({CmdTag, _, _, _},
24502486
when CmdTag == '$ra_join' orelse
24512487
CmdTag == '$ra_leave' ->
24522488
{not_appended, cluster_change_not_permitted, State};
2489+
append_log_leader({'$ra_join', From,
2490+
#{node := JoiningNode, voter_status := Voter},
2491+
ReplyMode},
2492+
State = #{cluster := OldCluster}) ->
2493+
case OldCluster of
2494+
#{JoiningNode := #{voter_status := Voter}} ->
2495+
already_member(State);
2496+
#{JoiningNode := Peer} ->
2497+
% Update member status.
2498+
Cluster = OldCluster#{JoiningNode => Peer#{voter_status => Voter}},
2499+
append_cluster_change(Cluster, From, ReplyMode, State);
2500+
_ ->
2501+
% Insert new member.
2502+
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter_status => Voter})},
2503+
append_cluster_change(Cluster, From, ReplyMode, State)
2504+
end;
24532505
append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
24542506
State = #{cluster := OldCluster}) ->
2507+
% Legacy $ra_join, join as non-voter iff no such member in the cluster.
24552508
case OldCluster of
24562509
#{JoiningNode := _} ->
2457-
% already a member do nothing
2458-
% TODO: reply? If we don't reply the caller may block until timeout
2459-
{not_appended, already_member, State};
2510+
already_member(State);
24602511
_ ->
2461-
Cluster = OldCluster#{JoiningNode => new_peer()},
2462-
append_cluster_change(Cluster, From, ReplyMode, State)
2512+
append_log_leader({'$ra_join', From,
2513+
#{node => JoiningNode, voter_status => new_nonvoter(State)},
2514+
ReplyMode},
2515+
State)
24632516
end;
24642517
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
24652518
State = #{cfg := #cfg{log_id = LogId},
@@ -2501,6 +2554,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
25012554
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
25022555
State) ->
25032556
State#{cluster => Cluster,
2557+
voter_status => voter_status(id(State), Cluster),
25042558
cluster_index_term => {Idx, Term}};
25052559
pre_append_log_follower(_, State) ->
25062560
State.
@@ -2577,6 +2631,8 @@ query_indexes(#{cfg := #cfg{id = Id},
25772631
query_index := QueryIndex}) ->
25782632
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
25792633
Acc;
2634+
(_K, #{voter_status := {nonvoter, _}}, Acc) ->
2635+
Acc;
25802636
(_K, #{query_index := Idx}, Acc) ->
25812637
[Idx | Acc]
25822638
end, [QueryIndex], Cluster).
@@ -2587,6 +2643,8 @@ match_indexes(#{cfg := #cfg{id = Id},
25872643
{LWIdx, _} = ra_log:last_written(Log),
25882644
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
25892645
Acc;
2646+
(_K, #{voter_status := {nonvoter, _}}, Acc) ->
2647+
Acc;
25902648
(_K, #{match_index := Idx}, Acc) ->
25912649
[Idx | Acc]
25922650
end, [LWIdx], Cluster).
@@ -2803,6 +2861,74 @@ meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) ->
28032861
Name;
28042862
meta_name(#{names := #{log_meta := Name}}) ->
28052863
Name.
2864+
2865+
already_member(State) ->
2866+
% already a member do nothing
2867+
% TODO: reply? If we don't reply the caller may block until timeout
2868+
{not_appended, already_member, State}.
2869+
2870+
%%% ====================
2871+
%%% Voter status helpers
2872+
%%% ====================
2873+
2874+
-spec new_nonvoter(ra_server_state()) -> ra_voter_status().
2875+
new_nonvoter(#{commit_index := Target} = _State) ->
2876+
{nonvoter, #{target => Target}}.
2877+
2878+
-spec maybe_promote_voter(ra_server_id(), ra_server_state(), effects()) -> effects().
2879+
maybe_promote_voter(PeerID, #{cluster := Cluster} = _State, Effects) ->
2880+
% Unknown peer handled in the caller.
2881+
#{PeerID := #{match_index := MI, voter_status := OldStatus}} = Cluster,
2882+
case update_voter_status(OldStatus, MI) of
2883+
OldStatus ->
2884+
Effects;
2885+
voter ->
2886+
[{next_event,
2887+
{command, {'$ra_join',
2888+
#{ts => os:system_time(millisecond)},
2889+
#{node => PeerID, voter_status => voter},
2890+
noreply}}} |
2891+
Effects]
2892+
end.
2893+
2894+
update_voter_status({nonvoter, #{target := Target}}, MI)
2895+
when MI >= Target ->
2896+
voter;
2897+
update_voter_status(Permanent, _) ->
2898+
Permanent.
2899+
2900+
-spec voter_status(ra_server_state()) -> ra_voter_status().
2901+
voter_status(#{cluster := Cluster} = State) ->
2902+
case maps:get(voter_status, State, undefined) of
2903+
undefined ->
2904+
voter_status(id(State), Cluster);
2905+
Voter ->
2906+
Voter
2907+
end.
2908+
2909+
-spec voter_status(ra_server_id(), ra_cluster()) -> ra_voter_status().
2910+
voter_status(PeerId, Cluster) ->
2911+
case maps:get(PeerId, Cluster, undefined) of
2912+
undefined ->
2913+
undefined;
2914+
Peer ->
2915+
maps:get(voter_status, Peer, voter)
2916+
end.
2917+
2918+
-spec required_quorum(ra_cluster()) -> pos_integer().
2919+
required_quorum(Cluster) ->
2920+
Voters = count_voters(Cluster),
2921+
trunc(Voters / 2) + 1.
2922+
2923+
count_voters(Cluster) ->
2924+
maps:fold(
2925+
fun (_, #{voter_status := {nonvoter, _}}, Count) ->
2926+
Count;
2927+
(_, _, Count) ->
2928+
Count + 1
2929+
end,
2930+
0, Cluster).
2931+
28062932
%%% ===================
28072933
%%% Internal unit tests
28082934
%%% ===================

src/ra_server.hrl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@
99
-define(DEFAULT_SNAPSHOT_CHUNK_SIZE, 1000000). % 1MB
1010
-define(DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT, 30000).
1111
-define(FLUSH_COMMANDS_SIZE, 16).
12+
-define(MAX_NONVOTER_ROUNDS, 4).
1213

1314
-record(cfg,
1415
{id :: ra_server_id(),
1516
uid :: ra_uid(),
1617
log_id :: unicode:chardata(),
1718
metrics_key :: term(),
19+
tick_timeout :: non_neg_integer(),
1820
machine :: ra_machine:machine(),
1921
machine_version :: ra_machine:version(),
2022
machine_versions :: [{ra_index(), ra_machine:version()}, ...],

0 commit comments

Comments
 (0)