Merge pull request #22045 from taosdata/FEAT/TS-2699-3.0
enh: unify handling of vnode primary dir for vnd, tsdb and sma
This commit is contained in:
commit
ec86a68b6f
|
@ -265,22 +265,6 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
|
||||||
|
|
||||||
#if 0
|
|
||||||
if (pMgmt->pTfs) {
|
|
||||||
if (tfsDirExistAt(pMgmt->pTfs, path, (SDiskID){0})) {
|
|
||||||
terrno = TSDB_CODE_VND_DIR_ALREADY_EXIST;
|
|
||||||
dError("vgId:%d, failed to restore vnode since %s", req.vgId, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (taosDirExist(path)) {
|
|
||||||
terrno = TSDB_CODE_VND_DIR_ALREADY_EXIST;
|
|
||||||
dError("vgId:%d, failed to restore vnode since %s", req.vgId, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
|
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
|
||||||
tFreeSCreateVnodeReq(&req);
|
tFreeSCreateVnodeReq(&req);
|
||||||
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
|
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
|
||||||
|
|
|
@ -229,11 +229,11 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
||||||
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
|
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
|
||||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
|
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
|
||||||
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
|
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
|
||||||
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName);
|
void tdRSmaQTaskInfoGetFileName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, char *outputName);
|
||||||
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path,
|
void tdRSmaQTaskInfoGetFullName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, STfs *pTfs,
|
||||||
char *outputName);
|
char *outputName);
|
||||||
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName);
|
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, int8_t level, STfs *pTfs, char *outputName);
|
||||||
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName);
|
void tdRSmaQTaskInfoGetFullPathEx(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName);
|
||||||
|
|
||||||
static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
|
static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
|
||||||
int32_t ref = T_REF_INC(pRSmaInfo);
|
int32_t ref = T_REF_INC(pRSmaInfo);
|
||||||
|
@ -244,9 +244,9 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
|
||||||
smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
|
smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
|
void tdRSmaGetFileName(SVnode *pVnode, STfs *pTfs, const char *fname, int64_t suid, int8_t level, int64_t version,
|
||||||
int8_t level, int64_t version, char *outputName);
|
char *outputName);
|
||||||
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
|
void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputName);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,6 +86,9 @@ void vnodeBufPoolReset(SVBufPool* pPool);
|
||||||
void vnodeBufPoolAddToFreeList(SVBufPool* pPool);
|
void vnodeBufPoolAddToFreeList(SVBufPool* pPool);
|
||||||
int32_t vnodeBufPoolRecycle(SVBufPool* pPool);
|
int32_t vnodeBufPoolRecycle(SVBufPool* pPool);
|
||||||
|
|
||||||
|
// vnodeOpen.c
|
||||||
|
int32_t vnodeGetPrimaryDir(const char* relPath, STfs* pTfs, char* buf, size_t bufLen);
|
||||||
|
|
||||||
// vnodeQuery.c
|
// vnodeQuery.c
|
||||||
int32_t vnodeQueryOpen(SVnode* pVnode);
|
int32_t vnodeQueryOpen(SVnode* pVnode);
|
||||||
void vnodeQueryPreClose(SVnode* pVnode);
|
void vnodeQueryPreClose(SVnode* pVnode);
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "meta.h"
|
#include "meta.h"
|
||||||
|
#include "vnd.h"
|
||||||
|
|
||||||
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||||
static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||||
|
@ -34,30 +35,27 @@ static void metaCleanup(SMeta **ppMeta);
|
||||||
int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||||
SMeta *pMeta = NULL;
|
SMeta *pMeta = NULL;
|
||||||
int ret;
|
int ret;
|
||||||
int slen;
|
int offset;
|
||||||
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
|
|
||||||
*ppMeta = NULL;
|
*ppMeta = NULL;
|
||||||
|
|
||||||
// create handle
|
// create handle
|
||||||
if (pVnode->pTfs) {
|
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, path, TSDB_FILENAME_LEN);
|
||||||
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(VNODE_META_DIR) + 3;
|
offset = strlen(path);
|
||||||
} else {
|
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_META_DIR);
|
||||||
slen = strlen(pVnode->path) + strlen(VNODE_META_DIR) + 2;
|
|
||||||
}
|
if ((pMeta = taosMemoryCalloc(1, sizeof(*pMeta) + strlen(path) + 1)) == NULL) {
|
||||||
if ((pMeta = taosMemoryCalloc(1, sizeof(*pMeta) + slen)) == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
metaInitLock(pMeta);
|
metaInitLock(pMeta);
|
||||||
|
|
||||||
pMeta->path = (char *)&pMeta[1];
|
pMeta->path = (char *)&pMeta[1];
|
||||||
if (pVnode->pTfs) {
|
strcpy(pMeta->path, path);
|
||||||
sprintf(pMeta->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
|
taosRealPath(pMeta->path, NULL, strlen(path) + 1);
|
||||||
VNODE_META_DIR);
|
|
||||||
} else {
|
|
||||||
sprintf(pMeta->path, "%s%s%s", pVnode->path, TD_DIRSEP, VNODE_META_DIR);
|
|
||||||
}
|
|
||||||
taosRealPath(pMeta->path, NULL, slen);
|
|
||||||
pMeta->pVnode = pVnode;
|
pMeta->pVnode = pVnode;
|
||||||
|
|
||||||
// create path if not created yet
|
// create path if not created yet
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "sma.h"
|
#include "sma.h"
|
||||||
|
#include "vnd.h"
|
||||||
|
|
||||||
// =================================================================================================
|
// =================================================================================================
|
||||||
|
|
||||||
|
@ -157,25 +158,15 @@ _exit:
|
||||||
|
|
||||||
static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) {
|
static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) {
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
if (pVnode->pTfs) {
|
int32_t offset = 0;
|
||||||
if (current) {
|
|
||||||
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT", tfsGetPrimaryPath(pVnode->pTfs),
|
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, current, TSDB_FILENAME_LEN);
|
||||||
TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP);
|
offset = strlen(current);
|
||||||
}
|
snprintf(current + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%sPRESENT", TD_DIRSEP, VNODE_RSMA_DIR, TD_DIRSEP);
|
||||||
if (current_t) {
|
|
||||||
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT.t", tfsGetPrimaryPath(pVnode->pTfs),
|
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, current_t, TSDB_FILENAME_LEN);
|
||||||
TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP);
|
offset = strlen(current_t);
|
||||||
}
|
snprintf(current_t + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%sPRESENT.t", TD_DIRSEP, VNODE_RSMA_DIR, TD_DIRSEP);
|
||||||
} else {
|
|
||||||
#if 0
|
|
||||||
if (current) {
|
|
||||||
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sPRESENT", pTsdb->path, TD_DIRSEP);
|
|
||||||
}
|
|
||||||
if (current_t) {
|
|
||||||
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sPRESENT.t", pTsdb->path, TD_DIRSEP);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tdRSmaLoadFSFromFile(const char *fname, SRSmaFS *pFS) {
|
static int32_t tdRSmaLoadFSFromFile(const char *fname, SRSmaFS *pFS) {
|
||||||
|
@ -309,8 +300,7 @@ static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) {
|
||||||
|
|
||||||
nRef = atomic_sub_fetch_32(&preTaskF->nRef, 1);
|
nRef = atomic_sub_fetch_32(&preTaskF->nRef, 1);
|
||||||
if (nRef <= 0) {
|
if (nRef <= 0) {
|
||||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), preTaskF->suid, preTaskF->level, preTaskF->version,
|
tdRSmaQTaskInfoGetFullName(pVnode, preTaskF->suid, preTaskF->level, preTaskF->version, pVnode->pTfs, fname);
|
||||||
tfsGetPrimaryPath(pVnode->pTfs), fname);
|
|
||||||
(void)taosRemoveFile(fname);
|
(void)taosRemoveFile(fname);
|
||||||
taosArrayRemove(pFSOld->aQTaskInf, idx);
|
taosArrayRemove(pFSOld->aQTaskInf, idx);
|
||||||
}
|
}
|
||||||
|
@ -341,9 +331,9 @@ static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) {
|
||||||
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
|
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
|
||||||
|
|
||||||
// main.tdb =========
|
// main.tdb =========
|
||||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
|
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, pTaskF->version,
|
||||||
tfsGetPrimaryPath(pVnode->pTfs), fnameVer);
|
pVnode->pTfs, fnameVer);
|
||||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, tfsGetPrimaryPath(pVnode->pTfs), fname);
|
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, -1, pVnode->pTfs, fname);
|
||||||
|
|
||||||
if (taosCheckExistFile(fnameVer)) {
|
if (taosCheckExistFile(fnameVer)) {
|
||||||
if (taosRenameFile(fnameVer, fname) < 0) {
|
if (taosRenameFile(fnameVer, fname) < 0) {
|
||||||
|
@ -597,8 +587,7 @@ void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS) {
|
||||||
|
|
||||||
nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1);
|
nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1);
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
|
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, pTaskF->version, pVnode->pTfs, fname);
|
||||||
tfsGetPrimaryPath(pVnode->pTfs), fname);
|
|
||||||
if (taosRemoveFile(fname) < 0) {
|
if (taosRemoveFile(fname) < 0) {
|
||||||
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), fname, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), fname, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -260,7 +260,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
||||||
void *pStreamState = NULL;
|
void *pStreamState = NULL;
|
||||||
|
|
||||||
// set the backend of stream state
|
// set the backend of stream state
|
||||||
tdRSmaQTaskInfoGetFullPathEx(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir);
|
tdRSmaQTaskInfoGetFullPathEx(pVnode, pRSmaInfo->suid, idx + 1, pVnode->pTfs, taskInfDir);
|
||||||
if (!taosCheckExistFile(taskInfDir)) {
|
if (!taosCheckExistFile(taskInfDir)) {
|
||||||
char *s = taosStrdup(taskInfDir);
|
char *s = taosStrdup(taskInfDir);
|
||||||
if (taosMulMkDir(taosDirName(s)) != 0) {
|
if (taosMulMkDir(taosDirName(s)) != 0) {
|
||||||
|
@ -1258,9 +1258,8 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
pRSmaInfo->suid, i + 1);
|
pRSmaInfo->suid, i + 1);
|
||||||
|
|
||||||
// qTaskInfo file
|
// qTaskInfo file
|
||||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, -1, tfsGetPrimaryPath(pVnode->pTfs), fname);
|
tdRSmaQTaskInfoGetFullName(pVnode, pRSmaInfo->suid, i + 1, -1, pVnode->pTfs, fname);
|
||||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, version, tfsGetPrimaryPath(pVnode->pTfs),
|
tdRSmaQTaskInfoGetFullName(pVnode, pRSmaInfo->suid, i + 1, version, pVnode->pTfs, fnameVer);
|
||||||
fnameVer);
|
|
||||||
if (taosCheckExistFile(fnameVer)) {
|
if (taosCheckExistFile(fnameVer)) {
|
||||||
smaWarn("vgId:%d, rsma persist, duplicate file %s exist", TD_VID(pVnode), fnameVer);
|
smaWarn("vgId:%d, rsma persist, duplicate file %s exist", TD_VID(pVnode), fnameVer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -117,8 +117,7 @@ static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, tfsGetPrimaryPath(pVnode->pTfs),
|
tdRSmaQTaskInfoGetFullName(pVnode, qTaskF->suid, qTaskF->level, version, pVnode->pTfs, fname);
|
||||||
fname);
|
|
||||||
if (!taosCheckExistFile(fname)) {
|
if (!taosCheckExistFile(fname)) {
|
||||||
smaError("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8
|
smaError("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8
|
||||||
", version %" PRIi64 " failed since %s not exist",
|
", version %" PRIi64 " failed since %s not exist",
|
||||||
|
@ -340,7 +339,6 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
|
||||||
SSmaEnv* pEnv = NULL;
|
SSmaEnv* pEnv = NULL;
|
||||||
SRSmaStat* pStat = NULL;
|
SRSmaStat* pStat = NULL;
|
||||||
SRSmaSnapWriter* pWriter = *ppWriter;
|
SRSmaSnapWriter* pWriter = *ppWriter;
|
||||||
const char* primaryPath = NULL;
|
|
||||||
char fname[TSDB_FILENAME_LEN] = {0};
|
char fname[TSDB_FILENAME_LEN] = {0};
|
||||||
char fnameVer[TSDB_FILENAME_LEN] = {0};
|
char fnameVer[TSDB_FILENAME_LEN] = {0};
|
||||||
TdFilePtr pOutFD = NULL;
|
TdFilePtr pOutFD = NULL;
|
||||||
|
@ -354,7 +352,6 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
|
||||||
pVnode = pSma->pVnode;
|
pVnode = pSma->pVnode;
|
||||||
pEnv = SMA_RSMA_ENV(pSma);
|
pEnv = SMA_RSMA_ENV(pSma);
|
||||||
pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
|
pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
|
||||||
primaryPath = tfsGetPrimaryPath(pVnode->pTfs);
|
|
||||||
|
|
||||||
// rsma1/rsma2
|
// rsma1/rsma2
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
|
@ -375,8 +372,8 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SQTaskFile* pTaskF = TARRAY_GET_ELEM(pFS->aQTaskInf, i);
|
SQTaskFile* pTaskF = TARRAY_GET_ELEM(pFS->aQTaskInf, i);
|
||||||
if (pTaskF->version == pWriter->ever) {
|
if (pTaskF->version == pWriter->ever) {
|
||||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, primaryPath, fnameVer);
|
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, pTaskF->version, pVnode->pTfs, fnameVer);
|
||||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, primaryPath, fname);
|
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, -1, pVnode->pTfs, fname);
|
||||||
|
|
||||||
pInFD = taosOpenFile(fnameVer, TD_FILE_READ);
|
pInFD = taosOpenFile(fnameVer, TD_FILE_READ);
|
||||||
if (pInFD == NULL) {
|
if (pInFD == NULL) {
|
||||||
|
@ -486,8 +483,7 @@ static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData,
|
||||||
SQTaskFile qTaskFile = {
|
SQTaskFile qTaskFile = {
|
||||||
.nRef = 1, .level = pHdr->flag, .suid = pHdr->index, .version = pWriter->ever, .size = pHdr->size};
|
.nRef = 1, .level = pHdr->flag, .suid = pHdr->index, .version = pWriter->ever, .size = pHdr->size};
|
||||||
|
|
||||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pHdr->index, pHdr->flag, qTaskFile.version,
|
tdRSmaQTaskInfoGetFullName(pVnode, pHdr->index, pHdr->flag, qTaskFile.version, pVnode->pTfs, fname);
|
||||||
tfsGetPrimaryPath(pVnode->pTfs), fname);
|
|
||||||
|
|
||||||
fp = taosCreateFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
fp = taosCreateFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
if (!fp) {
|
if (!fp) {
|
||||||
|
|
|
@ -14,88 +14,72 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "sma.h"
|
#include "sma.h"
|
||||||
|
#include "vnd.h"
|
||||||
|
|
||||||
#define TD_QTASKINFO_FNAME_PREFIX "main.tdb"
|
#define TD_QTASKINFO_FNAME_PREFIX "main.tdb"
|
||||||
|
|
||||||
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) {
|
void tdRSmaQTaskInfoGetFileName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, char *outputName) {
|
||||||
tdRSmaGetFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
|
tdRSmaGetFileName(pVnode, NULL, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path,
|
void tdRSmaQTaskInfoGetFullName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, STfs *pTfs,
|
||||||
char *outputName) {
|
char *outputName) {
|
||||||
tdRSmaGetFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
|
tdRSmaGetFileName(pVnode, pTfs, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) {
|
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, int8_t level, STfs *pTfs, char *outputName) {
|
||||||
tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
|
tdRSmaGetDirName(pVnode, pTfs, true, outputName);
|
||||||
int32_t rsmaLen = strlen(outputName);
|
int32_t rsmaLen = strlen(outputName);
|
||||||
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level);
|
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) {
|
void tdRSmaQTaskInfoGetFullPathEx(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName) {
|
||||||
tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
|
tdRSmaGetDirName(pVnode, pTfs, true, outputName);
|
||||||
int32_t rsmaLen = strlen(outputName);
|
int32_t rsmaLen = strlen(outputName);
|
||||||
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8 "%s%" PRIi64, level, TD_DIRSEP, suid);
|
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8 "%s%" PRIi64, level, TD_DIRSEP, suid);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
|
void tdRSmaGetFileName(SVnode *pVnode, STfs *pTfs, const char *fname, int64_t suid, int8_t level, int64_t version,
|
||||||
int8_t level, int64_t version, char *outputName) {
|
char *outputName) {
|
||||||
|
int32_t offset = 0;
|
||||||
|
|
||||||
|
// vnode
|
||||||
|
vnodeGetPrimaryDir(pVnode->path, pTfs, outputName, TSDB_FILENAME_LEN);
|
||||||
|
offset = strlen(outputName);
|
||||||
|
|
||||||
|
// rsma
|
||||||
|
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_RSMA_DIR);
|
||||||
|
offset = strlen(outputName);
|
||||||
|
|
||||||
|
// level & suid || vgid
|
||||||
if (level >= 0 && suid > 0) {
|
if (level >= 0 && suid > 0) {
|
||||||
if (version >= 0) {
|
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%" PRIi8 "%s%" PRIi64 "%s", TD_DIRSEP, level,
|
||||||
if (pdname) {
|
TD_DIRSEP, suid, TD_DIRSEP);
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s.%" PRIi64, pdname,
|
|
||||||
TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname,
|
|
||||||
version);
|
|
||||||
} else {
|
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s.%" PRIi64, TD_DIRSEP,
|
|
||||||
vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname, version);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (pdname) {
|
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s", pdname,
|
|
||||||
TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname);
|
|
||||||
} else {
|
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s", TD_DIRSEP, vgId,
|
|
||||||
TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
if (version >= 0) {
|
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%sv%d", TD_DIRSEP, TD_VID(pVnode));
|
||||||
if (pdname) {
|
}
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP,
|
offset = strlen(outputName);
|
||||||
vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version);
|
|
||||||
} else {
|
// fname
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname,
|
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s", fname);
|
||||||
TD_DIRSEP, vgId, fname, version);
|
offset = strlen(outputName);
|
||||||
}
|
|
||||||
} else {
|
// version
|
||||||
if (pdname) {
|
if (version >= 0) {
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId,
|
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, ".%" PRIi64, version);
|
||||||
TD_DIRSEP, dname, TD_DIRSEP, vgId, fname);
|
|
||||||
} else {
|
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname,
|
|
||||||
TD_DIRSEP, vgId, fname);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) {
|
void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputName) {
|
||||||
if (pdname) {
|
int32_t offset = 0;
|
||||||
if (endWithSep) {
|
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,
|
// vnode
|
||||||
dname, TD_DIRSEP);
|
vnodeGetPrimaryDir(pVnode->path, pTfs, outputName, TSDB_FILENAME_LEN);
|
||||||
} else {
|
offset = strlen(outputName);
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,
|
|
||||||
dname);
|
// rsma
|
||||||
}
|
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s", TD_DIRSEP, VNODE_RSMA_DIR,
|
||||||
} else {
|
(endWithSep ? TD_DIRSEP : ""));
|
||||||
if (endWithSep) {
|
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP);
|
|
||||||
} else {
|
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// smaXXXUtil ================
|
// smaXXXUtil ================
|
||||||
|
@ -117,4 +101,4 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) {
|
||||||
smaTrace("rsma release ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId);
|
smaTrace("rsma release ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
#include "vnd.h"
|
||||||
|
|
||||||
#define ROCKS_BATCH_SIZE (4096)
|
#define ROCKS_BATCH_SIZE (4096)
|
||||||
|
|
||||||
|
@ -58,16 +59,10 @@ typedef struct {
|
||||||
|
|
||||||
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
|
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
|
||||||
SVnode *pVnode = pTsdb->pVnode;
|
SVnode *pVnode = pTsdb->pVnode;
|
||||||
if (pVnode->pTfs) {
|
vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, path, TSDB_FILENAME_LEN);
|
||||||
if (path) {
|
|
||||||
snprintf(path, TSDB_FILENAME_LEN, "%s%s%s%scache.rdb", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
|
int32_t offset = strlen(path);
|
||||||
pTsdb->path, TD_DIRSEP);
|
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP);
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (path) {
|
|
||||||
snprintf(path, TSDB_FILENAME_LEN, "%s%scache.rdb", pTsdb->path, TD_DIRSEP);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char *myCmpName(void *state) {
|
static const char *myCmpName(void *state) {
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
#include "vnd.h"
|
||||||
|
|
||||||
// =================================================================================================
|
// =================================================================================================
|
||||||
static int32_t tsdbFSToBinary(uint8_t *p, STsdbFS *pFS) {
|
static int32_t tsdbFSToBinary(uint8_t *p, STsdbFS *pFS) {
|
||||||
|
@ -271,22 +272,20 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) {
|
||||||
|
|
||||||
static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
|
static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
|
||||||
SVnode *pVnode = pTsdb->pVnode;
|
SVnode *pVnode = pTsdb->pVnode;
|
||||||
if (pVnode->pTfs) {
|
int32_t offset = 0;
|
||||||
if (current) {
|
|
||||||
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
|
// CURRENT
|
||||||
pTsdb->path, TD_DIRSEP);
|
if (current) {
|
||||||
}
|
vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, current, TSDB_FILENAME_LEN);
|
||||||
if (current_t) {
|
offset = strlen(current);
|
||||||
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
|
snprintf(current + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT", TD_DIRSEP);
|
||||||
pTsdb->path, TD_DIRSEP);
|
}
|
||||||
}
|
|
||||||
} else {
|
// CURRENT.t
|
||||||
if (current) {
|
if (current_t) {
|
||||||
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP);
|
vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, current_t, TSDB_FILENAME_LEN);
|
||||||
}
|
offset = strlen(current_t);
|
||||||
if (current_t) {
|
snprintf(current_t + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT.t", TD_DIRSEP);
|
||||||
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1142,4 +1141,4 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pFS->aDFileSet);
|
taosArrayDestroy(pFS->aDFileSet);
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
#include "vnd.h"
|
||||||
|
|
||||||
int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile) {
|
int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile) {
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
|
@ -282,8 +283,12 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
|
||||||
|
|
||||||
// SDelFile ===============================================
|
// SDelFile ===============================================
|
||||||
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
|
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
|
||||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%dver%" PRId64 "%s", tfsGetPrimaryPath(pTsdb->pVnode->pTfs),
|
int32_t offset = 0;
|
||||||
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), pFile->commitID, ".del");
|
|
||||||
|
vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN);
|
||||||
|
offset = strlen(fname);
|
||||||
|
snprintf((char *)fname + offset, TSDB_FILENAME_LEN - offset - 1, "%sv%dver%" PRId64 ".del", TD_DIRSEP,
|
||||||
|
TD_VID(pTsdb->pVnode), pFile->commitID);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) {
|
int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) {
|
||||||
|
|
|
@ -290,11 +290,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
||||||
pInfo->txn = metaGetTxn(pVnode->pMeta);
|
pInfo->txn = metaGetTxn(pVnode->pMeta);
|
||||||
|
|
||||||
// save info
|
// save info
|
||||||
if (pVnode->pTfs) {
|
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
|
||||||
} else {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
|
|
||||||
}
|
|
||||||
|
|
||||||
vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
|
vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
|
||||||
if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
|
if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
|
||||||
|
@ -427,11 +423,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->pTfs) {
|
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
|
||||||
} else {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
|
|
||||||
}
|
|
||||||
|
|
||||||
syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
|
syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
|
||||||
|
|
||||||
|
@ -493,16 +485,22 @@ _exit:
|
||||||
|
|
||||||
bool vnodeShouldRollback(SVnode *pVnode) {
|
bool vnodeShouldRollback(SVnode *pVnode) {
|
||||||
char tFName[TSDB_FILENAME_LEN] = {0};
|
char tFName[TSDB_FILENAME_LEN] = {0};
|
||||||
snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
|
int32_t offset = 0;
|
||||||
VND_INFO_FNAME_TMP);
|
|
||||||
|
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
|
||||||
|
offset = strlen(tFName);
|
||||||
|
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
|
||||||
|
|
||||||
return taosCheckExistFile(tFName);
|
return taosCheckExistFile(tFName);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeRollback(SVnode *pVnode) {
|
void vnodeRollback(SVnode *pVnode) {
|
||||||
char tFName[TSDB_FILENAME_LEN] = {0};
|
char tFName[TSDB_FILENAME_LEN] = {0};
|
||||||
snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
|
int32_t offset = 0;
|
||||||
VND_INFO_FNAME_TMP);
|
|
||||||
|
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
|
||||||
|
offset = strlen(tFName);
|
||||||
|
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
|
||||||
|
|
||||||
(void)taosRemoveFile(tFName);
|
(void)taosRemoveFile(tFName);
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,16 @@
|
||||||
|
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
|
int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bufLen) {
|
||||||
|
if (pTfs) {
|
||||||
|
snprintf(buf, bufLen - 1, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, relPath);
|
||||||
|
} else {
|
||||||
|
snprintf(buf, bufLen - 1, "%s", relPath);
|
||||||
|
}
|
||||||
|
buf[bufLen - 1] = '\0';
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
@ -26,17 +36,9 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// create vnode env
|
// create vnode env
|
||||||
if (pTfs) {
|
vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
if (tfsMkdirAt(pTfs, path, (SDiskID){0}) < 0) {
|
if (taosMkDir(dir)) {
|
||||||
vError("vgId:%d, failed to create vnode since:%s", pCfg->vgId, tstrerror(terrno));
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
|
|
||||||
} else {
|
|
||||||
if (taosMkDir(path)) {
|
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
|
||||||
}
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCfg) {
|
if (pCfg) {
|
||||||
|
@ -63,11 +65,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
if (pTfs) {
|
vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
|
|
||||||
} else {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = vnodeLoadInfo(dir, &info);
|
ret = vnodeLoadInfo(dir, &info);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
@ -185,21 +183,12 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeGetAbsDir(const char *relPath, STfs *pTfs, char *buf, size_t bufLen) {
|
|
||||||
if (pTfs) {
|
|
||||||
snprintf(buf, bufLen, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, relPath);
|
|
||||||
} else {
|
|
||||||
snprintf(buf, bufLen, "%s", relPath);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) {
|
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) {
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
vnodeGetAbsDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
|
vnodeGetPrimaryDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
|
|
||||||
ret = vnodeLoadInfo(dir, &info);
|
ret = vnodeLoadInfo(dir, &info);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
@ -258,7 +247,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
|
||||||
vnodeGetAbsDir(dstPath, pTfs, dir, TSDB_FILENAME_LEN);
|
vnodeGetPrimaryDir(dstPath, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
if (vnodeLoadInfo(dir, &info) == 0) {
|
if (vnodeLoadInfo(dir, &info) == 0) {
|
||||||
if (info.config.vgId != dstVgId) {
|
if (info.config.vgId != dstVgId) {
|
||||||
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
|
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
|
||||||
|
@ -267,7 +256,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
|
||||||
return dstVgId;
|
return dstVgId;
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeGetAbsDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
|
vnodeGetPrimaryDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
if (vnodeLoadInfo(dir, &info) < 0) {
|
if (vnodeLoadInfo(dir, &info) < 0) {
|
||||||
vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
|
vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -302,11 +291,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
char tdir[TSDB_FILENAME_LEN * 2] = {0};
|
char tdir[TSDB_FILENAME_LEN * 2] = {0};
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
if (pTfs) {
|
vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
|
|
||||||
} else {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
|
|
||||||
}
|
|
||||||
|
|
||||||
info.config = vnodeCfgDefault;
|
info.config = vnodeCfgDefault;
|
||||||
|
|
||||||
|
|
|
@ -35,11 +35,7 @@ static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) {
|
||||||
pInfo->commitID = ++pVnode->state.commitID;
|
pInfo->commitID = ++pVnode->state.commitID;
|
||||||
|
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
if (pVnode->pTfs) {
|
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
|
||||||
} else {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (vnodeLoadInfo(dir, &pInfo->info) < 0) {
|
if (vnodeLoadInfo(dir, &pInfo->info) < 0) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
|
@ -64,11 +60,7 @@ static int32_t vnodeRetentionTask(void *param) {
|
||||||
SVnode *pVnode = pInfo->pVnode;
|
SVnode *pVnode = pInfo->pVnode;
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
|
||||||
if (pVnode->pTfs) {
|
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
|
||||||
} else {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
|
|
||||||
}
|
|
||||||
|
|
||||||
// save info
|
// save info
|
||||||
pInfo->info.state.commitID = pInfo->commitID;
|
pInfo->info.state.commitID = pInfo->commitID;
|
||||||
|
@ -127,4 +119,4 @@ _exit:
|
||||||
vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
|
vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,12 +91,11 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
// FIXME: if commit multiple times and the config changed?
|
// FIXME: if commit multiple times and the config changed?
|
||||||
if (!pReader->cfgDone) {
|
if (!pReader->cfgDone) {
|
||||||
char fName[TSDB_FILENAME_LEN];
|
char fName[TSDB_FILENAME_LEN];
|
||||||
if (pReader->pVnode->pTfs) {
|
int32_t offset = 0;
|
||||||
snprintf(fName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pReader->pVnode->pTfs), TD_DIRSEP,
|
|
||||||
pReader->pVnode->path, TD_DIRSEP, VND_INFO_FNAME);
|
vnodeGetPrimaryDir(pReader->pVnode->path, pReader->pVnode->pTfs, fName, TSDB_FILENAME_LEN);
|
||||||
} else {
|
offset = strlen(fName);
|
||||||
snprintf(fName, TSDB_FILENAME_LEN, "%s%s%s", pReader->pVnode->path, TD_DIRSEP, VND_INFO_FNAME);
|
snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME);
|
||||||
}
|
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ);
|
TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ);
|
||||||
if (NULL == pFile) {
|
if (NULL == pFile) {
|
||||||
|
@ -344,11 +343,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
||||||
.applyTerm = pWriter->info.state.commitTerm};
|
.applyTerm = pWriter->info.state.commitTerm};
|
||||||
pVnode->statis = pWriter->info.statis;
|
pVnode->statis = pWriter->info.statis;
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
if (pWriter->pVnode->pTfs) {
|
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
|
||||||
} else {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
|
|
||||||
}
|
|
||||||
|
|
||||||
vnodeCommitInfo(dir);
|
vnodeCommitInfo(dir);
|
||||||
} else {
|
} else {
|
||||||
|
@ -400,12 +395,7 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_
|
||||||
|
|
||||||
// modify info as needed
|
// modify info as needed
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
if (pWriter->pVnode->pTfs) {
|
vnodeGetPrimaryDir(pWriter->pVnode->path, pWriter->pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pWriter->pVnode->pTfs), TD_DIRSEP,
|
|
||||||
pWriter->pVnode->path);
|
|
||||||
} else {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
|
|
||||||
}
|
|
||||||
|
|
||||||
SVnodeStats vndStats = pWriter->info.config.vndStats;
|
SVnodeStats vndStats = pWriter->info.config.vndStats;
|
||||||
SVnode *pVnode = pWriter->pVnode;
|
SVnode *pVnode = pWriter->pVnode;
|
||||||
|
|
Loading…
Reference in New Issue