fix(stream):

This commit is contained in:
Haojun Liao 2024-05-22 19:46:24 +08:00
parent 96e1487787
commit f146ae4198
1 changed files with 2 additions and 16 deletions

View File

@ -1023,7 +1023,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to prepare transfer state", id, msgId);
stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id, msgId);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStatePrepare(pTask);
@ -1121,21 +1121,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
} else if (pReq->type == STREAM_INPUT__TRANS_STATE) {
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
// disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state
STaskId* pRelTaskId = &pTask->streamTaskId;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId);
if (pStreamTask != NULL) {
atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId);
streamMetaReleaseTask(pMeta, pStreamTask);
}
stDebug("s-task:%s close inputQ for upstream:0x%x since trans-state msgId:%d recv, rel stream-task:0x%" PRIx64
" close inputQ for upstream:0x%x",
id, pReq->upstreamTaskId, pReq->msgId, pTask->streamTaskId.taskId, pReq->upstreamRelTaskId);
stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId);
}
status = streamTaskAppendInputBlocks(pTask, pReq);