fix(stream): handle error when checkpoint is interrupted by nodeUpdate.

This commit is contained in:
Haojun Liao 2023-12-26 19:02:52 +08:00
parent 90645cbc77
commit 7eb0e42bb9
3 changed files with 25 additions and 6 deletions

View File

@ -1294,8 +1294,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
if (status == TASK_STATUS__CK) {
ASSERT(pTask->chkInfo.checkpointingId == req.checkpointId);
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
" already received, ignore this msg and continue process checkpoint",
pTask->id.idStr, pTask->chkInfo.checkpointingId);
" transId:%d already received, ignore this msg and continue process checkpoint",
pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);

View File

@ -746,7 +746,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
taosArrayClear(pTask->pReadyMsgList);
stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
} else {
stDebug("s-task:%s level:%d already send rsp to mnode", pTask->id.idStr, pTask->info.taskLevel);
stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel);
}
taosThreadMutexUnlock(&pTask->lock);

View File

@ -625,10 +625,29 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
// todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) {
// todo add lock
char* p = NULL;
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p);
streamTaskBuildCheckpoint(pTask);
ETaskStatus s = streamTaskGetStatus(pTask, &p);
if (s == TASK_STATUS__CK) {
stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p);
streamTaskBuildCheckpoint(pTask);
} else {
// todo refactor
int32_t code = 0;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
code = streamTaskSendCheckpointReadyMsg(pTask);
}
if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
0, tstrerror(code));
}
}
return 0;
}
}