From 438d0caef7ad9c18fecd4d07fd0de886814d6985 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 12 Jun 2023 16:55:20 +0800 Subject: [PATCH] mroe code --- .../dnode/vnode/src/tsdb/dev/inc/tsdbIter.h | 3 +- source/dnode/vnode/src/tsdb/dev/tsdbCommit.c | 137 ++++++++++++++---- source/dnode/vnode/src/tsdb/dev/tsdbIter.c | 58 ++++++++ 3 files changed, 165 insertions(+), 33 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h index 41f0c76ade..8c4b2569f4 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h @@ -35,6 +35,7 @@ typedef enum { TSDB_ITER_TYPE_MEMT, TSDB_ITER_TYPE_STT_TOMB, TSDB_ITER_TYPE_DATA_TOMB, + TSDB_ITER_TYPE_MEMT_TOMB, } EIterType; typedef struct { @@ -43,7 +44,7 @@ typedef struct { SSttSegReader *sttReader; // TSDB_ITER_TYPE_STT || TSDB_ITER_TYPE_STT_TOMB SDataFileReader *dataReader; // TSDB_ITER_TYPE_DATA || TSDB_ITER_TYPE_DATA_TOMB struct { - SMemTable *memt; + SMemTable *memt; // TSDB_ITER_TYPE_MEMT_TOMB TSDBKEY from[1]; }; // TSDB_ITER_TYPE_MEMT }; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index f515a4ebc2..9646262049 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -455,47 +455,44 @@ _exit: return code; } -static int32_t tsdbCommitDelData(SCommitter2 *committer) { +static int32_t tsdbCommitTombDataToStt(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - SMemTable *mem = committer->tsdb->imem; + for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) { + code = tsdbSttFileWriteTombRecord(committer->sttWriter, record); + TSDB_CHECK_CODE(code, lino, _exit); - if (mem->nDel == 0 // - || (committer->ctx->fset == NULL // - && committer->sttWriter == NULL) // - ) { - committer->ctx->nextKey = committer->ctx->maxKey + 1; - goto _exit; + code = tsdbIterMergerNext(committer->iterMerger); + TSDB_CHECK_CODE(code, lino, _exit); } - SRBTreeIter iter[1] = {tRBTreeIterCreate(committer->tsdb->imem->tbDataTree, 1)}; +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); + } + return code; +} - for (SRBTreeNode *node = tRBTreeIterNext(iter); node; node = tRBTreeIterNext(iter)) { - STbData *tbData = TCONTAINER_OF(node, STbData, rbtn); - STombRecord record[1] = {{ - .suid = tbData->suid, - .uid = tbData->uid, - }}; - - for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) { - if (delData->eKey < committer->ctx->minKey) continue; - if (delData->sKey > committer->ctx->maxKey) { - committer->ctx->nextKey = TMIN(committer->ctx->nextKey, delData->sKey); - continue; - } - - record->version = delData->version; - record->skey = TMAX(delData->sKey, committer->ctx->minKey); - if (delData->eKey > committer->ctx->maxKey) { - committer->ctx->nextKey = TMIN(committer->ctx->nextKey, committer->ctx->maxKey + 1); - record->ekey = committer->ctx->maxKey; - } else { - record->ekey = delData->eKey; - } +static int32_t tsdbCommitTombDataToData(SCommitter2 *committer) { + int32_t code = 0; + int32_t lino = 0; + if (committer->dataWriter == NULL || tsdbSttFileWriterIsOpened(committer->sttWriter)) { + for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) { code = tsdbSttFileWriteTombRecord(committer->sttWriter, record); TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerNext(committer->iterMerger); + TSDB_CHECK_CODE(code, lino, _exit); + } + } else { + for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) { + code = tsdbDataFileWriteTombRecord(committer->dataWriter, record); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerNext(committer->iterMerger); + TSDB_CHECK_CODE(code, lino, _exit); } } @@ -506,6 +503,82 @@ _exit: return code; } +static int32_t tsdbCommitTombDataOpenIter(SCommitter2 *committer) { + int32_t code = 0; + int32_t lino = 0; + + STsdbIter *iter; + STsdbIterConfig config[1]; + + if (committer->sttReader) { + const TSttSegReaderArray *readerArray; + + tsdbSttFileReaderGetSegReader(committer->sttReader, &readerArray); + + SSttSegReader *segReader; + TARRAY2_FOREACH(readerArray, segReader) { + config->type = TSDB_ITER_TYPE_STT_TOMB; + config->sttReader = segReader; + + code = tsdbIterOpen(config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(committer->iterArray, iter); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + config->type = TSDB_ITER_TYPE_MEMT_TOMB; + config->memt = committer->tsdb->imem; + + code = tsdbIterOpen(config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(committer->iterArray, iter); + TSDB_CHECK_CODE(code, lino, _exit); + + // open iter + code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger, true); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbCommitTombDataCloseIter(SCommitter2 *committer) { + tsdbIterMergerClose(&committer->iterMerger); + TARRAY2_CLEAR(committer->iterArray, tsdbIterClose); + return 0; +} + +static int32_t tsdbCommitTombData(SCommitter2 *committer) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbCommitTombDataOpenIter(committer); + TSDB_CHECK_CODE(code, lino, _exit); + + if (committer->sttTrigger > 1) { + code = tsdbCommitTombDataToStt(committer); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbCommitTombDataToData(committer); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbCommitTombDataCloseIter(committer); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); + } + return code; +} + static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; @@ -578,7 +651,7 @@ static int32_t tsdbCommitFileSet(SCommitter2 *committer) { code = tsdbCommitTSData(committer); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbCommitDelData(committer); + code = tsdbCommitTombData(committer); TSDB_CHECK_CODE(code, lino, _exit); // fset commit end diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbIter.c b/source/dnode/vnode/src/tsdb/dev/tsdbIter.c index 6b614d7e84..a79c9a1ca5 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbIter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbIter.c @@ -64,6 +64,12 @@ struct STsdbIter { STombBlock tombBlock[1]; int32_t tombBlockIdx; } dataTomb[1]; + struct { + SMemTable *memt; + SRBTreeIter rbtIter[1]; + STbData *tbData; + SDelData *delData; + } memtTomb[1]; }; }; @@ -259,6 +265,45 @@ _exit: return 0; } +static int32_t tsdbMemTombIterNext(STsdbIter *iter, const TABLEID *tbid) { + while (!iter->ctx->noMoreData) { + for (; iter->memtTomb->delData;) { + if (tbid && tbid->uid == iter->memtTomb->tbData->uid) { + iter->memtTomb->delData = NULL; + break; + } + + iter->record->suid = iter->memtTomb->tbData->suid; + iter->record->uid = iter->memtTomb->tbData->uid; + iter->record->version = iter->memtTomb->delData->version; + iter->record->skey = iter->memtTomb->delData->sKey; + iter->record->ekey = iter->memtTomb->delData->eKey; + + iter->memtTomb->delData = iter->memtTomb->delData->pNext; + goto _exit; + } + + for (;;) { + SRBTreeNode *node = tRBTreeIterNext(iter->memtTomb->rbtIter); + if (node == NULL) { + iter->ctx->noMoreData = true; + goto _exit; + } + + iter->memtTomb->tbData = TCONTAINER_OF(node, STbData, rbtn); + if (tbid && tbid->uid == iter->memtTomb->tbData->uid) { + continue; + } else { + iter->memtTomb->delData = iter->memtTomb->tbData->pHead; + break; + } + } + } + +_exit: + return 0; +} + static int32_t tsdbSttIterOpen(STsdbIter *iter) { int32_t code; @@ -330,6 +375,13 @@ static int32_t tsdbDataTombIterOpen(STsdbIter *iter) { return tsdbDataTombIterNext(iter, NULL); } +static int32_t tsdbMemTombIterOpen(STsdbIter *iter) { + int32_t code; + + iter->memtTomb->rbtIter[0] = tRBTreeIterCreate(iter->memtTomb->memt->tbDataTree, 1); + return tsdbMemTombIterNext(iter, NULL); +} + static int32_t tsdbDataIterClose(STsdbIter *iter) { tBrinBlockDestroy(iter->dataData->brinBlock); tBlockDataDestroy(iter->dataData->blockData); @@ -432,6 +484,10 @@ int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) { iter[0]->dataTomb->reader = config->dataReader; code = tsdbDataTombIterOpen(iter[0]); break; + case TSDB_ITER_TYPE_MEMT_TOMB: + iter[0]->memtTomb->memt = config->memt; + code = tsdbMemTombIterOpen(iter[0]); + break; default: code = TSDB_CODE_INVALID_PARA; ASSERTS(false, "Not implemented"); @@ -471,6 +527,8 @@ int32_t tsdbIterClose(STsdbIter **iter) { case TSDB_ITER_TYPE_DATA_TOMB: tsdbDataTombIterClose(iter[0]); break; + case TSDB_ITER_TYPE_MEMT_TOMB: + break; default: ASSERT(false); }