From 84559495754e62f1bb1c503d450b2dca62470a16 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 25 Nov 2024 15:31:29 +0800 Subject: [PATCH] enh: add memory full func mode --- include/common/tglobal.h | 2 +- include/os/osMemPool.h | 6 ++-- source/common/src/tglobal.c | 2 +- source/libs/qworker/inc/qwInt.h | 4 +-- source/libs/qworker/src/qwUtil.c | 4 +-- source/libs/qworker/src/qworker.c | 12 +++---- source/util/src/tmempool.c | 18 +++++------ source/util/test/memPoolTest.cpp | 54 ++++++++++++++++++++++++++++--- 8 files changed, 74 insertions(+), 28 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 9cfc6eede5..ce29c60f86 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -75,7 +75,7 @@ extern int32_t tsQueryMaxConcurrentTaskNum; extern int32_t tsQueryConcurrentTaskNum; extern int32_t tsSingleQueryMaxMemorySize; extern int8_t tsQueryUseMemoryPool; -extern int8_t tsMemPoolDebug; +extern int8_t tsMemPoolFullFunc; //extern int32_t tsQueryBufferPoolSize; extern int32_t tsMinReservedMemorySize; extern int64_t tsCurrentAvailMemorySize; diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 683e37a0fa..d5936ff8cf 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -152,11 +152,11 @@ int32_t taosMemoryPoolInit(mpReserveFailFp, mpReserveReachFp); extern void* gMemPoolHandle; extern threadlocal void* threadPoolSession; extern threadlocal bool threadPoolEnabled; -extern int8_t tsMemPoolDebug; +extern int8_t tsMemPoolFullFunc; -#define taosEnableFullMemPoolUsage(_session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0) -#define taosDisableFullMemPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0) +#define taosEnableMemPoolUsage(_session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0) +#define taosDisableMemPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0) #define taosSaveDisableMemPoolUsage(_enable, _randErr) do { (_enable) = threadPoolEnabled; (_randErr) = tsEnableRandErr; threadPoolEnabled = false; tsEnableRandErr = false;} while (0) #define taosRestoreEnableMemPoolUsage(_enable, _randErr) do { threadPoolEnabled = (_enable); tsEnableRandErr = (_randErr);} while (0) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 995b592b3c..699337fe8a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -54,7 +54,7 @@ int32_t tsMaxShellConns = 50000; int32_t tsShellActivityTimer = 3; // second // memory pool -int8_t tsMemPoolDebug = 0; +int8_t tsMemPoolFullFunc = 0; int8_t tsQueryUseMemoryPool = 1; int32_t tsQueryBufferPoolSize = 0; //MB int32_t tsSingleQueryMaxMemorySize = 0; //MB diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 77a99e8fa3..1604769c01 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -274,11 +274,11 @@ extern SQueryMgmt gQueryMgmt; #define QW_SINK_ENABLE_MEMPOOL(_ctx) \ do { \ if ((_ctx)->sinkWithMemPool) { \ - taosEnableFullMemPoolUsage((_ctx)->memPoolSession); \ + taosEnableMemPoolUsage((_ctx)->memPoolSession); \ } \ } while (0) -#define QW_SINK_DISABLE_MEMPOOL() taosDisableFullMemPoolUsage() +#define QW_SINK_DISABLE_MEMPOOL() taosDisableMemPoolUsage() #define QW_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n) #define QW_STAT_DEC(_item, _n) (void)atomic_sub_fetch_64(&(_item), _n) diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 0c41649e89..87cef4c32e 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -276,9 +276,9 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx) { // Note: free/kill may in RC qTaskInfo_t otaskHandle = atomic_load_ptr(&ctx->taskHandle); if (otaskHandle && otaskHandle == atomic_val_compare_exchange_ptr(&ctx->taskHandle, otaskHandle, NULL)) { - taosEnableFullMemPoolUsage(ctx->memPoolSession); + taosEnableMemPoolUsage(ctx->memPoolSession); qDestroyTask(otaskHandle); - taosDisableFullMemPoolUsage(); + taosDisableMemPoolUsage(); qDebug("task handle destroyed"); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 0168d4b9eb..28fff21239 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -141,9 +141,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { if (taskHandle) { qwDbgSimulateSleep(); - taosEnableFullMemPoolUsage(ctx->memPoolSession); + taosEnableMemPoolUsage(ctx->memPoolSession); code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch); - taosDisableFullMemPoolUsage(); + taosDisableMemPoolUsage(); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { @@ -780,9 +780,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { ctx->queryMsgType = qwMsg->msgType; ctx->localExec = false; - taosEnableFullMemPoolUsage(ctx->memPoolSession); + taosEnableMemPoolUsage(ctx->memPoolSession); code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan); - taosDisableFullMemPoolUsage(); + taosDisableMemPoolUsage(); if (TSDB_CODE_SUCCESS != code) { code = TSDB_CODE_INVALID_MSG; @@ -790,9 +790,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(code); } - taosEnableFullMemPoolUsage(ctx->memPoolSession); + taosEnableMemPoolUsage(ctx->memPoolSession); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH); - taosDisableFullMemPoolUsage(); + taosDisableMemPoolUsage(); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 5c7ee9e19c..e69245700d 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -1252,7 +1252,7 @@ _return: void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) { - if (0 == tsMemPoolDebug) { + if (0 == tsMemPoolFullFunc) { return mpDirectAlloc(poolHandle, ((SMPSession*)session)->pJob, size); } @@ -1282,7 +1282,7 @@ _return: } void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) { - if (0 == tsMemPoolDebug) { + if (0 == tsMemPoolFullFunc) { return mpDirectCalloc(poolHandle, ((SMPSession*)session)->pJob, num, size); } @@ -1314,7 +1314,7 @@ _return: } void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) { - if (0 == tsMemPoolDebug) { + if (0 == tsMemPoolFullFunc) { return mpDirectRealloc(poolHandle, ((SMPSession*)session)->pJob, ptr, size); } @@ -1365,7 +1365,7 @@ _return: } char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) { - if (0 == tsMemPoolDebug) { + if (0 == tsMemPoolFullFunc) { return mpDirectStrdup(poolHandle, ((SMPSession*)session)->pJob, ptr); } @@ -1401,7 +1401,7 @@ _return: } char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) { - if (0 == tsMemPoolDebug) { + if (0 == tsMemPoolFullFunc) { return mpDirectStrndup(poolHandle, ((SMPSession*)session)->pJob, ptr, size); } @@ -1439,7 +1439,7 @@ _return: void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { - if (0 == tsMemPoolDebug) { + if (0 == tsMemPoolFullFunc) { mpDirectFree(poolHandle, ((SMPSession*)session)->pJob, ptr); return; } @@ -1465,7 +1465,7 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, } int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { - if (0 == tsMemPoolDebug) { + if (0 == tsMemPoolFullFunc) { return taosMemSize(ptr); } @@ -1487,7 +1487,7 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha } void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) { - if (0 == tsMemPoolDebug) { + if (0 == tsMemPoolFullFunc) { return mpDirectAlignAlloc(poolHandle, ((SMPSession*)session)->pJob, alignment, size); } @@ -1546,7 +1546,7 @@ void taosMemPoolModDestroy(void) { int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed) { - if (0 == tsMemPoolDebug) { + if (0 == tsMemPoolFullFunc) { return taosMemTrim(size, trimed); } diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 35c6501229..f791ac6537 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -1576,12 +1576,15 @@ TEST(PerfTest, allocLatency) { #if 0 -TEST(FuncTest, SingleThreadTest) { - char* caseName = "FuncTest:SingleThreadTest"; +TEST(poolFuncTest, SingleThreadTest) { + char* caseName = "poolFuncTest:SingleThreadTest"; SMPTestParam param = {0}; param.reserveMode = true; param.threadNum = 1; param.jobQuota = 1024; + param.enableMemPool = true; + + tsMemPoolFullFunc = 0; mptPrintTestBeginInfo(caseName, ¶m); @@ -1592,8 +1595,8 @@ TEST(FuncTest, SingleThreadTest) { } #endif #if 0 -TEST(EnablePoolFuncTest, MultiThreadTest) { - char* caseName = "FuncTest:MultiThreadTest"; +TEST(poolFuncTest, MultiThreadTest) { + char* caseName = "poolFuncTest:MultiThreadTest"; SMPTestParam param = {0}; param.reserveMode = true; param.threadNum = 6; @@ -1601,6 +1604,8 @@ TEST(EnablePoolFuncTest, MultiThreadTest) { param.randTask = true; param.enableMemPool = true; + tsMemPoolFullFunc = 0; + mptPrintTestBeginInfo(caseName, ¶m); for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) { @@ -1610,7 +1615,48 @@ TEST(EnablePoolFuncTest, MultiThreadTest) { } #endif +#if 0 +TEST(poolFullFuncTest, SingleThreadTest) { + char* caseName = "poolFullFuncTest:SingleThreadTest"; + SMPTestParam param = {0}; + param.reserveMode = true; + param.threadNum = 1; + param.jobQuota = 1024; + param.enableMemPool = true; + + tsMemPoolFullFunc = 1; + + mptPrintTestBeginInfo(caseName, ¶m); + + for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) { + mptRunCase(¶m, i); + } + +} +#endif #if 1 +TEST(poolFullFuncTest, MultiThreadTest) { + char* caseName = "poolFullFuncTest:MultiThreadTest"; + SMPTestParam param = {0}; + param.reserveMode = true; + param.threadNum = 6; + param.jobQuota = 1024; + param.randTask = true; + param.enableMemPool = true; + + tsMemPoolFullFunc = 1; + + mptPrintTestBeginInfo(caseName, ¶m); + + for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) { + mptRunCase(¶m, i); + } + +} +#endif + + +#if 0 TEST(DisablePoolFuncTest, MultiThreadTest) { char* caseName = "FuncTest:MultiThreadTest"; SMPTestParam param = {0};