diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h index 34921daac3..0ceaa93a72 100644 --- a/include/libs/stream/streamMsg.h +++ b/include/libs/stream/streamMsg.h @@ -164,6 +164,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint typedef struct SStreamHbMsg { int32_t vgId; int32_t msgId; + int64_t ts; int32_t numOfTasks; SArray* pTaskStatus; // SArray SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index d253e58703..89343ce37c 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -80,6 +80,7 @@ typedef struct SNodeEntry { SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. int64_t hbTimestamp; // second int32_t lastHbMsgId; // latest hb msgId + int64_t lastHbMsgTs; } SNodeEntry; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b7ab76984a..a1fd75c774 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2085,7 +2085,7 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi break; } - SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; + SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1}; epsetAssign(&entry.epset, &pTask->info.epSet); (void)taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry)); } @@ -2265,7 +2265,7 @@ void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) } if (!exist) { - SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; + SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1}; epsetAssign(&nodeEntry.epset, &pTask->info.epSet); void* px = taosArrayPush(pExecNode->pNodeList, &nodeEntry); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index bba39d0c98..50db903520 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -333,7 +333,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } tDecoderClear(&decoder); - mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId); + mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, HbMsgId:%d, HbMsgTs:%" PRId64, req.vgId, + req.numOfTasks, req.msgId, req.ts); pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask)); @@ -366,17 +367,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { continue; } - if (pEntry->lastHbMsgId == req.msgId) { - mError("vgId:%d Hb msgId:%d already handled, discard", pEntry->nodeId, req.msgId); + if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) { + mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId); terrno = TSDB_CODE_INVALID_MSG; doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); streamMutexUnlock(&execInfo.lock); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); - return -1; + return terrno; } else { pEntry->lastHbMsgId = req.msgId; + pEntry->lastHbMsgTs = req.ts; } } @@ -417,6 +419,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SStreamObj *pStream = NULL; code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); if (code) { + mError("stream obj not exist, failed to handle consensus checkpoint-info req, code:%s", tstrerror(code)); continue; } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 9804943ec2..a158d6e4bb 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -142,11 +142,12 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { } SStreamHbMsg* pMsg = &pInfo->hbMsg; - stDebug("vgId:%d build stream hbMsg, leader:%d msgId:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER), - pMeta->pHbInfo->hbCount); - pMsg->vgId = pMeta->vgId; pMsg->msgId = pMeta->pHbInfo->hbCount; + pMsg->ts = taosGetTimestampMs(); + + stDebug("vgId:%d build stream hbMsg, leader:%d HbMsgId:%d, HbMsgTs:%" PRId64, pMeta->vgId, + (pMeta->role == NODE_ROLE_LEADER), pMsg->msgId, pMsg->ts); pMsg->pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); pMsg->pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index bc0faacb32..75cb0e6683 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -382,6 +382,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { } if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } @@ -454,6 +455,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { } if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1; tEndDecode(pDecoder); return 0; diff --git a/tests/script/tsim/stream/checkpointInterval0.sim b/tests/script/tsim/stream/checkpointInterval0.sim index a548f05c82..a5e5c87704 100644 --- a/tests/script/tsim/stream/checkpointInterval0.sim +++ b/tests/script/tsim/stream/checkpointInterval0.sim @@ -76,6 +76,8 @@ system sh/stop_dnodes.sh system sh/exec.sh -n dnode1 -s start +run tsim/stream/checkTaskStatus.sim + sql insert into t1 values(1648791213002,3,2,3,1.1); $loop_count = 0