diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0c09e0e1ea..a6dac7f5ba 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -792,6 +792,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); int32_t streamTaskClearHTaskAttr(SStreamTask* pTask); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); +int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, void* pFn); int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event); void streamTaskRestoreStatus(SStreamTask* pTask); diff --git a/source/libs/stream/inc/streamsm.h b/source/libs/stream/inc/streamsm.h index 7be655fbed..ea0522bd5a 100644 --- a/source/libs/stream/inc/streamsm.h +++ b/source/libs/stream/inc/streamsm.h @@ -34,6 +34,8 @@ typedef int32_t (*__state_trans_succ_fn)(SStreamTask*); typedef struct SAttachedEventInfo { ETaskStatus status; // required status that this event can be handled EStreamTaskEvent event; // the delayed handled event + void* pParam; + void* pFn; } SAttachedEventInfo; typedef struct STaskStateTrans { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1f7bb56ec1..b9839dfc0c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -356,9 +356,16 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP); } else { - ASSERT(status == TASK_STATUS__READY|| status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP); - streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); - stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id); + ASSERT(status == TASK_STATUS__READY || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP); + int32_t code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id, + pStreamTask->id.idStr, tstrerror(code)); + streamMetaReleaseTask(pMeta, pStreamTask); + return code; + } else { + stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id); + } } // wait for the stream task to handle all in the inputQ, and to be idle diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 4bd6483f7f..d785932109 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -278,10 +278,10 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt ETaskStatus s = streamTaskGetStatus(pTask, NULL); taosThreadMutexUnlock(&pTask->lock); - if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) { + if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event)); return TSDB_CODE_SUCCESS; - } else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP) { // this event has been handled already + } else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT) { stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event)); taosMsleep(100); } else {