enh: require command to trigger repairing vnodes on replaced disks

This commit is contained in:
Benguang Zhao 2023-10-19 18:55:12 +08:00
parent a2e0480839
commit 959f8105ee
7 changed files with 25 additions and 20 deletions

View File

@ -305,7 +305,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
goto _OVER;
}
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
code = terrno;
@ -452,7 +452,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
dInfo("vgId:%d, begin to open vnode", vgId);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1;
@ -602,7 +602,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
dInfo("vgId:%d, open vnode", dstVgId);
SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
@ -707,7 +707,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
dInfo("vgId:%d, begin to open vnode", vgId);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1;

View File

@ -272,11 +272,11 @@ static void *vmOpenVnodeInThread(void *param) {
int32_t diskPrimary = pCfg->diskPrimary;
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
if (terrno != TSDB_CODE_VND_NOT_EXIST) {
if (terrno != TSDB_CODE_NEED_RETRY) {
pThread->failed++;
continue;
}

View File

@ -58,7 +58,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
int32_t diskPrimary, STfs *pTfs);
void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb);
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb, bool force);
void vnodePreClose(SVnode *pVnode);
void vnodePostClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode);

View File

@ -202,7 +202,7 @@ typedef struct SMetaInfo {
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo, SMetaReader* pReader);
// tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback, bool force);
int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb);
// int32_t tsdbPrepareCommit(STsdb* pTsdb);
@ -267,7 +267,7 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
// sma
int32_t smaInit();
void smaCleanUp();
int32_t smaOpen(SVnode* pVnode, int8_t rollback);
int32_t smaOpen(SVnode* pVnode, int8_t rollback, bool force);
int32_t smaClose(SSma* pSma);
int32_t smaBegin(SSma* pSma);
int32_t smaPrepareAsyncCommit(SSma* pSma);

View File

@ -30,7 +30,7 @@ static int32_t rsmaRestore(SSma *pSma);
pKeepCfg->keepTimeOffset = 0; \
} while (0)
#define SMA_OPEN_RSMA_IMPL(v, l) \
#define SMA_OPEN_RSMA_IMPL(v, l, force) \
do { \
SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \
if (!RETENTION_VALID(r)) { \
@ -42,7 +42,7 @@ static int32_t rsmaRestore(SSma *pSma);
} \
code = smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \
TSDB_CHECK_CODE(code, lino, _exit); \
if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback) < 0) { \
if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback, force) < 0) { \
code = terrno; \
TSDB_CHECK_CODE(code, lino, _exit); \
} \
@ -118,7 +118,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
return terrno;
}
int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
int32_t smaOpen(SVnode *pVnode, int8_t rollback, bool force) {
int32_t code = 0;
int32_t lino = 0;
STsdbCfg *pCfg = &pVnode->config.tsdbCfg;
@ -139,11 +139,11 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
STsdbKeepCfg keepCfg = {0};
for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
if (i == TSDB_RETENTION_L0) {
SMA_OPEN_RSMA_IMPL(pVnode, 0);
SMA_OPEN_RSMA_IMPL(pVnode, 0, force);
} else if (i == TSDB_RETENTION_L1) {
SMA_OPEN_RSMA_IMPL(pVnode, 1);
SMA_OPEN_RSMA_IMPL(pVnode, 1, force);
} else if (i == TSDB_RETENTION_L2) {
SMA_OPEN_RSMA_IMPL(pVnode, 2);
SMA_OPEN_RSMA_IMPL(pVnode, 2, force);
}
}

View File

@ -35,7 +35,7 @@ int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
* @param dir
* @return int
*/
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg, int8_t rollback) {
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg, int8_t rollback, bool force) {
STsdb *pTsdb = NULL;
int slen = 0;
@ -72,6 +72,11 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
goto _err;
}
if (pTsdb->pFS->fsstate == TSDB_FS_STATE_INCOMPLETE && force == false) {
terrno = TSDB_CODE_NEED_RETRY;
goto _err;
}
if (tsdbOpenCache(pTsdb) < 0) {
goto _err;
}

View File

@ -330,7 +330,7 @@ static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) {
return 0;
}
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb) {
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb, bool force) {
SVnode *pVnode = NULL;
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
@ -350,7 +350,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
ret = vnodeLoadInfo(dir, &info);
if (ret < 0) {
vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
terrno = TSDB_CODE_VND_NOT_EXIST;
terrno = TSDB_CODE_NEED_RETRY;
return NULL;
}
@ -419,7 +419,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
}
// open tsdb
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) {
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback, force) < 0) {
vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
@ -453,7 +453,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
}
// open sma
if (smaOpen(pVnode, rollback)) {
if (smaOpen(pVnode, rollback, force)) {
vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}