Merge pull request #26370 from taosdata/fix/3_liaohj

fix(stream): remove invalid assert.
This commit is contained in:
Haojun Liao 2024-07-02 18:41:45 +08:00 committed by GitHub
commit 0a9bcc0870
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 9 additions and 6 deletions

View File

@ -2667,7 +2667,8 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) {
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId);
int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream);
int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream);
if (ckId != -1) { // consensus checkpoint id already exist
SRpcMsg rsp = {0};
rsp.code = 0;
@ -2678,7 +2679,7 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) {
SMsgHead *pHead = rsp.pCont;
pHead->vgId = htonl(req.nodeId);
mDebug("stream:0x%" PRIx64 " consensus checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId);
mDebug("stream:0x%" PRIx64 " consensus-checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId);
doSendConsensusCheckpointRsp(&req, &rsp, ckId);
taosThreadMutexUnlock(&execInfo.lock);

View File

@ -929,8 +929,7 @@ bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTa
}
int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) {
if (pInfo->genTs > 0) {
ASSERT(pInfo->checkpointId > 0);
if (pInfo->genTs > 0) { // there is no checkpoint ever generated if the checkpointId is 0.
return pInfo->checkpointId;
}

View File

@ -1156,7 +1156,7 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return TSDB_CODE_SUCCESS;
}
// discard the rsp from before restart
// discard the rsp, since it is expired.
if (req.startTs < pTask->execInfo.created) {
tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
" from task createTs:%" PRId64 ", discard",
@ -1172,11 +1172,14 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
taosThreadMutexLock(&pTask->lock);
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
if ((pTask->chkInfo.checkpointId != req.checkpointId) && req.checkpointId != 0) {
if (pTask->chkInfo.checkpointId != req.checkpointId) {
tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId,
pTask->chkInfo.checkpointId, req.checkpointId);
pTask->chkInfo.checkpointId = req.checkpointId;
tqSetRestoreVersionInfo(pTask);
} else {
tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current checkpointId, no need to update",
pTask->id.idStr, vgId, req.checkpointId);
}
taosThreadMutexUnlock(&pTask->lock);