From 0a4fac77049b237a6458e97da3b0bbc3328e159f Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 23 Nov 2023 09:37:37 +0800 Subject: [PATCH] enh: not allow to acquire a vnode in failed mode from vmAcquireVnode --- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 21 ++++++------- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 33 +++++++++++++++++++-- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 4 +-- 4 files changed, 44 insertions(+), 15 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 34f2b5c446..9e3039d73f 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -84,6 +84,7 @@ typedef struct { } SVnodeThread; // vmInt.c +int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId); int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId); SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId); void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 64bf875a8e..d673da4be2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -282,7 +282,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); - if (pVnode != NULL && !pVnode->failed) { + if (pVnode != NULL) { dError("vgId:%d, already exist", req.vgId); tFreeSCreateVnodeReq(&req); vmReleaseVnode(pMgmt, pVnode); @@ -291,10 +291,11 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } - ASSERT(pVnode == NULL || pVnode->failed); - - wrapperCfg.diskPrimary = pVnode ? pVnode->diskPrimary : vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId); - int32_t diskPrimary = wrapperCfg.diskPrimary; + int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId); + if (diskPrimary < 0) { + diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId); + } + wrapperCfg.diskPrimary = diskPrimary; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); @@ -371,7 +372,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { TMSG_INFO(pMsg->msgType)); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); - if (pVnode == NULL || pVnode->failed) { + if (pVnode == NULL) { dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; if (pVnode) vmReleaseVnode(pMgmt, pVnode); @@ -489,7 +490,7 @@ int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { req.vgId, TMSG_INFO(pMsg->msgType)); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); - if (pVnode == NULL || pVnode->failed) { + if (pVnode == NULL) { dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; if (pVnode) vmReleaseVnode(pMgmt, pVnode); @@ -532,7 +533,7 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); - if (pVnode == NULL || pVnode->failed) { + if (pVnode == NULL) { dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; if (pVnode) vmReleaseVnode(pMgmt, pVnode); @@ -565,7 +566,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd, req.dstVgId); pVnode = vmAcquireVnode(pMgmt, srcVgId); - if (pVnode == NULL || pVnode->failed) { + if (pVnode == NULL) { dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; if (pVnode) vmReleaseVnode(pMgmt, pVnode); @@ -680,7 +681,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); - if (pVnode == NULL || pVnode->failed) { + if (pVnode == NULL) { dError("vgId:%d, failed to alter replica since %s", vgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; if (pVnode) vmReleaseVnode(pMgmt, pVnode); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 21b791eb4d..7cb31b6e5d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -19,6 +19,19 @@ #include "vnd.h" #include "libs/function/tudf.h" +int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { + int32_t diskId = -1; + SVnodeObj *pVnode = NULL; + + taosThreadRwlockRdlock(&pMgmt->lock); + taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); + if (pVnode != NULL) { + diskId = pVnode->diskPrimary; + } + taosThreadRwlockUnlock(&pMgmt->lock); + return diskId; +} + int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { STfs *pTfs = pMgmt->pTfs; int32_t diskId = 0; @@ -79,7 +92,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { taosThreadRwlockRdlock(&pMgmt->lock); taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); - if (pVnode == NULL || pVnode->dropped) { + if (pVnode == NULL || pVnode->dropped || pVnode->failed) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; pVnode = NULL; } else { @@ -100,6 +113,15 @@ void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { taosThreadRwlockUnlock(&pMgmt->lock); } +static void vmFreeVnodeObj(SVnodeObj **ppVnode) { + if (!ppVnode || !(*ppVnode)) return; + + SVnodeObj *pVnode = *ppVnode; + taosMemoryFree(pVnode->path); + taosMemoryFree(pVnode); + ppVnode[0] = NULL; +} + int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj)); if (pVnode == NULL) { @@ -134,6 +156,12 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { } taosThreadRwlockWrlock(&pMgmt->lock); + SVnodeObj *pOld = NULL; + taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld); + if (pOld) { + ASSERT(pOld->failed); + vmFreeVnodeObj(&pOld); + } int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); taosThreadRwlockUnlock(&pMgmt->lock); @@ -223,8 +251,7 @@ _closed: vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs); } - taosMemoryFree(pVnode->path); - taosMemoryFree(pVnode); + vmFreeVnodeObj(&pVnode); } static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 4b18ec4fb0..d80bc62c47 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -187,7 +187,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp pHead->vgId = ntohl(pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); - if (pVnode == NULL || pVnode->failed) { + if (pVnode == NULL) { dGDebug("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg, terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen); terrno = (terrno != 0) ? terrno : -1; @@ -316,7 +316,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { int32_t size = -1; SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); - if (pVnode != NULL && !pVnode->failed) { + if (pVnode != NULL) { switch (qtype) { case WRITE_QUEUE: size = taosQueueItemSize(pVnode->pWriteW.queue);