12
12
-define (HEARTBEAT_TIMEOUT , 25 ).
13
13
14
14
% % API
15
- -export ([start / 0 , stop /1 , start / 1 , start_link / 3 , read_op /2 , op /2 ,
15
+ -export ([start_link / 3 , stop /1 , get_leader / 1 , read_op /2 , op /2 ,
16
16
set_config /2 , send /2 , send_sync /2 ]).
17
17
18
18
% % gen_fsm callbacks
25
25
% % Testing outputs
26
26
-export ([set_term /2 , candidate_log_up_to_date /4 ]).
27
27
28
- % % This function is simply for testing a single peer with erlang transport
29
- start () ->
30
- start (peer1 ).
31
-
32
28
stop (Pid ) ->
33
29
gen_fsm :send_all_state_event (Pid , stop ).
34
30
35
- start (Me ) ->
36
- gen_fsm :start ({local , Me }, ? MODULE , [Me ], []).
37
-
38
31
start_link (NameAtom , Me , Opts ) ->
39
32
gen_fsm :start_link ({local , NameAtom }, ? MODULE , [Me , Opts ], []).
40
- % %gen_fsm:start_link({local, NameAtom}, ?MODULE, [Me, Opts], [{debug, [trace]}]).
41
33
42
34
op (Peer , Command ) ->
43
35
gen_fsm :sync_send_event (Peer , {op , Command }).
@@ -48,13 +40,16 @@ read_op(Peer, Command) ->
48
40
set_config (Peer , Config ) ->
49
41
gen_fsm :sync_send_event (Peer , {set_config , Config }).
50
42
43
+ get_leader (Pid ) ->
44
+ gen_fsm :sync_send_all_state_event (Pid , get_leader ).
45
+
51
46
-spec send (atom (), # vote {} | # append_entries_rpy {}) -> ok .
52
47
send (To , Msg ) ->
53
48
% % Catch badarg error thrown if name is unregistered
54
49
catch gen_fsm :send_event (To , Msg ).
55
50
56
51
-spec send_sync (atom (), # request_vote {} | # append_entries {}) ->
57
- { ok , # vote {}} | { ok , # append_entries_rpy {}} .
52
+ # vote {} | # append_entries_rpy {} | timeout .
58
53
send_sync (To , Msg ) ->
59
54
Timeout = 100 ,
60
55
gen_fsm :sync_send_event (To , Msg , Timeout ).
@@ -64,7 +59,6 @@ send_sync(To, Msg) ->
64
59
% %=============================================================================
65
60
66
61
init ([Me , # rafter_opts {state_machine = StateMachine }]) ->
67
- random :seed (),
68
62
Timer = gen_fsm :send_event_after (election_timeout (), timeout ),
69
63
# meta {voted_for = VotedFor , term = Term } = rafter_log :get_metadata (Me ),
70
64
BackendState = StateMachine :init (Me ),
@@ -95,6 +89,8 @@ handle_event(stop, _, State) ->
95
89
handle_event (_Event , _StateName , State ) ->
96
90
{stop , {error , badmsg }, State }.
97
91
92
+ handle_sync_event (get_leader , _ , StateName , State = # state {leader = Leader }) ->
93
+ {reply , Leader , StateName , State };
98
94
handle_sync_event (_Event , _From , _StateName , State ) ->
99
95
{stop , badmsg , State }.
100
96
@@ -174,7 +170,7 @@ follower(#append_entries{term=Term, from=From, prev_log_index=PrevLogIndex,
174
170
State3 = reset_timer (election_timeout (), State2 ),
175
171
case consistency_check (AppendEntries , State3 ) of
176
172
false ->
177
- lager :info (" ~p ~p ~n " , [AppendEntries , State3 ]),
173
+ ok = lager :info (" ~p ~p ~n " , [AppendEntries , State3 ]),
178
174
{reply , Rpy , follower , State3 };
179
175
true ->
180
176
{ok , CurrentIndex } = rafter_log :check_and_append (Me ,
@@ -811,7 +807,7 @@ send_entry(Peer, Index, #state{me=Me,
811
807
812
808
send_append_entries (# state {followers = Followers , send_clock = SendClock }= State ) ->
813
809
NewState = State # state {send_clock = SendClock + 1 },
814
- [send_entry (Peer , Index , NewState ) ||
810
+ _ = [send_entry (Peer , Index , NewState ) ||
815
811
{Peer , Index } <- dict :to_list (Followers )],
816
812
NewState .
817
813
@@ -843,7 +839,7 @@ become_candidate(#state{term=CurrentTerm, me=Me}=State0) ->
843
839
responses = dict :new (),
844
840
leader = undefined },
845
841
State3 = set_metadata (Me , State2 ),
846
- request_votes (State3 ),
842
+ _ = request_votes (State3 ),
847
843
State3 .
848
844
849
845
become_leader (# state {me = Me , term = Term , init_config = InitConfig }= State ) ->
0 commit comments