Skip to content

Commit 21a3db2

Browse files
committed
add an ets backend and rafter_backend behaviour
This commit incorporates the following changes:

Improvements
* Separate read and write paths
* The read path is linearizable, but doesn't require logging operations
* Uses the successful next tick of append_entries as an implicit quorum
* ETS backend
* Backends can now maintain their internal state
* Statem test for the ETS backend
* Script to start a node for development purposes

Bugfixes
* Uses noops to advance commitIndex
* Explicitly handles timeouts in rafter_consensus_fsm

Libraries
* Removed eper from rebar.config
1 parent a45de09 commit 21a3db2

15 files changed

+1031
-437
lines changed

README.md

+171-59
Large diffs are not rendered by default.

bin/start-node

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
#!/bin/sh
# Start a local rafter node for development.
#
# Usage: start-node NAME
#
# NAME is used twice: as the short name of the Erlang node
# (NAME@127.0.0.1) and as the argument passed to
# rafter:start_test_node/1.

# Fail early with a usage message instead of launching erl with an
# empty node name and a malformed -eval expression.
if [ $# -ne 1 ]; then
    echo "usage: $0 NAME" >&2
    exit 1
fi

erl -pa deps/*/ebin ebin -setcookie rafter_localhost_test -name "$1@127.0.0.1" -eval "rafter:start_test_node($1)."

include/rafter.hrl

+14-7
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
prev_log_index :: non_neg_integer(),
1919
prev_log_term :: non_neg_integer(),
2020
entries :: term(),
21-
commit_index :: non_neg_integer()}).
21+
commit_index :: non_neg_integer(),
22+
23+
%% This is used during read-only operations
24+
send_clock :: non_neg_integer()}).
2225

2326
-record(append_entries_rpy, {
2427
from :: atom(),
@@ -28,13 +31,17 @@
2831
%% it prevents duplicate responses from causing recommits and helps
2932
%% maintain safety. In the raft reference implementation (logcabin)
3033
%% they cancel the in flight RPC's instead. That's difficult
31-
%% to do correctly(without races) in erlang with asynchronous
34+
%% to do correctly(without races) in erlang with asynchronous
3235
%% messaging and mailboxes.
3336
index :: non_neg_integer(),
37+
38+
%% This is used during read-only operations
39+
send_clock :: non_neg_integer(),
40+
3441
success :: boolean()}).
3542

3643
-record(rafter_entry, {
37-
type :: config | op,
44+
type :: noop | config | op,
3845
term :: non_neg_integer(),
3946
index :: non_neg_integer(),
4047
cmd :: term()}).
@@ -44,16 +51,16 @@
4451
term = 0 :: non_neg_integer()}).
4552

4653
-record(config, {
47-
state = blank ::
54+
state = blank ::
4855
%% The configuration specifies no servers. Servers that are new to the
4956
%% cluster and have empty logs start in this state.
50-
blank |
57+
blank |
5158
%% The configuration specifies a single list of servers: a quorum
5259
%% requires any majority of oldservers.
53-
stable |
60+
stable |
5461
%% The configuration specifies two lists of servers: a quorum requires
5562
%% any majority of oldservers, but the newservers also receive log entries.
56-
staging |
63+
staging |
5764
%% The configuration specifies two lists of servers: a quorum requires
5865
%% any majority of oldservers and any majority of the newservers.
5966
transitional,

include/rafter_consensus_fsm.hrl

+27-12
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
timer :: timer:tref(),
44
from :: term(),
55
index :: non_neg_integer(),
6-
term :: non_neg_integer()}).
6+
term :: non_neg_integer(),
7+
8+
%% only used during read_only commands
9+
cmd :: term()}).
710

811
-record(state, {
912
leader :: term(),
@@ -12,25 +15,37 @@
1215
commit_index = 0 :: non_neg_integer(),
1316
init_config :: undefined | list() | complete,
1417

15-
%% The last time a timer was created
16-
timer_start :: non_neg_integer(),
17-
18-
%% The duration of the timer
19-
timer_duration :: non_neg_integer(),
18+
%% Used for Election and Heartbeat timeouts
19+
timer :: timer:tref(),
2020

21-
%% leader state
21+
%% leader state: contains nextIndex for each peer
2222
followers = dict:new() :: dict(),
2323

24-
%% Responses from RPCs to other servers
24+
%% Dict keyed by peer id.
25+
%% contains true as val when candidate
26+
%% contains match_indexes as val when leader
2527
responses = dict:new() :: dict(),
2628

27-
%% Outstanding Client Requests
29+
%% Logical clock to allow read linearizability
30+
%% Reset to 0 on leader election.
31+
send_clock = 0 :: non_neg_integer(),
32+
33+
%% Keep track of the highest send_clock received from each peer
34+
%% Reset on leader election
35+
send_clock_responses = dict:new() :: dict(),
36+
37+
%% Outstanding Client Write Requests
2838
client_reqs = [] :: [#client_req{}],
2939

40+
%% Outstanding Client Read Requests
41+
%% Keyed on send_clock, Val = [#client_req{}]
42+
read_reqs = orddict:new() :: orddict:orddict(),
43+
3044
%% All servers making up the ensemble
3145
me :: string(),
3246

3347
config :: term(),
34-
35-
%% We allow pluggable state machine modules.
36-
state_machine :: atom()}).
48+
49+
%% We allow pluggable backend state machine modules.
50+
state_machine :: atom(),
51+
backend_state :: term()}).

include/rafter_opts.hrl

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
-record(rafter_opts, {state_machine = rafter_sm_echo :: atom(),
1+
-record(rafter_opts, {state_machine = rafter_backend_echo :: atom(),
22
logdir :: string()}).

rebar.config

-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,5 @@
55
{erl_opts, [debug_info, warnings_as_errors, {parse_transform, lager_transform}]}.
66

77
{deps, [{lager, ".*", {git, "git://github.com/basho/lager", {tag, "2.0.0rc2"}}},
8-
{eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "3280b736"}}},
98
{druuid, ".*", {git, "git://github.com/kellymclaughlin/druuid.git", {tag, "0.2"}}}
109
]}.

src/rafter.erl

+8-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
-include("rafter_opts.hrl").
66

77
%% API
8-
-export([start_node/2, stop_node/1, op/2, set_config/2,
8+
-export([start_node/2, stop_node/1, op/2, read_op/2, set_config/2,
99
get_state/1, get_leader/1, get_entry/2, get_last_entry/1]).
1010

1111
%% Test API
@@ -17,12 +17,16 @@ start_node(Peer, Opts) ->
1717
%% @doc Stop the given peer by delegating to rafter_sup:stop_peer/1.
%% The return value is whatever rafter_sup:stop_peer/1 returns.
stop_node(Peer) ->
1818
rafter_sup:stop_peer(Peer).
1919

20-
%% @doc Run an operation on the backend state machine.
20+
%% @doc Run an operation on the backend state machine.
2121
%% Note: Peer is just the local node in production.
2222
op(Peer, Command) ->
    %% Pair the command with an id generated by druuid:v4/0 and hand the
    %% resulting {Id, Command} tuple to the consensus FSM.
    rafter_consensus_fsm:op(Peer, {druuid:v4(), Command}).
2525

26+
%% @doc Run a read-only operation on the backend state machine.
%% The command is paired with an id generated by druuid:v4/0 and passed
%% to rafter_consensus_fsm:read_op/2 as {Id, Command}.
read_op(Peer, Command) ->
    rafter_consensus_fsm:read_op(Peer, {druuid:v4(), Command}).
29+
2630
%% @doc Submit a new server list to the consensus FSM.
%% The list is paired with an id generated by druuid:v4/0 and passed to
%% rafter_consensus_fsm:set_config/2 as {Id, NewServers}.
set_config(Peer, NewServers) ->
    rafter_consensus_fsm:set_config(Peer, {druuid:v4(), NewServers}).
@@ -51,13 +55,13 @@ get_last_entry(Peer) ->
5155
start_cluster() ->
5256
application:start(lager),
5357
application:start(rafter),
54-
Opts = #rafter_opts{state_machine=rafter_sm_echo, logdir="./log"},
58+
Opts = #rafter_opts{state_machine=rafter_backend_echo, logdir="./log"},
5559
Peers = [peer1, peer2, peer3, peer4, peer5],
5660
[rafter_sup:start_peer(Me, Opts) || Me <- Peers].
5761

5862
start_test_node(Name) ->
5963
application:start(lager),
6064
application:start(rafter),
6165
Me = {Name, node()},
62-
Opts = #rafter_opts{state_machine=rafter_sm_echo, logdir="./"},
66+
Opts = #rafter_opts{state_machine=rafter_backend_ets, logdir="./data"},
6367
start_node(Me, Opts).

src/rafter_backend.erl

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-module(rafter_backend).
2+
3+
-export([behaviour_info/1]).
4+
5+
%% @doc Describe the rafter_backend behaviour.
%% For the key `callbacks' this returns the functions a backend module
%% has to export: init/1, read/2 and write/2. Every other key yields
%% `undefined'.
behaviour_info(callbacks) ->
    [
        {init, 1},
        {read, 2},
        {write, 2}
    ];
behaviour_info(_Other) ->
    undefined.
9+

src/rafter_backend_echo.erl

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
-module(rafter_backend_echo).
2+
3+
-behaviour(rafter_backend).
4+
5+
%% Rafter backend callbacks
6+
-export([init/1, read/2, write/2]).
7+
8+
%% @doc Initialise the echo backend.
%% The argument is ignored and the backend state is simply the atom `ok'.
init(_Peer) ->
    ok.
10+
11+
%% @doc Echo a read command back to the caller.
%% Returns {{ok, Command}, State}; the state is passed through untouched.
read(Command, State) ->
    Reply = {ok, Command},
    {Reply, State}.
13+
14+
%% @doc Echo a write command back to the caller.
%% Returns {{ok, Command}, State}; the state is passed through untouched.
write(Command, State) ->
    Reply = {ok, Command},
    {Reply, State}.

src/rafter_backend_ets.erl

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
-module(rafter_backend_ets).
2+
3+
-behaviour(rafter_backend).
4+
5+
%% rafter_backend callbacks
6+
-export([init/1, stop/1, read/2, write/2]).
7+
8+
-record(state, {peer :: atom() | {atom(), atom()}}).
9+
10+
%% @doc Initialise the ETS backend for Peer.
%% Any existing rafter_backend_ets / rafter_backend_ets_tables tables are
%% removed via stop/1 first, then both are created again as public,
%% named `set' tables. Returns the backend state holding Peer.
init(Peer) ->
    State = stop(#state{peer = Peer}),
    TableOpts = [set, named_table, public],
    ets:new(rafter_backend_ets, TableOpts),
    ets:new(rafter_backend_ets_tables, TableOpts),
    State.
16+
17+
%% @doc Delete the backend's two named ETS tables, if they exist.
%% Returns State unchanged.
stop(State) ->
    Drop = fun(Table) ->
        try
            ets:delete(Table)
        catch
            %% ets:delete/1 raises badarg when the table is not there;
            %% that is expected on a first start. Anything else is a
            %% real failure and is allowed to propagate.
            error:badarg -> ok
        end
    end,
    lists:foreach(Drop, [rafter_backend_ets, rafter_backend_ets_tables]),
    State.
21+
22+
%% @doc Serve a read-only command against the ETS tables.
%%
%% Supported commands and their replies:
%%   {get, Table, Key}   -> {ok, Value} | {ok, not_found}
%%   list_tables         -> {ok, [Table]}
%%   {list_keys, Table}  -> the result of list_keys/1
%%
%% For `get' and `list_keys' any exception (for example badarg from a
%% missing table) is converted to {error, Reason}. The reply is returned
%% as {Reply, State}; State is never modified.
read({get, Table, Key}, State) ->
    Val =
        try
            case ets:lookup(Table, Key) of
                %% Do not re-match on Key here: ordered_set tables compare
                %% keys with ==, so looking up 1 may return {1.0, Value},
                %% which would not match a pattern bound to the integer 1.
                [{_StoredKey, Value}] ->
                    {ok, Value};
                [] ->
                    {ok, not_found}
            end
        catch
            _:E ->
                {error, E}
        end,
    {Val, State};

read(list_tables, State) ->
    %% The registry table stores one {Name} tuple per created table.
    {{ok, [Table || {Table} <- ets:tab2list(rafter_backend_ets_tables)]},
     State};

read({list_keys, Table}, State) ->
    Val =
        try
            list_keys(Table)
        catch
            _:E ->
                {error, E}
        end,
    {Val, State}.
46+
47+
%% @doc Apply a mutating command to the ETS tables.
%%
%% Supported commands and their replies on success:
%%   {new, Name}               -> {ok, Name}
%%   {put, Table, Key, Value}  -> {ok, Value}
%%   {delete, Table}           -> {ok, true}
%%   {delete, Table, Key}      -> {ok, true}
%%
%% Any exception raised while applying a command is converted to
%% {error, Reason}. The reply is returned as {Reply, State}; State is
%% never modified.
write({new, Name}, State) ->
    Reply =
        try
            %% Create the table, then record its name in the registry.
            ets:new(Name, [ordered_set, named_table, public]),
            ets:insert(rafter_backend_ets_tables, {Name}),
            {ok, Name}
        catch
            _:Reason ->
                {error, Reason}
        end,
    {Reply, State};

write({put, Table, Key, Value}, State) ->
    Reply =
        try
            ets:insert(Table, {Key, Value}),
            {ok, Value}
        catch
            _:Reason ->
                {error, Reason}
        end,
    {Reply, State};

write({delete, Table}, State) ->
    Reply =
        try
            %% Drop the table itself, then its entry in the registry.
            ets:delete(Table),
            ets:delete(rafter_backend_ets_tables, Table),
            {ok, true}
        catch
            _:Reason ->
                {error, Reason}
        end,
    {Reply, State};

write({delete, Table, Key}, State) ->
    Reply =
        try
            Deleted = ets:delete(Table, Key),
            {ok, Deleted}
        catch
            _:Reason ->
                {error, Reason}
        end,
    {Reply, State}.
85+
86+
%% Collect every key of Table by walking it with ets:first/1 and
%% ets:next/2. Returns {ok, Keys}. Each visited key is prepended to the
%% accumulator, so Keys is in the reverse of the traversal order.
list_keys(Table) ->
    First = ets:first(Table),
    list_keys(First, Table, []).

%% Traversal loop: stop at the end-of-table marker, otherwise record the
%% current key and advance to the next one.
list_keys('$end_of_table', _Table, Acc) ->
    {ok, Acc};
list_keys(Current, Table, Acc) ->
    Next = ets:next(Table, Current),
    list_keys(Next, Table, [Current | Acc]).

0 commit comments

Comments
 (0)