fix(stream): handle failure during checkpoint

This commit is contained in:
Haojun Liao 2023-10-31 10:38:17 +08:00
parent eaf76854f6
commit 72f3e4446d
3 changed files with 10 additions and 3 deletions

View File

@ -175,7 +175,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
// set task status // set task status
if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) { if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) {
pTask->checkpointingId = checkpointId; pTask->checkpointingId = checkpointId;
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
return code;
}
} }
{ // todo: remove this when the pipeline checkpoint generating is used. { // todo: remove this when the pipeline checkpoint generating is used.

View File

@ -24,7 +24,7 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamTask* pTask) { bool streamTaskShouldStop(const SStreamTask* pTask) {
ETaskStatus s = streamTaskGetStatus(pTask, NULL); ETaskStatus s = streamTaskGetStatus(pTask, NULL);
return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING); return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING) || (s == TASK_STATUS__UNINIT);
} }
bool streamTaskShouldPause(const SStreamTask* pTask) { bool streamTaskShouldPause(const SStreamTask* pTask) {

View File

@ -118,9 +118,12 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) { if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) {
} else if (event == TASK_EVENT_GEN_CHECKPOINT && state == TASK_STATUS__UNINIT) {
// the task is set to uninit due to nodeEpset update, during processing checkpoint-trigger block.
} else { } else {
ASSERT(0); ASSERT(0);
} }
return NULL; return NULL;
} }
@ -237,7 +240,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
if (pTrans == NULL) { if (pTrans == NULL) {
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name); stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
return -1; return TSDB_CODE_INVALID_PARA; // todo: set new error code// failed to handle the event.
} }
if (pSM->pActiveTrans != NULL) { if (pSM->pActiveTrans != NULL) {