enh(sync): syncStartStandBy
This commit is contained in:
parent
f99b20aaa9
commit
94438de0ec
|
@ -61,6 +61,11 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
dTrace("msg:%p, get from mnode-sync queue", pMsg);
|
dTrace("msg:%p, get from mnode-sync queue", pMsg);
|
||||||
|
|
||||||
pMsg->info.node = pMgmt->pMnode;
|
pMsg->info.node = pMgmt->pMnode;
|
||||||
|
|
||||||
|
SMsgHead *pHead = pMsg->pCont;
|
||||||
|
pHead->contLen = ntohl(pHead->contLen);
|
||||||
|
pHead->vgId = ntohl(pHead->vgId);
|
||||||
|
|
||||||
int32_t code = mndProcessSyncMsg(pMsg);
|
int32_t code = mndProcessSyncMsg(pMsg);
|
||||||
|
|
||||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||||
|
|
|
@ -17,7 +17,13 @@
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
||||||
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
|
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
|
SMsgHead *pHead = pMsg->pCont;
|
||||||
|
pHead->contLen = htonl(pHead->contLen);
|
||||||
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
|
|
||||||
|
return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||||
|
|
||||||
|
|
|
@ -920,6 +920,17 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
|
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
|
||||||
|
bool hit = false;
|
||||||
|
for (int i = 0; i < newConfig->replicaNum; ++i) {
|
||||||
|
if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newConfig->nodeInfo)[i].nodeFqdn) == 0 &&
|
||||||
|
pSyncNode->myNodeInfo.nodePort == (newConfig->nodeInfo)[i].nodePort) {
|
||||||
|
newConfig->myIndex = i;
|
||||||
|
hit = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT(hit == true);
|
||||||
|
|
||||||
pSyncNode->pRaftCfg->cfg = *newConfig;
|
pSyncNode->pRaftCfg->cfg = *newConfig;
|
||||||
int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg);
|
int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg);
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
|
|
Loading…
Reference in New Issue