From 737138856c8a2cd7832715cb11922f18a9f54850 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Jan 2025 11:29:17 +0800 Subject: [PATCH] refactor(stream): track the msgId for each upstream tasks. --- include/common/streamMsg.h | 1 + source/libs/stream/src/streamDispatch.c | 12 +++++++++++- source/libs/stream/src/streamTask.c | 1 + 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/include/common/streamMsg.h b/include/common/streamMsg.h index d410bd17e0..85646a328a 100644 --- a/include/common/streamMsg.h +++ b/include/common/streamMsg.h @@ -41,6 +41,7 @@ typedef struct SStreamUpstreamEpInfo { SEpSet epSet; bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer + int64_t lastMsgId; } SStreamUpstreamEpInfo; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 42d7f44b62..ee2d337ff2 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1842,6 +1842,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S return TSDB_CODE_STREAM_TASK_NOT_EXIST; } + stDebug("s-task:%s lastMsgId:%"PRId64 " for upstream taskId:0x%x(vgId:%d)", id, pInfo->lastMsgId, pReq->upstreamTaskId, + pReq->upstreamNodeId); + if (pMeta->role == NODE_ROLE_FOLLOWER) { stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id); status = TASK_INPUT_STATUS__REFUSED; @@ -1866,7 +1869,14 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId); } - status = streamTaskAppendInputBlocks(pTask, pReq); + if (pReq->msgId > pInfo->lastMsgId) { + status = streamTaskAppendInputBlocks(pTask, pReq); + pInfo->lastMsgId = pReq->msgId; + } else { + stWarn("s-task:%s duplicate msgId:%d from upstream:0x%x, from vgId:%d already recv msgId:%" PRId64, id, + pReq->msgId, pReq->upstreamTaskId, pReq->srcVgId, pInfo->lastMsgId); + status = TASK_INPUT_STATUS__NORMAL; // still return success + } } } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d27ed520c6..dd33ee9613 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -100,6 +100,7 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { pEpInfo->nodeId = pTask->info.nodeId; pEpInfo->taskId = pTask->id.taskId; pEpInfo->stage = -1; + pEpInfo->lastMsgId = -1; return pEpInfo; }