fix: memory leak issues

This commit is contained in:
dapan1121 2024-11-12 08:42:54 +08:00
parent a8561fba4f
commit 7cd98a1902
3 changed files with 52 additions and 41 deletions

View File

@ -103,7 +103,7 @@ typedef struct SMPMemHeader {
} SMPMemHeader; } SMPMemHeader;
typedef struct SMPMemTailer { typedef struct SMPMemTailer {
uint8_t tail;
} SMPMemTailer; } SMPMemTailer;
typedef struct SMPListNode { typedef struct SMPListNode {

View File

@ -179,8 +179,18 @@ uint32_t mpFileIdHashFp(const char* fileId, uint32_t len) {
return *(uint32_t*)fileId; return *(uint32_t*)fileId;
} }
void mpDestroyPosStat(SMPStatPos* pStat) {
taosHashCleanup(pStat->fileHash);
pStat->fileHash = NULL;
taosHashCleanup(pStat->remainHash);
pStat->remainHash = NULL;
taosHashCleanup(pStat->allocHash);
pStat->allocHash = NULL;
taosHashCleanup(pStat->freeHash);
pStat->freeHash = NULL;
}
int32_t mpInitStat(SMPStatPos* pStat, bool sessionStat) { int32_t mpInitPosStat(SMPStatPos* pStat, bool sessionStat) {
pStat->remainHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pStat->remainHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pStat->remainHash) { if (NULL == pStat->remainHash) {
uError("memPool init posStat remainHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat); uError("memPool init posStat remainHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
@ -235,7 +245,7 @@ int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET((*gMPFps[gMPMgmt.strategy].initFp)(pPool, poolName, cfg)); MP_ERR_RET((*gMPFps[gMPMgmt.strategy].initFp)(pPool, poolName, cfg));
} }
MP_ERR_RET(mpInitStat(&pPool->stat.posStat, false)); MP_ERR_RET(mpInitPosStat(&pPool->stat.posStat, false));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1180,6 +1190,8 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) {
mpCheckStatDetail(pPool, pSession, "DestroySession"); mpCheckStatDetail(pPool, pSession, "DestroySession");
mpDestroyPosStat(&pSession->stat.posStat);
TAOS_MEMSET(pSession, 0, sizeof(*pSession)); TAOS_MEMSET(pSession, 0, sizeof(*pSession));
mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession); mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession);
@ -1198,7 +1210,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) {
MP_ERR_JRET((*gMPFps[gMPMgmt.strategy].initSessionFp)(pPool, pSession)); MP_ERR_JRET((*gMPFps[gMPMgmt.strategy].initSessionFp)(pPool, pSession));
} }
MP_ERR_JRET(mpInitStat(&pSession->stat.posStat, true)); MP_ERR_JRET(mpInitPosStat(&pSession->stat.posStat, true));
pSession->pJob = (SMPJob*)pJob; pSession->pJob = (SMPJob*)pJob;
(void)atomic_add_fetch_32(&pSession->pJob->remainSession, 1); (void)atomic_add_fetch_32(&pSession->pJob->remainSession, 1);
@ -1293,6 +1305,8 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input);
} else if (0 == size){ } else if (0 == size){
input.pMem = input.pOrigMem; input.pMem = input.pOrigMem;
input.pOrigMem = NULL;
input.size = input.origSize;
MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC); MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input);
input.pMem = NULL; input.pMem = NULL;
@ -1301,6 +1315,8 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input);
input.pMem = input.pOrigMem; input.pMem = input.pOrigMem;
input.pOrigMem = NULL;
input.size = input.origSize;
input.procFlags = MP_STAT_PROC_FLAG_EXEC; input.procFlags = MP_STAT_PROC_FLAG_EXEC;
MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC); MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input);
@ -1453,6 +1469,8 @@ void taosMemPoolClose(void* poolHandle) {
mpCheckStatDetail(pPool, NULL, "PoolClose"); mpCheckStatDetail(pPool, NULL, "PoolClose");
mpDestroyPosStat(&pPool->stat.posStat);
taosMemoryFree(pPool->name); taosMemoryFree(pPool->name);
mpDestroyCacheGroup(&pPool->sessionCache); mpDestroyCacheGroup(&pPool->sessionCache);
} }
@ -1556,7 +1574,12 @@ int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) {
} }
#endif #endif
taosGetTotalMemory(&tsTotalMemoryKB); code = taosGetTotalMemory(&tsTotalMemoryKB);
if (TSDB_CODE_SUCCESS != code) {
uInfo("fail to system total memory, error: %s", tstrerror(code));
return code;
}
if (tsTotalMemoryKB <= 0) { if (tsTotalMemoryKB <= 0) {
uInfo("memory pool disabled since no enough system total memory, size: %" PRId64 "KB", tsTotalMemoryKB); uInfo("memory pool disabled since no enough system total memory, size: %" PRId64 "KB", tsTotalMemoryKB);
return code; return code;
@ -1602,7 +1625,7 @@ int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) {
return code; return code;
} }
uInfo("memory pool initialized"); uInfo("memory pool initialized, reservedSize:%dMB, freeAfterReserved:%" PRId64 "MB", tsMinReservedMemorySize, freeSizeAfterRes/1048576UL);
return code; return code;
} }

