fix(stream): remove invalid assert.
This commit is contained in:
parent
4e56b6f9eb
commit
c7d4f45b7f
|
@ -2667,7 +2667,8 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) {
|
||||||
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
|
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
|
||||||
|
|
||||||
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId);
|
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId);
|
||||||
int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream);
|
|
||||||
|
int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream);
|
||||||
if (ckId != -1) { // consensus checkpoint id already exist
|
if (ckId != -1) { // consensus checkpoint id already exist
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
rsp.code = 0;
|
rsp.code = 0;
|
||||||
|
@ -2678,7 +2679,7 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) {
|
||||||
SMsgHead *pHead = rsp.pCont;
|
SMsgHead *pHead = rsp.pCont;
|
||||||
pHead->vgId = htonl(req.nodeId);
|
pHead->vgId = htonl(req.nodeId);
|
||||||
|
|
||||||
mDebug("stream:0x%" PRIx64 " consensus checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId);
|
mDebug("stream:0x%" PRIx64 " consensus-checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId);
|
||||||
doSendConsensusCheckpointRsp(&req, &rsp, ckId);
|
doSendConsensusCheckpointRsp(&req, &rsp, ckId);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
|
@ -929,8 +929,7 @@ bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTa
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) {
|
int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) {
|
||||||
if (pInfo->genTs > 0) {
|
if (pInfo->genTs > 0) { // there is no checkpoint ever generated if the checkpointId is 0.
|
||||||
ASSERT(pInfo->checkpointId > 0);
|
|
||||||
return pInfo->checkpointId;
|
return pInfo->checkpointId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1156,7 +1156,7 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// discard the rsp from before restart
|
// discard the rsp, since it is expired.
|
||||||
if (req.startTs < pTask->execInfo.created) {
|
if (req.startTs < pTask->execInfo.created) {
|
||||||
tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
|
tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
|
||||||
" from task createTs:%" PRId64 ", discard",
|
" from task createTs:%" PRId64 ", discard",
|
||||||
|
@ -1172,11 +1172,14 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
|
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
|
||||||
|
|
||||||
if ((pTask->chkInfo.checkpointId != req.checkpointId) && req.checkpointId != 0) {
|
if (pTask->chkInfo.checkpointId != req.checkpointId) {
|
||||||
tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId,
|
tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId,
|
||||||
pTask->chkInfo.checkpointId, req.checkpointId);
|
pTask->chkInfo.checkpointId, req.checkpointId);
|
||||||
pTask->chkInfo.checkpointId = req.checkpointId;
|
pTask->chkInfo.checkpointId = req.checkpointId;
|
||||||
tqSetRestoreVersionInfo(pTask);
|
tqSetRestoreVersionInfo(pTask);
|
||||||
|
} else {
|
||||||
|
tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current checkpointId, no need to update",
|
||||||
|
pTask->id.idStr, vgId, req.checkpointId);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
Loading…
Reference in New Issue