fix(stream): add more check.

This commit is contained in:
Haojun Liao 2023-10-29 22:57:33 +08:00
parent 91121974d7
commit 1bbad532a7
2 changed files with 21 additions and 6 deletions

View File

@ -2706,9 +2706,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (checkpointFailed && activeCheckpointId != 0) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", execInfo.activeCheckpoint);
mndResetFromCheckpoint(pMnode);
bool allReady = true;
SArray* p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);
if (allReady) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status",
execInfo.activeCheckpoint);
mndResetFromCheckpoint(pMnode);
} else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks");
}
}
taosThreadMutexUnlock(&execInfo.lock);

View File

@ -233,11 +233,17 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
pTask->id.idStr, pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name);
} else {
pTrans = streamTaskFindTransform(pSM->current.state, event);
if (pTrans == NULL) {
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name);
taosThreadMutexUnlock(&pTask->lock);
return -1;
}
if (pSM->pActiveTrans != NULL) {
// currently in some state transfer procedure, not auto invoke transfer, abort it
stDebug("s-task:%s handle event %s quit, status %s -> %s failed, handle event %s now", pTask->id.idStr,
StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name, pSM->pActiveTrans->next.name,
StreamTaskEventList[event].name);
stDebug("s-task:%s handle event procedure %s quit, status %s -> %s failed, handle event %s now",
pTask->id.idStr, StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name,
pSM->pActiveTrans->next.name, StreamTaskEventList[event].name);
}
doHandleEvent(pSM, event, pTrans);