diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index cd49997d49..e3fa2964b7 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -81,7 +81,7 @@ typedef struct { SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId); void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl); -void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); +void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal); // vmHandle.c SArray *vmGetMsgHandles(); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index e110130909..d2ab9de7c8 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -322,13 +322,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } dInfo("vgId:%d, start to close vnode", srcVgId); - SWrapperCfg wrapperCfg = { - .dropped = pVnode->dropped, - .vgId = pVnode->vgId, - .vgVersion = pVnode->vgVersion, - }; - tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); - vmCloseVnode(pMgmt, pVnode); + vmCloseVnode(pMgmt, pVnode, true); char srcPath[TSDB_FILENAME_LEN] = {0}; char dstPath[TSDB_FILENAME_LEN] = {0}; @@ -348,7 +342,12 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - wrapperCfg.vgId = dstVgId; + SWrapperCfg wrapperCfg = { + .dropped = pVnode->dropped, + .vgId = dstVgId, + .vgVersion = pVnode->vgVersion, + }; + tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) { dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr()); return -1; @@ -407,7 +406,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { .vgVersion = pVnode->vgVersion, }; tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); - vmCloseVnode(pMgmt, pVnode); + vmCloseVnode(pMgmt, pVnode, false); char path[TSDB_FILENAME_LEN] = {0}; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId); @@ -469,7 +468,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - vmCloseVnode(pMgmt, pVnode); + vmCloseVnode(pMgmt, pVnode, false); vmWriteVnodeListToFile(pMgmt); dInfo("vgId:%d, is dropped", vgId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 99ba9b9b3b..61d83f4e8d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -76,7 +76,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { return code; } -void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { +void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) { char path[TSDB_FILENAME_LEN] = {0}; vnodeProposeCommitOnNeed(pVnode->pImpl); @@ -124,10 +124,24 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { vnodePostClose(pVnode->pImpl); vmFreeQueue(pMgmt, pVnode); + + if (commitAndRemoveWal) { + dInfo("vgId:%d, commit data", pVnode->vgId); + vnodeSyncCommit(pVnode->pImpl); + } + vnodeClose(pVnode->pImpl); pVnode->pImpl = NULL; dInfo("vgId:%d, vnode is closed", pVnode->vgId); + if (commitAndRemoveWal) { + char path[TSDB_FILENAME_LEN] = {0}; + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP); + dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path); + tfsRmdir(pMgmt->pTfs, path); + tfsMkdir(pMgmt->pTfs, path); + } + if (pVnode->dropped) { dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId); @@ -257,7 +271,7 @@ static void *vmCloseVnodeInThread(void *param) { pMgmt->state.openVnodes, pMgmt->state.totalVnodes); tmsgReportStartup("vnode-close", stepDesc); - vmCloseVnode(pMgmt, pVnode); + vmCloseVnode(pMgmt, pVnode, false); } dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index fdb778e2bb..5cd8b63bc4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -58,6 +58,7 @@ void vnodePreClose(SVnode *pVnode); void vnodePostClose(SVnode *pVnode); void vnodeSyncCheckTimeout(SVnode *pVnode); void vnodeClose(SVnode *pVnode); +int32_t vnodeSyncCommit(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode); void vnodeStop(SVnode *pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 4acaadfa20..64cc31596e 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -124,8 +124,8 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod return -1; } - vInfo("vgId:%d, alter hashrange from [%u, %u) to [%u, %u)", pReq->srcVgId, - info.config.hashBegin, info.config.hashEnd, pReq->hashBegin, pReq->hashEnd); + vInfo("vgId:%d, start to alter hashrange from [%u, %u) to [%u, %u)", pReq->srcVgId, info.config.hashBegin, + info.config.hashEnd, pReq->hashBegin, pReq->hashEnd); info.config.vgId = pReq->dstVgId; info.config.hashBegin = pReq->hashBegin; info.config.hashEnd = pReq->hashEnd; @@ -165,6 +165,8 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod return -1; } + // todo compact here + vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId); return 0; }