Skip to content

Commit e6c474f

Browse files
author
Alex Valiushko
committed
track global non_voter status from the leader
1 parent 8badc9d commit e6c474f

File tree

8 files changed

+116
-189
lines changed

8 files changed

+116
-189
lines changed

src/ra.erl

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
%% queries
2424
members/1,
2525
members/2,
26+
cluster/1,
27+
cluster/2,
2628
initial_members/1,
2729
initial_members/2,
2830
local_query/2,
@@ -558,10 +560,9 @@ delete_cluster(ServerIds, Timeout) ->
558560
%% affect said cluster's availability characteristics (by increasing quorum node count).
559561
%%
560562
%% @param ServerLoc the ra server or servers to try to send the command to
561-
%% @param ServerId the ra server id of the new server, or a map with server id and settings.
563+
%% @param ServerId the ra server id of the new server.
562564
%% @end
563-
-spec add_member(ra_server_id() | [ra_server_id()],
564-
ra_server_id() | ra_new_server()) ->
565+
-spec add_member(ra_server_id() | [ra_server_id()], ra_server_id()) ->
565566
ra_cmd_ret() |
566567
{error, already_member} |
567568
{error, cluster_change_not_permitted}.
@@ -572,8 +573,7 @@ add_member(ServerLoc, ServerId) ->
572573
%% @see add_member/2
573574
%% @end
574575
-spec add_member(ra_server_id() | [ra_server_id()],
575-
ra_server_id() | ra_new_server(),
576-
timeout()) ->
576+
ra_server_id(), timeout()) ->
577577
ra_cmd_ret() |
578578
{error, already_member} |
579579
{error, cluster_change_not_permitted}.
@@ -1038,6 +1038,44 @@ members({local, ServerId}, Timeout) ->
10381038
members(ServerId, Timeout) ->
10391039
ra_server_proc:state_query(ServerId, members, Timeout).
10401040

1041+
%% @doc Returns a map of cluster members to their status.
%%
%% Except if `{local, ServerId}' is passed, the query is sent to the specified
%% server which may redirect it to the leader if it is a follower. It may
%% timeout if there is currently no leader (i.e. an election is in progress).
%%
%% With `{local, ServerId}', the query is always handled by the specified
%% server. It means the returned map might be out-of-date compared to what the
%% leader would have returned.
%%
%% This is `cluster/2' called with the default timeout.
%%
%% @param ServerId the Ra server(s) to send the query to, or
%% `{local, ServerId}' to query one specific server directly
%% @see cluster/2
%% @end
-spec cluster(ra_server_id() | [ra_server_id()] | {local, ra_server_id()}) ->
    ra_server_proc:ra_leader_call_ret(ra_cluster()).
cluster(ServerId) ->
    %% The argument is forwarded untouched, so every form accepted by
    %% cluster/2 (including `{local, ServerId}') is valid here as well.
    cluster(ServerId, ?DEFAULT_TIMEOUT).
1057+
1058+
%% @doc Queries a Ra server for the cluster map (members to their status).
%%
%% When `ServerLoc' is `{local, ServerId}' the query is answered by that
%% server itself, so the returned map might be out-of-date compared to what
%% the leader would have returned.
%%
%% For any other `ServerLoc' the query is sent to the specified server(s),
%% which may redirect it to the leader if it is a follower. It may timeout if
%% there is currently no leader (i.e. an election is in progress).
%%
%% @param ServerLoc the Ra server(s) to send the query to, or
%% `{local, ServerId}' to query one specific server directly
%% @param Timeout the timeout to use
%% @end
-spec cluster(ra_server_id() | [ra_server_id()] | {local, ra_server_id()},
              timeout()) ->
    ra_server_proc:ra_leader_call_ret(ra_cluster()).
cluster({local, LocalId}, Timeout) ->
    %% Answered by the named server only.
    ra_server_proc:local_state_query(LocalId, cluster, Timeout);
cluster(ServerLoc, Timeout) ->
    ra_server_proc:state_query(ServerLoc, cluster, Timeout).
1078+
10411079
%% @doc Returns a list of initial (seed) cluster members.
10421080
%%
10431081
%% This allows Ra-based systems with dynamic cluster membership

src/ra.hrl

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@
3939
%% after node restart). Pids are not stable in this sense.
4040
-type ra_server_id() :: {Name :: atom(), Node :: node()}.
4141

