Merge pull request #22085 from taosdata/FEAT/TS-2699-3.0

feat: distribute vnode primary dirs among disks of level 0
This commit is contained in:
wade zhang 2023-07-19 09:44:43 +08:00 committed by GitHub
commit c3e5375f15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 264 additions and 69 deletions

View File

@ -69,6 +69,13 @@ void tfsUpdateSize(STfs *pTfs);
*/
SDiskSize tfsGetSize(STfs *pTfs);
/**
* @brief Get the number of disks at level of multi-tier storage.
*
* @param pTfs
* @return int32_t
*/
int32_t tfsGetDisksAtLevel(STfs *pTfs, int32_t level);
/**
* @brief Get level of multi-tier storage.
*
@ -123,6 +130,15 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname);
*/
int32_t tfsMkdirAt(STfs *pTfs, const char *rname, SDiskID diskId);
/**
* @brief Recursive make directory at all levels in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdirRecur(STfs *pTfs, const char *rname);
/**
* @brief Recursive create directories in tfs.
*
@ -160,7 +176,17 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname);
* @param nrname The rel name of new file.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname);
int32_t tfsRename(STfs *pTfs, int32_t diskPrimary, const char *orname, const char *nrname);
/**
* @brief Search fname in level of tfs
*
* @param pTfs The fs object.
* @param level The level to search on
* @param fname The relative file name to be searched
* @param int32_t diskId for successs, -1 for failure
*/
int32_t tfsSearch(STfs *pTfs, int32_t level, const char *fname);
/**
* @brief Init file object in tfs.

View File

@ -416,7 +416,7 @@ int32_t* taosGetErrno();
// #define TSDB_CODE_VND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0501) // 2.x
// #define TSDB_CODE_VND_ACTION_NEED_REPROCESS. TAOS_DEF_ERROR_CODE(0, 0x0502) // 2.x
#define TSDB_CODE_VND_INVALID_VGROUP_ID TAOS_DEF_ERROR_CODE(0, 0x0503)
// #define TSDB_CODE_VND_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0504) // 2.x
#define TSDB_CODE_VND_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0504)
// #define TSDB_CODE_VND_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0505) // 2.x
// #define TSDB_CODE_VND_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0506) // 2.x
// #define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) // 2.x

View File

@ -46,6 +46,7 @@ typedef struct {
int32_t vgId;
int32_t vgVersion;
int8_t dropped;
int32_t diskPrimary;
int32_t toVgId;
char path[PATH_MAX + 20];
} SWrapperCfg;
@ -56,6 +57,7 @@ typedef struct {
int32_t refCount;
int8_t dropped;
int8_t disable;
int32_t diskPrimary;
int32_t toVgId;
char *path;
SVnode *pImpl;
@ -81,6 +83,7 @@ typedef struct {
} SVnodeThread;
// vmInt.c
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId);
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);

View File

@ -71,6 +71,8 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **
if (code < 0) goto _OVER;
tjsonGetInt32ValueFromDouble(vnode, "vgVersion", pCfg->vgVersion, code);
if (code < 0) goto _OVER;
tjsonGetInt32ValueFromDouble(vnode, "diskPrimary", pCfg->diskPrimary, code);
if (code < 0) goto _OVER;
tjsonGetInt32ValueFromDouble(vnode, "toVgId", pCfg->toVgId, code);
if (code < 0) goto _OVER;
@ -167,6 +169,7 @@ static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t num
if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "diskPrimary", pVnode->diskPrimary) < 0) return -1;
if (pVnode->toVgId && tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId) < 0) return -1;
if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1;
}

View File

@ -263,16 +263,19 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0;
}
wrapperCfg.diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
int32_t diskPrimary = wrapperCfg.diskPrimary;
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
if (vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs) < 0) {
tFreeSCreateVnodeReq(&req);
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
code = terrno;
goto _OVER;
}
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
code = terrno;
@ -403,21 +406,23 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.dropped = pVnode->dropped,
.vgId = pVnode->vgId,
.vgVersion = pVnode->vgVersion,
.diskPrimary = pVnode->diskPrimary,
};
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
vmCloseVnode(pMgmt, pVnode, false);
int32_t diskPrimary = wrapperCfg.diskPrimary;
char path[TSDB_FILENAME_LEN] = {0};
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
if (vnodeAlterReplica(path, &req, pMgmt->pTfs) < 0) {
if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
return -1;
}
dInfo("vgId:%d, begin to open vnode", vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1;
@ -490,6 +495,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.dropped = pVnode->dropped,
.vgId = dstVgId,
.vgVersion = pVnode->vgVersion,
.diskPrimary = pVnode->diskPrimary,
};
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
@ -503,19 +509,20 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, close vnode", srcVgId);
vmCloseVnode(pMgmt, pVnode, true);
int32_t diskPrimary = wrapperCfg.diskPrimary;
char srcPath[TSDB_FILENAME_LEN] = {0};
char dstPath[TSDB_FILENAME_LEN] = {0};
snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
if (vnodeAlterHashRange(srcPath, dstPath, &req, pMgmt->pTfs) < 0) {
if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
return -1;
}
dInfo("vgId:%d, open vnode", dstVgId);
SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
return -1;
@ -602,21 +609,23 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.dropped = pVnode->dropped,
.vgId = pVnode->vgId,
.vgVersion = pVnode->vgVersion,
.diskPrimary = pVnode->diskPrimary,
};
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
vmCloseVnode(pMgmt, pVnode, false);
int32_t diskPrimary = wrapperCfg.diskPrimary;
char path[TSDB_FILENAME_LEN] = {0};
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
if (vnodeAlterReplica(path, &alterReq, pMgmt->pTfs) < 0) {
if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
return -1;
}
dInfo("vgId:%d, begin to open vnode", vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1;

View File

@ -15,8 +15,64 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
#include "tfs.h"
#include "vnd.h"
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
STfs *pTfs = pMgmt->pTfs;
int32_t diskId = 0;
if (!pTfs) {
return diskId;
}
// search fs
char vnodePath[TSDB_FILENAME_LEN] = {0};
snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
char fname[TSDB_FILENAME_LEN] = {0};
char fnameTmp[TSDB_FILENAME_LEN] = {0};
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);
diskId = tfsSearch(pTfs, 0, fname);
if (diskId >= 0) {
return diskId;
}
diskId = tfsSearch(pTfs, 0, fnameTmp);
if (diskId >= 0) {
return diskId;
}
// alloc
int32_t disks[TFS_MAX_DISKS_PER_TIER] = {0};
int32_t numOfVnodes = 0;
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
for (int32_t v = 0; v < numOfVnodes; v++) {
SVnodeObj *pVnode = ppVnodes[v];
disks[pVnode->diskPrimary] += 1;
}
int32_t minVal = INT_MAX;
int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
diskId = 0;
for (int32_t id = 0; id < ndisk; id++) {
if (minVal > disks[id]) {
minVal = disks[id];
diskId = id;
}
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
vmReleaseVnode(pMgmt, ppVnodes[i]);
}
if (ppVnodes != NULL) {
taosMemoryFree(ppVnodes);
}
dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
return diskId;
}
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *pVnode = NULL;
@ -52,6 +108,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode->vgId = pCfg->vgId;
pVnode->vgVersion = pCfg->vgVersion;
pVnode->diskPrimary = pCfg->diskPrimary;
pVnode->refCount = 0;
pVnode->dropped = 0;
pVnode->path = taosStrdup(pCfg->path);
@ -169,7 +226,8 @@ static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs);
int32_t diskPrimary = pCfg->diskPrimary;
int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs);
if (vgId <= 0) {
dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
return -1;
@ -205,11 +263,12 @@ static void *vmOpenVnodeInThread(void *param) {
pThread->updateVnodesList = true;
}
int32_t diskPrimary = pCfg->diskPrimary;
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
pThread->failed++;
continue;
}
@ -296,6 +355,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
terrno = TSDB_CODE_VND_INIT_FAILED;
return -1;
}
@ -518,7 +578,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
tmsgReportStartup("vnode-worker", "initialized");
if (vmOpenVnodes(pMgmt) != 0) {
dError("failed to open vnode since %s", terrstr());
dError("failed to open all vnodes since %s", terrstr());
goto _OVER;
}
tmsgReportStartup("vnode-vnodes", "initialized");

View File

@ -51,12 +51,14 @@ extern const SVnodeCfg vnodeCfgDefault;
int32_t vnodeInit(int32_t nthreads);
void vnodeCleanup();
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs);
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs);
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs);
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs);
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
int32_t diskPrimary, STfs *pTfs);
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
int32_t diskPrimary, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode);
void vnodePostClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode);

View File

@ -87,7 +87,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool* pPool);
int32_t vnodeBufPoolRecycle(SVBufPool* pPool);
// vnodeOpen.c
int32_t vnodeGetPrimaryDir(const char* relPath, STfs* pTfs, char* buf, size_t bufLen);
int32_t vnodeGetPrimaryDir(const char* relPath, int32_t diskPrimary, STfs* pTfs, char* buf, size_t bufLen);
// vnodeQuery.c
int32_t vnodeQueryOpen(SVnode* pVnode);

View File

@ -93,6 +93,7 @@ typedef struct SQueryNode SQueryNode;
#define VNODE_BUFPOOL_SEGMENTS 3
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
// vnd.h
typedef int32_t (*_query_reseek_func_t)(void* pQHandle);
@ -385,6 +386,7 @@ struct SVnode {
SVState state;
SVStatis statis;
STfs* pTfs;
int32_t diskPrimary;
SMsgCb msgCb;
// Buffer Pool

View File

@ -41,7 +41,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
*ppMeta = NULL;
// create handle
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, path, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
offset = strlen(path);
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_META_DIR);

View File

@ -26,7 +26,7 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN
int32_t offset = 0;
// vnode
vnodeGetPrimaryDir(pVnode->path, pTfs, outputName, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pTfs, outputName, TSDB_FILENAME_LEN);
offset = strlen(outputName);
// rsma

View File

@ -59,7 +59,7 @@ typedef struct {
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
SVnode *pVnode = pTsdb->pVnode;
vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, path, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
int32_t offset = strlen(path);
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP);

View File

@ -276,14 +276,14 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
// CURRENT
if (current) {
vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, current, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, current, TSDB_FILENAME_LEN);
offset = strlen(current);
snprintf(current + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT", TD_DIRSEP);
}
// CURRENT.t
if (current_t) {
vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, current_t, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, current_t, TSDB_FILENAME_LEN);
offset = strlen(current_t);
snprintf(current_t + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT.t", TD_DIRSEP);
}

View File

@ -284,8 +284,9 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
// SDelFile ===============================================
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
int32_t offset = 0;
SVnode *pVnode = pTsdb->pVnode;
vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, fname, TSDB_FILENAME_LEN);
offset = strlen(fname);
snprintf((char *)fname + offset, TSDB_FILENAME_LEN - offset - 1, "%sv%dver%" PRId64 ".del", TD_DIRSEP,
TD_VID(pTsdb->pVnode), pFile->commitID);

View File

@ -16,8 +16,6 @@
#include "vnd.h"
#include "vnodeInt.h"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeCommitImpl(SCommitInfo *pInfo);
@ -290,7 +288,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
pInfo->txn = metaGetTxn(pVnode->pMeta);
// save info
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
@ -428,7 +426,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
return -1;
}
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
@ -492,7 +490,7 @@ bool vnodeShouldRollback(SVnode *pVnode) {
char tFName[TSDB_FILENAME_LEN] = {0};
int32_t offset = 0;
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
offset = strlen(tFName);
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
@ -503,7 +501,7 @@ void vnodeRollback(SVnode *pVnode) {
char tFName[TSDB_FILENAME_LEN] = {0};
int32_t offset = 0;
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
offset = strlen(tFName);
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);

View File

@ -15,9 +15,11 @@
#include "vnd.h"
int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bufLen) {
int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) {
if (pTfs) {
snprintf(buf, bufLen - 1, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, relPath);
SDiskID diskId = {0};
diskId.id = diskPrimary;
snprintf(buf, bufLen - 1, "%s%s%s", tfsGetDiskPath(pTfs, diskId), TD_DIRSEP, relPath);
} else {
snprintf(buf, bufLen - 1, "%s", relPath);
}
@ -25,7 +27,15 @@ int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bu
return 0;
}
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
static int32_t vnodeMkDir(STfs *pTfs, const char *path) {
if (pTfs) {
return tfsMkdirRecur(pTfs, path);
} else {
return taosMkDir(path);
}
}
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
@ -36,10 +46,11 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
}
// create vnode env
vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN);
if (taosMkDir(dir)) {
if (vnodeMkDir(pTfs, path)) {
vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", pCfg->vgId, strerror(errno), path);
return TAOS_SYSTEM_ERROR(errno);
}
vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
if (pCfg) {
info.config = *pCfg;
@ -60,12 +71,12 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
return 0;
}
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0;
vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
ret = vnodeLoadInfo(dir, &info);
if (ret < 0) {
@ -133,7 +144,8 @@ static int32_t vnodeVgroupIdLen(int32_t vgId) {
return strlen(tmp);
}
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) {
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
int32_t diskPrimary, STfs *pTfs) {
int32_t ret = 0;
char oldRname[TSDB_FILENAME_LEN] = {0};
@ -164,7 +176,7 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
snprintf(newRname, TSDB_FILENAME_LEN, "%s%d%s", oldRname, dstVgId, tsdbFileSurfixPos);
vInfo("vgId:%d, rename file from %s to %s", dstVgId, tsdbFile->rname, newRname);
ret = tfsRename(pTfs, tsdbFile->rname, newRname);
ret = tfsRename(pTfs, diskPrimary, tsdbFile->rname, newRname);
if (ret != 0) {
vError("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr());
tfsClosedir(tsdbDir);
@ -176,19 +188,20 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
tfsClosedir(tsdbDir);
vInfo("vgId:%d, rename dir from %s to %s", dstVgId, srcPath, dstPath);
ret = tfsRename(pTfs, srcPath, dstPath);
ret = tfsRename(pTfs, diskPrimary, srcPath, dstPath);
if (ret != 0) {
vError("vgId:%d, failed to rename dir from %s to %s since %s", dstVgId, srcPath, dstPath, terrstr());
}
return ret;
}
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) {
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0;
vnodeGetPrimaryDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
ret = vnodeLoadInfo(dir, &info);
if (ret < 0) {
@ -232,7 +245,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
}
vInfo("vgId:%d, rename %s to %s", pReq->dstVgId, srcPath, dstPath);
ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, pTfs);
ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, diskPrimary, pTfs);
if (ret < 0) {
vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath,
tstrerror(terrno));
@ -243,11 +256,12 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
return 0;
}
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) {
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(dstPath, pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(dstPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeLoadInfo(dir, &info) == 0) {
if (info.config.vgId != dstVgId) {
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
@ -256,7 +270,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
return dstVgId;
}
vnodeGetPrimaryDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeLoadInfo(dir, &info) < 0) {
vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
return -1;
@ -271,7 +285,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
}
vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath);
if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs) < 0) {
if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs) < 0) {
vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno));
return -1;
}
@ -284,14 +298,31 @@ void vnodeDestroy(const char *path, STfs *pTfs) {
tfsRmdir(pTfs, path);
}
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) {
int32_t ndisk = 1;
if (pTfs) {
ndisk = tfsGetDisksAtLevel(pTfs, 0);
}
if (diskPrimary < 0 || diskPrimary >= ndisk) {
vError("disk:%d is unavailable from the %d disks mounted at level 0", diskPrimary, ndisk);
terrno = TSDB_CODE_FS_INVLD_CFG;
return -1;
}
return 0;
}
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb) {
SVnode *pVnode = NULL;
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
char tdir[TSDB_FILENAME_LEN * 2] = {0};
int32_t ret = 0;
vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeCheckDisk(diskPrimary, pTfs)) {
vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary);
return NULL;
}
vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
info.config = vnodeCfgDefault;
@ -334,6 +365,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->state.applied = info.state.committed;
pVnode->state.applyTerm = info.state.commitTerm;
pVnode->pTfs = pTfs;
pVnode->diskPrimary = diskPrimary;
pVnode->msgCb = msgCb;
taosThreadMutexInit(&pVnode->lock, NULL);
pVnode->blocked = false;

View File

@ -35,7 +35,7 @@ static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) {
pInfo->commitID = ++pVnode->state.commitID;
char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeLoadInfo(dir, &pInfo->info) < 0) {
code = terrno;
@ -60,7 +60,7 @@ static int32_t vnodeRetentionTask(void *param) {
SVnode *pVnode = pInfo->pVnode;
char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
// save info
pInfo->info.state.commitID = pInfo->commitID;

View File

@ -86,6 +86,7 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0;
SVnode *pVnode = pReader->pVnode;
// CONFIG ==============
// FIXME: if commit multiple times and the config changed?
@ -93,7 +94,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
char fName[TSDB_FILENAME_LEN];
int32_t offset = 0;
vnodeGetPrimaryDir(pReader->pVnode->path, pReader->pVnode->pTfs, fName, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, fName, TSDB_FILENAME_LEN);
offset = strlen(fName);
snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME);
@ -343,7 +344,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
.applyTerm = pWriter->info.state.commitTerm};
pVnode->statis = pWriter->info.statis;
char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeCommitInfo(dir);
} else {
@ -381,7 +382,7 @@ _exit:
static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
int32_t code = 0;
SVnode *pVnode = pWriter->pVnode;
SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
// decode info
@ -395,10 +396,9 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_
// modify info as needed
char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pWriter->pVnode->path, pWriter->pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
SVnodeStats vndStats = pWriter->info.config.vndStats;
SVnode *pVnode = pWriter->pVnode;
pWriter->info.config = pVnode->config;
pWriter->info.config.vndStats = vndStats;
vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId);

View File

@ -113,6 +113,15 @@ SDiskSize tfsGetSize(STfs *pTfs) {
return size;
}
int32_t tfsGetDisksAtLevel(STfs *pTfs, int32_t level) {
if (level < 0 || level >= pTfs->nlevel) {
return 0;
}
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
return pTier->ndisk;
}
int32_t tfsGetLevel(STfs *pTfs) { return pTfs->nlevel; }
int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, SDiskID *pDiskId) {
@ -272,6 +281,20 @@ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId) {
return 0;
}
int32_t tfsMkdirRecur(STfs *pTfs, const char *rname) {
for (int32_t level = 0; level < pTfs->nlevel; level++) {
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = 0; id < pTier->ndisk; id++) {
SDiskID did = {.id = id, .level = level};
if (tfsMkdirRecurAt(pTfs, rname, did) < 0) {
return -1;
}
}
}
return 0;
}
int32_t tfsMkdir(STfs *pTfs, const char *rname) {
for (int32_t level = 0; level < pTfs->nlevel; level++) {
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
@ -314,25 +337,60 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) {
return 0;
}
int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname) {
static int32_t tfsRenameAt(STfs *pTfs, SDiskID diskId, const char *orname, const char *nrname) {
char oaname[TMPNAME_LEN] = "\0";
char naname[TMPNAME_LEN] = "\0";
for (int32_t level = pTfs->nlevel - 1; level >= 0; level--) {
int32_t level = diskId.level;
int32_t id = diskId.id;
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = pTier->ndisk - 1; id >= 0; id--) {
STfsDisk *pDisk = pTier->disks[id];
snprintf(oaname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, orname);
snprintf(naname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, nrname);
if (taosRenameFile(oaname, naname) != 0 && errno != ENOENT) {
terrno = TAOS_SYSTEM_ERROR(errno);
fError("failed to rename %s to %s since %s", oaname, naname, terrstr());
return -1;
}
return 0;
}
int32_t tfsRename(STfs *pTfs, int32_t diskPrimary, const char *orname, const char *nrname) {
for (int32_t level = pTfs->nlevel - 1; level >= 0; level--) {
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = pTier->ndisk - 1; id >= 0; id--) {
if (level == 0 && id == diskPrimary) {
continue;
}
SDiskID diskId = {.level = level, .id = id};
if (tfsRenameAt(pTfs, diskId, orname, nrname)) {
return -1;
}
}
}
return 0;
SDiskID diskId = {.level = 0, .id = diskPrimary};
return tfsRenameAt(pTfs, diskId, orname, nrname);
}
int32_t tfsSearch(STfs *pTfs, int32_t level, const char *fname) {
if (level < 0 || level >= pTfs->nlevel) {
return -1;
}
char path[TMPNAME_LEN] = {0};
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = 0; id < pTier->ndisk; id++) {
STfsDisk *pDisk = pTier->disks[id];
snprintf(path, TMPNAME_LEN - 1, "%s%s%s", pDisk->path, TD_DIRSEP, fname);
if (taosCheckExistFile(path)) {
return id;
}
}
return -1;
}
STfsDir *tfsOpendir(STfs *pTfs, const char *rname) {

View File

@ -156,7 +156,7 @@ TEST_F(TfsTest, 03_Dir) {
EXPECT_NE(taosDirExist(ap4), 1);
EXPECT_EQ(tfsMkdirRecurAt(pTfs, p4, did), 0);
EXPECT_EQ(taosDirExist(ap4), 1);
EXPECT_EQ(tfsRename(pTfs, p44, p45), 0);
EXPECT_EQ(tfsRename(pTfs, 0, p44, p45), 0);
EXPECT_EQ(tfsRmdir(pTfs, p4), 0);
EXPECT_NE(taosDirExist(ap4), 1);
@ -609,7 +609,7 @@ TEST_F(TfsTest, 05_MultiDisk) {
EXPECT_NE(taosDirExist(_ap22), 1);
EXPECT_EQ(tfsMkdirRecurAt(pTfs, p4, did), 0);
EXPECT_EQ(taosDirExist(_ap22), 1);
EXPECT_EQ(tfsRename(pTfs, p44, p45), 0);
EXPECT_EQ(tfsRename(pTfs, 0, p44, p45), 0);
EXPECT_EQ(tfsRmdir(pTfs, p4), 0);
EXPECT_NE(taosDirExist(_ap22), 1);
}

View File

@ -329,6 +329,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE, "Please use this comma
// vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INIT_FAILED, "Vnode init failure")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_EXIST, "Vnode not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ALREADY_EXIST, "Vnode already exist")