From a95f6e686287eea552faccae5d4ef216b29c1864 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 11 Oct 2023 20:49:43 +0800 Subject: [PATCH] feat: support restore dnode with vnodes of replaced disks for primary dirs --- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 37 +++++++++++++-------- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 32 +++++++++++++----- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 10 +++--- source/dnode/vnode/src/vnd/vnodeOpen.c | 7 +++- 5 files changed, 60 insertions(+), 27 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index cddf132bce..34f2b5c446 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -56,6 +56,7 @@ typedef struct { int32_t vgVersion; int32_t refCount; int8_t dropped; + int8_t failed; int8_t disable; int32_t diskPrimary; int32_t toVgId; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index b4fe824466..3d7f2b9e9e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -30,9 +30,11 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { if (ppVnode == NULL || *ppVnode == NULL) continue; SVnodeObj *pVnode = *ppVnode; - SVnodeLoad vload = {0}; - vnodeGetLoad(pVnode->pImpl, &vload); - if (isReset) vnodeResetLoad(pVnode->pImpl, &vload); + SVnodeLoad vload = {.vgId = pVnode->vgId}; + if (!pVnode->failed) { + vnodeGetLoad(pVnode->pImpl, &vload); + if (isReset) vnodeResetLoad(pVnode->pImpl, &vload); + } taosArrayPush(pInfo->pVloads, &vload); pIter = taosHashIterate(pMgmt->hash, pIter); } @@ -52,9 +54,11 @@ void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { if (ppVnode == NULL || *ppVnode == NULL) continue; SVnodeObj *pVnode = *ppVnode; - SVnodeLoadLite vload = {0}; - if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) { - taosArrayPush(pInfo->pVloads, &vload); + if (!pVnode->failed) { + SVnodeLoadLite vload = {0}; + if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) { + taosArrayPush(pInfo->pVloads, &vload); + } } pIter = taosHashIterate(pMgmt->hash, pIter); } @@ -278,7 +282,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); - if (pVnode != NULL) { + if (pVnode != NULL && !pVnode->failed) { dError("vgId:%d, already exist", req.vgId); tFreeSCreateVnodeReq(&req); vmReleaseVnode(pMgmt, pVnode); @@ -287,7 +291,9 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } - wrapperCfg.diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId); + ASSERT(pVnode == NULL || pVnode->failed); + + wrapperCfg.diskPrimary = pVnode ? pVnode->diskPrimary : vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId); int32_t diskPrimary = wrapperCfg.diskPrimary; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); @@ -364,9 +370,10 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { TMSG_INFO(pMsg->msgType)); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); - if (pVnode == NULL) { + if (pVnode == NULL || pVnode->failed) { dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; + if (pVnode) vmReleaseVnode(pMgmt, pVnode); return -1; } @@ -481,9 +488,10 @@ int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { req.vgId, TMSG_INFO(pMsg->msgType)); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); - if (pVnode == NULL) { + if (pVnode == NULL || pVnode->failed) { dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; + if (pVnode) vmReleaseVnode(pMgmt, pVnode); return -1; } @@ -523,9 +531,10 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); - if (pVnode == NULL) { + if (pVnode == NULL || pVnode->failed) { dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; + if (pVnode) vmReleaseVnode(pMgmt, pVnode); return -1; } @@ -555,9 +564,10 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd, req.dstVgId); pVnode = vmAcquireVnode(pMgmt, srcVgId); - if (pVnode == NULL) { + if (pVnode == NULL || pVnode->failed) { dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; + if (pVnode) vmReleaseVnode(pMgmt, pVnode); return -1; } @@ -669,9 +679,10 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); - if (pVnode == NULL) { + if (pVnode == NULL || pVnode->failed) { dError("vgId:%d, failed to alter replica since %s", vgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; + if (pVnode) vmReleaseVnode(pMgmt, pVnode); return -1; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 963bfa3197..973c45eda7 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -112,6 +112,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { pVnode->diskPrimary = pCfg->diskPrimary; pVnode->refCount = 0; pVnode->dropped = 0; + pVnode->failed = 0; pVnode->path = taosStrdup(pCfg->path); pVnode->pImpl = pImpl; @@ -121,11 +122,15 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { return -1; } - if (vmAllocQueue(pMgmt, pVnode) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pVnode->path); - taosMemoryFree(pVnode); - return -1; + if (pImpl) { + if (vmAllocQueue(pMgmt, pVnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pVnode->path); + taosMemoryFree(pVnode); + return -1; + } + } else { + pVnode->failed = 1; } taosThreadRwlockWrlock(&pMgmt->lock); @@ -271,8 +276,10 @@ static void *vmOpenVnodeInThread(void *param) { if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr()); - pThread->failed++; - continue; + if (terrno != TSDB_CODE_VND_NOT_EXIST) { + pThread->failed++; + continue; + } } if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) { @@ -379,6 +386,7 @@ static void *vmCloseVnodeInThread(void *param) { for (int32_t v = 0; v < pThread->vnodeNum; ++v) { SVnodeObj *pVnode = pThread->ppVnodes[v]; + if (pVnode->failed) continue; char stepDesc[TSDB_STEP_DESC_LEN] = {0}; snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId, @@ -473,7 +481,9 @@ static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) { if (ppVnodes != NULL) { for (int32_t i = 0; i < numOfVnodes; ++i) { SVnodeObj *pVnode = ppVnodes[i]; - vnodeSyncCheckTimeout(pVnode->pImpl); + if (!pVnode->failed) { + vnodeSyncCheckTimeout(pVnode->pImpl); + } vmReleaseVnode(pMgmt, pVnode); } taosMemoryFree(ppVnodes); @@ -605,6 +615,12 @@ static void *vmRestoreVnodeInThread(void *param) { for (int32_t v = 0; v < pThread->vnodeNum; ++v) { SVnodeObj *pVnode = pThread->ppVnodes[v]; + if (pVnode->failed) { + dError("vgId:%d, skip restoring vnode in failure mode.", pVnode->vgId); + continue; + } + + ASSERT(pVnode->pImpl); char stepDesc[TSDB_STEP_DESC_LEN] = {0}; snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId, diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 696107ca90..4b18ec4fb0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -187,9 +187,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp pHead->vgId = ntohl(pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); - if (pVnode == NULL) { - dGWarn("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg, - terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen); + if (pVnode == NULL || pVnode->failed) { + dGDebug("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg, + terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen); terrno = (terrno != 0) ? terrno : -1; return terrno; } @@ -316,7 +316,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { int32_t size = -1; SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); - if (pVnode != NULL) { + if (pVnode != NULL && !pVnode->failed) { switch (qtype) { case WRITE_QUEUE: size = taosQueueItemSize(pVnode->pWriteW.queue); @@ -339,8 +339,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { default: break; } - vmReleaseVnode(pMgmt, pVnode); } + if (pVnode) vmReleaseVnode(pMgmt, pVnode); if (size < 0) { dTrace("vgId:%d, can't get size from queue since %s, qtype:%d", vgId, terrstr(), qtype); size = 0; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 28d1e171d8..f9499cda6d 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -329,6 +329,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC char dir[TSDB_FILENAME_LEN] = {0}; char tdir[TSDB_FILENAME_LEN * 2] = {0}; int32_t ret = 0; + terrno = TSDB_CODE_SUCCESS; if (vnodeCheckDisk(diskPrimary, pTfs)) { vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary); @@ -342,6 +343,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC ret = vnodeLoadInfo(dir, &info); if (ret < 0) { vError("failed to open vnode from %s since %s", path, tstrerror(terrno)); + terrno = TSDB_CODE_VND_NOT_EXIST; return NULL; } @@ -514,7 +516,10 @@ void vnodeClose(SVnode *pVnode) { } // start the sync timer after the queue is ready -int32_t vnodeStart(SVnode *pVnode) { return vnodeSyncStart(pVnode); } +int32_t vnodeStart(SVnode *pVnode) { + ASSERT(pVnode); + return vnodeSyncStart(pVnode); +} int32_t vnodeIsCatchUp(SVnode *pVnode) { return syncIsCatchUp(pVnode->sync); }