From 90fc572394bb37776cee92e08d188b96319560cc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Nov 2022 15:32:48 +0800 Subject: [PATCH 1/4] avoid epset leak --- source/libs/scheduler/src/schRemote.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 47de2528fa..17dc3d588f 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -425,6 +425,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { _return: taosMemoryFreeClear(pMsg->pData); + taosMemoryFreeClear(pMsg->pEpSet); qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); @@ -438,6 +439,7 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { code); if (pMsg) { taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); } return TSDB_CODE_SUCCESS; } @@ -492,6 +494,7 @@ _return: tFreeSSchedulerHbRsp(&rsp); taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); SCH_RET(code); } From 495bfe932a7412c4587b32e890aebcb41b84c797 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 8 Nov 2022 18:56:44 +0800 Subject: [PATCH 2/4] fix invalid free --- source/libs/scheduler/src/schTask.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index dbdccff302..7e5b3faedb 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -439,6 +439,8 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 code = schDoTaskRedirect(pJob, pTask, pData, rspCode); taosMemoryFree(pData->pData); taosMemoryFree(pData->pEpSet); + pData->pData = NULL; + pData->pEpSet = NULL; SCH_RET(code); @@ -446,6 +448,8 @@ _return: taosMemoryFree(pData->pData); taosMemoryFree(pData->pEpSet); + pData->pData = NULL; + pData->pEpSet = NULL; SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } @@ -942,7 +946,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { } SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, - pTask->execId, &qwMsg, explainRes)); + pTask->execId, &qwMsg, explainRes)); if (SCH_IS_EXPLAIN_JOB(pJob)) { SCH_ERR_RET(schHandleExplainRes(explainRes)); @@ -1115,7 +1119,7 @@ int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { } SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, - pTask->execId, &pRsp, explainRes)); + pTask->execId, &pRsp, explainRes)); if (SCH_IS_EXPLAIN_JOB(pJob)) { SCH_ERR_RET(schHandleExplainRes(explainRes)); From 7b30a6ee23f9572a12f018cb6c1b54a6e220a99b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 8 Nov 2022 20:45:11 +0800 Subject: [PATCH 3/4] free unfinished ahandle while quit --- include/libs/qcom/query.h | 2 ++ include/libs/transport/trpc.h | 4 +++ source/client/src/clientEnv.c | 3 ++- source/libs/qcom/src/queryUtil.c | 6 +++++ source/libs/transport/inc/transportInt.h | 1 + source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transCli.c | 31 +++++++++++++++--------- 7 files changed, 36 insertions(+), 12 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 651b379851..c6f7b5cf2e 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -199,6 +199,8 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); void destroySendMsgInfo(SMsgSendInfo* pMsgBody); +void destroyAhandle(void* ahandle); + int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo, bool persistHandle, void* ctx); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 53edcd4fee..8a0cccc71f 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -72,6 +72,7 @@ typedef struct SRpcMsg { typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); +typedef void (*RpcDfp)(void *ahandle); typedef struct SRpcInit { char localFqdn[TSDB_FQDN_LEN]; @@ -97,6 +98,9 @@ typedef struct SRpcInit { // set up timeout for particular msg RpcTfp tfp; + // destroy client ahandle; + RpcDfp dfp; + void *parent; } SRpcInit; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index bb25a595af..bb93e4d934 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -140,12 +140,13 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.numOfThreads = numOfThread; rpcInit.cfp = processMsgFromServer; rpcInit.rfp = clientRpcRfp; - // rpcInit.tfp = clientRpcTfp; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.user = (char *)user; rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.compressSize = tsCompressMsgSize; + rpcInit.dfp = destroyAhandle; + void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { tscError("failed to init connection to server"); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index cf064881c2..6eadf80e3d 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -146,6 +146,12 @@ void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { } taosMemoryFreeClear(pMsgBody); } +void destroyAhandle(void *ahandle) { + SMsgSendInfo *pSendInfo = ahandle; + if (pSendInfo == NULL) return; + + destroySendMsgInfo(pSendInfo); +} int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo, bool persistHandle, void* rpcCtx) { diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index b9167501e2..3b7182f983 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -53,6 +53,7 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); bool (*retry)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType); + void (*destroyFp)(void* ahandle); int index; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index d3db4879d1..feb55985de 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -53,6 +53,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->cfp = pInit->cfp; pRpc->retry = pInit->rfp; pRpc->startTimer = pInit->tfp; + pRpc->destroyFp = pInit->dfp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 21444018cd..42dc65abea 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -143,6 +143,7 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); static void cliHandleResp(SCliConn* conn); // handle except about conn static void cliHandleExcept(SCliConn* conn); +static void cliReleaseUnfinishedMsg(SCliConn* conn); // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); @@ -163,17 +164,6 @@ static void destroyThrdObj(SCliThrd* pThrd); static void cliWalkCb(uv_handle_t* handle, void* arg); -static void cliReleaseUnfinishedMsg(SCliConn* conn) { - for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { - SCliMsg* msg = transQueueGet(&conn->cliMsgs, i); - if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) { - if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { - conn->ctx.freeFunc(msg->ctx->ahandle); - } - } - destroyCmsg(msg); - } -} #define CLI_RELEASE_UV(loop) \ do { \ uv_walk(loop, cliWalkCb, NULL); \ @@ -266,6 +256,25 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { static void* cliWorkThread(void* arg); +static void cliReleaseUnfinishedMsg(SCliConn* conn) { + SCliThrd* pThrd = conn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + + for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { + SCliMsg* msg = transQueueGet(&conn->cliMsgs, i); + if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) { + if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { + conn->ctx.freeFunc(msg->ctx->ahandle); + continue; + } + if (msg->ctx->ahandle != NULL && pTransInst->destroyFp != NULL) { + tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle); + pTransInst->destroyFp(msg->ctx->ahandle); + } + } + destroyCmsg(msg); + } +} bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->cliMsgs)) { SCliMsg* pCliMsg = NULL; From 1ed333140b8ffd638d0a228b46c31f4e95c2acef Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 8 Nov 2022 22:09:58 +0800 Subject: [PATCH 4/4] fix invalid free --- source/libs/transport/src/transCli.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 42dc65abea..bae9295749 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -265,9 +265,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) { if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { conn->ctx.freeFunc(msg->ctx->ahandle); - continue; - } - if (msg->ctx->ahandle != NULL && pTransInst->destroyFp != NULL) { + } else if (msg->ctx->ahandle != NULL && pTransInst->destroyFp != NULL) { tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle); pTransInst->destroyFp(msg->ctx->ahandle); }