fix: remove taosMem calling

This commit is contained in:
dapan1121 2024-11-11 09:09:40 +08:00
parent 371094cc13
commit 411d61504f
11 changed files with 73 additions and 45 deletions

View File

@ -1370,7 +1370,7 @@ static void *hbThreadFunc(void *param) {
break;
}
*(int32_t *)pInfo->param = i;
pInfo->paramFreeFp = taosMemFree;
pInfo->paramFreeFp = taosAutoMemoryFree;
pInfo->requestId = generateRequestId();
pInfo->requestObjRefId = 0;

View File

@ -659,7 +659,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemFree;
pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
@ -1385,7 +1385,7 @@ static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->paramFreeFp = taosMemFree;
sendInfo->paramFreeFp = taosAutoMemoryFree;
sendInfo->fp = askEpCb;
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
@ -2173,7 +2173,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->paramFreeFp = taosMemFree;
sendInfo->paramFreeFp = taosAutoMemoryFree;
sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
@ -3323,7 +3323,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->paramFreeFp = taosMemFree;
sendInfo->paramFreeFp = taosAutoMemoryFree;
sendInfo->fp = tmqGetWalInfoCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;

View File

@ -946,7 +946,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
arg->tsdb = fs->tsdb;
arg->fid = fset->fid;
code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemFree, arg, NULL);
code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
fset->mergeScheduled = true;
}

View File

@ -139,7 +139,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int
pParam->pInserter = pInserter;
pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemFree;
pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = msgLen;
pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
@ -432,7 +432,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
(void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
taosArrayDestroy(pInserter->pDataBlocks);
taosMemFree(pInserter->pSchema);
taosMemoryFree(pInserter->pSchema);
taosMemoryFree(pInserter->pParam);
taosHashCleanup(pInserter->pCols);
(void)taosThreadMutexDestroy(&pInserter->mutex);

View File

@ -116,7 +116,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
taosMemFreeClear(pDataInfo->pRsp);
taosMemoryFreeClear(pDataInfo->pRsp);
goto _error;
}
} else {
@ -125,7 +125,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
taosMemFreeClear(pDataInfo->pRsp);
taosMemoryFreeClear(pDataInfo->pRsp);
}
break;
}
@ -154,13 +154,13 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
}
taosMemFreeClear(pDataInfo->pRsp);
taosMemoryFreeClear(pDataInfo->pRsp);
if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) {
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
taosMemFreeClear(pDataInfo->pRsp);
taosMemoryFreeClear(pDataInfo->pRsp);
goto _error;
}
}
@ -644,7 +644,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
(*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId,
pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
code = loadRemoteDataCallback(pWrapper, &pBuf, code);
taosMemFree(pWrapper);
taosMemoryFree(pWrapper);
QUERY_CHECK_CODE(code, lino, _end);
} else {
SResFetchReq req = {0};
@ -660,7 +660,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
pDataInfo->pSrcUidList = NULL;
if (TSDB_CODE_SUCCESS != code) {
pTaskInfo->code = code;
taosMemFree(pWrapper);
taosMemoryFree(pWrapper);
return pTaskInfo->code;
}
}
@ -668,7 +668,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
if (msgSize < 0) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemFree(pWrapper);
taosMemoryFree(pWrapper);
freeOperatorParam(req.pOpParam, OP_GET_PARAM);
return pTaskInfo->code;
}
@ -676,15 +676,15 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
void* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemFree(pWrapper);
taosMemoryFree(pWrapper);
freeOperatorParam(req.pOpParam, OP_GET_PARAM);
return pTaskInfo->code;
}
if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemFree(pWrapper);
taosMemFree(msg);
taosMemoryFree(pWrapper);
taosMemoryFree(msg);
freeOperatorParam(req.pOpParam, OP_GET_PARAM);
return pTaskInfo->code;
}
@ -698,15 +698,15 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
taosMemFreeClear(msg);
taosMemFree(pWrapper);
taosMemoryFreeClear(msg);
taosMemoryFree(pWrapper);
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return pTaskInfo->code;
}
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->paramFreeFp = taosMemFree;
pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgType = pSource->fetchMsgType;

View File

