From a7c8563c0616859f4fbc6bf8eca8cd96b7c84899 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 25 Dec 2023 16:13:30 +0800 Subject: [PATCH] fix: remove useless code --- source/dnode/vnode/src/inc/tsdb.h | 38 +- source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 1628 ---------------------- source/dnode/vnode/src/tsdb/tsdbFS.c | 313 +---- source/dnode/vnode/src/tsdb/tsdbRead2.c | 119 +- 5 files changed, 70 insertions(+), 2032 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index d347ce30d8..ebaf0851e6 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -256,18 +256,7 @@ void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); // tsdbFS.c ============================================================================================== int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback); int32_t tsdbFSClose(STsdb *pTsdb); -int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS); -void tsdbFSDestroy(STsdbFS *pFS); -int32_t tDFileSetCmprFn(const void *p1, const void *p2); -int32_t tsdbFSCommit(STsdb *pTsdb); -int32_t tsdbFSRollback(STsdb *pTsdb); -int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFS); -int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS); -void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS); void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t); - -int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet); -int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile); // tsdbReaderWriter.c ============================================================================================== // SDataFWriter int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); @@ -737,7 +726,6 @@ struct STsdbReadSnap { SMemTable *pIMem; SQueryNode *pINode; TFileSetArray *pfSetArray; - STsdbFS fs; }; struct SDataFWriter { @@ -796,16 +784,16 @@ typedef struct { } SSttTableRowsInfo; typedef struct SSttBlockLoadInfo { - SBlockDataInfo blockData[2]; // buffered block data - SArray *aSttBlk; - int32_t currentLoadBlockIndex; - STSchema *pSchema; - int16_t *colIds; - int32_t numOfCols; - bool checkRemainingRow; // todo: no assign value? - bool isLast; - bool sttBlockLoaded; - SSttTableRowsInfo info; + SBlockDataInfo blockData[2]; // buffered block data + SArray *aSttBlk; + int32_t currentLoadBlockIndex; + STSchema *pSchema; + int16_t *colIds; + int32_t numOfCols; + bool checkRemainingRow; // todo: no assign value? + bool isLast; + bool sttBlockLoaded; + SSttTableRowsInfo info; SSttBlockLoadCostInfo cost; } SSttBlockLoadInfo; @@ -894,15 +882,15 @@ typedef struct { _load_tomb_fn loadTombFn; void *pReader; void *idstr; - bool rspRows; // response the rows in stt-file, if possible + bool rspRows; // response the rows in stt-file, if possible } SMergeTreeConf; typedef struct SSttDataInfoForTable { - SArray* pTimeWindowList; + SArray *pTimeWindowList; int64_t numOfRows; } SSttDataInfoForTable; -int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable* pTableInfo); +int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pTableInfo); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); void tMergeTreePinSttBlock(SMergeTree *pMTree); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 84f51827ba..c1a4754b62 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -95,7 +95,7 @@ typedef struct SQueryNode SQueryNode; #define VNODE_RSMA2_DIR "rsma2" #define VNODE_TQ_STREAM "stream" -#if SUSPEND_RESUME_TEST // only for test purpose +#if SUSPEND_RESUME_TEST // only for test purpose #define VNODE_BUFPOOL_SEGMENTS 1 #else #define VNODE_BUFPOOL_SEGMENTS 3 @@ -216,8 +216,6 @@ int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbCacheCommit(STsdb* pTsdb); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync); -// int32_t tsdbFinishCommit(STsdb* pTsdb); -// int32_t tsdbRollbackCommit(STsdb* pTsdb); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 55ae25aee4..37413ef920 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -15,92 +15,6 @@ #include "tsdb.h" -typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT; - -#define USE_STREAM_COMPRESSION 0 - -typedef struct { - SRBTreeNode n; - SRowInfo r; - EDataIterT type; - union { - struct { - int32_t iTbDataP; - STbDataIter iter; - }; // memory data iter - struct { - int32_t iStt; - SArray *aSttBlk; - int32_t iSttBlk; - SBlockData bData; - int32_t iRow; - }; // stt file data iter - }; -} SDataIter; - -typedef struct { - STsdb *pTsdb; - /* commit data */ - int64_t commitID; - int32_t minutes; - int8_t precision; - int32_t minRow; - int32_t maxRow; - int8_t cmprAlg; - int8_t sttTrigger; - SArray *aTbDataP; // memory - STsdbFS fs; // disk - // -------------- - TSKEY nextKey; // reset by each table commit - int32_t commitFid; - int32_t expLevel; - TSKEY minKey; - TSKEY maxKey; - // commit file data - struct { - SDataFReader *pReader; - SArray *aBlockIdx; // SArray - int32_t iBlockIdx; - SBlockIdx *pBlockIdx; - SMapData mBlock; // SMapData - SBlockData bData; - } dReader; - struct { - SDataIter *pIter; - SRBTree rbt; - SDataIter dataIter; - SDataIter aDataIter[TSDB_STT_TRIGGER_ARRAY_SIZE]; - int8_t toLastOnly; - }; - struct { - SDataFWriter *pWriter; - SArray *aBlockIdx; // SArray - SArray *aSttBlk; // SArray - SMapData mBlock; // SMapData - SBlockData bData; -#if USE_STREAM_COMPRESSION - SDiskDataBuilder *pBuilder; -#else - SBlockData bDatal; -#endif - } dWriter; - SSkmInfo skmTable; - SSkmInfo skmRow; - /* commit del */ - SDelFReader *pDelFReader; - SDelFWriter *pDelFWriter; - SArray *aDelIdx; // SArray - SArray *aDelIdxN; // SArray - SArray *aDelData; // SArray -} SCommitter; - -static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo); -static int32_t tsdbCommitData(SCommitter *pCommitter); -static int32_t tsdbCommitDel(SCommitter *pCommitter); -static int32_t tsdbCommitCache(SCommitter *pCommitter); -static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno); -static int32_t tsdbNextCommitRow(SCommitter *pCommitter); - int32_t tRowInfoCmprFn(const void *p1, const void *p2) { SRowInfo *pInfo1 = (SRowInfo *)p1; SRowInfo *pInfo2 = (SRowInfo *)p2; @@ -150,1545 +64,3 @@ _exit: } return code; } - -int32_t tsdbPrepareCommit(STsdb *pTsdb) { - taosThreadMutexLock(&pTsdb->mutex); - ASSERT(pTsdb->imem == NULL); - pTsdb->imem = pTsdb->mem; - pTsdb->mem = NULL; - taosThreadMutexUnlock(&pTsdb->mutex); - - return 0; -} - -int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) { - if (!pTsdb) return 0; - - int32_t code = 0; - int32_t lino = 0; - SCommitter commith; - SMemTable *pMemTable = pTsdb->imem; - - // check - if (pMemTable->nRow == 0 && pMemTable->nDel == 0) { - taosThreadMutexLock(&pTsdb->mutex); - pTsdb->imem = NULL; - taosThreadMutexUnlock(&pTsdb->mutex); - - tsdbUnrefMemTable(pMemTable, NULL, true); - goto _exit; - } - - // start commit - code = tsdbStartCommit(pTsdb, &commith, pInfo); - TSDB_CHECK_CODE(code, lino, _exit); - - // commit impl - code = tsdbCommitData(&commith); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbCommitDel(&commith); - TSDB_CHECK_CODE(code, lino, _exit); - - // end commit - code = tsdbEndCommit(&commith, 0); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbEndCommit(&commith, code); - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - - if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if ((pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if ((pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - SDelFile *pDelFileR = pCommitter->fs.pDelFile; - if (pDelFileR) { - code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // prepare new - SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0}; - code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode)); - } - return code; -} - -static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) { - int32_t code = 0; - int32_t lino = 0; - SDelData *pDelData; - tb_uid_t suid; - tb_uid_t uid; - - if (pTbData) { - suid = pTbData->suid; - uid = pTbData->uid; - - if (pTbData->pHead == NULL) { - pTbData = NULL; - } - } - - if (pDelIdx) { - suid = pDelIdx->suid; - uid = pDelIdx->uid; - - code = tsdbReadDelDatav1(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, INT64_MAX); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - taosArrayClear(pCommitter->aDelData); - } - - if (pTbData == NULL && pDelIdx == NULL) goto _exit; - - SDelIdx delIdx = {.suid = suid, .uid = uid}; - - // memory - pDelData = pTbData ? pTbData->pHead : NULL; - for (; pDelData; pDelData = pDelData->pNext) { - if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - // write - code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - // put delIdx - if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - STsdb *pTsdb = pCommitter->pTsdb; - - code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCommitter->pDelFReader) { - code = tsdbDelFReaderClose(&pCommitter->pDelFReader); - TSDB_CHECK_CODE(code, lino, _exit); - } - - taosArrayDestroy(pCommitter->aDelIdx); - taosArrayDestroy(pCommitter->aDelData); - taosArrayDestroy(pCommitter->aDelIdxN); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) { - int32_t code = 0; - int32_t lino = 0; - - if (suid) { - if (pSkmInfo->suid == suid) { - pSkmInfo->uid = uid; - goto _exit; - } - } else { - if (pSkmInfo->uid == uid) goto _exit; - } - - pSkmInfo->suid = suid; - pSkmInfo->uid = uid; - tDestroyTSchema(pSkmInfo->pTSchema); - code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - return code; -} - -static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { - int32_t code = 0; - int32_t lino = 0; - - if (pCommitter->skmRow.pTSchema) { - if (pCommitter->skmRow.suid == suid) { - if (suid == 0) { - if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit; - } else { - if (sver == pCommitter->skmRow.pTSchema->version) goto _exit; - } - } - } - - pCommitter->skmRow.suid = suid; - pCommitter->skmRow.uid = uid; - tDestroyTSchema(pCommitter->skmRow.pTSchema); - code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - return code; -} - -static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - ASSERT(pCommitter->dReader.pBlockIdx); - - pCommitter->dReader.iBlockIdx++; - if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) { - pCommitter->dReader.pBlockIdx = - (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); - - code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); - TSDB_CHECK_CODE(code, lino, _exit); - - ASSERT(pCommitter->dReader.mBlock.nItem > 0); - } else { - pCommitter->dReader.pBlockIdx = NULL; - } - -_exit: - return code; -} - -static int32_t tDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) { - SDataIter *pIter1 = (SDataIter *)((uint8_t *)n1 - offsetof(SDataIter, n)); - SDataIter *pIter2 = (SDataIter *)((uint8_t *)n2 - offsetof(SDataIter, n)); - - return tRowInfoCmprFn(&pIter1->r, &pIter2->r); -} - -static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - pCommitter->pIter = NULL; - tRBTreeCreate(&pCommitter->rbt, tDataIterCmprFn); - - // memory - TSDBKEY tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN}; - SDataIter *pIter = &pCommitter->dataIter; - pIter->type = MEMORY_DATA_ITER; - pIter->iTbDataP = 0; - for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) { - STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP); - tsdbTbDataIterOpen(pTbData, &tKey, 0, &pIter->iter); - TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { - pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); - pRow = NULL; - } - - if (pRow == NULL) continue; - - pIter->r.suid = pTbData->suid; - pIter->r.uid = pTbData->uid; - pIter->r.row = *pRow; - break; - } - ASSERT(pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)); - tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter); - - // disk - pCommitter->toLastOnly = 0; - SDataFReader *pReader = pCommitter->dReader.pReader; - if (pReader) { - if (pReader->pSet->nSttF >= pCommitter->sttTrigger) { - int8_t iIter = 0; - for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) { - pIter = &pCommitter->aDataIter[iIter]; - pIter->type = STT_DATA_ITER; - pIter->iStt = iStt; - - code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayGetSize(pIter->aSttBlk) == 0) continue; - - pIter->iSttBlk = 0; - SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0); - code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData); - TSDB_CHECK_CODE(code, lino, _exit); - - pIter->iRow = 0; - pIter->r.suid = pIter->bData.suid; - pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; - pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0); - - tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter); - iIter++; - } - } else { - for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) { - SSttFile *pSttFile = pReader->pSet->aSttF[iStt]; - if (pSttFile->size > pSttFile->offset) { - pCommitter->toLastOnly = 1; - break; - } - } - } - } - - code = tsdbNextCommitRow(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - STsdb *pTsdb = pCommitter->pTsdb; - SDFileSet *pRSet = NULL; - - // memory - pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); - pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec()); - tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, - &pCommitter->maxKey); -#if 0 - ASSERT(pCommitter->minKey <= pCommitter->nextKey && pCommitter->maxKey >= pCommitter->nextKey); -#endif - - pCommitter->nextKey = TSKEY_MAX; - - // Reader - SDFileSet tDFileSet = {.fid = pCommitter->commitFid}; - pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &tDFileSet, tDFileSetCmprFn, TD_EQ); - if (pRSet) { - code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet); - TSDB_CHECK_CODE(code, lino, _exit); - - // data - code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - pCommitter->dReader.iBlockIdx = 0; - if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) { - pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0); - code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - pCommitter->dReader.pBlockIdx = NULL; - } - tBlockDataReset(&pCommitter->dReader.bData); - } else { - pCommitter->dReader.pBlockIdx = NULL; - } - - // Writer - SHeadFile fHead = {.commitID = pCommitter->commitID}; - SDataFile fData = {.commitID = pCommitter->commitID}; - SSmaFile fSma = {.commitID = pCommitter->commitID}; - SSttFile fStt = {.commitID = pCommitter->commitID}; - SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma}; - if (pRSet) { - ASSERT(pRSet->nSttF <= pCommitter->sttTrigger); - fData = *pRSet->pDataF; - fSma = *pRSet->pSmaF; - wSet.diskId = pRSet->diskId; - if (pRSet->nSttF < pCommitter->sttTrigger) { - for (int32_t iStt = 0; iStt < pRSet->nSttF; iStt++) { - wSet.aSttF[iStt] = pRSet->aSttF[iStt]; - } - wSet.nSttF = pRSet->nSttF + 1; - } else { - wSet.nSttF = 1; - } - } else { - SDiskID did = {0}; - if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &did) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } - code = tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); - TSDB_CHECK_CODE(code, lino, _exit); - wSet.diskId = did; - wSet.nSttF = 1; - } - wSet.aSttF[wSet.nSttF - 1] = &fStt; - code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet); - TSDB_CHECK_CODE(code, lino, _exit); - - taosArrayClear(pCommitter->dWriter.aBlockIdx); - taosArrayClear(pCommitter->dWriter.aSttBlk); - tMapDataReset(&pCommitter->dWriter.mBlock); - tBlockDataReset(&pCommitter->dWriter.bData); -#if USE_STREAM_COMPRESSION - tDiskDataBuilderClear(pCommitter->dWriter.pBuilder); -#else - tBlockDataReset(&pCommitter->dWriter.bDatal); -#endif - - // open iter - code = tsdbOpenCommitIter(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) { - int32_t code = 0; - int32_t lino = 0; - - if (pBlockData->nRow == 0) return code; - - SDataBlk dataBlk; - tDataBlkReset(&dataBlk); - - // info - dataBlk.nRow += pBlockData->nRow; - for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { - TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]}; - - if (iRow == 0) { - if (tsdbKeyCmprFn(&dataBlk.minKey, &key) > 0) { - dataBlk.minKey = key; - } - } else { - if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) { - dataBlk.hasDup = 1; - } - } - - if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&dataBlk.maxKey, &key) < 0) { - dataBlk.maxKey = key; - } - - dataBlk.minVer = TMIN(dataBlk.minVer, key.version); - dataBlk.maxVer = TMAX(dataBlk.maxVer, key.version); - } - - // write - dataBlk.nSubBlock++; - code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1], - ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0); - TSDB_CHECK_CODE(code, lino, _exit); - - // put SDataBlk - code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - // clear - tBlockDataClear(pBlockData); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) { - int32_t code = 0; - int32_t lino = 0; - SSttBlk sstBlk; - - if (pBlockData->nRow == 0) return code; - - // info - sstBlk.suid = pBlockData->suid; - sstBlk.nRow = pBlockData->nRow; - sstBlk.minKey = TSKEY_MAX; - sstBlk.maxKey = TSKEY_MIN; - sstBlk.minVer = VERSION_MAX; - sstBlk.maxVer = VERSION_MIN; - for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { - sstBlk.minKey = TMIN(sstBlk.minKey, pBlockData->aTSKEY[iRow]); - sstBlk.maxKey = TMAX(sstBlk.maxKey, pBlockData->aTSKEY[iRow]); - sstBlk.minVer = TMIN(sstBlk.minVer, pBlockData->aVersion[iRow]); - sstBlk.maxVer = TMAX(sstBlk.maxVer, pBlockData->aVersion[iRow]); - } - sstBlk.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0]; - sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; - - // write - code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1); - TSDB_CHECK_CODE(code, lino, _exit); - - // push SSttBlk - if (taosArrayPush(aSttBlk, &sstBlk) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - // clear - tBlockDataClear(pBlockData); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilder, SArray *aSttBlk) { - int32_t code = 0; - int32_t lino = 0; - - if (pBuilder->nRow == 0) return code; - - // gnrt - const SDiskData *pDiskData; - const SBlkInfo *pBlkInfo; - code = tGnrtDiskData(pBuilder, &pDiskData, &pBlkInfo); - TSDB_CHECK_CODE(code, lino, _exit); - - SSttBlk sttBlk = {.suid = pBuilder->suid, - .minUid = pBlkInfo->minUid, - .maxUid = pBlkInfo->maxUid, - .minKey = pBlkInfo->minKey, - .maxKey = pBlkInfo->maxKey, - .minVer = pBlkInfo->minVer, - .maxVer = pBlkInfo->maxVer, - .nRow = pBuilder->nRow}; - // write - code = tsdbWriteDiskData(pWriter, pDiskData, &sttBlk.bInfo, NULL); - TSDB_CHECK_CODE(code, lino, _exit); - - // push - if (taosArrayPush(aSttBlk, &sttBlk) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - // clear - tDiskDataBuilderClear(pBuilder); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - // write aBlockIdx - code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - // write aSttBlk - code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - // update file header - code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter); - TSDB_CHECK_CODE(code, lino, _exit); - - // upsert SDFileSet - code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet); - TSDB_CHECK_CODE(code, lino, _exit); - - // close and sync - code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCommitter->dReader.pReader) { - code = tsdbDataFReaderClose(&pCommitter->dReader.pReader); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { - int32_t code = 0; - int32_t lino = 0; - - while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) { - SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx; - code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbCommitterNextTableData(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter); -static int32_t tsdbCommitFileData(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - - // commit file data start - code = tsdbCommitFileDataStart(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - // impl - code = tsdbCommitFileDataImpl(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - // commit file data end - code = tsdbCommitFileDataEnd(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - tsdbDataFReaderClose(&pCommitter->dReader.pReader); - tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0); - } - return code; -} - -// ---------------------------------------------------------------------------- -static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo) { - int32_t code = 0; - int32_t lino = 0; - - memset(pCommitter, 0, sizeof(*pCommitter)); - ASSERT(pTsdb->imem && "last tsdb commit incomplete"); - - pCommitter->pTsdb = pTsdb; - pCommitter->commitID = pInfo->info.state.commitID; - pCommitter->minutes = pTsdb->keepCfg.days; - pCommitter->precision = pTsdb->keepCfg.precision; - pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows; - pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows; - pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression; - pCommitter->sttTrigger = pInfo->info.config.sttTrigger; - pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem); - if (pCommitter->aTbDataP == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - code = tsdbFSCopy(pTsdb, &pCommitter->fs); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - // reader - pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pCommitter->dReader.aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataCreate(&pCommitter->dReader.bData); - TSDB_CHECK_CODE(code, lino, _exit); - - // merger - for (int32_t iStt = 0; iStt < TSDB_STT_TRIGGER_ARRAY_SIZE; iStt++) { - SDataIter *pIter = &pCommitter->aDataIter[iStt]; - pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pIter->aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataCreate(&pIter->bData); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // writer - pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pCommitter->dWriter.aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pCommitter->dWriter.aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataCreate(&pCommitter->dWriter.bData); - TSDB_CHECK_CODE(code, lino, _exit); - -#if USE_STREAM_COMPRESSION - code = tDiskDataBuilderCreate(&pCommitter->dWriter.pBuilder); -#else - code = tBlockDataCreate(&pCommitter->dWriter.bDatal); -#endif - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static void tsdbCommitDataEnd(SCommitter *pCommitter) { - // reader - taosArrayDestroy(pCommitter->dReader.aBlockIdx); - tMapDataClear(&pCommitter->dReader.mBlock); - tBlockDataDestroy(&pCommitter->dReader.bData); - - // merger - for (int32_t iStt = 0; iStt < TSDB_STT_TRIGGER_ARRAY_SIZE; iStt++) { - SDataIter *pIter = &pCommitter->aDataIter[iStt]; - taosArrayDestroy(pIter->aSttBlk); - tBlockDataDestroy(&pIter->bData); - } - - // writer - taosArrayDestroy(pCommitter->dWriter.aBlockIdx); - taosArrayDestroy(pCommitter->dWriter.aSttBlk); - tMapDataClear(&pCommitter->dWriter.mBlock); - tBlockDataDestroy(&pCommitter->dWriter.bData); -#if USE_STREAM_COMPRESSION - tDiskDataBuilderDestroy(pCommitter->dWriter.pBuilder); -#else - tBlockDataDestroy(&pCommitter->dWriter.bDatal); -#endif - tDestroyTSchema(pCommitter->skmTable.pTSchema); - tDestroyTSchema(pCommitter->skmRow.pTSchema); -} - -static int32_t tsdbCommitData(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - - // check - if (pMemTable->nRow == 0) goto _exit; - - // start ==================== - code = tsdbCommitDataStart(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - // impl ==================== - pCommitter->nextKey = pMemTable->minKey; - while (pCommitter->nextKey < TSKEY_MAX) { - code = tsdbCommitFileData(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // end ==================== - tsdbCommitDataEnd(pCommitter); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitDel(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - - if (pMemTable->nDel == 0) { - goto _exit; - } - - // start - code = tsdbCommitDelStart(pCommitter); - if (code) { - TSDB_CHECK_CODE(code, lino, _exit); - } - - // impl - int32_t iDelIdx = 0; - int32_t nDelIdx = taosArrayGetSize(pCommitter->aDelIdx); - int32_t iTbData = 0; - int32_t nTbData = taosArrayGetSize(pCommitter->aTbDataP); - STbData *pTbData; - SDelIdx *pDelIdx; - - ASSERT(nTbData > 0); - - pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); - pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; - while (true) { - if (pTbData == NULL && pDelIdx == NULL) break; - - if (pTbData && pDelIdx) { - int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx); - - if (c == 0) { - goto _commit_mem_and_disk_del; - } else if (c < 0) { - goto _commit_mem_del; - } else { - goto _commit_disk_del; - } - } else if (pTbData) { - goto _commit_mem_del; - } else { - goto _commit_disk_del; - } - - _commit_mem_del: - code = tsdbCommitTableDel(pCommitter, pTbData, NULL); - TSDB_CHECK_CODE(code, lino, _exit); - - iTbData++; - pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL; - continue; - - _commit_disk_del: - code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - iDelIdx++; - pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; - continue; - - _commit_mem_and_disk_del: - code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - iTbData++; - pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL; - iDelIdx++; - pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; - continue; - } - - // end - code = tsdbCommitDelEnd(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel); - } - return code; -} - -static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { - int32_t code = 0; - int32_t lino = 0; - STsdb *pTsdb = pCommitter->pTsdb; - - if (eno) { - code = eno; - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbFSPrepareCommit(pCommitter->pTsdb, &pCommitter->fs); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - tsdbFSDestroy(&pCommitter->fs); - taosArrayDestroy(pCommitter->aTbDataP); - pCommitter->aTbDataP = NULL; - if (code || eno) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode)); - } - return code; -} - -// ================================================================================ - -static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) { - return (pCommitter->pIter) ? &pCommitter->pIter->r : NULL; -} - -static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - if (pCommitter->pIter) { - SDataIter *pIter = pCommitter->pIter; - if (pCommitter->pIter->type == MEMORY_DATA_ITER) { // memory - tsdbTbDataIterNext(&pIter->iter); - TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter); - while (true) { - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { - pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); - pRow = NULL; - } - - if (pRow) { - pIter->r.suid = pIter->iter.pTbData->suid; - pIter->r.uid = pIter->iter.pTbData->uid; - pIter->r.row = *pRow; - break; - } - - pIter->iTbDataP++; - if (pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)) { - STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP); - TSDBKEY keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN}; - tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter); - pRow = tsdbTbDataIterGet(&pIter->iter); - continue; - } else { - pCommitter->pIter = NULL; - break; - } - } - } else if (pCommitter->pIter->type == STT_DATA_ITER) { // last file - pIter->iRow++; - if (pIter->iRow < pIter->bData.nRow) { - pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; - pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); - } else { - pIter->iSttBlk++; - if (pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk)) { - SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); - - code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData); - if (code) goto _exit; - - pIter->iRow = 0; - pIter->r.suid = pIter->bData.suid; - pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; - pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0); - } else { - pCommitter->pIter = NULL; - } - } - } else { - ASSERT(0); - } - - // compare with min in RB Tree - pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt); - if (pCommitter->pIter && pIter) { - int32_t c = tRowInfoCmprFn(&pCommitter->pIter->r, &pIter->r); - if (c > 0) { - tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter); - pCommitter->pIter = NULL; - } else { - ASSERT(c); - } - } - } - - if (pCommitter->pIter == NULL) { - pCommitter->pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt); - if (pCommitter->pIter) { - tRBTreeDrop(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter); - } - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) { - int32_t code = 0; - int32_t lino = 0; - - SBlockData *pBlockData = &pCommitter->dWriter.bData; - SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); - TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; - - tBlockDataClear(pBlockData); - while (pRowInfo) { - code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbNextCommitRow(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo) { - if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) { - pRowInfo = NULL; - } else { - TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row); - if (tsdbKeyCmprFn(&tKey, &pDataBlk->minKey) >= 0) pRowInfo = NULL; - } - } - - if (pBlockData->nRow >= pCommitter->maxRow) { - code = - tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) { - int32_t code = 0; - int32_t lino = 0; - - SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); - TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; - SBlockData *pBDataR = &pCommitter->dReader.bData; - SBlockData *pBDataW = &pCommitter->dWriter.bData; - - code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR); - TSDB_CHECK_CODE(code, lino, _exit); - - tBlockDataClear(pBDataW); - int32_t iRow = 0; - TSDBROW row = tsdbRowFromBlockData(pBDataR, 0); - TSDBROW *pRow = &row; - - while (pRow && pRowInfo) { - int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row); - if (c < 0) { - code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - iRow++; - if (iRow < pBDataR->nRow) { - row = tsdbRowFromBlockData(pBDataR, iRow); - } else { - pRow = NULL; - } - } else if (c > 0) { - code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbNextCommitRow(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo) { - if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) { - pRowInfo = NULL; - } else { - TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row); - if (tsdbKeyCmprFn(&tKey, &pDataBlk->maxKey) > 0) pRowInfo = NULL; - } - } - } else { - ASSERT(0 && "dup rows not allowed"); - } - - if (pBDataW->nRow >= pCommitter->maxRow) { - code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - while (pRow) { - code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - iRow++; - if (iRow < pBDataR->nRow) { - row = tsdbRowFromBlockData(pBDataR, iRow); - } else { - pRow = NULL; - } - - if (pBDataW->nRow >= pCommitter->maxRow) { - code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { - int32_t code = 0; - int32_t lino = 0; - - SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx; - - ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0); - if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) { - int32_t iBlock = 0; - SDataBlk block; - SDataBlk *pDataBlk = █ - SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); - - ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid); - - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); - while (pDataBlk && pRowInfo) { - SDataBlk tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)}; - int32_t c = tDataBlkCmprFn(pDataBlk, &tBlock); - - if (c < 0) { - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - iBlock++; - if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); - } else { - pDataBlk = NULL; - } - } else if (c > 0) { - code = tsdbCommitAheadBlock(pCommitter, pDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL; - } else { - code = tsdbCommitMergeBlock(pCommitter, pDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - iBlock++; - if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); - } else { - pDataBlk = NULL; - } - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL; - } - } - - while (pDataBlk) { - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - iBlock++; - if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); - } else { - pDataBlk = NULL; - } - } - - code = tsdbCommitterNextTableData(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) { - int32_t code = 0; - int32_t lino = 0; - -#if USE_STREAM_COMPRESSION - SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder; - if (pBuilder->suid || pBuilder->uid) { - if (!TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) { - code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pBuilder, pCommitter->dWriter.aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - tDiskDataBuilderClear(pBuilder); - } - } - - if (!pBuilder->suid && !pBuilder->uid) { - ASSERT(pCommitter->skmTable.suid == id.suid); - ASSERT(pCommitter->skmTable.uid == id.uid); - code = tDiskDataBuilderInit(pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } -#else - SBlockData *pBData = &pCommitter->dWriter.bDatal; - if (pBData->suid || pBData->uid) { - if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) { - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - - tBlockDataReset(pBData); - } - } - - if (!pBData->suid && !pBData->uid) { - ASSERT(pCommitter->skmTable.suid == id.suid); - ASSERT(pCommitter->skmTable.uid == id.uid); - TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid}; - code = tBlockDataInit(pBData, &tid, pCommitter->skmTable.pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } -#endif - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - SBlockData *pBData = &pCommitter->dWriter.bData; - TABLEID id = {.suid = pBData->suid, .uid = pBData->uid}; - - code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - - for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) { - TSDBROW row = tsdbRowFromBlockData(pBData, iRow); - -#if USE_STREAM_COMPRESSION - code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) { - code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - } -#else - code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &row, NULL, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) { - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, - pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } -#endif - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { - int32_t code = 0; - int32_t lino = 0; - - SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { - pRowInfo = NULL; - } - - if (pRowInfo == NULL) goto _exit; - - if (pCommitter->toLastOnly) { - code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - - while (pRowInfo) { - STSchema *pTSchema = NULL; - if (pRowInfo->row.type == TSDBROW_ROW_FMT) { - code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); - TSDB_CHECK_CODE(code, lino, _exit); - pTSchema = pCommitter->skmRow.pTSchema; - } - -#if USE_STREAM_COMPRESSION - code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id); -#else - code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pRowInfo->row, pTSchema, id.uid); -#endif - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbNextCommitRow(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { - pRowInfo = NULL; - } - -#if USE_STREAM_COMPRESSION - if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) { - code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - } -#else - if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) { - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, - pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } -#endif - } - } else { - SBlockData *pBData = &pCommitter->dWriter.bData; - ASSERT(pBData->nRow == 0); - - while (pRowInfo) { - STSchema *pTSchema = NULL; - if (pRowInfo->row.type == TSDBROW_ROW_FMT) { - code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); - TSDB_CHECK_CODE(code, lino, _exit); - pTSchema = pCommitter->skmRow.pTSchema; - } - - code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbNextCommitRow(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { - pRowInfo = NULL; - } - - if (pBData->nRow >= pCommitter->maxRow) { - code = - tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - if (pBData->nRow) { - if (pBData->nRow > pCommitter->minRow) { - code = - tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbAppendLastBlock(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - SRowInfo *pRowInfo; - TABLEID id = {0}; - while ((pRowInfo = tsdbGetCommitRow(pCommitter)) != NULL) { - ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid); - id.suid = pRowInfo->suid; - id.uid = pRowInfo->uid; - - code = tsdbMoveCommitData(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - - // start - tMapDataReset(&pCommitter->dWriter.mBlock); - - // impl - code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable); - TSDB_CHECK_CODE(code, lino, _exit); - code = tBlockDataInit(&pCommitter->dReader.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - code = tBlockDataInit(&pCommitter->dWriter.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - - /* merge with data in .data file */ - code = tsdbMergeTableData(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - - /* handle remain table data */ - code = tsdbCommitTableData(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - - // end - if (pCommitter->dWriter.mBlock.nItem > 0) { - SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid}; - code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - } - - id.suid = INT64_MAX; - id.uid = INT64_MAX; - code = tsdbMoveCommitData(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - -#if USE_STREAM_COMPRESSION - code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk); -#else - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, - pCommitter->cmprAlg); -#endif - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -int32_t tsdbFinishCommit(STsdb *pTsdb) { - int32_t code = 0; - int32_t lino = 0; - SMemTable *pMemTable = pTsdb->imem; - - // lock - taosThreadMutexLock(&pTsdb->mutex); - - code = tsdbFSCommit(pTsdb); - if (code) { - taosThreadMutexUnlock(&pTsdb->mutex); - TSDB_CHECK_CODE(code, lino, _exit); - } - - pTsdb->imem = NULL; - - // unlock - taosThreadMutexUnlock(&pTsdb->mutex); - if (pMemTable) { - tsdbUnrefMemTable(pMemTable, NULL, true); - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode)); - } - return code; -} - -int32_t tsdbRollbackCommit(STsdb *pTsdb) { - int32_t code = 0; - int32_t lino = 0; - - code = tsdbFSRollback(pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode)); - } - return code; -} diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 732f46467e..a796f5121c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -702,7 +702,7 @@ _exit: } // EXPOSED APIS ==================================================================================== -int32_t tsdbFSCommit(STsdb *pTsdb) { +static int32_t tsdbFSCommit(STsdb *pTsdb) { int32_t code = 0; int32_t lino = 0; STsdbFS fs = {0}; @@ -738,7 +738,7 @@ _exit: return code; } -int32_t tsdbFSRollback(STsdb *pTsdb) { +static int32_t tsdbFSRollback(STsdb *pTsdb) { int32_t code = 0; int32_t lino = 0; @@ -833,312 +833,3 @@ int32_t tsdbFSClose(STsdb *pTsdb) { return code; } - -int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { - int32_t code = 0; - int32_t lino = 0; - - pFS->pDelFile = NULL; - if (pFS->aDFileSet) { - taosArrayClear(pFS->aDFileSet); - } else { - pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet)); - if (pFS->aDFileSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - if (pTsdb->fs.pDelFile) { - pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); - if (pFS->pDelFile == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - *pFS->pDelFile = *pTsdb->fs.pDelFile; - } - - for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); - SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid}; - - // head - fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); - if (fSet.pHeadF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - *fSet.pHeadF = *pSet->pHeadF; - - // data - fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); - if (fSet.pDataF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - *fSet.pDataF = *pSet->pDataF; - - // sma - fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); - if (fSet.pSmaF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - *fSet.pSmaF = *pSet->pSmaF; - - // stt - for (fSet.nSttF = 0; fSet.nSttF < pSet->nSttF; fSet.nSttF++) { - fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (fSet.aSttF[fSet.nSttF] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - *fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF]; - } - - if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile) { - int32_t code = 0; - - if (pDelFile) { - if (pFS->pDelFile == NULL) { - pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); - if (pFS->pDelFile == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - } - *pFS->pDelFile = *pDelFile; - } else { - if (pFS->pDelFile) { - taosMemoryFree(pFS->pDelFile); - pFS->pDelFile = NULL; - } - } - -_exit: - return code; -} - -int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { - int32_t code = 0; - int32_t idx = taosArraySearchIdx(pFS->aDFileSet, pSet, tDFileSetCmprFn, TD_GE); - - if (idx < 0) { - idx = taosArrayGetSize(pFS->aDFileSet); - } else { - SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, idx); - int32_t c = tDFileSetCmprFn(pSet, pDFileSet); - if (c == 0) { - *pDFileSet->pHeadF = *pSet->pHeadF; - *pDFileSet->pDataF = *pSet->pDataF; - *pDFileSet->pSmaF = *pSet->pSmaF; - // stt - if (pSet->nSttF > pDFileSet->nSttF) { - ASSERT(pSet->nSttF == pDFileSet->nSttF + 1); - - pDFileSet->aSttF[pDFileSet->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (pDFileSet->aSttF[pDFileSet->nSttF] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - *pDFileSet->aSttF[pDFileSet->nSttF] = *pSet->aSttF[pSet->nSttF - 1]; - pDFileSet->nSttF++; - } else if (pSet->nSttF < pDFileSet->nSttF) { - ASSERT(pSet->nSttF == 1); - for (int32_t iStt = 1; iStt < pDFileSet->nSttF; iStt++) { - taosMemoryFree(pDFileSet->aSttF[iStt]); - } - - *pDFileSet->aSttF[0] = *pSet->aSttF[0]; - pDFileSet->nSttF = 1; - } else { - for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { - *pDFileSet->aSttF[iStt] = *pSet->aSttF[iStt]; - } - } - - pDFileSet->diskId = pSet->diskId; - goto _exit; - } - } - - ASSERT(pSet->nSttF == 1); - SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nSttF = 1}; - - // head - fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); - if (fSet.pHeadF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - *fSet.pHeadF = *pSet->pHeadF; - - // data - fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); - if (fSet.pDataF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - *fSet.pDataF = *pSet->pDataF; - - // sma - fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); - if (fSet.pSmaF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - *fSet.pSmaF = *pSet->pSmaF; - - // stt - fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (fSet.aSttF[0] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - *fSet.aSttF[0] = *pSet->aSttF[0]; - - if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - -_exit: - return code; -} - -int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFSNew) { - int32_t code = 0; - int32_t lino = 0; - char tfname[TSDB_FILENAME_LEN]; - - tsdbGetCurrentFName(pTsdb, NULL, tfname); - - // gnrt CURRENT.t - code = tsdbSaveFSToFile(pFSNew, tfname); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) { - int32_t code = 0; - int32_t nRef; - - pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet)); - if (pFS->aDFileSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - pFS->pDelFile = pTsdb->fs.pDelFile; - if (pFS->pDelFile) { - nRef = atomic_fetch_add_32(&pFS->pDelFile->nRef, 1); - ASSERT(nRef > 0); - } - - SDFileSet fSet; - for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); - fSet = *pSet; - - nRef = atomic_fetch_add_32(&pSet->pHeadF->nRef, 1); - ASSERT(nRef > 0); - - nRef = atomic_fetch_add_32(&pSet->pDataF->nRef, 1); - ASSERT(nRef > 0); - - nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1); - ASSERT(nRef > 0); - - for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { - nRef = atomic_fetch_add_32(&pSet->aSttF[iStt]->nRef, 1); - ASSERT(nRef > 0); - } - - if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - } - -_exit: - return code; -} - -void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) { - int32_t nRef; - char fname[TSDB_FILENAME_LEN]; - - if (pFS->pDelFile) { - nRef = atomic_sub_fetch_32(&pFS->pDelFile->nRef, 1); - ASSERT(nRef >= 0); - if (nRef == 0) { - tsdbDelFileName(pTsdb, pFS->pDelFile, fname); - (void)taosRemoveFile(fname); - taosMemoryFree(pFS->pDelFile); - } - } - - for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->aDFileSet); iSet++) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, iSet); - - // head - nRef = atomic_sub_fetch_32(&pSet->pHeadF->nRef, 1); - ASSERT(nRef >= 0); - if (nRef == 0) { - tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); - (void)taosRemoveFile(fname); - taosMemoryFree(pSet->pHeadF); - } - - // data - nRef = atomic_sub_fetch_32(&pSet->pDataF->nRef, 1); - ASSERT(nRef >= 0); - if (nRef == 0) { - tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); - (void)taosRemoveFile(fname); - taosMemoryFree(pSet->pDataF); - } - - // sma - nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1); - ASSERT(nRef >= 0); - if (nRef == 0) { - tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); - (void)taosRemoveFile(fname); - taosMemoryFree(pSet->pSmaF); - } - - // stt - for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { - nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1); - ASSERT(nRef >= 0); - if (nRef == 0) { - tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); - (void)taosRemoveFile(fname); - taosMemoryFree(pSet->aSttF[iStt]); - /* code */ - } - } - } - - taosArrayDestroy(pFS->aDFileSet); -} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 5ad067c113..1f54f13592 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -22,8 +22,8 @@ #include "tsdbUtil2.h" #include "tsimplehash.h" -#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) -#define getCurrentKeyInSttBlock(_r) ((_r)->currentKey) +#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +#define getCurrentKeyInSttBlock(_r) ((_r)->currentKey) typedef struct { bool overlapWithNeighborBlock; @@ -41,7 +41,7 @@ static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, - SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); + SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo); @@ -67,7 +67,7 @@ static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond static int32_t doBuildDataBlock(STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); -static bool hasDataInSttBlock(STableBlockScanInfo *pInfo); +static bool hasDataInSttBlock(STableBlockScanInfo* pInfo); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); static void resetTableListIndex(SReaderStatus* pStatus); @@ -1138,7 +1138,7 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo return false; } - int32_t step = asc ? 1 : -1; + int32_t step = asc ? 1 : -1; STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); @@ -1316,17 +1316,17 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* return TSDB_CODE_SUCCESS; } - int64_t st = taosGetTimestampUs(); + int64_t st = taosGetTimestampUs(); SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; - int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); + int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); double el = (taosGetTimestampUs() - st) / 1000.0; updateComposedBlockInfo(pReader, el, pBlockScanInfo); tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64 " - %" PRId64 ", uid:%" PRIu64 ", %s", - pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, - pBlockScanInfo->uid, pReader->idStr); + pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pBlockScanInfo->uid, + pReader->idStr); pReader->cost.buildmemBlock += el; return code; @@ -1390,13 +1390,9 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc } } -static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { - tMergeTreePinSttBlock(&pSttBlockReader->mergeTree); -} +static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBlock(&pSttBlockReader->mergeTree); } -static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { - tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); -} +static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); } static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, @@ -1535,8 +1531,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, - pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } if (minKey == k.ts) { @@ -1585,8 +1580,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, - pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } if (minKey == key) { @@ -1648,7 +1642,7 @@ static int32_t doMergeFileBlockAndLastBlock(SSttBlockReader* pSttBlockReader, ST TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, - pReader->idStr); + pReader->idStr); code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -1671,7 +1665,7 @@ static int32_t doMergeFileBlockAndLastBlock(SSttBlockReader* pSttBlockReader, ST } doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, - pReader->idStr); + pReader->idStr); // merge with block data if ts == key if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { @@ -1740,7 +1734,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* // the following for key == tsLast // ASC: file block ------> stt block // DESC: stt block ------> file block - SRow* pTSRow = NULL; + SRow* pTSRow = NULL; if (ASCENDING_TRAVERSE(pReader->info.order)) { code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { @@ -1889,8 +1883,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, - pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } if (minKey == ik.ts) { @@ -1948,8 +1941,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, - pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } if (minKey == key) { @@ -2120,7 +2112,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan }; SSttDataInfoForTable info = {.pTimeWindowList = taosArrayInit(4, sizeof(STimeWindow))}; - int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); + int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); if (code != TSDB_CODE_SUCCESS) { return false; } @@ -2138,7 +2130,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan pScanInfo->sttWindow.ekey = INT64_MIN; // calculate the time window for data in stt files - for(int32_t i = 0; i < taosArrayGetSize(info.pTimeWindowList); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(info.pTimeWindowList); ++i) { STimeWindow* pWindow = taosArrayGet(info.pTimeWindowList, i); if (pScanInfo->sttWindow.skey > pWindow->skey) { pScanInfo->sttWindow.skey = pWindow->skey; @@ -2149,8 +2141,9 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan } } - pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList)? STT_FILE_HAS_DATA:STT_FILE_NO_DATA; - pScanInfo->sttKeyInfo.nextProcKey = ASCENDING_TRAVERSE(pReader->info.order)? pScanInfo->sttWindow.skey:pScanInfo->sttWindow.ekey; + pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA; + pScanInfo->sttKeyInfo.nextProcKey = + ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey; hasData = true; } else { hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); @@ -2168,9 +2161,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan return hasData; } -static bool hasDataInSttBlock(STableBlockScanInfo *pInfo) { - return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; -} +static bool hasDataInSttBlock(STableBlockScanInfo* pInfo) { return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; } bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) { @@ -2225,10 +2216,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } } -int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { - bool copied = false; - SRow* pTSRow = NULL; - int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader); +int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, + STsdbReader* pReader) { + bool copied = false; + SRow* pTSRow = NULL; + int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader); SRowMerger* pMerger = &pReader->status.merger; TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); @@ -2559,16 +2551,16 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { pReader->status.bProcMemFirstFileset = false; } - int32_t fid = pReader->status.pCurrentFileset->fid; + int32_t fid = pReader->status.pCurrentFileset->fid; STimeWindow winFid = {0}; tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &winFid.skey, &winFid.ekey); if (ASCENDING_TRAVERSE(pReader->info.order)) { pReader->status.bProcMemPreFileset = !(pReader->status.memTableMaxKey < pReader->status.prevFilesetStartKey || - (winFid.skey-1) < pReader->status.memTableMinKey); + (winFid.skey - 1) < pReader->status.memTableMinKey); } else { - pReader->status.bProcMemPreFileset = !( pReader->status.memTableMaxKey < (winFid.ekey+1) || - pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey); + pReader->status.bProcMemPreFileset = !(pReader->status.memTableMaxKey < (winFid.ekey + 1) || + pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey); } if (pReader->status.bProcMemPreFileset) { @@ -2802,7 +2794,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) { ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT); - if(pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) { + if (pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) { return true; } else { int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey; @@ -2923,7 +2915,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_t endKey) { SReaderStatus* pStatus = &pReader->status; - tsdbDebug("seq load data blocks from cache that preceeds fileset %d, %s", pReader->status.pCurrentFileset->fid, pReader->idStr); + tsdbDebug("seq load data blocks from cache that preceeds fileset %d, %s", pReader->status.pCurrentFileset->fid, + pReader->idStr); while (1) { if (pReader->code != TSDB_CODE_SUCCESS) { @@ -3847,7 +3840,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow]; } - // no data in buffer, return immediately if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) { break; @@ -3945,7 +3937,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { pStatus->loadFromFile = false; -// } else if (READER_EXEC_DATA == pReader->info.readMode) { + // } else if (READER_EXEC_DATA == pReader->info.readMode) { // DO NOTHING } else { code = initForFirstBlockInFile(pReader, pBlockIter); @@ -4092,7 +4084,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi } pReader->flag = READER_STATUS_SUSPEND; - pReader->info.execMode = pCond->notLoadData? READER_EXEC_ROWS : READER_EXEC_DATA; + pReader->info.execMode = pCond->notLoadData ? READER_EXEC_ROWS : READER_EXEC_DATA; pReader->pIgnoreTables = pIgnoreTables; tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64 @@ -4157,7 +4149,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { } SReadCostSummary* pCost = &pReader->cost; - SFilesetIter* pFilesetIter = &pReader->status.fileIter; + SFilesetIter* pFilesetIter = &pReader->status.fileIter; if (pFilesetIter->pSttBlockReader != NULL) { SSttBlockReader* pLReader = pFilesetIter->pSttBlockReader; tMergeTreeClose(&pLReader->mergeTree); @@ -4204,7 +4196,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; STableBlockScanInfo* pBlockScanInfo = NULL; - pReader->status.suspendInvoked = true; // record the suspend status + pReader->status.suspendInvoked = true; // record the suspend status if (pStatus->loadFromFile) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); @@ -4227,7 +4219,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { // resetDataBlockScanInfo excluding lastKey STableBlockScanInfo** p = NULL; - int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1:-1; + int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1; int32_t iter = 0; while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { @@ -4248,7 +4240,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { pReader->flag = READER_STATUS_SUSPEND; #if SUSPEND_RESUME_TEST - tsem_post(&pReader->resumeAfterSuspend); + tsem_post(&pReader->resumeAfterSuspend); #endif tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0, @@ -4331,13 +4323,13 @@ _err: } static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SReaderStatus* pStatus = &pReader->status; SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; - int32_t fid = pReader->status.pCurrentFileset->fid; - STimeWindow win = {0}; + int32_t fid = pReader->status.pCurrentFileset->fid; + STimeWindow win = {0}; tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey; @@ -4359,8 +4351,8 @@ static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) { static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; - int32_t code = TSDB_CODE_SUCCESS; - SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; if (pStatus->loadFromFile) { if (pStatus->bProcMemPreFileset) { @@ -4375,12 +4367,12 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { return code; } - tsdbTrace("block from file rows: %"PRId64", will process pre-file set buffer: %d. %s", - pBlock->info.rows, pStatus->bProcMemFirstFileset, pReader->idStr); + tsdbTrace("block from file rows: %" PRId64 ", will process pre-file set buffer: %d. %s", pBlock->info.rows, + pStatus->bProcMemFirstFileset, pReader->idStr); if (pStatus->bProcMemPreFileset) { if (pBlock->info.rows > 0) { if (pReader->notifyFn) { - int32_t fid = pReader->status.pCurrentFileset->fid; + int32_t fid = pReader->status.pCurrentFileset->fid; STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; pReader->notifyFn(TSD_READER_NOTIFY_NEXT_DURATION_BLOCK, &info, pReader->notifyParam); @@ -4404,8 +4396,8 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { static int32_t doTsdbNextDataBlockFilesFirst(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; - int32_t code = TSDB_CODE_SUCCESS; - SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; if (pStatus->loadFromFile) { code = buildBlockFromFiles(pReader); @@ -4940,7 +4932,7 @@ static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t int64_t minKey = INT64_MAX; void* pHashIter = tSimpleHashIterate(pStatus->pTableMap, NULL, &iter); - while (pHashIter!= NULL) { + while (pHashIter != NULL) { STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pHashIter; STbData* d = NULL; @@ -5144,7 +5136,6 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive); } - tsdbFSUnref(pTsdb, &pSnap->fs); if (pSnap->pNode) taosMemoryFree(pSnap->pNode); if (pSnap->pINode) taosMemoryFree(pSnap->pINode); @@ -5165,9 +5156,7 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ } -void tsdbSetFilesetDelimited(STsdbReader* pReader) { - pReader->bFilesetDelimited = true; -} +void tsdbSetFilesetDelimited(STsdbReader* pReader) { pReader->bFilesetDelimited = true; } void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) { pReader->notifyFn = notifyFn;