From 72f3e4446d984cd90b1025f6ea911d1435aed57d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 31 Oct 2023 10:38:17 +0800 Subject: [PATCH] fix(stream): handle failure during checkpoint --- source/libs/stream/src/streamCheckpoint.c | 6 +++++- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamTaskSm.c | 5 ++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 28b67029ce..9eaa9fcb92 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -175,7 +175,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // set task status if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) { pTask->checkpointingId = checkpointId; - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code)); + return code; + } } { // todo: remove this when the pipeline checkpoint generating is used. 
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 28d5336a5e..e4365fe625 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,7 +24,7 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamTask* pTask) { ETaskStatus s = streamTaskGetStatus(pTask, NULL); - return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING); + return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING) || (s == TASK_STATUS__UNINIT); } bool streamTaskShouldPause(const SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 11a033a9a3..2dcf440729 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -118,9 +118,12 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) { + } else if (event == TASK_EVENT_GEN_CHECKPOINT && state == TASK_STATUS__UNINIT) { + // the task was reset to uninit by a nodeEpset update while the checkpoint-trigger block was being processed. } else { ASSERT(0); } + return NULL; } @@ -237,7 +240,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if (pTrans == NULL) { stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name); taosThreadMutexUnlock(&pTask->lock); - return -1; + return TSDB_CODE_INVALID_PARA; // todo: add a dedicated error code for failing to handle the event. } if (pSM->pActiveTrans != NULL) {