Merge pull request #23920 from taosdata/fix/TD-27619

fix: bg task memory waste
This commit is contained in:
Hongze Cheng 2023-12-04 18:10:30 +08:00 committed by GitHub
commit 26774f87c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 52 additions and 13 deletions

View File

@ -377,6 +377,7 @@ struct STsdb {
SVnode *pVnode; SVnode *pVnode;
STsdbKeepCfg keepCfg; STsdbKeepCfg keepCfg;
TdThreadMutex mutex; TdThreadMutex mutex;
bool bgTaskDisabled;
SMemTable *mem; SMemTable *mem;
SMemTable *imem; SMemTable *imem;
STsdbFS fs; // old STsdbFS fs; // old

View File

@ -745,18 +745,28 @@ _exit:
return code; return code;
} }
int32_t tsdbFSCancelAllBgTask(STFileSystem *fs) { static int32_t tsdbFSSetBlockCommit(STFileSet *fset, bool block);
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
STFileSystem *fs = pTsdb->pFS;
TARRAY2(int64_t) channelArr = {0}; TARRAY2(int64_t) channelArr = {0};
// collect all open channels
taosThreadMutexLock(&fs->tsdb->mutex); taosThreadMutexLock(&fs->tsdb->mutex);
// disable
pTsdb->bgTaskDisabled = true;
// collect channel
STFileSet *fset; STFileSet *fset;
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) { if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) {
TARRAY2_APPEND(&channelArr, fset->bgTaskChannel); TARRAY2_APPEND(&channelArr, fset->bgTaskChannel);
fset->bgTaskChannel = 0; fset->bgTaskChannel = 0;
} }
fset->mergeScheduled = false;
tsdbFSSetBlockCommit(fset, false);
} }
taosThreadMutexUnlock(&fs->tsdb->mutex); taosThreadMutexUnlock(&fs->tsdb->mutex);
// destroy all channels // destroy all channels
@ -766,10 +776,17 @@ int32_t tsdbFSCancelAllBgTask(STFileSystem *fs) {
return 0; return 0;
} }
int32_t tsdbEnableBgTask(STsdb *pTsdb) {
taosThreadMutexLock(&pTsdb->mutex);
pTsdb->bgTaskDisabled = false;
taosThreadMutexUnlock(&pTsdb->mutex);
return 0;
}
int32_t tsdbCloseFS(STFileSystem **fs) { int32_t tsdbCloseFS(STFileSystem **fs) {
if (fs[0] == NULL) return 0; if (fs[0] == NULL) return 0;
tsdbFSCancelAllBgTask(*fs); tsdbDisableAndCancelAllBgTask((*fs)->tsdb);
close_file_system(fs[0]); close_file_system(fs[0]);
destroy_fs(fs); destroy_fs(fs);
return 0; return 0;
@ -857,7 +874,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
// schedule merge // schedule merge
int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger; int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
if (sttTrigger > 1) { if (sttTrigger > 1 && !fs->tsdb->bgTaskDisabled) {
STFileSet *fset; STFileSet *fset;
TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) { TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
if (TARRAY2_SIZE(fset->lvlArr) == 0) { if (TARRAY2_SIZE(fset->lvlArr) == 0) {
@ -873,7 +890,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
bool skipMerge = false; bool skipMerge = false;
int32_t numFile = TARRAY2_SIZE(lvl->fobjArr); int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
if (numFile >= sttTrigger) { if (numFile >= sttTrigger && (!fset->mergeScheduled)) {
// launch merge // launch merge
{ {
extern int8_t tsS3Enabled; extern int8_t tsS3Enabled;
@ -917,6 +934,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree, code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree,
arg, NULL); arg, NULL);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
fset->mergeScheduled = true;
} }
} }

View File

@ -453,6 +453,7 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) {
// background task queue // background task queue
fset[0]->bgTaskChannel = 0; fset[0]->bgTaskChannel = 0;
fset[0]->mergeScheduled = false;
// block commit variables // block commit variables
taosThreadCondInit(&fset[0]->canCommit, NULL); taosThreadCondInit(&fset[0]->canCommit, NULL);

View File

@ -93,6 +93,7 @@ struct STFileSet {
// background task channel // background task channel
int64_t bgTaskChannel; int64_t bgTaskChannel;
bool mergeScheduled;
// block commit variables // block commit variables
TdThreadCond canCommit; TdThreadCond canCommit;

View File

@ -514,6 +514,8 @@ static int32_t tsdbMergeGetFSet(SMerger *merger) {
return 0; return 0;
} }
fset->mergeScheduled = false;
int32_t code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset); int32_t code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset);
if (code) { if (code) {
taosThreadMutexUnlock(&merger->tsdb->mutex); taosThreadMutexUnlock(&merger->tsdb->mutex);

View File

@ -430,6 +430,11 @@ int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
taosThreadMutexLock(&tsdb->mutex); taosThreadMutexLock(&tsdb->mutex);
if (tsdb->bgTaskDisabled) {
taosThreadMutexUnlock(&tsdb->mutex);
return 0;
}
STFileSet *fset; STFileSet *fset;
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
code = tsdbTFileSetOpenChannel(fset); code = tsdbTFileSetOpenChannel(fset);

View File

@ -1602,7 +1602,4 @@ _out:
} }
return code; return code;
} }
extern int32_t tsdbFSCancelAllBgTask(STFileSystem* fs);
int32_t tsdbCancelAllBgTask(STsdb* tsdb) { return tsdbFSCancelAllBgTask(tsdb->pFS); }

View File

@ -519,7 +519,21 @@ _out:
return code; return code;
} }
extern int32_t tsdbCancelAllBgTask(STsdb *tsdb); extern int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb);
extern int32_t tsdbEnableBgTask(STsdb *pTsdb);
static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
vnodeSyncCommit(pVnode);
vnodeAChannelDestroy(vnodeAsyncHandle[0], pVnode->commitChannel, true);
return 0;
}
static int32_t vnodeEnableBgTask(SVnode *pVnode) {
tsdbEnableBgTask(pVnode->pTsdb);
vnodeAChannelInit(vnodeAsyncHandle[0], &pVnode->commitChannel);
return 0;
}
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) { int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) {
int32_t code = 0; int32_t code = 0;
@ -527,9 +541,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter
int64_t sver = pParam->start; int64_t sver = pParam->start;
int64_t ever = pParam->end; int64_t ever = pParam->end;
// commit memory data // cancel and disable all bg task
vnodeSyncCommit(pVnode); vnodeCancelAndDisableAllBgTask(pVnode);
tsdbCancelAllBgTask(pVnode->pTsdb);
// alloc // alloc
pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter)); pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
@ -659,6 +672,7 @@ _exit:
vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback); vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
} }
vnodeEnableBgTask(pVnode);
return code; return code;
} }