@ -2219,7 +2219,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
pRsp->numOfRows, pInfo->loadInfo.totalRows);
if (pRsp->numOfRows == 0) {
taosMemFree(pRsp);
taosMemoryFree(pRsp);
return NULL;
}
}
@ -2229,7 +2229,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;
taosMemFreeClear(pRsp);
taosMemoryFreeClear(pRsp);
T_LONG_JMP(pTaskInfo->env, code);
}
updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
@ -2239,10 +2239,10 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;
taosMemFreeClear(pRsp);
taosMemoryFreeClear(pRsp);
T_LONG_JMP(pTaskInfo->env, code);
}
taosMemFree(pRsp);
taosMemoryFree(pRsp);
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
} else if (pOperator->status == OP_EXEC_DONE) {

View File

@ -88,7 +88,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) {
}
abort_parse:
ParseFree(pParser, (FFree)taosMemFree);
ParseFree(pParser, (FFree)taosAutoMemoryFree);
if (TSDB_CODE_SUCCESS == cxt.errCode) {
int32_t code = buildQueryAfterParse(pQuery, cxt.pRootNode, cxt.placeholderNo, &cxt.pPlaceholderValues);
if (TSDB_CODE_SUCCESS != code) {

View File

@ -650,7 +650,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
SCH_ERR_JRET(terrno);
}
msgSendInfo->paramFreeFp = taosMemFree;
msgSendInfo->paramFreeFp = taosAutoMemoryFree;
SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param));
SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp));
@ -821,7 +821,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
param->pTrans = pJob->conn.pTrans;
pMsgSendInfo->param = param;
pMsgSendInfo->paramFreeFp = taosMemFree;
pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo};
@ -941,7 +941,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) {
pDst->param = NULL;
SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
pDst->paramFreeFp = taosMemFree;
pDst->paramFreeFp = taosAutoMemoryFree;
*dst = pDst;

View File

@ -142,7 +142,7 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) {
_OVER:
if (pJson != NULL) tjsonDelete(pJson);
if (buffer != NULL) taosMemFree(buffer);
if (buffer != NULL) taosMemoryFree(buffer);
if (pFile != NULL) taosCloseFile(&pFile);
if (code != 0) {

View File

@ -1098,13 +1098,13 @@ int32_t walSaveMeta(SWal* pWal) {
}
}
taosMemFree(serialized);
taosMemoryFree(serialized);
return code;
_err:
wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
(void)taosCloseFile(&pMetaFile);
taosMemFree(serialized);
taosMemoryFree(serialized);
return code;
}

View File

