From a85a73e2de83aa42ed1341346caedc9cae531b22 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 15 Sep 2022 16:30:21 +0800 Subject: [PATCH] fix: fix memory leak issues --- include/libs/executor/executor.h | 1 + source/libs/command/src/explain.c | 4 +++- source/libs/executor/src/executorimpl.c | 15 ++++++++------- source/libs/qworker/src/qworker.c | 2 +- source/libs/scheduler/src/schRemote.c | 2 ++ source/libs/scheduler/src/schTask.c | 3 ++- 6 files changed, 17 insertions(+), 10 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 86ae5ef155..8c1d957381 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -33,6 +33,7 @@ typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, i typedef struct { void *handle; + bool localExec; localFetchFp fp; SArray *explainRes; } SLocalFetch; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 618bbb7593..577e9cae1d 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -28,7 +28,7 @@ void qExplainFreeResNode(SExplainResNode *resNode) { return; } - taosMemoryFreeClear(resNode->pExecInfo); + taosArrayDestroy(resNode->pExecInfo); SNode *node = NULL; FOREACH(node, resNode->pChildren) { qExplainFreeResNode((SExplainResNode *)node); } @@ -58,6 +58,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) { SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i); tFreeSExplainRsp(rsp); } + taosArrayDestroy(group->nodeExecInfo); } pIter = taosHashIterate(pCtx->groupHash, pIter); @@ -66,6 +67,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) { taosHashCleanup(pCtx->groupHash); taosArrayDestroy(pCtx->rows); + taosMemoryFreeClear(pCtx->tbuf); taosMemoryFree(pCtx); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 90560a028b..0834894350 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1778,12 +1778,6 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) { size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq)); - if (NULL == pMsg) { - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return pTaskInfo->code; - } - SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); @@ -1797,7 +1791,14 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf SDataBuf pBuf = {0}; int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes); loadRemoteDataCallback(pWrapper, &pBuf, code); + taosMemoryFree(pWrapper); } else { + SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq)); + if (NULL == pMsg) { + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return pTaskInfo->code; + } + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, sourceIndex, totalSources); @@ -4051,7 +4052,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { destroyOperatorInfo(pTaskInfo->pRoot); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); - if (!pTaskInfo->localFetch.fp) { + if (!pTaskInfo->localFetch.localExec) { nodesDestroyNode((SNode*)pTaskInfo->pSubplan); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f174a163c9..4b3df21319 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -94,7 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t execNum = 0; qTaskInfo_t taskHandle = ctx->taskHandle; DataSinkHandle sinkHandle = ctx->sinkHandle; - SLocalFetch localFetch = {(void*)mgmt, qWorkerProcessLocalFetch, ctx->explainRes}; + SLocalFetch localFetch = {(void*)mgmt, ctx->localExec, qWorkerProcessLocalFetch, ctx->explainRes}; SArray *pResList = taosArrayInit(4, POINTER_BYTES); while (true) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 4b15629407..d540b17c90 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -371,6 +371,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa } SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp)); + + taosMemoryFreeClear(msg); break; } case TDMT_SCH_FETCH_RSP: diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index e725fb7e45..d463874a54 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -898,7 +898,8 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { pTask->msgLen); SCH_ERR_RET(code); } else { - SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg); + //binary msg + //SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg); } }