From 2c9a0dbcdaa880ba7a2be8dffbdb6733e176cfb8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 9 Oct 2023 14:00:11 +0800 Subject: [PATCH] enh: control stt max level --- source/dnode/vnode/src/tsdb/tsdbMerge.c | 200 +++++++++++++++--------- 1 file changed, 127 insertions(+), 73 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index a1643acc50..5986e2ed1a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -15,6 +15,8 @@ #include "tsdbMerge.h" +#define TSDB_MAX_LEVEL 6 // means max level is 7 + typedef struct { STsdb *tsdb; TFileSetArray *fsetArr; @@ -100,90 +102,142 @@ _exit: } static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) { - int32_t code = 0; - int32_t lino = 0; - - merger->ctx->toData = true; - merger->ctx->level = 0; - - // find the highest level that can be merged to - for (int32_t i = 0, numCarry = 0;;) { - int32_t numFile = numCarry; - if (i < TARRAY2_SIZE(merger->ctx->fset->lvlArr) && - merger->ctx->level == TARRAY2_GET(merger->ctx->fset->lvlArr, i)->level) { - numFile += TARRAY2_SIZE(TARRAY2_GET(merger->ctx->fset->lvlArr, i)->fobjArr); - i++; - } - - numCarry = numFile / merger->sttTrigger; - if (numCarry == 0) { - break; - } else { - merger->ctx->level++; - } - } - - ASSERT(merger->ctx->level > 0); - + int32_t code = 0; + int32_t lino = 0; SSttLvl *lvl; + + bool hasLevelLargerThanMax = false; TARRAY2_FOREACH_REVERSE(merger->ctx->fset->lvlArr, lvl) { - if (TARRAY2_SIZE(lvl->fobjArr) == 0) { - continue; - } - - if (lvl->level <= merger->ctx->level) { - merger->ctx->toData = false; - } - break; - } - - // get number of level-0 files to merge - int32_t numFile = pow(merger->sttTrigger, merger->ctx->level); - TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) { - if (lvl->level == 0) continue; - if (lvl->level >= merger->ctx->level) break; - - numFile = numFile - TARRAY2_SIZE(lvl->fobjArr) * pow(merger->sttTrigger, lvl->level); - } - - ASSERT(numFile >= 0); - - // get file system operations - TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) { - if (lvl->level >= merger->ctx->level) { + if (lvl->level <= TSDB_MAX_LEVEL) { + break; + } else if (TARRAY2_SIZE(lvl->fobjArr) > 0) { + hasLevelLargerThanMax = true; break; } + } - int32_t numMergeFile; - if (lvl->level == 0) { - numMergeFile = numFile; - } else { - numMergeFile = TARRAY2_SIZE(lvl->fobjArr); + if (hasLevelLargerThanMax) { + // merge all stt files + merger->ctx->toData = true; + merger->ctx->level = TSDB_MAX_LEVEL; + + TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) { + int32_t numMergeFile = TARRAY2_SIZE(lvl->fobjArr); + + for (int32_t i = 0; i < numMergeFile; ++i) { + STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i); + + STFileOp op = { + .optype = TSDB_FOP_REMOVE, + .fid = merger->ctx->fset->fid, + .of = fobj->f[0], + }; + code = TARRAY2_APPEND(merger->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + + SSttFileReader *reader; + SSttFileReaderConfig config = { + .tsdb = merger->tsdb, + .szPage = merger->szPage, + .file[0] = fobj->f[0], + }; + + code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(merger->sttReaderArr, reader); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } else { + // do regular merge + merger->ctx->toData = true; + merger->ctx->level = 0; + + // find the highest level that can be merged to + for (int32_t i = 0, numCarry = 0;;) { + int32_t numFile = numCarry; + if (i < TARRAY2_SIZE(merger->ctx->fset->lvlArr) && + merger->ctx->level == TARRAY2_GET(merger->ctx->fset->lvlArr, i)->level) { + numFile += TARRAY2_SIZE(TARRAY2_GET(merger->ctx->fset->lvlArr, i)->fobjArr); + i++; + } + + numCarry = numFile / merger->sttTrigger; + if (numCarry == 0) { + break; + } else { + merger->ctx->level++; + } } - for (int32_t i = 0; i < numMergeFile; ++i) { - STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i); + ASSERT(merger->ctx->level > 0); - STFileOp op = { - .optype = TSDB_FOP_REMOVE, - .fid = merger->ctx->fset->fid, - .of = fobj->f[0], - }; - code = TARRAY2_APPEND(merger->fopArr, op); - TSDB_CHECK_CODE(code, lino, _exit); + if (merger->ctx->level <= TSDB_MAX_LEVEL) { + TARRAY2_FOREACH_REVERSE(merger->ctx->fset->lvlArr, lvl) { + if (TARRAY2_SIZE(lvl->fobjArr) == 0) { + continue; + } - SSttFileReader *reader; - SSttFileReaderConfig config = { - .tsdb = merger->tsdb, - .szPage = merger->szPage, - .file[0] = fobj->f[0], - }; + if (lvl->level >= merger->ctx->level) { + merger->ctx->toData = false; + } + break; + } + } - code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader); - TSDB_CHECK_CODE(code, lino, _exit); + // get number of level-0 files to merge + int32_t numFile = pow(merger->sttTrigger, merger->ctx->level); + TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) { + if (lvl->level == 0) continue; + if (lvl->level >= merger->ctx->level) break; - code = TARRAY2_APPEND(merger->sttReaderArr, reader); - TSDB_CHECK_CODE(code, lino, _exit); + numFile = numFile - TARRAY2_SIZE(lvl->fobjArr) * pow(merger->sttTrigger, lvl->level); + } + + ASSERT(numFile >= 0); + + // get file system operations + TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) { + if (lvl->level >= merger->ctx->level) { + break; + } + + int32_t numMergeFile; + if (lvl->level == 0) { + numMergeFile = numFile; + } else { + numMergeFile = TARRAY2_SIZE(lvl->fobjArr); + } + + for (int32_t i = 0; i < numMergeFile; ++i) { + STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i); + + STFileOp op = { + .optype = TSDB_FOP_REMOVE, + .fid = merger->ctx->fset->fid, + .of = fobj->f[0], + }; + code = TARRAY2_APPEND(merger->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + + SSttFileReader *reader; + SSttFileReaderConfig config = { + .tsdb = merger->tsdb, + .szPage = merger->szPage, + .file[0] = fobj->f[0], + }; + + code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(merger->sttReaderArr, reader); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + if (merger->ctx->level > TSDB_MAX_LEVEL) { + merger->ctx->level = TSDB_MAX_LEVEL; } }