From 46e55ba9fb6a22d273647d25332f38061ad5c5d2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Nov 2022 20:04:55 +0800 Subject: [PATCH] refact: remove config change codes --- include/util/taoserror.h | 2 +- source/libs/sync/inc/syncInt.h | 1 - source/libs/sync/inc/syncTools.h | 2 - source/libs/sync/src/syncMain.c | 127 ++++------------------------- source/libs/sync/src/syncRaftCfg.c | 12 --- source/libs/sync/src/syncUtil.c | 53 +++++++++--- 6 files changed, 56 insertions(+), 141 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 092ede2281..447a90447a 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -398,7 +398,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x090C) #define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x090D) #define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x090E) -#define TSDB_CODE_SYN_NEW_CONFIG_ERROR TAOS_DEF_ERROR_CODE(0, 0x090F) +#define TSDB_CODE_SYN_NEW_CONFIG_ERROR TAOS_DEF_ERROR_CODE(0, 0x090F) // internal #define TSDB_CODE_SYN_RECONFIG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0910) #define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911) #define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index df279d824c..6b6f14da00 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -278,7 +278,6 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode); bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config); void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex); SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); -char* syncNodePeerState2Str(const SSyncNode* pSyncNode); // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); diff --git a/source/libs/sync/inc/syncTools.h b/source/libs/sync/inc/syncTools.h index 87bd9cb13d..f7a9c404c1 100644 --- a/source/libs/sync/inc/syncTools.h +++ b/source/libs/sync/inc/syncTools.h @@ -26,8 +26,6 @@ typedef struct SRaftId { SyncGroupId vgId; } SRaftId; -char* sync2SimpleStr(int64_t rid); - // for compatibility, the same as syncPropose int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bcca19b907..e9facfb381 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -83,11 +83,10 @@ void syncStop(int64_t rid) { void syncPreStop(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) return; - - syncNodePreClose(pSyncNode); - - syncNodeRelease(pSyncNode); + if (pSyncNode != NULL) { + syncNodePreClose(pSyncNode); + syncNodeRelease(pSyncNode); + } } static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) { @@ -217,48 +216,10 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) { return minMatchIndex; } -char* syncNodePeerState2Str(const SSyncNode* pSyncNode) { - int32_t len = 128; - int32_t useLen = 0; - int32_t leftLen = len - useLen; - char* pStr = taosMemoryMalloc(len); - memset(pStr, 0, len); - - char* p = pStr; - int32_t use = snprintf(p, leftLen, "{"); - useLen += use; - leftLen -= use; - - for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { - SPeerState* pState = syncNodeGetPeerState((SSyncNode*)pSyncNode, &(pSyncNode->replicasId[i])); - if (pState == NULL) { - sError("vgId:%d, replica maybe dropped", pSyncNode->vgId); - break; - } - - p = pStr + useLen; - use = snprintf(p, leftLen, "%d:%" PRId64 " ,%" PRId64, i, pState->lastSendIndex, pState->lastSendTime); - useLen += use; - leftLen -= use; - } - - p = pStr + useLen; - use = snprintf(p, leftLen, "}"); - useLen += use; - leftLen -= use; - - // sTrace("vgId:%d, ------------------ syncNodePeerState2Str:%s", pSyncNode->vgId, pStr); - - return pStr; -} - int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - ASSERT(rid == pSyncNode->rid); + if (pSyncNode == NULL) return -1; + int32_t code = 0; if (syncNodeIsMnode(pSyncNode)) { @@ -366,19 +327,14 @@ _DEL_WAL: int32_t syncEndSnapshot(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - ASSERT(rid == pSyncNode->rid); + if (pSyncNode == NULL) return -1; int32_t code = 0; if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) { SSyncLogStoreData* pData = pSyncNode->pLogStore->data; code = walEndSnapshot(pData->pWal); if (code != 0) { - sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr()); - + sNError(pSyncNode, "wal snapshot end error since:%s", terrstr()); syncNodeRelease(pSyncNode); return -1; } else { @@ -393,25 +349,16 @@ int32_t syncEndSnapshot(int64_t rid) { int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - ASSERT(rid == pSyncNode->rid); + if (pSyncNode == NULL) return -1; syncNodeStepDown(pSyncNode, newTerm); - syncNodeRelease(pSyncNode); return 0; } bool syncIsReadyForRead(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return false; - } - ASSERT(rid == pSyncNode->rid); + if (pSyncNode == NULL) return -1; if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) { syncNodeRelease(pSyncNode); @@ -428,7 +375,7 @@ bool syncIsReadyForRead(int64_t rid) { if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { SSyncRaftEntry* pEntry = NULL; int32_t code = pSyncNode->pLogStore->syncLogGetEntry( - pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); + pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); if (code == 0 && pEntry != NULL) { if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { ready = true; @@ -469,8 +416,6 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { } int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { - int32_t ret = 0; - if (pSyncNode->replicaNum == 1) { sDebug("only one replica, cannot leader transfer"); terrno = TSDB_CODE_SYN_ONE_REPLICA; @@ -488,42 +433,10 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); syncLeaderTransferDestroy(pMsg); - ret = syncNodePropose(pSyncNode, &rpcMsg, false); + int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false); return ret; } -bool syncCanLeaderTransfer(int64_t rid) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return false; - } - ASSERT(rid == pSyncNode->rid); - - if (pSyncNode->replicaNum == 1) { - syncNodeRelease(pSyncNode); - return false; - } - - if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { - syncNodeRelease(pSyncNode); - return true; - } - - bool matchOK = true; - if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE || pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - SyncIndex myCommitIndex = pSyncNode->commitIndex; - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]); - if (peerMatchIndex < myCommitIndex) { - matchOK = false; - } - } - } - - syncNodeRelease(pSyncNode); - return matchOK; -} - int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) { int32_t ret = syncPropose(rid, pMsg, isWeak); return ret; @@ -706,7 +619,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { memset(pEpSet, 0, sizeof(*pEpSet)); return; } - ASSERT(rid == pSyncNode->rid); + pEpSet->numOfEps = 0; for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn); @@ -722,6 +635,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { syncNodeRelease(pSyncNode); } + static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) { SRespStub stub; int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub); @@ -732,19 +646,6 @@ static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandl sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle); } -char* sync2SimpleStr(int64_t rid) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - sTrace("syncSetRpc get pSyncNode is NULL, rid:%" PRId64, rid); - return NULL; - } - - char* s = syncNode2SimpleStr(pSyncNode); - syncNodeRelease(pSyncNode); - - return s; -} - int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) return -1; diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index d399e7903b..66c399d15a 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -249,18 +249,6 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) { return serialized; } -void syncCfg2SimpleStr(const SSyncCfg *pCfg, char *buf, int32_t bufLen) { - int32_t len = snprintf(buf, bufLen, "{r-num:%d, my:%d, ", pCfg->replicaNum, pCfg->myIndex); - - for (int32_t i = 0; i < pCfg->replicaNum; ++i) { - if (i < pCfg->replicaNum - 1) { - len += snprintf(buf + len, bufLen - len, "%s:%d, ", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort); - } else { - len += snprintf(buf + len, bufLen - len, "%s:%d}", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort); - } - } -} - int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) { memset(pSyncCfg, 0, sizeof(SSyncCfg)); // cJSON *pJson = cJSON_GetObjectItem(pRoot, "SSyncCfg"); diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index e22e0d6a47..31c649b2f3 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -202,6 +202,35 @@ bool syncUtilUserRollback(tmsg_t msgType) { return false; } +void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) { + int32_t len = snprintf(buf, bufLen, "{r-num:%d, my:%d, ", pCfg->replicaNum, pCfg->myIndex); + + for (int32_t i = 0; i < pCfg->replicaNum; ++i) { + if (i < pCfg->replicaNum - 1) { + len += snprintf(buf + len, bufLen - len, "%s:%d, ", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort); + } else { + len += snprintf(buf + len, bufLen - len, "%s:%d}", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort); + } + } +} + +static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { + int32_t len = 1; + + for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { + SPeerState* pState = syncNodeGetPeerState(pSyncNode, &(pSyncNode->replicasId[i])); + if (pState == NULL) break; + + if (i < pSyncNode->replicaNum - 1) { + len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 ", ", i, pState->lastSendIndex, + pState->lastSendTime); + } else { + len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 "}", i, pState->lastSendIndex, + pState->lastSendTime); + } + } +} + void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; @@ -220,7 +249,9 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo char cfgStr[1024]; syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr)); - char* pPeerStateStr = syncNodePeerState2Str(pNode); + char peerStr[1024] = "{"; + syncPeerState2Str(pNode, peerStr, sizeof(peerStr)); + int32_t quorum = syncNodeDynamicQuorum(pNode); char eventLog[512]; // {0}; @@ -239,9 +270,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->pRaftCfg->isStandBy, pNode->pRaftCfg->snapshotStrategy, pNode->pRaftCfg->batchSize, pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, - pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, pPeerStateStr, cfgStr); - - taosMemoryFree(pPeerStateStr); + pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); } void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender, @@ -264,7 +293,9 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla char cfgStr[1024]; syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr)); - char* pPeerStateStr = syncNodePeerState2Str(pNode); + char peerStr[1024] = "{"; + syncPeerState2Str(pNode, peerStr, sizeof(peerStr)); + int32_t quorum = syncNodeDynamicQuorum(pNode); SRaftId destId = pNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -291,9 +322,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->pRaftCfg->isStandBy, pNode->pRaftCfg->snapshotStrategy, pNode->pRaftCfg->batchSize, pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, - pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, pPeerStateStr, cfgStr); - - taosMemoryFree(pPeerStateStr); + pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); } void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, @@ -316,7 +345,9 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df char cfgStr[1024]; syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr)); - char* pPeerStateStr = syncNodePeerState2Str(pNode); + char peerStr[1024] = "{"; + syncPeerState2Str(pNode, peerStr, sizeof(peerStr)); + int32_t quorum = syncNodeDynamicQuorum(pNode); SRaftId fromId = pReceiver->fromId; char host[128]; @@ -344,7 +375,5 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->pRaftCfg->isStandBy, pNode->pRaftCfg->snapshotStrategy, pNode->pRaftCfg->batchSize, pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, - pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, pPeerStateStr, cfgStr); - - taosMemoryFree(pPeerStateStr); + pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); }