enh: unify handling of vnode primary dir for sma

This commit is contained in:
Benguang Zhao 2023-07-12 12:58:34 +08:00
parent bbf58d2d9d
commit 4c3bf41178
5 changed files with 75 additions and 107 deletions

View File

@ -229,11 +229,11 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path,
void tdRSmaQTaskInfoGetFileName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, STfs *pTfs,
char *outputName);
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName);
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName);
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, int8_t level, STfs *pTfs, char *outputName);
void tdRSmaQTaskInfoGetFullPathEx(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName);
static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int32_t ref = T_REF_INC(pRSmaInfo);
@ -244,9 +244,9 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
}
void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
int8_t level, int64_t version, char *outputName);
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
void tdRSmaGetFileName(SVnode *pVnode, STfs *pTfs, const char *fname, int64_t suid, int8_t level, int64_t version,
char *outputName);
void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputName);
#ifdef __cplusplus
}

View File

@ -14,6 +14,7 @@
*/
#include "sma.h"
#include "vnd.h"
// =================================================================================================
@ -157,25 +158,15 @@ _exit:
static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) {
SVnode *pVnode = pSma->pVnode;
if (pVnode->pTfs) {
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP);
}
if (current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT.t", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP);
}
} else {
#if 0
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sPRESENT", pTsdb->path, TD_DIRSEP);
}
if (current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sPRESENT.t", pTsdb->path, TD_DIRSEP);
}
#endif
}
int32_t offset = 0;
vnodeGetAbsDir(pVnode->path, pVnode->pTfs, current, TSDB_FILENAME_LEN);
offset = strlen(current);
snprintf(current + offset, TSDB_FILENAME_LEN - offset - 1, "%srsma%sPRESENT", TD_DIRSEP, TD_DIRSEP);
vnodeGetAbsDir(pVnode->path, pVnode->pTfs, current_t, TSDB_FILENAME_LEN);
offset = strlen(current_t);
snprintf(current_t + offset, TSDB_FILENAME_LEN - offset - 1, "%srsma%sPRESENT.t", TD_DIRSEP, TD_DIRSEP);
}
static int32_t tdRSmaLoadFSFromFile(const char *fname, SRSmaFS *pFS) {
@ -309,8 +300,7 @@ static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) {
nRef = atomic_sub_fetch_32(&preTaskF->nRef, 1);
if (nRef <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), preTaskF->suid, preTaskF->level, preTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), fname);
tdRSmaQTaskInfoGetFullName(pVnode, preTaskF->suid, preTaskF->level, preTaskF->version, pVnode->pTfs, fname);
(void)taosRemoveFile(fname);
taosArrayRemove(pFSOld->aQTaskInf, idx);
}
@ -341,9 +331,9 @@ static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
// main.tdb =========
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), fnameVer);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, tfsGetPrimaryPath(pVnode->pTfs), fname);
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, pTaskF->version,
pVnode->pTfs, fnameVer);
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, -1, pVnode->pTfs, fname);
if (taosCheckExistFile(fnameVer)) {
if (taosRenameFile(fnameVer, fname) < 0) {
@ -597,8 +587,7 @@ void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS) {
nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1);
if (nRef == 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), fname);
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, pTaskF->version, pVnode->pTfs, fname);
if (taosRemoveFile(fname) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), fname, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {

View File

@ -260,7 +260,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
void *pStreamState = NULL;
// set the backend of stream state
tdRSmaQTaskInfoGetFullPathEx(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir);
tdRSmaQTaskInfoGetFullPathEx(pVnode, pRSmaInfo->suid, idx + 1, pVnode->pTfs, taskInfDir);
if (!taosCheckExistFile(taskInfDir)) {
char *s = taosStrdup(taskInfDir);
if (taosMulMkDir(taosDirName(s)) != 0) {
@ -1258,9 +1258,8 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
pRSmaInfo->suid, i + 1);
// qTaskInfo file
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, -1, tfsGetPrimaryPath(pVnode->pTfs), fname);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, version, tfsGetPrimaryPath(pVnode->pTfs),
fnameVer);
tdRSmaQTaskInfoGetFullName(pVnode, pRSmaInfo->suid, i + 1, -1, pVnode->pTfs, fname);
tdRSmaQTaskInfoGetFullName(pVnode, pRSmaInfo->suid, i + 1, version, pVnode->pTfs, fnameVer);
if (taosCheckExistFile(fnameVer)) {
smaWarn("vgId:%d, rsma persist, duplicate file %s exist", TD_VID(pVnode), fnameVer);
}

View File

