From 7c868aea85f1255f093ad397b3a41cf477512e1e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 14 Mar 2022 18:44:53 +0800 Subject: [PATCH] sync index --- source/libs/sync/inc/syncInt.h | 15 ++ source/libs/sync/src/syncElection.c | 13 ++ source/libs/sync/src/syncMain.c | 219 ++++++++++++++-------------- source/libs/sync/src/syncTimeout.c | 10 +- 4 files changed, 135 insertions(+), 122 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8866603b8e..4a1557addd 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -209,6 +209,7 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); +int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); @@ -218,6 +219,20 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); +// raft state change -------------- +void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); +void syncNodeBecomeFollower(SSyncNode* pSyncNode); +void syncNodeBecomeLeader(SSyncNode* pSyncNode); + +void syncNodeCandidate2Leader(SSyncNode* pSyncNode); +void syncNodeFollower2Candidate(SSyncNode* pSyncNode); +void syncNodeLeader2Follower(SSyncNode* pSyncNode); +void syncNodeCandidate2Follower(SSyncNode* pSyncNode); + +// raft vote -------------- +void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); +void syncNodeVoteForSelf(SSyncNode* pSyncNode); + // for debug -------------- void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 223431336e..d06253c049 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -16,6 +16,7 @@ #include "syncElection.h" #include "syncMessage.h" #include "syncRaftStore.h" +#include "syncVoteMgr.h" // TLA+ Spec // RequestVote(i, j) == @@ -49,10 +50,22 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { } int32_t syncNodeElect(SSyncNode* pSyncNode) { + if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { + syncNodeFollower2Candidate(pSyncNode); + } assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); // start election + raftStoreNextTerm(pSyncNode->pRaftStore); + raftStoreClearVote(pSyncNode->pRaftStore); + voteGrantedReset(pSyncNode->pVotesGranted, pSyncNode->pRaftStore->currentTerm); + votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->pRaftStore->currentTerm); + + syncNodeVoteForSelf(pSyncNode); int32_t ret = syncNodeRequestVotePeers(pSyncNode); + assert(ret == 0); + syncNodeResetElectTimer(pSyncNode); + return ret; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index da1286bd32..5ec9df1e70 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -42,20 +42,6 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); // on message ---- static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); - -// raft state change ---- -static void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); -static void syncNodeBecomeFollower(SSyncNode* pSyncNode); -static void syncNodeBecomeLeader(SSyncNode* pSyncNode); - -static void syncNodeCandidate2Leader(SSyncNode* pSyncNode); -static void syncNodeFollower2Candidate(SSyncNode* pSyncNode); -static void syncNodeLeader2Follower(SSyncNode* pSyncNode); -static void syncNodeCandidate2Follower(SSyncNode* pSyncNode); - -// raft vote ---- -static void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); -static void syncNodeVoteForSelf(SSyncNode* pSyncNode); // --------------------------------- int32_t syncInit() { @@ -325,6 +311,13 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) { return ret; } +int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { + int32_t ret = 0; + int32_t electMS = syncUtilElectRandomMS(); + ret = syncNodeRestartElectTimer(pSyncNode, electMS); + return ret; +} + int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, @@ -495,6 +488,105 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { return serialized; } +// raft state change -------------- +void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { + if (term > pSyncNode->pRaftStore->currentTerm) { + raftStoreSetTerm(pSyncNode->pRaftStore, term); + syncNodeBecomeFollower(pSyncNode); + raftStoreClearVote(pSyncNode->pRaftStore); + } +} + +void syncNodeBecomeFollower(SSyncNode* pSyncNode) { + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + pSyncNode->leaderCache = EMPTY_RAFT_ID; + } + + pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; + syncNodeStopHeartbeatTimer(pSyncNode); + + int32_t electMS = syncUtilElectRandomMS(); + syncNodeRestartElectTimer(pSyncNode, electMS); +} + +// TLA+ Spec +// \* Candidate i transitions to leader. +// BecomeLeader(i) == +// /\ state[i] = Candidate +// /\ votesGranted[i] \in Quorum +// /\ state' = [state EXCEPT ![i] = Leader] +// /\ nextIndex' = [nextIndex EXCEPT ![i] = +// [j \in Server |-> Len(log[i]) + 1]] +// /\ matchIndex' = [matchIndex EXCEPT ![i] = +// [j \in Server |-> 0]] +// /\ elections' = elections \cup +// {[eterm |-> currentTerm[i], +// eleader |-> i, +// elog |-> log[i], +// evotes |-> votesGranted[i], +// evoterLog |-> voterLog[i]]} +// /\ UNCHANGED <> +// +void syncNodeBecomeLeader(SSyncNode* pSyncNode) { + pSyncNode->state = TAOS_SYNC_STATE_LEADER; + pSyncNode->leaderCache = pSyncNode->myRaftId; + + for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) { + pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1; + } + + for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) { + pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID; + } + + syncNodeStopElectTimer(pSyncNode); + syncNodeStartHeartbeatTimer(pSyncNode); + syncNodeReplicate(pSyncNode); +} + +void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { + assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); + assert(voteGrantedMajority(pSyncNode->pVotesGranted)); + syncNodeBecomeLeader(pSyncNode); +} + +void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { + assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); + pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; +} + +void syncNodeLeader2Follower(SSyncNode* pSyncNode) { + assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); + syncNodeBecomeFollower(pSyncNode); +} + +void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { + assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); + syncNodeBecomeFollower(pSyncNode); +} + +// raft vote -------------- +void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) { + assert(term == pSyncNode->pRaftStore->currentTerm); + assert(!raftStoreHasVoted(pSyncNode->pRaftStore)); + + raftStoreVote(pSyncNode->pRaftStore, pRaftId); +} + +void syncNodeVoteForSelf(SSyncNode* pSyncNode) { + syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId)); + + SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild(); + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = pSyncNode->myRaftId; + pMsg->term = pSyncNode->pRaftStore->currentTerm; + pMsg->voteGranted = true; + + voteGrantedVote(pSyncNode->pVotesGranted, pMsg); + votesRespondAdd(pSyncNode->pVotesRespond, pMsg); + syncRequestVoteReplyDestroy(pMsg); +} + // for debug -------------- void syncNodePrint(SSyncNode* pObj) { char* serialized = syncNode2Str(pObj); @@ -602,102 +694,3 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg); return ret; } - -// raft state change ---- -static void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { - if (term > pSyncNode->pRaftStore->currentTerm) { - raftStoreSetTerm(pSyncNode->pRaftStore, term); - syncNodeBecomeFollower(pSyncNode); - raftStoreClearVote(pSyncNode->pRaftStore); - } -} - -static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - pSyncNode->leaderCache = EMPTY_RAFT_ID; - } - - pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; - syncNodeStopHeartbeatTimer(pSyncNode); - - int32_t electMS = syncUtilElectRandomMS(); - syncNodeRestartElectTimer(pSyncNode, electMS); -} - -// TLA+ Spec -// \* Candidate i transitions to leader. -// BecomeLeader(i) == -// /\ state[i] = Candidate -// /\ votesGranted[i] \in Quorum -// /\ state' = [state EXCEPT ![i] = Leader] -// /\ nextIndex' = [nextIndex EXCEPT ![i] = -// [j \in Server |-> Len(log[i]) + 1]] -// /\ matchIndex' = [matchIndex EXCEPT ![i] = -// [j \in Server |-> 0]] -// /\ elections' = elections \cup -// {[eterm |-> currentTerm[i], -// eleader |-> i, -// elog |-> log[i], -// evotes |-> votesGranted[i], -// evoterLog |-> voterLog[i]]} -// /\ UNCHANGED <> -// -static void syncNodeBecomeLeader(SSyncNode* pSyncNode) { - pSyncNode->state = TAOS_SYNC_STATE_LEADER; - pSyncNode->leaderCache = pSyncNode->myRaftId; - - for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) { - pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1; - } - - for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) { - pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID; - } - - syncNodeStopElectTimer(pSyncNode); - syncNodeStartHeartbeatTimer(pSyncNode); - syncNodeReplicate(pSyncNode); -} - -static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { - assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - assert(voteGrantedMajority(pSyncNode->pVotesGranted)); - syncNodeBecomeLeader(pSyncNode); -} - -static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { - assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); - pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; -} - -static void syncNodeLeader2Follower(SSyncNode* pSyncNode) { - assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); - syncNodeBecomeFollower(pSyncNode); -} - -static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { - assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - syncNodeBecomeFollower(pSyncNode); -} - -// raft vote ---- -static void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) { - assert(term == pSyncNode->pRaftStore->currentTerm); - assert(!raftStoreHasVoted(pSyncNode->pRaftStore)); - - raftStoreVote(pSyncNode->pRaftStore, pRaftId); -} - -static void syncNodeVoteForSelf(SSyncNode* pSyncNode) { - syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId)); - - SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild(); - pMsg->srcId = pSyncNode->myRaftId; - pMsg->destId = pSyncNode->myRaftId; - pMsg->term = pSyncNode->pRaftStore->currentTerm; - pMsg->voteGranted = true; - - voteGrantedVote(pSyncNode->pVotesGranted, pMsg); - votesRespondAdd(pSyncNode->pVotesRespond, pMsg); - syncRequestVoteReplyDestroy(pMsg); -} \ No newline at end of file diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 7cbfd6d40a..3a48b0cbb3 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -19,15 +19,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { int32_t ret = 0; - sTrace("<-- syncNodeOnTimeoutCb -->"); - - { - cJSON* pJson = syncTimeout2Json(pMsg); - char* serialized = cJSON_Print(pJson); - sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized); - free(serialized); - cJSON_Delete(pJson); - } + syncTimeoutLog2("==syncNodeOnTimeoutCb==", pMsg); if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {