diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bd2b51e9b2..b2927d839d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -290,8 +290,11 @@ typedef struct SSTaskBasicInfo { int64_t triggerParam; // in msec } SSTaskBasicInfo; +typedef struct SStreamDispatchReq SStreamDispatchReq; + typedef struct SDispatchMsgInfo { - void* pData; // current dispatch data + SStreamDispatchReq* pData; // current dispatch data + int8_t dispatchMsgType; int16_t msgType; // dispatch msg type int32_t retryCount; // retry send data count int64_t blockingTs; // output blocking timestamp @@ -327,6 +330,7 @@ typedef struct { int64_t step2Start; int64_t start; int32_t updateCount; + int32_t dispatchCount; int64_t latestUpdateTs; } STaskExecStatisInfo; @@ -442,7 +446,7 @@ typedef struct { int32_t taskId; } SStreamTaskRunReq; -typedef struct { +struct SStreamDispatchReq { int32_t type; int64_t stage; // nodeId from upstream task int64_t streamId; @@ -455,7 +459,7 @@ typedef struct { int64_t totalLen; SArray* dataLen; // SArray SArray* data; // SArray -} SStreamDispatchReq; +}; typedef struct { int64_t streamId; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 08163ec245..4e9667d969 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -897,12 +897,12 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", + tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", pTask->id.idStr, pStatus, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = TASK_DOWNSTREAM_NOT_READY; tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 - ") from task:0x%x (vgId:%d), rsp status %d", + ") from task:0x%x (vgId:%d), rsp check_status %d", req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 40cadd3387..10a7dc7be7 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -59,6 +59,8 @@ extern int32_t streamBackendCfWrapperId; void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); +void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); +int32_t getNumOfDispatchBranch(SStreamTask* pTask); int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 519271703b..2775a90abf 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -277,59 +277,66 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR return 0; } -static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { +void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) { + for (int32_t i = 0; i < numOfVgroups; i++) { + taosArrayDestroyP(pReq[i].data, taosMemoryFree); + taosArrayDestroy(pReq[i].dataLen); + } + + taosMemoryFree(pReq); +} + +int32_t getNumOfDispatchBranch(SStreamTask* pTask) { + return (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) + ? 1 + : taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); +} + +static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; int32_t numOfBlocks = taosArrayGetSize(pData->blocks); - ASSERT(numOfBlocks != 0); + ASSERT(numOfBlocks != 0 && pTask->msgInfo.pData == NULL); + + pTask->msgInfo.dispatchMsgType = pData->type; if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - SStreamDispatchReq req = {0}; + SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq)); int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; - code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); + code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); if (code != TSDB_CODE_SUCCESS) { return code; } for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); - - code = streamAddBlockIntoDispatchMsg(pDataBlock, &req); + code = streamAddBlockIntoDispatchMsg(pDataBlock, pReq); if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroyP(req.data, taosMemoryFree); - taosArrayDestroy(req.dataLen); + destroyDispatchMsg(pReq, 1); return code; } } - int32_t vgId = pTask->fixedEpDispatcher.nodeId; - SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet; - - stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d)", pTask->id.idStr, - pTask->info.selfChildId, numOfBlocks, downstreamTaskId, vgId); - - code = doSendDispatchMsg(pTask, &req, vgId, pEpSet); - taosArrayDestroyP(req.data, taosMemoryFree); - taosArrayDestroy(req.dataLen); - return code; + pTask->msgInfo.pData = pReq; } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); ASSERT(rspCnt == 0); SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t vgSz = taosArrayGetSize(vgInfo); + int32_t numOfVgroups = taosArrayGetSize(vgInfo); - SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq)); + SStreamDispatchReq* pReqs = taosMemoryCalloc(numOfVgroups, sizeof(SStreamDispatchReq)); if (pReqs == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - for (int32_t i = 0; i < vgSz; i++) { + for (int32_t i = 0; i < numOfVgroups; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type); if (code != TSDB_CODE_SUCCESS) { - goto FAIL_SHUFFLE_DISPATCH; + destroyDispatchMsg(pReqs, numOfVgroups); + return code; } } @@ -338,52 +345,95 @@ static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* p // TODO: do not use broadcast if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) { - for (int32_t j = 0; j < vgSz; j++) { - if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { - goto FAIL_SHUFFLE_DISPATCH; + for (int32_t j = 0; j < numOfVgroups; j++) { + code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]); + if (code != 0) { + destroyDispatchMsg(pReqs, numOfVgroups); + return code; } if (pReqs[j].blockNum == 0) { atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); } + pReqs[j].blockNum++; } continue; } - if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.id.groupId) < 0) { - goto FAIL_SHUFFLE_DISPATCH; + code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, numOfVgroups, pDataBlock->info.id.groupId); + if(code != 0) { + destroyDispatchMsg(pReqs, numOfVgroups); + return code; } } - stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, - pTask->info.selfChildId, numOfBlocks, vgSz); + pTask->msgInfo.pData = pReqs; +// *pDispatchReq = pReqs; - for (int32_t i = 0; i < vgSz; i++) { - if (pReqs[i].blockNum > 0) { +// stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroup(s), msgId:%d", pTask->id.idStr, +// pTask->info.selfChildId, numOfBlocks, numOfVgroups, msgId); +// +// for (int32_t i = 0; i < numOfVgroups; i++) { +// if (pReqs[i].blockNum > 0) { +// SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); +// stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, +// pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId); +// +// code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet); +// if (code < 0) { +// destroyDispatchMsg(pReqs, numOfVgroups); +// return code; +// } +// } +// } +// +// stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups, msgId); +// code = 0; +// +// *pDispatchReq = pReqs; + } + + stDebug("s-task:%s build dispatch msg success, msgId:%d", pTask->id.idStr, pTask->taskExecInfo.dispatchCount); + return code; +} + +static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) { + int32_t code = 0; + int32_t msgId = pTask->taskExecInfo.dispatchCount; + const char* id = pTask->id.idStr; + + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + int32_t vgId = pTask->fixedEpDispatcher.nodeId; + SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet; + int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; + + stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id, + pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId); + + code = doSendDispatchMsg(pTask, pDispatchMsg, vgId, pEpSet); + } else { + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t numOfVgroups = taosArrayGetSize(vgInfo); + + stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d", + id, pTask->info.selfChildId, numOfVgroups, msgId); + + for (int32_t i = 0; i < numOfVgroups; i++) { + if (pDispatchMsg[i].blockNum > 0) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, - pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId); + pTask->info.selfChildId, pDispatchMsg[i].blockNum, pVgInfo->vgId); - code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet); + code = doSendDispatchMsg(pTask, &pDispatchMsg[i], pVgInfo->vgId, &pVgInfo->epSet); if (code < 0) { - goto FAIL_SHUFFLE_DISPATCH; + break; } } } - stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes", pTask->id.idStr, vgSz); - - code = 0; - - FAIL_SHUFFLE_DISPATCH: - for (int32_t i = 0; i < vgSz; i++) { - taosArrayDestroyP(pReqs[i].data, taosMemoryFree); - taosArrayDestroy(pReqs[i].dataLen); - } - - taosMemoryFree(pReqs); + stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups, msgId); } return code; @@ -400,7 +450,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); - int32_t code = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); + int32_t code = sendDispatchMsg(pTask, pTask->msgInfo.pData); if (code != TSDB_CODE_SUCCESS) { if (!streamTaskShouldStop(&pTask->status)) { stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); @@ -524,25 +574,31 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return 0; } - pTask->msgInfo.pData = pBlock; ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER || pBlock->type == STREAM_INPUT__TRANS_STATE); int32_t retryCount = 0; + pTask->taskExecInfo.dispatchCount += 1; + + int32_t code = doBuildDispatchMsg(pTask, pBlock); + if (code == 0) { + destroyStreamDataBlock(pBlock); + } else { // todo handle build dispatch msg failed + } while (1) { - int32_t code = doDispatchAllBlocks(pTask, pBlock); + code = sendDispatchMsg(pTask, pTask->msgInfo.pData); if (code == TSDB_CODE_SUCCESS) { break; } - stDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", id, - tstrerror(terrno), pTask->outputInfo.status, retryCount); + stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id, + pTask->taskExecInfo.dispatchCount, tstrerror(terrno), pTask->outputInfo.status, retryCount); // todo deal with only partially success dispatch case atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore - destroyStreamDataBlock(pTask->msgInfo.pData); + destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); pTask->msgInfo.pData = NULL; return code; } @@ -552,6 +608,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); + streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); break; } @@ -951,7 +1008,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { - destroyStreamDataBlock(pTask->msgInfo.pData); + destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); pTask->msgInfo.pData = NULL; if (pTask->msgInfo.blockingTs != 0) { @@ -974,6 +1031,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { const char* id = pTask->id.idStr; + int32_t msgId = pTask->taskExecInfo.dispatchCount; if (code != TSDB_CODE_SUCCESS) { // dispatch message failed: network error, or node not available. @@ -982,14 +1040,14 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // happened too fast. // todo handle the shuffle dispatch failure if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore - stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), no retry, since it is destroyed already", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId); + stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); - SStreamDataBlock* pMsgBlock = pTask->msgInfo.pData; - if (pMsgBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - stError("s-task:%s checkpoint trigger send failed, continue do checkpoint ready process", id); - streamProcessCheckpointReadyMsg(pTask); - } +// SStreamDataBlock* pMsgBlock = pTask->msgInfo.pData; +// if (pMsgBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { +// stError("s-task:%s checkpoint trigger send failed, continue do checkpoint ready process", id); +// streamProcessCheckpointReadyMsg(pTask); +// } // we should set the correct finish flag to make sure the shuffle dispatch will be executed completed. if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -1002,11 +1060,23 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } } } else { - stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), code:%s, retry cnt:%d", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code), ++pTask->msgInfo.retryCount); + stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, retry", id, msgId, + pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); + SStreamDispatchReq* pDispatchMsg = pTask->msgInfo.pData; - int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); - if (ret != TSDB_CODE_SUCCESS) { + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t numOfVgroups = taosArrayGetSize(vgInfo); + for(int32_t i = 0; i < numOfVgroups; ++i) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + if (pVgInfo->vgId == pRsp->downstreamNodeId) { + stDebug("s-task:%s (child taskId:%d) re-send blocks:%d to vgId:%d", pTask->id.idStr, + pTask->info.selfChildId, pDispatchMsg[i].blockNum, pVgInfo->vgId); + code = doSendDispatchMsg(pTask, &pDispatchMsg[i], pVgInfo->vgId, &pVgInfo->epSet); + } + } + } else { + sendDispatchMsg(pTask, pTask->msgInfo.pData); } } @@ -1017,22 +1087,23 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); if (leftRsp > 0) { - stDebug("s-task:%s recv dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp", id, pRsp->downstreamTaskId, - pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp); + stDebug( + "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d " + "rsp", + id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp); return 0; } else { - stDebug("s-task:%s recv dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); + stDebug("s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", id, + msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); } } else { - stDebug("s-task:%s recv fix-dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); + stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d", + id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); } // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state - SStreamDataBlock* p = pTask->msgInfo.pData; - if (p->type == STREAM_INPUT__TRANS_STATE) { - stDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id); + if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { + stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId); ASSERT(pTask->info.fillHistory == 1); code = streamTransferStateToStreamTask(pTask); @@ -1066,7 +1137,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i " wait for %dms and retry dispatch data, total wait:%.2fSec ref:%d", id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, el, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); - } else { // pipeline send data in output queue + } else { // this message has been sent successfully, let's try next one. handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 777e93da47..8d651c43a0 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -358,8 +358,9 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList); if (pTask->msgInfo.pData != NULL) { - destroyStreamDataBlock(pTask->msgInfo.pData); + destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); pTask->msgInfo.pData = NULL; + pTask->msgInfo.dispatchMsgType = 0; } if (pTask->id.idStr != NULL) {