From 20a55c34121d86cab061ec68e6c18b9a45d90c4e Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 19 Jun 2023 15:09:00 +0800 Subject: [PATCH 1/4] fix: make vmProcessAlterHashRangeReq idempotent --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 12 ++++++++++-- source/dnode/mnode/impl/src/mndVgroup.c | 1 + 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index dd880a87c8..c1cc813eee 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -484,10 +484,18 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t srcVgId = req.srcVgId; int32_t dstVgId = req.dstVgId; + + SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId); + if (pVnode != NULL) { + dError("vgId:%d, vnode already exist", dstVgId); + vmReleaseVnode(pMgmt, pVnode); + terrno = TSDB_CODE_VND_ALREADY_EXIST; + return -1; + } + dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd, req.dstVgId); - - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, srcVgId); + pVnode = vmAcquireVnode(pMgmt, srcVgId); if (pVnode == NULL) { dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 36e8755a3e..fe588b8f3b 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1217,6 +1217,7 @@ static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, i action.pCont = pReq; action.contLen = contLen; action.msgType = TDMT_VND_ALTER_HASHRANGE; + action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); From 090a1a1595f47c2aae4c3f9131a6c8abd6b30546 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 19 Jun 2023 15:48:32 +0800 Subject: [PATCH 2/4] enh: rename dir at the end in vnodeRenameVgroupId --- source/dnode/vnode/src/vnd/vnodeOpen.c | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index b5e7c6875b..c1a1e7a27a 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -130,14 +130,13 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p } int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) { - int32_t ret = tfsRename(pTfs, srcPath, dstPath); - if (ret != 0) return ret; + int32_t ret = 0; char oldRname[TSDB_FILENAME_LEN] = {0}; char newRname[TSDB_FILENAME_LEN] = {0}; char tsdbPath[TSDB_FILENAME_LEN] = {0}; char tsdbFilePrefix[TSDB_FILENAME_LEN] = {0}; - snprintf(tsdbPath, TSDB_FILENAME_LEN, "%s%stsdb", dstPath, TD_DIRSEP); + snprintf(tsdbPath, TSDB_FILENAME_LEN, "%s%stsdb", srcPath, TD_DIRSEP); snprintf(tsdbFilePrefix, TSDB_FILENAME_LEN, "tsdb%sv", TD_DIRSEP); STfsDir *tsdbDir = tfsOpendir(pTfs, tsdbPath); @@ -163,7 +162,7 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr ret = tfsRename(pTfs, tsdbFile->rname, newRname); if (ret != 0) { - vInfo("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr()); + vError("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr()); tfsClosedir(tsdbDir); return ret; } @@ -171,7 +170,13 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr } tfsClosedir(tsdbDir); - return 0; + + vInfo("vgId:%d, rename dir from %s to %s", dstVgId, srcPath, dstPath); + ret = tfsRename(pTfs, srcPath, dstPath); + if (ret != 0) { + vError("vgId:%d, failed to rename dir from %s to %s since %s", dstVgId, srcPath, dstPath, terrstr()); + } + return ret; } int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) { From e1d9e44fcd4fbe69c23ede71aabe7482e3d92565 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 19 Jun 2023 17:59:15 +0800 Subject: [PATCH 3/4] enh: add a field toVgId in vnodes.json for prepare alter-hashrange --- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 2 ++ source/dnode/mgmt/mgmt_vnode/src/vmFile.c | 7 +++++-- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 8 ++++++++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 83fb331dbd..56c25d1039 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -46,6 +46,7 @@ typedef struct { int32_t vgId; int32_t vgVersion; int8_t dropped; + int32_t toVgId; char path[PATH_MAX + 20]; } SWrapperCfg; @@ -55,6 +56,7 @@ typedef struct { int32_t refCount; int8_t dropped; int8_t disable; + int32_t toVgId; char *path; SVnode *pImpl; SMultiWorker pWriteW; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index bf176ebb40..769b274f7f 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -71,6 +71,8 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg ** if (code < 0) goto _OVER; tjsonGetInt32ValueFromDouble(vnode, "vgVersion", pCfg->vgVersion, code); if (code < 0) goto _OVER; + tjsonGetInt32ValueFromDouble(vnode, "toVgId", pCfg->toVgId, code); + if (code < 0) goto _OVER; snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCfg->vgId); } @@ -165,6 +167,7 @@ static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t num if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1; if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1; if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1; + if (pVnode->toVgId && tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId) < 0) return -1; if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1; } @@ -179,7 +182,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) { SVnodeObj **ppVnodes = NULL; char file[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%svnodes_tmp.json", pMgmt->path, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%svnodes.json", pMgmt->path, TD_DIRSEP); int32_t numOfVnodes = 0; @@ -226,4 +229,4 @@ _OVER: dError("failed to write vnodes file:%s since %s, vnodes:%d", realfile, terrstr(), numOfVnodes); } return code; -} \ No newline at end of file +} diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index c1cc813eee..aff4a35110 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -509,6 +509,13 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { }; tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); + // prepare alter + pVnode->toVgId = dstVgId; + if (vmWriteVnodeListToFile(pMgmt) != 0) { + dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr()); + return -1; + } + dInfo("vgId:%d, close vnode", srcVgId); vmCloseVnode(pMgmt, pVnode, true); @@ -540,6 +547,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } + // complete alter if (vmWriteVnodeListToFile(pMgmt) != 0) { dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr()); return -1; From 424086e32467e574e464528d6fa1dd089dc06ea9 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 20 Jun 2023 15:40:09 +0800 Subject: [PATCH 4/4] enh: restore vgroup id in vmOpenVnodes for vnodeAlterHashRange --- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 1 + source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 62 +++++++++++++++++++++--- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/vnd/vnodeOpen.c | 53 +++++++++++++++++--- 4 files changed, 102 insertions(+), 15 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 56c25d1039..d1dc872f4b 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -72,6 +72,7 @@ typedef struct { int32_t vnodeNum; int32_t opened; int32_t failed; + bool updateVnodesList; int32_t threadIndex; TdThread thread; SVnodeMgmt *pMgmt; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 16e7ffc536..db46ce3ca0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -158,6 +158,28 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) taosMemoryFree(pVnode); } +static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) { + int32_t srcVgId = pCfg->vgId; + int32_t dstVgId = pCfg->toVgId; + if (dstVgId == 0) return 0; + + char srcPath[TSDB_FILENAME_LEN]; + char dstPath[TSDB_FILENAME_LEN]; + + snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId); + snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId); + + int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs); + if (vgId <= 0) { + dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath); + return -1; + } + + pCfg->vgId = vgId; + pCfg->toVgId = 0; + return 0; +} + static void *vmOpenVnodeInThread(void *param) { SVnodeThread *pThread = param; SVnodeMgmt *pMgmt = pThread->pMgmt; @@ -174,17 +196,33 @@ static void *vmOpenVnodeInThread(void *param) { pMgmt->state.openVnodes, pMgmt->state.totalVnodes); tmsgReportStartup("vnode-open", stepDesc); + if (pCfg->toVgId) { + if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) { + dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex); + pThread->failed++; + continue; + } + pThread->updateVnodesList = true; + } + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); + SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; - } else { - vmOpenVnode(pMgmt, pCfg, pImpl); - dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); - pThread->opened++; - atomic_add_fetch_32(&pMgmt->state.openVnodes, 1); + continue; } + + if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) { + dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); + pThread->failed++; + continue; + } + + dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); + pThread->opened++; + atomic_add_fetch_32(&pMgmt->state.openVnodes, 1); } dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, @@ -242,6 +280,8 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { taosThreadAttrDestroy(&thAttr); } + bool updateVnodesList = false; + for (int32_t t = 0; t < threadNum; ++t) { SVnodeThread *pThread = &threads[t]; if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) { @@ -249,6 +289,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { taosThreadClear(&pThread->thread); } taosMemoryFree(pThread->pCfgs); + if (pThread->updateVnodesList) updateVnodesList = true; } taosMemoryFree(threads); taosMemoryFree(pCfgs); @@ -256,10 +297,15 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) { dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes); return -1; - } else { - dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes); - return 0; } + + if (updateVnodesList && vmWriteVnodeListToFile(pMgmt) != 0) { + dError("failed to write vnode list since %s", terrstr()); + return -1; + } + + dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes); + return 0; } static void *vmCloseVnodeInThread(void *param) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7e19425d56..83025d8561 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -54,6 +54,7 @@ void vnodeCleanup(); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs); +int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index c1a1e7a27a..94becbf126 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -179,18 +179,21 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr return ret; } +int32_t vnodeGetAbsDir(const char *relPath, STfs *pTfs, char *buf, size_t bufLen) { + if (pTfs) { + snprintf(buf, bufLen, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, relPath); + } else { + snprintf(buf, bufLen, "%s", relPath); + } + return 0; +} + int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) { SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN] = {0}; int32_t ret = 0; - if (pTfs) { - snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, srcPath); - } else { - snprintf(dir, TSDB_FILENAME_LEN, "%s", srcPath); - } - - // todo add stat file to handle exception while vnode open + vnodeGetAbsDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN); ret = vnodeLoadInfo(dir, &info); if (ret < 0) { @@ -245,6 +248,42 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod return 0; } +int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) { + SVnodeInfo info = {0}; + char dir[TSDB_FILENAME_LEN] = {0}; + + vnodeGetAbsDir(dstPath, pTfs, dir, TSDB_FILENAME_LEN); + if (vnodeLoadInfo(dir, &info) == 0) { + if (info.config.vgId != dstVgId) { + vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId); + return -1; + } + return dstVgId; + } + + vnodeGetAbsDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN); + if (vnodeLoadInfo(dir, &info) < 0) { + vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno)); + return -1; + } + + if (info.config.vgId == srcVgId) { + vInfo("vgId:%d, rollback alter hashrange", srcVgId); + return srcVgId; + } else if (info.config.vgId != dstVgId) { + vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId); + return -1; + } + + vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath); + if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs) < 0) { + vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno)); + return -1; + } + + return dstVgId; +} + void vnodeDestroy(const char *path, STfs *pTfs) { vInfo("path:%s is removed while destroy vnode", path); tfsRmdir(pTfs, path);