enh: restore vgroup id in vmOpenVnodes for vnodeAlterHashRange
This commit is contained in:
parent
e1d9e44fcd
commit
424086e324
|
@ -72,6 +72,7 @@ typedef struct {
|
||||||
int32_t vnodeNum;
|
int32_t vnodeNum;
|
||||||
int32_t opened;
|
int32_t opened;
|
||||||
int32_t failed;
|
int32_t failed;
|
||||||
|
bool updateVnodesList;
|
||||||
int32_t threadIndex;
|
int32_t threadIndex;
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
SVnodeMgmt *pMgmt;
|
SVnodeMgmt *pMgmt;
|
||||||
|
|
|
@ -158,6 +158,28 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
||||||
taosMemoryFree(pVnode);
|
taosMemoryFree(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
|
||||||
|
int32_t srcVgId = pCfg->vgId;
|
||||||
|
int32_t dstVgId = pCfg->toVgId;
|
||||||
|
if (dstVgId == 0) return 0;
|
||||||
|
|
||||||
|
char srcPath[TSDB_FILENAME_LEN];
|
||||||
|
char dstPath[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
|
snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
|
||||||
|
snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
|
||||||
|
|
||||||
|
int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs);
|
||||||
|
if (vgId <= 0) {
|
||||||
|
dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCfg->vgId = vgId;
|
||||||
|
pCfg->toVgId = 0;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static void *vmOpenVnodeInThread(void *param) {
|
static void *vmOpenVnodeInThread(void *param) {
|
||||||
SVnodeThread *pThread = param;
|
SVnodeThread *pThread = param;
|
||||||
SVnodeMgmt *pMgmt = pThread->pMgmt;
|
SVnodeMgmt *pMgmt = pThread->pMgmt;
|
||||||
|
@ -174,17 +196,33 @@ static void *vmOpenVnodeInThread(void *param) {
|
||||||
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
||||||
tmsgReportStartup("vnode-open", stepDesc);
|
tmsgReportStartup("vnode-open", stepDesc);
|
||||||
|
|
||||||
|
if (pCfg->toVgId) {
|
||||||
|
if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
|
||||||
|
dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||||
|
pThread->failed++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
pThread->updateVnodesList = true;
|
||||||
|
}
|
||||||
|
|
||||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
|
||||||
|
|
||||||
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
|
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
|
||||||
if (pImpl == NULL) {
|
if (pImpl == NULL) {
|
||||||
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||||
pThread->failed++;
|
pThread->failed++;
|
||||||
} else {
|
continue;
|
||||||
vmOpenVnode(pMgmt, pCfg, pImpl);
|
|
||||||
dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
|
||||||
pThread->opened++;
|
|
||||||
atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
|
||||||
|
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||||
|
pThread->failed++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||||
|
pThread->opened++;
|
||||||
|
atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
|
dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
|
||||||
|
@ -242,6 +280,8 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
||||||
taosThreadAttrDestroy(&thAttr);
|
taosThreadAttrDestroy(&thAttr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool updateVnodesList = false;
|
||||||
|
|
||||||
for (int32_t t = 0; t < threadNum; ++t) {
|
for (int32_t t = 0; t < threadNum; ++t) {
|
||||||
SVnodeThread *pThread = &threads[t];
|
SVnodeThread *pThread = &threads[t];
|
||||||
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
|
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
|
||||||
|
@ -249,6 +289,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
||||||
taosThreadClear(&pThread->thread);
|
taosThreadClear(&pThread->thread);
|
||||||
}
|
}
|
||||||
taosMemoryFree(pThread->pCfgs);
|
taosMemoryFree(pThread->pCfgs);
|
||||||
|
if (pThread->updateVnodesList) updateVnodesList = true;
|
||||||
}
|
}
|
||||||
taosMemoryFree(threads);
|
taosMemoryFree(threads);
|
||||||
taosMemoryFree(pCfgs);
|
taosMemoryFree(pCfgs);
|
||||||
|
@ -256,10 +297,15 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
||||||
if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
|
if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
|
||||||
dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
|
dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
|
||||||
dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (updateVnodesList && vmWriteVnodeListToFile(pMgmt) != 0) {
|
||||||
|
dError("failed to write vnode list since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *vmCloseVnodeInThread(void *param) {
|
static void *vmCloseVnodeInThread(void *param) {
|
||||||
|
|
|
@ -54,6 +54,7 @@ void vnodeCleanup();
|
||||||
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
||||||
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
|
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
|
||||||
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs);
|
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs);
|
||||||
|
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs);
|
||||||
void vnodeDestroy(const char *path, STfs *pTfs);
|
void vnodeDestroy(const char *path, STfs *pTfs);
|
||||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||||
void vnodePreClose(SVnode *pVnode);
|
void vnodePreClose(SVnode *pVnode);
|
||||||
|
|
|
@ -179,18 +179,21 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t vnodeGetAbsDir(const char *relPath, STfs *pTfs, char *buf, size_t bufLen) {
|
||||||
|
if (pTfs) {
|
||||||
|
snprintf(buf, bufLen, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, relPath);
|
||||||
|
} else {
|
||||||
|
snprintf(buf, bufLen, "%s", relPath);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) {
|
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) {
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
if (pTfs) {
|
vnodeGetAbsDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, srcPath);
|
|
||||||
} else {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", srcPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo add stat file to handle exception while vnode open
|
|
||||||
|
|
||||||
ret = vnodeLoadInfo(dir, &info);
|
ret = vnodeLoadInfo(dir, &info);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
@ -245,6 +248,42 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) {
|
||||||
|
SVnodeInfo info = {0};
|
||||||
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
|
||||||
|
vnodeGetAbsDir(dstPath, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
|
if (vnodeLoadInfo(dir, &info) == 0) {
|
||||||
|
if (info.config.vgId != dstVgId) {
|
||||||
|
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return dstVgId;
|
||||||
|
}
|
||||||
|
|
||||||
|
vnodeGetAbsDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
|
if (vnodeLoadInfo(dir, &info) < 0) {
|
||||||
|
vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (info.config.vgId == srcVgId) {
|
||||||
|
vInfo("vgId:%d, rollback alter hashrange", srcVgId);
|
||||||
|
return srcVgId;
|
||||||
|
} else if (info.config.vgId != dstVgId) {
|
||||||
|
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath);
|
||||||
|
if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs) < 0) {
|
||||||
|
vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return dstVgId;
|
||||||
|
}
|
||||||
|
|
||||||
void vnodeDestroy(const char *path, STfs *pTfs) {
|
void vnodeDestroy(const char *path, STfs *pTfs) {
|
||||||
vInfo("path:%s is removed while destroy vnode", path);
|
vInfo("path:%s is removed while destroy vnode", path);
|
||||||
tfsRmdir(pTfs, path);
|
tfsRmdir(pTfs, path);
|
||||||
|
|
Loading…
Reference in New Issue