diff --git a/source/libs/stream/inc/streamsm.h b/source/libs/stream/inc/streamsm.h index 4b87dcfe84..c2a351a297 100644 --- a/source/libs/stream/inc/streamsm.h +++ b/source/libs/stream/inc/streamsm.h @@ -55,6 +55,7 @@ struct SStreamTaskSM { SArray* pTransList; // SArray int64_t stateTs; SStreamTask* pTask; + STaskStateTrans* pActiveTrans; }; typedef struct SStreamEventInfo { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index b6bd53eb87..c219556c6f 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -38,6 +38,7 @@ static int32_t initStateTransferTable(SStreamTaskSM* pSM); static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return 0; } static int32_t streamTaskStartCheckDownstream(SStreamTask* pTask) { stDebug("s-task:%s start to check downstream tasks", pTask->id.idStr); + streamTaskCheckDownstream(pTask); return 0; } static int32_t streamTaskDoPause(SStreamTask* pTask) { @@ -84,21 +85,33 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { taosMemoryFree(pSM); return NULL; } - return pSM; } int32_t taskSMHandleEvent(SStreamTaskSM* pSM, const EStreamTaskEvent* pEvent) { STaskStateTrans* pTrans = streamTaskFindTransform(pSM, pEvent); - qDebug("start to handle event:%d", *pEvent); + ASSERT(pSM->pActiveTrans == NULL); - pSM->current = pTrans->next; + stDebug("start to handle event:%d, state:%s", *pEvent, pSM->current.name); + pSM->pActiveTrans = pTrans; pSM->stateTs = taosGetTimestampMs(); - qDebug("new state:%s from %s", pTrans->next.name, pSM->current.name); - return pTrans->pAction(pSM->pTask); } +int32_t taskSMOnHandleEventSuccess(SStreamTaskSM* pSM) { + STaskStateTrans* pTrans = pSM->pActiveTrans; + EStreamTaskEvent* pEvent = &pTrans->event; + + int64_t el = (taosGetTimestampMs() - pSM->stateTs); + stDebug("handle event:%d completed, elapsd time:%" PRId64 "ms new state:%s from %s", *pEvent, el, pTrans->next.name, + pSM->current.name); + + // todo: add lock + pSM->current = pTrans->next; +} + + + STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn) { STaskStateTrans trans = {0}; trans.state = StreamTaskStatusList[current];