diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index c55e5f92ea..79964c5636 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -562,6 +562,8 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) { } else { SCommitter2 committer[1]; + tsdbFSCheckCommit(tsdb->pFS); + code = tsdbOpenCommitter(tsdb, info, committer); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 5e5348e9b5..afc9b7db3a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -18,6 +18,8 @@ #include "vnd.h" #include "vndCos.h" +#define BLOCK_COMMIT_FACTOR 3 + extern int vnodeScheduleTask(int (*execute)(void *), void *arg); extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg); extern void remove_file(const char *fname); @@ -65,11 +67,17 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) { fs[0]->bgTaskQueue->next = fs[0]->bgTaskQueue; fs[0]->bgTaskQueue->prev = fs[0]->bgTaskQueue; + taosThreadMutexInit(&fs[0]->commitMutex, NULL); + taosThreadCondInit(&fs[0]->canCommit, NULL); + fs[0]->blockCommit = false; + return 0; } static int32_t destroy_fs(STFileSystem **fs) { if (fs[0] == NULL) return 0; + taosThreadMutexDestroy(&fs[0]->commitMutex); + taosThreadCondDestroy(&fs[0]->canCommit); taosThreadMutexDestroy(fs[0]->mutex); ASSERT(fs[0]->bgTaskNum == 0); @@ -829,6 +837,27 @@ _exit: return code; } +static int32_t tsdbFSSetBlockCommit(STFileSystem *fs, bool block) { + taosThreadMutexLock(&fs->commitMutex); + if (block) { + fs->blockCommit = true; + } else { + fs->blockCommit = false; + taosThreadCondSignal(&fs->canCommit); + } + taosThreadMutexUnlock(&fs->commitMutex); + return 0; +} + +int32_t tsdbFSCheckCommit(STFileSystem *fs) { + taosThreadMutexLock(&fs->commitMutex); + while (fs->blockCommit) { + taosThreadCondWait(&fs->canCommit, &fs->commitMutex); + } + taosThreadMutexUnlock(&fs->commitMutex); + return 0; +} + int32_t tsdbFSEditCommit(STFileSystem *fs) { int32_t code = 0; int32_t lino = 0; @@ -838,19 +867,36 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { TSDB_CHECK_CODE(code, lino, _exit); // schedule merge - if (fs->tsdb->pVnode->config.sttTrigger != 1) { + if (fs->tsdb->pVnode->config.sttTrigger > 1) { STFileSet *fset; + int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger; + bool schedMerge = false; + bool blockCommit = false; + TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) { if (TARRAY2_SIZE(fset->lvlArr) == 0) continue; SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr); - if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue; + if (lvl->level != 0) continue; + int32_t numFile = TARRAY2_SIZE(lvl->fobjArr); + if (numFile >= sttTrigger) { + schedMerge = true; + } + + if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) { + blockCommit = true; + } + + if (schedMerge && blockCommit) break; + } + + if (schedMerge) { code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, NULL, fs->tsdb, NULL); TSDB_CHECK_CODE(code, lino, _exit); - - break; } + + tsdbFSSetBlockCommit(fs, blockCommit); } _exit: @@ -1104,4 +1150,4 @@ int32_t tsdbFSEnableBgTask(STFileSystem *fs) { fs->stop = false; taosThreadMutexUnlock(fs->mutex); return 0; -} +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index e814ab2fff..b0f42a0c48 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -67,6 +67,7 @@ int32_t tsdbFSDisableBgTask(STFileSystem *fs); int32_t tsdbFSEnableBgTask(STFileSystem *fs); // other int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); +int32_t tsdbFSCheckCommit(STFileSystem *fs); struct STFSBgTask { EFSBgTaskT type; @@ -103,6 +104,11 @@ struct STFileSystem { int32_t bgTaskNum; STFSBgTask bgTaskQueue[1]; STFSBgTask *bgTaskRunning; + + // block commit variables + TdThreadMutex commitMutex; + TdThreadCond canCommit; + bool blockCommit; }; #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index a1643acc50..5986e2ed1a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -15,6 +15,8 @@ #include "tsdbMerge.h" +#define TSDB_MAX_LEVEL 6 // means max level is 7 + typedef struct { STsdb *tsdb; TFileSetArray *fsetArr; @@ -100,90 +102,142 @@ _exit: } static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) { - int32_t code = 0; - int32_t lino = 0; - - merger->ctx->toData = true; - merger->ctx->level = 0; - - // find the highest level that can be merged to - for (int32_t i = 0, numCarry = 0;;) { - int32_t numFile = numCarry; - if (i < TARRAY2_SIZE(merger->ctx->fset->lvlArr) && - merger->ctx->level == TARRAY2_GET(merger->ctx->fset->lvlArr, i)->level) { - numFile += TARRAY2_SIZE(TARRAY2_GET(merger->ctx->fset->lvlArr, i)->fobjArr); - i++; - } - - numCarry = numFile / merger->sttTrigger; - if (numCarry == 0) { - break; - } else { - merger->ctx->level++; - } - } - - ASSERT(merger->ctx->level > 0); - + int32_t code = 0; + int32_t lino = 0; SSttLvl *lvl; + + bool hasLevelLargerThanMax = false; TARRAY2_FOREACH_REVERSE(merger->ctx->fset->lvlArr, lvl) { - if (TARRAY2_SIZE(lvl->fobjArr) == 0) { - continue; - } - - if (lvl->level <= merger->ctx->level) { - merger->ctx->toData = false; - } - break; - } - - // get number of level-0 files to merge - int32_t numFile = pow(merger->sttTrigger, merger->ctx->level); - TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) { - if (lvl->level == 0) continue; - if (lvl->level >= merger->ctx->level) break; - - numFile = numFile - TARRAY2_SIZE(lvl->fobjArr) * pow(merger->sttTrigger, lvl->level); - } - - ASSERT(numFile >= 0); - - // get file system operations - TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) { - if (lvl->level >= merger->ctx->level) { + if (lvl->level <= TSDB_MAX_LEVEL) { + break; + } else if (TARRAY2_SIZE(lvl->fobjArr) > 0) { + hasLevelLargerThanMax = true; break; } + } - int32_t numMergeFile; - if (lvl->level == 0) { - numMergeFile = numFile; - } else { - numMergeFile = TARRAY2_SIZE(lvl->fobjArr); + if (hasLevelLargerThanMax) { + // merge all stt files + merger->ctx->toData = true; + merger->ctx->level = TSDB_MAX_LEVEL; + + TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) { + int32_t numMergeFile = TARRAY2_SIZE(lvl->fobjArr); + + for (int32_t i = 0; i < numMergeFile; ++i) { + STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i); + + STFileOp op = { + .optype = TSDB_FOP_REMOVE, + .fid = merger->ctx->fset->fid, + .of = fobj->f[0], + }; + code = TARRAY2_APPEND(merger->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + + SSttFileReader *reader; + SSttFileReaderConfig config = { + .tsdb = merger->tsdb, + .szPage = merger->szPage, + .file[0] = fobj->f[0], + }; + + code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(merger->sttReaderArr, reader); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } else { + // do regular merge + merger->ctx->toData = true; + merger->ctx->level = 0; + + // find the highest level that can be merged to + for (int32_t i = 0, numCarry = 0;;) { + int32_t numFile = numCarry; + if (i < TARRAY2_SIZE(merger->ctx->fset->lvlArr) && + merger->ctx->level == TARRAY2_GET(merger->ctx->fset->lvlArr, i)->level) { + numFile += TARRAY2_SIZE(TARRAY2_GET(merger->ctx->fset->lvlArr, i)->fobjArr); + i++; + } + + numCarry = numFile / merger->sttTrigger; + if (numCarry == 0) { + break; + } else { + merger->ctx->level++; + } } - for (int32_t i = 0; i < numMergeFile; ++i) { - STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i); + ASSERT(merger->ctx->level > 0); - STFileOp op = { - .optype = TSDB_FOP_REMOVE, - .fid = merger->ctx->fset->fid, - .of = fobj->f[0], - }; - code = TARRAY2_APPEND(merger->fopArr, op); - TSDB_CHECK_CODE(code, lino, _exit); + if (merger->ctx->level <= TSDB_MAX_LEVEL) { + TARRAY2_FOREACH_REVERSE(merger->ctx->fset->lvlArr, lvl) { + if (TARRAY2_SIZE(lvl->fobjArr) == 0) { + continue; + } - SSttFileReader *reader; - SSttFileReaderConfig config = { - .tsdb = merger->tsdb, - .szPage = merger->szPage, - .file[0] = fobj->f[0], - }; + if (lvl->level >= merger->ctx->level) { + merger->ctx->toData = false; + } + break; + } + } - code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader); - TSDB_CHECK_CODE(code, lino, _exit); + // get number of level-0 files to merge + int32_t numFile = pow(merger->sttTrigger, merger->ctx->level); + TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) { + if (lvl->level == 0) continue; + if (lvl->level >= merger->ctx->level) break; - code = TARRAY2_APPEND(merger->sttReaderArr, reader); - TSDB_CHECK_CODE(code, lino, _exit); + numFile = numFile - TARRAY2_SIZE(lvl->fobjArr) * pow(merger->sttTrigger, lvl->level); + } + + ASSERT(numFile >= 0); + + // get file system operations + TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) { + if (lvl->level >= merger->ctx->level) { + break; + } + + int32_t numMergeFile; + if (lvl->level == 0) { + numMergeFile = numFile; + } else { + numMergeFile = TARRAY2_SIZE(lvl->fobjArr); + } + + for (int32_t i = 0; i < numMergeFile; ++i) { + STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i); + + STFileOp op = { + .optype = TSDB_FOP_REMOVE, + .fid = merger->ctx->fset->fid, + .of = fobj->f[0], + }; + code = TARRAY2_APPEND(merger->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + + SSttFileReader *reader; + SSttFileReaderConfig config = { + .tsdb = merger->tsdb, + .szPage = merger->szPage, + .file[0] = fobj->f[0], + }; + + code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(merger->sttReaderArr, reader); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + if (merger->ctx->level > TSDB_MAX_LEVEL) { + merger->ctx->level = TSDB_MAX_LEVEL; } }