From c7d4f45b7f669f729ab7988143d43a50241047bd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 Jul 2024 15:33:25 +0800 Subject: [PATCH] fix(stream): remove invalid assert. --- source/dnode/mnode/impl/src/mndStream.c | 5 +++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 3 +-- source/dnode/vnode/src/tqCommon/tqCommon.c | 7 +++++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index db428635a8..fd9daf57db 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2667,7 +2667,8 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) { int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId); - int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream); + + int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream); if (ckId != -1) { // consensus checkpoint id already exist SRpcMsg rsp = {0}; rsp.code = 0; @@ -2678,7 +2679,7 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) { SMsgHead *pHead = rsp.pCont; pHead->vgId = htonl(req.nodeId); - mDebug("stream:0x%" PRIx64 " consensus checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId); + mDebug("stream:0x%" PRIx64 " consensus-checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId); doSendConsensusCheckpointRsp(&req, &rsp, ckId); taosThreadMutexUnlock(&execInfo.lock); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 98b9c7c0c7..66d32f53eb 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -929,8 +929,7 @@ bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTa } int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) { - if (pInfo->genTs > 0) { - ASSERT(pInfo->checkpointId > 0); + if (pInfo->genTs > 0) { // there is no checkpoint ever generated if the checkpointId is 0. return pInfo->checkpointId; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 1130ba8c05..668e178d2d 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1156,7 +1156,7 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return TSDB_CODE_SUCCESS; } - // discard the rsp from before restart + // discard the rsp, since it is expired. if (req.startTs < pTask->execInfo.created) { tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64 " from task createTs:%" PRId64 ", discard", @@ -1172,11 +1172,14 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { taosThreadMutexLock(&pTask->lock); ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId); - if ((pTask->chkInfo.checkpointId != req.checkpointId) && req.checkpointId != 0) { + if (pTask->chkInfo.checkpointId != req.checkpointId) { tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId); pTask->chkInfo.checkpointId = req.checkpointId; tqSetRestoreVersionInfo(pTask); + } else { + tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current checkpointId, no need to update", + pTask->id.idStr, vgId, req.checkpointId); } taosThreadMutexUnlock(&pTask->lock);