From 94438de0ec79a2180587febb449e9a150e400521 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 24 May 2022 19:45:09 +0800 Subject: [PATCH] enh(sync): syncStartStandBy --- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 5 +++++ source/dnode/mnode/impl/src/mndSync.c | 8 +++++++- source/libs/sync/src/syncMain.c | 11 +++++++++++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 59d0c491a1..71cc2d2693 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -61,6 +61,11 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { dTrace("msg:%p, get from mnode-sync queue", pMsg); pMsg->info.node = pMgmt->pMnode; + + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = ntohl(pHead->contLen); + pHead->vgId = ntohl(pHead->vgId); + int32_t code = mndProcessSyncMsg(pMsg); dTrace("msg:%p, is freed, code:0x%x", pMsg, code); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 868558525a..2b4b472c35 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,7 +17,13 @@ #include "mndSync.h" #include "mndTrans.h" -int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } +int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + + return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); +} int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c30a855034..5aa35af21e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -920,6 +920,17 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { } void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { + bool hit = false; + for (int i = 0; i < newConfig->replicaNum; ++i) { + if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newConfig->nodeInfo)[i].nodeFqdn) == 0 && + pSyncNode->myNodeInfo.nodePort == (newConfig->nodeInfo)[i].nodePort) { + newConfig->myIndex = i; + hit = true; + break; + } + } + ASSERT(hit == true); + pSyncNode->pRaftCfg->cfg = *newConfig; int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg); ASSERT(ret == 0);