fix: add debug mode

This commit is contained in:
dapan1121 2024-11-25 14:51:59 +08:00
parent 3dc61bf0f4
commit 2b342216e2
3 changed files with 146 additions and 43 deletions

View File

@ -23,6 +23,7 @@ extern "C" {
#include "os.h" #include "os.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "thash.h" #include "thash.h"
#include "tglobal.h"
#define MP_CHUNK_CACHE_ALLOC_BATCH_SIZE 1000 #define MP_CHUNK_CACHE_ALLOC_BATCH_SIZE 1000
#define MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE 500 #define MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE 500
@ -481,11 +482,39 @@ enum {
} \ } \
} while (0) } while (0)
#define MP_CHECK_QUOTA(_pool, _job, _size) do { \
if (*(_pool)->cfg.jobQuota > 0) { \
int64_t cAllocSize = atomic_add_fetch_64(&(_job)->job.allocMemSize, (_size)); \
if (cAllocSize / 1048576UL > *(_pool)->cfg.jobQuota) { \
uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, (_job)->job.jobId, cAllocSize, *(_pool)->cfg.jobQuota); \
(_pool)->cfg.cb.reachFp(pJob->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_REACH_QMEM_THRESHOLD); \
terrno = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD; \
return NULL; \
} \
} \
if (atomic_load_64(&tsCurrentAvailMemorySize) <= ((atomic_load_32((_pool)->cfg.reserveSize) * 1048576UL) + (_size))) { \
uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %dMB", \
(_pool)->name, atomic_load_64(&tsCurrentAvailMemorySize), (_size), *(_pool)->cfg.reserveSize); \
(_pool)->cfg.cb.reachFp((_job)->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); \
terrno = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; \
return NULL; \
} \
} while (0)
// direct // direct
int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes); void* mpDirectAlloc(SMemPool* pPool, SMPJob* pJob, int64_t size);
void* mpDirectAlignAlloc(SMemPool* pPool, SMPJob* pJob, uint32_t alignment, int64_t size);
void* mpDirectCalloc(SMemPool* pPool, SMPJob* pJob, int64_t num, int64_t size);
void mpDirectFree(SMemPool* pPool, SMPJob* pJob, void *ptr);
void* mpDirectRealloc(SMemPool* pPool, SMPJob* pJob, void* ptr, int64_t size);
void* mpDirectStrdup(SMemPool* pPool, SMPJob* pJob, const void* ptr);
void* mpDirectStrndup(SMemPool* pPool, SMPJob* pJob, const void* ptr, int64_t size);
int32_t mpDirectFullAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes);
int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr); int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr);
void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize); void mpDirectFullFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize);
int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize); int32_t mpDirectFullRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize);
int32_t mpDirectTrim(SMemPool* pPool, SMPSession* pSession, int32_t size, bool* trimed); int32_t mpDirectTrim(SMemPool* pPool, SMPSession* pSession, int32_t size, bool* trimed);
// chunk // chunk
@ -499,7 +528,7 @@ int32_t mpChunkUpdateCfg(SMemPool* pPool);
int32_t mpPopIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes); int32_t mpPopIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes);
int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size); int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size);
void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize); void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize);
int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead); int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead);
int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes); int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes);

View File

