fix(stream): discard the processed hbmsg in the mnode.
This commit is contained in:
parent
da4018931b
commit
ad96333336
|
@ -73,6 +73,7 @@ typedef struct SNodeEntry {
|
||||||
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
|
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
|
||||||
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
|
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
|
||||||
int64_t hbTimestamp; // second
|
int64_t hbTimestamp; // second
|
||||||
|
int32_t lastHbMsgId; // latest hb msgId
|
||||||
} SNodeEntry;
|
} SNodeEntry;
|
||||||
|
|
||||||
typedef struct SOrphanTask {
|
typedef struct SOrphanTask {
|
||||||
|
|
|
@ -265,6 +265,30 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) {
|
||||||
|
SNodeEntry* pEntry = taosArrayGet(execInfo.pNodeList, i);
|
||||||
|
if (pEntry == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEntry->nodeId != req.vgId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEntry->lastHbMsgId == req.msgId) {
|
||||||
|
mError("vgId:%d Hb msgId:%d already handled, discard", pEntry->nodeId, req.msgId);
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
pEntry->lastHbMsgId = req.msgId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
|
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
|
||||||
if (numOfUpdated > 0) {
|
if (numOfUpdated > 0) {
|
||||||
mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);
|
mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);
|
||||||
|
|
|
@ -607,7 +607,10 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
|
||||||
for (int32_t j = 0; j < size; ++j) {
|
for (int32_t j = 0; j < size; ++j) {
|
||||||
SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
|
SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
|
||||||
if (pEntry->nodeId == p->nodeId) {
|
if (pEntry->nodeId == p->nodeId) {
|
||||||
|
p->hbTimestamp = pEntry->hbTimestamp;
|
||||||
|
|
||||||
taosArrayPush(pValidList, p);
|
taosArrayPush(pValidList, p);
|
||||||
|
mDebug("vgId:%d ts:%"PRId64" HbMsgId:%d is valid", p->nodeId, p->hbTimestamp, p->lastHbMsgId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue