fix: async launch trim

This commit is contained in:
dapan1121 2024-12-05 18:41:38 +08:00
parent 011ff04234
commit aa7b0e1a61
5 changed files with 20 additions and 14 deletions

View File

@@ -79,6 +79,7 @@ extern int8_t tsMemPoolFullFunc;
 //extern int32_t tsQueryBufferPoolSize;
 extern int32_t tsMinReservedMemorySize;
 extern int64_t tsCurrentAvailMemorySize;
+extern int8_t tsNeedTrim;
 extern int32_t tsQueryNoFetchTimeoutSec;
 extern int32_t tsNumOfQueryThreads;
 extern int32_t tsNumOfRpcThreads;

View File

@@ -60,6 +60,7 @@ int32_t tsQueryBufferPoolSize = 0; //MB
 int32_t tsSingleQueryMaxMemorySize = 0; //MB
 int32_t tsMinReservedMemorySize = 0; //MB
 int64_t tsCurrentAvailMemorySize = 0;
+int8_t tsNeedTrim = 0;
 // queue & threads
 int32_t tsQueryMinConcurrentTaskNum = 1;

View File

@@ -205,6 +205,9 @@ static void *dmMonitorThreadFp(void *param) {
       taosMemoryTrim(0, NULL);
     }
   }
+  if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
+    taosMemoryTrim(0, NULL);
+  }
 }
 return NULL;

View File

@@ -490,7 +490,7 @@ enum {
   if ((cAllocSize / 1048576L) > *(_pool)->cfg.jobQuota) { \
     uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %dMB", (_job)->job.jobId, cAllocSize, *(_pool)->cfg.jobQuota); \
     (_pool)->cfg.cb.reachFp(pJob->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_REACH_QMEM_THRESHOLD); \
-    atomic_store_8(&gMPMgmt.needTrim, 1); \
+    mpSchedTrim(NULL); \
     terrno = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD; \
     return NULL; \
   } \
@@ -499,7 +499,7 @@ enum {
   uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %" PRId64 " bytes", \
     (_pool)->name, atomic_load_64(&tsCurrentAvailMemorySize), (_size), (_pool)->cfg.reserveSize); \
   (_pool)->cfg.cb.reachFp((_job)->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); \
-  atomic_store_8(&gMPMgmt.needTrim, 1); \
+  mpSchedTrim(NULL); \
   terrno = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; \
   return NULL; \
 } \
@@ -536,6 +536,7 @@ int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size);
 void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize);
 int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead);
 int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes);
+void mpSchedTrim(int64_t* loopTimes);

View File

@@ -323,7 +323,7 @@ int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size) {
     code = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD;
     uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pJob->job.jobId, cAllocSize, quota);
     pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code);
-    atomic_store_8(&gMPMgmt.needTrim, 1);
+    mpSchedTrim(NULL);
     (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
     MP_RET(code);
   }
@@ -333,7 +333,7 @@ int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size) {
     uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %" PRId64 " bytes",
       pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, pPool->cfg.reserveSize);
     pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code);
-    atomic_store_8(&gMPMgmt.needTrim, 1);
+    mpSchedTrim(NULL);
     (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
     MP_RET(code);
   }
@@ -1032,15 +1032,15 @@ void mpUpdateSystemAvailableMemorySize() {
   uDebug("system available memory size: %" PRId64, sysAvailSize);
 }
-void mpLaunchTrim(int64_t* loopTimes) {
+void mpSchedTrim(int64_t* loopTimes) {
   static int64_t trimTimes = 0;
-  taosMemTrim(0, NULL);
-  atomic_store_8(&gMPMgmt.needTrim, 0);
-  *loopTimes = 0;
-  uDebug("%" PRId64 "th memory trim launched", ++trimTimes);
+  atomic_store_8(&tsNeedTrim, 1);
+  if (loopTimes) {
+    *loopTimes = 0;
+  }
+  uDebug("%" PRId64 "th memory trim scheduled", ++trimTimes);
 }
 void* mpMgmtThreadFunc(void* param) {
@@ -1055,11 +1055,11 @@ void* mpMgmtThreadFunc(void* param) {
     if (retireSize > 0) {
       (*pPool->cfg.cb.failFp)(retireSize, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
-      mpLaunchTrim(&loopTimes);
+      mpSchedTrim(&loopTimes);
     }
-    if ((0 == (++loopTimes) % 500) || atomic_load_8(&gMPMgmt.needTrim)) {
-      mpLaunchTrim(&loopTimes);
+    if (0 == (++loopTimes) % 500) {
+      mpSchedTrim(&loopTimes);
     }
     taosMsleep(MP_DEFAULT_MEM_CHK_INTERVAL_MS);
@@ -1695,7 +1695,7 @@ int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t
 }
 void taosMemPoolSchedTrim(void) {
-  atomic_store_8(&gMPMgmt.needTrim, 1);
+  mpSchedTrim(NULL);
 }
 int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) {