@ -19,14 +19,112 @@
#include "tlog.h" #include "tlog.h"
#include "tutil.h" #include "tutil.h"
int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes) { void* mpDirectAlloc(SMemPool* pPool, SMPJob* pJob, int64_t size) {
MP_CHECK_QUOTA(pPool, pJob, size);
return taosMemMalloc(size);
}
void* mpDirectAlignAlloc(SMemPool* pPool, SMPJob* pJob, uint32_t alignment, int64_t size) {
MP_CHECK_QUOTA(pPool, pJob, size);
return taosMemMallocAlign(alignment, size);
}
void* mpDirectCalloc(SMemPool* pPool, SMPJob* pJob, int64_t num, int64_t size) {
int64_t tSize = num * size;
MP_CHECK_QUOTA(pPool, pJob, tSize);
return taosMemCalloc(num, size);
}
void mpDirectFree(SMemPool* pPool, SMPJob* pJob, void *ptr) {
taosMemFree(ptr);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, taosMemSize(ptr));
}
void* mpDirectRealloc(SMemPool* pPool, SMPJob* pJob, void* ptr, int64_t size) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == ptr) {
return mpDirectAlloc(pPool, pJob, size);
}
if (0 == size) {
mpDirectFree(pPool, pJob, ptr);
return NULL;
}
int64_t oSize = taosMemSize(ptr);
MP_CHECK_QUOTA(pPool, pJob, size - oSize);
return taosMemRealloc(ptr, size);
}
void* mpDirectStrdup(SMemPool* pPool, SMPJob* pJob, const void* ptr) {
if (NULL == ptr) {
return NULL;
}
int64_t oSize = strlen(ptr);
MP_CHECK_QUOTA(pPool, pJob, oSize);
return taosStrdupi(ptr);
}
void* mpDirectStrndup(SMemPool* pPool, SMPJob* pJob, const void* ptr, int64_t size) {
if (NULL == ptr) {
return NULL;
}
int64_t oSize = strlen(ptr);
MP_CHECK_QUOTA(pPool, pJob, TMIN(oSize, size) + 1);
return taosStrndupi(ptr, size);
}
int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr) {
return taosMemSize(ptr);
}
void mpDirectFullFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) {
int64_t oSize = taosMemSize(ptr);
if (origSize) {
*origSize = oSize;
}
MP_LOCK(MP_READ, &pPool->cfgLock); // tmp test
taosMemFree(ptr);
if (NULL != pSession) {
(void)atomic_sub_fetch_64(&pSession->allocMemSize, oSize);
(void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, oSize);
}
(void)atomic_sub_fetch_64(&pPool->allocMemSize, oSize);
MP_UNLOCK(MP_READ, &pPool->cfgLock);
}
int32_t mpDirectFullAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void* res = NULL; void* res = NULL;
int64_t nSize = *size; int64_t nSize = *size;
MP_LOCK(MP_READ, &pPool->cfgLock); MP_LOCK(MP_READ, &pPool->cfgLock);
MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, *size)); MP_ERR_JRET(mpChkFullQuota(pPool, pSession, *size));
res = alignment ? taosMemMallocAlign(alignment, *size) : taosMemMalloc(*size); res = alignment ? taosMemMallocAlign(alignment, *size) : taosMemMalloc(*size);
if (NULL != res) { if (NULL != res) {
@ -54,37 +152,13 @@ _return:
MP_RET(code); MP_RET(code);
} }
int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr) { int32_t mpDirectFullRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) {
return taosMemSize(ptr);
}
void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) {
int64_t oSize = taosMemSize(ptr);
if (origSize) {
*origSize = oSize;
}
MP_LOCK(MP_READ, &pPool->cfgLock); // tmp test
taosMemFree(ptr);
if (NULL != pSession) {
(void)atomic_sub_fetch_64(&pSession->allocMemSize, oSize);
(void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, oSize);
}
(void)atomic_sub_fetch_64(&pPool->allocMemSize, oSize);
MP_UNLOCK(MP_READ, &pPool->cfgLock);
}
int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int64_t nSize = *size; int64_t nSize = *size;
MP_LOCK(MP_READ, &pPool->cfgLock); MP_LOCK(MP_READ, &pPool->cfgLock);
MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, *size - *origSize)); MP_ERR_JRET(mpChkFullQuota(pPool, pSession, *size - *origSize));
*pPtr = taosMemRealloc(*pPtr, *size); *pPtr = taosMemRealloc(*pPtr, *size);
if (NULL != *pPtr) { if (NULL != *pPtr) {
@ -99,7 +173,7 @@ _return:
MP_UNLOCK(MP_READ, &pPool->cfgLock); MP_UNLOCK(MP_READ, &pPool->cfgLock);
if (code) { if (code) {
mpDirectFree(pPool, pSession, *pPtr, origSize); mpDirectFullFree(pPool, pSession, *pPtr, origSize);
*pPtr = NULL; *pPtr = NULL;
} }

View File

@ -29,7 +29,7 @@ threadlocal bool threadPoolEnabled = true;
SMemPoolMgmt gMPMgmt = {0}; SMemPoolMgmt gMPMgmt = {0};
SMPStrategyFp gMPFps[] = { SMPStrategyFp gMPFps[] = {
{NULL}, {NULL},
{NULL, mpDirectAlloc, mpDirectFree, mpDirectGetMemSize, mpDirectRealloc, NULL, NULL, mpDirectTrim}, {NULL, mpDirectFullAlloc, mpDirectFullFree, mpDirectGetMemSize, mpDirectFullRealloc, NULL, NULL, mpDirectTrim},
{mpChunkInit, mpChunkAlloc, mpChunkFree, mpChunkGetMemSize, mpChunkRealloc, mpChunkInitSession, mpChunkUpdateCfg, NULL} {mpChunkInit, mpChunkAlloc, mpChunkFree, mpChunkGetMemSize, mpChunkRealloc, mpChunkInitSession, mpChunkUpdateCfg, NULL}
}; };
@ -303,7 +303,7 @@ int32_t mpPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) {
} }
int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) { int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pSession) { if (NULL == pSession) {
(void)atomic_add_fetch_64(&pPool->allocMemSize, size); (void)atomic_add_fetch_64(&pPool->allocMemSize, size);
@ -1253,7 +1253,7 @@ _return:
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) { void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolDebug) {
return taosMemMalloc(size); return mpDirectAlloc(poolHandle, ((SMPSession*)session)->pJob, size);
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1283,7 +1283,7 @@ _return:
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) { void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolDebug) {
return taosMemCalloc(num, size); return mpDirectCalloc(poolHandle, ((SMPSession*)session)->pJob, num, size);
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1315,7 +1315,7 @@ _return:
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) { void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolDebug) {
return taosMemRealloc(ptr, size); return mpDirectRealloc(poolHandle, ((SMPSession*)session)->pJob, ptr, size);
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1366,7 +1366,7 @@ _return:
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) { char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolDebug) {
return taosStrdupi(ptr); return mpDirectStrdup(poolHandle, ((SMPSession*)session)->pJob, ptr);
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1402,7 +1402,7 @@ _return:
char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) { char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolDebug) {
return taosStrndupi(ptr, size); return mpDirectStrndup(poolHandle, ((SMPSession*)session)->pJob, ptr, size);
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1440,7 +1440,7 @@ _return:
void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolDebug) {
taosMemFree(ptr); mpDirectFree(poolHandle, ((SMPSession*)session)->pJob, ptr);
return; return;
} }
@ -1466,7 +1466,7 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName,
int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolDebug) {
return taosMemSize(ptr);; return taosMemSize(ptr);
} }
int64_t code = 0; int64_t code = 0;
@ -1488,7 +1488,7 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha
void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) { void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolDebug) {
return taosMemMallocAlign(alignment, size); return mpDirectAlignAlloc(poolHandle, ((SMPSession*)session)->pJob, alignment, size);
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;