From 2b342216e28e4c4aa065205dd96813a7a6e53adc Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 25 Nov 2024 14:51:59 +0800 Subject: [PATCH] fix: add debug mode --- source/util/inc/tmempoolInt.h | 37 ++++++++-- source/util/src/mpDirect.c | 132 ++++++++++++++++++++++++++-------- source/util/src/tmempool.c | 20 +++--- 3 files changed, 146 insertions(+), 43 deletions(-) diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 7ed6be042d..ce84ff5331 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -23,6 +23,7 @@ extern "C" { #include "os.h" #include "tlockfree.h" #include "thash.h" +#include "tglobal.h" #define MP_CHUNK_CACHE_ALLOC_BATCH_SIZE 1000 #define MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE 500 @@ -481,11 +482,39 @@ enum { } \ } while (0) +#define MP_CHECK_QUOTA(_pool, _job, _size) do { \ + if (*(_pool)->cfg.jobQuota > 0) { \ + int64_t cAllocSize = atomic_add_fetch_64(&(_job)->job.allocMemSize, (_size)); \ + if (cAllocSize / 1048576UL > *(_pool)->cfg.jobQuota) { \ + uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, (_job)->job.jobId, cAllocSize, *(_pool)->cfg.jobQuota); \ + (_pool)->cfg.cb.reachFp(pJob->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_REACH_QMEM_THRESHOLD); \ + terrno = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD; \ + return NULL; \ + } \ + } \ + if (atomic_load_64(&tsCurrentAvailMemorySize) <= ((atomic_load_32((_pool)->cfg.reserveSize) * 1048576UL) + (_size))) { \ + uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %dMB", \ + (_pool)->name, atomic_load_64(&tsCurrentAvailMemorySize), (_size), *(_pool)->cfg.reserveSize); \ + (_pool)->cfg.cb.reachFp((_job)->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); \ + terrno = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; \ + return NULL; \ + } \ + } while (0) + + // direct -int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes); +void* mpDirectAlloc(SMemPool* pPool, SMPJob* pJob, int64_t size); +void* mpDirectAlignAlloc(SMemPool* pPool, SMPJob* pJob, uint32_t alignment, int64_t size); +void* mpDirectCalloc(SMemPool* pPool, SMPJob* pJob, int64_t num, int64_t size); +void mpDirectFree(SMemPool* pPool, SMPJob* pJob, void *ptr); +void* mpDirectRealloc(SMemPool* pPool, SMPJob* pJob, void* ptr, int64_t size); +void* mpDirectStrdup(SMemPool* pPool, SMPJob* pJob, const void* ptr); +void* mpDirectStrndup(SMemPool* pPool, SMPJob* pJob, const void* ptr, int64_t size); + +int32_t mpDirectFullAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes); int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr); -void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize); -int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize); +void mpDirectFullFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize); +int32_t mpDirectFullRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize); int32_t mpDirectTrim(SMemPool* pPool, SMPSession* pSession, int32_t size, bool* trimed); // chunk @@ -499,7 +528,7 @@ int32_t mpChunkUpdateCfg(SMemPool* pPool); int32_t mpPopIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes); -int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size); +int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size); void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize); int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead); int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes); diff --git a/source/util/src/mpDirect.c b/source/util/src/mpDirect.c index 61e8d976e0..acc1bc8dd0 100755 --- a/source/util/src/mpDirect.c +++ b/source/util/src/mpDirect.c @@ -19,14 +19,112 @@ #include "tlog.h" #include "tutil.h" -int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes) { +void* mpDirectAlloc(SMemPool* pPool, SMPJob* pJob, int64_t size) { + MP_CHECK_QUOTA(pPool, pJob, size); + + return taosMemMalloc(size); +} + + +void* mpDirectAlignAlloc(SMemPool* pPool, SMPJob* pJob, uint32_t alignment, int64_t size) { + MP_CHECK_QUOTA(pPool, pJob, size); + + return taosMemMallocAlign(alignment, size); +} + + +void* mpDirectCalloc(SMemPool* pPool, SMPJob* pJob, int64_t num, int64_t size) { + int64_t tSize = num * size; + MP_CHECK_QUOTA(pPool, pJob, tSize); + + return taosMemCalloc(num, size); +} + +void mpDirectFree(SMemPool* pPool, SMPJob* pJob, void *ptr) { + taosMemFree(ptr); + + (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, taosMemSize(ptr)); +} + + +void* mpDirectRealloc(SMemPool* pPool, SMPJob* pJob, void* ptr, int64_t size) { + int32_t code = TSDB_CODE_SUCCESS; + + if (NULL == ptr) { + return mpDirectAlloc(pPool, pJob, size); + } + + if (0 == size) { + mpDirectFree(pPool, pJob, ptr); + return NULL; + } + + int64_t oSize = taosMemSize(ptr); + + MP_CHECK_QUOTA(pPool, pJob, size - oSize); + + return taosMemRealloc(ptr, size); +} + +void* mpDirectStrdup(SMemPool* pPool, SMPJob* pJob, const void* ptr) { + if (NULL == ptr) { + return NULL; + } + + int64_t oSize = strlen(ptr); + MP_CHECK_QUOTA(pPool, pJob, oSize); + + return taosStrdupi(ptr); +} + +void* mpDirectStrndup(SMemPool* pPool, SMPJob* pJob, const void* ptr, int64_t size) { + if (NULL == ptr) { + return NULL; + } + + int64_t oSize = strlen(ptr); + MP_CHECK_QUOTA(pPool, pJob, TMIN(oSize, size) + 1); + + return taosStrndupi(ptr, size); +} + + + + +int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr) { + return taosMemSize(ptr); +} + +void mpDirectFullFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) { + int64_t oSize = taosMemSize(ptr); + if (origSize) { + *origSize = oSize; + } + + MP_LOCK(MP_READ, &pPool->cfgLock); // tmp test + + taosMemFree(ptr); + + if (NULL != pSession) { + (void)atomic_sub_fetch_64(&pSession->allocMemSize, oSize); + (void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, oSize); + } + + (void)atomic_sub_fetch_64(&pPool->allocMemSize, oSize); + + MP_UNLOCK(MP_READ, &pPool->cfgLock); +} + + + +int32_t mpDirectFullAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes) { int32_t code = TSDB_CODE_SUCCESS; void* res = NULL; int64_t nSize = *size; MP_LOCK(MP_READ, &pPool->cfgLock); - MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, *size)); + MP_ERR_JRET(mpChkFullQuota(pPool, pSession, *size)); res = alignment ? taosMemMallocAlign(alignment, *size) : taosMemMalloc(*size); if (NULL != res) { @@ -54,37 +152,13 @@ _return: MP_RET(code); } -int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr) { - return taosMemSize(ptr); -} - -void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) { - int64_t oSize = taosMemSize(ptr); - if (origSize) { - *origSize = oSize; - } - - MP_LOCK(MP_READ, &pPool->cfgLock); // tmp test - - taosMemFree(ptr); - - if (NULL != pSession) { - (void)atomic_sub_fetch_64(&pSession->allocMemSize, oSize); - (void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, oSize); - } - - (void)atomic_sub_fetch_64(&pPool->allocMemSize, oSize); - - MP_UNLOCK(MP_READ, &pPool->cfgLock); -} - -int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) { +int32_t mpDirectFullRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) { int32_t code = TSDB_CODE_SUCCESS; int64_t nSize = *size; MP_LOCK(MP_READ, &pPool->cfgLock); - MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, *size - *origSize)); + MP_ERR_JRET(mpChkFullQuota(pPool, pSession, *size - *origSize)); *pPtr = taosMemRealloc(*pPtr, *size); if (NULL != *pPtr) { @@ -99,7 +173,7 @@ _return: MP_UNLOCK(MP_READ, &pPool->cfgLock); if (code) { - mpDirectFree(pPool, pSession, *pPtr, origSize); + mpDirectFullFree(pPool, pSession, *pPtr, origSize); *pPtr = NULL; } diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 559fc16e53..5c7ee9e19c 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -29,7 +29,7 @@ threadlocal bool threadPoolEnabled = true; SMemPoolMgmt gMPMgmt = {0}; SMPStrategyFp gMPFps[] = { {NULL}, - {NULL, mpDirectAlloc, mpDirectFree, mpDirectGetMemSize, mpDirectRealloc, NULL, NULL, mpDirectTrim}, + {NULL, mpDirectFullAlloc, mpDirectFullFree, mpDirectGetMemSize, mpDirectFullRealloc, NULL, NULL, mpDirectTrim}, {mpChunkInit, mpChunkAlloc, mpChunkFree, mpChunkGetMemSize, mpChunkRealloc, mpChunkInitSession, mpChunkUpdateCfg, NULL} }; @@ -303,7 +303,7 @@ int32_t mpPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) { } -int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) { +int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size) { int32_t code = TSDB_CODE_SUCCESS; if (NULL == pSession) { (void)atomic_add_fetch_64(&pPool->allocMemSize, size); @@ -1253,7 +1253,7 @@ _return: void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) { if (0 == tsMemPoolDebug) { - return taosMemMalloc(size); + return mpDirectAlloc(poolHandle, ((SMPSession*)session)->pJob, size); } int32_t code = TSDB_CODE_SUCCESS; @@ -1283,7 +1283,7 @@ _return: void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) { if (0 == tsMemPoolDebug) { - return taosMemCalloc(num, size); + return mpDirectCalloc(poolHandle, ((SMPSession*)session)->pJob, num, size); } int32_t code = TSDB_CODE_SUCCESS; @@ -1315,7 +1315,7 @@ _return: void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) { if (0 == tsMemPoolDebug) { - return taosMemRealloc(ptr, size); + return mpDirectRealloc(poolHandle, ((SMPSession*)session)->pJob, ptr, size); } int32_t code = TSDB_CODE_SUCCESS; @@ -1366,7 +1366,7 @@ _return: char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) { if (0 == tsMemPoolDebug) { - return taosStrdupi(ptr); + return mpDirectStrdup(poolHandle, ((SMPSession*)session)->pJob, ptr); } int32_t code = TSDB_CODE_SUCCESS; @@ -1402,7 +1402,7 @@ _return: char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) { if (0 == tsMemPoolDebug) { - return taosStrndupi(ptr, size); + return mpDirectStrndup(poolHandle, ((SMPSession*)session)->pJob, ptr, size); } int32_t code = TSDB_CODE_SUCCESS; @@ -1440,7 +1440,7 @@ _return: void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { if (0 == tsMemPoolDebug) { - taosMemFree(ptr); + mpDirectFree(poolHandle, ((SMPSession*)session)->pJob, ptr); return; } @@ -1466,7 +1466,7 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { if (0 == tsMemPoolDebug) { - return taosMemSize(ptr);; + return taosMemSize(ptr); } int64_t code = 0; @@ -1488,7 +1488,7 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) { if (0 == tsMemPoolDebug) { - return taosMemMallocAlign(alignment, size); + return mpDirectAlignAlloc(poolHandle, ((SMPSession*)session)->pJob, alignment, size); } int32_t code = TSDB_CODE_SUCCESS;