Merge pull request #21786 from taosdata/FIX/TD-24817-3.0
enh: make alter-hashrange atomic and idempotent
This commit is contained in:
commit
e95b430e89
|
@ -46,6 +46,7 @@ typedef struct {
|
|||
int32_t vgId;
|
||||
int32_t vgVersion;
|
||||
int8_t dropped;
|
||||
int32_t toVgId;
|
||||
char path[PATH_MAX + 20];
|
||||
} SWrapperCfg;
|
||||
|
||||
|
@ -55,6 +56,7 @@ typedef struct {
|
|||
int32_t refCount;
|
||||
int8_t dropped;
|
||||
int8_t disable;
|
||||
int32_t toVgId;
|
||||
char *path;
|
||||
SVnode *pImpl;
|
||||
SMultiWorker pWriteW;
|
||||
|
@ -70,6 +72,7 @@ typedef struct {
|
|||
int32_t vnodeNum;
|
||||
int32_t opened;
|
||||
int32_t failed;
|
||||
bool updateVnodesList;
|
||||
int32_t threadIndex;
|
||||
TdThread thread;
|
||||
SVnodeMgmt *pMgmt;
|
||||
|
|
|
@ -71,6 +71,8 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **
|
|||
if (code < 0) goto _OVER;
|
||||
tjsonGetInt32ValueFromDouble(vnode, "vgVersion", pCfg->vgVersion, code);
|
||||
if (code < 0) goto _OVER;
|
||||
tjsonGetInt32ValueFromDouble(vnode, "toVgId", pCfg->toVgId, code);
|
||||
if (code < 0) goto _OVER;
|
||||
|
||||
snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCfg->vgId);
|
||||
}
|
||||
|
@ -165,6 +167,7 @@ static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t num
|
|||
if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1;
|
||||
if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1;
|
||||
if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1;
|
||||
if (pVnode->toVgId && tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId) < 0) return -1;
|
||||
if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1;
|
||||
}
|
||||
|
||||
|
@ -179,7 +182,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
|
|||
SVnodeObj **ppVnodes = NULL;
|
||||
char file[PATH_MAX] = {0};
|
||||
char realfile[PATH_MAX] = {0};
|
||||
snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP);
|
||||
snprintf(file, sizeof(file), "%s%svnodes_tmp.json", pMgmt->path, TD_DIRSEP);
|
||||
snprintf(realfile, sizeof(realfile), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
|
||||
|
||||
int32_t numOfVnodes = 0;
|
||||
|
@ -226,4 +229,4 @@ _OVER:
|
|||
dError("failed to write vnodes file:%s since %s, vnodes:%d", realfile, terrstr(), numOfVnodes);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -484,10 +484,18 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t srcVgId = req.srcVgId;
|
||||
int32_t dstVgId = req.dstVgId;
|
||||
|
||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
|
||||
if (pVnode != NULL) {
|
||||
dError("vgId:%d, vnode already exist", dstVgId);
|
||||
vmReleaseVnode(pMgmt, pVnode);
|
||||
terrno = TSDB_CODE_VND_ALREADY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
|
||||
req.dstVgId);
|
||||
|
||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, srcVgId);
|
||||
pVnode = vmAcquireVnode(pMgmt, srcVgId);
|
||||
if (pVnode == NULL) {
|
||||
dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
|
||||
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||
|
@ -501,6 +509,13 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
};
|
||||
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
||||
|
||||
// prepare alter
|
||||
pVnode->toVgId = dstVgId;
|
||||
if (vmWriteVnodeListToFile(pMgmt) != 0) {
|
||||
dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("vgId:%d, close vnode", srcVgId);
|
||||
vmCloseVnode(pMgmt, pVnode, true);
|
||||
|
||||
|
@ -532,6 +547,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// complete alter
|
||||
if (vmWriteVnodeListToFile(pMgmt) != 0) {
|
||||
dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -158,6 +158,28 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
|||
taosMemoryFree(pVnode);
|
||||
}
|
||||
|
||||
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
|
||||
int32_t srcVgId = pCfg->vgId;
|
||||
int32_t dstVgId = pCfg->toVgId;
|
||||
if (dstVgId == 0) return 0;
|
||||
|
||||
char srcPath[TSDB_FILENAME_LEN];
|
||||
char dstPath[TSDB_FILENAME_LEN];
|
||||
|
||||
snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
|
||||
snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
|
||||
|
||||
int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs);
|
||||
if (vgId <= 0) {
|
||||
dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pCfg->vgId = vgId;
|
||||
pCfg->toVgId = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *vmOpenVnodeInThread(void *param) {
|
||||
SVnodeThread *pThread = param;
|
||||
SVnodeMgmt *pMgmt = pThread->pMgmt;
|
||||
|
@ -174,17 +196,33 @@ static void *vmOpenVnodeInThread(void *param) {
|
|||
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
||||
tmsgReportStartup("vnode-open", stepDesc);
|
||||
|
||||
if (pCfg->toVgId) {
|
||||
if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
|
||||
dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||
pThread->failed++;
|
||||
continue;
|
||||
}
|
||||
pThread->updateVnodesList = true;
|
||||
}
|
||||
|
||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
|
||||
|
||||
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
|
||||
if (pImpl == NULL) {
|
||||
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||
pThread->failed++;
|
||||
} else {
|
||||
vmOpenVnode(pMgmt, pCfg, pImpl);
|
||||
dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||
pThread->opened++;
|
||||
atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
|
||||
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||
pThread->failed++;
|
||||
continue;
|
||||
}
|
||||
|
||||
dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||
pThread->opened++;
|
||||
atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
|
||||
}
|
||||
|
||||
dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
|
||||
|
@ -242,6 +280,8 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
|||
taosThreadAttrDestroy(&thAttr);
|
||||
}
|
||||
|
||||
bool updateVnodesList = false;
|
||||
|
||||
for (int32_t t = 0; t < threadNum; ++t) {
|
||||
SVnodeThread *pThread = &threads[t];
|
||||
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
|
||||
|
@ -249,6 +289,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
|||
taosThreadClear(&pThread->thread);
|
||||
}
|
||||
taosMemoryFree(pThread->pCfgs);
|
||||
if (pThread->updateVnodesList) updateVnodesList = true;
|
||||
}
|
||||
taosMemoryFree(threads);
|
||||
taosMemoryFree(pCfgs);
|
||||
|
@ -256,10 +297,15 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
|||
if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
|
||||
dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
|
||||
return -1;
|
||||
} else {
|
||||
dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (updateVnodesList && vmWriteVnodeListToFile(pMgmt) != 0) {
|
||||
dError("failed to write vnode list since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *vmCloseVnodeInThread(void *param) {
|
||||
|
|
|
@ -1217,6 +1217,7 @@ static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, i
|
|||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_ALTER_HASHRANGE;
|
||||
action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
|
|
|
@ -54,6 +54,7 @@ void vnodeCleanup();
|
|||
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
||||
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
|
||||
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs);
|
||||
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs);
|
||||
void vnodeDestroy(const char *path, STfs *pTfs);
|
||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||
void vnodePreClose(SVnode *pVnode);
|
||||
|
|
|
@ -136,14 +136,13 @@ static int32_t vnodeVgroupIdLen(int32_t vgId) {
|
|||
}
|
||||
|
||||
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) {
|
||||
int32_t ret = tfsRename(pTfs, srcPath, dstPath);
|
||||
if (ret != 0) return ret;
|
||||
int32_t ret = 0;
|
||||
|
||||
char oldRname[TSDB_FILENAME_LEN] = {0};
|
||||
char newRname[TSDB_FILENAME_LEN] = {0};
|
||||
char tsdbPath[TSDB_FILENAME_LEN] = {0};
|
||||
char tsdbFilePrefix[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(tsdbPath, TSDB_FILENAME_LEN, "%s%stsdb", dstPath, TD_DIRSEP);
|
||||
snprintf(tsdbPath, TSDB_FILENAME_LEN, "%s%stsdb", srcPath, TD_DIRSEP);
|
||||
snprintf(tsdbFilePrefix, TSDB_FILENAME_LEN, "tsdb%sv", TD_DIRSEP);
|
||||
|
||||
STfsDir *tsdbDir = tfsOpendir(pTfs, tsdbPath);
|
||||
|
@ -168,7 +167,7 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
|
|||
|
||||
ret = tfsRename(pTfs, tsdbFile->rname, newRname);
|
||||
if (ret != 0) {
|
||||
vInfo("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr());
|
||||
vError("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr());
|
||||
tfsClosedir(tsdbDir);
|
||||
return ret;
|
||||
}
|
||||
|
@ -176,6 +175,21 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
|
|||
}
|
||||
|
||||
tfsClosedir(tsdbDir);
|
||||
|
||||
vInfo("vgId:%d, rename dir from %s to %s", dstVgId, srcPath, dstPath);
|
||||
ret = tfsRename(pTfs, srcPath, dstPath);
|
||||
if (ret != 0) {
|
||||
vError("vgId:%d, failed to rename dir from %s to %s since %s", dstVgId, srcPath, dstPath, terrstr());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t vnodeGetAbsDir(const char *relPath, STfs *pTfs, char *buf, size_t bufLen) {
|
||||
if (pTfs) {
|
||||
snprintf(buf, bufLen, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, relPath);
|
||||
} else {
|
||||
snprintf(buf, bufLen, "%s", relPath);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -184,13 +198,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
|
|||
char dir[TSDB_FILENAME_LEN] = {0};
|
||||
int32_t ret = 0;
|
||||
|
||||
if (pTfs) {
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, srcPath);
|
||||
} else {
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", srcPath);
|
||||
}
|
||||
|
||||
// todo add stat file to handle exception while vnode open
|
||||
vnodeGetAbsDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
|
||||
|
||||
ret = vnodeLoadInfo(dir, &info);
|
||||
if (ret < 0) {
|
||||
|
@ -245,6 +253,42 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) {
|
||||
SVnodeInfo info = {0};
|
||||
char dir[TSDB_FILENAME_LEN] = {0};
|
||||
|
||||
vnodeGetAbsDir(dstPath, pTfs, dir, TSDB_FILENAME_LEN);
|
||||
if (vnodeLoadInfo(dir, &info) == 0) {
|
||||
if (info.config.vgId != dstVgId) {
|
||||
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
|
||||
return -1;
|
||||
}
|
||||
return dstVgId;
|
||||
}
|
||||
|
||||
vnodeGetAbsDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
|
||||
if (vnodeLoadInfo(dir, &info) < 0) {
|
||||
vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (info.config.vgId == srcVgId) {
|
||||
vInfo("vgId:%d, rollback alter hashrange", srcVgId);
|
||||
return srcVgId;
|
||||
} else if (info.config.vgId != dstVgId) {
|
||||
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath);
|
||||
if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs) < 0) {
|
||||
vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return dstVgId;
|
||||
}
|
||||
|
||||
void vnodeDestroy(const char *path, STfs *pTfs) {
|
||||
vInfo("path:%s is removed while destroy vnode", path);
|
||||
tfsRmdir(pTfs, path);
|
||||
|
|
Loading…
Reference in New Issue