diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 756880dc4f..e6400be1b8 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -106,16 +106,16 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { } // todo optimize the perf of find the trans objs by using hash table -static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent event) { +static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStreamTaskEvent event) { int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans); for (int32_t i = 0; i < numOfTrans; ++i) { STaskStateTrans* pTrans = taosArrayGet(streamTaskSMTrans, i); - if (pTrans->state.state == pState->current.state && pTrans->event == event) { + if (pTrans->state.state == state && pTrans->event == event) { return pTrans; } } - if (event == TASK_EVENT_CHECKPOINT_DONE && pState->current.state == TASK_STATUS__STOP) { + if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) { } else { ASSERT(0); @@ -181,21 +181,9 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) { return NULL; } -int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { +static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans) { SStreamTask* pTask = pSM->pTask; - taosThreadMutexLock(&pTask->lock); - - STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event); - if (pTrans == NULL) { - stWarn("s-task:%s status:%s not allowed handle event:%s", pTask->id.idStr, pSM->current.name, StreamTaskEventList[event].name); - taosThreadMutexUnlock(&pTask->lock); - return -1; - } else { - stDebug("s-task:%s start to handle event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[event].name, - pSM->current.name); - } - if (pTrans->attachEvent.event != 0) { attachEvent(pTask, &pTrans->attachEvent); taosThreadMutexUnlock(&pTask->lock); @@ -209,20 +197,14 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) { stDebug("s-task:%s attached event:%s handled", pTask->id.idStr, StreamTaskEventList[pTrans->event].name); return TSDB_CODE_SUCCESS; - } else {// this event has been handled already + } else { // this event has been handled already stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pTask->id.idStr, StreamTaskEventList[event].name); taosMsleep(100); } } - } else { - if (pSM->pActiveTrans != NULL) { - ASSERT(!pSM->pActiveTrans->autoInvokeEndFn); - stWarn("s-task:%s status:%s handle event:%s is interrupted, handle the new event:%s", pTask->id.idStr, - pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name, StreamTaskEventList[event].name); - } - + } else { // override current active trans pSM->pActiveTrans = pTrans; pSM->startTs = taosGetTimestampMs(); taosThreadMutexUnlock(&pTask->lock); @@ -238,6 +220,34 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { return TSDB_CODE_SUCCESS; } +int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { + SStreamTask* pTask = pSM->pTask; + STaskStateTrans* pTrans = NULL; + + while (1) { + taosThreadMutexLock(&pTask->lock); + if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) { + taosThreadMutexUnlock(&pTask->lock); + taosMsleep(100); + stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed", + pTask->id.idStr, pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name); + } else { + pTrans = streamTaskFindTransform(pSM->current.state, event); + if (pSM->pActiveTrans != NULL) { + // currently in some state transfer procedure, not auto invoke transfer, abort it + stDebug("s-task:%s handle event %s quit, status %s -> %s failed, handle event %s now", pTask->id.idStr, + StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name, pSM->pActiveTrans->next.name, + StreamTaskEventList[event].name); + } + + doHandleEvent(pSM, event, pTrans); + break; + } + } + + return TSDB_CODE_SUCCESS; +} + static void keepPrevInfo(SStreamTaskSM* pSM) { STaskStateTrans* pTrans = pSM->pActiveTrans; @@ -257,7 +267,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP); // the pSM->prev.evt may be 0, so print string is not appropriate. stDebug("status not handled success, current status:%s, trigger event:%d, %s", pSM->current.name, pSM->prev.evt, - pTask->id.idStr); + pTask->id.idStr); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_INVALID_PARA; @@ -283,7 +293,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { stDebug("s-task:%s handle the attached event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[pEvtInfo->event].name, pSM->current.name); - STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM, pEvtInfo->event); + STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event); ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL); pSM->pActiveTrans = pNextTrans; @@ -395,12 +405,12 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); // scan-history related event - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, - streamTaskSetReadyForWal, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, + NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, - streamTaskSetReadyForWal, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, + NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); // halt stream task, from other task status