diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 62d253e17a..6f78f99302 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2706,9 +2706,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal if (checkpointFailed && activeCheckpointId != 0) { - // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal - mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", execInfo.activeCheckpoint); - mndResetFromCheckpoint(pMnode); + bool allReady = true; + SArray* p = mndTakeVgroupSnapshot(pMnode, &allReady); + taosArrayDestroy(p); + + if (allReady) { + // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal + mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", + execInfo.activeCheckpoint); + mndResetFromCheckpoint(pMnode); + } else { + mInfo("not all vgroups are ready, wait for next HB from stream tasks"); + } } taosThreadMutexUnlock(&execInfo.lock); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index e6400be1b8..bcc2228dfd 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -233,11 +233,17 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { pTask->id.idStr, pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name); } else { pTrans = streamTaskFindTransform(pSM->current.state, event); + if (pTrans == NULL) { + stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name); + taosThreadMutexUnlock(&pTask->lock); + return -1; + } + if (pSM->pActiveTrans != NULL) { // currently in some state transfer procedure, not auto invoke transfer, abort it - stDebug("s-task:%s handle event %s quit, status %s -> %s failed, handle event %s now", pTask->id.idStr, - StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name, pSM->pActiveTrans->next.name, - StreamTaskEventList[event].name); + stDebug("s-task:%s handle event procedure %s quit, status %s -> %s failed, handle event %s now", + pTask->id.idStr, StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name, + pSM->pActiveTrans->next.name, StreamTaskEventList[event].name); } doHandleEvent(pSM, event, pTrans);