feat: support restore dnode with vnodes of replaced disks for primary dirs
This commit is contained in:
parent
e463e0690d
commit
a95f6e6862
|
@ -56,6 +56,7 @@ typedef struct {
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
int8_t dropped;
|
int8_t dropped;
|
||||||
|
int8_t failed;
|
||||||
int8_t disable;
|
int8_t disable;
|
||||||
int32_t diskPrimary;
|
int32_t diskPrimary;
|
||||||
int32_t toVgId;
|
int32_t toVgId;
|
||||||
|
|
|
@ -30,9 +30,11 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
|
||||||
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||||
|
|
||||||
SVnodeObj *pVnode = *ppVnode;
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
SVnodeLoad vload = {0};
|
SVnodeLoad vload = {.vgId = pVnode->vgId};
|
||||||
|
if (!pVnode->failed) {
|
||||||
vnodeGetLoad(pVnode->pImpl, &vload);
|
vnodeGetLoad(pVnode->pImpl, &vload);
|
||||||
if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
|
if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
|
||||||
|
}
|
||||||
taosArrayPush(pInfo->pVloads, &vload);
|
taosArrayPush(pInfo->pVloads, &vload);
|
||||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||||
}
|
}
|
||||||
|
@ -52,10 +54,12 @@ void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
||||||
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||||
|
|
||||||
SVnodeObj *pVnode = *ppVnode;
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
|
if (!pVnode->failed) {
|
||||||
SVnodeLoadLite vload = {0};
|
SVnodeLoadLite vload = {0};
|
||||||
if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
|
if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
|
||||||
taosArrayPush(pInfo->pVloads, &vload);
|
taosArrayPush(pInfo->pVloads, &vload);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -278,7 +282,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
|
vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
||||||
if (pVnode != NULL) {
|
if (pVnode != NULL && !pVnode->failed) {
|
||||||
dError("vgId:%d, already exist", req.vgId);
|
dError("vgId:%d, already exist", req.vgId);
|
||||||
tFreeSCreateVnodeReq(&req);
|
tFreeSCreateVnodeReq(&req);
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
vmReleaseVnode(pMgmt, pVnode);
|
||||||
|
@ -287,7 +291,9 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
wrapperCfg.diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
|
ASSERT(pVnode == NULL || pVnode->failed);
|
||||||
|
|
||||||
|
wrapperCfg.diskPrimary = pVnode ? pVnode->diskPrimary : vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
|
||||||
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
||||||
|
|
||||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
|
||||||
|
@ -364,9 +370,10 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
TMSG_INFO(pMsg->msgType));
|
TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL || pVnode->failed) {
|
||||||
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
|
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
|
||||||
terrno = TSDB_CODE_VND_NOT_EXIST;
|
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||||
|
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -481,9 +488,10 @@ int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
req.vgId, TMSG_INFO(pMsg->msgType));
|
req.vgId, TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL || pVnode->failed) {
|
||||||
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
|
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
|
||||||
terrno = TSDB_CODE_VND_NOT_EXIST;
|
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||||
|
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -523,9 +531,10 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
|
dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL || pVnode->failed) {
|
||||||
dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
|
dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
|
||||||
terrno = TSDB_CODE_VND_NOT_EXIST;
|
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||||
|
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -555,9 +564,10 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
|
dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
|
||||||
req.dstVgId);
|
req.dstVgId);
|
||||||
pVnode = vmAcquireVnode(pMgmt, srcVgId);
|
pVnode = vmAcquireVnode(pMgmt, srcVgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL || pVnode->failed) {
|
||||||
dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
|
dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
|
||||||
terrno = TSDB_CODE_VND_NOT_EXIST;
|
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||||
|
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -669,9 +679,10 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL || pVnode->failed) {
|
||||||
dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
|
dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
|
||||||
terrno = TSDB_CODE_VND_NOT_EXIST;
|
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||||
|
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -112,6 +112,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
pVnode->diskPrimary = pCfg->diskPrimary;
|
pVnode->diskPrimary = pCfg->diskPrimary;
|
||||||
pVnode->refCount = 0;
|
pVnode->refCount = 0;
|
||||||
pVnode->dropped = 0;
|
pVnode->dropped = 0;
|
||||||
|
pVnode->failed = 0;
|
||||||
pVnode->path = taosStrdup(pCfg->path);
|
pVnode->path = taosStrdup(pCfg->path);
|
||||||
pVnode->pImpl = pImpl;
|
pVnode->pImpl = pImpl;
|
||||||
|
|
||||||
|
@ -121,12 +122,16 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pImpl) {
|
||||||
if (vmAllocQueue(pMgmt, pVnode) != 0) {
|
if (vmAllocQueue(pMgmt, pVnode) != 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
taosMemoryFree(pVnode->path);
|
taosMemoryFree(pVnode->path);
|
||||||
taosMemoryFree(pVnode);
|
taosMemoryFree(pVnode);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
pVnode->failed = 1;
|
||||||
|
}
|
||||||
|
|
||||||
taosThreadRwlockWrlock(&pMgmt->lock);
|
taosThreadRwlockWrlock(&pMgmt->lock);
|
||||||
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
||||||
|
@ -271,9 +276,11 @@ static void *vmOpenVnodeInThread(void *param) {
|
||||||
|
|
||||||
if (pImpl == NULL) {
|
if (pImpl == NULL) {
|
||||||
dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
|
dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
|
||||||
|
if (terrno != TSDB_CODE_VND_NOT_EXIST) {
|
||||||
pThread->failed++;
|
pThread->failed++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
|
if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
|
||||||
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||||
|
@ -379,6 +386,7 @@ static void *vmCloseVnodeInThread(void *param) {
|
||||||
|
|
||||||
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
|
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
|
||||||
SVnodeObj *pVnode = pThread->ppVnodes[v];
|
SVnodeObj *pVnode = pThread->ppVnodes[v];
|
||||||
|
if (pVnode->failed) continue;
|
||||||
|
|
||||||
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
||||||
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
|
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
|
||||||
|
@ -473,7 +481,9 @@ static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
|
||||||
if (ppVnodes != NULL) {
|
if (ppVnodes != NULL) {
|
||||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||||
SVnodeObj *pVnode = ppVnodes[i];
|
SVnodeObj *pVnode = ppVnodes[i];
|
||||||
|
if (!pVnode->failed) {
|
||||||
vnodeSyncCheckTimeout(pVnode->pImpl);
|
vnodeSyncCheckTimeout(pVnode->pImpl);
|
||||||
|
}
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
vmReleaseVnode(pMgmt, pVnode);
|
||||||
}
|
}
|
||||||
taosMemoryFree(ppVnodes);
|
taosMemoryFree(ppVnodes);
|
||||||
|
@ -605,6 +615,12 @@ static void *vmRestoreVnodeInThread(void *param) {
|
||||||
|
|
||||||
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
|
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
|
||||||
SVnodeObj *pVnode = pThread->ppVnodes[v];
|
SVnodeObj *pVnode = pThread->ppVnodes[v];
|
||||||
|
if (pVnode->failed) {
|
||||||
|
dError("vgId:%d, skip restoring vnode in failure mode.", pVnode->vgId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pVnode->pImpl);
|
||||||
|
|
||||||
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
||||||
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
|
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
|
||||||
|
|
|
@ -187,8 +187,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
||||||
pHead->vgId = ntohl(pHead->vgId);
|
pHead->vgId = ntohl(pHead->vgId);
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL || pVnode->failed) {
|
||||||
dGWarn("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
|
dGDebug("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
|
||||||
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
|
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
|
||||||
terrno = (terrno != 0) ? terrno : -1;
|
terrno = (terrno != 0) ? terrno : -1;
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -316,7 +316,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
||||||
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
||||||
int32_t size = -1;
|
int32_t size = -1;
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
|
||||||
if (pVnode != NULL) {
|
if (pVnode != NULL && !pVnode->failed) {
|
||||||
switch (qtype) {
|
switch (qtype) {
|
||||||
case WRITE_QUEUE:
|
case WRITE_QUEUE:
|
||||||
size = taosQueueItemSize(pVnode->pWriteW.queue);
|
size = taosQueueItemSize(pVnode->pWriteW.queue);
|
||||||
|
@ -339,8 +339,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
|
||||||
}
|
}
|
||||||
|
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
|
||||||
if (size < 0) {
|
if (size < 0) {
|
||||||
dTrace("vgId:%d, can't get size from queue since %s, qtype:%d", vgId, terrstr(), qtype);
|
dTrace("vgId:%d, can't get size from queue since %s, qtype:%d", vgId, terrstr(), qtype);
|
||||||
size = 0;
|
size = 0;
|
||||||
|
|
|
@ -329,6 +329,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
char tdir[TSDB_FILENAME_LEN * 2] = {0};
|
char tdir[TSDB_FILENAME_LEN * 2] = {0};
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
if (vnodeCheckDisk(diskPrimary, pTfs)) {
|
if (vnodeCheckDisk(diskPrimary, pTfs)) {
|
||||||
vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary);
|
vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary);
|
||||||
|
@ -342,6 +343,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
||||||
ret = vnodeLoadInfo(dir, &info);
|
ret = vnodeLoadInfo(dir, &info);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
|
vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
|
||||||
|
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -514,7 +516,10 @@ void vnodeClose(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// start the sync timer after the queue is ready
|
// start the sync timer after the queue is ready
|
||||||
int32_t vnodeStart(SVnode *pVnode) { return vnodeSyncStart(pVnode); }
|
int32_t vnodeStart(SVnode *pVnode) {
|
||||||
|
ASSERT(pVnode);
|
||||||
|
return vnodeSyncStart(pVnode);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vnodeIsCatchUp(SVnode *pVnode) { return syncIsCatchUp(pVnode->sync); }
|
int32_t vnodeIsCatchUp(SVnode *pVnode) { return syncIsCatchUp(pVnode->sync); }
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue