From c71413c2aa3b927252231880f3f7f00af8e053e8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 9 Jul 2024 14:52:15 +0800 Subject: [PATCH] fix(stream): discard the repeat send consensus-checkpointId msg. --- include/libs/stream/streamMsg.h | 1 + include/libs/stream/tstream.h | 1 + source/dnode/mnode/impl/src/mndStreamUtil.c | 1 + source/dnode/vnode/src/tqCommon/tqCommon.c | 17 +++++++++++++---- source/libs/stream/src/streamMsg.c | 2 ++ 5 files changed, 18 insertions(+), 4 deletions(-) diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h index b253054dfe..34921daac3 100644 --- a/include/libs/stream/streamMsg.h +++ b/include/libs/stream/streamMsg.h @@ -216,6 +216,7 @@ typedef struct SRestoreCheckpointInfo { int64_t startTs; int64_t streamId; int64_t checkpointId; // latest checkpoint id + int32_t transId; // transaction id of the update the consensus-checkpointId transaction int32_t taskId; int32_t nodeId; } SRestoreCheckpointInfo; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f867a82cbb..5ba0ce454c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -273,6 +273,7 @@ typedef struct SCheckpointInfo { int64_t processedVer; int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t msgVer; + int32_t consensusTransId;// consensus checkpoint id SActiveCheckpointInfo* pActiveInfo; } SCheckpointInfo; diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 0e498f20f6..c4adbd0fc3 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -846,6 +846,7 @@ static int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStream .checkpointId = checkpointId, .startTs = ts, .nodeId = pTask->info.nodeId, + .transId = pTrans->id, }; int32_t code = 0; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a94c17f735..1f3c049211 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1178,16 +1178,25 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { taosThreadMutexLock(&pTask->lock); ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId); + if (pTask->chkInfo.consensusTransId >= req.transId) { + tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", + pTask->id.idStr, vgId, pTask->chkInfo.consensusTransId, req.transId); + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; + } + if (pTask->chkInfo.checkpointId != req.checkpointId) { - tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId, - pTask->chkInfo.checkpointId, req.checkpointId); + tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64" transId:%d", pTask->id.idStr, vgId, + pTask->chkInfo.checkpointId, req.checkpointId, req.transId); pTask->chkInfo.checkpointId = req.checkpointId; tqSetRestoreVersionInfo(pTask); } else { - tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current checkpointId, not update", - pTask->id.idStr, vgId, req.checkpointId); + tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update", + pTask->id.idStr, vgId, req.checkpointId, req.transId); } + pTask->chkInfo.consensusTransId = req.transId; taosThreadMutexUnlock(&pTask->lock); if (pMeta->role == NODE_ROLE_LEADER) { diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 1bc91d6984..40582b5144 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -638,6 +638,7 @@ int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpoi if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; tEndEncode(pEncoder); @@ -649,6 +650,7 @@ int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; tEndDecode(pDecoder);