From 03e9b15237994f7815b472c2371966b70f557db5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 18 Jul 2022 13:32:17 +0800 Subject: [PATCH] fix: fix taosc memory leak --- include/libs/qcom/query.h | 5 +++++ source/client/src/clientHb.c | 4 +--- source/client/src/clientImpl.c | 8 -------- source/client/src/clientMsgHandler.c | 6 ++++++ source/client/src/tmq.c | 2 ++ source/libs/catalog/inc/catalogInt.h | 8 ++++++++ source/libs/catalog/src/ctgRemote.c | 8 ++++---- source/libs/executor/src/dataInserter.c | 1 + source/libs/executor/src/executorimpl.c | 7 +------ source/libs/nodes/src/nodesUtilFuncs.c | 10 +++++++++- source/libs/qcom/src/queryUtil.c | 10 ++++++++++ source/libs/scheduler/src/schRemote.c | 10 ++++++---- source/libs/scheduler/src/schUtil.c | 16 +++++++++------- 13 files changed, 62 insertions(+), 33 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index a93cf1f9b8..58739b4af7 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -162,9 +162,12 @@ typedef struct SRequestConnInfo { SEpSet mgmtEps; } SRequestConnInfo; +typedef void (*__freeFunc)(void *param); + typedef struct SMsgSendInfo { __async_send_cb_fn_t fp; // async callback function STargetInfo target; // for update epset + __freeFunc paramFreeFp; void* param; uint64_t requestId; uint64_t requestObjRefId; @@ -188,6 +191,8 @@ int32_t cleanupTaskQueue(); */ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); +void destroySendMsgInfo(SMsgSendInfo* pMsgBody); + int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void* ctx); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index b52df105b1..6a583842d1 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -286,13 +286,10 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { if (pInst == NULL || NULL == *pInst) { taosThreadMutexUnlock(&appInfo.mutex); tscError("cluster not exist, key:%s", key); - taosMemoryFreeClear(param); tFreeClientHbBatchRsp(&pRsp); return -1; } - taosMemoryFreeClear(param); - if (code != 0) { (*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1); tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes, (*pInst)->totalDnodes); @@ -716,6 +713,7 @@ static void *hbThreadFunc(void *param) { pInfo->msgInfo.len = tlen; pInfo->msgType = TDMT_MND_HEARTBEAT; pInfo->param = strdup(pAppHbMgr->key); + pInfo->paramFreeFp = taosMemoryFree; pInfo->requestId = generateRequestId(); pInfo->requestObjRefId = 0; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index e8eef0d287..9ad3be4d92 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -29,7 +29,6 @@ static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); -static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static bool stringLengthCheck(const char* str, size_t maxsize) { if (str == NULL) { @@ -1215,13 +1214,6 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { return pMsgSendInfo; } -static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { - assert(pMsgBody != NULL); - taosMemoryFreeClear(pMsgBody->target.dbFName); - taosMemoryFreeClear(pMsgBody->msgInfo.pData); - taosMemoryFreeClear(pMsgBody); -} - void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) { if (NULL == pEpSet) { return; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 520a566e2b..db33532aef 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -255,6 +255,8 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) { catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid); } + taosMemoryFree(pMsg->pData); + if (pRequest->body.queryFp != NULL) { pRequest->body.queryFp(pRequest->body.param, pRequest, code); } else { @@ -278,6 +280,8 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) { pRequest->body.resInfo.execRes.res = alterRsp.pMeta; } + taosMemoryFree(pMsg->pData); + if (pRequest->body.queryFp != NULL) { SExecResult* pRes = &pRequest->body.resInfo.execRes; @@ -387,6 +391,8 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) { tFreeSShowVariablesRsp(&rsp); } + taosMemoryFree(pMsg->pData); + if (pRequest->body.queryFp != NULL) { pRequest->body.queryFp(pRequest->body.param, pRequest, code); } else { diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index ed0ec516b2..fb835d3878 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -506,6 +506,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT pMsgSendInfo->requestId = generateRequestId(); pMsgSendInfo->requestObjRefId = 0; pMsgSendInfo->param = pParam; + pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->fp = tmqCommitCb2; pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET; // send msg @@ -1516,6 +1517,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; sendInfo->param = pParam; + sendInfo->paramFreeFp = taosMemoryFree; sendInfo->fp = tmqAskEpCb; sendInfo->msgType = TDMT_MND_MQ_ASK_EP; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index fb9f588bae..9003de97d7 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -532,6 +532,14 @@ typedef struct SCtgOperation { } \ } while (0) +#define CTG_API_JENTER() do { \ + CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \ + CTG_LOCK(CTG_READ, &gCtgMgmt.lock); \ + if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \ + CTG_ERR_JRET(TSDB_CODE_CTG_OUT_OF_SERVICE); \ + } \ +} while (0) + #define CTG_API_LEAVE_NOLOCK(c) do { \ int32_t __code = c; \ diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 8e0a5b7de3..1e375471f9 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -244,10 +244,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param; int32_t code = 0; + SCtgJob* pJob = NULL; - CTG_API_ENTER(); + CTG_API_JENTER(); - SCtgJob* pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId); + pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId); if (NULL == pJob) { qDebug("ctg job refId 0x%" PRIx64 " already dropped", cbParam->refId); goto _return; @@ -266,8 +267,6 @@ _return: if (pJob) { taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId); } - - taosMemoryFree(param); CTG_API_LEAVE(code); } @@ -293,6 +292,7 @@ int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsg param->taskId = pTask->taskId; msgSendInfo->param = param; + msgSendInfo->paramFreeFp = taosMemoryFree; msgSendInfo->fp = ctgHandleMsgCallback; *pMsgSendInfo = msgSendInfo; diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index a575e355f1..440a8e014a 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -110,6 +110,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs pParam->pInserter = pInserter; pMsgSendInfo->param = pParam; + pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.len = ntohl(pMsg->length); pMsgSendInfo->msgType = TDMT_VND_SUBMIT; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c3d9f66479..5a1fb770e5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1990,12 +1990,6 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { return TSDB_CODE_SUCCESS; } -static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { - assert(pMsgBody != NULL); - taosMemoryFreeClear(pMsgBody->msgInfo.pData); - taosMemoryFreeClear(pMsgBody); -} - void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; assert(pMsg->info.ahandle != NULL); @@ -2055,6 +2049,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf pWrapper->sourceIndex = sourceIndex; pMsgSendInfo->param = pWrapper; + pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); pMsgSendInfo->msgType = pSource->fetchMsgType; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 1dc3db033b..82b6e7b236 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1923,15 +1923,18 @@ int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** return partitionLogicCond(pCondition, pPrimaryKeyCond, pTagIndexCond, pTagCond, pOtherCond); } + bool needOutput = false; switch (classifyCondition(*pCondition)) { case COND_TYPE_PRIMARY_KEY: if (NULL != pPrimaryKeyCond) { *pPrimaryKeyCond = *pCondition; + needOutput = true; } break; case COND_TYPE_TAG_INDEX: if (NULL != pTagIndexCond) { *pTagIndexCond = *pCondition; + needOutput = true; } if (NULL != pTagCond) { SNode* pTempCond = *pCondition; @@ -1942,21 +1945,26 @@ int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** } } *pTagCond = pTempCond; + needOutput = true; } break; case COND_TYPE_TAG: if (NULL != pTagCond) { *pTagCond = *pCondition; + needOutput = true; } break; case COND_TYPE_NORMAL: default: if (NULL != pOtherCond) { *pOtherCond = *pCondition; + needOutput = true; } break; } - *pCondition = NULL; + if (needOutput) { + *pCondition = NULL; + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index eeb44c4f82..6b1476fe46 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -138,6 +138,16 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) return 0; } +void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { + assert(pMsgBody != NULL); + taosMemoryFreeClear(pMsgBody->target.dbFName); + taosMemoryFreeClear(pMsgBody->msgInfo.pData); + if (pMsgBody->paramFreeFp) { + (*pMsgBody->paramFreeFp)(pMsgBody->param); + } + taosMemoryFreeClear(pMsgBody); +} + int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void* rpcCtx) { char* pMsg = rpcMallocCont(pInfo->msgInfo.len); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 6983bbf013..c8eafd7382 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -386,7 +386,6 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { schProcessOnCbEnd(pJob, pTask, code); taosMemoryFreeClear(pMsg->pData); - taosMemoryFreeClear(param); qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); @@ -398,7 +397,6 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId, code); - taosMemoryFreeClear(param); return TSDB_CODE_SUCCESS; } @@ -447,8 +445,8 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus)); _return: + tFreeSSchedulerHbRsp(&rsp); - taosMemoryFree(param); taosMemoryFree(pMsg->pData); SCH_RET(code); } @@ -514,7 +512,9 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3 SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + msgSendInfo->paramFreeFp = taosMemoryFree; SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param)); + SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp)); if (pJob) { @@ -535,7 +535,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3 _return: - schFreeSMsgSendInfo(msgSendInfo); + destroySendMsgInfo(msgSendInfo); SCH_RET(code); } @@ -676,6 +676,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { param->pTrans = pJob->conn.pTrans; pMsgSendInfo->param = param; + pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->fp = fp; SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo}; @@ -795,6 +796,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) { pDst->param = NULL; SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param)); + pDst->paramFreeFp = taosMemoryFree; *dst = pDst; diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index f848dfa210..6f0aca9a8d 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -50,6 +50,12 @@ char* schGetOpStr(SCH_OP_TYPE type) { } } +void schFreeHbTrans(SSchHbTrans *pTrans) { + rpcReleaseHandle(pTrans->trans.pHandle, TAOS_CONN_CLIENT); + + schFreeRpcCtx(&pTrans->rpcCtx); +} + void schCleanClusterHb(void* pTrans) { SCH_LOCK(SCH_WRITE, &schMgmt.hbLock); @@ -57,7 +63,7 @@ void schCleanClusterHb(void* pTrans) { while (hb) { if (hb->trans.pTrans == pTrans) { SQueryNodeEpId* pEpId = taosHashGetKey(hb, NULL); - rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT); + schFreeHbTrans(hb); taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId)); } @@ -68,8 +74,6 @@ void schCleanClusterHb(void* pTrans) { } int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) { - return TSDB_CODE_SUCCESS; // TODO ENABLE IT WHEN RPC IS READY - int32_t code = 0; SCH_LOCK(SCH_WRITE, &schMgmt.hbLock); @@ -82,7 +86,7 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep int64_t taskNum = atomic_load_64(&hb->taskNum); if (taskNum <= 0) { - rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT); + schFreeHbTrans(hb); taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); } SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); @@ -265,9 +269,7 @@ void schFreeRpcCtxVal(const void *arg) { } SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg; - taosMemoryFreeClear(pMsgSendInfo->param); - taosMemoryFreeClear(pMsgSendInfo->msgInfo.pData); - taosMemoryFreeClear(pMsgSendInfo); + destroySendMsgInfo(pMsgSendInfo); } void schFreeRpcCtx(SRpcCtx *pCtx) {