diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index d67b419b24..2b62f9b8b3 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -121,14 +121,17 @@ typedef struct SSyncNode { // init internal SNodeInfo me; + SRaftId raftId; + int32_t peersNum; SNodeInfo peers[TSDB_MAX_REPLICA]; + SRaftId peersId[TSDB_MAX_REPLICA]; + + int32_t replicaNum; + SRaftId replicasId[TSDB_MAX_REPLICA]; // raft algorithm SSyncFSM* pFsm; - SRaftId raftId; - SRaftId peersId[TSDB_MAX_REPLICA]; - int32_t replicaNum; int32_t quorum; // life cycle diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 93d2c12525..e1078d5738 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -39,8 +39,9 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); +bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); + // ---- SSyncBuffer ---- -#if 0 void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len); void syncUtilbufDestroy(SSyncBuffer* syncBuf); @@ -48,7 +49,6 @@ void syncUtilbufDestroy(SSyncBuffer* syncBuf); void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest); void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); -#endif #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index b841f2e316..e2307e9e66 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -24,13 +24,36 @@ extern "C" { #include #include #include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" #include "taosdef.h" typedef struct SVotesGranted { + SyncTerm term; + int32_t quorum; + int32_t votes; + bool toLeader; + SSyncNode *pSyncNode; } SVotesGranted; -typedef struct SVotesResponded { -} SVotesResponded; +SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode); +void voteGrantedDestroy(SVotesGranted *pVotesGranted); +bool voteGrantedMajority(SVotesGranted *pVotesGranted); +void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); +void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); + +typedef struct SVotesRespond { + SRaftId (*replicas)[TSDB_MAX_REPLICA]; + bool isRespond[TSDB_MAX_REPLICA]; + int32_t replicaNum; + SyncTerm term; + SSyncNode *pSyncNode; +} SVotesRespond; + +SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode); +bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); +void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); +void Reset(SVotesRespond *pVotesRespond, SyncTerm term); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5f3a276a68..6a0663dd57 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -25,7 +25,6 @@ static int32_t tsNodeRefId = -1; // ------ local funciton --------- static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); -static void syncNodePingTimerCb(void* param, void* tmrId); static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); @@ -358,25 +357,6 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { return ret; } -static void syncNodePingTimerCb(void* param, void* tmrId) { - SSyncNode* pSyncNode = (SSyncNode*)param; - if (atomic_load_8(&pSyncNode->pingTimerEnable)) { - ++(pSyncNode->pingTimerCounter); - - sTrace( - "syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, " - "tmrId:%p ", - pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId); - - syncNodePingAll(pSyncNode); - - taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, - &pSyncNode->pPingTimer); - } else { - sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); - } -} - static void syncNodeEqPingTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_8(&pSyncNode->pingTimerEnable)) { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index b4959a810b..7b4d6ee366 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -68,8 +68,12 @@ void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaft raftId->vgId = vgId; } +bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) { + bool ret = pId1->addr == pId2->addr && pId1->vgId == pId2->vgId; + return ret; +} + // ---- SSyncBuffer ----- -#if 0 void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) { syncBuf->len = len; syncBuf->data = malloc(syncBuf->len); @@ -87,4 +91,3 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { dest->data = malloc(dest->len); memcpy(dest->data, src->data, dest->len); } -#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 02cf4ac033..c9f0ceab57 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -14,3 +14,83 @@ */ #include "syncVoteMgr.h" + +SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { + SVotesGranted *pVotesGranted = malloc(sizeof(SVotesGranted)); + assert(pVotesGranted != NULL); + memset(pVotesGranted, 0, sizeof(SVotesGranted)); + + pVotesGranted->quorum = pSyncNode->quorum; + pVotesGranted->term = 0; + pVotesGranted->votes = 0; + pVotesGranted->toLeader = false; + pVotesGranted->pSyncNode = pSyncNode; + + return pVotesGranted; +} + +void voteGrantedDestroy(SVotesGranted *pVotesGranted) { + if (pVotesGranted != NULL) { + free(pVotesGranted); + } +} + +bool voteGrantedMajority(SVotesGranted *pVotesGranted) { + bool ret = pVotesGranted->votes >= pVotesGranted->quorum; + return ret; +} + +void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { + assert(pMsg->voteGranted == true); + assert(pMsg->term == pVotesGranted->term); + pVotesGranted->votes++; +} + +void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) { + pVotesGranted->term = term; + pVotesGranted->votes = 0; + pVotesGranted->toLeader = false; +} + +SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) { + SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond)); + assert(pVotesRespond != NULL); + memset(pVotesRespond, 0, sizeof(SVotesRespond)); + + pVotesRespond->replicas = &(pSyncNode->replicasId); + pVotesRespond->replicaNum = pSyncNode->replicaNum; + pVotesRespond->term = 0; + pVotesRespond->pSyncNode = pSyncNode; + + return pVotesRespond; +} + +bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { + bool ret = false; + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + if (syncUtilSameId(&(*pVotesRespond->replicas)[i], pRaftId) && pVotesRespond->isRespond[i]) { + ret = true; + break; + } + } + return ret; +} + +void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) { + assert(pVotesRespond->term == pMsg->term); + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + if (syncUtilSameId(&(*pVotesRespond->replicas)[i], &pMsg->srcId)) { + assert(pVotesRespond->isRespond[i] == false); + pVotesRespond->isRespond[i] = true; + return; + } + } + assert(0); +} + +void Reset(SVotesRespond *pVotesRespond, SyncTerm term) { + pVotesRespond->term = term; + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + pVotesRespond->isRespond[i] = false; + } +} \ No newline at end of file