diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index c55e5f92ea..79964c5636 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -562,6 +562,8 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) { } else { SCommitter2 committer[1]; + tsdbFSCheckCommit(tsdb->pFS); + code = tsdbOpenCommitter(tsdb, info, committer); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 5e5348e9b5..e16eee39e8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -65,11 +65,17 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) { fs[0]->bgTaskQueue->next = fs[0]->bgTaskQueue; fs[0]->bgTaskQueue->prev = fs[0]->bgTaskQueue; + taosThreadMutexInit(&fs[0]->commitMutex, NULL); + taosThreadCondInit(&fs[0]->canCommit, NULL); + fs[0]->blockCommit = false; + return 0; } static int32_t destroy_fs(STFileSystem **fs) { if (fs[0] == NULL) return 0; + taosThreadMutexDestroy(&fs[0]->commitMutex); + taosThreadCondDestroy(&fs[0]->canCommit); taosThreadMutexDestroy(fs[0]->mutex); ASSERT(fs[0]->bgTaskNum == 0); @@ -829,6 +835,27 @@ _exit: return code; } +static int32_t tsdbFSSetBlockCommit(STFileSystem *fs, bool block) { + taosThreadMutexLock(&fs->commitMutex); + if (block) { + fs->blockCommit = true; + } else { + fs->blockCommit = false; + taosThreadCondSignal(&fs->canCommit); + } + taosThreadMutexUnlock(&fs->commitMutex); + return 0; +} + +int32_t tsdbFSCheckCommit(STFileSystem *fs) { + taosThreadMutexLock(&fs->commitMutex); + while (fs->blockCommit) { + taosThreadCondWait(&fs->canCommit, &fs->commitMutex); + } + taosThreadMutexUnlock(&fs->commitMutex); + return 0; +} + int32_t tsdbFSEditCommit(STFileSystem *fs) { int32_t code = 0; int32_t lino = 0; @@ -838,19 +865,36 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { TSDB_CHECK_CODE(code, lino, _exit); // schedule merge - if (fs->tsdb->pVnode->config.sttTrigger != 1) { + if (fs->tsdb->pVnode->config.sttTrigger > 1) { STFileSet *fset; + int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger; + bool schedMerge = false; + bool blockCommit = false; + TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) { if (TARRAY2_SIZE(fset->lvlArr) == 0) continue; SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr); - if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue; + if (lvl->level != 0) continue; + int32_t numFile = TARRAY2_SIZE(lvl->fobjArr); + if (numFile >= sttTrigger) { + schedMerge = true; + } + + if (numFile >= sttTrigger * 2) { + blockCommit = true; + } + + if (schedMerge && blockCommit) break; + } + + if (schedMerge) { code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, NULL, fs->tsdb, NULL); TSDB_CHECK_CODE(code, lino, _exit); - - break; } + + tsdbFSSetBlockCommit(fs, blockCommit); } _exit: diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index e814ab2fff..b0f42a0c48 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -67,6 +67,7 @@ int32_t tsdbFSDisableBgTask(STFileSystem *fs); int32_t tsdbFSEnableBgTask(STFileSystem *fs); // other int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); +int32_t tsdbFSCheckCommit(STFileSystem *fs); struct STFSBgTask { EFSBgTaskT type; @@ -103,6 +104,11 @@ struct STFileSystem { int32_t bgTaskNum; STFSBgTask bgTaskQueue[1]; STFSBgTask *bgTaskRunning; + + // block commit variables + TdThreadMutex commitMutex; + TdThreadCond canCommit; + bool blockCommit; }; #ifdef __cplusplus