enh: not allow to acquire a vnode in failed mode from vmAcquireVnode
This commit is contained in:
parent
9a4222e189
commit
0a4fac7704
|
@ -84,6 +84,7 @@ typedef struct {
|
||||||
} SVnodeThread;
|
} SVnodeThread;
|
||||||
|
|
||||||
// vmInt.c
|
// vmInt.c
|
||||||
|
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId);
|
||||||
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId);
|
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId);
|
||||||
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
|
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
|
||||||
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
||||||
|
|
|
@ -282,7 +282,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
|
vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
||||||
if (pVnode != NULL && !pVnode->failed) {
|
if (pVnode != NULL) {
|
||||||
dError("vgId:%d, already exist", req.vgId);
|
dError("vgId:%d, already exist", req.vgId);
|
||||||
tFreeSCreateVnodeReq(&req);
|
tFreeSCreateVnodeReq(&req);
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
vmReleaseVnode(pMgmt, pVnode);
|
||||||
|
@ -291,10 +291,11 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pVnode == NULL || pVnode->failed);
|
int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
|
||||||
|
if (diskPrimary < 0) {
|
||||||
wrapperCfg.diskPrimary = pVnode ? pVnode->diskPrimary : vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
|
diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
|
||||||
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
}
|
||||||
|
wrapperCfg.diskPrimary = diskPrimary;
|
||||||
|
|
||||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
|
||||||
|
|
||||||
|
@ -371,7 +372,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
TMSG_INFO(pMsg->msgType));
|
TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
||||||
if (pVnode == NULL || pVnode->failed) {
|
if (pVnode == NULL) {
|
||||||
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
|
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
|
||||||
terrno = TSDB_CODE_VND_NOT_EXIST;
|
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||||
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
||||||
|
@ -489,7 +490,7 @@ int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
req.vgId, TMSG_INFO(pMsg->msgType));
|
req.vgId, TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
||||||
if (pVnode == NULL || pVnode->failed) {
|
if (pVnode == NULL) {
|
||||||
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
|
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
|
||||||
terrno = TSDB_CODE_VND_NOT_EXIST;
|
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||||
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
||||||
|
@ -532,7 +533,7 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
|
dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
||||||
if (pVnode == NULL || pVnode->failed) {
|
if (pVnode == NULL) {
|
||||||
dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
|
dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
|
||||||
terrno = TSDB_CODE_VND_NOT_EXIST;
|
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||||
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
||||||
|
@ -565,7 +566,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
|
dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
|
||||||
req.dstVgId);
|
req.dstVgId);
|
||||||
pVnode = vmAcquireVnode(pMgmt, srcVgId);
|
pVnode = vmAcquireVnode(pMgmt, srcVgId);
|
||||||
if (pVnode == NULL || pVnode->failed) {
|
if (pVnode == NULL) {
|
||||||
dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
|
dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
|
||||||
terrno = TSDB_CODE_VND_NOT_EXIST;
|
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||||
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
||||||
|
@ -680,7 +681,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
|
||||||
if (pVnode == NULL || pVnode->failed) {
|
if (pVnode == NULL) {
|
||||||
dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
|
dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
|
||||||
terrno = TSDB_CODE_VND_NOT_EXIST;
|
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||||
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
||||||
|
|
|
@ -19,6 +19,19 @@
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
#include "libs/function/tudf.h"
|
#include "libs/function/tudf.h"
|
||||||
|
|
||||||
|
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||||
|
int32_t diskId = -1;
|
||||||
|
SVnodeObj *pVnode = NULL;
|
||||||
|
|
||||||
|
taosThreadRwlockRdlock(&pMgmt->lock);
|
||||||
|
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
||||||
|
if (pVnode != NULL) {
|
||||||
|
diskId = pVnode->diskPrimary;
|
||||||
|
}
|
||||||
|
taosThreadRwlockUnlock(&pMgmt->lock);
|
||||||
|
return diskId;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||||
STfs *pTfs = pMgmt->pTfs;
|
STfs *pTfs = pMgmt->pTfs;
|
||||||
int32_t diskId = 0;
|
int32_t diskId = 0;
|
||||||
|
@ -79,7 +92,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||||
|
|
||||||
taosThreadRwlockRdlock(&pMgmt->lock);
|
taosThreadRwlockRdlock(&pMgmt->lock);
|
||||||
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
||||||
if (pVnode == NULL || pVnode->dropped) {
|
if (pVnode == NULL || pVnode->dropped || pVnode->failed) {
|
||||||
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
pVnode = NULL;
|
pVnode = NULL;
|
||||||
} else {
|
} else {
|
||||||
|
@ -100,6 +113,15 @@ void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
taosThreadRwlockUnlock(&pMgmt->lock);
|
taosThreadRwlockUnlock(&pMgmt->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
|
||||||
|
if (!ppVnode || !(*ppVnode)) return;
|
||||||
|
|
||||||
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
|
taosMemoryFree(pVnode->path);
|
||||||
|
taosMemoryFree(pVnode);
|
||||||
|
ppVnode[0] = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
|
SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
|
@ -134,6 +156,12 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadRwlockWrlock(&pMgmt->lock);
|
taosThreadRwlockWrlock(&pMgmt->lock);
|
||||||
|
SVnodeObj *pOld = NULL;
|
||||||
|
taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
||||||
|
if (pOld) {
|
||||||
|
ASSERT(pOld->failed);
|
||||||
|
vmFreeVnodeObj(&pOld);
|
||||||
|
}
|
||||||
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
||||||
taosThreadRwlockUnlock(&pMgmt->lock);
|
taosThreadRwlockUnlock(&pMgmt->lock);
|
||||||
|
|
||||||
|
@ -223,8 +251,7 @@ _closed:
|
||||||
vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs);
|
vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pVnode->path);
|
vmFreeVnodeObj(&pVnode);
|
||||||
taosMemoryFree(pVnode);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
|
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
|
||||||
|
|
|
@ -187,7 +187,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
||||||
pHead->vgId = ntohl(pHead->vgId);
|
pHead->vgId = ntohl(pHead->vgId);
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||||
if (pVnode == NULL || pVnode->failed) {
|
if (pVnode == NULL) {
|
||||||
dGDebug("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
|
dGDebug("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
|
||||||
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
|
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
|
||||||
terrno = (terrno != 0) ? terrno : -1;
|
terrno = (terrno != 0) ? terrno : -1;
|
||||||
|
@ -316,7 +316,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
||||||
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
||||||
int32_t size = -1;
|
int32_t size = -1;
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
|
||||||
if (pVnode != NULL && !pVnode->failed) {
|
if (pVnode != NULL) {
|
||||||
switch (qtype) {
|
switch (qtype) {
|
||||||
case WRITE_QUEUE:
|
case WRITE_QUEUE:
|
||||||
size = taosQueueItemSize(pVnode->pWriteW.queue);
|
size = taosQueueItemSize(pVnode->pWriteW.queue);
|
||||||
|
|
Loading…
Reference in New Issue