refactor: rsma restore

This commit is contained in:
Cary Xu 2022-06-28 14:11:44 +08:00
parent 1bcdca11cb
commit 5be089b888
6 changed files with 357 additions and 353 deletions

View File

@ -28,7 +28,8 @@ extern "C" {
#define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define smaWarn(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define smaInfo(...) do { if (smaDebugFlag & DEBUG_INFO) { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// #define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
@ -205,16 +206,16 @@ struct STFile {
uint8_t state;
};
#define TD_FILE_F(tf) (&((tf)->f))
#define TD_FILE_PFILE(tf) ((tf)->pFile)
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL)
#define TD_FILE_FULL_NAME(tf) (TD_FILE_F(tf)->aname)
#define TD_FILE_REL_NAME(tf) (TD_FILE_F(tf)->rname)
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL)
#define TD_FILE_CLOSED(tf) (!TD_FILE_OPENED(tf))
#define TD_FILE_SET_CLOSED(f) (TD_FILE_PFILE(f) = NULL)
#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TD_FILE_DID(tf) (TD_FILE_F(tf)->did)
#define TD_TFILE_F(tf) (&((tf)->f))
#define TD_TFILE_PFILE(tf) ((tf)->pFile)
#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
#define TD_TFILE_FULL_NAME(tf) (TD_TFILE_F(tf)->aname)
#define TD_TFILE_REL_NAME(tf) (TD_TFILE_F(tf)->rname)
#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
#define TD_TFILE_CLOSED(tf) (!TD_TFILE_OPENED(tf))
#define TD_TFILE_SET_CLOSED(f) (TD_TFILE_PFILE(f) = NULL)
#define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TD_TFILE_DID(tf) (TD_TFILE_F(tf)->did)
int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname);
int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType);

View File

@ -64,6 +64,7 @@ typedef struct STsdbSnapshotReader STsdbSnapshotReader;
#define VNODE_TQ_DIR "tq"
#define VNODE_WAL_DIR "wal"
#define VNODE_TSMA_DIR "tsma"
#define VNODE_RSMA_DIR "rsma"
#define VNODE_RSMA0_DIR "tsdb"
#define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2"
@ -161,7 +162,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
// sma
int32_t smaOpen(SVnode* pVnode);
int32_t smaClose(SSma* pSma);
int32_t smaCloseEnv(SSma* pSma);
int32_t smaCloseEx(SSma* pSma);

View File

@ -123,7 +123,7 @@ int32_t smaOpen(SVnode *pVnode) {
}
// restore the rsma
#if 0
#if 1
if (rsmaRestore(pSma) < 0) {
goto _err;
}
@ -154,12 +154,6 @@ int32_t smaCloseEx(SSma *pSma) {
return 0;
}
int32_t smaClose(SSma *pSma) {
smaCloseEnv(pSma);
smaCloseEx(pSma);
return 0;
}
/**
* @brief rsma env restore
*

View File

@ -15,13 +15,14 @@
#include "sma.h"
#define RSMA_QTASKINFO_PERSIST_MS 7200000
#define RSMA_QTASKINFO_PERSIST_MS 5000 // 7200000
#define RSMA_QTASKINFO_BUFSIZE 32768
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T;
static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"};
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
typedef struct SRSmaQTaskFIter SRSmaQTaskFIter;
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
@ -32,11 +33,11 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
static void tdRSmaFetchTrigger(void *param, void *tmrId);
static void tdRSmaPersistTrigger(void *param, void *tmrId);
static void *tdRSmaPersistExec(void *param);
static void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName);
static void tdRSmaQTaskInfoGetFName(int32_t vid, int8_t ftype, char *outputName);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFinish);
static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoItem *pItem, bool *isEnd);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
struct SRSmaInfoItem {
@ -63,22 +64,23 @@ struct SRSmaQTaskInfoItem {
void *qTaskInfo;
};
struct SRSmaQTaskFIter {
struct SRSmaQTaskInfoIter {
STFile *pTFile;
int64_t offset;
int64_t fsize;
int32_t nBytes;
int32_t nAlloc;
char *buf;
char *pBuf;
// ------------
char *qBuf; // for iterator
int32_t nBufPos;
};
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
return lenWithHead - sizeof(int32_t) - sizeof(int8_t) - sizeof(int64_t);
return lenWithHead - RSMA_QTASKINFO_HEAD_LEN;
}
static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskFIter *pIter) { taosMemoryFreeClear(pIter->buf); }
static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); }
static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
// Note: free/kill may in RC
@ -294,7 +296,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo) {
ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally
ASSERT(0); // TODO: free original pRSmaInfo if exists abnormally
smaDebug("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
return TSDB_CODE_SUCCESS;
}
@ -338,10 +340,10 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) {
goto _err;
} else {
smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), suid);
}
smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), suid);
// start the persist timer
if (TASK_TRIGGER_STAT_INIT ==
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_INIT, TASK_TRIGGER_STAT_ACTIVE)) {
@ -356,10 +358,9 @@ _err:
}
/**
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam.
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently
*
* @param pTsdb
* @param pMeta
* @param pVnode
* @param pReq
* @return int32_t
*/
@ -695,8 +696,257 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return TSDB_CODE_SUCCESS;
}
static void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName) {
tdGetVndFileName(vid, "rsma", tdQTaskInfoFname[ftype], outputName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
SVnode *pVnode = pSma->pVnode;
// step 1: iterate all stables to restore the rsma env
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
taosArrayDestroy(suidList);
smaError("vgId:%d, failed to restore rsma env since get stb id list error: %s", TD_VID(pVnode), terrstr());
return TSDB_CODE_FAILED;
}
int32_t arrSize = taosArrayGetSize(suidList);
if (arrSize == 0) {
taosArrayDestroy(suidList);
smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode));
return TSDB_CODE_SUCCESS;
}
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
for (int32_t i = 0; i < arrSize; ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
smaDebug("vgId:%d, rsma restore, suid[%d] is %" PRIi64, TD_VID(pVnode), i, suid);
if (metaGetTableEntryByUid(&mr, suid) < 0) {
smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) {
SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
for (int i = 0; i < TSDB_RETENTION_L2; ++i) {
smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64
" qmsgLen:%" PRIi32,
TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]);
}
if (tdProcessRSmaCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) {
smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr());
goto _err;
}
smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid);
}
}
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
STFile tFile = {0};
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName);
if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) {
goto _err;
}
if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) {
goto _err;
}
SRSmaQTaskInfoIter fIter = {0};
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
goto _err;
}
SRSmaQTaskInfoItem infoItem = {0};
if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) {
tdRSmaQTaskInfoIterDestroy(&fIter);
goto _err;
}
tdRSmaQTaskInfoIterDestroy(&fIter);
metaReaderClear(&mr);
taosArrayDestroy(suidList);
return TSDB_CODE_SUCCESS;
_err:
metaReaderClear(&mr);
taosArrayDestroy(suidList);
smaError("failed to restore rsma task since %s", terrstr());
return TSDB_CODE_FAILED;
}
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) {
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv);
SRSmaInfo *pRSmaInfo = NULL;
void *qTaskInfo = NULL;
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &pItem->suid, sizeof(pItem->suid));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
return TSDB_CODE_SUCCESS;
}
if (pItem->type == 1) {
qTaskInfo = pRSmaInfo->items[0].taskInfo;
} else if (pItem->type == 2) {
qTaskInfo = pRSmaInfo->items[1].taskInfo;
} else {
ASSERT(0);
}
if (!qTaskInfo) {
smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
return TSDB_CODE_SUCCESS;
}
if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) {
smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid,
pItem->type, terrstr(terrno));
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid, pItem->type);
return TSDB_CODE_SUCCESS;
}
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile) {
memset(pIter, 0, sizeof(*pIter));
pIter->pTFile = pTFile;
pIter->offset = TD_FILE_HEAD_SIZE;
if (tdGetTFileSize(pTFile, &pIter->fsize) < 0) {
return TSDB_CODE_FAILED;
}
if ((pIter->fsize - TD_FILE_HEAD_SIZE) < RSMA_QTASKINFO_BUFSIZE) {
pIter->nAlloc = pIter->fsize - TD_FILE_HEAD_SIZE;
} else {
pIter->nAlloc = RSMA_QTASKINFO_BUFSIZE;
}
if (pIter->nAlloc < TD_FILE_HEAD_SIZE) {
pIter->nAlloc = TD_FILE_HEAD_SIZE;
}
pIter->pBuf = taosMemoryMalloc(pIter->nAlloc);
if (!pIter->pBuf) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
pIter->qBuf = pIter->pBuf;
return TSDB_CODE_SUCCESS;
}
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish) {
STFile *pTFile = pIter->pTFile;
int64_t nBytes = RSMA_QTASKINFO_BUFSIZE;
if (pIter->offset >= pIter->fsize) {
*isFinish = true;
return TSDB_CODE_SUCCESS;
}
if ((pIter->fsize - pIter->offset) < RSMA_QTASKINFO_BUFSIZE) {
nBytes = pIter->fsize - pIter->offset;
}
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
if (tdReadTFile(pTFile, pIter->qBuf, nBytes) != nBytes) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
int32_t infoLen = 0;
taosDecodeFixedI32(pIter->qBuf, &infoLen);
if (infoLen > nBytes) {
ASSERT(infoLen > RSMA_QTASKINFO_BUFSIZE);
pIter->nAlloc = infoLen;
void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen);
if (!pBuf) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
pIter->pBuf = pBuf;
pIter->qBuf = pIter->pBuf;
nBytes = infoLen;
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET)) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
}
pIter->offset += nBytes;
pIter->nBytes = nBytes;
pIter->nBufPos = 0;
return TSDB_CODE_SUCCESS;
}
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
while (1) {
// block iter
bool isFinish = false;
if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
if (isFinish) {
return TSDB_CODE_SUCCESS;
}
// consume the block
int32_t qTaskInfoLenWithHead = 0;
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return TSDB_CODE_FAILED;
}
while (1) {
if ((pIter->nBufPos + qTaskInfoLenWithHead) <= pIter->nBytes) {
SRSmaQTaskInfoItem infoItem = {0};
pIter->qBuf = taosDecodeFixedI8(pIter->qBuf, &infoItem.type);
pIter->qBuf = taosDecodeFixedI64(pIter->qBuf, &infoItem.suid);
infoItem.qTaskInfo = pIter->qBuf;
infoItem.len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead);
// do the restore job
smaDebug("vgId:%d, restore the qtask info %s offset:%" PRIi64 "\n", SMA_VID(pSma),
TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos);
tdRSmaQTaskInfoItemRestore(pSma, &infoItem);
pIter->qBuf = POINTER_SHIFT(pIter->qBuf, infoItem.len);
pIter->nBufPos += qTaskInfoLenWithHead;
if ((pIter->nBufPos + RSMA_QTASKINFO_HEAD_LEN) >= pIter->nBytes) {
// prepare and load next block in the file
pIter->offset -= (pIter->nBytes - pIter->nBufPos);
break;
}
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
continue;
}
// prepare and load next block in the file
pIter->offset -= (pIter->nBytes - pIter->nBufPos);
break;
}
}
return TSDB_CODE_SUCCESS;
}
static void tdRSmaQTaskInfoGetFName(int32_t vid, int8_t ftype, char *outputName) {
tdGetVndFileName(vid, VNODE_RSMA_DIR, tdQTaskInfoFname[ftype], outputName);
}
static void *tdRSmaPersistExec(void *param) {
@ -704,6 +954,7 @@ static void *tdRSmaPersistExec(void *param) {
SRSmaStat *pRSmaStat = param;
SSma *pSma = pRSmaStat->pSma;
STfs *pTfs = pSma->pVnode->pTfs;
int32_t vid = SMA_VID(pSma);
int64_t toffset = 0;
bool isFileCreated = false;
@ -716,26 +967,16 @@ static void *tdRSmaPersistExec(void *param) {
goto _end;
}
STFile tFile = {0};
int32_t vid = SMA_VID(pSma);
STFile tFile = {0};
while (infoHash) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
#if 0
smaDebug("table %" PRIi64 " sleep 15s start ...", pRSmaInfo->items[0].pRsmaInfo->suid);
for (int32_t i = 15; i > 0; --i) {
taosSsleep(1);
smaDebug("table %" PRIi64 " countdown %d", pRSmaInfo->items[0].pRsmaInfo->suid, i);
}
smaDebug("table %" PRIi64 " sleep 15s end ...", pRSmaInfo->items[0].pRsmaInfo->suid);
#endif
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo;
if (!taskInfo) {
smaDebug("vgId:%d, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
continue;
}
char *pOutput = NULL;
int32_t len = 0;
int8_t type = (int8_t)(i + 1);
@ -743,69 +984,77 @@ static void *tdRSmaPersistExec(void *param) {
smaError("vgId:%d, table %" PRIi64 " level %d serialize rsma task failed since %s", vid, pRSmaInfo->suid, i + 1,
terrstr(terrno));
goto _err;
} else {
if (!pOutput) {
smaDebug("vgId:%d, table %" PRIi64
" level %d serialize rsma task success but no output(len %d) and no need to persist",
vid, pRSmaInfo->suid, i + 1, len);
continue;
} else if (len <= 0) {
smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d and no need to persist",
vid, pRSmaInfo->suid, i + 1, len);
taosMemoryFree(pOutput);
}
smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d and need persist", vid,
pRSmaInfo->suid, i + 1, len);
#if 1
if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) {
smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid,
i + 1, terrstr(terrno));
} else {
smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1);
}
#endif
}
if (!pOutput || len <= 0) {
smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success but no output(len %d), not persist",
vid, pRSmaInfo->suid, i + 1, len);
taosMemoryFreeClear(pOutput);
continue;
}
smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d, need persist", vid,
pRSmaInfo->suid, i + 1, len);
#if 0
if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) {
smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid,
i + 1, terrstr(terrno));
} else {
smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1);
}
#endif
if (!isFileCreated) {
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName);
tdRSmaQTaskInfoGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName);
tdInitTFile(&tFile, pTfs, qTaskInfoFName);
tdCreateTFile(&tFile, pTfs, true, -1);
isFileCreated = true;
}
len += (sizeof(len) + sizeof(type) + sizeof(pRSmaInfo->suid));
tdAppendTFile(&tFile, &len, sizeof(len), &toffset);
tdAppendTFile(&tFile, &type, sizeof(type), &toffset);
tdAppendTFile(&tFile, &pRSmaInfo->suid, sizeof(pRSmaInfo->suid), &toffset);
char tmpBuf[RSMA_QTASKINFO_HEAD_LEN] = {0};
void *pTmpBuf = &tmpBuf;
int32_t headLen = 0;
headLen += taosEncodeFixedI32(&pTmpBuf, len + RSMA_QTASKINFO_HEAD_LEN);
headLen += taosEncodeFixedI8(&pTmpBuf, type);
headLen += taosEncodeFixedI64(&pTmpBuf, pRSmaInfo->suid);
ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN);
tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset);
smaDebug("vgId:%d, table %" PRIi64 " level %d head part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid,
i + 1, headLen, toffset);
tdAppendTFile(&tFile, pOutput, len, &toffset);
smaDebug("vgId:%d, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid,
i + 1, len, toffset);
taosMemoryFree(pOutput);
}
infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash);
}
_normal:
if (isFileCreated) {
if (tdUpdateTFileHeader(&tFile) < 0) {
smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_FILE_FULL_NAME(&tFile), tstrerror(terrno));
smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
tstrerror(terrno));
tdCloseTFile(&tFile);
tdRemoveTFile(&tFile);
goto _err;
} else {
smaDebug("vgId:%d, succeed to update tfile %s header", vid, TD_FILE_FULL_NAME(&tFile));
smaDebug("vgId:%d, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile));
}
tdCloseTFile(&tFile);
char newFName[TSDB_FILENAME_LEN];
strncpy(newFName, TD_FILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN);
strncpy(newFName, TD_TFILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN);
char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_FILE]);
strncpy(pos, tdQTaskInfoFname[TD_QTASK_CUR_FILE], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName));
if (taosRenameFile(TD_FILE_FULL_NAME(&tFile), newFName) != 0) {
smaError("vgId:%d, failed to rename %s to %s", vid, TD_FILE_FULL_NAME(&tFile), newFName);
if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) {
smaError("vgId:%d, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
goto _err;
} else {
smaDebug("vgId:%d, succeed to rename %s to %s", vid, TD_FILE_FULL_NAME(&tFile), newFName);
smaDebug("vgId:%d, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
}
}
goto _end;
@ -841,13 +1090,14 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
TASK_TRIGGER_STAT_INACTIVE,
TASK_TRIGGER_STAT_ACTIVE)) {
smaDebug("persist task is active again");
smaDebug("vgId:%d, persist task is active again", SMA_VID(pRSmaStat->pSma));
} else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
TASK_TRIGGER_STAT_CANCELLED,
TASK_TRIGGER_STAT_FINISHED)) {
smaDebug(" persist task is cancelled and set finished");
smaDebug("vgId:%d, persist task is cancelled and set finished", SMA_VID(pRSmaStat->pSma));
} else {
smaWarn("persist task in abnormal stat %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)),
SMA_VID(pRSmaStat->pSma));
ASSERT(0);
}
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
@ -864,7 +1114,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
*/
static void tdRSmaPersistTrigger(void *param, void *tmrId) {
SRSmaStat *pRSmaStat = param;
int8_t tmrStat =
int8_t tmrStat =
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
switch (tmrStat) {
case TASK_TRIGGER_STAT_ACTIVE: {
@ -872,7 +1123,7 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
if (TASK_TRIGGER_STAT_CANCELLED != atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
TASK_TRIGGER_STAT_CANCELLED,
TASK_TRIGGER_STAT_FINISHED)) {
smaDebug("rsma persistence start since active");
smaDebug("vgId:%d, rsma persistence start since active", SMA_VID(pRSmaStat->pSma));
// start persist task
tdRSmaPersistTask(pRSmaStat);
@ -895,252 +1146,6 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
} break;
default: {
smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat);
ASSERT(0);
} break;
}
}
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
SVnode *pVnode = pSma->pVnode;
// step 1: iterate all stables to restore the rsma env
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
smaError("vgId:%d, failed to restore rsma since get stb id list error: %s", TD_VID(pVnode), terrstr());
return TSDB_CODE_FAILED;
}
if (taosArrayGetSize(suidList) == 0) {
smaDebug("vgId:%d no need to restore rsma since empty stb id list", TD_VID(pVnode));
return TSDB_CODE_SUCCESS;
}
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
smaDebug("suid [%d] is %" PRIi64, i, suid);
if (metaGetTableEntryByUid(&mr, suid) < 0) {
smaError("vgId:%d failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr());
goto _err;
}
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) {
SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
for (int i = 0; i < 2; ++i) {
smaDebug("vgId: %d table:%" PRIi64 " maxdelay[%d]:%" PRIi64 " watermark[%d]:%" PRIi64, TD_VID(pSma->pVnode),
suid, i, param->maxdelay[i], i, param->watermark[i]);
}
if (tdProcessRSmaCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) {
smaError("vgId:%d failed to retore rsma env for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr());
goto _err;
}
}
}
// step 2: retrieve qtaskinfo object from the rsma/qtaskinfo file and restore
STFile tFile = {0};
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName);
if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) {
goto _err;
}
if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) {
goto _err;
}
SRSmaQTaskFIter fIter = {0};
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
goto _err;
}
SRSmaQTaskInfoItem infoItem = {0};
bool isEnd = false;
int32_t code = 0;
while ((code = tdRSmaQTaskInfoIterNext(&fIter, &infoItem, &isEnd)) == 0) {
if (isEnd) {
break;
}
if ((code = tdRSmaQTaskInfoItemRestore(pSma, &infoItem)) < 0) {
break;
}
}
tdRSmaQTaskInfoIterDestroy(&fIter);
if (code < 0) {
goto _err;
}
metaReaderClear(&mr);
taosArrayDestroy(suidList);
return TSDB_CODE_SUCCESS;
_err:
ASSERT(0);
metaReaderClear(&mr);
taosArrayDestroy(suidList);
smaError("failed to restore rsma info since %s", terrstr());
return TSDB_CODE_FAILED;
}
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem) {
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv);
SRSmaInfo *pRSmaInfo = NULL;
void *qTaskInfo = NULL;
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &infoItem->suid, sizeof(infoItem->suid));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
smaDebug("vgId:%d, no restore as no rsma info for suid:%" PRIu64, SMA_VID(pSma), infoItem->suid);
return TSDB_CODE_SUCCESS;
}
if (infoItem->type == 1) {
qTaskInfo = pRSmaInfo->items[0].taskInfo;
} else if (infoItem->type == 2) {
qTaskInfo = pRSmaInfo->items[1].taskInfo;
} else {
ASSERT(0);
}
if (!qTaskInfo) {
smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), infoItem->suid);
return TSDB_CODE_SUCCESS;
}
if (qDeserializeTaskStatus(qTaskInfo, infoItem->qTaskInfo, infoItem->len) < 0) {
smaError("vgId:%d, restore rsma failed for suid:%" PRIi64 " level %d since %s", SMA_VID(pSma), infoItem->suid,
infoItem->type, terrstr(terrno));
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d, restore rsma success for suid:%" PRIi64 " level %d", SMA_VID(pSma), infoItem->suid,
infoItem->type);
return TSDB_CODE_SUCCESS;
}
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile) {
memset(pIter, 0, sizeof(*pIter));
pIter->pTFile = pTFile;
pIter->offset = TD_FILE_HEAD_SIZE;
if (tdGetTFileSize(pTFile, &pIter->fsize) < 0) {
return TSDB_CODE_FAILED;
}
if ((pIter->fsize - TD_FILE_HEAD_SIZE) < RSMA_QTASKINFO_BUFSIZE) {
pIter->nAlloc = pIter->fsize - TD_FILE_HEAD_SIZE;
} else {
pIter->nAlloc = RSMA_QTASKINFO_BUFSIZE;
}
if (pIter->nAlloc < TD_FILE_HEAD_SIZE) {
pIter->nAlloc = TD_FILE_HEAD_SIZE;
}
pIter->buf = taosMemoryMalloc(pIter->nAlloc);
if (!pIter->buf) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFinish) {
STFile *pTFile = pIter->pTFile;
int64_t nBytes = RSMA_QTASKINFO_BUFSIZE;
if (pIter->offset >= pIter->fsize) {
*isFinish = true;
return TSDB_CODE_SUCCESS;
}
if ((pIter->fsize - pIter->offset) < RSMA_QTASKINFO_BUFSIZE) {
nBytes = pIter->fsize - pIter->offset;
}
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
if (tdReadTFile(pTFile, pIter->buf, nBytes) != nBytes) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
int32_t infoLen = 0;
taosDecodeFixedI32(pIter->buf, &infoLen);
if (infoLen > nBytes) {
ASSERT(infoLen > RSMA_QTASKINFO_BUFSIZE);
pIter->nAlloc = infoLen;
void *pBuf = taosMemoryRealloc(pIter->buf, infoLen);
if (!pBuf) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
pIter->buf = pBuf;
nBytes = infoLen;
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET)) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
if (tdReadTFile(pTFile, pIter->buf, nBytes) != nBytes) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
}
pIter->offset += nBytes;
pIter->nBytes = nBytes;
pIter->nBufPos = 0;
return TSDB_CODE_SUCCESS;
}
static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoItem *pItem, bool *isEnd) {
while (1) {
// block iter
bool isFinish = false;
if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
if (isFinish) {
*isEnd = true;
return TSDB_CODE_SUCCESS;
}
// consume the block
int32_t qTaskInfoLenWithHead = 0;
pIter->buf = taosDecodeFixedI32(pIter->buf, &qTaskInfoLenWithHead);
if (qTaskInfoLenWithHead < 0) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return TSDB_CODE_FAILED;
}
while (1) {
if ((pIter->nBufPos + qTaskInfoLenWithHead) <= pIter->nBytes) {
pIter->buf = taosDecodeFixedI8(pIter->buf, &pItem->type);
pIter->buf = taosDecodeFixedI64(pIter->buf, &pItem->suid);
pItem->qTaskInfo = pIter->buf;
pItem->len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead);
// do the restore job
printf("%s:%d ###### restore the qtask info offset:%" PRIi64 "\n", __func__, __LINE__, pIter->offset);
pIter->buf = POINTER_SHIFT(pIter->buf, pItem->len);
pIter->nBufPos += qTaskInfoLenWithHead;
pIter->buf = taosDecodeFixedI32(pIter->buf, &qTaskInfoLenWithHead);
continue;
}
// prepare and load next block in the file
pIter->offset -= (pIter->nBytes - pIter->nBufPos);
break;
}
}
return TSDB_CODE_SUCCESS;
}

View File

@ -22,7 +22,6 @@
#define TD_FILE_INIT_MAGIC 0xFFFFFFFF
static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo);
static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo);
@ -46,7 +45,7 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
}
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_FILE_OPENED(pTFile));
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte);
if (nwrite < nbyte) {
@ -58,9 +57,9 @@ int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) {
}
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) {
ASSERT(TD_FILE_OPENED(pTFile));
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t loffset = taosLSeekFile(TD_FILE_PFILE(pTFile), offset, whence);
int64_t loffset = taosLSeekFile(TD_TFILE_PFILE(pTFile), offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@ -70,12 +69,12 @@ int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) {
}
int64_t tdGetTFileSize(STFile *pTFile, int64_t *size) {
ASSERT(TD_FILE_OPENED(pTFile));
ASSERT(TD_TFILE_OPENED(pTFile));
return taosFStatFile(pTFile->pFile, size, NULL);
}
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_FILE_OPENED(pTFile));
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte);
if (nread < 0) {
@ -108,7 +107,7 @@ int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) {
char buf[TD_FILE_HEAD_SIZE] = "\0";
uint32_t _version;
ASSERT(TD_FILE_OPENED(pTFile));
ASSERT(TD_TFILE_OPENED(pTFile));
if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) {
return -1;
@ -133,7 +132,7 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) {
}
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) {
ASSERT(TD_FILE_OPENED(pTFile));
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t toffset;
@ -141,6 +140,11 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset)
return -1;
}
#if 1
smaDebug("append to file %s, offset:%" PRIi64 " + nbyte:%" PRIi64 " =%" PRIi64, TD_TFILE_FULL_NAME(pTFile), toffset,
nbyte, toffset + nbyte);
#endif
ASSERT(pTFile->info.fsize == toffset);
if (offset) {
@ -157,9 +161,9 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset)
}
int32_t tdOpenTFile(STFile *pTFile, int flags) {
ASSERT(!TD_FILE_OPENED(pTFile));
ASSERT(!TD_TFILE_OPENED(pTFile));
pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), flags);
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), flags);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@ -169,9 +173,9 @@ int32_t tdOpenTFile(STFile *pTFile, int flags) {
}
void tdCloseTFile(STFile *pTFile) {
if (TD_FILE_OPENED(pTFile)) {
if (TD_TFILE_OPENED(pTFile)) {
taosCloseFile(&pTFile->pFile);
TD_FILE_SET_CLOSED(pTFile);
TD_TFILE_SET_CLOSED(pTFile);
}
}
@ -183,8 +187,8 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
char fullname[TSDB_FILENAME_LEN];
SDiskID did = {0};
TD_FILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
TD_FILE_SET_CLOSED(pTFile);
TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
TD_TFILE_SET_CLOSED(pTFile);
memset(&(pTFile->info), 0, sizeof(pTFile->info));
pTFile->info.magic = TD_FILE_INIT_MAGIC;
@ -202,18 +206,18 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType) {
ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC);
pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
if (errno == ENOENT) {
// Try to create directory recursively
char *s = strdup(TD_FILE_REL_NAME(pTFile));
if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_FILE_DID(pTFile)) < 0) {
char *s = strdup(TD_TFILE_REL_NAME(pTFile));
if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_TFILE_DID(pTFile)) < 0) {
taosMemoryFreeClear(s);
return -1;
}
taosMemoryFreeClear(s);
pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@ -240,7 +244,7 @@ int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fTyp
return 0;
}
int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_FILE_F(pTFile)); }
int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_TFILE_F(pTFile)); }
// smaXXXUtil ================
// ...

View File

@ -152,14 +152,14 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
return pVnode;
_err:
if (pVnode->pSma) smaClose(pVnode->pSma);
if (pVnode->pSma) smaCloseEnv(pVnode->pSma);
if (pVnode->pQuery) vnodeQueryClose(pVnode);
if (pVnode->pTq) tqClose(pVnode->pTq);
if (pVnode->pWal) walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
if (pVnode->pSma) smaCloseEx(pVnode->pSma);
if (pVnode->pMeta) metaClose(pVnode->pMeta);
tsem_destroy(&(pVnode->canCommit));
taosMemoryFree(pVnode);
return NULL;