Skip to content

Commit c681c6b

Browse files
author
Alex Valiushko
committed
fix: member restarts as nonvoter after catching up
1 parent abffcb8 commit c681c6b

File tree

5 files changed

+131
-31
lines changed

5 files changed

+131
-31
lines changed

src/ra.hrl

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -149,6 +149,7 @@
149149
-type snapshot_meta() :: #{index := ra_index(),
150150
term := ra_term(),
151151
cluster := ra_cluster_servers(),
152+
non_voters => ra_cluster_servers(),
152153
machine_version := ra_machine:version()}.
153154

154155
-record(install_snapshot_rpc,

src/ra_log.erl

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -638,12 +638,17 @@ update_release_cursor0(Idx, Cluster, MacVersion, MacState,
638638
reader = Reader,
639639
snapshot_state = SnapState} = State0) ->
640640
ClusterServerIds = maps:keys(Cluster),
641+
NonvoterIds = maps:fold(fun
642+
(K, #{voter_status := {nonvoter, _}}, Acc) -> [K|Acc];
643+
(_, _, Acc) -> Acc
644+
end, [], Cluster),
641645
SnapLimit = case ra_snapshot:current(SnapState) of
642646
undefined -> SnapInter;
643647
{I, _} -> I + SnapInter
644648
end,
645649
Meta = #{index => Idx,
646650
cluster => ClusterServerIds,
651+
non_voters => NonvoterIds,
647652
machine_version => MacVersion},
648653
% The release cursor index is the last entry _not_ contributing
649654
% to the current state. I.e. the last entry that can be discarded.

src/ra_server.erl

Lines changed: 30 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -295,21 +295,25 @@ init(#{id := Id,
295295

296296
LatestMacVer = ra_machine:version(Machine),
297297

298-
{_FirstIndex, Cluster0, MacVer, MacState,
298+
{Cluster, VoterStatus,
299+
MacVer, MacState,
299300
{SnapshotIdx, _} = SnapshotIndexTerm} =
300301
case ra_log:recover_snapshot(Log0) of
301302
undefined ->
302303
InitialMachineState = ra_machine:init(Machine, Name),
303-
{0, make_cluster(Id, InitialNodes),
304-
0, InitialMachineState, {0, 0}};
305-
{#{index := Idx,
306-
term := Term,
304+
Voter = case maps:get(voter, Config, true) of
305+
false ->
306+
new_nonvoter(init);
307+
true ->
308+
voter
309+
end,
310+
{make_cluster(Id, InitialNodes), Voter, 0, InitialMachineState, {0, 0}};
311+
{#{index := Idx, term := Term,
307312
cluster := ClusterNodes,
308-
machine_version := MacVersion}, MacSt} ->
309-
Clu = make_cluster(Id, ClusterNodes),
310-
%% the snapshot is the last index before the first index
311-
%% TODO: should this be Idx + 1?
312-
{Idx + 1, Clu, MacVersion, MacSt, {Idx, Term}}
313+
machine_version := MacVersion} = Snapshot, MacSt} ->
314+
Nonvoters = maps:get(non_voters, Snapshot, []),
315+
Clu = make_cluster(Id, ClusterNodes, Nonvoters, snapshot),
316+
{Clu, voter_status(Id, Clu), MacVersion, MacSt, {Idx, Term}}
313317
end,
314318
MacMod = ra_machine:which_module(Machine, MacVer),
315319

@@ -331,18 +335,9 @@ init(#{id := Id,
331335
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapshotIdx),
332336
put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, CurrentTerm),
333337

334-
VoterStatus = case maps:get(voter, Config, true) of
335-
false ->
336-
{nonvoter, init};
337-
true ->
338-
voter
339-
end,
340-
Peer = maps:get(Id, Cluster0),
341-
Cluster1 = Cluster0#{Id => Peer#{voter_status => VoterStatus}},
342-
343338
#{cfg => Cfg,
344339
current_term => CurrentTerm,
345-
cluster => Cluster1,
340+
cluster => Cluster,
346341
% There may be scenarios when a single server
347342
% starts up but hasn't
348343
% yet re-applied its noop command that we may receive other join
@@ -1296,13 +1291,14 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
12961291
Cfg0
12971292
end,
12981293

