Skip to content

Commit 9a14f89

Browse files
author
Alex Valiushko
committed
promote voters after sync
tick through replication rounds inject cluster_change effect
1 parent 453965a commit 9a14f89

File tree

6 files changed

+231
-258
lines changed

6 files changed

+231
-258
lines changed

src/ra.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
%% membership changes
6060
add_member/2,
6161
add_member/3,
62+
maybe_add_member/2,
63+
maybe_add_member/3,
6264
remove_member/2,
6365
remove_member/3,
6466
leave_and_terminate/3,
@@ -579,6 +581,13 @@ add_member(ServerLoc, ServerId, Timeout) ->
579581
{'$ra_join', ServerId, after_log_append},
580582
Timeout).
581583

584+
%% @doc Same as maybe_add_member/3, using ?DEFAULT_TIMEOUT as the timeout.
maybe_add_member(ServerLoc, ServerId) ->
585+
maybe_add_member(ServerLoc, ServerId, ?DEFAULT_TIMEOUT).
586+
%% @doc Submits a '$ra_maybe_join' command for ServerId to the server at
%% ServerLoc through ra_server_proc:command/3, with the `after_log_append'
%% reply mode. Timeout is passed through to ra_server_proc:command/3 and the
%% result of that call is returned as-is.
%% NOTE(review): presumably the non-voter counterpart of add_member/3, which
%% sends '$ra_join' the same way — no -spec is visible in this hunk; confirm
%% one is added alongside the export.
maybe_add_member(ServerLoc, ServerId, Timeout) ->
587+
ra_server_proc:command(ServerLoc,
588+
{'$ra_maybe_join', ServerId, after_log_append},
589+
Timeout).
590+
582591

583592
%% @doc Removes a server from the cluster's membership configuration.
584593
%% This function returns after appending a cluster membership change

src/ra.hrl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@
5757
% the commit index last sent
5858
% used for evaluating pipeline status
5959
commit_index_sent := non_neg_integer(),
60+
%% whether the peer is part of the consensus
61+
voter := ra_voter(),
6062
%% indicates that a snapshot is being sent
6163
%% to the peer
62-
status := ra_peer_status(),
63-
%% whether the peer is part of the consensus
64-
voter := ra_voter()}.
64+
status := ra_peer_status()}.
6565

6666
-type ra_cluster() :: #{ra_server_id() => ra_peer_state()}.
6767

src/ra_server.erl

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@
8585
query_index := non_neg_integer(),
8686
queries_waiting_heartbeats := queue:queue({non_neg_integer(), consistent_query_ref()}),
8787
pending_consistent_queries := [consistent_query_ref()],
88-
voter => 'maybe'(ra_voter()),
8988
commit_latency => 'maybe'(non_neg_integer())
9089
}.
9190

@@ -397,11 +396,15 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
397396
Peer = Peer0#{match_index => max(MI, LastIdx),
398397
next_index => max(NI, NextIdx)},
399398
State1 = put_peer(PeerId, Peer, State0),
400-
{State2, Effects0} = evaluate_quorum(State1, []),
399+
Effects00 = ra_voter:maybe_promote(PeerId, State1, []),
400+
401+
{State2, Effects0} = evaluate_quorum(State1, Effects00),
401402

402403
{State, Effects1} = process_pending_consistent_queries(State2,
403404
Effects0),
405+
404406
Effects = [{next_event, info, pipeline_rpcs} | Effects1],
407+
405408
case State of
406409
#{cluster := #{Id := _}} ->
407410
% leader is in the cluster
@@ -1106,14 +1109,12 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
11061109
% simply forward all other events to ra_log
11071110
{Log, Effects} = ra_log:handle_event(Evt, Log0),
11081111
{follower, State#{log => Log}, Effects};
1109-
handle_follower(#pre_vote_rpc{}, #{voter := {no, _}} = State) ->
1110-
%% ignore elections, non-voter
1112+
%% A follower whose own `voter' entry in State is `{no, _}' takes no part in
%% elections: an incoming pre_vote_rpc is logged and dropped, leaving the
%% state unchanged and producing no effects.
handle_follower(#pre_vote_rpc{},
1113+
#{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) ->
1114+
%% the warning names the rpc this clause actually matches
?WARN("~w: follower ignored pre_vote_rpc, non voter: ~p", [LogId, Voter]),
11111115
{follower, State, []};
11121116
%% Every other follower state hands the pre_vote_rpc to process_pre_vote/3.
handle_follower(#pre_vote_rpc{} = PreVote, State) ->
11131117
process_pre_vote(follower, PreVote, State);
1114-
handle_follower(#request_vote_rpc{}, #{voter := {no, _}} = State) ->
1115-
%% ignore elections, non-voter
1116-
{follower, State, []};
11171118
handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term},
11181119
#{current_term := Term, voted_for := VotedFor,
11191120
cfg := #cfg{log_id = LogId}} = State)
@@ -1211,8 +1212,10 @@ handle_follower(#append_entries_reply{}, State) ->
12111212
%% handle to avoid logging as unhandled
12121213
%% could receive a lot of these shortly after standing down as leader
12131214
{follower, State, []};
1214-
handle_follower(election_timeout, #{voter := {no, _}} = State) ->
1215-
%% ignore elections, non-voter
1215+
%% A follower whose own `voter' entry in State is `{no, _}' does not start an
%% election on election_timeout: it logs a warning and remains a follower
%% with the state unchanged and no effects.
%% NOTE(review): this warns on every election_timeout received while the
%% server is a non-voter — confirm the resulting log volume is acceptable.
handle_follower(election_timeout,
1216+
#{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) ->
1217+
?WARN("~w: follower ignored election_timeout, non voter: ~p",
1218+
[LogId, Voter]),
12161219
{follower, State, []};
12171220
%% Every other follower state reacts by calling call_for_election(pre_vote, State).
handle_follower(election_timeout, State) ->
12181221
call_for_election(pre_vote, State);
@@ -1381,6 +1384,7 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
13811384
last_applied,
13821385
cluster,
13831386
leader_id,
1387+
voter,
13841388
voted_for,
13851389
cluster_change_permitted,
13861390
cluster_index_term,
@@ -2099,8 +2103,8 @@ new_peer() ->
20992103
match_index => 0,
21002104
commit_index_sent => 0,
21012105
query_index => 0,
2102-
status => normal,
2103-
voter => yes}.
2106+
voter => yes,
2107+
status => normal}.
21042108

21052109
%% Returns the new_peer/0 map merged with Map. maps:merge/2 gives precedence
%% to its second argument, so any key present in Map overrides the default.
new_peer_with(Map) ->
21062110
maps:merge(new_peer(), Map).
@@ -2336,7 +2340,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
23362340
[log_id(State0), maps:keys(NewCluster)]),
23372341
%% we are recovering and should apply the cluster change
23382342
State0#{cluster => NewCluster,
2339-
voter => ra_voter:peer_status(id(State0), NewCluster),
2343+
voter => ra_voter:status(NewCluster, id(State0)),
23402344
cluster_change_permitted => true,
23412345
cluster_index_term => {Idx, Term}};
23422346
_ ->
@@ -2488,7 +2492,7 @@ append_log_leader({'$ra_maybe_join', From, JoiningNode, ReplyMode},
24882492
#{JoiningNode := _} ->
24892493
already_member(State);
24902494
_ ->
2491-
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => ra_voter:new_nonvoter(State)})},
2495+
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => ra_voter:new_nonvoter(State)})},
24922496
append_cluster_change(Cluster, From, ReplyMode, State)
24932497
end;
24942498
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
@@ -2520,7 +2524,6 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
25202524
case Cmd of
25212525
{'$ra_cluster_change', _, Cluster, _} ->
25222526
State#{cluster => Cluster,
2523-
voter => ra_voter:peer_status(id(State), Cluster),
25242527
cluster_index_term => {Idx, Term}};
25252528
_ ->
25262529
% revert back to previous cluster
@@ -2532,7 +2535,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
25322535
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
25332536
State) ->
25342537
State#{cluster => Cluster,
2535-
voter => ra_voter:peer_status(id(State), Cluster),
2538+
voter => ra_voter:status(Cluster, id(State)),
25362539
cluster_index_term => {Idx, Term}};
25372540
pre_append_log_follower(_, State) ->
25382541
State.
@@ -2609,6 +2612,8 @@ query_indexes(#{cfg := #cfg{id = Id},
26092612
query_index := QueryIndex}) ->
26102613
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
26112614
Acc;
2615+
(_K, #{voter := {no, _}}, Acc) ->
2616+
Acc;
26122617
(_K, #{query_index := Idx}, Acc) ->
26132618
[Idx | Acc]
26142619
end, [QueryIndex], Cluster).
@@ -2619,6 +2624,8 @@ match_indexes(#{cfg := #cfg{id = Id},
26192624
{LWIdx, _} = ra_log:last_written(Log),
26202625
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
26212626
Acc;
2627+
(_K, #{voter := {no, _}}, Acc) ->
2628+
Acc;
26222629
(_K, #{match_index := Idx}, Acc) ->
26232630
[Idx | Acc]
26242631
end, [LWIdx], Cluster).

src/ra_voter.erl

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,55 @@
77
-export([
88
new_nonvoter/1,
99
status/1,
10-
peer_status/2
10+
status/2,
11+
maybe_promote/3
1112
]).
1213

14+
-define(DEFAULT_MAX_ROUNDS, 4).
15+
1316
%% Builds the initial non-voter status for a joining peer:
%% `{no, #{round => 0, target => T, ts => Now}}' where T is the value of the
%% `commit_index' key in State (maps:get/2 fails if the key is absent) and
%% Now is os:system_time(millisecond) at the time of the call.
new_nonvoter(State) ->
14-
TargetIdx = maps:get(commit_index, State),
15-
{no, #{round => 0, target => TargetIdx , ts => os:system_time(millisecond)}}.
17+
Target = maps:get(commit_index, State),
18+
{no, #{round => 0, target => Target , ts => os:system_time(millisecond)}}.
1619

17-
status(State) ->
18-
case maps:get(voter, State) of
19-
undefined ->
20-
MyId = ra_server:id(State),
21-
#{cluster := Cluster} = State,
22-
peer_status(MyId, Cluster);
23-
Voter -> Voter
20+
%% Re-evaluates the voter status of peer PeerID and, when evaluate_voter/3
%% returns anything other than the peer's current status, prepends a
%% `{next_event, {command, {'$ra_join', Meta, PeerID, noreply}}}' effect to
%% Effects (Meta carries a millisecond wall-clock `ts'). When the status is
%% unchanged, Effects is returned untouched.
%%
%% Reads `commit_index' and `cluster' from the state map. PeerID must be a
%% key of the cluster and its peer map must contain both `match_index' and
%% `voter'; otherwise the map match on the first body line fails (badmatch).
%%
%% NOTE(review): `Change' is bound but never used (the compiler will warn),
%% so the same '$ra_join' command is emitted for *any* new status, including
%% `{no, permanent}' and a "next round" status. Confirm this is intended, or
%% whether the computed status should be carried in the command.
%% NOTE(review): status/2 in this module defaults a missing `voter' key to
%% `yes', while this function requires the key — confirm peers always have it.
maybe_promote(PeerID,
21+
#{commit_index := CI, cluster := Cluster} = _State,
22+
Effects) ->
23+
#{PeerID := #{match_index := MI, voter := OldStatus} = _Peer} = Cluster,
24+
case evaluate_voter(OldStatus, MI, CI) of
25+
%% OldStatus is already bound, so this clause matches only an unchanged status
OldStatus ->
26+
Effects;
27+
Change ->
28+
[{next_event,
29+
{command, {'$ra_join',
30+
#{ts => os:system_time(millisecond)},
31+
PeerID,
32+
noreply}}} |
33+
Effects]
2434
end.
2535

26-
peer_status(PeerId, Cluster) ->
27-
Peer = maps:get(PeerId, Cluster, undefined),
28-
maps:get(voter, Peer, yes).
36+
%% Computes the next voter status from the current status, the peer's match
%% index MI and an index CI (maybe_promote/3 passes the state's commit_index).
%%
%% First clause: a `{no, #{round, target, ts}}' status whose target has been
%% reached (MI >= Target). The elapsed time since `ts' is compared with the
%% `poll_interval' env value of the `aten' application (default 1000):
%%   - elapsed =< interval            -> `yes'
%%   - slower, Round > ?DEFAULT_MAX_ROUNDS -> `{no, permanent}'
%%   - slower otherwise               -> a new round: Round + 1, target CI,
%%                                       ts reset to the current time
%% Second clause: every other input — `yes', `{no, permanent}', or a
%% non-voter that has not reached its target yet — is returned unchanged.
%%
%% NOTE(review): with `>' the rounds 0..?DEFAULT_MAX_ROUNDS inclusive are all
%% retried, i.e. one more than the macro name suggests — confirm the boundary.
%% NOTE(review): the duration is measured with wall-clock os:system_time/1,
%% which clock adjustments can skew — confirm this is acceptable here.
evaluate_voter({no, #{round := Round, target := Target , ts := RoundStart}}, MI, CI)
37+
when MI >= Target ->
38+
AtenPollInt = application:get_env(aten, poll_interval, 1000),
39+
Now = os:system_time(millisecond),
40+
case (Now - RoundStart) =< AtenPollInt of
41+
true ->
42+
yes;
43+
false when Round > ?DEFAULT_MAX_ROUNDS ->
44+
{no, permanent};
45+
false ->
46+
{no, #{round => Round+1, target => CI, ts => Now}}
47+
end;
48+
evaluate_voter(Permanent, _, _) ->
49+
Permanent.
50+
51+
%% Returns the voter status of the server that owns State: looks up
%% ra_server:id(State) in the state's `cluster' map via status/2.
status(#{cluster := Cluster} = State) ->
52+
Id = ra_server:id(State),
53+
status(Cluster, Id).
54+
55+
%% Returns the `voter' value stored for PeerId in Cluster, defaulting to
%% `yes' when the peer map has no `voter' key.
%% Throws `not_a_cluster_member' when PeerId is not a key of Cluster.
%% NOTE(review): the callers visible in this diff (apply_with and
%% pre_append_log_follower) pass the server's own id together with a newly
%% received cluster; if that cluster no longer contains the server, this
%% throws — confirm those callers expect and handle it.
status(Cluster, PeerId) ->
56+
case maps:get(PeerId, Cluster, undefined) of
57+
undefined ->
58+
throw(not_a_cluster_member);
59+
Peer ->
60+
maps:get(voter, Peer, yes)
61+
end.

test/ra_server_SUITE.erl

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ all() ->
4141
follower_machine_version,
4242
follower_install_snapshot_machine_version,
4343
leader_server_join,
44+
leader_server_maybe_join,
4445
leader_server_leave,
4546
leader_is_removed,
4647
follower_cluster_change,
@@ -1333,6 +1334,54 @@ leader_server_join(_Config) ->
13331334
| _] = Effects,
13341335
ok.
13351336

1337+
%% Checks that a leader handling a '$ra_maybe_join' command for N4 adds N4 to
%% its cluster, disallows further cluster changes, and emits append_entries
%% rpcs whose '$ra_cluster_change' entry records N4 with a non-voter status.
leader_server_maybe_join(_Config) ->
1338+
N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4,
1339+
OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}),
1340+
N2 => new_peer_with(#{next_index => 4, match_index => 3}),
1341+
N3 => new_peer_with(#{next_index => 4, match_index => 3})},
1342+
State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster},
1343+
% raft servers should switch to the new configuration after log append
1344+
% and further cluster changes should be disallowed
1345+
{leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _},
1346+
cluster_change_permitted := false} = _State1, Effects} =
1347+
ra_server:handle_leader({command, {'$ra_maybe_join', meta(),
1348+
N4, await_consensus}}, State0),
1349+
% new member should join as non-voter
1350+
% Round and Target come from a fresh new_nonvoter/1 call on the same state;
% being bound here, they act as equality checks in the patterns below.
% `ts' is wildcarded because new_nonvoter/1 fills it from the system clock.
{no, #{round := Round, target := Target}} = ra_voter:new_nonvoter(State0),
1351+
[
1352+
% the rpc to the joining node N4 carries four entries, the last being the
% cluster change
{send_rpc, N4,
1353+
#append_entries_rpc{entries =
1354+
[_, _, _, {4, 5, {'$ra_cluster_change', _,
1355+
#{N1 := _, N2 := _,
1356+
N3 := _, N4 := #{voter := {no, #{round := Round,
1357+
target := Target,
1358+
ts := _}}}},
1359+
await_consensus}}]}},
1360+
% the rpcs to the existing members N3 and N2 carry only the cluster change
{send_rpc, N3,
1361+
#append_entries_rpc{entries =
1362+
[{4, 5, {'$ra_cluster_change', _,
1363+
#{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round,
1364+
target := Target,
1365+
ts := _}}}},
1366+
await_consensus}}],
1367+
term = 5, leader_id = N1,
1368+
prev_log_index = 3,
1369+
prev_log_term = 5,
1370+
leader_commit = 3}},
1371+
{send_rpc, N2,
1372+
#append_entries_rpc{entries =
1373+
[{4, 5, {'$ra_cluster_change', _,
1374+
#{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round,
1375+
target := Target,
1376+
ts := _}}}},
1377+
await_consensus}}],
1378+
term = 5, leader_id = N1,
1379+
prev_log_index = 3,
1380+
prev_log_term = 5,
1381+
leader_commit = 3}}
1382+
| _] = Effects,
1383+
ok.
1384+
13361385
leader_server_leave(_Config) ->
13371386
N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4,
13381387
OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}),
@@ -2593,8 +2642,8 @@ new_peer() ->
25932642
match_index => 0,
25942643
query_index => 0,
25952644
commit_index_sent => 0,
2596-
status => normal,
2597-
voter => yes}.
2645+
voter => yes,
2646+
status => normal}.
25982647

25992648
%% Test helper: the new_peer/0 defaults merged with Map. maps:merge/2 gives
%% precedence to its second argument, so keys in Map override the defaults.
new_peer_with(Map) ->
26002649
maps:merge(new_peer(), Map).

0 commit comments

Comments
 (0)