diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 0156e695a3..35d3046d66 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -28,6 +28,71 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +// TLA+ Spec +// HandleAppendEntriesRequest(i, j, m) == +// LET logOk == \/ m.mprevLogIndex = 0 +// \/ /\ m.mprevLogIndex > 0 +// /\ m.mprevLogIndex <= Len(log[i]) +// /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term +// IN /\ m.mterm <= currentTerm[i] +// /\ \/ /\ \* reject request +// \/ m.mterm < currentTerm[i] +// \/ /\ m.mterm = currentTerm[i] +// /\ state[i] = Follower +// /\ \lnot logOk +// /\ Reply([mtype |-> AppendEntriesResponse, +// mterm |-> currentTerm[i], +// msuccess |-> FALSE, +// mmatchIndex |-> 0, +// msource |-> i, +// mdest |-> j], +// m) +// /\ UNCHANGED <> +// \/ \* return to follower state +// /\ m.mterm = currentTerm[i] +// /\ state[i] = Candidate +// /\ state' = [state EXCEPT ![i] = Follower] +// /\ UNCHANGED <> +// \/ \* accept request +// /\ m.mterm = currentTerm[i] +// /\ state[i] = Follower +// /\ logOk +// /\ LET index == m.mprevLogIndex + 1 +// IN \/ \* already done with request +// /\ \/ m.mentries = << >> +// \/ /\ m.mentries /= << >> +// /\ Len(log[i]) >= index +// /\ log[i][index].term = m.mentries[1].term +// \* This could make our commitIndex decrease (for +// \* example if we process an old, duplicated request), +// \* but that doesn't really affect anything. +// /\ commitIndex' = [commitIndex EXCEPT ![i] = +// m.mcommitIndex] +// /\ Reply([mtype |-> AppendEntriesResponse, +// mterm |-> currentTerm[i], +// msuccess |-> TRUE, +// mmatchIndex |-> m.mprevLogIndex + +// Len(m.mentries), +// msource |-> i, +// mdest |-> j], +// m) +// /\ UNCHANGED <> +// \/ \* conflict: remove 1 entry +// /\ m.mentries /= << >> +// /\ Len(log[i]) >= index +// /\ log[i][index].term /= m.mentries[1].term +// /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> +// log[i][index2]] +// IN log' = [log EXCEPT ![i] = new] +// /\ UNCHANGED <> +// \/ \* no conflict: append entry +// /\ m.mentries /= << >> +// /\ Len(log[i]) = m.mprevLogIndex +// /\ log' = [log EXCEPT ![i] = +// Append(log[i], m.mentries[1])] +// /\ UNCHANGED <> +// /\ UNCHANGED <> +// int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 7b80172e8d..75b82aa531 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -28,6 +28,19 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +// TLA+ Spec +// HandleAppendEntriesResponse(i, j, m) == +// /\ m.mterm = currentTerm[i] +// /\ \/ /\ m.msuccess \* successful +// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] +// /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] +// \/ /\ \lnot m.msuccess \* not successful +// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = +// Max({nextIndex[i][j] - 1, 1})] +// /\ UNCHANGED <> +// /\ Discard(m) +// /\ UNCHANGED <> +// int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index abacfb8093..019c291efc 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -26,10 +26,21 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" -int32_t syncNodeElect(SSyncNode* pSyncNode); - +// TLA+ Spec +// RequestVote(i, j) == +// /\ state[i] = Candidate +// /\ j \notin votesResponded[i] +// /\ Send([mtype |-> RequestVoteRequest, +// mterm |-> currentTerm[i], +// mlastLogTerm |-> LastTerm(log[i]), +// mlastLogIndex |-> Len(log[i]), +// msource |-> i, +// mdest |-> j]) +// /\ UNCHANGED <> +// int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode); +int32_t syncNodeElect(SSyncNode* pSyncNode); int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 72ce986a7e..467cfdde5c 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -26,6 +26,31 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" +// TLA+ Spec +// AppendEntries(i, j) == +// /\ i /= j +// /\ state[i] = Leader +// /\ LET prevLogIndex == nextIndex[i][j] - 1 +// prevLogTerm == IF prevLogIndex > 0 THEN +// log[i][prevLogIndex].term +// ELSE +// 0 +// \* Send up to 1 entry, constrained by the end of the log. +// lastEntry == Min({Len(log[i]), nextIndex[i][j]}) +// entries == SubSeq(log[i], nextIndex[i][j], lastEntry) +// IN Send([mtype |-> AppendEntriesRequest, +// mterm |-> currentTerm[i], +// mprevLogIndex |-> prevLogIndex, +// mprevLogTerm |-> prevLogTerm, +// mentries |-> entries, +// \* mlog is used as a history variable for the proof. +// \* It would not exist in a real implementation. +// mlog |-> log[i], +// mcommitIndex |-> Min({commitIndex[i], lastEntry}), +// msource |-> i, +// mdest |-> j]) +// /\ UNCHANGED <> +// int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index da821c3ebd..8bb4976de2 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -28,6 +28,28 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +// TLA+ Spec +// HandleRequestVoteRequest(i, j, m) == +// LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) +// \/ /\ m.mlastLogTerm = LastTerm(log[i]) +// /\ m.mlastLogIndex >= Len(log[i]) +// grant == /\ m.mterm = currentTerm[i] +// /\ logOk +// /\ votedFor[i] \in {Nil, j} +// IN /\ m.mterm <= currentTerm[i] +// /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] +// \/ ~grant /\ UNCHANGED votedFor +// /\ Reply([mtype |-> RequestVoteResponse, +// mterm |-> currentTerm[i], +// mvoteGranted |-> grant, +// \* mlog is used just for the `elections' history variable for +// \* the proof. It would not exist in a real implementation. +// mlog |-> log[i], +// msource |-> i, +// mdest |-> j], +// m) +// /\ UNCHANGED <> +// int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index 82f132f80b..ab9430b857 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -28,6 +28,23 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +// TLA+ Spec +// HandleRequestVoteResponse(i, j, m) == +// \* This tallies votes even when the current state is not Candidate, but +// \* they won't be looked at, so it doesn't matter. +// /\ m.mterm = currentTerm[i] +// /\ votesResponded' = [votesResponded EXCEPT ![i] = +// votesResponded[i] \cup {j}] +// /\ \/ /\ m.mvoteGranted +// /\ votesGranted' = [votesGranted EXCEPT ![i] = +// votesGranted[i] \cup {j}] +// /\ voterLog' = [voterLog EXCEPT ![i] = +// voterLog[i] @@ (j :> m.mlog)] +// \/ /\ ~m.mvoteGranted +// /\ UNCHANGED <> +// /\ Discard(m) +// /\ UNCHANGED <> +// int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 23df8a539c..0a5120c8dc 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -27,4 +27,5 @@ // /\ UNCHANGED <> // /\ Discard(m) // /\ UNCHANGED <> +// int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {} diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index fe86d220cc..87017b718d 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -16,11 +16,6 @@ #include "syncElection.h" #include "syncMessage.h" -int32_t syncNodeElect(SSyncNode* pSyncNode) { - // start election - syncNodeRequestVotePeers(pSyncNode); -} - // TLA+ Spec // RequestVote(i, j) == // /\ state[i] = Candidate @@ -32,8 +27,14 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { // msource |-> i, // mdest |-> j]) // /\ UNCHANGED <> +// int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} +int32_t syncNodeElect(SSyncNode* pSyncNode) { + // start election + syncNodeRequestVotePeers(pSyncNode); +} + int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { sTrace("syncNodeRequestVote pSyncNode:%p ", pSyncNode); int32_t ret = 0; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 878a870677..37e8959ff3 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -40,6 +40,7 @@ // msource |-> i, // mdest |-> j]) // /\ UNCHANGED <> +// int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 533043c512..354c559a90 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -36,4 +36,5 @@ // mdest |-> j], // m) // /\ UNCHANGED <> +// int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {} diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index a5b434dbc5..72223ea83c 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -31,4 +31,5 @@ // /\ UNCHANGED <> // /\ Discard(m) // /\ UNCHANGED <> +// int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {}