fix(stream): discard the repeat send consensus-checkpointId msg.
This commit is contained in:
parent
acde36a25b
commit
c71413c2aa
|
@ -216,6 +216,7 @@ typedef struct SRestoreCheckpointInfo {
|
||||||
int64_t startTs;
|
int64_t startTs;
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int64_t checkpointId; // latest checkpoint id
|
int64_t checkpointId; // latest checkpoint id
|
||||||
|
int32_t transId; // transaction id of the update the consensus-checkpointId transaction
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
} SRestoreCheckpointInfo;
|
} SRestoreCheckpointInfo;
|
||||||
|
|
|
@ -273,6 +273,7 @@ typedef struct SCheckpointInfo {
|
||||||
int64_t processedVer;
|
int64_t processedVer;
|
||||||
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
||||||
int64_t msgVer;
|
int64_t msgVer;
|
||||||
|
int32_t consensusTransId;// consensus checkpoint id
|
||||||
SActiveCheckpointInfo* pActiveInfo;
|
SActiveCheckpointInfo* pActiveInfo;
|
||||||
} SCheckpointInfo;
|
} SCheckpointInfo;
|
||||||
|
|
||||||
|
|
|
@ -846,6 +846,7 @@ static int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStream
|
||||||
.checkpointId = checkpointId,
|
.checkpointId = checkpointId,
|
||||||
.startTs = ts,
|
.startTs = ts,
|
||||||
.nodeId = pTask->info.nodeId,
|
.nodeId = pTask->info.nodeId,
|
||||||
|
.transId = pTrans->id,
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
|
@ -1178,16 +1178,25 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
|
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
|
||||||
|
|
||||||
|
if (pTask->chkInfo.consensusTransId >= req.transId) {
|
||||||
|
tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard",
|
||||||
|
pTask->id.idStr, vgId, pTask->chkInfo.consensusTransId, req.transId);
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
if (pTask->chkInfo.checkpointId != req.checkpointId) {
|
if (pTask->chkInfo.checkpointId != req.checkpointId) {
|
||||||
tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId,
|
tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64" transId:%d", pTask->id.idStr, vgId,
|
||||||
pTask->chkInfo.checkpointId, req.checkpointId);
|
pTask->chkInfo.checkpointId, req.checkpointId, req.transId);
|
||||||
pTask->chkInfo.checkpointId = req.checkpointId;
|
pTask->chkInfo.checkpointId = req.checkpointId;
|
||||||
tqSetRestoreVersionInfo(pTask);
|
tqSetRestoreVersionInfo(pTask);
|
||||||
} else {
|
} else {
|
||||||
tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current checkpointId, not update",
|
tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update",
|
||||||
pTask->id.idStr, vgId, req.checkpointId);
|
pTask->id.idStr, vgId, req.checkpointId, req.transId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTask->chkInfo.consensusTransId = req.transId;
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
if (pMeta->role == NODE_ROLE_LEADER) {
|
if (pMeta->role == NODE_ROLE_LEADER) {
|
||||||
|
|
|
@ -638,6 +638,7 @@ int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpoi
|
||||||
if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
|
||||||
tEndEncode(pEncoder);
|
tEndEncode(pEncoder);
|
||||||
|
@ -649,6 +650,7 @@ int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo*
|
||||||
if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
|
|
Loading…
Reference in New Issue