fix: memory free not found issues

This commit is contained in:
dapan1121 2024-11-08 09:58:32 +08:00
parent dd2ab5b361
commit e4c87a024b
12 changed files with 60 additions and 45 deletions

View File

@ -145,20 +145,25 @@ int32_t taosMemoryPoolInit(mpReserveFailFp, mpReserveReachFp);
#ifndef BUILD_TEST #ifndef BUILD_TEST
extern void* gMemPoolHandle; extern void* gMemPoolHandle;
extern threadlocal void* threadPoolSession; extern threadlocal void* threadPoolSession;
extern threadlocal bool threadPoolEnabled;
#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0) #define taosEnableFullMemPoolUsage(_session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0)
#define taosDisableMemoryPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0) #define taosDisableFullMemPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0)
#define taosMemoryMalloc(_size) ((NULL != gMemPoolHandle) ? (taosMemPoolMalloc(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size))) #define taosEnableMemPoolUsage() do { threadPoolEnabled = true; tsEnableRandErr = true;} while (0)
#define taosMemoryCalloc(_num, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolCalloc(gMemPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) #define taosDisableMemPoolUsage() do { threadPoolEnabled = false; tsEnableRandErr = false;} while (0)
#define taosMemoryRealloc(_ptr, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolRealloc(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
#define taosStrdup(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolStrdup(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr)))
#define taosStrndup(_ptr, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolStrndup(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size))) #define taosMemoryMalloc(_size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolMalloc(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size)))
#define taosMemoryFree(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolFree(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr))) #define taosMemoryCalloc(_num, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolCalloc(gMemPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
#define taosMemorySize(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolGetMemorySize(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr))) #define taosMemoryRealloc(_ptr, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolRealloc(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
#define taosMemoryTrim(_size, _trimed) ((NULL != gMemPoolHandle) ? (taosMemPoolTrim(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed))) #define taosStrdup(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolStrdup(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr)))
#define taosMemoryMallocAlign(_alignment, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolMallocAlign(gMemPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) #define taosStrndup(_ptr, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolStrndup(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size)))
#define taosMemoryFree(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolFree(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr)))
#define taosMemorySize(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolGetMemorySize(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr)))
#define taosMemoryTrim(_size, _trimed) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolTrim(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed)))
#define taosMemoryMallocAlign(_alignment, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolMallocAlign(gMemPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
#else #else
#define taosEnableMemoryPoolUsage(_pool, _session) #define taosEnableMemoryPoolUsage(_pool, _session)
#define taosDisableMemoryPoolUsage() #define taosDisableMemoryPoolUsage()

View File

@ -2076,7 +2076,8 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"supportVnodes", &tsNumOfSupportVnodes}, {"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental}, {"experimental", &tsExperimental},
{"maxTsmaNum", &tsMaxTsmaNum}, {"maxTsmaNum", &tsMaxTsmaNum},
{"singleQueryMaxMemorySize", &tsSingleQueryMaxMemorySize}}; {"singleQueryMaxMemorySize", &tsSingleQueryMaxMemorySize},
{"minReservedMemorySize", &tsMinReservedMemorySize}};
if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) { if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false); code = taosCfgSetOption(options, tListLen(options), pItem, false);

View File

@ -445,7 +445,7 @@ int mainWindows(int argc, char **argv) {
} }
if ((code = taosMemoryPoolInit(qWorkerRetireJobs, qWorkerRetireJob)) != 0) { if ((code = taosMemoryPoolInit(qWorkerRetireJobs, qWorkerRetireJob)) != 0) {
dError("failed to init conv"); dError("failed to init memPool, error:0x%x", code);
taosCloseLog(); taosCloseLog();
taosCleanupArgs(); taosCleanupArgs();
return code; return code;

View File

@ -627,9 +627,12 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead)); tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
terrno = tEncodeSVDropStbReq(&encoder, &req); int32_t code = tEncodeSVDropStbReq(&encoder, &req);
tEncoderClear(&encoder); tEncoderClear(&encoder);
if (terrno != 0) return NULL; if (code != 0) {
terrno = code;
return NULL;
}
*pContLen = contLen; *pContLen = contLen;
return pHead; return pHead;

View File

@ -262,11 +262,11 @@ extern SQueryMgmt gQueryMgmt;
#define QW_SINK_ENABLE_MEMPOOL(_ctx) \ #define QW_SINK_ENABLE_MEMPOOL(_ctx) \
do { \ do { \
if ((_ctx)->sinkWithMemPool) { \ if ((_ctx)->sinkWithMemPool) { \
taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, (_ctx)->memPoolSession); \ taosEnableFullMemPoolUsage((_ctx)->memPoolSession); \
} \ } \
} while (0) } while (0)
#define QW_SINK_DISABLE_MEMPOOL() taosDisableMemoryPoolUsage() #define QW_SINK_DISABLE_MEMPOOL() taosDisableFullMemPoolUsage()
#define QW_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n) #define QW_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n)
#define QW_STAT_DEC(_item, _n) (void)atomic_sub_fetch_64(&(_item), _n) #define QW_STAT_DEC(_item, _n) (void)atomic_sub_fetch_64(&(_item), _n)

View File

@ -276,9 +276,9 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx, qTaskInfo_t *taskHandle) {
// Note: free/kill may in RC // Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession); taosEnableFullMemPoolUsage(ctx->memPoolSession);
qDestroyTask(otaskHandle); qDestroyTask(otaskHandle);
taosDisableMemoryPoolUsage(); taosDisableFullMemPoolUsage();
qDebug("task handle destroyed"); qDebug("task handle destroyed");
} }

View File