@ -223,7 +223,7 @@ int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET(mpUpdateCfg(pPool));
pPool->ctrl.statFlags = MP_STAT_FLAG_LOG_ALL;
pPool->ctrl.statFlags = MP_STAT_FLAG_LOG_ALL & (~MP_LOG_FLAG_ALL_POS);
pPool->ctrl.funcFlags = MP_CTRL_FLAG_PRINT_STAT | MP_CTRL_FLAG_CHECK_STAT;
pPool->sessionCache.groupNum = MP_SESSION_CACHE_ALLOC_BATCH_SIZE;
@ -744,19 +744,34 @@ void mpLogPosStat(SMPStatPos* pStat, EMPStatLogItem item, SMPStatInput* pInput,
SMPFileLine fileLine = {.fl.line = pInput->line, .size = pInput->size};
code = mpGetPosStatFileId(pStat, pInput->file, &fileLine.fl.fileId, sessionStat);
if (TSDB_CODE_SUCCESS != code) {
uError("add pMem:%p file:%s line:%d to fileHash failed, error:%s, sessionStat:%d",
uError("add pMem:%p %s:%d to fileHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
code = taosHashPut(pStat->remainHash, &pInput->pMem, POINTER_BYTES, &fileLine, sizeof(fileLine));
if (TSDB_CODE_SUCCESS != code) {
uError("add pMem:%p file:%s line:%d to remainHash failed, error:%s, sessionStat:%d",
if (TSDB_CODE_DUP_KEY == code) {
SMPFileLine* pFileLine = (SMPFileLine*)taosHashAcquire(pStat->remainHash, &pInput->pMem, POINTER_BYTES);
if (pFileLine) {
char* pFileName = (char*)taosHashGet(pStat->fileHash, &pFileLine->fl.fileId, sizeof(pFileLine->fl.fileId));
if (NULL == pFileName) {
uError("fail to get fileId %u in fileHash", pFileLine->fl.fileId);
} else {
uError("add pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d, origAllocAt %s:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat, pFileName, pFileLine->fl.line);
MP_ERR_JRET(code);
}
}
}
uError("add pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
code = mpGetAllocFreeStat(pStat->allocHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&allocStat, sizeof(allocStat), (void**)&pAlloc);
if (TSDB_CODE_SUCCESS != code) {
uError("add pMem:%p file:%s line:%d to allocHash failed, error:%s, sessionStat:%d",
uError("add pMem:%p %s:%d to allocHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
@ -771,7 +786,7 @@ void mpLogPosStat(SMPStatPos* pStat, EMPStatLogItem item, SMPStatInput* pInput,
SMPFileLine fileLine = {.fl.line = pInput->line, .size = pInput->size};
code = mpGetPosStatFileId(pStat, pInput->file, &fileLine.fl.fileId, sessionStat);
if (TSDB_CODE_SUCCESS != code) {
uError("realloc: add pMem:%p file:%s line:%d to fileHash failed, error:%s, sessionStat:%d",
uError("realloc: add pMem:%p %s:%d to fileHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
@ -781,13 +796,13 @@ void mpLogPosStat(SMPStatPos* pStat, EMPStatLogItem item, SMPStatInput* pInput,
if (pInput->pOrigMem && pInput->origSize > 0) {
code = taosHashRemove(pStat->remainHash, &pInput->pOrigMem, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) {
uError("realloc: rm pOrigMem:%p file:%s line:%d from remainHash failed, error:%s, sessionStat:%d",
uError("realloc: rm pOrigMem:%p %s:%d from remainHash failed, error:%s, sessionStat:%d",
pInput->pOrigMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
code = mpGetAllocFreeStat(pStat->freeHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&freeStat, sizeof(freeStat), (void**)&pFree);
if (TSDB_CODE_SUCCESS != code) {
uError("realloc: add pOrigMem:%p file:%s line:%d to freeHash failed, error:%s, sessionStat:%d",
uError("realloc: add pOrigMem:%p %s:%d to freeHash failed, error:%s, sessionStat:%d",
pInput->pOrigMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
@ -798,14 +813,28 @@ void mpLogPosStat(SMPStatPos* pStat, EMPStatLogItem item, SMPStatInput* pInput,
code = taosHashPut(pStat->remainHash, &pInput->pMem, POINTER_BYTES, &fileLine, sizeof(fileLine));
if (TSDB_CODE_SUCCESS != code) {
uError("realloc: add pMem:%p file:%s line:%d to remainHash failed, error:%s, sessionStat:%d",
if (TSDB_CODE_DUP_KEY == code) {
SMPFileLine* pFileLine = (SMPFileLine*)taosHashAcquire(pStat->remainHash, &pInput->pMem, POINTER_BYTES);
if (pFileLine) {
char* pFileName = (char*)taosHashGet(pStat->fileHash, &pFileLine->fl.fileId, sizeof(pFileLine->fl.fileId));
if (NULL == pFileName) {
uError("realloc: fail to get fileId %u in fileHash", pFileLine->fl.fileId);
} else {
uError("realloc: add pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d, origAllocAt %s:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat, pFileName, pFileLine->fl.line);
MP_ERR_JRET(code);
}
}
}
uError("realloc: add pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
code = mpGetAllocFreeStat(pStat->allocHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&allocStat, sizeof(allocStat), (void**)&pAlloc);
if (TSDB_CODE_SUCCESS != code) {
uError("realloc: add pMem:%p file:%s line:%d to allocHash failed, error:%s, sessionStat:%d",
uError("realloc: add pMem:%p %s:%d to allocHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
@ -820,21 +849,20 @@ void mpLogPosStat(SMPStatPos* pStat, EMPStatLogItem item, SMPStatInput* pInput,
SMPFileLineId fl = {.line = pInput->line};
code = mpGetPosStatFileId(pStat, pInput->file, &fl.fileId, sessionStat);
if (TSDB_CODE_SUCCESS != code) {
uError("free: add pMem:%p file:%s line:%d to fileHash failed, error:%s, sessionStat:%d",
uError("free: add pMem:%p %s:%d to fileHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
code = taosHashRemove(pStat->remainHash, &pInput->pMem, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) {
uError("free: rm pMem:%p file:%s line:%d to remainHash failed, error:%s, sessionStat:%d",
uDebug("free: rm pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
code = mpGetAllocFreeStat(pStat->freeHash, &fl, sizeof(fl), (void*)&freeStat, sizeof(freeStat), (void**)&pFree);
if (TSDB_CODE_SUCCESS != code) {
uError("realloc: add pMem:%p file:%s line:%d to freeHash failed, error:%s, sessionStat:%d",
uError("free: add pMem:%p %s:%d to freeHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}