fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2023-05-20 22:07:04 +08:00
parent 899e4d3350
commit cb75e5a863
1 changed files with 4 additions and 6 deletions

View File

@ -421,12 +421,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr,
pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId);
if (doSendDispatchMsg(pTask, &req, vgId, pEpSet) < 0) {
taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen);
return code;
}
code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen);
return code;
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
ASSERT(rspCnt == 0);