@ -117,8 +117,7 @@ static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf)
continue;
}
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, tfsGetPrimaryPath(pVnode->pTfs),
fname);
tdRSmaQTaskInfoGetFullName(pVnode, qTaskF->suid, qTaskF->level, version, pVnode->pTfs, fname);
if (!taosCheckExistFile(fname)) {
smaError("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8
", version %" PRIi64 " failed since %s not exist",
@ -340,7 +339,6 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
SSmaEnv* pEnv = NULL;
SRSmaStat* pStat = NULL;
SRSmaSnapWriter* pWriter = *ppWriter;
const char* primaryPath = NULL;
char fname[TSDB_FILENAME_LEN] = {0};
char fnameVer[TSDB_FILENAME_LEN] = {0};
TdFilePtr pOutFD = NULL;
@ -354,7 +352,6 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
pVnode = pSma->pVnode;
pEnv = SMA_RSMA_ENV(pSma);
pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
primaryPath = tfsGetPrimaryPath(pVnode->pTfs);
// rsma1/rsma2
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
@ -375,8 +372,8 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
for (int32_t i = 0; i < size; ++i) {
SQTaskFile* pTaskF = TARRAY_GET_ELEM(pFS->aQTaskInf, i);
if (pTaskF->version == pWriter->ever) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, primaryPath, fnameVer);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, primaryPath, fname);
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, pTaskF->version, pVnode->pTfs, fnameVer);
tdRSmaQTaskInfoGetFullName(pVnode, pTaskF->suid, pTaskF->level, -1, pVnode->pTfs, fname);
pInFD = taosOpenFile(fnameVer, TD_FILE_READ);
if (pInFD == NULL) {
@ -486,8 +483,7 @@ static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData,
SQTaskFile qTaskFile = {
.nRef = 1, .level = pHdr->flag, .suid = pHdr->index, .version = pWriter->ever, .size = pHdr->size};
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pHdr->index, pHdr->flag, qTaskFile.version,
tfsGetPrimaryPath(pVnode->pTfs), fname);
tdRSmaQTaskInfoGetFullName(pVnode, pHdr->index, pHdr->flag, qTaskFile.version, pVnode->pTfs, fname);
fp = taosCreateFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (!fp) {

View File

@ -14,88 +14,72 @@
*/
#include "sma.h"
#include "vnd.h"
#define TD_QTASKINFO_FNAME_PREFIX "main.tdb"
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) {
tdRSmaGetFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
void tdRSmaQTaskInfoGetFileName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, char *outputName) {
tdRSmaGetFileName(pVnode, NULL, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path,
void tdRSmaQTaskInfoGetFullName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, STfs *pTfs,
char *outputName) {
tdRSmaGetFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
tdRSmaGetFileName(pVnode, pTfs, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) {
tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, int8_t level, STfs *pTfs, char *outputName) {
tdRSmaGetDirName(pVnode, pTfs, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level);
}
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) {
tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
void tdRSmaQTaskInfoGetFullPathEx(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName) {
tdRSmaGetDirName(pVnode, pTfs, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8 "%s%" PRIi64, level, TD_DIRSEP, suid);
}
void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
int8_t level, int64_t version, char *outputName) {
void tdRSmaGetFileName(SVnode *pVnode, STfs *pTfs, const char *fname, int64_t suid, int8_t level, int64_t version,
char *outputName) {
int32_t offset = 0;
// vnode
vnodeGetAbsDir(pVnode->path, pTfs, outputName, TSDB_FILENAME_LEN);
offset = strlen(outputName);
// rsma
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_RSMA_DIR);
offset = strlen(outputName);
// level & suid || vgid
if (level >= 0 && suid > 0) {
if (version >= 0) {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s.%" PRIi64, pdname,
TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname,
version);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s.%" PRIi64, TD_DIRSEP,
vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname, version);
}
} else {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s", pdname,
TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s", TD_DIRSEP, vgId,
TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname);
}
}
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%" PRIi8 "%s%" PRIi64 "%s", TD_DIRSEP, level,
TD_DIRSEP, suid, TD_DIRSEP);
} else {
if (version >= 0) {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP,
vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname,
TD_DIRSEP, vgId, fname, version);
}
} else {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId,
TD_DIRSEP, dname, TD_DIRSEP, vgId, fname);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname,
TD_DIRSEP, vgId, fname);
}
}
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%sv%d", TD_DIRSEP, TD_VID(pVnode));
}
offset = strlen(outputName);
// fname
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s", fname);
offset = strlen(outputName);
// version
if (version >= 0) {
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, ".%" PRIi64, version);
}
}
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) {
if (pdname) {
if (endWithSep) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,
dname, TD_DIRSEP);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,
dname);
}
} else {
if (endWithSep) {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname);
}
}
void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputName) {
int32_t offset = 0;
// vnode
vnodeGetAbsDir(pVnode->path, pTfs, outputName, TSDB_FILENAME_LEN);
offset = strlen(outputName);
// rsma
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s", TD_DIRSEP, VNODE_RSMA_DIR,
(endWithSep ? TD_DIRSEP : ""));
}
// smaXXXUtil ================
@ -117,4 +101,4 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) {
smaTrace("rsma release ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId);
return TSDB_CODE_SUCCESS;
}
}