diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 580bf06b71..dbd4362466 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2247,6 +2247,7 @@ static int32_t removeInvalidStreamTask(SArray *pNodeSnapshot) { execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); execNodeList.pNodeEntryList = pValidNodeEntryList; + taosArrayDestroy(pRemoveTaskList); return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index bf7abc7457..fc3e17c826 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1111,20 +1111,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } - - // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state - if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { - stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId); - ASSERT(pTask->info.fillHistory == 1); - - code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens - } - - // now ready for next data output - atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); - return TSDB_CODE_SUCCESS; - } } int32_t leftRsp = 0; @@ -1168,6 +1154,21 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // this message has been sent successfully, let's try next one. pTask->msgInfo.retryCount = 0; + + // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state + if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { + stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId); + ASSERT(pTask->info.fillHistory == 1); + + code = streamTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens + } + + // now ready for next data output + atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); + return TSDB_CODE_SUCCESS; + } + handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); } }