diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 0b6b6a9ef2..b7aa398e59 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -73,6 +73,7 @@ typedef struct SNodeEntry { bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot. SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. int64_t hbTimestamp; // second + int32_t lastHbMsgId; // latest hb msgId } SNodeEntry; typedef struct SOrphanTask { diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index bc10ec211d..04dd135320 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -265,6 +265,30 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { return -1; } + for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) { + SNodeEntry* pEntry = taosArrayGet(execInfo.pNodeList, i); + if (pEntry == NULL) { + continue; + } + + if (pEntry->nodeId != req.vgId) { + continue; + } + + if (pEntry->lastHbMsgId == req.msgId) { + mError("vgId:%d Hb msgId:%d already handled, discard", pEntry->nodeId, req.msgId); + + terrno = TSDB_CODE_INVALID_MSG; + doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); + + taosThreadMutexUnlock(&execInfo.lock); + cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); + return -1; + } else { + pEntry->lastHbMsgId = req.msgId; + } + } + int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index e4e30bdf10..23eb3656da 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -607,7 +607,10 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { for (int32_t j = 0; j < size; ++j) { SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j); if (pEntry->nodeId == p->nodeId) { + p->hbTimestamp = pEntry->hbTimestamp; + taosArrayPush(pValidList, p); + mDebug("vgId:%d ts:%"PRId64" HbMsgId:%d is valid", p->nodeId, p->hbTimestamp, p->lastHbMsgId); break; } }