From 4e1aa6b5c5fdca03ad2a593a1b30e29be0d1d3da Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 26 May 2022 14:21:24 +0800 Subject: [PATCH] refactor: config change --- include/libs/sync/sync.h | 3 ++ source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncAppendEntries.c | 41 +++++++++++++++---- source/libs/sync/src/syncCommit.c | 41 +++++++++++++++---- source/libs/sync/src/syncMain.c | 33 +++++++-------- .../libs/sync/test/syncConfigChangeTest.cpp | 14 ++++--- 6 files changed, 97 insertions(+), 37 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 6635bdbb96..08640e25c2 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -88,6 +88,9 @@ typedef struct SReConfigCbMeta { SyncIndex index; SyncTerm term; SyncTerm currentTerm; + SSyncCfg oldCfg; + bool isDrop; + uint64_t flag; } SReConfigCbMeta; typedef struct SSyncFSM { diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 69549d2a7e..2e71745f61 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -182,7 +182,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode); -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig); +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop); SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4899839b29..b0c33ba347 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -333,7 +333,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { cbMeta.seqNum = pEntry->seqNum; cbMeta.term = pEntry->term; cbMeta.currentTerm = ths->pRaftStore->currentTerm; - cbMeta.flag = 9; + cbMeta.flag = 0x11; bool needExecute = true; if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) { @@ -347,24 +347,51 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // config change if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; + SSyncCfg newSyncCfg; int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); ASSERT(ret == 0); - syncNodeUpdateConfig(ths, &newSyncCfg); - if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(ths); - } else { - syncNodeBecomeFollower(ths); + // update new config myIndex + bool hit = false; + for (int i = 0; i < newSyncCfg.replicaNum; ++i) { + if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && + ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { + newSyncCfg.myIndex = i; + hit = true; + break; + } + } + ASSERT(hit == true); + + bool isDrop; + syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop); + + // change isStandBy to normal + if (!isDrop) { + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(ths); + } else { + syncNodeBecomeFollower(ths); + } } - // maybe newSyncCfg.myIndex is updated in syncNodeUpdateConfig + char* sOld = syncCfg2Str(&oldSyncCfg); + char* sNew = syncCfg2Str(&newSyncCfg); + sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); + taosMemoryFree(sOld); + taosMemoryFree(sNew); + if (ths->pFsm->FpReConfigCb != NULL) { SReConfigCbMeta cbMeta = {0}; cbMeta.code = 0; cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.index = pEntry->index; cbMeta.term = pEntry->term; + cbMeta.oldCfg = oldSyncCfg; + cbMeta.flag = 0x11; + cbMeta.isDrop = isDrop; ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta); } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index d8f693da6e..a9b055820b 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -111,7 +111,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { cbMeta.seqNum = pEntry->seqNum; cbMeta.term = pEntry->term; cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; - cbMeta.flag = 7; + cbMeta.flag = 0x1; bool needExecute = true; if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) { @@ -125,24 +125,51 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // config change if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg; + SSyncCfg newSyncCfg; int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); ASSERT(ret == 0); - syncNodeUpdateConfig(pSyncNode, &newSyncCfg); - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(pSyncNode); - } else { - syncNodeBecomeFollower(pSyncNode); + // update new config myIndex + bool hit = false; + for (int i = 0; i < newSyncCfg.replicaNum; ++i) { + if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && + pSyncNode->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { + newSyncCfg.myIndex = i; + hit = true; + break; + } + } + ASSERT(hit == true); + + bool isDrop; + syncNodeUpdateConfig(pSyncNode, &newSyncCfg, &isDrop); + + // change isStandBy to normal + if (!isDrop) { + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(pSyncNode); + } else { + syncNodeBecomeFollower(pSyncNode); + } } - // maybe newSyncCfg.myIndex is updated in syncNodeUpdateConfig + char* sOld = syncCfg2Str(&oldSyncCfg); + char* sNew = syncCfg2Str(&newSyncCfg); + sInfo("==config change== 0x1 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); + taosMemoryFree(sOld); + taosMemoryFree(sNew); + if (pSyncNode->pFsm->FpReConfigCb != NULL) { SReConfigCbMeta cbMeta = {0}; cbMeta.code = 0; cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; cbMeta.index = pEntry->index; cbMeta.term = pEntry->term; + cbMeta.oldCfg = oldSyncCfg; + cbMeta.flag = 0x1; + cbMeta.isDrop = isDrop; pSyncNode->pFsm->FpReConfigCb(pSyncNode->pFsm, newSyncCfg, cbMeta); } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9b7440e48b..9e6765e1d5 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -951,21 +951,9 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { return s; } -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { - bool hit = false; - for (int i = 0; i < newConfig->replicaNum; ++i) { - if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newConfig->nodeInfo)[i].nodeFqdn) == 0 && - pSyncNode->myNodeInfo.nodePort == (newConfig->nodeInfo)[i].nodePort) { - newConfig->myIndex = i; - hit = true; - break; - } - } - ASSERT(hit == true); - +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop) { pSyncNode->pRaftCfg->cfg = *newConfig; - int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg); - ASSERT(ret == 0); + int32_t ret = 0; // init internal pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; @@ -995,9 +983,22 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode); votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode); - pSyncNode->pRaftCfg->isStandBy = 0; - raftCfgPersist(pSyncNode->pRaftCfg); + // isDrop + *isDrop = true; + for (int i = 0; i < newConfig->replicaNum; ++i) { + if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && + (newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { + *isDrop = false; + break; + } + } + if (!(*isDrop)) { + // change isStandBy to normal + pSyncNode->pRaftCfg->isStandBy = 0; + } + + raftCfgPersist(pSyncNode->pRaftCfg); syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index fa453577e7..580c4248a3 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -43,8 +43,9 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { if (cbMeta.index > beginIndex) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } else { sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index); @@ -54,15 +55,16 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm, cbMeta.index, - cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); }