fix(stream): handle error when checkpoint is interrupted by nodeUpdate.
This commit is contained in:
parent
ccf861dec8
commit
5592e11235
|
@ -1295,8 +1295,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
if (status == TASK_STATUS__CK) {
|
if (status == TASK_STATUS__CK) {
|
||||||
ASSERT(pTask->chkInfo.checkpointingId == req.checkpointId);
|
ASSERT(pTask->chkInfo.checkpointingId == req.checkpointId);
|
||||||
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
|
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
|
||||||
" already received, ignore this msg and continue process checkpoint",
|
" transId:%d already received, ignore this msg and continue process checkpoint",
|
||||||
pTask->id.idStr, pTask->chkInfo.checkpointingId);
|
pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
|
@ -746,7 +746,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
|
||||||
taosArrayClear(pTask->pReadyMsgList);
|
taosArrayClear(pTask->pReadyMsgList);
|
||||||
stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
|
stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s level:%d already send rsp to mnode", pTask->id.idStr, pTask->info.taskLevel);
|
stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
|
@ -625,10 +625,29 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
// todo other thread may change the status
|
// todo other thread may change the status
|
||||||
// do nothing after syncing executor state to the storage backend, until the vnode-level checkpoint is completed.
|
// do nothing after syncing executor state to the storage backend, until the vnode-level checkpoint is completed.
|
||||||
if (type == STREAM_INPUT__CHECKPOINT) {
|
if (type == STREAM_INPUT__CHECKPOINT) {
|
||||||
|
|
||||||
|
// todo add lock
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
streamTaskGetStatus(pTask, &p);
|
ETaskStatus s = streamTaskGetStatus(pTask, &p);
|
||||||
stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p);
|
if (s == TASK_STATUS__CK) {
|
||||||
streamTaskBuildCheckpoint(pTask);
|
stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p);
|
||||||
|
streamTaskBuildCheckpoint(pTask);
|
||||||
|
} else {
|
||||||
|
// todo refactor
|
||||||
|
int32_t code = 0;
|
||||||
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
code = streamTaskSendCheckpointSourceRsp(pTask);
|
||||||
|
} else {
|
||||||
|
code = streamTaskSendCheckpointReadyMsg(pTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
// todo: let's retry send rsp to upstream/mnode
|
||||||
|
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
|
||||||
|
0, tstrerror(code));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue