From a5bd91492d73b09796496122a488208618ea4c14 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 19 Nov 2024 10:12:45 +0800 Subject: [PATCH] fix: memory leak and dead lock issues --- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 1 + source/libs/qworker/src/qwUtil.c | 10 ++++ source/util/inc/tmempoolInt.h | 17 +++++++ source/util/src/mpDirect.c | 2 +- source/util/src/tmempool.c | 13 +++-- source/util/test/memPoolTest.cpp | 63 ++++++++++++------------- 6 files changed, 70 insertions(+), 36 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 6d4ebe424a..1708a0c8d9 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -220,6 +220,7 @@ void dmCleanup() { dInfo("dnode env is cleaned up"); + taosMemPoolClose(gMemPoolHandle); taosCleanupCfg(); taosCloseLog(); } diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 0b5092ab0f..41e36aacb8 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -273,6 +273,10 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTask void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); } void qwFreeTaskHandle(SQWTaskCtx *ctx) { + if (ctx->dynamicTask) { + return; + } + // Note: free/kill may in RC qTaskInfo_t otaskHandle = atomic_load_ptr(&ctx->taskHandle); if (otaskHandle && otaskHandle == atomic_val_compare_exchange_ptr(&ctx->taskHandle, otaskHandle, NULL)) { @@ -285,6 +289,10 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx) { } void qwFreeSinkHandle(SQWTaskCtx *ctx) { + if (ctx->dynamicTask) { + return; + } + // Note: free/kill may in RC void* osinkHandle = atomic_load_ptr(&ctx->sinkHandle); if (osinkHandle && osinkHandle == atomic_val_compare_exchange_ptr(&ctx->sinkHandle, osinkHandle, NULL)) { @@ -580,6 +588,8 @@ void qwCloseRef(void) { if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { taosCloseRef(gQwMgmt.qwRef); // ignore error gQwMgmt.qwRef = -1; + + taosHashCleanup(gQueryMgmt.pJobInfo); } taosWUnLockLatch(&gQwMgmt.lock); } diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 421c4e213e..9013ad03a2 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -373,6 +373,23 @@ enum { #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 +#define MP_TRY_LOCK(type, _lock, _res) \ + do { \ + if (MP_READ == (type)) { \ + ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try read lock"); \ + uDebug("MP TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + (_res) = taosRTryLockLatch(_lock); \ + uDebug("MP TRY RLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \ + ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) > 0, "invalid lock value after try read lock"); \ + } else { \ + ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try write lock"); \ + uDebug("MP TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + (_res) = taosWTryLockLatch(_lock); \ + uDebug("MP TRY WLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \ + ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after try write lock"); \ + } \ + } while (0) + #define MP_LOCK(type, _lock) \ do { \ diff --git a/source/util/src/mpDirect.c b/source/util/src/mpDirect.c index a08ceab8f4..61e8d976e0 100755 --- a/source/util/src/mpDirect.c +++ b/source/util/src/mpDirect.c @@ -91,7 +91,7 @@ int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int6 nSize = taosMemSize(*pPtr); mpUpdateAllocSize(pPool, pSession, nSize - *origSize, nSize - *size + *origSize); } else { - MP_ERR_RET(terrno); + MP_ERR_JRET(terrno); } _return: diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 8cd6a11598..e530ee017a 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -1486,6 +1486,10 @@ _return: } void taosMemPoolClose(void* poolHandle) { + if (NULL == poolHandle) { + return; + } + SMemPool* pPool = (SMemPool*)poolHandle; mpCheckStatDetail(pPool, NULL, "PoolClose"); @@ -1496,6 +1500,8 @@ void taosMemPoolClose(void* poolHandle) { mpDestroyCacheGroup(&pPool->sessionCache); atomic_store_8(&gMPMgmt.modExit, 1); + + (void)taosThreadJoin(gMPMgmt.poolMgmtThread, NULL); } @@ -1550,14 +1556,15 @@ int32_t taosMemPoolTryLockPool(void* poolHandle, bool readLock) { return TSDB_CODE_INVALID_PARA; } + int32_t code = 0; SMemPool* pPool = (SMemPool*)poolHandle; if (readLock) { - MP_LOCK(MP_READ, &pPool->cfgLock); + MP_TRY_LOCK(MP_READ, &pPool->cfgLock, code); } else { - MP_LOCK(MP_WRITE, &pPool->cfgLock); + MP_TRY_LOCK(MP_WRITE, &pPool->cfgLock, code); } - return TSDB_CODE_SUCCESS; + return code; } void taosMemPoolUnLockPool(void* poolHandle, bool readLock) { diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 687f3e595b..98393388bc 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -38,7 +38,7 @@ #include "tvariant.h" #include "stub.h" #include "../inc/tmempoolInt.h" - +#include "tglobal.h" namespace { @@ -64,35 +64,33 @@ enum { static int32_t MPT_TRY_LOCK(int32_t type, SRWLatch *_lock) { - int32_t code = 0; + int32_t code = -1; - do { - if (MPT_READ == (type)) { - if (atomic_load_32((_lock)) < 0) { - uError("invalid lock value before try read lock"); - break; - } - uDebug("MPT TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); - code = taosRTryLockLatch(_lock); - uDebug("MPT TRY RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); - if (atomic_load_32((_lock)) <= 0) { - uError("invalid lock value after try read lock"); - break; - } - } else { - if (atomic_load_32((_lock)) < 0) { - uError("invalid lock value before try write lock"); - break; - } - uDebug("MPT TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); - code = taosWTryLockLatch(_lock); - uDebug("MPT TRY WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); - if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { - uError("invalid lock value after try write lock"); - break; - } - } - } while (0); + if (MPT_READ == (type)) { + if (atomic_load_32((_lock)) < 0) { + uError("invalid lock value before try read lock"); + return -1; + } + uDebug("MPT TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); + code = taosRTryLockLatch(_lock); + uDebug("MPT TRY RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); + if (atomic_load_32((_lock)) <= 0) { + uError("invalid lock value after try read lock"); + return -1; + } + } else { + if (atomic_load_32((_lock)) < 0) { + uError("invalid lock value before try write lock"); + return -1; + } + uDebug("MPT TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); + code = taosWTryLockLatch(_lock); + uDebug("MPT TRY WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { + uError("invalid lock value after try write lock"); + return -1; + } + } return code; } @@ -159,7 +157,6 @@ static int32_t MPT_TRY_LOCK(int32_t type, SRWLatch *_lock) { threadlocal void* mptThreadPoolHandle = NULL; threadlocal void* mptThreadPoolSession = NULL; -int32_t tsSingleQueryMaxMemorySize = 0; //MB #define MPT_SET_TEID(id, tId, eId) \ @@ -1125,7 +1122,6 @@ void mptCheckPoolUsedSize(int32_t jobNum) { } if (sleepTimes > 100) { - MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock); break; } @@ -1334,7 +1330,10 @@ void mptRunCase(SMPTestParam* param, int32_t times) { mptDestroyJobs(); taosMemPoolClose(gMemPoolHandle); - gMemPoolHandle = NULL; + + while (gMemPoolHandle) { + taosMsleep(10); + } MPT_PRINTF("\t case end the %dth running\n", times); }