View File

@ -66,7 +66,7 @@ threadlocal void* mptThreadPoolSession = NULL;
#define mptEnableMemoryPoolUsage(_pool, _session) do { mptThreadPoolHandle = _pool; mptThreadPoolSession = _session; } while (0) #define mptEnableMemoryPoolUsage(_pool, _session) do { mptThreadPoolHandle = _pool; mptThreadPoolSession = _session; } while (0)
#define mptDisableMemoryPoolUsage() (mptThreadPoolHandle = NULL) #define mptDisableMemoryPoolUsage() (mptThreadPoolHandle = NULL, mptThreadPoolSession = NULL)
#define mptSaveDisableMemoryPoolUsage(_handle) do { (_handle) = mptThreadPoolHandle; mptThreadPoolHandle = NULL; } while (0) #define mptSaveDisableMemoryPoolUsage(_handle) do { (_handle) = mptThreadPoolHandle; mptThreadPoolHandle = NULL; } while (0)
#define mptRestoreEnableMemoryPoolUsage(_handle) (mptThreadPoolHandle = (_handle)) #define mptRestoreEnableMemoryPoolUsage(_handle) (mptThreadPoolHandle = (_handle))
@ -176,7 +176,6 @@ typedef struct SMPTestCtx {
int64_t tId; int64_t tId;
SHashObj* pJobs; SHashObj* pJobs;
BoundedQueue* pJobQueue; BoundedQueue* pJobQueue;
void* memPoolHandle;
SMPTestThread threadCtxs[MPT_MAX_THREAD_NUM]; SMPTestThread threadCtxs[MPT_MAX_THREAD_NUM];
SMPTestJobCtx* jobCtxs; SMPTestJobCtx* jobCtxs;
SMPTestParam param; SMPTestParam param;
@ -242,6 +241,7 @@ void mptDeleteJobQueueData(void* pData) {
} }
void mptInit() { void mptInit() {
osDefaultInit();
mptInitLogFile(); mptInitLogFile();
mptCtrl.caseLoopTimes = 1; mptCtrl.caseLoopTimes = 1;
@ -266,10 +266,10 @@ void mptInit() {
} }
void mptDestroyTaskCtx(SMPTestTaskCtx* pTask, void* pSession) { void mptDestroyTaskCtx(SMPTestTaskCtx* pTask, void* pSession) {
assert(mptCtx.memPoolHandle); assert(gMemPoolHandle);
assert(pSession); assert(pSession);
mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, pSession); mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < pTask->memIdx; ++i) { for (int32_t i = 0; i < pTask->memIdx; ++i) {
pTask->stat.times.memFree.exec++; pTask->stat.times.memFree.exec++;
pTask->stat.bytes.memFree.exec+=mptMemorySize(pTask->pMemList[i].p); pTask->stat.bytes.memFree.exec+=mptMemorySize(pTask->pMemList[i].p);
@ -349,7 +349,7 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* p
pJobCtx->pJob = pJob; pJobCtx->pJob = pJob;
pJob->pCtx = pJobCtx; pJob->pCtx = pJobCtx;
assert(0 == taosMemPoolInitSession(mptCtx.memPoolHandle, ppSession, pJob->memInfo)); assert(0 == taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo));
char id[sizeof(tId) + sizeof(eId)] = {0}; char id[sizeof(tId) + sizeof(eId)] = {0};
MPT_SET_TEID(id, tId, eId); MPT_SET_TEID(id, tId, eId);
@ -411,7 +411,7 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[i].stat, sizeof(*pStat))); assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[i].stat, sizeof(*pStat)));
mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]); mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]);
taosMemPoolDestroySession(mptCtx.memPoolHandle, pJobCtx->pSessions[i]); taosMemPoolDestroySession(gMemPoolHandle, pJobCtx->pSessions[i]);
} }
uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobCtx->pJob->errCode); uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobCtx->pJob->errCode);
@ -531,23 +531,7 @@ void mptRetireJobCb(uint64_t jobId, int32_t errCode) {
} }
void mptInitPool(void) { void mptInitPool(void) {
SMemPoolCfg cfg = {0}; assert(0 == taosMemoryPoolInit(mptRetireJobsCb, mptRetireJobCb));
if (!mptCtx.param.reserveMode) {
//cfg.upperLimitSize = mptCtx.param.upperLimitSize;
} else {
int64_t memSize = 0;
ASSERT_TRUE(0 == taosGetSysAvailMemory(&memSize));
cfg.reserveSize = &mptCtx.param.reserveSize;
}
cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO
cfg.chunkSize = 1048576;
cfg.jobQuota = &mptCtx.param.jobQuota;
cfg.cb.failFp = mptRetireJobsCb;
cfg.cb.reachFp = mptRetireJobCb;
ASSERT_TRUE(0 == taosMemPoolOpen("testQMemPool", &cfg, &mptCtx.memPoolHandle));
} }
void mptWriteMem(void* pStart, int32_t size) { void mptWriteMem(void* pStart, int32_t size) {
@ -671,6 +655,10 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
pTask->stat.bytes.memRealloc.origExec+=osize; pTask->stat.bytes.memRealloc.origExec+=osize;
pTask->stat.times.memRealloc.fail++; pTask->stat.times.memRealloc.fail++;
pTask->stat.bytes.memRealloc.fail+=size; pTask->stat.bytes.memRealloc.fail+=size;
pTask->stat.times.memFree.exec++;
pTask->stat.bytes.memFree.exec+=osize;
pTask->stat.times.memFree.succ++;
pTask->stat.bytes.memFree.succ+=osize; pTask->stat.bytes.memFree.succ+=osize;
uError("JOB:0x%x TASK:0x%x real mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); uError("JOB:0x%x TASK:0x%x real mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
pTask->memIdx--; pTask->memIdx--;
@ -700,12 +688,12 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
osize = mptMemorySize(pTask->pMemList[pTask->memIdx - 1].p); osize = mptMemorySize(pTask->pMemList[pTask->memIdx - 1].p);
pTask->pMemList[pTask->memIdx - 1].p = mptMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, 0); pTask->pMemList[pTask->memIdx - 1].p = mptMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, 0);
pTask->stat.times.memRealloc.exec++; pTask->stat.times.memFree.exec++;
pTask->stat.bytes.memRealloc.origExec+=osize; pTask->stat.bytes.memFree.exec+=osize;
assert(NULL == pTask->pMemList[pTask->memIdx - 1].p); assert(NULL == pTask->pMemList[pTask->memIdx - 1].p);
pTask->stat.times.memRealloc.succ++; pTask->stat.times.memFree.succ++;
pTask->stat.bytes.memRealloc.origSucc+=osize; pTask->stat.bytes.memFree.succ+=osize;
pTask->memIdx--; pTask->memIdx--;
pTask->lastAct = actId; pTask->lastAct = actId;
@ -905,7 +893,7 @@ void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx) {
atomic_add_fetch_32(&pJobCtx->taskRunningNum, 1); atomic_add_fetch_32(&pJobCtx->taskRunningNum, 1);
mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, pJobCtx->pSessions[idx]); mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[idx]);
mptSimulateTask(pJobCtx, pCtx); mptSimulateTask(pJobCtx, pCtx);
mptDisableMemoryPoolUsage(); mptDisableMemoryPoolUsage();
@ -936,7 +924,7 @@ void mptCheckPoolUsedSize(int32_t jobNum) {
bool needEnd = false; bool needEnd = false;
int64_t poolUsedSize = 0; int64_t poolUsedSize = 0;
taosMemPoolGetUsedSizeBegin(mptCtx.memPoolHandle, &usedSize, &needEnd); taosMemPoolGetUsedSizeBegin(gMemPoolHandle, &usedSize, &needEnd);
for (int32_t i = 0; i < jobNum; ++i) { for (int32_t i = 0; i < jobNum; ++i) {
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
int64_t jobUsedSize = 0; int64_t jobUsedSize = 0;
@ -959,7 +947,7 @@ void mptCheckPoolUsedSize(int32_t jobNum) {
assert(poolUsedSize == usedSize); assert(poolUsedSize == usedSize);
taosMemPoolGetUsedSizeEnd(mptCtx.memPoolHandle); taosMemPoolGetUsedSizeEnd(gMemPoolHandle);
} }
void* mptThreadFunc(void* param) { void* mptThreadFunc(void* param) {
@ -1057,7 +1045,7 @@ void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) {
MPT_PRINTF("\t max single alloc size: %" PRId64 "\n", mptCtrl.maxSingleAllocSize); MPT_PRINTF("\t max single alloc size: %" PRId64 "\n", mptCtrl.maxSingleAllocSize);
MPT_PRINTF("\t job quota size: %" PRId64 "\n", param->jobQuota); MPT_PRINTF("\t job quota size: %" PRId64 "\n", param->jobQuota);
MPT_PRINTF("\t reserve mode: %d\n", param->reserveMode); MPT_PRINTF("\t reserve mode: %d\n", param->reserveMode);
MPT_PRINTF("\t upper limit size: %" PRId64 "\n", param->upperLimitSize); //MPT_PRINTF("\t upper limit size: %" PRId64 "\n", param->upperLimitSize);
MPT_PRINTF("\t test thread num: %d\n", param->threadNum); MPT_PRINTF("\t test thread num: %d\n", param->threadNum);
MPT_PRINTF("\t random exec task: %d\n", param->randTask); MPT_PRINTF("\t random exec task: %d\n", param->randTask);
} }
@ -1066,7 +1054,7 @@ void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) {
#if 1 #if 1
#if 1 #if 0
TEST(FuncTest, SysMemoryPerfTest) { TEST(FuncTest, SysMemoryPerfTest) {
char* caseName = "FuncTest:SingleThreadTest"; char* caseName = "FuncTest:SingleThreadTest";
int32_t code = 0; int32_t code = 0;
@ -1076,7 +1064,7 @@ TEST(FuncTest, SysMemoryPerfTest) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
memset(p, 0, msize); memset(p, 0, msize);
int64_t totalUs = taosGetTimestampUs() - st; int64_t totalUs = taosGetTimestampUs() - st;
printf("memset %" PRId64 " used time:%" PRId64 "us\n", msize, totalUs, totalUs); printf("memset %" PRId64 " used time:%" PRId64 "us, speed:%dMB/ms\n", msize, totalUs, msize/1048576UL/(totalUs/1000UL));
int64_t freeSize = 0; int64_t freeSize = 0;
int32_t loopTimes = 1000000; int32_t loopTimes = 1000000;
@ -1085,7 +1073,7 @@ TEST(FuncTest, SysMemoryPerfTest) {
for (int32_t i = 0; i < loopTimes; ++i) { for (int32_t i = 0; i < loopTimes; ++i) {
code = taosGetSysAvailMemory(&freeSize); code = taosGetSysAvailMemory(&freeSize);
assert(0 == code); assert(0 == code);
taosMsleep(1); taosMsleep(5);
} }
totalUs = taosGetTimestampUs() - st; totalUs = taosGetTimestampUs() - st;