From e4aeea176bcfc9345c8b865b5061ce12bf5e791e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 13:59:51 +0800 Subject: [PATCH] fix(stream):fix the bug when event in waiting list not fulfilled. --- source/libs/stream/src/streamTaskSm.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 71521bd8f2..fa14c0618f 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -323,13 +323,16 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name); - SAttachedEventInfo* pEvtInfo = taosArrayPop(pSM->pWaitingEventList); + SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0); // OK, let's handle the attached event, since the task has reached the required status now if (pSM->current.state == pEvtInfo->status) { - stDebug("s-task:%s handle the attached event:%s, state:%s", pTask->id.idStr, + stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr, StreamTaskEventList[pEvtInfo->event].name, pSM->current.name); + // remove it + taosArrayPop(pSM->pWaitingEventList); + STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event); ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL); @@ -344,6 +347,11 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even } else { return code; } + } else { + taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr, + pSM->current.name, StreamTaskEventList[pEvtInfo->event].name, + StreamTaskStatusList[pEvtInfo->status].name); } } else { taosThreadMutexUnlock(&pTask->lock);