diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h index 03311e8161..30c1655deb 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h @@ -15,6 +15,7 @@ #include "tsdbDef.h" #include "tsdbFSet.h" +#include "tsdbSttFileRW.h" #include "tsdbUtil.h" #ifndef _TSDB_DATA_FILE_RW_H @@ -46,6 +47,8 @@ int32_t tsdbDataFileReaderClose(SDataFileReader **reader); int32_t tsdbDataFileReadBlockIdx(SDataFileReader *reader, const TBlockIdxArray **blockIdxArray); int32_t tsdbDataFileReadDataBlk(SDataFileReader *reader, const SBlockIdx *blockIdx, const TDataBlkArray **dataBlkArray); int32_t tsdbDataFileReadDataBlock(SDataFileReader *reader, const SDataBlk *dataBlk, SBlockData *bData); +int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray); +int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData); // SDataFileWriter ============================================= typedef struct SDataFileWriter SDataFileWriter; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h index e3b3d45ba1..6389fbdbeb 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h @@ -33,17 +33,20 @@ typedef enum { TSDB_ITER_TYPE_STT = 1, TSDB_ITER_TYPE_DATA, TSDB_ITER_TYPE_MEMT, + TSDB_ITER_TYPE_STT_TOMB, + TSDB_ITER_TYPE_DATA_TOMB, + TSDB_ITER_TYPE_MEMT_TOMB, } EIterType; typedef struct { EIterType type; union { - SSttSegReader *sttReader; - SDataFileReader *dataReader; + SSttSegReader *sttReader; // TSDB_ITER_TYPE_STT || TSDB_ITER_TYPE_STT_TOMB + SDataFileReader *dataReader; // TSDB_ITER_TYPE_DATA || TSDB_ITER_TYPE_DATA_TOMB struct { SMemTable *memt; TSDBKEY from[1]; - }; + }; // TSDB_ITER_TYPE_MEMT || TSDB_ITER_TYPE_MEMT_TOMB }; } STsdbIterConfig; @@ -53,11 +56,12 @@ int32_t tsdbIterClose(STsdbIter **iter); int32_t tsdbIterNext(STsdbIter *iter); // SIterMerger =============== -int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger); -int32_t tsdbIterMergerClose(SIterMerger **merger); -int32_t tsdbIterMergerNext(SIterMerger *merger); -SRowInfo *tsdbIterMergerGet(SIterMerger *merger); -int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid); +int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger, bool isTomb); +int32_t tsdbIterMergerClose(SIterMerger **merger); +int32_t tsdbIterMergerNext(SIterMerger *merger); +SRowInfo *tsdbIterMergerGet(SIterMerger *merger); +STombRecord *tsdbIterMergerGetTombRecord(SIterMerger *merger); +int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h index 08bd1f3ed7..7f14740f04 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h @@ -46,7 +46,7 @@ int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **delB int32_t tsdbSttFileReadDataBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData); int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData); -int32_t tsdbSttFileReadDelBlock(SSttSegReader *reader, const STombBlk *delBlk, STombBlock *dData); +int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *delBlk, STombBlock *dData); struct SSttFileReaderConfig { STsdb *tsdb; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h index f58c326e37..a4b48f2543 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h @@ -24,9 +24,9 @@ extern "C" { // SDelBlock ---------- -#define DEL_RECORD_NUM_ELEM 5 +#define TOMB_RECORD_NUM_ELEM 5 typedef union { - int64_t aData[DEL_RECORD_NUM_ELEM]; + int64_t aData[TOMB_RECORD_NUM_ELEM]; struct { int64_t suid; int64_t uid; @@ -37,7 +37,7 @@ typedef union { } STombRecord; typedef union { - TARRAY2(int64_t) dataArr[DEL_RECORD_NUM_ELEM]; + TARRAY2(int64_t) dataArr[TOMB_RECORD_NUM_ELEM]; struct { TARRAY2(int64_t) suid[1]; TARRAY2(int64_t) uid[1]; @@ -49,7 +49,7 @@ typedef union { typedef struct { int32_t numRec; - int32_t size[DEL_RECORD_NUM_ELEM]; + int32_t size[TOMB_RECORD_NUM_ELEM]; TABLEID minTid; TABLEID maxTid; int64_t minVer; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index cad4e50a08..3ede67022f 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -174,7 +174,7 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { code = TARRAY2_APPEND(committer->iterArray, iter); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger); + code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger, false); TSDB_CHECK_CODE(code, lino, _exit); // loop iter diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c index 27ec7242d8..c51c2203d4 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c @@ -217,6 +217,28 @@ _exit: return code; } +int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray) { + int32_t code = 0; + int32_t lino = 0; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData) { + int32_t code = 0; + int32_t lino = 0; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; +} + // SDataFileWriter ============================================= struct SDataFileWriter { SDataFileWriterConfig config[1]; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbIter.c b/source/dnode/vnode/src/tsdb/dev/tsdbIter.c index 2c578fe7af..6cfbdc6ff5 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbIter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbIter.c @@ -20,7 +20,10 @@ struct STsdbIter { struct { bool noMoreData; } ctx[1]; - SRowInfo row[1]; + union { + SRowInfo row[1]; + STombRecord record[1]; + }; SRBTreeNode node[1]; EIterType type; union { @@ -47,6 +50,26 @@ struct STsdbIter { STbData *tbData; STbDataIter tbIter[1]; } memt[1]; + struct { + SSttSegReader *reader; + const TTombBlkArray *tombBlkArray; + int32_t tombBlkArrayIdx; + STombBlock tData[1]; + int32_t iRow; + } sttTomb[1]; + struct { + SDataFileReader *reader; + const TTombBlkArray *tombBlkArray; + int32_t tombBlkArrayIdx; + STombBlock tData[1]; + int32_t iRow; + } dataTomb[1]; + struct { + SMemTable *memt; + SRBTreeIter iter[1]; + STbData *tbData; + STbDataIter tbIter[1]; + } memtTomb[1]; }; }; @@ -197,6 +220,55 @@ _exit: return 0; } +static int32_t tsdbDataTombIterNext(STsdbIter *iter, const TABLEID *tbid) { + while (!iter->ctx->noMoreData) { + for (; iter->dataTomb->iRow < TOMB_BLOCK_SIZE(iter->dataTomb->tData); iter->dataTomb->iRow++) { + iter->record->suid = TARRAY2_GET(iter->dataTomb->tData->suid, iter->dataTomb->iRow); + iter->record->uid = TARRAY2_GET(iter->dataTomb->tData->uid, iter->dataTomb->iRow); + + if (tbid && iter->record->suid == tbid->suid && iter->record->uid == tbid->uid) { + continue; + } + + iter->record->version = TARRAY2_GET(iter->dataTomb->tData->version, iter->dataTomb->iRow); + iter->record->skey = TARRAY2_GET(iter->dataTomb->tData->skey, iter->dataTomb->iRow); + iter->record->ekey = TARRAY2_GET(iter->dataTomb->tData->ekey, iter->dataTomb->iRow); + iter->dataTomb->iRow++; + goto _exit; + } + + if (iter->dataTomb->tombBlkArrayIdx >= TARRAY2_SIZE(iter->dataTomb->tombBlkArray)) { + iter->ctx->noMoreData = true; + break; + } + + for (; iter->dataTomb->tombBlkArrayIdx < TARRAY2_SIZE(iter->dataTomb->tombBlkArray); + iter->dataTomb->tombBlkArrayIdx++) { + const STombBlk *tombBlk = TARRAY2_GET_PTR(iter->dataTomb->tombBlkArray, iter->dataTomb->tombBlkArrayIdx); + + if (tbid && tbid->suid == tombBlk->minTid.suid && tbid->uid == tombBlk->minTid.uid && + tbid->suid == tombBlk->maxTid.suid && tbid->uid == tombBlk->maxTid.uid) { + continue; + } + + int32_t code = tsdbDataFileReadTombBlock(iter->dataTomb->reader, tombBlk, iter->dataTomb->tData); + if (code) return code; + + iter->dataTomb->iRow = 0; + iter->dataTomb->tombBlkArrayIdx++; + break; + } + } + +_exit: + return 0; +} + +static int32_t tsdbMemTableTombIterNext(STsdbIter *iter, const TABLEID *tbid) { + ASSERTS(0, "Not implemented yet!"); + return 0; +} + static int32_t tsdbSttIterOpen(STsdbIter *iter) { int32_t code; @@ -249,6 +321,24 @@ static int32_t tsdbSttIterClose(STsdbIter *iter) { return 0; } +static int32_t tsdbDataTombIterOpen(STsdbIter *iter) { + int32_t code; + + code = tsdbDataFileReadTombBlk(iter->dataTomb->reader, &iter->dataTomb->tombBlkArray); + if (code) return code; + + if (TARRAY2_SIZE(iter->dataTomb->tombBlkArray) == 0) { + iter->ctx->noMoreData = true; + return 0; + } + iter->data->blockIdxArrayIdx = 0; + + tTombBlockInit(iter->dataTomb->tData); + iter->dataTomb->iRow = 0; + + return tsdbDataTombIterNext(iter, NULL); +} + static int32_t tsdbDataIterClose(STsdbIter *iter) { tBlockDataDestroy(iter->data->bData); return 0; @@ -256,6 +346,68 @@ static int32_t tsdbDataIterClose(STsdbIter *iter) { static int32_t tsdbMemTableIterClose(STsdbIter *iter) { return 0; } +static int32_t tsdbSttTombIterNext(STsdbIter *iter, const TABLEID *tbid) { + while (!iter->ctx->noMoreData) { + for (; iter->sttTomb->iRow < TOMB_BLOCK_SIZE(iter->sttTomb->tData); iter->sttTomb->iRow++) { + iter->record->suid = TARRAY2_GET(iter->sttTomb->tData->suid, iter->sttTomb->iRow); + iter->record->uid = TARRAY2_GET(iter->sttTomb->tData->uid, iter->sttTomb->iRow); + + if (tbid && iter->record->suid == tbid->suid && iter->record->uid == tbid->uid) { + continue; + } + + iter->record->version = TARRAY2_GET(iter->sttTomb->tData->version, iter->sttTomb->iRow); + iter->record->skey = TARRAY2_GET(iter->sttTomb->tData->skey, iter->sttTomb->iRow); + iter->record->ekey = TARRAY2_GET(iter->sttTomb->tData->ekey, iter->sttTomb->iRow); + iter->sttTomb->iRow++; + goto _exit; + } + + if (iter->sttTomb->tombBlkArrayIdx >= TARRAY2_SIZE(iter->sttTomb->tombBlkArray)) { + iter->ctx->noMoreData = true; + break; + } + + for (; iter->sttTomb->tombBlkArrayIdx < TARRAY2_SIZE(iter->sttTomb->tombBlkArray); + iter->sttTomb->tombBlkArrayIdx++) { + const STombBlk *tombBlk = TARRAY2_GET_PTR(iter->sttTomb->tombBlkArray, iter->sttTomb->tombBlkArrayIdx); + + if (tbid && tbid->suid == tombBlk->minTid.suid && tbid->uid == tombBlk->minTid.uid && + tbid->suid == tombBlk->maxTid.suid && tbid->uid == tombBlk->maxTid.uid) { + continue; + } + + int32_t code = tsdbSttFileReadTombBlock(iter->sttTomb->reader, tombBlk, iter->sttTomb->tData); + if (code) return code; + + iter->sttTomb->iRow = 0; + iter->sttTomb->tombBlkArrayIdx++; + break; + } + } + +_exit: + return 0; +} + +static int32_t tsdbSttTombIterOpen(STsdbIter *iter) { + int32_t code; + + code = tsdbSttFileReadTombBlk(iter->sttTomb->reader, &iter->sttTomb->tombBlkArray); + if (code) return code; + + if (TARRAY2_SIZE(iter->sttTomb->tombBlkArray) == 0) { + iter->ctx->noMoreData = true; + return 0; + } + + iter->sttTomb->tombBlkArrayIdx = 0; + tTombBlockInit(iter->sttTomb->tData); + iter->sttTomb->iRow = 0; + + return tsdbSttTombIterNext(iter, NULL); +} + int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) { int32_t code; @@ -278,8 +430,19 @@ int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) { iter[0]->memt->from[0] = config->from[0]; code = tsdbMemTableIterOpen(iter[0]); break; + case TSDB_ITER_TYPE_STT_TOMB: + iter[0]->sttTomb->reader = config->sttReader; + code = tsdbSttTombIterOpen(iter[0]); + break; + case TSDB_ITER_TYPE_DATA_TOMB: + iter[0]->dataTomb->reader = config->dataReader; + code = tsdbDataTombIterOpen(iter[0]); + break; + case TSDB_ITER_TYPE_MEMT_TOMB: + ASSERTS(0, "Not implemented"); + break; default: - ASSERT(false); + ASSERTS(false, "Not implemented"); } if (code) { @@ -289,6 +452,16 @@ int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) { return code; } +static int32_t tsdbSttTombIterClose(STsdbIter *iter) { + tTombBlockFree(iter->sttTomb->tData); + return 0; +} + +static int32_t tsdbDataTombIterClose(STsdbIter *iter) { + tTombBlockFree(iter->dataTomb->tData); + return 0; +} + int32_t tsdbIterClose(STsdbIter **iter) { switch (iter[0]->type) { case TSDB_ITER_TYPE_STT: @@ -300,6 +473,15 @@ int32_t tsdbIterClose(STsdbIter **iter) { case TSDB_ITER_TYPE_MEMT: tsdbMemTableIterClose(iter[0]); break; + case TSDB_ITER_TYPE_STT_TOMB: + tsdbSttTombIterClose(iter[0]); + break; + case TSDB_ITER_TYPE_DATA_TOMB: + tsdbDataTombIterClose(iter[0]); + break; + case TSDB_ITER_TYPE_MEMT_TOMB: + ASSERTS(false, "Not implemented"); + break; default: ASSERT(false); } @@ -316,6 +498,12 @@ int32_t tsdbIterNext(STsdbIter *iter) { return tsdbDataIterNext(iter, NULL); case TSDB_ITER_TYPE_MEMT: return tsdbMemTableIterNext(iter, NULL); + case TSDB_ITER_TYPE_STT_TOMB: + return tsdbSttTombIterNext(iter, NULL); + case TSDB_ITER_TYPE_DATA_TOMB: + return tsdbDataTombIterNext(iter, NULL); + case TSDB_ITER_TYPE_MEMT_TOMB: + return tsdbMemTableTombIterNext(iter, NULL); default: ASSERT(false); } @@ -342,20 +530,49 @@ static int32_t tsdbIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) { return tRowInfoCmprFn(&iter1->row, &iter2->row); } +static int32_t tsdbTombIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) { + STsdbIter *iter1 = TCONTAINER_OF(n1, STsdbIter, node); + STsdbIter *iter2 = TCONTAINER_OF(n2, STsdbIter, node); + + if (iter1->record->suid < iter2->record->suid) { + return -1; + } else if (iter1->record->suid > iter2->record->suid) { + return 1; + } + + if (iter1->record->uid < iter2->record->uid) { + return -1; + } else if (iter1->record->uid > iter2->record->uid) { + return 1; + } + + if (iter1->record->version < iter2->record->version) { + return -1; + } else if (iter1->record->version > iter2->record->version) { + return 1; + } + + return 0; +} + // SIterMerger ================ struct SIterMerger { STsdbIter *iter; SRBTree iterTree[1]; }; -int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger) { +int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger, bool isTomb) { STsdbIter *iter; SRBTreeNode *node; merger[0] = taosMemoryCalloc(1, sizeof(*merger[0])); if (!merger[0]) return TSDB_CODE_OUT_OF_MEMORY; - tRBTreeCreate(merger[0]->iterTree, tsdbIterCmprFn); + if (isTomb) { + tRBTreeCreate(merger[0]->iterTree, tsdbTombIterCmprFn); + } else { + tRBTreeCreate(merger[0]->iterTree, tsdbIterCmprFn); + } TARRAY2_FOREACH(iterArray, iter) { if (iter->ctx->noMoreData) continue; node = tRBTreePut(merger[0]->iterTree, iter->node); @@ -385,7 +602,7 @@ int32_t tsdbIterMergerNext(SIterMerger *merger) { if (merger->iter->ctx->noMoreData) { merger->iter = NULL; } else if ((node = tRBTreeMin(merger->iterTree))) { - c = tsdbIterCmprFn(merger->iter->node, node); + c = merger->iterTree->cmprFn(merger->iter->node, node); ASSERT(c); if (c > 0) { node = tRBTreePut(merger->iterTree, merger->iter->node); @@ -402,7 +619,8 @@ int32_t tsdbIterMergerNext(SIterMerger *merger) { return 0; } -SRowInfo *tsdbIterMergerGet(SIterMerger *merger) { return merger->iter ? merger->iter->row : NULL; } +SRowInfo *tsdbIterMergerGet(SIterMerger *merger) { return merger->iter ? merger->iter->row : NULL; } +STombRecord *tsdbIterMergerGetTombRecord(SIterMerger *merger) { return merger->iter ? merger->iter->record : NULL; } int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) { int32_t code; @@ -416,7 +634,7 @@ int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) { if (merger->iter->ctx->noMoreData) { merger->iter = NULL; } else if ((node = tRBTreeMin(merger->iterTree))) { - c = tsdbIterCmprFn(merger->iter->node, node); + c = merger->iterTree->cmprFn(merger->iter->node, node); ASSERT(c); if (c > 0) { node = tRBTreePut(merger->iterTree, merger->iter->node); diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 6a02f0d825..8c1e2c2d0b 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -360,7 +360,7 @@ static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) { } } - code = tsdbIterMergerOpen(merger->iterArr, &merger->iterMerger); + code = tsdbIterMergerOpen(merger->iterArr, &merger->iterMerger, false); TSDB_CHECK_CODE(code, lino, _exit); _exit: diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c index 25ae4d2e12..97ce891a38 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c @@ -243,7 +243,7 @@ _exit: return code; } -int32_t tsdbSttFileReadDelBlock(SSttSegReader *reader, const STombBlk *tombBlk, STombBlock *dData) { +int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, STombBlock *dData) { int32_t code = 0; int32_t lino = 0;