diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 59d0c491a1..71cc2d2693 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -61,6 +61,11 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { dTrace("msg:%p, get from mnode-sync queue", pMsg); pMsg->info.node = pMgmt->pMnode; + + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = ntohl(pHead->contLen); + pHead->vgId = ntohl(pHead->vgId); + int32_t code = mndProcessSyncMsg(pMsg); dTrace("msg:%p, is freed, code:0x%x", pMsg, code); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index b502865d36..0858162959 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,7 +17,13 @@ #include "mndSync.h" #include "mndTrans.h" -int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } +int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + + return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); +} int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index 5bc240e921..716d2f620c 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -42,6 +42,7 @@ typedef struct SVotesGranted { SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode); void voteGrantedDestroy(SVotesGranted *pVotesGranted); +void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode); bool voteGrantedMajority(SVotesGranted *pVotesGranted); void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); @@ -65,6 +66,7 @@ typedef struct SVotesRespond { SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode); void votesRespondDestory(SVotesRespond *pVotesRespond); +void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode); bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c30a855034..4aca68fa64 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -920,6 +920,17 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { } void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { + bool hit = false; + for (int i = 0; i < newConfig->replicaNum; ++i) { + if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newConfig->nodeInfo)[i].nodeFqdn) == 0 && + pSyncNode->myNodeInfo.nodePort == (newConfig->nodeInfo)[i].nodePort) { + newConfig->myIndex = i; + hit = true; + break; + } + } + ASSERT(hit == true); + pSyncNode->pRaftCfg->cfg = *newConfig; int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg); ASSERT(ret == 0); @@ -949,6 +960,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode); syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode); + voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode); + votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode); syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); } diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 733dfd05b6..1c1f0809bd 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -45,6 +45,17 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted) { } } +void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode) { + pVotesGranted->replicas = &(pSyncNode->replicasId); + pVotesGranted->replicaNum = pSyncNode->replicaNum; + voteGrantedClearVotes(pVotesGranted); + + pVotesGranted->term = 0; + pVotesGranted->quorum = pSyncNode->quorum; + pVotesGranted->toLeader = false; + pVotesGranted->pSyncNode = pSyncNode; +} + bool voteGrantedMajority(SVotesGranted *pVotesGranted) { bool ret = pVotesGranted->votes >= pVotesGranted->quorum; return ret; @@ -168,6 +179,13 @@ void votesRespondDestory(SVotesRespond *pVotesRespond) { } } +void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) { + pVotesRespond->replicas = &(pSyncNode->replicasId); + pVotesRespond->replicaNum = pSyncNode->replicaNum; + pVotesRespond->term = 0; + pVotesRespond->pSyncNode = pSyncNode; +} + bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { bool ret = false; for (int i = 0; i < pVotesRespond->replicaNum; ++i) {