fix(stream): check for the repeatedly sent checkpoint-source msg.
This commit is contained in:
parent
e1de1de421
commit
4d9b422874
|
@ -1668,7 +1668,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
|
||||
SStreamCheckpointSourceReq req = {0};
|
||||
if (!vnodeIsRoleLeader(pTq->pVnode)) {
|
||||
tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId);
|
||||
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
|
||||
SRpcMsg rsp = {0};
|
||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
|
@ -1676,7 +1676,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
}
|
||||
|
||||
if (!pTq->pVnode->restored) {
|
||||
tqDebug("vgId:%d checkpoint-source msg received during restoring, ignore it", vgId);
|
||||
tqDebug("vgId:%d checkpoint-source msg received during restoring, s-task:0x%x ignore it", vgId, req.taskId);
|
||||
SRpcMsg rsp = {0};
|
||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
|
@ -1696,7 +1696,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
// todo handle failure to reset from checkpoint procedure
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
||||
if (pTask == NULL) {
|
||||
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
|
||||
|
@ -1707,7 +1706,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// todo handle failure to reset from checkpoint procedure
|
||||
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
|
||||
if (pTask->status.downstreamReady != 1) {
|
||||
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
|
||||
|
@ -1728,7 +1726,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
|
||||
|
||||
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
|
||||
qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
|
||||
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
|
||||
pTask->id.idStr, req.checkpointId);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
|
@ -1739,6 +1737,18 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// check if the checkpoint msg already sent or not.
|
||||
if (status == TASK_STATUS__CK) {
|
||||
ASSERT(pTask->checkpointingId == req.checkpointId);
|
||||
|
||||
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
|
||||
" already received, ignore this msg and continue process checkpoint",
|
||||
pTask->id.idStr, pTask->checkpointingId);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
}
|
||||
|
||||
streamProcessCheckpointSourceReq(pTask, &req);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
|
|
Loading…
Reference in New Issue