diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 28b67029ce..9eaa9fcb92 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -175,7 +175,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // set task status if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) { pTask->checkpointingId = checkpointId; - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code)); + return code; + } } { // todo: remove this when the pipeline checkpoint generating is used. diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 28d5336a5e..e4365fe625 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,7 +24,7 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamTask* pTask) { ETaskStatus s = streamTaskGetStatus(pTask, NULL); - return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING); + return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING) || (s == TASK_STATUS__UNINIT); } bool streamTaskShouldPause(const SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 11a033a9a3..2dcf440729 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -118,9 +118,12 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) { + } else if (event == TASK_EVENT_GEN_CHECKPOINT && state == TASK_STATUS__UNINIT) { + // the task is set to uninit due to nodeEpset update, during processing checkpoint-trigger block. } else { ASSERT(0); } + return NULL; } @@ -237,7 +240,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if (pTrans == NULL) { stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name); taosThreadMutexUnlock(&pTask->lock); - return -1; + return TSDB_CODE_INVALID_PARA; // todo: set new error code// failed to handle the event. } if (pSM->pActiveTrans != NULL) {