From 09628401f872f0ceb6182c5f197bed6291e1d9a0 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 6 Feb 2023 14:49:50 +0800 Subject: [PATCH] refact code --- source/dnode/vnode/CMakeLists.txt | 1 + source/dnode/vnode/src/inc/tsdb.h | 78 ++++ source/dnode/vnode/src/tsdb/tsdbCompact.c | 4 + source/dnode/vnode/src/tsdb/tsdbDataIter.c | 394 ++++++++++++++++++ source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 449 +-------------------- 5 files changed, 478 insertions(+), 448 deletions(-) create mode 100644 source/dnode/vnode/src/tsdb/tsdbDataIter.c diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index c8a9f2bfc4..ea7046886e 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -54,6 +54,7 @@ target_sources( "src/tsdb/tsdbDiskData.c" "src/tsdb/tsdbCompact.c" "src/tsdb/tsdbMergeTree.c" + "src/tsdb/tsdbDataIter.c" # tq "src/tq/tq.c" diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 2276601ec7..d387d2dfbe 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -69,6 +69,8 @@ typedef struct SDiskCol SDiskCol; typedef struct SDiskData SDiskData; typedef struct SDiskDataBuilder SDiskDataBuilder; typedef struct SBlkInfo SBlkInfo; +typedef struct STsdbDataIter2 STsdbDataIter2; +typedef struct STsdbFilterInfo STsdbFilterInfo; #define TSDBROW_ROW_FMT ((int8_t)0x0) #define TSDBROW_COL_FMT ((int8_t)0x1) @@ -279,6 +281,7 @@ int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *m int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk); int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg); int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData); +int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData); int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData); int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData); // SDelFWriter @@ -310,6 +313,25 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder); int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId); int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData, const SBlkInfo **ppBlkInfo); +// tsdbDataIter.c ============================================================================================== +#define TSDB_MEM_TABLE_DATA_ITER 0 +#define TSDB_DATA_FILE_DATA_ITER 1 +#define TSDB_STT_FILE_DATA_ITER 2 +#define TSDB_TOMB_FILE_DATA_ITER 3 + +#define TSDB_FILTER_FLAG_BY_VERSION 0x1 + +#define TSDB_RBTN_TO_DATA_ITER(pNode) ((STsdbDataIter2 *)(((char *)pNode) - offsetof(STsdbDataIter2, rbtn))) +/* open */ +int32_t tsdbOpenDataFileDataIter(SDataFReader *pReader, STsdbDataIter2 **ppIter); +int32_t tsdbOpenSttFileDataIter(SDataFReader *pReader, int32_t iStt, STsdbDataIter2 **ppIter); +int32_t tsdbOpenTombFileDataIter(SDelFReader *pReader, STsdbDataIter2 **ppIter); +/* close */ +void tsdbCloseDataIter2(STsdbDataIter2 *pIter); +/* cmpr */ +int32_t tsdbDataIterCmprFn(const SRBTreeNode *pNode1, const SRBTreeNode *pNode2); +/* next */ +int32_t tsdbDataIterNext2(STsdbDataIter2 *pIter, STsdbFilterInfo *pFilterInfo); // structs ======================= struct STsdbFS { @@ -830,6 +852,62 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { int32_t tRowInfoCmprFn(const void *p1, const void *p2); +typedef struct { + int64_t suid; + int64_t uid; + SDelData delData; +} SDelInfo; + +struct STsdbDataIter2 { + STsdbDataIter2 *next; + SRBTreeNode rbtn; + + int32_t type; + SRowInfo rowInfo; + SDelInfo delInfo; + union { + // TSDB_MEM_TABLE_DATA_ITER + struct { + SMemTable *pMemTable; + } mIter; + + // TSDB_DATA_FILE_DATA_ITER + struct { + SDataFReader *pReader; + SArray *aBlockIdx; // SArray + SMapData mDataBlk; + SBlockData bData; + int32_t iBlockIdx; + int32_t iDataBlk; + int32_t iRow; + } dIter; + + // TSDB_STT_FILE_DATA_ITER + struct { + SDataFReader *pReader; + int32_t iStt; + SArray *aSttBlk; + SBlockData bData; + int32_t iSttBlk; + int32_t iRow; + } sIter; + // TSDB_TOMB_FILE_DATA_ITER + struct { + SDelFReader *pReader; + SArray *aDelIdx; + SArray *aDelData; + int32_t iDelIdx; + int32_t iDelData; + } tIter; + }; +}; + +struct STsdbFilterInfo { + int32_t flag; + int64_t sver; + int64_t ever; +}; + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index bf61d0fbaf..6948409ca0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -14,6 +14,7 @@ */ #include "tsdb.h" +#if 0 #define TSDB_ITER_TYPE_MEM 0x0 #define TSDB_ITER_TYPE_DAT 0x1 @@ -943,11 +944,13 @@ _exit: tsdbCloseCompactor(pCompactor); return code; } +#endif int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { int32_t code = 0; int32_t lino = 0; +#if 0 STsdbCompactor *pCompactor = &(STsdbCompactor){0}; // begin compact @@ -977,5 +980,6 @@ _exit: tsdbCommitCompact(pCompactor); } tsdbEndCompact(pCompactor); +#endif return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbDataIter.c b/source/dnode/vnode/src/tsdb/tsdbDataIter.c new file mode 100644 index 0000000000..c8392166ad --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbDataIter.c @@ -0,0 +1,394 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdb.h" + +/* open */ +int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) { + int32_t code = 0; + int32_t lino = 0; + + // create handle + STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter)); + if (pIter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pIter->type = TSDB_DATA_FILE_DATA_ITER; + pIter->dIter.pReader = pReader; + if ((pIter->dIter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tBlockDataCreate(&pIter->dIter.bData); + TSDB_CHECK_CODE(code, lino, _exit); + + pIter->dIter.iBlockIdx = 0; + pIter->dIter.iDataBlk = 0; + pIter->dIter.iRow = 0; + + // read data + code = tsdbReadBlockIdx(pReader, pIter->dIter.aBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(pIter->dIter.aBlockIdx) == 0) goto _clear; + +_exit: + if (code) { + if (pIter) { + _clear: + tBlockDataDestroy(&pIter->dIter.bData); + taosArrayDestroy(pIter->dIter.aBlockIdx); + taosMemoryFree(pIter); + pIter = NULL; + } + } + *ppIter = pIter; + return code; +} + +int32_t tsdbOpenSttFileDataIter(SDataFReader* pReader, int32_t iStt, STsdbDataIter2** ppIter) { + int32_t code = 0; + int32_t lino = 0; + + // create handle + STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter)); + if (pIter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pIter->type = TSDB_STT_FILE_DATA_ITER; + pIter->sIter.pReader = pReader; + pIter->sIter.iStt = iStt; + pIter->sIter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if (pIter->sIter.aSttBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tBlockDataCreate(&pIter->sIter.bData); + TSDB_CHECK_CODE(code, lino, _exit); + + pIter->sIter.iSttBlk = 0; + pIter->sIter.iRow = 0; + + // read data + code = tsdbReadSttBlk(pReader, iStt, pIter->sIter.aSttBlk); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(pIter->sIter.aSttBlk) == 0) goto _clear; + +_exit: + if (code) { + if (pIter) { + _clear: + taosArrayDestroy(pIter->sIter.aSttBlk); + tBlockDataDestroy(&pIter->sIter.bData); + taosMemoryFree(pIter); + pIter = NULL; + } + } + *ppIter = pIter; + return code; +} + +int32_t tsdbOpenTombFileDataIter(SDelFReader* pReader, STsdbDataIter2** ppIter) { + int32_t code = 0; + int32_t lino = 0; + + STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter)); + if (pIter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + pIter->type = TSDB_TOMB_FILE_DATA_ITER; + + pIter->tIter.pReader = pReader; + if ((pIter->tIter.aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if ((pIter->tIter.aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbReadDelIdx(pReader, pIter->tIter.aDelIdx); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(pIter->tIter.aDelIdx) == 0) goto _clear; + + pIter->tIter.iDelIdx = 0; + pIter->tIter.iDelData = 0; + +_exit: + if (code) { + if (pIter) { + _clear: + taosArrayDestroy(pIter->tIter.aDelIdx); + taosArrayDestroy(pIter->tIter.aDelData); + taosMemoryFree(pIter); + pIter = NULL; + } + } + *ppIter = pIter; + return code; +} + +/* close */ +static void tsdbCloseDataFileDataIter(STsdbDataIter2* pIter) { + tBlockDataDestroy(&pIter->dIter.bData); + tMapDataClear(&pIter->dIter.mDataBlk); + taosArrayDestroy(pIter->dIter.aBlockIdx); + taosMemoryFree(pIter); +} + +static void tsdbCloseSttFileDataIter(STsdbDataIter2* pIter) { + tBlockDataDestroy(&pIter->sIter.bData); + taosArrayDestroy(pIter->sIter.aSttBlk); + taosMemoryFree(pIter); +} + +static void tsdbCloseTombFileDataIter(STsdbDataIter2* pIter) { + taosArrayDestroy(pIter->tIter.aDelData); + taosArrayDestroy(pIter->tIter.aDelIdx); + taosMemoryFree(pIter); +} + +void tsdbCloseDataIter2(STsdbDataIter2* pIter) { + if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) { + ASSERT(0); + } else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) { + tsdbCloseDataFileDataIter(pIter); + } else if (pIter->type == TSDB_STT_FILE_DATA_ITER) { + tsdbCloseSttFileDataIter(pIter); + } else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) { + tsdbCloseTombFileDataIter(pIter); + } else { + ASSERT(0); + } +} + +/* cmpr */ +int32_t tsdbDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) { + STsdbDataIter2* pIter1 = TSDB_RBTN_TO_DATA_ITER(pNode1); + STsdbDataIter2* pIter2 = TSDB_RBTN_TO_DATA_ITER(pNode2); + return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo); +} + +/* seek */ + +/* iter next */ +static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { + int32_t code = 0; + int32_t lino = 0; + + for (;;) { + while (pIter->dIter.iRow < pIter->dIter.bData.nRow) { + if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { + if (pIter->dIter.bData.aVersion[pIter->dIter.iRow] < pFilterInfo->sver || + pIter->dIter.bData.aVersion[pIter->dIter.iRow] > pFilterInfo->ever) { + pIter->dIter.iRow++; + continue; + } + } + } + + pIter->rowInfo.suid = pIter->dIter.bData.suid; + pIter->rowInfo.uid = pIter->dIter.bData.uid; + pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow); + pIter->dIter.iRow++; + goto _exit; + } + + for (;;) { + while (pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) { + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pIter->dIter.mDataBlk, pIter->dIter.iDataBlk, &dataBlk, tGetDataBlk); + + // filter + if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { + if (pFilterInfo->sver > dataBlk.maxVer || pFilterInfo->ever < dataBlk.minVer) { + pIter->dIter.iDataBlk++; + continue; + } + } + } + + code = tsdbReadDataBlockEx(pIter->dIter.pReader, &dataBlk, &pIter->dIter.bData); + TSDB_CHECK_CODE(code, lino, _exit); + + pIter->dIter.iDataBlk++; + pIter->dIter.iRow = 0; + + break; + } + + if (pIter->dIter.iRow < pIter->dIter.bData.nRow) break; + + for (;;) { + if (pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) { + SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx); + + code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk); + TSDB_CHECK_CODE(code, lino, _exit); + + pIter->dIter.iBlockIdx++; + pIter->dIter.iDataBlk = 0; + + break; + } else { + pIter->rowInfo = (SRowInfo){0}; + goto _exit; + } + } + } + } + +_exit: + if (code) { + tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { + int32_t code = 0; + int32_t lino = 0; + + for (;;) { + while (pIter->sIter.iRow < pIter->sIter.bData.nRow) { + if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { + if (pFilterInfo->sver > pIter->sIter.bData.aVersion[pIter->sIter.iRow] || + pFilterInfo->ever < pIter->sIter.bData.aVersion[pIter->sIter.iRow]) { + pIter->sIter.iRow++; + continue; + } + } + } + + pIter->rowInfo.suid = pIter->sIter.bData.suid; + pIter->rowInfo.uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow]; + pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->sIter.bData, pIter->sIter.iRow); + pIter->sIter.iRow++; + goto _exit; + } + + for (;;) { + if (pIter->sIter.iSttBlk < taosArrayGetSize(pIter->sIter.aSttBlk)) { + SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk); + + if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { + if (pFilterInfo->sver > pSttBlk->maxVer || pFilterInfo->ever < pSttBlk->minVer) { + pIter->sIter.iSttBlk++; + continue; + } + } + } + + code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData); + TSDB_CHECK_CODE(code, lino, _exit); + + pIter->sIter.iRow = 0; + pIter->sIter.iSttBlk++; + break; + } else { + pIter->rowInfo = (SRowInfo){0}; + goto _exit; + } + } + } + +_exit: + if (code) { + tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbTombFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { + int32_t code = 0; + int32_t lino = 0; + + for (;;) { + while (pIter->tIter.iDelData < taosArrayGetSize(pIter->tIter.aDelData)) { + SDelData* pDelData = taosArrayGet(pIter->tIter.aDelData, pIter->tIter.iDelData); + + if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { + if (pFilterInfo->sver > pDelData->version || pFilterInfo->ever < pDelData->version) { + pIter->tIter.iDelData++; + continue; + } + } + } + + pIter->delInfo.delData = *pDelData; + pIter->tIter.iDelData++; + goto _exit; + } + + for (;;) { + if (pIter->tIter.iDelIdx < taosArrayGetSize(pIter->tIter.aDelIdx)) { + SDelIdx* pDelIdx = taosArrayGet(pIter->tIter.aDelIdx, pIter->tIter.iDelIdx); + + code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData); + TSDB_CHECK_CODE(code, lino, _exit); + + pIter->delInfo.suid = pDelIdx->suid; + pIter->delInfo.uid = pDelIdx->uid; + pIter->tIter.iDelData = 0; + pIter->tIter.iDelIdx++; + break; + } else { + pIter->delInfo = (SDelInfo){0}; + goto _exit; + } + } + } + +_exit: + if (code) { + tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { + int32_t code = 0; + + if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) { + ASSERT(0); + return code; + } else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) { + return tsdbDataFileDataIterNext(pIter, pFilterInfo); + } else if (pIter->type == TSDB_STT_FILE_DATA_ITER) { + return tsdbSttFileDataIterNext(pIter, pFilterInfo); + } else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) { + return tsdbTombFileDataIterNext(pIter, pFilterInfo); + } else { + ASSERT(0); + return code; + } +} + +/* get */ diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 0e804bc65e..55b52e8bf8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -15,457 +15,10 @@ #include "tsdb.h" -extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData); extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo); extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg); extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg); -// STsdbDataIter2 ======================================== -#define TSDB_MEM_TABLE_DATA_ITER 0 -#define TSDB_DATA_FILE_DATA_ITER 1 -#define TSDB_STT_FILE_DATA_ITER 2 -#define TSDB_TOMB_FILE_DATA_ITER 3 - -typedef struct STsdbDataIter2 STsdbDataIter2; -typedef struct STsdbFilterInfo STsdbFilterInfo; - -typedef struct { - int64_t suid; - int64_t uid; - SDelData delData; -} SDelInfo; - -struct STsdbDataIter2 { - STsdbDataIter2* next; - SRBTreeNode rbtn; - - int32_t type; - SRowInfo rowInfo; - SDelInfo delInfo; - union { - // TSDB_MEM_TABLE_DATA_ITER - struct { - SMemTable* pMemTable; - } mIter; - - // TSDB_DATA_FILE_DATA_ITER - struct { - SDataFReader* pReader; - SArray* aBlockIdx; // SArray - SMapData mDataBlk; - SBlockData bData; - int32_t iBlockIdx; - int32_t iDataBlk; - int32_t iRow; - } dIter; - - // TSDB_STT_FILE_DATA_ITER - struct { - SDataFReader* pReader; - int32_t iStt; - SArray* aSttBlk; - SBlockData bData; - int32_t iSttBlk; - int32_t iRow; - } sIter; - // TSDB_TOMB_FILE_DATA_ITER - struct { - SDelFReader* pReader; - SArray* aDelIdx; - SArray* aDelData; - int32_t iDelIdx; - int32_t iDelData; - } tIter; - }; -}; - -#define TSDB_FILTER_FLAG_BY_VERSION 0x1 -struct STsdbFilterInfo { - int32_t flag; - int64_t sver; - int64_t ever; -}; - -#define TSDB_RBTN_TO_DATA_ITER(pNode) ((STsdbDataIter2*)(((char*)pNode) - offsetof(STsdbDataIter2, rbtn))) - -/* open */ -static int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) { - int32_t code = 0; - int32_t lino = 0; - - // create handle - STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter)); - if (pIter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - pIter->type = TSDB_DATA_FILE_DATA_ITER; - pIter->dIter.pReader = pReader; - if ((pIter->dIter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataCreate(&pIter->dIter.bData); - TSDB_CHECK_CODE(code, lino, _exit); - - pIter->dIter.iBlockIdx = 0; - pIter->dIter.iDataBlk = 0; - pIter->dIter.iRow = 0; - - // read data - code = tsdbReadBlockIdx(pReader, pIter->dIter.aBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayGetSize(pIter->dIter.aBlockIdx) == 0) goto _clear; - -_exit: - if (code) { - if (pIter) { - _clear: - tBlockDataDestroy(&pIter->dIter.bData); - taosArrayDestroy(pIter->dIter.aBlockIdx); - taosMemoryFree(pIter); - pIter = NULL; - } - } - *ppIter = pIter; - return code; -} - -static int32_t tsdbOpenSttFileDataIter(SDataFReader* pReader, int32_t iStt, STsdbDataIter2** ppIter) { - int32_t code = 0; - int32_t lino = 0; - - // create handle - STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter)); - if (pIter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - pIter->type = TSDB_STT_FILE_DATA_ITER; - pIter->sIter.pReader = pReader; - pIter->sIter.iStt = iStt; - pIter->sIter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pIter->sIter.aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataCreate(&pIter->sIter.bData); - TSDB_CHECK_CODE(code, lino, _exit); - - pIter->sIter.iSttBlk = 0; - pIter->sIter.iRow = 0; - - // read data - code = tsdbReadSttBlk(pReader, iStt, pIter->sIter.aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayGetSize(pIter->sIter.aSttBlk) == 0) goto _clear; - -_exit: - if (code) { - if (pIter) { - _clear: - taosArrayDestroy(pIter->sIter.aSttBlk); - tBlockDataDestroy(&pIter->sIter.bData); - taosMemoryFree(pIter); - pIter = NULL; - } - } - *ppIter = pIter; - return code; -} - -static int32_t tsdbOpenTombFileDataIter(SDelFReader* pReader, STsdbDataIter2** ppIter) { - int32_t code = 0; - int32_t lino = 0; - - STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter)); - if (pIter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - pIter->type = TSDB_TOMB_FILE_DATA_ITER; - - pIter->tIter.pReader = pReader; - if ((pIter->tIter.aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - if ((pIter->tIter.aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbReadDelIdx(pReader, pIter->tIter.aDelIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayGetSize(pIter->tIter.aDelIdx) == 0) goto _clear; - - pIter->tIter.iDelIdx = 0; - pIter->tIter.iDelData = 0; - -_exit: - if (code) { - if (pIter) { - _clear: - taosArrayDestroy(pIter->tIter.aDelIdx); - taosArrayDestroy(pIter->tIter.aDelData); - taosMemoryFree(pIter); - pIter = NULL; - } - } - *ppIter = pIter; - return code; -} - -/* close */ -static void tsdbCloseDataFileDataIter(STsdbDataIter2* pIter) { - tBlockDataDestroy(&pIter->dIter.bData); - tMapDataClear(&pIter->dIter.mDataBlk); - taosArrayDestroy(pIter->dIter.aBlockIdx); - taosMemoryFree(pIter); -} - -static void tsdbCloseSttFileDataIter(STsdbDataIter2* pIter) { - tBlockDataDestroy(&pIter->sIter.bData); - taosArrayDestroy(pIter->sIter.aSttBlk); - taosMemoryFree(pIter); -} - -static void tsdbCloseTombFileDataIter(STsdbDataIter2* pIter) { - taosArrayDestroy(pIter->tIter.aDelData); - taosArrayDestroy(pIter->tIter.aDelIdx); - taosMemoryFree(pIter); -} - -static void tsdbCloseDataIter2(STsdbDataIter2* pIter) { - if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) { - ASSERT(0); - } else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) { - tsdbCloseDataFileDataIter(pIter); - } else if (pIter->type == TSDB_STT_FILE_DATA_ITER) { - tsdbCloseSttFileDataIter(pIter); - } else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) { - tsdbCloseTombFileDataIter(pIter); - } else { - ASSERT(0); - } -} - -/* cmpr */ -static int32_t tsdbDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) { - STsdbDataIter2* pIter1 = TSDB_RBTN_TO_DATA_ITER(pNode1); - STsdbDataIter2* pIter2 = TSDB_RBTN_TO_DATA_ITER(pNode2); - return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo); -} - -/* seek */ - -/* iter next */ -static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { - int32_t code = 0; - int32_t lino = 0; - - for (;;) { - while (pIter->dIter.iRow < pIter->dIter.bData.nRow) { - if (pFilterInfo) { - if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { - if (pIter->dIter.bData.aVersion[pIter->dIter.iRow] < pFilterInfo->sver || - pIter->dIter.bData.aVersion[pIter->dIter.iRow] > pFilterInfo->ever) { - pIter->dIter.iRow++; - continue; - } - } - } - - pIter->rowInfo.suid = pIter->dIter.bData.suid; - pIter->rowInfo.uid = pIter->dIter.bData.uid; - pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow); - pIter->dIter.iRow++; - goto _exit; - } - - for (;;) { - while (pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) { - SDataBlk dataBlk; - tMapDataGetItemByIdx(&pIter->dIter.mDataBlk, pIter->dIter.iDataBlk, &dataBlk, tGetDataBlk); - - // filter - if (pFilterInfo) { - if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { - if (pFilterInfo->sver > dataBlk.maxVer || pFilterInfo->ever < dataBlk.minVer) { - pIter->dIter.iDataBlk++; - continue; - } - } - } - - code = tsdbReadDataBlockEx(pIter->dIter.pReader, &dataBlk, &pIter->dIter.bData); - TSDB_CHECK_CODE(code, lino, _exit); - - pIter->dIter.iDataBlk++; - pIter->dIter.iRow = 0; - - break; - } - - if (pIter->dIter.iRow < pIter->dIter.bData.nRow) break; - - for (;;) { - if (pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) { - SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx); - - code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - pIter->dIter.iBlockIdx++; - pIter->dIter.iDataBlk = 0; - - break; - } else { - pIter->rowInfo = (SRowInfo){0}; - goto _exit; - } - } - } - } - -_exit: - if (code) { - tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { - int32_t code = 0; - int32_t lino = 0; - - for (;;) { - while (pIter->sIter.iRow < pIter->sIter.bData.nRow) { - if (pFilterInfo) { - if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { - if (pFilterInfo->sver > pIter->sIter.bData.aVersion[pIter->sIter.iRow] || - pFilterInfo->ever < pIter->sIter.bData.aVersion[pIter->sIter.iRow]) { - pIter->sIter.iRow++; - continue; - } - } - } - - pIter->rowInfo.suid = pIter->sIter.bData.suid; - pIter->rowInfo.uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow]; - pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->sIter.bData, pIter->sIter.iRow); - pIter->sIter.iRow++; - goto _exit; - } - - for (;;) { - if (pIter->sIter.iSttBlk < taosArrayGetSize(pIter->sIter.aSttBlk)) { - SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk); - - if (pFilterInfo) { - if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { - if (pFilterInfo->sver > pSttBlk->maxVer || pFilterInfo->ever < pSttBlk->minVer) { - pIter->sIter.iSttBlk++; - continue; - } - } - } - - code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData); - TSDB_CHECK_CODE(code, lino, _exit); - - pIter->sIter.iRow = 0; - pIter->sIter.iSttBlk++; - break; - } else { - pIter->rowInfo = (SRowInfo){0}; - goto _exit; - } - } - } - -_exit: - if (code) { - tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbTombFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { - int32_t code = 0; - int32_t lino = 0; - - for (;;) { - while (pIter->tIter.iDelData < taosArrayGetSize(pIter->tIter.aDelData)) { - SDelData* pDelData = taosArrayGet(pIter->tIter.aDelData, pIter->tIter.iDelData); - - if (pFilterInfo) { - if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { - if (pFilterInfo->sver > pDelData->version || pFilterInfo->ever < pDelData->version) { - pIter->tIter.iDelData++; - continue; - } - } - } - - pIter->delInfo.delData = *pDelData; - pIter->tIter.iDelData++; - goto _exit; - } - - for (;;) { - if (pIter->tIter.iDelIdx < taosArrayGetSize(pIter->tIter.aDelIdx)) { - SDelIdx* pDelIdx = taosArrayGet(pIter->tIter.aDelIdx, pIter->tIter.iDelIdx); - - code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData); - TSDB_CHECK_CODE(code, lino, _exit); - - pIter->delInfo.suid = pDelIdx->suid; - pIter->delInfo.uid = pDelIdx->uid; - pIter->tIter.iDelData = 0; - pIter->tIter.iDelIdx++; - break; - } else { - pIter->delInfo = (SDelInfo){0}; - goto _exit; - } - } - } - -_exit: - if (code) { - tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { - int32_t code = 0; - - if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) { - ASSERT(0); - return code; - } else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) { - return tsdbDataFileDataIterNext(pIter, pFilterInfo); - } else if (pIter->type == TSDB_STT_FILE_DATA_ITER) { - return tsdbSttFileDataIterNext(pIter, pFilterInfo); - } else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) { - return tsdbTombFileDataIterNext(pIter, pFilterInfo); - } else { - ASSERT(0); - return code; - } -} - -/* get */ - // STsdbSnapReader ======================================== struct STsdbSnapReader { STsdb* pTsdb; @@ -1318,7 +871,7 @@ static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid) TSDB_CHECK_CODE(code, lino, _exit); if (pWriter->pSIter) { - code = tsdbSttFileDataIterNext(pWriter->pSIter, NULL); + code = tsdbDataIterNext2(pWriter->pSIter, NULL); TSDB_CHECK_CODE(code, lino, _exit); // add to tree