Merge pull request #17438 from taosdata/fix/TD-19254-D
chore: remove codes unused currently for sma
This commit is contained in:
commit
c30f6a7720
|
@ -258,6 +258,7 @@ enum {
|
||||||
TD_FTYPE_RSMA_QTASKINFO = 0,
|
TD_FTYPE_RSMA_QTASKINFO = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#if 0
|
||||||
struct STFile {
|
struct STFile {
|
||||||
uint8_t state;
|
uint8_t state;
|
||||||
STFInfo info;
|
STFInfo info;
|
||||||
|
@ -287,6 +288,7 @@ int32_t tdUpdateTFileHeader(STFile *pTFile);
|
||||||
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
|
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
|
||||||
void tdCloseTFile(STFile *pTFile);
|
void tdCloseTFile(STFile *pTFile);
|
||||||
void tdDestroyTFile(STFile *pTFile);
|
void tdDestroyTFile(STFile *pTFile);
|
||||||
|
#endif
|
||||||
|
|
||||||
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
||||||
char *outputName);
|
char *outputName);
|
||||||
|
|
|
@ -17,14 +17,17 @@
|
||||||
|
|
||||||
extern SSmaMgmt smaMgmt;
|
extern SSmaMgmt smaMgmt;
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
|
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
|
||||||
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
|
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
|
||||||
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
|
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
|
||||||
|
#endif
|
||||||
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
|
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
|
||||||
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma);
|
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma);
|
||||||
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
|
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
|
||||||
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
|
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
|
||||||
|
|
||||||
|
#if 0
|
||||||
/**
|
/**
|
||||||
* @brief Only applicable to Rollup SMA
|
* @brief Only applicable to Rollup SMA
|
||||||
*
|
*
|
||||||
|
@ -48,6 +51,7 @@ int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); }
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); }
|
int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); }
|
||||||
|
#endif
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Only applicable to Rollup SMA
|
* @brief Only applicable to Rollup SMA
|
||||||
|
@ -108,6 +112,7 @@ int32_t smaBegin(SSma *pSma) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
/**
|
/**
|
||||||
* @brief pre-commit for rollup sma(sync commit).
|
* @brief pre-commit for rollup sma(sync commit).
|
||||||
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
|
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
|
||||||
|
@ -169,6 +174,7 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
|
||||||
#endif
|
#endif
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
// SQTaskFile ======================================================
|
// SQTaskFile ======================================================
|
||||||
|
|
||||||
|
@ -230,6 +236,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
/**
|
/**
|
||||||
* @brief post-commit for rollup sma
|
* @brief post-commit for rollup sma
|
||||||
* 1) clean up the outdated qtaskinfo files
|
* 1) clean up the outdated qtaskinfo files
|
||||||
|
@ -249,6 +256,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Rsma async commit implementation(only do some necessary light weighted task)
|
* @brief Rsma async commit implementation(only do some necessary light weighted task)
|
||||||
|
|
|
@ -15,8 +15,6 @@
|
||||||
|
|
||||||
#include "sma.h"
|
#include "sma.h"
|
||||||
|
|
||||||
#define RSMA_QTASKINFO_BUFSIZE (32768) // size
|
|
||||||
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
|
|
||||||
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
|
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
|
||||||
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
||||||
#define RSMA_FETCH_DELAY_MAX (120000) // ms
|
#define RSMA_FETCH_DELAY_MAX (120000) // ms
|
||||||
|
@ -48,23 +46,10 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SR
|
||||||
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
||||||
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo);
|
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo);
|
||||||
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
|
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
|
||||||
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
|
|
||||||
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
|
|
||||||
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
|
|
||||||
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
|
|
||||||
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
|
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
|
||||||
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
|
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
|
||||||
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
|
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
|
||||||
|
|
||||||
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
|
|
||||||
// adapt accordingly if definition of SRSmaInfo update
|
|
||||||
SRSmaInfo *pResult = NULL;
|
|
||||||
ASSERT(pItem->level == TSDB_RETENTION_L1 || pItem->level == TSDB_RETENTION_L2);
|
|
||||||
pResult = (SRSmaInfo *)POINTER_SHIFT(pItem, -(sizeof(SRSmaInfoItem) * (pItem->level - 1) + RSMA_INFO_HEAD_LEN));
|
|
||||||
ASSERT(pResult->pTSchema->numOfCols > 1);
|
|
||||||
return pResult;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct SRSmaQTaskInfoItem {
|
struct SRSmaQTaskInfoItem {
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
@ -104,12 +89,6 @@ void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, con
|
||||||
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level);
|
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
|
|
||||||
return lenWithHead - RSMA_QTASKINFO_HEAD_LEN;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); }
|
|
||||||
|
|
||||||
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
|
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
|
||||||
// Note: free/kill may in RC
|
// Note: free/kill may in RC
|
||||||
if (!taskHandle || !(*taskHandle)) return;
|
if (!taskHandle || !(*taskHandle)) return;
|
||||||
|
@ -803,6 +782,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
SSubmitBlkIter blkIter = {0};
|
SSubmitBlkIter blkIter = {0};
|
||||||
|
@ -820,6 +800,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief sync mode
|
* @brief sync mode
|
||||||
|
@ -1189,65 +1170,6 @@ _err:
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer) {
|
|
||||||
SVnode *pVnode = pSma->pVnode;
|
|
||||||
STFile tFile = {0};
|
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN] = {0};
|
|
||||||
|
|
||||||
tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), qTaskFileVer, qTaskInfoFName);
|
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
|
|
||||||
if (qTaskFileVer > 0) {
|
|
||||||
smaWarn("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", not start as %s not exist",
|
|
||||||
TD_VID(pVnode), type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
|
|
||||||
} else {
|
|
||||||
smaDebug("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode),
|
|
||||||
type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
STFInfo tFileInfo = {0};
|
|
||||||
if (tdLoadTFileHeader(&tFile, &tFileInfo) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
SRSmaQTaskInfoIter fIter = {0};
|
|
||||||
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
|
|
||||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
|
||||||
tdCloseTFile(&tFile);
|
|
||||||
tdDestroyTFile(&tFile);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdRSmaQTaskInfoRestore(pSma, type, &fIter) < 0) {
|
|
||||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
|
||||||
tdCloseTFile(&tFile);
|
|
||||||
tdDestroyTFile(&tFile);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
|
||||||
tdCloseTFile(&tFile);
|
|
||||||
tdDestroyTFile(&tFile);
|
|
||||||
|
|
||||||
// restored successfully from committed or sync
|
|
||||||
smaInfo("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload succeed", TD_VID(pVnode),
|
|
||||||
type, qTaskFileVer);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
_err:
|
|
||||||
smaError("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload failed since %s",
|
|
||||||
TD_VID(pVnode), type, qTaskFileVer, terrstr());
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief reload ts data from checkpoint
|
* @brief reload ts data from checkpoint
|
||||||
*
|
*
|
||||||
|
@ -1270,19 +1192,12 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
// step 2: reload ts data from checkpoint
|
||||||
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
|
|
||||||
if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// step 3: reload ts data from checkpoint
|
|
||||||
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
|
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// step 4: open SRSmaFS for qTaskFiles
|
// step 3: open SRSmaFS for qTaskFiles
|
||||||
if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) {
|
if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -1295,191 +1210,6 @@ _err:
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Restore from SRSmaQTaskInfoItem
|
|
||||||
*
|
|
||||||
* @param pSma
|
|
||||||
* @param pItem
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) {
|
|
||||||
SRSmaInfo *pRSmaInfo = NULL;
|
|
||||||
void *qTaskInfo = NULL;
|
|
||||||
|
|
||||||
pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pItem->suid);
|
|
||||||
if (!pRSmaInfo) {
|
|
||||||
smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pItem->type == TSDB_RETENTION_L1) {
|
|
||||||
qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 0);
|
|
||||||
} else if (pItem->type == TSDB_RETENTION_L2) {
|
|
||||||
qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 1);
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!qTaskInfo) {
|
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
|
||||||
smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) {
|
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
|
||||||
smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid,
|
|
||||||
pItem->type, terrstr());
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid,
|
|
||||||
pItem->type);
|
|
||||||
|
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile) {
|
|
||||||
memset(pIter, 0, sizeof(*pIter));
|
|
||||||
pIter->pTFile = pTFile;
|
|
||||||
pIter->offset = TD_FILE_HEAD_SIZE;
|
|
||||||
|
|
||||||
if (tdGetTFileSize(pTFile, &pIter->fsize) < 0) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((pIter->fsize - TD_FILE_HEAD_SIZE) < RSMA_QTASKINFO_BUFSIZE) {
|
|
||||||
pIter->nAlloc = pIter->fsize - TD_FILE_HEAD_SIZE;
|
|
||||||
} else {
|
|
||||||
pIter->nAlloc = RSMA_QTASKINFO_BUFSIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pIter->nAlloc < TD_FILE_HEAD_SIZE) {
|
|
||||||
pIter->nAlloc = TD_FILE_HEAD_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter->pBuf = taosMemoryMalloc(pIter->nAlloc);
|
|
||||||
if (!pIter->pBuf) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
pIter->qBuf = pIter->pBuf;
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish) {
|
|
||||||
STFile *pTFile = pIter->pTFile;
|
|
||||||
int64_t nBytes = RSMA_QTASKINFO_BUFSIZE;
|
|
||||||
|
|
||||||
if (pIter->offset >= pIter->fsize) {
|
|
||||||
*isFinish = true;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((pIter->fsize - pIter->offset) < RSMA_QTASKINFO_BUFSIZE) {
|
|
||||||
nBytes = pIter->fsize - pIter->offset;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t infoLen = 0;
|
|
||||||
taosDecodeFixedI32(pIter->pBuf, &infoLen);
|
|
||||||
if (infoLen > nBytes) {
|
|
||||||
if (infoLen <= RSMA_QTASKINFO_BUFSIZE) {
|
|
||||||
terrno = TSDB_CODE_RSMA_FILE_CORRUPTED;
|
|
||||||
smaError("iterate rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
if (pIter->nAlloc < infoLen) {
|
|
||||||
pIter->nAlloc = infoLen;
|
|
||||||
void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen);
|
|
||||||
if (!pBuf) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
pIter->pBuf = pBuf;
|
|
||||||
}
|
|
||||||
|
|
||||||
nBytes = infoLen;
|
|
||||||
|
|
||||||
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter->qBuf = pIter->pBuf;
|
|
||||||
pIter->offset += nBytes;
|
|
||||||
pIter->nBytes = nBytes;
|
|
||||||
pIter->nBufPos = 0;
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter) {
|
|
||||||
while (1) {
|
|
||||||
// block iter
|
|
||||||
bool isFinish = false;
|
|
||||||
if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
if (isFinish) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// consume the block
|
|
||||||
int32_t qTaskInfoLenWithHead = 0;
|
|
||||||
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
|
|
||||||
if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
|
|
||||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
|
||||||
smaError("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s failed since %s", SMA_VID(pSma), type,
|
|
||||||
TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
if ((pIter->nBufPos + qTaskInfoLenWithHead) <= pIter->nBytes) {
|
|
||||||
SRSmaQTaskInfoItem infoItem = {0};
|
|
||||||
pIter->qBuf = taosDecodeFixedI8(pIter->qBuf, &infoItem.type);
|
|
||||||
pIter->qBuf = taosDecodeFixedI64(pIter->qBuf, &infoItem.suid);
|
|
||||||
infoItem.qTaskInfo = pIter->qBuf;
|
|
||||||
infoItem.len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead);
|
|
||||||
// do the restore job
|
|
||||||
smaDebug("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s offset:%" PRIi64 "\n", SMA_VID(pSma),
|
|
||||||
type, TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos);
|
|
||||||
tdRSmaQTaskInfoItemRestore(pSma, &infoItem);
|
|
||||||
|
|
||||||
pIter->qBuf = POINTER_SHIFT(pIter->qBuf, infoItem.len);
|
|
||||||
pIter->nBufPos += qTaskInfoLenWithHead;
|
|
||||||
|
|
||||||
if ((pIter->nBufPos + RSMA_QTASKINFO_HEAD_LEN) >= pIter->nBytes) {
|
|
||||||
// prepare and load next block in the file
|
|
||||||
pIter->offset -= (pIter->nBytes - pIter->nBufPos);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// prepare and load next block in the file
|
|
||||||
pIter->offset -= (pIter->nBytes - pIter->nBufPos);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
SSma *pSma = pRSmaStat->pSma;
|
SSma *pSma = pRSmaStat->pSma;
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
|
@ -1523,148 +1253,6 @@ _err:
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
|
||||||
SSma *pSma = pRSmaStat->pSma;
|
|
||||||
SVnode *pVnode = pSma->pVnode;
|
|
||||||
int32_t vid = SMA_VID(pSma);
|
|
||||||
int64_t toffset = 0;
|
|
||||||
bool isFileCreated = false;
|
|
||||||
|
|
||||||
if (taosHashGetSize(pInfoHash) <= 0) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *infoHash = taosHashIterate(pInfoHash, NULL);
|
|
||||||
if (!infoHash) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
|
|
||||||
if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
|
|
||||||
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
|
|
||||||
pRSmaStat->commitAppliedVer, fsMaxVer);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
STFile tFile = {0};
|
|
||||||
#if 0
|
|
||||||
if (pRSmaStat->commitAppliedVer > 0) {
|
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
|
||||||
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
|
||||||
smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
|
||||||
smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile));
|
|
||||||
|
|
||||||
isFileCreated = true;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
while (infoHash) {
|
|
||||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
|
|
||||||
|
|
||||||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
|
||||||
infoHash = taosHashIterate(pInfoHash, infoHash);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
|
||||||
#if 0
|
|
||||||
qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i);
|
|
||||||
#endif
|
|
||||||
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i);
|
|
||||||
if (!taskInfo) {
|
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
char *pOutput = NULL;
|
|
||||||
int32_t len = 0;
|
|
||||||
int8_t type = (int8_t)(i + 1);
|
|
||||||
if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) {
|
|
||||||
smaError("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo failed since %s", vid, pRSmaInfo->suid,
|
|
||||||
i + 1, terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (!pOutput || len <= 0) {
|
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64
|
|
||||||
" level %d serialize qTaskInfo success but no output(len %d), not persist",
|
|
||||||
vid, pRSmaInfo->suid, i + 1, len);
|
|
||||||
taosMemoryFreeClear(pOutput);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo success with len %d, need persist", vid,
|
|
||||||
pRSmaInfo->suid, i + 1, len);
|
|
||||||
|
|
||||||
if (!isFileCreated) {
|
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
|
||||||
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
|
||||||
smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
|
||||||
smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
|
|
||||||
TD_TFILE_FULL_NAME(&tFile));
|
|
||||||
|
|
||||||
isFileCreated = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
char tmpBuf[RSMA_QTASKINFO_HEAD_LEN] = {0};
|
|
||||||
void *pTmpBuf = &tmpBuf;
|
|
||||||
int32_t headLen = 0;
|
|
||||||
headLen += taosEncodeFixedI32(&pTmpBuf, len + RSMA_QTASKINFO_HEAD_LEN);
|
|
||||||
headLen += taosEncodeFixedI8(&pTmpBuf, type);
|
|
||||||
headLen += taosEncodeFixedI64(&pTmpBuf, pRSmaInfo->suid);
|
|
||||||
|
|
||||||
ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN);
|
|
||||||
tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset);
|
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid,
|
|
||||||
pRSmaInfo->suid, i + 1, headLen, toffset);
|
|
||||||
tdAppendTFile(&tFile, pOutput, len, &toffset);
|
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid,
|
|
||||||
pRSmaInfo->suid, i + 1, len, toffset);
|
|
||||||
|
|
||||||
taosMemoryFree(pOutput);
|
|
||||||
}
|
|
||||||
|
|
||||||
infoHash = taosHashIterate(pInfoHash, infoHash);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isFileCreated) {
|
|
||||||
if (tdUpdateTFileHeader(&tFile) < 0) {
|
|
||||||
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
|
|
||||||
tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
} else {
|
|
||||||
smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile));
|
|
||||||
}
|
|
||||||
|
|
||||||
tdCloseTFile(&tFile);
|
|
||||||
tdDestroyTFile(&tFile);
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
_err:
|
|
||||||
smaError("vgId:%d, rsma persist failed since %s", vid, terrstr());
|
|
||||||
if (isFileCreated) {
|
|
||||||
tdRemoveTFile(&tFile);
|
|
||||||
tdDestroyTFile(&tFile);
|
|
||||||
}
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief trigger to get rsma result in async mode
|
* @brief trigger to get rsma result in async mode
|
||||||
*
|
*
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#include "sma.h"
|
#include "sma.h"
|
||||||
|
|
||||||
// smaFileUtil ================
|
// smaFileUtil ================
|
||||||
|
#if 0
|
||||||
#define TD_FILE_STATE_OK 0
|
#define TD_FILE_STATE_OK 0
|
||||||
#define TD_FILE_STATE_BAD 1
|
#define TD_FILE_STATE_BAD 1
|
||||||
|
|
||||||
|
@ -182,6 +182,8 @@ void tdCloseTFile(STFile *pTFile) {
|
||||||
|
|
||||||
void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); }
|
void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); }
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
||||||
char *outputName) {
|
char *outputName) {
|
||||||
if (version < 0) {
|
if (version < 0) {
|
||||||
|
@ -221,6 +223,7 @@ void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
|
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
|
||||||
TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
|
TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
|
||||||
TD_TFILE_SET_CLOSED(pTFile);
|
TD_TFILE_SET_CLOSED(pTFile);
|
||||||
|
@ -286,6 +289,8 @@ int32_t tdRemoveTFile(STFile *pTFile) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
// smaXXXUtil ================
|
// smaXXXUtil ================
|
||||||
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) {
|
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) {
|
||||||
void *pResult = taosAcquireRef(rsetId, refId);
|
void *pResult = taosAcquireRef(rsetId, refId);
|
||||||
|
|
Loading…
Reference in New Issue