From efff4e77f0fa1343595ba62265a273a3974c8a1a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 5 Jun 2023 09:36:26 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h | 1 + source/dnode/vnode/src/tsdb/dev/tsdbCommit.c | 4 -- source/dnode/vnode/src/tsdb/dev/tsdbFS.c | 45 +++++++++++++++++++- source/dnode/vnode/src/tsdb/dev/tsdbMerge.c | 8 ++-- 5 files changed, 49 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index e3c7141263..0b947bb285 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -304,7 +304,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); // tsdbMerge.c ============================================================================================== -int32_t tsdbMerge(STsdb *pTsdb); +int32_t tsdbMerge(void *arg); // tsdbDiskData ============================================================================================== int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder); diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h index 093ed39a99..e04404ec8f 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h @@ -53,6 +53,7 @@ struct STFileSystem { int32_t state; int64_t neid; EFEditT etype; + bool mergeTaskOn; TFileSetArray fSetArr[1]; TFileSetArray fSetArrTmp[1]; }; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index 7a6eff4bac..1807f7fa24 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -473,10 +473,6 @@ int32_t tsdbCommitCommit(STsdb *tsdb) { taosThreadRwlockUnlock(&tsdb->rwLock); tsdbUnrefMemTable(pMemTable, NULL, true); - // TODO: make this call async - code = tsdbMerge(tsdb); - TSDB_CHECK_CODE(code, lino, _exit); - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c index aa86a19878..2014f2d6dc 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c @@ -15,6 +15,8 @@ #include "inc/tsdbFS.h" +extern int vnodeScheduleTask(int (*execute)(void *), void *arg); + #define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT #define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1) @@ -45,6 +47,7 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) { tsem_init(&fs[0]->canEdit, 0, 1); fs[0]->state = TSDB_FS_STATE_NONE; fs[0]->neid = 0; + fs[0]->mergeTaskOn = false; TARRAY2_INIT(fs[0]->fSetArr); TARRAY2_INIT(fs[0]->fSetArrTmp); @@ -584,8 +587,46 @@ _exit: } int32_t tsdbFSEditCommit(STFileSystem *fs) { - int32_t code = commit_edit(fs); - tsem_post(&fs->canEdit); + int32_t code = 0; + int32_t lino = 0; + + // commit + code = commit_edit(fs); + TSDB_CHECK_CODE(code, lino, _exit); + + if (fs->etype == TSDB_FEDIT_MERGE) { + ASSERT(fs->mergeTaskOn); + fs->mergeTaskOn = false; + } + + // check if need to merge + if (fs->mergeTaskOn == false) { + STFileSet *fset; + TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) { + if (TARRAY2_SIZE(fset->lvlArr) == 0) continue; + + SSttLvl *lvl0 = TARRAY2_FIRST(fset->lvlArr); + if (lvl0->level != 0 || TARRAY2_SIZE(lvl0->fobjArr) == 0) continue; + + STFileObj *fobj = TARRAY2_FIRST(lvl0->fobjArr); + + if (fobj->f->stt->nseg < fs->tsdb->pVnode->config.sttTrigger) continue; + + code = vnodeScheduleTask(tsdbMerge, fs->tsdb); + TSDB_CHECK_CODE(code, lino, _exit); + + fs->mergeTaskOn = true; + + break; + } + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code); + } else { + tsem_post(&fs->canEdit); + } return code; } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index f2fef837eb..6a02f0d825 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -611,10 +611,10 @@ _exit: return code; } -int32_t tsdbMerge(STsdb *tsdb) { +int32_t tsdbMerge(void *arg) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(tsdb->pVnode); + STsdb *tsdb = (STsdb *)arg; SMerger merger[1] = {{ .tsdb = tsdb, @@ -631,9 +631,9 @@ int32_t tsdbMerge(STsdb *tsdb) { _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } else if (merger->ctx->opened) { - tsdbDebug("vgId:%d %s done", vid, __func__); + tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__); } return code; }