fix(stream): update the task last msgId when putting into input queue succ.

This commit is contained in:
Haojun Liao 2025-01-16 17:16:31 +08:00
parent 05a27eb7b0
commit eff1aff58e
1 changed files with 6 additions and 1 deletions

View File

@ -1882,7 +1882,12 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
if (pReq->msgId > pInfo->lastMsgId) {
status = streamTaskAppendInputBlocks(pTask, pReq);
pInfo->lastMsgId = pReq->msgId;
if (status == TASK_INPUT_STATUS__NORMAL) {
stDebug("s-task:%s update the lastMsgId from %" PRId64 " to %" PRId64, id, pInfo->lastMsgId, pReq->msgId);
pInfo->lastMsgId = pReq->msgId;
} else {
stDebug("s-task:%s not update the lastMsgId, remain:%" PRId64, id, pInfo->lastMsgId);
}
} else {
stWarn(
"s-task:%s duplicate msgId:%d from upstream:0x%x discard and return succ, from vgId:%d already recv "