fix(stream): check for the repeatedly sent checkpoint-source msg.

This commit is contained in:
Haojun Liao 2023-11-09 23:26:09 +08:00
parent 7f3a50dfde
commit 730434ed82
1 changed files with 15 additions and 5 deletions

View File

@ -1663,7 +1663,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
SStreamCheckpointSourceReq req = {0};
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId);
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
@ -1671,7 +1671,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
}
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d checkpoint-source msg received during restoring, ignore it", vgId);
tqDebug("vgId:%d checkpoint-source msg received during restoring, s-task:0x%x ignore it", vgId, req.taskId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
@ -1691,7 +1691,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
}
tDecoderClear(&decoder);
// todo handle failure to reset from checkpoint procedure
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
@ -1702,7 +1701,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
return TSDB_CODE_SUCCESS;
}
// todo handle failure to reset from checkpoint procedure
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
if (pTask->status.downstreamReady != 1) {
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
@ -1723,7 +1721,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
pTask->id.idStr, req.checkpointId);
taosThreadMutexUnlock(&pTask->lock);
@ -1734,6 +1732,18 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
// check if the checkpoint msg already sent or not.
if (status == TASK_STATUS__CK) {
ASSERT(pTask->checkpointingId == req.checkpointId);
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
" already received, ignore this msg and continue process checkpoint",
pTask->id.idStr, pTask->checkpointingId);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
streamProcessCheckpointSourceReq(pTask, &req);
taosThreadMutexUnlock(&pTask->lock);