1299-
{#{cluster := ClusterIds}, MacState} = ra_log:recover_snapshot(Log),
1294+
{#{cluster := ClusterIds} = Snapshot, MacState} = ra_log:recover_snapshot(Log),
1295+
Nonvoters = maps:get(non_voters, Snapshot, []),
13001296
State = update_term(Term,
13011297
State0#{cfg => Cfg,
13021298
log => Log,
13031299
commit_index => SnapIndex,
13041300
last_applied => SnapIndex,
1305-
cluster => make_cluster(Id, ClusterIds),
1301+
cluster => make_cluster(Id, ClusterIds, Nonvoters, remote_snapshot),
13061302
machine_state => MacState}),
13071303
%% it was the last snapshot chunk so we can revert back to
13081304
%% follower status
@@ -2245,8 +2241,15 @@ fetch_term(Idx, #{log := Log0} = State) ->
22452241
end.
22462242

22472243
make_cluster(Self, Nodes) ->
2244+
make_cluster(Self, Nodes, [], all_voters).
2245+
2246+
make_cluster(Self, Nodes, Nonvoters, Reason) ->
22482247
case lists:foldl(fun(N, Acc) ->
2249-
Acc#{N => new_peer()}
2248+
P = case lists:member(N, Nonvoters) of
2249+
true -> new_nonvoter(Reason);
2250+
false -> new_peer()
2251+
end,
2252+
Acc#{N =>P}
22502253
end, #{}, Nodes) of
22512254
#{Self := _} = Cluster ->
22522255
% current server is already in cluster - do nothing
@@ -2895,9 +2898,11 @@ already_member(State) ->
28952898
%%% Voter status helpers
28962899
%%% ====================
28972900

2898-
-spec new_nonvoter(ra_server_state()) -> ra_voter_status().
2901+
-spec new_nonvoter(ra_server_state() | atom()) -> ra_voter_status().
28992902
new_nonvoter(#{commit_index := Target} = _State) ->
2900-
{nonvoter, #{target => Target}}.
2903+
{nonvoter, #{target => Target}};
2904+
new_nonvoter(Reason) when is_atom(Reason) ->
2905+
{nonvoter, Reason}.
29012906

29022907
-spec maybe_promote_voter(ra_server_id(), ra_server_state(), effects()) -> effects().
29032908
maybe_promote_voter(PeerID, #{cluster := Cluster} = _State, Effects) ->

test/coordination_SUITE.erl

Lines changed: 88 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,9 @@ all() ->
2929

3030
all_tests() ->
3131
[
32+
nonvoter_catches_up,
33+
nonvoter_catches_up_after_restart,
34+
nonvoter_catches_up_after_leader_restart,
3235
start_stop_restart_delete_on_remote,
3336
start_cluster,
3437
start_or_restart_cluster,
@@ -393,6 +396,91 @@ disconnected_node_catches_up(Config) ->
393396
[ok = slave:stop(S) || {_, S} <- ServerIds],
394397
ok.
395398

399+
nonvoter_catches_up(Config) ->
400+
PrivDir = ?config(data_dir, Config),
401+
ClusterName = ?config(cluster_name, Config),
402+
[A, B, C] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
403+
Machine = {module, ?MODULE, #{}},
404+
{ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, [A, B]),
405+
{ok, _, Leader} = ra:members(hd(Started)),
406+
407+
[ok = ra:pipeline_command(Leader, N, no_correlation, normal)
408+
|| N <- lists:seq(1, 10000)],
409+
{ok, _, _} = ra:process_command(Leader, banana),
410+
411+
New = #{id => C, voter => false},
412+
{ok, _, _} = ra:add_member(A, New),
413+
ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]),
414+
?assertMatch({ok, #{voter_status := {nonvoter, _}}, _},
415+
ra:member_overview(C)),
416+
417+
await_condition(
418+
fun () ->
419+
{ok, O2, _} = ra:member_overview(C),
420+
voter == maps:get(voter_status, O2)
421+
end, 200),
422+
423+
[ok = slave:stop(S) || {_, S} <- ServerIds],
424+
ok.
425+
426+
nonvoter_catches_up_after_restart(Config) ->
427+
PrivDir = ?config(data_dir, Config),
428+
ClusterName = ?config(cluster_name, Config),
429+
[A, B, C] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
430+
Machine = {module, ?MODULE, #{}},
431+
{ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, [A, B]),
432+
{ok, _, Leader} = ra:members(hd(Started)),
433+
434+
[ok = ra:pipeline_command(Leader, N, no_correlation, normal)
435+
|| N <- lists:seq(1, 10000)],
436+
{ok, _, _} = ra:process_command(Leader, banana),
437+
438+
New = #{id => C, voter => false},
439+
{ok, _, _} = ra:add_member(A, New),
440+
ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]),
441+
?assertMatch({ok, #{voter_status := {nonvoter, _}}, _},
442+
ra:member_overview(C)),
443+
ok = ra:stop_server(?SYS, C),
444+
ok = ra:restart_server(?SYS, C),
445+
446+
await_condition(
447+
fun () ->
448+
{ok, O2, _} = ra:member_overview(C),
449+
voter == maps:get(voter_status, O2)
450+
end, 200),
451+
452+
[ok = slave:stop(S) || {_, S} <- ServerIds],
453+
ok.
454+
455+
nonvoter_catches_up_after_leader_restart(Config) ->
456+
PrivDir = ?config(data_dir, Config),
457+
ClusterName = ?config(cluster_name, Config),
458+
[A, B, C] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
459+
Machine = {module, ?MODULE, #{}},
460+
{ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, [A, B]),
461+
{ok, _, Leader} = ra:members(hd(Started)),
462+
463+
[ok = ra:pipeline_command(Leader, N, no_correlation, normal)
464+
|| N <- lists:seq(1, 10000)],
465+
{ok, _, _} = ra:process_command(Leader, banana),
466+
467+
New = #{id => C, voter => false},
468+
{ok, _, _} = ra:add_member(A, New),
469+
ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]),
470+
?assertMatch({ok, #{voter_status := {nonvoter, _}}, _},
471+
ra:member_overview(C)),
472+
ok = ra:stop_server(?SYS, Leader),
473+
ok = ra:restart_server(?SYS, Leader),
474+
475+
await_condition(
476+
fun () ->
477+
{ok, O2, _} = ra:member_overview(C),
478+
voter == maps:get(voter_status, O2)
479+
end, 200),
480+
481+
[ok = slave:stop(S) || {_, S} <- ServerIds],
482+
ok.
483+
396484
key_metrics(Config) ->
397485
PrivDir = ?config(data_dir, Config),
398486
ClusterName = ?config(cluster_name, Config),

test/ra_log_snapshot_SUITE.erl

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -71,7 +71,7 @@ end_per_testcase(_TestCase, _Config) ->
7171

7272
roundtrip(Config) ->
7373
Dir = ?config(dir, Config),
74-
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
74+
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}], [{banana, node@savanna}]),
7575
SnapshotRef = my_state,
7676
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef),
7777
Context = #{can_accept_full_file => true},
@@ -80,7 +80,7 @@ roundtrip(Config) ->
8080

8181
roundtrip_compat(Config) ->
8282
Dir = ?config(dir, Config),
83-
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
83+
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}], [{banana, node@savanna}]),
8484
SnapshotRef = my_state,
8585
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef),
8686
?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir)),
@@ -105,7 +105,7 @@ test_accept(Config, Name, DataSize, FullFile, ChunkSize) ->
105105
Dir = dir(?config(dir, Config), Name),
106106
AcceptDir = dir(?config(accept_dir, Config), Name),
107107
ct:pal("test_accept ~w ~b ~w ~b", [Name, DataSize, FullFile, ChunkSize]),
108-
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
108+
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}], [{banana, node@savanna}]),
109109
SnapshotRef = crypto:strong_rand_bytes(DataSize),
110110
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef),
111111
Context = #{can_accept_full_file => FullFile},
@@ -178,24 +178,25 @@ recover_invalid_checksum(Config) ->
178178

179179
read_meta_data(Config) ->
180180
Dir = ?config(dir, Config),
181-
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
181+
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}], [{banana, node@savanna}]),
182182
SnapshotRef = my_state,
183183
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef),
184184
{ok, SnapshotMeta} = ra_log_snapshot:read_meta(Dir),
185185
ok.
186186

187187
recover_same_as_read(Config) ->
188188
Dir = ?config(dir, Config),
189-
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
189+
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}], [{banana, node@savanna}]),
190190
SnapshotData = my_state,
191191
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData),
192192
{ok, SnapshotMeta, SnapshotData} = ra_log_snapshot:recover(Dir),
193193
ok.
194194

195195
%% Utility
196196

197-
meta(Idx, Term, Cluster) ->
197+
meta(Idx, Term, Cluster, Nonvoters) ->
198198
#{index => Idx,
199199
term => Term,
200200
cluster => Cluster,
201+
non_voters => Nonvoters,
201202
machine_version => 1}.

0 commit comments

Comments
 (0)