From aa7b0e1a61b6301b59846ec9367a83a017f0f5eb Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 5 Dec 2024 18:41:38 +0800 Subject: [PATCH] fix: async launch trim --- include/common/tglobal.h | 1 + source/common/src/tglobal.c | 1 + source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 3 +++ source/util/inc/tmempoolInt.h | 5 +++-- source/util/src/tmempool.c | 24 ++++++++++----------- 5 files changed, 20 insertions(+), 14 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 833094fb8f..21b9f14276 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -79,6 +79,7 @@ extern int8_t tsMemPoolFullFunc; //extern int32_t tsQueryBufferPoolSize; extern int32_t tsMinReservedMemorySize; extern int64_t tsCurrentAvailMemorySize; +extern int8_t tsNeedTrim; extern int32_t tsQueryNoFetchTimeoutSec; extern int32_t tsNumOfQueryThreads; extern int32_t tsNumOfRpcThreads; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f60b30823f..2d412f4e53 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -60,6 +60,7 @@ int32_t tsQueryBufferPoolSize = 0; //MB int32_t tsSingleQueryMaxMemorySize = 0; //MB int32_t tsMinReservedMemorySize = 0; //MB int64_t tsCurrentAvailMemorySize = 0; +int8_t tsNeedTrim = 0; // queue & threads int32_t tsQueryMinConcurrentTaskNum = 1; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index f9436f89b7..86522a8791 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -205,6 +205,9 @@ static void *dmMonitorThreadFp(void *param) { taosMemoryTrim(0, NULL); } } + if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) { + taosMemoryTrim(0, NULL); + } } return NULL; diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 70e0425244..91296067cf 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -490,7 +490,7 @@ enum { if ((cAllocSize / 1048576L) > *(_pool)->cfg.jobQuota) { \ uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %dMB", (_job)->job.jobId, cAllocSize, *(_pool)->cfg.jobQuota); \ (_pool)->cfg.cb.reachFp(pJob->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_REACH_QMEM_THRESHOLD); \ - atomic_store_8(&gMPMgmt.needTrim, 1); \ + mpSchedTrim(NULL); \ terrno = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD; \ return NULL; \ } \ @@ -499,7 +499,7 @@ enum { uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %" PRId64 " bytes", \ (_pool)->name, atomic_load_64(&tsCurrentAvailMemorySize), (_size), (_pool)->cfg.reserveSize); \ (_pool)->cfg.cb.reachFp((_job)->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); \ - atomic_store_8(&gMPMgmt.needTrim, 1); \ + mpSchedTrim(NULL); \ terrno = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; \ return NULL; \ } \ @@ -536,6 +536,7 @@ int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size); void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize); int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead); int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes); +void mpSchedTrim(int64_t* loopTimes); diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 18fa2f2be5..ff3caba012 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -323,7 +323,7 @@ int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size) { code = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD; uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pJob->job.jobId, cAllocSize, quota); pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code); - atomic_store_8(&gMPMgmt.needTrim, 1); + mpSchedTrim(NULL); (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); MP_RET(code); } @@ -333,7 +333,7 @@ int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size) { uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %" PRId64 " bytes", pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, pPool->cfg.reserveSize); pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code); - atomic_store_8(&gMPMgmt.needTrim, 1); + mpSchedTrim(NULL); (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); MP_RET(code); } @@ -1032,15 +1032,15 @@ void mpUpdateSystemAvailableMemorySize() { uDebug("system available memory size: %" PRId64, sysAvailSize); } -void mpLaunchTrim(int64_t* loopTimes) { +void mpSchedTrim(int64_t* loopTimes) { static int64_t trimTimes = 0; - taosMemTrim(0, NULL); + atomic_store_8(&tsNeedTrim, 1); + if (loopTimes) { + *loopTimes = 0; + } - atomic_store_8(&gMPMgmt.needTrim, 0); - *loopTimes = 0; - - uDebug("%" PRId64 "th memory trim launched", ++trimTimes); + uDebug("%" PRId64 "th memory trim scheduled", ++trimTimes); } void* mpMgmtThreadFunc(void* param) { @@ -1055,11 +1055,11 @@ void* mpMgmtThreadFunc(void* param) { if (retireSize > 0) { (*pPool->cfg.cb.failFp)(retireSize, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); - mpLaunchTrim(&loopTimes); + mpSchedTrim(&loopTimes); } - if ((0 == (++loopTimes) % 500) || atomic_load_8(&gMPMgmt.needTrim)) { - mpLaunchTrim(&loopTimes); + if (0 == (++loopTimes) % 500) { + mpSchedTrim(&loopTimes); } taosMsleep(MP_DEFAULT_MEM_CHK_INTERVAL_MS); @@ -1695,7 +1695,7 @@ int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t } void taosMemPoolSchedTrim(void) { - atomic_store_8(&gMPMgmt.needTrim, 1); + mpSchedTrim(NULL); } int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) {