diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f5de719848..9213c729d7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -887,6 +887,14 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { tstrerror(code)); } } + + // let's kill the query procedure within stream, to end it ASAP. + if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { + code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code)); + } + } return code; } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index c3a2742aa2..f995c48688 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -500,7 +500,9 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even STaskStateTrans* pTrans = pSM->pActiveTrans; if (pTrans == NULL) { ETaskStatus s = pSM->current.state; - + // when trying to finish current event successfully, another event with high priorities, such as dropping/stop, has + // interrupted this procedure, and changed the status after freeing the activeTrans, resulting in the failure of + // processing of current event. if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT && s != TASK_STATUS__READY) { stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name,