From c116b69941e514f7fbbad4777d336f0cb062f577 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Jul 2023 09:25:18 +0800 Subject: [PATCH] fix(stream): fix memory leak. --- source/libs/stream/src/streamDispatch.c | 16 +++++++++++++--- source/libs/stream/src/streamTask.c | 10 ++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d145ff5636..c5ae27029b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -489,23 +489,33 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { - SStreamScanHistoryFinishReq req = {.streamId = pTask->id.streamId, .childId = pTask->info.selfChildId}; + SStreamScanHistoryFinishReq req = { + .streamId = pTask->id.streamId, + .childId = pTask->info.selfChildId, + .upstreamTaskId = pTask->id.taskId, + .upstreamNodeId = pTask->pMeta->vgId, + }; // serialize if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; + pTask->notReadyTasks = 1; doDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); + pTask->notReadyTasks = numOfVgs; - qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", - pTask->id.idStr, numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus)); + qDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr, + numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus)); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); req.downstreamTaskId = pVgInfo->taskId; doDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } + } else { + qDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr); + streamProcessScanHistoryFinishRsp(pTask); } return 0; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 31149a3f22..8088791e0e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -240,6 +240,11 @@ static void freeItem(void* p) { rpcFreeCont(pInfo->msg.pCont); } +static void freeUpstreamItem(void* p) { + SStreamChildEpInfo** pInfo = p; + taosMemoryFree(*pInfo); +} + void tFreeStreamTask(SStreamTask* pTask) { qDebug("free s-task:%s", pTask->id.idStr); @@ -295,6 +300,11 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->pRspMsgList = NULL; } + if (pTask->pUpstreamInfoList != NULL) { + taosArrayDestroyEx(pTask->pUpstreamInfoList, freeUpstreamItem); + pTask->pUpstreamInfoList = NULL; + } + taosThreadMutexDestroy(&pTask->lock); taosMemoryFree(pTask); }