fix(stream): avoid the later pause overwrite the previous pause state.

This commit is contained in:
Haojun Liao 2024-10-10 18:34:52 +08:00
parent dfb47ac5a9
commit 0c7b3cfb39
1 changed files with 6 additions and 8 deletions

View File

@ -485,11 +485,6 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _
static void keepPrevInfo(SStreamTaskSM* pSM) { static void keepPrevInfo(SStreamTaskSM* pSM) {
STaskStateTrans* pTrans = pSM->pActiveTrans; STaskStateTrans* pTrans = pSM->pActiveTrans;
// we only keep the latest pause state
if (pSM->prev.state.state == TASK_STATUS__PAUSE && pSM->current.state == TASK_STATUS__PAUSE) {
return;
}
pSM->prev.state = pSM->current; pSM->prev.state = pSM->current;
pSM->prev.evt = pTrans->event; pSM->prev.evt = pTrans->event;
} }
@ -527,10 +522,13 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
return TSDB_CODE_STREAM_INVALID_STATETRANS; return TSDB_CODE_STREAM_INVALID_STATETRANS;
} }
keepPrevInfo(pSM); // repeat pause will not overwrite the previous pause state
if (pSM->current.state != TASK_STATUS__PAUSE || pTrans->next.state != TASK_STATUS__PAUSE) {
keepPrevInfo(pSM);
pSM->current = pTrans->next; pSM->current = pTrans->next;
pSM->pActiveTrans = NULL; pSM->pActiveTrans = NULL;
}
// todo remove it // todo remove it
// todo: handle the error code // todo: handle the error code