diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7859c60944..971604ab1c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -746,7 +746,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); int32_t streamTaskClearHTaskAttr(SStreamTask* pTask); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); -int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM); +int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event); void streamTaskRestoreStatus(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 4c71841062..751b809fb7 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -163,7 +163,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { } } else { stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskOnHandleEventSuccess(pTask->status.pSM); + streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT); } return 0; @@ -313,7 +313,7 @@ int32_t onScanhistoryTaskReady(SStreamTask* pTask) { // todo: refactor this function. static void doProcessDownstreamReadyRsp(SStreamTask* pTask) { - streamTaskOnHandleEventSuccess(pTask->status.pSM); + streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT); #if 0 const char* id = pTask->id.idStr; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 23dd06d640..5dfe3ec187 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -214,7 +214,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt // todo handle error code; if (pTrans->autoInvokeEndFn) { - streamTaskOnHandleEventSuccess(pSM); + streamTaskOnHandleEventSuccess(pSM, event); } } @@ -262,7 +262,7 @@ static void keepPrevInfo(SStreamTaskSM* pSM) { pSM->prev.evt = pTrans->event; } -int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { +int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event) { SStreamTask* pTask = pSM->pTask; // do update the task status @@ -280,6 +280,13 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { return TSDB_CODE_INVALID_PARA; } + if (pTrans->event != event) { + stWarn("s-task:%s handle event:%s failed, current status:%s", pTask->id.idStr, StreamTaskEventList[event].name, + pSM->current.name); + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_INVALID_PARA; + } + keepPrevInfo(pSM); pSM->current = pTrans->next; @@ -309,7 +316,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { int32_t code = pNextTrans->pAction(pSM->pTask); if (pNextTrans->autoInvokeEndFn) { - return streamTaskOnHandleEventSuccess(pSM); + return streamTaskOnHandleEventSuccess(pSM, event); } else { return code; }