diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e1b27049af..7ad0721531 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1269,9 +1269,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); // now the fill-history task starts to scan data from wal files. - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - tqScanWalAsync(pTq, false); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); + if (code == TSDB_CODE_SUCCESS) { + tqScanWalAsync(pTq, false); + } } + streamMetaReleaseTask(pMeta, pStreamTask); } else { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 2d951147d0..7756d7a2e0 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -580,7 +580,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; // execute in the scan history complete call back msg, ready to process data from inputQ - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); + int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); streamTaskSetSchedStatusInactive(pTask); taosWLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 1aa4dd8e13..78728e82d1 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -105,6 +105,17 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } +// todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE +static bool isUnsupportedTransform(ETaskStatus state, const EStreamTaskEvent event) { + if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT) { + if (event == TASK_EVENT_SCANHIST_DONE || event == TASK_EVENT_CHECKPOINT_DONE || event == TASK_EVENT_GEN_CHECKPOINT) { + return true; + } + } + + return false; +} + // todo optimize the perf of find the trans objs by using hash table static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStreamTaskEvent event) { int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans); @@ -115,10 +126,8 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream } } - if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) { - - } else if (event == TASK_EVENT_GEN_CHECKPOINT && state == TASK_STATUS__UNINIT) { - // the task is set to uninit due to nodeEpset update, during processing checkpoint-trigger block. + if (isUnsupportedTransform(state, event)) { + return NULL; } else { ASSERT(0); }