From cea4be6f3c62963c53abf7925dc81a4fce29c3da Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 4 Dec 2023 13:40:28 +0800 Subject: [PATCH 1/2] fix: bg task memory waste --- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/tsdb/tsdbFS2.c | 31 +++++++++++++++------ source/dnode/vnode/src/tsdb/tsdbFSet2.c | 1 + source/dnode/vnode/src/tsdb/tsdbFSet2.h | 1 + source/dnode/vnode/src/tsdb/tsdbMerge.c | 2 ++ source/dnode/vnode/src/tsdb/tsdbRetention.c | 5 ++++ source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 5 +--- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 22 ++++++++++++--- 8 files changed, 52 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 88362239f5..197952d190 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -377,6 +377,7 @@ struct STsdb { SVnode *pVnode; STsdbKeepCfg keepCfg; TdThreadMutex mutex; + bool bgTaskDisabled; SMemTable *mem; SMemTable *imem; STsdbFS fs; // old diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index add8da52e0..7322da6a23 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -745,31 +745,45 @@ _exit: return code; } -int32_t tsdbFSCancelAllBgTask(STFileSystem *fs) { +static int32_t tsdbFSSetBlockCommit(STFileSet *fset, bool block); + +int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) { + STFileSystem *fs = pTsdb->pFS; TARRAY2(int64_t) channelArr = {0}; - // collect all open channels taosThreadMutexLock(&fs->tsdb->mutex); + + // disable + pTsdb->bgTaskDisabled = true; + + // collect channel STFileSet *fset; TARRAY2_FOREACH(fs->fSetArr, fset) { if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) { TARRAY2_APPEND(&channelArr, fset->bgTaskChannel); fset->bgTaskChannel = 0; } + fset->mergeScheduled = false; + tsdbFSSetBlockCommit(fset, false); } + taosThreadMutexUnlock(&fs->tsdb->mutex); // destroy all channels - int64_t channel; - TARRAY2_FOREACH(&channelArr, channel) { vnodeAChannelDestroy(vnodeAsyncHandle[1], channel, true); } - TARRAY2_DESTROY(&channelArr, NULL); + return 0; +} + +int32_t tsdbEnableBgTask(STsdb *pTsdb) { + taosThreadMutexLock(&pTsdb->mutex); + pTsdb->bgTaskDisabled = false; + taosThreadMutexUnlock(&pTsdb->mutex); return 0; } int32_t tsdbCloseFS(STFileSystem **fs) { if (fs[0] == NULL) return 0; - tsdbFSCancelAllBgTask(*fs); + tsdbDisableAndCancelAllBgTask((*fs)->tsdb); close_file_system(fs[0]); destroy_fs(fs); return 0; @@ -857,7 +871,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { // schedule merge int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger; - if (sttTrigger > 1) { + if (sttTrigger > 1 && !fs->tsdb->bgTaskDisabled) { STFileSet *fset; TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) { if (TARRAY2_SIZE(fset->lvlArr) == 0) { @@ -873,7 +887,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { bool skipMerge = false; int32_t numFile = TARRAY2_SIZE(lvl->fobjArr); - if (numFile >= sttTrigger) { + if (numFile >= sttTrigger && (!fset->mergeScheduled)) { // launch merge { extern int8_t tsS3Enabled; @@ -917,6 +931,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree, arg, NULL); TSDB_CHECK_CODE(code, lino, _exit); + fset->mergeScheduled = true; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.c b/source/dnode/vnode/src/tsdb/tsdbFSet2.c index 025671ff3d..1bf886e3b0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.c @@ -453,6 +453,7 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) { // background task queue fset[0]->bgTaskChannel = 0; + fset[0]->mergeScheduled = false; // block commit variables taosThreadCondInit(&fset[0]->canCommit, NULL); diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.h b/source/dnode/vnode/src/tsdb/tsdbFSet2.h index 32028db352..83f5b1e83c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.h @@ -93,6 +93,7 @@ struct STFileSet { // background task channel int64_t bgTaskChannel; + bool mergeScheduled; // block commit variables TdThreadCond canCommit; diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index b47b951b2b..7f33f08794 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -514,6 +514,8 @@ static int32_t tsdbMergeGetFSet(SMerger *merger) { return 0; } + fset->mergeScheduled = false; + int32_t code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset); if (code) { taosThreadMutexUnlock(&merger->tsdb->mutex); diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index d8f1ad7c6c..dc98f46ac5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -430,6 +430,11 @@ int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) { taosThreadMutexLock(&tsdb->mutex); + if (tsdb->bgTaskDisabled) { + taosThreadMutexUnlock(&tsdb->mutex); + return 0; + } + STFileSet *fset; TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { code = tsdbTFileSetOpenChannel(fset); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index faea881f72..e757daa0af 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1602,7 +1602,4 @@ _out: } return code; -} - -extern int32_t tsdbFSCancelAllBgTask(STFileSystem* fs); -int32_t tsdbCancelAllBgTask(STsdb* tsdb) { return tsdbFSCancelAllBgTask(tsdb->pFS); } \ No newline at end of file +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index f2ef11e9ed..34b508388f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -519,7 +519,21 @@ _out: return code; } -extern int32_t tsdbCancelAllBgTask(STsdb *tsdb); +extern int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb); +extern int32_t tsdbEnableBgTask(STsdb *pTsdb); + +static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) { + tsdbDisableAndCancelAllBgTask(pVnode->pTsdb); + vnodeSyncCommit(pVnode); + vnodeAChannelDestroy(vnodeAsyncHandle[0], pVnode->commitChannel, true); + return 0; +} + +static int32_t vnodeEnableBgTask(SVnode *pVnode) { + tsdbEnableBgTask(pVnode->pTsdb); + vnodeAChannelInit(vnodeAsyncHandle[0], &pVnode->commitChannel); + return 0; +} int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) { int32_t code = 0; @@ -527,9 +541,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter int64_t sver = pParam->start; int64_t ever = pParam->end; - // commit memory data - vnodeSyncCommit(pVnode); - tsdbCancelAllBgTask(pVnode->pTsdb); + // cancel and disable all bg task + vnodeCancelAndDisableAllBgTask(pVnode); // alloc pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter)); @@ -659,6 +672,7 @@ _exit: vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback); taosMemoryFree(pWriter); } + vnodeEnableBgTask(pVnode); return code; } From ac2d9ebc271a99e4cb012a852db0b5446c71d0f2 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 4 Dec 2023 16:48:43 +0800 Subject: [PATCH 2/2] fix: memory leak --- source/dnode/vnode/src/tsdb/tsdbFS2.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 7322da6a23..635c53bbed 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -770,6 +770,9 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) { taosThreadMutexUnlock(&fs->tsdb->mutex); // destroy all channels + int64_t channel; + TARRAY2_FOREACH(&channelArr, channel) { vnodeAChannelDestroy(vnodeAsyncHandle[1], channel, true); } + TARRAY2_DESTROY(&channelArr, NULL); return 0; }