fix(stream):fix the bug when event in waiting list not fulfilled.

This commit is contained in:
Haojun Liao 2023-11-01 13:59:51 +08:00
parent 5ff89bc098
commit e4aeea176b
1 changed files with 10 additions and 2 deletions

View File

@ -323,13 +323,16 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name); StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name);
SAttachedEventInfo* pEvtInfo = taosArrayPop(pSM->pWaitingEventList); SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
// OK, let's handle the attached event, since the task has reached the required status now // OK, let's handle the attached event, since the task has reached the required status now
if (pSM->current.state == pEvtInfo->status) { if (pSM->current.state == pEvtInfo->status) {
stDebug("s-task:%s handle the attached event:%s, state:%s", pTask->id.idStr, stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr,
StreamTaskEventList[pEvtInfo->event].name, pSM->current.name); StreamTaskEventList[pEvtInfo->event].name, pSM->current.name);
// remove it
taosArrayPop(pSM->pWaitingEventList);
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event); STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event);
ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL); ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);
@ -344,6 +347,11 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
} else { } else {
return code; return code;
} }
} else {
taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr,
pSM->current.name, StreamTaskEventList[pEvtInfo->event].name,
StreamTaskStatusList[pEvtInfo->status].name);
} }
} else { } else {
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);