|
| 1 | +-module(rafter_consensus_fsm). |
| 2 | + |
| 3 | +-behaviour(gen_fsm). |
| 4 | + |
| 5 | +-define(ELECTION_TIMEOUT_MIN, 150). |
| 6 | +-define(ELECTION_TIMEOUT_MAX, 300). |
| 7 | + |
| 8 | +%% API |
| 9 | +-export([start_link/0]). |
| 10 | + |
| 11 | +%% gen_fsm callbacks |
| 12 | +-export([init/1, code_change/4, handle_event/3, handle_info/3, |
| 13 | + handle_sync_event/4, terminate/3]). |
| 14 | + |
| 15 | +%% States |
| 16 | +-export([leader/2, follower/2, candidate/2]). |
| 17 | + |
| 18 | +-record(state, { |
| 19 | + leader :: string(), |
| 20 | + term = 0 :: non_neg_integer(), |
| 21 | + voted_for :: term(), |
| 22 | + last_log_term :: non_neg_integer(), |
| 23 | + last_log_index :: non_neg_integer(), |
| 24 | + |
| 25 | + %% The last time a timer was created |
| 26 | + timer_start:: non_neg_integer(), |
| 27 | + |
| 28 | + %% leader state |
| 29 | + followers = dict:new() :: dict:dict(), |
| 30 | + |
| 31 | + %% Responses from RPCs to other servers |
| 32 | + responses = dict:new() :: dict:dict(), |
| 33 | + |
| 34 | + %% All servers making up the ensemble |
| 35 | + me :: string(), |
| 36 | + peers :: list(string()), |
| 37 | + |
| 38 | + %% Different Transports can be plugged in (erlang messaging, tcp, udp, etc...) |
| 39 | + %% To get this thing implemented quickly, erlang messaging is hardcoded for now |
| 40 | + transport = erlang :: erlang}). |
| 41 | + |
| 42 | +start_link() -> |
| 43 | + gen_fsm:start_link(?MODULE, [Self, Peers], []). |
| 44 | + |
| 45 | +%%============================================================================= |
| 46 | +%% gen_fsm callbacks |
| 47 | +%%============================================================================= |
| 48 | + |
| 49 | +init([]) -> |
| 50 | + random:seed(), |
| 51 | + Me = rafter_config:get_id(), |
| 52 | + Peers = rafter_config:get_peers(), |
| 53 | + State = #state{peers=Peers, me=Me, timer_created=os:timestamp()}, |
| 54 | + {ok, follower, State, election_timeout()}. |
| 55 | + |
| 56 | +%%============================================================================= |
| 57 | +%% States |
| 58 | +%%============================================================================= |
| 59 | + |
| 60 | +follower(timeout, State) -> |
| 61 | + {ok, candidate, State, election_timeout()}; |
| 62 | +follower(#request_vote{from=CandidateId}=RequestVote, State) -> |
| 63 | + NewState = set_term(RequestVote#request_vote.term, State), |
| 64 | + {ok, Vote} = vote(RequestVote, NewState), |
| 65 | + %% rafter_log:write(NewState), |
| 66 | + transfer:send(CandidateId, Vote); |
| 67 | + case Vote#vote.success of |
| 68 | + true -> |
| 69 | + {ok, follower, NewState, election_timeout()}; |
| 70 | + false -> |
| 71 | + {ok, follower, NewState, election_timeout(State#state.timer_start)} |
| 72 | + end; |
| 73 | +follower(#append_entries{}=AppendEntries, State) -> |
| 74 | + {ok, follower, State, election_timeout()}. |
| 75 | + |
| 76 | +candidate(timeout, #state{term=CurrentTerm}=State) -> |
| 77 | + NewState = State#state{term = CurrentTerm + 1, |
| 78 | + responses = dict:new()}, |
| 79 | + request_votes(State), |
| 80 | + {ok, candidate, NewState, election_timeout()}; |
| 81 | +candidate(#vote{term=VoteTerm, success=false, from=From}=Vote, #state{term=Term}=State) |
| 82 | + when VoteTerm > Term -> |
| 83 | + {ok, follower, State#state{responses=dict:new()}, election_timeout()}; |
| 84 | +candidate(#vote{success=false, from=From}=Vote, State#state{responses=Responses}) -> |
| 85 | + NewState = State#state{responses = dict:store(From, false, Responses)}, |
| 86 | + {ok, candidate, NewState, election_timeout()}; |
| 87 | +candidate(#vote{success=true, term=Term}, State#state{responses=Responses}) -> |
| 88 | + NewResponses = dict:store(From, true, Responses), |
| 89 | + case is_leader(NewResponses) of |
| 90 | + true -> |
| 91 | + NewState = State#state{responses=dict:new()}, |
| 92 | + ok = gen_fsm:send_event(self(), become_leader), |
| 93 | + {ok, leader, NewState}; |
| 94 | + false -> |
| 95 | + NewState = State#state{responses=NewResponses}, |
| 96 | + {ok, candidate, NewState, election_timeout()} |
| 97 | + end; |
| 98 | +candidate(#request_vote{term=RequestTerm}, #state{term=Term}=State) when RequestTerm > Term -> |
| 99 | + {ok, follower, State#state{term = RequestTerm, responses=dict:new()}, election_timeout()}; |
| 100 | +candidate(#request_vote{}, State) -> |
| 101 | + {ok, candidate, State, election_timeout()}; |
| 102 | + |
| 103 | +candidate(#append_entries{term=RequestTerm}, State) -> |
| 104 | + %% TODO: Should we reset the current term in State here? |
| 105 | + {ok, follower, State, election_timeout()}. |
| 106 | + |
| 107 | +leader(become_leader, State) -> |
| 108 | + Followers = initialize_followers(State), |
| 109 | + NewState = State#state{followers=Followers}, |
| 110 | + ok = send_empty_append_entries(NewState), |
| 111 | + %% TODO: Put a timeout for retries here? |
| 112 | + {ok, leader, NewState}; |
| 113 | +leader(#append_entries_rpy{from=From, success=false}, #state{followers=Followers}=State) -> |
| 114 | + NextIndex = decrement_follower_index(From, Followers), |
| 115 | + ok = send_append_entries(From, NextIndex), |
| 116 | + NewState = State#state{followers=dict:store(From, NextIndex, Followers)}, |
| 117 | + {ok, leader, NewState}; |
| 118 | +leader(#append_entries_rpy{from=From, success=true}, #state{responses=Responses}=State) -> |
| 119 | + |
| 120 | +%%============================================================================= |
| 121 | +%% Internal Functions |
| 122 | +%%============================================================================= |
| 123 | + |
| 124 | +set_term(Term, #state{term=CurrentTerm}=State) when Term < CurrentTerm -> |
| 125 | + State; |
| 126 | +set_term(Term, #state{term=CurrentTerm}=State) when Term > CurrentTerm -> |
| 127 | + State#state{term=Term, voted_for=undefined}; |
| 128 | +set_term(Term, #state{term=Term}=State) -> |
| 129 | + State. |
| 130 | + |
| 131 | +vote(#request_vote{term=Term}=RequestVote, #state{term=CurrentTerm, me=Me}) |
| 132 | + when Term < CurrentTerm -> |
| 133 | + fail_vote(RequestVote, CurrentTerm, Me); |
| 134 | +vote(#request_vote{from=CandidateId, term=CurrentTerm}=RequestVote, |
| 135 | + #state{voted_for=CandidateId, term=CurrentTerm, me=Me}=State) -> |
| 136 | + maybe_successful_vote(RequestVote, CurrentTerm, Me, State); |
| 137 | +vote(#request_vote{term=CurrentTerm}=RequestVote, |
| 138 | + #state{voted_for=undefined, term=CurrentTerm, me=Me}=State) -> |
| 139 | + maybe_successful_vote(RequestVote, CurrentTerm, Me, State); |
| 140 | +vote(#request_vote{from=CandidateId, term=CurrentTerm}=RequestVote, |
| 141 | + #state{voted_for=AnotherId, term=CurrentTerm, me=Me}) |
| 142 | + when AnotherId != CandidateId -> |
| 143 | + fail_vote(RequestVote, CurrentTerm, Me). |
| 144 | + |
| 145 | +maybe_successful_vote(RequestVote, CurrentTerm, Me, State) -> |
| 146 | + case check_log(RequestVote, State) of |
| 147 | + true -> |
| 148 | + successful_vote(RequestVote, CurrentTerm, Me); |
| 149 | + false -> |
| 150 | + fail_vote(RequestVote, CurrentTerm, Me) |
| 151 | + end. |
| 152 | + |
| 153 | +successful_vote(#request_vote{msg_id=MsgId}, CurrentTerm, Me) -> |
| 154 | + {ok, #vote{msg_id=MsgId, term=CurrentTerm, from=Me, success=true}. |
| 155 | + |
| 156 | +fail_vote(#request_vote{msg_id=MsgId}, CurrentTerm, Me) -> |
| 157 | + {ok, #vote{msg_id=MsgId, term=CurrentTerm, from=Me, success=false}. |
| 158 | + |
| 159 | +check_log(#request_vote{last_log_term=CandidateLogTerm}, |
| 160 | + #state{last_log_term=LogTerm}) when CandidateLogTerm > LogTerm -> |
| 161 | + true; |
| 162 | +check_log(#request_vote{last_log_term=CandidateLogTerm}, |
| 163 | + #state{last_log_term=LogTerm}) when CandidateLogTerm < LogTerm -> |
| 164 | + false; |
| 165 | +check_log(#request_vote{last_log_index=CandidateLogIndex}, #state{last_log_index=LogIndex}) |
| 166 | + when CandidateLogIndex > LogIndex-> |
| 167 | + true; |
| 168 | +check_log(#request_vote{last_log_index=LogIndex}, #state{last_log_index=LogIndex}) |
| 169 | + true; |
| 170 | +check_log(_RequestVote, _State) -> |
| 171 | + false; |
| 172 | + |
| 173 | +initialize_followers(State) -> |
| 174 | + NextIndex = log:get_index() + 1, |
| 175 | + Followers = [{Peer, NextIndex} || Peer <- Peers], |
| 176 | + dict:from_list(Followers). |
| 177 | + |
| 178 | +election_timeout(StartTime) -> |
| 179 | + timer:diff(os:timestamp(), StartTime) div 1000. |
| 180 | + |
| 181 | +election_timeout() -> |
| 182 | + ?ELECTION_TIMEOUT_MIN + random:uniform(?ELECTION_TIMEOUT_MAX - |
| 183 | + ?ELECTION_TIMEOUT_MIN). |
0 commit comments