fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2023-07-25 09:25:18 +08:00
parent 01f11bf5a6
commit c116b69941
2 changed files with 23 additions and 3 deletions

View File

@ -489,23 +489,33 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
}
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
SStreamScanHistoryFinishReq req = {.streamId = pTask->id.streamId, .childId = pTask->info.selfChildId};
SStreamScanHistoryFinishReq req = {
.streamId = pTask->id.streamId,
.childId = pTask->info.selfChildId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->pMeta->vgId,
};
// serialize
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
pTask->notReadyTasks = 1;
doDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->notReadyTasks = numOfVgs;
qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s",
pTask->id.idStr, numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
qDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.downstreamTaskId = pVgInfo->taskId;
doDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else {
qDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr);
streamProcessScanHistoryFinishRsp(pTask);
}
return 0;

View File

@ -240,6 +240,11 @@ static void freeItem(void* p) {
rpcFreeCont(pInfo->msg.pCont);
}
static void freeUpstreamItem(void* p) {
SStreamChildEpInfo** pInfo = p;
taosMemoryFree(*pInfo);
}
void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:%s", pTask->id.idStr);
@ -295,6 +300,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->pRspMsgList = NULL;
}
if (pTask->pUpstreamInfoList != NULL) {
taosArrayDestroyEx(pTask->pUpstreamInfoList, freeUpstreamItem);
pTask->pUpstreamInfoList = NULL;
}
taosThreadMutexDestroy(&pTask->lock);
taosMemoryFree(pTask);
}