From a781d435771a56206016a222eeb3f57389a1d1c8 Mon Sep 17 00:00:00 2001 From: cadem Date: Mon, 24 Apr 2023 10:23:43 +0800 Subject: [PATCH 1/2] fix/block-check-to-async-check --- include/libs/sync/sync.h | 1 + include/util/taoserror.h | 2 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 14 ++++++- source/dnode/mnode/impl/src/mndVgroup.c | 2 + source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/vnd/vnodeOpen.c | 4 ++ source/libs/sync/src/syncMain.c | 44 +++++++++++++-------- 7 files changed, 50 insertions(+), 18 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 33cef538d2..e86a4f9690 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -250,6 +250,7 @@ void syncPreStop(int64_t rid); void syncPostStop(int64_t rid); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); int32_t syncIsCatchUp(int64_t rid); +ESyncRole syncGetRole(int64_t rid); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0d9292cc6b..297ae9de07 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -437,6 +437,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529) #define TSDB_CODE_VND_DUP_REQUEST TAOS_DEF_ERROR_CODE(0, 0x0530) #define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531) +#define TSDB_CODE_VND_NOT_CATCH_UP TAOS_DEF_ERROR_CODE(0, 0x0532) // internal +#define TSDB_CODE_VND_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0533) // internal // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d7197832e5..0ddc1fcf00 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -336,13 +336,21 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); if (pVnode == NULL) { - dError("vgId:%d, failed to alter hashrange since %s", req.vgId, terrstr()); + dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; return -1; } + if(vnodeGetRole(pVnode->pImpl) == TAOS_SYNC_ROLE_VOTER){ + terrno = TSDB_CODE_VND_ALREADY_IS_VOTER; + vmReleaseVnode(pMgmt, pVnode); + return -1; + } + dInfo("vgId:%d, checking node catch up", req.vgId); - if(vnodeIsCatchUp(pVnode->pImpl) != 0){ + if(vnodeIsCatchUp(pVnode->pImpl) != 1){ + terrno = TSDB_CODE_VND_NOT_CATCH_UP; + vmReleaseVnode(pMgmt, pVnode); return -1; } @@ -365,6 +373,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { req.selfIndex >= req.replica || req.learnerSelfIndex >= req.learnerReplica) { terrno = TSDB_CODE_INVALID_MSG; dError("vgId:%d, failed to alter replica since invalid msg", vgId); + vmReleaseVnode(pMgmt, pVnode); return -1; } @@ -381,6 +390,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { terrno = TSDB_CODE_INVALID_MSG; dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn, pReplica->port); + vmReleaseVnode(pMgmt, pVnode); return -1; } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index d674db5b4b..f0bece6e5e 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1263,6 +1263,8 @@ int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, action.pCont = pReq; action.contLen = contLen; action.msgType = TDMT_DND_ALTER_VNODE_TYPE; + action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER; + action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7dfaf1508d..828a173108 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -69,6 +69,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list); int32_t vnodeIsCatchUp(SVnode *pVnode); +ESyncRole vnodeGetRole(SVnode *pVnode); int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index deeb0af42a..7d41edfdd9 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -431,6 +431,10 @@ int32_t vnodeIsCatchUp(SVnode *pVnode){ return syncIsCatchUp(pVnode->sync); } +ESyncRole vnodeGetRole(SVnode *pVnode){ + return syncGetRole(pVnode->sync); +} + void vnodeStop(SVnode *pVnode) {} int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8a0a6be8fd..fd89d48363 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -580,25 +580,37 @@ int32_t syncIsCatchUp(int64_t rid) { return -1; } - while(1){ - if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 || - pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex || - pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){ - sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, - pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, - pSyncNode->pLogBuf->matchIndex); - taosSsleep(1); - } - else{ - sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, - pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, - pSyncNode->pLogBuf->matchIndex); - break; - } + int32_t isCatchUp = 0; + if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 || + pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex || + pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){ + sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, + pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, + pSyncNode->pLogBuf->matchIndex); + isCatchUp = 0; + } + else{ + sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, + pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, + pSyncNode->pLogBuf->matchIndex); + isCatchUp = 1; } syncNodeRelease(pSyncNode); - return 0; + return isCatchUp; +} + +ESyncRole syncGetRole(int64_t rid) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode == NULL) { + sError("sync Node Acquire error since %d", errno); + return -1; + } + + ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole; + + syncNodeRelease(pSyncNode); + return role; } int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) { From 470fef645210a6efbaa7837318a49b7a927e01b6 Mon Sep 17 00:00:00 2001 From: cadem Date: Mon, 24 Apr 2023 13:38:55 +0800 Subject: [PATCH 2/2] fix/block check to async check for mnode --- include/dnode/mnode/mnode.h | 2 ++ include/util/taoserror.h | 2 ++ source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 5 +++++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 4 +++- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 12 +++++++++++- source/dnode/mgmt/node_util/inc/dmUtil.h | 2 ++ source/dnode/mnode/impl/src/mndMain.c | 5 +++++ source/dnode/mnode/impl/src/mndMnode.c | 3 ++- 8 files changed, 32 insertions(+), 3 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 07a0ca952a..6c3c7497b1 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -20,6 +20,7 @@ #include "tmsg.h" #include "tmsgcb.h" #include "trpc.h" +#include "sync.h" #ifdef __cplusplus extern "C" { @@ -73,6 +74,7 @@ int32_t mndStart(SMnode *pMnode); void mndStop(SMnode *pMnode); int32_t mndIsCatchUp(SMnode *pMnode); +ESyncRole mndGetRole(SMnode *pMnode); /** * @brief Get mnode monitor info. diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 297ae9de07..b7325edf9c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -403,6 +403,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x040F) #define TSDB_CODE_SNODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0410) #define TSDB_CODE_SNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0411) +#define TSDB_CODE_MNODE_NOT_CATCH_UP TAOS_DEF_ERROR_CODE(0, 0x0412) // internal +#define TSDB_CODE_MNODE_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0413) // internal // vnode // #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 05b59b9865..7840528db9 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -159,6 +159,10 @@ static int32_t mmSyncIsCatchUp(SMnodeMgmt *pMgmt) { return mndIsCatchUp(pMgmt->pMnode); } +static ESyncRole mmSyncGetRole(SMnodeMgmt *pMgmt) { + return mndGetRole(pMgmt->pMnode); +} + SMgmtFunc mmGetMgmtFunc() { SMgmtFunc mgmtFunc = {0}; mgmtFunc.openFp = mmOpen; @@ -170,6 +174,7 @@ SMgmtFunc mmGetMgmtFunc() { mgmtFunc.requiredFp = mmRequire; mgmtFunc.getHandlesFp = mmGetMsgHandles; mgmtFunc.isCatchUpFp = (NodeIsCatchUpFp)mmSyncIsCatchUp; + mgmtFunc.nodeRoleFp = (NodeRole)mmSyncGetRole; return mgmtFunc; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0ddc1fcf00..4121e14e6d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -341,7 +341,9 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - if(vnodeGetRole(pVnode->pImpl) == TAOS_SYNC_ROLE_VOTER){ + ESyncRole role = vnodeGetRole(pVnode->pImpl); + dInfo("vgId:%d, checking node role:%d", req.vgId, role); + if(role == TAOS_SYNC_ROLE_VOTER){ terrno = TSDB_CODE_VND_ALREADY_IS_VOTER; vmReleaseVnode(pMgmt, pVnode); return -1; diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index c53c21cd30..3459af1a3a 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -214,9 +214,19 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) { pWrapper = &pDnode->wrappers[ntype]; + if(pWrapper->func.nodeRoleFp != NULL){ + ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt); + dInfo("node:%s, checking node role:%d", pWrapper->name, role); + if(role == TAOS_SYNC_ROLE_VOTER){ + terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER; + return -1; + } + } + if(pWrapper->func.isCatchUpFp != NULL){ dInfo("node:%s, checking node catch up", pWrapper->name); - if(!(*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) == 0){ + if((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1){ + terrno = TSDB_CODE_MNODE_NOT_CATCH_UP; return -1; } } diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 000ce81207..98ef8cd95b 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -135,6 +135,7 @@ typedef int32_t (*NodeDropFp)(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); typedef int32_t (*NodeRequireFp)(const SMgmtInputOpt *pInput, bool *required); typedef SArray *(*NodeGetHandlesFp)(); // array of SMgmtHandle typedef bool (*NodeIsCatchUpFp)(void *pMgmt); +typedef bool (*NodeRole)(void *pMgmt); typedef struct { NodeOpenFp openFp; @@ -146,6 +147,7 @@ typedef struct { NodeRequireFp requiredFp; NodeGetHandlesFp getHandlesFp; NodeIsCatchUpFp isCatchUpFp; + NodeRole nodeRoleFp; } SMgmtFunc; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 13ae4a11d5..a9f52128a6 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -590,6 +590,11 @@ int32_t mndIsCatchUp(SMnode *pMnode) { return syncIsCatchUp(rid); } +ESyncRole mndGetRole(SMnode *pMnode){ + int64_t rid = pMnode->syncMgmt.sync; + return syncGetRole(rid); +} + void mndStop(SMnode *pMnode) { mndSetStop(pMnode); mndSyncStop(pMnode); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 53baf843de..5e3476859a 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -322,7 +322,8 @@ static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, .pCont = pReq, .contLen = contLen, .msgType = TDMT_DND_ALTER_MNODE_TYPE, - .acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED, + .retryCode = TSDB_CODE_MNODE_NOT_CATCH_UP, + .acceptableCode = TSDB_CODE_MNODE_ALREADY_IS_VOTER, }; if (mndTransAppendRedoAction(pTrans, &action) != 0) {