42-
%% Specifies server configuration for a new cluster member.
43-
-type ra_new_server() :: #{id := ra_server_id(),
44-
voter := boolean()}.
45-
4642
-type ra_peer_status() :: normal |
4743
{sending_snapshot, pid()} |
4844
suspended |

src/ra_server.erl

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2513,25 +2513,16 @@ append_log_leader({'$ra_join', From,
25132513
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter_status => Voter})},
25142514
append_cluster_change(Cluster, From, ReplyMode, State)
25152515
end;
2516-
append_log_leader({'$ra_join', From, #{id := JoiningNode,
2517-
voter := WantVoter}, ReplyMode},
2518-
State) ->
2519-
% Shortcut to compute non-voter status
2520-
VoterStatus = case WantVoter of
2521-
true -> voter;
2522-
false -> new_nonvoter(State)
2523-
end,
2524-
append_log_leader({'$ra_join', From, #{id => JoiningNode,
2525-
voter_status => VoterStatus}, ReplyMode},
2526-
State);
25272516
append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
25282517
State = #{cluster := OldCluster}) ->
2529-
% Legacy $ra_join, join as full voter iff no such member in the cluster.
2518+
% Legacy $ra_join, join as non voter iff no such member in the cluster.
25302519
case OldCluster of
25312520
#{JoiningNode := _} ->
25322521
already_member(State);
25332522
_ ->
2534-
append_log_leader({'$ra_join', From, #{id => JoiningNode, voter => true}, ReplyMode},
2523+
append_log_leader({'$ra_join', From,
2524+
#{id => JoiningNode, voter_status => new_nonvoter(State)},
2525+
ReplyMode},
25352526
State)
25362527
end;
25372528
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
@@ -2583,17 +2574,7 @@ append_cluster_change(Cluster, From, ReplyMode,
25832574
cluster := PrevCluster,
25842575
cluster_index_term := {PrevCITIdx, PrevCITTerm},
25852576
current_term := Term}) ->
2586-
%% TODO A bit dense.
2587-
maps:foreach(fun(K, #{voter_status := TargetStatus}) ->
2588-
case maps:get(K, PrevCluster, #{voter_status => voter}) of
2589-
#{voter_status := TargetStatus} ->
2590-
ok;
2591-
#{voter_status := voter} when TargetStatus =/= voter ->
2592-
ets:insert(ra_non_voters, K);
2593-
#{voter_status := {nonvoter, _}} when TargetStatus =:= voter ->
2594-
ets:delete_object(ra_non_voters, K)
2595-
end
2596-
end, Cluster),
2577+
update_known_non_voters(PrevCluster, Cluster),
25972578
% turn join command into a generic cluster change command
25982579
% that include the new cluster configuration
25992580
Command = {'$ra_cluster_change', From, Cluster, ReplyMode},
@@ -2609,6 +2590,26 @@ append_cluster_change(Cluster, From, ReplyMode,
26092590
cluster_index_term => IdxTerm,
26102591
previous_cluster => {PrevCITIdx, PrevCITTerm, PrevCluster}}}.
26112592

2593+
%% Brings the `ra_non_voters' ETS table in line with a cluster change.
%%
%% The ra_non_voters table is used to report known non-voters from the
%% leader perspective.
%%
%% OldCluster and NewCluster are maps of server id to peer map. Every member
%% of NewCluster is compared with its previous peer map (`undefined' when the
%% member is new), and every member that is present in OldCluster but absent
%% from NewCluster is dropped from the table.
%%
%% Removal is decided by KEY only. Comparing `{Id, Peer}' pairs would treat a
%% member whose peer map merely changed (e.g. voter -> non-voter) as removed
%% and delete the entry that was just inserted for it.
%%
%% Always returns `ok'.
update_known_non_voters(OldCluster, NewCluster) ->
    maps:foreach(fun(Id, Peer) ->
                         OldPeer = maps:get(Id, OldCluster, undefined),
                         update_known_non_voter(Id, OldPeer, Peer)
                 end, NewCluster),
    %% Members that left the cluster: keys of OldCluster not in NewCluster.
    Removed = maps:without(maps:keys(NewCluster), OldCluster),
    maps:foreach(fun(Id, OldPeer) ->
                         update_known_non_voter(Id, OldPeer, undefined)
                 end, Removed).

%% Applies the table update for a single member.
%%
%% Id is stored in / removed from the table as-is.
%% NOTE(review): the use of ets:delete_object/2 suggests ra_non_voters is a
%% bag-like table keyed on the first element of the server id - confirm
%% against the table's creation site.
update_known_non_voter(_Id, Peer, Peer) ->
    %% Peer is unchanged: nothing to do.
    ok;
update_known_non_voter(Id, _OldPeer, undefined) ->
    %% Member is no longer part of the cluster.
    ets:delete_object(ra_non_voters, Id);
update_known_non_voter(Id, _OldPeer, #{voter_status := voter}) ->
    %% Member is (now) a full voter.
    ets:delete_object(ra_non_voters, Id);
update_known_non_voter(Id, _OldPeer, #{voter_status := _}) ->
    %% Any other voter status is reported as non-voter.
    ets:insert(ra_non_voters, Id).
2612+
26122613
mismatch_append_entries_reply(Term, CommitIndex, State0) ->
26132614
{CITerm, State} = fetch_term(CommitIndex, State0),
26142615
% assert CITerm is found

src/ra_server_proc.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ log_fold(ServerId, Fun, InitialState, Timeout) ->
184184
all |
185185
overview |
186186
members |
187+
cluster |
187188
initial_members |
188189
machine, timeout()) ->
189190
ra_leader_call_ret(term()).
@@ -194,6 +195,7 @@ state_query(ServerLoc, Spec, Timeout) ->
194195
all |
195196
overview |
196197
members |
198+
cluster |
197199
initial_members |
198200
machine, timeout()) ->
199201
ra_local_call_ret(term()).
@@ -1514,6 +1516,8 @@ do_state_query(machine, #{machine_state := MacState}) ->
15141516
MacState;
15151517
do_state_query(members, #{cluster := Cluster}) ->
15161518
maps:keys(Cluster);
1519+
do_state_query(cluster, #{cluster := Cluster}) ->
1520+
Cluster;
15171521
do_state_query(initial_members, #{log := Log}) ->
15181522
case ra_log:read_config(Log) of
15191523
{ok, #{initial_members := InitialMembers}} ->

test/coordination_SUITE.erl

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ disconnected_node_catches_up(Config) ->
399399
nonvoter_catches_up(Config) ->
400400
PrivDir = ?config(data_dir, Config),
401401
ClusterName = ?config(cluster_name, Config),
402-
[A, B, C] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
402+
[{_, ANode} = A, B, C] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
403403
Machine = {module, ?MODULE, #{}},
404404
{ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, [A, B]),
405405
{ok, _, Leader} = ra:members(hd(Started)),
@@ -408,25 +408,25 @@ nonvoter_catches_up(Config) ->
408408
|| N <- lists:seq(1, 10000)],
409409
{ok, _, _} = ra:process_command(Leader, banana),
410410

411-
New = #{id => C, voter => false},
412-
{ok, _, _} = ra:add_member(A, New),
411+
{ok, _, _} = ra:add_member(A, C),
413412
ok = ra:start_server(?SYS, ClusterName, C, Machine, [A, B]),
414-
?assertMatch({ok, #{cluster := #{C := #{voter_status := {nonvoter, _}}}}, _},
415-
ra:member_overview(A)),
413+
?assertMatch({ok, #{C := #{voter_status := {nonvoter, _}}}, _}, ra:cluster(C)),
414+
?assertMatch(#{known_non_voters := [C]}, rpc:call(ANode, ra, overview,[?SYS])),
416415

417416
await_condition(
418417
fun () ->
419-
{ok, #{cluster := #{C := Peer}}, _} = ra:member_overview(A),
420-
voter == maps:get(voter_status, Peer)
418+
{ok, #{C := #{voter_status := Voter}}, _} = ra:cluster(C),
419+
voter =:= Voter
421420
end, 200),
421+
?assertMatch(#{known_non_voters := []}, rpc:call(ANode, ra, overview,[?SYS])),
422422

423423
[ok = slave:stop(S) || {_, S} <- ServerIds],
424424
ok.
425425

426426
nonvoter_catches_up_after_restart(Config) ->
427427
PrivDir = ?config(data_dir, Config),
428428
ClusterName = ?config(cluster_name, Config),
429-
[A, B, C] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
429+
[{_, ANode} = A, B, C] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
430430
Machine = {module, ?MODULE, #{}},
431431
{ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, [A, B]),
432432
{ok, _, Leader} = ra:members(hd(Started)),
@@ -435,27 +435,27 @@ nonvoter_catches_up_after_restart(Config) ->
435435
|| N <- lists:seq(1, 10000)],
436436
{ok, _, _} = ra:process_command(Leader, banana),
437437

438-
New = #{id => C, voter => false},
439-
{ok, _, _} = ra:add_member(A, New),
438+
{ok, _, _} = ra:add_member(A, C),
440439
ok = ra:start_server(?SYS, ClusterName, C, Machine, [A, B]),
441-
?assertMatch({ok, #{cluster := #{C := #{voter_status := {nonvoter, _}}}}, _},
442-
ra:member_overview(A)),
440+
?assertMatch({ok, #{C := #{voter_status := {nonvoter, _}}}, _}, ra:cluster(C)),
441+
?assertMatch(#{known_non_voters := [C]}, rpc:call(ANode, ra, overview,[?SYS])),
443442
ok = ra:stop_server(?SYS, C),
444443
ok = ra:restart_server(?SYS, C),
445444

446445
await_condition(
447446
fun () ->
448-
{ok, #{cluster := #{C := Peer}}, _} = ra:member_overview(A),
449-
voter == maps:get(voter_status, Peer)
447+
{ok, #{C := #{voter_status := Voter}}, _} = ra:cluster(C),
448+
voter =:= Voter
450449
end, 200),
450+
?assertMatch(#{known_non_voters := []}, rpc:call(ANode, ra, overview,[?SYS])),
451451

452452
[ok = slave:stop(S) || {_, S} <- ServerIds],
453453
ok.
454454

455455
nonvoter_catches_up_after_leader_restart(Config) ->
456456
PrivDir = ?config(data_dir, Config),
457457
ClusterName = ?config(cluster_name, Config),
458-
[A, B, C] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
458+
[{_, ANode} = A, B, C] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
459459
Machine = {module, ?MODULE, #{}},
460460
{ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, [A, B]),
461461
{ok, _, Leader} = ra:members(hd(Started)),
@@ -464,19 +464,19 @@ nonvoter_catches_up_after_leader_restart(Config) ->
464464
|| N <- lists:seq(1, 10000)],
465465
{ok, _, _} = ra:process_command(Leader, banana),
466466

467-
New = #{id => C, voter => false},
468-
{ok, _, _} = ra:add_member(A, New),
467+
{ok, _, _} = ra:add_member(A, C),
469468
ok = ra:start_server(?SYS, ClusterName, C, Machine, [A, B]),
470-
?assertMatch({ok, #{cluster := #{C := #{voter_status := {nonvoter, _}}}}, _},
471-
ra:member_overview(A)),
469+
?assertMatch({ok, #{C := #{voter_status := {nonvoter, _}}}, _}, ra:cluster(C)),
470+
?assertMatch(#{known_non_voters := [C]}, rpc:call(ANode, ra, overview,[?SYS])),
472471
ok = ra:stop_server(?SYS, Leader),
473472
ok = ra:restart_server(?SYS, Leader),
474473

475474
await_condition(
476475
fun () ->
477-
{ok, #{cluster := #{C := Peer}}, _} = ra:member_overview(A),
478-
voter == maps:get(voter_status, Peer)
476+
{ok, #{C := #{voter_status := Voter}}, _} = ra:cluster(C),
477+
voter =:= Voter
479478
end, 200),
479+
?assertMatch(#{known_non_voters := []}, rpc:call(ANode, ra, overview,[?SYS])),
480480

481481
[ok = slave:stop(S) || {_, S} <- ServerIds],
482482
ok.
@@ -730,3 +730,14 @@ await_condition(Fun, Attempts) ->
730730
timer:sleep(100),
731731
await_condition(Fun, Attempts - 1)
732732
end.
733+
734+
%% Inverse of a polling wait: evaluates Check up to Remaining times, 100 ms
%% apart, and exits with `condition_materialised' as soon as one evaluation
%% yields `true'. Returns `ok' once the attempts are used up.
%%
%% Check is evaluated under `catch', so a crashing Check counts as "not true"
%% for that attempt.
fail_on_condition(_Check, 0) ->
    ok;
fail_on_condition(Check, Remaining) ->
    Outcome = (catch Check()),
    case Outcome of
        true ->
            exit(condition_materialised);
        _NotYet ->
            timer:sleep(100),
            fail_on_condition(Check, Remaining - 1)
    end.

test/ra_2_SUITE.erl

Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ all_tests() ->
4545
external_reader,
4646
add_member_without_quorum,
4747
force_start_follower_as_single_member,
48-
force_start_follower_as_single_member_nonvoter,
4948
initial_members_query
5049
].
5150

@@ -674,46 +673,6 @@ force_start_follower_as_single_member(Config) ->
674673
UId4 = ?config(uid4, Config),
675674
Conf4 = conf(ClusterName, UId4, ServerId4, PrivDir, [ServerId3]),
676675
{ok, _, _} = ra:add_member(ServerId3, ServerId4),
677-
{timeout,_} = ra:process_command(ServerId3, {enq, banana}),
678-
%% start new member
679-
ok = ra:start_server(?SYS, Conf4),
680-
{ok, _, ServerId3} = ra:members(ServerId4),
681-
ok = enqueue(ServerId3, msg3),
682-
683-
ok.
684-
685-
force_start_follower_as_single_member_nonvoter(Config) ->
686-
ok = logger:set_primary_config(level, all),
687-
%% ra:start_server should fail if the node already exists
688-
ClusterName = ?config(cluster_name, Config),
689-
PrivDir = ?config(priv_dir, Config),
690-
ServerId1 = ?config(server_id, Config),
691-
ServerId2 = ?config(server_id2, Config),
692-
ServerId3 = ?config(server_id3, Config),
693-
InitialCluster = [ServerId1, ServerId2, ServerId3],
694-
ok = start_cluster(ClusterName, InitialCluster),
695-
timer:sleep(100),
696-
%% stop majority to simulate permanent outage
697-
ok = ra:stop_server(?SYS, ServerId1),
698-
ok = ra:stop_server(?SYS, ServerId2),
699-
700-
timer:sleep(100),
701-
%% force the remaining node to change its membership
702-
ok = ra_server_proc:force_shrink_members_to_current_member(ServerId3),
703-
{ok, [_], ServerId3} = ra:members(ServerId3),
704-
ok = enqueue(ServerId3, msg1),
705-
706-
%% test that it works after restart
707-
ok = ra:stop_server(?SYS, ServerId3),
708-
ok = ra:restart_server(?SYS, ServerId3),
709-
{ok, [_], ServerId3} = ra:members(ServerId3),
710-
ok = enqueue(ServerId3, msg2),
711-
712-
%% add a member
713-
ServerId4 = ?config(server_id4, Config),
714-
UId4 = ?config(uid4, Config),
715-
Conf4 = conf(ClusterName, UId4, ServerId4, PrivDir, [ServerId3]),
716-
{ok, _, _} = ra:add_member(ServerId3, #{id => ServerId4, voter => false}),
717676
%% the membership has changed but member not running yet
718677
%% it is nonvoter and does not affect quorum size
719678
{ok, _, _} = ra:process_command(ServerId3, {enq, banana}),

0 commit comments

Comments
 (0)