enh: commit tsdb after split
This commit is contained in:
parent
141fcdd49b
commit
53ac9c7229
|
@ -81,7 +81,7 @@ typedef struct {
|
||||||
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
|
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
|
||||||
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
||||||
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
|
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
|
||||||
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal);
|
||||||
|
|
||||||
// vmHandle.c
|
// vmHandle.c
|
||||||
SArray *vmGetMsgHandles();
|
SArray *vmGetMsgHandles();
|
||||||
|
|
|
@ -322,13 +322,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo("vgId:%d, start to close vnode", srcVgId);
|
dInfo("vgId:%d, start to close vnode", srcVgId);
|
||||||
SWrapperCfg wrapperCfg = {
|
vmCloseVnode(pMgmt, pVnode, true);
|
||||||
.dropped = pVnode->dropped,
|
|
||||||
.vgId = pVnode->vgId,
|
|
||||||
.vgVersion = pVnode->vgVersion,
|
|
||||||
};
|
|
||||||
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
|
||||||
vmCloseVnode(pMgmt, pVnode);
|
|
||||||
|
|
||||||
char srcPath[TSDB_FILENAME_LEN] = {0};
|
char srcPath[TSDB_FILENAME_LEN] = {0};
|
||||||
char dstPath[TSDB_FILENAME_LEN] = {0};
|
char dstPath[TSDB_FILENAME_LEN] = {0};
|
||||||
|
@ -348,7 +342,12 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
wrapperCfg.vgId = dstVgId;
|
SWrapperCfg wrapperCfg = {
|
||||||
|
.dropped = pVnode->dropped,
|
||||||
|
.vgId = dstVgId,
|
||||||
|
.vgVersion = pVnode->vgVersion,
|
||||||
|
};
|
||||||
|
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
||||||
if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
|
if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
|
||||||
dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
|
dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -407,7 +406,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
.vgVersion = pVnode->vgVersion,
|
.vgVersion = pVnode->vgVersion,
|
||||||
};
|
};
|
||||||
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
||||||
vmCloseVnode(pMgmt, pVnode);
|
vmCloseVnode(pMgmt, pVnode, false);
|
||||||
|
|
||||||
char path[TSDB_FILENAME_LEN] = {0};
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
|
||||||
|
@ -469,7 +468,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
vmCloseVnode(pMgmt, pVnode);
|
vmCloseVnode(pMgmt, pVnode, false);
|
||||||
vmWriteVnodeListToFile(pMgmt);
|
vmWriteVnodeListToFile(pMgmt);
|
||||||
|
|
||||||
dInfo("vgId:%d, is dropped", vgId);
|
dInfo("vgId:%d, is dropped", vgId);
|
||||||
|
|
|
@ -76,7 +76,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) {
|
||||||
char path[TSDB_FILENAME_LEN] = {0};
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
|
|
||||||
vnodeProposeCommitOnNeed(pVnode->pImpl);
|
vnodeProposeCommitOnNeed(pVnode->pImpl);
|
||||||
|
@ -124,10 +124,24 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
vnodePostClose(pVnode->pImpl);
|
vnodePostClose(pVnode->pImpl);
|
||||||
|
|
||||||
vmFreeQueue(pMgmt, pVnode);
|
vmFreeQueue(pMgmt, pVnode);
|
||||||
|
|
||||||
|
if (commitAndRemoveWal) {
|
||||||
|
dInfo("vgId:%d, commit data", pVnode->vgId);
|
||||||
|
vnodeSyncCommit(pVnode->pImpl);
|
||||||
|
}
|
||||||
|
|
||||||
vnodeClose(pVnode->pImpl);
|
vnodeClose(pVnode->pImpl);
|
||||||
pVnode->pImpl = NULL;
|
pVnode->pImpl = NULL;
|
||||||
dInfo("vgId:%d, vnode is closed", pVnode->vgId);
|
dInfo("vgId:%d, vnode is closed", pVnode->vgId);
|
||||||
|
|
||||||
|
if (commitAndRemoveWal) {
|
||||||
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
|
||||||
|
dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
|
||||||
|
tfsRmdir(pMgmt->pTfs, path);
|
||||||
|
tfsMkdir(pMgmt->pTfs, path);
|
||||||
|
}
|
||||||
|
|
||||||
if (pVnode->dropped) {
|
if (pVnode->dropped) {
|
||||||
dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
|
dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
|
||||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
|
||||||
|
@ -257,7 +271,7 @@ static void *vmCloseVnodeInThread(void *param) {
|
||||||
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
||||||
tmsgReportStartup("vnode-close", stepDesc);
|
tmsgReportStartup("vnode-close", stepDesc);
|
||||||
|
|
||||||
vmCloseVnode(pMgmt, pVnode);
|
vmCloseVnode(pMgmt, pVnode, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
|
dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
|
||||||
|
|
|
@ -58,6 +58,7 @@ void vnodePreClose(SVnode *pVnode);
|
||||||
void vnodePostClose(SVnode *pVnode);
|
void vnodePostClose(SVnode *pVnode);
|
||||||
void vnodeSyncCheckTimeout(SVnode *pVnode);
|
void vnodeSyncCheckTimeout(SVnode *pVnode);
|
||||||
void vnodeClose(SVnode *pVnode);
|
void vnodeClose(SVnode *pVnode);
|
||||||
|
int32_t vnodeSyncCommit(SVnode *pVnode);
|
||||||
|
|
||||||
int32_t vnodeStart(SVnode *pVnode);
|
int32_t vnodeStart(SVnode *pVnode);
|
||||||
void vnodeStop(SVnode *pVnode);
|
void vnodeStop(SVnode *pVnode);
|
||||||
|
|
|
@ -124,8 +124,8 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, alter hashrange from [%u, %u) to [%u, %u)", pReq->srcVgId,
|
vInfo("vgId:%d, start to alter hashrange from [%u, %u) to [%u, %u)", pReq->srcVgId, info.config.hashBegin,
|
||||||
info.config.hashBegin, info.config.hashEnd, pReq->hashBegin, pReq->hashEnd);
|
info.config.hashEnd, pReq->hashBegin, pReq->hashEnd);
|
||||||
info.config.vgId = pReq->dstVgId;
|
info.config.vgId = pReq->dstVgId;
|
||||||
info.config.hashBegin = pReq->hashBegin;
|
info.config.hashBegin = pReq->hashBegin;
|
||||||
info.config.hashEnd = pReq->hashEnd;
|
info.config.hashEnd = pReq->hashEnd;
|
||||||
|
@ -165,6 +165,8 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo compact here
|
||||||
|
|
||||||
vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId);
|
vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue