diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 4b42ab5263..23ad70bad3 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -205,16 +205,16 @@ struct STFile { uint8_t state; }; -#define TD_FILE_F(tf) (&((tf)->f)) -#define TD_FILE_PFILE(tf) ((tf)->pFile) -#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL) -#define TD_FILE_FULL_NAME(tf) (TD_FILE_F(tf)->aname) -#define TD_FILE_REL_NAME(tf) (TD_FILE_F(tf)->rname) -#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL) -#define TD_FILE_CLOSED(tf) (!TD_FILE_OPENED(tf)) -#define TD_FILE_SET_CLOSED(f) (TD_FILE_PFILE(f) = NULL) -#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s)) -#define TD_FILE_DID(tf) (TD_FILE_F(tf)->did) +#define TD_TFILE_F(tf) (&((tf)->f)) +#define TD_TFILE_PFILE(tf) ((tf)->pFile) +#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL) +#define TD_TFILE_FULL_NAME(tf) (TD_TFILE_F(tf)->aname) +#define TD_TFILE_REL_NAME(tf) (TD_TFILE_F(tf)->rname) +#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL) +#define TD_TFILE_CLOSED(tf) (!TD_TFILE_OPENED(tf)) +#define TD_TFILE_SET_CLOSED(f) (TD_TFILE_PFILE(f) = NULL) +#define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s)) +#define TD_TFILE_DID(tf) (TD_TFILE_F(tf)->did) int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname); int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 68ed6dde51..baead763ad 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -64,6 +64,7 @@ typedef struct STsdbSnapshotReader STsdbSnapshotReader; #define VNODE_TQ_DIR "tq" #define VNODE_WAL_DIR "wal" #define VNODE_TSMA_DIR "tsma" +#define VNODE_RSMA_DIR "rsma" #define VNODE_RSMA0_DIR "tsdb" #define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA2_DIR "rsma2" @@ -161,7 +162,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool // sma int32_t smaOpen(SVnode* pVnode); -int32_t smaClose(SSma* pSma); int32_t smaCloseEnv(SSma* pSma); int32_t smaCloseEx(SSma* pSma); diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 88ed7426f7..641b8c7934 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -123,7 +123,7 @@ int32_t smaOpen(SVnode *pVnode) { } // restore the rsma -#if 0 +#if 1 if (rsmaRestore(pSma) < 0) { goto _err; } @@ -154,12 +154,6 @@ int32_t smaCloseEx(SSma *pSma) { return 0; } -int32_t smaClose(SSma *pSma) { - smaCloseEnv(pSma); - smaCloseEx(pSma); - return 0; -} - /** * @brief rsma env restore * diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ed5b6f4055..34c3ef8fca 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -17,11 +17,12 @@ #define RSMA_QTASKINFO_PERSIST_MS 7200000 #define RSMA_QTASKINFO_BUFSIZE 32768 +#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T; static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"}; typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; -typedef struct SRSmaQTaskFIter SRSmaQTaskFIter; +typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter; static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); @@ -32,11 +33,11 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaPersistTrigger(void *param, void *tmrId); static void *tdRSmaPersistExec(void *param); -static void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName); +static void tdRSmaQTaskInfoGetFName(int32_t vid, int8_t ftype, char *outputName); -static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile); -static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFinish); -static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoItem *pItem, bool *isEnd); +static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); +static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); +static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter); static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); struct SRSmaInfoItem { @@ -63,22 +64,23 @@ struct SRSmaQTaskInfoItem { void *qTaskInfo; }; -struct SRSmaQTaskFIter { +struct SRSmaQTaskInfoIter { STFile *pTFile; int64_t offset; int64_t fsize; int32_t nBytes; int32_t nAlloc; - char *buf; + char *pBuf; // ------------ + char *qBuf; // for iterator int32_t nBufPos; }; static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { - return lenWithHead - sizeof(int32_t) - sizeof(int8_t) - sizeof(int64_t); + return lenWithHead - RSMA_QTASKINFO_HEAD_LEN; } -static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskFIter *pIter) { taosMemoryFreeClear(pIter->buf); } +static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); } static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { // Note: free/kill may in RC @@ -294,7 +296,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (pRSmaInfo) { - ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally + ASSERT(0); // TODO: free original pRSmaInfo if exists abnormally smaDebug("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid); return TSDB_CODE_SUCCESS; } @@ -338,10 +340,10 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) { goto _err; - } else { - smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), suid); } + smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), suid); + // start the persist timer if (TASK_TRIGGER_STAT_INIT == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_INIT, TASK_TRIGGER_STAT_ACTIVE)) { @@ -356,10 +358,9 @@ _err: } /** - * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam. + * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently * - * @param pTsdb - * @param pMeta + * @param pVnode * @param pReq * @return int32_t */ @@ -695,8 +696,265 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { return TSDB_CODE_SUCCESS; } -static void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName) { - tdGetVndFileName(vid, "rsma", tdQTaskInfoFname[ftype], outputName); +int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { + SVnode *pVnode = pSma->pVnode; + + // step 1: iterate all stables to restore the rsma env + SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); + if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) { + taosArrayDestroy(suidList); + smaError("vgId:%d, failed to restore rsma env since get stb id list error: %s", TD_VID(pVnode), terrstr()); + return TSDB_CODE_FAILED; + } + + int32_t arrSize = taosArrayGetSize(suidList); + if (arrSize == 0) { + taosArrayDestroy(suidList); + smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode)); + return TSDB_CODE_SUCCESS; + } + + SMetaReader mr = {0}; + metaReaderInit(&mr, SMA_META(pSma), 0); + for (int32_t i = 0; i < arrSize; ++i) { + tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); + smaDebug("vgId:%d, rsma restore, suid[%d] is %" PRIi64, TD_VID(pVnode), i, suid); + if (metaGetTableEntryByUid(&mr, suid) < 0) { + smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid, + terrstr()); + goto _err; + } + ASSERT(mr.me.type == TSDB_SUPER_TABLE); + ASSERT(mr.me.uid == suid); + if (TABLE_IS_ROLLUP(mr.me.flags)) { + SRSmaParam *param = &mr.me.stbEntry.rsmaParam; + for (int i = 0; i < TSDB_RETENTION_L2; ++i) { + smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64 + " qmsgLen:%" PRIi32, + TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]); + } + if (tdProcessRSmaCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) { + smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); + goto _err; + } + smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid); + } + } + + // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore + STFile tFile = {0}; + char qTaskInfoFName[TSDB_FILENAME_LEN]; + + tdRSmaQTaskInfoGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName); + if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) { + goto _err; + } + + if(!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) { + metaReaderClear(&mr); + taosArrayDestroy(suidList); + return TSDB_CODE_SUCCESS; + } + + if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) { + goto _err; + } + + SRSmaQTaskInfoIter fIter = {0}; + if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) { + goto _err; + } + SRSmaQTaskInfoItem infoItem = {0}; + if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) { + tdRSmaQTaskInfoIterDestroy(&fIter); + goto _err; + } + + tdRSmaQTaskInfoIterDestroy(&fIter); + metaReaderClear(&mr); + taosArrayDestroy(suidList); + return TSDB_CODE_SUCCESS; +_err: + metaReaderClear(&mr); + taosArrayDestroy(suidList); + smaError("failed to restore rsma task since %s", terrstr()); + return TSDB_CODE_FAILED; +} + +static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) { + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv); + SRSmaInfo *pRSmaInfo = NULL; + void *qTaskInfo = NULL; + + pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &pItem->suid, sizeof(pItem->suid)); + + if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid); + return TSDB_CODE_SUCCESS; + } + + if (pItem->type == 1) { + qTaskInfo = pRSmaInfo->items[0].taskInfo; + } else if (pItem->type == 2) { + qTaskInfo = pRSmaInfo->items[1].taskInfo; + } else { + ASSERT(0); + } + + if (!qTaskInfo) { + smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid); + return TSDB_CODE_SUCCESS; + } + + if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) { + smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid, + pItem->type, terrstr(terrno)); + return TSDB_CODE_FAILED; + } + smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid, pItem->type); + + return TSDB_CODE_SUCCESS; +} + +static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile) { + memset(pIter, 0, sizeof(*pIter)); + pIter->pTFile = pTFile; + pIter->offset = TD_FILE_HEAD_SIZE; + + if (tdGetTFileSize(pTFile, &pIter->fsize) < 0) { + return TSDB_CODE_FAILED; + } + + if ((pIter->fsize - TD_FILE_HEAD_SIZE) < RSMA_QTASKINFO_BUFSIZE) { + pIter->nAlloc = pIter->fsize - TD_FILE_HEAD_SIZE; + } else { + pIter->nAlloc = RSMA_QTASKINFO_BUFSIZE; + } + + if (pIter->nAlloc < TD_FILE_HEAD_SIZE) { + pIter->nAlloc = TD_FILE_HEAD_SIZE; + } + + pIter->pBuf = taosMemoryMalloc(pIter->nAlloc); + if (!pIter->pBuf) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + pIter->qBuf = pIter->pBuf; + + return TSDB_CODE_SUCCESS; +} + +static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish) { + STFile *pTFile = pIter->pTFile; + int64_t nBytes = RSMA_QTASKINFO_BUFSIZE; + + if (pIter->offset >= pIter->fsize) { + *isFinish = true; + return TSDB_CODE_SUCCESS; + } + + if ((pIter->fsize - pIter->offset) < RSMA_QTASKINFO_BUFSIZE) { + nBytes = pIter->fsize - pIter->offset; + } + + if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) { + ASSERT(0); + return TSDB_CODE_FAILED; + } + + if (tdReadTFile(pTFile, pIter->qBuf, nBytes) != nBytes) { + ASSERT(0); + return TSDB_CODE_FAILED; + } + + int32_t infoLen = 0; + taosDecodeFixedI32(pIter->qBuf, &infoLen); + if (infoLen > nBytes) { + ASSERT(infoLen > RSMA_QTASKINFO_BUFSIZE); + pIter->nAlloc = infoLen; + void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen); + if (!pBuf) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + pIter->pBuf = pBuf; + pIter->qBuf = pIter->pBuf; + nBytes = infoLen; + + if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET)) { + ASSERT(0); + return TSDB_CODE_FAILED; + } + + if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) { + ASSERT(0); + return TSDB_CODE_FAILED; + } + } + + pIter->offset += nBytes; + pIter->nBytes = nBytes; + pIter->nBufPos = 0; + + return TSDB_CODE_SUCCESS; +} + +static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) { + while (1) { + // block iter + bool isFinish = false; + if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) { + ASSERT(0); + return TSDB_CODE_FAILED; + } + if (isFinish) { + return TSDB_CODE_SUCCESS; + } + + // consume the block + int32_t qTaskInfoLenWithHead = 0; + pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead); + if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return TSDB_CODE_FAILED; + } + + while (1) { + if ((pIter->nBufPos + qTaskInfoLenWithHead) <= pIter->nBytes) { + SRSmaQTaskInfoItem infoItem = {0}; + pIter->qBuf = taosDecodeFixedI8(pIter->qBuf, &infoItem.type); + pIter->qBuf = taosDecodeFixedI64(pIter->qBuf, &infoItem.suid); + infoItem.qTaskInfo = pIter->qBuf; + infoItem.len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead); + // do the restore job + smaDebug("vgId:%d, restore the qtask info %s offset:%" PRIi64 "\n", SMA_VID(pSma), + TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos); + tdRSmaQTaskInfoItemRestore(pSma, &infoItem); + + pIter->qBuf = POINTER_SHIFT(pIter->qBuf, infoItem.len); + pIter->nBufPos += qTaskInfoLenWithHead; + + if ((pIter->nBufPos + RSMA_QTASKINFO_HEAD_LEN) >= pIter->nBytes) { + // prepare and load next block in the file + pIter->offset -= (pIter->nBytes - pIter->nBufPos); + break; + } + + pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead); + continue; + } + // prepare and load next block in the file + pIter->offset -= (pIter->nBytes - pIter->nBufPos); + break; + } + } + + return TSDB_CODE_SUCCESS; +} + +static void tdRSmaQTaskInfoGetFName(int32_t vid, int8_t ftype, char *outputName) { + tdGetVndFileName(vid, VNODE_RSMA_DIR, tdQTaskInfoFname[ftype], outputName); } static void *tdRSmaPersistExec(void *param) { @@ -704,6 +962,7 @@ static void *tdRSmaPersistExec(void *param) { SRSmaStat *pRSmaStat = param; SSma *pSma = pRSmaStat->pSma; STfs *pTfs = pSma->pVnode->pTfs; + int32_t vid = SMA_VID(pSma); int64_t toffset = 0; bool isFileCreated = false; @@ -716,26 +975,16 @@ static void *tdRSmaPersistExec(void *param) { goto _end; } - STFile tFile = {0}; - int32_t vid = SMA_VID(pSma); - + STFile tFile = {0}; while (infoHash) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; - -#if 0 - smaDebug("table %" PRIi64 " sleep 15s start ...", pRSmaInfo->items[0].pRsmaInfo->suid); - for (int32_t i = 15; i > 0; --i) { - taosSsleep(1); - smaDebug("table %" PRIi64 " countdown %d", pRSmaInfo->items[0].pRsmaInfo->suid, i); - } - smaDebug("table %" PRIi64 " sleep 15s end ...", pRSmaInfo->items[0].pRsmaInfo->suid); -#endif for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo; if (!taskInfo) { smaDebug("vgId:%d, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); continue; } + char *pOutput = NULL; int32_t len = 0; int8_t type = (int8_t)(i + 1); @@ -743,69 +992,77 @@ static void *tdRSmaPersistExec(void *param) { smaError("vgId:%d, table %" PRIi64 " level %d serialize rsma task failed since %s", vid, pRSmaInfo->suid, i + 1, terrstr(terrno)); goto _err; - } else { - if (!pOutput) { - smaDebug("vgId:%d, table %" PRIi64 - " level %d serialize rsma task success but no output(len %d) and no need to persist", - vid, pRSmaInfo->suid, i + 1, len); - continue; - } else if (len <= 0) { - smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d and no need to persist", - vid, pRSmaInfo->suid, i + 1, len); - taosMemoryFree(pOutput); - } - smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d and need persist", vid, - pRSmaInfo->suid, i + 1, len); -#if 1 - if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) { - smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid, - i + 1, terrstr(terrno)); - } else { - smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1); - } -#endif } + if (!pOutput || len <= 0) { + smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success but no output(len %d), not persist", + vid, pRSmaInfo->suid, i + 1, len); + taosMemoryFreeClear(pOutput); + continue; + } + + smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d, need persist", vid, + pRSmaInfo->suid, i + 1, len); +#if 0 + if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) { + smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid, + i + 1, terrstr(terrno)); + } else { + smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1); + } +#endif if (!isFileCreated) { char qTaskInfoFName[TSDB_FILENAME_LEN]; - tdRSmaQTaskGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName); + tdRSmaQTaskInfoGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName); tdInitTFile(&tFile, pTfs, qTaskInfoFName); tdCreateTFile(&tFile, pTfs, true, -1); isFileCreated = true; } - len += (sizeof(len) + sizeof(type) + sizeof(pRSmaInfo->suid)); - tdAppendTFile(&tFile, &len, sizeof(len), &toffset); - tdAppendTFile(&tFile, &type, sizeof(type), &toffset); - tdAppendTFile(&tFile, &pRSmaInfo->suid, sizeof(pRSmaInfo->suid), &toffset); + + char tmpBuf[RSMA_QTASKINFO_HEAD_LEN] = {0}; + void *pTmpBuf = &tmpBuf; + int32_t headLen = 0; + headLen += taosEncodeFixedI32(&pTmpBuf, len + RSMA_QTASKINFO_HEAD_LEN); + headLen += taosEncodeFixedI8(&pTmpBuf, type); + headLen += taosEncodeFixedI64(&pTmpBuf, pRSmaInfo->suid); + + ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN); + tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset); + smaDebug("vgId:%d, table %" PRIi64 " level %d head part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid, + i + 1, headLen, toffset); tdAppendTFile(&tFile, pOutput, len, &toffset); + smaDebug("vgId:%d, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid, + i + 1, len, toffset); taosMemoryFree(pOutput); } + infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash); } _normal: if (isFileCreated) { if (tdUpdateTFileHeader(&tFile) < 0) { - smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_FILE_FULL_NAME(&tFile), tstrerror(terrno)); + smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), + tstrerror(terrno)); tdCloseTFile(&tFile); tdRemoveTFile(&tFile); goto _err; } else { - smaDebug("vgId:%d, succeed to update tfile %s header", vid, TD_FILE_FULL_NAME(&tFile)); + smaDebug("vgId:%d, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile)); } tdCloseTFile(&tFile); char newFName[TSDB_FILENAME_LEN]; - strncpy(newFName, TD_FILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN); + strncpy(newFName, TD_TFILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN); char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_FILE]); strncpy(pos, tdQTaskInfoFname[TD_QTASK_CUR_FILE], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName)); - if (taosRenameFile(TD_FILE_FULL_NAME(&tFile), newFName) != 0) { - smaError("vgId:%d, failed to rename %s to %s", vid, TD_FILE_FULL_NAME(&tFile), newFName); + if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) { + smaError("vgId:%d, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); goto _err; } else { - smaDebug("vgId:%d, succeed to rename %s to %s", vid, TD_FILE_FULL_NAME(&tFile), newFName); + smaDebug("vgId:%d, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); } } goto _end; @@ -841,13 +1098,14 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INACTIVE, TASK_TRIGGER_STAT_ACTIVE)) { - smaDebug("persist task is active again"); + smaDebug("vgId:%d, persist task is active again", SMA_VID(pRSmaStat->pSma)); } else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_CANCELLED, TASK_TRIGGER_STAT_FINISHED)) { - smaDebug(" persist task is cancelled and set finished"); + smaDebug("vgId:%d, persist task is cancelled and set finished", SMA_VID(pRSmaStat->pSma)); } else { - smaWarn("persist task in abnormal stat %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); + smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)), + SMA_VID(pRSmaStat->pSma)); ASSERT(0); } atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); @@ -864,7 +1122,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { */ static void tdRSmaPersistTrigger(void *param, void *tmrId) { SRSmaStat *pRSmaStat = param; - int8_t tmrStat = + + int8_t tmrStat = atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); switch (tmrStat) { case TASK_TRIGGER_STAT_ACTIVE: { @@ -872,7 +1131,7 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { if (TASK_TRIGGER_STAT_CANCELLED != atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_CANCELLED, TASK_TRIGGER_STAT_FINISHED)) { - smaDebug("rsma persistence start since active"); + smaDebug("vgId:%d, rsma persistence start since active", SMA_VID(pRSmaStat->pSma)); // start persist task tdRSmaPersistTask(pRSmaStat); @@ -895,252 +1154,6 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { } break; default: { smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); - ASSERT(0); } break; } } - -int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { - SVnode *pVnode = pSma->pVnode; - - // step 1: iterate all stables to restore the rsma env - - SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); - if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) { - smaError("vgId:%d, failed to restore rsma since get stb id list error: %s", TD_VID(pVnode), terrstr()); - return TSDB_CODE_FAILED; - } - - if (taosArrayGetSize(suidList) == 0) { - smaDebug("vgId:%d no need to restore rsma since empty stb id list", TD_VID(pVnode)); - return TSDB_CODE_SUCCESS; - } - - SMetaReader mr = {0}; - metaReaderInit(&mr, SMA_META(pSma), 0); - for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) { - tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); - smaDebug("suid [%d] is %" PRIi64, i, suid); - if (metaGetTableEntryByUid(&mr, suid) < 0) { - smaError("vgId:%d failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); - goto _err; - } - ASSERT(mr.me.type == TSDB_SUPER_TABLE); - ASSERT(mr.me.uid == suid); - if (TABLE_IS_ROLLUP(mr.me.flags)) { - SRSmaParam *param = &mr.me.stbEntry.rsmaParam; - for (int i = 0; i < 2; ++i) { - smaDebug("vgId: %d table:%" PRIi64 " maxdelay[%d]:%" PRIi64 " watermark[%d]:%" PRIi64, TD_VID(pSma->pVnode), - suid, i, param->maxdelay[i], i, param->watermark[i]); - } - if (tdProcessRSmaCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) { - smaError("vgId:%d failed to retore rsma env for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); - goto _err; - } - } - } - - // step 2: retrieve qtaskinfo object from the rsma/qtaskinfo file and restore - STFile tFile = {0}; - char qTaskInfoFName[TSDB_FILENAME_LEN]; - - tdRSmaQTaskGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName); - if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) { - goto _err; - } - if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) { - goto _err; - } - SRSmaQTaskFIter fIter = {0}; - if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) { - goto _err; - } - SRSmaQTaskInfoItem infoItem = {0}; - bool isEnd = false; - int32_t code = 0; - while ((code = tdRSmaQTaskInfoIterNext(&fIter, &infoItem, &isEnd)) == 0) { - if (isEnd) { - break; - } - if ((code = tdRSmaQTaskInfoItemRestore(pSma, &infoItem)) < 0) { - break; - } - } - tdRSmaQTaskInfoIterDestroy(&fIter); - - if (code < 0) { - goto _err; - } - - metaReaderClear(&mr); - taosArrayDestroy(suidList); - return TSDB_CODE_SUCCESS; -_err: - ASSERT(0); - metaReaderClear(&mr); - taosArrayDestroy(suidList); - smaError("failed to restore rsma info since %s", terrstr()); - return TSDB_CODE_FAILED; -} - -static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem) { - SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv); - SRSmaInfo *pRSmaInfo = NULL; - void *qTaskInfo = NULL; - - pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &infoItem->suid, sizeof(infoItem->suid)); - - if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { - smaDebug("vgId:%d, no restore as no rsma info for suid:%" PRIu64, SMA_VID(pSma), infoItem->suid); - return TSDB_CODE_SUCCESS; - } - - if (infoItem->type == 1) { - qTaskInfo = pRSmaInfo->items[0].taskInfo; - } else if (infoItem->type == 2) { - qTaskInfo = pRSmaInfo->items[1].taskInfo; - } else { - ASSERT(0); - } - - if (!qTaskInfo) { - smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), infoItem->suid); - return TSDB_CODE_SUCCESS; - } - - if (qDeserializeTaskStatus(qTaskInfo, infoItem->qTaskInfo, infoItem->len) < 0) { - smaError("vgId:%d, restore rsma failed for suid:%" PRIi64 " level %d since %s", SMA_VID(pSma), infoItem->suid, - infoItem->type, terrstr(terrno)); - return TSDB_CODE_FAILED; - } - smaDebug("vgId:%d, restore rsma success for suid:%" PRIi64 " level %d", SMA_VID(pSma), infoItem->suid, - infoItem->type); - - return TSDB_CODE_SUCCESS; -} - -static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile) { - memset(pIter, 0, sizeof(*pIter)); - pIter->pTFile = pTFile; - pIter->offset = TD_FILE_HEAD_SIZE; - - if (tdGetTFileSize(pTFile, &pIter->fsize) < 0) { - return TSDB_CODE_FAILED; - } - - if ((pIter->fsize - TD_FILE_HEAD_SIZE) < RSMA_QTASKINFO_BUFSIZE) { - pIter->nAlloc = pIter->fsize - TD_FILE_HEAD_SIZE; - } else { - pIter->nAlloc = RSMA_QTASKINFO_BUFSIZE; - } - - if (pIter->nAlloc < TD_FILE_HEAD_SIZE) { - pIter->nAlloc = TD_FILE_HEAD_SIZE; - } - - pIter->buf = taosMemoryMalloc(pIter->nAlloc); - if (!pIter->buf) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFinish) { - STFile *pTFile = pIter->pTFile; - int64_t nBytes = RSMA_QTASKINFO_BUFSIZE; - - if (pIter->offset >= pIter->fsize) { - *isFinish = true; - return TSDB_CODE_SUCCESS; - } - - if ((pIter->fsize - pIter->offset) < RSMA_QTASKINFO_BUFSIZE) { - nBytes = pIter->fsize - pIter->offset; - } - - if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) { - ASSERT(0); - return TSDB_CODE_FAILED; - } - - if (tdReadTFile(pTFile, pIter->buf, nBytes) != nBytes) { - ASSERT(0); - return TSDB_CODE_FAILED; - } - - int32_t infoLen = 0; - taosDecodeFixedI32(pIter->buf, &infoLen); - if (infoLen > nBytes) { - ASSERT(infoLen > RSMA_QTASKINFO_BUFSIZE); - pIter->nAlloc = infoLen; - void *pBuf = taosMemoryRealloc(pIter->buf, infoLen); - if (!pBuf) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } - pIter->buf = pBuf; - nBytes = infoLen; - - if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET)) { - ASSERT(0); - return TSDB_CODE_FAILED; - } - - if (tdReadTFile(pTFile, pIter->buf, nBytes) != nBytes) { - ASSERT(0); - return TSDB_CODE_FAILED; - } - } - - pIter->offset += nBytes; - pIter->nBytes = nBytes; - pIter->nBufPos = 0; - - return TSDB_CODE_SUCCESS; -} - -static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoItem *pItem, bool *isEnd) { - while (1) { - // block iter - bool isFinish = false; - if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) { - ASSERT(0); - return TSDB_CODE_FAILED; - } - if (isFinish) { - *isEnd = true; - return TSDB_CODE_SUCCESS; - } - - // consume the block - int32_t qTaskInfoLenWithHead = 0; - pIter->buf = taosDecodeFixedI32(pIter->buf, &qTaskInfoLenWithHead); - if (qTaskInfoLenWithHead < 0) { - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return TSDB_CODE_FAILED; - } - while (1) { - if ((pIter->nBufPos + qTaskInfoLenWithHead) <= pIter->nBytes) { - pIter->buf = taosDecodeFixedI8(pIter->buf, &pItem->type); - pIter->buf = taosDecodeFixedI64(pIter->buf, &pItem->suid); - pItem->qTaskInfo = pIter->buf; - pItem->len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead); - // do the restore job - printf("%s:%d ###### restore the qtask info offset:%" PRIi64 "\n", __func__, __LINE__, pIter->offset); - - pIter->buf = POINTER_SHIFT(pIter->buf, pItem->len); - pIter->nBufPos += qTaskInfoLenWithHead; - - pIter->buf = taosDecodeFixedI32(pIter->buf, &qTaskInfoLenWithHead); - continue; - } - // prepare and load next block in the file - pIter->offset -= (pIter->nBytes - pIter->nBufPos); - break; - } - } - - return TSDB_CODE_SUCCESS; -} diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 1f60da0b0a..5f78aadddf 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -22,7 +22,6 @@ #define TD_FILE_INIT_MAGIC 0xFFFFFFFF - static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo); static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo); @@ -46,7 +45,7 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) { } int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) { - ASSERT(TD_FILE_OPENED(pTFile)); + ASSERT(TD_TFILE_OPENED(pTFile)); int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte); if (nwrite < nbyte) { @@ -58,9 +57,9 @@ int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) { } int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) { - ASSERT(TD_FILE_OPENED(pTFile)); + ASSERT(TD_TFILE_OPENED(pTFile)); - int64_t loffset = taosLSeekFile(TD_FILE_PFILE(pTFile), offset, whence); + int64_t loffset = taosLSeekFile(TD_TFILE_PFILE(pTFile), offset, whence); if (loffset < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -70,12 +69,12 @@ int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) { } int64_t tdGetTFileSize(STFile *pTFile, int64_t *size) { - ASSERT(TD_FILE_OPENED(pTFile)); + ASSERT(TD_TFILE_OPENED(pTFile)); return taosFStatFile(pTFile->pFile, size, NULL); } int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) { - ASSERT(TD_FILE_OPENED(pTFile)); + ASSERT(TD_TFILE_OPENED(pTFile)); int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte); if (nread < 0) { @@ -108,7 +107,7 @@ int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) { char buf[TD_FILE_HEAD_SIZE] = "\0"; uint32_t _version; - ASSERT(TD_FILE_OPENED(pTFile)); + ASSERT(TD_TFILE_OPENED(pTFile)); if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) { return -1; @@ -133,7 +132,7 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) { } int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) { - ASSERT(TD_FILE_OPENED(pTFile)); + ASSERT(TD_TFILE_OPENED(pTFile)); int64_t toffset; @@ -141,6 +140,11 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) return -1; } +#if 1 + smaDebug("append to file %s, offset:%" PRIi64 " + nbyte:%" PRIi64 " =%" PRIi64, TD_TFILE_FULL_NAME(pTFile), toffset, + nbyte, toffset + nbyte); +#endif + ASSERT(pTFile->info.fsize == toffset); if (offset) { @@ -157,9 +161,9 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) } int32_t tdOpenTFile(STFile *pTFile, int flags) { - ASSERT(!TD_FILE_OPENED(pTFile)); + ASSERT(!TD_TFILE_OPENED(pTFile)); - pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), flags); + pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), flags); if (pTFile->pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -169,9 +173,9 @@ int32_t tdOpenTFile(STFile *pTFile, int flags) { } void tdCloseTFile(STFile *pTFile) { - if (TD_FILE_OPENED(pTFile)) { + if (TD_TFILE_OPENED(pTFile)) { taosCloseFile(&pTFile->pFile); - TD_FILE_SET_CLOSED(pTFile); + TD_TFILE_SET_CLOSED(pTFile); } } @@ -183,8 +187,8 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) { char fullname[TSDB_FILENAME_LEN]; SDiskID did = {0}; - TD_FILE_SET_STATE(pTFile, TD_FILE_STATE_OK); - TD_FILE_SET_CLOSED(pTFile); + TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK); + TD_TFILE_SET_CLOSED(pTFile); memset(&(pTFile->info), 0, sizeof(pTFile->info)); pTFile->info.magic = TD_FILE_INIT_MAGIC; @@ -202,18 +206,18 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) { int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType) { ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC); - pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pTFile->pFile == NULL) { if (errno == ENOENT) { // Try to create directory recursively - char *s = strdup(TD_FILE_REL_NAME(pTFile)); - if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_FILE_DID(pTFile)) < 0) { + char *s = strdup(TD_TFILE_REL_NAME(pTFile)); + if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_TFILE_DID(pTFile)) < 0) { taosMemoryFreeClear(s); return -1; } taosMemoryFreeClear(s); - pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pTFile->pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -240,7 +244,7 @@ int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fTyp return 0; } -int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_FILE_F(pTFile)); } +int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_TFILE_F(pTFile)); } // smaXXXUtil ================ // ... \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 4d73dbc406..57d7386667 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -152,14 +152,14 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { return pVnode; _err: - if (pVnode->pSma) smaClose(pVnode->pSma); + if (pVnode->pSma) smaCloseEnv(pVnode->pSma); if (pVnode->pQuery) vnodeQueryClose(pVnode); if (pVnode->pTq) tqClose(pVnode->pTq); if (pVnode->pWal) walClose(pVnode->pWal); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); + if (pVnode->pSma) smaCloseEx(pVnode->pSma); if (pVnode->pMeta) metaClose(pVnode->pMeta); - tsem_destroy(&(pVnode->canCommit)); taosMemoryFree(pVnode); return NULL;