diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index f3569d8973..7c80d15307 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1023,7 +1023,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { - stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to prepare transfer state", id, msgId); + stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id, msgId); ASSERT(pTask->info.fillHistory == 1); code = streamTransferStatePrepare(pTask); @@ -1121,21 +1121,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId); } else if (pReq->type == STREAM_INPUT__TRANS_STATE) { - atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); - streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); - - // disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state - STaskId* pRelTaskId = &pTask->streamTaskId; - SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId); - if (pStreamTask != NULL) { - atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1); - streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId); - streamMetaReleaseTask(pMeta, pStreamTask); - } - - stDebug("s-task:%s close inputQ for upstream:0x%x since trans-state msgId:%d recv, rel stream-task:0x%" PRIx64 - " close inputQ for upstream:0x%x", - id, pReq->upstreamTaskId, pReq->msgId, pTask->streamTaskId.taskId, pReq->upstreamRelTaskId); + stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId); } status = streamTaskAppendInputBlocks(pTask, pReq);