diff --git a/src/tracker/progress.rs b/src/tracker/progress.rs index 20fe9225..dc19bb81 100644 --- a/src/tracker/progress.rs +++ b/src/tracker/progress.rs @@ -192,8 +192,8 @@ impl Progress { // Do not decrease next index if it's requesting snapshot. if request_snapshot == INVALID_INDEX { self.next_idx = cmp::min(rejected, match_hint + 1); - if self.next_idx < 1 { - self.next_idx = 1; + if self.next_idx < self.matched + 1 { + self.next_idx = self.matched + 1; } } else if self.pending_request_snapshot == INVALID_INDEX { // Allow requesting snapshot even if it's not Replicate. diff --git a/tla/.gitignore b/tla/.gitignore new file mode 100644 index 00000000..2eb864a6 --- /dev/null +++ b/tla/.gitignore @@ -0,0 +1,2 @@ +states +*.jar \ No newline at end of file diff --git a/tla/Makefile b/tla/Makefile new file mode 100644 index 00000000..59666e33 --- /dev/null +++ b/tla/Makefile @@ -0,0 +1,19 @@ +TLA2TOOLS_JAR_URL = https://github.com/tlaplus/tlaplus/releases/download/v1.8.0/tla2tools.jar +COMMUNITYMODULES_DEPS_JAR_URL = https://github.com/tlaplus/communityModules/releases/latest/download/CommunityModules-deps.jar +TLA ?= consensus/WrapperUdp.tla + +.PHONY: all clean download run + +download: tla2tools.jar CommunityModules-deps.jar + +run: download + exec java -Dtlc2.tool.fp.FPSet.impl=tlc2.tool.fp.OffHeapDiskFPSet -XX:+UseParallelGC -cp tla2tools.jar:CommunityModules-deps.jar tlc2.TLC $(TLA) -workers auto -deadlock -cleanup + +tla2tools.jar: + wget -O tla2tools.jar $(TLA2TOOLS_JAR_URL) + +CommunityModules-deps.jar: + wget -O CommunityModules-deps.jar $(COMMUNITYMODULES_DEPS_JAR_URL) + +clean: + rm -rf states* \ No newline at end of file diff --git a/tla/README.md b/tla/README.md new file mode 100644 index 00000000..0c02af0f --- /dev/null +++ b/tla/README.md @@ -0,0 +1,46 @@ +# The TLA+ Specification for Raft-RS + +## Overview + +Welcome to the TLA+ specification for Raft-RS! + +The TLA+ specification is tailored specifically for Raft-RS. +Unlike existing TLA+ specifications of the Raft protocol, which tend to be high-level and abstract, +we write the specification to follow the codebase closely. +By doing so, we aim to capture the design choices and optimizations made in the implementation, +thereby enabling model checking of the implementation's core logic to uncover subtle bugs and edge cases at the specification level. + +At present, the specification models the basic Raft modules, namely leader election and log replication. +Modeling of the other modules (e.g., log compaction) is planned. +The specification assumes the UDP failure model because Raft-RS is designed to be agnostic of the underlying transport layer. +The UDP failure model allows message drop, duplication, and unordered delivery. + +We have conducted model checking at a certain scale to verify the correctness of the specification. +The specification can serve as a super-doc that supplements the detailed system documentation for Raft-RS developers. + +If you have any questions or find any problems with the specification, please contact us. + +## Scale of testing + +We verified a model with 3 servers, 4 client requests, and 2 failures for 24 hours. This process generated 344,103,609 states and reached a depth of 18 without violating any safety properties. + +## Running the model checker + +The specifications in this folder are implemented for and were checked with the TLC model checker, specifically with the nightly build of TLC. The scripts in this folder allow you to run TLC using the CLI easily. 
+
To download and then run TLC, simply execute:
+
```shell
+make run
+```
+
If you want to check the UDP (WrapperUdp) or TCP (WrapperTcp) variant specifically, run one of:
+
```shell
+make run TLA="consensus/WrapperTcp.tla"
+make run TLA="consensus/WrapperUdp.tla"
+```
+
## Tip
+
TLC works best if it can utilize all system resources. You can find the worker parameter settings in the run target of the Makefile and adjust them to suit your machine.
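The scale of a run is bounded by the Parameters constant that RaftRs.tla consumes through its state-constraint operators (CheckParameterMax and the Sc* definitions below). A minimal sketch of a possible assignment — the keys mirror the Sc* constraints in this diff, while the bound values are illustrative only:

```tla
\* Illustrative Parameters record; the keys follow the Sc* state constraints
\* in RaftRs.tla, and the bounds are examples to be tuned per machine.
Parameters == [ MaxElectionTimes         |-> 3,
                MaxAppendEntriesTimes    |-> 4,
                MaxClientOperationsTimes |-> 4,
                MaxPartitionTimes        |-> 2,
                MaxCureTimes             |-> 2,
                MaxWireMsgs              |-> 6 ]
```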
diff --git a/tla/consensus/FifoNetwork.tla b/tla/consensus/FifoNetwork.tla new file mode 100644 index 00000000..af8e41f7 --- /dev/null +++ b/tla/consensus/FifoNetwork.tla @@ -0,0 +1,374 @@ +---------------------------- MODULE FifoNetwork ---------------------------- + +EXTENDS Sequences, Naturals, FiniteSets, TLC + +(*************************************************************************** VARIABLES definitions: see InitFifoNetwork ***************************************************************************) +VARIABLES _msgs, \* Messages in the network + _netman, \* Network manager + _netcmd \* Current network cmd + +(*************************************************************************** FLUSH_DISCONN: * If true, drop all the network wired msgs that are not accessible * If false, do not drop, and msgs can still be delivered ***************************************************************************) +CONSTANT FLUSH_DISCONN + +(*************************************************************************** NULL_MSG: represents a null msg in some condition checks * Should be a model value if its type is CONSTANT ***************************************************************************) +CONSTANT NULL_MSG +\*NULL_MSG == [ NULL_MSG |-> "" ] + +---- \* Common functions + +(*************************************************************************** API InitFifoNetwork(nodes): - _msgs: init to empty sequences of [src][dst] records * format: [ seq |-> 0, src |-> s0, dst |-> s1, type |-> sth, data |-> sth] * src and dst will be dropped when storing in _msgs * type and data are user defined fields - _netman: - n_sent: number of msgs sent to network, to indicate next msg seq - n_recv: number of msgs delivered to server - n_wire: number of msgs in network but not delivered yet - conn: network connections * format: {n0, n1}, represents n0 is connected with n1 * default all connected - _netcmd: <<"init">> ***************************************************************************) +InitFifoNetworkAddNetman(nodes, cmd, additionalNetman) == + /\ _msgs = [ sender \in nodes |-> [ recver \in nodes \ {sender} |-> <<>> ]] + /\ _netman = additionalNetman @@ + [ n_sent |-> 0, n_recv |-> 0, n_wire |-> 0, conn |-> <<nodes>>, + n_part |-> 0, n_cure |-> 0] + /\ _netcmd = <<cmd>> + +InitFifoNetwork(nodes) == InitFifoNetworkAddNetman(nodes, "init", <<>>) + +(*************************************************************************** _GetNodes: get all nodes in msg channels ***************************************************************************) +_GetNodes == DOMAIN _msgs + +(*************************************************************************** _Pick: choose any one ***************************************************************************) +_Pick(S) == CHOOSE s \in S : TRUE + +(*************************************************************************** API IsNullMsg: check if msg m is NULL ***************************************************************************) +IsNullMsg(m) == m = NULL_MSG + +---- \* Update _netman functions + +(*************************************************************************** _NetGetHelper and _NetIncHelper: get, increase, and decrease members of _netman records ***************************************************************************) +_NetGetHelper(member) == _netman[member] +_NetIncHelper(member) == (member :> _netman[member] + 1) +_NetDecHelper(member) == (member :> _netman[member] - 1) +NetIncBy(member, number) == (member :> _netman[member] + number) + +NetGetSent == _NetGetHelper("n_sent") +NetIncSent == _NetIncHelper("n_sent") +NetGetRecv == _NetGetHelper("n_recv") +NetIncRecv == _NetIncHelper("n_recv") +NetGetWire == _NetGetHelper("n_wire") +NetIncWire == _NetIncHelper("n_wire") +NetDecWire == _NetDecHelper("n_wire") +NetGetPart == _NetGetHelper("n_part") +NetIncPart == _NetIncHelper("n_part") +NetGetCure == _NetGetHelper("n_cure") +NetIncCure == _NetIncHelper("n_cure") + +(*************************************************************************** NetmanIncField: increase a field that is not a standard netman member * updater is the return value of NetDelMsg/NetAddMsg etc. ***************************************************************************) +NetmanIncField(field, updater) == + <<_NetIncHelper(field) @@ updater[1]>> @@ updater + +(*************************************************************************** _WireReduce, _WireNodeSumHelper, _WireSumHelper, NetSumWire: * Sum up wired msgs ***************************************************************************) +RECURSIVE _WireReduce(_, _, _, _) +_WireReduce(Helper(_, _, _), nodes, res, msgs) == + IF nodes = {} THEN res + ELSE LET n == _Pick(nodes) + IN _WireReduce(Helper, nodes \ {n}, Helper(n, res, msgs), msgs) + +_WireNodeSumHelper(n, res, msgs) == + LET node_msgs_list == msgs[n] + IN res + Len(node_msgs_list) + +_WireSumHelper(n, res, msgs) == + LET node_msgs == msgs[n] + to_nodes == DOMAIN node_msgs + IN res + _WireReduce(_WireNodeSumHelper, to_nodes, 0, node_msgs) + +_WireSum(msgs) == + [ n_wire |-> _WireReduce(_WireSumHelper, DOMAIN msgs, 0, msgs) ] + +NetSumWire == _WireSum(_msgs) + +(*************************************************************************** API NetUpdate(args): update _netman with args[1], update _msgs with args[2] - e.g. 
NetUpdate(NetDelMsg(m)) ***************************************************************************) +NetUpdate(args) == + /\ _netman' = args[1] @@ _netman + /\ _msgs' = args[2] + /\ IF Len(args) = 3 + THEN _netcmd' = args[3] + ELSE _netcmd' = <<"noop">> + +NetUpdate2(args, cmd) == + /\ _netman' = args[1] @@ _netman + /\ _msgs' = args[2] + /\ IF Len(args) = 3 + THEN _netcmd' = <<cmd, args[3]>> + ELSE _netcmd' = <<cmd>> + +---- \* Network partition functions + +(*************************************************************************** _AddConn: add nodes connections and return connected nodes * no change: * if nodes contain only one node, or * if nodes already connected ***************************************************************************) +_AddConn(nodes, conn) == + IF \/ Cardinality(nodes) <= 1 + \/ Len(SelectSeq(conn, LAMBDA p: nodes \subseteq p)) > 0 + THEN conn + ELSE Append(SelectSeq(conn, LAMBDA p: \neg (p \subseteq nodes)), nodes) + +(*************************************************************************** _DelConn: isolate nodes from others and return connected nodes * delete node in nodes from all connections * if after deleting, the nodes set has no more than 1 node, delete the set ***************************************************************************) +_DelConn(nodes, conn) == + LET F[i \in 0..Len(conn)] == + IF i = 0 THEN <<>> + ELSE IF Cardinality(conn[i] \ nodes) <= 1 THEN F[i-1] + ELSE Append(F[i-1], conn[i] \ nodes) + IN F[Len(conn)] + +(*************************************************************************** _PartConn: delete nodes from other connections and then connect nodes ***************************************************************************) +_PartConn(nodes, conn) == + _AddConn(nodes, _DelConn(nodes, conn)) + +(*************************************************************************** _FlushReduce, _FlushMsgsHelper, _FlushMsgs: * Flush disconnected msgs in wire * _FlushMsgs: Return flushed msgs _FlushMsgsDelHelper, _FlushMsgsDel: * All msgs are flushed in delete_nodes (not inner-connected) ***************************************************************************) +RECURSIVE _FlushReduce(_, _, _, _, _) +_FlushReduce(Helper(_, _, _, _), nodes, res, part, msgs) == + IF nodes = {} THEN res + ELSE LET n == _Pick(nodes) + IN _FlushReduce(Helper, nodes \ {n}, Helper(n, res, part, msgs), part, msgs) + +_FlushMsgsHelper(n, res, part, msgs) == + LET node_msgs == msgs[n] + to_nodes == DOMAIN node_msgs + flush_nodes == IF n \in part THEN to_nodes \ part ELSE part + IN ( n :> ([ x \in flush_nodes |-> <<>> ] @@ node_msgs) ) @@ res + +_FlushMsgsDelHelper(n, res, delete_nodes, msgs) == + LET node_msgs == msgs[n] + to_nodes == DOMAIN node_msgs + flush_nodes == IF n \in delete_nodes THEN to_nodes ELSE delete_nodes + IN ( n :> ([ x \in flush_nodes |-> <<>> ] @@ node_msgs) ) @@ res + +_FlushMsgs(part, msgs) == + _FlushReduce(_FlushMsgsHelper, DOMAIN msgs, msgs, part, msgs) + +_FlushMsgsDel(delete_nodes, msgs) == + _FlushReduce(_FlushMsgsDelHelper, DOMAIN msgs, msgs, delete_nodes, msgs) + +(*************************************************************************** API NetAddConn: add a network connection * return <<netman, msgs, cmd>> ***************************************************************************) +NetAddConn(nodes) == + <<[ conn |-> _AddConn(nodes, _netman.conn) ], _msgs, <<"conn_add", nodes>>>> + +(*************************************************************************** API NetDelConn: isolate nodes from all other nodes * unlike NetPartConn, 
nodes in the deletion set are not connected ***************************************************************************) +NetDelConn(nodes) == + LET conn == [ conn |-> _DelConn(nodes, _netman.conn) ] + IN IF FLUSH_DISCONN + THEN LET msgs == _FlushMsgsDel(nodes, _msgs) + msgs_sum == NetSumWire + IN <<conn @@ NetIncPart @@ msgs_sum, msgs, <<"conn_del", nodes>>>> + ELSE <<conn @@ NetIncPart, _msgs, <<"conn_del", nodes>>>> + +(*************************************************************************** API NetPartConn: add a network partition ***************************************************************************) +NetPartConn(nodes) == + LET conn == [ conn |-> _PartConn(nodes, _netman.conn) ] + IN IF FLUSH_DISCONN + THEN LET msgs == _FlushMsgs(nodes, _msgs) + msgs_sum == NetSumWire + IN <<conn @@ NetIncPart @@ msgs_sum, msgs, <<"conn_part", nodes>>>> + ELSE <<conn @@ NetIncPart, _msgs, <<"conn_part", nodes>>>> + +(*************************************************************************** API NetCureConn: connect all nodes ***************************************************************************) +NetCureConn == <<[ conn |-> <<_GetNodes>> ] @@ NetIncCure, _msgs, <<"conn_cure">>>> + +(*************************************************************************** API NetIsConn: check s0 and s1 are connected ***************************************************************************) +NetIsConn(s0, s1) == + Len(SelectSeq(_netman.conn, LAMBDA p: {s0, s1} \subseteq p)) /= 0 + +(*************************************************************************** API NetIsParted: check if network is partitioned ***************************************************************************) +NetIsParted == + IF \/ Len(_netman.conn) /= 1 + \/ _netman.conn[1] /= _GetNodes + THEN TRUE + ELSE FALSE + +---- \* Network send and recv functions + +(*************************************************************************** _AddMsgSrcDstSeq, _AddMsgSrcDst, _AddMsg: add msg m to msgs * return <<number of msgs added (0 or 1), msgs>> * set global seq to msg m ***************************************************************************) +_AddMsgSrcDstSeq(src, dst, seq, m, msgs) == + LET m_ == IF NetIsConn(src, dst) + THEN [ x \in ((DOMAIN m \union {"seq"}) \ {"src", "dst"}) |-> + IF x = "seq" THEN seq ELSE m[x] ] + ELSE NULL_MSG \* Dropped. 
IN IF m_ = NULL_MSG THEN <<0, msgs>> + ELSE <<1, [ msgs EXCEPT ![src][dst] = Append(@, m_) ]>> +_AddMsgSrcDst(src, dst, m, msgs) == + LET seq == NetGetSent + 1 + IN _AddMsgSrcDstSeq(src, dst, seq, m, msgs) +_AddMsg(m, msgs) == _AddMsgSrcDst(m.src, m.dst, m, msgs) + +(*************************************************************************** _BatchAddMsgs: batch add multiple messages to msgs * return <<number of msgs added, msgs, cmds>> * set global seq to each msg m ***************************************************************************) +_BatchAddMsgs(ms, msgs) == + LET F[i \in 0..Len(ms)] == + IF i = 0 THEN <<0, msgs, <<"msg_batch_add">>>> + ELSE LET m == ms[i] + seq == NetGetSent + F[i-1][1] + 1 + res == _AddMsgSrcDstSeq(m.src, m.dst, seq, m, F[i-1][2]) + IN <<F[i-1][1] + res[1], res[2], + Append(F[i-1][3], IF res[1] = 1 + THEN <<"added", m.src, m.dst>> + ELSE <<"dropped", m.src, m.dst>>)>> + IN F[Len(ms)] + +(*************************************************************************** _DelMsg: delete m from msgs, return <<deleted count, msgs>> ***************************************************************************) +_DelMsg(m, msgs) == + LET m_ == msgs[m.src][m.dst][1] + IN IF m.seq = m_.seq THEN <<1, [ msgs EXCEPT ![m.src][m.dst] = Tail(@)]>> + ELSE Assert(FALSE, "DelMsg: seq mismatch") + +(*************************************************************************** _GetMsg: get m from msgs[src][dst] * since it is a fifo network, only the head msg can be obtained ***************************************************************************) +_GetMsg(src, dst, msgs) == + LET m_ == msgs[src][dst] + len == Len(m_) + IN IF len > 0 THEN [ src |-> src, dst |-> dst] @@ m_[1] ELSE NULL_MSG + +(*************************************************************************** _ReplyMsg: delete request from msgs and then add response to msgs * return <<added count, msgs>> ***************************************************************************) +_ReplyMsg(response, request, msgs) == + LET del == _DelMsg(request, msgs) + add == _AddMsgSrcDst(request.dst, request.src, response, del[2]) + IN <<add[1], add[2]>> + +(*************************************************************************** API NetGetMsg: Get msg from src -> dst FIFO head * return msg m ***************************************************************************) +NetGetMsg(src, dst) == _GetMsg(src, dst, _msgs) + +(*************************************************************************** API NetDelMsg: Del first msg of m.src -> m.dst * return <<netman, msgs, cmd>> * update with NetUpdate ***************************************************************************) +NetDelMsg(m) == + LET res == _DelMsg(m, _msgs) + IN <<NetIncRecv @@ NetDecWire, res[2], <<"msg_del", m.src, m.dst>>>> + +(*************************************************************************** API NetAddMsgSrcDst, NetAddMsg: Add m to the end of m.src -> m.dst * return <<netman, msgs, cmd>> ***************************************************************************) +NetAddMsgSrcDst(src, dst, m) == + LET res == _AddMsgSrcDst(src, dst, m, _msgs) + IN IF res[1] = 1 + THEN <<NetIncSent @@ NetIncWire, res[2], <<"msg_add", src, dst>>>> + ELSE <<_netman, res[2], <<"msg_add_dropped", src, dst>>>> +NetAddMsg(m) == NetAddMsgSrcDst(m.src, m.dst, m) + +(*************************************************************************** API NetReplyMsg: delete request and try to add response to network * return <<netman, msgs, cmd>> ***************************************************************************) +NetReplyMsg(response, request) == + LET res == _ReplyMsg(response, request, _msgs) + IN IF res[1] = 0 + THEN <<NetIncRecv @@ NetDecWire, res[2], <<"msg_reply_dropped", request.dst, request.src>>>> + ELSE <<NetIncRecv @@ NetIncSent, res[2], <<"msg_reply", request.dst, request.src>>>> + +(*************************************************************************** API NetBatchAddMsg: batch add messages ms to msgs + ***************************************************************************) +NetBatchAddMsg(ms) == + LET res == _BatchAddMsgs(ms, _msgs) + IN <<NetIncBy("n_sent", res[1]) @@ NetIncBy("n_wire", res[1]), res[2], res[3]>> + +(*************************************************************************** API NetReplyBatchAddMsg: remove request and batch add ms to msgs ***************************************************************************) +NetReplyBatchAddMsg(ms, request) == + LET del == _DelMsg(request, _msgs) + add == _BatchAddMsgs(ms, del[2]) + IN <<NetIncRecv @@ NetIncBy("n_sent", add[1]) @@ NetIncBy("n_wire", add[1] - 1), add[2], Append(<<"msg_batch_reply">>, add[3])>> + +(*************************************************************************** API NetNoAction: Network state unchanged * return <<netman, msgs[, cmd]>> ***************************************************************************) +NetNoAction1 == <<_netman, _msgs>> +NetNoAction2(cmd) == <<_netman, _msgs, cmd>> + +=============================================================================
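The module is meant to be driven through the Net* API above. A minimal usage sketch, assuming a host specification that consumes one message at a time (this mirrors the NetUpdate2/NetDelMsg pattern used by RaftRs.tla below; the action name is illustrative):

```tla
\* Illustrative sketch: deliver the FIFO head from src to dst, then publish
\* the updated counters/messages and record the consumed message in _netcmd.
RecvOne(src, dst) ==
    LET m == NetGetMsg(src, dst)
    IN /\ ~IsNullMsg(m)
       /\ NetUpdate2(NetDelMsg(m), <<"RecvOne", dst, src>>)
```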
diff --git a/tla/consensus/RaftRs.tla b/tla/consensus/RaftRs.tla new file mode 100644 index 00000000..e41e76bf --- /dev/null +++ b/tla/consensus/RaftRs.tla @@ -0,0 +1,1232 @@ +---------------------------- MODULE RaftRs ----------------------------- +(***************************************************************************) +(* This is the TLA+ specification for Raft-RS in TiKV with version 0.7.0 *) +(* *) +(* It models the following modules: *) +(* - Leader election *) +(* - Log replication *) +(* *) +(* Currently, the specification assumes: *) +(* - No snapshots *) +(* - No read-only requests *) +(* - No non-voting nodes *) +(* - No disk failures *) +(* - No membership change *) +(***************************************************************************) + +EXTENDS Sequences, Naturals, Integers, FiniteSets, TLC, SequencesExt + +(***************************************************************************) +(* Constants definitions *) +(***************************************************************************) +\* The set of servers +CONSTANT Servers +\* Server states, corresponding to raft-rs StateRole +CONSTANTS Follower, Candidate, Leader +\* Raft message types +CONSTANTS M_RV, M_RVR, M_AE, M_AER, M_PRV, M_PRVR, M_HB, M_HBR +\* The set of commands +CONSTANTS Commands +\* The abstraction of null operation +CONSTANTS NoOp +\* Misc: state constraint parameters and placeholder +CONSTANTS Nil +\* The set of ProgressState +CONSTANTS Probe, Replicate + +(*************************************************************************** Variables definitions ***************************************************************************) +\* Persistent state on all servers +VARIABLES currentTerm, \* Latest term server has seen (initialized to 0 on first boot, increases monotonically), corresponding to raft-rs RaftCore.term + votedFor, \* CandidateId that received vote in current term (or null if none), corresponding to raft-rs RaftCore.vote + log \* Log entries; each entry contains command for state machine, and term when entry was received by leader, corresponding to raft-rs RaftCore.raft_log + +\* Volatile state on all servers +VARIABLES raftState, \* State of servers, in {Follower, Candidate, Leader}, corresponding to raft-rs RaftCore.state + commitIndex, \* Index of highest log entry known to be committed + leader_id \* The potential leader of the cluster, corresponding to raft-rs RaftCore.leader_id + +\* Volatile state on leader +VARIABLES nextIndex, \* for each server, index of the next log entry to send to that server, corresponding to raft-rs Progress.next_idx + matchIndex \* for each server, index of highest log entry known to be replicated on server, 
corresponding to raft-rs Progress.matched + +\* Intermediate variables +VARIABLES voted_for_me \* Record nodes that have voted for me, corresponding to raft-rs Progress.voted +VARIABLES voted_reject \* Record nodes that have rejected my vote, corresponding to raft-rs Progress.voted +VARIABLES check_quorum \* Whether check_quorum is enabled +VARIABLE progress \* The replication progress state the leader tracks for each follower, either Probe or Replicate; used when handling appends. Corresponding to raft-rs Progress.state +VARIABLE inflight \* Number of in-flight messages recorded for each follower. Corresponding to raft-rs Progress.ins (Inflights) + +(*************************************************************************** Network variables and instance ***************************************************************************) +\* The network is modelled through these three variables +VARIABLES netman, + netcmd, + msgs +INSTANCE FifoNetwork WITH FLUSH_DISCONN <- TRUE, NULL_MSG <- Nil, + _msgs <- msgs, _netman <- netman, _netcmd <- netcmd + +(***************************************************************************) +(* Self-maintained invariant checking *) +(***************************************************************************) +VARIABLES inv \* Invariants that guarantee correctness + +(***************************************************************************) +(* Vars groups *) +(***************************************************************************) +serverVars == <<currentTerm, votedFor, raftState>> +leaderVars == <<nextIndex, matchIndex>> +candidateVars == <<voted_for_me, voted_reject>> +logVars == <<log, commitIndex>> +nodeVars == <<leader_id, check_quorum, progress, inflight>> +netVars == <<netman, netcmd, msgs>> +noNetVars == <<serverVars, leaderVars, candidateVars, logVars, nodeVars, inv>> +vars == <<noNetVars, netVars>> + +(***************************************************************************) +(* State constraints helper *) +(***************************************************************************) +CONSTANTS Parameters \* to control the model scale + +GetParameterSet(p) == IF p \in DOMAIN Parameters THEN Parameters[p] ELSE {} + +CheckParameterHelper(n, p, Test(_,_)) == + IF p \in DOMAIN Parameters + THEN Test(n, Parameters[p]) + ELSE TRUE +CheckParameterMax(n, p) == CheckParameterHelper(n, p, LAMBDA i, j: i <= j) + +PrePrune(n, p) == CheckParameterHelper(n, p, LAMBDA i, j: i < j) + +(***************************************************************************) +(* Type OK. 
Used as a sanity check on the shape of the variables *) +(***************************************************************************) + +TypeOkServerVars == + /\ currentTerm \in [ Servers -> Nat ] + /\ votedFor \in [ Servers -> Servers \cup {Nil} ] + /\ raftState \in [ Servers -> { Follower, Candidate, Leader } ] + +TypeOkLeaderVars == + /\ nextIndex \in [ Servers -> [ Servers -> Nat \ {0} ]] + /\ matchIndex \in [ Servers -> [ Servers -> Nat ]] + +\* TypeOkCandidateVars == +\* /\ votesGranted \in [ Servers -> {} ] + +TypeOkLogVars == + \* log data structure is complex, we skip checking it + /\ commitIndex \in [ Servers -> Nat ] + +TypeOk == + /\ TypeOkServerVars + /\ TypeOkLeaderVars + /\ TypeOkLogVars + +(*************************************************************************** Init variables ***************************************************************************) +InitServerVars == + /\ currentTerm = [ i \in Servers |-> 1 ] + /\ votedFor = [ i \in Servers |-> Nil ] + /\ raftState = [ i \in Servers |-> Follower ] + +InitLeaderVars == + /\ nextIndex = [ i \in Servers |-> [ j \in Servers |-> 1 ]] + /\ matchIndex = [ i \in Servers |-> [ j \in Servers |-> 0 ]] + +InitCandidateVars == + /\ voted_for_me = [ i \in Servers |-> {} ] + /\ voted_reject = [ i \in Servers |-> {} ] + +InitLogVars == + /\ log = [ i \in Servers |-> << [term |-> 1, data |-> Nil, index |-> 1]>> ] + /\ commitIndex = [ i \in Servers |-> 1 ] +InitInv == inv = <<>> + +InitNodeVars == + /\ leader_id = [ i \in Servers |-> Nil] + /\ check_quorum = [i \in Servers |-> FALSE] \* Used to determine if check_quorum is on + /\ progress = [ i \in Servers |-> [ j \in Servers |-> <<Probe, FALSE>>]] + /\ inflight = [ i \in Servers |-> [ j \in Servers |-> 0 ]] + +InitNetVars == + /\ InitFifoNetworkAddNetman(Servers, <<"Init", Cardinality(Servers)>>, + [n_elec |-> 0, n_ae |-> 0, n_hb |-> 0, n_op |-> 0, n_restart |-> 0, + no_inv |-> GetParameterSet("NoInv")]) + +Init == + /\ InitServerVars + /\ InitLeaderVars + /\ InitCandidateVars + /\ InitLogVars + /\ InitInv + /\ InitNodeVars + /\ InitNetVars + +(*************************************************************************** Helper functions ***************************************************************************) +NumServer == Cardinality(Servers) + +Min(x,y) == IF x < y THEN x ELSE y +Max(x,y) == IF x < y THEN y ELSE x + +IsQuorum(ss) == Cardinality(ss) * 2 > Cardinality(Servers) +IsQuorumNum(num) == num * 2 > Cardinality(Servers) + +CheckStateIs(n, s) == raftState[n] = s +CheckStateIsNot(n, s) == raftState[n] /= s + +Update(var, n, value) == [var EXCEPT ![n] = value] +UpdateCurrentTerm(n, term) == currentTerm' = Update(currentTerm, n, term) +UpdateLeaderId(n, id) == leader_id' = Update(leader_id, n, id) +UpdateVotedFor(n, node) == votedFor' = Update(votedFor, n, node) +UpdateState(n, s) == raftState' = Update(raftState, n, s) +UpdateVotedForMe(n, value) == voted_for_me' = Update(voted_for_me, n, value) +AddVotedForMe(me, node) == voted_for_me' = [ voted_for_me EXCEPT ![me] = @ \cup {node} ] +ClearVotedForMe(me) == voted_for_me' = [ voted_for_me EXCEPT ![me] = {} ] +UpdateVotesReject(n, value) == voted_reject' = Update(voted_reject, n, value) +AddVotesReject(me, node) == voted_reject' = [ voted_reject EXCEPT ![me] = @ \cup {node}] +ClearVotesReject(me) == voted_reject' = [ voted_reject EXCEPT ![me] = {} ] +UpdateMatchIdx(me, node, idx) == matchIndex' = [ matchIndex EXCEPT ![me][node] = idx ] +UpdateNextIdx(me, node, idx) == nextIndex' = [ nextIndex EXCEPT ![me][node] = IF idx < 1 THEN 1 ELSE idx ] 
UpdateProgress(me, node, state) == progress' = [progress EXCEPT ![me][node] = state ] +UpdateInflight(me, node, num) == inflight' = [inflight EXCEPT ![me][node] = num ] +UpdateCommitIdx(n, idx) == commitIndex' = Update(commitIndex, n, idx) +AllUpdateNextIdx(me, idx) == + LET f == [i \in Servers |-> idx] + IN nextIndex' = [nextIndex EXCEPT ![me] = f] +AllUpdateMatchIdx(me, idx) == + LET f == [i \in Servers |-> idx] + IN matchIndex' = [matchIndex EXCEPT ![me] = f] +AllUpdateProgress(me, prstate) == + LET f == [i \in Servers |-> prstate] + IN progress' = [progress EXCEPT ![me] = f] +AllUpdateInflight(me, num_msg) == + LET f == [i \in Servers |-> num_msg] + IN inflight' = [inflight EXCEPT ![me] = f] + +(***************************************************************************) +(* Log helpers *) +(***************************************************************************) +\* Currently, the log won't be compacted +\* idx = 1, data = Nil +LogAppend(log_, entry) == Append(log_, entry) +LogCount(log_) == Len(log_) +LogGetEntry(log_, idx) == + IF idx > LogCount(log_) \/ idx <= 0 + THEN Nil + ELSE log_[idx] +LogGetEntryOne(log_, idx) == + IF idx > LogCount(log_) \/ idx <= 0 + THEN <<>> + ELSE SubSeq(log_, idx, idx) +LogGetEntriesFrom(log_, idx) == + IF idx > LogCount(log_) \/ idx <= 0 THEN <<>> + ELSE SubSeq(log_, idx, LogCount(log_)) +LogGetEntriesTo(log_, idx) == + IF Len(log_) < idx THEN log_ + ELSE SubSeq(log_, 1, idx) +LogDeleteEntriesFrom(log_, idx) == SubSeq(log_, 1, idx - 1) +LogCurrentIdx(log_) == LogCount(log_) +LogLastTerm(log_) == + LET idx == LogCount(log_) + term == IF idx = 0 THEN 0 ELSE log_[idx].term + IN term +LogLastIdx(log_) == + LET idx == LogCount(log_) + index == IF idx = 0 THEN 0 ELSE log_[idx].index + IN index +LogGetTerm(log_, idx, info) == + IF LogCount(log_) < idx + THEN 0 + ELSE IF idx = 0 THEN 0 ELSE log_[idx].term + +\* log_ is the log of the receiving node; entries are the entries carried in the AE message. Based on the incoming prevLogIdx, find the position at which conflicting entries are overwritten and the remaining entries are appended. +\* in maybe_append@raft_log.rs +LogGetMatchEntries(log_, entries, prevLogIdx) == + LET F[i \in 0..Len(entries)] == + IF i = 0 THEN Nil + ELSE LET ety1 == LogGetEntry(log_, prevLogIdx + i) \* Original log entry at prevLogIdx + i + ety2 == LogGetEntry(entries, i) \* The i-th entry to be added + entries1 == LogGetEntriesTo(log_, prevLogIdx + i - 1) \* log_ from first_index to prevLogIdx + i - 1 + entries2 == LogGetEntriesFrom(entries, i) \* entries from i to Len(entries) + IN IF /\ F[i-1] = Nil + /\ \/ ety1 = Nil \* The original log has no entry at this position, so all remaining entries are appended directly. 
 \/ ety1.term /= ety2.term \* A term mismatch at position i means the log is overwritten from i onwards with the new entries + THEN entries1 \o entries2 + ELSE F[i-1] + result == F[Len(entries)] + IN IF result = Nil THEN log_ ELSE result
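A worked example may clarify the overwrite rule; the concrete values below are illustrative only:

```tla
\* Illustrative trace of LogGetMatchEntries: suppose log_ holds terms
\* <<1, 1, 2>> at indexes 1..3, entries = <<[term |-> 3, ...]>> and
\* prevLogIdx = 2. Then ety1 (index 3, term 2) mismatches ety2 (term 3),
\* so the result is LogGetEntriesTo(log_, 2) \o entries:
\* the conflicting suffix of log_ is overwritten by the new entries.
```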
(***************************************************************************) +(* Msg constructors *) +(***************************************************************************) +\* Send a message to the remaining nodes, constructing each message according to the given Constructor2/3 function +_BatchExcludesReqMsgsArg(n, excludes, Constructor2(_, _), Constructor3(_, _, _), arg) == + LET dsts == Servers \ excludes + size == Cardinality(dsts) + F[i \in 0..size] == + IF i = 0 THEN <<<<>>, dsts>> + ELSE LET ms == F[i-1][1] + s == CHOOSE j \in F[i-1][2]: TRUE + m == IF arg = Nil + THEN Constructor2(n, s) + ELSE Constructor3(n, s, arg) + remaining == F[i-1][2] \ {s} + IN <<Append(ms, m), remaining>> + IN F[size][1] + +_Dummy2(a, b) == TRUE +_Dummy3(a, b, c) == TRUE + +BatchReqMsgs(n, Constructor(_, _)) == + _BatchExcludesReqMsgsArg(n, {n}, Constructor, _Dummy3, Nil) +BatchReqMsgsArg(n, Constructor(_, _, _), arg) == + _BatchExcludesReqMsgsArg(n, {n}, _Dummy2, Constructor, arg) +ConstructMsg(src, dst, type, body) == + [ src |-> src, dst |-> dst, type |-> type, data |-> body ] + +\* func: new_message(MsgRequestVote)@raft.rs +RequestVote(i, j) == + LET body == [ term |-> currentTerm'[i], + candidate_id |-> i, + index |-> LogCurrentIdx(log[i]), + log_term |-> LogLastTerm(log[i]), + commit |-> commitIndex[i], + commitTerm |-> LogGetTerm(log[i], commitIndex[i], "RequestVote")] + msg_type == M_RV + IN ConstructMsg(i, j, msg_type, body) + +\* func: new_message(MsgRequestVoteResponse)@raft.rs +RequestVoteResponse(m, voted, tempLeaderId) == + LET i == m.dst + j == m.src + req == m.data + \* can_vote corresponds to step()@raft.rs, which defines when the node may vote + can_vote == \/ voted = j + \/ /\ voted = Nil + /\ tempLeaderId = Nil + meTerm == currentTerm'[i] + rejectMeTermIsBigger == meTerm > req.term + meLastTerm == LogLastTerm(log[i]) + rejectMeLogNewer == \/ req.log_term < meLastTerm + \/ /\ req.log_term = meLastTerm + /\ req.index < LogCurrentIdx(log[i]) + voteStatus == IF rejectMeTermIsBigger THEN "not-vote: term bigger" ELSE + IF ~can_vote THEN "not-vote: can not vote" ELSE + IF rejectMeLogNewer THEN "not-vote: log newer" ELSE "voted" + granted == voteStatus = "voted" + reject == ~granted + send_commit == IF reject THEN commitIndex[i] ELSE 0 + send_commit_term == IF reject THEN LogGetTerm(log[i], commitIndex[i], "RequestVoteResponse") ELSE 0 + body == [ request_term |-> req.term, + term |-> Max(req.term, meTerm), + reject |-> reject, + commit |-> send_commit, + commitTerm |-> send_commit_term] + IN ConstructMsg(i, j, M_RVR, body) @@ [ status |-> voteStatus ] + +\* func: prepare_send_entries +AppendEntriesNext(i, j, next) == + LET prev_log_idx == next[i][j] - 1 + body == [ term |-> currentTerm[i], + leader_id |-> i, + commit |-> commitIndex'[i], + index |-> prev_log_idx, \* prev_log_idx + log_term |-> IF LogCount(log'[i]) >= prev_log_idx + THEN LogGetTerm(log'[i], prev_log_idx, "AppendEntriesNext") + ELSE 0 , + entries |-> LogGetEntryOne(log'[i], next[i][j]) ] \* The model restricts AppendEntry messages to one entry at a time. 
IN ConstructMsg(i, j, M_AE, body) + +\* func: send_heartbeat +HeartBeatNext(i, j, next) == + LET body == [ term |-> currentTerm[i], + commit |-> Min(matchIndex[i][j], commitIndex[i])] + IN ConstructMsg(i, j, M_HB, body) + +HeartBeatResponse(m) == + LET body == [ term |-> currentTerm'[m.dst], + commitIdx |-> commitIndex'[m.dst] ] + IN ConstructMsg(m.dst, m.src, M_HBR, body) + +\* new_message(MsgAppendResponse)@raft.rs +AERFailLogStale(m) == \* func: handle_append_entries + LET body == [ reject |-> FALSE, + term |-> Max(currentTerm[m.dst], m.data.term), + index |-> commitIndex[m.dst], + commit |-> commitIndex[m.dst] ] + IN ConstructMsg(m.dst, m.src, M_AER, body) + +\* new_message(MsgAppendResponse)@raft.rs +AERFailTermMismatch(m, hint_index, hint_term) == + LET body == [ reject |-> TRUE, + term |-> Max(currentTerm[m.dst], m.data.term), + index |-> m.data.index, + reject_hint |-> hint_index, + log_term |-> hint_term, + commit |-> commitIndex[m.dst] ] + IN ConstructMsg(m.dst, m.src, M_AER, body) + +\* new_message(MsgAppendResponse)@raft.rs +AppendEntriesResponseSuccess(m) == + LET data == m.data + body == [ reject |-> FALSE, + term |-> currentTerm'[m.dst], + index |-> data.index + Len(data.entries), + commitIdx |-> commitIndex'[m.dst]] + IN ConstructMsg(m.dst, m.src, M_AER, body) + +\* At bcast_append, the leader's next_index for each target node is updated per message sent (in prepare_send_entries@raft.rs) +BatchUpdateNextWithMsg(n, new_msgs) == + LET lenMsg == Len(new_msgs) + F[i \in 0..lenMsg] == + IF i = 0 THEN <<{}, Servers, (n :> 1)>> + ELSE LET dst == new_msgs[i].dst + ety == new_msgs[i].data.entries + etyLastIdx == LogLastIdx(ety) + IN IF \/ ety = <<>> \* If the message carries no entries, no update is needed + \/ progress[n][dst][1] = Probe \* If a node is in the Probe state, sending at this point will block (maybe_send_append().is_paused() @ raft.rs) + THEN <<F[i-1][1], F[i-1][2], F[i-1][3] @@ (dst :> etyLastIdx) >> + ELSE <<F[i-1][1] \union {dst}, F[i-1][2] \ {dst}, F[i-1][3] @@ (dst :> etyLastIdx)>> + updateServer == F[lenMsg][1] + remainServer == F[lenMsg][2] + updateMap == F[lenMsg][3] + next_keep == [ s \in remainServer |-> nextIndex[n][s] ] + next_update == [ s \in updateServer |-> updateMap[s] ] + IN nextIndex' = [ nextIndex EXCEPT ![n] = next_keep @@ next_update ] + +(***************************************************************************) +(* Raft actions *) +(***************************************************************************) + +\* func reset +reset(i) == + /\ ClearVotedForMe(i) + /\ ClearVotesReject(i) + /\ AllUpdateNextIdx(i, LogCount(log[i]) + 1) + /\ AllUpdateMatchIdx(i, 0) + /\ AllUpdateProgress(i, <<Probe, FALSE>>) + /\ AllUpdateInflight(i, 0) + +(***************************************************************************) +(* Become candidate *) +(***************************************************************************) + +\* func: become_candidate +BecomeCandidate(i) == + /\ UpdateCurrentTerm(i, currentTerm[i] + 1) + /\ UpdateVotedFor(i, i) + /\ reset(i) + /\ UpdateLeaderId(i, Nil) + /\ UNCHANGED << check_quorum, logVars>> + /\ UpdateState(i, Candidate) + /\ LET ms == BatchReqMsgs(i, RequestVote) + IN NetUpdate2(NetmanIncField("n_elec", NetBatchAddMsg(ms)), <<"BecomeCandidate", i>>) + +(***************************************************************************) +(* Become leader *) +(***************************************************************************) + +\* func: become_leader@raft.rs +BecomeLeader(i, m) == + /\ LET noop == [ term |-> currentTerm[i], data |-> Nil, index |-> LogCount(log[i]) + 1 ] + IN log' = Update(log, i, LogAppend(log[i], noop)) 
/\ UpdateState(i, Leader) + /\ UpdateLeaderId(i, i) + /\ ClearVotedForMe(i) + /\ ClearVotesReject(i) + /\ matchIndex' = [ matchIndex EXCEPT ![i] = ( i :> LogCurrentIdx(log'[i]) ) @@ [ j \in Servers |-> 0 ] ] + /\ AllUpdateProgress(i, <<Probe, FALSE>>) \* All progress needs to be in probe mode + /\ AllUpdateInflight(i, 0) \* All inflight needs to be 0 (no messages sent) + /\ LET next == [ nextIndex EXCEPT ![i] = ( i :> matchIndex'[i][i] + 1 ) @@ [ j \in Servers |-> LogCurrentIdx(log[i]) + 1] ] + ms == BatchReqMsgsArg(i, AppendEntriesNext, next) + IN /\ nextIndex' = next + /\ NetUpdate2(NetReplyBatchAddMsg(ms, m), <<"RecvRequestVoteResponse", "Won-BecomeLeader", i>>) \* bcast_send + +(***************************************************************************) +(* Become follower *) +(***************************************************************************) + +SetCurrentTerm(i, term) == + /\ UpdateCurrentTerm(i, term) + /\ UpdateVotedFor(i, Nil) + +_BecomeFollower(i) == + /\ UpdateState(i, Follower) + /\ UpdateLeaderId(i, Nil) + /\ reset(i) + +\* func: become_follower@raft.rs +BecomeFollower(i, term) == + /\ SetCurrentTerm(i, term) + /\ _BecomeFollower(i) + +BecomeFollowerInLost(i, term) == + /\ UNCHANGED <<votedFor>> + /\ UpdateCurrentTerm(i, term) + /\ _BecomeFollower(i) + +BecomeFollowerWithLeader(i, term, leaderId) == + /\ SetCurrentTerm(i, term) + /\ UpdateState(i, Follower) + /\ UpdateLeaderId(i, leaderId) + /\ reset(i)
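For orientation, the role transitions these Become* actions implement can be summarized as follows (a sketch mirroring raft-rs StateRole, not additional spec logic):

```tla
\* Role transitions covered by the Become* actions above (cf. raft-rs StateRole):
\*   Follower  --BecomeCandidate (election timeout)---> Candidate
\*   Candidate --BecomeLeader (quorum of grants)------> Leader
\*   any role  --BecomeFollower* (higher term seen)---> Follower
```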
(***************************************************************************) +(* Recv requestvote *) +(***************************************************************************) + +\* func: maybe_commit_by_vote@raft.rs +maybe_commit_by_vote(n, commitIdx, commitTerm) == + IF \/ commitIdx = 0 + \/ commitTerm = 0 + \/ raftState'[n] = Leader + THEN UNCHANGED commitIndex + ELSE IF \/ commitIdx <= commitIndex[n] + THEN UNCHANGED commitIndex + ELSE IF /\ commitIdx > commitIndex[n] + /\ commitTerm = LogGetTerm(log[n], commitIdx, "maybe_commit_by_vote") + THEN UpdateCommitIdx(n, commitIdx) + ELSE UNCHANGED commitIndex + +HandleMsgRV(m) == + LET data == m.data + dst == m.dst + src == m.src + demote == currentTerm[dst] < data.term + stale == currentTerm[dst] > data.term + msg == RequestVoteResponse(m, IF demote THEN Nil ELSE votedFor[dst], IF demote THEN Nil ELSE leader_id[dst]) \* Pass in intermediate values based on demote status. + IN IF stale \* stale message drop + THEN /\ UNCHANGED noNetVars + /\ NetUpdate2(NetDelMsg(m), + <<"RecvRequestVote", "stale message ignore", dst, src, m.seq>>) + ELSE /\ UNCHANGED <> + /\ IF demote \* Received a newer-term message and stepped down to follower. 
 THEN /\ UpdateCurrentTerm(dst, data.term) + /\ UpdateState(dst, Follower) + /\ UpdateLeaderId(dst, Nil) + /\ reset(dst) + ELSE UNCHANGED <> + /\ IF ~msg.data.reject \* Determine whether to vote based on the RequestVote message + THEN /\ UpdateVotedFor(dst, src) + /\ UNCHANGED <> + ELSE /\ IF demote \* On a rejected vote the vote value is normally left unchanged, but in the demote case the node resets, so the vote becomes Nil + THEN UpdateVotedFor(dst, Nil) + ELSE UNCHANGED <> + /\ maybe_commit_by_vote(dst, data.commit, data.commitTerm) \* func: maybe_commit_by_vote @ raft.rs + /\ UNCHANGED <> + /\ NetUpdate2(NetReplyMsg(msg, m), + <<"RecvRequestVote", msg.status, dst, src, m, IF ~msg.data.reject THEN "vote" ELSE "not-vote">>) + +(***************************************************************************) +(* Recv requestvote response *) +(***************************************************************************) + +\* func: poll@raft.rs +Poll(grant, reject) == + LET grantNum == Cardinality(grant) + 1 \* +1 is the vote for myself + rejectNum == Cardinality(reject) + IN IF IsQuorumNum(grantNum) + THEN "Won" + ELSE IF IsQuorumNum(rejectNum) + THEN "Lost" + ELSE "Pending"
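As a concrete illustration of Poll (the grant set excludes the candidate itself, whose self-vote is the "+ 1"):

```tla
\* Illustrative: with Servers = {s1, s2, s3} and s1 the candidate,
\*   Poll({s2}, {})     = "Won"      \* 2 grants (incl. self) reach the quorum of 2
\*   Poll({}, {s2, s3}) = "Lost"     \* 2 rejections already form a quorum
\*   Poll({}, {s2})     = "Pending"  \* neither side has a quorum yet
```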
HandleMsgRVR( m) == + LET resp == m.data + src == m.src + dst == m.dst + demote == resp.term > currentTerm[dst] + isCandidate == raftState[dst] = Candidate + stale == resp.term < currentTerm[dst] + IN /\ IF demote \* Received a newer-term message and stepped down to follower. + THEN /\ UNCHANGED <> + /\ BecomeFollower(dst, resp.term) + /\ NetUpdate2(NetDelMsg(m), <<"RecvRequestVoteResponse", "term is smaller", dst, src, m>>) + ELSE IF stale \* stale message drop + THEN /\ UNCHANGED noNetVars + /\ NetUpdate2(NetDelMsg(m), <<"RecvRequestVoteResponse", "vote is stale", dst, src, m>>) + ELSE IF ~isCandidate \* only a candidate processes M_RVR + THEN /\ UNCHANGED noNetVars + /\ NetUpdate2(NetDelMsg(m), <<"RecvRequestVoteResponse", "not candidate", dst, src, m>>) + ELSE /\ UNCHANGED <> + /\ LET newVotedForMe == IF ~resp.reject + THEN voted_for_me[dst] \cup {src} + ELSE voted_for_me[dst] + newVotedReject == IF ~resp.reject + THEN voted_reject[dst] + ELSE voted_reject[dst] \cup {src} + res == Poll(newVotedForMe, newVotedReject) + IN IF res = "Won" + THEN /\ UNCHANGED << commitIndex>> \* BecomeLeader broadcasts AE messages that must carry the latest commitIndex, but commitIndex is only updated below in maybe_commit_by_vote, so it is left unchanged here. 
 /\ BecomeLeader(dst, m) + /\ UNCHANGED << votedFor, currentTerm>> + ELSE /\ UNCHANGED <> + /\ IF res = "Lost" + THEN /\ BecomeFollowerInLost(dst, currentTerm[dst]) + /\ NetUpdate2(NetDelMsg(m), <<"RecvRequestVoteResponse", "Lost", dst, src, m>>) + ELSE /\ NetUpdate2(NetDelMsg(m), <<"RecvRequestVoteResponse", "Pending", dst, src, m>>) + /\ UpdateVotedForMe(dst, newVotedForMe) + /\ UpdateVotesReject(dst, newVotedReject) + /\ UNCHANGED << serverVars, leader_id, progress, leaderVars>> + /\ maybe_commit_by_vote(dst, resp.commit, resp.commitTerm) + +(***************************************************************************) +(* Send appendentries to all other nodes *) +(***************************************************************************) +SendAppendentriesAll(n) == \* func: bcast_append + /\ UNCHANGED <> + /\ LET ms == BatchReqMsgsArg(n, AppendEntriesNext, nextIndex) + IN /\ BatchUpdateNextWithMsg(n, ms) + /\ NetUpdate2(NetmanIncField("n_ae", NetBatchAddMsg(ms)), <<"SendAppendentriesAll", n>>) + +(***************************************************************************) +(* Send heartbeat (empty log appendentries) to all other nodes *) +(***************************************************************************) +SendHeartBeatAll(n) == \* func: bcast_heartbeat + /\ UNCHANGED <> + /\ LET ms == BatchReqMsgsArg(n, HeartBeatNext, nextIndex) + IN NetUpdate2(NetmanIncField("n_hb", NetBatchAddMsg(ms)), <<"SendHeartBeatAll", n>>) + +(***************************************************************************) +(* Recv appendentries *) +(***************************************************************************) +AcceptLeader(me, leader) == + /\ UpdateState(me, Follower) + /\ UpdateLeaderId(me, leader) + /\ IF raftState[me] = Follower + THEN UNCHANGED <<candidateVars, leaderVars, progress, inflight>> + ELSE reset(me) + +\* func: find_conflict_by_term +find_conflict_by_term(me, index, term) == + LET hint_index == Min(index, LogCount(log[me])) + F[i \in 0..hint_index ] == + IF hint_index = 0 + THEN <<0, 0>> + ELSE IF i = 0 + THEN <<0, 0>> + ELSE IF term >= LogGetTerm(log[me], i, "find_conflict_by_term") + THEN <<i, LogGetTerm(log[me], i, "find_conflict_by_term")>> + ELSE F[i-1] + IN F[hint_index] + +\* func: raft_log.maybe_commit() +SetCommitIdx(n, idx) == + /\ Assert(idx <= LogCurrentIdx(log'[n]), <<"SetCommitIdx: idx <= LogCurrentIdx(log'[n])", n, idx, log'>>) + /\ IF idx > commitIndex[n] + THEN UpdateCommitIdx(n, idx) + ELSE UNCHANGED <> + +HandleMsgAE(m) == \* func: handle_append + LET data == m.data + src == m.src + dst == m.dst + demote == currentTerm[dst] < data.term + stale == data.term < currentTerm[dst] + log_stale == data.index < commitIndex[dst] + log_stale_msg == AERFailLogStale(m) + success == AppendEntriesResponseSuccess(m) + IN IF stale \* drop stale message + THEN /\ UNCHANGED noNetVars + /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentries", "stale message ignore", dst, src, m>>) + ELSE /\ UNCHANGED <> + /\ IF demote \* Received a newer-term message; related variables are updated later, so only the term is updated here. 
THEN SetCurrentTerm(dst, data.term) + ELSE UNCHANGED <> + /\ AcceptLeader(dst, data.leader_id) \* Update the leader_id and make sure the node state is follower + /\ IF log_stale \* if m.index < self.raft_log.committed @ raft.rs + THEN /\ UNCHANGED <> + /\ NetUpdate2(NetReplyMsg(log_stale_msg, m), <<"RecvAppendentries", "log stale commit", dst, src, m>>) + ELSE LET ety == LogGetEntry(log[dst], data.index) + noPrevLog == ety = Nil + termMatch == \/ /\ noPrevLog + /\ data.log_term = 0 + \/ /\ ~noPrevLog + /\ ety.term = data.log_term + IN IF termMatch \* maybe_append@raft_log.rs + THEN /\ log' = Update(log, dst, LogGetMatchEntries(log[dst], data.entries, data.index)) + /\ IF commitIndex[dst] < data.commit + THEN LET lastLogIdx == Max(LogCurrentIdx(log'[dst]), 1) + idxToCommit == Min(lastLogIdx, data.commit) + IN SetCommitIdx(dst, idxToCommit) + ELSE UNCHANGED commitIndex + /\ NetUpdate2(NetReplyMsg(success, m), <<"RecvAppendentries", "success", dst, src, m>>) + ELSE LET conflict == find_conflict_by_term(dst, data.index, data.log_term) \* find_conflict_by_term @ raft_log.rs + fail == AERFailTermMismatch(m, conflict[1], conflict[2]) + IN /\ UNCHANGED <> + /\ NetUpdate2(NetReplyMsg(fail, m), <<"RecvAppendentries", "term Mismatch", dst, src, m>>)
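A concrete illustration of the conflict-hint optimization used in the reject path above (the values are illustrative):

```tla
\* Illustrative: if log[me] holds terms <<1, 2, 2, 3>> at indexes 1..4 and the
\* rejected probe carried index = 4, log_term = 2, the scan returns <<3, 2>>:
\* index 3 holds the last entry with term <= 2, so the leader can jump its
\* next_idx hint there instead of decrementing one entry at a time.
```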
(***************************************************************************) +(* Recv appendentries response *) +(***************************************************************************) +FlushSendAppendentries(me, m, tempNextIdx, tempInflight, info) == + LET F[i \in 0..NumServer] == + IF i = 0 THEN <<{}, Servers>> + ELSE LET n == CHOOSE n \in F[i-1][2]: TRUE + idx == LogCurrentIdx(log'[me]) + IN IF n = me + THEN <<F[i-1][1] \union {n}, F[i-1][2] \ {n}>> + ELSE IF progress'[me][n][1] = Probe + THEN IF progress'[me][n][2] = TRUE + THEN <<F[i-1][1] \union {n}, F[i-1][2] \ {n}>> + ELSE <<F[i-1][1], F[i-1][2] \ {n}>> + ELSE IF tempInflight[me][n] /= 0 + THEN <<F[i-1][1] \union {n}, F[i-1][2] \ {n}>> + ELSE <<F[i-1][1], F[i-1][2] \ {n}>> + excludes == F[NumServer][1] + excludes2 == F[NumServer][1] \ {me} + ms == _BatchExcludesReqMsgsArg(me, excludes, _Dummy2, AppendEntriesNext, tempNextIdx) + next_keep == [ s \in excludes2 |-> tempNextIdx[me][s] ] + next_me == IF tempNextIdx[me][me] < LogCount(log'[me]) + 1 + THEN (me :> LogCount(log'[me]) + 1) + ELSE (me :> tempNextIdx[me][me] ) + next_update == [ s \in Servers \ excludes |-> IF tempNextIdx[me][s] <= LogCount(log'[me]) + THEN tempNextIdx[me][s] + 1 + ELSE tempNextIdx[me][s] ] + inflight_keep == [ s \in excludes |-> tempInflight[me][s]] + inflight_update == [ s \in Servers \ excludes |-> IF tempNextIdx[me][s] <= LogCount(log'[me]) + THEN tempNextIdx[me][s] + ELSE 0] + IN /\ nextIndex' = [ nextIndex EXCEPT ![me] = next_keep @@ next_update @@ next_me] + /\ inflight' = [inflight EXCEPT ![me] = inflight_keep @@ inflight_update] + /\ IF m = Nil \* RecvEntry: client request + THEN NetUpdate2(NetmanIncField("n_op", NetBatchAddMsg(ms)), info) + ELSE NetUpdate2(NetReplyBatchAddMsg(ms, m), info) + +\* (maybe_update + maybe_commit) in handle_append_response@raft.rs +AdvanceCommitIdx(me, m, succ_rsp, tempNextIndex, tempInflight) == \* func: raft_update_commitIndex + LET F[i \in 0..NumServer] == + IF i = 0 THEN <<<<>>, Servers>> + ELSE LET n == CHOOSE n \in F[i-1][2]: TRUE + IN <<Append(F[i-1][1], matchIndex'[me][n]), F[i-1][2] \ {n}>> + sorted_match_idx == SortSeq(F[NumServer][1], LAMBDA x, y: x > y) + commit == sorted_match_idx[NumServer \div 2 + 1] + can_send == tempInflight[me][m.src] = 0 + old_pause == \/ inflight[me][m.src] /= 0 + \/ /\ progress[me][m.src][1] = Probe + /\ progress[me][m.src][2] = TRUE + empty_entries == LogCount(succ_rsp.data.entries) = 0 + IN IF /\ commit > commitIndex[me] + /\ currentTerm[me] = LogGetTerm(log[me], commit, "AdvanceCommitIdx") + THEN /\ SetCommitIdx(me, commit) \* commit change, maybe send_bcast + /\ FlushSendAppendentries(me, m, tempNextIndex, tempInflight, <<"RecvAppendentriesResponse", "commit change", m.dst, m.src, m>>) + ELSE /\ UNCHANGED commitIndex + /\ IF can_send + THEN IF old_pause + THEN /\ NetUpdate2(NetReplyMsg(succ_rsp, m), <<"RecvAppendentriesResponse", "commit still send", m.dst, m.src, m>>) + /\ IF ~empty_entries + THEN UpdateInflight(me, m.src, succ_rsp.data.entries[1].index) + ELSE UpdateInflight(me, m.src, 0) + /\ IF empty_entries + THEN nextIndex' = tempNextIndex + ELSE UpdateNextIdx(me, m.src, succ_rsp.data.entries[1].index + 1) + ELSE IF empty_entries + THEN /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "commit still pause", m.dst, m.src, m>>) + /\ UpdateInflight(me, m.src, 0) + /\ nextIndex' = tempNextIndex + ELSE /\ NetUpdate2(NetReplyMsg(succ_rsp, m), <<"RecvAppendentriesResponse", "commit still send", m.dst, m.src, m>>) + /\ UpdateInflight(me, m.src, succ_rsp.data.entries[1].index) + /\ UpdateNextIdx(me, m.src, succ_rsp.data.entries[1].index + 1) + ELSE /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "commit still pause", m.dst, m.src, m>>) + /\ UNCHANGED inflight + /\ nextIndex' = tempNextIndex + +\* maybe_decr_to @ progress.rs +maybe_decr_to(dst, src, m, next_probe_index) == + LET rejected == m.data.index + match_hint == m.data.reject_hint + IN IF progress[dst][src][1] = Replicate + THEN IF rejected <= matchIndex[dst][src] + THEN /\ UNCHANGED << nextIndex, progress>> + /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "replicate: stale", dst, src, m>>) + ELSE /\ UpdateNextIdx(dst, src, matchIndex[dst][src] + 1) + /\ LET one_rsp == AppendEntriesNext(dst, src, nextIndex') + IN /\ NetUpdate2(NetReplyMsg(one_rsp, m), <<"RecvAppendentriesResponse", "replicate turn to probe", dst, src, m>>) + /\ IF Len(one_rsp.data.entries) = 0 + THEN UpdateProgress(dst, src, <<Probe, FALSE>>) + ELSE UpdateProgress(dst, src, <<Probe, TRUE>>) + ELSE /\ IF \/ nextIndex[dst][src] = 0 + \/ nextIndex[dst][src] - 1 /= rejected + THEN /\ UNCHANGED << nextIndex, progress>> + /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "probe: stale", dst, src, m>>) + ELSE LET new_match == Min(rejected, next_probe_index + 1) + new_next_idx == Max(new_match, 1) + one_rsp == AppendEntriesNext(dst, src, nextIndex') + IN /\ UpdateNextIdx(dst, src, new_next_idx) + /\ NetUpdate2(NetReplyMsg(one_rsp, m), <<"RecvAppendentriesResponse", "probe: update next", dst, src, m>>) + /\ IF Len(one_rsp.data.entries) = 0 + THEN UpdateProgress(dst, src, <<Probe, FALSE>>) + ELSE UpdateProgress(dst, src, <<Probe, TRUE>>) + +\* func: handle_append_response +HandleMsgAER(m) == + LET resp == m.data + src == m.src + dst == m.dst + stale == resp.term < currentTerm[dst] + demote == resp.term > currentTerm[dst] + need_optimize == resp.reject /\ resp.log_term > 0 + next_probe_index == find_conflict_by_term(dst, resp.reject_hint, resp.log_term)[1] + failReason == + IF stale THEN "stale message ignore" ELSE + IF resp.term > currentTerm[dst] THEN "term is smaller" ELSE + IF raftState[dst] /= Leader THEN "not leader" ELSE + IF need_optimize THEN "retry" ELSE "success" + IN IF failReason /= "success" + THEN IF failReason = "stale message ignore" \* drop stale message + THEN /\ UNCHANGED <> + /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "stale message ignore", dst, src, m>>) + ELSE IF failReason = "term is smaller" \* Received a newer-term message and stepped down to follower + THEN /\ BecomeFollower(dst, resp.term) + /\ 
NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "term is smaller", dst, src, m>>) + /\ UNCHANGED <> + ELSE IF failReason = "not leader" \* node not leader, drop the message + THEN /\ UNCHANGED <> + /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "not leader", dst, src, m>>) + ELSE IF failReason = "retry" \* m.reject + THEN /\ UNCHANGED <> + /\ maybe_decr_to(dst, src, m, next_probe_index) + ELSE Assert(FALSE, <<"handle aer Unseen error situation", failReason>>) + ELSE \* success + /\ UNCHANGED <> + /\ LET probeToReplicate == progress[dst][src][1] = Probe + nextToUpdate == Max(resp.index + 1, nextIndex[dst][src]) \* maybe_update in handle_append_response may advance next_idx, but prepare_send_entries changes it again, so a temporary variable is used to retrieve the corresponding entries. + tempNextIndex == [nextIndex EXCEPT ![dst][src] = nextToUpdate] + \* The temp nextIndex is also needed here. + one_rsp == AppendEntriesNext(dst, src, tempNextIndex) + repCanSend == inflight[dst][src] <= resp.index \* inflight stores the index of the in-flight message; in raft.rs the Replicate state frees it via free_to, which is simulated directly here. + tempInflight == IF probeToReplicate + THEN [inflight EXCEPT ![dst][src] = 0] + ELSE IF repCanSend + THEN [inflight EXCEPT ![dst][src] = 0] + ELSE inflight + IN /\ IF probeToReplicate + THEN UpdateProgress(dst, src, <<Replicate, FALSE>>) + ELSE UNCHANGED progress + /\ IF resp.index > matchIndex[dst][src] + THEN /\ UpdateMatchIdx(dst, src, resp.index) + /\ AdvanceCommitIdx(dst, m, one_rsp, tempNextIndex, tempInflight) \* Update the progress and nextIndex status according to the message content, corresponding to the maybe_update-to-maybe_commit logic of handle_append_response + ELSE /\ UNCHANGED << matchIndex, commitIndex, inflight, nextIndex>> + /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "maybe_update_fail", dst, src, m>>) + +(***************************************************************************) +(* Recv heartBeat *) +(***************************************************************************) + +\* func: handle_heartbeat +HandleMsgHB(m) == + LET data == m.data + src == m.src + dst == m.dst + demote == currentTerm[dst] < data.term + stale == data.term < currentTerm[dst] + rsp == HeartBeatResponse(m) + IN IF stale + THEN /\ UNCHANGED noNetVars + /\ NetUpdate2(NetDelMsg(m), <<"RecvHeartBeat", "stale message ignore", dst, src, m>>) + ELSE /\ IF \/ demote + \/ raftState[dst] = Candidate + THEN /\ BecomeFollowerWithLeader(dst, data.term, src) + /\ UNCHANGED <> + ELSE UNCHANGED <> + /\ UNCHANGED <> + /\ SetCommitIdx(dst, data.commit) + /\ NetUpdate2(NetReplyMsg(rsp, m), <<"RecvHeartBeat", "success", dst, src, m>>) + +(***************************************************************************) +(* Recv HeartBeatResponse *) +(***************************************************************************) +\* func: handle_heartbeat_response +HandleMsgHBR(m) == + LET resp == m.data + src == m.src + dst == m.dst + demote == resp.term > currentTerm[dst] + stale == resp.term < currentTerm[dst] + IN IF stale + THEN /\ UNCHANGED noNetVars + /\ NetUpdate2(NetDelMsg(m), <<"RecvHeartBeatResponse", "stale message ignore", dst, src, m>>) + ELSE IF demote + THEN /\ UNCHANGED << logVars, check_quorum>> + /\ BecomeFollower(dst, resp.term) + /\ NetUpdate2(NetDelMsg(m), <<"RecvHeartBeatResponse", "term is smaller", dst, src, m>>) + ELSE /\ UNCHANGED <> + /\ IF 
matchIndex[dst][src] < LogCount(log[dst]) + THEN LET req_msg == AppendEntriesNext(dst, src, nextIndex) + send_entry == req_msg.data.entries + isReplicate == progress[dst][src][1] = Replicate + inflightToUpdate == IF send_entry /= <<>> + THEN send_entry[1].index + ELSE 0 + nextIndexToUpdate == IF isReplicate + THEN IF send_entry /= <<>> + THEN nextIndex[dst][src] + 1 + ELSE nextIndex[dst][src] + ELSE nextIndex[dst][src] + IN /\ NetUpdate2(NetReplyMsg(req_msg, m), <<"RecvHeartBeatResponse", "send append", dst, src, m>>) + /\ UpdateInflight(dst, src, inflightToUpdate) + /\ UpdateNextIdx(dst, src, nextIndexToUpdate) + ELSE /\ NetUpdate2(NetDelMsg(m), <<"RecvHeartBeatResponse", "not send", dst, src, m>>) + /\ UpdateInflight(dst, src, 0) + /\ UNCHANGED nextIndex + +\* in step_leader: msg_propose +RecvEntry(n, data) == + /\ raftState[n] = Leader + /\ UNCHANGED <> + /\ LET ety == [ term |-> currentTerm[n], data |-> data, index |-> LogCount(log[n]) + 1] + IN log' = Update(log, n, LogAppend(log[n], ety)) + /\ IF matchIndex[n][n] < LogCount(log'[n]) + THEN UpdateMatchIdx(n, n, LogCount(log'[n])) + ELSE UNCHANGED matchIndex + /\ FlushSendAppendentries(n, Nil, nextIndex, inflight, <<"RecvEntry", n, data>>) + +(*************************************************************************** restart node ***************************************************************************) + +\* Server i restarts. Only currentTerm/votedFor/log are restored (i.e. unchanged). +\* NOTE: snapshot variables are considered part of the log +\* NOTE: last applied index should be cleared here if modelled. +Restart(i) == + /\ UNCHANGED <> + /\ raftState' = [raftState EXCEPT ![i] = Follower ] + /\ leader_id' = [ leader_id EXCEPT ![i] = Nil] + /\ voted_for_me' = [ voted_for_me EXCEPT ![i] = {} ] + /\ voted_reject' = [ voted_reject EXCEPT ![i] = {} ] + /\ nextIndex' = [ nextIndex EXCEPT ![i] = [j \in Servers |-> 1 ]] + /\ matchIndex' = [ matchIndex EXCEPT ![i] = [j \in Servers |-> 0 ]] + /\ progress' = [ progress EXCEPT ![i] = [j \in Servers |-> <<Probe, FALSE>>]] + /\ inflight' = [ inflight EXCEPT ![i] = [j \in Servers |-> 0 ]] + +(*************************************************************************** State constraints ***************************************************************************) +\* State limits to control the TLA+ model scale and prevent state-space explosion +GetRealLogLen(curLog) == SelectSeq(curLog, LAMBDA i: i.data /= NoOp) +GetMaxLogLen == Len(log[CHOOSE i \in Servers: \A j \in Servers \ {i}: + GetRealLogLen(log[i]) >= GetRealLogLen(log[j])]) +GetMaxTerm == currentTerm[CHOOSE i \in Servers: \A j \in Servers \ {i}: + currentTerm[i] >= currentTerm[j]] + +ScSent == CheckParameterMax(netman.n_sent, "MaxSentMsgs") +ScRecv == CheckParameterMax(netman.n_recv, "MaxRecvMsgs") +ScWire == CheckParameterMax(netman.n_wire, "MaxWireMsgs") +\* ScLog == CheckParameterMax(GetMaxLogLen, "MaxLogLength") +\* ScTerm == CheckParameterMax(GetMaxTerm, "MaxTerm") +ScPart == CheckParameterMax(netman.n_part, "MaxPartitionTimes") +ScCure == CheckParameterMax(netman.n_cure, "MaxCureTimes") +ScOp == CheckParameterMax(netman.n_op, "MaxClientOperationsTimes") +ScAe == CheckParameterMax(netman.n_ae, "MaxAppendEntriesTimes") +ScElec == CheckParameterMax(netman.n_elec, "MaxElectionTimes") +ScDrop == CheckParameterMax(netman.n_drop, "MaxDropTimes") +ScDup == CheckParameterMax(netman.n_dup, "MaxDupTimes") +ScRestart == CheckParameterMax(netman.n_restart, "MaxRestart") +ScUnorder == CheckParameterMax(netman.n_unorder, "MaxUnorderTimes") + +SC == /\ 
ScSent /\ ScRecv /\ ScWire /\ ScRestart + /\ ScPart /\ ScCure /\ ScOp /\ ScAe /\ ScElec + + +(***************************************************************************) +(* Invariants *) +(***************************************************************************) +ElectionSafety == + LET TwoLeader == + \E i, j \in Servers: + /\ i /= j + /\ currentTerm'[i] = currentTerm'[j] + /\ raftState'[i] = Leader + /\ raftState'[j] = Leader + IN ~TwoLeader + +LeaderAppendOnly == + \A i \in Servers: + IF raftState[i] = Leader /\ raftState'[i] = Leader + THEN LET curLog == log[i] + nextLog == log'[i] + IN IF Len(nextLog) >= Len(curLog) + THEN SubSeq(nextLog, 1, Len(curLog)) = curLog + ELSE FALSE + ELSE TRUE + +LogMatching == + ~UNCHANGED log => \* check the safety only if log has unchanged to avoid unnecessary evaluation cost + \A i, j \in Servers: + IF i /= j + THEN LET iLog == log'[i] + jLog == log'[j] + len == Min(Len(iLog), Len(jLog)) + F[k \in 0..len] == + IF k = 0 THEN <<>> + ELSE LET key1 == <> + value1 == iLog[k].data + key2 == <> + value2 == jLog[k].data + F1 == IF key1 \in DOMAIN F[k-1] + THEN IF F[k-1][key1] = value1 + THEN F[k-1] + ELSE F[k-1] @@ ( <<-1, -1>> :> <> ) + ELSE F[k-1] @@ (key1 :> value1) + F2 == IF key2 \in DOMAIN F1 + THEN IF F1[key2] = value2 + THEN F1 + ELSE F1 @@ ( <<-1, -1>> :> <> ) + ELSE F1 @@ (key2 :> value2) + IN F2 + IN IF << -1, -1>> \notin DOMAIN F[len] THEN TRUE + ELSE Assert(FALSE, <>) + ELSE TRUE + +MonotonicCurrentTerm == \A i \in Servers: currentTerm' [i] >= currentTerm[i] + +MonotonicCommitIdx == \A i \in Servers: commitIndex'[i] >= commitIndex[i] + +MonotonicMatchIdx == + \A i \in Servers: + IF raftState[i] = Leader /\ raftState'[i] = Leader \* change + THEN \A j \in Servers: matchIndex'[i][j] >= matchIndex[i][j] + ELSE TRUE + +CommittedLogDurable == + \A i \in Servers: + LET len == Min(commitIndex'[i], commitIndex[i]) + logNext == SubSeq(log'[i], 1, len) + logCur == SubSeq(log[i], 1, len) + IN IF len = 1 THEN TRUE + ELSE /\ Len(logNext) >= len + /\ Len(logCur) >= len + /\ logNext = logCur + +CommittedLogReplicatedMajority == + \A i \in Servers: + IF raftState'[i] /= Leader \/ commitIndex'[i] <= 1 + THEN TRUE + ELSE LET entries == SubSeq(log'[i], 1, commitIndex'[i]) + len == Len(entries) + nServer == Cardinality(Servers) + F[j \in 0..nServer] == + IF j = 0 + THEN <<{}, {}>> + ELSE LET k == CHOOSE k \in Servers: k \notin F[j-1][1] + logLenOk == LogCount(log'[k]) >= commitIndex'[i] + kEntries == SubSeq(log'[k], 1, commitIndex'[i]) + IN IF /\ logLenOk + /\ entries = kEntries + THEN <> + ELSE <> + IN IsQuorum(F[nServer][2]) + +NextIdxGtMatchIdx == + \A i \in Servers: + IF raftState'[i] = Leader + THEN \A j \in Servers \ {i}: nextIndex'[i][j] > matchIndex'[i][j] + ELSE TRUE + +NextIdxGtZero == + \A i \in Servers: + IF raftState'[i] = Leader + THEN \A j \in Servers: nextIndex'[i][j] > 0 + ELSE TRUE + +SelectSeqWithIdx(s, Test(_,_)) == + LET F[i \in 0..Len(s)] == + IF i = 0 + THEN <<>> + ELSE IF Test(s[i], i) + THEN Append(F[i-1], s[i]) + ELSE F[i-1] + IN F[Len(s)] + +FollowerLogLELeaderLogAfterAE == + LET cmd == netcmd'[1] + cmd1 == cmd[1] + cmd2 == cmd[2] + follower == cmd[3] + leader == cmd[4] + IN IF cmd1 = "RecvAppendentries" /\ cmd2 \in { "success", "term Mismatch" } + THEN IF log[follower] /= log'[follower] + THEN LogCount(log'[follower]) <= LogCount(log'[leader]) + ELSE TRUE + ELSE TRUE + +CommitIdxLELogLen == + \A i \in Servers: commitIndex'[i] <= LogCount(log'[i]) + +LeaderCommitCurrentTermLogs == + \A i \in Servers: + IF raftState'[i] = Leader + THEN IF 
commitIndex[i] /= commitIndex'[i] + THEN log'[i][commitIndex'[i]].term = currentTerm'[i] + ELSE TRUE + ELSE TRUE + +NewLeaderTermNotInLog == + \A i \in Servers: + IF raftState'[i] = Leader /\ raftState[i] /= Leader + THEN \A j \in Servers \ {i}: + \A n \in DOMAIN log'[j]: + log'[j][n].term /= currentTerm'[i] + ELSE TRUE + +LeaderTermLogHasGreatestIdx == + \A i \in Servers: + IF raftState'[i] = Leader + THEN \A j \in Servers \ {i}: + LET IncTermLogCount(a, b) == IF a.term = currentTerm'[i] THEN b + 1 ELSE b + IN FoldSeq(IncTermLogCount, 0, log'[i]) >= FoldSeq(IncTermLogCount, 0, log'[j]) + ELSE TRUE + +CheckLeader == + \A i \in Servers: + raftState[i] /= Leader + +InvSequence == << + ElectionSafety, + LeaderAppendOnly, + LogMatching, + MonotonicCurrentTerm, + MonotonicCommitIdx, + MonotonicMatchIdx, + CommittedLogDurable, + CommittedLogReplicatedMajority, + NextIdxGtMatchIdx, + NextIdxGtZero, + \* CheckLeader + FollowerLogLELeaderLogAfterAE, + CommitIdxLELogLen, + LeaderCommitCurrentTermLogs, + NewLeaderTermNotInLog, + LeaderTermLogHasGreatestIdx + +>> + +INV == Len(SelectSeqWithIdx(inv, LAMBDA x, y: ~x /\ y \notin netman.no_inv)) = 0 + + + + (*************************************************************************** + Next actions + ***************************************************************************) + +DoElectionTimeout == + /\ PrePrune(netman.n_elec, "MaxElectionTimes") + /\ \E n \in Servers: CheckStateIs(n, Follower) /\ BecomeCandidate(n) + /\ inv' = InvSequence + + +DoHeartBeat == + /\ PrePrune(netman.n_hb, "MaxHeartBeatTimes") + /\ \E n \in Servers: + /\ raftState[n] = Leader + /\ SendHeartBeatAll(n) + /\ inv' = InvSequence + + +_DoRecvM(type, func(_)) == + /\ \E src, dst \in Servers: + /\ src /= dst + /\ LET m == NetGetMsg(src, dst) + IN /\ m /= Nil + /\ m.type = type + /\ func(m) + /\ inv' = InvSequence + + +DoHandleMsgRV == /\ _DoRecvM(M_RV, HandleMsgRV) + +DoHandleMsgRVR == /\ _DoRecvM(M_RVR, HandleMsgRVR) + +DoHandleMsgAE == /\ _DoRecvM(M_AE, HandleMsgAE) + +DoHandleMsgAER == /\ _DoRecvM(M_AER, HandleMsgAER) + +DoHandleMsgHB == /\ _DoRecvM(M_HB, HandleMsgHB) + +DoHandleMsgHBR == /\ _DoRecvM(M_HBR, HandleMsgHBR) + +DoRecvEntry == + /\ PrePrune(netman.n_op, "MaxClientOperationsTimes") + /\ \E n \in Servers, v \in Commands: RecvEntry(n, v) + /\ inv' = InvSequence + +\* DoNetworkDrop == +\* /\ PrePrune(NetGetDrop, "MaxDropTimes") +\* /\ \E m \in msgs: +\* /\ NetUpdate2(NetDropMsg(m), <<"DoNetworkDrop", m.dst, m.src, m.seq>>) +\* /\ UNCHANGED noNetVars +\* /\ inv' = InvSequence + +\* DoNetworkDup == +\* /\ PrePrune(NetGetDup, "MaxDupTimes") +\* /\ \E m \in msgs: +\* /\ NetUpdate2(NetDupMsg(m), <<"DoNetworkDup", m.dst, m.src, m.seq>>) +\* /\ UNCHANGED noNetVars +\* /\ inv' = InvSequence + +DoNetworkPartition == + /\ PrePrune(netman.n_part, "MaxPartitionTimes") + /\ \E n \in Servers: + /\ NetUpdate2(NetPartConn({n}), <<"DoNetworkPartition", n>>) + /\ UNCHANGED noNetVars + /\ inv' = InvSequence + +DoNetworkCure == + /\ PrePrune(netman.n_cure, "MaxCureTimes") + /\ NetIsParted + /\ NetUpdate2(NetCureConn, <<"DoNetworkCure">>) + /\ UNCHANGED noNetVars + /\ inv' = InvSequence + +DoRestart == + /\ PrePrune(netman.n_restart, "MaxRestart") + /\ \E n \in Servers: + /\ Restart(n) + /\ NetUpdate2(NetmanIncField("n_restart", NetNoAction1), <<"Dorestart", n>>) + /\ inv' = InvSequence + +Next == + \/ DoRestart + \/ DoElectionTimeout + \/ DoHeartBeat + \/ DoHandleMsgRV + \/ DoHandleMsgRVR + \/ DoHandleMsgHB + \/ DoHandleMsgHBR + \/ DoHandleMsgAE + \/ DoHandleMsgAER + \/ DoRecvEntry + \* \/ 
DoNetworkDrop + \* \/ DoNetworkDup + \/ DoNetworkPartition + \/ DoNetworkCure + + + +Spec == Init /\ [][Next]_vars +==== \ No newline at end of file diff --git a/tla/consensus/RaftRsUdp.tla b/tla/consensus/RaftRsUdp.tla new file mode 100644 index 00000000..ff1a4c8b --- /dev/null +++ b/tla/consensus/RaftRsUdp.tla @@ -0,0 +1,1185 @@ +---------------------------- MODULE RaftRsUdp ----------------------------- +(***************************************************************************) +(* This is the TLA+ specification for Raft-RS in TiKV with version 0.7.0 *) +(* *) +(* - Leader election: *) +(* - Log replication: *) +(* *) +(* Currently, the specification assumes: *) +(* - No snapshots *) +(* - No read-only requests *) +(* - No non-voting nodes *) +(* - No disk failures *) +(* - No membership change *) +(***************************************************************************) + +EXTENDS Sequences, Naturals, Integers, FiniteSets, TLC, SequencesExt + +(***************************************************************************) +(* Constants definitions *) +(***************************************************************************) +\* The set of servers +CONSTANT Servers +\* Server states, Corresponding to raft-rs StateRole +CONSTANTS Follower, Candidate, Leader +\* Raft message types +CONSTANTS M_RV, M_RVR, M_AE, M_AER, M_PRV, M_PRVR, M_HB, M_HBR +\* The set of commands +CONSTANTS Commands +\* The abstraction of null operation +CONSTANTS NoOp +\* Misc: state constraint parameters and placeholder +CONSTANTS Nil +\* The set of ProgressState +CONSTANTS Probe, Replicate + +(*************************************************************************** + Variables definitions + ***************************************************************************) +\* Persistent state on all servers +VARIABLES currentTerm, \* Latest term server has seen (initialized to 0 on first boot, increases monotonically) , Corresponding to raft-rs RaftCore.term + votedFor, \* CandidateId that received vote in current term (or null if none), Corresponding to raft-rs RaftCore.vote + log \* Log entries; each entry contains command for state machine, and term when entry was received by leader, Corresponding to raft-rs RaftCore.raft_log + + +\* Volatile state on all servers +VARIABLES raftState, \* State of servers, in {Follower, Candidate, Leader} , Corresponding to raft-rs RaftCore.state + commitIndex, \* Index of highest log entry known to be committed + leader_id \* The potential leader of the cluster, Corresponding to raft-rs RaftCore.leader_id + + +\* Volatile state on leader +VARIABLES nextIndex, \* for each server, index of the next log entry to send to that server, Corresponding to raft-rs Progress.next_idx + matchIndex \* for each server, index of highest log entry known to be replicated on server, Corresponding to raft-rs Progress.matched + +\* intermediate variable +VARIABLES voted_for_me \* Record nodes that have voted for me, Corresponding to raft-rs Progress.voted +VARIABLES voted_reject \* Record nodes that have not voted for me, Corresponding to raft-rs Progress.voted +VARIABLES check_quorum \* check_quorum variables +VARIABLE progress \* The status of each follower's receive log, which is used in receiving append, which contains probe and replicate. Corresponding to raft-rs Progress.state +VARIABLE inflight \* Number of letters transmitted during the recording process. 
Corresponding to raft-rs Progress.int (Inflights) + + + +(*************************************************************************** + Network variables and instance + ***************************************************************************) +\* The network is modelled through these three variables +VARIABLES netman, + netcmd, + msgs +INSTANCE UdpNetwork WITH NULL_MSG <- Nil, + _msgs <- msgs, _netman <- netman, _netcmd <- netcmd + + +(***************************************************************************) +(* Self manipulated invariants checking *) +(***************************************************************************) +VARIABLES inv \* Invariants that guarantee correctness + +(***************************************************************************) +(* Vars groups *) +(***************************************************************************) +serverVars == <> +leaderVars == <> +candidateVars == <> +logVars == <> +nodeVars == <> +netVars == <> +noNetVars == <> +vars == <> + + +(***************************************************************************) +(* State constraints helper *) +(***************************************************************************) +CONSTANTS Parameters \* to control the model scale + +GetParameterSet(p) == IF p \in DOMAIN Parameters THEN Parameters[p] ELSE {} + +CheckParameterHelper(n, p, Test(_,_)) == + IF p \in DOMAIN Parameters + THEN Test(n, Parameters[p]) + ELSE TRUE +CheckParameterMax(n, p) == CheckParameterHelper(n, p, LAMBDA i, j: i <= j) + +PrePrune(n, p) == CheckParameterHelper(n, p, LAMBDA i, j: i < j) + + +(***************************************************************************) +(* Type Ok. Used as a check on writing format *) +(***************************************************************************) + +TypeOkServerVars == + /\ currentTerm \in [ Servers -> Nat ] + /\ votedFor \in [ Servers -> Servers \cup {Nil} ] + /\ raftState \in [ Servers -> { Follower, Candidate, Leader } ] + +TypeOkLeaderVars == + /\ nextIndex \in [ Servers -> [ Servers -> Nat \ {0} ]] + /\ matchIndex \in [ Servers -> [ Servers -> Nat ]] + +\* TypeOkCandidateVars == +\* /\ votesGranted \in [ Servers -> {} ] + +TypeOkLogVars == + \* log data structure is complex, we skip checking it + /\ commitIndex \in [ Servers -> Nat ] + +TypeOk == + /\ TypeOkServerVars + /\ TypeOkLeaderVars + /\ TypeOkLogVars + + +(*************************************************************************** + Init variables + ***************************************************************************) +InitServerVars == + /\ currentTerm = [ i \in Servers |-> 1 ] + /\ votedFor = [ i \in Servers |-> Nil ] + /\ raftState = [ i \in Servers |-> Follower ] + +InitLeaderVars == + /\ nextIndex = [ i \in Servers |-> [ j \in Servers |-> 1 ]] + /\ matchIndex = [ i \in Servers |-> [ j \in Servers |-> 0 ]] + +InitCandidateVars == + /\ voted_for_me = [ i \in Servers |-> {} ] + /\ voted_reject = [ i \in Servers |-> {} ] + +InitLogVars == + /\ log = [ i \in Servers |-> << [term |-> 1, data |-> Nil, index |-> 1]>> ] + /\ commitIndex = [ i \in Servers |-> 1 ] +InitInv == inv = <<>> + +InitNodeVars == + /\ leader_id = [ i \in Servers |-> Nil] + /\ check_quorum = [i \in Servers |-> FALSE] \* Used to determine if check_quorum is on + /\ progress = [ i \in Servers |-> [ j \in Servers |-> <>]] + /\ inflight = [ i \in Servers |-> [ j \in Servers |-> 0 ]] + +InitNetVars == + /\ InitUdpNetworkNetman(Servers, <<"Init", Cardinality(Servers)>>, + [n_elec |-> 0, n_ae |-> 0, n_hb |-> 0, n_op |-> 0, no_inv 
|-> GetParameterSet("NoInv")]) + +Init == + /\ InitServerVars + /\ InitLeaderVars + /\ InitCandidateVars + /\ InitLogVars + /\ InitInv + /\ InitNodeVars + /\ InitNetVars + +(*************************************************************************** + Helper functions + ***************************************************************************) +NumServer == Cardinality(Servers) + +Min(x,y) == IF x < y THEN x ELSE y +Max(x,y) == IF x < y THEN y ELSE x + +IsQuorum(ss) == Cardinality(ss) * 2 > Cardinality(Servers) +IsQuorumNum(num) == num * 2 > Cardinality(Servers) + +CheckStateIs(n, s) == raftState[n] = s +CheckStateIsNot(n, s) == raftState[n] /= s + +Update(var, n, value) == [var EXCEPT ![n] = value] +UpdateCurrentTerm(n, term) == currentTerm' = Update(currentTerm, n, term) +UpdateLeaderId(n, id) == leader_id' = Update(leader_id, n, id) +UpdateVotedFor(n, node) == votedFor' = Update(votedFor, n, node) +UpdateState(n, s) == raftState' = Update(raftState, n, s) +UpdateVotedForMe(n, value) == voted_for_me' = Update(voted_for_me, n, value) +AddVotedForMe(me, node) == voted_for_me' = [ voted_for_me EXCEPT ![me] = @ \cup {node} ] +ClearVotedForMe(me) == voted_for_me' = [ voted_for_me EXCEPT ![me] = {} ] +UpdateVotesReject(n, value) == voted_reject' = Update(voted_reject, n, value) +AddVotesReject(me, node) == voted_reject' = [ voted_reject EXCEPT ![me] = @ \cup {node}] +ClearVotesReject(me) == voted_reject' = [ voted_reject EXCEPT ![me] = {} ] +UpdateMatchIdx(me, node, idx) == matchIndex' = [ matchIndex EXCEPT ![me][node] = idx ] +UpdateNextIdx(me, node, idx) == nextIndex' = [ nextIndex EXCEPT ![me][node] = IF idx < 1 THEN 1 ELSE idx ] +UpdateProgress(me, node, state) == progress' = [progress EXCEPT ![me][node] = state ] +UpdateInflight(me, node, num) == inflight' = [inflight EXCEPT ![me][node] = num ] +UpdateCommitIdx(n, idx) == commitIndex' = Update(commitIndex, n, idx) +AllUpdateNextIdx(me, idx) == + LET f == [i \in Servers |-> idx] + IN nextIndex' = [nextIndex EXCEPT ![me] = f] +AllUpdateMatchIdx(me, idx) == + LET f == [i \in Servers |-> idx] + IN matchIndex' = [matchIndex EXCEPT ![me] = f] +AllUpdateProgress(me, prstate) == + LET f == [i \in Servers |-> prstate] + IN progress' = [progress EXCEPT ![me] = f] +AllUpdateInflight(me, num_msg) == + LET f == [i \in Servers |-> num_msg] + IN inflight' = [inflight EXCEPT ![me] = f] + +(***************************************************************************) +(* Log helpers *) +(***************************************************************************) +\* Currently, the log won't be compacted +\* idx = 1, data = Nil +LogAppend(log_, entry) == Append(log_, entry) +LogCount(log_) == Len(log_) +LogGetEntry(log_, idx) == + IF idx > LogCount(log_) \/ idx <= 0 + THEN Nil + ELSE log_[idx] +LogGetEntryOne(log_, idx) == + IF idx > LogCount(log_) \/ idx <= 0 + THEN <<>> + ELSE SubSeq(log_, idx, idx) +LogGetEntriesFrom(log_, idx) == + IF idx > LogCount(log_) \/ idx <= 0 THEN <<>> + ELSE SubSeq(log_, idx, LogCount(log_)) +LogGetEntriesTo(log_, idx) == + IF Len(log_) < idx THEN log_ + ELSE SubSeq(log_, 1, idx) +LogDeleteEntriesFrom(log_, idx) == SubSeq(log_, 1, idx - 1) +LogCurrentIdx(log_) == LogCount(log_) +LogLastTerm(log_) == + LET idx == LogCount(log_) + term == IF idx = 0 THEN 0 ELSE log_[idx].term + IN term +LogLastIdx(log_) == + LET idx == LogCount(log_) + index == IF idx = 0 THEN 0 ELSE log_[idx].index + IN index +LogGetTerm(log_, idx, info) == + IF LogCount(log_) < idx + THEN 0 + ELSE IF idx = 0 THEN 0 ELSE log_[idx].term + +\* log_ is the log 
of the original node, entries is the logs that need to be added in the AE letter, we need to find a suitable location to overwrite the conflicting logs according to the incoming prevLogIdx, and add the subsequent logs. +\* in maybe_append@raft_log.rs +LogGetMatchEntries(log_, entries, prevLogIdx) == + LET F[i \in 0..Len(entries)] == + IF i = 0 THEN Nil + ELSE LET ety1 == LogGetEntry(log_, prevLogIdx + i) \* Original log Entry at prevLogIdx + i + ety2 == LogGetEntry(entries, i) \* The entries ith one to be added + entries1 == LogGetEntriesTo(log_, prevLogIdx + i - 1) \* log_ from first_index to prevLogIdx + i - 1 + entries2 == LogGetEntriesFrom(entries, i) \* entries from i to Len(entries) + IN IF /\ F[i-1] = Nil + /\ \/ ety1 = Nil \* The original log does not have the ith one, indicating that all subsequent ones need to be added directly. + \/ ety1.term /= ety2.term \* The i-th mismatch of the original log indicates that it needs to be overwritten from the i-th onwards with all newly added + THEN entries1 \o entries2 + ELSE F[i-1] + result == F[Len(entries)] + IN IF result = Nil THEN log_ ELSE result + + +(***************************************************************************) +(* Msg constructors *) +(***************************************************************************) +\* Send the letter to the remaining nodes, constructing the letter according to the rules of the Contrustor2/3 function +_BatchExcludesReqMsgsArg(n, excludes, Constructor2(_, _), Constructor3(_, _, _), arg) == + LET dsts == Servers \ excludes + size == Cardinality(dsts) + F[i \in 0..size] == + IF i = 0 THEN <<<<>>, dsts>> + ELSE LET ms == F[i-1][1] + s == CHOOSE j \in F[i-1][2]: TRUE + m == IF arg = Nil + THEN Constructor2(n, s) + ELSE Constructor3(n, s, arg) + remaining == F[i-1][2] \ {s} + IN <> + IN F[size][1] + +_Dummy2(a, b) == TRUE +_Dummy3(a, b, c) == TRUE + +BatchReqMsgs(n, Constructor(_, _)) == + _BatchExcludesReqMsgsArg(n, {n}, Constructor, _Dummy3, Nil) +BatchReqMsgsArg(n, Constructor(_, _, _), arg) == + _BatchExcludesReqMsgsArg(n, {n}, _Dummy2, Constructor, arg) +ConstructMsg(src, dst, type, body) == + [ src |-> src, dst |-> dst, type |-> type, data |-> body ] + +\* func:new_message(MsgRequestVote)@raft.rs +RequestVote(i, j) == + LET body == [ term |-> currentTerm'[i], + candidate_id |-> i, + index |-> LogCurrentIdx(log[i]), + log_term |-> LogLastTerm(log[i]), + commit |-> commitIndex[i], + commitTerm |-> LogGetTerm(log[i], commitIndex[i], "RequestVote")] + msg_type == M_RV + IN ConstructMsg(i, j, msg_type, body) + +\* func:new_message(MsgRequestVoteResponse)@raft.rs +RequestVoteResponse(m, voted, tempLeaderId) == + LET i == m.dst + j == m.src + req == m.data + \* can_vote corresponding to step()@raft.rs, which define the situation it can vote or not + can_vote == \/ voted = j + \/ /\ voted = Nil + /\ tempLeaderId = Nil + meTerm == currentTerm'[i] + rejectMeTermIsBigger == meTerm > req.term + meLastTerm == LogLastTerm(log[i]) + rejectMeLogNewer == \/ req.log_term < meLastTerm + \/ /\ req.log_term = meLastTerm + /\ req.index < LogCurrentIdx(log[i]) + voteStatus == IF rejectMeTermIsBigger THEN "not-vote: term bigger" ELSE + IF ~can_vote THEN "not-vote: can not vote" ELSE + IF rejectMeLogNewer THEN "not-vote: log newer" ELSE "voted" + granted == voteStatus = "voted" + reject == ~granted + send_commit == IF reject THEN commitIndex[i] ELSE 0 + send_commit_term == IF reject THEN LogGetTerm(log[i], commitIndex[i], "RequestVoteResponse") ELSE 0 + body == [ request_term |-> req.term, + term |-> 
Max(req.term, meTerm), + reject |-> reject, + commit |-> send_commit, + commitTerm |-> send_commit_term] + IN ConstructMsg(i, j, M_RVR, body) @@ [ status |-> voteStatus ] + +\* func: prepare_send_entries +AppendEntriesNext(i, j, next) == + LET prev_log_idx == next[i][j] - 1 + body == [ term |-> currentTerm[i], + leader_id |-> i, + commit |-> commitIndex'[i], + index |-> prev_log_idx, \* prev_log_idx + log_term |-> IF LogCount(log'[i]) >= prev_log_idx + THEN LogGetTerm(log'[i], prev_log_idx, "AppendEntriesNext") + ELSE 0 , + entries |-> LogGetEntryOne(log'[i], next[i][j]) ] \* The model restricts AppendEntry messages to one entry at a time. + IN ConstructMsg(i, j, M_AE, body) + +\* func: send_heartbeat +HeartBeatNext(i, j, next) == + LET body == [ term |-> currentTerm[i], + commit |-> Min(matchIndex[i][j], commitIndex[i])] + IN ConstructMsg(i, j, M_HB, body) + +HeartBeatResponse(m) == + LET body == [ term |-> currentTerm'[m.dst], + commitIdx |-> commitIndex'[m.dst] ] + IN ConstructMsg(m.dst, m.src, M_HBR, body) + +\* new_message(MsgAppendResponse)@raft.rs +AERFailLogStale(m) == \* func: handle_append_entries + LET body == [ reject |-> FALSE, + term |-> Max(currentTerm[m.dst], m.data.term), + index |-> commitIndex[m.dst], + commit |-> commitIndex[m.dst] ] + IN ConstructMsg(m.dst, m.src, M_AER, body) + +\* new_message(MsgAppendResponse)@raft.rs +AERFailTermMismatch(m, hint_index, hint_term) == + LET body == [ reject |-> TRUE, + term |-> Max(currentTerm[m.dst], m.data.term), + index |-> m.data.index, + reject_hint |-> hint_index, + log_term |-> hint_term, + commit |-> commitIndex[m.dst] ] + IN ConstructMsg(m.dst, m.src, M_AER, body) + +\* new_message(MsgAppendResponse)@raft.rs +AppendEntriesResponseSuccess(m) == + LET data == m.data + body == [ reject |-> FALSE, + term |-> currentTerm'[m.dst], + index |-> data.index + Len(data.entries), + commitIdx |-> commitIndex'[m.dst]] + IN ConstructMsg(m.dst, m.src, M_AER, body) + + +\* At bcast_append the next_index of the node to the target node is updated for each letter.(in prepare_send_entries@raft.rs) +BatchUpdateNextWithMsg(n, new_msgs) == + LET lenMsg == Len(new_msgs) + F[i \in 0..lenMsg] == + IF i = 0 THEN <<{}, Servers, (n :> 1)>> + ELSE LET dst == new_msgs[i].dst + ety == new_msgs[i].data.entries + etyLastIdx == LogLastIdx(ety) + IN IF \/ ety = <<>> \* If the content of the letter is empty, no need to update + \/ progress[n][dst][1] = Probe \* If a node is in the Probe state, sending at this point will block( maybe_send_append().is_paused() @ raft.rs) + THEN < etyLastIdx) >> + ELSE < etyLastIdx)>> + updateServer == F[lenMsg][1] + remainServer == F[lenMsg][2] + updateMap == F[lenMsg][3] + next_keep == [ s \in remainServer |-> nextIndex[n][s] ] + next_update == [ s \in updateServer |-> updateMap[s] ] + IN nextIndex' = [ nextIndex EXCEPT ![n] = next_keep @@ next_update ] + + + +(***************************************************************************) +(* Raft actions *) +(***************************************************************************) + +\* func reset +reset(i) == + /\ ClearVotedForMe(i) + /\ ClearVotesReject(i) + /\ AllUpdateNextIdx(i, LogCount(log[i]) + 1) + /\ AllUpdateMatchIdx(i, 0) + /\ AllUpdateProgress(i, <>) + /\ AllUpdateInflight(i, 0) + +(***************************************************************************) +(* Become candidate *) +(***************************************************************************) + +\* func: become_candidate +BecomeCandidate(i) == + /\ UpdateCurrentTerm(i, currentTerm[i] + 1) + /\ 
UpdateVotedFor(i, i) + /\ reset(i) + /\ UpdateLeaderId(i, Nil) + /\ UNCHANGED << check_quorum, logVars>> + /\ UpdateState(i, Candidate) + /\ LET ms == BatchReqMsgs(i, RequestVote) + IN NetUpdate2(NetmanIncField("n_elec", NetBatchAddMsg(ms)), <<"BecomeCandidate", i>>) + +(***************************************************************************) +(* Become leader *) +(***************************************************************************) + +\* func: become_leader@raft.rs +BecomeLeader(i, m) == + /\ LET noop == [ term |-> currentTerm[i], data |-> Nil, index |-> LogCount(log[i]) + 1 ] + IN log' = Update(log, i, LogAppend(log[i], noop)) + /\ UpdateState(i, Leader) + /\ UpdateLeaderId(i, i) + /\ ClearVotedForMe(i) + /\ ClearVotesReject(i) + /\ matchIndex' = [ matchIndex EXCEPT ![i] = ( i :> LogCurrentIdx(log'[i]) ) @@ [ j \in Servers |-> 0 ] ] + /\ AllUpdateProgress(i, <>) \* All progress needs to be in probe mode + /\ AllUpdateInflight(i, 0) \* All inflight needs to be 0 (no message send) + /\ LET next == [ nextIndex EXCEPT ![i] = ( i :> matchIndex'[i][i] + 1 ) @@ [ j \in Servers |-> LogCurrentIdx(log[i]) + 1] ] + ms == BatchReqMsgsArg(i, AppendEntriesNext, next) + IN /\ nextIndex' = next + /\ NetUpdate2(NetReplyBatchAddMsg(ms, m), <<"RecvRequestVoteResponse", "Won-BecomeLeader", i>>) \* bcast_send + +(***************************************************************************) +(* Become follower *) +(***************************************************************************) + +SetCurrentTerm(i, term) == + /\ UpdateCurrentTerm(i, term) + /\ UpdateVotedFor(i, Nil) + +_BecomeFollower(i) == + /\ UpdateState(i, Follower) + /\ UpdateLeaderId(i, Nil) + /\ reset(i) + +\* func : become_follower@raft.rs +BecomeFollower(i, term) == + /\ SetCurrentTerm(i, term) + /\ _BecomeFollower(i) + +BecomeFollowerInLost(i, term) == + /\ UNCHANGED <> + /\ UpdateCurrentTerm(i, term) + /\ _BecomeFollower(i) + +BecomeFollowerWithLeader(i, term, leaderId) == + /\ SetCurrentTerm(i, term) + /\ UpdateState(i, Follower) + /\ UpdateLeaderId(i, leaderId) + /\ reset(i) + +(***************************************************************************) +(* Recv requestvote *) +(***************************************************************************) + +\* func: maybe_commit_by_vote@raft.rs +maybe_commit_by_vote(n, commitIdx, commitTerm) == + IF \/ commitIdx = 0 + \/ commitTerm = 0 + \/ raftState'[n] = Leader + THEN UNCHANGED commitIndex + ELSE IF \/ commitIdx <= commitIndex[n] + THEN UNCHANGED commitIndex + ELSE IF /\ commitIdx > commitIndex[n] + /\ commitTerm = LogGetTerm(log[n], commitIdx, "maybe_commit_by_vote") + THEN UpdateCommitIdx(n, commitIdx) + ELSE UNCHANGED commitIndex + +HandleMsgRV(m) == + LET data == m.data + dst == m.dst + src == m.src + demote == currentTerm[dst] < data.term + stale == currentTerm[dst] > data.term + msg == RequestVoteResponse(m, IF demote THEN Nil ELSE votedFor[dst], IF demote THEN Nil ELSE leader_id[dst]) \* Pass in intermediate values based on demote status. + IN IF stale \* stale message drop + THEN /\ UNCHANGED noNetVars + /\ NetUpdate2(NetDelMsg(m), + <<"RecvRequestVote", "stale message ignore", dst, src, m.seq>>) + ELSE /\ UNCHANGED <> + /\ IF demote \* Received a newerletter and became a follower. 
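+                (*********************************************************
+                 Illustrative note, not part of the original spec: how a
+                 vote request is decided here. Assume Servers = {n1, n2,
+                 n3} and n2 (currentTerm 2, votedFor Nil, leader_id Nil)
+                 receives M_RV from n1 with term 3, log_term 2, index 4:
+                   demote = TRUE (2 < 3), so RequestVoteResponse above was
+                     evaluated with votedFor and leader_id passed in as
+                     Nil, making can_vote = TRUE;
+                   if n2's own last log term/index are, say, 2/3, then
+                     rejectMeLogNewer = FALSE and the reply carries
+                     reject = FALSE ("voted").
+                 The branches below then persist the term bump and vote.
+                 *********************************************************)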
+ THEN /\ UpdateCurrentTerm(dst, data.term) + /\ UpdateState(dst, Follower) + /\ UpdateLeaderId(dst, Nil) + /\ reset(dst) + ELSE UNCHANGED <> + /\ IF ~msg.data.reject \* Determine whether to vote based on RequestVote letter + THEN /\ UpdateVotedFor(dst, src) + /\ UNCHANGED <> + ELSE /\ IF demote \* If there is a no vote the default is not to change the vote value, but due to the demote state, the node will reset and thus the vote will become nil + THEN UpdateVotedFor(dst, Nil) + ELSE UNCHANGED <> + /\ maybe_commit_by_vote(dst, data.commit, data.commitTerm) \* func: maybe_commit_by_vote @ raft.rs + /\ UNCHANGED <> + /\ NetUpdate2(NetReplyMsg(msg, m), + <<"RecvRequestVote", msg.status, dst, src, m, IF ~msg.data.reject THEN "vote" ELSE "not-vote">>) + + +(***************************************************************************) +(* Recv requestvote response *) +(***************************************************************************) + +\* func : poll@raft.rs +Poll(grant, reject) == + LET grantNum == Cardinality(grant) + 1 \* +1 is voted for myself + rejectNum == Cardinality(reject) + IN IF IsQuorumNum(grantNum) + THEN "Won" + ELSE IF IsQuorumNum(rejectNum) + THEN "Lost" + ELSE "Pending" + + + + +HandleMsgRVR( m) == + LET resp == m.data + src == m.src + dst == m.dst + demote == resp.term > currentTerm[dst] + isCandidate == raftState[dst] = Candidate + stale == resp.term < currentTerm[dst] + IN /\ IF demote \* Received a newerletter and became a follower. + THEN /\ UNCHANGED <> + /\ BecomeFollower(dst, resp.term) + /\ NetUpdate2(NetDelMsg(m), <<"RecvRequestVoteResponse", "term is smaller", dst, src, m>>) + ELSE IF stale \* stale message drop + THEN /\ UNCHANGED noNetVars + /\ NetUpdate2(NetDelMsg(m), <<"RecvRequestVoteResponse", "vote is stale", dst, src, m>>) + ELSE IF ~isCandidate \* only candidate process M_RVR + THEN /\ UNCHANGED noNetVars + /\ NetUpdate2(NetDelMsg(m), <<"RecvRequestVoteResponse", "not candidate", dst, src, m>>) + ELSE /\ UNCHANGED <> + /\ LET newVotedForMe == IF ~resp.reject + THEN voted_for_me[dst] \cup {src} + ELSE voted_for_me[dst] + newVotedReject == IF ~resp.reject + THEN voted_reject[dst] + ELSE voted_reject[dst]\cup {src} + res == Poll(newVotedForMe, newVotedReject) + IN IF res = "Won" + THEN /\ UNCHANGED << commitIndex>> \* The reason for this is that in becomeLeader we need to broadcast the AE letter globally, and the AE letter carries the latest commitIndex, but we don't update the commitIndex until below in maybe_commit_by_vote, and it has to use the latest commitIndex, so we need to write it here. 
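+                              (***********************************************
+                               Worked example, commentary only: Poll counts
+                               the candidate's own vote, so with
+                               Cardinality(Servers) = 3 a single grant from
+                               n2 gives grantNum = 2, and IsQuorumNum(2),
+                               i.e. 2 * 2 > 3, already yields "Won"; two
+                               rejects give rejectNum = 2 and "Lost".
+                               Anything else stays "Pending" and only the
+                               vote sets are updated further below.
+                               ***********************************************)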
+ /\ BecomeLeader(dst, m) + /\ UNCHANGED << votedFor, currentTerm>> + ELSE /\ UNCHANGED <> + /\ IF res = "Lost" + THEN /\ BecomeFollowerInLost(dst, currentTerm[dst]) + /\ NetUpdate2(NetDelMsg(m), <<"RecvRequestVoteResponse", "Lost", dst, src, m>>) + ELSE /\ NetUpdate2(NetDelMsg(m), <<"RecvRequestVoteResponse", "Pending", dst, src, m>>) + /\ UpdateVotedForMe(dst, newVotedForMe) + /\ UpdateVotesReject(dst, newVotedReject) + /\ UNCHANGED << serverVars, leader_id, progress, leaderVars>> + /\ maybe_commit_by_vote(dst, resp.commit, resp.commitTerm) + +(***************************************************************************) +(* Send appendentries to all other nodes *) +(***************************************************************************) +SendAppendentriesAll(n) == \* func: bcast_append + /\ UNCHANGED <> + /\ LET ms == BatchReqMsgsArg(n, AppendEntriesNext, nextIndex) + IN /\ BatchUpdateNextWithMsg(n, ms) + /\ NetUpdate2(NetmanIncField("n_ae", NetBatchAddMsg(ms)), <<"SendAppendentriesAll", n>>) + +(***************************************************************************) +(* Send heartbeat(empty log appendentries) to all other nodes *) +(***************************************************************************) +SendHeartBeatAll(n) == \* func: bcast_heart + /\ UNCHANGED <> + /\ LET ms == BatchReqMsgsArg(n, HeartBeatNext, nextIndex) + IN NetUpdate2(NetmanIncField("n_hb", NetBatchAddMsg(ms)), <<"SendHeartBeatAll", n>>) + +(***************************************************************************) +(* Recv appendentries *) +(***************************************************************************) +AcceptLeader(me, leader) == + /\ UpdateState(me, Follower) + /\ UpdateLeaderId(me, leader) + /\ IF raftState[me] = Follower + THEN UNCHANGED <> + ELSE reset(me) + +\* func: find_conflict_by_term +find_conflict_by_term(me, index, term) == + LET hint_index == Min(index, LogCount(log[me])) + F[i \in 0..hint_index ] == + IF hint_index = 0 + THEN <<0, 0>> + ELSE IF i = 0 + THEN << >> + ELSE IF term >= LogGetTerm(log[me] ,i, "find_conflict_by_term") + THEN <> + ELSE F[i-1] + IN F[hint_index] + +\* func: raft_log.maybe_commit() +SetCommitIdx(n, idx) == + /\ Assert(idx <= LogCurrentIdx(log'[n]), <<"SetCommitIdx: idx <= LogCurrentIdx(log'[n])", n, idx, log'>>) + /\ IF idx > commitIndex[n] + THEN UpdateCommitIdx(n, idx) + ELSE UNCHANGED <> + +HandleMsgAE(m) == \* func: handle_append + LET data == m.data + src == m.src + dst == m.dst + demote == currentTerm[dst] < data.term + stale == data.term < currentTerm[dst] + log_stale == data.index < commitIndex[dst] + log_stale_msg == AERFailLogStale(m) + success == AppendEntriesResponseSuccess(m) + IN IF stale \* drop stale message + THEN /\ UNCHANGED noNetVars + /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentries", "stale message ignore", dst, src, m>>) + ELSE /\ UNCHANGED <> + /\ IF demote \* Received a newerletter and became a follower, but there are related variables that need to be updated later, so only their term values are updated here. 
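+            (***************************************************************
+             Sketch of the reject path below, commentary only: suppose the
+             follower's log terms are <<1, 1, 2>> (indices 1..3) and the
+             M_AE carries index = 3 (prev log idx) with log_term = 3. The
+             entry at index 3 exists but its term 2 /= 3, so termMatch
+             fails and the reply is built by AERFailTermMismatch using
+             find_conflict_by_term(dst, 3, 3), which scans down from index
+             3 and returns the highest <<idx, term>> whose term is <= 3
+             (here <<3, 2>>). This lets the leader skip back over a whole
+             conflicting term per round trip instead of decrementing
+             next_idx one entry at a time.
+             ***************************************************************)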
+ THEN SetCurrentTerm(dst, data.term) + ELSE UNCHANGED <> + /\ AcceptLeader(dst, data.leader_id) \* Update the leader_id and make sure the node state is follower + /\ IF log_stale \* if m.index < self.raft_log.committed @ raft.rs + THEN /\ UNCHANGED <> + /\ NetUpdate2(NetReplyMsg(log_stale_msg, m), <<"RecvAppendentries", "log stale commit", dst, src, m>>) + ELSE LET ety == LogGetEntry(log[dst], data.index) + noPrevLog == ety = Nil + termMatch == \/ /\ noPrevLog + /\ data.log_term = 0 + \/ /\ ~noPrevLog + /\ ety.term = data.log_term + IN IF termMatch \* maybe_append@raft_log.rs + THEN /\ log' = Update(log, dst, LogGetMatchEntries(log[dst], data.entries, data.index)) + /\ IF commitIndex[dst] < data.commit + THEN LET lastLogIdx == Max(LogCurrentIdx(log'[dst]), 1) + idxToCommit == Min(lastLogIdx, data.commit) + IN SetCommitIdx(dst, idxToCommit) + ELSE UNCHANGED commitIndex + /\ NetUpdate2(NetReplyMsg(success, m), <<"RecvAppendentries", "success", dst, src, m>>) + ELSE LET conflict == find_conflict_by_term(dst, data.index, data.log_term) \* find_conflict_by_term @ raft_log.rs + fail == AERFailTermMismatch(m, conflict[1], conflict[2]) + IN /\ UNCHANGED <> + /\ NetUpdate2(NetReplyMsg(fail, m), <<"RecvAppendentries", "term Mismatch", dst, src, m>>) + + +(***************************************************************************) +(* Recv appendentries response *) +(***************************************************************************) +FlushSendAppendentries(me, m, tempNextIdx, tempInflight, info) == + LET F[i \in 0..NumServer] == + IF i = 0 THEN <<{}, Servers>> + ELSE LET n == CHOOSE n \in F[i-1][2]: TRUE + idx == LogCurrentIdx(log'[me]) + IN IF n = me + THEN <> + ELSE IF progress'[me][n][1] = Probe + THEN IF progress'[me][n][2] = TRUE + THEN <> + ELSE <> + ELSE IF tempInflight[me][n] /= 0 + THEN <> + ELSE <> + excludes == F[NumServer][1] + excludes2 == F[NumServer][1] \ {me} + ms == _BatchExcludesReqMsgsArg(me, excludes, _Dummy2, AppendEntriesNext, tempNextIdx) + next_keep == [ s \in excludes2 |-> tempNextIdx[me][s] ] + next_me == IF tempNextIdx[me][me] < LogCount(log'[me]) + 1 + THEN (me :> LogCount(log'[me]) + 1) + ELSE (me :> tempNextIdx[me][me] ) + next_update == [ s \in Servers \ excludes |-> IF tempNextIdx[me][s] <= LogCount(log'[me]) + THEN tempNextIdx[me][s] + 1 + ELSE tempNextIdx[me][s] ] + inflight_keep == [ s \in excludes |-> tempInflight[me][s]] + inflight_update == [ s \in Servers \ excludes |-> IF tempNextIdx[me][s] <= LogCount(log'[me]) + THEN tempNextIdx[me][s] + ELSE 0] + IN /\ nextIndex' = [ nextIndex EXCEPT ![me] = next_keep @@ next_update @@ next_me] + /\ inflight' = [inflight EXCEPT ![me] = inflight_keep @@ inflight_update] + /\ IF m = Nil \* RecvEntry: client request + THEN NetUpdate2(NetmanIncField("n_op", NetBatchAddMsg(ms)), info) + ELSE NetUpdate2(NetReplyBatchAddMsg(ms, m), info) + +\* (maybe_update + maybe_commit) in handle_append_response@raft.rs +AdvanceCommitIdx(me, m, succ_rsp, tempNextIndex, tempInflight) == \* func: raft_update_commitIndex + LET F[i \in 0..NumServer] == + IF i = 0 THEN <<<<>>, Servers>> + ELSE LET n == CHOOSE n \in F[i-1][2]: TRUE + IN <> + sorted_match_idx == SortSeq(F[NumServer][1], LAMBDA x, y: x > y) + commit == sorted_match_idx[NumServer \div 2 + 1] + can_send == tempInflight[me][m.src] = 0 + old_pause == \/ inflight[me][m.src] /= 0 + \/ /\ progress[me][m.src][1] = Probe + /\ progress[me][m.src][2] = TRUE + empty_entries == LogCount(succ_rsp.data.entries) = 0 + IN IF /\ commit > commitIndex[me] + /\ currentTerm[me] = 
LogGetTerm(log[me], commit, "AdvanceCommitIdx")
+          THEN /\ SetCommitIdx(me, commit) \* commit changed, maybe broadcast append
+               /\ FlushSendAppendentries(me, m, tempNextIndex, tempInflight, <<"RecvAppendentriesResponse", "commit change", m.dst, m.src, m>>)
+          ELSE /\ UNCHANGED commitIndex
+               /\ IF can_send
+                  THEN IF old_pause
+                       THEN /\ NetUpdate2(NetReplyMsg(succ_rsp, m), <<"RecvAppendentriesResponse", "commit still send", m.dst, m.src, m>>)
+                            /\ IF ~empty_entries
+                               THEN UpdateInflight(me, m.src, succ_rsp.data.entries[1].index)
+                               ELSE UpdateInflight(me, m.src, 0)
+                            /\ IF empty_entries
+                               THEN nextIndex' = tempNextIndex
+                               ELSE UpdateNextIdx(me, m.src, succ_rsp.data.entries[1].index + 1)
+                       ELSE IF empty_entries
+                            THEN /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "commit still pause", m.dst, m.src, m>>)
+                                 /\ UpdateInflight(me, m.src, 0)
+                                 /\ nextIndex' = tempNextIndex
+                            ELSE /\ NetUpdate2(NetReplyMsg(succ_rsp, m), <<"RecvAppendentriesResponse", "commit still send", m.dst, m.src, m>>)
+                                 /\ UpdateInflight(me, m.src, succ_rsp.data.entries[1].index)
+                                 /\ UpdateNextIdx(me, m.src, succ_rsp.data.entries[1].index + 1)
+                  ELSE /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "commit still pause", m.dst, m.src, m>>)
+                       /\ UNCHANGED inflight
+                       /\ nextIndex' = tempNextIndex
+
+\* maybe_decr_to @ progress.rs
+maybe_decr_to(dst, src, m) ==
+    LET rejected == m.data.index
+        match_hint == m.data.reject_hint
+    IN IF progress[dst][src][1] = Replicate
+       THEN IF rejected <= matchIndex[dst][src]
+            THEN /\ UNCHANGED << nextIndex, progress>>
+                 /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "replicate: stale", dst, src, m>>)
+            ELSE /\ UpdateNextIdx(dst, src, matchIndex[dst][src] + 1)
+                 /\ LET one_rsp == AppendEntriesNext(dst, src, nextIndex')
+                    IN /\ NetUpdate2(NetReplyMsg(one_rsp, m), <<"RecvAppendentriesResponse", "replicate turns to probe", dst, src, m>>)
+                       /\ IF Len(one_rsp.data.entries) = 0
+                          THEN UpdateProgress(dst, src, <>)
+                          ELSE UpdateProgress(dst, src, <>)
+       ELSE /\ IF \/ nextIndex[dst][src] = 0
+                  \/ nextIndex[dst][src] - 1 /= rejected
+               THEN /\ UNCHANGED << nextIndex, progress>>
+                    /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "probe: stale", dst, src, m>>)
+               ELSE LET new_match == Min(rejected, match_hint + 1)
+                        new_next_idx == Max(new_match, 1)
+                        one_rsp == AppendEntriesNext(dst, src, nextIndex')
+                    IN /\ UpdateNextIdx(dst, src, new_next_idx)
+                       /\ NetUpdate2(NetReplyMsg(one_rsp, m), <<"RecvAppendentriesResponse", "probe: update next", dst, src, m>>)
+                       /\ IF Len(one_rsp.data.entries) = 0
+                          THEN UpdateProgress(dst, src, <>)
+                          ELSE UpdateProgress(dst, src, <>)
+
+\* func: handle_append_response
+HandleMsgAER(m) ==
+    LET resp == m.data
+        src == m.src
+        dst == m.dst
+        stale == resp.term < currentTerm[dst]
+        demote == resp.term > currentTerm[dst]
+        need_optimize == resp.reject /\ resp.log_term > 0
+        next_probe_index == find_conflict_by_term(dst, resp.reject_hint, resp.log_term)
+        failReason ==
+            IF stale THEN "stale message ignore" ELSE
+            IF resp.term > currentTerm[dst] THEN "term is smaller" ELSE
+            IF raftState[dst] /= Leader THEN "not leader" ELSE
+            IF need_optimize THEN "retry" ELSE "success"
+    IN IF failReason /= "success"
+       THEN IF failReason = "stale message ignore" \* drop stale message
+            THEN /\ UNCHANGED <>
+                 /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "stale message ignore", dst, src, m>>)
+            ELSE IF failReason = "term is smaller" \* received a newer term and stepped down to follower
+                 THEN /\ BecomeFollower(dst, resp.term)
+                      /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "term is smaller", dst, src, m>>)
+                      /\ UNCHANGED <>
+            ELSE IF failReason = "not leader" \* the node is not a leader, drop the message
+                 THEN /\ UNCHANGED <>
+                      /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "not leader", dst, src, m>>)
+            ELSE IF failReason = "retry" \* m.reject
+                 THEN /\ UNCHANGED <>
+                      /\ maybe_decr_to(dst, src, m)
+            ELSE Assert(FALSE, <<"HandleMsgAER: unseen failReason", failReason>>)
+       ELSE \* success
+          /\ UNCHANGED <>
+          /\ LET probeToReplicate == progress[dst][src][1] = Probe
+                 nextToUpdate == Max(resp.index + 1, nextIndex[dst][src]) \* maybe_update in handle_append_response may advance next_idx, but prepare_entries changes it again, so a temporary variable is used to fetch the matching entries.
+                 tempNextIndex == [nextIndex EXCEPT ![dst][src] = nextToUpdate]
+                 \* The temporary nextIndex is also needed here.
+                 one_rsp == AppendEntriesNext(dst, src, tempNextIndex)
+                 repCanSend == inflight[dst][src] <= resp.index \* inflight records the seq of the in-flight packet; in raft.rs the Replicate state frees the send window via free_to, which is modelled directly here.
+                 tempInflight == IF probeToReplicate
+                                 THEN [inflight EXCEPT ![dst][src] = 0]
+                                 ELSE IF repCanSend
+                                      THEN [inflight EXCEPT ![dst][src] = 0]
+                                      ELSE inflight
+             IN /\ IF probeToReplicate
+                   THEN UpdateProgress(dst, src, <>)
+                   ELSE UNCHANGED progress
+                /\ IF resp.index > matchIndex[dst][src]
+                   THEN /\ UpdateMatchIdx(dst, src, resp.index)
+                        /\ AdvanceCommitIdx(dst, m, one_rsp, tempNextIndex, tempInflight) \* Update progress and nextIndex from the response, mirroring the maybe_update-to-maybe_commit path of handle_append_response.
+                   ELSE /\ UNCHANGED << matchIndex, commitIndex, inflight, nextIndex>>
+                        /\ NetUpdate2(NetDelMsg(m), <<"RecvAppendentriesResponse", "maybe_update_fail", dst, src, m>>)
+
+(***************************************************************************)
+(* Recv HeartBeat *)
+(***************************************************************************)
+
+\* func: handle_heartbeat
+HandleMsgHB(m) ==
+    LET data == m.data
+        src == m.src
+        dst == m.dst
+        demote == currentTerm[dst] < data.term
+        stale == data.term < currentTerm[dst]
+        rsp == HeartBeatResponse(m)
+    IN IF stale
+       THEN /\ UNCHANGED noNetVars
+            /\ NetUpdate2(NetDelMsg(m), <<"RecvHeartBeat", "stale message ignore", dst, src, m>>)
+       ELSE /\ IF \/ demote
+                  \/ raftState[dst] = Candidate
+               THEN /\ BecomeFollowerWithLeader(dst, data.term, src)
+                    /\ UNCHANGED <>
+               ELSE UNCHANGED <>
+            /\ UNCHANGED <>
+            /\ SetCommitIdx(dst, data.commit)
+            /\ NetUpdate2(NetReplyMsg(rsp, m), <<"RecvHeartBeat", "success", dst, src, m>>)
+
+(***************************************************************************)
+(* Recv HeartBeatResponse *)
+(***************************************************************************)
+\* func: handle_heartbeat_response
+HandleMsgHBR(m) ==
+    LET resp == m.data
+        src == m.src
+        dst == m.dst
+        demote == resp.term > currentTerm[dst]
+        stale == resp.term < currentTerm[dst]
+    IN IF stale
+       THEN /\ UNCHANGED noNetVars
+            /\ NetUpdate2(NetDelMsg(m), <<"RecvHeartBeatResponse", "stale message ignore", dst, src, m>>)
+       ELSE IF demote
+            THEN /\ UNCHANGED << logVars, check_quorum>>
+                 /\ BecomeFollower(dst, resp.term)
+                 /\ NetUpdate2(NetDelMsg(m), <<"RecvHeartBeatResponse", "term is smaller", dst, src, m>>)
+            ELSE /\ UNCHANGED <>
+                 /\ IF matchIndex[dst][src] < LogCount(log[dst])
+                    THEN LET
req_msg == AppendEntriesNext(dst, src, nextIndex) + send_entry == req_msg.data.entries + isReplicate == progress[dst][src][1] = Replicate + inflightToUpdate == IF send_entry /= <<>> + THEN send_entry[1].index + ELSE 0 + nextIndexToUpdate == IF isReplicate + THEN IF send_entry /= <<>> + THEN nextIndex[dst][src] + 1 + ELSE nextIndex[dst][src] + ELSE nextIndex[dst][src] + IN /\ NetUpdate2(NetReplyMsg(req_msg, m), <<"RecvHeartBeatResponse", "send append", dst, src, m>>) + /\ UpdateInflight(dst, src ,inflightToUpdate) + /\ UpdateNextIdx(dst, src, nextIndexToUpdate) + ELSE /\ NetUpdate2(NetDelMsg(m), <<"RecvHeartBeatResponse", "not send", dst, src, m>>) + /\ UpdateInflight(dst, src ,0) + /\ UNCHANGED nextIndex + + +\* in step_leader: msg_propose +RecvEntry(n, data) == + /\ raftState[n] = Leader + /\ UNCHANGED <> + /\ LET ety == [ term |-> currentTerm[n], data |-> data, index |-> LogCount(log[n]) + 1] + IN log' = Update(log, n, LogAppend(log[n], ety)) + /\ IF matchIndex[n][n] < LogCount(log'[n]) + THEN UpdateMatchIdx(n, n, LogCount(log'[n])) + ELSE UNCHANGED matchIndex + /\ FlushSendAppendentries(n, Nil, nextIndex, inflight, <<"RecvEntry", n, data>>) + +(*************************************************************************** + State constraints + ***************************************************************************) + \* Here are some state limits to prevent state explosion due to control tla+ +GetRealLogLen(curLog) == SelectSeq(curLog, LAMBDA i: i.data /= NoOp) +GetMaxLogLen == Len(log[CHOOSE i \in Servers: \A j \in Servers \ {i}: + GetRealLogLen(log[i]) >= GetRealLogLen(log[j])]) +GetMaxTerm == currentTerm[CHOOSE i \in Servers: \A j \in Servers \ {i}: + currentTerm[i] >= currentTerm[j]] + +ScSent == CheckParameterMax(netman.n_sent, "MaxSentMsgs") +ScRecv == CheckParameterMax(netman.n_recv, "MaxRecvMsgs") +ScWire == CheckParameterMax(netman.n_wire, "MaxWireMsgs") +\* ScLog == CheckParameterMax(GetMaxLogLen, "MaxLogLength") +\* ScTerm == CheckParameterMax(GetMaxTerm, "MaxTerm") +ScPart == CheckParameterMax(netman.n_part, "MaxPartitionTimes") +ScCure == CheckParameterMax(netman.n_cure, "MaxCureTimes") +ScOp == CheckParameterMax(netman.n_op, "MaxClientOperationsTimes") +ScAe == CheckParameterMax(netman.n_ae, "MaxAppendEntriesTimes") +ScElec == CheckParameterMax(netman.n_elec, "MaxElectionTimes") +ScDrop == CheckParameterMax(netman.n_drop, "MaxDropTimes") +ScDup == CheckParameterMax(netman.n_dup, "MaxDupTimes") +ScUnorder == CheckParameterMax(netman.n_unorder, "MaxUnorderTimes") + +SC == /\ ScSent /\ ScRecv /\ ScWire /\ ScOp /\ ScAe /\ ScElec + /\ ScDrop /\ ScDup /\ ScUnorder + + +(***************************************************************************) +(* Invariants *) +(***************************************************************************) +ElectionSafety == + LET TwoLeader == + \E i, j \in Servers: + /\ i /= j + /\ currentTerm'[i] = currentTerm'[j] + /\ raftState'[i] = Leader + /\ raftState'[j] = Leader + IN ~TwoLeader + +LeaderAppendOnly == + \A i \in Servers: + IF raftState[i] = Leader /\ raftState'[i] = Leader + THEN LET curLog == log[i] + nextLog == log'[i] + IN IF Len(nextLog) >= Len(curLog) + THEN SubSeq(nextLog, 1, Len(curLog)) = curLog + ELSE FALSE + ELSE TRUE + +LogMatching == + ~UNCHANGED log => \* check the safety only if log has unchanged to avoid unnecessary evaluation cost + \A i, j \in Servers: + IF i /= j + THEN LET iLog == log'[i] + jLog == log'[j] + len == Min(Len(iLog), Len(jLog)) + F[k \in 0..len] == + IF k = 0 THEN <<>> + ELSE LET key1 == <> + value1 == 
iLog[k].data + key2 == <> + value2 == jLog[k].data + F1 == IF key1 \in DOMAIN F[k-1] + THEN IF F[k-1][key1] = value1 + THEN F[k-1] + ELSE F[k-1] @@ ( <<-1, -1>> :> <> ) + ELSE F[k-1] @@ (key1 :> value1) + F2 == IF key2 \in DOMAIN F1 + THEN IF F1[key2] = value2 + THEN F1 + ELSE F1 @@ ( <<-1, -1>> :> <> ) + ELSE F1 @@ (key2 :> value2) + IN F2 + IN IF << -1, -1>> \notin DOMAIN F[len] THEN TRUE + ELSE Assert(FALSE, <>) + ELSE TRUE + +MonotonicCurrentTerm == \A i \in Servers: currentTerm' [i] >= currentTerm[i] + +MonotonicCommitIdx == \A i \in Servers: commitIndex'[i] >= commitIndex[i] + +MonotonicMatchIdx == + \A i \in Servers: + IF raftState[i] = Leader /\ raftState'[i] = Leader \* change + THEN \A j \in Servers: matchIndex'[i][j] >= matchIndex[i][j] + ELSE TRUE + +CommittedLogDurable == + \A i \in Servers: + LET len == Min(commitIndex'[i], commitIndex[i]) + logNext == SubSeq(log'[i], 1, len) + logCur == SubSeq(log[i], 1, len) + IN IF len = 1 THEN TRUE + ELSE /\ Len(logNext) >= len + /\ Len(logCur) >= len + /\ logNext = logCur + +CommittedLogReplicatedMajority == + \A i \in Servers: + IF raftState'[i] /= Leader \/ commitIndex'[i] <= 1 + THEN TRUE + ELSE LET entries == SubSeq(log'[i], 1, commitIndex'[i]) + len == Len(entries) + nServer == Cardinality(Servers) + F[j \in 0..nServer] == + IF j = 0 + THEN <<{}, {}>> + ELSE LET k == CHOOSE k \in Servers: k \notin F[j-1][1] + logLenOk == LogCount(log'[k]) >= commitIndex'[i] + kEntries == SubSeq(log'[k], 1, commitIndex'[i]) + IN IF /\ logLenOk + /\ entries = kEntries + THEN <> + ELSE <> + IN IsQuorum(F[nServer][2]) + +NextIdxGtMatchIdx == + \A i \in Servers: + IF raftState'[i] = Leader + THEN \A j \in Servers \ {i}: nextIndex'[i][j] > matchIndex'[i][j] + ELSE TRUE + +NextIdxGtZero == + \A i \in Servers: + IF raftState'[i] = Leader + THEN \A j \in Servers: nextIndex'[i][j] > 0 + ELSE TRUE + +SelectSeqWithIdx(s, Test(_,_)) == + LET F[i \in 0..Len(s)] == + IF i = 0 + THEN <<>> + ELSE IF Test(s[i], i) + THEN Append(F[i-1], s[i]) + ELSE F[i-1] + IN F[Len(s)] + +FollowerLogLELeaderLogAfterAE == + LET cmd == netcmd'[1] + cmd1 == cmd[1] + cmd2 == cmd[2] + follower == cmd[3] + leader == cmd[4] + IN IF cmd1 = "RecvAppendentries" /\ cmd2 \in { "success", "term Mismatch" } + THEN IF log[follower] /= log'[follower] + THEN LogCount(log'[follower]) <= LogCount(log'[leader]) + ELSE TRUE + ELSE TRUE + +CommitIdxLELogLen == + \A i \in Servers: commitIndex'[i] <= LogCount(log'[i]) + +LeaderCommitCurrentTermLogs == + \A i \in Servers: + IF raftState'[i] = Leader + THEN IF commitIndex[i] /= commitIndex'[i] + THEN log'[i][commitIndex'[i]].term = currentTerm'[i] + ELSE TRUE + ELSE TRUE + +NewLeaderTermNotInLog == + \A i \in Servers: + IF raftState'[i] = Leader /\ raftState[i] /= Leader + THEN \A j \in Servers \ {i}: + \A n \in DOMAIN log'[j]: + log'[j][n].term /= currentTerm'[i] + ELSE TRUE + +LeaderTermLogHasGreatestIdx == + \A i \in Servers: + IF raftState'[i] = Leader + THEN \A j \in Servers \ {i}: + LET IncTermLogCount(a, b) == IF a.term = currentTerm'[i] THEN b + 1 ELSE b + IN FoldSeq(IncTermLogCount, 0, log'[i]) >= FoldSeq(IncTermLogCount, 0, log'[j]) + ELSE TRUE + +CheckLeader == + \A i \in Servers: + raftState[i] /= Leader + +InvSequence == << + ElectionSafety, + LeaderAppendOnly, + LogMatching, + MonotonicCurrentTerm, + MonotonicCommitIdx, + MonotonicMatchIdx, + CommittedLogDurable, + CommittedLogReplicatedMajority, + NextIdxGtMatchIdx, + NextIdxGtZero, + \* CheckLeader + FollowerLogLELeaderLogAfterAE, + CommitIdxLELogLen, + LeaderCommitCurrentTermLogs, + 
NewLeaderTermNotInLog,
+    LeaderTermLogHasGreatestIdx
+
+>>
+
+INV == Len(SelectSeqWithIdx(inv, LAMBDA x, y: ~x /\ y \notin netman.no_inv)) = 0
+
+
+
+ (***************************************************************************
+  Next actions
+  ***************************************************************************)
+
+DoElectionTimeout ==
+    /\ PrePrune(netman.n_elec, "MaxElectionTimes")
+    /\ \E n \in Servers: CheckStateIs(n, Follower) /\ BecomeCandidate(n)
+    /\ inv' = InvSequence
+
+
+DoHeartBeat ==
+    /\ PrePrune(netman.n_hb, "MaxHeartBeatTimes")
+    /\ \E n \in Servers:
+        /\ raftState[n] = Leader
+        /\ SendHeartBeatAll(n)
+    /\ inv' = InvSequence
+
+
+_DoRecvM(type, func(_)) ==
+    /\ \E m \in msgs:
+        /\ m /= Nil
+        /\ m.type = type
+        /\ func(m)
+    /\ inv' = InvSequence
+
+
+DoHandleMsgRV == /\ _DoRecvM(M_RV, HandleMsgRV)
+
+DoHandleMsgRVR == /\ _DoRecvM(M_RVR, HandleMsgRVR)
+
+DoHandleMsgAE == /\ _DoRecvM(M_AE, HandleMsgAE)
+
+DoHandleMsgAER == /\ _DoRecvM(M_AER, HandleMsgAER)
+
+DoHandleMsgHB == /\ _DoRecvM(M_HB, HandleMsgHB)
+
+DoHandleMsgHBR == /\ _DoRecvM(M_HBR, HandleMsgHBR)
+
+DoRecvEntry ==
+    /\ PrePrune(netman.n_op, "MaxClientOperationsTimes")
+    /\ \E n \in Servers, v \in Commands: RecvEntry(n, v)
+    /\ inv' = InvSequence
+
+DoNetworkDrop ==
+    /\ PrePrune(NetGetDrop, "MaxDropTimes")
+    /\ \E m \in msgs:
+        /\ NetUpdate2(NetDropMsg(m), <<"DoNetworkDrop", m.dst, m.src, m.seq>>)
+        /\ UNCHANGED noNetVars
+    /\ inv' = InvSequence
+
+DoNetworkDup ==
+    /\ PrePrune(NetGetDup, "MaxDupTimes")
+    /\ \E m \in msgs:
+        /\ NetUpdate2(NetDupMsg(m), <<"DoNetworkDup", m.dst, m.src, m.seq>>)
+        /\ UNCHANGED noNetVars
+    /\ inv' = InvSequence
+
+Next ==
+\* \/ DoRestart
+    \/ DoElectionTimeout
+    \/ DoHeartBeat
+    \/ DoHandleMsgRV
+    \/ DoHandleMsgRVR
+    \/ DoHandleMsgHB
+    \/ DoHandleMsgHBR
+    \/ DoHandleMsgAE
+    \/ DoHandleMsgAER
+    \/ DoRecvEntry
+    \/ DoNetworkDrop
+    \/ DoNetworkDup
+
+
+
+Spec == Init /\ [][Next]_vars
+==== \ No newline at end of file
diff --git a/tla/consensus/UdpNetwork.tla b/tla/consensus/UdpNetwork.tla
new file mode 100644
index 00000000..b7f62842
--- /dev/null
+++ b/tla/consensus/UdpNetwork.tla
@@ -0,0 +1,238 @@
+----------------- MODULE UdpNetwork ----------------------
+EXTENDS TLC, Naturals, FiniteSets, Sequences
+(***************************************************************************
+ VARIABLES definitions: see InitUdpNetwork
+ ***************************************************************************)
+
+VARIABLES _msgs, \* Messages in the network
+          _netman, \* Network manager
+          _netcmd \* Current network cmd
+
+
+(* NULL_MSG: represents a null msg in some condition checks
+   * Should be a model value if its type is CONSTANT *)
+CONSTANT NULL_MSG
+\*NULL_MSG == [ NULL_MSG |-> "" ]
+
+---- \* Common functions
+
+(*****)
+(* API InitUdpNetwork(nodes):
+   - _msgs: init to empty set of msgs
+     * format: [ seq |-> 0, src |-> s0, dst |-> s1, type |-> sth, data |-> sth ]
+     * src and dst will be dropped when storing in _msgs
+     * type and data are user defined fields
+   - _netman:
+     - n_sent: number of msgs sent to network, to indicate next msg seq
+     - n_recv: number of msgs delivered to server
+     - n_wire: number of msgs in network but not delivered yet
+     - n_unorder: unordered failure times
+     - n_drop: drop failure times
+     - n_dup: duplicate failure times
+   - _netcmd: <<"Init">> *)
+(*****)
+
+
+InitUdpNetworkNetman(nodes, cmd, additionalNetman) ==
+    /\ _msgs = {}
+    /\ _netman = additionalNetman @@
+                 [ n_sent |-> 0, n_recv |-> 0, n_wire |-> 0,
+                   n_unorder |-> 0, n_drop |-> 0, n_dup |-> 0 ]
+    /\ _netcmd = <>
+
+InitUdpNetwork(nodes) == InitUdpNetworkNetman(nodes, "init", <<>>)
+
+------ \* Update _netman functions
+
+(***************************************************************************
+ _NetGetHelper and _NetIncHelper: get, inc and dec member of _netman records
+ ***************************************************************************)
+_NetGetHelper(member) == _netman[member]
+_NetIncHelper(member) == (member :> _netman[member] + 1)
+_NetDecHelper(member) == (member :> _netman[member] - 1)
+NetIncBy(member, number) == (member :> _netman[member] + number)
+
+NetGetSent == _NetGetHelper("n_sent")
+NetIncSent == _NetIncHelper("n_sent")
+NetGetRecv == _NetGetHelper("n_recv")
+NetIncRecv == _NetIncHelper("n_recv")
+NetGetWire == _NetGetHelper("n_wire")
+NetIncWire == _NetIncHelper("n_wire")
+NetDecWire == _NetDecHelper("n_wire")
+NetGetUnorder == _NetGetHelper("n_unorder")
+NetIncUnorder == _NetIncHelper("n_unorder")
+NetGetDrop == _NetGetHelper("n_drop")
+NetIncDrop == _NetIncHelper("n_drop")
+NetGetDup == _NetGetHelper("n_dup")
+NetIncDup == _NetIncHelper("n_dup")
+
+---- \* Network send and recv functions
+
+(****)
+(* API NetmanIncField(field, updater):
+ * return <> *)
+(****)
+NetmanIncField(field, updater) ==
+    <<_NetIncHelper(field) @@ updater[1]>> @@ updater
+
+
+\*NetmanIncFieldWithoutUpdate(field) ==
+\*    <<_NetIncHelper(field), _msgs, _netcmd>>
+
+
+\* return <>
+_AddMsgSeq(m, seq, msgs) == LET m_ == IF "seq" \in DOMAIN m \* TODO: add partition, see wraft
+                                      THEN {[ m EXCEPT !["seq"] = seq ]} \* inc dup to indicate it is a duplicate msg
+                                      ELSE {m @@ [ seq |-> seq ]}
+                            IN <<1, msgs \union m_>>
+
+\* Add msg to msgs; its seq is derived from n_sent.
+_AddMsg(m, msgs) == LET seq == NetGetSent + 1
+                    IN _AddMsgSeq(m, seq, msgs)
+
+
+
+(****)
+(* _BatchAddMsgs: batch add multiple messages to msgs
+ * return <>
+ * set global seq to each msg m
+*)
+(****)
+_BatchAddMsgs(ms, msgs) ==
+    LET F[i \in 0 .. Len(ms)] ==
+        IF i = 0 THEN <<0, msgs, <<"msg_batch_add">> >>
+        ELSE LET m == ms[i]
+                 seq == NetGetSent + F[i-1][1] + 1
+                 res == _AddMsgSeq(m, seq, F[i-1][2])
+             IN << res[1] + F[i-1][1], res[2], Append(F[i-1][3],
+                   IF res[1] = 1 THEN <<"ok", m.src, m.dst, seq>>
+                   ELSE <<"dropped", m.src, m.dst, seq>>) >>
+    IN F[Len(ms)]
+
+
+(***************************************************************************
+ _DelMsg: delete m from msgs, return <>
+ ***************************************************************************)
+\* Del msg from msgs.
+_DelMsg(m, msgs) ==
+    IF m \in msgs
+    THEN <<1, msgs \ {m}>>
+    ELSE Assert(FALSE, "DelMsg: not in network")
+
+(***************************************************************************
+ _ReplyMsg: delete request from msgs and then add response to msgs
+ * return <>
+ ***************************************************************************)
+\* Combination of Send and Discard.
+_ReplyMsg(response, request, msgs) ==
+    LET del == _DelMsg(request, msgs)
+        add == _AddMsg(response, del[2])
+    IN <>
+
+(***************************************************************************
+ API NetGetMsg: Get msg from src -> dst FIFO head
+ * return msg m
+ * Question: does the UDP model really need this? To be discussed.
+ ***************************************************************************)
+\* NetGetMsg(src, dst) == _GetMsg(src, dst, _msgs)
+
+(* inc unorder *)
+
+IsFirstMsg(m) ==
+    LET myMsg == { i \in _msgs: i.dst = m.dst } \* TODO: should src be considered as well?
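+        (*******************************************************************
+         Commentary, not part of the original spec: every message receives
+         a globally unique, monotonically increasing seq when it enters the
+         network (_AddMsg assigns NetGetSent + 1), so "m is first" means no
+         other wire message for the same destination has a smaller seq.
+         E.g. with wire msgs of seq 4 and 7 both destined to n2, delivering
+         seq 7 first is counted as a reorder by NetIncRecvCheckUnorder
+         below. Note that CHOOSE returns an arbitrary element satisfying
+         the predicate; a universally quantified form such as
+         \A i \in myMsg : m.seq <= i.seq would state "m is first"
+         unambiguously.
+         *******************************************************************)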
+NetIncRecvCheckUnorder(m) ==
+    IF IsFirstMsg(m)
+    THEN NetIncRecv
+    ELSE NetIncRecv @@ NetIncUnorder
+
+(***************************************************************************
+  API NetDelMsg: deliver and delete msg m
+  * return <<netman, msgs, cmd>>
+  * update with NetUpdate
+ ***************************************************************************)
+NetDelMsg(m) ==
+    LET res == _DelMsg(m, _msgs)
+    IN <<NetIncRecvCheckUnorder(m) @@ NetDecWire, res[2],
+         <<"msg_del", m.dst, m.src, m.seq>> >>
+
+(***************************************************************************
+  API NetDropMsg: drop msg m
+  * return <<netman, msgs, cmd>>
+  * update with NetUpdate
+ ***************************************************************************)
+NetDropMsg(m) ==
+    LET res == _DelMsg(m, _msgs)
+    IN <<NetIncDrop @@ NetDecWire, res[2],
+         <<"msg_drop", m.dst, m.src, m.seq>> >>
+
+(***************************************************************************
+  API NetDupMsg: duplicate msg m
+  * return <<netman, msgs, cmd>>
+  * update with NetUpdate
+ ***************************************************************************)
+NetDupMsg(m) ==
+    LET res == _AddMsg(m, _msgs)
+    IN <<NetIncDup @@ NetIncSent @@ NetIncWire, res[2],
+         <<"msg_dup", m.dst, m.src, m.seq>> >>
+
+(***************************************************************************
+  API NetAddMsg: add msg m to msgs
+  * return <<netman, msgs, cmd>>
+ ***************************************************************************)
+NetAddMsg(m) ==
+    LET res == _AddMsg(m, _msgs)
+    IN IF res[1] = 1
+       THEN <<NetIncSent @@ NetIncWire, res[2], <<"msg_add", m.src, m.dst>> >>
+            \* no seq in the cmd: the msg already carries its seq in the network
+       ELSE <<_netman, res[2], <<"msg_add_dropped", m.src, m.dst>> >>
+
+NetReplyMsg(response, request) ==
+    LET res == _ReplyMsg(response, request, _msgs)
+    IN IF res[1] = 0
+       THEN <<NetIncSent @@ NetIncRecvCheckUnorder(request), res[2],
+              <<"msg_reply", response.src, response.dst>> >>
+       ELSE <<NetIncRecvCheckUnorder(request) @@ NetDecWire, res[2],
+              <<"msg_reply_dropped", response.src, response.dst>> >>
+
+(***************************************************************************
+  API NetBatchAddMsg: batch add messages ms to msgs
+ ***************************************************************************)
+NetBatchAddMsg(ms) ==
+    LET res == _BatchAddMsgs(ms, _msgs)
+    IN <<NetIncBy("n_sent", res[1]) @@ NetIncBy("n_wire", res[1]),
+         res[2], res[3]>>
+
+(***************************************************************************
+  API NetReplyBatchAddMsg: remove the request and batch add ms to msgs
+ ***************************************************************************)
+NetReplyBatchAddMsg(ms, request) ==
+    LET del == _DelMsg(request, _msgs)
+        add == _BatchAddMsgs(ms, del[2])
+    IN <<NetIncRecvCheckUnorder(request) @@ NetDecWire @@
+         NetIncBy("n_sent", add[1]) @@ NetIncBy("n_wire", add[1]),
+         add[2], Append(<<"msg_batch_reply">>, add[3])>>
+
+(***************************************************************************
+  API NetNoAction: network state unchanged
+  * return <<netman, msgs, cmd>>
+ ***************************************************************************)
+NetNoAction(cmd) == <<_netman, _msgs, cmd>>
+
+NetUpdate(args) ==
+    /\ _netman' = args[1] @@ _netman
+    /\ _msgs' = args[2]
+    /\ IF Len(args) = 3
+       THEN _netcmd' = args[3]
+       ELSE _netcmd' = <<"noop">>
+
+NetUpdate2(args, cmd) ==
+    /\ _netman' = args[1] @@ _netman
+    /\ _msgs' = args[2]
+    /\ IF Len(args) = 3
+       THEN _netcmd' = <<cmd, args[3]>>
+       ELSE _netcmd' = <<cmd>>
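+
+(***************************************************************************
+  Illustrative example (a sketch of the intended calling convention, based
+  on the actions in the consensus spec above): a transition drops a msg via
+      NetUpdate2(NetDropMsg(m), <<"DoNetworkDrop", m.dst, m.src, m.seq>>)
+  which applies NetDropMsg's counter delta to _netman', removes m from
+  _msgs', and records both the action cmd and the network cmd in _netcmd'.
+ ***************************************************************************)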
+
+====
\ No newline at end of file
diff --git a/tla/consensus/WrapperTcp.cfg b/tla/consensus/WrapperTcp.cfg
new file mode 100644
index 00000000..7d8d68cb
--- /dev/null
+++ b/tla/consensus/WrapperTcp.cfg
@@ -0,0 +1,59 @@
+SPECIFICATION
+spec
+
+INVARIANT
+inv_TypeOk
+inv_INV
+
+CONSTANT M_AER = M_AER
+
+CONSTANT M_AE = M_AE
+
+CONSTANT M_PRV = M_PRV
+
+CONSTANT M_PRVR = M_PRVR
+
+CONSTANT M_RVR = M_RVR
+
+CONSTANT M_RV = M_RV
+
+CONSTANT M_HB = M_HB
+
+CONSTANT M_HBR = M_HBR
+
+CONSTANT Leader = Leader
+
+CONSTANT Follower = Follower
+
+CONSTANT PreCandidate = PreCandidate
+
+CONSTANT Candidate = Candidate
+
+CONSTANT Probe = Probe
+
+CONSTANT Replicate = Replicate
+
+CONSTANT NoOp = NoOp
+
+CONSTANT Nil = Nil
+
+CONSTANTS
+v1 = v1
+v2 = v2
+CONSTANT
+Commands <- const_Commands
+
+CONSTANTS
+n1 = n1
+n2 = n2
+n3 = n3
+CONSTANT
+Servers <- const_Servers
+
+CONSTANT
+Parameters <- const_Parameters
+
+SYMMETRY symm_2
+
+CONSTRAINT
+constr_SC
diff --git a/tla/consensus/WrapperTcp.tla b/tla/consensus/WrapperTcp.tla
new file mode 100644
index 00000000..2786a108
--- /dev/null
+++ b/tla/consensus/WrapperTcp.tla
@@ -0,0 +1,42 @@
+---- MODULE WrapperTcp ----
+EXTENDS RaftRs, TLC
+
+spec ==
+Spec
+----
+
+inv_TypeOk ==
+TypeOk
+inv_INV ==
+INV
+----
+
+CONSTANTS
+v1, v2
+const_Commands ==
+{v1, v2}
+----
+
+CONSTANTS
+n1, n2, n3
+const_Servers ==
+{n1, n2, n3}
+----
+
+const_Parameters ==
+[MaxElectionTimes |-> 3,
+ MaxRestart |-> 1,
+ MaxAppendEntriesTimes |-> 3,
+ MaxHeartBeatTimes |-> 3,
+ MaxPartitionTimes |-> 1,
+ MaxClientOperationsTimes |-> 3,
+ MaxWireMsgs |-> 8]
+----
+
+symm_2 ==
+Permutations(const_Commands) \union Permutations(const_Servers)
+----
+
+constr_SC ==
+SC
+====================
diff --git a/tla/consensus/WrapperUdp.cfg b/tla/consensus/WrapperUdp.cfg
new file mode 100644
index 00000000..7d8d68cb
--- /dev/null
+++ b/tla/consensus/WrapperUdp.cfg
@@ -0,0 +1,59 @@
+SPECIFICATION
+spec
+
+INVARIANT
+inv_TypeOk
+inv_INV
+
+CONSTANT M_AER = M_AER
+
+CONSTANT M_AE = M_AE
+
+CONSTANT M_PRV = M_PRV
+
+CONSTANT M_PRVR = M_PRVR
+
+CONSTANT M_RVR = M_RVR
+
+CONSTANT M_RV = M_RV
+
+CONSTANT M_HB = M_HB
+
+CONSTANT M_HBR = M_HBR
+
+CONSTANT Leader = Leader
+
+CONSTANT Follower = Follower
+
+CONSTANT PreCandidate = PreCandidate
+
+CONSTANT Candidate = Candidate
+
+CONSTANT Probe = Probe
+
+CONSTANT Replicate = Replicate
+
+CONSTANT NoOp = NoOp
+
+CONSTANT Nil = Nil
+
+CONSTANTS
+v1 = v1
+v2 = v2
+CONSTANT
+Commands <- const_Commands
+
+CONSTANTS
+n1 = n1
+n2 = n2
+n3 = n3
+CONSTANT
+Servers <- const_Servers
+
+CONSTANT
+Parameters <- const_Parameters
+
+SYMMETRY symm_2
+
+CONSTRAINT
+constr_SC
diff --git a/tla/consensus/WrapperUdp.tla b/tla/consensus/WrapperUdp.tla
new file mode 100644
index 00000000..7f427bd3
--- /dev/null
+++ b/tla/consensus/WrapperUdp.tla
@@ -0,0 +1,44 @@
+---- MODULE WrapperUdp ----
+EXTENDS RaftRsUdp, TLC
+
+spec ==
+Spec
+----
+
+inv_TypeOk ==
+TypeOk
+inv_INV ==
+INV
+----
+
+CONSTANTS
+v1, v2
+const_Commands ==
+{v1, v2}
+----
+
+CONSTANTS
+n1, n2, n3
+const_Servers ==
+{n1, n2, n3}
+----
+
+const_Parameters ==
+[ MaxElectionTimes |-> 3,
+  MaxRestart |-> 1,
+  MaxAppendEntriesTimes |-> 4,
+  MaxDupTimes |-> 1,
+  MaxDropTimes |-> 1,
+  MaxHeartBeatTimes |-> 3,
+  MaxUnorderTimes |-> 4,
+  MaxClientOperationsTimes |-> 6,
+  MaxWireMsgs |-> 8 ]
+----
+
+symm_2 ==
+Permutations(const_Commands) \union Permutations(const_Servers)
+----
+
+constr_SC ==
+SC
+====================