diff --git a/include/common/tglobal.h b/include/common/tglobal.h index d2b4d3066b..9cfc6eede5 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -75,6 +75,7 @@ extern int32_t tsQueryMaxConcurrentTaskNum; extern int32_t tsQueryConcurrentTaskNum; extern int32_t tsSingleQueryMaxMemorySize; extern int8_t tsQueryUseMemoryPool; +extern int8_t tsMemPoolDebug; //extern int32_t tsQueryBufferPoolSize; extern int32_t tsMinReservedMemorySize; extern int64_t tsCurrentAvailMemorySize; diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 126660ab61..683e37a0fa 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -152,24 +152,25 @@ int32_t taosMemoryPoolInit(mpReserveFailFp, mpReserveReachFp); extern void* gMemPoolHandle; extern threadlocal void* threadPoolSession; extern threadlocal bool threadPoolEnabled; +extern int8_t tsMemPoolDebug; #define taosEnableFullMemPoolUsage(_session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0) #define taosDisableFullMemPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0) -#define taosEnableMemPoolUsage() do { threadPoolEnabled = true; tsEnableRandErr = true;} while (0) -#define taosDisableMemPoolUsage() do { threadPoolEnabled = false; tsEnableRandErr = false;} while (0) +#define taosSaveDisableMemPoolUsage(_enable, _randErr) do { (_enable) = threadPoolEnabled; (_randErr) = tsEnableRandErr; threadPoolEnabled = false; tsEnableRandErr = false;} while (0) +#define taosRestoreEnableMemPoolUsage(_enable, _randErr) do { threadPoolEnabled = (_enable); tsEnableRandErr = (_randErr);} while (0) -#define taosMemoryMalloc(_size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolMalloc(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size))) -#define taosMemoryCalloc(_num, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolCalloc(gMemPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) -#define taosMemoryRealloc(_ptr, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolRealloc(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) -#define taosStrdup(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolStrdup(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr))) -#define taosStrndup(_ptr, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolStrndup(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size))) -#define taosMemoryFree(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolFree(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr))) -#define taosMemorySize(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolGetMemorySize(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr))) -#define taosMemoryTrim(_size, _trimed) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolTrim(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed))) -#define taosMemoryMallocAlign(_alignment, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolMallocAlign(gMemPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) +#define taosMemoryMalloc(_size) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolMalloc(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size))) +#define taosMemoryCalloc(_num, _size) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolCalloc(gMemPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) +#define taosMemoryRealloc(_ptr, _size) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolRealloc(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) +#define taosStrdup(_ptr) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolStrdup(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr))) +#define taosStrndup(_ptr, _size) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolStrndup(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size))) +#define taosMemoryFree(_ptr) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolFree(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr))) +#define taosMemorySize(_ptr) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolGetMemorySize(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr))) +#define taosMemoryTrim(_size, _trimed) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolTrim(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed))) +#define taosMemoryMallocAlign(_alignment, _size) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolMallocAlign(gMemPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) #else #define taosEnableMemoryPoolUsage(_pool, _session) #define taosDisableMemoryPoolUsage() diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index b429b0507e..995b592b3c 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -54,6 +54,7 @@ int32_t tsMaxShellConns = 50000; int32_t tsShellActivityTimer = 3; // second // memory pool +int8_t tsMemPoolDebug = 0; int8_t tsQueryUseMemoryPool = 1; int32_t tsQueryBufferPoolSize = 0; //MB int32_t tsSingleQueryMaxMemorySize = 0; //MB diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 26320c49a3..559fc16e53 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -278,8 +278,8 @@ void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int6 mpUpdateMaxAllocSize(&pSession->pJob->job.maxAllocMemSize, allocMemSize); } - int64_t allocMemSize = atomic_load_64(&pPool->allocMemSize); if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOG_MAXSIZE)) { + int64_t allocMemSize = atomic_load_64(&pPool->allocMemSize); mpUpdateMaxAllocSize(&pPool->maxAllocMemSize, allocMemSize); } } @@ -902,6 +902,8 @@ _return: void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPStatInput* pInput) { + bool enablePool = false, randErr = false; + switch (item) { case E_MP_STAT_LOG_MEM_MALLOC: case E_MP_STAT_LOG_MEM_CALLOC: @@ -917,14 +919,14 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt mpLogDetailStat(&pPool->stat.statDetail, item, pInput); } if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { - taosDisableMemPoolUsage(); + taosSaveDisableMemPoolUsage(enablePool, randErr); mpLogPosStat(&pSession->stat.posStat, item, pInput, true); - taosEnableMemPoolUsage(); + taosRestoreEnableMemPoolUsage(enablePool, randErr); } if (MP_GET_FLAG(gMPMgmt.ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { - taosDisableMemPoolUsage(); + taosSaveDisableMemPoolUsage(enablePool, randErr); mpLogPosStat(&pPool->stat.posStat, item, pInput, false); - taosEnableMemPoolUsage(); + taosRestoreEnableMemPoolUsage(enablePool, randErr); } break; } @@ -1250,6 +1252,10 @@ _return: void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) { + if (0 == tsMemPoolDebug) { + return taosMemMalloc(size); + } + int32_t code = TSDB_CODE_SUCCESS; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; @@ -1276,6 +1282,10 @@ _return: } void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) { + if (0 == tsMemPoolDebug) { + return taosMemCalloc(num, size); + } + int32_t code = TSDB_CODE_SUCCESS; int64_t totalSize = num * size; SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; @@ -1304,6 +1314,10 @@ _return: } void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) { + if (0 == tsMemPoolDebug) { + return taosMemRealloc(ptr, size); + } + int32_t code = TSDB_CODE_SUCCESS; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .origSize = 0, .pMem = ptr, .pOrigMem = ptr}; @@ -1351,6 +1365,10 @@ _return: } char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) { + if (0 == tsMemPoolDebug) { + return taosStrdupi(ptr); + } + int32_t code = TSDB_CODE_SUCCESS; int64_t size = (ptr ? strlen(ptr) : 0) + 1; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; @@ -1383,6 +1401,10 @@ _return: } char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) { + if (0 == tsMemPoolDebug) { + return taosStrndupi(ptr, size); + } + int32_t code = TSDB_CODE_SUCCESS; int64_t origSize = ptr ? strlen(ptr) : 0; size = TMIN(size, origSize) + 1; @@ -1417,6 +1439,11 @@ _return: void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { + if (0 == tsMemPoolDebug) { + taosMemFree(ptr); + return; + } + if (NULL == ptr) { return; } @@ -1438,6 +1465,10 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, } int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { + if (0 == tsMemPoolDebug) { + return taosMemSize(ptr);; + } + int64_t code = 0; if (NULL == poolHandle || NULL == fileName) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p", @@ -1456,6 +1487,10 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha } void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) { + if (0 == tsMemPoolDebug) { + return taosMemMallocAlign(alignment, size); + } + int32_t code = TSDB_CODE_SUCCESS; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; @@ -1511,6 +1546,10 @@ void taosMemPoolModDestroy(void) { int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed) { + if (0 == tsMemPoolDebug) { + return taosMemTrim(size, trimed); + } + int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == fileName || size < 0) { diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index dff7e9e578..35c6501229 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -188,6 +188,7 @@ typedef struct { typedef struct { int32_t jobQuota; + bool enableMemPool; bool reserveMode; int64_t upperLimitSize; int32_t reserveSize; //MB @@ -474,7 +475,10 @@ void mptDestroyTaskCtx(SMPTestJobCtx* pJobCtx, int32_t taskIdx) { SMPTestTaskCtx* pTask = &pJobCtx->taskCtxs[taskIdx]; - mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[taskIdx]); + if (mptCtx.param.enableMemPool) { + mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[taskIdx]); + } + for (int32_t i = 0; i < pTask->memIdx; ++i) { pTask->stat.times.memFree.exec++; pTask->stat.bytes.memFree.exec+=mptMemorySize(pTask->pMemList[i].p); @@ -482,8 +486,11 @@ void mptDestroyTaskCtx(SMPTestJobCtx* pJobCtx, int32_t taskIdx) { mptMemoryFree(pTask->pMemList[i].p); pTask->pMemList[i].p = NULL; } - mptDisableMemoryPoolUsage(); - + + if (mptCtx.param.enableMemPool) { + mptDisableMemoryPoolUsage(); + } + mptDestroySession(pJobCtx->jobId, pJobCtx->taskCtxs[taskIdx].taskId, 0, taskIdx, pJobCtx, pJobCtx->pSessions[taskIdx]); pJobCtx->pSessions[taskIdx] = NULL; @@ -614,13 +621,15 @@ void mptInitJob(int32_t idx) { void mptDestroyTask(SMPTestJobCtx* pJobCtx, int32_t taskIdx) { - SMPStatDetail* pStat = NULL; - int64_t allocSize = 0; - taosMemPoolGetSessionStat(pJobCtx->pSessions[taskIdx], &pStat, &allocSize, NULL); - int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat); - - assert(allocSize == usedSize); - assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[taskIdx].stat, sizeof(*pStat))); + if (mptCtx.param.enableMemPool) { + SMPStatDetail* pStat = NULL; + int64_t allocSize = 0; + taosMemPoolGetSessionStat(pJobCtx->pSessions[taskIdx], &pStat, &allocSize, NULL); + int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat); + + assert(allocSize == usedSize); + assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[taskIdx].stat, sizeof(*pStat))); + } mptDestroyTaskCtx(pJobCtx, taskIdx); } @@ -1147,10 +1156,16 @@ void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx, int32 } atomic_add_fetch_32(&pJobCtx->taskRunningNum, 1); + + if (mptCtx.param.enableMemPool) { + mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[idx]); + } - mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[idx]); mptSimulateTask(pJobCtx, pCtx, actTimes); - mptDisableMemoryPoolUsage(); + + if (mptCtx.param.enableMemPool) { + mptDisableMemoryPoolUsage(); + } // if (!atomic_load_8(&pJobCtx->pJob->retired)) { mptSimulateOutTask(pJobCtx, pCtx); @@ -1345,7 +1360,7 @@ void* mptRunThreadFunc(void* param) { MPT_PRINTF("Thread %d finish the %dth exection\n", pThread->idx, n); - if (mptCtx.param.threadNum <= 1) { + if (mptCtx.param.threadNum <= 1 && mptCtx.param.enableMemPool) { mptCheckPoolUsedSize(mptJobNum); } } @@ -1576,14 +1591,15 @@ TEST(FuncTest, SingleThreadTest) { } #endif -#if 1 -TEST(FuncTest, MultiThreadTest) { +#if 0 +TEST(EnablePoolFuncTest, MultiThreadTest) { char* caseName = "FuncTest:MultiThreadTest"; SMPTestParam param = {0}; param.reserveMode = true; param.threadNum = 6; param.jobQuota = 1024; param.randTask = true; + param.enableMemPool = true; mptPrintTestBeginInfo(caseName, ¶m); @@ -1594,6 +1610,26 @@ TEST(FuncTest, MultiThreadTest) { } #endif +#if 1 +TEST(DisablePoolFuncTest, MultiThreadTest) { + char* caseName = "FuncTest:MultiThreadTest"; + SMPTestParam param = {0}; + param.reserveMode = true; + param.threadNum = 6; + param.jobQuota = 1024; + param.randTask = true; + param.enableMemPool = false; + + mptPrintTestBeginInfo(caseName, ¶m); + + for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) { + mptRunCase(¶m, i); + } + +} +#endif + + #endif