diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 3d979f3d11..93f6c60da4 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1882,7 +1882,12 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S if (pReq->msgId > pInfo->lastMsgId) { status = streamTaskAppendInputBlocks(pTask, pReq); - pInfo->lastMsgId = pReq->msgId; + if (status == TASK_INPUT_STATUS__NORMAL) { + stDebug("s-task:%s update the lastMsgId from %" PRId64 " to %" PRId64, id, pInfo->lastMsgId, pReq->msgId); + pInfo->lastMsgId = pReq->msgId; + } else { + stDebug("s-task:%s not update the lastMsgId, remain:%" PRId64, id, pInfo->lastMsgId); + } } else { stWarn( "s-task:%s duplicate msgId:%d from upstream:0x%x discard and return succ, from vgId:%d already recv "