diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h
index 07a0ca952a..6c3c7497b1 100644
--- a/include/dnode/mnode/mnode.h
+++ b/include/dnode/mnode/mnode.h
@@ -20,6 +20,7 @@
 #include "tmsg.h"
 #include "tmsgcb.h"
 #include "trpc.h"
+#include "sync.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -73,6 +74,7 @@ int32_t mndStart(SMnode *pMnode);
 void mndStop(SMnode *pMnode);
 
 int32_t mndIsCatchUp(SMnode *pMnode);
+ESyncRole mndGetRole(SMnode *pMnode);
 
 /**
  * @brief Get mnode monitor info.
diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h
index 33cef538d2..e86a4f9690 100644
--- a/include/libs/sync/sync.h
+++ b/include/libs/sync/sync.h
@@ -250,6 +250,7 @@ void syncPreStop(int64_t rid);
 void syncPostStop(int64_t rid);
 int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
 int32_t syncIsCatchUp(int64_t rid);
+ESyncRole syncGetRole(int64_t rid);
 int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
 int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
 int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
diff --git a/include/util/taoserror.h b/include/util/taoserror.h
index 0d9292cc6b..b7325edf9c 100644
--- a/include/util/taoserror.h
+++ b/include/util/taoserror.h
@@ -403,6 +403,8 @@ int32_t* taosGetErrno();
 #define TSDB_CODE_SNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x040F)
 #define TSDB_CODE_SNODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0410)
 #define TSDB_CODE_SNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0411)
+#define TSDB_CODE_MNODE_NOT_CATCH_UP TAOS_DEF_ERROR_CODE(0, 0x0412) // internal
+#define TSDB_CODE_MNODE_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0413) // internal
 
 // vnode
 // #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x
@@ -437,6 +439,8 @@ int32_t* taosGetErrno();
 #define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529)
 #define TSDB_CODE_VND_DUP_REQUEST TAOS_DEF_ERROR_CODE(0, 0x0530)
 #define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531)
+#define TSDB_CODE_VND_NOT_CATCH_UP TAOS_DEF_ERROR_CODE(0, 0x0532) // internal
+#define TSDB_CODE_VND_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0533) // internal
 
 // tsdb
 #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c
index 05b59b9865..7840528db9 100644
--- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c
+++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c
@@ -159,6 +159,10 @@ static int32_t mmSyncIsCatchUp(SMnodeMgmt *pMgmt) {
   return mndIsCatchUp(pMgmt->pMnode);
 }
 
+static ESyncRole mmSyncGetRole(SMnodeMgmt *pMgmt) {
+  return mndGetRole(pMgmt->pMnode);
+}
+
 SMgmtFunc mmGetMgmtFunc() {
   SMgmtFunc mgmtFunc = {0};
   mgmtFunc.openFp = mmOpen;
@@ -170,6 +174,7 @@ SMgmtFunc mmGetMgmtFunc() {
   mgmtFunc.requiredFp = mmRequire;
   mgmtFunc.getHandlesFp = mmGetMsgHandles;
   mgmtFunc.isCatchUpFp = (NodeIsCatchUpFp)mmSyncIsCatchUp;
+  mgmtFunc.nodeRoleFp = (NodeRole)mmSyncGetRole;
 
   return mgmtFunc;
 }
diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
index 9802930418..9dbc12cf62 100644
--- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
@@ -336,13 +336,23 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
 
   SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
   if (pVnode == NULL) {
-    dError("vgId:%d, failed to alter hashrange since %s", req.vgId, terrstr());
+    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
     terrno = TSDB_CODE_VND_NOT_EXIST;
     return -1;
   }
 
+  ESyncRole role = vnodeGetRole(pVnode->pImpl);
+  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
+  if(role == TAOS_SYNC_ROLE_VOTER){  // reported so the mnode can treat the action as already done
+    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
+    vmReleaseVnode(pMgmt, pVnode);
+    return -1;
+  }
+
   dInfo("vgId:%d, checking node catch up", req.vgId);
-  if(vnodeIsCatchUp(pVnode->pImpl) != 0){
+  if(vnodeIsCatchUp(pVnode->pImpl) != 1){  // 1 means caught up; 0 and -1 both reject the request
+    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
+    vmReleaseVnode(pMgmt, pVnode);
     return -1;
   }
 
@@ -365,6 +375,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
       req.selfIndex >= req.replica || req.learnerSelfIndex >= req.learnerReplica) {
     terrno = TSDB_CODE_INVALID_MSG;
     dError("vgId:%d, failed to alter replica since invalid msg", vgId);
+    vmReleaseVnode(pMgmt, pVnode);
     return -1;
   }
 
@@ -381,6 +392,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
     terrno = TSDB_CODE_INVALID_MSG;
     dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
            pReplica->port);
+    vmReleaseVnode(pMgmt, pVnode);
     return -1;
   }
 
diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c
index c53c21cd30..3459af1a3a 100644
--- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c
+++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c
@@ -214,9 +214,19 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
 
   pWrapper = &pDnode->wrappers[ntype];
 
+  if(pWrapper->func.nodeRoleFp != NULL){
+    ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt);
+    dInfo("node:%s, checking node role:%d", pWrapper->name, role);
+    if(role == TAOS_SYNC_ROLE_VOTER){  // reported so the mnode can treat the action as already done
+      terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER;
+      return -1;
+    }
+  }
+
   if(pWrapper->func.isCatchUpFp != NULL){
     dInfo("node:%s, checking node catch up", pWrapper->name);
-    if(!(*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) == 0){
+    if((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1){  // 1 means caught up; 0 and -1 both reject
+      terrno = TSDB_CODE_MNODE_NOT_CATCH_UP;
       return -1;
     }
   }
diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h
index 000ce81207..98ef8cd95b 100644
--- a/source/dnode/mgmt/node_util/inc/dmUtil.h
+++ b/source/dnode/mgmt/node_util/inc/dmUtil.h
@@ -135,6 +135,7 @@ typedef int32_t (*NodeDropFp)(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
 typedef int32_t (*NodeRequireFp)(const SMgmtInputOpt *pInput, bool *required);
 typedef SArray *(*NodeGetHandlesFp)(); // array of SMgmtHandle
-typedef bool (*NodeIsCatchUpFp)(void *pMgmt);
+typedef int32_t (*NodeIsCatchUpFp)(void *pMgmt);  // 1: caught up, 0: not caught up, -1: sync node unavailable
+typedef ESyncRole (*NodeRole)(void *pMgmt);  // return type must match the ESyncRole-returning implementation
 
 typedef struct {
   NodeOpenFp openFp;
@@ -146,6 +147,7 @@ typedef struct {
   NodeRequireFp requiredFp;
   NodeGetHandlesFp getHandlesFp;
   NodeIsCatchUpFp isCatchUpFp;
+  NodeRole nodeRoleFp;
 } SMgmtFunc;
 
 typedef struct {
diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c
index 13ae4a11d5..a9f52128a6 100644
--- a/source/dnode/mnode/impl/src/mndMain.c
+++ b/source/dnode/mnode/impl/src/mndMain.c
@@ -590,6 +590,11 @@ int32_t mndIsCatchUp(SMnode *pMnode) {
   return syncIsCatchUp(rid);
 }
 
+ESyncRole mndGetRole(SMnode *pMnode){
+  int64_t rid = pMnode->syncMgmt.sync;
+  return syncGetRole(rid);
+}
+
 void mndStop(SMnode *pMnode) {
   mndSetStop(pMnode);
   mndSyncStop(pMnode);
diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c
index 53baf843de..5e3476859a 100644
--- a/source/dnode/mnode/impl/src/mndMnode.c
+++ b/source/dnode/mnode/impl/src/mndMnode.c
@@ -322,7 +322,8 @@ static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans,
       .pCont = pReq,
       .contLen = contLen,
       .msgType = TDMT_DND_ALTER_MNODE_TYPE,
-      .acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED,
+      .retryCode = TSDB_CODE_MNODE_NOT_CATCH_UP,
+      .acceptableCode = TSDB_CODE_MNODE_ALREADY_IS_VOTER,
   };
 
   if (mndTransAppendRedoAction(pTrans, &action) != 0) {
diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c
index d674db5b4b..f0bece6e5e 100644
--- a/source/dnode/mnode/impl/src/mndVgroup.c
+++ b/source/dnode/mnode/impl/src/mndVgroup.c
@@ -1263,6 +1263,8 @@ int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
   action.pCont = pReq;
   action.contLen = contLen;
   action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
+  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
+  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
 
   if (mndTransAppendRedoAction(pTrans, &action) != 0) {
     taosMemoryFree(pReq);
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index 7dfaf1508d..828a173108 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -69,6 +69,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
 int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
 int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
 int32_t vnodeIsCatchUp(SVnode *pVnode);
+ESyncRole vnodeGetRole(SVnode *pVnode);
 
 int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
 int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c
index deeb0af42a..7d41edfdd9 100644
--- a/source/dnode/vnode/src/vnd/vnodeOpen.c
+++ b/source/dnode/vnode/src/vnd/vnodeOpen.c
@@ -431,6 +431,10 @@ int32_t vnodeIsCatchUp(SVnode *pVnode){
   return syncIsCatchUp(pVnode->sync);
 }
 
+ESyncRole vnodeGetRole(SVnode *pVnode){
+  return syncGetRole(pVnode->sync);
+}
+
 void vnodeStop(SVnode *pVnode) {}
 
 int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index e3342d76ee..19bb126cb6 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -580,25 +580,38 @@ int32_t syncIsCatchUp(int64_t rid) {
     return -1;
   }
 
-  while(1){
-    if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
-        pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
-        pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){
-      sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
-            pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
-            pSyncNode->pLogBuf->matchIndex);
-      taosSsleep(1);
-    }
-    else{
-      sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
-            pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
-            pSyncNode->pLogBuf->matchIndex);
-      break;
-    }
+  int32_t isCatchUp = 0;  // single non-blocking check: the result is returned instead of sleeping here
+  if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
+      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
+      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){
+    sInfo("vgId:%d, Not catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
+          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
+          pSyncNode->pLogBuf->matchIndex);
+    isCatchUp = 0;
+  }
+  else{
+    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
+          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
+          pSyncNode->pLogBuf->matchIndex);
+    isCatchUp = 1;
   }
 
   syncNodeRelease(pSyncNode);
-  return 0;
+  return isCatchUp;
+}
+
+// Returns the nodeRole stored for this node (nodeInfo[myIndex]) in its raft config, or -1 if rid cannot be acquired.
+ESyncRole syncGetRole(int64_t rid) {
+  SSyncNode* pSyncNode = syncNodeAcquire(rid);
+  if (pSyncNode == NULL) {
+    sError("sync Node Acquire error since %d", errno);
+    return -1;
+  }
+
+  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
+
+  syncNodeRelease(pSyncNode);
+  return role;
 }
 
 int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {