9494 | recovered | stop | receive_snapshot .
9595
9696-type command_type () :: '$usr' | '$ra_join' | '$ra_leave' |
97- '$ra_maybe_join' |
9897 '$ra_cluster_change' | '$ra_cluster' .
9998
10099-type command_meta () :: #{from => from (),
@@ -396,15 +395,11 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
396395 Peer = Peer0 #{match_index => max (MI , LastIdx ),
397396 next_index => max (NI , NextIdx )},
398397 State1 = put_peer (PeerId , Peer , State0 ),
399- Effects00 = ra_voter :maybe_promote (PeerId , State1 , []),
400-
401- {State2 , Effects0 } = evaluate_quorum (State1 , Effects00 ),
398+ {State2 , Effects0 } = evaluate_quorum (State1 , []),
402399
403400 {State , Effects1 } = process_pending_consistent_queries (State2 ,
404401 Effects0 ),
405-
406402 Effects = [{next_event , info , pipeline_rpcs } | Effects1 ],
407-
408403 case State of
409404 #{cluster := #{Id := _ }} ->
410405 % leader is in the cluster
@@ -782,7 +777,7 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true},
782777 NewVotes = Votes + 1 ,
783778 ? DEBUG (" ~s : vote granted for term ~b votes ~b " ,
784779 [LogId , Term , NewVotes ]),
785- case trunc ( maps : size ( Nodes ) / 2 ) + 1 of
780+ case need_acks ( Nodes ) of
786781 NewVotes ->
787782 {State1 , Effects } = make_all_rpcs (initialise_peers (State0 )),
788783 Noop = {noop , #{ts => erlang :system_time (millisecond )},
@@ -928,7 +923,7 @@ handle_pre_vote(#pre_vote_result{term = Term, vote_granted = true,
928923 [LogId , Token , Term , Votes + 1 ]),
929924 NewVotes = Votes + 1 ,
930925 State = update_term (Term , State0 ),
931- case trunc ( maps : size ( Nodes ) / 2 ) + 1 of
926+ case need_acks ( Nodes ) of
932927 NewVotes ->
933928 call_for_election (candidate , State );
934929 _ ->
@@ -1109,10 +1104,6 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
11091104 % simply forward all other events to ra_log
11101105 {Log , Effects } = ra_log :handle_event (Evt , Log0 ),
11111106 {follower , State #{log => Log }, Effects };
1112- handle_follower (# pre_vote_rpc {},
1113- #{cfg := # cfg {log_id = LogId }, voter := {no , _ } = Voter } = State ) ->
1114- ? WARN (" ~w : follower ignored request_vote_rpc, non voter: ~p " , [LogId , Voter ]),
1115- {follower , State , []};
11161107handle_follower (# pre_vote_rpc {} = PreVote , State ) ->
11171108 process_pre_vote (follower , PreVote , State );
11181109handle_follower (# request_vote_rpc {candidate_id = Cand , term = Term },
@@ -1212,11 +1203,6 @@ handle_follower(#append_entries_reply{}, State) ->
12121203 % % handle to avoid logging as unhandled
12131204 % % could receive a lot of these shortly after standing down as leader
12141205 {follower , State , []};
1215- handle_follower (election_timeout ,
1216- #{cfg := # cfg {log_id = LogId }, voter := {no , _ } = Voter } = State ) ->
1217- ? WARN (" ~w : follower ignored election_timeout, non voter: ~p " ,
1218- [LogId , Voter ]),
1219- {follower , State , []};
12201206handle_follower (election_timeout , State ) ->
12211207 call_for_election (pre_vote , State );
12221208handle_follower (try_become_leader , State ) ->
@@ -1384,7 +1370,6 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
13841370 last_applied ,
13851371 cluster ,
13861372 leader_id ,
1387- voter ,
13881373 voted_for ,
13891374 cluster_change_permitted ,
13901375 cluster_index_term ,
@@ -2044,11 +2029,13 @@ process_pre_vote(FsmState, #pre_vote_rpc{term = Term, candidate_id = Cand,
20442029 last_log_term = LLTerm },
20452030 #{cfg := # cfg {machine_version = OurMacVer ,
20462031 effective_machine_version = EffMacVer },
2047- current_term := CurTerm } = State0 )
2032+ current_term := CurTerm ,
2033+ cluster := Cluster } = State0 )
20482034 when Term >= CurTerm ->
20492035 State = update_term (Term , State0 ),
20502036 LastIdxTerm = last_idx_term (State ),
2051- case is_candidate_log_up_to_date (LLIdx , LLTerm , LastIdxTerm ) of
2037+ case is_voter (Cand , Cluster ) andalso
2038+ is_candidate_log_up_to_date (LLIdx , LLTerm , LastIdxTerm ) of
20522039 true when Version > ? RA_PROTO_VERSION ->
20532040 ? DEBUG (" ~s : declining pre-vote for ~w for protocol version ~b " ,
20542041 [log_id (State0 ), Cand , Version ]),
@@ -2073,8 +2060,10 @@ process_pre_vote(FsmState, #pre_vote_rpc{term = Term, candidate_id = Cand,
20732060 false ->
20742061 ? DEBUG (" ~s : declining pre-vote for ~w for term ~b ,"
20752062 " candidate last log index term was: ~w~n "
2076- " Last log entry idxterm seen was: ~w " ,
2077- [log_id (State0 ), Cand , Term , {LLIdx , LLTerm }, LastIdxTerm ]),
2063+ " Last log entry idxterm seen was: ~w~n "
2064+ " Voter status was: ~w~n " ,
2065+ [log_id (State0 ), Cand , Term , {LLIdx , LLTerm }, LastIdxTerm ,
2066+ get_voter_status (Cand , Cluster )]),
20782067 case FsmState of
20792068 follower ->
20802069 {FsmState , State , [start_election_timeout ]};
@@ -2103,16 +2092,15 @@ new_peer() ->
21032092 match_index => 0 ,
21042093 commit_index_sent => 0 ,
21052094 query_index => 0 ,
2106- voter => yes ,
21072095 status => normal }.
21082096
21092097new_peer_with (Map ) ->
21102098 maps :merge (new_peer (), Map ).
21112099
2112- already_member ( State ) ->
2113- % already a member do nothing
2114- % TODO: reply? If we don't reply the caller may block until timeout
2115- { not_appended , already_member , State } .
2100+ new_matching_peer (#{ commit_index : = CI } = _State ) ->
2101+ new_peer_with (#{
2102+ voter => { matching , CI }
2103+ }) .
21162104
21172105peers (#{cfg := # cfg {id = Id }, cluster := Peers }) ->
21182106 maps :remove (Id , Peers ).
@@ -2340,7 +2328,6 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
23402328 [log_id (State0 ), maps :keys (NewCluster )]),
23412329 % % we are recovering and should apply the cluster change
23422330 State0 #{cluster => NewCluster ,
2343- voter => ra_voter :status (NewCluster , id (State0 )),
23442331 cluster_change_permitted => true ,
23452332 cluster_index_term => {Idx , Term }};
23462333 _ ->
@@ -2471,28 +2458,17 @@ add_reply(_, _, _, % From, Reply, Mode
24712458append_log_leader ({CmdTag , _ , _ , _ },
24722459 State = #{cluster_change_permitted := false })
24732460 when CmdTag == '$ra_join' orelse
2474- CmdTag == '$ra_maybe_join' orelse
24752461 CmdTag == '$ra_leave' ->
24762462 {not_appended , cluster_change_not_permitted , State };
24772463append_log_leader ({'$ra_join' , From , JoiningNode , ReplyMode },
24782464 State = #{cluster := OldCluster }) ->
2479- case OldCluster of
2480- #{JoiningNode := #{voter := yes }} ->
2481- already_member (State );
2482- #{JoiningNode := #{voter := {no , _ }} = Peer } ->
2483- Cluster = OldCluster #{JoiningNode => Peer #{voter => yes }},
2484- append_cluster_change (Cluster , From , ReplyMode , State );
2485- _ ->
2486- Cluster = OldCluster #{JoiningNode => new_peer ()},
2487- append_cluster_change (Cluster , From , ReplyMode , State )
2488- end ;
2489- append_log_leader ({'$ra_maybe_join' , From , JoiningNode , ReplyMode },
2490- State = #{cluster := OldCluster }) ->
24912465 case OldCluster of
24922466 #{JoiningNode := _ } ->
2493- already_member (State );
2467+ % already a member do nothing
2468+ % TODO: reply? If we don't reply the caller may block until timeout
2469+ {not_appended , already_member , State };
24942470 _ ->
2495- Cluster = OldCluster #{JoiningNode => new_peer_with (#{ voter => ra_voter : new_nonvoter ( State )} )},
2471+ Cluster = OldCluster #{JoiningNode => new_matching_peer ( State )},
24962472 append_cluster_change (Cluster , From , ReplyMode , State )
24972473 end ;
24982474append_log_leader ({'$ra_leave' , From , LeavingServer , ReplyMode },
@@ -2535,7 +2511,6 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
25352511pre_append_log_follower ({Idx , Term , {'$ra_cluster_change' , _ , Cluster , _ }},
25362512 State ) ->
25372513 State #{cluster => Cluster ,
2538- voter => ra_voter :status (Cluster , id (State )),
25392514 cluster_index_term => {Idx , Term }};
25402515pre_append_log_follower (_ , State ) ->
25412516 State .
@@ -2612,10 +2587,11 @@ query_indexes(#{cfg := #cfg{id = Id},
26122587 query_index := QueryIndex }) ->
26132588 maps :fold (fun (PeerId , _ , Acc ) when PeerId == Id ->
26142589 Acc ;
2615- (_K , #{voter := {no , _ }}, Acc ) ->
2616- Acc ;
2617- (_K , #{query_index := Idx }, Acc ) ->
2618- [Idx | Acc ]
2590+ (_K , #{query_index := Idx } = Peer , Acc ) ->
2591+ case is_voter (Peer ) of
2592+ true -> [Idx | Acc ];
2593+ false -> Acc
2594+ end
26192595 end , [QueryIndex ], Cluster ).
26202596
26212597match_indexes (#{cfg := # cfg {id = Id },
@@ -2624,12 +2600,49 @@ match_indexes(#{cfg := #cfg{id = Id},
26242600 {LWIdx , _ } = ra_log :last_written (Log ),
26252601 maps :fold (fun (PeerId , _ , Acc ) when PeerId == Id ->
26262602 Acc ;
2627- (_K , #{voter := {no , _ }}, Acc ) ->
2628- Acc ;
2629- (_K , #{match_index := Idx }, Acc ) ->
2630- [Idx | Acc ]
2603+ (_K , #{match_index := Idx } = Peer , Acc ) ->
2604+ case is_voter (Peer ) of
2605+ true -> [Idx | Acc ];
2606+ false -> Acc
2607+ end
26312608 end , [LWIdx ], Cluster ).
26322609
2610+ get_voter_status (Id , Cluster ) ->
2611+ case maps :get (Id , Cluster ) of
2612+ undefined -> undefined ;
2613+ Peer -> get_voter_status (Peer )
2614+ end .
2615+
2616+ get_voter_status (#{voter := Status }) ->
2617+ Status ;
2618+ get_voter_status (_ ) ->
2619+ % Implicit 'yes' for initial cluster members, to differentiate from 'undefined' above.
2620+ yes .
2621+
2622+
2623+ is_voter (Id , Cluster ) ->
2624+ case maps :get (Id , Cluster ) of
2625+ undefined -> false ;
2626+ Peer -> is_voter (Peer )
2627+ end .
2628+
2629+ is_voter (#{voter := {matching , Target }, match_index := MI })
2630+ when MI >= Target ->
2631+ true ;
2632+ is_voter (#{voter := {matching , _ }}) ->
2633+ false ;
2634+ is_voter (_Peer ) ->
2635+ true .
2636+
2637+ need_acks (Cluster ) ->
2638+ NumVoters = maps :fold (fun (_ , Peer , Count ) ->
2639+ case is_voter (Peer ) of
2640+ true -> Count + 1 ;
2641+ false -> Count
2642+ end
2643+ end , 0 , Cluster ),
2644+ trunc (NumVoters / 2 ) + 1 .
2645+
26332646-spec agreed_commit (list ()) -> ra_index ().
26342647agreed_commit (Indexes ) ->
26352648 SortedIdxs = lists :sort (fun erlang :'>' /2 , Indexes ),
0 commit comments