commit 142c469d7048f4a13faa657fb6075cf9472cdc05 Author: Shanti Chellaram Date: Thu Oct 21 16:38:17 2021 -0400 Initial commit diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c86f5bf --- /dev/null +++ b/LICENSE @@ -0,0 +1,191 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative 
Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+
+   END OF TERMS AND CONDITIONS
+
+   Copyright 2019, Shanti Chellaram .
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..3c79012
--- /dev/null
+++ b/README.md
@@ -0,0 +1,14 @@
+raft_erl
+=====
+
+An escript
+
+Build
+-----
+
+    $ rebar3 escriptize
+
+Run
+---
+
+    $ _build/default/bin/raft_erl
diff --git a/rebar.config b/rebar.config
new file mode 100644
index 0000000..6253b0f
--- /dev/null
+++ b/rebar.config
@@ -0,0 +1,13 @@
+{erl_opts, [no_debug_info]}.
+{deps, []}.
+
+{escript_incl_apps,
+ [raft_erl]}.
+{escript_main_app, raft_erl}.
+{escript_name, raft_erl}.
+{escript_emu_args, "%%! +sbtu +A1\n"}.
+
+%% Profiles
+{profiles, [{test,
+             [{erl_opts, [debug_info]}
+            ]}]}.
diff --git a/rebar.lock b/rebar.lock
new file mode 100644
index 0000000..57afcca
--- /dev/null
+++ b/rebar.lock
@@ -0,0 +1 @@
+[].
diff --git a/src/paxos_agent.erl b/src/paxos_agent.erl
new file mode 100644
index 0000000..4c49104
--- /dev/null
+++ b/src/paxos_agent.erl
@@ -0,0 +1,109 @@
+-module(paxos_agent).
+
+-behavior(gen_statem).
+
+%% gen_statem callbacks. In `handle_event_function' mode the event callback
+%% has arity 4, so that is the arity exported here.
+%% NOTE(review): handle_event/4 is not implemented in this module yet; the
+%% step functions below still have to be wired into it.
+-export([callback_mode/0, init/1, handle_event/4]).
+
+% persistent state
+-record(
+   pstate, {
+     name,           % the identifier of this agent
+     outcome = [],   % the last decree written in the ledger, or []
+     lastTried = -1, % the number of the last ballot begun by this agent
+     prevBal = -1,   % the number of the last ballot in which this agent voted
+     prevDec = [],   % the decree of the last ballot in which this agent voted
+     nextBal = -1    % the number of the last ballot in which this agent agreed to participate, or -1
+    }).
+% transient state
+-record(
+   tstate, {
+     status = idle,  % one of idle, trying, or polling
+     prevVotes = [], % set of votes received in LastVote messages for the current ballot (the one in lastTried)
+     quorum,         % if status = polling, the set of priests forming quorum of current ballot; otherwise meaningless
+     voters,         % if status = polling, the set of quorum members from whom p has received 'voted' messages
+     decree          % if status = polling, the decree of the current ballot
+    }).
+
+callback_mode() -> [handle_event_function, state_enter].
+
+%% Starts the agent in the `idle' state with fresh transient and persistent
+%% records. gen_statem requires init/1 to return {ok, State, Data}; the data
+%% is the {Tstate, Pstate} pair that try_new_ballot/2 also returns.
+%% NOTE(review): assumes Opts is a proplist that may carry {name, Name};
+%% confirm against whoever starts this agent.
+init(Opts) ->
+    Name = proplists:get_value(name, Opts),
+    {ok, idle, {#tstate{}, #pstate{name = Name}}}.
+
+% Note: Guaranteed not to Lose Progress
+% Note: Guaranteed to Proceed if N/2+1 nodes are available.
+
+% Ballots are a { BallotId :: integer, OwnerName :: string }
+
+%% Begins a new ballot owned by this agent, numbered one above the last
+%% ballot it tried, and clears the votes gathered for the previous one.
+%% Returns {ok, {NewTstate, NewPstate}}.
+try_new_ballot(Tstate, Pstate) ->
+    LastTriedId = ballot_id(Pstate#pstate.lastTried),
+    Ballot = {LastTriedId + 1, Pstate#pstate.name},
+    NewPstate = Pstate#pstate{lastTried = Ballot},
+    NewTstate = Tstate#tstate{status = trying, prevVotes = []},
+    {ok, {NewTstate, NewPstate}}.
+
+%% Numeric part of a ballot. The record default for "no ballot yet" is the
+%% bare integer -1, while real ballots are {Id, Owner} tuples.
+ballot_id({Id, _Owner}) -> Id;
+ballot_id(Id) when is_integer(Id) -> Id.
+
+%% Announces the ballot currently being tried. Only applicable while trying.
+send_next_ballot(Tstate, Pstate)
+  when Tstate#tstate.status == trying ->
+    send(any_priest, {next_ballot, Pstate#pstate.lastTried}),
+    {ok, Tstate, Pstate}.
+
+%% Agrees to participate in Ballot when it is newer than any ballot this
+%% agent has agreed to before. Returns {ok, {Tstate, NewPstate}}.
+recv_next_ballot({next_ballot, Ballot}, Tstate, #pstate{nextBal = NextBal} = Pstate)
+  when Ballot > NextBal ->
+    NewPstate = Pstate#pstate{nextBal = Ballot},
+    {ok, {Tstate, NewPstate}}.
+
+%% Reports this agent's most recent vote to the owner of the ballot it has
+%% agreed to participate in. Applicable only when that ballot is newer than
+%% the last ballot voted in.
+send_last_vote(Tstate, Pstate)
+  when Pstate#pstate.nextBal > Pstate#pstate.prevBal -> % send the last decree this voted for
+    Vote = {Pstate#pstate.name, Pstate#pstate.prevBal, Pstate#pstate.prevDec},
+    send(owner(Pstate#pstate.nextBal), {last_vote, Pstate#pstate.nextBal, Vote}),
+    {ok, Tstate, Pstate}.
+
+%% Records a LastVote reply for the ballot currently being tried. prevVotes
+%% is treated as a set, so a vote that is already present is not added twice.
+recv_last_vote({last_vote, Ballot, Vote}, Tstate, Pstate)
+  when Tstate#tstate.status == trying andalso Ballot == Pstate#pstate.lastTried ->
+    Known = Tstate#tstate.prevVotes,
+    NewVotes =
+        case lists:member(Vote, Known) of
+            true -> Known;
+            false -> [Vote | Known]
+        end,
+    {ok, Tstate#tstate{prevVotes = NewVotes}, Pstate}.
+
+%% Moves from trying to polling with quorum Q. The decree of the new ballot
+%% is the decree of the newest vote received, or [] when nobody has voted.
+%% NOTE(review): Q is only checked to be a list; that it is a majority drawn
+%% from the senders in prevVotes is not verified here.
+start_polling_majority_set_q(Q, Tstate, Pstate)
+  when Tstate#tstate.status == trying andalso is_list(Q) ->
+    NewTstate = Tstate#tstate{
+                  status = polling,
+                  quorum = Q,
+                  voters = [],
+                  decree = decree_of_newest_vote(Tstate#tstate.prevVotes)
+                 },
+    {ok, NewTstate, Pstate}.
+
+%% Picks the decree carried by the vote with the highest ballot. A ballot of
+%% -1 (or {-1, _}) is the "never voted" marker and yields the blank decree.
+decree_of_newest_vote([]) ->
+    [];
+decree_of_newest_vote([First | Rest]) ->
+    case lists:foldl(fun newer_vote/2, First, Rest) of
+        {_Priest, -1, _Decree} -> [];
+        {_Priest, {-1, _Owner}, _Decree} -> [];
+        {_Priest, _Ballot, Decree} -> Decree
+    end.
+
+%% Of two votes, keeps the one cast in the later ballot.
+newer_vote({_, BallotA, _} = VoteA, {_, BallotB, _}) when BallotA > BallotB ->
+    VoteA;
+newer_vote(_VoteA, VoteB) ->
+    VoteB.
+
+%% Asks a member of the quorum to vote on the decree of the current ballot.
+send_begin_ballot(Tstate, Pstate)
+  when Tstate#tstate.status == polling ->
+    send(random(Tstate#tstate.quorum), {begin_ballot, Pstate#pstate.lastTried, Tstate#tstate.decree}),
+    {ok, Tstate, Pstate}.
+
+%% Votes in Ballot: remembers the ballot and its decree as this agent's most
+%% recent vote. nextBal is left alone; the guard already requires it to equal
+%% Ballot.
+recv_begin_ballot({begin_ballot, Ballot, Decree}, Tstate, Pstate)
+  when Ballot == Pstate#pstate.nextBal andalso Ballot > Pstate#pstate.prevBal ->
+    NewPstate = Pstate#pstate{prevBal = Ballot, prevDec = Decree},
+    % if there is a Ballot B in _B_ with ballotId = Ballot, choose that node and let the new value of _B_ include the union of Bc (I believe this means commit?)
+    {ok, Tstate, NewPstate}.
+
+%% Tells the owner of the last ballot voted in that this agent voted.
+send_voted(Tstate, Pstate)
+  when Pstate#pstate.prevBal /= -1 ->
+    send(owner(Pstate#pstate.prevBal), {voted, Pstate#pstate.prevBal, Pstate#pstate.name}),
+    {ok, Tstate, Pstate}.
+
+%% Records that Priest voted in the ballot currently being polled.
+recv_voted({voted, Ballot, Priest}, Tstate, Pstate)
+  when Tstate#tstate.status == polling andalso Ballot == Pstate#pstate.lastTried ->
+    {ok, Tstate#tstate{voters = [Priest | Tstate#tstate.voters]}, Pstate}.
% this could be a transition point
+
+%% Commits the decree of the current ballot once every member of the quorum
+%% has voted. Applies only while polling and while nothing has been written
+%% to the ledger yet; a quorum member that has not voted fails the assertive
+%% match below.
+succeed(#tstate{status = polling, quorum = Quorum, voters = Voters, decree = Decree} = Tstate,
+        #pstate{outcome = []} = Pstate) ->
+    true = lists:all(fun(Priest) -> lists:member(Priest, Voters) end, Quorum),
+    NewPstate = Pstate#pstate{outcome = Decree}, % this is the commit step, i think
+    {ok, Tstate, NewPstate}.
+
+%% Broadcasts the committed decree. The blank outcome is [] (the record
+%% default), so that is the value the guard excludes.
+send_success(Tstate, Pstate)
+  when Pstate#pstate.outcome =/= [] ->
+    send(any_priest(), {success, Pstate#pstate.outcome}),
+    {ok, Tstate, Pstate}.
+
+%% Adopts a decree committed elsewhere. Applies only while this agent's own
+%% outcome is still blank.
+recv_success({success, Outcome}, Tstate, Pstate)
+  when Pstate#pstate.outcome =:= [] ->
+    {ok, Tstate, Pstate#pstate{outcome = Outcome}}. % follower commit step
diff --git a/src/raft_agent.erl b/src/raft_agent.erl
new file mode 100644
index 0000000..d8c81c1
--- /dev/null
+++ b/src/raft_agent.erl
@@ -0,0 +1,261 @@
+-module(raft_agent).
+
+-behavior(gen_statem).
+
+%% gen_statem callbacks
+-export([callback_mode/0, init/1, handle_event/4]).
+
+%% Client API
+-export([operate/2]).
+
+%% Submits Fun as a client operation to the agent ServerRef and waits up to
+%% 500 ms for the reply.
+operate(ServerRef, Fun) ->
+    gen_statem:call(ServerRef, {client_req, raft_log:op(Fun, [])}, 500).
+
+% persisted state should be saved to disk after modification and before a callback returns
+-record(state, {
+          id, term = 0, voted_for = [], log, cluster = [], %persisted state
+          ldr_next_index, ldr_match_index,                 %volatile state
+          votes_gathered = 0
+         }).
+
+%% gen_statem callbacks
+
+callback_mode() -> [handle_event_function, state_enter].
+
+%% Builds the initial data from the start arguments, attaches an empty log
+%% and starts as a follower.
+init(Args) ->
+    State0 = parse_args(Args),
+    State1 = State0#state{log = raft_log:new(#{})},
+    {ok, follower, State1}.
+
+%% Message Events that need to be handled:
+%%
+%% Election Timeout Elapsed (follower and candidate state only)
+%% Received AppendEntries RPC
+%% Received AppendEntries ACK
+%% Received RequestVote RPC
+%% Received RequestVote ACK
+%%
+%% Received ClientRequest RPC -> Commit Transition / Query-NoOp
+%% [Committed ClientEntry] -> Respond to Client
+
+%% handle_event(EventType, EventContent, Role, Data) -- gen_statem callback.
+%% Role is follower | candidate | leader; Data is a #state{}.
+%%
+%% The election timer is a gen_statem event time-out, which is cancelled by
+%% the arrival of any event. Every clause that leaves the agent a follower or
+%% a candidate therefore re-arms it (see rearm_election/1).
+%%
+%% Peers are addressed as {local, Id} (that is how peers/1 yields them), while
+%% acks carry the bare Id, so the leader's index maps are keyed by {local, Id}.
+
+%% Follower State
+handle_event(enter, _PrevState, follower, #state{}) ->
+    % also reached when a candidate or leader steps down
+    {keep_state_and_data, [action_reset_election()]};
+
+handle_event(timeout, election, follower, #state{} = Data) ->
+    {next_state, candidate, Data};
+
+%% Candidate State
+handle_event(enter, _PrevState, candidate, #state{term = Term, id = Id} = Data0) ->
+    Data1 = Data0#state{term = Term + 1, voted_for = Id, votes_gathered = 1},
+    Request = rpc(request_vote, Data1),
+    lists:foreach(fun(Node) -> send(Node, Request) end, peers(Data1)),
+    % without a timer a split vote would never be retried
+    {keep_state, Data1, [action_reset_election()]};
+
+handle_event(timeout, election, candidate, #state{}) ->
+    % re-enter candidate: new term, new round of RequestVote
+    repeat_state_and_data;
+
+%% Leader State
+handle_event(enter, _PrevState, leader, Data) ->
+    Peers = peers(Data),
+    LastLogIndex = raft_log:last_index(Data#state.log),
+    NextIndex = maps:from_list([{Peer, LastLogIndex + 1} || Peer <- Peers]),
+    MatchIndex = maps:from_list([{Peer, 0} || Peer <- Peers]),
+    NewData = Data#state{ldr_match_index = MatchIndex, ldr_next_index = NextIndex},
+    broadcast_append_entries(Peers, NewData),
+    {keep_state, NewData, [action_heartbeat()]};
+
+%% All States: a message from a newer term makes us a follower of that term.
+handle_event(cast, {rpc, _RpcType, Term, _Payload} = EventData, _Role,
+             #state{term = CurrentTerm} = Data) when Term > CurrentTerm ->
+    % the RPC is re-queued so the follower clauses answer it
+    {next_state, follower, Data#state{term = Term, voted_for = []},
+     [{next_event, cast, EventData}]};
+
+handle_event(cast, {ack, _Type, Term, _Result}, _Role,
+             #state{term = CurrentTerm} = Data) when Term > CurrentTerm ->
+    step_down(Term, Data);
+
+handle_event(cast, {ack, _Type, Term, _From, _Result}, _Role,
+             #state{term = CurrentTerm} = Data) when Term > CurrentTerm ->
+    step_down(Term, Data);
+
+%% RequestVote RPC
+handle_event(cast, {rpc, request_vote, Term, {Candidate, _LastIdx, _LastTerm}}, Role,
+             #state{term = CurrentTerm}) when Term < CurrentTerm ->
+    send(Candidate, {ack, request_vote, CurrentTerm, false}),
+    {keep_state_and_data, rearm_election(Role)};
+
+handle_event(cast, {rpc, request_vote, CurrentTerm, {Candidate, CLastIndex, CLastTerm}}, follower,
+             #state{term = CurrentTerm, log = Log, voted_for = VotedFor} = Data) ->
+    LastIndex = raft_log:last_index(Log),
+    LastTerm = raft_log:term_at(Log, LastIndex),
+    CanVote = VotedFor =:= [] orelse VotedFor =:= Candidate,
+    % the candidate's log must be at least as up to date as ours: a later
+    % last term wins outright, equal terms are decided by log length
+    UpToDate = CLastTerm > LastTerm orelse
+        (CLastTerm =:= LastTerm andalso CLastIndex >= LastIndex),
+    Granted = CanVote andalso UpToDate,
+    send(Candidate, {ack, request_vote, CurrentTerm, Granted}),
+    case Granted of
+        true ->
+            {keep_state, Data#state{voted_for = Candidate}, [action_reset_election()]};
+        false ->
+            {keep_state_and_data, [action_reset_election()]}
+    end;
+
+handle_event(cast, {rpc, request_vote, Term, {Candidate, _LastIdx, _LastTerm}}, Role,
+             #state{term = Term}) ->
+    % candidates and leaders of this term have already voted for themselves
+    send(Candidate, {ack, request_vote, Term, false}),
+    {keep_state_and_data, rearm_election(Role)};
+
+%% AppendEntries RPC
+handle_event(cast, {rpc, append_entries, Term, {LeaderId, _PrevIdx, _PrevTerm, _Entries, _Commit}}, Role,
+             #state{id = Id, term = CurrentTerm}) when Term < CurrentTerm ->
+    send(LeaderId, {ack, append_entries, CurrentTerm, Id, false}),
+    {keep_state_and_data, rearm_election(Role)};
+
+handle_event(cast, {rpc, append_entries, Term, {LeaderId, PrevLogIndex, PrevLogTerm, Entries, LeaderCommit}}, Role,
+             #state{id = Id, term = Term, log = Log} = Data) ->
+    {Result, NewLog} =
+        case log_contains(Log, PrevLogIndex, PrevLogTerm) of
+            true ->
+                {true, accept_entries(Log, PrevLogIndex, Entries, LeaderCommit)};
+            false ->
+                {false, Log}
+        end,
+    send(LeaderId, {ack, append_entries, Term, Id, Result}),
+    NewData = Data#state{log = NewLog},
+    case Role of
+        candidate ->
+            % somebody else won this term
+            {next_state, follower, NewData};
+        _ ->
+            {keep_state, NewData, rearm_election(Role)}
+    end;
+
+%% RequestVote ACK
+handle_event(cast, {ack, request_vote, Term, true}, candidate,
+             #state{term = Term, votes_gathered = Votes, cluster = Cluster} = Data) ->
+    NewVotes = Votes + 1,
+    NewData = Data#state{votes_gathered = NewVotes},
+    % strict majority of the whole cluster, our own vote included
+    case NewVotes > length(Cluster) div 2 of
+        true ->
+            {next_state, leader, NewData};
+        false ->
+            {keep_state, NewData, [action_reset_election()]}
+    end;
+
+%% Mostly Ignore votes we gather after being elected
+handle_event(cast, {ack, request_vote, Term, true}, leader,
+             #state{term = Term, votes_gathered = Votes} = Data) ->
+    {keep_state, Data#state{votes_gathered = Votes + 1}};
+
+%% Client requests
+handle_event({call, From}, {client_req, {op, Op, Args}}, leader,
+             #state{term = Term, log = Log} = Data) ->
+    Parent = self(),
+    % Wraps the client's operation so that applying it also sends the
+    % response back to this process, which then replies to the caller.
+    ProxyOp =
+        fun(State) ->
+                {NewState, Response} = apply(Op, [State | Args]),
+                gen_statem:cast(Parent, {client_resp, From, Response}),
+                NewState
+        end,
+    NewLog = raft_log:append(Log, Term, {op, ProxyOp, []}),
+    {keep_state, Data#state{log = NewLog}, [{next_event, internal, check_followers}]};
+
+handle_event({call, From}, {client_req, _Op}, Role, _Data) ->
+    {keep_state_and_data, [{reply, From, {error, not_leader}} | rearm_election(Role)]};
+
+handle_event(cast, {client_resp, To, Result}, Role, _Data) ->
+    gen_statem:reply(To, Result),
+    {keep_state_and_data, rearm_election(Role)};
+
+handle_event(internal, check_followers, leader,
+             #state{log = Log, ldr_next_index = NextIndices} = Data) ->
+    LastIndex = raft_log:last_index(Log),
+    Lagging = [Peer || Peer <- peers(Data), LastIndex >= maps:get(Peer, NextIndices)],
+    broadcast_append_entries(Lagging, Data),
+    keep_state_and_data;
+
+%% AppendEntries ACK
+handle_event(cast, {ack, append_entries, Term, FollowerId, false}, leader,
+             #state{term = Term, ldr_next_index = NextIndices} = Data) ->
+    Peer = {local, FollowerId},
+    % back off one entry (never below the first) and retry
+    NewNextIndices = maps:update_with(Peer, fun(Next) -> max(Next - 1, 1) end, NextIndices),
+    NewData = Data#state{ldr_next_index = NewNextIndices},
+    send(Peer, rpc(append_entries, Peer, NewData)),
+    {keep_state, NewData};
+
+handle_event(cast, {ack, append_entries, Term, FollowerId, true}, leader,
+             #state{term = Term, ldr_match_index = MatchIndices,
+                    ldr_next_index = NextIndices, log = Log} = Data) ->
+    Peer = {local, FollowerId},
+    LastIndex = raft_log:last_index(Log),
+    % NOTE(review): the ack does not say which index it acknowledges, so the
+    % leader's current last index is assumed; an ack for an older
+    % AppendEntries can overstate what the follower holds.
+    NewNextIndices = NextIndices#{Peer => LastIndex + 1},
+    NewMatchIndices = MatchIndices#{Peer => LastIndex},
+    % Highest index stored on a strict majority: sort every node's index
+    % (the leader's own included) in descending order and take the entry at
+    % the majority position.
+    Replicated = lists:sort(fun erlang:'>='/2, [LastIndex | maps:values(NewMatchIndices)]),
+    MajorityCommitted = lists:nth(length(Replicated) div 2 + 1, Replicated),
+    NewLog = raft_log:commit(Log, MajorityCommitted),
+    {keep_state, Data#state{log = NewLog, ldr_next_index = NewNextIndices,
+                            ldr_match_index = NewMatchIndices}};
+
+handle_event(state_timeout, heartbeat, leader, #state{} = Data) ->
+    % send from each peer's next index and keep the replication progress;
+    % repeating the state would reset the index maps via the enter clause
+    broadcast_append_entries(peers(Data), Data),
+    {keep_state_and_data, [action_heartbeat()]};
+
+%% Acks that no longer apply (old term, denied vote, role changed meanwhile)
+handle_event(cast, {ack, request_vote, _Term, _Granted}, Role, _Data) ->
+    {keep_state_and_data, rearm_election(Role)};
+
+handle_event(cast, {ack, append_entries, _Term, _FollowerId, _Success}, Role, _Data) ->
+    {keep_state_and_data, rearm_election(Role)}.
+
+%% Actions that restart the election timer for roles that run one.
+rearm_election(leader) -> [];
+rearm_election(_Role) -> [action_reset_election()].
+
+%% Adopts the newer Term and becomes a follower that has not voted in it.
+step_down(Term, Data) ->
+    {next_state, follower, Data#state{term = Term, voted_for = []},
+     [action_reset_election()]}.
+
+%% Sends each peer the AppendEntries RPC built for its own next index.
+broadcast_append_entries(Peers, Data) ->
+    lists:foreach(fun(Peer) -> send(Peer, rpc(append_entries, Peer, Data)) end, Peers).
+
+%% True when Log holds an entry of term Term at Index. Index 0 stands for
+%% "before the first entry" and always matches.
+log_contains(_Log, 0, _Term) ->
+    true;
+log_contains(Log, Index, Term) ->
+    Index =< raft_log:last_index(Log) andalso raft_log:term_at(Log, Index) =:= Term.
+
+%% Replaces everything after PrevLogIndex with Entries (a list of {Term, Op})
+%% and commits up to the leader's commit index, but never past the last entry
+%% this RPC vouches for. An empty Entries list leaves the log contents alone.
+accept_entries(Log, PrevLogIndex, Entries, LeaderCommit) ->
+    NewLog =
+        case Entries of
+            [] ->
+                Log;
+            _ ->
+                Truncated = raft_log:drop_after(Log, PrevLogIndex),
+                lists:foldl(fun({EntryTerm, Op}, Acc) -> raft_log:append(Acc, EntryTerm, Op) end,
+                            Truncated, Entries)
+        end,
+    raft_log:commit(NewLog, min(LeaderCommit, PrevLogIndex + length(Entries))).
+
+%%
+%% Private Functions
+%%
+
+%% A message
+
+%% AppendEntries RPC for peer To: everything from To's next index onwards,
+%% preceded by the index and term of the entry just before it.
+rpc(append_entries, To, #state{ldr_next_index = NextIndices, term = Term, id = Id, log = Log}) ->
+    #{To := NextIndex} = NextIndices,
+    PrevLogIndex = NextIndex - 1,
+    PrevLogTerm = prev_log_term(Log, PrevLogIndex),
+    % raft_log:sublist/2 returns the entries after the given index
+    Entries = raft_log:sublist(Log, PrevLogIndex),
+    {rpc, append_entries, Term, {Id, PrevLogIndex, PrevLogTerm, Entries, raft_log:commit_index(Log)}}.
+
+%% Term of the entry at Index, or 0 when there is no previous entry.
+prev_log_term(_Log, 0) -> 0;
+prev_log_term(Log, Index) -> raft_log:term_at(Log, Index).
+
+%% RPC tuples that carry no per-peer state. `heartbeat' is an AppendEntries
+%% with no entries and a zero previous index/term; `request_vote' advertises
+%% the index and term of this agent's last log entry.
+rpc(heartbeat, #state{term = CurrentTerm, id = Self, log = Log}) ->
+    % TODO: populate entries
+    NoEntries = [],
+    Payload = {Self, 0, 0, NoEntries, raft_log:commit_index(Log)},
+    {rpc, append_entries, CurrentTerm, Payload};
+rpc(request_vote, #state{term = CurrentTerm, id = Self, log = Log}) ->
+    Last = raft_log:last_index(Log),
+    LastTerm = raft_log:term_at(Log, Last),
+    {rpc, request_vote, CurrentTerm, {Self, Last, LastTerm}}.
+
+% min and max in milliseconds
+-define(election_min, 150).
+-define(election_max, 300).
+
+%% Random election time-out, uniform over election_min..election_max.
+election_timeout() ->
+    Spread = ?election_max - ?election_min + 1,
+    ?election_min + rand:uniform(Spread) - 1.
+
+%% Interval between leader heartbeats, in milliseconds.
+heartbeat_timeout() ->
+    50.
+
+%% gen_statem action to handle event transition
+
+%% Event time-out that delivers `election' after a random election time-out.
+action_reset_election() ->
+    Delay = election_timeout(),
+    {timeout, Delay, election}.
+
+%% State time-out that delivers `heartbeat' after the heartbeat interval.
+action_heartbeat() ->
+    Delay = heartbeat_timeout(),
+    {state_timeout, Delay, heartbeat}.
+
+%% Folds the start options {id, Id} and {cluster, Cluster} into a #state{}.
+parse_args(Args) -> parse_args(Args, #state{}).
+
+parse_args([], Acc) ->
+    Acc;
+parse_args([{id, Id} | More], Acc) ->
+    parse_args(More, Acc#state{id = Id});
+parse_args([{cluster, Members} | More], Acc) ->
+    parse_args(More, Acc#state{cluster = Members}).
+
+%% Every cluster member except this agent's own {local, Id} entry.
+peers(#state{id = Self, cluster = Members}) ->
+    lists:subtract(Members, [{local, Self}]).
+
+%% Update Commit Index (call this from check_followers)
+
+%% MOCK
+
+% TODO: Make this probabilistically fail
+%% Casts Msg to a locally registered agent, named either by its bare atom or
+%% as {local, Name}.
+send({local, Name}, Msg) ->
+    gen_statem:cast(Name, Msg);
+send(Name, Msg) when is_atom(Name) ->
+    gen_statem:cast(Name, Msg).
diff --git a/src/raft_erl.app.src b/src/raft_erl.app.src
new file mode 100644
index 0000000..0b1b1cd
--- /dev/null
+++ b/src/raft_erl.app.src
@@ -0,0 +1,14 @@
+{application, raft_erl,
+ [{description, "An escript"},
+  {vsn, "0.1.0"},
+  {registered, []},
+  {applications,
+   [kernel,
+    stdlib
+   ]},
+  {env,[]},
+  {modules, []},
+
+  {licenses, ["Apache 2.0"]},
+  {links, []}
+ ]}.
diff --git a/src/raft_erl.erl b/src/raft_erl.erl
new file mode 100644
index 0000000..cde9253
--- /dev/null
+++ b/src/raft_erl.erl
@@ -0,0 +1,43 @@
+-module(raft_erl).
+
+%% API exports
+-export([main/1]).
+
+%%====================================================================
+%% API functions
+%%====================================================================
+
+%% escript Entry point
+%% Smoke-tests the storage server with one store/load round trip, starts a
+%% three-agent cluster with gen_statem tracing enabled, lets it run for a
+%% second and halts the VM.
+main(_Args) ->
+    storage:start_link(),
+    storage:store(lol, wut),
+    {ok, _Stored} = storage:load(lol),
+    StartOpts = [{debug, [trace,log]}],
+    Cluster = [{local, agent1}, {local, agent2}, {local, agent3}],
+    StartAgent =
+        fun({local, Id} = Name) ->
+                AgentArgs = [{id, Id}, {cluster, Cluster}],
+                {ok, _Pid} = gen_statem:start_link(Name, raft_agent, AgentArgs, StartOpts)
+        end,
+    lists:foreach(StartAgent, Cluster),
+    timer:sleep(1000),
+    %io:format("Raft State: ~p~n", [raft_agent:operate(agent1, get_op())]),
+    erlang:halt(0).
+
+%% Log operation that replaces the replicated state with Value.
+set_op(Value) ->
+    raft_log:op(fun do_set/2, [Value]).
+
+%% Log operation that reads the replicated state.
+get_op() ->
+    raft_log:op(fun do_get/1, []).
+
+%% Both return {NewState, Response}.
+do_set(_State, Value) ->
+    {Value, ok}.
+
+do_get(State) ->
+    {State, State}.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
diff --git a/src/raft_log.erl b/src/raft_log.erl
new file mode 100644
index 0000000..e84167f
--- /dev/null
+++ b/src/raft_log.erl
@@ -0,0 +1,64 @@
+-module(raft_log).
+
+-export([new/1, last_index/1, commit_index/1, applied_index/1, commit/2, sublist/2, op/2]).
+-export([append/3, append_all/3, drop_after/2]).
+-export([at/2, term_at/2, op_at/2]).
+
+%% oplist holds {Term, #op{}} entries, oldest first; state is the value
+%% produced by the operations applied so far.
+-record(state, {oplist=[], state=[], applied_idx=0, commit_idx=0}).
+-record(op, {f=fun(X)->X end, args=[]}).
+
+%% New log whose single entry, recorded in term 0, sets the state to Init.
+new(Init) ->
+    Genesis = {0, op(fun first_op/2, [Init])},
+    #state{oplist = [Genesis]}.
+
+%% Packs a fun and its extra arguments into an operation record.
+op(Fun, Args) ->
+    #op{f = Fun, args = Args}.
+
+%% Number of entries in the log.
+last_index(#state{oplist = Entries}) ->
+    length(Entries).
+
+applied_index(#state{applied_idx = Applied}) ->
+    Applied.
+
+commit_index(#state{commit_idx = Committed}) ->
+    Committed.
+
+%% Advances the commit index to NewIndex and applies the newly committed
+%% entries (OldIndex+1 .. NewIndex, 1-based) to the state, in log order.
+%% A NewIndex that is not beyond the current commit index is ignored.
+commit(#state{oplist = Ops, state = State, commit_idx = OldIndex} = Log, NewIndex)
+  when NewIndex > OldIndex ->
+    io:format("COMMITTING~n"),
+    % lists:sublist/3 takes a 1-based start position
+    ToApply = lists:sublist(Ops, OldIndex + 1, NewIndex - OldIndex),
+    NewState = lists:foldl(fun apply_entry/2, State, ToApply),
+    Log#state{state = NewState,
+              applied_idx = OldIndex + length(ToApply),
+              commit_idx = NewIndex};
+
+% Ignore NewIndex <= OldIndex
+commit(Log, _NewIndex) ->
+    Log.
+
+%% Runs one log entry against the state; the entry's fun receives the state
+%% followed by its stored arguments and returns the new state.
+apply_entry({_Term, #op{f = Fun, args = Args}}, State) ->
+    apply(Fun, [State | Args]).
+
+%% Entries that come after position Index ([] when there are none).
+sublist(Log, Index) when Index >= length(Log#state.oplist) ->
+    [];
+sublist(Log, Index) ->
+    lists:nthtail(Index, Log#state.oplist).
+
+%% Log truncated to its first Index entries. Returns a log, so the result
+%% can be passed on to append/3 and commit/2.
+drop_after(Log, Index) ->
+    Log#state{oplist = lists:sublist(Log#state.oplist, Index)}.
+
+%% Appends one operation recorded in Term.
+append(#state{oplist = Log} = State, Term, Op) ->
+    State#state{oplist = Log ++ [{Term, Op}]}.
+
+%% Appends every operation in the list, in order, all recorded in Term.
+append_all(State, _Term, []) ->
+    State;
+append_all(#state{oplist = Log} = State, Term, Ops) ->
+    State#state{oplist = Log ++ [{Term, Op} || Op <- Ops]}.
+
+%% Lookup commands
+
+%% The {Term, Op} entry at 1-based position Index.
+at(#state{oplist = Log}, Index) ->
+    lists:nth(Index, Log).
+
+term_at(Log, Index) ->
+    {Term, _Op} = at(Log, Index),
+    Term.
+
+op_at(Log, Index) ->
+    {_Term, Op} = at(Log, Index),
+    Op.
+
+%%
+
+%% Operation stored in the first log entry: discards the previous state.
+first_op(_, InitialState) -> InitialState.
diff --git a/src/storage.erl b/src/storage.erl
new file mode 100644
index 0000000..6fdff86
--- /dev/null
+++ b/src/storage.erl
@@ -0,0 +1,41 @@
+-module(storage).
+-behavior(gen_server).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, terminate/2]).
+
+%% public api
+-export([start_link/0, load/1, store/2]).
+
+%% The server state is the reference of the ETS table holding the pairs.
+init([]) ->
+    TableRef = ets:new(storage, []),
+    {ok, TableRef}.
+
+%% No casts are part of the API; the callback exists because gen_server
+%% requires it, and anything cast to the server is dropped.
+handle_cast(_Msg, TableRef) ->
+    {noreply, TableRef}.
+
+%% Deletes the table, tolerating one that is already gone.
+terminate(_Reason, TableRef) ->
+    try
+        ets:delete(TableRef)
+    catch
+        error:badarg -> true
+    end,
+    ok.
+
+% TODO: deal with hard expectation to be running as 'storage' here
+
+start_link() ->
+    gen_server:start_link({local, storage}, ?MODULE, [], []).
+
+%% Returns {ok, Value} for a stored key and `error' otherwise.
+load(Key) ->
+    gen_server:call(storage, {load, Key}).
+
+%% Stores Value under Key in the server registered as `storage'.
+store(Key, Value) ->
+    Request = {store, Key, Value},
+    gen_server:call(storage, Request).
+
+
+%% private methods
+
+%% {load, Key} replies {ok, Value} or `error'; {store, Key, Value} inserts
+%% the pair and replies ok. The table reference is the unchanged state.
+handle_call({load, Key}, _From, TableRef) ->
+    {reply, lookup_reply(TableRef, Key), TableRef};
+
+handle_call({store, Key, Value}, _From, TableRef) ->
+    Pair = {Key, Value},
+    ets:insert(TableRef, Pair),
+    {reply, ok, TableRef}.
+
+%% Turns the ETS lookup result for Key into the reply for load/1.
+lookup_reply(TableRef, Key) ->
+    case ets:lookup(TableRef, Key) of
+        [{Key, Value}] -> {ok, Value};
+        [] -> error
+    end.