fix: memory leak and dead lock issues

This commit is contained in:
dapan1121 2024-11-19 10:12:45 +08:00
parent 9941dcae9b
commit a5bd91492d
6 changed files with 70 additions and 36 deletions

View File

@ -220,6 +220,7 @@ void dmCleanup() {
dInfo("dnode env is cleaned up");
taosMemPoolClose(gMemPoolHandle);
taosCleanupCfg();
taosCloseLog();
}

View File

@ -273,6 +273,10 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTask
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); }
void qwFreeTaskHandle(SQWTaskCtx *ctx) {
if (ctx->dynamicTask) {
return;
}
// Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(&ctx->taskHandle);
if (otaskHandle && otaskHandle == atomic_val_compare_exchange_ptr(&ctx->taskHandle, otaskHandle, NULL)) {
@ -285,6 +289,10 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx) {
}
void qwFreeSinkHandle(SQWTaskCtx *ctx) {
if (ctx->dynamicTask) {
return;
}
// Note: free/kill may in RC
void* osinkHandle = atomic_load_ptr(&ctx->sinkHandle);
if (osinkHandle && osinkHandle == atomic_val_compare_exchange_ptr(&ctx->sinkHandle, osinkHandle, NULL)) {
@ -580,6 +588,8 @@ void qwCloseRef(void) {
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
taosCloseRef(gQwMgmt.qwRef); // ignore error
gQwMgmt.qwRef = -1;
taosHashCleanup(gQueryMgmt.pJobInfo);
}
taosWUnLockLatch(&gQwMgmt.lock);
}

View File

@ -373,6 +373,23 @@ enum {
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define MP_TRY_LOCK(type, _lock, _res) \
do { \
if (MP_READ == (type)) { \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try read lock"); \
uDebug("MP TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
(_res) = taosRTryLockLatch(_lock); \
uDebug("MP TRY RLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \
ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) > 0, "invalid lock value after try read lock"); \
} else { \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try write lock"); \
uDebug("MP TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
(_res) = taosWTryLockLatch(_lock); \
uDebug("MP TRY WLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \
ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after try write lock"); \
} \
} while (0)
#define MP_LOCK(type, _lock) \
do { \

View File

@ -91,7 +91,7 @@ int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int6
nSize = taosMemSize(*pPtr);
mpUpdateAllocSize(pPool, pSession, nSize - *origSize, nSize - *size + *origSize);
} else {
MP_ERR_RET(terrno);
MP_ERR_JRET(terrno);
}
_return:

View File

@ -1486,6 +1486,10 @@ _return:
}
void taosMemPoolClose(void* poolHandle) {
if (NULL == poolHandle) {
return;
}
SMemPool* pPool = (SMemPool*)poolHandle;
mpCheckStatDetail(pPool, NULL, "PoolClose");
@ -1496,6 +1500,8 @@ void taosMemPoolClose(void* poolHandle) {
mpDestroyCacheGroup(&pPool->sessionCache);
atomic_store_8(&gMPMgmt.modExit, 1);
(void)taosThreadJoin(gMPMgmt.poolMgmtThread, NULL);
}
@ -1550,14 +1556,15 @@ int32_t taosMemPoolTryLockPool(void* poolHandle, bool readLock) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
SMemPool* pPool = (SMemPool*)poolHandle;
if (readLock) {
MP_LOCK(MP_READ, &pPool->cfgLock);
MP_TRY_LOCK(MP_READ, &pPool->cfgLock, code);
} else {
MP_LOCK(MP_WRITE, &pPool->cfgLock);
MP_TRY_LOCK(MP_WRITE, &pPool->cfgLock, code);
}
return TSDB_CODE_SUCCESS;
return code;
}
void taosMemPoolUnLockPool(void* poolHandle, bool readLock) {

View File

@ -38,7 +38,7 @@
#include "tvariant.h"
#include "stub.h"
#include "../inc/tmempoolInt.h"
#include "tglobal.h"
namespace {
@ -64,35 +64,33 @@ enum {
static int32_t MPT_TRY_LOCK(int32_t type, SRWLatch *_lock) {
int32_t code = 0;
int32_t code = -1;
do {
if (MPT_READ == (type)) {
if (atomic_load_32((_lock)) < 0) {
uError("invalid lock value before try read lock");
break;
}
uDebug("MPT TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
code = taosRTryLockLatch(_lock);
uDebug("MPT TRY RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
if (atomic_load_32((_lock)) <= 0) {
uError("invalid lock value after try read lock");
break;
}
} else {
if (atomic_load_32((_lock)) < 0) {
uError("invalid lock value before try write lock");
break;
}
uDebug("MPT TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
code = taosWTryLockLatch(_lock);
uDebug("MPT TRY WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) {
uError("invalid lock value after try write lock");
break;
}
}
} while (0);
if (MPT_READ == (type)) {
if (atomic_load_32((_lock)) < 0) {
uError("invalid lock value before try read lock");
return -1;
}
uDebug("MPT TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
code = taosRTryLockLatch(_lock);
uDebug("MPT TRY RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
if (atomic_load_32((_lock)) <= 0) {
uError("invalid lock value after try read lock");
return -1;
}
} else {
if (atomic_load_32((_lock)) < 0) {
uError("invalid lock value before try write lock");
return -1;
}
uDebug("MPT TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
code = taosWTryLockLatch(_lock);
uDebug("MPT TRY WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) {
uError("invalid lock value after try write lock");
return -1;
}
}
return code;
}
@ -159,7 +157,6 @@ static int32_t MPT_TRY_LOCK(int32_t type, SRWLatch *_lock) {
threadlocal void* mptThreadPoolHandle = NULL;
threadlocal void* mptThreadPoolSession = NULL;
int32_t tsSingleQueryMaxMemorySize = 0; //MB
#define MPT_SET_TEID(id, tId, eId) \
@ -1125,7 +1122,6 @@ void mptCheckPoolUsedSize(int32_t jobNum) {
}
if (sleepTimes > 100) {
MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock);
break;
}
@ -1334,7 +1330,10 @@ void mptRunCase(SMPTestParam* param, int32_t times) {
mptDestroyJobs();
taosMemPoolClose(gMemPoolHandle);
gMemPoolHandle = NULL;
while (gMemPoolHandle) {
taosMsleep(10);
}
MPT_PRINTF("\t case end the %dth running\n", times);
}