From eff1aff58eb1f31ee8ec17d13048fafb56448952 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Jan 2025 17:16:31 +0800 Subject: [PATCH] fix(stream): update the task last msgId when putting into input queue succ. --- source/libs/stream/src/streamDispatch.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 3d979f3d11..93f6c60da4 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1882,7 +1882,12 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S if (pReq->msgId > pInfo->lastMsgId) { status = streamTaskAppendInputBlocks(pTask, pReq); - pInfo->lastMsgId = pReq->msgId; + if (status == TASK_INPUT_STATUS__NORMAL) { + stDebug("s-task:%s update the lastMsgId from %" PRId64 " to %" PRId64, id, pInfo->lastMsgId, pReq->msgId); + pInfo->lastMsgId = pReq->msgId; + } else { + stDebug("s-task:%s not update the lastMsgId, remain:%" PRId64, id, pInfo->lastMsgId); + } } else { stWarn( "s-task:%s duplicate msgId:%d from upstream:0x%x discard and return succ, from vgId:%d already recv "