sync index
This commit is contained in:
parent
8ae9fb6ae1
commit
7c868aea85
|
@ -209,6 +209,7 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
|
|||
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
|
||||
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
|
||||
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
|
||||
|
||||
|
@ -218,6 +219,20 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
|
|||
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
|
||||
char* syncNode2Str(const SSyncNode* pSyncNode);
|
||||
|
||||
// raft state change --------------
|
||||
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
|
||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode);
|
||||
void syncNodeBecomeLeader(SSyncNode* pSyncNode);
|
||||
|
||||
void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
|
||||
void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
|
||||
void syncNodeLeader2Follower(SSyncNode* pSyncNode);
|
||||
void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
|
||||
|
||||
// raft vote --------------
|
||||
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
|
||||
void syncNodeVoteForSelf(SSyncNode* pSyncNode);
|
||||
|
||||
// for debug --------------
|
||||
void syncNodePrint(SSyncNode* pObj);
|
||||
void syncNodePrint2(char* s, SSyncNode* pObj);
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "syncElection.h"
|
||||
#include "syncMessage.h"
|
||||
#include "syncRaftStore.h"
|
||||
#include "syncVoteMgr.h"
|
||||
|
||||
// TLA+ Spec
|
||||
// RequestVote(i, j) ==
|
||||
|
@ -49,10 +50,22 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
|
||||
syncNodeFollower2Candidate(pSyncNode);
|
||||
}
|
||||
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||
|
||||
// start election
|
||||
raftStoreNextTerm(pSyncNode->pRaftStore);
|
||||
raftStoreClearVote(pSyncNode->pRaftStore);
|
||||
voteGrantedReset(pSyncNode->pVotesGranted, pSyncNode->pRaftStore->currentTerm);
|
||||
votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->pRaftStore->currentTerm);
|
||||
|
||||
syncNodeVoteForSelf(pSyncNode);
|
||||
int32_t ret = syncNodeRequestVotePeers(pSyncNode);
|
||||
assert(ret == 0);
|
||||
syncNodeResetElectTimer(pSyncNode);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -42,20 +42,6 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
|
|||
// on message ----
|
||||
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
||||
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
|
||||
// raft state change ----
|
||||
static void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
|
||||
static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
|
||||
static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
|
||||
|
||||
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
|
||||
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
|
||||
static void syncNodeLeader2Follower(SSyncNode* pSyncNode);
|
||||
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
|
||||
|
||||
// raft vote ----
|
||||
static void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
|
||||
static void syncNodeVoteForSelf(SSyncNode* pSyncNode);
|
||||
// ---------------------------------
|
||||
|
||||
int32_t syncInit() {
|
||||
|
@ -325,6 +311,13 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
int32_t electMS = syncUtilElectRandomMS();
|
||||
ret = syncNodeRestartElectTimer(pSyncNode, electMS);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||
|
@ -495,6 +488,105 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
|
|||
return serialized;
|
||||
}
|
||||
|
||||
// raft state change --------------
|
||||
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
|
||||
if (term > pSyncNode->pRaftStore->currentTerm) {
|
||||
raftStoreSetTerm(pSyncNode->pRaftStore, term);
|
||||
syncNodeBecomeFollower(pSyncNode);
|
||||
raftStoreClearVote(pSyncNode->pRaftStore);
|
||||
}
|
||||
}
|
||||
|
||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
pSyncNode->leaderCache = EMPTY_RAFT_ID;
|
||||
}
|
||||
|
||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
||||
int32_t electMS = syncUtilElectRandomMS();
|
||||
syncNodeRestartElectTimer(pSyncNode, electMS);
|
||||
}
|
||||
|
||||
// TLA+ Spec
|
||||
// \* Candidate i transitions to leader.
|
||||
// BecomeLeader(i) ==
|
||||
// /\ state[i] = Candidate
|
||||
// /\ votesGranted[i] \in Quorum
|
||||
// /\ state' = [state EXCEPT ![i] = Leader]
|
||||
// /\ nextIndex' = [nextIndex EXCEPT ![i] =
|
||||
// [j \in Server |-> Len(log[i]) + 1]]
|
||||
// /\ matchIndex' = [matchIndex EXCEPT ![i] =
|
||||
// [j \in Server |-> 0]]
|
||||
// /\ elections' = elections \cup
|
||||
// {[eterm |-> currentTerm[i],
|
||||
// eleader |-> i,
|
||||
// elog |-> log[i],
|
||||
// evotes |-> votesGranted[i],
|
||||
// evoterLog |-> voterLog[i]]}
|
||||
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
|
||||
//
|
||||
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
|
||||
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
||||
pSyncNode->leaderCache = pSyncNode->myRaftId;
|
||||
|
||||
for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
|
||||
pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
|
||||
pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
|
||||
}
|
||||
|
||||
syncNodeStopElectTimer(pSyncNode);
|
||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
syncNodeReplicate(pSyncNode);
|
||||
}
|
||||
|
||||
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
||||
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||
assert(voteGrantedMajority(pSyncNode->pVotesGranted));
|
||||
syncNodeBecomeLeader(pSyncNode);
|
||||
}
|
||||
|
||||
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
|
||||
assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
|
||||
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
|
||||
}
|
||||
|
||||
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
|
||||
assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
|
||||
syncNodeBecomeFollower(pSyncNode);
|
||||
}
|
||||
|
||||
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
|
||||
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||
syncNodeBecomeFollower(pSyncNode);
|
||||
}
|
||||
|
||||
// raft vote --------------
|
||||
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
|
||||
assert(term == pSyncNode->pRaftStore->currentTerm);
|
||||
assert(!raftStoreHasVoted(pSyncNode->pRaftStore));
|
||||
|
||||
raftStoreVote(pSyncNode->pRaftStore, pRaftId);
|
||||
}
|
||||
|
||||
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
|
||||
syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));
|
||||
|
||||
SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild();
|
||||
pMsg->srcId = pSyncNode->myRaftId;
|
||||
pMsg->destId = pSyncNode->myRaftId;
|
||||
pMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||
pMsg->voteGranted = true;
|
||||
|
||||
voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
|
||||
votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
|
||||
syncRequestVoteReplyDestroy(pMsg);
|
||||
}
|
||||
|
||||
// for debug --------------
|
||||
void syncNodePrint(SSyncNode* pObj) {
|
||||
char* serialized = syncNode2Str(pObj);
|
||||
|
@ -602,102 +694,3 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
|
|||
syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg);
|
||||
return ret;
|
||||
}
|
||||
|
||||
// raft state change ----
|
||||
static void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
|
||||
if (term > pSyncNode->pRaftStore->currentTerm) {
|
||||
raftStoreSetTerm(pSyncNode->pRaftStore, term);
|
||||
syncNodeBecomeFollower(pSyncNode);
|
||||
raftStoreClearVote(pSyncNode->pRaftStore);
|
||||
}
|
||||
}
|
||||
|
||||
static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
pSyncNode->leaderCache = EMPTY_RAFT_ID;
|
||||
}
|
||||
|
||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
||||
int32_t electMS = syncUtilElectRandomMS();
|
||||
syncNodeRestartElectTimer(pSyncNode, electMS);
|
||||
}
|
||||
|
||||
// TLA+ Spec
|
||||
// \* Candidate i transitions to leader.
|
||||
// BecomeLeader(i) ==
|
||||
// /\ state[i] = Candidate
|
||||
// /\ votesGranted[i] \in Quorum
|
||||
// /\ state' = [state EXCEPT ![i] = Leader]
|
||||
// /\ nextIndex' = [nextIndex EXCEPT ![i] =
|
||||
// [j \in Server |-> Len(log[i]) + 1]]
|
||||
// /\ matchIndex' = [matchIndex EXCEPT ![i] =
|
||||
// [j \in Server |-> 0]]
|
||||
// /\ elections' = elections \cup
|
||||
// {[eterm |-> currentTerm[i],
|
||||
// eleader |-> i,
|
||||
// elog |-> log[i],
|
||||
// evotes |-> votesGranted[i],
|
||||
// evoterLog |-> voterLog[i]]}
|
||||
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
|
||||
//
|
||||
static void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
|
||||
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
||||
pSyncNode->leaderCache = pSyncNode->myRaftId;
|
||||
|
||||
for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
|
||||
pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
|
||||
pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
|
||||
}
|
||||
|
||||
syncNodeStopElectTimer(pSyncNode);
|
||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
syncNodeReplicate(pSyncNode);
|
||||
}
|
||||
|
||||
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
||||
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||
assert(voteGrantedMajority(pSyncNode->pVotesGranted));
|
||||
syncNodeBecomeLeader(pSyncNode);
|
||||
}
|
||||
|
||||
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
|
||||
assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
|
||||
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
|
||||
}
|
||||
|
||||
static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
|
||||
assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
|
||||
syncNodeBecomeFollower(pSyncNode);
|
||||
}
|
||||
|
||||
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
|
||||
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||
syncNodeBecomeFollower(pSyncNode);
|
||||
}
|
||||
|
||||
// raft vote ----
|
||||
static void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
|
||||
assert(term == pSyncNode->pRaftStore->currentTerm);
|
||||
assert(!raftStoreHasVoted(pSyncNode->pRaftStore));
|
||||
|
||||
raftStoreVote(pSyncNode->pRaftStore, pRaftId);
|
||||
}
|
||||
|
||||
static void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
|
||||
syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));
|
||||
|
||||
SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild();
|
||||
pMsg->srcId = pSyncNode->myRaftId;
|
||||
pMsg->destId = pSyncNode->myRaftId;
|
||||
pMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||
pMsg->voteGranted = true;
|
||||
|
||||
voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
|
||||
votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
|
||||
syncRequestVoteReplyDestroy(pMsg);
|
||||
}
|
|
@ -19,15 +19,7 @@
|
|||
|
||||
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
|
||||
int32_t ret = 0;
|
||||
sTrace("<-- syncNodeOnTimeoutCb -->");
|
||||
|
||||
{
|
||||
cJSON* pJson = syncTimeout2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized);
|
||||
free(serialized);
|
||||
cJSON_Delete(pJson);
|
||||
}
|
||||
syncTimeoutLog2("==syncNodeOnTimeoutCb==", pMsg);
|
||||
|
||||
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
|
||||
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
|
||||
|
|
Loading…
Reference in New Issue