@ -167,9 +167,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
if (taskHandle) { if (taskHandle) {
qwDbgSimulateSleep(); qwDbgSimulateSleep();
taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession); taosEnableFullMemPoolUsage(ctx->memPoolSession);
code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch); code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch);
taosDisableMemoryPoolUsage(); taosDisableFullMemPoolUsage();
if (code) { if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
@ -803,9 +803,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
ctx->queryMsgType = qwMsg->msgType; ctx->queryMsgType = qwMsg->msgType;
ctx->localExec = false; ctx->localExec = false;
taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession); taosEnableFullMemPoolUsage(ctx->memPoolSession);
code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan); code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
taosDisableMemoryPoolUsage(); taosDisableFullMemPoolUsage();
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
@ -813,9 +813,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession); taosEnableFullMemPoolUsage(ctx->memPoolSession);
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH);
taosDisableMemoryPoolUsage(); taosDisableFullMemPoolUsage();
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));

View File

@ -142,7 +142,7 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) {
_OVER: _OVER:
if (pJson != NULL) tjsonDelete(pJson); if (pJson != NULL) tjsonDelete(pJson);
if (buffer != NULL) taosMemoryFree(buffer); if (buffer != NULL) taosMemFree(buffer);
if (pFile != NULL) taosCloseFile(&pFile); if (pFile != NULL) taosCloseFile(&pFile);
if (code != 0) { if (code != 0) {

View File

@ -1085,13 +1085,13 @@ int32_t walSaveMeta(SWal* pWal) {
} }
} }
taosMemoryFree(serialized); taosMemFree(serialized);
return code; return code;
_err: _err:
wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code)); wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
(void)taosCloseFile(&pMetaFile); (void)taosCloseFile(&pMetaFile);
taosMemoryFree(serialized); taosMemFree(serialized);
return code; return code;
} }

View File

@ -433,7 +433,12 @@ int32_t taosMemTrim(int32_t size, bool* trimed) {
// do nothing // do nothing
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
#else #else
if (trimed) {
*trimed = malloc_trim(size); *trimed = malloc_trim(size);
} else {
malloc_trim(size);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
#endif #endif
} }

View File

@ -318,6 +318,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
return terrno = TSDB_CODE_INVALID_PTR; return terrno = TSDB_CODE_INVALID_PTR;
} }
int32_t code = TSDB_CODE_SUCCESS;
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// need the resize process, write lock applied // need the resize process, write lock applied
@ -327,10 +328,15 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
taosHashWUnlock(pHashObj); taosHashWUnlock(pHashObj);
} }
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
code = terrno;
return code;
}
// disable resize // disable resize
taosHashRLock(pHashObj); taosHashRLock(pHashObj);
int32_t code = TSDB_CODE_SUCCESS;
uint32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); uint32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot]; SHashEntry *pe = pHashObj->hashList[slot];
@ -350,33 +356,22 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
if (pNode == NULL) { if (pNode == NULL) {
// no data in hash table with the specified key, add it into hash table // no data in hash table with the specified key, add it into hash table
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _exit;
}
pushfrontNodeInEntryList(pe, pNewNode); pushfrontNodeInEntryList(pe, pNewNode);
(void)atomic_add_fetch_64(&pHashObj->size, 1); (void)atomic_add_fetch_64(&pHashObj->size, 1);
} else { } else {
// not support the update operation, return error // not support the update operation, return error
if (pHashObj->enableUpdate) { if (pHashObj->enableUpdate) {
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _exit;
}
doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode); doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode);
} else { } else {
FREE_HASH_NODE(pHashObj->freeFp, pNewNode);
terrno = TSDB_CODE_DUP_KEY; terrno = TSDB_CODE_DUP_KEY;
code = terrno; code = terrno;
goto _exit; goto _exit;
} }
} }
_exit: _exit:
taosHashEntryWUnlock(pHashObj, pe); taosHashEntryWUnlock(pHashObj, pe);
taosHashRUnlock(pHashObj); taosHashRUnlock(pHashObj);
return code; return code;

View File

@ -24,6 +24,8 @@
static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT; static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT;
void* gMemPoolHandle = NULL; void* gMemPoolHandle = NULL;
threadlocal void* threadPoolSession = NULL; threadlocal void* threadPoolSession = NULL;
threadlocal bool threadPoolEnabled = true;
SMemPoolMgmt gMPMgmt = {0}; SMemPoolMgmt gMPMgmt = {0};
SMPStrategyFp gMPFps[] = { SMPStrategyFp gMPFps[] = {
{NULL}, {NULL},
@ -209,7 +211,7 @@ int32_t mpInitStat(SMPStatPos* pStat, bool sessionStat) {
} }
int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET(mpCheckCfg(cfg)); // MP_ERR_RET(mpCheckCfg(cfg));
TAOS_MEMCPY(&pPool->cfg, cfg, sizeof(*cfg)); TAOS_MEMCPY(&pPool->cfg, cfg, sizeof(*cfg));
@ -878,10 +880,14 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt
mpLogDetailStat(&pPool->stat.statDetail, item, pInput); mpLogDetailStat(&pPool->stat.statDetail, item, pInput);
} }
if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
taosDisableMemPoolUsage();
mpLogPosStat(&pSession->stat.posStat, item, pInput, true); mpLogPosStat(&pSession->stat.posStat, item, pInput, true);
taosEnableMemPoolUsage();
} }
if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
taosDisableMemPoolUsage();
mpLogPosStat(&pPool->stat.posStat, item, pInput, false); mpLogPosStat(&pPool->stat.posStat, item, pInput, false);
taosEnableMemPoolUsage();
} }
break; break;
} }