commit
142c469d70
10 changed files with 751 additions and 0 deletions
-
191LICENSE
-
14README.md
-
13rebar.config
-
1rebar.lock
-
109src/paxos_agent.erl
-
261src/raft_agent.erl
-
14src/raft_erl.app.src
-
43src/raft_erl.erl
-
64src/raft_log.erl
-
41src/storage.erl
@ -0,0 +1,191 @@ |
|||
Apache License |
|||
Version 2.0, January 2004 |
|||
http://www.apache.org/licenses/ |
|||
|
|||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION |
|||
|
|||
1. Definitions. |
|||
|
|||
"License" shall mean the terms and conditions for use, reproduction, |
|||
and distribution as defined by Sections 1 through 9 of this document. |
|||
|
|||
"Licensor" shall mean the copyright owner or entity authorized by |
|||
the copyright owner that is granting the License. |
|||
|
|||
"Legal Entity" shall mean the union of the acting entity and all |
|||
other entities that control, are controlled by, or are under common |
|||
control with that entity. For the purposes of this definition, |
|||
"control" means (i) the power, direct or indirect, to cause the |
|||
direction or management of such entity, whether by contract or |
|||
otherwise, or (ii) ownership of fifty percent (50%) or more of the |
|||
outstanding shares, or (iii) beneficial ownership of such entity. |
|||
|
|||
"You" (or "Your") shall mean an individual or Legal Entity |
|||
exercising permissions granted by this License. |
|||
|
|||
"Source" form shall mean the preferred form for making modifications, |
|||
including but not limited to software source code, documentation |
|||
source, and configuration files. |
|||
|
|||
"Object" form shall mean any form resulting from mechanical |
|||
transformation or translation of a Source form, including but |
|||
not limited to compiled object code, generated documentation, |
|||
and conversions to other media types. |
|||
|
|||
"Work" shall mean the work of authorship, whether in Source or |
|||
Object form, made available under the License, as indicated by a |
|||
copyright notice that is included in or attached to the work |
|||
(an example is provided in the Appendix below). |
|||
|
|||
"Derivative Works" shall mean any work, whether in Source or Object |
|||
form, that is based on (or derived from) the Work and for which the |
|||
editorial revisions, annotations, elaborations, or other modifications |
|||
represent, as a whole, an original work of authorship. For the purposes |
|||
of this License, Derivative Works shall not include works that remain |
|||
separable from, or merely link (or bind by name) to the interfaces of, |
|||
the Work and Derivative Works thereof. |
|||
|
|||
"Contribution" shall mean any work of authorship, including |
|||
the original version of the Work and any modifications or additions |
|||
to that Work or Derivative Works thereof, that is intentionally |
|||
submitted to Licensor for inclusion in the Work by the copyright owner |
|||
or by an individual or Legal Entity authorized to submit on behalf of |
|||
the copyright owner. For the purposes of this definition, "submitted" |
|||
means any form of electronic, verbal, or written communication sent |
|||
to the Licensor or its representatives, including but not limited to |
|||
communication on electronic mailing lists, source code control systems, |
|||
and issue tracking systems that are managed by, or on behalf of, the |
|||
Licensor for the purpose of discussing and improving the Work, but |
|||
excluding communication that is conspicuously marked or otherwise |
|||
designated in writing by the copyright owner as "Not a Contribution." |
|||
|
|||
"Contributor" shall mean Licensor and any individual or Legal Entity |
|||
on behalf of whom a Contribution has been received by Licensor and |
|||
subsequently incorporated within the Work. |
|||
|
|||
2. Grant of Copyright License. Subject to the terms and conditions of |
|||
this License, each Contributor hereby grants to You a perpetual, |
|||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable |
|||
copyright license to reproduce, prepare Derivative Works of, |
|||
publicly display, publicly perform, sublicense, and distribute the |
|||
Work and such Derivative Works in Source or Object form. |
|||
|
|||
3. Grant of Patent License. Subject to the terms and conditions of |
|||
this License, each Contributor hereby grants to You a perpetual, |
|||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable |
|||
(except as stated in this section) patent license to make, have made, |
|||
use, offer to sell, sell, import, and otherwise transfer the Work, |
|||
where such license applies only to those patent claims licensable |
|||
by such Contributor that are necessarily infringed by their |
|||
Contribution(s) alone or by combination of their Contribution(s) |
|||
with the Work to which such Contribution(s) was submitted. If You |
|||
institute patent litigation against any entity (including a |
|||
cross-claim or counterclaim in a lawsuit) alleging that the Work |
|||
or a Contribution incorporated within the Work constitutes direct |
|||
or contributory patent infringement, then any patent licenses |
|||
granted to You under this License for that Work shall terminate |
|||
as of the date such litigation is filed. |
|||
|
|||
4. Redistribution. You may reproduce and distribute copies of the |
|||
Work or Derivative Works thereof in any medium, with or without |
|||
modifications, and in Source or Object form, provided that You |
|||
meet the following conditions: |
|||
|
|||
(a) You must give any other recipients of the Work or |
|||
Derivative Works a copy of this License; and |
|||
|
|||
(b) You must cause any modified files to carry prominent notices |
|||
stating that You changed the files; and |
|||
|
|||
(c) You must retain, in the Source form of any Derivative Works |
|||
that You distribute, all copyright, patent, trademark, and |
|||
attribution notices from the Source form of the Work, |
|||
excluding those notices that do not pertain to any part of |
|||
the Derivative Works; and |
|||
|
|||
(d) If the Work includes a "NOTICE" text file as part of its |
|||
distribution, then any Derivative Works that You distribute must |
|||
include a readable copy of the attribution notices contained |
|||
within such NOTICE file, excluding those notices that do not |
|||
pertain to any part of the Derivative Works, in at least one |
|||
of the following places: within a NOTICE text file distributed |
|||
as part of the Derivative Works; within the Source form or |
|||
documentation, if provided along with the Derivative Works; or, |
|||
within a display generated by the Derivative Works, if and |
|||
wherever such third-party notices normally appear. The contents |
|||
of the NOTICE file are for informational purposes only and |
|||
do not modify the License. You may add Your own attribution |
|||
notices within Derivative Works that You distribute, alongside |
|||
or as an addendum to the NOTICE text from the Work, provided |
|||
that such additional attribution notices cannot be construed |
|||
as modifying the License. |
|||
|
|||
You may add Your own copyright statement to Your modifications and |
|||
may provide additional or different license terms and conditions |
|||
for use, reproduction, or distribution of Your modifications, or |
|||
for any such Derivative Works as a whole, provided Your use, |
|||
reproduction, and distribution of the Work otherwise complies with |
|||
the conditions stated in this License. |
|||
|
|||
5. Submission of Contributions. Unless You explicitly state otherwise, |
|||
any Contribution intentionally submitted for inclusion in the Work |
|||
by You to the Licensor shall be under the terms and conditions of |
|||
this License, without any additional terms or conditions. |
|||
Notwithstanding the above, nothing herein shall supersede or modify |
|||
the terms of any separate license agreement you may have executed |
|||
with Licensor regarding such Contributions. |
|||
|
|||
6. Trademarks. This License does not grant permission to use the trade |
|||
names, trademarks, service marks, or product names of the Licensor, |
|||
except as required for reasonable and customary use in describing the |
|||
origin of the Work and reproducing the content of the NOTICE file. |
|||
|
|||
7. Disclaimer of Warranty. Unless required by applicable law or |
|||
agreed to in writing, Licensor provides the Work (and each |
|||
Contributor provides its Contributions) on an "AS IS" BASIS, |
|||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
|||
implied, including, without limitation, any warranties or conditions |
|||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A |
|||
PARTICULAR PURPOSE. You are solely responsible for determining the |
|||
appropriateness of using or redistributing the Work and assume any |
|||
risks associated with Your exercise of permissions under this License. |
|||
|
|||
8. Limitation of Liability. In no event and under no legal theory, |
|||
whether in tort (including negligence), contract, or otherwise, |
|||
unless required by applicable law (such as deliberate and grossly |
|||
negligent acts) or agreed to in writing, shall any Contributor be |
|||
liable to You for damages, including any direct, indirect, special, |
|||
incidental, or consequential damages of any character arising as a |
|||
result of this License or out of the use or inability to use the |
|||
Work (including but not limited to damages for loss of goodwill, |
|||
work stoppage, computer failure or malfunction, or any and all |
|||
other commercial damages or losses), even if such Contributor |
|||
has been advised of the possibility of such damages. |
|||
|
|||
9. Accepting Warranty or Additional Liability. While redistributing |
|||
the Work or Derivative Works thereof, You may choose to offer, |
|||
and charge a fee for, acceptance of support, warranty, indemnity, |
|||
or other liability obligations and/or rights consistent with this |
|||
License. However, in accepting such obligations, You may act only |
|||
on Your own behalf and on Your sole responsibility, not on behalf |
|||
of any other Contributor, and only if You agree to indemnify, |
|||
defend, and hold each Contributor harmless for any liability |
|||
incurred by, or claims asserted against, such Contributor by reason |
|||
of your accepting any such warranty or additional liability. |
|||
|
|||
END OF TERMS AND CONDITIONS |
|||
|
|||
Copyright 2019, Shanti Chellaram <shanti@shanti.im>. |
|||
|
|||
Licensed under the Apache License, Version 2.0 (the "License"); |
|||
you may not use this file except in compliance with the License. |
|||
You may obtain a copy of the License at |
|||
|
|||
http://www.apache.org/licenses/LICENSE-2.0 |
|||
|
|||
Unless required by applicable law or agreed to in writing, software |
|||
distributed under the License is distributed on an "AS IS" BASIS, |
|||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
See the License for the specific language governing permissions and |
|||
limitations under the License. |
|||
|
|||
@ -0,0 +1,14 @@ |
|||
raft_erl |
|||
===== |
|||
|
|||
An escript |
|||
|
|||
Build |
|||
----- |
|||
|
|||
$ rebar3 escriptize |
|||
|
|||
Run |
|||
--- |
|||
|
|||
$ _build/default/bin/raft_erl |
|||
@ -0,0 +1,13 @@ |
|||
{erl_opts, [no_debug_info]}. |
|||
{deps, []}. |
|||
|
|||
{escript_incl_apps, |
|||
[raft_erl]}. |
|||
{escript_main_app, raft_erl}. |
|||
{escript_name, raft_erl}. |
|||
{escript_emu_args, "%%! +sbtu +A1\n"}. |
|||
|
|||
%% Profiles |
|||
{profiles, [{test, |
|||
[{erl_opts, [debug_info]} |
|||
]}]}. |
|||
@ -0,0 +1 @@ |
|||
[]. |
|||
@ -0,0 +1,109 @@ |
|||
-module(paxos_agent). |
|||
|
|||
-behavior(gen_statem). |
|||
|
|||
-export([callback_mode/0, init/1, handle_event/2]). |
|||
|
|||
callback_mode() -> [handle_event_function, state_enter]. |
|||
|
|||
init(Opts) -> |
|||
ok. |
|||
|
|||
% persistent state |
|||
-record( |
|||
pstate, { |
|||
name, % the identifier of this agent |
|||
outcome = [], % the last decree written in the ledger, or [] |
|||
lastTried = -1, % the number of the last ballot begun by this agent |
|||
prevBal = -1, % the number of the last ballot in which this agent voted |
|||
prevDec = [], % the decree of the last ballot in which this agent voted |
|||
nextBal -1 % the number of the last ballot in which this agent agreed to participate, or -1 |
|||
}). |
|||
% transient state |
|||
-record( |
|||
tstate, { |
|||
status = idle, % one of idle, trying, or polling |
|||
prevVotes = [], % set of votes received in LastVote messages for the current ballot (the one in lastTried) |
|||
quorum, % if status = polling, the set of priests forming quorum of current ballot; otherwise meaningless |
|||
voters, % if status = polling, the set of quorum members from whom p has received 'voted' messages |
|||
decree, % if status = polling, the decree of the current ballot |
|||
}). |
|||
|
|||
% Note: Guaranteed not to Lose Progress |
|||
% Note: Guaranteed to Proceed if N/2+1 nodes are available. |
|||
|
|||
% Ballots are a { BallotId :: integer, OwnerName :: string } |
|||
try_new_ballot(Tstate, Pstate) -> |
|||
{LastTriedId, _LastTriedOwner} = Pstate#pstate.lastTried, |
|||
Ballot = {LastTriedId+1, Pstate#pstate.name}, |
|||
NewPstate = Pstate#pstate{lastTried=Ballot}, |
|||
NewTstate = Tstate#tstate{status=trying, prevVotes=[]}, |
|||
{ok, {NewTstate, NewPstate}}. |
|||
|
|||
send_next_ballot(Tstate, Pstate) |
|||
when Tstate#tstate.status == trying -> |
|||
send(any_priest, {next_ballot, Pstate#pstate.lastTried}), |
|||
{ok, Tstate, Pstate}. |
|||
|
|||
recv_next_ballot({next_ballot, Ballot}, Tstate, #pstate{nextBal=NextBal} = Pstate) |
|||
when Ballot > NextBal -> |
|||
NewPstate = Pstate#pstate{nextBal=Ballot}, |
|||
{ok, {Tstate, NewPstate}}. |
|||
|
|||
send_last_vote(Tstate, Pstate) |
|||
when Pstate#pstate.nextBal > Pstate#pstate.prevBal -> % send the last decree this voted for |
|||
Vote = {Pstate#pstate.name, Pstate#pstate.prevBal, Pstate#pstate.prevDec}, |
|||
send(owner(Pstate#pstate.nextBal), {last_vote, Pstate#pstate.nextBal, Vote}), |
|||
{ok, Tstate, Pstate}. |
|||
|
|||
recv_last_vote({last_vote, Ballot, Vote}, Tstate, Pstate) |
|||
when Tstate#tstate.status == trying andalso Ballot == Pstate#pstate.lastTried -> |
|||
NewTstate=Tstate#tstate{prevVotes=[Vote|Tstate#tstate.prevVotes]}, % TODO: this should be union, not append |
|||
{ok, NewTstate, Pstate}. |
|||
|
|||
start_polling_majority_set_q(Q, Tstate, Pstate) |
|||
when Tstate#tstate.status == trying andalso Q -> |
|||
NewTstate = Tstate#tstate{ |
|||
status=polling, |
|||
quorum=Q, |
|||
voters=[], |
|||
decree=case max(Tstate#tstate.prevVotes) of |
|||
{PriestName, {-1, _}, PrevDecree} -> []; |
|||
{PriestName, PrevBallot, PrevDecree} -> PrevDecree |
|||
end, |
|||
}, |
|||
{ok, NewTstate, Pstate}. |
|||
|
|||
send_begin_ballot(Tstate, Pstate) |
|||
when Tstate#tstate.status == polling -> |
|||
send(random(Tstate#tstate.quorum), {begin_ballot, Pstate#pstate.lastTried, Tstate#tstate.decree}), |
|||
{ok, Tstate, Pstate}. |
|||
|
|||
recv_begin_ballot({begin_ballot, Ballot, Decree}, Tstate, Pstate) |
|||
when Ballot == Pstate#pstate.nextBal andalso Ballot > Pstate#pstate.prevBal -> |
|||
NewPstate = Pstate#pstate{prevBal=Ballot, nextBal=Decree}, |
|||
% if there is a Ballot B in _B_ with ballotId = Ballot, choose that node and let the new value of _B_ include the union of Bc (I believe this means commit?) |
|||
{ok, Tstate, NewPstate}. |
|||
|
|||
send_voted(Tstate, Pstate) |
|||
when Pstate#pstate.prevBal /= -1 -> |
|||
send(owner(Pstate#pstate.prevBal), {voted, Pstate#pstate.prevBal, Pstate#pstate.name}), |
|||
{ok, Tstate, Pstate}. |
|||
|
|||
recv_voted({voted, Ballot, Priest}, Tstate, Pstate) |
|||
when Tstate#tstate.status == polling andalso Ballot == Pstate#pstate.lastTried -> |
|||
{ok, Tstate#tstate{votes=[Priest|Tstate#tstate.voters]}, Pstate}. % this could be a transition point |
|||
|
|||
succeed(Tstate, Pstate) %todo: subset needs to be implemented as an if |
|||
when Tstate#tstate.status == polling andalso subset(quorum, voters) andalso Pstate#pstate.outcome == [] -> |
|||
NewPstate = Pstate#pstate{outcome = Tstate#tstate.decree}, % this is the commit step, i think |
|||
{ok, Tstate, NewPstate}. |
|||
|
|||
send_success(Tstate, Pstate) |
|||
when Pstate#pstate.outcome /= blank -> |
|||
send(any_priest(), {success, Pstate#pstate.outcome}), |
|||
{ok, Tstate, Pstate}. |
|||
|
|||
recv_success({success, Outcome}, Tstate, Pstate) |
|||
when Pstate#pstate.outcome /= blank -> |
|||
{ok, Tstate, Pstate#pstate{outcome=Outcome}}. % follower commit step |
|||
@ -0,0 +1,261 @@ |
|||
-module(raft_agent). |
|||
|
|||
-behavior(gen_statem). |
|||
|
|||
%% g |
|||
%-export([start_link/0]). |
|||
%% gen_statem callbacks |
|||
-export([callback_mode/0, init/1, handle_event/4]). |
|||
|
|||
%% Client API |
|||
-export([operate/2]). |
|||
|
|||
operate(ServerRef, Fun) -> |
|||
gen_statem:call(ServerRef, {client_req, raft_log:op(Fun, [])}, 500). |
|||
|
|||
% persisted state should be saved to disk after modification and before a callback returns |
|||
-record(state, { |
|||
id,term=0,voted_for=[],log,cluster=[], %persisted state |
|||
ldr_next_index, ldr_match_index, %volatile state |
|||
votes_gathered=0 |
|||
}). |
|||
|
|||
%% gen_statem callbacks |
|||
|
|||
callback_mode() -> [handle_event_function, state_enter]. |
|||
|
|||
init(Args) -> |
|||
State0 = parse_args(Args), |
|||
State1 = State0#state{log=raft_log:new(#{})}, |
|||
{ok, follower, State1}. |
|||
|
|||
%% Message Events that need to be handled: |
|||
%% |
|||
%% Election Timeout Elapsed (follower and candidate state only) |
|||
%% Received AppendEntries RPC |
|||
%% Received AppendEntries ACK |
|||
%% Received RequestVote RPC |
|||
%% Received RequestVote ACK |
|||
%% |
|||
%% Received ClientRequest RPC -> Commit Transition / Query-NoOp |
|||
%% [Committed ClientEntry] -> Respond to Client |
|||
|
|||
%% All States |
|||
|
|||
%% Receiving an out-of-date RPC or ACK |
|||
%handle_event(cast, {_Type, Term, _Payload}=Msg, _Role, #state{term=CurrentTerm}=Data) |
|||
% when Term > CurrentTerm -> |
|||
% {next_state, follower, Data#state{term=Term, voted_for=[]}}; |
|||
|
|||
%% Follower State |
|||
handle_event(enter, follower, follower, #state{}) -> |
|||
{keep_state_and_data, [action_reset_election()]}; |
|||
|
|||
handle_event(timeout, election, follower, #state{}=Data) -> |
|||
{next_state, candidate, Data}; |
|||
|
|||
%% Candidate State |
|||
handle_event(enter, _PrevState, candidate, #state{term=Term,id=Id}=Data0) -> |
|||
Data1 = Data0#state{term=Term+1, voted_for=Id, votes_gathered=1}, |
|||
lists:foreach(fun(Node) -> send(Node, rpc(request_vote, Data1)) end, peers(Data1)), |
|||
{keep_state, Data1}; |
|||
|
|||
|
|||
% if we are out of date, revert to follower |
|||
handle_event(cast, {rpc, _RpcType, Term, _Payload}=EventData, _State, #state{term=CurrentTerm}=Data) when Term > CurrentTerm -> |
|||
{next_state, follower, Data#state{term=Term, voted_for=[]}, [{next_event, cast, EventData}]}; |
|||
|
|||
handle_event(cast, {rpc, request_vote, Term, {Candidate, _LastIdx, _LastTerm}}, _State, #state{term=CurrentTerm}) when Term < CurrentTerm -> |
|||
send(Candidate, {ack, request_vote, CurrentTerm, false}), |
|||
keep_state_and_data; |
|||
|
|||
handle_event(cast, {rpc, request_vote, CurrentTerm, {Candidate, CLastIndex, CLastTerm}}, follower, #state{term=CurrentTerm,log=Log,voted_for=VotedFor}=Data) -> |
|||
LastIndex = raft_log:last_index(Log), |
|||
LastTerm = raft_log:term_at(Log, LastIndex), |
|||
Result = |
|||
if |
|||
not (VotedFor == [] orelse VotedFor == Candidate) -> |
|||
false; |
|||
LastTerm > CLastTerm -> |
|||
false; |
|||
LastIndex > CLastIndex -> |
|||
false; |
|||
true -> |
|||
true |
|||
end, |
|||
send(Candidate, {ack, request_vote, CurrentTerm, Result}), |
|||
case Result of |
|||
true -> |
|||
{keep_state, Data#state{voted_for=Candidate}}; |
|||
false -> |
|||
keep_state_and_data |
|||
end; |
|||
|
|||
handle_event(cast, {rpc, append_entries, Term, {LeaderId, _PrevLogIndex, _PrevLogTerm, [] = _Entries, _LeaderCommit}}, _Role, #state{id=Id, term=Term}) -> |
|||
send(LeaderId, {ack, append_entries, Term, Id, true}), |
|||
keep_state_and_data; |
|||
|
|||
handle_event(cast, {rpc, append_entries, Term, {LeaderId, PrevLogIndex, PrevLogTerm, Entries, LeaderCommit}}, _Role, #state{id=Id, term=Term, log=Log}=Data) -> |
|||
{Result, NewLog} = |
|||
case raft_log:term_at(Log, PrevLogIndex) of |
|||
PrevLogTerm -> |
|||
{false, Log}; |
|||
_ -> |
|||
Log1 = raft_log:drop_after(Log, PrevLogIndex), |
|||
Log2 = raft_log:append_all(Log1, Entries), |
|||
Log3 = raft_log:commit(Log2, min(LeaderCommit, raft_log:last_index(Log2))), |
|||
{true, Log3} |
|||
end, |
|||
send(LeaderId, {ack, append_entries, Term, Id, Result}), |
|||
{keep_state, Data#state{log=NewLog}}; |
|||
|
|||
handle_event(cast, {ack, request_vote, Term, true}, candidate, #state{term=Term,votes_gathered=Votes,cluster=Cluster}=Data) -> |
|||
NewVotes = Votes+1, |
|||
NewData = Data#state{votes_gathered=NewVotes}, |
|||
if |
|||
NewVotes >= length(Cluster) div 2 -> |
|||
{next_state, leader, NewData}; |
|||
true -> |
|||
{keep_state, NewData} |
|||
end; |
|||
|
|||
%% Leader Mode! |
|||
handle_event(enter, leader, leader, Data) -> |
|||
handle_event(enter, candidate, leader, Data); |
|||
handle_event(enter, candidate, leader, Data) -> |
|||
% send to all members |
|||
Peers = peers(Data), |
|||
LastLogIndex = raft_log:last_index(Data#state.log), |
|||
NextIndex = maps:from_list(lists:map(fun (Peer) -> {Peer, LastLogIndex + 1} end, Peers)), |
|||
MatchIndex = maps:from_list(lists:map(fun (Peer) -> {Peer, 0} end, Peers)), |
|||
NewData = Data#state{ldr_match_index=MatchIndex,ldr_next_index=NextIndex}, |
|||
|
|||
SendHeartbeat = fun(Server) -> send(Server, rpc(append_entries, Server, NewData)) end, |
|||
lists:foreach(SendHeartbeat, Peers), |
|||
{keep_state, NewData, [action_heartbeat()]}; |
|||
|
|||
handle_event({call, From}, {client_req, {op, Op, Args}}, leader, #state{term=Term,log=Log}=Data) -> |
|||
Parent = self(), |
|||
ProxyOp = |
|||
fun(State) -> |
|||
{State, Response} = apply(Op, [State | Args]), |
|||
gen_statem:cast(Parent, {client_resp, From, Response}), |
|||
State |
|||
end, |
|||
NewLog = raft_log:append(Log, Term, {op, ProxyOp, []}), |
|||
{keep_state, Data#state{log=NewLog}, [{next_event, internal, check_followers}]}; |
|||
|
|||
handle_event(cast, {client_resp, To, Result}, leader, _Data) -> |
|||
gen_statem:reply(To, Result), |
|||
keep_state_and_data; |
|||
|
|||
handle_event(internal, check_followers, leader, #state{cluster=Cluster,log=Log,ldr_next_index=NextIndices}=Data) -> |
|||
LastIndex = raft_log:last_index(Log), |
|||
AllPeers = peers(Data), |
|||
CheckFn = fun(Peer) -> |
|||
#{Peer := NextIndex} = NextIndices, |
|||
LastIndex >= NextIndex |
|||
end, |
|||
OutOfDatePeers = lists:filter(CheckFn, AllPeers), |
|||
SendAppendEntries = fun(Peer) -> send(Peer, rpc(append_entries, Peer, Data)) end, |
|||
lists:foreach(SendAppendEntries, OutOfDatePeers), |
|||
keep_state_and_data; |
|||
|
|||
handle_event(cast, {ack, append_entries, Term, FollowerId, false}, leader, #state{ldr_next_index=NextIndices,log=Log}=Data) -> |
|||
NewNextIndices = maps:update_with(FollowerId, fun(Value) -> Value-1 end, NextIndices), |
|||
NewData = Data#state{ldr_next_index=NewNextIndices}, |
|||
send(FollowerId, rpc(append_entries, NewData)), |
|||
{keep_state, NewData}; |
|||
|
|||
handle_event(cast, {ack, append_entries, Term, FollowerId, true}, leader, #state{ldr_match_index=MatchIndices, ldr_next_index=NextIndices,log=Log,cluster=Cluster}=Data) -> |
|||
%io:format("LEADER HEARTBEAT: ~p~n", [Data]), |
|||
NewNextIndices = NextIndices#{FollowerId => raft_log:last_index(Log)}, |
|||
NewMatchIndices = MatchIndices#{FollowerId => raft_log:last_index(Log)}, |
|||
% if SOME INDEX is replicated to a majority of followers, commit that index (the highest SOME INDEX) |
|||
% That index will be the median of the indices in NextIndices |
|||
SortedIndices = lists:sort(maps:values(NewMatchIndices)), |
|||
MedianIndex = length(Cluster) div 2, |
|||
MajorityCommitted = lists:nth(MedianIndex + 1, SortedIndices), |
|||
NewLog = raft_log:commit(Log, MajorityCommitted), |
|||
{keep_state, Data#state{log=NewLog, ldr_next_index=NewNextIndices, ldr_match_index=NewMatchIndices}}; |
|||
|
|||
%% Mostly Ignore votes we gather after being elected |
|||
handle_event(cast, {ack, request_vote, Term, Success}, leader, #state{term=Term,votes_gathered=Votes}=Data) -> |
|||
case Success of |
|||
true -> |
|||
{keep_state, Data#state{votes_gathered=Votes+1}}; |
|||
false -> |
|||
keep_state_and_data |
|||
end; |
|||
|
|||
handle_event(state_timeout, heartbeat, leader, #state{}) -> |
|||
repeat_state_and_data; |
|||
|
|||
handle_event(timeout, election, candidate, #state{}) -> |
|||
repeat_state_and_data. |
|||
|
|||
%% |
|||
%% Private Functions |
|||
%% |
|||
|
|||
%% A message |
|||
|
|||
rpc(append_entries, To, #state{ldr_next_index=NextIndices,term=Term,id=Id,log=Log}) -> |
|||
#{To := NextIndex} = NextIndices, |
|||
LastIndex = raft_log:last_index(Log), |
|||
PrevLogIndex = NextIndex - 1, |
|||
PrevLogTerm = raft_log:at(Log, PrevLogIndex), |
|||
Entries = raft_log:sublist(Log, NextIndex), |
|||
{rpc, append_entries, Term, {Id, PrevLogIndex, PrevLogTerm, Entries, raft_log:commit_index(Log)}}. |
|||
|
|||
rpc(heartbeat, #state{term=Term,id=Id,log=Log}) -> |
|||
PrevLogIndex = 0, |
|||
PrevLogTerm = 0, |
|||
% TODO: populate entries |
|||
Entries = [], |
|||
{rpc, append_entries, Term, {Id, PrevLogIndex, PrevLogTerm, Entries, raft_log:commit_index(Log)}}; |
|||
rpc(request_vote, #state{term=Term,id=Id,log=Log}) -> |
|||
LastIndex = raft_log:last_index(Log), |
|||
{rpc, request_vote, Term, {Id, LastIndex, raft_log:term_at(Log, LastIndex)}}. |
|||
|
|||
% min and max in millseconds |
|||
-define(election_min, 150). |
|||
-define(election_max, 300). |
|||
election_timeout() -> |
|||
?election_min + rand:uniform(?election_max-?election_min+1) - 1. |
|||
heartbeat_timeout() -> |
|||
50. |
|||
|
|||
%% gen_statem action to handle event transition |
|||
|
|||
action_reset_election() -> |
|||
{timeout, election_timeout(), election}. |
|||
|
|||
action_heartbeat() -> |
|||
{state_timeout, heartbeat_timeout(), heartbeat}. |
|||
|
|||
%action_retry_rpc(RpcData) -> |
|||
% {state_timeout, heartbeat_timeout(), {retry, RpcData}}. |
|||
|
|||
parse_args(Args) -> parse_args(Args, #state{}). |
|||
|
|||
parse_args([{id, Id}|Rest], State) -> |
|||
parse_args(Rest, State#state{id=Id}); |
|||
parse_args([{cluster, Cluster}|Rest], State) -> |
|||
parse_args(Rest, State#state{cluster=Cluster}); |
|||
parse_args([], State) -> State. |
|||
|
|||
peers(#state{id=Id,cluster=Cluster}) -> |
|||
Cluster -- [{local, Id}]. |
|||
|
|||
%% Update Commit Index (call this from check_followers) |
|||
|
|||
%% MOCK |
|||
|
|||
% TODO: Make this probabilistically fail |
|||
send(To, Data) when is_atom(To) -> |
|||
%io:format("Sending: ~p to ~p~n", [Data, To]), |
|||
gen_statem:cast(To,Data); |
|||
send({local, To}, Data) -> |
|||
%io:format("Sending: ~p to ~p~n", [Data, To]), |
|||
gen_statem:cast(To,Data). |
|||
@ -0,0 +1,14 @@ |
|||
{application, raft_erl, |
|||
[{description, "An escript"}, |
|||
{vsn, "0.1.0"}, |
|||
{registered, []}, |
|||
{applications, |
|||
[kernel, |
|||
stdlib |
|||
]}, |
|||
{env,[]}, |
|||
{modules, []}, |
|||
|
|||
{licenses, ["Apache 2.0"]}, |
|||
{links, []} |
|||
]}. |
|||
@ -0,0 +1,43 @@ |
|||
-module(raft_erl). |
|||
|
|||
%% API exports |
|||
-export([main/1]). |
|||
|
|||
%%==================================================================== |
|||
%% API functions |
|||
%%==================================================================== |
|||
|
|||
%% escript Entry point |
|||
main(Args) -> |
|||
storage:start_link(), |
|||
storage:store(lol, wut), |
|||
{ok, Omg} = storage:load(lol), |
|||
%Debug = [], |
|||
Debug = [{debug, [trace,log]}], |
|||
%Cluster = [{local, agent1}], |
|||
Cluster = [{local, agent1}, {local, agent2}, {local, agent3}], |
|||
lists:foreach( |
|||
fun({local, Id}=Name) -> |
|||
{ok, _Pid} = gen_statem:start_link(Name, raft_agent, [{id, Id}, {cluster, Cluster}], |
|||
Debug) |
|||
end, |
|||
Cluster), |
|||
timer:sleep(1000), |
|||
%io:format("Raft State: ~p~n", [raft_agent:operate(agent1, get_op())]), |
|||
erlang:halt(0). |
|||
|
|||
set_op(Value) -> |
|||
raft_log:op(fun do_set/2, [Value]). |
|||
|
|||
get_op() -> |
|||
raft_log:op(fun do_get/1, []). |
|||
|
|||
do_set(State, Value) -> |
|||
{Value, ok}. |
|||
|
|||
do_get(State) -> |
|||
{State, State}. |
|||
|
|||
%%==================================================================== |
|||
%% Internal functions |
|||
%%==================================================================== |
|||
@ -0,0 +1,64 @@ |
|||
-module(raft_log). |
|||
|
|||
-export([new/1, last_index/1, commit_index/1, applied_index/1, commit/2, sublist/2, op/2]). |
|||
-export([append/3, append_all/3, drop_after/2]). |
|||
-export([at/2, term_at/2, op_at/2]). |
|||
|
|||
-record(state, {oplist=[], state=[], applied_idx=0, commit_idx=0}). |
|||
-record(op, {f=fun(X)->X end, args=[]}). |
|||
|
|||
new(Init) -> #state{oplist=[{0, op(fun first_op/2, [Init])}]}. |
|||
|
|||
op(Fun, Args) -> |
|||
#op{f=Fun, args=Args}. |
|||
|
|||
last_index(#state{oplist=Log}) -> length(Log). |
|||
|
|||
applied_index(#state{applied_idx=Idx}) -> Idx. |
|||
|
|||
commit_index(#state{commit_idx=Idx}) -> Idx. |
|||
|
|||
commit(#state{oplist=Ops,state=State,commit_idx=OldIndex}=Log, NewIndex) when NewIndex > OldIndex -> |
|||
io:format("COMMITTING~n"), |
|||
% foreach operation between NewIndex and OldIndex, apply that operation to state |
|||
ToApply = lists:sublist(Ops, OldIndex, NewIndex - OldIndex), |
|||
NewState = lists:foldl(fun({Op, Args}, Acc) -> apply(Op, [Acc|Args]) end, State, ToApply), |
|||
Log#state{state=NewState, commit_idx=NewIndex}; |
|||
|
|||
% Ignore NewIndex <= OldIndex |
|||
commit(Log, NewIndex) -> |
|||
% io:format("NOT COMMITTING~p~n Index:(~p)~n State:(~p)~n Log:(~p)~n", [self(), NewIndex, Log#state.state, Log#state.oplist]), |
|||
Log. |
|||
|
|||
sublist(Log, Index) when Index >= length(Log#state.oplist) -> |
|||
[]; |
|||
sublist(Log, Index) -> |
|||
lists:nthtail(Index, Log#state.oplist). |
|||
|
|||
drop_after(Log, Index) -> |
|||
lists:sublist(Log#state.oplist, Index). |
|||
|
|||
append(#state{oplist=Log}=State, Term, Op) -> |
|||
State#state{oplist=Log ++ [{Term, Op}]}. |
|||
|
|||
append_all(State, _Term, []) -> |
|||
State; |
|||
append_all(State, Term, [Op|Tail]) -> |
|||
append(State, Term, Op). |
|||
|
|||
%% Lookup commands |
|||
|
|||
at(#state{oplist=Log}, Index) -> |
|||
lists:nth(Index, Log). |
|||
|
|||
term_at(Log, Index) -> |
|||
{Term, _Op} = at(Log, Index), |
|||
Term. |
|||
|
|||
op_at(Log, Index) -> |
|||
{_Term, Op} = at(Log, Index), |
|||
Op. |
|||
|
|||
%% |
|||
|
|||
first_op(_, InitialState) -> InitialState. |
|||
@ -0,0 +1,41 @@ |
|||
-module(storage). |
|||
-behavior(gen_server). |
|||
|
|||
%% gen_server callbacks |
|||
-export([init/1, handle_call/3, terminate/2]). |
|||
|
|||
%% public api |
|||
-export([start_link/0, load/1, store/2]). |
|||
|
|||
init([]) -> |
|||
TableRef = ets:new(storage, []), |
|||
{ok, TableRef}. |
|||
|
|||
terminate(_Reason, TableRef) -> |
|||
catch ets:delete(TableRef), |
|||
ok. |
|||
|
|||
% TODO: deal with hard expectation to be running as 'storage' here |
|||
|
|||
start_link() -> |
|||
gen_server:start_link({local, storage}, ?MODULE, [], []). |
|||
|
|||
load(Key) -> |
|||
gen_server:call(storage, {load, Key}). |
|||
|
|||
store(Key, Value) -> |
|||
gen_server:call(storage, {store, Key, Value}). |
|||
|
|||
|
|||
%% private methods |
|||
|
|||
handle_call({load, Key}, _From, TableRef) -> |
|||
Reply = case ets:lookup(TableRef, Key) of |
|||
[{Key, Value}] -> {ok, Value}; |
|||
[] -> error |
|||
end, |
|||
{reply, Reply, TableRef}; |
|||
|
|||
handle_call({store, Key, Value}, _From, TableRef) -> |
|||
ets:insert(TableRef, {Key, Value}), |
|||
{reply, ok, TableRef}. |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue