Merge branch 'feature/sync-mnode-integration' of https://github.com/taosdata/TDengine into feature/sync-mnode-integration
This commit is contained in:
commit
e0d6d47930
|
@ -61,6 +61,11 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
dTrace("msg:%p, get from mnode-sync queue", pMsg);
|
||||
|
||||
pMsg->info.node = pMgmt->pMnode;
|
||||
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
pHead->contLen = ntohl(pHead->contLen);
|
||||
pHead->vgId = ntohl(pHead->vgId);
|
||||
|
||||
int32_t code = mndProcessSyncMsg(pMsg);
|
||||
|
||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
|
|
|
@ -17,7 +17,13 @@
|
|||
#include "mndSync.h"
|
||||
#include "mndTrans.h"
|
||||
|
||||
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
|
||||
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
pHead->contLen = htonl(pHead->contLen);
|
||||
pHead->vgId = htonl(pHead->vgId);
|
||||
|
||||
return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
||||
}
|
||||
|
||||
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ typedef struct SVotesGranted {
|
|||
|
||||
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode);
|
||||
void voteGrantedDestroy(SVotesGranted *pVotesGranted);
|
||||
void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode);
|
||||
bool voteGrantedMajority(SVotesGranted *pVotesGranted);
|
||||
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
|
||||
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
|
||||
|
@ -65,6 +66,7 @@ typedef struct SVotesRespond {
|
|||
|
||||
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode);
|
||||
void votesRespondDestory(SVotesRespond *pVotesRespond);
|
||||
void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode);
|
||||
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
|
||||
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
|
||||
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term);
|
||||
|
|
|
@ -920,6 +920,17 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
|
||||
bool hit = false;
|
||||
for (int i = 0; i < newConfig->replicaNum; ++i) {
|
||||
if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newConfig->nodeInfo)[i].nodeFqdn) == 0 &&
|
||||
pSyncNode->myNodeInfo.nodePort == (newConfig->nodeInfo)[i].nodePort) {
|
||||
newConfig->myIndex = i;
|
||||
hit = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
ASSERT(hit == true);
|
||||
|
||||
pSyncNode->pRaftCfg->cfg = *newConfig;
|
||||
int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg);
|
||||
ASSERT(ret == 0);
|
||||
|
@ -949,6 +960,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
|
|||
|
||||
syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
|
||||
syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
|
||||
voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
|
||||
votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
|
||||
|
||||
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
|
||||
}
|
||||
|
|
|
@ -45,6 +45,17 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted) {
|
|||
}
|
||||
}
|
||||
|
||||
void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode) {
|
||||
pVotesGranted->replicas = &(pSyncNode->replicasId);
|
||||
pVotesGranted->replicaNum = pSyncNode->replicaNum;
|
||||
voteGrantedClearVotes(pVotesGranted);
|
||||
|
||||
pVotesGranted->term = 0;
|
||||
pVotesGranted->quorum = pSyncNode->quorum;
|
||||
pVotesGranted->toLeader = false;
|
||||
pVotesGranted->pSyncNode = pSyncNode;
|
||||
}
|
||||
|
||||
bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
|
||||
bool ret = pVotesGranted->votes >= pVotesGranted->quorum;
|
||||
return ret;
|
||||
|
@ -168,6 +179,13 @@ void votesRespondDestory(SVotesRespond *pVotesRespond) {
|
|||
}
|
||||
}
|
||||
|
||||
void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) {
|
||||
pVotesRespond->replicas = &(pSyncNode->replicasId);
|
||||
pVotesRespond->replicaNum = pSyncNode->replicaNum;
|
||||
pVotesRespond->term = 0;
|
||||
pVotesRespond->pSyncNode = pSyncNode;
|
||||
}
|
||||
|
||||
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
|
||||
bool ret = false;
|
||||
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||
|
|
Loading…
Reference in New Issue